Skip to main content

moqtap_codec/draft14/
message.rs

1pub use crate::error::{
2    CodecError, MAX_FULL_TRACK_NAME_LENGTH, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH,
3    MAX_NAMESPACE_TUPLE_SIZE, MAX_REASON_PHRASE_LENGTH,
4};
5use crate::kvp::KeyValuePair;
6use crate::types::*;
7use crate::varint::VarInt;
8use bytes::{Buf, BufMut};
9
10/// Control message type IDs (draft-14).
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12#[repr(u64)]
13pub enum MessageType {
14    /// SubscribeUpdate (type 0x02).
15    SubscribeUpdate = 0x02,
16    /// Subscribe (type 0x03).
17    Subscribe = 0x03,
18    /// SubscribeOk (type 0x04).
19    SubscribeOk = 0x04,
20    /// SubscribeError (type 0x05).
21    SubscribeError = 0x05,
22    /// PublishNamespace (type 0x06).
23    PublishNamespace = 0x06,
24    /// PublishNamespaceOk (type 0x07).
25    PublishNamespaceOk = 0x07,
26    /// PublishNamespaceError (type 0x08).
27    PublishNamespaceError = 0x08,
28    /// PublishNamespaceDone (type 0x09).
29    PublishNamespaceDone = 0x09,
30    /// Unsubscribe (type 0x0A).
31    Unsubscribe = 0x0A,
32    /// PublishDone (type 0x0B).
33    PublishDone = 0x0B,
34    /// PublishNamespaceCancel (type 0x0C).
35    PublishNamespaceCancel = 0x0C,
36    /// TrackStatus (type 0x0D).
37    TrackStatus = 0x0D,
38    /// TrackStatusOk (type 0x0E).
39    TrackStatusOk = 0x0E,
40    /// TrackStatusError (type 0x0F).
41    TrackStatusError = 0x0F,
42    /// GoAway (type 0x10).
43    GoAway = 0x10,
44    /// SubscribeNamespace (type 0x11).
45    SubscribeNamespace = 0x11,
46    /// SubscribeNamespaceOk (type 0x12).
47    SubscribeNamespaceOk = 0x12,
48    /// SubscribeNamespaceError (type 0x13).
49    SubscribeNamespaceError = 0x13,
50    /// UnsubscribeNamespace (type 0x14).
51    UnsubscribeNamespace = 0x14,
52    /// MaxRequestId (type 0x15).
53    MaxRequestId = 0x15,
54    /// Fetch (type 0x16).
55    Fetch = 0x16,
56    /// FetchCancel (type 0x17).
57    FetchCancel = 0x17,
58    /// FetchOk (type 0x18).
59    FetchOk = 0x18,
60    /// FetchError (type 0x19).
61    FetchError = 0x19,
62    /// RequestsBlocked (type 0x1A).
63    RequestsBlocked = 0x1A,
64    /// Publish (type 0x1D).
65    Publish = 0x1D,
66    /// PublishOk (type 0x1E).
67    PublishOk = 0x1E,
68    /// PublishError (type 0x1F).
69    PublishError = 0x1F,
70    /// ClientSetup (type 0x20).
71    ClientSetup = 0x20,
72    /// ServerSetup (type 0x21).
73    ServerSetup = 0x21,
74}
75
76impl MessageType {
77    /// Look up a message type by its wire ID.
78    pub fn from_id(id: u64) -> Option<Self> {
79        match id {
80            0x02 => Some(MessageType::SubscribeUpdate),
81            0x03 => Some(MessageType::Subscribe),
82            0x04 => Some(MessageType::SubscribeOk),
83            0x05 => Some(MessageType::SubscribeError),
84            0x06 => Some(MessageType::PublishNamespace),
85            0x07 => Some(MessageType::PublishNamespaceOk),
86            0x08 => Some(MessageType::PublishNamespaceError),
87            0x09 => Some(MessageType::PublishNamespaceDone),
88            0x0A => Some(MessageType::Unsubscribe),
89            0x0B => Some(MessageType::PublishDone),
90            0x0C => Some(MessageType::PublishNamespaceCancel),
91            0x0D => Some(MessageType::TrackStatus),
92            0x0E => Some(MessageType::TrackStatusOk),
93            0x0F => Some(MessageType::TrackStatusError),
94            0x10 => Some(MessageType::GoAway),
95            0x11 => Some(MessageType::SubscribeNamespace),
96            0x12 => Some(MessageType::SubscribeNamespaceOk),
97            0x13 => Some(MessageType::SubscribeNamespaceError),
98            0x14 => Some(MessageType::UnsubscribeNamespace),
99            0x15 => Some(MessageType::MaxRequestId),
100            0x16 => Some(MessageType::Fetch),
101            0x17 => Some(MessageType::FetchCancel),
102            0x18 => Some(MessageType::FetchOk),
103            0x19 => Some(MessageType::FetchError),
104            0x1A => Some(MessageType::RequestsBlocked),
105            0x1D => Some(MessageType::Publish),
106            0x1E => Some(MessageType::PublishOk),
107            0x1F => Some(MessageType::PublishError),
108            0x20 => Some(MessageType::ClientSetup),
109            0x21 => Some(MessageType::ServerSetup),
110            _ => None,
111        }
112    }
113
114    /// Return the wire ID for this message type.
115    pub fn id(&self) -> u64 {
116        *self as u64
117    }
118}
119
120// ============================================================
121// Session Lifecycle Messages
122// ============================================================
123
124/// CLIENT_SETUP message (type 0x20).
125#[derive(Debug, Clone, PartialEq, Eq)]
126pub struct ClientSetup {
127    /// List of MoQT versions supported by the client.
128    pub supported_versions: Vec<VarInt>,
129    /// Setup parameters.
130    pub parameters: Vec<KeyValuePair>,
131}
132
133/// SERVER_SETUP message (type 0x21).
134#[derive(Debug, Clone, PartialEq, Eq)]
135pub struct ServerSetup {
136    /// The MoQT version selected by the server.
137    pub selected_version: VarInt,
138    /// Setup parameters.
139    pub parameters: Vec<KeyValuePair>,
140}
141
142/// GOAWAY message (type 0x10).
143#[derive(Debug, Clone, PartialEq, Eq)]
144pub struct GoAway {
145    /// URI for the new session to connect to.
146    pub new_session_uri: Vec<u8>,
147}
148
149/// MAX_REQUEST_ID message (type 0x15).
150#[derive(Debug, Clone, PartialEq, Eq)]
151pub struct MaxRequestId {
152    /// The maximum request ID the peer may use.
153    pub request_id: VarInt,
154}
155
156/// REQUESTS_BLOCKED message (type 0x1A).
157#[derive(Debug, Clone, PartialEq, Eq)]
158pub struct RequestsBlocked {
159    /// The request ID that is currently blocked on.
160    pub maximum_request_id: VarInt,
161}
162
163// ============================================================
164// Subscribe Messages
165// ============================================================
166
167/// SUBSCRIBE message (type 0x03).
168#[derive(Debug, Clone, PartialEq, Eq)]
169pub struct Subscribe {
170    /// The request ID for this subscription.
171    pub request_id: VarInt,
172    /// The track namespace.
173    pub track_namespace: TrackNamespace,
174    /// The track name within the namespace.
175    pub track_name: Vec<u8>,
176    /// Subscriber priority for this track.
177    pub subscriber_priority: u8,
178    /// Requested group delivery order.
179    pub group_order: GroupOrder,
180    /// Whether to forward data on this subscription.
181    pub forward: Forward,
182    /// The filter type controlling which objects are delivered.
183    pub filter_type: FilterType,
184    /// Present only for AbsoluteStart and AbsoluteRange filter types.
185    pub start_location: Option<Location>,
186    /// Present only for AbsoluteRange filter type.
187    pub end_group: Option<VarInt>,
188    /// Subscribe parameters.
189    pub parameters: Vec<KeyValuePair>,
190}
191
192/// SUBSCRIBE_OK message (type 0x04).
193#[derive(Debug, Clone, PartialEq, Eq)]
194pub struct SubscribeOk {
195    /// The request ID this response corresponds to.
196    pub request_id: VarInt,
197    /// The track alias assigned by the publisher.
198    pub track_alias: VarInt,
199    /// Subscription expiry in milliseconds (0 = no expiry).
200    pub expires: VarInt,
201    /// The group delivery order chosen by the publisher.
202    pub group_order: GroupOrder,
203    /// Whether the largest location is included.
204    pub content_exists: ContentExists,
205    /// Present only when content_exists == HasLargestLocation.
206    pub largest_location: Option<Location>,
207    /// Response parameters.
208    pub parameters: Vec<KeyValuePair>,
209}
210
211/// SUBSCRIBE_ERROR message (type 0x05).
212#[derive(Debug, Clone, PartialEq, Eq)]
213pub struct SubscribeError {
214    /// The request ID this error corresponds to.
215    pub request_id: VarInt,
216    /// Application-defined error code.
217    pub error_code: VarInt,
218    /// Human-readable reason phrase.
219    pub reason_phrase: Vec<u8>,
220}
221
222/// SUBSCRIBE_UPDATE message (type 0x02).
223#[derive(Debug, Clone, PartialEq, Eq)]
224pub struct SubscribeUpdate {
225    /// The request ID for this update message.
226    pub request_id: VarInt,
227    /// The request ID of the subscription being updated.
228    pub subscription_request_id: VarInt,
229    /// Updated start location.
230    pub start_location: Location,
231    /// Updated end group.
232    pub end_group: VarInt,
233    /// Updated subscriber priority.
234    pub subscriber_priority: u8,
235    /// Updated forward preference.
236    pub forward: Forward,
237    /// Updated parameters.
238    pub parameters: Vec<KeyValuePair>,
239}
240
241/// UNSUBSCRIBE message (type 0x0A).
242#[derive(Debug, Clone, PartialEq, Eq)]
243pub struct Unsubscribe {
244    /// The request ID of the subscription to cancel.
245    pub request_id: VarInt,
246}
247
248// ============================================================
249// Publish Messages
250// ============================================================
251
252/// PUBLISH message (type 0x1D).
253#[derive(Debug, Clone, PartialEq, Eq)]
254pub struct Publish {
255    /// Request ID.
256    pub request_id: VarInt,
257    /// Track namespace.
258    pub track_namespace: TrackNamespace,
259    /// Track name.
260    pub track_name: Vec<u8>,
261    /// Track alias assigned by the publisher.
262    pub track_alias: VarInt,
263    /// Group delivery order.
264    pub group_order: GroupOrder,
265    /// Whether a largest location is included.
266    pub content_exists: ContentExists,
267    /// Largest location, present when content_exists == HasLargestLocation.
268    pub largest_location: Option<Location>,
269    /// Forward preference.
270    pub forward: Forward,
271    /// Publish parameters.
272    pub parameters: Vec<KeyValuePair>,
273}
274
275/// PUBLISH_OK message (type 0x1E).
276#[derive(Debug, Clone, PartialEq, Eq)]
277pub struct PublishOk {
278    /// Request ID this response corresponds to.
279    pub request_id: VarInt,
280    /// Forward preference.
281    pub forward: Forward,
282    /// Subscriber priority.
283    pub subscriber_priority: u8,
284    /// Group order.
285    pub group_order: GroupOrder,
286    /// Filter type.
287    pub filter_type: FilterType,
288    /// Present only for AbsoluteStart and AbsoluteRange filter types.
289    pub start_location: Option<Location>,
290    /// Present only for AbsoluteRange filter type.
291    pub end_group: Option<VarInt>,
292    /// Response parameters.
293    pub parameters: Vec<KeyValuePair>,
294}
295
296/// PUBLISH_ERROR message (type 0x1F).
297#[derive(Debug, Clone, PartialEq, Eq)]
298pub struct PublishError {
299    /// The request ID this error corresponds to.
300    pub request_id: VarInt,
301    /// Application-defined error code.
302    pub error_code: VarInt,
303    /// Human-readable reason phrase.
304    pub reason_phrase: Vec<u8>,
305}
306
307/// PUBLISH_DONE message (type 0x0B).
308#[derive(Debug, Clone, PartialEq, Eq)]
309pub struct PublishDone {
310    /// Request ID.
311    pub request_id: VarInt,
312    /// Status code describing why the publish finished.
313    pub status_code: VarInt,
314    /// Number of data streams used by this publish.
315    pub stream_count: VarInt,
316    /// Human-readable reason phrase.
317    pub reason_phrase: Vec<u8>,
318}
319
320// ============================================================
321// Publish Namespace Messages
322// ============================================================
323
324/// PUBLISH_NAMESPACE message (type 0x06).
325#[derive(Debug, Clone, PartialEq, Eq)]
326pub struct PublishNamespace {
327    /// The request ID for this namespace publish.
328    pub request_id: VarInt,
329    /// The track namespace to publish.
330    pub track_namespace: TrackNamespace,
331    /// Publish namespace parameters.
332    pub parameters: Vec<KeyValuePair>,
333}
334
335/// PUBLISH_NAMESPACE_OK message (type 0x07).
336#[derive(Debug, Clone, PartialEq, Eq)]
337pub struct PublishNamespaceOk {
338    /// The request ID this response corresponds to.
339    pub request_id: VarInt,
340    /// Response parameters.
341    pub parameters: Vec<KeyValuePair>,
342}
343
344/// PUBLISH_NAMESPACE_ERROR message (type 0x08).
345#[derive(Debug, Clone, PartialEq, Eq)]
346pub struct PublishNamespaceError {
347    /// The request ID this error corresponds to.
348    pub request_id: VarInt,
349    /// Application-defined error code.
350    pub error_code: VarInt,
351    /// Human-readable reason phrase.
352    pub reason_phrase: Vec<u8>,
353}
354
355/// PUBLISH_NAMESPACE_DONE message (type 0x09).
356#[derive(Debug, Clone, PartialEq, Eq)]
357pub struct PublishNamespaceDone {
358    /// Track namespace being finalized.
359    pub track_namespace: TrackNamespace,
360}
361
362/// PUBLISH_NAMESPACE_CANCEL message (type 0x0C).
363#[derive(Debug, Clone, PartialEq, Eq)]
364pub struct PublishNamespaceCancel {
365    /// Track namespace being cancelled.
366    pub track_namespace: TrackNamespace,
367    /// Application-defined error code.
368    pub error_code: VarInt,
369    /// Human-readable reason phrase.
370    pub reason_phrase: Vec<u8>,
371}
372
373// ============================================================
374// Subscribe Namespace Messages
375// ============================================================
376
377/// SUBSCRIBE_NAMESPACE message (type 0x11).
378#[derive(Debug, Clone, PartialEq, Eq)]
379pub struct SubscribeNamespace {
380    /// The request ID for this namespace subscription.
381    pub request_id: VarInt,
382    /// The track namespace to subscribe to.
383    pub track_namespace: TrackNamespace,
384    /// Subscribe namespace parameters.
385    pub parameters: Vec<KeyValuePair>,
386}
387
388/// SUBSCRIBE_NAMESPACE_OK message (type 0x12).
389#[derive(Debug, Clone, PartialEq, Eq)]
390pub struct SubscribeNamespaceOk {
391    /// The request ID this response corresponds to.
392    pub request_id: VarInt,
393    /// Response parameters.
394    pub parameters: Vec<KeyValuePair>,
395}
396
397/// SUBSCRIBE_NAMESPACE_ERROR message (type 0x13).
398#[derive(Debug, Clone, PartialEq, Eq)]
399pub struct SubscribeNamespaceError {
400    /// The request ID this error corresponds to.
401    pub request_id: VarInt,
402    /// Application-defined error code.
403    pub error_code: VarInt,
404    /// Human-readable reason phrase.
405    pub reason_phrase: Vec<u8>,
406}
407
408/// UNSUBSCRIBE_NAMESPACE message (type 0x14).
409#[derive(Debug, Clone, PartialEq, Eq)]
410pub struct UnsubscribeNamespace {
411    /// The namespace prefix of the namespace subscription to cancel.
412    pub track_namespace_prefix: TrackNamespace,
413}
414
415// ============================================================
416// Fetch Messages
417// ============================================================
418
419/// FETCH type discriminator (standalone vs joining).
420#[derive(Debug, Clone, Copy, PartialEq, Eq)]
421#[repr(u64)]
422pub enum FetchType {
423    /// Standalone fetch with explicit track and range.
424    Standalone = 1,
425    /// Joining fetch relative to a subscribe request.
426    RelativeJoining = 2,
427    /// Joining fetch at an absolute group.
428    AbsoluteJoining = 3,
429}
430
431impl FetchType {
432    /// Convert a raw wire value to a [`FetchType`].
433    pub fn from_u64(v: u64) -> Option<Self> {
434        match v {
435            1 => Some(FetchType::Standalone),
436            2 => Some(FetchType::RelativeJoining),
437            3 => Some(FetchType::AbsoluteJoining),
438            _ => None,
439        }
440    }
441}
442
443/// FETCH payload — either a standalone fetch or a joining fetch.
444#[derive(Debug, Clone, PartialEq, Eq)]
445pub enum FetchPayload {
446    /// Standalone fetch.
447    Standalone {
448        /// Track namespace.
449        track_namespace: TrackNamespace,
450        /// Track name.
451        track_name: Vec<u8>,
452        /// Starting group ID.
453        start_group: VarInt,
454        /// Starting object ID.
455        start_object: VarInt,
456        /// Ending group ID.
457        end_group: VarInt,
458        /// Ending object ID.
459        end_object: VarInt,
460    },
461    /// Joining fetch.
462    Joining {
463        /// Joining subscribe request ID.
464        joining_request_id: VarInt,
465        /// Joining start (relative offset or absolute group).
466        joining_start: VarInt,
467    },
468}
469
470/// FETCH message (type 0x16).
471#[derive(Debug, Clone, PartialEq, Eq)]
472pub struct Fetch {
473    /// The request ID for this fetch.
474    pub request_id: VarInt,
475    /// Subscriber priority.
476    pub subscriber_priority: u8,
477    /// Requested group order.
478    pub group_order: GroupOrder,
479    /// Fetch type discriminator.
480    pub fetch_type: FetchType,
481    /// Variant-specific payload.
482    pub fetch_payload: FetchPayload,
483    /// Fetch parameters.
484    pub parameters: Vec<KeyValuePair>,
485}
486
487/// FETCH_OK message (type 0x18).
488#[derive(Debug, Clone, PartialEq, Eq)]
489pub struct FetchOk {
490    /// The request ID this response corresponds to.
491    pub request_id: VarInt,
492    /// Group order chosen by the publisher.
493    pub group_order: GroupOrder,
494    /// End-of-track flag.
495    pub end_of_track: VarInt,
496    /// End location (largest group / object in the fetch).
497    pub end_location: Location,
498    /// Response parameters.
499    pub parameters: Vec<KeyValuePair>,
500}
501
502/// FETCH_ERROR message (type 0x19).
503#[derive(Debug, Clone, PartialEq, Eq)]
504pub struct FetchError {
505    /// The request ID this error corresponds to.
506    pub request_id: VarInt,
507    /// Application-defined error code.
508    pub error_code: VarInt,
509    /// Human-readable reason phrase.
510    pub reason_phrase: Vec<u8>,
511}
512
513/// FETCH_CANCEL message (type 0x17).
514#[derive(Debug, Clone, PartialEq, Eq)]
515pub struct FetchCancel {
516    /// The request ID of the fetch to cancel.
517    pub request_id: VarInt,
518}
519
520// ============================================================
521// Track Status Messages
522// ============================================================
523
524/// TRACK_STATUS message (type 0x0D) — subscribe-like request.
525#[derive(Debug, Clone, PartialEq, Eq)]
526pub struct TrackStatus {
527    /// The request ID for this track status query.
528    pub request_id: VarInt,
529    /// The track namespace to query status for.
530    pub track_namespace: TrackNamespace,
531    /// The track name within the namespace.
532    pub track_name: Vec<u8>,
533    /// Subscriber priority.
534    pub subscriber_priority: u8,
535    /// Requested group order.
536    pub group_order: GroupOrder,
537    /// Forward preference.
538    pub forward: Forward,
539    /// Filter type.
540    pub filter_type: FilterType,
541    /// Present only for AbsoluteStart and AbsoluteRange filter types.
542    pub start_location: Option<Location>,
543    /// Present only for AbsoluteRange filter type.
544    pub end_group: Option<VarInt>,
545    /// Track status parameters.
546    pub parameters: Vec<KeyValuePair>,
547}
548
549/// TRACK_STATUS_OK message (type 0x0E) — subscribe_ok-like response.
550#[derive(Debug, Clone, PartialEq, Eq)]
551pub struct TrackStatusOk {
552    /// The request ID this response corresponds to.
553    pub request_id: VarInt,
554    /// Track alias.
555    pub track_alias: VarInt,
556    /// Subscription expiry in milliseconds.
557    pub expires: VarInt,
558    /// Group order.
559    pub group_order: GroupOrder,
560    /// Whether content exists / largest location is present.
561    pub content_exists: ContentExists,
562    /// The largest location, present when content_exists == HasLargestLocation.
563    pub largest_location: Option<Location>,
564    /// Response parameters.
565    pub parameters: Vec<KeyValuePair>,
566}
567
568/// TRACK_STATUS_ERROR message (type 0x0F).
569#[derive(Debug, Clone, PartialEq, Eq)]
570pub struct TrackStatusError {
571    /// The request ID this error corresponds to.
572    pub request_id: VarInt,
573    /// Application-defined error code.
574    pub error_code: VarInt,
575    /// Human-readable reason phrase.
576    pub reason_phrase: Vec<u8>,
577}
578
579// ============================================================
580// Unified Message Enum
581// ============================================================
582
583/// A parsed MoQT control message (draft-14).
584#[derive(Debug, Clone, PartialEq, Eq)]
585pub enum ControlMessage {
586    /// ClientSetup (type 0x20).
587    ClientSetup(ClientSetup),
588    /// ServerSetup (type 0x21).
589    ServerSetup(ServerSetup),
590    /// GoAway (type 0x10).
591    GoAway(GoAway),
592    /// MaxRequestId (type 0x15).
593    MaxRequestId(MaxRequestId),
594    /// RequestsBlocked (type 0x1A).
595    RequestsBlocked(RequestsBlocked),
596    /// Subscribe (type 0x03).
597    Subscribe(Subscribe),
598    /// SubscribeOk (type 0x04).
599    SubscribeOk(SubscribeOk),
600    /// SubscribeError (type 0x05).
601    SubscribeError(SubscribeError),
602    /// SubscribeUpdate (type 0x02).
603    SubscribeUpdate(SubscribeUpdate),
604    /// Unsubscribe (type 0x0A).
605    Unsubscribe(Unsubscribe),
606    /// Publish (type 0x1D).
607    Publish(Publish),
608    /// PublishOk (type 0x1E).
609    PublishOk(PublishOk),
610    /// PublishError (type 0x1F).
611    PublishError(PublishError),
612    /// PublishDone (type 0x0B).
613    PublishDone(PublishDone),
614    /// PublishNamespace (type 0x06).
615    PublishNamespace(PublishNamespace),
616    /// PublishNamespaceOk (type 0x07).
617    PublishNamespaceOk(PublishNamespaceOk),
618    /// PublishNamespaceError (type 0x08).
619    PublishNamespaceError(PublishNamespaceError),
620    /// PublishNamespaceDone (type 0x09).
621    PublishNamespaceDone(PublishNamespaceDone),
622    /// PublishNamespaceCancel (type 0x0C).
623    PublishNamespaceCancel(PublishNamespaceCancel),
624    /// SubscribeNamespace (type 0x11).
625    SubscribeNamespace(SubscribeNamespace),
626    /// SubscribeNamespaceOk (type 0x12).
627    SubscribeNamespaceOk(SubscribeNamespaceOk),
628    /// SubscribeNamespaceError (type 0x13).
629    SubscribeNamespaceError(SubscribeNamespaceError),
630    /// UnsubscribeNamespace (type 0x14).
631    UnsubscribeNamespace(UnsubscribeNamespace),
632    /// Fetch (type 0x16).
633    Fetch(Fetch),
634    /// FetchOk (type 0x18).
635    FetchOk(FetchOk),
636    /// FetchError (type 0x19).
637    FetchError(FetchError),
638    /// FetchCancel (type 0x17).
639    FetchCancel(FetchCancel),
640    /// TrackStatus (type 0x0D).
641    TrackStatus(TrackStatus),
642    /// TrackStatusOk (type 0x0E).
643    TrackStatusOk(TrackStatusOk),
644    /// TrackStatusError (type 0x0F).
645    TrackStatusError(TrackStatusError),
646}
647
648impl ControlMessage {
649    /// Encode this control message to bytes (including type ID and length prefix).
650    ///
651    /// Draft-14 framing: type_id(vi) + payload_length(16) + payload.
652    pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
653        let mut payload = Vec::with_capacity(256);
654        self.encode_payload(&mut payload)?;
655
656        if payload.len() > MAX_MESSAGE_LENGTH {
657            return Err(CodecError::MessageTooLong(payload.len()));
658        }
659
660        VarInt::from_usize(self.message_type().id() as usize).encode(buf);
661        // Draft-14: 16-bit length (big-endian)
662        buf.put_u16(payload.len() as u16);
663        buf.put_slice(&payload);
664        Ok(())
665    }
666
667    /// Decode a control message from bytes (reads type ID and length prefix first).
668    ///
669    /// Draft-14 framing: type_id(vi) + payload_length(16) + payload.
670    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
671        let type_id = VarInt::decode(buf)?.into_inner();
672        let msg_type =
673            MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
674        // Draft-14: 16-bit length (big-endian)
675        if buf.remaining() < 2 {
676            return Err(CodecError::UnexpectedEnd);
677        }
678        let payload_len = buf.get_u16() as usize;
679        if buf.remaining() < payload_len {
680            return Err(CodecError::UnexpectedEnd);
681        }
682        let payload_bytes = buf.copy_to_bytes(payload_len);
683        let mut payload = &payload_bytes[..];
684        Self::decode_payload(msg_type, &mut payload)
685    }
686
687    fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
688        match self {
689            ControlMessage::ClientSetup(m) => {
690                VarInt::from_usize(m.supported_versions.len()).encode(buf);
691                for v in &m.supported_versions {
692                    v.encode(buf);
693                }
694                KeyValuePair::encode_list(&m.parameters, buf);
695            }
696            ControlMessage::ServerSetup(m) => {
697                m.selected_version.encode(buf);
698                KeyValuePair::encode_list(&m.parameters, buf);
699            }
700            ControlMessage::GoAway(m) => {
701                if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
702                    return Err(CodecError::GoAwayUriTooLong);
703                }
704                VarInt::from_usize(m.new_session_uri.len()).encode(buf);
705                buf.put_slice(&m.new_session_uri);
706            }
707            ControlMessage::MaxRequestId(m) => {
708                m.request_id.encode(buf);
709            }
710            ControlMessage::RequestsBlocked(m) => {
711                m.maximum_request_id.encode(buf);
712            }
713            ControlMessage::Subscribe(m) => {
714                m.request_id.encode(buf);
715                m.track_namespace.encode(buf);
716                VarInt::from_usize(m.track_name.len()).encode(buf);
717                buf.put_slice(&m.track_name);
718                buf.put_u8(m.subscriber_priority);
719                buf.put_u8(m.group_order as u8);
720                buf.put_u8(m.forward as u8);
721                buf.put_u8(m.filter_type as u8);
722                if let Some(loc) = &m.start_location {
723                    loc.encode(buf);
724                }
725                if let Some(eg) = &m.end_group {
726                    eg.encode(buf);
727                }
728                KeyValuePair::encode_list(&m.parameters, buf);
729            }
730            ControlMessage::SubscribeOk(m) => {
731                m.request_id.encode(buf);
732                m.track_alias.encode(buf);
733                m.expires.encode(buf);
734                buf.put_u8(m.group_order as u8);
735                buf.put_u8(m.content_exists as u8);
736                if let Some(loc) = &m.largest_location {
737                    loc.encode(buf);
738                }
739                KeyValuePair::encode_list(&m.parameters, buf);
740            }
741            ControlMessage::SubscribeError(m) => {
742                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
743                    return Err(CodecError::ReasonPhraseTooLong);
744                }
745                m.request_id.encode(buf);
746                m.error_code.encode(buf);
747                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
748                buf.put_slice(&m.reason_phrase);
749            }
750            ControlMessage::SubscribeUpdate(m) => {
751                m.request_id.encode(buf);
752                m.subscription_request_id.encode(buf);
753                m.start_location.encode(buf);
754                m.end_group.encode(buf);
755                buf.put_u8(m.subscriber_priority);
756                buf.put_u8(m.forward as u8);
757                KeyValuePair::encode_list(&m.parameters, buf);
758            }
759            ControlMessage::Unsubscribe(m) => {
760                m.request_id.encode(buf);
761            }
762            ControlMessage::Publish(m) => {
763                m.request_id.encode(buf);
764                m.track_namespace.encode(buf);
765                VarInt::from_usize(m.track_name.len()).encode(buf);
766                buf.put_slice(&m.track_name);
767                m.track_alias.encode(buf);
768                buf.put_u8(m.group_order as u8);
769                buf.put_u8(m.content_exists as u8);
770                if let Some(loc) = &m.largest_location {
771                    loc.encode(buf);
772                }
773                buf.put_u8(m.forward as u8);
774                KeyValuePair::encode_list(&m.parameters, buf);
775            }
776            ControlMessage::PublishOk(m) => {
777                m.request_id.encode(buf);
778                buf.put_u8(m.forward as u8);
779                buf.put_u8(m.subscriber_priority);
780                buf.put_u8(m.group_order as u8);
781                buf.put_u8(m.filter_type as u8);
782                if let Some(loc) = &m.start_location {
783                    loc.encode(buf);
784                }
785                if let Some(eg) = &m.end_group {
786                    eg.encode(buf);
787                }
788                KeyValuePair::encode_list(&m.parameters, buf);
789            }
790            ControlMessage::PublishError(m) => {
791                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
792                    return Err(CodecError::ReasonPhraseTooLong);
793                }
794                m.request_id.encode(buf);
795                m.error_code.encode(buf);
796                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
797                buf.put_slice(&m.reason_phrase);
798            }
799            ControlMessage::PublishDone(m) => {
800                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
801                    return Err(CodecError::ReasonPhraseTooLong);
802                }
803                m.request_id.encode(buf);
804                m.status_code.encode(buf);
805                m.stream_count.encode(buf);
806                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
807                buf.put_slice(&m.reason_phrase);
808            }
809            ControlMessage::PublishNamespace(m) => {
810                m.request_id.encode(buf);
811                m.track_namespace.encode(buf);
812                KeyValuePair::encode_list(&m.parameters, buf);
813            }
814            ControlMessage::PublishNamespaceOk(m) => {
815                m.request_id.encode(buf);
816                KeyValuePair::encode_list(&m.parameters, buf);
817            }
818            ControlMessage::PublishNamespaceError(m) => {
819                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
820                    return Err(CodecError::ReasonPhraseTooLong);
821                }
822                m.request_id.encode(buf);
823                m.error_code.encode(buf);
824                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
825                buf.put_slice(&m.reason_phrase);
826            }
827            ControlMessage::PublishNamespaceDone(m) => {
828                m.track_namespace.encode(buf);
829            }
830            ControlMessage::PublishNamespaceCancel(m) => {
831                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
832                    return Err(CodecError::ReasonPhraseTooLong);
833                }
834                m.track_namespace.encode(buf);
835                m.error_code.encode(buf);
836                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
837                buf.put_slice(&m.reason_phrase);
838            }
839            ControlMessage::SubscribeNamespace(m) => {
840                m.request_id.encode(buf);
841                m.track_namespace.encode(buf);
842                KeyValuePair::encode_list(&m.parameters, buf);
843            }
844            ControlMessage::SubscribeNamespaceOk(m) => {
845                m.request_id.encode(buf);
846                KeyValuePair::encode_list(&m.parameters, buf);
847            }
848            ControlMessage::SubscribeNamespaceError(m) => {
849                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
850                    return Err(CodecError::ReasonPhraseTooLong);
851                }
852                m.request_id.encode(buf);
853                m.error_code.encode(buf);
854                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
855                buf.put_slice(&m.reason_phrase);
856            }
857            ControlMessage::UnsubscribeNamespace(m) => {
858                m.track_namespace_prefix.encode(buf);
859            }
860            ControlMessage::Fetch(m) => {
861                m.request_id.encode(buf);
862                buf.put_u8(m.subscriber_priority);
863                buf.put_u8(m.group_order as u8);
864                VarInt::from_usize(m.fetch_type as usize).encode(buf);
865                match &m.fetch_payload {
866                    FetchPayload::Standalone {
867                        track_namespace,
868                        track_name,
869                        start_group,
870                        start_object,
871                        end_group,
872                        end_object,
873                    } => {
874                        track_namespace.encode(buf);
875                        VarInt::from_usize(track_name.len()).encode(buf);
876                        buf.put_slice(track_name);
877                        start_group.encode(buf);
878                        start_object.encode(buf);
879                        end_group.encode(buf);
880                        end_object.encode(buf);
881                    }
882                    FetchPayload::Joining { joining_request_id, joining_start } => {
883                        joining_request_id.encode(buf);
884                        joining_start.encode(buf);
885                    }
886                }
887                KeyValuePair::encode_list(&m.parameters, buf);
888            }
889            ControlMessage::FetchOk(m) => {
890                m.request_id.encode(buf);
891                buf.put_u8(m.group_order as u8);
892                m.end_of_track.encode(buf);
893                m.end_location.encode(buf);
894                KeyValuePair::encode_list(&m.parameters, buf);
895            }
896            ControlMessage::FetchError(m) => {
897                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
898                    return Err(CodecError::ReasonPhraseTooLong);
899                }
900                m.request_id.encode(buf);
901                m.error_code.encode(buf);
902                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
903                buf.put_slice(&m.reason_phrase);
904            }
905            ControlMessage::FetchCancel(m) => {
906                m.request_id.encode(buf);
907            }
908            ControlMessage::TrackStatus(m) => {
909                m.request_id.encode(buf);
910                m.track_namespace.encode(buf);
911                VarInt::from_usize(m.track_name.len()).encode(buf);
912                buf.put_slice(&m.track_name);
913                buf.put_u8(m.subscriber_priority);
914                buf.put_u8(m.group_order as u8);
915                buf.put_u8(m.forward as u8);
916                buf.put_u8(m.filter_type as u8);
917                if let Some(loc) = &m.start_location {
918                    loc.encode(buf);
919                }
920                if let Some(eg) = &m.end_group {
921                    eg.encode(buf);
922                }
923                KeyValuePair::encode_list(&m.parameters, buf);
924            }
925            ControlMessage::TrackStatusOk(m) => {
926                m.request_id.encode(buf);
927                m.track_alias.encode(buf);
928                m.expires.encode(buf);
929                buf.put_u8(m.group_order as u8);
930                buf.put_u8(m.content_exists as u8);
931                if let Some(loc) = &m.largest_location {
932                    loc.encode(buf);
933                }
934                KeyValuePair::encode_list(&m.parameters, buf);
935            }
936            ControlMessage::TrackStatusError(m) => {
937                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
938                    return Err(CodecError::ReasonPhraseTooLong);
939                }
940                m.request_id.encode(buf);
941                m.error_code.encode(buf);
942                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
943                buf.put_slice(&m.reason_phrase);
944            }
945        }
946        Ok(())
947    }
948
949    fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
950        match msg_type {
951            MessageType::ClientSetup => {
952                let num_versions = VarInt::decode(buf)?.into_inner() as usize;
953                if num_versions == 0 {
954                    return Err(CodecError::InvalidField);
955                }
956                let mut supported_versions = Vec::with_capacity(num_versions);
957                for _ in 0..num_versions {
958                    supported_versions.push(VarInt::decode(buf)?);
959                }
960                let parameters = KeyValuePair::decode_list(buf)?;
961                Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
962            }
963            MessageType::ServerSetup => {
964                let selected_version = VarInt::decode(buf)?;
965                let parameters = KeyValuePair::decode_list(buf)?;
966                Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
967            }
968            MessageType::GoAway => {
969                let uri_len = VarInt::decode(buf)?.into_inner() as usize;
970                let uri = read_bytes(buf, uri_len)?;
971                Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
972            }
973            MessageType::MaxRequestId => {
974                let request_id = VarInt::decode(buf)?;
975                Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id }))
976            }
977            MessageType::RequestsBlocked => {
978                let maximum_request_id = VarInt::decode(buf)?;
979                Ok(ControlMessage::RequestsBlocked(RequestsBlocked { maximum_request_id }))
980            }
981            MessageType::Subscribe => {
982                let request_id = VarInt::decode(buf)?;
983                let track_namespace = TrackNamespace::decode(buf)?;
984                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
985                let track_name = read_bytes(buf, track_name_len)?;
986                if buf.remaining() < 4 {
987                    return Err(CodecError::UnexpectedEnd);
988                }
989                let subscriber_priority = buf.get_u8();
990                let group_order =
991                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
992                let forward_val = buf.get_u8();
993                let forward = match forward_val {
994                    0 => Forward::DontForward,
995                    1 => Forward::Forward,
996                    _ => return Err(CodecError::InvalidField),
997                };
998                let filter_val = buf.get_u8();
999                let filter_type =
1000                    FilterType::from_u8(filter_val).ok_or(CodecError::InvalidField)?;
1001                let start_location = match filter_type {
1002                    FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
1003                        Some(Location::decode(buf)?)
1004                    }
1005                    _ => None,
1006                };
1007                let end_group = match filter_type {
1008                    FilterType::AbsoluteRange => Some(VarInt::decode(buf)?),
1009                    _ => None,
1010                };
1011                let parameters = KeyValuePair::decode_list(buf)?;
1012                Ok(ControlMessage::Subscribe(Subscribe {
1013                    request_id,
1014                    track_namespace,
1015                    track_name,
1016                    subscriber_priority,
1017                    group_order,
1018                    forward,
1019                    filter_type,
1020                    start_location,
1021                    end_group,
1022                    parameters,
1023                }))
1024            }
1025            MessageType::SubscribeOk => {
1026                let request_id = VarInt::decode(buf)?;
1027                let track_alias = VarInt::decode(buf)?;
1028                let expires = VarInt::decode(buf)?;
1029                if buf.remaining() < 2 {
1030                    return Err(CodecError::UnexpectedEnd);
1031                }
1032                let group_order =
1033                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1034                let content_exists_val = buf.get_u8();
1035                let content_exists = match content_exists_val {
1036                    0 => ContentExists::NoLargestLocation,
1037                    1 => ContentExists::HasLargestLocation,
1038                    _ => return Err(CodecError::InvalidField),
1039                };
1040                let largest_location = if content_exists == ContentExists::HasLargestLocation {
1041                    Some(Location::decode(buf)?)
1042                } else {
1043                    None
1044                };
1045                let parameters = KeyValuePair::decode_list(buf)?;
1046                Ok(ControlMessage::SubscribeOk(SubscribeOk {
1047                    request_id,
1048                    track_alias,
1049                    expires,
1050                    group_order,
1051                    content_exists,
1052                    largest_location,
1053                    parameters,
1054                }))
1055            }
1056            MessageType::SubscribeError => {
1057                let request_id = VarInt::decode(buf)?;
1058                let error_code = VarInt::decode(buf)?;
1059                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1060                let reason_phrase = read_bytes(buf, reason_len)?;
1061                Ok(ControlMessage::SubscribeError(SubscribeError {
1062                    request_id,
1063                    error_code,
1064                    reason_phrase,
1065                }))
1066            }
1067            MessageType::SubscribeUpdate => {
1068                let request_id = VarInt::decode(buf)?;
1069                let subscription_request_id = VarInt::decode(buf)?;
1070                let start_location = Location::decode(buf)?;
1071                let end_group = VarInt::decode(buf)?;
1072                if buf.remaining() < 2 {
1073                    return Err(CodecError::UnexpectedEnd);
1074                }
1075                let subscriber_priority = buf.get_u8();
1076                let forward_val = buf.get_u8();
1077                let forward = match forward_val {
1078                    0 => Forward::DontForward,
1079                    1 => Forward::Forward,
1080                    _ => return Err(CodecError::InvalidField),
1081                };
1082                let parameters = KeyValuePair::decode_list(buf)?;
1083                Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
1084                    request_id,
1085                    subscription_request_id,
1086                    start_location,
1087                    end_group,
1088                    subscriber_priority,
1089                    forward,
1090                    parameters,
1091                }))
1092            }
1093            MessageType::Unsubscribe => {
1094                let request_id = VarInt::decode(buf)?;
1095                Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
1096            }
1097            MessageType::Publish => {
1098                let request_id = VarInt::decode(buf)?;
1099                let track_namespace = TrackNamespace::decode(buf)?;
1100                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1101                let track_name = read_bytes(buf, track_name_len)?;
1102                let track_alias = VarInt::decode(buf)?;
1103                if buf.remaining() < 2 {
1104                    return Err(CodecError::UnexpectedEnd);
1105                }
1106                let group_order =
1107                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1108                let content_exists_val = buf.get_u8();
1109                let content_exists = match content_exists_val {
1110                    0 => ContentExists::NoLargestLocation,
1111                    1 => ContentExists::HasLargestLocation,
1112                    _ => return Err(CodecError::InvalidField),
1113                };
1114                let largest_location = if content_exists == ContentExists::HasLargestLocation {
1115                    Some(Location::decode(buf)?)
1116                } else {
1117                    None
1118                };
1119                if buf.remaining() < 1 {
1120                    return Err(CodecError::UnexpectedEnd);
1121                }
1122                let forward_val = buf.get_u8();
1123                let forward = match forward_val {
1124                    0 => Forward::DontForward,
1125                    1 => Forward::Forward,
1126                    _ => return Err(CodecError::InvalidField),
1127                };
1128                let parameters = KeyValuePair::decode_list(buf)?;
1129                Ok(ControlMessage::Publish(Publish {
1130                    request_id,
1131                    track_namespace,
1132                    track_name,
1133                    track_alias,
1134                    group_order,
1135                    content_exists,
1136                    largest_location,
1137                    forward,
1138                    parameters,
1139                }))
1140            }
1141            MessageType::PublishOk => {
1142                let request_id = VarInt::decode(buf)?;
1143                if buf.remaining() < 4 {
1144                    return Err(CodecError::UnexpectedEnd);
1145                }
1146                let forward_val = buf.get_u8();
1147                let forward = match forward_val {
1148                    0 => Forward::DontForward,
1149                    1 => Forward::Forward,
1150                    _ => return Err(CodecError::InvalidField),
1151                };
1152                let subscriber_priority = buf.get_u8();
1153                let group_order =
1154                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1155                let filter_val = buf.get_u8();
1156                let filter_type =
1157                    FilterType::from_u8(filter_val).ok_or(CodecError::InvalidField)?;
1158                let start_location = match filter_type {
1159                    FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
1160                        Some(Location::decode(buf)?)
1161                    }
1162                    _ => None,
1163                };
1164                let end_group = match filter_type {
1165                    FilterType::AbsoluteRange => Some(VarInt::decode(buf)?),
1166                    _ => None,
1167                };
1168                let parameters = KeyValuePair::decode_list(buf)?;
1169                Ok(ControlMessage::PublishOk(PublishOk {
1170                    request_id,
1171                    forward,
1172                    subscriber_priority,
1173                    group_order,
1174                    filter_type,
1175                    start_location,
1176                    end_group,
1177                    parameters,
1178                }))
1179            }
1180            MessageType::PublishError => {
1181                let request_id = VarInt::decode(buf)?;
1182                let error_code = VarInt::decode(buf)?;
1183                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1184                let reason_phrase = read_bytes(buf, reason_len)?;
1185                Ok(ControlMessage::PublishError(PublishError {
1186                    request_id,
1187                    error_code,
1188                    reason_phrase,
1189                }))
1190            }
1191            MessageType::PublishDone => {
1192                let request_id = VarInt::decode(buf)?;
1193                let status_code = VarInt::decode(buf)?;
1194                let stream_count = VarInt::decode(buf)?;
1195                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1196                let reason_phrase = read_bytes(buf, reason_len)?;
1197                Ok(ControlMessage::PublishDone(PublishDone {
1198                    request_id,
1199                    status_code,
1200                    stream_count,
1201                    reason_phrase,
1202                }))
1203            }
1204            MessageType::PublishNamespace => {
1205                let request_id = VarInt::decode(buf)?;
1206                let track_namespace = TrackNamespace::decode(buf)?;
1207                let parameters = KeyValuePair::decode_list(buf)?;
1208                Ok(ControlMessage::PublishNamespace(PublishNamespace {
1209                    request_id,
1210                    track_namespace,
1211                    parameters,
1212                }))
1213            }
1214            MessageType::PublishNamespaceOk => {
1215                let request_id = VarInt::decode(buf)?;
1216                let parameters = KeyValuePair::decode_list(buf)?;
1217                Ok(ControlMessage::PublishNamespaceOk(PublishNamespaceOk {
1218                    request_id,
1219                    parameters,
1220                }))
1221            }
1222            MessageType::PublishNamespaceError => {
1223                let request_id = VarInt::decode(buf)?;
1224                let error_code = VarInt::decode(buf)?;
1225                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1226                let reason_phrase = read_bytes(buf, reason_len)?;
1227                Ok(ControlMessage::PublishNamespaceError(PublishNamespaceError {
1228                    request_id,
1229                    error_code,
1230                    reason_phrase,
1231                }))
1232            }
1233            MessageType::PublishNamespaceDone => {
1234                let track_namespace = TrackNamespace::decode(buf)?;
1235                Ok(ControlMessage::PublishNamespaceDone(PublishNamespaceDone { track_namespace }))
1236            }
1237            MessageType::PublishNamespaceCancel => {
1238                let track_namespace = TrackNamespace::decode(buf)?;
1239                let error_code = VarInt::decode(buf)?;
1240                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1241                let reason_phrase = read_bytes(buf, reason_len)?;
1242                Ok(ControlMessage::PublishNamespaceCancel(PublishNamespaceCancel {
1243                    track_namespace,
1244                    error_code,
1245                    reason_phrase,
1246                }))
1247            }
1248            MessageType::SubscribeNamespace => {
1249                let request_id = VarInt::decode(buf)?;
1250                let track_namespace = TrackNamespace::decode(buf)?;
1251                let parameters = KeyValuePair::decode_list(buf)?;
1252                Ok(ControlMessage::SubscribeNamespace(SubscribeNamespace {
1253                    request_id,
1254                    track_namespace,
1255                    parameters,
1256                }))
1257            }
1258            MessageType::SubscribeNamespaceOk => {
1259                let request_id = VarInt::decode(buf)?;
1260                let parameters = KeyValuePair::decode_list(buf)?;
1261                Ok(ControlMessage::SubscribeNamespaceOk(SubscribeNamespaceOk {
1262                    request_id,
1263                    parameters,
1264                }))
1265            }
1266            MessageType::SubscribeNamespaceError => {
1267                let request_id = VarInt::decode(buf)?;
1268                let error_code = VarInt::decode(buf)?;
1269                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1270                let reason_phrase = read_bytes(buf, reason_len)?;
1271                Ok(ControlMessage::SubscribeNamespaceError(SubscribeNamespaceError {
1272                    request_id,
1273                    error_code,
1274                    reason_phrase,
1275                }))
1276            }
1277            MessageType::UnsubscribeNamespace => {
1278                let track_namespace_prefix = TrackNamespace::decode(buf)?;
1279                Ok(ControlMessage::UnsubscribeNamespace(UnsubscribeNamespace {
1280                    track_namespace_prefix,
1281                }))
1282            }
1283            MessageType::Fetch => {
1284                let request_id = VarInt::decode(buf)?;
1285                if buf.remaining() < 2 {
1286                    return Err(CodecError::UnexpectedEnd);
1287                }
1288                let subscriber_priority = buf.get_u8();
1289                let group_order =
1290                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1291                let fetch_type_val = VarInt::decode(buf)?.into_inner();
1292                let fetch_type =
1293                    FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
1294                let fetch_payload = match fetch_type {
1295                    FetchType::Standalone => {
1296                        let track_namespace = TrackNamespace::decode(buf)?;
1297                        let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1298                        let track_name = read_bytes(buf, track_name_len)?;
1299                        let start_group = VarInt::decode(buf)?;
1300                        let start_object = VarInt::decode(buf)?;
1301                        let end_group = VarInt::decode(buf)?;
1302                        let end_object = VarInt::decode(buf)?;
1303                        FetchPayload::Standalone {
1304                            track_namespace,
1305                            track_name,
1306                            start_group,
1307                            start_object,
1308                            end_group,
1309                            end_object,
1310                        }
1311                    }
1312                    FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
1313                        let joining_request_id = VarInt::decode(buf)?;
1314                        let joining_start = VarInt::decode(buf)?;
1315                        FetchPayload::Joining { joining_request_id, joining_start }
1316                    }
1317                };
1318                let parameters = KeyValuePair::decode_list(buf)?;
1319                Ok(ControlMessage::Fetch(Fetch {
1320                    request_id,
1321                    subscriber_priority,
1322                    group_order,
1323                    fetch_type,
1324                    fetch_payload,
1325                    parameters,
1326                }))
1327            }
1328            MessageType::FetchOk => {
1329                let request_id = VarInt::decode(buf)?;
1330                if buf.remaining() < 1 {
1331                    return Err(CodecError::UnexpectedEnd);
1332                }
1333                let group_order =
1334                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1335                let end_of_track = VarInt::decode(buf)?;
1336                let end_location = Location::decode(buf)?;
1337                let parameters = KeyValuePair::decode_list(buf)?;
1338                Ok(ControlMessage::FetchOk(FetchOk {
1339                    request_id,
1340                    group_order,
1341                    end_of_track,
1342                    end_location,
1343                    parameters,
1344                }))
1345            }
1346            MessageType::FetchError => {
1347                let request_id = VarInt::decode(buf)?;
1348                let error_code = VarInt::decode(buf)?;
1349                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1350                let reason_phrase = read_bytes(buf, reason_len)?;
1351                Ok(ControlMessage::FetchError(FetchError { request_id, error_code, reason_phrase }))
1352            }
1353            MessageType::FetchCancel => {
1354                let request_id = VarInt::decode(buf)?;
1355                Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
1356            }
1357            MessageType::TrackStatus => {
1358                let request_id = VarInt::decode(buf)?;
1359                let track_namespace = TrackNamespace::decode(buf)?;
1360                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1361                let track_name = read_bytes(buf, track_name_len)?;
1362                if buf.remaining() < 4 {
1363                    return Err(CodecError::UnexpectedEnd);
1364                }
1365                let subscriber_priority = buf.get_u8();
1366                let group_order =
1367                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1368                let forward_val = buf.get_u8();
1369                let forward = match forward_val {
1370                    0 => Forward::DontForward,
1371                    1 => Forward::Forward,
1372                    _ => return Err(CodecError::InvalidField),
1373                };
1374                let filter_val = buf.get_u8();
1375                let filter_type =
1376                    FilterType::from_u8(filter_val).ok_or(CodecError::InvalidField)?;
1377                let start_location = match filter_type {
1378                    FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
1379                        Some(Location::decode(buf)?)
1380                    }
1381                    _ => None,
1382                };
1383                let end_group = match filter_type {
1384                    FilterType::AbsoluteRange => Some(VarInt::decode(buf)?),
1385                    _ => None,
1386                };
1387                let parameters = KeyValuePair::decode_list(buf)?;
1388                Ok(ControlMessage::TrackStatus(TrackStatus {
1389                    request_id,
1390                    track_namespace,
1391                    track_name,
1392                    subscriber_priority,
1393                    group_order,
1394                    forward,
1395                    filter_type,
1396                    start_location,
1397                    end_group,
1398                    parameters,
1399                }))
1400            }
1401            MessageType::TrackStatusOk => {
1402                let request_id = VarInt::decode(buf)?;
1403                let track_alias = VarInt::decode(buf)?;
1404                let expires = VarInt::decode(buf)?;
1405                if buf.remaining() < 2 {
1406                    return Err(CodecError::UnexpectedEnd);
1407                }
1408                let group_order =
1409                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1410                let content_exists_val = buf.get_u8();
1411                let content_exists = match content_exists_val {
1412                    0 => ContentExists::NoLargestLocation,
1413                    1 => ContentExists::HasLargestLocation,
1414                    _ => return Err(CodecError::InvalidField),
1415                };
1416                let largest_location = if content_exists == ContentExists::HasLargestLocation {
1417                    Some(Location::decode(buf)?)
1418                } else {
1419                    None
1420                };
1421                let parameters = KeyValuePair::decode_list(buf)?;
1422                Ok(ControlMessage::TrackStatusOk(TrackStatusOk {
1423                    request_id,
1424                    track_alias,
1425                    expires,
1426                    group_order,
1427                    content_exists,
1428                    largest_location,
1429                    parameters,
1430                }))
1431            }
1432            MessageType::TrackStatusError => {
1433                let request_id = VarInt::decode(buf)?;
1434                let error_code = VarInt::decode(buf)?;
1435                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1436                let reason_phrase = read_bytes(buf, reason_len)?;
1437                Ok(ControlMessage::TrackStatusError(TrackStatusError {
1438                    request_id,
1439                    error_code,
1440                    reason_phrase,
1441                }))
1442            }
1443        }
1444    }
1445
1446    /// Get the message type ID for this message.
1447    pub fn message_type(&self) -> MessageType {
1448        match self {
1449            ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
1450            ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
1451            ControlMessage::GoAway(_) => MessageType::GoAway,
1452            ControlMessage::MaxRequestId(_) => MessageType::MaxRequestId,
1453            ControlMessage::RequestsBlocked(_) => MessageType::RequestsBlocked,
1454            ControlMessage::Subscribe(_) => MessageType::Subscribe,
1455            ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
1456            ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
1457            ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
1458            ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
1459            ControlMessage::Publish(_) => MessageType::Publish,
1460            ControlMessage::PublishOk(_) => MessageType::PublishOk,
1461            ControlMessage::PublishError(_) => MessageType::PublishError,
1462            ControlMessage::PublishDone(_) => MessageType::PublishDone,
1463            ControlMessage::PublishNamespace(_) => MessageType::PublishNamespace,
1464            ControlMessage::PublishNamespaceOk(_) => MessageType::PublishNamespaceOk,
1465            ControlMessage::PublishNamespaceError(_) => MessageType::PublishNamespaceError,
1466            ControlMessage::PublishNamespaceDone(_) => MessageType::PublishNamespaceDone,
1467            ControlMessage::PublishNamespaceCancel(_) => MessageType::PublishNamespaceCancel,
1468            ControlMessage::SubscribeNamespace(_) => MessageType::SubscribeNamespace,
1469            ControlMessage::SubscribeNamespaceOk(_) => MessageType::SubscribeNamespaceOk,
1470            ControlMessage::SubscribeNamespaceError(_) => MessageType::SubscribeNamespaceError,
1471            ControlMessage::UnsubscribeNamespace(_) => MessageType::UnsubscribeNamespace,
1472            ControlMessage::Fetch(_) => MessageType::Fetch,
1473            ControlMessage::FetchOk(_) => MessageType::FetchOk,
1474            ControlMessage::FetchError(_) => MessageType::FetchError,
1475            ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
1476            ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
1477            ControlMessage::TrackStatusOk(_) => MessageType::TrackStatusOk,
1478            ControlMessage::TrackStatusError(_) => MessageType::TrackStatusError,
1479        }
1480    }
1481}