1use super::types::ObjectStatus;
8use crate::error::CodecError;
9use crate::types::read_bytes;
10use crate::varint::VarInt;
11use bytes::{Buf, BufMut};
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14#[repr(u64)]
15pub enum StreamType {
16 Fetch = 0x05,
17 SubgroupZero = 0x10,
18 SubgroupZeroExt = 0x11,
19 SubgroupFirstObj = 0x12,
20 SubgroupFirstObjExt = 0x13,
21 SubgroupExplicit = 0x14,
22 SubgroupExplicitExt = 0x15,
23 SubgroupZeroEog = 0x18,
24 SubgroupZeroEogExt = 0x19,
25 SubgroupFirstObjEog = 0x1A,
26 SubgroupFirstObjEogExt = 0x1B,
27 SubgroupExplicitEog = 0x1C,
28 SubgroupExplicitEogExt = 0x1D,
29}
30
31impl StreamType {
32 pub fn from_id(id: u64) -> Option<Self> {
33 match id {
34 0x05 => Some(StreamType::Fetch),
35 0x10 => Some(StreamType::SubgroupZero),
36 0x11 => Some(StreamType::SubgroupZeroExt),
37 0x12 => Some(StreamType::SubgroupFirstObj),
38 0x13 => Some(StreamType::SubgroupFirstObjExt),
39 0x14 => Some(StreamType::SubgroupExplicit),
40 0x15 => Some(StreamType::SubgroupExplicitExt),
41 0x18 => Some(StreamType::SubgroupZeroEog),
42 0x19 => Some(StreamType::SubgroupZeroEogExt),
43 0x1A => Some(StreamType::SubgroupFirstObjEog),
44 0x1B => Some(StreamType::SubgroupFirstObjEogExt),
45 0x1C => Some(StreamType::SubgroupExplicitEog),
46 0x1D => Some(StreamType::SubgroupExplicitEogExt),
47 _ => None,
48 }
49 }
50
51 pub fn is_subgroup(&self) -> bool {
52 matches!(
53 self,
54 StreamType::SubgroupZero
55 | StreamType::SubgroupZeroExt
56 | StreamType::SubgroupFirstObj
57 | StreamType::SubgroupFirstObjExt
58 | StreamType::SubgroupExplicit
59 | StreamType::SubgroupExplicitExt
60 | StreamType::SubgroupZeroEog
61 | StreamType::SubgroupZeroEogExt
62 | StreamType::SubgroupFirstObjEog
63 | StreamType::SubgroupFirstObjEogExt
64 | StreamType::SubgroupExplicitEog
65 | StreamType::SubgroupExplicitEogExt
66 )
67 }
68
69 pub fn has_extensions(&self) -> bool {
70 matches!(
71 self,
72 StreamType::SubgroupZeroExt
73 | StreamType::SubgroupFirstObjExt
74 | StreamType::SubgroupExplicitExt
75 | StreamType::SubgroupZeroEogExt
76 | StreamType::SubgroupFirstObjEogExt
77 | StreamType::SubgroupExplicitEogExt
78 )
79 }
80
81 pub fn contains_end_of_group(&self) -> bool {
82 matches!(
83 self,
84 StreamType::SubgroupZeroEog
85 | StreamType::SubgroupZeroEogExt
86 | StreamType::SubgroupFirstObjEog
87 | StreamType::SubgroupFirstObjEogExt
88 | StreamType::SubgroupExplicitEog
89 | StreamType::SubgroupExplicitEogExt
90 )
91 }
92}
93
94#[derive(Debug, Clone, Copy, PartialEq, Eq)]
96#[repr(u64)]
97pub enum DatagramType {
98 Datagram = 0x00,
99 DatagramExt = 0x01,
100 DatagramEog = 0x02,
101 DatagramEogExt = 0x03,
102 DatagramStatus = 0x04,
103 DatagramStatusExt = 0x05,
104}
105
106impl DatagramType {
107 pub fn from_id(id: u64) -> Option<Self> {
108 match id {
109 0x00 => Some(DatagramType::Datagram),
110 0x01 => Some(DatagramType::DatagramExt),
111 0x02 => Some(DatagramType::DatagramEog),
112 0x03 => Some(DatagramType::DatagramEogExt),
113 0x04 => Some(DatagramType::DatagramStatus),
114 0x05 => Some(DatagramType::DatagramStatusExt),
115 _ => None,
116 }
117 }
118
119 pub fn has_extensions(&self) -> bool {
120 matches!(
121 self,
122 DatagramType::DatagramExt
123 | DatagramType::DatagramEogExt
124 | DatagramType::DatagramStatusExt
125 )
126 }
127
128 pub fn is_status(&self) -> bool {
129 matches!(self, DatagramType::DatagramStatus | DatagramType::DatagramStatusExt)
130 }
131
132 pub fn is_end_of_group(&self) -> bool {
133 matches!(self, DatagramType::DatagramEog | DatagramType::DatagramEogExt)
134 }
135}
136
137fn read_extension_bytes(buf: &mut impl Buf, byte_len: u64) -> Result<Vec<u8>, CodecError> {
138 read_bytes(buf, byte_len as usize)
139}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
146pub struct SubgroupHeader {
147 pub stream_type: StreamType,
148 pub track_alias: VarInt,
149 pub group_id: VarInt,
150 pub subgroup_id: VarInt,
151 pub publisher_priority: u8,
152}
153
154impl SubgroupHeader {
155 pub fn encode(&self, buf: &mut impl BufMut) {
156 self.track_alias.encode(buf);
157 self.group_id.encode(buf);
158 match self.stream_type {
159 StreamType::SubgroupExplicit
160 | StreamType::SubgroupExplicitExt
161 | StreamType::SubgroupExplicitEog
162 | StreamType::SubgroupExplicitEogExt => {
163 self.subgroup_id.encode(buf);
164 }
165 _ => {}
166 }
167 buf.put_u8(self.publisher_priority);
168 }
169
170 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
171 Self::decode_with_type(StreamType::SubgroupExplicit, buf)
172 }
173
174 pub fn decode_with_type(
175 stream_type: StreamType,
176 buf: &mut impl Buf,
177 ) -> Result<Self, CodecError> {
178 let track_alias = VarInt::decode(buf)?;
179 let group_id = VarInt::decode(buf)?;
180 let subgroup_id = match stream_type {
181 StreamType::SubgroupZero
182 | StreamType::SubgroupZeroExt
183 | StreamType::SubgroupZeroEog
184 | StreamType::SubgroupZeroEogExt => VarInt::from_usize(0),
185 StreamType::SubgroupExplicit
186 | StreamType::SubgroupExplicitExt
187 | StreamType::SubgroupExplicitEog
188 | StreamType::SubgroupExplicitEogExt => VarInt::decode(buf)?,
189 StreamType::SubgroupFirstObj
190 | StreamType::SubgroupFirstObjExt
191 | StreamType::SubgroupFirstObjEog
192 | StreamType::SubgroupFirstObjEogExt => VarInt::from_usize(0),
193 _ => return Err(CodecError::InvalidField),
194 };
195 if buf.remaining() < 1 {
196 return Err(CodecError::UnexpectedEnd);
197 }
198 let publisher_priority = buf.get_u8();
199 Ok(Self { stream_type, track_alias, group_id, subgroup_id, publisher_priority })
200 }
201}
202
203#[derive(Debug, Clone, PartialEq, Eq)]
208pub struct ObjectHeader {
209 pub object_id: VarInt,
210 pub extension_headers_length: VarInt,
211 pub extensions: Vec<u8>,
212 pub payload_length: VarInt,
213 pub object_status: ObjectStatus,
214}
215
216impl ObjectHeader {
217 pub fn encode(&self, buf: &mut impl BufMut) {
218 self.encode_with_extensions(false, buf);
219 }
220
221 pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
222 self.object_id.encode(buf);
223 if has_extensions {
224 self.extension_headers_length.encode(buf);
225 buf.put_slice(&self.extensions);
226 }
227 self.payload_length.encode(buf);
228 if self.payload_length.into_inner() == 0 {
229 VarInt::from_usize(self.object_status as usize).encode(buf);
230 }
231 }
232
233 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
234 Self::decode_with_extensions(false, buf)
235 }
236
237 pub fn decode_with_extensions(
238 has_extensions: bool,
239 buf: &mut impl Buf,
240 ) -> Result<Self, CodecError> {
241 let object_id = VarInt::decode(buf)?;
242 let (extension_headers_length, extensions) = if has_extensions {
243 let ehl = VarInt::decode(buf)?;
244 let ext = read_extension_bytes(buf, ehl.into_inner())?;
245 (ehl, ext)
246 } else {
247 (VarInt::from_usize(0), Vec::new())
248 };
249 let payload_length = VarInt::decode(buf)?;
250 let object_status = if payload_length.into_inner() == 0 {
251 let sv = VarInt::decode(buf)?.into_inner();
252 ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?
253 } else {
254 ObjectStatus::Normal
255 };
256 Ok(Self { object_id, extension_headers_length, extensions, payload_length, object_status })
257 }
258}
259
260#[derive(Debug, Clone, PartialEq, Eq)]
265pub struct DatagramHeader {
266 pub track_alias: VarInt,
267 pub group_id: VarInt,
268 pub object_id: VarInt,
269 pub publisher_priority: u8,
270 pub extension_headers_length: VarInt,
271 pub extensions: Vec<u8>,
272 pub end_of_group: bool,
273}
274
275impl DatagramHeader {
276 pub fn encode(&self, buf: &mut impl BufMut) {
277 self.encode_with_extensions(false, buf);
278 }
279
280 pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
281 self.track_alias.encode(buf);
282 self.group_id.encode(buf);
283 self.object_id.encode(buf);
284 buf.put_u8(self.publisher_priority);
285 if has_extensions {
286 self.extension_headers_length.encode(buf);
287 buf.put_slice(&self.extensions);
288 }
289 }
290
291 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
292 Self::decode_with_extensions(false, buf)
293 }
294
295 pub fn decode_with_extensions(
296 has_extensions: bool,
297 buf: &mut impl Buf,
298 ) -> Result<Self, CodecError> {
299 let track_alias = VarInt::decode(buf)?;
300 let group_id = VarInt::decode(buf)?;
301 let object_id = VarInt::decode(buf)?;
302 if buf.remaining() < 1 {
303 return Err(CodecError::UnexpectedEnd);
304 }
305 let publisher_priority = buf.get_u8();
306 let (extension_headers_length, extensions) = if has_extensions {
307 let ehl = VarInt::decode(buf)?;
308 let ext = read_extension_bytes(buf, ehl.into_inner())?;
309 (ehl, ext)
310 } else {
311 (VarInt::from_usize(0), Vec::new())
312 };
313 Ok(Self {
314 track_alias,
315 group_id,
316 object_id,
317 publisher_priority,
318 extension_headers_length,
319 extensions,
320 end_of_group: false,
321 })
322 }
323}
324
325#[derive(Debug, Clone, PartialEq, Eq)]
330pub struct DatagramStatusHeader {
331 pub track_alias: VarInt,
332 pub group_id: VarInt,
333 pub object_id: VarInt,
334 pub publisher_priority: u8,
335 pub extension_headers_length: VarInt,
336 pub extensions: Vec<u8>,
337 pub object_status: ObjectStatus,
338}
339
340impl DatagramStatusHeader {
341 pub fn encode(&self, buf: &mut impl BufMut) {
342 self.encode_with_extensions(false, buf);
343 }
344
345 pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
346 self.track_alias.encode(buf);
347 self.group_id.encode(buf);
348 self.object_id.encode(buf);
349 buf.put_u8(self.publisher_priority);
350 if has_extensions {
351 self.extension_headers_length.encode(buf);
352 buf.put_slice(&self.extensions);
353 }
354 VarInt::from_usize(self.object_status as usize).encode(buf);
355 }
356
357 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
358 Self::decode_with_extensions(false, buf)
359 }
360
361 pub fn decode_with_extensions(
362 has_extensions: bool,
363 buf: &mut impl Buf,
364 ) -> Result<Self, CodecError> {
365 let track_alias = VarInt::decode(buf)?;
366 let group_id = VarInt::decode(buf)?;
367 let object_id = VarInt::decode(buf)?;
368 if buf.remaining() < 1 {
369 return Err(CodecError::UnexpectedEnd);
370 }
371 let publisher_priority = buf.get_u8();
372 let (extension_headers_length, extensions) = if has_extensions {
373 let ehl = VarInt::decode(buf)?;
374 let ext = read_extension_bytes(buf, ehl.into_inner())?;
375 (ehl, ext)
376 } else {
377 (VarInt::from_usize(0), Vec::new())
378 };
379 let sv = VarInt::decode(buf)?.into_inner();
380 let object_status = ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?;
381 Ok(Self {
382 track_alias,
383 group_id,
384 object_id,
385 publisher_priority,
386 extension_headers_length,
387 extensions,
388 object_status,
389 })
390 }
391}
392
393#[derive(Debug, Clone, PartialEq, Eq)]
398pub struct FetchHeader {
399 pub request_id: VarInt,
400}
401
402#[derive(Debug, Clone, PartialEq, Eq)]
403pub struct FetchObjectHeader {
404 pub group_id: VarInt,
405 pub subgroup_id: VarInt,
406 pub object_id: VarInt,
407 pub publisher_priority: u8,
408 pub extension_headers_length: VarInt,
409 pub extensions: Vec<u8>,
410 pub payload_length: VarInt,
411 pub object_status: ObjectStatus,
412}
413
414impl FetchHeader {
415 pub fn encode(&self, buf: &mut impl BufMut) {
416 self.request_id.encode(buf);
417 }
418
419 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
420 let request_id = VarInt::decode(buf)?;
421 Ok(Self { request_id })
422 }
423}
424
425impl FetchObjectHeader {
426 pub fn encode(&self, buf: &mut impl BufMut) {
427 self.group_id.encode(buf);
428 self.subgroup_id.encode(buf);
429 self.object_id.encode(buf);
430 buf.put_u8(self.publisher_priority);
431 self.extension_headers_length.encode(buf);
432 buf.put_slice(&self.extensions);
433 self.payload_length.encode(buf);
434 if self.payload_length.into_inner() == 0 {
435 VarInt::from_usize(self.object_status as usize).encode(buf);
436 }
437 }
438
439 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
440 let group_id = VarInt::decode(buf)?;
441 let subgroup_id = VarInt::decode(buf)?;
442 let object_id = VarInt::decode(buf)?;
443 if buf.remaining() < 1 {
444 return Err(CodecError::UnexpectedEnd);
445 }
446 let publisher_priority = buf.get_u8();
447 let extension_headers_length = VarInt::decode(buf)?;
448 let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
449 let payload_length = VarInt::decode(buf)?;
450 let object_status = if payload_length.into_inner() == 0 {
451 let sv = VarInt::decode(buf)?.into_inner();
452 ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?
453 } else {
454 ObjectStatus::Normal
455 };
456 Ok(Self {
457 group_id,
458 subgroup_id,
459 object_id,
460 publisher_priority,
461 extension_headers_length,
462 extensions,
463 payload_length,
464 object_status,
465 })
466 }
467}