use super::types::ObjectStatus;
use crate::error::CodecError;
use crate::types::read_bytes;
use crate::varint::VarInt;
use bytes::{Buf, BufMut};

/// Identifies how a data stream or datagram is framed.
///
/// Each variant's discriminant is the numeric identifier accepted by [`StreamType::from_id`].
/// Variant names combine a base kind with optional suffixes whose meaning is pinned down by the
/// predicates on this type: `Ext` variants report `has_extensions()` and `Eog` variants report
/// `contains_end_of_group()`.
///
/// NOTE(review): the identifiers look like the MoQT data-stream / datagram type codes; confirm
/// against the draft revision this crate targets before adding or renumbering variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum StreamType {
    /// Datagram, no extensions.
    Datagram = 0x00,
    /// Datagram with extensions.
    DatagramExt = 0x01,
    /// Status datagram, no extensions.
    DatagramStatus = 0x02,
    /// Status datagram with extensions.
    DatagramStatusExt = 0x03,
    /// Fetch stream.
    Fetch = 0x05,
    /// Subgroup stream without a subgroup ID field; `SubgroupHeader::decode_with_type` yields
    /// subgroup ID 0.
    SubgroupZero = 0x10,
    /// [`StreamType::SubgroupZero`] with extensions.
    SubgroupZeroExt = 0x11,
    /// Subgroup stream without a subgroup ID field; `SubgroupHeader::decode_with_type` stores 0
    /// as the subgroup ID (presumably the real ID is the first object's ID — TODO confirm).
    SubgroupFirstObj = 0x12,
    /// [`StreamType::SubgroupFirstObj`] with extensions.
    SubgroupFirstObjExt = 0x13,
    /// Subgroup stream whose header carries an explicit subgroup ID.
    SubgroupExplicit = 0x14,
    /// [`StreamType::SubgroupExplicit`] with extensions.
    SubgroupExplicitExt = 0x15,
    /// [`StreamType::SubgroupZero`], end-of-group flavour.
    SubgroupZeroEog = 0x18,
    /// [`StreamType::SubgroupZeroEog`] with extensions.
    SubgroupZeroEogExt = 0x19,
    /// [`StreamType::SubgroupFirstObj`], end-of-group flavour.
    SubgroupFirstObjEog = 0x1A,
    /// [`StreamType::SubgroupFirstObjEog`] with extensions.
    SubgroupFirstObjEogExt = 0x1B,
    /// [`StreamType::SubgroupExplicit`], end-of-group flavour.
    SubgroupExplicitEog = 0x1C,
    /// [`StreamType::SubgroupExplicitEog`] with extensions.
    SubgroupExplicitEogExt = 0x1D,
}

impl StreamType {
    /// Looks up the stream type whose discriminant equals `id`.
    ///
    /// Returns `None` for every value that is not one of the identifiers defined on the enum
    /// (including the gaps `0x04`, `0x06..=0x0F`, `0x16` and `0x17`).
    pub fn from_id(id: u64) -> Option<Self> {
        let stream_type = match id {
            0x00 => StreamType::Datagram,
            0x01 => StreamType::DatagramExt,
            0x02 => StreamType::DatagramStatus,
            0x03 => StreamType::DatagramStatusExt,
            0x05 => StreamType::Fetch,
            0x10 => StreamType::SubgroupZero,
            0x11 => StreamType::SubgroupZeroExt,
            0x12 => StreamType::SubgroupFirstObj,
            0x13 => StreamType::SubgroupFirstObjExt,
            0x14 => StreamType::SubgroupExplicit,
            0x15 => StreamType::SubgroupExplicitExt,
            0x18 => StreamType::SubgroupZeroEog,
            0x19 => StreamType::SubgroupZeroEogExt,
            0x1A => StreamType::SubgroupFirstObjEog,
            0x1B => StreamType::SubgroupFirstObjEogExt,
            0x1C => StreamType::SubgroupExplicitEog,
            0x1D => StreamType::SubgroupExplicitEogExt,
            _ => return None,
        };
        Some(stream_type)
    }

