Skip to main content

moqtap_codec/draft11/
message.rs

1//! Draft-11 control message encoding and decoding.
2//!
3//! Key changes from draft-09/10:
4//! - Setup IDs: 0x40/0x41 -> 0x20/0x21
5//! - `subscribe_id` -> `request_id` throughout
6//! - MaxSubscribeId -> MaxRequestId, SubscribesBlocked -> RequestsBlocked
7//! - Subscribe gains `track_alias` and `forward` fields; group_order/forward/filter_type as VarInt
8//! - SubscribeOk: no track_alias; group_order/content_exists as VarInt
9//! - SubscribeError gains trailing `track_alias`
10//! - SubscribeDone gains `stream_count`
11//! - SubscribeUpdate uses start_group/start_object (not Location); forward as VarInt
12//! - Announce gains `request_id`; AnnounceOk/AnnounceError use `request_id`
13//! - AnnounceCancel: `namespace + error_code + reason_phrase`
14//! - SubscribeAnnounces gains `request_id`
15//! - TrackStatusRequest gains `request_id` and `parameters`
16//! - TrackStatus restructured: `request_id + status_code + largest_location + parameters`
17//! - Fetch restructured: 3 fetch types (Standalone, RelativeJoining, AbsoluteJoining)
18//! - FetchOk: group_order + end_of_track + end_location (no track_alias)
19//! - Uses even/odd KVP encoding (not d07 format)
20//! - Framing: type_id(vi) + payload_length(16) + payload
21
22pub use crate::error::{
23    CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_NAMESPACE_TUPLE_SIZE,
24    MAX_REASON_PHRASE_LENGTH,
25};
26use crate::kvp::KeyValuePair;
27use crate::types::{self, *};
28use crate::varint::VarInt;
29use bytes::{Buf, BufMut};
30
/// Wire-level type identifiers for draft-11 control messages.
///
/// Each discriminant is the numeric ID that prefixes the message on the wire.
/// IDs that do not appear below (for example `0x0F`) have no variant and are
/// reported as `None` by [`MessageType::from_id`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u64)]
pub enum MessageType {
    /// `SUBSCRIBE_UPDATE`, wire ID `0x02`.
    SubscribeUpdate = 0x02,
    /// `SUBSCRIBE`, wire ID `0x03`.
    Subscribe = 0x03,
    /// `SUBSCRIBE_OK`, wire ID `0x04`.
    SubscribeOk = 0x04,
    /// `SUBSCRIBE_ERROR`, wire ID `0x05`.
    SubscribeError = 0x05,
    /// `ANNOUNCE`, wire ID `0x06`.
    Announce = 0x06,
    /// `ANNOUNCE_OK`, wire ID `0x07`.
    AnnounceOk = 0x07,
    /// `ANNOUNCE_ERROR`, wire ID `0x08`.
    AnnounceError = 0x08,
    /// `UNANNOUNCE`, wire ID `0x09`.
    Unannounce = 0x09,
    /// `UNSUBSCRIBE`, wire ID `0x0A`.
    Unsubscribe = 0x0A,
    /// `SUBSCRIBE_DONE`, wire ID `0x0B`.
    SubscribeDone = 0x0B,
    /// `ANNOUNCE_CANCEL`, wire ID `0x0C`.
    AnnounceCancel = 0x0C,
    /// `TRACK_STATUS_REQUEST`, wire ID `0x0D`.
    TrackStatusRequest = 0x0D,
    /// `TRACK_STATUS`, wire ID `0x0E`.
    TrackStatus = 0x0E,
    /// `GOAWAY`, wire ID `0x10`.
    GoAway = 0x10,
    /// `SUBSCRIBE_ANNOUNCES`, wire ID `0x11`.
    SubscribeAnnounces = 0x11,
    /// `SUBSCRIBE_ANNOUNCES_OK`, wire ID `0x12`.
    SubscribeAnnouncesOk = 0x12,
    /// `SUBSCRIBE_ANNOUNCES_ERROR`, wire ID `0x13`.
    SubscribeAnnouncesError = 0x13,
    /// `UNSUBSCRIBE_ANNOUNCES`, wire ID `0x14`.
    UnsubscribeAnnounces = 0x14,
    /// `MAX_REQUEST_ID`, wire ID `0x15`.
    MaxRequestId = 0x15,
    /// `FETCH`, wire ID `0x16`.
    Fetch = 0x16,
    /// `FETCH_CANCEL`, wire ID `0x17`.
    FetchCancel = 0x17,
    /// `FETCH_OK`, wire ID `0x18`.
    FetchOk = 0x18,
    /// `FETCH_ERROR`, wire ID `0x19`.
    FetchError = 0x19,
    /// `REQUESTS_BLOCKED`, wire ID `0x1A`.
    RequestsBlocked = 0x1A,
    /// `CLIENT_SETUP`, wire ID `0x20`.
    ClientSetup = 0x20,
    /// `SERVER_SETUP`, wire ID `0x21`.
    ServerSetup = 0x21,
}

