moqtap_proxy/parser/data.rs
1//! Inline data stream parser.
2//!
3//! Parses subgroup/fetch stream headers and object headers from
4//! forwarded unidirectional stream bytes.
5
6use bytes::{Buf, BytesMut};
7
8use moqtap_codec::dispatch::{AnyFetchHeader, AnyObjectHeader, AnySubgroupHeader};
9use moqtap_codec::version::DraftVersion;
10
11use crate::event::DataStreamHeaderKind;
12
13/// The expected type of data stream.
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub enum DataStreamType {
16 /// Subgroup data stream (most common).
17 Subgroup,
18 /// Fetch response data stream.
19 Fetch,
20}
21
22/// Parsing state for a unidirectional data stream.
23#[derive(Debug)]
24enum DataStreamState {
25 /// Waiting for the stream header.
26 AwaitingHeader,
27 /// Header parsed, waiting for object headers.
28 InStream,
29}
30
31/// Result of parsing data stream bytes.
32///
33/// The proxy forwards data stream bytes verbatim; raw wire bytes for
34/// headers are not exposed here because nothing in the forwarding path
35/// needs them. If byte-level mutation of data streams is ever required,
36/// add a dedicated variant or a capturing parser mode.
37#[derive(Debug)]
38pub enum DataParseResult {
39 /// A stream header was parsed.
40 Header(DataStreamHeaderKind),
41 /// An object header was parsed.
42 Object(AnyObjectHeader),
43 /// Need more data.
44 NeedMore,
45 /// Parse error (non-fatal — forwarding continues).
46 Error(String),
47}
48
49/// Stateful inline parser for a MoQT data stream.
50///
51/// Accepts raw byte chunks and emits parsed headers and object headers.
52/// The parser buffers partial data and tries to decode when enough bytes
53/// are available.
54pub struct DataStreamParser {
55 buf: BytesMut,
56 stream_type: DataStreamType,
57 state: DataStreamState,
58 draft: DraftVersion,
59}
60
61impl DataStreamParser {
62 /// Create a new parser for the given stream type and draft version.
63 pub fn new(stream_type: DataStreamType, draft: DraftVersion) -> Self {
64 Self {
65 buf: BytesMut::with_capacity(4096),
66 stream_type,
67 state: DataStreamState::AwaitingHeader,
68 draft,
69 }
70 }
71
72 /// Feed raw bytes into the parser.
73 ///
74 /// Returns a list of parsed results. May return multiple results if
75 /// the data contains several complete items.
76 pub fn feed(&mut self, data: &[u8]) -> Vec<DataParseResult> {
77 self.buf.extend_from_slice(data);
78 let mut results = Vec::new();
79
80 loop {
81 if self.buf.is_empty() {
82 break;
83 }
84
85 match self.state {
86 DataStreamState::AwaitingHeader => match self.try_parse_header() {
87 Some(Ok(header)) => {
88 self.state = DataStreamState::InStream;
89 results.push(DataParseResult::Header(header));
90 }
91 Some(Err(e)) => {
92 results.push(DataParseResult::Error(e));
93 break;
94 }
95 None => {
96 if results.is_empty() {
97 results.push(DataParseResult::NeedMore);
98 }
99 break;
100 }
101 },
102 DataStreamState::InStream => match self.try_parse_object() {
103 Some(Ok(header)) => {
104 results.push(DataParseResult::Object(header));
105 }
106 Some(Err(e)) => {
107 results.push(DataParseResult::Error(e));
108 break;
109 }
110 None => {
111 if results.is_empty() {
112 results.push(DataParseResult::NeedMore);
113 }
114 break;
115 }
116 },
117 }
118 }
119
120 results
121 }
122
123 /// Try to parse the stream header from the buffer.
124 fn try_parse_header(&mut self) -> Option<Result<DataStreamHeaderKind, String>> {
125 let snapshot = &self.buf[..];
126 let mut cursor = snapshot;
127
128 match self.stream_type {
129 DataStreamType::Subgroup => match AnySubgroupHeader::decode(self.draft, &mut cursor) {
130 Ok(header) => {
131 let consumed = snapshot.len() - cursor.remaining();
132 self.buf.advance(consumed);
133 Some(Ok(DataStreamHeaderKind::Subgroup(header)))
134 }
135 Err(e) => {
136 if is_incomplete_error(&e) {
137 None
138 } else {
139 Some(Err(format!("subgroup header decode: {e}")))
140 }
141 }
142 },
143 DataStreamType::Fetch => match AnyFetchHeader::decode(self.draft, &mut cursor) {
144 Ok(header) => {
145 let consumed = snapshot.len() - cursor.remaining();
146 self.buf.advance(consumed);
147 Some(Ok(DataStreamHeaderKind::Fetch(header)))
148 }
149 Err(e) => {
150 if is_incomplete_error(&e) {
151 None
152 } else {
153 Some(Err(format!("fetch header decode: {e}")))
154 }
155 }
156 },
157 }
158 }
159
160 /// Try to parse an object header from the buffer.
161 ///
162 /// Note: drafts 14-17 do not have a standalone `AnyObjectHeader`
163 /// variant — their subgroup objects use delta-encoded object IDs
164 /// and header-typed extension/properties flags that require stateful
165 /// per-stream context (a `SubgroupObjectReader`). Proxy-side parsing
166 /// of those drafts' object streams would need to track that state
167 /// per stream; that work is out of scope here, and this path will
168 /// return `CodecError::UnsupportedDraft` for drafts 14-17.
169 fn try_parse_object(&mut self) -> Option<Result<AnyObjectHeader, String>> {
170 let snapshot = &self.buf[..];
171 let mut cursor = snapshot;
172
173 match AnyObjectHeader::decode(self.draft, &mut cursor) {
174 Ok(header) => {
175 let consumed = snapshot.len() - cursor.remaining();
176 self.buf.advance(consumed);
177 Some(Ok(header))
178 }
179 Err(e) => {
180 if is_incomplete_error(&e) {
181 None
182 } else {
183 Some(Err(format!("object header decode: {e}")))
184 }
185 }
186 }
187 }
188}
189
190/// Check if a codec error indicates incomplete data (need more bytes).
191fn is_incomplete_error(e: &moqtap_codec::error::CodecError) -> bool {
192 matches!(e, moqtap_codec::error::CodecError::UnexpectedEnd)
193 || matches!(
194 e,
195 moqtap_codec::error::CodecError::VarInt(
196 moqtap_codec::varint::VarIntError::UnexpectedEnd
197 )
198 )
199}