Skip to main content

moqtap_proxy/parser/
control.rs

1//! Inline control stream parser.
2//!
3//! Buffers raw bytes from the forwarding loop and decodes complete MoQT
4//! control messages without modifying the forwarded data.
5
6use bytes::{Buf, Bytes, BytesMut};
7
8use moqtap_codec::dispatch::AnyControlMessage;
9use moqtap_codec::varint::VarInt;
10use moqtap_codec::version::DraftVersion;
11
12/// A successfully parsed control frame.
13///
14/// `raw_bytes` is populated only when the parser was constructed with
15/// [`ControlStreamParser::new_capturing`]; the default observation-only
16/// parser leaves it as `None` to avoid copying bytes that already flow
17/// through the forwarding path.
18#[derive(Debug, Clone)]
19pub struct ParsedFrame {
20    /// The decoded control message.
21    pub message: AnyControlMessage,
22    /// The original wire bytes of this frame — only set when the parser
23    /// is in capturing mode (used by hook-driven mutation).
24    pub raw_bytes: Option<Bytes>,
25}
26
27/// Result of feeding bytes to the control stream parser.
28#[derive(Debug)]
29pub enum ParseResult {
30    /// One or more complete messages were decoded.
31    Messages(Vec<ParsedFrame>),
32    /// Need more data — bytes are buffered internally.
33    NeedMore,
34}
35
36/// Stateful inline parser for a MoQT control stream.
37///
38/// Accepts raw byte chunks (as they arrive from `RecvStream::read`),
39/// buffers them, and emits complete `ParsedFrame`s. In the default
40/// (non-capturing) mode the parser does not clone the frame bytes; in
41/// capturing mode it does, so a hook can rewrite the frame before the
42/// proxy forwards it.
43pub struct ControlStreamParser {
44    buf: BytesMut,
45    draft: DraftVersion,
46    capture_raw: bool,
47}
48
49impl ControlStreamParser {
50    /// Create a new observation-only parser.
51    ///
52    /// `ParsedFrame::raw_bytes` will be `None`; use
53    /// [`Self::new_capturing`] when a hook needs to mutate frames.
54    pub fn new(draft: DraftVersion) -> Self {
55        Self { buf: BytesMut::with_capacity(4096), draft, capture_raw: false }
56    }
57
58    /// Create a new parser that captures the raw wire bytes of each frame.
59    ///
60    /// Use this variant only when a hook may rewrite frames; the extra
61    /// `Bytes::copy_from_slice` per frame is unnecessary for pure
62    /// pass-through forwarding.
63    pub fn new_capturing(draft: DraftVersion) -> Self {
64        Self { buf: BytesMut::with_capacity(4096), draft, capture_raw: true }
65    }
66
67    /// Feed raw bytes into the parser.
68    ///
69    /// Returns `ParseResult::Messages` if one or more complete frames
70    /// could be decoded, or `ParseResult::NeedMore` if more data is
71    /// needed. Partial frames are buffered internally.
72    pub fn feed(&mut self, data: &[u8]) -> ParseResult {
73        self.buf.extend_from_slice(data);
74        let mut frames = Vec::new();
75
76        loop {
77            // Need at least 1 byte to determine type varint length
78            if self.buf.is_empty() {
79                break;
80            }
81
82            // Read type_id varint length from first byte
83            let type_len = varint_len(self.buf[0]);
84            if self.buf.len() < type_len {
85                break;
86            }
87
88            // Peek at type_id (don't advance buf yet)
89            let mut cursor = &self.buf[..type_len];
90            if VarInt::decode(&mut cursor).is_err() {
91                break;
92            }
93
94            // Read payload length. Draft-11+ uses 16-bit BE; earlier drafts
95            // use a QUIC varint.
96            let (payload_len, total) = if self.draft.uses_fixed_length_framing() {
97                // Draft-11+: type_id(vi) + length(u16 BE) + payload
98                if self.buf.len() < type_len + 2 {
99                    break;
100                }
101                let hi = self.buf[type_len] as usize;
102                let lo = self.buf[type_len + 1] as usize;
103                let payload_len = (hi << 8) | lo;
104                (payload_len, type_len + 2 + payload_len)
105            } else {
106                // Draft-07..10: type_id(vi) + length(vi) + payload
107                if self.buf.len() <= type_len {
108                    break;
109                }
110                let payload_len_varint_len = varint_len(self.buf[type_len]);
111                if self.buf.len() < type_len + payload_len_varint_len {
112                    break;
113                }
114                let mut cursor = &self.buf[type_len..type_len + payload_len_varint_len];
115                let payload_len = match VarInt::decode(&mut cursor) {
116                    Ok(v) => v.into_inner() as usize,
117                    Err(_) => break,
118                };
119                (payload_len, type_len + payload_len_varint_len + payload_len)
120            };
121            let _ = payload_len; // used via total
122
123            // Check if we have the full frame
124            if self.buf.len() < total {
125                break;
126            }
127
128            // Only clone the wire bytes when a hook might rewrite them;
129            // the observation-only path forwards the original buffer.
130            let raw_bytes = if self.capture_raw {
131                Some(Bytes::copy_from_slice(&self.buf[..total]))
132            } else {
133                None
134            };
135
136            // Decode from a clone (so we don't corrupt the buffer on error)
137            let mut decode_buf = &self.buf[..total];
138            match AnyControlMessage::decode(self.draft, &mut decode_buf) {
139                Ok(message) => {
140                    self.buf.advance(total);
141                    frames.push(ParsedFrame { message, raw_bytes });
142                }
143                Err(_) => {
144                    // Skip this frame on decode error — advance past it
145                    // so we don't get stuck in an infinite loop.
146                    self.buf.advance(total);
147                    break;
148                }
149            }
150        }
151
152        if frames.is_empty() {
153            ParseResult::NeedMore
154        } else {
155            ParseResult::Messages(frames)
156        }
157    }
158
159    /// Returns the draft version this parser is configured for.
160    pub fn draft(&self) -> DraftVersion {
161        self.draft
162    }
163}
164
165impl Default for ControlStreamParser {
166    fn default() -> Self {
167        Self::new(DraftVersion::Draft14)
168    }
169}
170
/// Encoded length, in bytes, of a QUIC varint given its first byte.
///
/// The two most significant bits of `first_byte` select the length:
/// `00` → 1, `01` → 2, `10` → 4, `11` → 8.
fn varint_len(first_byte: u8) -> usize {
    match first_byte >> 6 {
        0 => 1,
        1 => 2,
        2 => 4,
        // A `u8` shifted right by six is at most 3, so this arm is `0b11`.
        _ => 8,
    }
}
175
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `varint_len` against one first-byte value per expected length,
    /// plus the upper edge of the one-byte range.
    #[test]
    fn varint_len_values() {
        let cases: [(u8, usize); 5] = [(0x00, 1), (0x3F, 1), (0x40, 2), (0x80, 4), (0xC0, 8)];
        for &(first_byte, expected) in cases.iter() {
            assert_eq!(varint_len(first_byte), expected);
        }
    }
}
188}