Skip to main content

moqtap_codec/draft11/
data_stream.rs

1//! Draft-11 data stream header encoding and decoding.
2//!
3//! Changes from draft-09/10:
4//! - Datagram stream type IDs: 0x00 (no ext), 0x01 (with ext), 0x02 (status, no ext),
5//!   0x03 (status, with ext)
6//! - Subgroup stream types: 0x08-0x0D (6 variants based on subgroup_id encoding and extensions)
7//! - Fetch stream type: 0x05 (request_id only in header)
8//! - Object within subgroup: object_id + [ext_headers_length + extensions] + payload_length
9//!   + [object_status if payload_length=0]
10
11use super::types::ObjectStatus;
12use crate::error::CodecError;
13use crate::types::read_bytes;
14use crate::varint::VarInt;
15use bytes::{Buf, BufMut};
16
/// Wire identifiers of the draft-11 data streams and datagrams.
///
/// Each variant's discriminant is the numeric type ID itself, so `variant as u64`
/// yields the value that [`StreamType::from_id`] accepts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum StreamType {
    /// `0x00`: object datagram without extensions.
    Datagram = 0x00,
    /// `0x01`: object datagram carrying extensions.
    DatagramExt = 0x01,
    /// `0x02`: object datagram status without extensions.
    DatagramStatus = 0x02,
    /// `0x03`: object datagram status carrying extensions.
    DatagramStatusExt = 0x03,
    /// `0x05`: fetch response stream.
    Fetch = 0x05,
    /// `0x08`: subgroup stream, subgroup_id is 0, no extensions.
    SubgroupZero = 0x08,
    /// `0x09`: subgroup stream, subgroup_id is 0, with extensions.
    SubgroupZeroExt = 0x09,
    /// `0x0A`: subgroup stream, subgroup_id is the first object ID, no extensions.
    SubgroupFirstObj = 0x0A,
    /// `0x0B`: subgroup stream, subgroup_id is the first object ID, with extensions.
    SubgroupFirstObjExt = 0x0B,
    /// `0x0C`: subgroup stream with an explicit subgroup_id, no extensions.
    SubgroupExplicit = 0x0C,
    /// `0x0D`: subgroup stream with an explicit subgroup_id, with extensions.
    SubgroupExplicitExt = 0x0D,
}

impl StreamType {
    /// Map a raw type ID to its `StreamType`.
    ///
    /// Returns `None` when `id` does not match the discriminant of any variant.
    pub fn from_id(id: u64) -> Option<Self> {
        let stream_type = match id {
            0x00 => StreamType::Datagram,
            0x01 => StreamType::DatagramExt,
            0x02 => StreamType::DatagramStatus,
            0x03 => StreamType::DatagramStatusExt,
            0x05 => StreamType::Fetch,
            0x08 => StreamType::SubgroupZero,
            0x09 => StreamType::SubgroupZeroExt,
            0x0A => StreamType::SubgroupFirstObj,
            0x0B => StreamType::SubgroupFirstObjExt,
            0x0C => StreamType::SubgroupExplicit,
            0x0D => StreamType::SubgroupExplicitExt,
            // Anything else is not a known type ID.
            _ => return None,
        };
        Some(stream_type)
    }

    /// Returns `true` for the six subgroup stream variants.
    pub fn is_subgroup(&self) -> bool {
        // The subgroup variants are exactly the IDs in the contiguous range 0x08..=0x0D.
        matches!(*self as u64, 0x08..=0x0D)
    }

