Skip to main content

moqtap_codec/draft07/
data_stream.rs

1use super::types::ObjectStatus;
2use crate::error::CodecError;
3use crate::varint::VarInt;
4use bytes::{Buf, BufMut};
5
6/// Stream type IDs for draft-07 data streams.
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8#[repr(u64)]
9pub enum StreamType {
10    /// Datagram stream type (0x01).
11    Datagram = 0x01,
12    /// Subgroup stream type (0x04).
13    Subgroup = 0x04,
14    /// Fetch stream type (0x05).
15    Fetch = 0x05,
16}
17
18impl StreamType {
19    /// Convert a raw stream type ID to a `StreamType`, if valid.
20    pub fn from_id(id: u64) -> Option<Self> {
21        match id {
22            0x01 => Some(StreamType::Datagram),
23            0x04 => Some(StreamType::Subgroup),
24            0x05 => Some(StreamType::Fetch),
25            _ => None,
26        }
27    }
28}
29
30// ============================================================
31// Subgroup stream (type 0x04)
32// ============================================================
33
34/// Subgroup stream header (follows the stream type varint).
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub struct SubgroupHeader {
37    /// Track alias identifying the subscription.
38    pub track_alias: VarInt,
39    /// Group identifier.
40    pub group_id: VarInt,
41    /// Subgroup identifier within the group.
42    pub subgroup_id: VarInt,
43    /// Publisher priority for delivery ordering.
44    pub publisher_priority: u8,
45}
46
47/// Object within a subgroup stream.
48///
49/// Encoding: object_id(vi), payload_length(vi),
50///   if payload_length == 0: object_status(vi)
51///   else: payload bytes (status is implicitly Normal)
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub struct ObjectHeader {
54    /// Object identifier within the subgroup.
55    pub object_id: VarInt,
56    /// Length of the object payload in bytes.
57    pub payload_length: VarInt,
58    /// Status of this object.
59    pub object_status: ObjectStatus,
60}
61
62impl SubgroupHeader {
63    /// Encode the subgroup header into the buffer.
64    pub fn encode(&self, buf: &mut impl BufMut) {
65        self.track_alias.encode(buf);
66        self.group_id.encode(buf);
67        self.subgroup_id.encode(buf);
68        buf.put_u8(self.publisher_priority);
69    }
70
71    /// Decode a subgroup header from the buffer.
72    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
73        let track_alias = VarInt::decode(buf)?;
74        let group_id = VarInt::decode(buf)?;
75        let subgroup_id = VarInt::decode(buf)?;
76        if buf.remaining() < 1 {
77            return Err(CodecError::UnexpectedEnd);
78        }
79        let publisher_priority = buf.get_u8();
80        Ok(Self { track_alias, group_id, subgroup_id, publisher_priority })
81    }
82}
83
84impl ObjectHeader {
85    /// Encode the object header into the buffer.
86    pub fn encode(&self, buf: &mut impl BufMut) {
87        self.object_id.encode(buf);
88        self.payload_length.encode(buf);
89        if self.payload_length.into_inner() == 0 {
90            VarInt::from_usize(self.object_status as usize).encode(buf);
91        }
92    }
93
94    /// Decode an object header from the buffer.
95    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
96        let object_id = VarInt::decode(buf)?;
97        let payload_length = VarInt::decode(buf)?;
98        let object_status = if payload_length.into_inner() == 0 {
99            let status_val = VarInt::decode(buf)?.into_inner();
100            ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
101        } else {
102            ObjectStatus::Normal
103        };
104        Ok(Self { object_id, payload_length, object_status })
105    }
106}
107
108// ============================================================
109// Datagram (type 0x01)
110// ============================================================
111
112/// Datagram header (draft-07).
113///
114/// Encoding (after the type varint):
115///   track_alias(vi), group_id(vi), object_id(vi),
116///   publisher_priority(u8), payload_length(vi),
117///   [object_status(vi) if payload_length==0],
118///   payload bytes
119#[derive(Debug, Clone, PartialEq, Eq)]
120pub struct DatagramHeader {
121    /// Track alias identifying the subscription.
122    pub track_alias: VarInt,
123    /// Group identifier.
124    pub group_id: VarInt,
125    /// Object identifier within the group.
126    pub object_id: VarInt,
127    /// Publisher priority for delivery ordering.
128    pub publisher_priority: u8,
129    /// Status of this object.
130    pub object_status: ObjectStatus,
131    /// Length of the object payload in bytes.
132    pub payload_length: VarInt,
133}
134
135impl DatagramHeader {
136    /// Encode the datagram header into the buffer.
137    pub fn encode(&self, buf: &mut impl BufMut) {
138        self.track_alias.encode(buf);
139        self.group_id.encode(buf);
140        self.object_id.encode(buf);
141        buf.put_u8(self.publisher_priority);
142        self.payload_length.encode(buf);
143        if self.payload_length.into_inner() == 0 {
144            VarInt::from_usize(self.object_status as usize).encode(buf);
145        }
146    }
147
148    /// Decode a datagram header from the buffer.
149    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
150        let track_alias = VarInt::decode(buf)?;
151        let group_id = VarInt::decode(buf)?;
152        let object_id = VarInt::decode(buf)?;
153        if buf.remaining() < 1 {
154            return Err(CodecError::UnexpectedEnd);
155        }
156        let publisher_priority = buf.get_u8();
157        let payload_length = VarInt::decode(buf)?;
158        let object_status = if payload_length.into_inner() == 0 {
159            let status_val = VarInt::decode(buf)?.into_inner();
160            ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
161        } else {
162            ObjectStatus::Normal
163        };
164        Ok(Self {
165            track_alias,
166            group_id,
167            object_id,
168            publisher_priority,
169            object_status,
170            payload_length,
171        })
172    }
173}
174
175// ============================================================
176// Fetch stream (type 0x05)
177// ============================================================
178
179/// Fetch stream header (follows the stream type varint).
180#[derive(Debug, Clone, PartialEq, Eq)]
181pub struct FetchHeader {
182    /// Subscribe ID this fetch responds to.
183    pub subscribe_id: VarInt,
184}
185
186/// Object within a fetch stream.
187///
188/// Encoding: group_id(vi), subgroup_id(vi), object_id(vi),
189///   publisher_priority(u8), payload_length(vi),
190///   [object_status(vi) if payload_length==0],
191///   payload bytes
192#[derive(Debug, Clone, PartialEq, Eq)]
193pub struct FetchObjectHeader {
194    /// Group identifier.
195    pub group_id: VarInt,
196    /// Subgroup identifier within the group.
197    pub subgroup_id: VarInt,
198    /// Object identifier within the subgroup.
199    pub object_id: VarInt,
200    /// Publisher priority for delivery ordering.
201    pub publisher_priority: u8,
202    /// Status of this object.
203    pub object_status: ObjectStatus,
204    /// Length of the object payload in bytes.
205    pub payload_length: VarInt,
206}
207
208impl FetchHeader {
209    /// Encode the fetch header into the buffer.
210    pub fn encode(&self, buf: &mut impl BufMut) {
211        self.subscribe_id.encode(buf);
212    }
213
214    /// Decode a fetch header from the buffer.
215    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
216        let subscribe_id = VarInt::decode(buf)?;
217        Ok(Self { subscribe_id })
218    }
219}
220
221impl FetchObjectHeader {
222    /// Encode the fetch object header into the buffer.
223    pub fn encode(&self, buf: &mut impl BufMut) {
224        self.group_id.encode(buf);
225        self.subgroup_id.encode(buf);
226        self.object_id.encode(buf);
227        buf.put_u8(self.publisher_priority);
228        self.payload_length.encode(buf);
229        if self.payload_length.into_inner() == 0 {
230            VarInt::from_usize(self.object_status as usize).encode(buf);
231        }
232    }
233
234    /// Decode a fetch object header from the buffer.
235    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
236        let group_id = VarInt::decode(buf)?;
237        let subgroup_id = VarInt::decode(buf)?;
238        let object_id = VarInt::decode(buf)?;
239        if buf.remaining() < 1 {
240            return Err(CodecError::UnexpectedEnd);
241        }
242        let publisher_priority = buf.get_u8();
243        let payload_length = VarInt::decode(buf)?;
244        let object_status = if payload_length.into_inner() == 0 {
245            let status_val = VarInt::decode(buf)?.into_inner();
246            ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
247        } else {
248            ObjectStatus::Normal
249        };
250        Ok(Self {
251            group_id,
252            subgroup_id,
253            object_id,
254            publisher_priority,
255            object_status,
256            payload_length,
257        })
258    }
259}