1use super::types::ObjectStatus;
9use crate::error::CodecError;
10use crate::types::read_bytes;
11use crate::varint::VarInt;
12use bytes::{Buf, BufMut};
13
/// Numeric type tag distinguishing the header kinds handled by this module.
///
/// The enum is `#[repr(u64)]`, so `variant as u64` yields the tag value listed
/// next to each variant.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u64)]
pub enum StreamType {
    /// Tag `0x01`.
    Datagram = 0x01,
    /// Tag `0x02`.
    DatagramStatus = 0x02,
    /// Tag `0x04`.
    Subgroup = 0x04,
    /// Tag `0x05`.
    Fetch = 0x05,
}

impl StreamType {
    /// Looks up the variant whose discriminant equals `id`.
    ///
    /// Returns `None` for every value that is not one of the four known tags
    /// (`0x01`, `0x02`, `0x04`, `0x05`).
    pub fn from_id(id: u64) -> Option<Self> {
        // Comparing against the discriminants keeps the tag values defined in
        // exactly one place: the enum declaration.
        let known = [
            Self::Datagram,
            Self::DatagramStatus,
            Self::Subgroup,
            Self::Fetch,
        ];
        known.iter().copied().find(|kind| *kind as u64 == id)
    }
}
40
41fn read_extension_bytes(buf: &mut impl Buf, byte_len: u64) -> Result<Vec<u8>, CodecError> {
45 read_bytes(buf, byte_len as usize)
46}
47
48fn encode_extensions(extensions: &[u8], buf: &mut impl BufMut) {
50 buf.put_slice(extensions);
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
59pub struct SubgroupHeader {
60 pub track_alias: VarInt,
62 pub group_id: VarInt,
64 pub subgroup_id: VarInt,
66 pub publisher_priority: u8,
68}
69
70#[derive(Debug, Clone, PartialEq, Eq)]
77pub struct ObjectHeader {
78 pub object_id: VarInt,
80 pub extension_headers_length: VarInt,
82 pub extensions: Vec<u8>,
84 pub payload_length: VarInt,
86 pub object_status: ObjectStatus,
88}
89
90impl SubgroupHeader {
91 pub fn encode(&self, buf: &mut impl BufMut) {
93 self.track_alias.encode(buf);
94 self.group_id.encode(buf);
95 self.subgroup_id.encode(buf);
96 buf.put_u8(self.publisher_priority);
97 }
98
99 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
101 let track_alias = VarInt::decode(buf)?;
102 let group_id = VarInt::decode(buf)?;
103 let subgroup_id = VarInt::decode(buf)?;
104 if buf.remaining() < 1 {
105 return Err(CodecError::UnexpectedEnd);
106 }
107 let publisher_priority = buf.get_u8();
108 Ok(Self { track_alias, group_id, subgroup_id, publisher_priority })
109 }
110}
111
112impl ObjectHeader {
113 pub fn encode(&self, buf: &mut impl BufMut) {
115 self.object_id.encode(buf);
116 self.extension_headers_length.encode(buf);
117 encode_extensions(&self.extensions, buf);
118 self.payload_length.encode(buf);
119 if self.payload_length.into_inner() == 0 {
120 VarInt::from_usize(self.object_status as usize).encode(buf);
121 }
122 }
123
124 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
126 let object_id = VarInt::decode(buf)?;
127 let extension_headers_length = VarInt::decode(buf)?;
128 let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
129 let payload_length = VarInt::decode(buf)?;
130 let object_status = if payload_length.into_inner() == 0 {
131 let status_val = VarInt::decode(buf)?.into_inner();
132 ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
133 } else {
134 ObjectStatus::Normal
135 };
136 Ok(Self { object_id, extension_headers_length, extensions, payload_length, object_status })
137 }
138}
139
140#[derive(Debug, Clone, PartialEq, Eq)]
154pub struct DatagramHeader {
155 pub track_alias: VarInt,
157 pub group_id: VarInt,
159 pub object_id: VarInt,
161 pub publisher_priority: u8,
163 pub extension_headers_length: VarInt,
165 pub extensions: Vec<u8>,
167}
168
169impl DatagramHeader {
170 pub fn encode(&self, buf: &mut impl BufMut) {
172 self.track_alias.encode(buf);
173 self.group_id.encode(buf);
174 self.object_id.encode(buf);
175 buf.put_u8(self.publisher_priority);
176 self.extension_headers_length.encode(buf);
177 encode_extensions(&self.extensions, buf);
178 }
179
180 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
182 let track_alias = VarInt::decode(buf)?;
183 let group_id = VarInt::decode(buf)?;
184 let object_id = VarInt::decode(buf)?;
185 if buf.remaining() < 1 {
186 return Err(CodecError::UnexpectedEnd);
187 }
188 let publisher_priority = buf.get_u8();
189 let extension_headers_length = VarInt::decode(buf)?;
190 let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
191 Ok(Self {
192 track_alias,
193 group_id,
194 object_id,
195 publisher_priority,
196 extension_headers_length,
197 extensions,
198 })
199 }
200}
201
202#[derive(Debug, Clone, PartialEq, Eq)]
210pub struct DatagramStatusHeader {
211 pub track_alias: VarInt,
213 pub group_id: VarInt,
215 pub object_id: VarInt,
217 pub publisher_priority: u8,
219 pub extension_headers_length: VarInt,
221 pub extensions: Vec<u8>,
223 pub object_status: ObjectStatus,
225}
226
227impl DatagramStatusHeader {
228 pub fn encode(&self, buf: &mut impl BufMut) {
230 self.track_alias.encode(buf);
231 self.group_id.encode(buf);
232 self.object_id.encode(buf);
233 buf.put_u8(self.publisher_priority);
234 self.extension_headers_length.encode(buf);
235 encode_extensions(&self.extensions, buf);
236 VarInt::from_usize(self.object_status as usize).encode(buf);
237 }
238
239 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
241 let track_alias = VarInt::decode(buf)?;
242 let group_id = VarInt::decode(buf)?;
243 let object_id = VarInt::decode(buf)?;
244 if buf.remaining() < 1 {
245 return Err(CodecError::UnexpectedEnd);
246 }
247 let publisher_priority = buf.get_u8();
248 let extension_headers_length = VarInt::decode(buf)?;
249 let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
250 let status_val = VarInt::decode(buf)?.into_inner();
251 let object_status = ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?;
252 Ok(Self {
253 track_alias,
254 group_id,
255 object_id,
256 publisher_priority,
257 extension_headers_length,
258 extensions,
259 object_status,
260 })
261 }
262}
263
264#[derive(Debug, Clone, PartialEq, Eq)]
270pub struct FetchHeader {
271 pub subscribe_id: VarInt,
273}
274
275#[derive(Debug, Clone, PartialEq, Eq)]
279pub struct FetchObjectHeader {
280 pub group_id: VarInt,
282 pub subgroup_id: VarInt,
284 pub object_id: VarInt,
286 pub publisher_priority: u8,
288 pub extension_headers_length: VarInt,
290 pub extensions: Vec<u8>,
292 pub object_status: ObjectStatus,
294 pub payload_length: VarInt,
296}
297
298impl FetchHeader {
299 pub fn encode(&self, buf: &mut impl BufMut) {
301 self.subscribe_id.encode(buf);
302 }
303
304 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
306 let subscribe_id = VarInt::decode(buf)?;
307 Ok(Self { subscribe_id })
308 }
309}
310
311impl FetchObjectHeader {
312 pub fn encode(&self, buf: &mut impl BufMut) {
314 self.group_id.encode(buf);
315 self.subgroup_id.encode(buf);
316 self.object_id.encode(buf);
317 buf.put_u8(self.publisher_priority);
318 self.extension_headers_length.encode(buf);
319 encode_extensions(&self.extensions, buf);
320 self.payload_length.encode(buf);
321 if self.payload_length.into_inner() == 0 {
322 VarInt::from_usize(self.object_status as usize).encode(buf);
323 }
324 }
325
326 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
328 let group_id = VarInt::decode(buf)?;
329 let subgroup_id = VarInt::decode(buf)?;
330 let object_id = VarInt::decode(buf)?;
331 if buf.remaining() < 1 {
332 return Err(CodecError::UnexpectedEnd);
333 }
334 let publisher_priority = buf.get_u8();
335 let extension_headers_length = VarInt::decode(buf)?;
336 let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
337 let payload_length = VarInt::decode(buf)?;
338 let object_status = if payload_length.into_inner() == 0 {
339 let status_val = VarInt::decode(buf)?.into_inner();
340 ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
341 } else {
342 ObjectStatus::Normal
343 };
344 Ok(Self {
345 group_id,
346 subgroup_id,
347 object_id,
348 publisher_priority,
349 extension_headers_length,
350 extensions,
351 object_status,
352 payload_length,
353 })
354 }
355}