Skip to main content

moqtap_codec/draft14/
data_stream.rs

1//! Draft-14 data streams (§10): subgroup streams, fetch streams, datagrams.
2//!
3//! This module follows the draft-14 spec exactly. Key shapes:
4//!
5//! * **Subgroup stream** (§10.4.2): starts with a Type byte `0x10..=0x1D`
6//!   whose bit-flags determine whether a Subgroup ID field is present,
7//!   whether the subgroup ID is zero or the first Object ID, whether
8//!   extension headers are present, and whether the stream ends at a
9//!   group boundary. Object IDs are delta-encoded relative to the
10//!   previous Object ID in the same stream.
11//!
12//! * **Fetch stream** (§10.4.4): Type `0x05`, Request ID, then a sequence
13//!   of self-describing objects until FIN.
14//!
15//! * **Datagram** (§10.3.1): Type byte `0x00..=0x07` or `0x20..=0x21`
16//!   with bit-flags for End of Group, Extensions Present, Object ID
17//!   Present, and Status vs Payload.
18
19use bytes::{Buf, BufMut};
20
21use super::types::ObjectStatus;
22use crate::error::CodecError;
23use crate::varint::VarInt;
24
25// ============================================================
26// Subgroup stream (Type 0x10..=0x1D)
27// ============================================================
28
29/// Subgroup stream type byte (§10.4.2, Table 7).
30///
31/// The 12 defined types encode four independent boolean fields in the
32/// low nibble:
33///
34/// * bit 0 (`0x01`) — Extensions Present
35/// * bit 1 (`0x02`) — Subgroup ID derives from first Object ID
36///   (only meaningful when bit 2 is clear)
37/// * bit 2 (`0x04`) — Subgroup ID Field Present (explicit Subgroup ID varint)
38/// * bit 3 (`0x08`) — Contains End of Group
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40pub struct SubgroupStreamType(u8);
41
42impl SubgroupStreamType {
43    /// The raw wire byte.
44    pub fn as_u8(self) -> u8 {
45        self.0
46    }
47
48    /// Create a [`SubgroupStreamType`] from its raw byte, validating
49    /// that it is one of the 12 defined values in Table 7.
50    pub fn from_u8(v: u8) -> Option<Self> {
51        if (0x10..=0x15).contains(&v) || (0x18..=0x1D).contains(&v) {
52            Some(SubgroupStreamType(v))
53        } else {
54            None
55        }
56    }
57
58    /// Build a subgroup stream type from its component flags.
59    ///
60    /// `subgroup_id_is_first_object` and `subgroup_id_field_present` are
61    /// mutually exclusive — if both are set, the resulting type has the
62    /// "Subgroup ID Field Present" bit set (bit 2 wins).
63    pub fn from_flags(
64        subgroup_id_field_present: bool,
65        subgroup_id_is_first_object: bool,
66        extensions_present: bool,
67        end_of_group: bool,
68    ) -> Self {
69        let mut v: u8 = 0x10;
70        if extensions_present {
71            v |= 0x01;
72        }
73        if subgroup_id_field_present {
74            v |= 0x04;
75        } else if subgroup_id_is_first_object {
76            v |= 0x02;
77        }
78        if end_of_group {
79            v |= 0x08;
80        }
81        SubgroupStreamType(v)
82    }
83
84    /// True if the header carries an explicit Subgroup ID varint.
85    pub fn has_subgroup_id_field(self) -> bool {
86        self.0 & 0x04 != 0
87    }
88
89    /// True if the subgroup ID is defined to equal the first Object ID
90    /// in the stream (applies only when [`Self::has_subgroup_id_field`]
91    /// is false).
92    pub fn subgroup_id_is_first_object(self) -> bool {
93        !self.has_subgroup_id_field() && (self.0 & 0x02 != 0)
94    }
95
96    /// True if every object in the stream carries extension headers.
97    pub fn extensions_present(self) -> bool {
98        self.0 & 0x01 != 0
99    }
100
101    /// True if the last object on the stream (prior to FIN) is the end
102    /// of its group.
103    pub fn contains_end_of_group(self) -> bool {
104        self.0 & 0x08 != 0
105    }
106}
107
108/// Subgroup stream header (§10.4.2, Figure 32).
109#[derive(Debug, Clone, PartialEq, Eq)]
110pub struct SubgroupHeader {
111    /// Type byte identifying the flag set for this stream.
112    pub stream_type: SubgroupStreamType,
113    /// Track alias (Section 10.1).
114    pub track_alias: VarInt,
115    /// Group ID.
116    pub group_id: VarInt,
117    /// Explicit Subgroup ID — present only when the stream type sets
118    /// `Subgroup ID Field Present = Yes`. For types where the subgroup
119    /// ID is implicit (0 or the first Object ID), the effective
120    /// subgroup ID is resolved on the receive side by the reader.
121    pub subgroup_id: Option<VarInt>,
122    /// Publisher priority (Section 7).
123    pub publisher_priority: u8,
124}
125
126impl SubgroupHeader {
127    /// Encode the header including the leading stream type byte.
128    pub fn encode(&self, buf: &mut impl BufMut) {
129        VarInt::from_u64(self.stream_type.as_u8() as u64).unwrap().encode(buf);
130        self.track_alias.encode(buf);
131        self.group_id.encode(buf);
132        if self.stream_type.has_subgroup_id_field() {
133            let sg = self.subgroup_id.unwrap_or_else(|| VarInt::from_u64(0).unwrap());
134            sg.encode(buf);
135        }
136        buf.put_u8(self.publisher_priority);
137    }
138
139    /// Decode a subgroup header (leading type byte + remaining fields).
140    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
141        let type_val = VarInt::decode(buf)?.into_inner();
142        if type_val > 0xFF {
143            return Err(CodecError::InvalidField);
144        }
145        let stream_type =
146            SubgroupStreamType::from_u8(type_val as u8).ok_or(CodecError::InvalidField)?;
147        let track_alias = VarInt::decode(buf)?;
148        let group_id = VarInt::decode(buf)?;
149        let subgroup_id =
150            if stream_type.has_subgroup_id_field() { Some(VarInt::decode(buf)?) } else { None };
151        if buf.remaining() < 1 {
152            return Err(CodecError::UnexpectedEnd);
153        }
154        let publisher_priority = buf.get_u8();
155        Ok(SubgroupHeader { stream_type, track_alias, group_id, subgroup_id, publisher_priority })
156    }
157}
158
159/// One object within a subgroup stream, with the Object ID already
160/// resolved from its delta encoding.
161#[derive(Debug, Clone, PartialEq, Eq)]
162pub struct SubgroupObject {
163    /// Resolved Object ID (delta decoded to an absolute value).
164    pub object_id: VarInt,
165    /// Raw extension-header bytes. Empty when the stream type has
166    /// `Extensions Present = No`, or when present but the length was 0.
167    /// The content is a sequence of Key-Value-Pairs (§10.2.1.2) but is
168    /// left opaque here — relays and subscribers that do not understand
169    /// specific extensions must forward or ignore the bytes unchanged.
170    pub extension_headers: Vec<u8>,
171    /// Object Status when `payload.is_empty()` and the object was sent
172    /// with an explicit status code; `None` when a non-empty payload
173    /// follows (status is implicitly [`ObjectStatus::Normal`]).
174    pub status: Option<ObjectStatus>,
175    /// Object payload. Empty when `status` is `Some(..)`.
176    pub payload: Vec<u8>,
177}
178
179/// Stateful reader for the object fields on a subgroup stream.
180///
181/// Object IDs on a subgroup stream are delta-encoded against the
182/// previous Object ID, and whether extension headers are present is
183/// fixed by the enclosing [`SubgroupHeader`]'s stream type. This reader
184/// carries that context across successive `read_object` calls.
185#[derive(Debug, Clone)]
186pub struct SubgroupObjectReader {
187    extensions_present: bool,
188    prev_object_id: Option<u64>,
189}
190
191impl SubgroupObjectReader {
192    /// Create a reader from a parsed subgroup header.
193    pub fn new(header: &SubgroupHeader) -> Self {
194        Self { extensions_present: header.stream_type.extensions_present(), prev_object_id: None }
195    }
196
197    /// Decode the next object from `buf`. Caller is responsible for
198    /// ensuring the buffer contains a complete object (draft-14 objects
199    /// are length-delimited by the payload-length field, so the buffer
200    /// boundary is known once the header portion has been consumed).
201    pub fn read_object(&mut self, buf: &mut impl Buf) -> Result<SubgroupObject, CodecError> {
202        let delta = VarInt::decode(buf)?.into_inner();
203        let object_id_val = match self.prev_object_id {
204            None => delta,
205            Some(prev) => prev
206                .checked_add(1)
207                .and_then(|v| v.checked_add(delta))
208                .ok_or(CodecError::InvalidField)?,
209        };
210        self.prev_object_id = Some(object_id_val);
211        let object_id = VarInt::from_u64(object_id_val).map_err(|_| CodecError::InvalidField)?;
212
213        let extension_headers = if self.extensions_present {
214            let ext_len = VarInt::decode(buf)?.into_inner() as usize;
215            crate::types::read_bytes(buf, ext_len)?
216        } else {
217            Vec::new()
218        };
219
220        let payload_length = VarInt::decode(buf)?.into_inner() as usize;
221        let (status, payload) = if payload_length == 0 {
222            let status_val = VarInt::decode(buf)?.into_inner();
223            let status = ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?;
224            (Some(status), Vec::new())
225        } else {
226            let payload = crate::types::read_bytes(buf, payload_length)?;
227            (None, payload)
228        };
229
230        Ok(SubgroupObject { object_id, extension_headers, status, payload })
231    }
232
233    /// Serialize a subgroup object using the reader's delta state. Intended
234    /// for senders that want to build a stream incrementally — tracks
235    /// `prev_object_id` so successive calls produce correct deltas.
236    ///
237    /// Returns an error if `object.object_id <= prev_object_id`, which
238    /// would produce an invalid delta.
239    pub fn write_object(
240        &mut self,
241        object: &SubgroupObject,
242        buf: &mut impl BufMut,
243    ) -> Result<(), CodecError> {
244        let oid = object.object_id.into_inner();
245        let delta = match self.prev_object_id {
246            None => oid,
247            Some(prev) => oid
248                .checked_sub(prev)
249                .and_then(|v| v.checked_sub(1))
250                .ok_or(CodecError::InvalidField)?,
251        };
252        VarInt::from_u64(delta).map_err(|_| CodecError::InvalidField)?.encode(buf);
253        if self.extensions_present {
254            VarInt::from_u64(object.extension_headers.len() as u64)
255                .map_err(|_| CodecError::InvalidField)?
256                .encode(buf);
257            buf.put_slice(&object.extension_headers);
258        }
259        if let Some(status) = object.status {
260            VarInt::from_u64(0).unwrap().encode(buf);
261            VarInt::from_u64(status.as_u64()).unwrap().encode(buf);
262        } else {
263            VarInt::from_u64(object.payload.len() as u64)
264                .map_err(|_| CodecError::InvalidField)?
265                .encode(buf);
266            buf.put_slice(&object.payload);
267        }
268        self.prev_object_id = Some(oid);
269        Ok(())
270    }
271}
272
273// ============================================================
274// Fetch stream (Type 0x05)
275// ============================================================
276
277/// Draft-14 fetch stream type byte.
278pub const FETCH_STREAM_TYPE: u8 = 0x05;
279
280/// Fetch stream header (§10.4.4, Figure 34).
281#[derive(Debug, Clone, PartialEq, Eq)]
282pub struct FetchHeader {
283    /// Request ID from the originating FETCH control message.
284    pub request_id: VarInt,
285}
286
287impl FetchHeader {
288    /// Encode the header including the leading type byte.
289    pub fn encode(&self, buf: &mut impl BufMut) {
290        VarInt::from_u64(FETCH_STREAM_TYPE as u64).unwrap().encode(buf);
291        self.request_id.encode(buf);
292    }
293
294    /// Decode the header. Errors if the type byte is not `0x05`.
295    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
296        let type_val = VarInt::decode(buf)?.into_inner();
297        if type_val != FETCH_STREAM_TYPE as u64 {
298            return Err(CodecError::InvalidField);
299        }
300        let request_id = VarInt::decode(buf)?;
301        Ok(FetchHeader { request_id })
302    }
303}
304
305/// One object carried on a fetch stream (§10.4.4, Figure 35).
306///
307/// Every object on a fetch stream is self-describing — unlike subgroup
308/// streams, there is no delta encoding and extension headers are always
309/// length-prefixed (the length is zero when absent).
310#[derive(Debug, Clone, PartialEq, Eq)]
311pub struct FetchObject {
312    /// Group ID.
313    pub group_id: VarInt,
314    /// Subgroup ID. For objects whose Forwarding Preference is Datagram,
315    /// this is set to the Object ID.
316    pub subgroup_id: VarInt,
317    /// Object ID.
318    pub object_id: VarInt,
319    /// Publisher priority.
320    pub publisher_priority: u8,
321    /// Raw extension-header bytes (opaque sequence of Key-Value-Pairs).
322    pub extension_headers: Vec<u8>,
323    /// Object status when `payload.is_empty()`, otherwise `None`.
324    pub status: Option<ObjectStatus>,
325    /// Object payload.
326    pub payload: Vec<u8>,
327}
328
329impl FetchObject {
330    /// Encode one fetch object.
331    pub fn encode(&self, buf: &mut impl BufMut) {
332        self.group_id.encode(buf);
333        self.subgroup_id.encode(buf);
334        self.object_id.encode(buf);
335        buf.put_u8(self.publisher_priority);
336        VarInt::from_u64(self.extension_headers.len() as u64).unwrap().encode(buf);
337        buf.put_slice(&self.extension_headers);
338        if let Some(status) = self.status {
339            VarInt::from_u64(0).unwrap().encode(buf);
340            VarInt::from_u64(status.as_u64()).unwrap().encode(buf);
341        } else {
342            VarInt::from_u64(self.payload.len() as u64).unwrap().encode(buf);
343            buf.put_slice(&self.payload);
344        }
345    }
346
347    /// Decode one fetch object.
348    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
349        let group_id = VarInt::decode(buf)?;
350        let subgroup_id = VarInt::decode(buf)?;
351        let object_id = VarInt::decode(buf)?;
352        if buf.remaining() < 1 {
353            return Err(CodecError::UnexpectedEnd);
354        }
355        let publisher_priority = buf.get_u8();
356        let ext_len = VarInt::decode(buf)?.into_inner() as usize;
357        let extension_headers = crate::types::read_bytes(buf, ext_len)?;
358        let payload_length = VarInt::decode(buf)?.into_inner() as usize;
359        let (status, payload) = if payload_length == 0 {
360            let status_val = VarInt::decode(buf)?.into_inner();
361            let status = ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?;
362            (Some(status), Vec::new())
363        } else {
364            (None, crate::types::read_bytes(buf, payload_length)?)
365        };
366        Ok(FetchObject {
367            group_id,
368            subgroup_id,
369            object_id,
370            publisher_priority,
371            extension_headers,
372            status,
373            payload,
374        })
375    }
376}
377
378// ============================================================
379// Datagram (Type 0x00..=0x07, 0x20..=0x21)
380// ============================================================
381
382/// Datagram type byte (§10.3.1, Table 6).
383///
384/// Bit layout (low nibble):
385///
386/// * bit 0 (`0x01`) — Extensions Present
387/// * bit 1 (`0x02`) — End of Group
388/// * bit 2 (`0x04`) — Object ID **absent** (when set, Object ID = 0)
389///
390/// Status variants use the high nibble (`0x20..=0x21`). Only types
391/// `0x00..=0x07`, `0x20`, `0x21` are defined.
392#[derive(Debug, Clone, Copy, PartialEq, Eq)]
393pub struct DatagramType(u8);
394
395impl DatagramType {
396    /// Raw wire byte.
397    pub fn as_u8(self) -> u8 {
398        self.0
399    }
400
401    /// Validate and wrap a raw wire byte.
402    pub fn from_u8(v: u8) -> Option<Self> {
403        if (0x00..=0x07).contains(&v) || v == 0x20 || v == 0x21 {
404            Some(DatagramType(v))
405        } else {
406            None
407        }
408    }
409
410    /// Build a payload-bearing datagram type (`0x00..=0x07`).
411    pub fn payload(object_id_present: bool, extensions_present: bool, end_of_group: bool) -> Self {
412        let mut v: u8 = 0x00;
413        if extensions_present {
414            v |= 0x01;
415        }
416        if end_of_group {
417            v |= 0x02;
418        }
419        if !object_id_present {
420            v |= 0x04;
421        }
422        DatagramType(v)
423    }
424
425    /// Build a status-only datagram type (`0x20` or `0x21`).
426    pub fn status(extensions_present: bool) -> Self {
427        if extensions_present {
428            DatagramType(0x21)
429        } else {
430            DatagramType(0x20)
431        }
432    }
433
434    /// True when the datagram carries an Object Status instead of a
435    /// payload (types `0x20` / `0x21`).
436    pub fn is_status(self) -> bool {
437        self.0 >= 0x20
438    }
439
440    /// True when the datagram carries an explicit Object ID field.
441    pub fn object_id_present(self) -> bool {
442        // Bit 2 is only meaningful in the 0x00..=0x07 range; status
443        // variants (0x20/0x21) always carry an Object ID per Table 6.
444        if self.is_status() {
445            true
446        } else {
447            self.0 & 0x04 == 0
448        }
449    }
450
451    /// True if the last object of the group is conveyed.
452    pub fn end_of_group(self) -> bool {
453        !self.is_status() && (self.0 & 0x02 != 0)
454    }
455
456    /// True if extension headers are present in this datagram.
457    pub fn extensions_present(self) -> bool {
458        self.0 & 0x01 != 0
459    }
460}
461
462/// Datagram carrying a single object (§10.3.1, Figure 31).
463#[derive(Debug, Clone, PartialEq, Eq)]
464pub struct DatagramObject {
465    /// Datagram type byte.
466    pub datagram_type: DatagramType,
467    /// Track alias.
468    pub track_alias: VarInt,
469    /// Group ID.
470    pub group_id: VarInt,
471    /// Object ID. Defaults to 0 when
472    /// [`DatagramType::object_id_present`] is false.
473    pub object_id: VarInt,
474    /// Publisher priority.
475    pub publisher_priority: u8,
476    /// Raw extension-header bytes (empty unless
477    /// [`DatagramType::extensions_present`] is true).
478    pub extension_headers: Vec<u8>,
479    /// Object status (only present for status-type datagrams).
480    pub status: Option<ObjectStatus>,
481    /// Object payload (empty for status-type datagrams).
482    pub payload: Vec<u8>,
483}
484
485impl DatagramObject {
486    /// Encode the datagram in full.
487    pub fn encode(&self, buf: &mut impl BufMut) {
488        VarInt::from_u64(self.datagram_type.as_u8() as u64).unwrap().encode(buf);
489        self.track_alias.encode(buf);
490        self.group_id.encode(buf);
491        if self.datagram_type.object_id_present() {
492            self.object_id.encode(buf);
493        }
494        buf.put_u8(self.publisher_priority);
495        if self.datagram_type.extensions_present() {
496            VarInt::from_u64(self.extension_headers.len() as u64).unwrap().encode(buf);
497            buf.put_slice(&self.extension_headers);
498        }
499        if self.datagram_type.is_status() {
500            let status = self.status.unwrap_or(ObjectStatus::Normal);
501            VarInt::from_u64(status.as_u64()).unwrap().encode(buf);
502        } else {
503            buf.put_slice(&self.payload);
504        }
505    }
506
507    /// Decode a datagram. The buffer must contain the full datagram —
508    /// payload-bearing types extend to the end of the QUIC datagram,
509    /// which the caller is responsible for delimiting.
510    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
511        let type_val = VarInt::decode(buf)?.into_inner();
512        if type_val > 0xFF {
513            return Err(CodecError::InvalidField);
514        }
515        let datagram_type =
516            DatagramType::from_u8(type_val as u8).ok_or(CodecError::InvalidField)?;
517        let track_alias = VarInt::decode(buf)?;
518        let group_id = VarInt::decode(buf)?;
519        let object_id = if datagram_type.object_id_present() {
520            VarInt::decode(buf)?
521        } else {
522            VarInt::from_u64(0).unwrap()
523        };
524        if buf.remaining() < 1 {
525            return Err(CodecError::UnexpectedEnd);
526        }
527        let publisher_priority = buf.get_u8();
528        let extension_headers = if datagram_type.extensions_present() {
529            let ext_len = VarInt::decode(buf)?.into_inner() as usize;
530            crate::types::read_bytes(buf, ext_len)?
531        } else {
532            Vec::new()
533        };
534        let (status, payload) = if datagram_type.is_status() {
535            let status_val = VarInt::decode(buf)?.into_inner();
536            let status = ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?;
537            (Some(status), Vec::new())
538        } else {
539            let remaining = buf.remaining();
540            (None, crate::types::read_bytes(buf, remaining)?)
541        };
542        Ok(DatagramObject {
543            datagram_type,
544            track_alias,
545            group_id,
546            object_id,
547            publisher_priority,
548            extension_headers,
549            status,
550            payload,
551        })
552    }
553}
554
555#[cfg(test)]
556mod tests {
557    use super::*;
558
559    fn vi(v: u64) -> VarInt {
560        VarInt::from_u64(v).unwrap()
561    }
562
563    // ── SubgroupStreamType flag helpers ─────────────────────
564
565    #[test]
566    fn subgroup_type_0x10_all_off() {
567        let t = SubgroupStreamType::from_u8(0x10).unwrap();
568        assert!(!t.has_subgroup_id_field());
569        assert!(!t.subgroup_id_is_first_object());
570        assert!(!t.extensions_present());
571        assert!(!t.contains_end_of_group());
572    }
573
574    #[test]
575    fn subgroup_type_0x15_explicit_with_ext() {
576        let t = SubgroupStreamType::from_u8(0x15).unwrap();
577        assert!(t.has_subgroup_id_field());
578        assert!(!t.subgroup_id_is_first_object());
579        assert!(t.extensions_present());
580        assert!(!t.contains_end_of_group());
581    }
582
583    #[test]
584    fn subgroup_type_0x1d_all_on() {
585        let t = SubgroupStreamType::from_u8(0x1D).unwrap();
586        assert!(t.has_subgroup_id_field());
587        assert!(t.extensions_present());
588        assert!(t.contains_end_of_group());
589    }
590
591    #[test]
592    fn subgroup_type_0x12_first_object() {
593        let t = SubgroupStreamType::from_u8(0x12).unwrap();
594        assert!(!t.has_subgroup_id_field());
595        assert!(t.subgroup_id_is_first_object());
596        assert!(!t.extensions_present());
597    }
598
599    #[test]
600    fn subgroup_type_rejects_undefined() {
601        for bad in [0x00u8, 0x0F, 0x16, 0x17, 0x1E, 0x1F, 0x20] {
602            assert!(SubgroupStreamType::from_u8(bad).is_none(), "0x{bad:02x} should be rejected");
603        }
604    }
605
606    #[test]
607    fn subgroup_type_from_flags_roundtrip() {
608        for &f_sg in &[false, true] {
609            for &f_first in &[false, true] {
610                for &f_ext in &[false, true] {
611                    for &f_eog in &[false, true] {
612                        let t = SubgroupStreamType::from_flags(f_sg, f_first, f_ext, f_eog);
613                        assert_eq!(t.has_subgroup_id_field(), f_sg);
614                        // subgroup_id_is_first_object only meaningful when
615                        // explicit field is absent
616                        if !f_sg {
617                            assert_eq!(t.subgroup_id_is_first_object(), f_first);
618                        }
619                        assert_eq!(t.extensions_present(), f_ext);
620                        assert_eq!(t.contains_end_of_group(), f_eog);
621                    }
622                }
623            }
624        }
625    }
626
627    // ── SubgroupHeader round-trip ───────────────────────────
628
629    #[test]
630    fn subgroup_header_roundtrip_0x10() {
631        let h = SubgroupHeader {
632            stream_type: SubgroupStreamType::from_u8(0x10).unwrap(),
633            track_alias: vi(1),
634            group_id: vi(0),
635            subgroup_id: None,
636            publisher_priority: 128,
637        };
638        let mut buf = Vec::new();
639        h.encode(&mut buf);
640        assert_eq!(buf[0], 0x10);
641        let decoded = SubgroupHeader::decode(&mut &buf[..]).unwrap();
642        assert_eq!(decoded, h);
643    }
644
645    #[test]
646    fn subgroup_header_roundtrip_explicit_subgroup() {
647        let h = SubgroupHeader {
648            stream_type: SubgroupStreamType::from_u8(0x14).unwrap(),
649            track_alias: vi(5),
650            group_id: vi(10),
651            subgroup_id: Some(vi(2)),
652            publisher_priority: 64,
653        };
654        let mut buf = Vec::new();
655        h.encode(&mut buf);
656        let decoded = SubgroupHeader::decode(&mut &buf[..]).unwrap();
657        assert_eq!(decoded, h);
658    }
659
660    #[test]
661    fn subgroup_header_decode_rejects_bad_type() {
662        // type byte 0x16 is undefined
663        let buf = [0x16u8, 0x01, 0x00, 0x80];
664        let err = SubgroupHeader::decode(&mut &buf[..]).unwrap_err();
665        assert!(matches!(err, CodecError::InvalidField));
666    }
667
668    // ── Subgroup object reader (delta + extensions) ─────────
669
670    #[test]
671    fn subgroup_reader_delta_sequential_ids() {
672        // Type 0x10: no subgroup field, no extensions, no eog
673        let header = SubgroupHeader {
674            stream_type: SubgroupStreamType::from_u8(0x10).unwrap(),
675            track_alias: vi(1),
676            group_id: vi(0),
677            subgroup_id: None,
678            publisher_priority: 0,
679        };
680
681        let mut write = SubgroupObjectReader::new(&header);
682        let mut buf = Vec::new();
683        for i in 0..3u64 {
684            let obj = SubgroupObject {
685                object_id: vi(i),
686                extension_headers: vec![],
687                status: None,
688                payload: vec![0xAA + i as u8; 4],
689            };
690            write.write_object(&obj, &mut buf).unwrap();
691        }
692
693        let mut read = SubgroupObjectReader::new(&header);
694        let mut cursor = &buf[..];
695        let o0 = read.read_object(&mut cursor).unwrap();
696        assert_eq!(o0.object_id.into_inner(), 0);
697        assert_eq!(o0.payload, vec![0xAA; 4]);
698        let o1 = read.read_object(&mut cursor).unwrap();
699        assert_eq!(o1.object_id.into_inner(), 1);
700        let o2 = read.read_object(&mut cursor).unwrap();
701        assert_eq!(o2.object_id.into_inner(), 2);
702    }
703
704    #[test]
705    fn subgroup_reader_delta_sparse_ids() {
706        // Object IDs 5, 10, 11 — deltas are 5, 4, 0
707        let header = SubgroupHeader {
708            stream_type: SubgroupStreamType::from_u8(0x10).unwrap(),
709            track_alias: vi(1),
710            group_id: vi(0),
711            subgroup_id: None,
712            publisher_priority: 0,
713        };
714        let mut write = SubgroupObjectReader::new(&header);
715        let mut buf = Vec::new();
716        for &id in &[5u64, 10, 11] {
717            write
718                .write_object(
719                    &SubgroupObject {
720                        object_id: vi(id),
721                        extension_headers: vec![],
722                        status: None,
723                        payload: vec![1, 2, 3],
724                    },
725                    &mut buf,
726                )
727                .unwrap();
728        }
729        let mut read = SubgroupObjectReader::new(&header);
730        let mut cursor = &buf[..];
731        assert_eq!(read.read_object(&mut cursor).unwrap().object_id.into_inner(), 5);
732        assert_eq!(read.read_object(&mut cursor).unwrap().object_id.into_inner(), 10);
733        assert_eq!(read.read_object(&mut cursor).unwrap().object_id.into_inner(), 11);
734    }
735
736    #[test]
737    fn subgroup_reader_with_extensions() {
738        // Type 0x11: extensions present
739        let header = SubgroupHeader {
740            stream_type: SubgroupStreamType::from_u8(0x11).unwrap(),
741            track_alias: vi(1),
742            group_id: vi(0),
743            subgroup_id: None,
744            publisher_priority: 0,
745        };
746        let mut write = SubgroupObjectReader::new(&header);
747        let mut buf = Vec::new();
748        write
749            .write_object(
750                &SubgroupObject {
751                    object_id: vi(0),
752                    extension_headers: vec![0x01, 0x02, 0x03],
753                    status: None,
754                    payload: vec![0xFF],
755                },
756                &mut buf,
757            )
758            .unwrap();
759        let mut read = SubgroupObjectReader::new(&header);
760        let o = read.read_object(&mut &buf[..]).unwrap();
761        assert_eq!(o.extension_headers, vec![0x01, 0x02, 0x03]);
762        assert_eq!(o.payload, vec![0xFF]);
763    }
764
765    #[test]
766    fn subgroup_reader_status_object() {
767        let header = SubgroupHeader {
768            stream_type: SubgroupStreamType::from_u8(0x10).unwrap(),
769            track_alias: vi(1),
770            group_id: vi(0),
771            subgroup_id: None,
772            publisher_priority: 0,
773        };
774        let mut write = SubgroupObjectReader::new(&header);
775        let mut buf = Vec::new();
776        write
777            .write_object(
778                &SubgroupObject {
779                    object_id: vi(7),
780                    extension_headers: vec![],
781                    status: Some(ObjectStatus::EndOfGroup),
782                    payload: vec![],
783                },
784                &mut buf,
785            )
786            .unwrap();
787        let mut read = SubgroupObjectReader::new(&header);
788        let o = read.read_object(&mut &buf[..]).unwrap();
789        assert_eq!(o.object_id.into_inner(), 7);
790        assert_eq!(o.status, Some(ObjectStatus::EndOfGroup));
791        assert!(o.payload.is_empty());
792    }
793
794    // ── FetchHeader + FetchObject ───────────────────────────
795
796    #[test]
797    fn fetch_header_roundtrip() {
798        let h = FetchHeader { request_id: vi(99) };
799        let mut buf = Vec::new();
800        h.encode(&mut buf);
801        assert_eq!(buf[0], 0x05);
802        assert_eq!(FetchHeader::decode(&mut &buf[..]).unwrap(), h);
803    }
804
805    #[test]
806    fn fetch_header_rejects_wrong_type() {
807        let buf = [0x10u8, 0x05];
808        assert!(FetchHeader::decode(&mut &buf[..]).is_err());
809    }
810
811    #[test]
812    fn fetch_object_roundtrip_with_payload() {
813        let obj = FetchObject {
814            group_id: vi(3),
815            subgroup_id: vi(1),
816            object_id: vi(7),
817            publisher_priority: 200,
818            extension_headers: vec![0xAA, 0xBB],
819            status: None,
820            payload: vec![1, 2, 3, 4],
821        };
822        let mut buf = Vec::new();
823        obj.encode(&mut buf);
824        assert_eq!(FetchObject::decode(&mut &buf[..]).unwrap(), obj);
825    }
826
827    #[test]
828    fn fetch_object_roundtrip_status() {
829        let obj = FetchObject {
830            group_id: vi(3),
831            subgroup_id: vi(1),
832            object_id: vi(8),
833            publisher_priority: 200,
834            extension_headers: vec![],
835            status: Some(ObjectStatus::ObjectDoesNotExist),
836            payload: vec![],
837        };
838        let mut buf = Vec::new();
839        obj.encode(&mut buf);
840        assert_eq!(FetchObject::decode(&mut &buf[..]).unwrap(), obj);
841    }
842
843    // ── DatagramType ────────────────────────────────────────
844
845    #[test]
846    fn datagram_type_variants() {
847        let t0 = DatagramType::from_u8(0x00).unwrap();
848        assert!(t0.object_id_present());
849        assert!(!t0.extensions_present());
850        assert!(!t0.end_of_group());
851        assert!(!t0.is_status());
852
853        let t7 = DatagramType::from_u8(0x07).unwrap();
854        assert!(!t7.object_id_present()); // bit 2 set
855        assert!(t7.extensions_present());
856        assert!(t7.end_of_group());
857        assert!(!t7.is_status());
858
859        let t20 = DatagramType::from_u8(0x20).unwrap();
860        assert!(t20.is_status());
861        assert!(!t20.extensions_present());
862        // Status datagrams always carry Object ID
863        assert!(t20.object_id_present());
864
865        let t21 = DatagramType::from_u8(0x21).unwrap();
866        assert!(t21.is_status());
867        assert!(t21.extensions_present());
868    }
869
870    #[test]
871    fn datagram_type_rejects_undefined() {
872        for bad in [0x08u8, 0x10, 0x1F, 0x22, 0x80] {
873            assert!(DatagramType::from_u8(bad).is_none(), "0x{bad:02x}");
874        }
875    }
876
877    // ── DatagramObject round-trip ───────────────────────────
878
879    #[test]
880    fn datagram_object_0x00_roundtrip() {
881        let d = DatagramObject {
882            datagram_type: DatagramType::from_u8(0x00).unwrap(),
883            track_alias: vi(1),
884            group_id: vi(2),
885            object_id: vi(3),
886            publisher_priority: 100,
887            extension_headers: vec![],
888            status: None,
889            payload: vec![0xDE, 0xAD, 0xBE, 0xEF],
890        };
891        let mut buf = Vec::new();
892        d.encode(&mut buf);
893        assert_eq!(DatagramObject::decode(&mut &buf[..]).unwrap(), d);
894    }
895
896    #[test]
897    fn datagram_object_0x04_no_object_id() {
898        // 0x04: no object id field, implicit 0
899        let d = DatagramObject {
900            datagram_type: DatagramType::from_u8(0x04).unwrap(),
901            track_alias: vi(1),
902            group_id: vi(2),
903            object_id: vi(0),
904            publisher_priority: 100,
905            extension_headers: vec![],
906            status: None,
907            payload: vec![0xAA],
908        };
909        let mut buf = Vec::new();
910        d.encode(&mut buf);
911        let decoded = DatagramObject::decode(&mut &buf[..]).unwrap();
912        assert_eq!(decoded, d);
913    }
914
915    #[test]
916    fn datagram_object_0x21_status_with_extensions() {
917        let d = DatagramObject {
918            datagram_type: DatagramType::from_u8(0x21).unwrap(),
919            track_alias: vi(9),
920            group_id: vi(4),
921            object_id: vi(11),
922            publisher_priority: 50,
923            extension_headers: vec![0xCA, 0xFE],
924            status: Some(ObjectStatus::EndOfTrack),
925            payload: vec![],
926        };
927        let mut buf = Vec::new();
928        d.encode(&mut buf);
929        assert_eq!(DatagramObject::decode(&mut &buf[..]).unwrap(), d);
930    }
931}