moqtap_codec/draft16/
data_stream.rs1use crate::error::CodecError;
21use crate::varint::VarInt;
22use bytes::{Buf, BufMut};
23
24#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct SubgroupHeader {
26 pub header_type: u8,
27 pub track_alias: VarInt,
28 pub group_id: VarInt,
29 pub subgroup_id: VarInt,
30 pub publisher_priority: Option<u8>,
31}
32
33impl SubgroupHeader {
34 pub fn has_extensions(&self) -> bool {
35 self.header_type & 0x01 != 0
36 }
37
38 pub fn subgroup_id_from_first_object(&self) -> bool {
41 self.header_type & 0x02 != 0
42 }
43
44 pub fn has_explicit_subgroup_id(&self) -> bool {
45 self.header_type & 0x04 != 0
46 }
47
48 pub fn has_end_of_group(&self) -> bool {
49 self.header_type & 0x08 != 0
50 }
51
52 pub fn has_priority(&self) -> bool {
53 self.header_type & 0x20 == 0
54 }
55
56 pub fn encode(&self, buf: &mut impl BufMut) {
57 VarInt::from_usize(self.header_type as usize).encode(buf);
58 self.track_alias.encode(buf);
59 self.group_id.encode(buf);
60 if self.has_explicit_subgroup_id() {
61 self.subgroup_id.encode(buf);
62 }
63 if let Some(p) = self.publisher_priority {
64 buf.put_u8(p);
65 }
66 }
67
68 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
69 let header_type = VarInt::decode(buf)?.into_inner() as u8;
70 let base = header_type & 0xD0;
71 if base != 0x10 && base != 0x30 {
72 return Err(CodecError::InvalidField);
73 }
74 let track_alias = VarInt::decode(buf)?;
75 let group_id = VarInt::decode(buf)?;
76 let subgroup_id =
77 if header_type & 0x04 != 0 { VarInt::decode(buf)? } else { VarInt::from_usize(0) };
78 let publisher_priority = if header_type & 0x20 == 0 {
79 if buf.remaining() < 1 {
80 return Err(CodecError::UnexpectedEnd);
81 }
82 Some(buf.get_u8())
83 } else {
84 None
85 };
86 Ok(Self { header_type, track_alias, group_id, subgroup_id, publisher_priority })
87 }
88}
89
90#[derive(Debug, Clone, PartialEq, Eq)]
94pub struct SubgroupObject {
95 pub object_id: VarInt,
96 pub extension_headers: Vec<u8>,
97 pub payload_length: VarInt,
98 pub object_status: Option<VarInt>,
99 pub payload: Vec<u8>,
100}
101
/// Stateful reader/writer for the objects that follow a `SubgroupHeader`.
///
/// Object IDs are delta-coded against the previous object, so one instance
/// must be used for all objects of a single stream, in order.
#[derive(Clone, Debug)]
pub struct SubgroupObjectReader {
    /// Whether each object carries a length-prefixed extension block.
    extensions_present: bool,
    /// ID of the most recently read or written object; `None` before the first one.
    prev_object_id: Option<u64>,
}
110
111impl SubgroupObjectReader {
112 pub fn new(header: &SubgroupHeader) -> Self {
113 Self { extensions_present: header.has_extensions(), prev_object_id: None }
114 }
115
116 pub fn read_object(&mut self, buf: &mut impl Buf) -> Result<SubgroupObject, CodecError> {
117 let delta = VarInt::decode(buf)?.into_inner();
118 let object_id_val = match self.prev_object_id {
125 None => delta,
126 Some(prev) => {
127 if self.extensions_present {
128 prev.checked_add(delta).ok_or(CodecError::InvalidField)?
129 } else {
130 prev.checked_add(1)
131 .and_then(|v| v.checked_add(delta))
132 .ok_or(CodecError::InvalidField)?
133 }
134 }
135 };
136 self.prev_object_id = Some(object_id_val);
137 let object_id = VarInt::from_u64(object_id_val).map_err(|_| CodecError::InvalidField)?;
138
139 let extension_headers = if self.extensions_present {
141 let ext_len = VarInt::decode(buf)?.into_inner() as usize;
142 crate::types::read_bytes(buf, ext_len)?
143 } else {
144 Vec::new()
145 };
146
147 let payload_length_vi = VarInt::decode(buf)?;
148 let payload_length_val = payload_length_vi.into_inner() as usize;
149 let (object_status, payload) = if payload_length_val == 0 {
150 let status = VarInt::decode(buf)?;
151 (Some(status), Vec::new())
152 } else {
153 let payload = crate::types::read_bytes(buf, payload_length_val)?;
154 (None, payload)
155 };
156
157 Ok(SubgroupObject {
158 object_id,
159 extension_headers,
160 payload_length: payload_length_vi,
161 object_status,
162 payload,
163 })
164 }
165
166 pub fn write_object(
167 &mut self,
168 object: &SubgroupObject,
169 buf: &mut impl BufMut,
170 ) -> Result<(), CodecError> {
171 let oid = object.object_id.into_inner();
172 let delta = match self.prev_object_id {
173 None => oid,
174 Some(prev) => {
175 if self.extensions_present {
176 oid.checked_sub(prev).ok_or(CodecError::InvalidField)?
177 } else {
178 oid.checked_sub(prev)
179 .and_then(|v| v.checked_sub(1))
180 .ok_or(CodecError::InvalidField)?
181 }
182 }
183 };
184 VarInt::from_u64(delta).map_err(|_| CodecError::InvalidField)?.encode(buf);
185 if self.extensions_present {
186 let ext_len = object.extension_headers.len();
187 VarInt::from_usize(ext_len).encode(buf);
188 buf.put_slice(&object.extension_headers);
189 }
190 object.payload_length.encode(buf);
191 if object.payload_length.into_inner() == 0 {
192 if let Some(s) = &object.object_status {
193 s.encode(buf);
194 } else {
195 VarInt::from_u64(0).unwrap().encode(buf);
196 }
197 } else {
198 buf.put_slice(&object.payload);
199 }
200 self.prev_object_id = Some(oid);
201 Ok(())
202 }
203}
204
205#[derive(Debug, Clone, PartialEq, Eq)]
206pub struct DatagramHeader {
207 pub datagram_type: u8,
208 pub track_alias: VarInt,
209 pub group_id: VarInt,
210 pub object_id: VarInt,
211 pub publisher_priority: Option<u8>,
214 pub extension_headers: Vec<u8>,
216 pub object_status: Option<VarInt>,
217}
218
219impl DatagramHeader {
220 pub fn has_extensions(&self) -> bool {
221 self.datagram_type & 0x01 != 0
222 }
223
224 pub fn is_end_of_group(&self) -> bool {
225 self.datagram_type & 0x02 != 0
226 }
227
228 pub fn has_object_id(&self) -> bool {
229 self.datagram_type & 0x04 == 0
230 }
231
232 pub fn has_default_priority(&self) -> bool {
235 self.datagram_type & 0x08 != 0
236 }
237
238 pub fn is_status(&self) -> bool {
239 self.datagram_type & 0x20 != 0
240 }
241
242 pub fn encode(&self, buf: &mut impl BufMut) {
243 VarInt::from_usize(self.datagram_type as usize).encode(buf);
244 self.track_alias.encode(buf);
245 self.group_id.encode(buf);
246 if self.has_object_id() {
247 self.object_id.encode(buf);
248 }
249 if !self.has_default_priority() {
250 buf.put_u8(self.publisher_priority.unwrap_or(128));
251 }
252 if self.has_extensions() {
253 VarInt::from_usize(self.extension_headers.len()).encode(buf);
254 buf.put_slice(&self.extension_headers);
255 }
256 if self.is_status() {
257 if let Some(s) = &self.object_status {
258 s.encode(buf);
259 }
260 }
261 }
262
263 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
264 let datagram_type = VarInt::decode(buf)?.into_inner() as u8;
265 let track_alias = VarInt::decode(buf)?;
266 let group_id = VarInt::decode(buf)?;
267 let object_id =
268 if datagram_type & 0x04 == 0 { VarInt::decode(buf)? } else { VarInt::from_usize(0) };
269 let publisher_priority = if datagram_type & 0x08 != 0 {
270 None
271 } else {
272 if buf.remaining() < 1 {
273 return Err(CodecError::UnexpectedEnd);
274 }
275 Some(buf.get_u8())
276 };
277 let extension_headers = if datagram_type & 0x01 != 0 {
278 let ext_len = VarInt::decode(buf)?.into_inner() as usize;
279 crate::types::read_bytes(buf, ext_len)?
280 } else {
281 Vec::new()
282 };
283 let object_status =
284 if datagram_type & 0x20 != 0 { Some(VarInt::decode(buf)?) } else { None };
285 Ok(Self {
286 datagram_type,
287 track_alias,
288 group_id,
289 object_id,
290 publisher_priority,
291 extension_headers,
292 object_status,
293 })
294 }
295}
296
297#[derive(Debug, Clone, PartialEq, Eq)]
298pub struct FetchHeader {
299 pub request_id: VarInt,
300}
301
302impl FetchHeader {
303 pub fn encode(&self, buf: &mut impl BufMut) {
304 VarInt::from_usize(0x05).encode(buf);
305 self.request_id.encode(buf);
306 }
307
308 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
309 let stream_type = VarInt::decode(buf)?.into_inner();
310 if stream_type != 0x05 {
311 return Err(CodecError::InvalidField);
312 }
313 let request_id = VarInt::decode(buf)?;
314 Ok(Self { request_id })
315 }
316}