moqtap_codec/draft17/
data_stream.rs1use bytes::{Buf, BufMut};
19
20use crate::error::CodecError;
21use crate::varint::VarInt;
22
23const SUBGROUP_PROPERTIES_BIT: u8 = 0x01;
26const SUBGROUP_ID_MODE_MASK: u8 = 0x06;
27const SUBGROUP_END_OF_GROUP_BIT: u8 = 0x08;
28const SUBGROUP_BASE_BIT: u8 = 0x10;
29const SUBGROUP_DEFAULT_PRIORITY_BIT: u8 = 0x20;
30
31#[derive(Debug, Clone)]
32pub struct SubgroupHeader {
33 pub header_type: u8,
34 pub track_alias: VarInt,
35 pub group_id: VarInt,
36 pub subgroup_id: VarInt,
37 pub publisher_priority: Option<u8>,
38}
39
40impl SubgroupHeader {
41 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
42 if buf.remaining() < 1 {
43 return Err(CodecError::UnexpectedEnd);
44 }
45 let header_type = buf.get_u8();
46
47 if header_type & SUBGROUP_BASE_BIT == 0 {
49 return Err(CodecError::InvalidField);
50 }
51
52 let track_alias = VarInt::decode(buf)?;
53 let group_id = VarInt::decode(buf)?;
54
55 let subgroup_id_mode = (header_type & SUBGROUP_ID_MODE_MASK) >> 1;
56 let subgroup_id = match subgroup_id_mode {
57 0 => VarInt::from_u64(0).unwrap(),
58 2 => VarInt::decode(buf)?,
59 _ => VarInt::from_u64(0).unwrap(),
62 };
63
64 let publisher_priority = if header_type & SUBGROUP_DEFAULT_PRIORITY_BIT == 0 {
65 if buf.remaining() < 1 {
66 return Err(CodecError::UnexpectedEnd);
67 }
68 Some(buf.get_u8())
69 } else {
70 None
71 };
72
73 Ok(SubgroupHeader { header_type, track_alias, group_id, subgroup_id, publisher_priority })
74 }
75
76 pub fn encode(&self, buf: &mut impl BufMut) {
77 buf.put_u8(self.header_type);
78 self.track_alias.encode(buf);
79 self.group_id.encode(buf);
80
81 let subgroup_id_mode = (self.header_type & SUBGROUP_ID_MODE_MASK) >> 1;
82 if subgroup_id_mode == 2 {
83 self.subgroup_id.encode(buf);
84 }
85
86 if self.header_type & SUBGROUP_DEFAULT_PRIORITY_BIT == 0 {
87 buf.put_u8(self.publisher_priority.unwrap_or(128));
88 }
89 }
90
91 pub fn has_properties(&self) -> bool {
92 self.header_type & SUBGROUP_PROPERTIES_BIT != 0
93 }
94
95 pub fn is_end_of_group(&self) -> bool {
96 self.header_type & SUBGROUP_END_OF_GROUP_BIT != 0
97 }
98}
99
100#[derive(Debug, Clone)]
108pub struct SubgroupObject {
109 pub object_id: VarInt,
110 pub extension_headers: Vec<u8>,
114 pub payload_length: VarInt,
115 pub object_status: Option<VarInt>,
116 pub payload: Vec<u8>,
117}
118
119#[derive(Debug, Clone)]
120pub struct SubgroupObjectReader {
121 extensions_present: bool,
122 prev_object_id: Option<u64>,
123}
124
125impl SubgroupObjectReader {
126 pub fn new(header: &SubgroupHeader) -> Self {
127 Self { extensions_present: header.has_properties(), prev_object_id: None }
128 }
129
130 pub fn read_object(&mut self, buf: &mut impl Buf) -> Result<SubgroupObject, CodecError> {
131 let delta = VarInt::decode(buf)?.into_inner();
132 let object_id_val = match self.prev_object_id {
133 None => delta,
134 Some(prev) => prev
135 .checked_add(1)
136 .and_then(|v| v.checked_add(delta))
137 .ok_or(CodecError::InvalidField)?,
138 };
139 self.prev_object_id = Some(object_id_val);
140 let object_id = VarInt::from_u64(object_id_val).map_err(|_| CodecError::InvalidField)?;
141
142 let extension_headers = if self.extensions_present {
143 let mut out: Vec<u8> = Vec::new();
144 let ext_count = VarInt::decode(buf)?;
145 ext_count.encode(&mut out);
146 let count = ext_count.into_inner();
147 for _ in 0..count {
148 let key = VarInt::decode(buf)?;
149 let vlen = VarInt::decode(buf)?;
150 let vlen_usize = vlen.into_inner() as usize;
151 if buf.remaining() < vlen_usize {
152 return Err(CodecError::UnexpectedEnd);
153 }
154 key.encode(&mut out);
155 vlen.encode(&mut out);
156 let value = buf.copy_to_bytes(vlen_usize);
157 out.extend_from_slice(&value);
158 }
159 out
160 } else {
161 Vec::new()
162 };
163
164 let payload_length_vi = VarInt::decode(buf)?;
165 let payload_length_val = payload_length_vi.into_inner() as usize;
166 let (object_status, payload) = if payload_length_val == 0 {
167 let status = VarInt::decode(buf)?;
168 (Some(status), Vec::new())
169 } else {
170 let payload = crate::types::read_bytes(buf, payload_length_val)?;
171 (None, payload)
172 };
173
174 Ok(SubgroupObject {
175 object_id,
176 extension_headers,
177 payload_length: payload_length_vi,
178 object_status,
179 payload,
180 })
181 }
182
183 pub fn write_object(
184 &mut self,
185 object: &SubgroupObject,
186 buf: &mut impl BufMut,
187 ) -> Result<(), CodecError> {
188 let oid = object.object_id.into_inner();
189 let delta = match self.prev_object_id {
190 None => oid,
191 Some(prev) => oid
192 .checked_sub(prev)
193 .and_then(|v| v.checked_sub(1))
194 .ok_or(CodecError::InvalidField)?,
195 };
196 VarInt::from_u64(delta).map_err(|_| CodecError::InvalidField)?.encode(buf);
197 if self.extensions_present {
198 buf.put_slice(&object.extension_headers);
199 }
200 object.payload_length.encode(buf);
201 if object.payload_length.into_inner() == 0 {
202 if let Some(s) = &object.object_status {
203 s.encode(buf);
204 } else {
205 VarInt::from_u64(0).unwrap().encode(buf);
206 }
207 } else {
208 buf.put_slice(&object.payload);
209 }
210 self.prev_object_id = Some(oid);
211 Ok(())
212 }
213}
214
215const DATAGRAM_PROPERTIES_BIT: u8 = 0x01;
218const DATAGRAM_END_OF_GROUP_BIT: u8 = 0x02;
219const DATAGRAM_ZERO_OBJECT_ID_BIT: u8 = 0x04;
220const DATAGRAM_DEFAULT_PRIORITY_BIT: u8 = 0x08;
221const DATAGRAM_STATUS_BIT: u8 = 0x20;
222
223#[derive(Debug, Clone)]
224pub struct DatagramHeader {
225 pub datagram_type: u8,
226 pub track_alias: VarInt,
227 pub group_id: VarInt,
228 pub object_id: VarInt,
229 pub publisher_priority: Option<u8>,
230 pub object_status: Option<u8>,
231}
232
233impl DatagramHeader {
234 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
235 if buf.remaining() < 1 {
236 return Err(CodecError::UnexpectedEnd);
237 }
238 let datagram_type = buf.get_u8();
239
240 let track_alias = VarInt::decode(buf)?;
241 let group_id = VarInt::decode(buf)?;
242
243 let object_id = if datagram_type & DATAGRAM_ZERO_OBJECT_ID_BIT != 0 {
244 VarInt::from_usize(0)
245 } else {
246 VarInt::decode(buf)?
247 };
248
249 let publisher_priority = if datagram_type & DATAGRAM_DEFAULT_PRIORITY_BIT == 0 {
250 if buf.remaining() < 1 {
251 return Err(CodecError::UnexpectedEnd);
252 }
253 Some(buf.get_u8())
254 } else {
255 None
256 };
257
258 if datagram_type & DATAGRAM_PROPERTIES_BIT != 0 {
260 let props_len = VarInt::decode(buf)?.into_inner() as usize;
261 if buf.remaining() < props_len {
262 return Err(CodecError::UnexpectedEnd);
263 }
264 buf.advance(props_len);
265 }
266
267 let object_status = if datagram_type & DATAGRAM_STATUS_BIT != 0 {
268 if buf.remaining() < 1 {
269 return Err(CodecError::UnexpectedEnd);
270 }
271 Some(buf.get_u8())
272 } else {
273 None
274 };
275
276 Ok(DatagramHeader {
277 datagram_type,
278 track_alias,
279 group_id,
280 object_id,
281 publisher_priority,
282 object_status,
283 })
284 }
285
286 pub fn encode(&self, buf: &mut impl BufMut) {
287 buf.put_u8(self.datagram_type);
288 self.track_alias.encode(buf);
289 self.group_id.encode(buf);
290
291 if self.datagram_type & DATAGRAM_ZERO_OBJECT_ID_BIT == 0 {
292 self.object_id.encode(buf);
293 }
294
295 if self.datagram_type & DATAGRAM_DEFAULT_PRIORITY_BIT == 0 {
296 buf.put_u8(self.publisher_priority.unwrap_or(128));
297 }
298
299 if self.datagram_type & DATAGRAM_STATUS_BIT != 0 {
300 buf.put_u8(self.object_status.unwrap_or(0));
301 }
302 }
303
304 pub fn is_end_of_group(&self) -> bool {
305 self.datagram_type & DATAGRAM_END_OF_GROUP_BIT != 0
306 }
307
308 pub fn has_status(&self) -> bool {
309 self.datagram_type & DATAGRAM_STATUS_BIT != 0
310 }
311}
312
313const FETCH_STREAM_TYPE: u64 = 0x05;
316
317#[derive(Debug, Clone)]
318pub struct FetchHeader {
319 pub request_id: VarInt,
320}
321
322impl FetchHeader {
323 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
324 let stream_type = VarInt::decode(buf)?.into_inner();
325 if stream_type != FETCH_STREAM_TYPE {
326 return Err(CodecError::InvalidField);
327 }
328 let request_id = VarInt::decode(buf)?;
329 Ok(FetchHeader { request_id })
330 }
331
332 pub fn encode(&self, buf: &mut impl BufMut) {
333 VarInt::from_usize(FETCH_STREAM_TYPE as usize).encode(buf);
334 self.request_id.encode(buf);
335 }
336}