Skip to main content

moqtap_codec/draft07/
message.rs

1use crate::error::{
2    CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_REASON_PHRASE_LENGTH,
3};
4use crate::kvp::KeyValuePair;
5use crate::types::read_bytes;
6use crate::types::*;
7use crate::varint::VarInt;
8use bytes::{Buf, BufMut};
9
10/// Control message type IDs (draft-07).
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12#[repr(u64)]
13pub enum MessageType {
14    /// SubscribeUpdate (type 0x02).
15    SubscribeUpdate = 0x02,
16    /// Subscribe (type 0x03).
17    Subscribe = 0x03,
18    /// SubscribeOk (type 0x04).
19    SubscribeOk = 0x04,
20    /// SubscribeError (type 0x05).
21    SubscribeError = 0x05,
22    /// Announce (type 0x06).
23    Announce = 0x06,
24    /// AnnounceOk (type 0x07).
25    AnnounceOk = 0x07,
26    /// AnnounceError (type 0x08).
27    AnnounceError = 0x08,
28    /// Unannounce (type 0x09).
29    Unannounce = 0x09,
30    /// Unsubscribe (type 0x0A).
31    Unsubscribe = 0x0A,
32    /// SubscribeDone (type 0x0B).
33    SubscribeDone = 0x0B,
34    /// AnnounceCancel (type 0x0C).
35    AnnounceCancel = 0x0C,
36    /// TrackStatusRequest (type 0x0D).
37    TrackStatusRequest = 0x0D,
38    /// TrackStatus (type 0x0E).
39    TrackStatus = 0x0E,
40    /// GoAway (type 0x10).
41    GoAway = 0x10,
42    /// SubscribeAnnounces (type 0x11).
43    SubscribeAnnounces = 0x11,
44    /// SubscribeAnnouncesOk (type 0x12).
45    SubscribeAnnouncesOk = 0x12,
46    /// SubscribeAnnouncesError (type 0x13).
47    SubscribeAnnouncesError = 0x13,
48    /// UnsubscribeAnnounces (type 0x14).
49    UnsubscribeAnnounces = 0x14,
50    /// MaxSubscribeId (type 0x15).
51    MaxSubscribeId = 0x15,
52    /// Fetch (type 0x16).
53    Fetch = 0x16,
54    /// FetchCancel (type 0x17).
55    FetchCancel = 0x17,
56    /// FetchOk (type 0x18).
57    FetchOk = 0x18,
58    /// FetchError (type 0x19).
59    FetchError = 0x19,
60    /// ClientSetup (type 0x40).
61    ClientSetup = 0x40,
62    /// ServerSetup (type 0x41).
63    ServerSetup = 0x41,
64}
65
66impl MessageType {
67    /// Look up a message type by its wire ID.
68    pub fn from_id(id: u64) -> Option<Self> {
69        match id {
70            0x02 => Some(MessageType::SubscribeUpdate),
71            0x03 => Some(MessageType::Subscribe),
72            0x04 => Some(MessageType::SubscribeOk),
73            0x05 => Some(MessageType::SubscribeError),
74            0x06 => Some(MessageType::Announce),
75            0x07 => Some(MessageType::AnnounceOk),
76            0x08 => Some(MessageType::AnnounceError),
77            0x09 => Some(MessageType::Unannounce),
78            0x0A => Some(MessageType::Unsubscribe),
79            0x0B => Some(MessageType::SubscribeDone),
80            0x0C => Some(MessageType::AnnounceCancel),
81            0x0D => Some(MessageType::TrackStatusRequest),
82            0x0E => Some(MessageType::TrackStatus),
83            0x10 => Some(MessageType::GoAway),
84            0x11 => Some(MessageType::SubscribeAnnounces),
85            0x12 => Some(MessageType::SubscribeAnnouncesOk),
86            0x13 => Some(MessageType::SubscribeAnnouncesError),
87            0x14 => Some(MessageType::UnsubscribeAnnounces),
88            0x15 => Some(MessageType::MaxSubscribeId),
89            0x16 => Some(MessageType::Fetch),
90            0x17 => Some(MessageType::FetchCancel),
91            0x18 => Some(MessageType::FetchOk),
92            0x19 => Some(MessageType::FetchError),
93            0x40 => Some(MessageType::ClientSetup),
94            0x41 => Some(MessageType::ServerSetup),
95            _ => None,
96        }
97    }
98
99    /// Return the wire ID for this message type.
100    pub fn id(&self) -> u64 {
101        *self as u64
102    }
103}
104
105// ============================================================
106// Session Lifecycle Messages
107// ============================================================
108
109/// CLIENT_SETUP message (type 0x40).
110#[derive(Debug, Clone, PartialEq, Eq)]
111pub struct ClientSetup {
112    /// The list of MoQT versions supported by the client.
113    pub supported_versions: Vec<VarInt>,
114    /// Setup parameters sent by the client.
115    pub parameters: Vec<KeyValuePair>,
116}
117
118/// SERVER_SETUP message (type 0x41).
119#[derive(Debug, Clone, PartialEq, Eq)]
120pub struct ServerSetup {
121    /// The MoQT version selected by the server.
122    pub selected_version: VarInt,
123    /// Setup parameters sent by the server.
124    pub parameters: Vec<KeyValuePair>,
125}
126
127/// GOAWAY message (type 0x10).
128#[derive(Debug, Clone, PartialEq, Eq)]
129pub struct GoAway {
130    /// The URI for the new session the client should connect to.
131    pub new_session_uri: Vec<u8>,
132}
133
134/// MAX_SUBSCRIBE_ID message (type 0x15).
135#[derive(Debug, Clone, PartialEq, Eq)]
136pub struct MaxSubscribeId {
137    /// The maximum subscribe ID the peer is willing to accept.
138    pub subscribe_id: VarInt,
139}
140
141// ============================================================
142// Subscribe Messages
143// ============================================================
144
145/// SUBSCRIBE message (type 0x03).
146#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct Subscribe {
148    /// The subscribe ID for this request.
149    pub subscribe_id: VarInt,
150    /// The track alias assigned by the subscriber.
151    pub track_alias: VarInt,
152    /// The track namespace to subscribe to.
153    pub track_namespace: TrackNamespace,
154    /// The track name within the namespace.
155    pub track_name: Vec<u8>,
156    /// The priority of this subscriber relative to others.
157    pub subscriber_priority: u8,
158    /// The requested group delivery order.
159    pub group_order: GroupOrder,
160    /// The filter type controlling which objects are delivered.
161    pub filter_type: FilterType,
162    /// Present only for AbsoluteStart and AbsoluteRange filter types.
163    pub start_location: Option<Location>,
164    /// Present only for AbsoluteRange filter type (end_group).
165    pub end_group: Option<VarInt>,
166    /// Present only for AbsoluteRange filter type (end_object).
167    pub end_object: Option<VarInt>,
168    /// Subscribe parameters.
169    pub parameters: Vec<KeyValuePair>,
170}
171
172/// SUBSCRIBE_OK message (type 0x04).
173#[derive(Debug, Clone, PartialEq, Eq)]
174pub struct SubscribeOk {
175    /// The subscribe ID this response corresponds to.
176    pub subscribe_id: VarInt,
177    /// The expiration time for this subscription in milliseconds.
178    pub expires: VarInt,
179    /// The group delivery order chosen by the publisher.
180    pub group_order: GroupOrder,
181    /// Whether the largest location is included.
182    pub content_exists: ContentExists,
183    /// Present only when content_exists == HasLargestLocation.
184    pub largest_group_id: Option<VarInt>,
185    /// Present only when content_exists == HasLargestLocation.
186    pub largest_object_id: Option<VarInt>,
187    /// Subscribe OK parameters.
188    pub parameters: Vec<KeyValuePair>,
189}
190
191/// SUBSCRIBE_ERROR message (type 0x05).
192#[derive(Debug, Clone, PartialEq, Eq)]
193pub struct SubscribeError {
194    /// The subscribe ID this error corresponds to.
195    pub subscribe_id: VarInt,
196    /// The error code indicating the reason for failure.
197    pub error_code: VarInt,
198    /// A human-readable reason for the error.
199    pub reason_phrase: Vec<u8>,
200    /// The track alias from the original subscribe request.
201    pub track_alias: VarInt,
202}
203
204/// SUBSCRIBE_UPDATE message (type 0x02).
205#[derive(Debug, Clone, PartialEq, Eq)]
206pub struct SubscribeUpdate {
207    /// The subscribe ID to update.
208    pub subscribe_id: VarInt,
209    /// The new start group.
210    pub start_group: VarInt,
211    /// The new start object.
212    pub start_object: VarInt,
213    /// The new end group.
214    pub end_group: VarInt,
215    /// The new end object.
216    pub end_object: VarInt,
217    /// The updated subscriber priority.
218    pub subscriber_priority: u8,
219    /// Updated subscribe parameters.
220    pub parameters: Vec<KeyValuePair>,
221}
222
223/// SUBSCRIBE_DONE message (type 0x0B).
224#[derive(Debug, Clone, PartialEq, Eq)]
225pub struct SubscribeDone {
226    /// The subscribe ID this message refers to.
227    pub subscribe_id: VarInt,
228    /// The status code for the subscription completion.
229    pub status_code: VarInt,
230    /// A human-readable reason phrase.
231    pub reason_phrase: Vec<u8>,
232    /// Whether the final location is included.
233    pub content_exists: ContentExists,
234    /// Present only when content_exists == HasLargestLocation.
235    pub final_group: Option<VarInt>,
236    /// Present only when content_exists == HasLargestLocation.
237    pub final_object: Option<VarInt>,
238}
239
240/// UNSUBSCRIBE message (type 0x0A).
241#[derive(Debug, Clone, PartialEq, Eq)]
242pub struct Unsubscribe {
243    /// The subscribe ID to unsubscribe from.
244    pub subscribe_id: VarInt,
245}
246
247// ============================================================
248// Announce Messages
249// ============================================================
250
251/// ANNOUNCE message (type 0x06).
252#[derive(Debug, Clone, PartialEq, Eq)]
253pub struct Announce {
254    /// The track namespace being announced.
255    pub track_namespace: TrackNamespace,
256    /// Announce parameters.
257    pub parameters: Vec<KeyValuePair>,
258}
259
260/// ANNOUNCE_OK message (type 0x07).
261#[derive(Debug, Clone, PartialEq, Eq)]
262pub struct AnnounceOk {
263    /// The track namespace that was accepted.
264    pub track_namespace: TrackNamespace,
265}
266
267/// ANNOUNCE_ERROR message (type 0x08).
268#[derive(Debug, Clone, PartialEq, Eq)]
269pub struct AnnounceError {
270    /// The track namespace that was rejected.
271    pub track_namespace: TrackNamespace,
272    /// The error code indicating the reason for failure.
273    pub error_code: VarInt,
274    /// A human-readable reason for the error.
275    pub reason_phrase: Vec<u8>,
276}
277
278/// ANNOUNCE_CANCEL message (type 0x0C).
279#[derive(Debug, Clone, PartialEq, Eq)]
280pub struct AnnounceCancel {
281    /// The track namespace being cancelled.
282    pub track_namespace: TrackNamespace,
283    /// The error code indicating the reason for cancellation.
284    pub error_code: VarInt,
285    /// A human-readable reason for the cancellation.
286    pub reason_phrase: Vec<u8>,
287}
288
289/// UNANNOUNCE message (type 0x09).
290#[derive(Debug, Clone, PartialEq, Eq)]
291pub struct Unannounce {
292    /// The track namespace being unannounced.
293    pub track_namespace: TrackNamespace,
294}
295
296// ============================================================
297// Subscribe Announces Messages
298// ============================================================
299
300/// SUBSCRIBE_ANNOUNCES message (type 0x11).
301#[derive(Debug, Clone, PartialEq, Eq)]
302pub struct SubscribeAnnounces {
303    /// The track namespace prefix to subscribe to announcements for.
304    pub track_namespace_prefix: TrackNamespace,
305    /// Subscribe announces parameters.
306    pub parameters: Vec<KeyValuePair>,
307}
308
309/// SUBSCRIBE_ANNOUNCES_OK message (type 0x12).
310#[derive(Debug, Clone, PartialEq, Eq)]
311pub struct SubscribeAnnouncesOk {
312    /// The track namespace prefix that was accepted.
313    pub track_namespace_prefix: TrackNamespace,
314}
315
316/// SUBSCRIBE_ANNOUNCES_ERROR message (type 0x13).
317#[derive(Debug, Clone, PartialEq, Eq)]
318pub struct SubscribeAnnouncesError {
319    /// The track namespace prefix that was rejected.
320    pub track_namespace_prefix: TrackNamespace,
321    /// The error code indicating the reason for failure.
322    pub error_code: VarInt,
323    /// A human-readable reason for the error.
324    pub reason_phrase: Vec<u8>,
325}
326
327/// UNSUBSCRIBE_ANNOUNCES message (type 0x14).
328#[derive(Debug, Clone, PartialEq, Eq)]
329pub struct UnsubscribeAnnounces {
330    /// The track namespace prefix to unsubscribe from.
331    pub track_namespace_prefix: TrackNamespace,
332}
333
334// ============================================================
335// Track Status Messages
336// ============================================================
337
338/// TRACK_STATUS_REQUEST message (type 0x0D).
339#[derive(Debug, Clone, PartialEq, Eq)]
340pub struct TrackStatusRequest {
341    /// The track namespace to query status for.
342    pub track_namespace: TrackNamespace,
343    /// The track name to query status for.
344    pub track_name: Vec<u8>,
345}
346
347/// TRACK_STATUS message (type 0x0E).
348#[derive(Debug, Clone, PartialEq, Eq)]
349pub struct TrackStatus {
350    /// The track namespace this status is for.
351    pub track_namespace: TrackNamespace,
352    /// The track name this status is for.
353    pub track_name: Vec<u8>,
354    /// The status code for the track.
355    pub status_code: VarInt,
356    /// The last group ID available on this track.
357    pub last_group_id: VarInt,
358    /// The last object ID available on this track.
359    pub last_object_id: VarInt,
360}
361
362// ============================================================
363// Fetch Messages
364// ============================================================
365
366/// FETCH message (type 0x16).
367#[derive(Debug, Clone, PartialEq, Eq)]
368pub struct Fetch {
369    /// The subscribe ID for this fetch request.
370    pub subscribe_id: VarInt,
371    /// The track namespace to fetch from.
372    pub track_namespace: TrackNamespace,
373    /// The track name to fetch.
374    pub track_name: Vec<u8>,
375    /// The priority of this subscriber relative to others.
376    pub subscriber_priority: u8,
377    /// The requested group delivery order.
378    pub group_order: GroupOrder,
379    /// The start group for the fetch range.
380    pub start_group: VarInt,
381    /// The start object for the fetch range.
382    pub start_object: VarInt,
383    /// The end group for the fetch range.
384    pub end_group: VarInt,
385    /// The end object for the fetch range.
386    pub end_object: VarInt,
387    /// Fetch parameters.
388    pub parameters: Vec<KeyValuePair>,
389}
390
391/// FETCH_OK message (type 0x18).
392#[derive(Debug, Clone, PartialEq, Eq)]
393pub struct FetchOk {
394    /// The subscribe ID this response corresponds to.
395    pub subscribe_id: VarInt,
396    /// The group delivery order chosen by the publisher.
397    pub group_order: GroupOrder,
398    /// Whether this fetch reaches the end of the track (1 = yes).
399    pub end_of_track: u8,
400    /// Present only when end_of_track == 1.
401    pub largest_group_id: Option<VarInt>,
402    /// Present only when end_of_track == 1.
403    pub largest_object_id: Option<VarInt>,
404    /// Fetch OK parameters.
405    pub parameters: Vec<KeyValuePair>,
406}
407
408/// FETCH_ERROR message (type 0x19).
409#[derive(Debug, Clone, PartialEq, Eq)]
410pub struct FetchError {
411    /// The subscribe ID this error corresponds to.
412    pub subscribe_id: VarInt,
413    /// The error code indicating the reason for failure.
414    pub error_code: VarInt,
415    /// A human-readable reason for the error.
416    pub reason_phrase: Vec<u8>,
417}
418
419/// FETCH_CANCEL message (type 0x17).
420#[derive(Debug, Clone, PartialEq, Eq)]
421pub struct FetchCancel {
422    /// The subscribe ID for the fetch to cancel.
423    pub subscribe_id: VarInt,
424}
425
426// ============================================================
427// Unified Message Enum
428// ============================================================
429
430/// A decoded MoQT control message (draft-07).
431#[derive(Debug, Clone, PartialEq, Eq)]
432pub enum ControlMessage {
433    /// ClientSetup (type 0x40).
434    ClientSetup(ClientSetup),
435    /// ServerSetup (type 0x41).
436    ServerSetup(ServerSetup),
437    /// GoAway (type 0x10).
438    GoAway(GoAway),
439    /// MaxSubscribeId (type 0x15).
440    MaxSubscribeId(MaxSubscribeId),
441    /// Subscribe (type 0x03).
442    Subscribe(Subscribe),
443    /// SubscribeOk (type 0x04).
444    SubscribeOk(SubscribeOk),
445    /// SubscribeError (type 0x05).
446    SubscribeError(SubscribeError),
447    /// SubscribeUpdate (type 0x02).
448    SubscribeUpdate(SubscribeUpdate),
449    /// SubscribeDone (type 0x0B).
450    SubscribeDone(SubscribeDone),
451    /// Unsubscribe (type 0x0A).
452    Unsubscribe(Unsubscribe),
453    /// Announce (type 0x06).
454    Announce(Announce),
455    /// AnnounceOk (type 0x07).
456    AnnounceOk(AnnounceOk),
457    /// AnnounceError (type 0x08).
458    AnnounceError(AnnounceError),
459    /// AnnounceCancel (type 0x0C).
460    AnnounceCancel(AnnounceCancel),
461    /// Unannounce (type 0x09).
462    Unannounce(Unannounce),
463    /// SubscribeAnnounces (type 0x11).
464    SubscribeAnnounces(SubscribeAnnounces),
465    /// SubscribeAnnouncesOk (type 0x12).
466    SubscribeAnnouncesOk(SubscribeAnnouncesOk),
467    /// SubscribeAnnouncesError (type 0x13).
468    SubscribeAnnouncesError(SubscribeAnnouncesError),
469    /// UnsubscribeAnnounces (type 0x14).
470    UnsubscribeAnnounces(UnsubscribeAnnounces),
471    /// TrackStatusRequest (type 0x0D).
472    TrackStatusRequest(TrackStatusRequest),
473    /// TrackStatus (type 0x0E).
474    TrackStatus(TrackStatus),
475    /// Fetch (type 0x16).
476    Fetch(Fetch),
477    /// FetchOk (type 0x18).
478    FetchOk(FetchOk),
479    /// FetchError (type 0x19).
480    FetchError(FetchError),
481    /// FetchCancel (type 0x17).
482    FetchCancel(FetchCancel),
483}
484
485impl ControlMessage {
486    /// Return the message type for this control message.
487    pub fn message_type(&self) -> MessageType {
488        match self {
489            ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
490            ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
491            ControlMessage::GoAway(_) => MessageType::GoAway,
492            ControlMessage::MaxSubscribeId(_) => MessageType::MaxSubscribeId,
493            ControlMessage::Subscribe(_) => MessageType::Subscribe,
494            ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
495            ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
496            ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
497            ControlMessage::SubscribeDone(_) => MessageType::SubscribeDone,
498            ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
499            ControlMessage::Announce(_) => MessageType::Announce,
500            ControlMessage::AnnounceOk(_) => MessageType::AnnounceOk,
501            ControlMessage::AnnounceError(_) => MessageType::AnnounceError,
502            ControlMessage::AnnounceCancel(_) => MessageType::AnnounceCancel,
503            ControlMessage::Unannounce(_) => MessageType::Unannounce,
504            ControlMessage::SubscribeAnnounces(_) => MessageType::SubscribeAnnounces,
505            ControlMessage::SubscribeAnnouncesOk(_) => MessageType::SubscribeAnnouncesOk,
506            ControlMessage::SubscribeAnnouncesError(_) => MessageType::SubscribeAnnouncesError,
507            ControlMessage::UnsubscribeAnnounces(_) => MessageType::UnsubscribeAnnounces,
508            ControlMessage::TrackStatusRequest(_) => MessageType::TrackStatusRequest,
509            ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
510            ControlMessage::Fetch(_) => MessageType::Fetch,
511            ControlMessage::FetchOk(_) => MessageType::FetchOk,
512            ControlMessage::FetchError(_) => MessageType::FetchError,
513            ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
514        }
515    }
516
517    /// Encode this control message (type ID + length prefix + payload).
518    pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
519        let mut payload = Vec::with_capacity(256);
520        self.encode_payload(&mut payload)?;
521
522        if payload.len() > MAX_MESSAGE_LENGTH {
523            return Err(CodecError::MessageTooLong(payload.len()));
524        }
525
526        VarInt::from_usize(self.message_type().id() as usize).encode(buf);
527        VarInt::from_usize(payload.len()).encode(buf);
528        buf.put_slice(&payload);
529        Ok(())
530    }
531
532    /// Decode a control message from bytes.
533    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
534        let type_id = VarInt::decode(buf)?.into_inner();
535        let msg_type =
536            MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
537        let payload_len = VarInt::decode(buf)?.into_inner() as usize;
538        if buf.remaining() < payload_len {
539            return Err(CodecError::UnexpectedEnd);
540        }
541        let payload_bytes = buf.copy_to_bytes(payload_len);
542        let mut payload = &payload_bytes[..];
543        Self::decode_payload(msg_type, &mut payload)
544    }
545
546    fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
547        match self {
548            ControlMessage::ClientSetup(m) => {
549                VarInt::from_usize(m.supported_versions.len()).encode(buf);
550                for v in &m.supported_versions {
551                    v.encode(buf);
552                }
553                KeyValuePair::encode_list_d07(&m.parameters, buf);
554            }
555            ControlMessage::ServerSetup(m) => {
556                m.selected_version.encode(buf);
557                KeyValuePair::encode_list_d07(&m.parameters, buf);
558            }
559            ControlMessage::GoAway(m) => {
560                if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
561                    return Err(CodecError::GoAwayUriTooLong);
562                }
563                VarInt::from_usize(m.new_session_uri.len()).encode(buf);
564                buf.put_slice(&m.new_session_uri);
565            }
566            ControlMessage::MaxSubscribeId(m) => {
567                m.subscribe_id.encode(buf);
568            }
569            ControlMessage::Subscribe(m) => {
570                m.subscribe_id.encode(buf);
571                m.track_alias.encode(buf);
572                m.track_namespace.encode(buf);
573                VarInt::from_usize(m.track_name.len()).encode(buf);
574                buf.put_slice(&m.track_name);
575                buf.put_u8(m.subscriber_priority);
576                buf.put_u8(m.group_order as u8);
577                buf.put_u8(m.filter_type as u8);
578                if let Some(loc) = &m.start_location {
579                    loc.encode(buf);
580                }
581                if let Some(eg) = &m.end_group {
582                    eg.encode(buf);
583                }
584                if let Some(eo) = &m.end_object {
585                    eo.encode(buf);
586                }
587                KeyValuePair::encode_list_d07(&m.parameters, buf);
588            }
589            ControlMessage::SubscribeOk(m) => {
590                m.subscribe_id.encode(buf);
591                m.expires.encode(buf);
592                buf.put_u8(m.group_order as u8);
593                buf.put_u8(m.content_exists as u8);
594                if let Some(gid) = &m.largest_group_id {
595                    gid.encode(buf);
596                }
597                if let Some(oid) = &m.largest_object_id {
598                    oid.encode(buf);
599                }
600                KeyValuePair::encode_list_d07(&m.parameters, buf);
601            }
602            ControlMessage::SubscribeError(m) => {
603                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
604                    return Err(CodecError::ReasonPhraseTooLong);
605                }
606                m.subscribe_id.encode(buf);
607                m.error_code.encode(buf);
608                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
609                buf.put_slice(&m.reason_phrase);
610                m.track_alias.encode(buf);
611            }
612            ControlMessage::SubscribeUpdate(m) => {
613                m.subscribe_id.encode(buf);
614                m.start_group.encode(buf);
615                m.start_object.encode(buf);
616                m.end_group.encode(buf);
617                m.end_object.encode(buf);
618                buf.put_u8(m.subscriber_priority);
619                KeyValuePair::encode_list_d07(&m.parameters, buf);
620            }
621            ControlMessage::SubscribeDone(m) => {
622                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
623                    return Err(CodecError::ReasonPhraseTooLong);
624                }
625                m.subscribe_id.encode(buf);
626                m.status_code.encode(buf);
627                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
628                buf.put_slice(&m.reason_phrase);
629                buf.put_u8(m.content_exists as u8);
630                if let Some(fg) = &m.final_group {
631                    fg.encode(buf);
632                }
633                if let Some(fo) = &m.final_object {
634                    fo.encode(buf);
635                }
636            }
637            ControlMessage::Unsubscribe(m) => {
638                m.subscribe_id.encode(buf);
639            }
640            ControlMessage::Announce(m) => {
641                m.track_namespace.encode(buf);
642                KeyValuePair::encode_list_d07(&m.parameters, buf);
643            }
644            ControlMessage::AnnounceOk(m) => {
645                m.track_namespace.encode(buf);
646            }
647            ControlMessage::AnnounceError(m) => {
648                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
649                    return Err(CodecError::ReasonPhraseTooLong);
650                }
651                m.track_namespace.encode(buf);
652                m.error_code.encode(buf);
653                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
654                buf.put_slice(&m.reason_phrase);
655            }
656            ControlMessage::AnnounceCancel(m) => {
657                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
658                    return Err(CodecError::ReasonPhraseTooLong);
659                }
660                m.track_namespace.encode(buf);
661                m.error_code.encode(buf);
662                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
663                buf.put_slice(&m.reason_phrase);
664            }
665            ControlMessage::Unannounce(m) => {
666                m.track_namespace.encode(buf);
667            }
668            ControlMessage::SubscribeAnnounces(m) => {
669                m.track_namespace_prefix.encode(buf);
670                KeyValuePair::encode_list_d07(&m.parameters, buf);
671            }
672            ControlMessage::SubscribeAnnouncesOk(m) => {
673                m.track_namespace_prefix.encode(buf);
674            }
675            ControlMessage::SubscribeAnnouncesError(m) => {
676                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
677                    return Err(CodecError::ReasonPhraseTooLong);
678                }
679                m.track_namespace_prefix.encode(buf);
680                m.error_code.encode(buf);
681                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
682                buf.put_slice(&m.reason_phrase);
683            }
684            ControlMessage::UnsubscribeAnnounces(m) => {
685                m.track_namespace_prefix.encode(buf);
686            }
687            ControlMessage::TrackStatusRequest(m) => {
688                m.track_namespace.encode(buf);
689                VarInt::from_usize(m.track_name.len()).encode(buf);
690                buf.put_slice(&m.track_name);
691            }
692            ControlMessage::TrackStatus(m) => {
693                m.track_namespace.encode(buf);
694                VarInt::from_usize(m.track_name.len()).encode(buf);
695                buf.put_slice(&m.track_name);
696                m.status_code.encode(buf);
697                m.last_group_id.encode(buf);
698                m.last_object_id.encode(buf);
699            }
700            ControlMessage::Fetch(m) => {
701                m.subscribe_id.encode(buf);
702                m.track_namespace.encode(buf);
703                VarInt::from_usize(m.track_name.len()).encode(buf);
704                buf.put_slice(&m.track_name);
705                buf.put_u8(m.subscriber_priority);
706                buf.put_u8(m.group_order as u8);
707                m.start_group.encode(buf);
708                m.start_object.encode(buf);
709                m.end_group.encode(buf);
710                m.end_object.encode(buf);
711                KeyValuePair::encode_list_d07(&m.parameters, buf);
712            }
713            ControlMessage::FetchOk(m) => {
714                m.subscribe_id.encode(buf);
715                buf.put_u8(m.group_order as u8);
716                buf.put_u8(m.end_of_track);
717                if let Some(gid) = &m.largest_group_id {
718                    gid.encode(buf);
719                }
720                if let Some(oid) = &m.largest_object_id {
721                    oid.encode(buf);
722                }
723                KeyValuePair::encode_list_d07(&m.parameters, buf);
724            }
725            ControlMessage::FetchError(m) => {
726                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
727                    return Err(CodecError::ReasonPhraseTooLong);
728                }
729                m.subscribe_id.encode(buf);
730                m.error_code.encode(buf);
731                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
732                buf.put_slice(&m.reason_phrase);
733            }
734            ControlMessage::FetchCancel(m) => {
735                m.subscribe_id.encode(buf);
736            }
737        }
738        Ok(())
739    }
740
741    fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
742        match msg_type {
743            MessageType::ClientSetup => {
744                let num_versions = VarInt::decode(buf)?.into_inner() as usize;
745                if num_versions == 0 {
746                    return Err(CodecError::InvalidField);
747                }
748                let mut supported_versions = Vec::with_capacity(num_versions);
749                for _ in 0..num_versions {
750                    supported_versions.push(VarInt::decode(buf)?);
751                }
752                let parameters = KeyValuePair::decode_list_d07(buf)?;
753                Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
754            }
755            MessageType::ServerSetup => {
756                let selected_version = VarInt::decode(buf)?;
757                let parameters = KeyValuePair::decode_list_d07(buf)?;
758                Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
759            }
760            MessageType::GoAway => {
761                let uri_len = VarInt::decode(buf)?.into_inner() as usize;
762                let uri = read_bytes(buf, uri_len)?;
763                Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
764            }
765            MessageType::MaxSubscribeId => {
766                let subscribe_id = VarInt::decode(buf)?;
767                Ok(ControlMessage::MaxSubscribeId(MaxSubscribeId { subscribe_id }))
768            }
769            MessageType::Subscribe => {
770                let subscribe_id = VarInt::decode(buf)?;
771                let track_alias = VarInt::decode(buf)?;
772                let track_namespace = TrackNamespace::decode(buf)?;
773                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
774                let track_name = read_bytes(buf, track_name_len)?;
775                if buf.remaining() < 3 {
776                    return Err(CodecError::UnexpectedEnd);
777                }
778                let subscriber_priority = buf.get_u8();
779                let group_order =
780                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
781                let filter_val = buf.get_u8();
782                let filter_type =
783                    FilterType::from_u8(filter_val).ok_or(CodecError::InvalidField)?;
784                let start_location = match filter_type {
785                    FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
786                        Some(Location::decode(buf)?)
787                    }
788                    _ => None,
789                };
790                let (end_group, end_object) = match filter_type {
791                    FilterType::AbsoluteRange => {
792                        let eg = VarInt::decode(buf)?;
793                        let eo = VarInt::decode(buf)?;
794                        (Some(eg), Some(eo))
795                    }
796                    _ => (None, None),
797                };
798                let parameters = KeyValuePair::decode_list_d07(buf)?;
799                Ok(ControlMessage::Subscribe(Subscribe {
800                    subscribe_id,
801                    track_alias,
802                    track_namespace,
803                    track_name,
804                    subscriber_priority,
805                    group_order,
806                    filter_type,
807                    start_location,
808                    end_group,
809                    end_object,
810                    parameters,
811                }))
812            }
813            MessageType::SubscribeOk => {
814                let subscribe_id = VarInt::decode(buf)?;
815                let expires = VarInt::decode(buf)?;
816                if buf.remaining() < 2 {
817                    return Err(CodecError::UnexpectedEnd);
818                }
819                let group_order =
820                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
821                let content_exists_val = buf.get_u8();
822                let content_exists = match content_exists_val {
823                    0 => ContentExists::NoLargestLocation,
824                    1 => ContentExists::HasLargestLocation,
825                    _ => return Err(CodecError::InvalidField),
826                };
827                let (largest_group_id, largest_object_id) =
828                    if content_exists == ContentExists::HasLargestLocation {
829                        let gid = VarInt::decode(buf)?;
830                        let oid = VarInt::decode(buf)?;
831                        (Some(gid), Some(oid))
832                    } else {
833                        (None, None)
834                    };
835                let parameters = KeyValuePair::decode_list_d07(buf)?;
836                Ok(ControlMessage::SubscribeOk(SubscribeOk {
837                    subscribe_id,
838                    expires,
839                    group_order,
840                    content_exists,
841                    largest_group_id,
842                    largest_object_id,
843                    parameters,
844                }))
845            }
846            MessageType::SubscribeError => {
847                let subscribe_id = VarInt::decode(buf)?;
848                let error_code = VarInt::decode(buf)?;
849                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
850                let reason_phrase = read_bytes(buf, reason_len)?;
851                let track_alias = VarInt::decode(buf)?;
852                Ok(ControlMessage::SubscribeError(SubscribeError {
853                    subscribe_id,
854                    error_code,
855                    reason_phrase,
856                    track_alias,
857                }))
858            }
859            MessageType::SubscribeUpdate => {
860                let subscribe_id = VarInt::decode(buf)?;
861                let start_group = VarInt::decode(buf)?;
862                let start_object = VarInt::decode(buf)?;
863                let end_group = VarInt::decode(buf)?;
864                let end_object = VarInt::decode(buf)?;
865                if buf.remaining() < 1 {
866                    return Err(CodecError::UnexpectedEnd);
867                }
868                let subscriber_priority = buf.get_u8();
869                let parameters = KeyValuePair::decode_list_d07(buf)?;
870                Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
871                    subscribe_id,
872                    start_group,
873                    start_object,
874                    end_group,
875                    end_object,
876                    subscriber_priority,
877                    parameters,
878                }))
879            }
880            MessageType::SubscribeDone => {
881                let subscribe_id = VarInt::decode(buf)?;
882                let status_code = VarInt::decode(buf)?;
883                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
884                let reason_phrase = read_bytes(buf, reason_len)?;
885                if buf.remaining() < 1 {
886                    return Err(CodecError::UnexpectedEnd);
887                }
888                let content_exists_val = buf.get_u8();
889                let content_exists = match content_exists_val {
890                    0 => ContentExists::NoLargestLocation,
891                    1 => ContentExists::HasLargestLocation,
892                    _ => return Err(CodecError::InvalidField),
893                };
894                let (final_group, final_object) =
895                    if content_exists == ContentExists::HasLargestLocation {
896                        let fg = VarInt::decode(buf)?;
897                        let fo = VarInt::decode(buf)?;
898                        (Some(fg), Some(fo))
899                    } else {
900                        (None, None)
901                    };
902                Ok(ControlMessage::SubscribeDone(SubscribeDone {
903                    subscribe_id,
904                    status_code,
905                    reason_phrase,
906                    content_exists,
907                    final_group,
908                    final_object,
909                }))
910            }
911            MessageType::Unsubscribe => {
912                let subscribe_id = VarInt::decode(buf)?;
913                Ok(ControlMessage::Unsubscribe(Unsubscribe { subscribe_id }))
914            }
915            MessageType::Announce => {
916                let track_namespace = TrackNamespace::decode(buf)?;
917                let parameters = KeyValuePair::decode_list_d07(buf)?;
918                Ok(ControlMessage::Announce(Announce { track_namespace, parameters }))
919            }
920            MessageType::AnnounceOk => {
921                let track_namespace = TrackNamespace::decode(buf)?;
922                Ok(ControlMessage::AnnounceOk(AnnounceOk { track_namespace }))
923            }
924            MessageType::AnnounceError => {
925                let track_namespace = TrackNamespace::decode(buf)?;
926                let error_code = VarInt::decode(buf)?;
927                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
928                let reason_phrase = read_bytes(buf, reason_len)?;
929                Ok(ControlMessage::AnnounceError(AnnounceError {
930                    track_namespace,
931                    error_code,
932                    reason_phrase,
933                }))
934            }
935            MessageType::AnnounceCancel => {
936                let track_namespace = TrackNamespace::decode(buf)?;
937                let error_code = VarInt::decode(buf)?;
938                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
939                let reason_phrase = read_bytes(buf, reason_len)?;
940                Ok(ControlMessage::AnnounceCancel(AnnounceCancel {
941                    track_namespace,
942                    error_code,
943                    reason_phrase,
944                }))
945            }
946            MessageType::Unannounce => {
947                let track_namespace = TrackNamespace::decode(buf)?;
948                Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
949            }
950            MessageType::SubscribeAnnounces => {
951                let track_namespace_prefix = TrackNamespace::decode(buf)?;
952                let parameters = KeyValuePair::decode_list_d07(buf)?;
953                Ok(ControlMessage::SubscribeAnnounces(SubscribeAnnounces {
954                    track_namespace_prefix,
955                    parameters,
956                }))
957            }
958            MessageType::SubscribeAnnouncesOk => {
959                let track_namespace_prefix = TrackNamespace::decode(buf)?;
960                Ok(ControlMessage::SubscribeAnnouncesOk(SubscribeAnnouncesOk {
961                    track_namespace_prefix,
962                }))
963            }
964            MessageType::SubscribeAnnouncesError => {
965                let track_namespace_prefix = TrackNamespace::decode(buf)?;
966                let error_code = VarInt::decode(buf)?;
967                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
968                let reason_phrase = read_bytes(buf, reason_len)?;
969                Ok(ControlMessage::SubscribeAnnouncesError(SubscribeAnnouncesError {
970                    track_namespace_prefix,
971                    error_code,
972                    reason_phrase,
973                }))
974            }
975            MessageType::UnsubscribeAnnounces => {
976                let track_namespace_prefix = TrackNamespace::decode(buf)?;
977                Ok(ControlMessage::UnsubscribeAnnounces(UnsubscribeAnnounces {
978                    track_namespace_prefix,
979                }))
980            }
981            MessageType::TrackStatusRequest => {
982                let track_namespace = TrackNamespace::decode(buf)?;
983                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
984                let track_name = read_bytes(buf, track_name_len)?;
985                Ok(ControlMessage::TrackStatusRequest(TrackStatusRequest {
986                    track_namespace,
987                    track_name,
988                }))
989            }
990            MessageType::TrackStatus => {
991                let track_namespace = TrackNamespace::decode(buf)?;
992                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
993                let track_name = read_bytes(buf, track_name_len)?;
994                let status_code = VarInt::decode(buf)?;
995                let last_group_id = VarInt::decode(buf)?;
996                let last_object_id = VarInt::decode(buf)?;
997                Ok(ControlMessage::TrackStatus(TrackStatus {
998                    track_namespace,
999                    track_name,
1000                    status_code,
1001                    last_group_id,
1002                    last_object_id,
1003                }))
1004            }
1005            MessageType::Fetch => {
1006                let subscribe_id = VarInt::decode(buf)?;
1007                let track_namespace = TrackNamespace::decode(buf)?;
1008                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1009                let track_name = read_bytes(buf, track_name_len)?;
1010                if buf.remaining() < 2 {
1011                    return Err(CodecError::UnexpectedEnd);
1012                }
1013                let subscriber_priority = buf.get_u8();
1014                let group_order =
1015                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1016                let start_group = VarInt::decode(buf)?;
1017                let start_object = VarInt::decode(buf)?;
1018                let end_group = VarInt::decode(buf)?;
1019                let end_object = VarInt::decode(buf)?;
1020                let parameters = KeyValuePair::decode_list_d07(buf)?;
1021                Ok(ControlMessage::Fetch(Fetch {
1022                    subscribe_id,
1023                    track_namespace,
1024                    track_name,
1025                    subscriber_priority,
1026                    group_order,
1027                    start_group,
1028                    start_object,
1029                    end_group,
1030                    end_object,
1031                    parameters,
1032                }))
1033            }
1034            MessageType::FetchOk => {
1035                let subscribe_id = VarInt::decode(buf)?;
1036                if buf.remaining() < 2 {
1037                    return Err(CodecError::UnexpectedEnd);
1038                }
1039                let group_order =
1040                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1041                let end_of_track = buf.get_u8();
1042                let largest_group_id = Some(VarInt::decode(buf)?);
1043                let largest_object_id = Some(VarInt::decode(buf)?);
1044                let parameters = KeyValuePair::decode_list_d07(buf)?;
1045                Ok(ControlMessage::FetchOk(FetchOk {
1046                    subscribe_id,
1047                    group_order,
1048                    end_of_track,
1049                    largest_group_id,
1050                    largest_object_id,
1051                    parameters,
1052                }))
1053            }
1054            MessageType::FetchError => {
1055                let subscribe_id = VarInt::decode(buf)?;
1056                let error_code = VarInt::decode(buf)?;
1057                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1058                let reason_phrase = read_bytes(buf, reason_len)?;
1059                Ok(ControlMessage::FetchError(FetchError {
1060                    subscribe_id,
1061                    error_code,
1062                    reason_phrase,
1063                }))
1064            }
1065            MessageType::FetchCancel => {
1066                let subscribe_id = VarInt::decode(buf)?;
1067                Ok(ControlMessage::FetchCancel(FetchCancel { subscribe_id }))
1068            }
1069        }
1070    }
1071}