1use super::types::ObjectStatus;
9use crate::error::CodecError;
10use crate::types::read_bytes;
11use crate::varint::VarInt;
12use bytes::{Buf, BufMut};
13
/// Numeric type tags recognised by this module.
///
/// Each variant's discriminant is the `u64` identifier that [`StreamType::from_id`] maps back
/// to that variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum StreamType {
    /// Identifier `0x01`.
    Datagram = 0x01,
    /// Identifier `0x02`.
    DatagramStatus = 0x02,
    /// Identifier `0x04`.
    Subgroup = 0x04,
    /// Identifier `0x05`.
    Fetch = 0x05,
}

impl StreamType {
    /// Looks up the variant whose discriminant equals `id`.
    ///
    /// Returns `None` for any identifier that is not one of the declared discriminants.
    pub fn from_id(id: u64) -> Option<Self> {
        // Table of every declared variant; the lookup compares against the discriminant itself
        // so the numeric values are only spelled out once, in the enum definition.
        const KNOWN: [StreamType; 4] = [
            StreamType::Datagram,
            StreamType::DatagramStatus,
            StreamType::Subgroup,
            StreamType::Fetch,
        ];
        KNOWN.iter().copied().find(|kind| *kind as u64 == id)
    }
}
40
41fn skip_extensions(buf: &mut impl Buf, count: u64) -> Result<Vec<u8>, CodecError> {
49 let mut raw = Vec::new();
50 for _ in 0..count {
51 let ext_type = VarInt::decode(buf)?;
52 ext_type.encode(&mut raw);
53 if ext_type.into_inner() % 2 == 0 {
54 let val = VarInt::decode(buf)?;
55 val.encode(&mut raw);
56 } else {
57 let len = VarInt::decode(buf)?.into_inner() as usize;
58 VarInt::from_usize(len).encode(&mut raw);
59 let bytes = read_bytes(buf, len)?;
60 raw.extend_from_slice(&bytes);
61 }
62 }
63 Ok(raw)
64}
65
66fn encode_extensions(extensions: &[u8], buf: &mut impl BufMut) {
68 buf.put_slice(extensions);
69}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
77pub struct SubgroupHeader {
78 pub track_alias: VarInt,
80 pub group_id: VarInt,
82 pub subgroup_id: VarInt,
84 pub publisher_priority: u8,
86}
87
88#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct ObjectHeader {
96 pub object_id: VarInt,
98 pub extension_count: VarInt,
100 pub extensions: Vec<u8>,
102 pub payload_length: VarInt,
104 pub object_status: ObjectStatus,
106}
107
108impl SubgroupHeader {
109 pub fn encode(&self, buf: &mut impl BufMut) {
111 self.track_alias.encode(buf);
112 self.group_id.encode(buf);
113 self.subgroup_id.encode(buf);
114 buf.put_u8(self.publisher_priority);
115 }
116
117 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
119 let track_alias = VarInt::decode(buf)?;
120 let group_id = VarInt::decode(buf)?;
121 let subgroup_id = VarInt::decode(buf)?;
122 if buf.remaining() < 1 {
123 return Err(CodecError::UnexpectedEnd);
124 }
125 let publisher_priority = buf.get_u8();
126 Ok(Self { track_alias, group_id, subgroup_id, publisher_priority })
127 }
128}
129
130impl ObjectHeader {
131 pub fn encode(&self, buf: &mut impl BufMut) {
133 self.object_id.encode(buf);
134 self.extension_count.encode(buf);
135 encode_extensions(&self.extensions, buf);
136 self.payload_length.encode(buf);
137 if self.payload_length.into_inner() == 0 {
138 VarInt::from_usize(self.object_status as usize).encode(buf);
139 }
140 }
141
142 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
144 let object_id = VarInt::decode(buf)?;
145 let extension_count = VarInt::decode(buf)?;
146 let extensions = skip_extensions(buf, extension_count.into_inner())?;
147 let payload_length = VarInt::decode(buf)?;
148 let object_status = if payload_length.into_inner() == 0 {
149 let status_val = VarInt::decode(buf)?.into_inner();
150 ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
151 } else {
152 ObjectStatus::Normal
153 };
154 Ok(Self { object_id, extension_count, extensions, payload_length, object_status })
155 }
156}
157
158#[derive(Debug, Clone, PartialEq, Eq)]
171pub struct DatagramHeader {
172 pub track_alias: VarInt,
174 pub group_id: VarInt,
176 pub object_id: VarInt,
178 pub publisher_priority: u8,
180 pub extension_count: VarInt,
182 pub extensions: Vec<u8>,
184 pub object_status: ObjectStatus,
186 pub payload_length: VarInt,
188}
189
190impl DatagramHeader {
191 pub fn encode(&self, buf: &mut impl BufMut) {
193 self.track_alias.encode(buf);
194 self.group_id.encode(buf);
195 self.object_id.encode(buf);
196 buf.put_u8(self.publisher_priority);
197 self.extension_count.encode(buf);
198 encode_extensions(&self.extensions, buf);
199 self.payload_length.encode(buf);
200 if self.payload_length.into_inner() == 0 {
201 VarInt::from_usize(self.object_status as usize).encode(buf);
202 }
203 }
204
205 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
207 let track_alias = VarInt::decode(buf)?;
208 let group_id = VarInt::decode(buf)?;
209 let object_id = VarInt::decode(buf)?;
210 if buf.remaining() < 1 {
211 return Err(CodecError::UnexpectedEnd);
212 }
213 let publisher_priority = buf.get_u8();
214 let extension_count = VarInt::decode(buf)?;
215 let extensions = skip_extensions(buf, extension_count.into_inner())?;
216 let payload_length = VarInt::decode(buf)?;
217 let object_status = if payload_length.into_inner() == 0 {
218 let status_val = VarInt::decode(buf)?.into_inner();
219 ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
220 } else {
221 ObjectStatus::Normal
222 };
223 Ok(Self {
224 track_alias,
225 group_id,
226 object_id,
227 publisher_priority,
228 extension_count,
229 extensions,
230 object_status,
231 payload_length,
232 })
233 }
234}
235
236#[derive(Debug, Clone, PartialEq, Eq)]
244pub struct DatagramStatusHeader {
245 pub track_alias: VarInt,
247 pub group_id: VarInt,
249 pub object_id: VarInt,
251 pub publisher_priority: u8,
253 pub object_status: ObjectStatus,
255}
256
257impl DatagramStatusHeader {
258 pub fn encode(&self, buf: &mut impl BufMut) {
260 self.track_alias.encode(buf);
261 self.group_id.encode(buf);
262 self.object_id.encode(buf);
263 buf.put_u8(self.publisher_priority);
264 VarInt::from_usize(self.object_status as usize).encode(buf);
265 }
266
267 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
269 let track_alias = VarInt::decode(buf)?;
270 let group_id = VarInt::decode(buf)?;
271 let object_id = VarInt::decode(buf)?;
272 if buf.remaining() < 1 {
273 return Err(CodecError::UnexpectedEnd);
274 }
275 let publisher_priority = buf.get_u8();
276 let status_val = VarInt::decode(buf)?.into_inner();
277 let object_status = ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?;
278 Ok(Self { track_alias, group_id, object_id, publisher_priority, object_status })
279 }
280}
281
282#[derive(Debug, Clone, PartialEq, Eq)]
288pub struct FetchHeader {
289 pub subscribe_id: VarInt,
291}
292
293#[derive(Debug, Clone, PartialEq, Eq)]
301pub struct FetchObjectHeader {
302 pub group_id: VarInt,
304 pub subgroup_id: VarInt,
306 pub object_id: VarInt,
308 pub publisher_priority: u8,
310 pub extension_count: VarInt,
312 pub extensions: Vec<u8>,
314 pub object_status: ObjectStatus,
316 pub payload_length: VarInt,
318}
319
320impl FetchHeader {
321 pub fn encode(&self, buf: &mut impl BufMut) {
323 self.subscribe_id.encode(buf);
324 }
325
326 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
328 let subscribe_id = VarInt::decode(buf)?;
329 Ok(Self { subscribe_id })
330 }
331}
332
333impl FetchObjectHeader {
334 pub fn encode(&self, buf: &mut impl BufMut) {
336 self.group_id.encode(buf);
337 self.subgroup_id.encode(buf);
338 self.object_id.encode(buf);
339 buf.put_u8(self.publisher_priority);
340 self.extension_count.encode(buf);
341 encode_extensions(&self.extensions, buf);
342 self.payload_length.encode(buf);
343 if self.payload_length.into_inner() == 0 {
344 VarInt::from_usize(self.object_status as usize).encode(buf);
345 }
346 }
347
348 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
350 let group_id = VarInt::decode(buf)?;
351 let subgroup_id = VarInt::decode(buf)?;
352 let object_id = VarInt::decode(buf)?;
353 if buf.remaining() < 1 {
354 return Err(CodecError::UnexpectedEnd);
355 }
356 let publisher_priority = buf.get_u8();
357 let extension_count = VarInt::decode(buf)?;
358 let extensions = skip_extensions(buf, extension_count.into_inner())?;
359 let payload_length = VarInt::decode(buf)?;
360 let object_status = if payload_length.into_inner() == 0 {
361 let status_val = VarInt::decode(buf)?.into_inner();
362 ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
363 } else {
364 ObjectStatus::Normal
365 };
366 Ok(Self {
367 group_id,
368 subgroup_id,
369 object_id,
370 publisher_priority,
371 extension_count,
372 extensions,
373 object_status,
374 payload_length,
375 })
376 }
377}