Skip to main content

moqtap_codec/draft13/
data_stream.rs

1//! Draft-13 data stream header encoding and decoding.
2//!
3//! - Subgroup stream type IDs: 0x10-0x15, 0x18-0x1D
4//! - Fetch stream type: 0x05
5//! - Datagram types (separate namespace): 0x00-0x05
6
7use super::types::ObjectStatus;
8use crate::error::CodecError;
9use crate::types::read_bytes;
10use crate::varint::VarInt;
11use bytes::{Buf, BufMut};
12
/// Stream type identifiers understood by this module.
///
/// Each variant's discriminant is the numeric stream type ID accepted by
/// [`StreamType::from_id`]. `Fetch` is the single fetch stream type; every
/// other variant is a subgroup stream type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum StreamType {
    Fetch = 0x05,
    SubgroupZero = 0x10,
    SubgroupZeroExt = 0x11,
    SubgroupFirstObj = 0x12,
    SubgroupFirstObjExt = 0x13,
    SubgroupExplicit = 0x14,
    SubgroupExplicitExt = 0x15,
    SubgroupZeroEog = 0x18,
    SubgroupZeroEogExt = 0x19,
    SubgroupFirstObjEog = 0x1A,
    SubgroupFirstObjEogExt = 0x1B,
    SubgroupExplicitEog = 0x1C,
    SubgroupExplicitEogExt = 0x1D,
}

impl StreamType {
    /// Every known stream type, in ascending ID order. `from_id` searches this
    /// table so the numeric IDs are only spelled out once, on the enum itself.
    const ALL: [StreamType; 13] = [
        StreamType::Fetch,
        StreamType::SubgroupZero,
        StreamType::SubgroupZeroExt,
        StreamType::SubgroupFirstObj,
        StreamType::SubgroupFirstObjExt,
        StreamType::SubgroupExplicit,
        StreamType::SubgroupExplicitExt,
        StreamType::SubgroupZeroEog,
        StreamType::SubgroupZeroEogExt,
        StreamType::SubgroupFirstObjEog,
        StreamType::SubgroupFirstObjEogExt,
        StreamType::SubgroupExplicitEog,
        StreamType::SubgroupExplicitEogExt,
    ];

    /// Maps a numeric stream type ID to its variant, or `None` when the ID is
    /// not one of the known types.
    pub fn from_id(id: u64) -> Option<Self> {
        Self::ALL.iter().copied().find(|kind| *kind as u64 == id)
    }

    /// Returns `true` for every subgroup stream type, i.e. everything except
    /// `Fetch`.
    pub fn is_subgroup(&self) -> bool {
        !matches!(self, StreamType::Fetch)
    }

    /// Returns `true` for the `*Ext` subgroup variants.
    pub fn has_extensions(&self) -> bool {
        // All `*Ext` subgroup variants have an odd discriminant. `Fetch` (0x05)
        // is odd too, so it has to be ruled out first.
        self.is_subgroup() && ((*self as u64) & 0x01) != 0
    }

    /// Returns `true` for the `*Eog*` variants (discriminants 0x18-0x1D).
    pub fn contains_end_of_group(&self) -> bool {
        // Bit 0x08 is set in exactly the 0x18-0x1D discriminants and in none of
        // the others (0x05, 0x10-0x15).
        ((*self as u64) & 0x08) != 0
    }
}
93
/// Datagram wire types (separate namespace from QUIC stream types).
///
/// Each variant's discriminant is the numeric type ID accepted by
/// [`DatagramType::from_id`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum DatagramType {
    Datagram = 0x00,
    DatagramExt = 0x01,
    DatagramEog = 0x02,
    DatagramEogExt = 0x03,
    DatagramStatus = 0x04,
    DatagramStatusExt = 0x05,
}

impl DatagramType {
    /// Every known datagram type, in ascending ID order; searched by `from_id`.
    const ALL: [DatagramType; 6] = [
        DatagramType::Datagram,
        DatagramType::DatagramExt,
        DatagramType::DatagramEog,
        DatagramType::DatagramEogExt,
        DatagramType::DatagramStatus,
        DatagramType::DatagramStatusExt,
    ];

