Skip to main content

moqtap_codec/draft10/
data_stream.rs

1//! Draft-10 data stream header encoding and decoding.
2//!
3//! Changes from draft-08:
4//! - `extension_count` → `extension_headers_length` (byte length, not count)
5//! - Datagram (0x01): no `payload_length` or `object_status`; payload is remaining bytes
6//! - DatagramStatus (0x02): gains `extension_headers_length` field
7
8use super::types::ObjectStatus;
9use crate::error::CodecError;
10use crate::types::read_bytes;
11use crate::varint::VarInt;
12use bytes::{Buf, BufMut};
13
/// Type identifiers that open a draft-10 data stream or datagram.
///
/// The numeric values are the same ones draft-08 uses.
#[repr(u64)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamType {
    /// `0x01` — datagram that carries an object payload.
    Datagram = 0x01,
    /// `0x02` — datagram that carries only an object status, no payload.
    DatagramStatus = 0x02,
    /// `0x04` — subgroup stream.
    Subgroup = 0x04,
    /// `0x05` — fetch stream.
    Fetch = 0x05,
}

impl StreamType {
    /// Look up the `StreamType` whose discriminant equals `id`.
    ///
    /// Returns `None` when `id` is not one of the four known type IDs.
    pub fn from_id(id: u64) -> Option<Self> {
        // Every variant, so the mapping is driven by the discriminants
        // declared on the enum rather than repeated as literals.
        const KNOWN: [StreamType; 4] = [
            StreamType::Datagram,
            StreamType::DatagramStatus,
            StreamType::Subgroup,
            StreamType::Fetch,
        ];
        KNOWN.iter().copied().find(|kind| *kind as u64 == id)
    }
}
40
41// ── Extension helpers (length-based, not count-based) ─────────
42
43/// Read `byte_len` bytes of raw extension data from the buffer.
44fn read_extension_bytes(buf: &mut impl Buf, byte_len: u64) -> Result<Vec<u8>, CodecError> {
45    read_bytes(buf, byte_len as usize)
46}
47
48/// Encode extension bytes to the buffer (just writes the raw bytes).
49fn encode_extensions(extensions: &[u8], buf: &mut impl BufMut) {
50    buf.put_slice(extensions);
51}
52
53// ============================================================
54// Subgroup stream (type 0x04)
55// ============================================================
56
57/// Subgroup stream header (follows the stream type varint).
58#[derive(Debug, Clone, PartialEq, Eq)]
59pub struct SubgroupHeader {
60    /// Track alias identifying the subscription.
61    pub track_alias: VarInt,
62    /// Group identifier.
63    pub group_id: VarInt,
64    /// Subgroup identifier within the group.
65    pub subgroup_id: VarInt,
66    /// Publisher priority for delivery ordering.
67    pub publisher_priority: u8,
68}
69
70/// Object within a subgroup stream (draft-10).
71///
72/// Encoding: object_id(vi), extension_headers_length(vi), [extensions...],
73///   payload_length(vi),
74///   if payload_length == 0: object_status(vi)
75///   else: payload bytes
76#[derive(Debug, Clone, PartialEq, Eq)]
77pub struct ObjectHeader {
78    /// Object identifier within the subgroup.
79    pub object_id: VarInt,
80    /// Total byte length of extension headers.
81    pub extension_headers_length: VarInt,
82    /// Raw extension bytes (opaque).
83    pub extensions: Vec<u8>,
84    /// Length of the object payload in bytes.
85    pub payload_length: VarInt,
86    /// Status of this object.
87    pub object_status: ObjectStatus,
88}
89
90impl SubgroupHeader {
91    /// Encode the subgroup header into the buffer.
92    pub fn encode(&self, buf: &mut impl BufMut) {
93        self.track_alias.encode(buf);
94        self.group_id.encode(buf);
95        self.subgroup_id.encode(buf);
96        buf.put_u8(self.publisher_priority);
97    }
98
99    /// Decode a subgroup header from the buffer.
100    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
101        let track_alias = VarInt::decode(buf)?;
102        let group_id = VarInt::decode(buf)?;
103        let subgroup_id = VarInt::decode(buf)?;
104        if buf.remaining() < 1 {
105            return Err(CodecError::UnexpectedEnd);
106        }
107        let publisher_priority = buf.get_u8();
108        Ok(Self { track_alias, group_id, subgroup_id, publisher_priority })
109    }
110}
111
112impl ObjectHeader {
113    /// Encode the object header into the buffer.
114    pub fn encode(&self, buf: &mut impl BufMut) {
115        self.object_id.encode(buf);
116        self.extension_headers_length.encode(buf);
117        encode_extensions(&self.extensions, buf);
118        self.payload_length.encode(buf);
119        if self.payload_length.into_inner() == 0 {
120            VarInt::from_usize(self.object_status as usize).encode(buf);
121        }
122    }
123
124    /// Decode an object header from the buffer.
125    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
126        let object_id = VarInt::decode(buf)?;
127        let extension_headers_length = VarInt::decode(buf)?;
128        let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
129        let payload_length = VarInt::decode(buf)?;
130        let object_status = if payload_length.into_inner() == 0 {
131            let status_val = VarInt::decode(buf)?.into_inner();
132            ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
133        } else {
134            ObjectStatus::Normal
135        };
136        Ok(Self { object_id, extension_headers_length, extensions, payload_length, object_status })
137    }
138}
139
140// ============================================================
141// Datagram (type 0x01)
142// ============================================================
143
144/// Datagram header with payload (draft-10, type 0x01).
145///
146/// Draft-10 change: no `payload_length` or `object_status` fields.
147/// Payload is the remaining bytes in the datagram.
148///
149/// Encoding (after type varint):
150///   track_alias(vi), group_id(vi), object_id(vi),
151///   publisher_priority(u8), extension_headers_length(vi), [extensions...],
152///   [remaining bytes = payload]
153#[derive(Debug, Clone, PartialEq, Eq)]
154pub struct DatagramHeader {
155    /// Track alias identifying the subscription.
156    pub track_alias: VarInt,
157    /// Group identifier.
158    pub group_id: VarInt,
159    /// Object identifier within the group.
160    pub object_id: VarInt,
161    /// Publisher priority for delivery ordering.
162    pub publisher_priority: u8,
163    /// Total byte length of extension headers.
164    pub extension_headers_length: VarInt,
165    /// Raw extension bytes (opaque).
166    pub extensions: Vec<u8>,
167}
168
169impl DatagramHeader {
170    /// Encode the datagram header into the buffer.
171    pub fn encode(&self, buf: &mut impl BufMut) {
172        self.track_alias.encode(buf);
173        self.group_id.encode(buf);
174        self.object_id.encode(buf);
175        buf.put_u8(self.publisher_priority);
176        self.extension_headers_length.encode(buf);
177        encode_extensions(&self.extensions, buf);
178    }
179
180    /// Decode a datagram header from the buffer.
181    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
182        let track_alias = VarInt::decode(buf)?;
183        let group_id = VarInt::decode(buf)?;
184        let object_id = VarInt::decode(buf)?;
185        if buf.remaining() < 1 {
186            return Err(CodecError::UnexpectedEnd);
187        }
188        let publisher_priority = buf.get_u8();
189        let extension_headers_length = VarInt::decode(buf)?;
190        let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
191        Ok(Self {
192            track_alias,
193            group_id,
194            object_id,
195            publisher_priority,
196            extension_headers_length,
197            extensions,
198        })
199    }
200}
201
202// ============================================================
203// Datagram Status (type 0x02)
204// ============================================================
205
206/// Datagram status header (draft-10, type 0x02).
207///
208/// Draft-10 change: gains `extension_headers_length` field.
209#[derive(Debug, Clone, PartialEq, Eq)]
210pub struct DatagramStatusHeader {
211    /// Track alias identifying the subscription.
212    pub track_alias: VarInt,
213    /// Group identifier.
214    pub group_id: VarInt,
215    /// Object identifier within the group.
216    pub object_id: VarInt,
217    /// Publisher priority for delivery ordering.
218    pub publisher_priority: u8,
219    /// Total byte length of extension headers.
220    pub extension_headers_length: VarInt,
221    /// Raw extension bytes (opaque).
222    pub extensions: Vec<u8>,
223    /// Object status code.
224    pub object_status: ObjectStatus,
225}
226
227impl DatagramStatusHeader {
228    /// Encode the datagram status header into the buffer.
229    pub fn encode(&self, buf: &mut impl BufMut) {
230        self.track_alias.encode(buf);
231        self.group_id.encode(buf);
232        self.object_id.encode(buf);
233        buf.put_u8(self.publisher_priority);
234        self.extension_headers_length.encode(buf);
235        encode_extensions(&self.extensions, buf);
236        VarInt::from_usize(self.object_status as usize).encode(buf);
237    }
238
239    /// Decode a datagram status header from the buffer.
240    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
241        let track_alias = VarInt::decode(buf)?;
242        let group_id = VarInt::decode(buf)?;
243        let object_id = VarInt::decode(buf)?;
244        if buf.remaining() < 1 {
245            return Err(CodecError::UnexpectedEnd);
246        }
247        let publisher_priority = buf.get_u8();
248        let extension_headers_length = VarInt::decode(buf)?;
249        let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
250        let status_val = VarInt::decode(buf)?.into_inner();
251        let object_status = ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?;
252        Ok(Self {
253            track_alias,
254            group_id,
255            object_id,
256            publisher_priority,
257            extension_headers_length,
258            extensions,
259            object_status,
260        })
261    }
262}
263
264// ============================================================
265// Fetch stream (type 0x05)
266// ============================================================
267
268/// Fetch stream header (follows the stream type varint).
269#[derive(Debug, Clone, PartialEq, Eq)]
270pub struct FetchHeader {
271    /// Subscribe ID this fetch responds to.
272    pub subscribe_id: VarInt,
273}
274
275/// Object within a fetch stream (draft-10).
276///
277/// Uses `extension_headers_length` instead of `extension_count`.
278#[derive(Debug, Clone, PartialEq, Eq)]
279pub struct FetchObjectHeader {
280    /// Group identifier.
281    pub group_id: VarInt,
282    /// Subgroup identifier within the group.
283    pub subgroup_id: VarInt,
284    /// Object identifier within the subgroup.
285    pub object_id: VarInt,
286    /// Publisher priority for delivery ordering.
287    pub publisher_priority: u8,
288    /// Total byte length of extension headers.
289    pub extension_headers_length: VarInt,
290    /// Raw extension bytes (opaque).
291    pub extensions: Vec<u8>,
292    /// Status of this object.
293    pub object_status: ObjectStatus,
294    /// Length of the object payload in bytes.
295    pub payload_length: VarInt,
296}
297
298impl FetchHeader {
299    /// Encode the fetch header into the buffer.
300    pub fn encode(&self, buf: &mut impl BufMut) {
301        self.subscribe_id.encode(buf);
302    }
303
304    /// Decode a fetch header from the buffer.
305    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
306        let subscribe_id = VarInt::decode(buf)?;
307        Ok(Self { subscribe_id })
308    }
309}
310
311impl FetchObjectHeader {
312    /// Encode the fetch object header into the buffer.
313    pub fn encode(&self, buf: &mut impl BufMut) {
314        self.group_id.encode(buf);
315        self.subgroup_id.encode(buf);
316        self.object_id.encode(buf);
317        buf.put_u8(self.publisher_priority);
318        self.extension_headers_length.encode(buf);
319        encode_extensions(&self.extensions, buf);
320        self.payload_length.encode(buf);
321        if self.payload_length.into_inner() == 0 {
322            VarInt::from_usize(self.object_status as usize).encode(buf);
323        }
324    }
325
326    /// Decode a fetch object header from the buffer.
327    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
328        let group_id = VarInt::decode(buf)?;
329        let subgroup_id = VarInt::decode(buf)?;
330        let object_id = VarInt::decode(buf)?;
331        if buf.remaining() < 1 {
332            return Err(CodecError::UnexpectedEnd);
333        }
334        let publisher_priority = buf.get_u8();
335        let extension_headers_length = VarInt::decode(buf)?;
336        let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
337        let payload_length = VarInt::decode(buf)?;
338        let object_status = if payload_length.into_inner() == 0 {
339            let status_val = VarInt::decode(buf)?.into_inner();
340            ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
341        } else {
342            ObjectStatus::Normal
343        };
344        Ok(Self {
345            group_id,
346            subgroup_id,
347            object_id,
348            publisher_priority,
349            extension_headers_length,
350            extensions,
351            object_status,
352            payload_length,
353        })
354    }
355}