impl MessageType {
    /// Map a numeric wire ID to its message type.
    ///
    /// Returns `None` for any ID that has no variant in this enum.
    pub fn from_id(id: u64) -> Option<Self> {
        let kind = match id {
            0x02 => Self::SubscribeUpdate,
            0x03 => Self::Subscribe,
            0x04 => Self::SubscribeOk,
            0x05 => Self::SubscribeError,
            0x06 => Self::Announce,
            0x07 => Self::AnnounceOk,
            0x08 => Self::AnnounceError,
            0x09 => Self::Unannounce,
            0x0A => Self::Unsubscribe,
            0x0B => Self::SubscribeDone,
            0x0C => Self::AnnounceCancel,
            0x0D => Self::TrackStatusRequest,
            0x0E => Self::TrackStatus,
            0x10 => Self::GoAway,
            0x11 => Self::SubscribeAnnounces,
            0x12 => Self::SubscribeAnnouncesOk,
            0x13 => Self::SubscribeAnnouncesError,
            0x14 => Self::UnsubscribeAnnounces,
            0x15 => Self::MaxRequestId,
            0x16 => Self::Fetch,
            0x17 => Self::FetchCancel,
            0x18 => Self::FetchOk,
            0x19 => Self::FetchError,
            0x1A => Self::RequestsBlocked,
            0x20 => Self::ClientSetup,
            0x21 => Self::ServerSetup,
            // Everything else is unassigned as far as this enum is concerned.
            _ => return None,
        };
        Some(kind)
    }

