Skip to main content

moqtap_codec/draft12/
data_stream.rs

1//! Draft-12 data stream header encoding and decoding.
2//!
3//! Changes from draft-11:
4//! - Subgroup stream type IDs shift from 0x08-0x0D to 0x10-0x15
5//! - Datagram types: same as draft-11 (0x00-0x03)
6//! - Fetch type: same as draft-11 (0x05)
7
8use super::types::ObjectStatus;
9use crate::error::CodecError;
10use crate::types::read_bytes;
11use crate::varint::VarInt;
12use bytes::{Buf, BufMut};
13
14/// Stream type IDs for draft-12 data streams.
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16#[repr(u64)]
17pub enum StreamType {
18    /// Object datagram, no extensions (0x00).
19    Datagram = 0x00,
20    /// Object datagram, with extensions (0x01).
21    DatagramExt = 0x01,
22    /// Object datagram status, no extensions (0x02).
23    DatagramStatus = 0x02,
24    /// Object datagram status, with extensions (0x03).
25    DatagramStatusExt = 0x03,
26    /// Fetch response stream (0x05).
27    Fetch = 0x05,
28    /// Subgroup: subgroup_id=0, no extensions (0x10).
29    SubgroupZero = 0x10,
30    /// Subgroup: subgroup_id=0, with extensions (0x11).
31    SubgroupZeroExt = 0x11,
32    /// Subgroup: subgroup_id=first object ID, no extensions (0x12).
33    SubgroupFirstObj = 0x12,
34    /// Subgroup: subgroup_id=first object ID, with extensions (0x13).
35    SubgroupFirstObjExt = 0x13,
36    /// Subgroup: explicit subgroup_id, no extensions (0x14).
37    SubgroupExplicit = 0x14,
38    /// Subgroup: explicit subgroup_id, with extensions (0x15).
39    SubgroupExplicitExt = 0x15,
40    /// Subgroup: subgroup_id=0, contains end of group, no extensions (0x18).
41    SubgroupZeroEog = 0x18,
42    /// Subgroup: subgroup_id=0, contains end of group, with extensions (0x19).
43    SubgroupZeroEogExt = 0x19,
44    /// Subgroup: subgroup_id=first object ID, contains end of group, no extensions (0x1A).
45    SubgroupFirstObjEog = 0x1A,
46    /// Subgroup: subgroup_id=first object ID, contains end of group, with extensions (0x1B).
47    SubgroupFirstObjEogExt = 0x1B,
48    /// Subgroup: explicit subgroup_id, contains end of group, no extensions (0x1C).
49    SubgroupExplicitEog = 0x1C,
50    /// Subgroup: explicit subgroup_id, contains end of group, with extensions (0x1D).
51    SubgroupExplicitEogExt = 0x1D,
52}
53
54impl StreamType {
55    pub fn from_id(id: u64) -> Option<Self> {
56        match id {
57            0x00 => Some(StreamType::Datagram),
58            0x01 => Some(StreamType::DatagramExt),
59            0x02 => Some(StreamType::DatagramStatus),
60            0x03 => Some(StreamType::DatagramStatusExt),
61            0x05 => Some(StreamType::Fetch),
62            0x10 => Some(StreamType::SubgroupZero),
63            0x11 => Some(StreamType::SubgroupZeroExt),
64            0x12 => Some(StreamType::SubgroupFirstObj),
65            0x13 => Some(StreamType::SubgroupFirstObjExt),
66            0x14 => Some(StreamType::SubgroupExplicit),
67            0x15 => Some(StreamType::SubgroupExplicitExt),
68            0x18 => Some(StreamType::SubgroupZeroEog),
69            0x19 => Some(StreamType::SubgroupZeroEogExt),
70            0x1A => Some(StreamType::SubgroupFirstObjEog),
71            0x1B => Some(StreamType::SubgroupFirstObjEogExt),
72            0x1C => Some(StreamType::SubgroupExplicitEog),
73            0x1D => Some(StreamType::SubgroupExplicitEogExt),
74            _ => None,
75        }
76    }
77
78    pub fn is_subgroup(&self) -> bool {
79        matches!(
80            self,
81            StreamType::SubgroupZero
82                | StreamType::SubgroupZeroExt
83                | StreamType::SubgroupFirstObj
84                | StreamType::SubgroupFirstObjExt
85                | StreamType::SubgroupExplicit
86                | StreamType::SubgroupExplicitExt
87                | StreamType::SubgroupZeroEog
88                | StreamType::SubgroupZeroEogExt
89                | StreamType::SubgroupFirstObjEog
90                | StreamType::SubgroupFirstObjEogExt
91                | StreamType::SubgroupExplicitEog
92                | StreamType::SubgroupExplicitEogExt
93        )
94    }
95
96    pub fn has_extensions(&self) -> bool {
97        matches!(
98            self,
99            StreamType::DatagramExt
100                | StreamType::DatagramStatusExt
101                | StreamType::SubgroupZeroExt
102                | StreamType::SubgroupFirstObjExt
103                | StreamType::SubgroupExplicitExt
104                | StreamType::SubgroupZeroEogExt
105                | StreamType::SubgroupFirstObjEogExt
106                | StreamType::SubgroupExplicitEogExt
107        )
108    }
109
110    /// True if this subgroup stream type indicates the stream contains the end of its group.
111    pub fn contains_end_of_group(&self) -> bool {
112        matches!(
113            self,
114            StreamType::SubgroupZeroEog
115                | StreamType::SubgroupZeroEogExt
116                | StreamType::SubgroupFirstObjEog
117                | StreamType::SubgroupFirstObjEogExt
118                | StreamType::SubgroupExplicitEog
119                | StreamType::SubgroupExplicitEogExt
120        )
121    }
122}
123
124// ── Extension helpers ─────────────────────────────────────────
125
126fn read_extension_bytes(buf: &mut impl Buf, byte_len: u64) -> Result<Vec<u8>, CodecError> {
127    read_bytes(buf, byte_len as usize)
128}
129
130// ============================================================
131// Subgroup stream header
132// ============================================================
133
134#[derive(Debug, Clone, PartialEq, Eq)]
135pub struct SubgroupHeader {
136    pub stream_type: StreamType,
137    pub track_alias: VarInt,
138    pub group_id: VarInt,
139    pub subgroup_id: VarInt,
140    pub publisher_priority: u8,
141}
142
143impl SubgroupHeader {
144    pub fn encode(&self, buf: &mut impl BufMut) {
145        self.track_alias.encode(buf);
146        self.group_id.encode(buf);
147        match self.stream_type {
148            StreamType::SubgroupExplicit
149            | StreamType::SubgroupExplicitExt
150            | StreamType::SubgroupExplicitEog
151            | StreamType::SubgroupExplicitEogExt => {
152                self.subgroup_id.encode(buf);
153            }
154            _ => {}
155        }
156        buf.put_u8(self.publisher_priority);
157    }
158
159    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
160        Self::decode_with_type(StreamType::SubgroupExplicit, buf)
161    }
162
163    pub fn decode_with_type(
164        stream_type: StreamType,
165        buf: &mut impl Buf,
166    ) -> Result<Self, CodecError> {
167        let track_alias = VarInt::decode(buf)?;
168        let group_id = VarInt::decode(buf)?;
169        let subgroup_id = match stream_type {
170            StreamType::SubgroupZero
171            | StreamType::SubgroupZeroExt
172            | StreamType::SubgroupZeroEog
173            | StreamType::SubgroupZeroEogExt => VarInt::from_usize(0),
174            StreamType::SubgroupExplicit
175            | StreamType::SubgroupExplicitExt
176            | StreamType::SubgroupExplicitEog
177            | StreamType::SubgroupExplicitEogExt => VarInt::decode(buf)?,
178            StreamType::SubgroupFirstObj
179            | StreamType::SubgroupFirstObjExt
180            | StreamType::SubgroupFirstObjEog
181            | StreamType::SubgroupFirstObjEogExt => VarInt::from_usize(0),
182            _ => return Err(CodecError::InvalidField),
183        };
184        if buf.remaining() < 1 {
185            return Err(CodecError::UnexpectedEnd);
186        }
187        let publisher_priority = buf.get_u8();
188        Ok(Self { stream_type, track_alias, group_id, subgroup_id, publisher_priority })
189    }
190}
191
192// ============================================================
193// Object header within subgroup
194// ============================================================
195
196#[derive(Debug, Clone, PartialEq, Eq)]
197pub struct ObjectHeader {
198    pub object_id: VarInt,
199    pub extension_headers_length: VarInt,
200    pub extensions: Vec<u8>,
201    pub payload_length: VarInt,
202    pub object_status: ObjectStatus,
203}
204
205impl ObjectHeader {
206    pub fn encode(&self, buf: &mut impl BufMut) {
207        self.encode_with_extensions(false, buf);
208    }
209
210    pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
211        self.object_id.encode(buf);
212        if has_extensions {
213            self.extension_headers_length.encode(buf);
214            buf.put_slice(&self.extensions);
215        }
216        self.payload_length.encode(buf);
217        if self.payload_length.into_inner() == 0 {
218            VarInt::from_usize(self.object_status as usize).encode(buf);
219        }
220    }
221
222    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
223        Self::decode_with_extensions(false, buf)
224    }
225
226    pub fn decode_with_extensions(
227        has_extensions: bool,
228        buf: &mut impl Buf,
229    ) -> Result<Self, CodecError> {
230        let object_id = VarInt::decode(buf)?;
231        let (extension_headers_length, extensions) = if has_extensions {
232            let ehl = VarInt::decode(buf)?;
233            let ext = read_extension_bytes(buf, ehl.into_inner())?;
234            (ehl, ext)
235        } else {
236            (VarInt::from_usize(0), Vec::new())
237        };
238        let payload_length = VarInt::decode(buf)?;
239        let object_status = if payload_length.into_inner() == 0 {
240            let sv = VarInt::decode(buf)?.into_inner();
241            ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?
242        } else {
243            ObjectStatus::Normal
244        };
245        Ok(Self { object_id, extension_headers_length, extensions, payload_length, object_status })
246    }
247}
248
249// ============================================================
250// Datagram (types 0x00, 0x01)
251// ============================================================
252
253#[derive(Debug, Clone, PartialEq, Eq)]
254pub struct DatagramHeader {
255    pub track_alias: VarInt,
256    pub group_id: VarInt,
257    pub object_id: VarInt,
258    pub publisher_priority: u8,
259    pub extension_headers_length: VarInt,
260    pub extensions: Vec<u8>,
261}
262
263impl DatagramHeader {
264    pub fn encode(&self, buf: &mut impl BufMut) {
265        self.encode_with_extensions(false, buf);
266    }
267
268    pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
269        self.track_alias.encode(buf);
270        self.group_id.encode(buf);
271        self.object_id.encode(buf);
272        buf.put_u8(self.publisher_priority);
273        if has_extensions {
274            self.extension_headers_length.encode(buf);
275            buf.put_slice(&self.extensions);
276        }
277    }
278
279    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
280        Self::decode_with_extensions(false, buf)
281    }
282
283    pub fn decode_with_extensions(
284        has_extensions: bool,
285        buf: &mut impl Buf,
286    ) -> Result<Self, CodecError> {
287        let track_alias = VarInt::decode(buf)?;
288        let group_id = VarInt::decode(buf)?;
289        let object_id = VarInt::decode(buf)?;
290        if buf.remaining() < 1 {
291            return Err(CodecError::UnexpectedEnd);
292        }
293        let publisher_priority = buf.get_u8();
294        let (extension_headers_length, extensions) = if has_extensions {
295            let ehl = VarInt::decode(buf)?;
296            let ext = read_extension_bytes(buf, ehl.into_inner())?;
297            (ehl, ext)
298        } else {
299            (VarInt::from_usize(0), Vec::new())
300        };
301        Ok(Self {
302            track_alias,
303            group_id,
304            object_id,
305            publisher_priority,
306            extension_headers_length,
307            extensions,
308        })
309    }
310}
311
312// ============================================================
313// Datagram Status (types 0x02, 0x03)
314// ============================================================
315
316#[derive(Debug, Clone, PartialEq, Eq)]
317pub struct DatagramStatusHeader {
318    pub track_alias: VarInt,
319    pub group_id: VarInt,
320    pub object_id: VarInt,
321    pub publisher_priority: u8,
322    pub extension_headers_length: VarInt,
323    pub extensions: Vec<u8>,
324    pub object_status: ObjectStatus,
325}
326
327impl DatagramStatusHeader {
328    pub fn encode(&self, buf: &mut impl BufMut) {
329        self.encode_with_extensions(false, buf);
330    }
331
332    pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
333        self.track_alias.encode(buf);
334        self.group_id.encode(buf);
335        self.object_id.encode(buf);
336        buf.put_u8(self.publisher_priority);
337        if has_extensions {
338            self.extension_headers_length.encode(buf);
339            buf.put_slice(&self.extensions);
340        }
341        VarInt::from_usize(self.object_status as usize).encode(buf);
342    }
343
344    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
345        Self::decode_with_extensions(false, buf)
346    }
347
348    pub fn decode_with_extensions(
349        has_extensions: bool,
350        buf: &mut impl Buf,
351    ) -> Result<Self, CodecError> {
352        let track_alias = VarInt::decode(buf)?;
353        let group_id = VarInt::decode(buf)?;
354        let object_id = VarInt::decode(buf)?;
355        if buf.remaining() < 1 {
356            return Err(CodecError::UnexpectedEnd);
357        }
358        let publisher_priority = buf.get_u8();
359        let (extension_headers_length, extensions) = if has_extensions {
360            let ehl = VarInt::decode(buf)?;
361            let ext = read_extension_bytes(buf, ehl.into_inner())?;
362            (ehl, ext)
363        } else {
364            (VarInt::from_usize(0), Vec::new())
365        };
366        let sv = VarInt::decode(buf)?.into_inner();
367        let object_status = ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?;
368        Ok(Self {
369            track_alias,
370            group_id,
371            object_id,
372            publisher_priority,
373            extension_headers_length,
374            extensions,
375            object_status,
376        })
377    }
378}
379
380// ============================================================
381// Fetch stream (type 0x05)
382// ============================================================
383
384#[derive(Debug, Clone, PartialEq, Eq)]
385pub struct FetchHeader {
386    pub request_id: VarInt,
387}
388
389#[derive(Debug, Clone, PartialEq, Eq)]
390pub struct FetchObjectHeader {
391    pub group_id: VarInt,
392    pub subgroup_id: VarInt,
393    pub object_id: VarInt,
394    pub publisher_priority: u8,
395    pub extension_headers_length: VarInt,
396    pub extensions: Vec<u8>,
397    pub payload_length: VarInt,
398    pub object_status: ObjectStatus,
399}
400
401impl FetchHeader {
402    pub fn encode(&self, buf: &mut impl BufMut) {
403        self.request_id.encode(buf);
404    }
405
406    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
407        let request_id = VarInt::decode(buf)?;
408        Ok(Self { request_id })
409    }
410}
411
412impl FetchObjectHeader {
413    pub fn encode(&self, buf: &mut impl BufMut) {
414        self.group_id.encode(buf);
415        self.subgroup_id.encode(buf);
416        self.object_id.encode(buf);
417        buf.put_u8(self.publisher_priority);
418        self.extension_headers_length.encode(buf);
419        buf.put_slice(&self.extensions);
420        self.payload_length.encode(buf);
421        if self.payload_length.into_inner() == 0 {
422            VarInt::from_usize(self.object_status as usize).encode(buf);
423        }
424    }
425
426    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
427        let group_id = VarInt::decode(buf)?;
428        let subgroup_id = VarInt::decode(buf)?;
429        let object_id = VarInt::decode(buf)?;
430        if buf.remaining() < 1 {
431            return Err(CodecError::UnexpectedEnd);
432        }
433        let publisher_priority = buf.get_u8();
434        let extension_headers_length = VarInt::decode(buf)?;
435        let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
436        let payload_length = VarInt::decode(buf)?;
437        let object_status = if payload_length.into_inner() == 0 {
438            let sv = VarInt::decode(buf)?.into_inner();
439            ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?
440        } else {
441            ObjectStatus::Normal
442        };
443        Ok(Self {
444            group_id,
445            subgroup_id,
446            object_id,
447            publisher_priority,
448            extension_headers_length,
449            extensions,
450            payload_length,
451            object_status,
452        })
453    }
454}