Skip to main content

moqtap_codec/draft16/
data_stream.rs

1//! Draft-16 data stream header encoding and decoding.
2//!
3//! Draft-16 subgroup type-byte flag layout (differs from draft-15):
4//! - `& 0x01`: extensions present on objects
5//! - `& 0x02`: subgroup_id_mode bit — when set, subgroup_id = first object_id
6//! - `& 0x04`: explicit subgroup_id present on the wire
7//! - `& 0x08`: end-of-group marker
8//! - `& 0x20`: no publisher_priority (0x30+ types)
9//!
10//! Draft-16 datagram type-byte flag layout:
11//! - `0x01`: extensions present (byte-length-prefixed blob)
12//! - `0x02`: end-of-group
13//! - `0x04`: no object_id (object_id = 0 implied)
14//! - `0x08`: default priority (priority omitted, inherited)
15//! - `0x20`: status datagram (carries object_status instead of payload)
16//!
17//! Extension headers in draft-16 are byte-length-prefixed opaque blobs
18//! (not count-prefixed as in draft-14).
19
20use crate::error::CodecError;
21use crate::varint::VarInt;
22use bytes::{Buf, BufMut};
23
24#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct SubgroupHeader {
26    pub header_type: u8,
27    pub track_alias: VarInt,
28    pub group_id: VarInt,
29    pub subgroup_id: VarInt,
30    pub publisher_priority: Option<u8>,
31}
32
33impl SubgroupHeader {
34    pub fn has_extensions(&self) -> bool {
35        self.header_type & 0x01 != 0
36    }
37
38    /// When set, the subgroup_id is implicitly the first object's ID
39    /// (not transmitted on the wire).
40    pub fn subgroup_id_from_first_object(&self) -> bool {
41        self.header_type & 0x02 != 0
42    }
43
44    pub fn has_explicit_subgroup_id(&self) -> bool {
45        self.header_type & 0x04 != 0
46    }
47
48    pub fn has_end_of_group(&self) -> bool {
49        self.header_type & 0x08 != 0
50    }
51
52    pub fn has_priority(&self) -> bool {
53        self.header_type & 0x20 == 0
54    }
55
56    pub fn encode(&self, buf: &mut impl BufMut) {
57        VarInt::from_usize(self.header_type as usize).encode(buf);
58        self.track_alias.encode(buf);
59        self.group_id.encode(buf);
60        if self.has_explicit_subgroup_id() {
61            self.subgroup_id.encode(buf);
62        }
63        if let Some(p) = self.publisher_priority {
64            buf.put_u8(p);
65        }
66    }
67
68    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
69        let header_type = VarInt::decode(buf)?.into_inner() as u8;
70        let base = header_type & 0xD0;
71        if base != 0x10 && base != 0x30 {
72            return Err(CodecError::InvalidField);
73        }
74        let track_alias = VarInt::decode(buf)?;
75        let group_id = VarInt::decode(buf)?;
76        let subgroup_id =
77            if header_type & 0x04 != 0 { VarInt::decode(buf)? } else { VarInt::from_usize(0) };
78        let publisher_priority = if header_type & 0x20 == 0 {
79            if buf.remaining() < 1 {
80                return Err(CodecError::UnexpectedEnd);
81            }
82            Some(buf.get_u8())
83        } else {
84            None
85        };
86        Ok(Self { header_type, track_alias, group_id, subgroup_id, publisher_priority })
87    }
88}
89
90/// One object within a draft-16 subgroup stream with its Object ID
91/// already resolved from the delta encoding. See
92/// [`SubgroupObjectReader`] for stateful encode/decode.
93#[derive(Debug, Clone, PartialEq, Eq)]
94pub struct SubgroupObject {
95    pub object_id: VarInt,
96    pub extension_headers: Vec<u8>,
97    pub payload_length: VarInt,
98    pub object_status: Option<VarInt>,
99    pub payload: Vec<u8>,
100}
101
102/// Stateful reader/writer for draft-16 subgroup objects. Mirrors the
103/// draft-15 semantics (delta-encoded object IDs and header-typed
104/// extension presence).
105#[derive(Debug, Clone)]
106pub struct SubgroupObjectReader {
107    extensions_present: bool,
108    prev_object_id: Option<u64>,
109}
110
111impl SubgroupObjectReader {
112    pub fn new(header: &SubgroupHeader) -> Self {
113        Self { extensions_present: header.has_extensions(), prev_object_id: None }
114    }
115
116    pub fn read_object(&mut self, buf: &mut impl Buf) -> Result<SubgroupObject, CodecError> {
117        let delta = VarInt::decode(buf)?.into_inner();
118        // Draft-16 subgroup object delta encoding:
119        // - First object: `delta` is the absolute object_id.
120        // - Subsequent objects (no extensions): `delta` gap to next id,
121        //   resolved as `prev + delta + 1`.
122        // - Subsequent objects (extensions flag set): `delta` is the
123        //   already-adjusted offset, resolved as `prev + delta`.
124        let object_id_val = match self.prev_object_id {
125            None => delta,
126            Some(prev) => {
127                if self.extensions_present {
128                    prev.checked_add(delta).ok_or(CodecError::InvalidField)?
129                } else {
130                    prev.checked_add(1)
131                        .and_then(|v| v.checked_add(delta))
132                        .ok_or(CodecError::InvalidField)?
133                }
134            }
135        };
136        self.prev_object_id = Some(object_id_val);
137        let object_id = VarInt::from_u64(object_id_val).map_err(|_| CodecError::InvalidField)?;
138
139        // Draft-16: extensions are a byte-length-prefixed opaque blob.
140        let extension_headers = if self.extensions_present {
141            let ext_len = VarInt::decode(buf)?.into_inner() as usize;
142            crate::types::read_bytes(buf, ext_len)?
143        } else {
144            Vec::new()
145        };
146
147        let payload_length_vi = VarInt::decode(buf)?;
148        let payload_length_val = payload_length_vi.into_inner() as usize;
149        let (object_status, payload) = if payload_length_val == 0 {
150            let status = VarInt::decode(buf)?;
151            (Some(status), Vec::new())
152        } else {
153            let payload = crate::types::read_bytes(buf, payload_length_val)?;
154            (None, payload)
155        };
156
157        Ok(SubgroupObject {
158            object_id,
159            extension_headers,
160            payload_length: payload_length_vi,
161            object_status,
162            payload,
163        })
164    }
165
166    pub fn write_object(
167        &mut self,
168        object: &SubgroupObject,
169        buf: &mut impl BufMut,
170    ) -> Result<(), CodecError> {
171        let oid = object.object_id.into_inner();
172        let delta = match self.prev_object_id {
173            None => oid,
174            Some(prev) => {
175                if self.extensions_present {
176                    oid.checked_sub(prev).ok_or(CodecError::InvalidField)?
177                } else {
178                    oid.checked_sub(prev)
179                        .and_then(|v| v.checked_sub(1))
180                        .ok_or(CodecError::InvalidField)?
181                }
182            }
183        };
184        VarInt::from_u64(delta).map_err(|_| CodecError::InvalidField)?.encode(buf);
185        if self.extensions_present {
186            let ext_len = object.extension_headers.len();
187            VarInt::from_usize(ext_len).encode(buf);
188            buf.put_slice(&object.extension_headers);
189        }
190        object.payload_length.encode(buf);
191        if object.payload_length.into_inner() == 0 {
192            if let Some(s) = &object.object_status {
193                s.encode(buf);
194            } else {
195                VarInt::from_u64(0).unwrap().encode(buf);
196            }
197        } else {
198            buf.put_slice(&object.payload);
199        }
200        self.prev_object_id = Some(oid);
201        Ok(())
202    }
203}
204
205#[derive(Debug, Clone, PartialEq, Eq)]
206pub struct DatagramHeader {
207    pub datagram_type: u8,
208    pub track_alias: VarInt,
209    pub group_id: VarInt,
210    pub object_id: VarInt,
211    /// Publisher priority — `None` when the DEFAULT_PRIORITY flag is set and
212    /// the priority is inherited from the subscription's control message.
213    pub publisher_priority: Option<u8>,
214    /// Opaque extension-headers blob (only when flag 0x01 is set).
215    pub extension_headers: Vec<u8>,
216    pub object_status: Option<VarInt>,
217}
218
219impl DatagramHeader {
220    pub fn has_extensions(&self) -> bool {
221        self.datagram_type & 0x01 != 0
222    }
223
224    pub fn is_end_of_group(&self) -> bool {
225        self.datagram_type & 0x02 != 0
226    }
227
228    pub fn has_object_id(&self) -> bool {
229        self.datagram_type & 0x04 == 0
230    }
231
232    /// When set, publisher_priority is omitted on the wire and inherited
233    /// from the subscription / control-message context.
234    pub fn has_default_priority(&self) -> bool {
235        self.datagram_type & 0x08 != 0
236    }
237
238    pub fn is_status(&self) -> bool {
239        self.datagram_type & 0x20 != 0
240    }
241
242    pub fn encode(&self, buf: &mut impl BufMut) {
243        VarInt::from_usize(self.datagram_type as usize).encode(buf);
244        self.track_alias.encode(buf);
245        self.group_id.encode(buf);
246        if self.has_object_id() {
247            self.object_id.encode(buf);
248        }
249        if !self.has_default_priority() {
250            buf.put_u8(self.publisher_priority.unwrap_or(128));
251        }
252        if self.has_extensions() {
253            VarInt::from_usize(self.extension_headers.len()).encode(buf);
254            buf.put_slice(&self.extension_headers);
255        }
256        if self.is_status() {
257            if let Some(s) = &self.object_status {
258                s.encode(buf);
259            }
260        }
261    }
262
263    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
264        let datagram_type = VarInt::decode(buf)?.into_inner() as u8;
265        let track_alias = VarInt::decode(buf)?;
266        let group_id = VarInt::decode(buf)?;
267        let object_id =
268            if datagram_type & 0x04 == 0 { VarInt::decode(buf)? } else { VarInt::from_usize(0) };
269        let publisher_priority = if datagram_type & 0x08 != 0 {
270            None
271        } else {
272            if buf.remaining() < 1 {
273                return Err(CodecError::UnexpectedEnd);
274            }
275            Some(buf.get_u8())
276        };
277        let extension_headers = if datagram_type & 0x01 != 0 {
278            let ext_len = VarInt::decode(buf)?.into_inner() as usize;
279            crate::types::read_bytes(buf, ext_len)?
280        } else {
281            Vec::new()
282        };
283        let object_status =
284            if datagram_type & 0x20 != 0 { Some(VarInt::decode(buf)?) } else { None };
285        Ok(Self {
286            datagram_type,
287            track_alias,
288            group_id,
289            object_id,
290            publisher_priority,
291            extension_headers,
292            object_status,
293        })
294    }
295}
296
297#[derive(Debug, Clone, PartialEq, Eq)]
298pub struct FetchHeader {
299    pub request_id: VarInt,
300}
301
302impl FetchHeader {
303    pub fn encode(&self, buf: &mut impl BufMut) {
304        VarInt::from_usize(0x05).encode(buf);
305        self.request_id.encode(buf);
306    }
307
308    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
309        let stream_type = VarInt::decode(buf)?.into_inner();
310        if stream_type != 0x05 {
311            return Err(CodecError::InvalidField);
312        }
313        let request_id = VarInt::decode(buf)?;
314        Ok(Self { request_id })
315    }
316}