moqtap_codec/draft15/
data_stream.rs1use crate::error::CodecError;
14use crate::varint::VarInt;
15use bytes::{Buf, BufMut};
16
17#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct SubgroupHeader {
28 pub header_type: u8,
29 pub track_alias: VarInt,
30 pub group_id: VarInt,
31 pub subgroup_id: VarInt,
32 pub publisher_priority: Option<u8>,
33}
34
35impl SubgroupHeader {
36 pub fn has_extensions(&self) -> bool {
37 self.header_type & 0x01 != 0
38 }
39
40 pub fn has_end_of_group(&self) -> bool {
41 self.header_type & 0x02 != 0
42 }
43
44 pub fn has_explicit_subgroup_id(&self) -> bool {
45 self.header_type & 0x04 != 0
46 }
47
48 pub fn has_priority(&self) -> bool {
49 self.header_type & 0x20 == 0
50 }
51
52 pub fn encode(&self, buf: &mut impl BufMut) {
53 VarInt::from_usize(self.header_type as usize).encode(buf);
54 self.track_alias.encode(buf);
55 self.group_id.encode(buf);
56 if self.has_explicit_subgroup_id() {
57 self.subgroup_id.encode(buf);
58 }
59 if let Some(p) = self.publisher_priority {
60 buf.put_u8(p);
61 }
62 }
63
64 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
65 let header_type = VarInt::decode(buf)?.into_inner() as u8;
66 let base = header_type & 0xD0; if base != 0x10 && base != 0x30 {
68 return Err(CodecError::InvalidField);
69 }
70 let track_alias = VarInt::decode(buf)?;
71 let group_id = VarInt::decode(buf)?;
72 let subgroup_id =
73 if header_type & 0x04 != 0 { VarInt::decode(buf)? } else { VarInt::from_usize(0) };
74 let publisher_priority = if header_type & 0x20 == 0 {
75 if buf.remaining() < 1 {
76 return Err(CodecError::UnexpectedEnd);
77 }
78 Some(buf.get_u8())
79 } else {
80 None
81 };
82 Ok(Self { header_type, track_alias, group_id, subgroup_id, publisher_priority })
83 }
84}
85
86#[derive(Debug, Clone, PartialEq, Eq)]
96pub struct SubgroupObject {
97 pub object_id: VarInt,
99 pub extension_headers: Vec<u8>,
104 pub payload_length: VarInt,
107 pub object_status: Option<VarInt>,
109 pub payload: Vec<u8>,
111}
112
/// Stateful codec for the objects that follow a `SubgroupHeader`.
///
/// Object IDs are delta-encoded on the wire, so one instance must be used for
/// all objects of a single stream, in order.
#[derive(Debug, Clone)]
pub struct SubgroupObjectReader {
    /// Whether each object carries a length-prefixed extension block; copied
    /// from `SubgroupHeader::has_extensions` at construction time.
    extensions_present: bool,
    /// Object ID of the most recent object read or written; `None` until the
    /// first one.
    prev_object_id: Option<u64>,
}
122
123impl SubgroupObjectReader {
124 pub fn new(header: &SubgroupHeader) -> Self {
126 Self { extensions_present: header.has_extensions(), prev_object_id: None }
127 }
128
129 pub fn read_object(&mut self, buf: &mut impl Buf) -> Result<SubgroupObject, CodecError> {
131 let delta = VarInt::decode(buf)?.into_inner();
132 let object_id_val = match self.prev_object_id {
139 None => delta,
140 Some(prev) => {
141 if self.extensions_present {
142 prev.checked_add(delta).ok_or(CodecError::InvalidField)?
143 } else {
144 prev.checked_add(1)
145 .and_then(|v| v.checked_add(delta))
146 .ok_or(CodecError::InvalidField)?
147 }
148 }
149 };
150 self.prev_object_id = Some(object_id_val);
151 let object_id = VarInt::from_u64(object_id_val).map_err(|_| CodecError::InvalidField)?;
152
153 let extension_headers = if self.extensions_present {
154 let ext_len = VarInt::decode(buf)?.into_inner() as usize;
158 crate::types::read_bytes(buf, ext_len)?
159 } else {
160 Vec::new()
161 };
162
163 let payload_length_vi = VarInt::decode(buf)?;
164 let payload_length_val = payload_length_vi.into_inner() as usize;
165 let (object_status, payload) = if payload_length_val == 0 {
166 let status = VarInt::decode(buf)?;
167 (Some(status), Vec::new())
168 } else {
169 let payload = crate::types::read_bytes(buf, payload_length_val)?;
170 (None, payload)
171 };
172
173 Ok(SubgroupObject {
174 object_id,
175 extension_headers,
176 payload_length: payload_length_vi,
177 object_status,
178 payload,
179 })
180 }
181
182 pub fn write_object(
184 &mut self,
185 object: &SubgroupObject,
186 buf: &mut impl BufMut,
187 ) -> Result<(), CodecError> {
188 let oid = object.object_id.into_inner();
189 let delta = match self.prev_object_id {
190 None => oid,
191 Some(prev) => oid.checked_sub(prev).ok_or(CodecError::InvalidField)?,
192 };
193 VarInt::from_u64(delta).map_err(|_| CodecError::InvalidField)?.encode(buf);
194 if self.extensions_present {
195 let ext_len = object.extension_headers.len();
196 VarInt::from_usize(ext_len).encode(buf);
197 buf.put_slice(&object.extension_headers);
198 }
199 object.payload_length.encode(buf);
200 if object.payload_length.into_inner() == 0 {
201 if let Some(s) = &object.object_status {
202 s.encode(buf);
203 } else {
204 VarInt::from_u64(0).unwrap().encode(buf);
206 }
207 } else {
208 buf.put_slice(&object.payload);
209 }
210 self.prev_object_id = Some(oid);
211 Ok(())
212 }
213}
214
215#[derive(Debug, Clone, PartialEq, Eq)]
224pub struct DatagramHeader {
225 pub datagram_type: u8,
227 pub track_alias: VarInt,
229 pub group_id: VarInt,
231 pub object_id: VarInt,
233 pub publisher_priority: u8,
235 pub extension_headers: Vec<u8>,
237 pub object_status: Option<VarInt>,
239}
240
241impl DatagramHeader {
242 pub fn has_object_id(&self) -> bool {
244 self.datagram_type & 0x04 == 0
245 }
246
247 pub fn is_end_of_group(&self) -> bool {
249 self.datagram_type & 0x02 != 0
250 }
251
252 pub fn is_status(&self) -> bool {
254 self.datagram_type & 0x20 != 0
255 }
256
257 pub fn has_extensions(&self) -> bool {
259 self.datagram_type & 0x01 != 0
260 }
261
262 pub fn encode(&self, buf: &mut impl BufMut) {
264 VarInt::from_usize(self.datagram_type as usize).encode(buf);
265 self.track_alias.encode(buf);
266 self.group_id.encode(buf);
267 if self.has_object_id() {
268 self.object_id.encode(buf);
269 }
270 buf.put_u8(self.publisher_priority);
271 if self.has_extensions() {
272 VarInt::from_usize(self.extension_headers.len()).encode(buf);
273 buf.put_slice(&self.extension_headers);
274 }
275 if self.is_status() {
276 if let Some(s) = &self.object_status {
277 s.encode(buf);
278 }
279 }
280 }
281
282 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
284 let datagram_type = VarInt::decode(buf)?.into_inner() as u8;
285 let track_alias = VarInt::decode(buf)?;
286 let group_id = VarInt::decode(buf)?;
287 let object_id =
288 if datagram_type & 0x04 == 0 { VarInt::decode(buf)? } else { VarInt::from_usize(0) };
289 if buf.remaining() < 1 {
290 return Err(CodecError::UnexpectedEnd);
291 }
292 let publisher_priority = buf.get_u8();
293 let extension_headers = if datagram_type & 0x01 != 0 {
294 let ext_len = VarInt::decode(buf)?.into_inner() as usize;
295 crate::types::read_bytes(buf, ext_len)?
296 } else {
297 Vec::new()
298 };
299 let object_status =
300 if datagram_type & 0x20 != 0 { Some(VarInt::decode(buf)?) } else { None };
301 Ok(Self {
302 datagram_type,
303 track_alias,
304 group_id,
305 object_id,
306 publisher_priority,
307 extension_headers,
308 object_status,
309 })
310 }
311}
312
313#[derive(Debug, Clone, PartialEq, Eq)]
319pub struct FetchHeader {
320 pub request_id: VarInt,
321}
322
323impl FetchHeader {
324 pub fn encode(&self, buf: &mut impl BufMut) {
325 VarInt::from_usize(0x05).encode(buf);
326 self.request_id.encode(buf);
327 }
328
329 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
330 let stream_type = VarInt::decode(buf)?.into_inner();
331 if stream_type != 0x05 {
332 return Err(CodecError::InvalidField);
333 }
334 let request_id = VarInt::decode(buf)?;
335 Ok(Self { request_id })
336 }
337}