Skip to main content

moqtap_codec/draft13/
message.rs

//! Draft-13 control message encoding and decoding.
//!
//! Key changes from draft-12:
//! - `subscribe_announces` → `subscribe_namespace` (same IDs 0x11–0x14)
//! - TrackStatusRequest (0x0D) → TrackStatus: subscribe-like request with
//!   subscriber_priority, group_order, forward, filter_type
//! - TrackStatus (0x0E) → TrackStatusOk: subscribe_ok-like response with
//!   track_alias, expires, group_order, content_exists, largest_location
//! - New TrackStatusError (0x0F): request_id + error_code + reason_phrase
//! - Publish/PublishOk/PublishError (0x1D–0x1F) added
11
12pub use crate::error::{
13    CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_NAMESPACE_TUPLE_SIZE,
14    MAX_REASON_PHRASE_LENGTH,
15};
16use crate::kvp::KeyValuePair;
17use crate::types::*;
18use crate::varint::VarInt;
19use bytes::{Buf, BufMut};
20
/// Numeric type identifiers of the draft-13 control messages.
///
/// Each variant's discriminant is the value written as the leading varint of
/// a control message frame (see `ControlMessage::encode`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum MessageType {
    SubscribeUpdate = 0x02,
    Subscribe = 0x03,
    SubscribeOk = 0x04,
    SubscribeError = 0x05,
    Announce = 0x06,
    AnnounceOk = 0x07,
    AnnounceError = 0x08,
    Unannounce = 0x09,
    Unsubscribe = 0x0A,
    SubscribeDone = 0x0B,
    AnnounceCancel = 0x0C,
    TrackStatus = 0x0D,
    TrackStatusOk = 0x0E,
    TrackStatusError = 0x0F,
    GoAway = 0x10,
    SubscribeNamespace = 0x11,
    SubscribeNamespaceOk = 0x12,
    SubscribeNamespaceError = 0x13,
    UnsubscribeNamespace = 0x14,
    MaxRequestId = 0x15,
    Fetch = 0x16,
    FetchCancel = 0x17,
    FetchOk = 0x18,
    FetchError = 0x19,
    RequestsBlocked = 0x1A,
    // 0x1B and 0x1C are not assigned to any variant here.
    Publish = 0x1D,
    PublishOk = 0x1E,
    PublishError = 0x1F,
    ClientSetup = 0x20,
    ServerSetup = 0x21,
}

impl MessageType {
    /// Looks up the variant whose discriminant equals `id`.
    ///
    /// Returns `None` for every value that is not one of the discriminants
    /// declared on the enum (for example `0x00`, `0x1B` or `0x22`).
    pub fn from_id(id: u64) -> Option<Self> {
        let message_type = match id {
            0x02 => Self::SubscribeUpdate,
            0x03 => Self::Subscribe,
            0x04 => Self::SubscribeOk,
            0x05 => Self::SubscribeError,
            0x06 => Self::Announce,
            0x07 => Self::AnnounceOk,
            0x08 => Self::AnnounceError,
            0x09 => Self::Unannounce,
            0x0A => Self::Unsubscribe,
            0x0B => Self::SubscribeDone,
            0x0C => Self::AnnounceCancel,
            0x0D => Self::TrackStatus,
            0x0E => Self::TrackStatusOk,
            0x0F => Self::TrackStatusError,
            0x10 => Self::GoAway,
            0x11 => Self::SubscribeNamespace,
            0x12 => Self::SubscribeNamespaceOk,
            0x13 => Self::SubscribeNamespaceError,
            0x14 => Self::UnsubscribeNamespace,
            0x15 => Self::MaxRequestId,
            0x16 => Self::Fetch,
            0x17 => Self::FetchCancel,
            0x18 => Self::FetchOk,
            0x19 => Self::FetchError,
            0x1A => Self::RequestsBlocked,
            0x1D => Self::Publish,
            0x1E => Self::PublishOk,
            0x1F => Self::PublishError,
            0x20 => Self::ClientSetup,
            0x21 => Self::ServerSetup,
            _ => return None,
        };
        Some(message_type)
    }

