Skip to main content

moqtap_codec/draft08/
message.rs

1//! Draft-08 control message encoding and decoding.
2
3use crate::error::{
4    CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_REASON_PHRASE_LENGTH,
5};
6use crate::kvp::KeyValuePair;
7use crate::types::read_bytes;
8use crate::types::*;
9use crate::varint::VarInt;
10use bytes::{Buf, BufMut};
11
12/// Control message type IDs (draft-08).
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14#[repr(u64)]
15pub enum MessageType {
16    /// SubscribeUpdate (type 0x02).
17    SubscribeUpdate = 0x02,
18    /// Subscribe (type 0x03).
19    Subscribe = 0x03,
20    /// SubscribeOk (type 0x04).
21    SubscribeOk = 0x04,
22    /// SubscribeError (type 0x05).
23    SubscribeError = 0x05,
24    /// Announce (type 0x06).
25    Announce = 0x06,
26    /// AnnounceOk (type 0x07).
27    AnnounceOk = 0x07,
28    /// AnnounceError (type 0x08).
29    AnnounceError = 0x08,
30    /// Unannounce (type 0x09).
31    Unannounce = 0x09,
32    /// Unsubscribe (type 0x0A).
33    Unsubscribe = 0x0A,
34    /// SubscribeDone (type 0x0B).
35    SubscribeDone = 0x0B,
36    /// AnnounceCancel (type 0x0C).
37    AnnounceCancel = 0x0C,
38    /// TrackStatusRequest (type 0x0D).
39    TrackStatusRequest = 0x0D,
40    /// TrackStatus (type 0x0E).
41    TrackStatus = 0x0E,
42    /// GoAway (type 0x10).
43    GoAway = 0x10,
44    /// SubscribeAnnounces (type 0x11).
45    SubscribeAnnounces = 0x11,
46    /// SubscribeAnnouncesOk (type 0x12).
47    SubscribeAnnouncesOk = 0x12,
48    /// SubscribeAnnouncesError (type 0x13).
49    SubscribeAnnouncesError = 0x13,
50    /// UnsubscribeAnnounces (type 0x14).
51    UnsubscribeAnnounces = 0x14,
52    /// MaxSubscribeId (type 0x15).
53    MaxSubscribeId = 0x15,
54    /// Fetch (type 0x16).
55    Fetch = 0x16,
56    /// FetchCancel (type 0x17).
57    FetchCancel = 0x17,
58    /// FetchOk (type 0x18).
59    FetchOk = 0x18,
60    /// FetchError (type 0x19).
61    FetchError = 0x19,
62    /// SubscribesBlocked (type 0x1A).
63    SubscribesBlocked = 0x1A,
64    /// ClientSetup (type 0x40).
65    ClientSetup = 0x40,
66    /// ServerSetup (type 0x41).
67    ServerSetup = 0x41,
68}
69
70impl MessageType {
71    /// Look up a message type by its wire ID.
72    pub fn from_id(id: u64) -> Option<Self> {
73        match id {
74            0x02 => Some(MessageType::SubscribeUpdate),
75            0x03 => Some(MessageType::Subscribe),
76            0x04 => Some(MessageType::SubscribeOk),
77            0x05 => Some(MessageType::SubscribeError),
78            0x06 => Some(MessageType::Announce),
79            0x07 => Some(MessageType::AnnounceOk),
80            0x08 => Some(MessageType::AnnounceError),
81            0x09 => Some(MessageType::Unannounce),
82            0x0A => Some(MessageType::Unsubscribe),
83            0x0B => Some(MessageType::SubscribeDone),
84            0x0C => Some(MessageType::AnnounceCancel),
85            0x0D => Some(MessageType::TrackStatusRequest),
86            0x0E => Some(MessageType::TrackStatus),
87            0x10 => Some(MessageType::GoAway),
88            0x11 => Some(MessageType::SubscribeAnnounces),
89            0x12 => Some(MessageType::SubscribeAnnouncesOk),
90            0x13 => Some(MessageType::SubscribeAnnouncesError),
91            0x14 => Some(MessageType::UnsubscribeAnnounces),
92            0x15 => Some(MessageType::MaxSubscribeId),
93            0x16 => Some(MessageType::Fetch),
94            0x17 => Some(MessageType::FetchCancel),
95            0x18 => Some(MessageType::FetchOk),
96            0x19 => Some(MessageType::FetchError),
97            0x1A => Some(MessageType::SubscribesBlocked),
98            0x40 => Some(MessageType::ClientSetup),
99            0x41 => Some(MessageType::ServerSetup),
100            _ => None,
101        }
102    }
103
104    /// Return the wire ID for this message type.
105    pub fn id(&self) -> u64 {
106        *self as u64
107    }
108}
109
110// ============================================================
111// Session Lifecycle Messages
112// ============================================================
113
114/// CLIENT_SETUP message (type 0x40).
115#[derive(Debug, Clone, PartialEq, Eq)]
116pub struct ClientSetup {
117    /// The list of MoQT versions supported by the client.
118    pub supported_versions: Vec<VarInt>,
119    /// Setup parameters sent by the client.
120    pub parameters: Vec<KeyValuePair>,
121}
122
123/// SERVER_SETUP message (type 0x41).
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct ServerSetup {
126    /// The MoQT version selected by the server.
127    pub selected_version: VarInt,
128    /// Setup parameters sent by the server.
129    pub parameters: Vec<KeyValuePair>,
130}
131
132/// GOAWAY message (type 0x10).
133#[derive(Debug, Clone, PartialEq, Eq)]
134pub struct GoAway {
135    /// The URI for the new session the client should connect to.
136    pub new_session_uri: Vec<u8>,
137}
138
139/// MAX_SUBSCRIBE_ID message (type 0x15).
140#[derive(Debug, Clone, PartialEq, Eq)]
141pub struct MaxSubscribeId {
142    /// The maximum subscribe ID the peer is willing to accept.
143    pub subscribe_id: VarInt,
144}
145
146/// SUBSCRIBES_BLOCKED message (type 0x1A).
147#[derive(Debug, Clone, PartialEq, Eq)]
148pub struct SubscribesBlocked {
149    /// The maximum subscribe ID advertised by the peer.
150    pub maximum_subscribe_id: VarInt,
151}
152
153// ============================================================
154// Subscribe Messages
155// ============================================================
156
157/// SUBSCRIBE message (type 0x03).
158///
159/// Draft-08: filter_type is a VarInt. AbsoluteRange has no end_object.
160#[derive(Debug, Clone, PartialEq, Eq)]
161pub struct Subscribe {
162    /// The subscribe ID for this request.
163    pub subscribe_id: VarInt,
164    /// The track alias assigned by the subscriber.
165    pub track_alias: VarInt,
166    /// The track namespace to subscribe to.
167    pub track_namespace: TrackNamespace,
168    /// The track name within the namespace.
169    pub track_name: Vec<u8>,
170    /// The priority of this subscriber relative to others.
171    pub subscriber_priority: u8,
172    /// The requested group delivery order.
173    pub group_order: GroupOrder,
174    /// The filter type controlling which objects are delivered.
175    pub filter_type: FilterType,
176    /// Present only for AbsoluteStart and AbsoluteRange filter types.
177    pub start_location: Option<Location>,
178    /// Present only for AbsoluteRange filter type (end_group only, no end_object).
179    pub end_group: Option<VarInt>,
180    /// Subscribe parameters.
181    pub parameters: Vec<KeyValuePair>,
182}
183
184/// SUBSCRIBE_OK message (type 0x04).
185#[derive(Debug, Clone, PartialEq, Eq)]
186pub struct SubscribeOk {
187    /// The subscribe ID this response corresponds to.
188    pub subscribe_id: VarInt,
189    /// The expiration time for this subscription in milliseconds.
190    pub expires: VarInt,
191    /// The group delivery order chosen by the publisher.
192    pub group_order: GroupOrder,
193    /// Whether the largest location is included.
194    pub content_exists: ContentExists,
195    /// Present only when content_exists == HasLargestLocation.
196    pub largest_group_id: Option<VarInt>,
197    /// Present only when content_exists == HasLargestLocation.
198    pub largest_object_id: Option<VarInt>,
199    /// Subscribe OK parameters.
200    pub parameters: Vec<KeyValuePair>,
201}
202
203/// SUBSCRIBE_ERROR message (type 0x05).
204#[derive(Debug, Clone, PartialEq, Eq)]
205pub struct SubscribeError {
206    /// The subscribe ID this error corresponds to.
207    pub subscribe_id: VarInt,
208    /// The error code indicating the reason for failure.
209    pub error_code: VarInt,
210    /// A human-readable reason for the error.
211    pub reason_phrase: Vec<u8>,
212    /// The track alias from the original subscribe request.
213    pub track_alias: VarInt,
214}
215
216/// SUBSCRIBE_UPDATE message (type 0x02).
217///
218/// Draft-08: no end_object field (only end_group).
219#[derive(Debug, Clone, PartialEq, Eq)]
220pub struct SubscribeUpdate {
221    /// The subscribe ID to update.
222    pub subscribe_id: VarInt,
223    /// The new start group.
224    pub start_group: VarInt,
225    /// The new start object.
226    pub start_object: VarInt,
227    /// The new end group.
228    pub end_group: VarInt,
229    /// The updated subscriber priority.
230    pub subscriber_priority: u8,
231    /// Updated subscribe parameters.
232    pub parameters: Vec<KeyValuePair>,
233}
234
235/// SUBSCRIBE_DONE message (type 0x0B).
236///
237/// Draft-08: has stream_count instead of content_exists/final_group/final_object.
238#[derive(Debug, Clone, PartialEq, Eq)]
239pub struct SubscribeDone {
240    /// The subscribe ID this message refers to.
241    pub subscribe_id: VarInt,
242    /// The status code for the subscription completion.
243    pub status_code: VarInt,
244    /// Number of streams delivered.
245    pub stream_count: VarInt,
246    /// A human-readable reason phrase.
247    pub reason_phrase: Vec<u8>,
248}
249
250/// UNSUBSCRIBE message (type 0x0A).
251#[derive(Debug, Clone, PartialEq, Eq)]
252pub struct Unsubscribe {
253    /// The subscribe ID to unsubscribe from.
254    pub subscribe_id: VarInt,
255}
256
257// ============================================================
258// Announce Messages
259// ============================================================
260
261/// ANNOUNCE message (type 0x06).
262#[derive(Debug, Clone, PartialEq, Eq)]
263pub struct Announce {
264    /// The track namespace being announced.
265    pub track_namespace: TrackNamespace,
266    /// Announce parameters.
267    pub parameters: Vec<KeyValuePair>,
268}
269
270/// ANNOUNCE_OK message (type 0x07).
271#[derive(Debug, Clone, PartialEq, Eq)]
272pub struct AnnounceOk {
273    /// The track namespace that was accepted.
274    pub track_namespace: TrackNamespace,
275}
276
277/// ANNOUNCE_ERROR message (type 0x08).
278#[derive(Debug, Clone, PartialEq, Eq)]
279pub struct AnnounceError {
280    /// The track namespace that was rejected.
281    pub track_namespace: TrackNamespace,
282    /// The error code indicating the reason for failure.
283    pub error_code: VarInt,
284    /// A human-readable reason for the error.
285    pub reason_phrase: Vec<u8>,
286}
287
288/// ANNOUNCE_CANCEL message (type 0x0C).
289#[derive(Debug, Clone, PartialEq, Eq)]
290pub struct AnnounceCancel {
291    /// The track namespace being cancelled.
292    pub track_namespace: TrackNamespace,
293    /// The error code indicating the reason for cancellation.
294    pub error_code: VarInt,
295    /// A human-readable reason for the cancellation.
296    pub reason_phrase: Vec<u8>,
297}
298
299/// UNANNOUNCE message (type 0x09).
300#[derive(Debug, Clone, PartialEq, Eq)]
301pub struct Unannounce {
302    /// The track namespace being unannounced.
303    pub track_namespace: TrackNamespace,
304}
305
306// ============================================================
307// Subscribe Announces Messages
308// ============================================================
309
310/// SUBSCRIBE_ANNOUNCES message (type 0x11).
311#[derive(Debug, Clone, PartialEq, Eq)]
312pub struct SubscribeAnnounces {
313    /// The track namespace prefix to subscribe to announcements for.
314    pub track_namespace_prefix: TrackNamespace,
315    /// Subscribe announces parameters.
316    pub parameters: Vec<KeyValuePair>,
317}
318
319/// SUBSCRIBE_ANNOUNCES_OK message (type 0x12).
320#[derive(Debug, Clone, PartialEq, Eq)]
321pub struct SubscribeAnnouncesOk {
322    /// The track namespace prefix that was accepted.
323    pub track_namespace_prefix: TrackNamespace,
324}
325
326/// SUBSCRIBE_ANNOUNCES_ERROR message (type 0x13).
327#[derive(Debug, Clone, PartialEq, Eq)]
328pub struct SubscribeAnnouncesError {
329    /// The track namespace prefix that was rejected.
330    pub track_namespace_prefix: TrackNamespace,
331    /// The error code indicating the reason for failure.
332    pub error_code: VarInt,
333    /// A human-readable reason for the error.
334    pub reason_phrase: Vec<u8>,
335}
336
337/// UNSUBSCRIBE_ANNOUNCES message (type 0x14).
338#[derive(Debug, Clone, PartialEq, Eq)]
339pub struct UnsubscribeAnnounces {
340    /// The track namespace prefix to unsubscribe from.
341    pub track_namespace_prefix: TrackNamespace,
342}
343
344// ============================================================
345// Track Status Messages
346// ============================================================
347
348/// TRACK_STATUS_REQUEST message (type 0x0D).
349#[derive(Debug, Clone, PartialEq, Eq)]
350pub struct TrackStatusRequest {
351    /// The track namespace to query status for.
352    pub track_namespace: TrackNamespace,
353    /// The track name to query status for.
354    pub track_name: Vec<u8>,
355}
356
357/// TRACK_STATUS message (type 0x0E).
358#[derive(Debug, Clone, PartialEq, Eq)]
359pub struct TrackStatus {
360    /// The track namespace this status is for.
361    pub track_namespace: TrackNamespace,
362    /// The track name this status is for.
363    pub track_name: Vec<u8>,
364    /// The status code for the track.
365    pub status_code: VarInt,
366    /// The last group ID available on this track.
367    pub last_group_id: VarInt,
368    /// The last object ID available on this track.
369    pub last_object_id: VarInt,
370}
371
372// ============================================================
373// Fetch Messages
374// ============================================================
375
376/// Fetch type for FETCH message (draft-08).
377#[derive(Debug, Clone, Copy, PartialEq, Eq)]
378#[repr(u64)]
379pub enum FetchType {
380    /// Standalone fetch with explicit track and range.
381    Standalone = 1,
382    /// Joining fetch referencing an existing subscription.
383    Joining = 2,
384}
385
386impl FetchType {
387    /// Convert a raw value to a `FetchType`, if valid.
388    pub fn from_u64(v: u64) -> Option<Self> {
389        match v {
390            1 => Some(FetchType::Standalone),
391            2 => Some(FetchType::Joining),
392            _ => None,
393        }
394    }
395}
396
397/// FETCH message (type 0x16).
398///
399/// Draft-08: has fetch_type (standalone vs joining).
400#[derive(Debug, Clone, PartialEq, Eq)]
401pub struct Fetch {
402    /// The subscribe ID for this fetch request.
403    pub subscribe_id: VarInt,
404    /// The priority of this subscriber relative to others.
405    pub subscriber_priority: u8,
406    /// The requested group delivery order.
407    pub group_order: GroupOrder,
408    /// The fetch type (standalone or joining).
409    pub fetch_type: FetchType,
410    /// Track namespace (standalone only).
411    pub track_namespace: Option<TrackNamespace>,
412    /// Track name (standalone only).
413    pub track_name: Option<Vec<u8>>,
414    /// Start group (standalone only).
415    pub start_group: Option<VarInt>,
416    /// Start object (standalone only).
417    pub start_object: Option<VarInt>,
418    /// End group (standalone only).
419    pub end_group: Option<VarInt>,
420    /// End object (standalone only).
421    pub end_object: Option<VarInt>,
422    /// Joining subscribe ID (joining only).
423    pub joining_subscribe_id: Option<VarInt>,
424    /// Preceding group offset (joining only).
425    pub preceding_group_offset: Option<VarInt>,
426    /// Fetch parameters.
427    pub parameters: Vec<KeyValuePair>,
428}
429
430/// FETCH_OK message (type 0x18).
431///
432/// Draft-08: largest_group_id and largest_object_id are always present.
433#[derive(Debug, Clone, PartialEq, Eq)]
434pub struct FetchOk {
435    /// The subscribe ID this response corresponds to.
436    pub subscribe_id: VarInt,
437    /// The group delivery order chosen by the publisher.
438    pub group_order: GroupOrder,
439    /// Whether this fetch reaches the end of the track (1 = yes).
440    pub end_of_track: u8,
441    /// The largest group ID available.
442    pub largest_group_id: VarInt,
443    /// The largest object ID available.
444    pub largest_object_id: VarInt,
445    /// Fetch OK parameters.
446    pub parameters: Vec<KeyValuePair>,
447}
448
449/// FETCH_ERROR message (type 0x19).
450#[derive(Debug, Clone, PartialEq, Eq)]
451pub struct FetchError {
452    /// The subscribe ID this error corresponds to.
453    pub subscribe_id: VarInt,
454    /// The error code indicating the reason for failure.
455    pub error_code: VarInt,
456    /// A human-readable reason for the error.
457    pub reason_phrase: Vec<u8>,
458}
459
460/// FETCH_CANCEL message (type 0x17).
461#[derive(Debug, Clone, PartialEq, Eq)]
462pub struct FetchCancel {
463    /// The subscribe ID for the fetch to cancel.
464    pub subscribe_id: VarInt,
465}
466
467// ============================================================
468// Unified Message Enum
469// ============================================================
470
471/// A decoded MoQT control message (draft-08).
472#[derive(Debug, Clone, PartialEq, Eq)]
473pub enum ControlMessage {
474    /// ClientSetup (type 0x40).
475    ClientSetup(ClientSetup),
476    /// ServerSetup (type 0x41).
477    ServerSetup(ServerSetup),
478    /// GoAway (type 0x10).
479    GoAway(GoAway),
480    /// MaxSubscribeId (type 0x15).
481    MaxSubscribeId(MaxSubscribeId),
482    /// SubscribesBlocked (type 0x1A).
483    SubscribesBlocked(SubscribesBlocked),
484    /// Subscribe (type 0x03).
485    Subscribe(Subscribe),
486    /// SubscribeOk (type 0x04).
487    SubscribeOk(SubscribeOk),
488    /// SubscribeError (type 0x05).
489    SubscribeError(SubscribeError),
490    /// SubscribeUpdate (type 0x02).
491    SubscribeUpdate(SubscribeUpdate),
492    /// SubscribeDone (type 0x0B).
493    SubscribeDone(SubscribeDone),
494    /// Unsubscribe (type 0x0A).
495    Unsubscribe(Unsubscribe),
496    /// Announce (type 0x06).
497    Announce(Announce),
498    /// AnnounceOk (type 0x07).
499    AnnounceOk(AnnounceOk),
500    /// AnnounceError (type 0x08).
501    AnnounceError(AnnounceError),
502    /// AnnounceCancel (type 0x0C).
503    AnnounceCancel(AnnounceCancel),
504    /// Unannounce (type 0x09).
505    Unannounce(Unannounce),
506    /// SubscribeAnnounces (type 0x11).
507    SubscribeAnnounces(SubscribeAnnounces),
508    /// SubscribeAnnouncesOk (type 0x12).
509    SubscribeAnnouncesOk(SubscribeAnnouncesOk),
510    /// SubscribeAnnouncesError (type 0x13).
511    SubscribeAnnouncesError(SubscribeAnnouncesError),
512    /// UnsubscribeAnnounces (type 0x14).
513    UnsubscribeAnnounces(UnsubscribeAnnounces),
514    /// TrackStatusRequest (type 0x0D).
515    TrackStatusRequest(TrackStatusRequest),
516    /// TrackStatus (type 0x0E).
517    TrackStatus(TrackStatus),
518    /// Fetch (type 0x16).
519    Fetch(Fetch),
520    /// FetchOk (type 0x18).
521    FetchOk(FetchOk),
522    /// FetchError (type 0x19).
523    FetchError(FetchError),
524    /// FetchCancel (type 0x17).
525    FetchCancel(FetchCancel),
526}
527
528impl ControlMessage {
529    /// Return the message type for this control message.
530    pub fn message_type(&self) -> MessageType {
531        match self {
532            ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
533            ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
534            ControlMessage::GoAway(_) => MessageType::GoAway,
535            ControlMessage::MaxSubscribeId(_) => MessageType::MaxSubscribeId,
536            ControlMessage::SubscribesBlocked(_) => MessageType::SubscribesBlocked,
537            ControlMessage::Subscribe(_) => MessageType::Subscribe,
538            ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
539            ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
540            ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
541            ControlMessage::SubscribeDone(_) => MessageType::SubscribeDone,
542            ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
543            ControlMessage::Announce(_) => MessageType::Announce,
544            ControlMessage::AnnounceOk(_) => MessageType::AnnounceOk,
545            ControlMessage::AnnounceError(_) => MessageType::AnnounceError,
546            ControlMessage::AnnounceCancel(_) => MessageType::AnnounceCancel,
547            ControlMessage::Unannounce(_) => MessageType::Unannounce,
548            ControlMessage::SubscribeAnnounces(_) => MessageType::SubscribeAnnounces,
549            ControlMessage::SubscribeAnnouncesOk(_) => MessageType::SubscribeAnnouncesOk,
550            ControlMessage::SubscribeAnnouncesError(_) => MessageType::SubscribeAnnouncesError,
551            ControlMessage::UnsubscribeAnnounces(_) => MessageType::UnsubscribeAnnounces,
552            ControlMessage::TrackStatusRequest(_) => MessageType::TrackStatusRequest,
553            ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
554            ControlMessage::Fetch(_) => MessageType::Fetch,
555            ControlMessage::FetchOk(_) => MessageType::FetchOk,
556            ControlMessage::FetchError(_) => MessageType::FetchError,
557            ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
558        }
559    }
560
561    /// Encode this control message (type ID + length prefix + payload).
562    pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
563        let mut payload = Vec::with_capacity(256);
564        self.encode_payload(&mut payload)?;
565
566        if payload.len() > MAX_MESSAGE_LENGTH {
567            return Err(CodecError::MessageTooLong(payload.len()));
568        }
569
570        VarInt::from_usize(self.message_type().id() as usize).encode(buf);
571        VarInt::from_usize(payload.len()).encode(buf);
572        buf.put_slice(&payload);
573        Ok(())
574    }
575
576    /// Decode a control message from bytes.
577    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
578        let type_id = VarInt::decode(buf)?.into_inner();
579        let msg_type =
580            MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
581        let payload_len = VarInt::decode(buf)?.into_inner() as usize;
582        if buf.remaining() < payload_len {
583            return Err(CodecError::UnexpectedEnd);
584        }
585        let payload_bytes = buf.copy_to_bytes(payload_len);
586        let mut payload = &payload_bytes[..];
587        Self::decode_payload(msg_type, &mut payload)
588    }
589
590    fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
591        match self {
592            ControlMessage::ClientSetup(m) => {
593                VarInt::from_usize(m.supported_versions.len()).encode(buf);
594                for v in &m.supported_versions {
595                    v.encode(buf);
596                }
597                KeyValuePair::encode_list_d07(&m.parameters, buf);
598            }
599            ControlMessage::ServerSetup(m) => {
600                m.selected_version.encode(buf);
601                KeyValuePair::encode_list_d07(&m.parameters, buf);
602            }
603            ControlMessage::GoAway(m) => {
604                if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
605                    return Err(CodecError::GoAwayUriTooLong);
606                }
607                VarInt::from_usize(m.new_session_uri.len()).encode(buf);
608                buf.put_slice(&m.new_session_uri);
609            }
610            ControlMessage::MaxSubscribeId(m) => {
611                m.subscribe_id.encode(buf);
612            }
613            ControlMessage::SubscribesBlocked(m) => {
614                m.maximum_subscribe_id.encode(buf);
615            }
616            ControlMessage::Subscribe(m) => {
617                m.subscribe_id.encode(buf);
618                m.track_alias.encode(buf);
619                m.track_namespace.encode(buf);
620                VarInt::from_usize(m.track_name.len()).encode(buf);
621                buf.put_slice(&m.track_name);
622                buf.put_u8(m.subscriber_priority);
623                buf.put_u8(m.group_order as u8);
624                VarInt::from_usize(m.filter_type as usize).encode(buf);
625                if let Some(loc) = &m.start_location {
626                    loc.encode(buf);
627                }
628                if let Some(eg) = &m.end_group {
629                    eg.encode(buf);
630                }
631                KeyValuePair::encode_list_d07(&m.parameters, buf);
632            }
633            ControlMessage::SubscribeOk(m) => {
634                m.subscribe_id.encode(buf);
635                m.expires.encode(buf);
636                buf.put_u8(m.group_order as u8);
637                buf.put_u8(m.content_exists as u8);
638                if let Some(gid) = &m.largest_group_id {
639                    gid.encode(buf);
640                }
641                if let Some(oid) = &m.largest_object_id {
642                    oid.encode(buf);
643                }
644                KeyValuePair::encode_list_d07(&m.parameters, buf);
645            }
646            ControlMessage::SubscribeError(m) => {
647                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
648                    return Err(CodecError::ReasonPhraseTooLong);
649                }
650                m.subscribe_id.encode(buf);
651                m.error_code.encode(buf);
652                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
653                buf.put_slice(&m.reason_phrase);
654                m.track_alias.encode(buf);
655            }
656            ControlMessage::SubscribeUpdate(m) => {
657                m.subscribe_id.encode(buf);
658                m.start_group.encode(buf);
659                m.start_object.encode(buf);
660                m.end_group.encode(buf);
661                buf.put_u8(m.subscriber_priority);
662                KeyValuePair::encode_list_d07(&m.parameters, buf);
663            }
664            ControlMessage::SubscribeDone(m) => {
665                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
666                    return Err(CodecError::ReasonPhraseTooLong);
667                }
668                m.subscribe_id.encode(buf);
669                m.status_code.encode(buf);
670                m.stream_count.encode(buf);
671                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
672                buf.put_slice(&m.reason_phrase);
673            }
674            ControlMessage::Unsubscribe(m) => {
675                m.subscribe_id.encode(buf);
676            }
677            ControlMessage::Announce(m) => {
678                m.track_namespace.encode(buf);
679                KeyValuePair::encode_list_d07(&m.parameters, buf);
680            }
681            ControlMessage::AnnounceOk(m) => {
682                m.track_namespace.encode(buf);
683            }
684            ControlMessage::AnnounceError(m) => {
685                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
686                    return Err(CodecError::ReasonPhraseTooLong);
687                }
688                m.track_namespace.encode(buf);
689                m.error_code.encode(buf);
690                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
691                buf.put_slice(&m.reason_phrase);
692            }
693            ControlMessage::AnnounceCancel(m) => {
694                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
695                    return Err(CodecError::ReasonPhraseTooLong);
696                }
697                m.track_namespace.encode(buf);
698                m.error_code.encode(buf);
699                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
700                buf.put_slice(&m.reason_phrase);
701            }
702            ControlMessage::Unannounce(m) => {
703                m.track_namespace.encode(buf);
704            }
705            ControlMessage::SubscribeAnnounces(m) => {
706                m.track_namespace_prefix.encode(buf);
707                KeyValuePair::encode_list_d07(&m.parameters, buf);
708            }
709            ControlMessage::SubscribeAnnouncesOk(m) => {
710                m.track_namespace_prefix.encode(buf);
711            }
712            ControlMessage::SubscribeAnnouncesError(m) => {
713                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
714                    return Err(CodecError::ReasonPhraseTooLong);
715                }
716                m.track_namespace_prefix.encode(buf);
717                m.error_code.encode(buf);
718                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
719                buf.put_slice(&m.reason_phrase);
720            }
721            ControlMessage::UnsubscribeAnnounces(m) => {
722                m.track_namespace_prefix.encode(buf);
723            }
724            ControlMessage::TrackStatusRequest(m) => {
725                m.track_namespace.encode(buf);
726                VarInt::from_usize(m.track_name.len()).encode(buf);
727                buf.put_slice(&m.track_name);
728            }
729            ControlMessage::TrackStatus(m) => {
730                m.track_namespace.encode(buf);
731                VarInt::from_usize(m.track_name.len()).encode(buf);
732                buf.put_slice(&m.track_name);
733                m.status_code.encode(buf);
734                m.last_group_id.encode(buf);
735                m.last_object_id.encode(buf);
736            }
737            ControlMessage::Fetch(m) => {
738                m.subscribe_id.encode(buf);
739                buf.put_u8(m.subscriber_priority);
740                buf.put_u8(m.group_order as u8);
741                VarInt::from_usize(m.fetch_type as usize).encode(buf);
742                match m.fetch_type {
743                    FetchType::Standalone => {
744                        if let Some(ns) = &m.track_namespace {
745                            ns.encode(buf);
746                        }
747                        if let Some(name) = &m.track_name {
748                            VarInt::from_usize(name.len()).encode(buf);
749                            buf.put_slice(name);
750                        }
751                        if let Some(sg) = &m.start_group {
752                            sg.encode(buf);
753                        }
754                        if let Some(so) = &m.start_object {
755                            so.encode(buf);
756                        }
757                        if let Some(eg) = &m.end_group {
758                            eg.encode(buf);
759                        }
760                        if let Some(eo) = &m.end_object {
761                            eo.encode(buf);
762                        }
763                    }
764                    FetchType::Joining => {
765                        if let Some(jsi) = &m.joining_subscribe_id {
766                            jsi.encode(buf);
767                        }
768                        if let Some(pgo) = &m.preceding_group_offset {
769                            pgo.encode(buf);
770                        }
771                    }
772                }
773                KeyValuePair::encode_list_d07(&m.parameters, buf);
774            }
775            ControlMessage::FetchOk(m) => {
776                m.subscribe_id.encode(buf);
777                buf.put_u8(m.group_order as u8);
778                buf.put_u8(m.end_of_track);
779                m.largest_group_id.encode(buf);
780                m.largest_object_id.encode(buf);
781                KeyValuePair::encode_list_d07(&m.parameters, buf);
782            }
783            ControlMessage::FetchError(m) => {
784                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
785                    return Err(CodecError::ReasonPhraseTooLong);
786                }
787                m.subscribe_id.encode(buf);
788                m.error_code.encode(buf);
789                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
790                buf.put_slice(&m.reason_phrase);
791            }
792            ControlMessage::FetchCancel(m) => {
793                m.subscribe_id.encode(buf);
794            }
795        }
796        Ok(())
797    }
798
799    fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
800        match msg_type {
801            MessageType::ClientSetup => {
802                let num_versions = VarInt::decode(buf)?.into_inner() as usize;
803                if num_versions == 0 {
804                    return Err(CodecError::InvalidField);
805                }
806                let mut supported_versions = Vec::with_capacity(num_versions);
807                for _ in 0..num_versions {
808                    supported_versions.push(VarInt::decode(buf)?);
809                }
810                let parameters = KeyValuePair::decode_list_d07(buf)?;
811                Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
812            }
813            MessageType::ServerSetup => {
814                let selected_version = VarInt::decode(buf)?;
815                let parameters = KeyValuePair::decode_list_d07(buf)?;
816                Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
817            }
818            MessageType::GoAway => {
819                let uri_len = VarInt::decode(buf)?.into_inner() as usize;
820                let uri = read_bytes(buf, uri_len)?;
821                Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
822            }
823            MessageType::MaxSubscribeId => {
824                let subscribe_id = VarInt::decode(buf)?;
825                Ok(ControlMessage::MaxSubscribeId(MaxSubscribeId { subscribe_id }))
826            }
827            MessageType::SubscribesBlocked => {
828                let maximum_subscribe_id = VarInt::decode(buf)?;
829                Ok(ControlMessage::SubscribesBlocked(SubscribesBlocked { maximum_subscribe_id }))
830            }
831            MessageType::Subscribe => {
832                let subscribe_id = VarInt::decode(buf)?;
833                let track_alias = VarInt::decode(buf)?;
834                let track_namespace = TrackNamespace::decode(buf)?;
835                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
836                let track_name = read_bytes(buf, track_name_len)?;
837                if buf.remaining() < 2 {
838                    return Err(CodecError::UnexpectedEnd);
839                }
840                let subscriber_priority = buf.get_u8();
841                let group_order =
842                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
843                let filter_val = VarInt::decode(buf)?.into_inner();
844                let filter_type =
845                    FilterType::from_u64(filter_val).ok_or(CodecError::InvalidField)?;
846                let start_location = match filter_type {
847                    FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
848                        Some(Location::decode(buf)?)
849                    }
850                    _ => None,
851                };
852                let end_group = match filter_type {
853                    FilterType::AbsoluteRange => Some(VarInt::decode(buf)?),
854                    _ => None,
855                };
856                let parameters = KeyValuePair::decode_list_d07(buf)?;
857                Ok(ControlMessage::Subscribe(Subscribe {
858                    subscribe_id,
859                    track_alias,
860                    track_namespace,
861                    track_name,
862                    subscriber_priority,
863                    group_order,
864                    filter_type,
865                    start_location,
866                    end_group,
867                    parameters,
868                }))
869            }
870            MessageType::SubscribeOk => {
871                let subscribe_id = VarInt::decode(buf)?;
872                let expires = VarInt::decode(buf)?;
873                if buf.remaining() < 2 {
874                    return Err(CodecError::UnexpectedEnd);
875                }
876                let group_order =
877                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
878                let content_exists_val = buf.get_u8();
879                let content_exists = match content_exists_val {
880                    0 => ContentExists::NoLargestLocation,
881                    1 => ContentExists::HasLargestLocation,
882                    _ => return Err(CodecError::InvalidField),
883                };
884                let (largest_group_id, largest_object_id) =
885                    if content_exists == ContentExists::HasLargestLocation {
886                        let gid = VarInt::decode(buf)?;
887                        let oid = VarInt::decode(buf)?;
888                        (Some(gid), Some(oid))
889                    } else {
890                        (None, None)
891                    };
892                let parameters = KeyValuePair::decode_list_d07(buf)?;
893                Ok(ControlMessage::SubscribeOk(SubscribeOk {
894                    subscribe_id,
895                    expires,
896                    group_order,
897                    content_exists,
898                    largest_group_id,
899                    largest_object_id,
900                    parameters,
901                }))
902            }
903            MessageType::SubscribeError => {
904                let subscribe_id = VarInt::decode(buf)?;
905                let error_code = VarInt::decode(buf)?;
906                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
907                let reason_phrase = read_bytes(buf, reason_len)?;
908                let track_alias = VarInt::decode(buf)?;
909                Ok(ControlMessage::SubscribeError(SubscribeError {
910                    subscribe_id,
911                    error_code,
912                    reason_phrase,
913                    track_alias,
914                }))
915            }
916            MessageType::SubscribeUpdate => {
917                let subscribe_id = VarInt::decode(buf)?;
918                let start_group = VarInt::decode(buf)?;
919                let start_object = VarInt::decode(buf)?;
920                let end_group = VarInt::decode(buf)?;
921                if buf.remaining() < 1 {
922                    return Err(CodecError::UnexpectedEnd);
923                }
924                let subscriber_priority = buf.get_u8();
925                let parameters = KeyValuePair::decode_list_d07(buf)?;
926                Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
927                    subscribe_id,
928                    start_group,
929                    start_object,
930                    end_group,
931                    subscriber_priority,
932                    parameters,
933                }))
934            }
935            MessageType::SubscribeDone => {
936                let subscribe_id = VarInt::decode(buf)?;
937                let status_code = VarInt::decode(buf)?;
938                let stream_count = VarInt::decode(buf)?;
939                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
940                let reason_phrase = read_bytes(buf, reason_len)?;
941                Ok(ControlMessage::SubscribeDone(SubscribeDone {
942                    subscribe_id,
943                    status_code,
944                    stream_count,
945                    reason_phrase,
946                }))
947            }
948            MessageType::Unsubscribe => {
949                let subscribe_id = VarInt::decode(buf)?;
950                Ok(ControlMessage::Unsubscribe(Unsubscribe { subscribe_id }))
951            }
952            MessageType::Announce => {
953                let track_namespace = TrackNamespace::decode(buf)?;
954                let parameters = KeyValuePair::decode_list_d07(buf)?;
955                Ok(ControlMessage::Announce(Announce { track_namespace, parameters }))
956            }
957            MessageType::AnnounceOk => {
958                let track_namespace = TrackNamespace::decode(buf)?;
959                Ok(ControlMessage::AnnounceOk(AnnounceOk { track_namespace }))
960            }
961            MessageType::AnnounceError => {
962                let track_namespace = TrackNamespace::decode(buf)?;
963                let error_code = VarInt::decode(buf)?;
964                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
965                let reason_phrase = read_bytes(buf, reason_len)?;
966                Ok(ControlMessage::AnnounceError(AnnounceError {
967                    track_namespace,
968                    error_code,
969                    reason_phrase,
970                }))
971            }
972            MessageType::AnnounceCancel => {
973                let track_namespace = TrackNamespace::decode(buf)?;
974                let error_code = VarInt::decode(buf)?;
975                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
976                let reason_phrase = read_bytes(buf, reason_len)?;
977                Ok(ControlMessage::AnnounceCancel(AnnounceCancel {
978                    track_namespace,
979                    error_code,
980                    reason_phrase,
981                }))
982            }
983            MessageType::Unannounce => {
984                let track_namespace = TrackNamespace::decode(buf)?;
985                Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
986            }
987            MessageType::SubscribeAnnounces => {
988                let track_namespace_prefix = TrackNamespace::decode(buf)?;
989                let parameters = KeyValuePair::decode_list_d07(buf)?;
990                Ok(ControlMessage::SubscribeAnnounces(SubscribeAnnounces {
991                    track_namespace_prefix,
992                    parameters,
993                }))
994            }
995            MessageType::SubscribeAnnouncesOk => {
996                let track_namespace_prefix = TrackNamespace::decode(buf)?;
997                Ok(ControlMessage::SubscribeAnnouncesOk(SubscribeAnnouncesOk {
998                    track_namespace_prefix,
999                }))
1000            }
1001            MessageType::SubscribeAnnouncesError => {
1002                let track_namespace_prefix = TrackNamespace::decode(buf)?;
1003                let error_code = VarInt::decode(buf)?;
1004                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1005                let reason_phrase = read_bytes(buf, reason_len)?;
1006                Ok(ControlMessage::SubscribeAnnouncesError(SubscribeAnnouncesError {
1007                    track_namespace_prefix,
1008                    error_code,
1009                    reason_phrase,
1010                }))
1011            }
1012            MessageType::UnsubscribeAnnounces => {
1013                let track_namespace_prefix = TrackNamespace::decode(buf)?;
1014                Ok(ControlMessage::UnsubscribeAnnounces(UnsubscribeAnnounces {
1015                    track_namespace_prefix,
1016                }))
1017            }
1018            MessageType::TrackStatusRequest => {
1019                let track_namespace = TrackNamespace::decode(buf)?;
1020                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1021                let track_name = read_bytes(buf, track_name_len)?;
1022                Ok(ControlMessage::TrackStatusRequest(TrackStatusRequest {
1023                    track_namespace,
1024                    track_name,
1025                }))
1026            }
1027            MessageType::TrackStatus => {
1028                let track_namespace = TrackNamespace::decode(buf)?;
1029                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1030                let track_name = read_bytes(buf, track_name_len)?;
1031                let status_code = VarInt::decode(buf)?;
1032                let last_group_id = VarInt::decode(buf)?;
1033                let last_object_id = VarInt::decode(buf)?;
1034                Ok(ControlMessage::TrackStatus(TrackStatus {
1035                    track_namespace,
1036                    track_name,
1037                    status_code,
1038                    last_group_id,
1039                    last_object_id,
1040                }))
1041            }
1042            MessageType::Fetch => {
1043                let subscribe_id = VarInt::decode(buf)?;
1044                if buf.remaining() < 2 {
1045                    return Err(CodecError::UnexpectedEnd);
1046                }
1047                let subscriber_priority = buf.get_u8();
1048                let group_order =
1049                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1050                let fetch_type_val = VarInt::decode(buf)?.into_inner();
1051                let fetch_type =
1052                    FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
1053                let (
1054                    track_namespace,
1055                    track_name,
1056                    start_group,
1057                    start_object,
1058                    end_group,
1059                    end_object,
1060                    joining_subscribe_id,
1061                    preceding_group_offset,
1062                ) = match fetch_type {
1063                    FetchType::Standalone => {
1064                        let ns = TrackNamespace::decode(buf)?;
1065                        let name_len = VarInt::decode(buf)?.into_inner() as usize;
1066                        let name = read_bytes(buf, name_len)?;
1067                        let sg = VarInt::decode(buf)?;
1068                        let so = VarInt::decode(buf)?;
1069                        let eg = VarInt::decode(buf)?;
1070                        let eo = VarInt::decode(buf)?;
1071                        (Some(ns), Some(name), Some(sg), Some(so), Some(eg), Some(eo), None, None)
1072                    }
1073                    FetchType::Joining => {
1074                        let jsi = VarInt::decode(buf)?;
1075                        let pgo = VarInt::decode(buf)?;
1076                        (None, None, None, None, None, None, Some(jsi), Some(pgo))
1077                    }
1078                };
1079                let parameters = KeyValuePair::decode_list_d07(buf)?;
1080                Ok(ControlMessage::Fetch(Fetch {
1081                    subscribe_id,
1082                    subscriber_priority,
1083                    group_order,
1084                    fetch_type,
1085                    track_namespace,
1086                    track_name,
1087                    start_group,
1088                    start_object,
1089                    end_group,
1090                    end_object,
1091                    joining_subscribe_id,
1092                    preceding_group_offset,
1093                    parameters,
1094                }))
1095            }
1096            MessageType::FetchOk => {
1097                let subscribe_id = VarInt::decode(buf)?;
1098                if buf.remaining() < 2 {
1099                    return Err(CodecError::UnexpectedEnd);
1100                }
1101                let group_order =
1102                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1103                let end_of_track = buf.get_u8();
1104                let largest_group_id = VarInt::decode(buf)?;
1105                let largest_object_id = VarInt::decode(buf)?;
1106                let parameters = KeyValuePair::decode_list_d07(buf)?;
1107                Ok(ControlMessage::FetchOk(FetchOk {
1108                    subscribe_id,
1109                    group_order,
1110                    end_of_track,
1111                    largest_group_id,
1112                    largest_object_id,
1113                    parameters,
1114                }))
1115            }
1116            MessageType::FetchError => {
1117                let subscribe_id = VarInt::decode(buf)?;
1118                let error_code = VarInt::decode(buf)?;
1119                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1120                let reason_phrase = read_bytes(buf, reason_len)?;
1121                Ok(ControlMessage::FetchError(FetchError {
1122                    subscribe_id,
1123                    error_code,
1124                    reason_phrase,
1125                }))
1126            }
1127            MessageType::FetchCancel => {
1128                let subscribe_id = VarInt::decode(buf)?;
1129                Ok(ControlMessage::FetchCancel(FetchCancel { subscribe_id }))
1130            }
1131        }
1132    }
1133}