    /// Returns `true` when objects on this stream type carry extension headers.
    pub fn has_extensions(&self) -> bool {
        match self {
            StreamType::DatagramExt
            | StreamType::DatagramStatusExt
            | StreamType::SubgroupZeroExt
            | StreamType::SubgroupFirstObjExt
            | StreamType::SubgroupExplicitExt => true,
            StreamType::Datagram
            | StreamType::DatagramStatus
            | StreamType::Fetch
            | StreamType::SubgroupZero
            | StreamType::SubgroupFirstObj
            | StreamType::SubgroupExplicit => false,
        }
    }
}
89
90// ── Extension helpers ─────────────────────────────────────────
91
92fn read_extension_bytes(buf: &mut impl Buf, byte_len: u64) -> Result<Vec<u8>, CodecError> {
93    read_bytes(buf, byte_len as usize)
94}
95
96// ============================================================
97// Subgroup stream header
98// ============================================================
99
/// Subgroup stream header (unified across all 6 stream type variants).
///
/// Decoded representation includes `stream_type` to preserve the variant.
///
/// Only the explicit variants (`SubgroupExplicit` / `SubgroupExplicitExt`) carry
/// `subgroup_id` in the encoded header; for the other subgroup variants
/// `decode_with_type` fills the field with 0.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubgroupHeader {
    /// The stream type variant used for encoding.
    ///
    /// Selects whether `subgroup_id` is written by `encode`.
    pub stream_type: StreamType,
    /// Track alias identifying the subscription.
    pub track_alias: VarInt,
    /// Group identifier.
    pub group_id: VarInt,
    /// Subgroup identifier within the group.
    ///
    /// For the first-object variants this is a placeholder 0 after decoding; the caller
    /// is expected to replace it once the first object has been read.
    pub subgroup_id: VarInt,
    /// Publisher priority for delivery ordering (encoded as a single byte).
    pub publisher_priority: u8,
}
116
117impl SubgroupHeader {
118    /// Encode the subgroup header (always as explicit subgroup_id format).
119    pub fn encode(&self, buf: &mut impl BufMut) {
120        self.track_alias.encode(buf);
121        self.group_id.encode(buf);
122        match self.stream_type {
123            StreamType::SubgroupExplicit | StreamType::SubgroupExplicitExt => {
124                self.subgroup_id.encode(buf);
125            }
126            _ => {}
127        }
128        buf.put_u8(self.publisher_priority);
129    }
130
131    /// Decode a subgroup header (assumes explicit subgroup_id format).
132    ///
133    /// For stream-type-aware decoding, use [`Self::decode_with_type`].
134    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
135        Self::decode_with_type(StreamType::SubgroupExplicit, buf)
136    }
137
138    /// Decode a subgroup header with the specific stream type variant.
139    pub fn decode_with_type(
140        stream_type: StreamType,
141        buf: &mut impl Buf,
142    ) -> Result<Self, CodecError> {
143        let track_alias = VarInt::decode(buf)?;
144        let group_id = VarInt::decode(buf)?;
145        let subgroup_id = match stream_type {
146            StreamType::SubgroupZero | StreamType::SubgroupZeroExt => VarInt::from_usize(0),
147            StreamType::SubgroupExplicit | StreamType::SubgroupExplicitExt => VarInt::decode(buf)?,
148            // For FirstObj variants, subgroup_id is the first object's ID.
149            // We read it later from the first object. Set to 0 for now;
150            // the caller should update after reading the first object.
151            StreamType::SubgroupFirstObj | StreamType::SubgroupFirstObjExt => VarInt::from_usize(0),
152            _ => return Err(CodecError::InvalidField),
153        };
154        if buf.remaining() < 1 {
155            return Err(CodecError::UnexpectedEnd);
156        }
157        let publisher_priority = buf.get_u8();
158        Ok(Self { stream_type, track_alias, group_id, subgroup_id, publisher_priority })
159    }
160}
161
162// ============================================================
163// Object header within subgroup
164// ============================================================
165
/// Object within a subgroup stream (draft-11).
///
/// Encoded as: object_id, then (only when extensions are enabled for the stream)
/// extension_headers_length + extension bytes, then payload_length, then
/// object_status only if payload_length is 0.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObjectHeader {
    /// Object identifier within the subgroup.
    pub object_id: VarInt,
    /// Total byte length of extension headers (0 if no extensions).
    ///
    /// Encoded as stored; it is not recomputed from `extensions` when encoding.
    pub extension_headers_length: VarInt,
    /// Raw extension bytes (opaque).
    pub extensions: Vec<u8>,
    /// Length of the object payload in bytes.
    pub payload_length: VarInt,
    /// Object status (Normal unless payload_length == 0).
    ///
    /// Only written to / read from the wire when `payload_length` is 0.
    pub object_status: ObjectStatus,
}
180
181impl ObjectHeader {
182    /// Encode the object header (no extensions).
183    ///
184    /// For extension-aware encoding, use [`Self::encode_with_extensions`].
185    pub fn encode(&self, buf: &mut impl BufMut) {
186        self.encode_with_extensions(false, buf);
187    }
188
189    /// Encode the object header with extensions control.
190    pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
191        self.object_id.encode(buf);
192        if has_extensions {
193            self.extension_headers_length.encode(buf);
194            buf.put_slice(&self.extensions);
195        }
196        self.payload_length.encode(buf);
197        if self.payload_length.into_inner() == 0 {
198            VarInt::from_usize(self.object_status as usize).encode(buf);
199        }
200    }
201
202    /// Decode an object header (no extensions).
203    ///
204    /// For extension-aware decoding, use [`Self::decode_with_extensions`].
205    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
206        Self::decode_with_extensions(false, buf)
207    }
208
209    /// Decode an object header with extensions control.
210    pub fn decode_with_extensions(
211        has_extensions: bool,
212        buf: &mut impl Buf,
213    ) -> Result<Self, CodecError> {
214        let object_id = VarInt::decode(buf)?;
215        let (extension_headers_length, extensions) = if has_extensions {
216            let ehl = VarInt::decode(buf)?;
217            let ext = read_extension_bytes(buf, ehl.into_inner())?;
218            (ehl, ext)
219        } else {
220            (VarInt::from_usize(0), Vec::new())
221        };
222        let payload_length = VarInt::decode(buf)?;
223        let object_status = if payload_length.into_inner() == 0 {
224            let sv = VarInt::decode(buf)?.into_inner();
225            ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?
226        } else {
227            ObjectStatus::Normal
228        };
229        Ok(Self { object_id, extension_headers_length, extensions, payload_length, object_status })
230    }
231}
232
233// ============================================================
234// Datagram (types 0x00, 0x01)
235// ============================================================
236
/// Datagram header (draft-11, types 0x00/0x01).
///
/// Payload is the remaining bytes in the datagram after the header.
///
/// Encoded as: track_alias, group_id, object_id, publisher_priority (one byte), then
/// (only when extensions are enabled) extension_headers_length + extension bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatagramHeader {
    /// Track alias identifying the subscription.
    pub track_alias: VarInt,
    /// Group identifier.
    pub group_id: VarInt,
    /// Object identifier within the group.
    pub object_id: VarInt,
    /// Publisher priority for delivery ordering.
    pub publisher_priority: u8,
    /// Total byte length of extension headers (0 for type 0x00).
    ///
    /// Encoded as stored; it is not recomputed from `extensions` when encoding.
    pub extension_headers_length: VarInt,
    /// Raw extension bytes.
    pub extensions: Vec<u8>,
}
255
256impl DatagramHeader {
257    /// Encode the datagram header (no extensions).
258    pub fn encode(&self, buf: &mut impl BufMut) {
259        self.encode_with_extensions(false, buf);
260    }
261
262    /// Encode the datagram header with extensions control.
263    pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
264        self.track_alias.encode(buf);
265        self.group_id.encode(buf);
266        self.object_id.encode(buf);
267        buf.put_u8(self.publisher_priority);
268        if has_extensions {
269            self.extension_headers_length.encode(buf);
270            buf.put_slice(&self.extensions);
271        }
272    }
273
274    /// Decode a datagram header (no extensions).
275    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
276        Self::decode_with_extensions(false, buf)
277    }
278
279    /// Decode a datagram header with extensions control.
280    pub fn decode_with_extensions(
281        has_extensions: bool,
282        buf: &mut impl Buf,
283    ) -> Result<Self, CodecError> {
284        let track_alias = VarInt::decode(buf)?;
285        let group_id = VarInt::decode(buf)?;
286        let object_id = VarInt::decode(buf)?;
287        if buf.remaining() < 1 {
288            return Err(CodecError::UnexpectedEnd);
289        }
290        let publisher_priority = buf.get_u8();
291        let (extension_headers_length, extensions) = if has_extensions {
292            let ehl = VarInt::decode(buf)?;
293            let ext = read_extension_bytes(buf, ehl.into_inner())?;
294            (ehl, ext)
295        } else {
296            (VarInt::from_usize(0), Vec::new())
297        };
298        Ok(Self {
299            track_alias,
300            group_id,
301            object_id,
302            publisher_priority,
303            extension_headers_length,
304            extensions,
305        })
306    }
307}
308
309// ============================================================
310// Datagram Status (types 0x02, 0x03)
311// ============================================================
312
/// Datagram status header (draft-11, types 0x02/0x03).
///
/// Encoded as: track_alias, group_id, object_id, publisher_priority (one byte), then
/// (only when extensions are enabled) extension_headers_length + extension bytes, and
/// finally the object status as a varint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatagramStatusHeader {
    /// Track alias identifying the subscription.
    pub track_alias: VarInt,
    /// Group identifier.
    pub group_id: VarInt,
    /// Object identifier within the group.
    pub object_id: VarInt,
    /// Publisher priority for delivery ordering.
    pub publisher_priority: u8,
    /// Total byte length of extension headers (0 for type 0x02).
    ///
    /// Encoded as stored; it is not recomputed from `extensions` when encoding.
    pub extension_headers_length: VarInt,
    /// Raw extension bytes.
    pub extensions: Vec<u8>,
    /// Object status code (always present on the wire for this header).
    pub object_status: ObjectStatus,
}
331
332impl DatagramStatusHeader {
333    /// Encode the datagram status header (no extensions).
334    pub fn encode(&self, buf: &mut impl BufMut) {
335        self.encode_with_extensions(false, buf);
336    }
337
338    /// Encode the datagram status header with extensions control.
339    pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
340        self.track_alias.encode(buf);
341        self.group_id.encode(buf);
342        self.object_id.encode(buf);
343        buf.put_u8(self.publisher_priority);
344        if has_extensions {
345            self.extension_headers_length.encode(buf);
346            buf.put_slice(&self.extensions);
347        }
348        VarInt::from_usize(self.object_status as usize).encode(buf);
349    }
350
351    /// Decode a datagram status header (no extensions).
352    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
353        Self::decode_with_extensions(false, buf)
354    }
355
356    /// Decode a datagram status header with extensions control.
357    pub fn decode_with_extensions(
358        has_extensions: bool,
359        buf: &mut impl Buf,
360    ) -> Result<Self, CodecError> {
361        let track_alias = VarInt::decode(buf)?;
362        let group_id = VarInt::decode(buf)?;
363        let object_id = VarInt::decode(buf)?;
364        if buf.remaining() < 1 {
365            return Err(CodecError::UnexpectedEnd);
366        }
367        let publisher_priority = buf.get_u8();
368        let (extension_headers_length, extensions) = if has_extensions {
369            let ehl = VarInt::decode(buf)?;
370            let ext = read_extension_bytes(buf, ehl.into_inner())?;
371            (ehl, ext)
372        } else {
373            (VarInt::from_usize(0), Vec::new())
374        };
375        let sv = VarInt::decode(buf)?.into_inner();
376        let object_status = ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?;
377        Ok(Self {
378            track_alias,
379            group_id,
380            object_id,
381            publisher_priority,
382            extension_headers_length,
383            extensions,
384            object_status,
385        })
386    }
387}
388
389// ============================================================
390// Fetch stream (type 0x05)
391// ============================================================
392
/// Fetch stream header (draft-11, type 0x05).
///
/// Contains only the request_id. Objects follow inline.
/// The header is encoded as a single varint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchHeader {
    /// Request ID this fetch responds to.
    pub request_id: VarInt,
}
401
/// Object within a fetch stream (draft-11).
///
/// Encoded as: group_id, subgroup_id, object_id, publisher_priority (one byte),
/// extension_headers_length + extension bytes (always present, unlike subgroup
/// objects), payload_length, then object_status only if payload_length is 0.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchObjectHeader {
    /// Group identifier.
    pub group_id: VarInt,
    /// Subgroup identifier within the group.
    pub subgroup_id: VarInt,
    /// Object identifier within the subgroup.
    pub object_id: VarInt,
    /// Publisher priority for delivery ordering.
    pub publisher_priority: u8,
    /// Total byte length of extension headers.
    ///
    /// Encoded as stored; it is not recomputed from `extensions` when encoding.
    pub extension_headers_length: VarInt,
    /// Raw extension bytes.
    pub extensions: Vec<u8>,
    /// Length of the object payload in bytes.
    pub payload_length: VarInt,
    /// Object status (Normal unless payload_length == 0).
    ///
    /// Only written to / read from the wire when `payload_length` is 0.
    pub object_status: ObjectStatus,
}
422
423impl FetchHeader {
424    /// Encode the fetch header.
425    pub fn encode(&self, buf: &mut impl BufMut) {
426        self.request_id.encode(buf);
427    }
428
429    /// Decode a fetch header.
430    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
431        let request_id = VarInt::decode(buf)?;
432        Ok(Self { request_id })
433    }
434}
435
436impl FetchObjectHeader {
437    /// Encode the fetch object header.
438    pub fn encode(&self, buf: &mut impl BufMut) {
439        self.group_id.encode(buf);
440        self.subgroup_id.encode(buf);
441        self.object_id.encode(buf);
442        buf.put_u8(self.publisher_priority);
443        self.extension_headers_length.encode(buf);
444        buf.put_slice(&self.extensions);
445        self.payload_length.encode(buf);
446        if self.payload_length.into_inner() == 0 {
447            VarInt::from_usize(self.object_status as usize).encode(buf);
448        }
449    }
450
451    /// Decode a fetch object header.
452    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
453        let group_id = VarInt::decode(buf)?;
454        let subgroup_id = VarInt::decode(buf)?;
455        let object_id = VarInt::decode(buf)?;
456        if buf.remaining() < 1 {
457            return Err(CodecError::UnexpectedEnd);
458        }
459        let publisher_priority = buf.get_u8();
460        let extension_headers_length = VarInt::decode(buf)?;
461        let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
462        let payload_length = VarInt::decode(buf)?;
463        let object_status = if payload_length.into_inner() == 0 {
464            let sv = VarInt::decode(buf)?.into_inner();
465            ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?
466        } else {
467            ObjectStatus::Normal
468        };
469        Ok(Self {
470            group_id,
471            subgroup_id,
472            object_id,
473            publisher_priority,
474            extension_headers_length,
475            extensions,
476            payload_length,
477            object_status,
478        })
479    }
480}