    /// Maps a numeric datagram type ID to its variant, or `None` when the ID is
    /// not one of the known types.
    pub fn from_id(id: u64) -> Option<Self> {
        Self::ALL.iter().copied().find(|kind| *kind as u64 == id)
    }

    /// Returns `true` for the `*Ext` variants (0x01, 0x03, 0x05).
    pub fn has_extensions(&self) -> bool {
        // The `*Ext` variants are exactly the ones with an odd discriminant.
        ((*self as u64) & 0x01) != 0
    }

    /// Returns `true` for `DatagramStatus` and `DatagramStatusExt`.
    pub fn is_status(&self) -> bool {
        match self {
            DatagramType::DatagramStatus | DatagramType::DatagramStatusExt => true,
            DatagramType::Datagram
            | DatagramType::DatagramExt
            | DatagramType::DatagramEog
            | DatagramType::DatagramEogExt => false,
        }
    }

    /// Returns `true` for `DatagramEog` and `DatagramEogExt`.
    pub fn is_end_of_group(&self) -> bool {
        match self {
            DatagramType::DatagramEog | DatagramType::DatagramEogExt => true,
            DatagramType::Datagram
            | DatagramType::DatagramExt
            | DatagramType::DatagramStatus
            | DatagramType::DatagramStatusExt => false,
        }
    }
}
136
137fn read_extension_bytes(buf: &mut impl Buf, byte_len: u64) -> Result<Vec<u8>, CodecError> {
138    read_bytes(buf, byte_len as usize)
139}
140
141// ============================================================
142// Subgroup stream header
143// ============================================================
144
145#[derive(Debug, Clone, PartialEq, Eq)]
146pub struct SubgroupHeader {
147    pub stream_type: StreamType,
148    pub track_alias: VarInt,
149    pub group_id: VarInt,
150    pub subgroup_id: VarInt,
151    pub publisher_priority: u8,
152}
153
154impl SubgroupHeader {
155    pub fn encode(&self, buf: &mut impl BufMut) {
156        self.track_alias.encode(buf);
157        self.group_id.encode(buf);
158        match self.stream_type {
159            StreamType::SubgroupExplicit
160            | StreamType::SubgroupExplicitExt
161            | StreamType::SubgroupExplicitEog
162            | StreamType::SubgroupExplicitEogExt => {
163                self.subgroup_id.encode(buf);
164            }
165            _ => {}
166        }
167        buf.put_u8(self.publisher_priority);
168    }
169
170    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
171        Self::decode_with_type(StreamType::SubgroupExplicit, buf)
172    }
173
174    pub fn decode_with_type(
175        stream_type: StreamType,
176        buf: &mut impl Buf,
177    ) -> Result<Self, CodecError> {
178        let track_alias = VarInt::decode(buf)?;
179        let group_id = VarInt::decode(buf)?;
180        let subgroup_id = match stream_type {
181            StreamType::SubgroupZero
182            | StreamType::SubgroupZeroExt
183            | StreamType::SubgroupZeroEog
184            | StreamType::SubgroupZeroEogExt => VarInt::from_usize(0),
185            StreamType::SubgroupExplicit
186            | StreamType::SubgroupExplicitExt
187            | StreamType::SubgroupExplicitEog
188            | StreamType::SubgroupExplicitEogExt => VarInt::decode(buf)?,
189            StreamType::SubgroupFirstObj
190            | StreamType::SubgroupFirstObjExt
191            | StreamType::SubgroupFirstObjEog
192            | StreamType::SubgroupFirstObjEogExt => VarInt::from_usize(0),
193            _ => return Err(CodecError::InvalidField),
194        };
195        if buf.remaining() < 1 {
196            return Err(CodecError::UnexpectedEnd);
197        }
198        let publisher_priority = buf.get_u8();
199        Ok(Self { stream_type, track_alias, group_id, subgroup_id, publisher_priority })
200    }
201}
202
203// ============================================================
204// Object header within subgroup
205// ============================================================
206
207#[derive(Debug, Clone, PartialEq, Eq)]
208pub struct ObjectHeader {
209    pub object_id: VarInt,
210    pub extension_headers_length: VarInt,
211    pub extensions: Vec<u8>,
212    pub payload_length: VarInt,
213    pub object_status: ObjectStatus,
214}
215
216impl ObjectHeader {
217    pub fn encode(&self, buf: &mut impl BufMut) {
218        self.encode_with_extensions(false, buf);
219    }
220
221    pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
222        self.object_id.encode(buf);
223        if has_extensions {
224            self.extension_headers_length.encode(buf);
225            buf.put_slice(&self.extensions);
226        }
227        self.payload_length.encode(buf);
228        if self.payload_length.into_inner() == 0 {
229            VarInt::from_usize(self.object_status as usize).encode(buf);
230        }
231    }
232
233    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
234        Self::decode_with_extensions(false, buf)
235    }
236
237    pub fn decode_with_extensions(
238        has_extensions: bool,
239        buf: &mut impl Buf,
240    ) -> Result<Self, CodecError> {
241        let object_id = VarInt::decode(buf)?;
242        let (extension_headers_length, extensions) = if has_extensions {
243            let ehl = VarInt::decode(buf)?;
244            let ext = read_extension_bytes(buf, ehl.into_inner())?;
245            (ehl, ext)
246        } else {
247            (VarInt::from_usize(0), Vec::new())
248        };
249        let payload_length = VarInt::decode(buf)?;
250        let object_status = if payload_length.into_inner() == 0 {
251            let sv = VarInt::decode(buf)?.into_inner();
252            ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?
253        } else {
254            ObjectStatus::Normal
255        };
256        Ok(Self { object_id, extension_headers_length, extensions, payload_length, object_status })
257    }
258}
259
260// ============================================================
261// Datagram (types 0x00-0x03)
262// ============================================================
263
264#[derive(Debug, Clone, PartialEq, Eq)]
265pub struct DatagramHeader {
266    pub track_alias: VarInt,
267    pub group_id: VarInt,
268    pub object_id: VarInt,
269    pub publisher_priority: u8,
270    pub extension_headers_length: VarInt,
271    pub extensions: Vec<u8>,
272    pub end_of_group: bool,
273}
274
275impl DatagramHeader {
276    pub fn encode(&self, buf: &mut impl BufMut) {
277        self.encode_with_extensions(false, buf);
278    }
279
280    pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
281        self.track_alias.encode(buf);
282        self.group_id.encode(buf);
283        self.object_id.encode(buf);
284        buf.put_u8(self.publisher_priority);
285        if has_extensions {
286            self.extension_headers_length.encode(buf);
287            buf.put_slice(&self.extensions);
288        }
289    }
290
291    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
292        Self::decode_with_extensions(false, buf)
293    }
294
295    pub fn decode_with_extensions(
296        has_extensions: bool,
297        buf: &mut impl Buf,
298    ) -> Result<Self, CodecError> {
299        let track_alias = VarInt::decode(buf)?;
300        let group_id = VarInt::decode(buf)?;
301        let object_id = VarInt::decode(buf)?;
302        if buf.remaining() < 1 {
303            return Err(CodecError::UnexpectedEnd);
304        }
305        let publisher_priority = buf.get_u8();
306        let (extension_headers_length, extensions) = if has_extensions {
307            let ehl = VarInt::decode(buf)?;
308            let ext = read_extension_bytes(buf, ehl.into_inner())?;
309            (ehl, ext)
310        } else {
311            (VarInt::from_usize(0), Vec::new())
312        };
313        Ok(Self {
314            track_alias,
315            group_id,
316            object_id,
317            publisher_priority,
318            extension_headers_length,
319            extensions,
320            end_of_group: false,
321        })
322    }
323}
324
325// ============================================================
326// Datagram Status (types 0x04, 0x05)
327// ============================================================
328
329#[derive(Debug, Clone, PartialEq, Eq)]
330pub struct DatagramStatusHeader {
331    pub track_alias: VarInt,
332    pub group_id: VarInt,
333    pub object_id: VarInt,
334    pub publisher_priority: u8,
335    pub extension_headers_length: VarInt,
336    pub extensions: Vec<u8>,
337    pub object_status: ObjectStatus,
338}
339
340impl DatagramStatusHeader {
341    pub fn encode(&self, buf: &mut impl BufMut) {
342        self.encode_with_extensions(false, buf);
343    }
344
345    pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
346        self.track_alias.encode(buf);
347        self.group_id.encode(buf);
348        self.object_id.encode(buf);
349        buf.put_u8(self.publisher_priority);
350        if has_extensions {
351            self.extension_headers_length.encode(buf);
352            buf.put_slice(&self.extensions);
353        }
354        VarInt::from_usize(self.object_status as usize).encode(buf);
355    }
356
357    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
358        Self::decode_with_extensions(false, buf)
359    }
360
361    pub fn decode_with_extensions(
362        has_extensions: bool,
363        buf: &mut impl Buf,
364    ) -> Result<Self, CodecError> {
365        let track_alias = VarInt::decode(buf)?;
366        let group_id = VarInt::decode(buf)?;
367        let object_id = VarInt::decode(buf)?;
368        if buf.remaining() < 1 {
369            return Err(CodecError::UnexpectedEnd);
370        }
371        let publisher_priority = buf.get_u8();
372        let (extension_headers_length, extensions) = if has_extensions {
373            let ehl = VarInt::decode(buf)?;
374            let ext = read_extension_bytes(buf, ehl.into_inner())?;
375            (ehl, ext)
376        } else {
377            (VarInt::from_usize(0), Vec::new())
378        };
379        let sv = VarInt::decode(buf)?.into_inner();
380        let object_status = ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?;
381        Ok(Self {
382            track_alias,
383            group_id,
384            object_id,
385            publisher_priority,
386            extension_headers_length,
387            extensions,
388            object_status,
389        })
390    }
391}
392
393// ============================================================
394// Fetch stream (type 0x05)
395// ============================================================
396
397#[derive(Debug, Clone, PartialEq, Eq)]
398pub struct FetchHeader {
399    pub request_id: VarInt,
400}
401
402#[derive(Debug, Clone, PartialEq, Eq)]
403pub struct FetchObjectHeader {
404    pub group_id: VarInt,
405    pub subgroup_id: VarInt,
406    pub object_id: VarInt,
407    pub publisher_priority: u8,
408    pub extension_headers_length: VarInt,
409    pub extensions: Vec<u8>,
410    pub payload_length: VarInt,
411    pub object_status: ObjectStatus,
412}
413
414impl FetchHeader {
415    pub fn encode(&self, buf: &mut impl BufMut) {
416        self.request_id.encode(buf);
417    }
418
419    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
420        let request_id = VarInt::decode(buf)?;
421        Ok(Self { request_id })
422    }
423}
424
425impl FetchObjectHeader {
426    pub fn encode(&self, buf: &mut impl BufMut) {
427        self.group_id.encode(buf);
428        self.subgroup_id.encode(buf);
429        self.object_id.encode(buf);
430        buf.put_u8(self.publisher_priority);
431        self.extension_headers_length.encode(buf);
432        buf.put_slice(&self.extensions);
433        self.payload_length.encode(buf);
434        if self.payload_length.into_inner() == 0 {
435            VarInt::from_usize(self.object_status as usize).encode(buf);
436        }
437    }
438
439    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
440        let group_id = VarInt::decode(buf)?;
441        let subgroup_id = VarInt::decode(buf)?;
442        let object_id = VarInt::decode(buf)?;
443        if buf.remaining() < 1 {
444            return Err(CodecError::UnexpectedEnd);
445        }
446        let publisher_priority = buf.get_u8();
447        let extension_headers_length = VarInt::decode(buf)?;
448        let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
449        let payload_length = VarInt::decode(buf)?;
450        let object_status = if payload_length.into_inner() == 0 {
451            let sv = VarInt::decode(buf)?.into_inner();
452            ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?
453        } else {
454            ObjectStatus::Normal
455        };
456        Ok(Self {
457            group_id,
458            subgroup_id,
459            object_id,
460            publisher_priority,
461            extension_headers_length,
462            extensions,
463            payload_length,
464            object_status,
465        })
466    }
467}