    /// Returns the numeric identifier (the enum discriminant) of this type.
    pub fn id(&self) -> u64 {
        *self as u64
    }
}
97
// ============================================================
// Session Lifecycle Messages
// ============================================================
101
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct ClientSetup {
104    pub supported_versions: Vec<VarInt>,
105    pub parameters: Vec<KeyValuePair>,
106}
107
108#[derive(Debug, Clone, PartialEq, Eq)]
109pub struct ServerSetup {
110    pub selected_version: VarInt,
111    pub parameters: Vec<KeyValuePair>,
112}
113
/// GOAWAY control message (type `0x10`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoAway {
    /// Raw URI bytes, encoded as a varint length followed by the bytes.
    /// The encoder rejects values longer than `MAX_GOAWAY_URI_LENGTH`.
    pub new_session_uri: Vec<u8>,
}
118
119#[derive(Debug, Clone, PartialEq, Eq)]
120pub struct MaxRequestId {
121    pub request_id: VarInt,
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct RequestsBlocked {
126    pub maximum_request_id: VarInt,
127}
128
// ============================================================
// Subscribe Messages
// ============================================================
132
133#[derive(Debug, Clone, PartialEq, Eq)]
134pub struct Subscribe {
135    pub request_id: VarInt,
136    pub track_namespace: TrackNamespace,
137    pub track_name: Vec<u8>,
138    pub subscriber_priority: u8,
139    pub group_order: VarInt,
140    pub forward: VarInt,
141    pub filter_type: VarInt,
142    pub start_group: Option<VarInt>,
143    pub start_object: Option<VarInt>,
144    pub end_group: Option<VarInt>,
145    pub parameters: Vec<KeyValuePair>,
146}
147
148#[derive(Debug, Clone, PartialEq, Eq)]
149pub struct SubscribeOk {
150    pub request_id: VarInt,
151    pub track_alias: VarInt,
152    pub expires: VarInt,
153    pub group_order: VarInt,
154    pub content_exists: VarInt,
155    pub largest_location: Option<Location>,
156    pub parameters: Vec<KeyValuePair>,
157}
158
159#[derive(Debug, Clone, PartialEq, Eq)]
160pub struct SubscribeError {
161    pub request_id: VarInt,
162    pub error_code: VarInt,
163    pub reason_phrase: Vec<u8>,
164}
165
166#[derive(Debug, Clone, PartialEq, Eq)]
167pub struct SubscribeUpdate {
168    pub request_id: VarInt,
169    pub start_group: VarInt,
170    pub start_object: VarInt,
171    pub end_group: VarInt,
172    pub subscriber_priority: u8,
173    pub forward: VarInt,
174    pub parameters: Vec<KeyValuePair>,
175}
176
177#[derive(Debug, Clone, PartialEq, Eq)]
178pub struct SubscribeDone {
179    pub request_id: VarInt,
180    pub status_code: VarInt,
181    pub stream_count: VarInt,
182    pub reason_phrase: Vec<u8>,
183}
184
185#[derive(Debug, Clone, PartialEq, Eq)]
186pub struct Unsubscribe {
187    pub request_id: VarInt,
188}
189
// ============================================================
// Announce Messages
// ============================================================
193
194#[derive(Debug, Clone, PartialEq, Eq)]
195pub struct Announce {
196    pub request_id: VarInt,
197    pub track_namespace: TrackNamespace,
198    pub parameters: Vec<KeyValuePair>,
199}
200
201#[derive(Debug, Clone, PartialEq, Eq)]
202pub struct AnnounceOk {
203    pub request_id: VarInt,
204}
205
206#[derive(Debug, Clone, PartialEq, Eq)]
207pub struct AnnounceError {
208    pub request_id: VarInt,
209    pub error_code: VarInt,
210    pub reason_phrase: Vec<u8>,
211}
212
213#[derive(Debug, Clone, PartialEq, Eq)]
214pub struct AnnounceCancel {
215    pub track_namespace: TrackNamespace,
216    pub error_code: VarInt,
217    pub reason_phrase: Vec<u8>,
218}
219
220#[derive(Debug, Clone, PartialEq, Eq)]
221pub struct Unannounce {
222    pub track_namespace: TrackNamespace,
223}
224
// ============================================================
// Subscribe Namespace Messages (renamed from Subscribe Announces)
// ============================================================
228
229#[derive(Debug, Clone, PartialEq, Eq)]
230pub struct SubscribeNamespace {
231    pub request_id: VarInt,
232    pub track_namespace_prefix: TrackNamespace,
233    pub parameters: Vec<KeyValuePair>,
234}
235
236#[derive(Debug, Clone, PartialEq, Eq)]
237pub struct SubscribeNamespaceOk {
238    pub request_id: VarInt,
239}
240
241#[derive(Debug, Clone, PartialEq, Eq)]
242pub struct SubscribeNamespaceError {
243    pub request_id: VarInt,
244    pub error_code: VarInt,
245    pub reason_phrase: Vec<u8>,
246}
247
248#[derive(Debug, Clone, PartialEq, Eq)]
249pub struct UnsubscribeNamespace {
250    pub track_namespace_prefix: TrackNamespace,
251}
252
// ============================================================
// Track Status Messages (restructured from draft-12)
// ============================================================
256
257/// TRACK_STATUS message (type 0x0D). Draft-13: subscribe-like request.
258#[derive(Debug, Clone, PartialEq, Eq)]
259pub struct TrackStatus {
260    pub request_id: VarInt,
261    pub track_namespace: TrackNamespace,
262    pub track_name: Vec<u8>,
263    pub subscriber_priority: u8,
264    pub group_order: VarInt,
265    pub forward: VarInt,
266    pub filter_type: VarInt,
267    pub parameters: Vec<KeyValuePair>,
268}
269
270/// TRACK_STATUS_OK message (type 0x0E). Draft-13: subscribe_ok-like response.
271#[derive(Debug, Clone, PartialEq, Eq)]
272pub struct TrackStatusOk {
273    pub request_id: VarInt,
274    pub track_alias: VarInt,
275    pub expires: VarInt,
276    pub group_order: VarInt,
277    pub content_exists: VarInt,
278    pub largest_location: Option<Location>,
279    pub parameters: Vec<KeyValuePair>,
280}
281
282/// TRACK_STATUS_ERROR message (type 0x0F). New in draft-13.
283#[derive(Debug, Clone, PartialEq, Eq)]
284pub struct TrackStatusError {
285    pub request_id: VarInt,
286    pub error_code: VarInt,
287    pub reason_phrase: Vec<u8>,
288}
289
// ============================================================
// Fetch Messages
// ============================================================
293
/// Discriminator of a FETCH message; the discriminant is the value written
/// on the wire as a varint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum FetchType {
    Standalone = 1,
    RelativeJoining = 2,
    AbsoluteJoining = 3,
}