    /// Returns `true` for every `Subgroup*` variant and `false` for datagram and fetch types.
    pub fn is_subgroup(&self) -> bool {
        matches!(
            self,
            StreamType::SubgroupZero
                | StreamType::SubgroupZeroExt
                | StreamType::SubgroupFirstObj
                | StreamType::SubgroupFirstObjExt
                | StreamType::SubgroupExplicit
                | StreamType::SubgroupExplicitExt
                | StreamType::SubgroupZeroEog
                | StreamType::SubgroupZeroEogExt
                | StreamType::SubgroupFirstObjEog
                | StreamType::SubgroupFirstObjEogExt
                | StreamType::SubgroupExplicitEog
                | StreamType::SubgroupExplicitEogExt
        )
    }

    /// Returns `true` for every variant whose name ends in `Ext`.
    ///
    /// [`StreamType::Fetch`] is not among them even though its identifier is odd like the `Ext`
    /// identifiers, so this cannot be reduced to a parity test on the discriminant.
    pub fn has_extensions(&self) -> bool {
        matches!(
            self,
            StreamType::DatagramExt
                | StreamType::DatagramStatusExt
                | StreamType::SubgroupZeroExt
                | StreamType::SubgroupFirstObjExt
                | StreamType::SubgroupExplicitExt
                | StreamType::SubgroupZeroEogExt
                | StreamType::SubgroupFirstObjEogExt
                | StreamType::SubgroupExplicitEogExt
        )
    }

    /// Returns `true` for the `Subgroup*Eog*` variants (identifiers `0x18..=0x1D`).
    pub fn contains_end_of_group(&self) -> bool {
        matches!(
            self,
            StreamType::SubgroupZeroEog
                | StreamType::SubgroupZeroEogExt
                | StreamType::SubgroupFirstObjEog
                | StreamType::SubgroupFirstObjEogExt
                | StreamType::SubgroupExplicitEog
                | StreamType::SubgroupExplicitEogExt
        )
    }
}

