1use super::types::ObjectStatus;
12use crate::error::CodecError;
13use crate::types::read_bytes;
14use crate::varint::VarInt;
15use bytes::{Buf, BufMut};
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19#[repr(u64)]
20pub enum StreamType {
21 Datagram = 0x00,
23 DatagramExt = 0x01,
25 DatagramStatus = 0x02,
27 DatagramStatusExt = 0x03,
29 Fetch = 0x05,
31 SubgroupZero = 0x08,
33 SubgroupZeroExt = 0x09,
35 SubgroupFirstObj = 0x0A,
37 SubgroupFirstObjExt = 0x0B,
39 SubgroupExplicit = 0x0C,
41 SubgroupExplicitExt = 0x0D,
43}
44
45impl StreamType {
46 pub fn from_id(id: u64) -> Option<Self> {
48 match id {
49 0x00 => Some(StreamType::Datagram),
50 0x01 => Some(StreamType::DatagramExt),
51 0x02 => Some(StreamType::DatagramStatus),
52 0x03 => Some(StreamType::DatagramStatusExt),
53 0x05 => Some(StreamType::Fetch),
54 0x08 => Some(StreamType::SubgroupZero),
55 0x09 => Some(StreamType::SubgroupZeroExt),
56 0x0A => Some(StreamType::SubgroupFirstObj),
57 0x0B => Some(StreamType::SubgroupFirstObjExt),
58 0x0C => Some(StreamType::SubgroupExplicit),
59 0x0D => Some(StreamType::SubgroupExplicitExt),
60 _ => None,
61 }
62 }
63
64 pub fn is_subgroup(&self) -> bool {
66 matches!(
67 self,
68 StreamType::SubgroupZero
69 | StreamType::SubgroupZeroExt
70 | StreamType::SubgroupFirstObj
71 | StreamType::SubgroupFirstObjExt
72 | StreamType::SubgroupExplicit
73 | StreamType::SubgroupExplicitExt
74 )
75 }
76
77 pub fn has_extensions(&self) -> bool {
79 matches!(
80 self,
81 StreamType::DatagramExt
82 | StreamType::DatagramStatusExt
83 | StreamType::SubgroupZeroExt
84 | StreamType::SubgroupFirstObjExt
85 | StreamType::SubgroupExplicitExt
86 )
87 }
88}
89
90fn read_extension_bytes(buf: &mut impl Buf, byte_len: u64) -> Result<Vec<u8>, CodecError> {
93 read_bytes(buf, byte_len as usize)
94}
95
96#[derive(Debug, Clone, PartialEq, Eq)]
104pub struct SubgroupHeader {
105 pub stream_type: StreamType,
107 pub track_alias: VarInt,
109 pub group_id: VarInt,
111 pub subgroup_id: VarInt,
113 pub publisher_priority: u8,
115}
116
117impl SubgroupHeader {
118 pub fn encode(&self, buf: &mut impl BufMut) {
120 self.track_alias.encode(buf);
121 self.group_id.encode(buf);
122 match self.stream_type {
123 StreamType::SubgroupExplicit | StreamType::SubgroupExplicitExt => {
124 self.subgroup_id.encode(buf);
125 }
126 _ => {}
127 }
128 buf.put_u8(self.publisher_priority);
129 }
130
131 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
135 Self::decode_with_type(StreamType::SubgroupExplicit, buf)
136 }
137
138 pub fn decode_with_type(
140 stream_type: StreamType,
141 buf: &mut impl Buf,
142 ) -> Result<Self, CodecError> {
143 let track_alias = VarInt::decode(buf)?;
144 let group_id = VarInt::decode(buf)?;
145 let subgroup_id = match stream_type {
146 StreamType::SubgroupZero | StreamType::SubgroupZeroExt => VarInt::from_usize(0),
147 StreamType::SubgroupExplicit | StreamType::SubgroupExplicitExt => VarInt::decode(buf)?,
148 StreamType::SubgroupFirstObj | StreamType::SubgroupFirstObjExt => VarInt::from_usize(0),
152 _ => return Err(CodecError::InvalidField),
153 };
154 if buf.remaining() < 1 {
155 return Err(CodecError::UnexpectedEnd);
156 }
157 let publisher_priority = buf.get_u8();
158 Ok(Self { stream_type, track_alias, group_id, subgroup_id, publisher_priority })
159 }
160}
161
162#[derive(Debug, Clone, PartialEq, Eq)]
168pub struct ObjectHeader {
169 pub object_id: VarInt,
171 pub extension_headers_length: VarInt,
173 pub extensions: Vec<u8>,
175 pub payload_length: VarInt,
177 pub object_status: ObjectStatus,
179}
180
181impl ObjectHeader {
182 pub fn encode(&self, buf: &mut impl BufMut) {
186 self.encode_with_extensions(false, buf);
187 }
188
189 pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
191 self.object_id.encode(buf);
192 if has_extensions {
193 self.extension_headers_length.encode(buf);
194 buf.put_slice(&self.extensions);
195 }
196 self.payload_length.encode(buf);
197 if self.payload_length.into_inner() == 0 {
198 VarInt::from_usize(self.object_status as usize).encode(buf);
199 }
200 }
201
202 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
206 Self::decode_with_extensions(false, buf)
207 }
208
209 pub fn decode_with_extensions(
211 has_extensions: bool,
212 buf: &mut impl Buf,
213 ) -> Result<Self, CodecError> {
214 let object_id = VarInt::decode(buf)?;
215 let (extension_headers_length, extensions) = if has_extensions {
216 let ehl = VarInt::decode(buf)?;
217 let ext = read_extension_bytes(buf, ehl.into_inner())?;
218 (ehl, ext)
219 } else {
220 (VarInt::from_usize(0), Vec::new())
221 };
222 let payload_length = VarInt::decode(buf)?;
223 let object_status = if payload_length.into_inner() == 0 {
224 let sv = VarInt::decode(buf)?.into_inner();
225 ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?
226 } else {
227 ObjectStatus::Normal
228 };
229 Ok(Self { object_id, extension_headers_length, extensions, payload_length, object_status })
230 }
231}
232
233#[derive(Debug, Clone, PartialEq, Eq)]
241pub struct DatagramHeader {
242 pub track_alias: VarInt,
244 pub group_id: VarInt,
246 pub object_id: VarInt,
248 pub publisher_priority: u8,
250 pub extension_headers_length: VarInt,
252 pub extensions: Vec<u8>,
254}
255
256impl DatagramHeader {
257 pub fn encode(&self, buf: &mut impl BufMut) {
259 self.encode_with_extensions(false, buf);
260 }
261
262 pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
264 self.track_alias.encode(buf);
265 self.group_id.encode(buf);
266 self.object_id.encode(buf);
267 buf.put_u8(self.publisher_priority);
268 if has_extensions {
269 self.extension_headers_length.encode(buf);
270 buf.put_slice(&self.extensions);
271 }
272 }
273
274 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
276 Self::decode_with_extensions(false, buf)
277 }
278
279 pub fn decode_with_extensions(
281 has_extensions: bool,
282 buf: &mut impl Buf,
283 ) -> Result<Self, CodecError> {
284 let track_alias = VarInt::decode(buf)?;
285 let group_id = VarInt::decode(buf)?;
286 let object_id = VarInt::decode(buf)?;
287 if buf.remaining() < 1 {
288 return Err(CodecError::UnexpectedEnd);
289 }
290 let publisher_priority = buf.get_u8();
291 let (extension_headers_length, extensions) = if has_extensions {
292 let ehl = VarInt::decode(buf)?;
293 let ext = read_extension_bytes(buf, ehl.into_inner())?;
294 (ehl, ext)
295 } else {
296 (VarInt::from_usize(0), Vec::new())
297 };
298 Ok(Self {
299 track_alias,
300 group_id,
301 object_id,
302 publisher_priority,
303 extension_headers_length,
304 extensions,
305 })
306 }
307}
308
309#[derive(Debug, Clone, PartialEq, Eq)]
315pub struct DatagramStatusHeader {
316 pub track_alias: VarInt,
318 pub group_id: VarInt,
320 pub object_id: VarInt,
322 pub publisher_priority: u8,
324 pub extension_headers_length: VarInt,
326 pub extensions: Vec<u8>,
328 pub object_status: ObjectStatus,
330}
331
332impl DatagramStatusHeader {
333 pub fn encode(&self, buf: &mut impl BufMut) {
335 self.encode_with_extensions(false, buf);
336 }
337
338 pub fn encode_with_extensions(&self, has_extensions: bool, buf: &mut impl BufMut) {
340 self.track_alias.encode(buf);
341 self.group_id.encode(buf);
342 self.object_id.encode(buf);
343 buf.put_u8(self.publisher_priority);
344 if has_extensions {
345 self.extension_headers_length.encode(buf);
346 buf.put_slice(&self.extensions);
347 }
348 VarInt::from_usize(self.object_status as usize).encode(buf);
349 }
350
351 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
353 Self::decode_with_extensions(false, buf)
354 }
355
356 pub fn decode_with_extensions(
358 has_extensions: bool,
359 buf: &mut impl Buf,
360 ) -> Result<Self, CodecError> {
361 let track_alias = VarInt::decode(buf)?;
362 let group_id = VarInt::decode(buf)?;
363 let object_id = VarInt::decode(buf)?;
364 if buf.remaining() < 1 {
365 return Err(CodecError::UnexpectedEnd);
366 }
367 let publisher_priority = buf.get_u8();
368 let (extension_headers_length, extensions) = if has_extensions {
369 let ehl = VarInt::decode(buf)?;
370 let ext = read_extension_bytes(buf, ehl.into_inner())?;
371 (ehl, ext)
372 } else {
373 (VarInt::from_usize(0), Vec::new())
374 };
375 let sv = VarInt::decode(buf)?.into_inner();
376 let object_status = ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?;
377 Ok(Self {
378 track_alias,
379 group_id,
380 object_id,
381 publisher_priority,
382 extension_headers_length,
383 extensions,
384 object_status,
385 })
386 }
387}
388
389#[derive(Debug, Clone, PartialEq, Eq)]
397pub struct FetchHeader {
398 pub request_id: VarInt,
400}
401
402#[derive(Debug, Clone, PartialEq, Eq)]
404pub struct FetchObjectHeader {
405 pub group_id: VarInt,
407 pub subgroup_id: VarInt,
409 pub object_id: VarInt,
411 pub publisher_priority: u8,
413 pub extension_headers_length: VarInt,
415 pub extensions: Vec<u8>,
417 pub payload_length: VarInt,
419 pub object_status: ObjectStatus,
421}
422
423impl FetchHeader {
424 pub fn encode(&self, buf: &mut impl BufMut) {
426 self.request_id.encode(buf);
427 }
428
429 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
431 let request_id = VarInt::decode(buf)?;
432 Ok(Self { request_id })
433 }
434}
435
436impl FetchObjectHeader {
437 pub fn encode(&self, buf: &mut impl BufMut) {
439 self.group_id.encode(buf);
440 self.subgroup_id.encode(buf);
441 self.object_id.encode(buf);
442 buf.put_u8(self.publisher_priority);
443 self.extension_headers_length.encode(buf);
444 buf.put_slice(&self.extensions);
445 self.payload_length.encode(buf);
446 if self.payload_length.into_inner() == 0 {
447 VarInt::from_usize(self.object_status as usize).encode(buf);
448 }
449 }
450
451 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
453 let group_id = VarInt::decode(buf)?;
454 let subgroup_id = VarInt::decode(buf)?;
455 let object_id = VarInt::decode(buf)?;
456 if buf.remaining() < 1 {
457 return Err(CodecError::UnexpectedEnd);
458 }
459 let publisher_priority = buf.get_u8();
460 let extension_headers_length = VarInt::decode(buf)?;
461 let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
462 let payload_length = VarInt::decode(buf)?;
463 let object_status = if payload_length.into_inner() == 0 {
464 let sv = VarInt::decode(buf)?.into_inner();
465 ObjectStatus::from_u64(sv).ok_or(CodecError::InvalidField)?
466 } else {
467 ObjectStatus::Normal
468 };
469 Ok(Self {
470 group_id,
471 subgroup_id,
472 object_id,
473 publisher_priority,
474 extension_headers_length,
475 extensions,
476 payload_length,
477 object_status,
478 })
479 }
480}