impl FetchType {
    /// Maps a numeric fetch type to its variant.
    ///
    /// Returns `None` for any value other than 1, 2 or 3.
    pub fn from_u64(v: u64) -> Option<Self> {
        let fetch_type = match v {
            1 => Self::Standalone,
            2 => Self::RelativeJoining,
            3 => Self::AbsoluteJoining,
            _ => return None,
        };
        Some(fetch_type)
    }
}
312
313#[derive(Debug, Clone, PartialEq, Eq)]
314pub struct Fetch {
315    pub request_id: VarInt,
316    pub subscriber_priority: u8,
317    pub group_order: VarInt,
318    pub fetch_type: FetchType,
319    pub fetch_payload: FetchPayload,
320    pub parameters: Vec<KeyValuePair>,
321}
322
323#[derive(Debug, Clone, PartialEq, Eq)]
324pub enum FetchPayload {
325    Standalone {
326        track_namespace: TrackNamespace,
327        track_name: Vec<u8>,
328        start_group: VarInt,
329        start_object: VarInt,
330        end_group: VarInt,
331        end_object: VarInt,
332    },
333    Joining {
334        joining_subscribe_id: VarInt,
335        joining_start: VarInt,
336    },
337}
338
339#[derive(Debug, Clone, PartialEq, Eq)]
340pub struct FetchOk {
341    pub request_id: VarInt,
342    pub group_order: VarInt,
343    pub end_of_track: VarInt,
344    pub end_location: Location,
345    pub parameters: Vec<KeyValuePair>,
346}
347
348#[derive(Debug, Clone, PartialEq, Eq)]
349pub struct FetchError {
350    pub request_id: VarInt,
351    pub error_code: VarInt,
352    pub reason_phrase: Vec<u8>,
353}
354
355#[derive(Debug, Clone, PartialEq, Eq)]
356pub struct FetchCancel {
357    pub request_id: VarInt,
358}
359
// ============================================================
// Publish Messages
// ============================================================
363
364#[derive(Debug, Clone, PartialEq, Eq)]
365pub struct Publish {
366    pub request_id: VarInt,
367    pub track_namespace: TrackNamespace,
368    pub track_name: Vec<u8>,
369    pub track_alias: VarInt,
370    pub group_order: VarInt,
371    pub content_exists: VarInt,
372    pub largest_location: Option<Location>,
373    pub forward: VarInt,
374    pub parameters: Vec<KeyValuePair>,
375}
376
377#[derive(Debug, Clone, PartialEq, Eq)]
378pub struct PublishOk {
379    pub request_id: VarInt,
380    pub forward: VarInt,
381    pub subscriber_priority: u8,
382    pub group_order: VarInt,
383    pub filter_type: VarInt,
384    pub start_group: Option<VarInt>,
385    pub start_object: Option<VarInt>,
386    pub end_group: Option<VarInt>,
387    pub parameters: Vec<KeyValuePair>,
388}
389
390#[derive(Debug, Clone, PartialEq, Eq)]
391pub struct PublishError {
392    pub request_id: VarInt,
393    pub error_code: VarInt,
394    pub reason_phrase: Vec<u8>,
395}
396
// ============================================================
// Unified Message Enum
// ============================================================
400
401#[derive(Debug, Clone, PartialEq, Eq)]
402pub enum ControlMessage {
403    ClientSetup(ClientSetup),
404    ServerSetup(ServerSetup),
405    GoAway(GoAway),
406    MaxRequestId(MaxRequestId),
407    RequestsBlocked(RequestsBlocked),
408    Subscribe(Subscribe),
409    SubscribeOk(SubscribeOk),
410    SubscribeError(SubscribeError),
411    SubscribeUpdate(SubscribeUpdate),
412    SubscribeDone(SubscribeDone),
413    Unsubscribe(Unsubscribe),
414    Announce(Announce),
415    AnnounceOk(AnnounceOk),
416    AnnounceError(AnnounceError),
417    AnnounceCancel(AnnounceCancel),
418    Unannounce(Unannounce),
419    SubscribeNamespace(SubscribeNamespace),
420    SubscribeNamespaceOk(SubscribeNamespaceOk),
421    SubscribeNamespaceError(SubscribeNamespaceError),
422    UnsubscribeNamespace(UnsubscribeNamespace),
423    TrackStatus(TrackStatus),
424    TrackStatusOk(TrackStatusOk),
425    TrackStatusError(TrackStatusError),
426    Fetch(Fetch),
427    FetchOk(FetchOk),
428    FetchError(FetchError),
429    FetchCancel(FetchCancel),
430    Publish(Publish),
431    PublishOk(PublishOk),
432    PublishError(PublishError),
433}
434
435impl ControlMessage {
436    /// Encode this control message to bytes.
437    ///
438    /// Draft-13 framing: type_id(vi) + payload_length(16) + payload.
439    pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
440        let mut payload = Vec::with_capacity(256);
441        self.encode_payload(&mut payload)?;
442
443        if payload.len() > MAX_MESSAGE_LENGTH {
444            return Err(CodecError::MessageTooLong(payload.len()));
445        }
446
447        VarInt::from_usize(self.message_type().id() as usize).encode(buf);
448        // Draft-13: 16-bit length (big-endian)
449        buf.put_u16(payload.len() as u16);
450        buf.put_slice(&payload);
451        Ok(())
452    }
453
454    /// Decode a control message from bytes.
455    ///
456    /// Draft-13 framing: type_id(vi) + payload_length(16) + payload.
457    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
458        let type_id = VarInt::decode(buf)?.into_inner();
459        let msg_type =
460            MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
461        // Draft-13: 16-bit length (big-endian)
462        if buf.remaining() < 2 {
463            return Err(CodecError::UnexpectedEnd);
464        }
465        let payload_len = buf.get_u16() as usize;
466        if buf.remaining() < payload_len {
467            return Err(CodecError::UnexpectedEnd);
468        }
469        let payload_bytes = buf.copy_to_bytes(payload_len);
470        let mut payload = &payload_bytes[..];
471        Self::decode_payload(msg_type, &mut payload)
472    }
473
474    fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
475        match self {
476            ControlMessage::ClientSetup(m) => {
477                VarInt::from_usize(m.supported_versions.len()).encode(buf);
478                for v in &m.supported_versions {
479                    v.encode(buf);
480                }
481                KeyValuePair::encode_list(&m.parameters, buf);
482            }
483            ControlMessage::ServerSetup(m) => {
484                m.selected_version.encode(buf);
485                KeyValuePair::encode_list(&m.parameters, buf);
486            }
487            ControlMessage::GoAway(m) => {
488                if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
489                    return Err(CodecError::GoAwayUriTooLong);
490                }
491                VarInt::from_usize(m.new_session_uri.len()).encode(buf);
492                buf.put_slice(&m.new_session_uri);
493            }
494            ControlMessage::MaxRequestId(m) => {
495                m.request_id.encode(buf);
496            }
497            ControlMessage::RequestsBlocked(m) => {
498                m.maximum_request_id.encode(buf);
499            }
500            ControlMessage::Subscribe(m) => {
501                m.request_id.encode(buf);
502                m.track_namespace.encode(buf);
503                VarInt::from_usize(m.track_name.len()).encode(buf);
504                buf.put_slice(&m.track_name);
505                buf.put_u8(m.subscriber_priority);
506                m.group_order.encode(buf);
507                m.forward.encode(buf);
508                m.filter_type.encode(buf);
509                if let Some(sg) = &m.start_group {
510                    sg.encode(buf);
511                }
512                if let Some(so) = &m.start_object {
513                    so.encode(buf);
514                }
515                if let Some(eg) = &m.end_group {
516                    eg.encode(buf);
517                }
518                KeyValuePair::encode_list(&m.parameters, buf);
519            }
520            ControlMessage::SubscribeOk(m) => {
521                m.request_id.encode(buf);
522                m.track_alias.encode(buf);
523                m.expires.encode(buf);
524                m.group_order.encode(buf);
525                m.content_exists.encode(buf);
526                if let Some(loc) = &m.largest_location {
527                    loc.encode(buf);
528                }
529                KeyValuePair::encode_list(&m.parameters, buf);
530            }
531            ControlMessage::SubscribeError(m) => {
532                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
533                    return Err(CodecError::ReasonPhraseTooLong);
534                }
535                m.request_id.encode(buf);
536                m.error_code.encode(buf);
537                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
538                buf.put_slice(&m.reason_phrase);
539            }
540            ControlMessage::SubscribeUpdate(m) => {
541                m.request_id.encode(buf);
542                m.start_group.encode(buf);
543                m.start_object.encode(buf);
544                m.end_group.encode(buf);
545                buf.put_u8(m.subscriber_priority);
546                m.forward.encode(buf);
547                KeyValuePair::encode_list(&m.parameters, buf);
548            }
549            ControlMessage::SubscribeDone(m) => {
550                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
551                    return Err(CodecError::ReasonPhraseTooLong);
552                }
553                m.request_id.encode(buf);
554                m.status_code.encode(buf);
555                m.stream_count.encode(buf);
556                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
557                buf.put_slice(&m.reason_phrase);
558            }
559            ControlMessage::Unsubscribe(m) => {
560                m.request_id.encode(buf);
561            }
562            ControlMessage::Announce(m) => {
563                m.request_id.encode(buf);
564                m.track_namespace.encode(buf);
565                KeyValuePair::encode_list(&m.parameters, buf);
566            }
567            ControlMessage::AnnounceOk(m) => {
568                m.request_id.encode(buf);
569            }
570            ControlMessage::AnnounceError(m) => {
571                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
572                    return Err(CodecError::ReasonPhraseTooLong);
573                }
574                m.request_id.encode(buf);
575                m.error_code.encode(buf);
576                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
577                buf.put_slice(&m.reason_phrase);
578            }
579            ControlMessage::AnnounceCancel(m) => {
580                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
581                    return Err(CodecError::ReasonPhraseTooLong);
582                }
583                m.track_namespace.encode(buf);
584                m.error_code.encode(buf);
585                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
586                buf.put_slice(&m.reason_phrase);
587            }
588            ControlMessage::Unannounce(m) => {
589                m.track_namespace.encode(buf);
590            }
591            ControlMessage::SubscribeNamespace(m) => {
592                m.request_id.encode(buf);
593                m.track_namespace_prefix.encode(buf);
594                KeyValuePair::encode_list(&m.parameters, buf);
595            }
596            ControlMessage::SubscribeNamespaceOk(m) => {
597                m.request_id.encode(buf);
598            }
599            ControlMessage::SubscribeNamespaceError(m) => {
600                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
601                    return Err(CodecError::ReasonPhraseTooLong);
602                }
603                m.request_id.encode(buf);
604                m.error_code.encode(buf);
605                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
606                buf.put_slice(&m.reason_phrase);
607            }
608            ControlMessage::UnsubscribeNamespace(m) => {
609                m.track_namespace_prefix.encode(buf);
610            }
611            ControlMessage::TrackStatus(m) => {
612                m.request_id.encode(buf);
613                m.track_namespace.encode(buf);
614                VarInt::from_usize(m.track_name.len()).encode(buf);
615                buf.put_slice(&m.track_name);
616                buf.put_u8(m.subscriber_priority);
617                m.group_order.encode(buf);
618                m.forward.encode(buf);
619                m.filter_type.encode(buf);
620                KeyValuePair::encode_list(&m.parameters, buf);
621            }
622            ControlMessage::TrackStatusOk(m) => {
623                m.request_id.encode(buf);
624                m.track_alias.encode(buf);
625                m.expires.encode(buf);
626                m.group_order.encode(buf);
627                m.content_exists.encode(buf);
628                if let Some(loc) = &m.largest_location {
629                    loc.encode(buf);
630                }
631                KeyValuePair::encode_list(&m.parameters, buf);
632            }
633            ControlMessage::TrackStatusError(m) => {
634                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
635                    return Err(CodecError::ReasonPhraseTooLong);
636                }
637                m.request_id.encode(buf);
638                m.error_code.encode(buf);
639                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
640                buf.put_slice(&m.reason_phrase);
641            }
642            ControlMessage::Fetch(m) => {
643                m.request_id.encode(buf);
644                buf.put_u8(m.subscriber_priority);
645                m.group_order.encode(buf);
646                VarInt::from_usize(m.fetch_type as usize).encode(buf);
647                match &m.fetch_payload {
648                    FetchPayload::Standalone {
649                        track_namespace,
650                        track_name,
651                        start_group,
652                        start_object,
653                        end_group,
654                        end_object,
655                    } => {
656                        track_namespace.encode(buf);
657                        VarInt::from_usize(track_name.len()).encode(buf);
658                        buf.put_slice(track_name);
659                        start_group.encode(buf);
660                        start_object.encode(buf);
661                        end_group.encode(buf);
662                        end_object.encode(buf);
663                    }
664                    FetchPayload::Joining { joining_subscribe_id, joining_start } => {
665                        joining_subscribe_id.encode(buf);
666                        joining_start.encode(buf);
667                    }
668                }
669                KeyValuePair::encode_list(&m.parameters, buf);
670            }
671            ControlMessage::FetchOk(m) => {
672                m.request_id.encode(buf);
673                m.group_order.encode(buf);
674                m.end_of_track.encode(buf);
675                m.end_location.encode(buf);
676                KeyValuePair::encode_list(&m.parameters, buf);
677            }
678            ControlMessage::FetchError(m) => {
679                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
680                    return Err(CodecError::ReasonPhraseTooLong);
681                }
682                m.request_id.encode(buf);
683                m.error_code.encode(buf);
684                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
685                buf.put_slice(&m.reason_phrase);
686            }
687            ControlMessage::FetchCancel(m) => {
688                m.request_id.encode(buf);
689            }
690            ControlMessage::Publish(m) => {
691                m.request_id.encode(buf);
692                m.track_namespace.encode(buf);
693                VarInt::from_usize(m.track_name.len()).encode(buf);
694                buf.put_slice(&m.track_name);
695                m.track_alias.encode(buf);
696                m.group_order.encode(buf);
697                m.content_exists.encode(buf);
698                if let Some(loc) = &m.largest_location {
699                    loc.encode(buf);
700                }
701                m.forward.encode(buf);
702                KeyValuePair::encode_list(&m.parameters, buf);
703            }
704            ControlMessage::PublishOk(m) => {
705                m.request_id.encode(buf);
706                m.forward.encode(buf);
707                buf.put_u8(m.subscriber_priority);
708                m.group_order.encode(buf);
709                m.filter_type.encode(buf);
710                if let Some(sg) = &m.start_group {
711                    sg.encode(buf);
712                }
713                if let Some(so) = &m.start_object {
714                    so.encode(buf);
715                }
716                if let Some(eg) = &m.end_group {
717                    eg.encode(buf);
718                }
719                KeyValuePair::encode_list(&m.parameters, buf);
720            }
721            ControlMessage::PublishError(m) => {
722                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
723                    return Err(CodecError::ReasonPhraseTooLong);
724                }
725                m.request_id.encode(buf);
726                m.error_code.encode(buf);
727                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
728                buf.put_slice(&m.reason_phrase);
729            }
730        }
731        Ok(())
732    }
733
734    fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
735        match msg_type {
736            MessageType::ClientSetup => {
737                let num_versions = VarInt::decode(buf)?.into_inner() as usize;
738                if num_versions == 0 {
739                    return Err(CodecError::InvalidField);
740                }
741                let mut supported_versions = Vec::with_capacity(num_versions);
742                for _ in 0..num_versions {
743                    supported_versions.push(VarInt::decode(buf)?);
744                }
745                let parameters = KeyValuePair::decode_list(buf)?;
746                Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
747            }
748            MessageType::ServerSetup => {
749                let selected_version = VarInt::decode(buf)?;
750                let parameters = KeyValuePair::decode_list(buf)?;
751                Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
752            }
753            MessageType::GoAway => {
754                let uri_len = VarInt::decode(buf)?.into_inner() as usize;
755                let uri = read_bytes(buf, uri_len)?;
756                Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
757            }
758            MessageType::MaxRequestId => {
759                let request_id = VarInt::decode(buf)?;
760                Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id }))
761            }
762            MessageType::RequestsBlocked => {
763                let maximum_request_id = VarInt::decode(buf)?;
764                Ok(ControlMessage::RequestsBlocked(RequestsBlocked { maximum_request_id }))
765            }
766            MessageType::Subscribe => {
767                let request_id = VarInt::decode(buf)?;
768                let track_namespace = TrackNamespace::decode(buf)?;
769                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
770                let track_name = read_bytes(buf, track_name_len)?;
771                if buf.remaining() < 1 {
772                    return Err(CodecError::UnexpectedEnd);
773                }
774                let subscriber_priority = buf.get_u8();
775                let group_order = VarInt::decode(buf)?;
776                let forward = VarInt::decode(buf)?;
777                let filter_type = VarInt::decode(buf)?;
778                let ft_val = filter_type.into_inner();
779                if ft_val == 0 || ft_val > 4 {
780                    return Err(CodecError::InvalidField);
781                }
782                let (start_group, start_object) = if ft_val == 3 || ft_val == 4 {
783                    (Some(VarInt::decode(buf)?), Some(VarInt::decode(buf)?))
784                } else {
785                    (None, None)
786                };
787                let end_group = if ft_val == 4 { Some(VarInt::decode(buf)?) } else { None };
788                let parameters = KeyValuePair::decode_list(buf)?;
789                Ok(ControlMessage::Subscribe(Subscribe {
790                    request_id,
791                    track_namespace,
792                    track_name,
793                    subscriber_priority,
794                    group_order,
795                    forward,
796                    filter_type,
797                    start_group,
798                    start_object,
799                    end_group,
800                    parameters,
801                }))
802            }
803            MessageType::SubscribeOk => {
804                let request_id = VarInt::decode(buf)?;
805                let track_alias = VarInt::decode(buf)?;
806                let expires = VarInt::decode(buf)?;
807                let group_order = VarInt::decode(buf)?;
808                let content_exists = VarInt::decode(buf)?;
809                let largest_location = if content_exists.into_inner() != 0 {
810                    Some(Location::decode(buf)?)
811                } else {
812                    None
813                };
814                let parameters = KeyValuePair::decode_list(buf)?;
815                Ok(ControlMessage::SubscribeOk(SubscribeOk {
816                    request_id,
817                    track_alias,
818                    expires,
819                    group_order,
820                    content_exists,
821                    largest_location,
822                    parameters,
823                }))
824            }
825            MessageType::SubscribeError => {
826                let request_id = VarInt::decode(buf)?;
827                let error_code = VarInt::decode(buf)?;
828                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
829                let reason_phrase = read_bytes(buf, reason_len)?;
830                Ok(ControlMessage::SubscribeError(SubscribeError {
831                    request_id,
832                    error_code,
833                    reason_phrase,
834                }))
835            }
836            MessageType::SubscribeUpdate => {
837                let request_id = VarInt::decode(buf)?;
838                let start_group = VarInt::decode(buf)?;
839                let start_object = VarInt::decode(buf)?;
840                let end_group = VarInt::decode(buf)?;
841                if buf.remaining() < 1 {
842                    return Err(CodecError::UnexpectedEnd);
843                }
844                let subscriber_priority = buf.get_u8();
845                let forward = VarInt::decode(buf)?;
846                let parameters = KeyValuePair::decode_list(buf)?;
847                Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
848                    request_id,
849                    start_group,
850                    start_object,
851                    end_group,
852                    subscriber_priority,
853                    forward,
854                    parameters,
855                }))
856            }
857            MessageType::SubscribeDone => {
858                let request_id = VarInt::decode(buf)?;
859                let status_code = VarInt::decode(buf)?;
860                let stream_count = VarInt::decode(buf)?;
861                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
862                let reason_phrase = read_bytes(buf, reason_len)?;
863                Ok(ControlMessage::SubscribeDone(SubscribeDone {
864                    request_id,
865                    status_code,
866                    stream_count,
867                    reason_phrase,
868                }))
869            }
870            MessageType::Unsubscribe => {
871                let request_id = VarInt::decode(buf)?;
872                Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
873            }
874            MessageType::Announce => {
875                let request_id = VarInt::decode(buf)?;
876                let track_namespace = TrackNamespace::decode(buf)?;
877                let parameters = KeyValuePair::decode_list(buf)?;
878                Ok(ControlMessage::Announce(Announce { request_id, track_namespace, parameters }))
879            }
880            MessageType::AnnounceOk => {
881                let request_id = VarInt::decode(buf)?;
882                Ok(ControlMessage::AnnounceOk(AnnounceOk { request_id }))
883            }
884            MessageType::AnnounceError => {
885                let request_id = VarInt::decode(buf)?;
886                let error_code = VarInt::decode(buf)?;
887                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
888                let reason_phrase = read_bytes(buf, reason_len)?;
889                Ok(ControlMessage::AnnounceError(AnnounceError {
890                    request_id,
891                    error_code,
892                    reason_phrase,
893                }))
894            }
895            MessageType::AnnounceCancel => {
896                let track_namespace = TrackNamespace::decode(buf)?;
897                let error_code = VarInt::decode(buf)?;
898                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
899                let reason_phrase = read_bytes(buf, reason_len)?;
900                Ok(ControlMessage::AnnounceCancel(AnnounceCancel {
901                    track_namespace,
902                    error_code,
903                    reason_phrase,
904                }))
905            }
906            MessageType::Unannounce => {
907                let track_namespace = TrackNamespace::decode(buf)?;
908                Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
909            }
910            MessageType::SubscribeNamespace => {
911                let request_id = VarInt::decode(buf)?;
912                let track_namespace_prefix = TrackNamespace::decode(buf)?;
913                let parameters = KeyValuePair::decode_list(buf)?;
914                Ok(ControlMessage::SubscribeNamespace(SubscribeNamespace {
915                    request_id,
916                    track_namespace_prefix,
917                    parameters,
918                }))
919            }
920            MessageType::SubscribeNamespaceOk => {
921                let request_id = VarInt::decode(buf)?;
922                Ok(ControlMessage::SubscribeNamespaceOk(SubscribeNamespaceOk { request_id }))
923            }
924            MessageType::SubscribeNamespaceError => {
925                let request_id = VarInt::decode(buf)?;
926                let error_code = VarInt::decode(buf)?;
927                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
928                let reason_phrase = read_bytes(buf, reason_len)?;
929                Ok(ControlMessage::SubscribeNamespaceError(SubscribeNamespaceError {
930                    request_id,
931                    error_code,
932                    reason_phrase,
933                }))
934            }
935            MessageType::UnsubscribeNamespace => {
936                let track_namespace_prefix = TrackNamespace::decode(buf)?;
937                Ok(ControlMessage::UnsubscribeNamespace(UnsubscribeNamespace {
938                    track_namespace_prefix,
939                }))
940            }
941            MessageType::TrackStatus => {
942                let request_id = VarInt::decode(buf)?;
943                let track_namespace = TrackNamespace::decode(buf)?;
944                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
945                let track_name = read_bytes(buf, track_name_len)?;
946                if buf.remaining() < 1 {
947                    return Err(CodecError::UnexpectedEnd);
948                }
949                let subscriber_priority = buf.get_u8();
950                let group_order = VarInt::decode(buf)?;
951                let forward = VarInt::decode(buf)?;
952                let filter_type = VarInt::decode(buf)?;
953                let parameters = KeyValuePair::decode_list(buf)?;
954                Ok(ControlMessage::TrackStatus(TrackStatus {
955                    request_id,
956                    track_namespace,
957                    track_name,
958                    subscriber_priority,
959                    group_order,
960                    forward,
961                    filter_type,
962                    parameters,
963                }))
964            }
965            MessageType::TrackStatusOk => {
966                let request_id = VarInt::decode(buf)?;
967                let track_alias = VarInt::decode(buf)?;
968                let expires = VarInt::decode(buf)?;
969                let group_order = VarInt::decode(buf)?;
970                let content_exists = VarInt::decode(buf)?;
971                let largest_location = if content_exists.into_inner() != 0 {
972                    Some(Location::decode(buf)?)
973                } else {
974                    None
975                };
976                let parameters = KeyValuePair::decode_list(buf)?;
977                Ok(ControlMessage::TrackStatusOk(TrackStatusOk {
978                    request_id,
979                    track_alias,
980                    expires,
981                    group_order,
982                    content_exists,
983                    largest_location,
984                    parameters,
985                }))
986            }
987            MessageType::TrackStatusError => {
988                let request_id = VarInt::decode(buf)?;
989                let error_code = VarInt::decode(buf)?;
990                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
991                let reason_phrase = read_bytes(buf, reason_len)?;
992                Ok(ControlMessage::TrackStatusError(TrackStatusError {
993                    request_id,
994                    error_code,
995                    reason_phrase,
996                }))
997            }
998            MessageType::Fetch => {
999                let request_id = VarInt::decode(buf)?;
1000                if buf.remaining() < 1 {
1001                    return Err(CodecError::UnexpectedEnd);
1002                }
1003                let subscriber_priority = buf.get_u8();
1004                let group_order = VarInt::decode(buf)?;
1005                let fetch_type_val = VarInt::decode(buf)?.into_inner();
1006                let fetch_type =
1007                    FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
1008                let fetch_payload = match fetch_type {
1009                    FetchType::Standalone => {
1010                        let track_namespace = TrackNamespace::decode(buf)?;
1011                        let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1012                        let track_name = read_bytes(buf, track_name_len)?;
1013                        let start_group = VarInt::decode(buf)?;
1014                        let start_object = VarInt::decode(buf)?;
1015                        let end_group = VarInt::decode(buf)?;
1016                        let end_object = VarInt::decode(buf)?;
1017                        FetchPayload::Standalone {
1018                            track_namespace,
1019                            track_name,
1020                            start_group,
1021                            start_object,
1022                            end_group,
1023                            end_object,
1024                        }
1025                    }
1026                    FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
1027                        let joining_subscribe_id = VarInt::decode(buf)?;
1028                        let joining_start = VarInt::decode(buf)?;
1029                        FetchPayload::Joining { joining_subscribe_id, joining_start }
1030                    }
1031                };
1032                let parameters = KeyValuePair::decode_list(buf)?;
1033                Ok(ControlMessage::Fetch(Fetch {
1034                    request_id,
1035                    subscriber_priority,
1036                    group_order,
1037                    fetch_type,
1038                    fetch_payload,
1039                    parameters,
1040                }))
1041            }
1042            MessageType::FetchOk => {
1043                let request_id = VarInt::decode(buf)?;
1044                let group_order = VarInt::decode(buf)?;
1045                let end_of_track = VarInt::decode(buf)?;
1046                let end_location = Location::decode(buf)?;
1047                let parameters = KeyValuePair::decode_list(buf)?;
1048                Ok(ControlMessage::FetchOk(FetchOk {
1049                    request_id,
1050                    group_order,
1051                    end_of_track,
1052                    end_location,
1053                    parameters,
1054                }))
1055            }
1056            MessageType::FetchError => {
1057                let request_id = VarInt::decode(buf)?;
1058                let error_code = VarInt::decode(buf)?;
1059                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1060                let reason_phrase = read_bytes(buf, reason_len)?;
1061                Ok(ControlMessage::FetchError(FetchError { request_id, error_code, reason_phrase }))
1062            }
1063            MessageType::FetchCancel => {
1064                let request_id = VarInt::decode(buf)?;
1065                Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
1066            }
1067            MessageType::Publish => {
1068                let request_id = VarInt::decode(buf)?;
1069                let track_namespace = TrackNamespace::decode(buf)?;
1070                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1071                let track_name = read_bytes(buf, track_name_len)?;
1072                let track_alias = VarInt::decode(buf)?;
1073                let group_order = VarInt::decode(buf)?;
1074                let content_exists = VarInt::decode(buf)?;
1075                let largest_location = if content_exists.into_inner() != 0 {
1076                    Some(Location::decode(buf)?)
1077                } else {
1078                    None
1079                };
1080                let forward = VarInt::decode(buf)?;
1081                let parameters = KeyValuePair::decode_list(buf)?;
1082                Ok(ControlMessage::Publish(Publish {
1083                    request_id,
1084                    track_namespace,
1085                    track_name,
1086                    track_alias,
1087                    group_order,
1088                    content_exists,
1089                    largest_location,
1090                    forward,
1091                    parameters,
1092                }))
1093            }
1094            MessageType::PublishOk => {
1095                let request_id = VarInt::decode(buf)?;
1096                let forward = VarInt::decode(buf)?;
1097                if buf.remaining() < 1 {
1098                    return Err(CodecError::UnexpectedEnd);
1099                }
1100                let subscriber_priority = buf.get_u8();
1101                let group_order = VarInt::decode(buf)?;
1102                let filter_type = VarInt::decode(buf)?;
1103                let ft_val = filter_type.into_inner();
1104                if ft_val == 0 || ft_val > 4 {
1105                    return Err(CodecError::InvalidField);
1106                }
1107                let (start_group, start_object) = if ft_val == 3 || ft_val == 4 {
1108                    (Some(VarInt::decode(buf)?), Some(VarInt::decode(buf)?))
1109                } else {
1110                    (None, None)
1111                };
1112                let end_group = if ft_val == 4 { Some(VarInt::decode(buf)?) } else { None };
1113                let parameters = KeyValuePair::decode_list(buf)?;
1114                Ok(ControlMessage::PublishOk(PublishOk {
1115                    request_id,
1116                    forward,
1117                    subscriber_priority,
1118                    group_order,
1119                    filter_type,
1120                    start_group,
1121                    start_object,
1122                    end_group,
1123                    parameters,
1124                }))
1125            }
1126            MessageType::PublishError => {
1127                let request_id = VarInt::decode(buf)?;
1128                let error_code = VarInt::decode(buf)?;
1129                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1130                let reason_phrase = read_bytes(buf, reason_len)?;
1131                Ok(ControlMessage::PublishError(PublishError {
1132                    request_id,
1133                    error_code,
1134                    reason_phrase,
1135                }))
1136            }
1137        }
1138    }
1139
1140    pub fn message_type(&self) -> MessageType {
1141        match self {
1142            ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
1143            ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
1144            ControlMessage::GoAway(_) => MessageType::GoAway,
1145            ControlMessage::MaxRequestId(_) => MessageType::MaxRequestId,
1146            ControlMessage::RequestsBlocked(_) => MessageType::RequestsBlocked,
1147            ControlMessage::Subscribe(_) => MessageType::Subscribe,
1148            ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
1149            ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
1150            ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
1151            ControlMessage::SubscribeDone(_) => MessageType::SubscribeDone,
1152            ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
1153            ControlMessage::Announce(_) => MessageType::Announce,
1154            ControlMessage::AnnounceOk(_) => MessageType::AnnounceOk,
1155            ControlMessage::AnnounceError(_) => MessageType::AnnounceError,
1156            ControlMessage::AnnounceCancel(_) => MessageType::AnnounceCancel,
1157            ControlMessage::Unannounce(_) => MessageType::Unannounce,
1158            ControlMessage::SubscribeNamespace(_) => MessageType::SubscribeNamespace,
1159            ControlMessage::SubscribeNamespaceOk(_) => MessageType::SubscribeNamespaceOk,
1160            ControlMessage::SubscribeNamespaceError(_) => MessageType::SubscribeNamespaceError,
1161            ControlMessage::UnsubscribeNamespace(_) => MessageType::UnsubscribeNamespace,
1162            ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
1163            ControlMessage::TrackStatusOk(_) => MessageType::TrackStatusOk,
1164            ControlMessage::TrackStatusError(_) => MessageType::TrackStatusError,
1165            ControlMessage::Fetch(_) => MessageType::Fetch,
1166            ControlMessage::FetchOk(_) => MessageType::FetchOk,
1167            ControlMessage::FetchError(_) => MessageType::FetchError,
1168            ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
1169            ControlMessage::Publish(_) => MessageType::Publish,
1170            ControlMessage::PublishOk(_) => MessageType::PublishOk,
1171            ControlMessage::PublishError(_) => MessageType::PublishError,
1172        }
1173    }
1174}