moqtap_codec/draft07/
data_stream.rs1use super::types::ObjectStatus;
2use crate::error::CodecError;
3use crate::varint::VarInt;
4use bytes::{Buf, BufMut};
5
/// Numeric type tag for the framing kinds handled in this module:
/// datagram, subgroup stream and fetch stream.
///
/// Each discriminant is the numeric identifier itself, so `variant as u64`
/// yields the identifier for that variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum StreamType {
    /// Identifier `0x01`.
    Datagram = 0x01,
    /// Identifier `0x04`.
    Subgroup = 0x04,
    /// Identifier `0x05`.
    Fetch = 0x05,
}

impl StreamType {
    /// Looks up the variant whose discriminant equals `id`.
    ///
    /// Returns `None` for every value other than `0x01`, `0x04` and `0x05`.
    pub fn from_id(id: u64) -> Option<Self> {
        // Compare against the discriminants directly so the numeric values
        // are spelled out in exactly one place (the enum definition).
        const KNOWN: [StreamType; 3] =
            [StreamType::Datagram, StreamType::Subgroup, StreamType::Fetch];
        KNOWN.iter().copied().find(|kind| *kind as u64 == id)
    }
}
29
30#[derive(Debug, Clone, PartialEq, Eq)]
36pub struct SubgroupHeader {
37 pub track_alias: VarInt,
39 pub group_id: VarInt,
41 pub subgroup_id: VarInt,
43 pub publisher_priority: u8,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq)]
53pub struct ObjectHeader {
54 pub object_id: VarInt,
56 pub payload_length: VarInt,
58 pub object_status: ObjectStatus,
60}
61
62impl SubgroupHeader {
63 pub fn encode(&self, buf: &mut impl BufMut) {
65 self.track_alias.encode(buf);
66 self.group_id.encode(buf);
67 self.subgroup_id.encode(buf);
68 buf.put_u8(self.publisher_priority);
69 }
70
71 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
73 let track_alias = VarInt::decode(buf)?;
74 let group_id = VarInt::decode(buf)?;
75 let subgroup_id = VarInt::decode(buf)?;
76 if buf.remaining() < 1 {
77 return Err(CodecError::UnexpectedEnd);
78 }
79 let publisher_priority = buf.get_u8();
80 Ok(Self { track_alias, group_id, subgroup_id, publisher_priority })
81 }
82}
83
84impl ObjectHeader {
85 pub fn encode(&self, buf: &mut impl BufMut) {
87 self.object_id.encode(buf);
88 self.payload_length.encode(buf);
89 if self.payload_length.into_inner() == 0 {
90 VarInt::from_usize(self.object_status as usize).encode(buf);
91 }
92 }
93
94 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
96 let object_id = VarInt::decode(buf)?;
97 let payload_length = VarInt::decode(buf)?;
98 let object_status = if payload_length.into_inner() == 0 {
99 let status_val = VarInt::decode(buf)?.into_inner();
100 ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
101 } else {
102 ObjectStatus::Normal
103 };
104 Ok(Self { object_id, payload_length, object_status })
105 }
106}
107
108#[derive(Debug, Clone, PartialEq, Eq)]
120pub struct DatagramHeader {
121 pub track_alias: VarInt,
123 pub group_id: VarInt,
125 pub object_id: VarInt,
127 pub publisher_priority: u8,
129 pub object_status: ObjectStatus,
131 pub payload_length: VarInt,
133}
134
135impl DatagramHeader {
136 pub fn encode(&self, buf: &mut impl BufMut) {
138 self.track_alias.encode(buf);
139 self.group_id.encode(buf);
140 self.object_id.encode(buf);
141 buf.put_u8(self.publisher_priority);
142 self.payload_length.encode(buf);
143 if self.payload_length.into_inner() == 0 {
144 VarInt::from_usize(self.object_status as usize).encode(buf);
145 }
146 }
147
148 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
150 let track_alias = VarInt::decode(buf)?;
151 let group_id = VarInt::decode(buf)?;
152 let object_id = VarInt::decode(buf)?;
153 if buf.remaining() < 1 {
154 return Err(CodecError::UnexpectedEnd);
155 }
156 let publisher_priority = buf.get_u8();
157 let payload_length = VarInt::decode(buf)?;
158 let object_status = if payload_length.into_inner() == 0 {
159 let status_val = VarInt::decode(buf)?.into_inner();
160 ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
161 } else {
162 ObjectStatus::Normal
163 };
164 Ok(Self {
165 track_alias,
166 group_id,
167 object_id,
168 publisher_priority,
169 object_status,
170 payload_length,
171 })
172 }
173}
174
175#[derive(Debug, Clone, PartialEq, Eq)]
181pub struct FetchHeader {
182 pub subscribe_id: VarInt,
184}
185
186#[derive(Debug, Clone, PartialEq, Eq)]
193pub struct FetchObjectHeader {
194 pub group_id: VarInt,
196 pub subgroup_id: VarInt,
198 pub object_id: VarInt,
200 pub publisher_priority: u8,
202 pub object_status: ObjectStatus,
204 pub payload_length: VarInt,
206}
207
208impl FetchHeader {
209 pub fn encode(&self, buf: &mut impl BufMut) {
211 self.subscribe_id.encode(buf);
212 }
213
214 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
216 let subscribe_id = VarInt::decode(buf)?;
217 Ok(Self { subscribe_id })
218 }
219}
220
221impl FetchObjectHeader {
222 pub fn encode(&self, buf: &mut impl BufMut) {
224 self.group_id.encode(buf);
225 self.subgroup_id.encode(buf);
226 self.object_id.encode(buf);
227 buf.put_u8(self.publisher_priority);
228 self.payload_length.encode(buf);
229 if self.payload_length.into_inner() == 0 {
230 VarInt::from_usize(self.object_status as usize).encode(buf);
231 }
232 }
233
234 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
236 let group_id = VarInt::decode(buf)?;
237 let subgroup_id = VarInt::decode(buf)?;
238 let object_id = VarInt::decode(buf)?;
239 if buf.remaining() < 1 {
240 return Err(CodecError::UnexpectedEnd);
241 }
242 let publisher_priority = buf.get_u8();
243 let payload_length = VarInt::decode(buf)?;
244 let object_status = if payload_length.into_inner() == 0 {
245 let status_val = VarInt::decode(buf)?.into_inner();
246 ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
247 } else {
248 ObjectStatus::Normal
249 };
250 Ok(Self {
251 group_id,
252 subgroup_id,
253 object_id,
254 publisher_priority,
255 object_status,
256 payload_length,
257 })
258 }
259}