    /// The numeric wire ID of this message type (its enum discriminant).
    pub fn id(&self) -> u64 {
        let kind: MessageType = *self;
        kind as u64
    }
}
128
129// ============================================================
130// Session Lifecycle Messages
131// ============================================================
132
133/// CLIENT_SETUP message (type 0x20).
134#[derive(Debug, Clone, PartialEq, Eq)]
135pub struct ClientSetup {
136    /// List of MoQT versions supported by the client.
137    pub supported_versions: Vec<VarInt>,
138    /// Setup parameters (even/odd KVP encoding).
139    pub parameters: Vec<KeyValuePair>,
140}
141
142/// SERVER_SETUP message (type 0x21).
143#[derive(Debug, Clone, PartialEq, Eq)]
144pub struct ServerSetup {
145    /// The MoQT version selected by the server.
146    pub selected_version: VarInt,
147    /// Setup parameters (even/odd KVP encoding).
148    pub parameters: Vec<KeyValuePair>,
149}
150
/// `GOAWAY` control message (wire type `0x10`).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GoAway {
    /// Raw bytes of the URI of the session the peer should move to.
    ///
    /// Written as a VarInt length followed by the bytes. Encoding fails with
    /// `CodecError::GoAwayUriTooLong` when the length exceeds
    /// `MAX_GOAWAY_URI_LENGTH`.
    pub new_session_uri: Vec<u8>,
}
157
158/// MAX_REQUEST_ID message (type 0x15).
159#[derive(Debug, Clone, PartialEq, Eq)]
160pub struct MaxRequestId {
161    /// The maximum request ID the peer may use.
162    pub request_id: VarInt,
163}
164
165/// REQUESTS_BLOCKED message (type 0x1A).
166#[derive(Debug, Clone, PartialEq, Eq)]
167pub struct RequestsBlocked {
168    /// The request ID that is currently blocked on.
169    pub maximum_request_id: VarInt,
170}
171
172// ============================================================
173// Subscribe Messages
174// ============================================================
175
176/// SUBSCRIBE message (type 0x03).
177#[derive(Debug, Clone, PartialEq, Eq)]
178pub struct Subscribe {
179    /// The request ID for this subscription.
180    pub request_id: VarInt,
181    /// The track alias assigned by the subscriber.
182    pub track_alias: VarInt,
183    /// The track namespace.
184    pub track_namespace: TrackNamespace,
185    /// The track name within the namespace.
186    pub track_name: Vec<u8>,
187    /// Subscriber priority for this track.
188    pub subscriber_priority: u8,
189    /// Requested group delivery order (VarInt).
190    pub group_order: VarInt,
191    /// Whether to forward data on this subscription (VarInt).
192    pub forward: VarInt,
193    /// The filter type controlling which objects are delivered (VarInt).
194    pub filter_type: VarInt,
195    /// Present only for AbsoluteStart (3) and AbsoluteRange (4) filter types.
196    pub start_group: Option<VarInt>,
197    /// Present only for AbsoluteStart (3) and AbsoluteRange (4) filter types.
198    pub start_object: Option<VarInt>,
199    /// Present only for AbsoluteRange (4) filter type.
200    pub end_group: Option<VarInt>,
201    /// Subscribe parameters (even/odd KVP encoding).
202    pub parameters: Vec<KeyValuePair>,
203}
204
205/// SUBSCRIBE_OK message (type 0x04).
206#[derive(Debug, Clone, PartialEq, Eq)]
207pub struct SubscribeOk {
208    /// The request ID this response corresponds to.
209    pub request_id: VarInt,
210    /// Subscription expiry in milliseconds (0 = no expiry).
211    pub expires: VarInt,
212    /// The group delivery order chosen by the publisher (VarInt).
213    pub group_order: VarInt,
214    /// Whether the largest location is included (VarInt).
215    pub content_exists: VarInt,
216    /// Present only when content_exists != 0.
217    pub largest_location: Option<Location>,
218    /// Response parameters (even/odd KVP encoding).
219    pub parameters: Vec<KeyValuePair>,
220}
221
222/// SUBSCRIBE_ERROR message (type 0x05).
223#[derive(Debug, Clone, PartialEq, Eq)]
224pub struct SubscribeError {
225    /// The request ID this error corresponds to.
226    pub request_id: VarInt,
227    /// Application-defined error code.
228    pub error_code: VarInt,
229    /// Human-readable reason phrase.
230    pub reason_phrase: Vec<u8>,
231    /// The track alias.
232    pub track_alias: VarInt,
233}
234
235/// SUBSCRIBE_UPDATE message (type 0x02).
236#[derive(Debug, Clone, PartialEq, Eq)]
237pub struct SubscribeUpdate {
238    /// The request ID of the subscription to update.
239    pub request_id: VarInt,
240    /// Updated start group.
241    pub start_group: VarInt,
242    /// Updated start object.
243    pub start_object: VarInt,
244    /// Updated end group (0 = open-ended).
245    pub end_group: VarInt,
246    /// Updated subscriber priority.
247    pub subscriber_priority: u8,
248    /// Updated forward preference (VarInt).
249    pub forward: VarInt,
250    /// Updated parameters (even/odd KVP encoding).
251    pub parameters: Vec<KeyValuePair>,
252}
253
254/// SUBSCRIBE_DONE message (type 0x0B).
255#[derive(Debug, Clone, PartialEq, Eq)]
256pub struct SubscribeDone {
257    /// The request ID of the completed subscription.
258    pub request_id: VarInt,
259    /// Status code indicating the reason for completion.
260    pub status_code: VarInt,
261    /// The number of streams opened for this subscription.
262    pub stream_count: VarInt,
263    /// Human-readable reason phrase.
264    pub reason_phrase: Vec<u8>,
265}
266
267/// UNSUBSCRIBE message (type 0x0A).
268#[derive(Debug, Clone, PartialEq, Eq)]
269pub struct Unsubscribe {
270    /// The request ID of the subscription to cancel.
271    pub request_id: VarInt,
272}
273
274// ============================================================
275// Announce Messages
276// ============================================================
277
278/// ANNOUNCE message (type 0x06).
279#[derive(Debug, Clone, PartialEq, Eq)]
280pub struct Announce {
281    /// The request ID for this announcement.
282    pub request_id: VarInt,
283    /// The track namespace to announce.
284    pub track_namespace: TrackNamespace,
285    /// Announce parameters (even/odd KVP encoding).
286    pub parameters: Vec<KeyValuePair>,
287}
288
289/// ANNOUNCE_OK message (type 0x07).
290#[derive(Debug, Clone, PartialEq, Eq)]
291pub struct AnnounceOk {
292    /// The request ID this response corresponds to.
293    pub request_id: VarInt,
294}
295
296/// ANNOUNCE_ERROR message (type 0x08).
297#[derive(Debug, Clone, PartialEq, Eq)]
298pub struct AnnounceError {
299    /// The request ID this error corresponds to.
300    pub request_id: VarInt,
301    /// Application-defined error code.
302    pub error_code: VarInt,
303    /// Human-readable reason phrase.
304    pub reason_phrase: Vec<u8>,
305}
306
307/// ANNOUNCE_CANCEL message (type 0x0C).
308#[derive(Debug, Clone, PartialEq, Eq)]
309pub struct AnnounceCancel {
310    /// The track namespace being cancelled.
311    pub track_namespace: TrackNamespace,
312    /// Application-defined error code.
313    pub error_code: VarInt,
314    /// Human-readable reason phrase.
315    pub reason_phrase: Vec<u8>,
316}
317
318/// UNANNOUNCE message (type 0x09).
319#[derive(Debug, Clone, PartialEq, Eq)]
320pub struct Unannounce {
321    /// The track namespace to unannounce.
322    pub track_namespace: TrackNamespace,
323}
324
325// ============================================================
326// Subscribe Announces Messages
327// ============================================================
328
329/// SUBSCRIBE_ANNOUNCES message (type 0x11).
330#[derive(Debug, Clone, PartialEq, Eq)]
331pub struct SubscribeAnnounces {
332    /// The request ID for this subscription.
333    pub request_id: VarInt,
334    /// The track namespace prefix to subscribe to.
335    pub track_namespace_prefix: TrackNamespace,
336    /// Subscribe announces parameters (even/odd KVP encoding).
337    pub parameters: Vec<KeyValuePair>,
338}
339
340/// SUBSCRIBE_ANNOUNCES_OK message (type 0x12).
341#[derive(Debug, Clone, PartialEq, Eq)]
342pub struct SubscribeAnnouncesOk {
343    /// The request ID this response corresponds to.
344    pub request_id: VarInt,
345}
346
347/// SUBSCRIBE_ANNOUNCES_ERROR message (type 0x13).
348#[derive(Debug, Clone, PartialEq, Eq)]
349pub struct SubscribeAnnouncesError {
350    /// The request ID this error corresponds to.
351    pub request_id: VarInt,
352    /// Application-defined error code.
353    pub error_code: VarInt,
354    /// Human-readable reason phrase.
355    pub reason_phrase: Vec<u8>,
356}
357
358/// UNSUBSCRIBE_ANNOUNCES message (type 0x14).
359#[derive(Debug, Clone, PartialEq, Eq)]
360pub struct UnsubscribeAnnounces {
361    /// The track namespace prefix to unsubscribe from.
362    pub track_namespace_prefix: TrackNamespace,
363}
364
365// ============================================================
366// Track Status Messages
367// ============================================================
368
369/// TRACK_STATUS_REQUEST message (type 0x0D).
370#[derive(Debug, Clone, PartialEq, Eq)]
371pub struct TrackStatusRequest {
372    /// The request ID for this status query.
373    pub request_id: VarInt,
374    /// The track namespace to query.
375    pub track_namespace: TrackNamespace,
376    /// The track name within the namespace.
377    pub track_name: Vec<u8>,
378    /// Track status request parameters (even/odd KVP encoding).
379    pub parameters: Vec<KeyValuePair>,
380}
381
382/// TRACK_STATUS message (type 0x0E).
383#[derive(Debug, Clone, PartialEq, Eq)]
384pub struct TrackStatus {
385    /// The request ID this status corresponds to.
386    pub request_id: VarInt,
387    /// The track status code.
388    pub status_code: VarInt,
389    /// The largest location (always present in draft-11).
390    pub largest_location: Location,
391    /// Track status parameters (even/odd KVP encoding).
392    pub parameters: Vec<KeyValuePair>,
393}
394
395// ============================================================
396// Fetch Messages
397// ============================================================
398
/// Discriminant selecting which of the three draft-11 fetch forms a `FETCH`
/// message carries. The numeric value is what is written on the wire.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u64)]
pub enum FetchType {
    /// Standalone fetch: names the track and an explicit range (value 1).
    Standalone = 1,
    /// Joining fetch positioned relative to an existing subscribe (value 2).
    RelativeJoining = 2,
    /// Joining fetch with an absolute start group (value 3).
    AbsoluteJoining = 3,
}

