Skip to main content

moqtap_codec/draft12/
message.rs

1//! Draft-12 control message encoding and decoding.
2//!
3//! Key changes from draft-11:
4//! - Subscribe: `track_alias` removed (moved to SubscribeOk)
5//! - SubscribeOk: `track_alias` added (after request_id)
6//! - SubscribeError: trailing `track_alias` removed
7//! - New messages: Publish (0x1D), PublishOk (0x1E), PublishError (0x1F)
8//! - Same message type IDs for all other messages
9//! - Same framing: type_id(vi) + payload_length(16) + payload
10//! - Same even/odd KVP encoding
11
12pub use crate::error::{
13    CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_NAMESPACE_TUPLE_SIZE,
14    MAX_REASON_PHRASE_LENGTH,
15};
16use crate::kvp::KeyValuePair;
17use crate::types::{self, *};
18use crate::varint::VarInt;
19use bytes::{Buf, BufMut};
20
21/// Control message type IDs (draft-12).
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23#[repr(u64)]
24pub enum MessageType {
25    SubscribeUpdate = 0x02,
26    Subscribe = 0x03,
27    SubscribeOk = 0x04,
28    SubscribeError = 0x05,
29    Announce = 0x06,
30    AnnounceOk = 0x07,
31    AnnounceError = 0x08,
32    Unannounce = 0x09,
33    Unsubscribe = 0x0A,
34    SubscribeDone = 0x0B,
35    AnnounceCancel = 0x0C,
36    TrackStatusRequest = 0x0D,
37    TrackStatus = 0x0E,
38    GoAway = 0x10,
39    SubscribeAnnounces = 0x11,
40    SubscribeAnnouncesOk = 0x12,
41    SubscribeAnnouncesError = 0x13,
42    UnsubscribeAnnounces = 0x14,
43    MaxRequestId = 0x15,
44    Fetch = 0x16,
45    FetchCancel = 0x17,
46    FetchOk = 0x18,
47    FetchError = 0x19,
48    RequestsBlocked = 0x1A,
49    Publish = 0x1D,
50    PublishOk = 0x1E,
51    PublishError = 0x1F,
52    ClientSetup = 0x20,
53    ServerSetup = 0x21,
54}
55
56impl MessageType {
57    pub fn from_id(id: u64) -> Option<Self> {
58        match id {
59            0x02 => Some(MessageType::SubscribeUpdate),
60            0x03 => Some(MessageType::Subscribe),
61            0x04 => Some(MessageType::SubscribeOk),
62            0x05 => Some(MessageType::SubscribeError),
63            0x06 => Some(MessageType::Announce),
64            0x07 => Some(MessageType::AnnounceOk),
65            0x08 => Some(MessageType::AnnounceError),
66            0x09 => Some(MessageType::Unannounce),
67            0x0A => Some(MessageType::Unsubscribe),
68            0x0B => Some(MessageType::SubscribeDone),
69            0x0C => Some(MessageType::AnnounceCancel),
70            0x0D => Some(MessageType::TrackStatusRequest),
71            0x0E => Some(MessageType::TrackStatus),
72            0x10 => Some(MessageType::GoAway),
73            0x11 => Some(MessageType::SubscribeAnnounces),
74            0x12 => Some(MessageType::SubscribeAnnouncesOk),
75            0x13 => Some(MessageType::SubscribeAnnouncesError),
76            0x14 => Some(MessageType::UnsubscribeAnnounces),
77            0x15 => Some(MessageType::MaxRequestId),
78            0x16 => Some(MessageType::Fetch),
79            0x17 => Some(MessageType::FetchCancel),
80            0x18 => Some(MessageType::FetchOk),
81            0x19 => Some(MessageType::FetchError),
82            0x1A => Some(MessageType::RequestsBlocked),
83            0x1D => Some(MessageType::Publish),
84            0x1E => Some(MessageType::PublishOk),
85            0x1F => Some(MessageType::PublishError),
86            0x20 => Some(MessageType::ClientSetup),
87            0x21 => Some(MessageType::ServerSetup),
88            _ => None,
89        }
90    }
91
92    pub fn id(&self) -> u64 {
93        *self as u64
94    }
95}
96
97// ============================================================
98// Session Lifecycle Messages
99// ============================================================
100
101#[derive(Debug, Clone, PartialEq, Eq)]
102pub struct ClientSetup {
103    pub supported_versions: Vec<VarInt>,
104    pub parameters: Vec<KeyValuePair>,
105}
106
107#[derive(Debug, Clone, PartialEq, Eq)]
108pub struct ServerSetup {
109    pub selected_version: VarInt,
110    pub parameters: Vec<KeyValuePair>,
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct GoAway {
115    pub new_session_uri: Vec<u8>,
116}
117
118#[derive(Debug, Clone, PartialEq, Eq)]
119pub struct MaxRequestId {
120    pub request_id: VarInt,
121}
122
123#[derive(Debug, Clone, PartialEq, Eq)]
124pub struct RequestsBlocked {
125    pub maximum_request_id: VarInt,
126}
127
128// ============================================================
129// Subscribe Messages
130// ============================================================
131
132/// SUBSCRIBE message (type 0x03). Draft-12: no track_alias (moved to SubscribeOk).
133#[derive(Debug, Clone, PartialEq, Eq)]
134pub struct Subscribe {
135    pub request_id: VarInt,
136    pub track_namespace: TrackNamespace,
137    pub track_name: Vec<u8>,
138    pub subscriber_priority: u8,
139    pub group_order: VarInt,
140    pub forward: VarInt,
141    pub filter_type: VarInt,
142    pub start_group: Option<VarInt>,
143    pub start_object: Option<VarInt>,
144    pub end_group: Option<VarInt>,
145    pub parameters: Vec<KeyValuePair>,
146}
147
148/// SUBSCRIBE_OK message (type 0x04). Draft-12: gains track_alias.
149#[derive(Debug, Clone, PartialEq, Eq)]
150pub struct SubscribeOk {
151    pub request_id: VarInt,
152    pub track_alias: VarInt,
153    pub expires: VarInt,
154    pub group_order: VarInt,
155    pub content_exists: VarInt,
156    pub largest_location: Option<Location>,
157    pub parameters: Vec<KeyValuePair>,
158}
159
160/// SUBSCRIBE_ERROR message (type 0x05). Draft-12: no trailing track_alias.
161#[derive(Debug, Clone, PartialEq, Eq)]
162pub struct SubscribeError {
163    pub request_id: VarInt,
164    pub error_code: VarInt,
165    pub reason_phrase: Vec<u8>,
166}
167
168#[derive(Debug, Clone, PartialEq, Eq)]
169pub struct SubscribeUpdate {
170    pub request_id: VarInt,
171    pub start_group: VarInt,
172    pub start_object: VarInt,
173    pub end_group: VarInt,
174    pub subscriber_priority: u8,
175    pub forward: VarInt,
176    pub parameters: Vec<KeyValuePair>,
177}
178
179#[derive(Debug, Clone, PartialEq, Eq)]
180pub struct SubscribeDone {
181    pub request_id: VarInt,
182    pub status_code: VarInt,
183    pub stream_count: VarInt,
184    pub reason_phrase: Vec<u8>,
185}
186
187#[derive(Debug, Clone, PartialEq, Eq)]
188pub struct Unsubscribe {
189    pub request_id: VarInt,
190}
191
192// ============================================================
193// Announce Messages
194// ============================================================
195
196#[derive(Debug, Clone, PartialEq, Eq)]
197pub struct Announce {
198    pub request_id: VarInt,
199    pub track_namespace: TrackNamespace,
200    pub parameters: Vec<KeyValuePair>,
201}
202
203#[derive(Debug, Clone, PartialEq, Eq)]
204pub struct AnnounceOk {
205    pub request_id: VarInt,
206}
207
208#[derive(Debug, Clone, PartialEq, Eq)]
209pub struct AnnounceError {
210    pub request_id: VarInt,
211    pub error_code: VarInt,
212    pub reason_phrase: Vec<u8>,
213}
214
215#[derive(Debug, Clone, PartialEq, Eq)]
216pub struct AnnounceCancel {
217    pub track_namespace: TrackNamespace,
218    pub error_code: VarInt,
219    pub reason_phrase: Vec<u8>,
220}
221
222#[derive(Debug, Clone, PartialEq, Eq)]
223pub struct Unannounce {
224    pub track_namespace: TrackNamespace,
225}
226
227// ============================================================
228// Subscribe Announces Messages
229// ============================================================
230
231#[derive(Debug, Clone, PartialEq, Eq)]
232pub struct SubscribeAnnounces {
233    pub request_id: VarInt,
234    pub track_namespace_prefix: TrackNamespace,
235    pub parameters: Vec<KeyValuePair>,
236}
237
238#[derive(Debug, Clone, PartialEq, Eq)]
239pub struct SubscribeAnnouncesOk {
240    pub request_id: VarInt,
241}
242
243#[derive(Debug, Clone, PartialEq, Eq)]
244pub struct SubscribeAnnouncesError {
245    pub request_id: VarInt,
246    pub error_code: VarInt,
247    pub reason_phrase: Vec<u8>,
248}
249
250#[derive(Debug, Clone, PartialEq, Eq)]
251pub struct UnsubscribeAnnounces {
252    pub track_namespace_prefix: TrackNamespace,
253}
254
255// ============================================================
256// Track Status Messages
257// ============================================================
258
259#[derive(Debug, Clone, PartialEq, Eq)]
260pub struct TrackStatusRequest {
261    pub request_id: VarInt,
262    pub track_namespace: TrackNamespace,
263    pub track_name: Vec<u8>,
264    pub parameters: Vec<KeyValuePair>,
265}
266
267#[derive(Debug, Clone, PartialEq, Eq)]
268pub struct TrackStatus {
269    pub request_id: VarInt,
270    pub status_code: VarInt,
271    pub largest_location: Location,
272    pub parameters: Vec<KeyValuePair>,
273}
274
275// ============================================================
276// Fetch Messages
277// ============================================================
278
279#[derive(Debug, Clone, Copy, PartialEq, Eq)]
280#[repr(u64)]
281pub enum FetchType {
282    Standalone = 1,
283    RelativeJoining = 2,
284    AbsoluteJoining = 3,
285}
286
287impl FetchType {
288    pub fn from_u64(v: u64) -> Option<Self> {
289        match v {
290            1 => Some(FetchType::Standalone),
291            2 => Some(FetchType::RelativeJoining),
292            3 => Some(FetchType::AbsoluteJoining),
293            _ => None,
294        }
295    }
296}
297
298#[derive(Debug, Clone, PartialEq, Eq)]
299pub struct Fetch {
300    pub request_id: VarInt,
301    pub subscriber_priority: u8,
302    pub group_order: VarInt,
303    pub fetch_type: FetchType,
304    pub fetch_payload: FetchPayload,
305    pub parameters: Vec<KeyValuePair>,
306}
307
308#[derive(Debug, Clone, PartialEq, Eq)]
309pub enum FetchPayload {
310    Standalone {
311        track_namespace: TrackNamespace,
312        track_name: Vec<u8>,
313        start_group: VarInt,
314        start_object: VarInt,
315        end_group: VarInt,
316        end_object: VarInt,
317    },
318    Joining {
319        joining_subscribe_id: VarInt,
320        joining_start: VarInt,
321    },
322}
323
324#[derive(Debug, Clone, PartialEq, Eq)]
325pub struct FetchOk {
326    pub request_id: VarInt,
327    pub group_order: VarInt,
328    pub end_of_track: VarInt,
329    pub end_location: Location,
330    pub parameters: Vec<KeyValuePair>,
331}
332
333#[derive(Debug, Clone, PartialEq, Eq)]
334pub struct FetchError {
335    pub request_id: VarInt,
336    pub error_code: VarInt,
337    pub reason_phrase: Vec<u8>,
338}
339
340#[derive(Debug, Clone, PartialEq, Eq)]
341pub struct FetchCancel {
342    pub request_id: VarInt,
343}
344
345// ============================================================
346// Publish Messages (NEW in draft-12)
347// ============================================================
348
349/// PUBLISH message (type 0x1D).
350#[derive(Debug, Clone, PartialEq, Eq)]
351pub struct Publish {
352    pub request_id: VarInt,
353    pub track_namespace: TrackNamespace,
354    pub track_name: Vec<u8>,
355    pub track_alias: VarInt,
356    pub group_order: VarInt,
357    pub content_exists: VarInt,
358    pub largest_location: Option<Location>,
359    pub forward: VarInt,
360    pub parameters: Vec<KeyValuePair>,
361}
362
363/// PUBLISH_OK message (type 0x1E).
364#[derive(Debug, Clone, PartialEq, Eq)]
365pub struct PublishOk {
366    pub request_id: VarInt,
367    pub forward: VarInt,
368    pub subscriber_priority: u8,
369    pub group_order: VarInt,
370    pub filter_type: VarInt,
371    pub start_group: Option<VarInt>,
372    pub start_object: Option<VarInt>,
373    pub end_group: Option<VarInt>,
374    pub parameters: Vec<KeyValuePair>,
375}
376
377/// PUBLISH_ERROR message (type 0x1F).
378#[derive(Debug, Clone, PartialEq, Eq)]
379pub struct PublishError {
380    pub request_id: VarInt,
381    pub error_code: VarInt,
382    pub reason_phrase: Vec<u8>,
383}
384
385// ============================================================
386// Unified Message Enum
387// ============================================================
388
389#[derive(Debug, Clone, PartialEq, Eq)]
390pub enum ControlMessage {
391    ClientSetup(ClientSetup),
392    ServerSetup(ServerSetup),
393    GoAway(GoAway),
394    MaxRequestId(MaxRequestId),
395    RequestsBlocked(RequestsBlocked),
396    Subscribe(Subscribe),
397    SubscribeOk(SubscribeOk),
398    SubscribeError(SubscribeError),
399    SubscribeUpdate(SubscribeUpdate),
400    SubscribeDone(SubscribeDone),
401    Unsubscribe(Unsubscribe),
402    Announce(Announce),
403    AnnounceOk(AnnounceOk),
404    AnnounceError(AnnounceError),
405    AnnounceCancel(AnnounceCancel),
406    Unannounce(Unannounce),
407    SubscribeAnnounces(SubscribeAnnounces),
408    SubscribeAnnouncesOk(SubscribeAnnouncesOk),
409    SubscribeAnnouncesError(SubscribeAnnouncesError),
410    UnsubscribeAnnounces(UnsubscribeAnnounces),
411    TrackStatusRequest(TrackStatusRequest),
412    TrackStatus(TrackStatus),
413    Fetch(Fetch),
414    FetchOk(FetchOk),
415    FetchError(FetchError),
416    FetchCancel(FetchCancel),
417    Publish(Publish),
418    PublishOk(PublishOk),
419    PublishError(PublishError),
420}
421
422impl ControlMessage {
423    /// Encode this control message to bytes.
424    ///
425    /// Draft-12 framing: type_id(vi) + payload_length(16) + payload.
426    pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
427        let mut payload = Vec::with_capacity(256);
428        self.encode_payload(&mut payload)?;
429
430        if payload.len() > MAX_MESSAGE_LENGTH {
431            return Err(CodecError::MessageTooLong(payload.len()));
432        }
433
434        VarInt::from_usize(self.message_type().id() as usize).encode(buf);
435        // Draft-12: 16-bit length (big-endian)
436        buf.put_u16(payload.len() as u16);
437        buf.put_slice(&payload);
438        Ok(())
439    }
440
441    /// Decode a control message from bytes.
442    ///
443    /// Draft-12 framing: type_id(vi) + payload_length(16) + payload.
444    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
445        let type_id = VarInt::decode(buf)?.into_inner();
446        let msg_type =
447            MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
448        // Draft-12: 16-bit length (big-endian)
449        if buf.remaining() < 2 {
450            return Err(CodecError::UnexpectedEnd);
451        }
452        let payload_len = buf.get_u16() as usize;
453        if buf.remaining() < payload_len {
454            return Err(CodecError::UnexpectedEnd);
455        }
456        let payload_bytes = buf.copy_to_bytes(payload_len);
457        let mut payload = &payload_bytes[..];
458        Self::decode_payload(msg_type, &mut payload)
459    }
460
461    fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
462        match self {
463            ControlMessage::ClientSetup(m) => {
464                VarInt::from_usize(m.supported_versions.len()).encode(buf);
465                for v in &m.supported_versions {
466                    v.encode(buf);
467                }
468                KeyValuePair::encode_list(&m.parameters, buf);
469            }
470            ControlMessage::ServerSetup(m) => {
471                m.selected_version.encode(buf);
472                KeyValuePair::encode_list(&m.parameters, buf);
473            }
474            ControlMessage::GoAway(m) => {
475                if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
476                    return Err(CodecError::GoAwayUriTooLong);
477                }
478                VarInt::from_usize(m.new_session_uri.len()).encode(buf);
479                buf.put_slice(&m.new_session_uri);
480            }
481            ControlMessage::MaxRequestId(m) => {
482                m.request_id.encode(buf);
483            }
484            ControlMessage::RequestsBlocked(m) => {
485                m.maximum_request_id.encode(buf);
486            }
487            ControlMessage::Subscribe(m) => {
488                m.request_id.encode(buf);
489                m.track_namespace.encode(buf);
490                VarInt::from_usize(m.track_name.len()).encode(buf);
491                buf.put_slice(&m.track_name);
492                buf.put_u8(m.subscriber_priority);
493                m.group_order.encode(buf);
494                m.forward.encode(buf);
495                m.filter_type.encode(buf);
496                if let Some(sg) = &m.start_group {
497                    sg.encode(buf);
498                }
499                if let Some(so) = &m.start_object {
500                    so.encode(buf);
501                }
502                if let Some(eg) = &m.end_group {
503                    eg.encode(buf);
504                }
505                KeyValuePair::encode_list(&m.parameters, buf);
506            }
507            ControlMessage::SubscribeOk(m) => {
508                m.request_id.encode(buf);
509                m.track_alias.encode(buf);
510                m.expires.encode(buf);
511                m.group_order.encode(buf);
512                m.content_exists.encode(buf);
513                if let Some(loc) = &m.largest_location {
514                    loc.encode(buf);
515                }
516                KeyValuePair::encode_list(&m.parameters, buf);
517            }
518            ControlMessage::SubscribeError(m) => {
519                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
520                    return Err(CodecError::ReasonPhraseTooLong);
521                }
522                m.request_id.encode(buf);
523                m.error_code.encode(buf);
524                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
525                buf.put_slice(&m.reason_phrase);
526            }
527            ControlMessage::SubscribeUpdate(m) => {
528                m.request_id.encode(buf);
529                m.start_group.encode(buf);
530                m.start_object.encode(buf);
531                m.end_group.encode(buf);
532                buf.put_u8(m.subscriber_priority);
533                m.forward.encode(buf);
534                KeyValuePair::encode_list(&m.parameters, buf);
535            }
536            ControlMessage::SubscribeDone(m) => {
537                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
538                    return Err(CodecError::ReasonPhraseTooLong);
539                }
540                m.request_id.encode(buf);
541                m.status_code.encode(buf);
542                m.stream_count.encode(buf);
543                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
544                buf.put_slice(&m.reason_phrase);
545            }
546            ControlMessage::Unsubscribe(m) => {
547                m.request_id.encode(buf);
548            }
549            ControlMessage::Announce(m) => {
550                m.request_id.encode(buf);
551                m.track_namespace.encode(buf);
552                KeyValuePair::encode_list(&m.parameters, buf);
553            }
554            ControlMessage::AnnounceOk(m) => {
555                m.request_id.encode(buf);
556            }
557            ControlMessage::AnnounceError(m) => {
558                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
559                    return Err(CodecError::ReasonPhraseTooLong);
560                }
561                m.request_id.encode(buf);
562                m.error_code.encode(buf);
563                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
564                buf.put_slice(&m.reason_phrase);
565            }
566            ControlMessage::AnnounceCancel(m) => {
567                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
568                    return Err(CodecError::ReasonPhraseTooLong);
569                }
570                m.track_namespace.encode(buf);
571                m.error_code.encode(buf);
572                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
573                buf.put_slice(&m.reason_phrase);
574            }
575            ControlMessage::Unannounce(m) => {
576                m.track_namespace.encode(buf);
577            }
578            ControlMessage::SubscribeAnnounces(m) => {
579                m.request_id.encode(buf);
580                m.track_namespace_prefix.encode(buf);
581                KeyValuePair::encode_list(&m.parameters, buf);
582            }
583            ControlMessage::SubscribeAnnouncesOk(m) => {
584                m.request_id.encode(buf);
585            }
586            ControlMessage::SubscribeAnnouncesError(m) => {
587                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
588                    return Err(CodecError::ReasonPhraseTooLong);
589                }
590                m.request_id.encode(buf);
591                m.error_code.encode(buf);
592                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
593                buf.put_slice(&m.reason_phrase);
594            }
595            ControlMessage::UnsubscribeAnnounces(m) => {
596                m.track_namespace_prefix.encode(buf);
597            }
598            ControlMessage::TrackStatusRequest(m) => {
599                m.request_id.encode(buf);
600                m.track_namespace.encode(buf);
601                VarInt::from_usize(m.track_name.len()).encode(buf);
602                buf.put_slice(&m.track_name);
603                KeyValuePair::encode_list(&m.parameters, buf);
604            }
605            ControlMessage::TrackStatus(m) => {
606                m.request_id.encode(buf);
607                m.status_code.encode(buf);
608                m.largest_location.encode(buf);
609                KeyValuePair::encode_list(&m.parameters, buf);
610            }
611            ControlMessage::Fetch(m) => {
612                m.request_id.encode(buf);
613                buf.put_u8(m.subscriber_priority);
614                m.group_order.encode(buf);
615                VarInt::from_usize(m.fetch_type as usize).encode(buf);
616                match &m.fetch_payload {
617                    FetchPayload::Standalone {
618                        track_namespace,
619                        track_name,
620                        start_group,
621                        start_object,
622                        end_group,
623                        end_object,
624                    } => {
625                        track_namespace.encode(buf);
626                        VarInt::from_usize(track_name.len()).encode(buf);
627                        buf.put_slice(track_name);
628                        start_group.encode(buf);
629                        start_object.encode(buf);
630                        end_group.encode(buf);
631                        end_object.encode(buf);
632                    }
633                    FetchPayload::Joining { joining_subscribe_id, joining_start } => {
634                        joining_subscribe_id.encode(buf);
635                        joining_start.encode(buf);
636                    }
637                }
638                KeyValuePair::encode_list(&m.parameters, buf);
639            }
640            ControlMessage::FetchOk(m) => {
641                m.request_id.encode(buf);
642                m.group_order.encode(buf);
643                m.end_of_track.encode(buf);
644                m.end_location.encode(buf);
645                KeyValuePair::encode_list(&m.parameters, buf);
646            }
647            ControlMessage::FetchError(m) => {
648                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
649                    return Err(CodecError::ReasonPhraseTooLong);
650                }
651                m.request_id.encode(buf);
652                m.error_code.encode(buf);
653                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
654                buf.put_slice(&m.reason_phrase);
655            }
656            ControlMessage::FetchCancel(m) => {
657                m.request_id.encode(buf);
658            }
659            ControlMessage::Publish(m) => {
660                m.request_id.encode(buf);
661                m.track_namespace.encode(buf);
662                VarInt::from_usize(m.track_name.len()).encode(buf);
663                buf.put_slice(&m.track_name);
664                m.track_alias.encode(buf);
665                m.group_order.encode(buf);
666                m.content_exists.encode(buf);
667                if let Some(loc) = &m.largest_location {
668                    loc.encode(buf);
669                }
670                m.forward.encode(buf);
671                KeyValuePair::encode_list(&m.parameters, buf);
672            }
673            ControlMessage::PublishOk(m) => {
674                m.request_id.encode(buf);
675                m.forward.encode(buf);
676                buf.put_u8(m.subscriber_priority);
677                m.group_order.encode(buf);
678                m.filter_type.encode(buf);
679                if let Some(sg) = &m.start_group {
680                    sg.encode(buf);
681                }
682                if let Some(so) = &m.start_object {
683                    so.encode(buf);
684                }
685                if let Some(eg) = &m.end_group {
686                    eg.encode(buf);
687                }
688                KeyValuePair::encode_list(&m.parameters, buf);
689            }
690            ControlMessage::PublishError(m) => {
691                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
692                    return Err(CodecError::ReasonPhraseTooLong);
693                }
694                m.request_id.encode(buf);
695                m.error_code.encode(buf);
696                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
697                buf.put_slice(&m.reason_phrase);
698            }
699        }
700        Ok(())
701    }
702
703    fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
704        match msg_type {
705            MessageType::ClientSetup => {
706                let num_versions = VarInt::decode(buf)?.into_inner() as usize;
707                if num_versions == 0 {
708                    return Err(CodecError::InvalidField);
709                }
710                let mut supported_versions = Vec::with_capacity(num_versions);
711                for _ in 0..num_versions {
712                    supported_versions.push(VarInt::decode(buf)?);
713                }
714                let parameters = KeyValuePair::decode_list(buf)?;
715                Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
716            }
717            MessageType::ServerSetup => {
718                let selected_version = VarInt::decode(buf)?;
719                let parameters = KeyValuePair::decode_list(buf)?;
720                Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
721            }
722            MessageType::GoAway => {
723                let uri_len = VarInt::decode(buf)?.into_inner() as usize;
724                let uri = types::read_bytes(buf, uri_len)?;
725                Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
726            }
727            MessageType::MaxRequestId => {
728                let request_id = VarInt::decode(buf)?;
729                Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id }))
730            }
731            MessageType::RequestsBlocked => {
732                let maximum_request_id = VarInt::decode(buf)?;
733                Ok(ControlMessage::RequestsBlocked(RequestsBlocked { maximum_request_id }))
734            }
735            MessageType::Subscribe => {
736                let request_id = VarInt::decode(buf)?;
737                let track_namespace = TrackNamespace::decode(buf)?;
738                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
739                let track_name = types::read_bytes(buf, track_name_len)?;
740                if buf.remaining() < 1 {
741                    return Err(CodecError::UnexpectedEnd);
742                }
743                let subscriber_priority = buf.get_u8();
744                let group_order = VarInt::decode(buf)?;
745                let forward = VarInt::decode(buf)?;
746                let filter_type = VarInt::decode(buf)?;
747                let ft_val = filter_type.into_inner();
748                if ft_val == 0 || ft_val > 4 {
749                    return Err(CodecError::InvalidField);
750                }
751                let (start_group, start_object) = if ft_val == 3 || ft_val == 4 {
752                    (Some(VarInt::decode(buf)?), Some(VarInt::decode(buf)?))
753                } else {
754                    (None, None)
755                };
756                let end_group = if ft_val == 4 { Some(VarInt::decode(buf)?) } else { None };
757                let parameters = KeyValuePair::decode_list(buf)?;
758                Ok(ControlMessage::Subscribe(Subscribe {
759                    request_id,
760                    track_namespace,
761                    track_name,
762                    subscriber_priority,
763                    group_order,
764                    forward,
765                    filter_type,
766                    start_group,
767                    start_object,
768                    end_group,
769                    parameters,
770                }))
771            }
772            MessageType::SubscribeOk => {
773                let request_id = VarInt::decode(buf)?;
774                let track_alias = VarInt::decode(buf)?;
775                let expires = VarInt::decode(buf)?;
776                let group_order = VarInt::decode(buf)?;
777                let content_exists = VarInt::decode(buf)?;
778                let largest_location = if content_exists.into_inner() != 0 {
779                    Some(Location::decode(buf)?)
780                } else {
781                    None
782                };
783                let parameters = KeyValuePair::decode_list(buf)?;
784                Ok(ControlMessage::SubscribeOk(SubscribeOk {
785                    request_id,
786                    track_alias,
787                    expires,
788                    group_order,
789                    content_exists,
790                    largest_location,
791                    parameters,
792                }))
793            }
794            MessageType::SubscribeError => {
795                let request_id = VarInt::decode(buf)?;
796                let error_code = VarInt::decode(buf)?;
797                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
798                let reason_phrase = types::read_bytes(buf, reason_len)?;
799                Ok(ControlMessage::SubscribeError(SubscribeError {
800                    request_id,
801                    error_code,
802                    reason_phrase,
803                }))
804            }
805            MessageType::SubscribeUpdate => {
806                let request_id = VarInt::decode(buf)?;
807                let start_group = VarInt::decode(buf)?;
808                let start_object = VarInt::decode(buf)?;
809                let end_group = VarInt::decode(buf)?;
810                if buf.remaining() < 1 {
811                    return Err(CodecError::UnexpectedEnd);
812                }
813                let subscriber_priority = buf.get_u8();
814                let forward = VarInt::decode(buf)?;
815                let parameters = KeyValuePair::decode_list(buf)?;
816                Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
817                    request_id,
818                    start_group,
819                    start_object,
820                    end_group,
821                    subscriber_priority,
822                    forward,
823                    parameters,
824                }))
825            }
826            MessageType::SubscribeDone => {
827                let request_id = VarInt::decode(buf)?;
828                let status_code = VarInt::decode(buf)?;
829                let stream_count = VarInt::decode(buf)?;
830                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
831                let reason_phrase = types::read_bytes(buf, reason_len)?;
832                Ok(ControlMessage::SubscribeDone(SubscribeDone {
833                    request_id,
834                    status_code,
835                    stream_count,
836                    reason_phrase,
837                }))
838            }
839            MessageType::Unsubscribe => {
840                let request_id = VarInt::decode(buf)?;
841                Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
842            }
843            MessageType::Announce => {
844                let request_id = VarInt::decode(buf)?;
845                let track_namespace = TrackNamespace::decode(buf)?;
846                let parameters = KeyValuePair::decode_list(buf)?;
847                Ok(ControlMessage::Announce(Announce { request_id, track_namespace, parameters }))
848            }
849            MessageType::AnnounceOk => {
850                let request_id = VarInt::decode(buf)?;
851                Ok(ControlMessage::AnnounceOk(AnnounceOk { request_id }))
852            }
853            MessageType::AnnounceError => {
854                let request_id = VarInt::decode(buf)?;
855                let error_code = VarInt::decode(buf)?;
856                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
857                let reason_phrase = types::read_bytes(buf, reason_len)?;
858                Ok(ControlMessage::AnnounceError(AnnounceError {
859                    request_id,
860                    error_code,
861                    reason_phrase,
862                }))
863            }
864            MessageType::AnnounceCancel => {
865                let track_namespace = TrackNamespace::decode(buf)?;
866                let error_code = VarInt::decode(buf)?;
867                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
868                let reason_phrase = types::read_bytes(buf, reason_len)?;
869                Ok(ControlMessage::AnnounceCancel(AnnounceCancel {
870                    track_namespace,
871                    error_code,
872                    reason_phrase,
873                }))
874            }
875            MessageType::Unannounce => {
876                let track_namespace = TrackNamespace::decode(buf)?;
877                Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
878            }
879            MessageType::SubscribeAnnounces => {
880                let request_id = VarInt::decode(buf)?;
881                let track_namespace_prefix = TrackNamespace::decode(buf)?;
882                let parameters = KeyValuePair::decode_list(buf)?;
883                Ok(ControlMessage::SubscribeAnnounces(SubscribeAnnounces {
884                    request_id,
885                    track_namespace_prefix,
886                    parameters,
887                }))
888            }
889            MessageType::SubscribeAnnouncesOk => {
890                let request_id = VarInt::decode(buf)?;
891                Ok(ControlMessage::SubscribeAnnouncesOk(SubscribeAnnouncesOk { request_id }))
892            }
893            MessageType::SubscribeAnnouncesError => {
894                let request_id = VarInt::decode(buf)?;
895                let error_code = VarInt::decode(buf)?;
896                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
897                let reason_phrase = types::read_bytes(buf, reason_len)?;
898                Ok(ControlMessage::SubscribeAnnouncesError(SubscribeAnnouncesError {
899                    request_id,
900                    error_code,
901                    reason_phrase,
902                }))
903            }
904            MessageType::UnsubscribeAnnounces => {
905                let track_namespace_prefix = TrackNamespace::decode(buf)?;
906                Ok(ControlMessage::UnsubscribeAnnounces(UnsubscribeAnnounces {
907                    track_namespace_prefix,
908                }))
909            }
910            MessageType::TrackStatusRequest => {
911                let request_id = VarInt::decode(buf)?;
912                let track_namespace = TrackNamespace::decode(buf)?;
913                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
914                let track_name = types::read_bytes(buf, track_name_len)?;
915                let parameters = KeyValuePair::decode_list(buf)?;
916                Ok(ControlMessage::TrackStatusRequest(TrackStatusRequest {
917                    request_id,
918                    track_namespace,
919                    track_name,
920                    parameters,
921                }))
922            }
923            MessageType::TrackStatus => {
924                let request_id = VarInt::decode(buf)?;
925                let status_code = VarInt::decode(buf)?;
926                let largest_location = Location::decode(buf)?;
927                let parameters = KeyValuePair::decode_list(buf)?;
928                Ok(ControlMessage::TrackStatus(TrackStatus {
929                    request_id,
930                    status_code,
931                    largest_location,
932                    parameters,
933                }))
934            }
935            MessageType::Fetch => {
936                let request_id = VarInt::decode(buf)?;
937                if buf.remaining() < 1 {
938                    return Err(CodecError::UnexpectedEnd);
939                }
940                let subscriber_priority = buf.get_u8();
941                let group_order = VarInt::decode(buf)?;
942                let fetch_type_val = VarInt::decode(buf)?.into_inner();
943                let fetch_type =
944                    FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
945                let fetch_payload = match fetch_type {
946                    FetchType::Standalone => {
947                        let track_namespace = TrackNamespace::decode(buf)?;
948                        let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
949                        let track_name = types::read_bytes(buf, track_name_len)?;
950                        let start_group = VarInt::decode(buf)?;
951                        let start_object = VarInt::decode(buf)?;
952                        let end_group = VarInt::decode(buf)?;
953                        let end_object = VarInt::decode(buf)?;
954                        FetchPayload::Standalone {
955                            track_namespace,
956                            track_name,
957                            start_group,
958                            start_object,
959                            end_group,
960                            end_object,
961                        }
962                    }
963                    FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
964                        let joining_subscribe_id = VarInt::decode(buf)?;
965                        let joining_start = VarInt::decode(buf)?;
966                        FetchPayload::Joining { joining_subscribe_id, joining_start }
967                    }
968                };
969                let parameters = KeyValuePair::decode_list(buf)?;
970                Ok(ControlMessage::Fetch(Fetch {
971                    request_id,
972                    subscriber_priority,
973                    group_order,
974                    fetch_type,
975                    fetch_payload,
976                    parameters,
977                }))
978            }
979            MessageType::FetchOk => {
980                let request_id = VarInt::decode(buf)?;
981                let group_order = VarInt::decode(buf)?;
982                let end_of_track = VarInt::decode(buf)?;
983                let end_location = Location::decode(buf)?;
984                let parameters = KeyValuePair::decode_list(buf)?;
985                Ok(ControlMessage::FetchOk(FetchOk {
986                    request_id,
987                    group_order,
988                    end_of_track,
989                    end_location,
990                    parameters,
991                }))
992            }
993            MessageType::FetchError => {
994                let request_id = VarInt::decode(buf)?;
995                let error_code = VarInt::decode(buf)?;
996                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
997                let reason_phrase = types::read_bytes(buf, reason_len)?;
998                Ok(ControlMessage::FetchError(FetchError { request_id, error_code, reason_phrase }))
999            }
1000            MessageType::FetchCancel => {
1001                let request_id = VarInt::decode(buf)?;
1002                Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
1003            }
1004            MessageType::Publish => {
1005                let request_id = VarInt::decode(buf)?;
1006                let track_namespace = TrackNamespace::decode(buf)?;
1007                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1008                let track_name = types::read_bytes(buf, track_name_len)?;
1009                let track_alias = VarInt::decode(buf)?;
1010                let group_order = VarInt::decode(buf)?;
1011                let content_exists = VarInt::decode(buf)?;
1012                let largest_location = if content_exists.into_inner() != 0 {
1013                    Some(Location::decode(buf)?)
1014                } else {
1015                    None
1016                };
1017                let forward = VarInt::decode(buf)?;
1018                let parameters = KeyValuePair::decode_list(buf)?;
1019                Ok(ControlMessage::Publish(Publish {
1020                    request_id,
1021                    track_namespace,
1022                    track_name,
1023                    track_alias,
1024                    group_order,
1025                    content_exists,
1026                    largest_location,
1027                    forward,
1028                    parameters,
1029                }))
1030            }
1031            MessageType::PublishOk => {
1032                let request_id = VarInt::decode(buf)?;
1033                let forward = VarInt::decode(buf)?;
1034                if buf.remaining() < 1 {
1035                    return Err(CodecError::UnexpectedEnd);
1036                }
1037                let subscriber_priority = buf.get_u8();
1038                let group_order = VarInt::decode(buf)?;
1039                let filter_type = VarInt::decode(buf)?;
1040                let ft_val = filter_type.into_inner();
1041                if ft_val == 0 || ft_val > 4 {
1042                    return Err(CodecError::InvalidField);
1043                }
1044                let (start_group, start_object) = if ft_val == 3 || ft_val == 4 {
1045                    (Some(VarInt::decode(buf)?), Some(VarInt::decode(buf)?))
1046                } else {
1047                    (None, None)
1048                };
1049                let end_group = if ft_val == 4 { Some(VarInt::decode(buf)?) } else { None };
1050                let parameters = KeyValuePair::decode_list(buf)?;
1051                Ok(ControlMessage::PublishOk(PublishOk {
1052                    request_id,
1053                    forward,
1054                    subscriber_priority,
1055                    group_order,
1056                    filter_type,
1057                    start_group,
1058                    start_object,
1059                    end_group,
1060                    parameters,
1061                }))
1062            }
1063            MessageType::PublishError => {
1064                let request_id = VarInt::decode(buf)?;
1065                let error_code = VarInt::decode(buf)?;
1066                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1067                let reason_phrase = types::read_bytes(buf, reason_len)?;
1068                Ok(ControlMessage::PublishError(PublishError {
1069                    request_id,
1070                    error_code,
1071                    reason_phrase,
1072                }))
1073            }
1074        }
1075    }
1076
1077    pub fn message_type(&self) -> MessageType {
1078        match self {
1079            ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
1080            ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
1081            ControlMessage::GoAway(_) => MessageType::GoAway,
1082            ControlMessage::MaxRequestId(_) => MessageType::MaxRequestId,
1083            ControlMessage::RequestsBlocked(_) => MessageType::RequestsBlocked,
1084            ControlMessage::Subscribe(_) => MessageType::Subscribe,
1085            ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
1086            ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
1087            ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
1088            ControlMessage::SubscribeDone(_) => MessageType::SubscribeDone,
1089            ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
1090            ControlMessage::Announce(_) => MessageType::Announce,
1091            ControlMessage::AnnounceOk(_) => MessageType::AnnounceOk,
1092            ControlMessage::AnnounceError(_) => MessageType::AnnounceError,
1093            ControlMessage::AnnounceCancel(_) => MessageType::AnnounceCancel,
1094            ControlMessage::Unannounce(_) => MessageType::Unannounce,
1095            ControlMessage::SubscribeAnnounces(_) => MessageType::SubscribeAnnounces,
1096            ControlMessage::SubscribeAnnouncesOk(_) => MessageType::SubscribeAnnouncesOk,
1097            ControlMessage::SubscribeAnnouncesError(_) => MessageType::SubscribeAnnouncesError,
1098            ControlMessage::UnsubscribeAnnounces(_) => MessageType::UnsubscribeAnnounces,
1099            ControlMessage::TrackStatusRequest(_) => MessageType::TrackStatusRequest,
1100            ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
1101            ControlMessage::Fetch(_) => MessageType::Fetch,
1102            ControlMessage::FetchOk(_) => MessageType::FetchOk,
1103            ControlMessage::FetchError(_) => MessageType::FetchError,
1104            ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
1105            ControlMessage::Publish(_) => MessageType::Publish,
1106            ControlMessage::PublishOk(_) => MessageType::PublishOk,
1107            ControlMessage::PublishError(_) => MessageType::PublishError,
1108        }
1109    }
1110}