124fn read_extension_bytes(buf: &mut impl Buf, byte_len: u64) -> Result<Vec<u8>, CodecError> {
127 read_bytes(buf, byte_len as usize)
128}
129
130#[derive(Debug, Clone, PartialEq, Eq)]
135pub struct SubgroupHeader {
136 pub stream_type: StreamType,
137 pub track_alias: VarInt,
138 pub group_id: VarInt,
139 pub subgroup_id: VarInt,
140 pub publisher_priority: u8,
141}
142
143impl SubgroupHeader {
144 pub fn encode(&self, buf: &mut impl BufMut) {
145 self.track_alias.encode(buf);
146 self.group_id.encode(buf);
147 match self.stream_type {
148 StreamType::SubgroupExplicit
149 | StreamType::SubgroupExplicitExt
150 | StreamType::SubgroupExplicitEog
151 | StreamType::SubgroupExplicitEogExt => {
152 self.subgroup_id.encode(buf);
153 }
154 _ => {}
155 }
156 buf.put_u8(self.publisher_priority);
157 }
158
159 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
160 Self::decode_with_type(StreamType::SubgroupExplicit, buf)
161 }
162
163 pub fn decode_with_type(
164 stream_type: StreamType,
165 buf: &mut impl Buf,
166 ) -> Result<Self, CodecError> {
167 let track_alias = VarInt::decode(buf)?;
168 let group_id = VarInt::decode(buf)?;
169 let subgroup_id = match stream_type {
170 StreamType::SubgroupZero
171 | StreamType::SubgroupZeroExt
172 | StreamType::SubgroupZeroEog
173 | StreamType::SubgroupZeroEogExt => VarInt::from_usize(0),
174 StreamType::SubgroupExplicit
175 | StreamType::SubgroupExplicitExt
176 | StreamType::SubgroupExplicitEog
177 | StreamType::SubgroupExplicitEogExt => VarInt::decode(buf)?,
178 StreamType::SubgroupFirstObj
179 | StreamType::SubgroupFirstObjExt
180 | StreamType::SubgroupFirstObjEog
181 | StreamType::SubgroupFirstObjEogExt => VarInt::from_usize(0),
182 _ => return Err(CodecError::InvalidField),
183 };
184 if buf.remaining() < 1 {
185 return Err(CodecError::UnexpectedEnd);
186 }
187 let publisher_priority = buf.get_u8();
188 Ok(Self { stream_type, track_alias, group_id, subgroup_id, publisher_priority })
189 }
190}
191
192#[derive(Debug, Clone, PartialEq, Eq)]
197pub struct ObjectHeader {
198 pub object_id: VarInt,
199 pub extension_headers_length: VarInt,
200 pub extensions: Vec<u8>,
201 pub payload_length: VarInt,
202 pub object_status: ObjectStatus,
203}
204
205impl ObjectHeader {
206 pub fn encode(&self, buf: &mut impl BufMut) {
207 self.encode_with_extensions(false, buf);
208 }
209
210 pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
211 self.object_id.encode(buf);
212 if has_extensions {
213 self.extension_headers_length.encode(buf);
214 buf.put_slice(&self.extensions);
215 }
216 self.payload_length.encode(buf);
217 if self.payload_length.into_inner() == 0 {
218 VarInt::from_usize(self.object_status as usize).encode(buf);
219 }
220 }
221
222 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
223 Self::decode_with_extensions(false, buf)
224 }
225
226 pub fn decode_with_extensions(
227 has_extensions: bool,
228 buf: &mut impl Buf,
229 ) -> Result<Self, CodecError> {
230 let object_id = VarInt::decode(buf)?;
231 let (extension_headers_length, extensions) = if has_extensions {
232 let ehl = VarInt::decode(buf)?;
233 let ext = read_extension_bytes(buf, ehl.into_inner())?;
234 (ehl, ext)
235 } else {
236 (VarInt::from_usize(0), Vec::new())
237 };
238 let payload_length = VarInt::decode(buf)?;
239 let object_status = if payload_length.into_inner() == 0 {
240 let sv = VarInt::decode(buf)?.into_inner();
241 ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?
242 } else {
243 ObjectStatus::Normal
244 };
245 Ok(Self { object_id, extension_headers_length, extensions, payload_length, object_status })
246 }
247}
248
249#[derive(Debug, Clone, PartialEq, Eq)]
254pub struct DatagramHeader {
255 pub track_alias: VarInt,
256 pub group_id: VarInt,
257 pub object_id: VarInt,
258 pub publisher_priority: u8,
259 pub extension_headers_length: VarInt,
260 pub extensions: Vec<u8>,
261}
262
263impl DatagramHeader {
264 pub fn encode(&self, buf: &mut impl BufMut) {
265 self.encode_with_extensions(false, buf);
266 }
267
268 pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
269 self.track_alias.encode(buf);
270 self.group_id.encode(buf);
271 self.object_id.encode(buf);
272 buf.put_u8(self.publisher_priority);
273 if has_extensions {
274 self.extension_headers_length.encode(buf);
275 buf.put_slice(&self.extensions);
276 }
277 }
278
279 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
280 Self::decode_with_extensions(false, buf)
281 }
282
283 pub fn decode_with_extensions(
284 has_extensions: bool,
285 buf: &mut impl Buf,
286 ) -> Result<Self, CodecError> {
287 let track_alias = VarInt::decode(buf)?;
288 let group_id = VarInt::decode(buf)?;
289 let object_id = VarInt::decode(buf)?;
290 if buf.remaining() < 1 {
291 return Err(CodecError::UnexpectedEnd);
292 }
293 let publisher_priority = buf.get_u8();
294 let (extension_headers_length, extensions) = if has_extensions {
295 let ehl = VarInt::decode(buf)?;
296 let ext = read_extension_bytes(buf, ehl.into_inner())?;
297 (ehl, ext)
298 } else {
299 (VarInt::from_usize(0), Vec::new())
300 };
301 Ok(Self {
302 track_alias,
303 group_id,
304 object_id,
305 publisher_priority,
306 extension_headers_length,
307 extensions,
308 })
309 }
310}
311
312#[derive(Debug, Clone, PartialEq, Eq)]
317pub struct DatagramStatusHeader {
318 pub track_alias: VarInt,
319 pub group_id: VarInt,
320 pub object_id: VarInt,
321 pub publisher_priority: u8,
322 pub extension_headers_length: VarInt,
323 pub extensions: Vec<u8>,
324 pub object_status: ObjectStatus,
325}
326
327impl DatagramStatusHeader {
328 pub fn encode(&self, buf: &mut impl BufMut) {
329 self.encode_with_extensions(false, buf);
330 }
331
332 pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
333 self.track_alias.encode(buf);
334 self.group_id.encode(buf);
335 self.object_id.encode(buf);
336 buf.put_u8(self.publisher_priority);
337 if has_extensions {
338 self.extension_headers_length.encode(buf);
339 buf.put_slice(&self.extensions);
340 }
341 VarInt::from_usize(self.object_status as usize).encode(buf);
342 }
343
344 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
345 Self::decode_with_extensions(false, buf)
346 }
347
348 pub fn decode_with_extensions(
349 has_extensions: bool,
350 buf: &mut impl Buf,
351 ) -> Result<Self, CodecError> {
352 let track_alias = VarInt::decode(buf)?;
353 let group_id = VarInt::decode(buf)?;
354 let object_id = VarInt::decode(buf)?;
355 if buf.remaining() < 1 {
356 return Err(CodecError::UnexpectedEnd);
357 }
358 let publisher_priority = buf.get_u8();
359 let (extension_headers_length, extensions) = if has_extensions {
360 let ehl = VarInt::decode(buf)?;
361 let ext = read_extension_bytes(buf, ehl.into_inner())?;
362 (ehl, ext)
363 } else {
364 (VarInt::from_usize(0), Vec::new())
365 };
366 let sv = VarInt::decode(buf)?.into_inner();
367 let object_status = ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?;
368 Ok(Self {
369 track_alias,
370 group_id,
371 object_id,
372 publisher_priority,
373 extension_headers_length,
374 extensions,
375 object_status,
376 })
377 }
378}
379
380#[derive(Debug, Clone, PartialEq, Eq)]
385pub struct FetchHeader {
386 pub request_id: VarInt,
387}
388
389#[derive(Debug, Clone, PartialEq, Eq)]
390pub struct FetchObjectHeader {
391 pub group_id: VarInt,
392 pub subgroup_id: VarInt,
393 pub object_id: VarInt,
394 pub publisher_priority: u8,
395 pub extension_headers_length: VarInt,
396 pub extensions: Vec<u8>,
397 pub payload_length: VarInt,
398 pub object_status: ObjectStatus,
399}
400
401impl FetchHeader {
402 pub fn encode(&self, buf: &mut impl BufMut) {
403 self.request_id.encode(buf);
404 }
405
406 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
407 let request_id = VarInt::decode(buf)?;
408 Ok(Self { request_id })
409 }
410}
411
412impl FetchObjectHeader {
413 pub fn encode(&self, buf: &mut impl BufMut) {
414 self.group_id.encode(buf);
415 self.subgroup_id.encode(buf);
416 self.object_id.encode(buf);
417 buf.put_u8(self.publisher_priority);
418 self.extension_headers_length.encode(buf);
419 buf.put_slice(&self.extensions);
420 self.payload_length.encode(buf);
421 if self.payload_length.into_inner() == 0 {
422 VarInt::from_usize(self.object_status as usize).encode(buf);
423 }
424 }
425
426 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
427 let group_id = VarInt::decode(buf)?;
428 let subgroup_id = VarInt::decode(buf)?;
429 let object_id = VarInt::decode(buf)?;
430 if buf.remaining() < 1 {
431 return Err(CodecError::UnexpectedEnd);
432 }
433 let publisher_priority = buf.get_u8();
434 let extension_headers_length = VarInt::decode(buf)?;
435 let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
436 let payload_length = VarInt::decode(buf)?;
437 let object_status = if payload_length.into_inner() == 0 {
438 let sv = VarInt::decode(buf)?.into_inner();
439 ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?
440 } else {
441 ObjectStatus::Normal
442 };
443 Ok(Self {
444 group_id,
445 subgroup_id,
446 object_id,
447 publisher_priority,
448 extension_headers_length,
449 extensions,
450 payload_length,
451 object_status,
452 })
453 }
454}