Skip to main content

moqtap_codec/draft17/
data_stream.rs

1//! Draft-17 data stream header encoding and decoding.
2//!
3//! Subgroup header type byte: 0b00X1XXXX (bit 4 always set)
4//!   - bit 0 (0x01): PROPERTIES
5//!   - bits 1-2 (0x06): SUBGROUP_ID_MODE (0=zero, 1=first_obj, 2=explicit, 3=reserved)
6//!   - bit 3 (0x08): END_OF_GROUP
7//!   - bit 5 (0x20): DEFAULT_PRIORITY (no priority byte)
8//!
9//! Datagram type byte: 0b00X0XXXX (bit 4 always 0)
10//!   - bit 0 (0x01): PROPERTIES
11//!   - bit 1 (0x02): END_OF_GROUP
12//!   - bit 2 (0x04): ZERO_OBJECT_ID (object_id=0, field omitted)
13//!   - bit 3 (0x08): DEFAULT_PRIORITY (no priority byte)
14//!   - bit 5 (0x20): STATUS (status byte replaces payload)
15//!
16//! Fetch header: stream type 0x05 + request_id.
17
18use bytes::{Buf, BufMut};
19
20use crate::error::CodecError;
21use crate::varint::VarInt;
22
23// ── Subgroup ──────────────────────────────────────────────────
24
25const SUBGROUP_PROPERTIES_BIT: u8 = 0x01;
26const SUBGROUP_ID_MODE_MASK: u8 = 0x06;
27const SUBGROUP_END_OF_GROUP_BIT: u8 = 0x08;
28const SUBGROUP_BASE_BIT: u8 = 0x10;
29const SUBGROUP_DEFAULT_PRIORITY_BIT: u8 = 0x20;
30
31#[derive(Debug, Clone)]
32pub struct SubgroupHeader {
33    pub header_type: u8,
34    pub track_alias: VarInt,
35    pub group_id: VarInt,
36    pub subgroup_id: VarInt,
37    pub publisher_priority: Option<u8>,
38}
39
40impl SubgroupHeader {
41    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
42        if buf.remaining() < 1 {
43            return Err(CodecError::UnexpectedEnd);
44        }
45        let header_type = buf.get_u8();
46
47        // Validate: bit 4 must be set
48        if header_type & SUBGROUP_BASE_BIT == 0 {
49            return Err(CodecError::InvalidField);
50        }
51
52        let track_alias = VarInt::decode(buf)?;
53        let group_id = VarInt::decode(buf)?;
54
55        let subgroup_id_mode = (header_type & SUBGROUP_ID_MODE_MASK) >> 1;
56        let subgroup_id = match subgroup_id_mode {
57            0 => VarInt::from_u64(0).unwrap(),
58            2 => VarInt::decode(buf)?,
59            // Modes 1 and 3: mode 1 = first object's ID (resolved later),
60            // mode 3 = reserved. Store 0 for now.
61            _ => VarInt::from_u64(0).unwrap(),
62        };
63
64        let publisher_priority = if header_type & SUBGROUP_DEFAULT_PRIORITY_BIT == 0 {
65            if buf.remaining() < 1 {
66                return Err(CodecError::UnexpectedEnd);
67            }
68            Some(buf.get_u8())
69        } else {
70            None
71        };
72
73        Ok(SubgroupHeader { header_type, track_alias, group_id, subgroup_id, publisher_priority })
74    }
75
76    pub fn encode(&self, buf: &mut impl BufMut) {
77        buf.put_u8(self.header_type);
78        self.track_alias.encode(buf);
79        self.group_id.encode(buf);
80
81        let subgroup_id_mode = (self.header_type & SUBGROUP_ID_MODE_MASK) >> 1;
82        if subgroup_id_mode == 2 {
83            self.subgroup_id.encode(buf);
84        }
85
86        if self.header_type & SUBGROUP_DEFAULT_PRIORITY_BIT == 0 {
87            buf.put_u8(self.publisher_priority.unwrap_or(128));
88        }
89    }
90
91    pub fn has_properties(&self) -> bool {
92        self.header_type & SUBGROUP_PROPERTIES_BIT != 0
93    }
94
95    pub fn is_end_of_group(&self) -> bool {
96        self.header_type & SUBGROUP_END_OF_GROUP_BIT != 0
97    }
98}
99
100// ── Subgroup objects (stateful) ───────────────────────────────
101
102/// One object within a draft-17 subgroup stream. Object IDs are
103/// delta-encoded and the presence of a "properties" block (the
104/// draft-17 equivalent of extension headers) is determined by the
105/// PROPERTIES bit on the enclosing [`SubgroupHeader`]. Use
106/// [`SubgroupObjectReader`] to encode/decode.
107#[derive(Debug, Clone)]
108pub struct SubgroupObject {
109    pub object_id: VarInt,
110    /// Raw properties bytes (empty unless the subgroup header sets the
111    /// PROPERTIES bit). When present, holds the `ext_count` varint
112    /// followed by each property's `key`, `vlen`, and value.
113    pub extension_headers: Vec<u8>,
114    pub payload_length: VarInt,
115    pub object_status: Option<VarInt>,
116    pub payload: Vec<u8>,
117}
118
119#[derive(Debug, Clone)]
120pub struct SubgroupObjectReader {
121    extensions_present: bool,
122    prev_object_id: Option<u64>,
123}
124
125impl SubgroupObjectReader {
126    pub fn new(header: &SubgroupHeader) -> Self {
127        Self { extensions_present: header.has_properties(), prev_object_id: None }
128    }
129
130    pub fn read_object(&mut self, buf: &mut impl Buf) -> Result<SubgroupObject, CodecError> {
131        let delta = VarInt::decode(buf)?.into_inner();
132        let object_id_val = match self.prev_object_id {
133            None => delta,
134            Some(prev) => prev
135                .checked_add(1)
136                .and_then(|v| v.checked_add(delta))
137                .ok_or(CodecError::InvalidField)?,
138        };
139        self.prev_object_id = Some(object_id_val);
140        let object_id = VarInt::from_u64(object_id_val).map_err(|_| CodecError::InvalidField)?;
141
142        let extension_headers = if self.extensions_present {
143            let mut out: Vec<u8> = Vec::new();
144            let ext_count = VarInt::decode(buf)?;
145            ext_count.encode(&mut out);
146            let count = ext_count.into_inner();
147            for _ in 0..count {
148                let key = VarInt::decode(buf)?;
149                let vlen = VarInt::decode(buf)?;
150                let vlen_usize = vlen.into_inner() as usize;
151                if buf.remaining() < vlen_usize {
152                    return Err(CodecError::UnexpectedEnd);
153                }
154                key.encode(&mut out);
155                vlen.encode(&mut out);
156                let value = buf.copy_to_bytes(vlen_usize);
157                out.extend_from_slice(&value);
158            }
159            out
160        } else {
161            Vec::new()
162        };
163
164        let payload_length_vi = VarInt::decode(buf)?;
165        let payload_length_val = payload_length_vi.into_inner() as usize;
166        let (object_status, payload) = if payload_length_val == 0 {
167            let status = VarInt::decode(buf)?;
168            (Some(status), Vec::new())
169        } else {
170            let payload = crate::types::read_bytes(buf, payload_length_val)?;
171            (None, payload)
172        };
173
174        Ok(SubgroupObject {
175            object_id,
176            extension_headers,
177            payload_length: payload_length_vi,
178            object_status,
179            payload,
180        })
181    }
182
183    pub fn write_object(
184        &mut self,
185        object: &SubgroupObject,
186        buf: &mut impl BufMut,
187    ) -> Result<(), CodecError> {
188        let oid = object.object_id.into_inner();
189        let delta = match self.prev_object_id {
190            None => oid,
191            Some(prev) => oid
192                .checked_sub(prev)
193                .and_then(|v| v.checked_sub(1))
194                .ok_or(CodecError::InvalidField)?,
195        };
196        VarInt::from_u64(delta).map_err(|_| CodecError::InvalidField)?.encode(buf);
197        if self.extensions_present {
198            buf.put_slice(&object.extension_headers);
199        }
200        object.payload_length.encode(buf);
201        if object.payload_length.into_inner() == 0 {
202            if let Some(s) = &object.object_status {
203                s.encode(buf);
204            } else {
205                VarInt::from_u64(0).unwrap().encode(buf);
206            }
207        } else {
208            buf.put_slice(&object.payload);
209        }
210        self.prev_object_id = Some(oid);
211        Ok(())
212    }
213}
214
215// ── Datagram ──────────────────────────────────────────────────
216
217const DATAGRAM_PROPERTIES_BIT: u8 = 0x01;
218const DATAGRAM_END_OF_GROUP_BIT: u8 = 0x02;
219const DATAGRAM_ZERO_OBJECT_ID_BIT: u8 = 0x04;
220const DATAGRAM_DEFAULT_PRIORITY_BIT: u8 = 0x08;
221const DATAGRAM_STATUS_BIT: u8 = 0x20;
222
223#[derive(Debug, Clone)]
224pub struct DatagramHeader {
225    pub datagram_type: u8,
226    pub track_alias: VarInt,
227    pub group_id: VarInt,
228    pub object_id: VarInt,
229    pub publisher_priority: Option<u8>,
230    pub object_status: Option<u8>,
231}
232
233impl DatagramHeader {
234    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
235        if buf.remaining() < 1 {
236            return Err(CodecError::UnexpectedEnd);
237        }
238        let datagram_type = buf.get_u8();
239
240        let track_alias = VarInt::decode(buf)?;
241        let group_id = VarInt::decode(buf)?;
242
243        let object_id = if datagram_type & DATAGRAM_ZERO_OBJECT_ID_BIT != 0 {
244            VarInt::from_usize(0)
245        } else {
246            VarInt::decode(buf)?
247        };
248
249        let publisher_priority = if datagram_type & DATAGRAM_DEFAULT_PRIORITY_BIT == 0 {
250            if buf.remaining() < 1 {
251                return Err(CodecError::UnexpectedEnd);
252            }
253            Some(buf.get_u8())
254        } else {
255            None
256        };
257
258        // Skip properties if present
259        if datagram_type & DATAGRAM_PROPERTIES_BIT != 0 {
260            let props_len = VarInt::decode(buf)?.into_inner() as usize;
261            if buf.remaining() < props_len {
262                return Err(CodecError::UnexpectedEnd);
263            }
264            buf.advance(props_len);
265        }
266
267        let object_status = if datagram_type & DATAGRAM_STATUS_BIT != 0 {
268            if buf.remaining() < 1 {
269                return Err(CodecError::UnexpectedEnd);
270            }
271            Some(buf.get_u8())
272        } else {
273            None
274        };
275
276        Ok(DatagramHeader {
277            datagram_type,
278            track_alias,
279            group_id,
280            object_id,
281            publisher_priority,
282            object_status,
283        })
284    }
285
286    pub fn encode(&self, buf: &mut impl BufMut) {
287        buf.put_u8(self.datagram_type);
288        self.track_alias.encode(buf);
289        self.group_id.encode(buf);
290
291        if self.datagram_type & DATAGRAM_ZERO_OBJECT_ID_BIT == 0 {
292            self.object_id.encode(buf);
293        }
294
295        if self.datagram_type & DATAGRAM_DEFAULT_PRIORITY_BIT == 0 {
296            buf.put_u8(self.publisher_priority.unwrap_or(128));
297        }
298
299        if self.datagram_type & DATAGRAM_STATUS_BIT != 0 {
300            buf.put_u8(self.object_status.unwrap_or(0));
301        }
302    }
303
304    pub fn is_end_of_group(&self) -> bool {
305        self.datagram_type & DATAGRAM_END_OF_GROUP_BIT != 0
306    }
307
308    pub fn has_status(&self) -> bool {
309        self.datagram_type & DATAGRAM_STATUS_BIT != 0
310    }
311}
312
313// ── Fetch Header ──────────────────────────────────────────────
314
315const FETCH_STREAM_TYPE: u64 = 0x05;
316
317#[derive(Debug, Clone)]
318pub struct FetchHeader {
319    pub request_id: VarInt,
320}
321
322impl FetchHeader {
323    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
324        let stream_type = VarInt::decode(buf)?.into_inner();
325        if stream_type != FETCH_STREAM_TYPE {
326            return Err(CodecError::InvalidField);
327        }
328        let request_id = VarInt::decode(buf)?;
329        Ok(FetchHeader { request_id })
330    }
331
332    pub fn encode(&self, buf: &mut impl BufMut) {
333        VarInt::from_usize(FETCH_STREAM_TYPE as usize).encode(buf);
334        self.request_id.encode(buf);
335    }
336}