Skip to main content

moqtap_codec/draft15/
data_stream.rs

1//! Draft-15 data stream header encoding and decoding.
2//!
3//! Draft-15 data streams differ significantly from draft-14:
4//! - Subgroup stream types encode flags in the type byte (0x10-0x17, 0x30-0x37)
5//! - Priority is optional (absent when type & 0x20)
6//! - Subgroup ID is optional (present when type & 0x04)
7//! - Extensions flag (type & 0x01) affects per-object parsing
//! - Datagram type flags: 0x01 (extensions), 0x02 (end-of-group), 0x04 (no object_id),
//!   0x20 (status); 0x00 is a plain object datagram with none of these set
10//! - Fetch objects use serialization_flags for delta encoding
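//! - Object IDs in subgroups use delta encoding: the first value is absolute; each later
//!   value is an offset from the previous Object ID (previous + delta + 1 on streams
//!   without the extensions flag, previous + delta when that flag is set)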
12
13use crate::error::CodecError;
14use crate::varint::VarInt;
15use bytes::{Buf, BufMut};
16
17// ── Subgroup streams ───────────────────────────────────────
18
19/// Subgroup stream header for draft-15.
20///
21/// The `header_type` byte encodes several flags:
22/// - `& 0x01`: extensions present on objects
23/// - `& 0x02`: end-of-group marker
24/// - `& 0x04`: explicit subgroup_id present
25/// - `& 0x20`: no publisher_priority (0x30+ types)
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct SubgroupHeader {
28    pub header_type: u8,
29    pub track_alias: VarInt,
30    pub group_id: VarInt,
31    pub subgroup_id: VarInt,
32    pub publisher_priority: Option<u8>,
33}
34
35impl SubgroupHeader {
36    pub fn has_extensions(&self) -> bool {
37        self.header_type & 0x01 != 0
38    }
39
40    pub fn has_end_of_group(&self) -> bool {
41        self.header_type & 0x02 != 0
42    }
43
44    pub fn has_explicit_subgroup_id(&self) -> bool {
45        self.header_type & 0x04 != 0
46    }
47
48    pub fn has_priority(&self) -> bool {
49        self.header_type & 0x20 == 0
50    }
51
52    pub fn encode(&self, buf: &mut impl BufMut) {
53        VarInt::from_usize(self.header_type as usize).encode(buf);
54        self.track_alias.encode(buf);
55        self.group_id.encode(buf);
56        if self.has_explicit_subgroup_id() {
57            self.subgroup_id.encode(buf);
58        }
59        if let Some(p) = self.publisher_priority {
60            buf.put_u8(p);
61        }
62    }
63
64    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
65        let header_type = VarInt::decode(buf)?.into_inner() as u8;
66        let base = header_type & 0xD0; // mask out lower flag bits
67        if base != 0x10 && base != 0x30 {
68            return Err(CodecError::InvalidField);
69        }
70        let track_alias = VarInt::decode(buf)?;
71        let group_id = VarInt::decode(buf)?;
72        let subgroup_id =
73            if header_type & 0x04 != 0 { VarInt::decode(buf)? } else { VarInt::from_usize(0) };
74        let publisher_priority = if header_type & 0x20 == 0 {
75            if buf.remaining() < 1 {
76                return Err(CodecError::UnexpectedEnd);
77            }
78            Some(buf.get_u8())
79        } else {
80            None
81        };
82        Ok(Self { header_type, track_alias, group_id, subgroup_id, publisher_priority })
83    }
84}
85
86// ── Subgroup objects (stateful) ─────────────────────────────
87
88/// One object within a draft-15 subgroup stream with its Object ID
89/// already resolved from the delta encoding.
90///
91/// Draft-15 object framing requires context from the enclosing
92/// [`SubgroupHeader`] (specifically, whether extension headers are
93/// present and the running delta state), so decoding/encoding uses a
94/// stateful [`SubgroupObjectReader`] rather than a standalone method.
95#[derive(Debug, Clone, PartialEq, Eq)]
96pub struct SubgroupObject {
97    /// Resolved absolute Object ID.
98    pub object_id: VarInt,
99    /// Raw extension-header bytes. Empty when the stream header does
100    /// not set the extensions-present bit. When present, holds the
101    /// entire on-wire extension block starting with the `ext_count`
102    /// varint followed by each extension's `key`, `vlen`, and value.
103    pub extension_headers: Vec<u8>,
104    /// Payload length as encoded on the wire. Zero when the object is
105    /// a status-only object.
106    pub payload_length: VarInt,
107    /// Object status; `Some` when `payload_length == 0`.
108    pub object_status: Option<VarInt>,
109    /// Payload bytes; empty when `object_status` is `Some`.
110    pub payload: Vec<u8>,
111}
112
/// Per-stream codec state for draft-15 subgroup objects.
///
/// Object IDs are delta-encoded against the previous object on the same
/// stream, and the presence of an extension block depends on the stream
/// header, so both pieces of context are kept here between calls.
#[derive(Clone, Debug)]
pub struct SubgroupObjectReader {
    /// Copied from the stream header: whether each object carries an
    /// extension block.
    extensions_present: bool,
    /// Object ID of the most recently processed object; `None` until the
    /// first object has been read or written.
    prev_object_id: Option<u64>,
}
122
123impl SubgroupObjectReader {
124    /// Build a reader seeded from the enclosing subgroup header.
125    pub fn new(header: &SubgroupHeader) -> Self {
126        Self { extensions_present: header.has_extensions(), prev_object_id: None }
127    }
128
129    /// Decode the next object from `buf`.
130    pub fn read_object(&mut self, buf: &mut impl Buf) -> Result<SubgroupObject, CodecError> {
131        let delta = VarInt::decode(buf)?.into_inner();
132        // Draft-15 subgroup object delta encoding:
133        // - First object: `delta` is the absolute object_id.
134        // - Subsequent objects (no extensions): `delta` is the gap to the
135        //   next object; the resolved id is `prev + delta + 1`.
136        // - Subsequent objects (extensions flag set): `delta` is the
137        //   already-adjusted offset; the resolved id is `prev + delta`.
138        let object_id_val = match self.prev_object_id {
139            None => delta,
140            Some(prev) => {
141                if self.extensions_present {
142                    prev.checked_add(delta).ok_or(CodecError::InvalidField)?
143                } else {
144                    prev.checked_add(1)
145                        .and_then(|v| v.checked_add(delta))
146                        .ok_or(CodecError::InvalidField)?
147                }
148            }
149        };
150        self.prev_object_id = Some(object_id_val);
151        let object_id = VarInt::from_u64(object_id_val).map_err(|_| CodecError::InvalidField)?;
152
153        let extension_headers = if self.extensions_present {
154            // Draft-15+: extensions are a byte-length-prefixed opaque
155            // blob. We copy the blob verbatim; callers that want
156            // structured extensions can parse the returned bytes.
157            let ext_len = VarInt::decode(buf)?.into_inner() as usize;
158            crate::types::read_bytes(buf, ext_len)?
159        } else {
160            Vec::new()
161        };
162
163        let payload_length_vi = VarInt::decode(buf)?;
164        let payload_length_val = payload_length_vi.into_inner() as usize;
165        let (object_status, payload) = if payload_length_val == 0 {
166            let status = VarInt::decode(buf)?;
167            (Some(status), Vec::new())
168        } else {
169            let payload = crate::types::read_bytes(buf, payload_length_val)?;
170            (None, payload)
171        };
172
173        Ok(SubgroupObject {
174            object_id,
175            extension_headers,
176            payload_length: payload_length_vi,
177            object_status,
178            payload,
179        })
180    }
181
182    /// Serialize an object, producing the correct delta encoding.
183    pub fn write_object(
184        &mut self,
185        object: &SubgroupObject,
186        buf: &mut impl BufMut,
187    ) -> Result<(), CodecError> {
188        let oid = object.object_id.into_inner();
189        let delta = match self.prev_object_id {
190            None => oid,
191            Some(prev) => oid.checked_sub(prev).ok_or(CodecError::InvalidField)?,
192        };
193        VarInt::from_u64(delta).map_err(|_| CodecError::InvalidField)?.encode(buf);
194        if self.extensions_present {
195            let ext_len = object.extension_headers.len();
196            VarInt::from_usize(ext_len).encode(buf);
197            buf.put_slice(&object.extension_headers);
198        }
199        object.payload_length.encode(buf);
200        if object.payload_length.into_inner() == 0 {
201            if let Some(s) = &object.object_status {
202                s.encode(buf);
203            } else {
204                // Default to status 0 when none supplied.
205                VarInt::from_u64(0).unwrap().encode(buf);
206            }
207        } else {
208            buf.put_slice(&object.payload);
209        }
210        self.prev_object_id = Some(oid);
211        Ok(())
212    }
213}
214
215// ── Datagram headers ───────────────────────────────────────
216
217/// Datagram header for draft-15.
218///
219/// The `datagram_type` byte encodes flags:
220/// - `0x02`: end-of-group
221/// - `0x04`: no object_id (object_id = 0 implied)
222/// - `0x20`: status datagram (carries object_status instead of payload)
223#[derive(Debug, Clone, PartialEq, Eq)]
224pub struct DatagramHeader {
225    /// Raw datagram-type byte encoding flags + kind.
226    pub datagram_type: u8,
227    /// Track alias identifying the track.
228    pub track_alias: VarInt,
229    /// Group ID for the contained object.
230    pub group_id: VarInt,
231    /// Object ID (zero when the `no-object-id` flag is set).
232    pub object_id: VarInt,
233    /// Publisher priority.
234    pub publisher_priority: u8,
235    /// Opaque extension-headers blob (only when the `0x01` flag is set).
236    pub extension_headers: Vec<u8>,
237    /// Object status (only when the `0x20` status flag is set).
238    pub object_status: Option<VarInt>,
239}
240
241impl DatagramHeader {
242    /// Whether the datagram carries an explicit object_id.
243    pub fn has_object_id(&self) -> bool {
244        self.datagram_type & 0x04 == 0
245    }
246
247    /// Whether this datagram marks the end of its group.
248    pub fn is_end_of_group(&self) -> bool {
249        self.datagram_type & 0x02 != 0
250    }
251
252    /// Whether this datagram carries an object_status instead of payload.
253    pub fn is_status(&self) -> bool {
254        self.datagram_type & 0x20 != 0
255    }
256
257    /// Whether this datagram carries extension headers.
258    pub fn has_extensions(&self) -> bool {
259        self.datagram_type & 0x01 != 0
260    }
261
262    /// Encode the datagram header to `buf`.
263    pub fn encode(&self, buf: &mut impl BufMut) {
264        VarInt::from_usize(self.datagram_type as usize).encode(buf);
265        self.track_alias.encode(buf);
266        self.group_id.encode(buf);
267        if self.has_object_id() {
268            self.object_id.encode(buf);
269        }
270        buf.put_u8(self.publisher_priority);
271        if self.has_extensions() {
272            VarInt::from_usize(self.extension_headers.len()).encode(buf);
273            buf.put_slice(&self.extension_headers);
274        }
275        if self.is_status() {
276            if let Some(s) = &self.object_status {
277                s.encode(buf);
278            }
279        }
280    }
281
282    /// Decode a datagram header from `buf`.
283    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
284        let datagram_type = VarInt::decode(buf)?.into_inner() as u8;
285        let track_alias = VarInt::decode(buf)?;
286        let group_id = VarInt::decode(buf)?;
287        let object_id =
288            if datagram_type & 0x04 == 0 { VarInt::decode(buf)? } else { VarInt::from_usize(0) };
289        if buf.remaining() < 1 {
290            return Err(CodecError::UnexpectedEnd);
291        }
292        let publisher_priority = buf.get_u8();
293        let extension_headers = if datagram_type & 0x01 != 0 {
294            let ext_len = VarInt::decode(buf)?.into_inner() as usize;
295            crate::types::read_bytes(buf, ext_len)?
296        } else {
297            Vec::new()
298        };
299        let object_status =
300            if datagram_type & 0x20 != 0 { Some(VarInt::decode(buf)?) } else { None };
301        Ok(Self {
302            datagram_type,
303            track_alias,
304            group_id,
305            object_id,
306            publisher_priority,
307            extension_headers,
308            object_status,
309        })
310    }
311}
312
313// ── Fetch stream headers ───────────────────────────────────
314
315/// Fetch stream header for draft-15.
316///
317/// Stream type is 0x05. Only contains a request_id.
318#[derive(Debug, Clone, PartialEq, Eq)]
319pub struct FetchHeader {
320    pub request_id: VarInt,
321}
322
323impl FetchHeader {
324    pub fn encode(&self, buf: &mut impl BufMut) {
325        VarInt::from_usize(0x05).encode(buf);
326        self.request_id.encode(buf);
327    }
328
329    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
330        let stream_type = VarInt::decode(buf)?.into_inner();
331        if stream_type != 0x05 {
332            return Err(CodecError::InvalidField);
333        }
334        let request_id = VarInt::decode(buf)?;
335        Ok(Self { request_id })
336    }
337}