Skip to main content

moqtap_codec/draft08/
data_stream.rs

1//! Draft-08 data stream header encoding and decoding.
2//!
3//! Differences from draft-07:
4//! - Object headers include `extension_count` (varint) + raw extension bytes
5//! - Separate DatagramStatus type (0x02) for status-only datagrams
6//! - Datagram (0x01) includes extension_count + payload
7
8use super::types::ObjectStatus;
9use crate::error::CodecError;
10use crate::types::read_bytes;
11use crate::varint::VarInt;
12use bytes::{Buf, BufMut};
13
14/// Stream type IDs for draft-08 data streams.
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16#[repr(u64)]
17pub enum StreamType {
18    /// Datagram with payload (0x01).
19    Datagram = 0x01,
20    /// Datagram with status only, no payload (0x02).
21    DatagramStatus = 0x02,
22    /// Subgroup stream type (0x04).
23    Subgroup = 0x04,
24    /// Fetch stream type (0x05).
25    Fetch = 0x05,
26}
27
28impl StreamType {
29    /// Convert a raw stream type ID to a `StreamType`, if valid.
30    pub fn from_id(id: u64) -> Option<Self> {
31        match id {
32            0x01 => Some(StreamType::Datagram),
33            0x02 => Some(StreamType::DatagramStatus),
34            0x04 => Some(StreamType::Subgroup),
35            0x05 => Some(StreamType::Fetch),
36            _ => None,
37        }
38    }
39}
40
41// ── Extension helpers ───────────────────────────────────────
42
43/// Skip over extensions in the buffer, reading extension_count varints.
44///
45/// Extension encoding: for each extension, read type (varint).
46/// - Even type: value is a single varint
47/// - Odd type: value is length-prefixed bytes (varint length + bytes)
48fn skip_extensions(buf: &mut impl Buf, count: u64) -> Result<Vec<u8>, CodecError> {
49    let mut raw = Vec::new();
50    for _ in 0..count {
51        let ext_type = VarInt::decode(buf)?;
52        ext_type.encode(&mut raw);
53        if ext_type.into_inner() % 2 == 0 {
54            let val = VarInt::decode(buf)?;
55            val.encode(&mut raw);
56        } else {
57            let len = VarInt::decode(buf)?.into_inner() as usize;
58            VarInt::from_usize(len).encode(&mut raw);
59            let bytes = read_bytes(buf, len)?;
60            raw.extend_from_slice(&bytes);
61        }
62    }
63    Ok(raw)
64}
65
66/// Encode extension bytes back to the buffer.
67fn encode_extensions(extensions: &[u8], buf: &mut impl BufMut) {
68    buf.put_slice(extensions);
69}
70
71// ============================================================
72// Subgroup stream (type 0x04)
73// ============================================================
74
75/// Subgroup stream header (follows the stream type varint).
76#[derive(Debug, Clone, PartialEq, Eq)]
77pub struct SubgroupHeader {
78    /// Track alias identifying the subscription.
79    pub track_alias: VarInt,
80    /// Group identifier.
81    pub group_id: VarInt,
82    /// Subgroup identifier within the group.
83    pub subgroup_id: VarInt,
84    /// Publisher priority for delivery ordering.
85    pub publisher_priority: u8,
86}
87
88/// Object within a subgroup stream (draft-08).
89///
90/// Encoding: object_id(vi), extension_count(vi), [extensions...],
91///   payload_length(vi),
92///   if payload_length == 0: object_status(vi)
93///   else: payload bytes
94#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct ObjectHeader {
96    /// Object identifier within the subgroup.
97    pub object_id: VarInt,
98    /// Number of extensions.
99    pub extension_count: VarInt,
100    /// Raw extension bytes (opaque).
101    pub extensions: Vec<u8>,
102    /// Length of the object payload in bytes.
103    pub payload_length: VarInt,
104    /// Status of this object.
105    pub object_status: ObjectStatus,
106}
107
108impl SubgroupHeader {
109    /// Encode the subgroup header into the buffer.
110    pub fn encode(&self, buf: &mut impl BufMut) {
111        self.track_alias.encode(buf);
112        self.group_id.encode(buf);
113        self.subgroup_id.encode(buf);
114        buf.put_u8(self.publisher_priority);
115    }
116
117    /// Decode a subgroup header from the buffer.
118    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
119        let track_alias = VarInt::decode(buf)?;
120        let group_id = VarInt::decode(buf)?;
121        let subgroup_id = VarInt::decode(buf)?;
122        if buf.remaining() < 1 {
123            return Err(CodecError::UnexpectedEnd);
124        }
125        let publisher_priority = buf.get_u8();
126        Ok(Self { track_alias, group_id, subgroup_id, publisher_priority })
127    }
128}
129
130impl ObjectHeader {
131    /// Encode the object header into the buffer.
132    pub fn encode(&self, buf: &mut impl BufMut) {
133        self.object_id.encode(buf);
134        self.extension_count.encode(buf);
135        encode_extensions(&self.extensions, buf);
136        self.payload_length.encode(buf);
137        if self.payload_length.into_inner() == 0 {
138            VarInt::from_usize(self.object_status as usize).encode(buf);
139        }
140    }
141
142    /// Decode an object header from the buffer.
143    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
144        let object_id = VarInt::decode(buf)?;
145        let extension_count = VarInt::decode(buf)?;
146        let extensions = skip_extensions(buf, extension_count.into_inner())?;
147        let payload_length = VarInt::decode(buf)?;
148        let object_status = if payload_length.into_inner() == 0 {
149            let status_val = VarInt::decode(buf)?.into_inner();
150            ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
151        } else {
152            ObjectStatus::Normal
153        };
154        Ok(Self { object_id, extension_count, extensions, payload_length, object_status })
155    }
156}
157
158// ============================================================
159// Datagram (type 0x01)
160// ============================================================
161
162/// Datagram header with payload (draft-08, type 0x01).
163///
164/// Encoding (after type varint):
165///   track_alias(vi), group_id(vi), object_id(vi),
166///   publisher_priority(u8), extension_count(vi), [extensions...],
167///   payload_length(vi),
168///   if payload_length == 0: object_status(vi),
169///   payload bytes
170#[derive(Debug, Clone, PartialEq, Eq)]
171pub struct DatagramHeader {
172    /// Track alias identifying the subscription.
173    pub track_alias: VarInt,
174    /// Group identifier.
175    pub group_id: VarInt,
176    /// Object identifier within the group.
177    pub object_id: VarInt,
178    /// Publisher priority for delivery ordering.
179    pub publisher_priority: u8,
180    /// Number of extensions.
181    pub extension_count: VarInt,
182    /// Raw extension bytes (opaque).
183    pub extensions: Vec<u8>,
184    /// Status of this object.
185    pub object_status: ObjectStatus,
186    /// Length of the object payload in bytes.
187    pub payload_length: VarInt,
188}
189
190impl DatagramHeader {
191    /// Encode the datagram header into the buffer.
192    pub fn encode(&self, buf: &mut impl BufMut) {
193        self.track_alias.encode(buf);
194        self.group_id.encode(buf);
195        self.object_id.encode(buf);
196        buf.put_u8(self.publisher_priority);
197        self.extension_count.encode(buf);
198        encode_extensions(&self.extensions, buf);
199        self.payload_length.encode(buf);
200        if self.payload_length.into_inner() == 0 {
201            VarInt::from_usize(self.object_status as usize).encode(buf);
202        }
203    }
204
205    /// Decode a datagram header from the buffer.
206    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
207        let track_alias = VarInt::decode(buf)?;
208        let group_id = VarInt::decode(buf)?;
209        let object_id = VarInt::decode(buf)?;
210        if buf.remaining() < 1 {
211            return Err(CodecError::UnexpectedEnd);
212        }
213        let publisher_priority = buf.get_u8();
214        let extension_count = VarInt::decode(buf)?;
215        let extensions = skip_extensions(buf, extension_count.into_inner())?;
216        let payload_length = VarInt::decode(buf)?;
217        let object_status = if payload_length.into_inner() == 0 {
218            let status_val = VarInt::decode(buf)?.into_inner();
219            ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
220        } else {
221            ObjectStatus::Normal
222        };
223        Ok(Self {
224            track_alias,
225            group_id,
226            object_id,
227            publisher_priority,
228            extension_count,
229            extensions,
230            object_status,
231            payload_length,
232        })
233    }
234}
235
236// ============================================================
237// Datagram Status (type 0x02)
238// ============================================================
239
240/// Datagram status header (draft-08, type 0x02).
241///
242/// Status-only datagram with no payload or extensions.
243#[derive(Debug, Clone, PartialEq, Eq)]
244pub struct DatagramStatusHeader {
245    /// Track alias identifying the subscription.
246    pub track_alias: VarInt,
247    /// Group identifier.
248    pub group_id: VarInt,
249    /// Object identifier within the group.
250    pub object_id: VarInt,
251    /// Publisher priority for delivery ordering.
252    pub publisher_priority: u8,
253    /// Object status code.
254    pub object_status: ObjectStatus,
255}
256
257impl DatagramStatusHeader {
258    /// Encode the datagram status header into the buffer.
259    pub fn encode(&self, buf: &mut impl BufMut) {
260        self.track_alias.encode(buf);
261        self.group_id.encode(buf);
262        self.object_id.encode(buf);
263        buf.put_u8(self.publisher_priority);
264        VarInt::from_usize(self.object_status as usize).encode(buf);
265    }
266
267    /// Decode a datagram status header from the buffer.
268    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
269        let track_alias = VarInt::decode(buf)?;
270        let group_id = VarInt::decode(buf)?;
271        let object_id = VarInt::decode(buf)?;
272        if buf.remaining() < 1 {
273            return Err(CodecError::UnexpectedEnd);
274        }
275        let publisher_priority = buf.get_u8();
276        let status_val = VarInt::decode(buf)?.into_inner();
277        let object_status = ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?;
278        Ok(Self { track_alias, group_id, object_id, publisher_priority, object_status })
279    }
280}
281
282// ============================================================
283// Fetch stream (type 0x05)
284// ============================================================
285
286/// Fetch stream header (follows the stream type varint).
287#[derive(Debug, Clone, PartialEq, Eq)]
288pub struct FetchHeader {
289    /// Subscribe ID this fetch responds to.
290    pub subscribe_id: VarInt,
291}
292
293/// Object within a fetch stream (draft-08).
294///
295/// Encoding: group_id(vi), subgroup_id(vi), object_id(vi),
296///   publisher_priority(u8), extension_count(vi), [extensions...],
297///   payload_length(vi),
298///   [object_status(vi) if payload_length==0],
299///   payload bytes
300#[derive(Debug, Clone, PartialEq, Eq)]
301pub struct FetchObjectHeader {
302    /// Group identifier.
303    pub group_id: VarInt,
304    /// Subgroup identifier within the group.
305    pub subgroup_id: VarInt,
306    /// Object identifier within the subgroup.
307    pub object_id: VarInt,
308    /// Publisher priority for delivery ordering.
309    pub publisher_priority: u8,
310    /// Number of extensions.
311    pub extension_count: VarInt,
312    /// Raw extension bytes (opaque).
313    pub extensions: Vec<u8>,
314    /// Status of this object.
315    pub object_status: ObjectStatus,
316    /// Length of the object payload in bytes.
317    pub payload_length: VarInt,
318}
319
320impl FetchHeader {
321    /// Encode the fetch header into the buffer.
322    pub fn encode(&self, buf: &mut impl BufMut) {
323        self.subscribe_id.encode(buf);
324    }
325
326    /// Decode a fetch header from the buffer.
327    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
328        let subscribe_id = VarInt::decode(buf)?;
329        Ok(Self { subscribe_id })
330    }
331}
332
333impl FetchObjectHeader {
334    /// Encode the fetch object header into the buffer.
335    pub fn encode(&self, buf: &mut impl BufMut) {
336        self.group_id.encode(buf);
337        self.subgroup_id.encode(buf);
338        self.object_id.encode(buf);
339        buf.put_u8(self.publisher_priority);
340        self.extension_count.encode(buf);
341        encode_extensions(&self.extensions, buf);
342        self.payload_length.encode(buf);
343        if self.payload_length.into_inner() == 0 {
344            VarInt::from_usize(self.object_status as usize).encode(buf);
345        }
346    }
347
348    /// Decode a fetch object header from the buffer.
349    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
350        let group_id = VarInt::decode(buf)?;
351        let subgroup_id = VarInt::decode(buf)?;
352        let object_id = VarInt::decode(buf)?;
353        if buf.remaining() < 1 {
354            return Err(CodecError::UnexpectedEnd);
355        }
356        let publisher_priority = buf.get_u8();
357        let extension_count = VarInt::decode(buf)?;
358        let extensions = skip_extensions(buf, extension_count.into_inner())?;
359        let payload_length = VarInt::decode(buf)?;
360        let object_status = if payload_length.into_inner() == 0 {
361            let status_val = VarInt::decode(buf)?.into_inner();
362            ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
363        } else {
364            ObjectStatus::Normal
365        };
366        Ok(Self {
367            group_id,
368            subgroup_id,
369            object_id,
370            publisher_priority,
371            extension_count,
372            extensions,
373            object_status,
374            payload_length,
375        })
376    }
377}