impl FetchType {
    /// Interpret a raw integer as a `FetchType`.
    ///
    /// Returns `None` for anything other than 1, 2 or 3.
    pub fn from_u64(v: u64) -> Option<Self> {
        let fetch_type = match v {
            1 => Self::Standalone,
            2 => Self::RelativeJoining,
            3 => Self::AbsoluteJoining,
            _ => return None,
        };
        Some(fetch_type)
    }
}
422
423/// FETCH message (type 0x16).
424#[derive(Debug, Clone, PartialEq, Eq)]
425pub struct Fetch {
426    /// The request ID for this fetch.
427    pub request_id: VarInt,
428    /// Subscriber priority for delivery ordering.
429    pub subscriber_priority: u8,
430    /// Requested group delivery order (VarInt).
431    pub group_order: VarInt,
432    /// The fetch type discriminant.
433    pub fetch_type: FetchType,
434    /// Fetch-type-specific payload.
435    pub fetch_payload: FetchPayload,
436    /// Fetch parameters (even/odd KVP encoding).
437    pub parameters: Vec<KeyValuePair>,
438}
439
440/// Fetch-type-specific payload fields.
441#[derive(Debug, Clone, PartialEq, Eq)]
442pub enum FetchPayload {
443    /// Standalone fetch (type 1).
444    Standalone {
445        /// The track namespace.
446        track_namespace: TrackNamespace,
447        /// The track name.
448        track_name: Vec<u8>,
449        /// Start group ID.
450        start_group: VarInt,
451        /// Start object ID.
452        start_object: VarInt,
453        /// End group ID.
454        end_group: VarInt,
455        /// End object ID.
456        end_object: VarInt,
457    },
458    /// Joining fetch (types 2 and 3).
459    Joining {
460        /// The subscribe request ID to join.
461        joining_subscribe_id: VarInt,
462        /// The joining start (offset for relative, group for absolute).
463        joining_start: VarInt,
464    },
465}
466
467/// FETCH_OK message (type 0x18).
468#[derive(Debug, Clone, PartialEq, Eq)]
469pub struct FetchOk {
470    /// The request ID this response corresponds to.
471    pub request_id: VarInt,
472    /// The group delivery order chosen by the publisher (VarInt).
473    pub group_order: VarInt,
474    /// Whether the end of the track has been reached (VarInt).
475    pub end_of_track: VarInt,
476    /// The end location of the fetch response.
477    pub end_location: Location,
478    /// Response parameters (even/odd KVP encoding).
479    pub parameters: Vec<KeyValuePair>,
480}
481
482/// FETCH_ERROR message (type 0x19).
483#[derive(Debug, Clone, PartialEq, Eq)]
484pub struct FetchError {
485    /// The request ID this error corresponds to.
486    pub request_id: VarInt,
487    /// Application-defined error code.
488    pub error_code: VarInt,
489    /// Human-readable reason phrase.
490    pub reason_phrase: Vec<u8>,
491}
492
493/// FETCH_CANCEL message (type 0x17).
494#[derive(Debug, Clone, PartialEq, Eq)]
495pub struct FetchCancel {
496    /// The request ID of the fetch to cancel.
497    pub request_id: VarInt,
498}
499
500// ============================================================
501// Unified Message Enum
502// ============================================================
503
504/// A parsed MoQT control message (draft-11).
505#[derive(Debug, Clone, PartialEq, Eq)]
506pub enum ControlMessage {
507    /// ClientSetup (type 0x20).
508    ClientSetup(ClientSetup),
509    /// ServerSetup (type 0x21).
510    ServerSetup(ServerSetup),
511    /// GoAway (type 0x10).
512    GoAway(GoAway),
513    /// MaxRequestId (type 0x15).
514    MaxRequestId(MaxRequestId),
515    /// RequestsBlocked (type 0x1A).
516    RequestsBlocked(RequestsBlocked),
517    /// Subscribe (type 0x03).
518    Subscribe(Subscribe),
519    /// SubscribeOk (type 0x04).
520    SubscribeOk(SubscribeOk),
521    /// SubscribeError (type 0x05).
522    SubscribeError(SubscribeError),
523    /// SubscribeUpdate (type 0x02).
524    SubscribeUpdate(SubscribeUpdate),
525    /// SubscribeDone (type 0x0B).
526    SubscribeDone(SubscribeDone),
527    /// Unsubscribe (type 0x0A).
528    Unsubscribe(Unsubscribe),
529    /// Announce (type 0x06).
530    Announce(Announce),
531    /// AnnounceOk (type 0x07).
532    AnnounceOk(AnnounceOk),
533    /// AnnounceError (type 0x08).
534    AnnounceError(AnnounceError),
535    /// AnnounceCancel (type 0x0C).
536    AnnounceCancel(AnnounceCancel),
537    /// Unannounce (type 0x09).
538    Unannounce(Unannounce),
539    /// SubscribeAnnounces (type 0x11).
540    SubscribeAnnounces(SubscribeAnnounces),
541    /// SubscribeAnnouncesOk (type 0x12).
542    SubscribeAnnouncesOk(SubscribeAnnouncesOk),
543    /// SubscribeAnnouncesError (type 0x13).
544    SubscribeAnnouncesError(SubscribeAnnouncesError),
545    /// UnsubscribeAnnounces (type 0x14).
546    UnsubscribeAnnounces(UnsubscribeAnnounces),
547    /// TrackStatusRequest (type 0x0D).
548    TrackStatusRequest(TrackStatusRequest),
549    /// TrackStatus (type 0x0E).
550    TrackStatus(TrackStatus),
551    /// Fetch (type 0x16).
552    Fetch(Fetch),
553    /// FetchOk (type 0x18).
554    FetchOk(FetchOk),
555    /// FetchError (type 0x19).
556    FetchError(FetchError),
557    /// FetchCancel (type 0x17).
558    FetchCancel(FetchCancel),
559}
560
561impl ControlMessage {
562    /// Encode this control message to bytes (including type ID and length prefix).
563    ///
564    /// Draft-11 framing: type_id(vi) + payload_length(16) + payload.
565    pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
566        let mut payload = Vec::with_capacity(256);
567        self.encode_payload(&mut payload)?;
568
569        if payload.len() > MAX_MESSAGE_LENGTH {
570            return Err(CodecError::MessageTooLong(payload.len()));
571        }
572
573        VarInt::from_usize(self.message_type().id() as usize).encode(buf);
574        // Draft-11: 16-bit length (big-endian)
575        buf.put_u16(payload.len() as u16);
576        buf.put_slice(&payload);
577        Ok(())
578    }
579
580    /// Decode a control message from bytes (reads type ID and length prefix first).
581    ///
582    /// Draft-11 framing: type_id(vi) + payload_length(16) + payload.
583    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
584        let type_id = VarInt::decode(buf)?.into_inner();
585        let msg_type =
586            MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
587        // Draft-11: 16-bit length (big-endian)
588        if buf.remaining() < 2 {
589            return Err(CodecError::UnexpectedEnd);
590        }
591        let payload_len = buf.get_u16() as usize;
592        if buf.remaining() < payload_len {
593            return Err(CodecError::UnexpectedEnd);
594        }
595        let payload_bytes = buf.copy_to_bytes(payload_len);
596        let mut payload = &payload_bytes[..];
597        Self::decode_payload(msg_type, &mut payload)
598    }
599
600    fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
601        match self {
602            ControlMessage::ClientSetup(m) => {
603                VarInt::from_usize(m.supported_versions.len()).encode(buf);
604                for v in &m.supported_versions {
605                    v.encode(buf);
606                }
607                KeyValuePair::encode_list(&m.parameters, buf);
608            }
609            ControlMessage::ServerSetup(m) => {
610                m.selected_version.encode(buf);
611                KeyValuePair::encode_list(&m.parameters, buf);
612            }
613            ControlMessage::GoAway(m) => {
614                if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
615                    return Err(CodecError::GoAwayUriTooLong);
616                }
617                VarInt::from_usize(m.new_session_uri.len()).encode(buf);
618                buf.put_slice(&m.new_session_uri);
619            }
620            ControlMessage::MaxRequestId(m) => {
621                m.request_id.encode(buf);
622            }
623            ControlMessage::RequestsBlocked(m) => {
624                m.maximum_request_id.encode(buf);
625            }
626            ControlMessage::Subscribe(m) => {
627                m.request_id.encode(buf);
628                m.track_alias.encode(buf);
629                m.track_namespace.encode(buf);
630                VarInt::from_usize(m.track_name.len()).encode(buf);
631                buf.put_slice(&m.track_name);
632                buf.put_u8(m.subscriber_priority);
633                m.group_order.encode(buf);
634                m.forward.encode(buf);
635                m.filter_type.encode(buf);
636                if let Some(sg) = &m.start_group {
637                    sg.encode(buf);
638                }
639                if let Some(so) = &m.start_object {
640                    so.encode(buf);
641                }
642                if let Some(eg) = &m.end_group {
643                    eg.encode(buf);
644                }
645                KeyValuePair::encode_list(&m.parameters, buf);
646            }
647            ControlMessage::SubscribeOk(m) => {
648                m.request_id.encode(buf);
649                m.expires.encode(buf);
650                m.group_order.encode(buf);
651                m.content_exists.encode(buf);
652                if let Some(loc) = &m.largest_location {
653                    loc.encode(buf);
654                }
655                KeyValuePair::encode_list(&m.parameters, buf);
656            }
657            ControlMessage::SubscribeError(m) => {
658                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
659                    return Err(CodecError::ReasonPhraseTooLong);
660                }
661                m.request_id.encode(buf);
662                m.error_code.encode(buf);
663                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
664                buf.put_slice(&m.reason_phrase);
665                m.track_alias.encode(buf);
666            }
667            ControlMessage::SubscribeUpdate(m) => {
668                m.request_id.encode(buf);
669                m.start_group.encode(buf);
670                m.start_object.encode(buf);
671                m.end_group.encode(buf);
672                buf.put_u8(m.subscriber_priority);
673                m.forward.encode(buf);
674                KeyValuePair::encode_list(&m.parameters, buf);
675            }
676            ControlMessage::SubscribeDone(m) => {
677                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
678                    return Err(CodecError::ReasonPhraseTooLong);
679                }
680                m.request_id.encode(buf);
681                m.status_code.encode(buf);
682                m.stream_count.encode(buf);
683                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
684                buf.put_slice(&m.reason_phrase);
685            }
686            ControlMessage::Unsubscribe(m) => {
687                m.request_id.encode(buf);
688            }
689            ControlMessage::Announce(m) => {
690                m.request_id.encode(buf);
691                m.track_namespace.encode(buf);
692                KeyValuePair::encode_list(&m.parameters, buf);
693            }
694            ControlMessage::AnnounceOk(m) => {
695                m.request_id.encode(buf);
696            }
697            ControlMessage::AnnounceError(m) => {
698                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
699                    return Err(CodecError::ReasonPhraseTooLong);
700                }
701                m.request_id.encode(buf);
702                m.error_code.encode(buf);
703                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
704                buf.put_slice(&m.reason_phrase);
705            }
706            ControlMessage::AnnounceCancel(m) => {
707                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
708                    return Err(CodecError::ReasonPhraseTooLong);
709                }
710                m.track_namespace.encode(buf);
711                m.error_code.encode(buf);
712                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
713                buf.put_slice(&m.reason_phrase);
714            }
715            ControlMessage::Unannounce(m) => {
716                m.track_namespace.encode(buf);
717            }
718            ControlMessage::SubscribeAnnounces(m) => {
719                m.request_id.encode(buf);
720                m.track_namespace_prefix.encode(buf);
721                KeyValuePair::encode_list(&m.parameters, buf);
722            }
723            ControlMessage::SubscribeAnnouncesOk(m) => {
724                m.request_id.encode(buf);
725            }
726            ControlMessage::SubscribeAnnouncesError(m) => {
727                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
728                    return Err(CodecError::ReasonPhraseTooLong);
729                }
730                m.request_id.encode(buf);
731                m.error_code.encode(buf);
732                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
733                buf.put_slice(&m.reason_phrase);
734            }
735            ControlMessage::UnsubscribeAnnounces(m) => {
736                m.track_namespace_prefix.encode(buf);
737            }
738            ControlMessage::TrackStatusRequest(m) => {
739                m.request_id.encode(buf);
740                m.track_namespace.encode(buf);
741                VarInt::from_usize(m.track_name.len()).encode(buf);
742                buf.put_slice(&m.track_name);
743                KeyValuePair::encode_list(&m.parameters, buf);
744            }
745            ControlMessage::TrackStatus(m) => {
746                m.request_id.encode(buf);
747                m.status_code.encode(buf);
748                m.largest_location.encode(buf);
749                KeyValuePair::encode_list(&m.parameters, buf);
750            }
751            ControlMessage::Fetch(m) => {
752                m.request_id.encode(buf);
753                buf.put_u8(m.subscriber_priority);
754                m.group_order.encode(buf);
755                VarInt::from_usize(m.fetch_type as usize).encode(buf);
756                match &m.fetch_payload {
757                    FetchPayload::Standalone {
758                        track_namespace,
759                        track_name,
760                        start_group,
761                        start_object,
762                        end_group,
763                        end_object,
764                    } => {
765                        track_namespace.encode(buf);
766                        VarInt::from_usize(track_name.len()).encode(buf);
767                        buf.put_slice(track_name);
768                        start_group.encode(buf);
769                        start_object.encode(buf);
770                        end_group.encode(buf);
771                        end_object.encode(buf);
772                    }
773                    FetchPayload::Joining { joining_subscribe_id, joining_start } => {
774                        joining_subscribe_id.encode(buf);
775                        joining_start.encode(buf);
776                    }
777                }
778                KeyValuePair::encode_list(&m.parameters, buf);
779            }
780            ControlMessage::FetchOk(m) => {
781                m.request_id.encode(buf);
782                m.group_order.encode(buf);
783                m.end_of_track.encode(buf);
784                m.end_location.encode(buf);
785                KeyValuePair::encode_list(&m.parameters, buf);
786            }
787            ControlMessage::FetchError(m) => {
788                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
789                    return Err(CodecError::ReasonPhraseTooLong);
790                }
791                m.request_id.encode(buf);
792                m.error_code.encode(buf);
793                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
794                buf.put_slice(&m.reason_phrase);
795            }
796            ControlMessage::FetchCancel(m) => {
797                m.request_id.encode(buf);
798            }
799        }
800        Ok(())
801    }
802
803    fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
804        match msg_type {
805            MessageType::ClientSetup => {
806                let num_versions = VarInt::decode(buf)?.into_inner() as usize;
807                if num_versions == 0 {
808                    return Err(CodecError::InvalidField);
809                }
810                let mut supported_versions = Vec::with_capacity(num_versions);
811                for _ in 0..num_versions {
812                    supported_versions.push(VarInt::decode(buf)?);
813                }
814                let parameters = KeyValuePair::decode_list(buf)?;
815                Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
816            }
817            MessageType::ServerSetup => {
818                let selected_version = VarInt::decode(buf)?;
819                let parameters = KeyValuePair::decode_list(buf)?;
820                Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
821            }
822            MessageType::GoAway => {
823                let uri_len = VarInt::decode(buf)?.into_inner() as usize;
824                let uri = types::read_bytes(buf, uri_len)?;
825                Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
826            }
827            MessageType::MaxRequestId => {
828                let request_id = VarInt::decode(buf)?;
829                Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id }))
830            }
831            MessageType::RequestsBlocked => {
832                let maximum_request_id = VarInt::decode(buf)?;
833                Ok(ControlMessage::RequestsBlocked(RequestsBlocked { maximum_request_id }))
834            }
835            MessageType::Subscribe => {
836                let request_id = VarInt::decode(buf)?;
837                let track_alias = VarInt::decode(buf)?;
838                let track_namespace = TrackNamespace::decode(buf)?;
839                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
840                let track_name = types::read_bytes(buf, track_name_len)?;
841                if buf.remaining() < 1 {
842                    return Err(CodecError::UnexpectedEnd);
843                }
844                let subscriber_priority = buf.get_u8();
845                let group_order = VarInt::decode(buf)?;
846                let forward = VarInt::decode(buf)?;
847                let filter_type = VarInt::decode(buf)?;
848                let ft_val = filter_type.into_inner();
849                if ft_val == 0 || ft_val > 4 {
850                    return Err(CodecError::InvalidField);
851                }
852                let (start_group, start_object) = if ft_val == 3 || ft_val == 4 {
853                    (Some(VarInt::decode(buf)?), Some(VarInt::decode(buf)?))
854                } else {
855                    (None, None)
856                };
857                let end_group = if ft_val == 4 { Some(VarInt::decode(buf)?) } else { None };
858                let parameters = KeyValuePair::decode_list(buf)?;
859                Ok(ControlMessage::Subscribe(Subscribe {
860                    request_id,
861                    track_alias,
862                    track_namespace,
863                    track_name,
864                    subscriber_priority,
865                    group_order,
866                    forward,
867                    filter_type,
868                    start_group,
869                    start_object,
870                    end_group,
871                    parameters,
872                }))
873            }
874            MessageType::SubscribeOk => {
875                let request_id = VarInt::decode(buf)?;
876                let expires = VarInt::decode(buf)?;
877                let group_order = VarInt::decode(buf)?;
878                let content_exists = VarInt::decode(buf)?;
879                let largest_location = if content_exists.into_inner() != 0 {
880                    Some(Location::decode(buf)?)
881                } else {
882                    None
883                };
884                let parameters = KeyValuePair::decode_list(buf)?;
885                Ok(ControlMessage::SubscribeOk(SubscribeOk {
886                    request_id,
887                    expires,
888                    group_order,
889                    content_exists,
890                    largest_location,
891                    parameters,
892                }))
893            }
894            MessageType::SubscribeError => {
895                let request_id = VarInt::decode(buf)?;
896                let error_code = VarInt::decode(buf)?;
897                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
898                let reason_phrase = types::read_bytes(buf, reason_len)?;
899                let track_alias = VarInt::decode(buf)?;
900                Ok(ControlMessage::SubscribeError(SubscribeError {
901                    request_id,
902                    error_code,
903                    reason_phrase,
904                    track_alias,
905                }))
906            }
907            MessageType::SubscribeUpdate => {
908                let request_id = VarInt::decode(buf)?;
909                let start_group = VarInt::decode(buf)?;
910                let start_object = VarInt::decode(buf)?;
911                let end_group = VarInt::decode(buf)?;
912                if buf.remaining() < 1 {
913                    return Err(CodecError::UnexpectedEnd);
914                }
915                let subscriber_priority = buf.get_u8();
916                let forward = VarInt::decode(buf)?;
917                let parameters = KeyValuePair::decode_list(buf)?;
918                Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
919                    request_id,
920                    start_group,
921                    start_object,
922                    end_group,
923                    subscriber_priority,
924                    forward,
925                    parameters,
926                }))
927            }
928            MessageType::SubscribeDone => {
929                let request_id = VarInt::decode(buf)?;
930                let status_code = VarInt::decode(buf)?;
931                let stream_count = VarInt::decode(buf)?;
932                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
933                let reason_phrase = types::read_bytes(buf, reason_len)?;
934                Ok(ControlMessage::SubscribeDone(SubscribeDone {
935                    request_id,
936                    status_code,
937                    stream_count,
938                    reason_phrase,
939                }))
940            }
941            MessageType::Unsubscribe => {
942                let request_id = VarInt::decode(buf)?;
943                Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
944            }
945            MessageType::Announce => {
946                let request_id = VarInt::decode(buf)?;
947                let track_namespace = TrackNamespace::decode(buf)?;
948                let parameters = KeyValuePair::decode_list(buf)?;
949                Ok(ControlMessage::Announce(Announce { request_id, track_namespace, parameters }))
950            }
951            MessageType::AnnounceOk => {
952                let request_id = VarInt::decode(buf)?;
953                Ok(ControlMessage::AnnounceOk(AnnounceOk { request_id }))
954            }
955            MessageType::AnnounceError => {
956                let request_id = VarInt::decode(buf)?;
957                let error_code = VarInt::decode(buf)?;
958                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
959                let reason_phrase = types::read_bytes(buf, reason_len)?;
960                Ok(ControlMessage::AnnounceError(AnnounceError {
961                    request_id,
962                    error_code,
963                    reason_phrase,
964                }))
965            }
966            MessageType::AnnounceCancel => {
967                let track_namespace = TrackNamespace::decode(buf)?;
968                let error_code = VarInt::decode(buf)?;
969                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
970                let reason_phrase = types::read_bytes(buf, reason_len)?;
971                Ok(ControlMessage::AnnounceCancel(AnnounceCancel {
972                    track_namespace,
973                    error_code,
974                    reason_phrase,
975                }))
976            }
977            MessageType::Unannounce => {
978                let track_namespace = TrackNamespace::decode(buf)?;
979                Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
980            }
981            MessageType::SubscribeAnnounces => {
982                let request_id = VarInt::decode(buf)?;
983                let track_namespace_prefix = TrackNamespace::decode(buf)?;
984                let parameters = KeyValuePair::decode_list(buf)?;
985                Ok(ControlMessage::SubscribeAnnounces(SubscribeAnnounces {
986                    request_id,
987                    track_namespace_prefix,
988                    parameters,
989                }))
990            }
991            MessageType::SubscribeAnnouncesOk => {
992                let request_id = VarInt::decode(buf)?;
993                Ok(ControlMessage::SubscribeAnnouncesOk(SubscribeAnnouncesOk { request_id }))
994            }
995            MessageType::SubscribeAnnouncesError => {
996                let request_id = VarInt::decode(buf)?;
997                let error_code = VarInt::decode(buf)?;
998                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
999                let reason_phrase = types::read_bytes(buf, reason_len)?;
1000                Ok(ControlMessage::SubscribeAnnouncesError(SubscribeAnnouncesError {
1001                    request_id,
1002                    error_code,
1003                    reason_phrase,
1004                }))
1005            }
1006            MessageType::UnsubscribeAnnounces => {
1007                let track_namespace_prefix = TrackNamespace::decode(buf)?;
1008                Ok(ControlMessage::UnsubscribeAnnounces(UnsubscribeAnnounces {
1009                    track_namespace_prefix,
1010                }))
1011            }
1012            MessageType::TrackStatusRequest => {
1013                let request_id = VarInt::decode(buf)?;
1014                let track_namespace = TrackNamespace::decode(buf)?;
1015                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1016                let track_name = types::read_bytes(buf, track_name_len)?;
1017                let parameters = KeyValuePair::decode_list(buf)?;
1018                Ok(ControlMessage::TrackStatusRequest(TrackStatusRequest {
1019                    request_id,
1020                    track_namespace,
1021                    track_name,
1022                    parameters,
1023                }))
1024            }
1025            MessageType::TrackStatus => {
1026                let request_id = VarInt::decode(buf)?;
1027                let status_code = VarInt::decode(buf)?;
1028                let largest_location = Location::decode(buf)?;
1029                let parameters = KeyValuePair::decode_list(buf)?;
1030                Ok(ControlMessage::TrackStatus(TrackStatus {
1031                    request_id,
1032                    status_code,
1033                    largest_location,
1034                    parameters,
1035                }))
1036            }
1037            MessageType::Fetch => {
1038                let request_id = VarInt::decode(buf)?;
1039                if buf.remaining() < 1 {
1040                    return Err(CodecError::UnexpectedEnd);
1041                }
1042                let subscriber_priority = buf.get_u8();
1043                let group_order = VarInt::decode(buf)?;
1044                let fetch_type_val = VarInt::decode(buf)?.into_inner();
1045                let fetch_type =
1046                    FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
1047                let fetch_payload = match fetch_type {
1048                    FetchType::Standalone => {
1049                        let track_namespace = TrackNamespace::decode(buf)?;
1050                        let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1051                        let track_name = types::read_bytes(buf, track_name_len)?;
1052                        let start_group = VarInt::decode(buf)?;
1053                        let start_object = VarInt::decode(buf)?;
1054                        let end_group = VarInt::decode(buf)?;
1055                        let end_object = VarInt::decode(buf)?;
1056                        FetchPayload::Standalone {
1057                            track_namespace,
1058                            track_name,
1059                            start_group,
1060                            start_object,
1061                            end_group,
1062                            end_object,
1063                        }
1064                    }
1065                    FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
1066                        let joining_subscribe_id = VarInt::decode(buf)?;
1067                        let joining_start = VarInt::decode(buf)?;
1068                        FetchPayload::Joining { joining_subscribe_id, joining_start }
1069                    }
1070                };
1071                let parameters = KeyValuePair::decode_list(buf)?;
1072                Ok(ControlMessage::Fetch(Fetch {
1073                    request_id,
1074                    subscriber_priority,
1075                    group_order,
1076                    fetch_type,
1077                    fetch_payload,
1078                    parameters,
1079                }))
1080            }
1081            MessageType::FetchOk => {
1082                let request_id = VarInt::decode(buf)?;
1083                let group_order = VarInt::decode(buf)?;
1084                let end_of_track = VarInt::decode(buf)?;
1085                let end_location = Location::decode(buf)?;
1086                let parameters = KeyValuePair::decode_list(buf)?;
1087                Ok(ControlMessage::FetchOk(FetchOk {
1088                    request_id,
1089                    group_order,
1090                    end_of_track,
1091                    end_location,
1092                    parameters,
1093                }))
1094            }
1095            MessageType::FetchError => {
1096                let request_id = VarInt::decode(buf)?;
1097                let error_code = VarInt::decode(buf)?;
1098                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1099                let reason_phrase = types::read_bytes(buf, reason_len)?;
1100                Ok(ControlMessage::FetchError(FetchError { request_id, error_code, reason_phrase }))
1101            }
1102            MessageType::FetchCancel => {
1103                let request_id = VarInt::decode(buf)?;
1104                Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
1105            }
1106        }
1107    }
1108
1109    /// Get the message type ID for this message.
1110    pub fn message_type(&self) -> MessageType {
1111        match self {
1112            ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
1113            ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
1114            ControlMessage::GoAway(_) => MessageType::GoAway,
1115            ControlMessage::MaxRequestId(_) => MessageType::MaxRequestId,
1116            ControlMessage::RequestsBlocked(_) => MessageType::RequestsBlocked,
1117            ControlMessage::Subscribe(_) => MessageType::Subscribe,
1118            ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
1119            ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
1120            ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
1121            ControlMessage::SubscribeDone(_) => MessageType::SubscribeDone,
1122            ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
1123            ControlMessage::Announce(_) => MessageType::Announce,
1124            ControlMessage::AnnounceOk(_) => MessageType::AnnounceOk,
1125            ControlMessage::AnnounceError(_) => MessageType::AnnounceError,
1126            ControlMessage::AnnounceCancel(_) => MessageType::AnnounceCancel,
1127            ControlMessage::Unannounce(_) => MessageType::Unannounce,
1128            ControlMessage::SubscribeAnnounces(_) => MessageType::SubscribeAnnounces,
1129            ControlMessage::SubscribeAnnouncesOk(_) => MessageType::SubscribeAnnouncesOk,
1130            ControlMessage::SubscribeAnnouncesError(_) => MessageType::SubscribeAnnouncesError,
1131            ControlMessage::UnsubscribeAnnounces(_) => MessageType::UnsubscribeAnnounces,
1132            ControlMessage::TrackStatusRequest(_) => MessageType::TrackStatusRequest,
1133            ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
1134            ControlMessage::Fetch(_) => MessageType::Fetch,
1135            ControlMessage::FetchOk(_) => MessageType::FetchOk,
1136            ControlMessage::FetchError(_) => MessageType::FetchError,
1137            ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
1138        }
1139    }
1140}