Skip to main content

moqtap_proxy/parser/
data.rs

1//! Inline data stream parser.
2//!
3//! Parses subgroup/fetch stream headers and object headers from
4//! forwarded unidirectional stream bytes.
5
6use bytes::{Buf, BytesMut};
7
8use moqtap_codec::dispatch::{AnyFetchHeader, AnyObjectHeader, AnySubgroupHeader};
9use moqtap_codec::version::DraftVersion;
10
11use crate::event::DataStreamHeaderKind;
12
13/// The expected type of data stream.
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub enum DataStreamType {
16    /// Subgroup data stream (most common).
17    Subgroup,
18    /// Fetch response data stream.
19    Fetch,
20}
21
22/// Parsing state for a unidirectional data stream.
23#[derive(Debug)]
24enum DataStreamState {
25    /// Waiting for the stream header.
26    AwaitingHeader,
27    /// Header parsed, waiting for object headers.
28    InStream,
29}
30
31/// Result of parsing data stream bytes.
32///
33/// The proxy forwards data stream bytes verbatim; raw wire bytes for
34/// headers are not exposed here because nothing in the forwarding path
35/// needs them. If byte-level mutation of data streams is ever required,
36/// add a dedicated variant or a capturing parser mode.
37#[derive(Debug)]
38pub enum DataParseResult {
39    /// A stream header was parsed.
40    Header(DataStreamHeaderKind),
41    /// An object header was parsed.
42    Object(AnyObjectHeader),
43    /// Need more data.
44    NeedMore,
45    /// Parse error (non-fatal — forwarding continues).
46    Error(String),
47}
48
49/// Stateful inline parser for a MoQT data stream.
50///
51/// Accepts raw byte chunks and emits parsed headers and object headers.
52/// The parser buffers partial data and tries to decode when enough bytes
53/// are available.
54pub struct DataStreamParser {
55    buf: BytesMut,
56    stream_type: DataStreamType,
57    state: DataStreamState,
58    draft: DraftVersion,
59}
60
61impl DataStreamParser {
62    /// Create a new parser for the given stream type and draft version.
63    pub fn new(stream_type: DataStreamType, draft: DraftVersion) -> Self {
64        Self {
65            buf: BytesMut::with_capacity(4096),
66            stream_type,
67            state: DataStreamState::AwaitingHeader,
68            draft,
69        }
70    }
71
72    /// Feed raw bytes into the parser.
73    ///
74    /// Returns a list of parsed results. May return multiple results if
75    /// the data contains several complete items.
76    pub fn feed(&mut self, data: &[u8]) -> Vec<DataParseResult> {
77        self.buf.extend_from_slice(data);
78        let mut results = Vec::new();
79
80        loop {
81            if self.buf.is_empty() {
82                break;
83            }
84
85            match self.state {
86                DataStreamState::AwaitingHeader => match self.try_parse_header() {
87                    Some(Ok(header)) => {
88                        self.state = DataStreamState::InStream;
89                        results.push(DataParseResult::Header(header));
90                    }
91                    Some(Err(e)) => {
92                        results.push(DataParseResult::Error(e));
93                        break;
94                    }
95                    None => {
96                        if results.is_empty() {
97                            results.push(DataParseResult::NeedMore);
98                        }
99                        break;
100                    }
101                },
102                DataStreamState::InStream => match self.try_parse_object() {
103                    Some(Ok(header)) => {
104                        results.push(DataParseResult::Object(header));
105                    }
106                    Some(Err(e)) => {
107                        results.push(DataParseResult::Error(e));
108                        break;
109                    }
110                    None => {
111                        if results.is_empty() {
112                            results.push(DataParseResult::NeedMore);
113                        }
114                        break;
115                    }
116                },
117            }
118        }
119
120        results
121    }
122
123    /// Try to parse the stream header from the buffer.
124    fn try_parse_header(&mut self) -> Option<Result<DataStreamHeaderKind, String>> {
125        let snapshot = &self.buf[..];
126        let mut cursor = snapshot;
127
128        match self.stream_type {
129            DataStreamType::Subgroup => match AnySubgroupHeader::decode(self.draft, &mut cursor) {
130                Ok(header) => {
131                    let consumed = snapshot.len() - cursor.remaining();
132                    self.buf.advance(consumed);
133                    Some(Ok(DataStreamHeaderKind::Subgroup(header)))
134                }
135                Err(e) => {
136                    if is_incomplete_error(&e) {
137                        None
138                    } else {
139                        Some(Err(format!("subgroup header decode: {e}")))
140                    }
141                }
142            },
143            DataStreamType::Fetch => match AnyFetchHeader::decode(self.draft, &mut cursor) {
144                Ok(header) => {
145                    let consumed = snapshot.len() - cursor.remaining();
146                    self.buf.advance(consumed);
147                    Some(Ok(DataStreamHeaderKind::Fetch(header)))
148                }
149                Err(e) => {
150                    if is_incomplete_error(&e) {
151                        None
152                    } else {
153                        Some(Err(format!("fetch header decode: {e}")))
154                    }
155                }
156            },
157        }
158    }
159
160    /// Try to parse an object header from the buffer.
161    ///
162    /// Note: drafts 14-17 do not have a standalone `AnyObjectHeader`
163    /// variant — their subgroup objects use delta-encoded object IDs
164    /// and header-typed extension/properties flags that require stateful
165    /// per-stream context (a `SubgroupObjectReader`). Proxy-side parsing
166    /// of those drafts' object streams would need to track that state
167    /// per stream; that work is out of scope here, and this path will
168    /// return `CodecError::UnsupportedDraft` for drafts 14-17.
169    fn try_parse_object(&mut self) -> Option<Result<AnyObjectHeader, String>> {
170        let snapshot = &self.buf[..];
171        let mut cursor = snapshot;
172
173        match AnyObjectHeader::decode(self.draft, &mut cursor) {
174            Ok(header) => {
175                let consumed = snapshot.len() - cursor.remaining();
176                self.buf.advance(consumed);
177                Some(Ok(header))
178            }
179            Err(e) => {
180                if is_incomplete_error(&e) {
181                    None
182                } else {
183                    Some(Err(format!("object header decode: {e}")))
184                }
185            }
186        }
187    }
188}
189
190/// Check if a codec error indicates incomplete data (need more bytes).
191fn is_incomplete_error(e: &moqtap_codec::error::CodecError) -> bool {
192    matches!(e, moqtap_codec::error::CodecError::UnexpectedEnd)
193        || matches!(
194            e,
195            moqtap_codec::error::CodecError::VarInt(
196                moqtap_codec::varint::VarIntError::UnexpectedEnd
197            )
198        )
199}