moqtap_proxy/parser/control.rs
1//! Inline control stream parser.
2//!
3//! Buffers raw bytes from the forwarding loop and decodes complete MoQT
4//! control messages without modifying the forwarded data.
5
6use bytes::{Buf, Bytes, BytesMut};
7
8use moqtap_codec::dispatch::AnyControlMessage;
9use moqtap_codec::varint::VarInt;
10use moqtap_codec::version::DraftVersion;
11
12/// A successfully parsed control frame.
13///
14/// `raw_bytes` is populated only when the parser was constructed with
15/// [`ControlStreamParser::new_capturing`]; the default observation-only
16/// parser leaves it as `None` to avoid copying bytes that already flow
17/// through the forwarding path.
18#[derive(Debug, Clone)]
19pub struct ParsedFrame {
20 /// The decoded control message.
21 pub message: AnyControlMessage,
22 /// The original wire bytes of this frame — only set when the parser
23 /// is in capturing mode (used by hook-driven mutation).
24 pub raw_bytes: Option<Bytes>,
25}
26
27/// Result of feeding bytes to the control stream parser.
28#[derive(Debug)]
29pub enum ParseResult {
30 /// One or more complete messages were decoded.
31 Messages(Vec<ParsedFrame>),
32 /// Need more data — bytes are buffered internally.
33 NeedMore,
34}
35
36/// Stateful inline parser for a MoQT control stream.
37///
38/// Accepts raw byte chunks (as they arrive from `RecvStream::read`),
39/// buffers them, and emits complete `ParsedFrame`s. In the default
40/// (non-capturing) mode the parser does not clone the frame bytes; in
41/// capturing mode it does, so a hook can rewrite the frame before the
42/// proxy forwards it.
43pub struct ControlStreamParser {
44 buf: BytesMut,
45 draft: DraftVersion,
46 capture_raw: bool,
47}
48
49impl ControlStreamParser {
50 /// Create a new observation-only parser.
51 ///
52 /// `ParsedFrame::raw_bytes` will be `None`; use
53 /// [`Self::new_capturing`] when a hook needs to mutate frames.
54 pub fn new(draft: DraftVersion) -> Self {
55 Self { buf: BytesMut::with_capacity(4096), draft, capture_raw: false }
56 }
57
58 /// Create a new parser that captures the raw wire bytes of each frame.
59 ///
60 /// Use this variant only when a hook may rewrite frames; the extra
61 /// `Bytes::copy_from_slice` per frame is unnecessary for pure
62 /// pass-through forwarding.
63 pub fn new_capturing(draft: DraftVersion) -> Self {
64 Self { buf: BytesMut::with_capacity(4096), draft, capture_raw: true }
65 }
66
67 /// Feed raw bytes into the parser.
68 ///
69 /// Returns `ParseResult::Messages` if one or more complete frames
70 /// could be decoded, or `ParseResult::NeedMore` if more data is
71 /// needed. Partial frames are buffered internally.
72 pub fn feed(&mut self, data: &[u8]) -> ParseResult {
73 self.buf.extend_from_slice(data);
74 let mut frames = Vec::new();
75
76 loop {
77 // Need at least 1 byte to determine type varint length
78 if self.buf.is_empty() {
79 break;
80 }
81
82 // Read type_id varint length from first byte
83 let type_len = varint_len(self.buf[0]);
84 if self.buf.len() < type_len {
85 break;
86 }
87
88 // Peek at type_id (don't advance buf yet)
89 let mut cursor = &self.buf[..type_len];
90 if VarInt::decode(&mut cursor).is_err() {
91 break;
92 }
93
94 // Read payload length. Draft-11+ uses 16-bit BE; earlier drafts
95 // use a QUIC varint.
96 let (payload_len, total) = if self.draft.uses_fixed_length_framing() {
97 // Draft-11+: type_id(vi) + length(u16 BE) + payload
98 if self.buf.len() < type_len + 2 {
99 break;
100 }
101 let hi = self.buf[type_len] as usize;
102 let lo = self.buf[type_len + 1] as usize;
103 let payload_len = (hi << 8) | lo;
104 (payload_len, type_len + 2 + payload_len)
105 } else {
106 // Draft-07..10: type_id(vi) + length(vi) + payload
107 if self.buf.len() <= type_len {
108 break;
109 }
110 let payload_len_varint_len = varint_len(self.buf[type_len]);
111 if self.buf.len() < type_len + payload_len_varint_len {
112 break;
113 }
114 let mut cursor = &self.buf[type_len..type_len + payload_len_varint_len];
115 let payload_len = match VarInt::decode(&mut cursor) {
116 Ok(v) => v.into_inner() as usize,
117 Err(_) => break,
118 };
119 (payload_len, type_len + payload_len_varint_len + payload_len)
120 };
121 let _ = payload_len; // used via total
122
123 // Check if we have the full frame
124 if self.buf.len() < total {
125 break;
126 }
127
128 // Only clone the wire bytes when a hook might rewrite them;
129 // the observation-only path forwards the original buffer.
130 let raw_bytes = if self.capture_raw {
131 Some(Bytes::copy_from_slice(&self.buf[..total]))
132 } else {
133 None
134 };
135
136 // Decode from a clone (so we don't corrupt the buffer on error)
137 let mut decode_buf = &self.buf[..total];
138 match AnyControlMessage::decode(self.draft, &mut decode_buf) {
139 Ok(message) => {
140 self.buf.advance(total);
141 frames.push(ParsedFrame { message, raw_bytes });
142 }
143 Err(_) => {
144 // Skip this frame on decode error — advance past it
145 // so we don't get stuck in an infinite loop.
146 self.buf.advance(total);
147 break;
148 }
149 }
150 }
151
152 if frames.is_empty() {
153 ParseResult::NeedMore
154 } else {
155 ParseResult::Messages(frames)
156 }
157 }
158
159 /// Returns the draft version this parser is configured for.
160 pub fn draft(&self) -> DraftVersion {
161 self.draft
162 }
163}
164
165impl Default for ControlStreamParser {
166 fn default() -> Self {
167 Self::new(DraftVersion::Draft14)
168 }
169}
170
/// Number of bytes a QUIC varint occupies, derived from its first byte.
///
/// The two most significant bits of `first_byte` select the encoding
/// size: `00` -> 1 byte, `01` -> 2 bytes, `10` -> 4 bytes, `11` -> 8 bytes.
fn varint_len(first_byte: u8) -> usize {
    match first_byte >> 6 {
        0 => 1,
        1 => 2,
        2 => 4,
        // A `u8` shifted right by six can only be 0..=3, so this is `3`.
        _ => 8,
    }
}
175
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `varint_len` against a table of first bytes and the
    /// encoded length each one should yield.
    #[test]
    fn varint_len_values() {
        let cases: [(u8, usize); 5] = [(0x00, 1), (0x3F, 1), (0x40, 2), (0x80, 4), (0xC0, 8)];
        for (first_byte, expected) in cases.iter() {
            assert_eq!(varint_len(*first_byte), *expected);
        }
    }
}