Skip to main content

moqtap_codec/draft16/
message.rs

1//! Draft-16 control message encoding and decoding.
2//!
3//! Key changes from draft-15:
4//! - SubscribeUpdate → RequestUpdate, field renamed to existing_request_id
5//! - New: Namespace (0x08), NamespaceDone (0x0e) — namespace_suffix only
6//! - Removed: UnsubscribeNamespace (0x14)
7//! - RequestError gains retry_interval field
8//! - SubscribeNamespace gains subscribe_options varint
9//! - PublishNamespaceDone simplifies to just request_id
10//! - Framing: type_id(vi) + payload_length(16) + payload (same as draft-15)
11
12pub use crate::error::{
13    CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_NAMESPACE_TUPLE_SIZE,
14    MAX_REASON_PHRASE_LENGTH,
15};
16use crate::kvp::KeyValuePair;
17use crate::types::*;
18use crate::varint::VarInt;
19use bytes::{Buf, BufMut};
20
21/// Decode any remaining bytes in `buf` as a sequence of KVPs until `buf`
22/// is empty. Used for draft-16 `track_extensions` which has no explicit
23/// count — extensions simply fill the rest of the control-message payload.
24fn decode_track_extensions(buf: &mut impl Buf) -> Result<Vec<KeyValuePair>, CodecError> {
25    let mut out = Vec::new();
26    while buf.has_remaining() {
27        out.push(KeyValuePair::decode(buf)?);
28    }
29    Ok(out)
30}
31
32/// Encode `track_extensions` (each KVP back-to-back, no count prefix).
33fn encode_track_extensions(exts: &[KeyValuePair], buf: &mut impl BufMut) {
34    for kvp in exts {
35        kvp.encode(buf);
36    }
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40#[repr(u64)]
41pub enum MessageType {
42    RequestUpdate = 0x02,
43    Subscribe = 0x03,
44    SubscribeOk = 0x04,
45    RequestError = 0x05,
46    PublishNamespace = 0x06,
47    RequestOk = 0x07,
48    Namespace = 0x08,
49    PublishNamespaceDone = 0x09,
50    Unsubscribe = 0x0A,
51    PublishDone = 0x0B,
52    PublishNamespaceCancel = 0x0C,
53    TrackStatus = 0x0D,
54    NamespaceDone = 0x0E,
55    GoAway = 0x10,
56    SubscribeNamespace = 0x11,
57    MaxRequestId = 0x15,
58    Fetch = 0x16,
59    FetchCancel = 0x17,
60    FetchOk = 0x18,
61    RequestsBlocked = 0x1A,
62    Publish = 0x1D,
63    PublishOk = 0x1E,
64    ClientSetup = 0x20,
65    ServerSetup = 0x21,
66}
67
68impl MessageType {
69    pub fn from_id(id: u64) -> Option<Self> {
70        match id {
71            0x02 => Some(MessageType::RequestUpdate),
72            0x03 => Some(MessageType::Subscribe),
73            0x04 => Some(MessageType::SubscribeOk),
74            0x05 => Some(MessageType::RequestError),
75            0x06 => Some(MessageType::PublishNamespace),
76            0x07 => Some(MessageType::RequestOk),
77            0x08 => Some(MessageType::Namespace),
78            0x09 => Some(MessageType::PublishNamespaceDone),
79            0x0A => Some(MessageType::Unsubscribe),
80            0x0B => Some(MessageType::PublishDone),
81            0x0C => Some(MessageType::PublishNamespaceCancel),
82            0x0D => Some(MessageType::TrackStatus),
83            0x0E => Some(MessageType::NamespaceDone),
84            0x10 => Some(MessageType::GoAway),
85            0x11 => Some(MessageType::SubscribeNamespace),
86            0x15 => Some(MessageType::MaxRequestId),
87            0x16 => Some(MessageType::Fetch),
88            0x17 => Some(MessageType::FetchCancel),
89            0x18 => Some(MessageType::FetchOk),
90            0x1A => Some(MessageType::RequestsBlocked),
91            0x1D => Some(MessageType::Publish),
92            0x1E => Some(MessageType::PublishOk),
93            0x20 => Some(MessageType::ClientSetup),
94            0x21 => Some(MessageType::ServerSetup),
95            _ => None,
96        }
97    }
98
99    pub fn id(&self) -> u64 {
100        *self as u64
101    }
102}
103
104// ============================================================
105// Session Lifecycle Messages
106// ============================================================
107
108#[derive(Debug, Clone, PartialEq, Eq)]
109pub struct ClientSetup {
110    pub parameters: Vec<KeyValuePair>,
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct ServerSetup {
115    pub parameters: Vec<KeyValuePair>,
116}
117
118#[derive(Debug, Clone, PartialEq, Eq)]
119pub struct GoAway {
120    pub new_session_uri: Vec<u8>,
121}
122
123#[derive(Debug, Clone, PartialEq, Eq)]
124pub struct MaxRequestId {
125    pub request_id: VarInt,
126}
127
128#[derive(Debug, Clone, PartialEq, Eq)]
129pub struct RequestsBlocked {
130    pub maximum_request_id: VarInt,
131}
132
133// ============================================================
134// Consolidated Response Messages
135// ============================================================
136
137#[derive(Debug, Clone, PartialEq, Eq)]
138pub struct RequestOk {
139    pub request_id: VarInt,
140    pub parameters: Vec<KeyValuePair>,
141}
142
143/// REQUEST_ERROR (0x05). Draft-16 adds retry_interval field.
144#[derive(Debug, Clone, PartialEq, Eq)]
145pub struct RequestError {
146    pub request_id: VarInt,
147    pub error_code: VarInt,
148    pub retry_interval: VarInt,
149    pub reason_phrase: Vec<u8>,
150}
151
152// ============================================================
153// Subscribe Messages
154// ============================================================
155
156#[derive(Debug, Clone, PartialEq, Eq)]
157pub struct Subscribe {
158    pub request_id: VarInt,
159    pub track_namespace: TrackNamespace,
160    pub track_name: Vec<u8>,
161    pub parameters: Vec<KeyValuePair>,
162}
163
164#[derive(Debug, Clone, PartialEq, Eq)]
165pub struct SubscribeOk {
166    pub request_id: VarInt,
167    pub track_alias: VarInt,
168    pub parameters: Vec<KeyValuePair>,
169    /// Track extensions: KVPs that follow `parameters` and continue until
170    /// the end of the control-message payload. Empty if none.
171    pub track_extensions: Vec<KeyValuePair>,
172}
173
174/// REQUEST_UPDATE (0x02). Renamed from SubscribeUpdate.
175#[derive(Debug, Clone, PartialEq, Eq)]
176pub struct RequestUpdate {
177    pub request_id: VarInt,
178    pub existing_request_id: VarInt,
179    pub parameters: Vec<KeyValuePair>,
180}
181
182#[derive(Debug, Clone, PartialEq, Eq)]
183pub struct Unsubscribe {
184    pub request_id: VarInt,
185}
186
187// ============================================================
188// Publish Messages
189// ============================================================
190
191#[derive(Debug, Clone, PartialEq, Eq)]
192pub struct Publish {
193    pub request_id: VarInt,
194    pub track_namespace: TrackNamespace,
195    pub track_name: Vec<u8>,
196    pub track_alias: VarInt,
197    pub parameters: Vec<KeyValuePair>,
198    /// Track extensions: KVPs that follow `parameters` and continue until
199    /// the end of the control-message payload. Empty if none.
200    pub track_extensions: Vec<KeyValuePair>,
201}
202
203#[derive(Debug, Clone, PartialEq, Eq)]
204pub struct PublishOk {
205    pub request_id: VarInt,
206    pub parameters: Vec<KeyValuePair>,
207}
208
209#[derive(Debug, Clone, PartialEq, Eq)]
210pub struct PublishDone {
211    pub request_id: VarInt,
212    pub status_code: VarInt,
213    pub stream_count: VarInt,
214    pub reason_phrase: Vec<u8>,
215}
216
217// ============================================================
218// Publish Namespace Messages
219// ============================================================
220
221#[derive(Debug, Clone, PartialEq, Eq)]
222pub struct PublishNamespace {
223    pub request_id: VarInt,
224    pub track_namespace: TrackNamespace,
225    pub parameters: Vec<KeyValuePair>,
226}
227
228/// PUBLISH_NAMESPACE_DONE (0x09). Draft-16: just request_id (was namespace in d15).
229#[derive(Debug, Clone, PartialEq, Eq)]
230pub struct PublishNamespaceDone {
231    pub request_id: VarInt,
232}
233
234/// PUBLISH_NAMESPACE_CANCEL (0x0C). Draft-16: request_id + error_code + reason.
235#[derive(Debug, Clone, PartialEq, Eq)]
236pub struct PublishNamespaceCancel {
237    pub request_id: VarInt,
238    pub error_code: VarInt,
239    pub reason_phrase: Vec<u8>,
240}
241
242// ============================================================
243// Namespace Messages (new in draft-16)
244// ============================================================
245
246/// NAMESPACE (0x08). Carries namespace_suffix.
247#[derive(Debug, Clone, PartialEq, Eq)]
248pub struct Namespace {
249    pub namespace_suffix: TrackNamespace,
250}
251
252/// NAMESPACE_DONE (0x0E). Carries namespace_suffix.
253#[derive(Debug, Clone, PartialEq, Eq)]
254pub struct NamespaceDone {
255    pub namespace_suffix: TrackNamespace,
256}
257
258// ============================================================
259// Subscribe Namespace Messages
260// ============================================================
261
262/// SUBSCRIBE_NAMESPACE (0x11). Draft-16: gains subscribe_options varint.
263#[derive(Debug, Clone, PartialEq, Eq)]
264pub struct SubscribeNamespace {
265    pub request_id: VarInt,
266    pub namespace_prefix: TrackNamespace,
267    pub subscribe_options: VarInt,
268    pub parameters: Vec<KeyValuePair>,
269}
270
271// ============================================================
272// Track Status Messages
273// ============================================================
274
275#[derive(Debug, Clone, PartialEq, Eq)]
276pub struct TrackStatus {
277    pub request_id: VarInt,
278    pub track_namespace: TrackNamespace,
279    pub track_name: Vec<u8>,
280    pub parameters: Vec<KeyValuePair>,
281}
282
283// ============================================================
284// Fetch Messages
285// ============================================================
286
287#[derive(Debug, Clone, Copy, PartialEq, Eq)]
288#[repr(u64)]
289pub enum FetchType {
290    /// Standalone fetch with explicit track + range.
291    Standalone = 1,
292    /// Joining fetch using a relative group offset.
293    RelativeJoining = 2,
294    /// Joining fetch using an absolute group.
295    AbsoluteJoining = 3,
296}
297
298impl FetchType {
299    /// Map a varint value to a FetchType, returning None for unknown values.
300    pub fn from_u64(v: u64) -> Option<Self> {
301        match v {
302            1 => Some(FetchType::Standalone),
303            2 => Some(FetchType::RelativeJoining),
304            3 => Some(FetchType::AbsoluteJoining),
305            _ => None,
306        }
307    }
308}
309
310#[derive(Debug, Clone, PartialEq, Eq)]
311pub struct Fetch {
312    pub request_id: VarInt,
313    pub fetch_type: FetchType,
314    pub fetch_payload: FetchPayload,
315    pub parameters: Vec<KeyValuePair>,
316}
317
318#[derive(Debug, Clone, PartialEq, Eq)]
319pub enum FetchPayload {
320    Standalone {
321        track_namespace: TrackNamespace,
322        track_name: Vec<u8>,
323        start_group: VarInt,
324        start_object: VarInt,
325        end_group: VarInt,
326        end_object: VarInt,
327    },
328    Joining {
329        joining_request_id: VarInt,
330        joining_start: VarInt,
331    },
332}
333
334#[derive(Debug, Clone, PartialEq, Eq)]
335pub struct FetchOk {
336    pub request_id: VarInt,
337    pub end_of_track: VarInt,
338    pub end_group: VarInt,
339    pub end_object: VarInt,
340    pub parameters: Vec<KeyValuePair>,
341    /// Track extensions: KVPs that follow `parameters` and continue until
342    /// the end of the control-message payload. Empty if none.
343    pub track_extensions: Vec<KeyValuePair>,
344}
345
346#[derive(Debug, Clone, PartialEq, Eq)]
347pub struct FetchCancel {
348    pub request_id: VarInt,
349}
350
351// ============================================================
352// Unified Message Enum
353// ============================================================
354
355#[derive(Debug, Clone, PartialEq, Eq)]
356pub enum ControlMessage {
357    ClientSetup(ClientSetup),
358    ServerSetup(ServerSetup),
359    GoAway(GoAway),
360    MaxRequestId(MaxRequestId),
361    RequestsBlocked(RequestsBlocked),
362    RequestOk(RequestOk),
363    RequestError(RequestError),
364    Subscribe(Subscribe),
365    SubscribeOk(SubscribeOk),
366    RequestUpdate(RequestUpdate),
367    Unsubscribe(Unsubscribe),
368    Publish(Publish),
369    PublishOk(PublishOk),
370    PublishDone(PublishDone),
371    PublishNamespace(PublishNamespace),
372    PublishNamespaceDone(PublishNamespaceDone),
373    PublishNamespaceCancel(PublishNamespaceCancel),
374    Namespace(Namespace),
375    NamespaceDone(NamespaceDone),
376    SubscribeNamespace(SubscribeNamespace),
377    TrackStatus(TrackStatus),
378    Fetch(Fetch),
379    FetchOk(FetchOk),
380    FetchCancel(FetchCancel),
381}
382
383impl ControlMessage {
384    pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
385        let mut payload = Vec::with_capacity(256);
386        self.encode_payload(&mut payload)?;
387
388        if payload.len() > MAX_MESSAGE_LENGTH {
389            return Err(CodecError::MessageTooLong(payload.len()));
390        }
391
392        let msg_type = self.message_type();
393        VarInt::from_usize(msg_type.id() as usize).encode(buf);
394        // Draft-16: 16-bit length (big-endian)
395        buf.put_u16(payload.len() as u16);
396        buf.put_slice(&payload);
397        Ok(())
398    }
399
400    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
401        let type_id = VarInt::decode(buf)?.into_inner();
402        let msg_type =
403            MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
404        // Draft-16: 16-bit length (big-endian)
405        if buf.remaining() < 2 {
406            return Err(CodecError::UnexpectedEnd);
407        }
408        let payload_len = buf.get_u16() as usize;
409        if buf.remaining() < payload_len {
410            return Err(CodecError::UnexpectedEnd);
411        }
412        let payload_bytes = buf.copy_to_bytes(payload_len);
413        let mut payload = &payload_bytes[..];
414        Self::decode_payload(msg_type, &mut payload)
415    }
416
417    fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
418        match self {
419            ControlMessage::ClientSetup(m) => {
420                KeyValuePair::encode_list(&m.parameters, buf);
421            }
422            ControlMessage::ServerSetup(m) => {
423                KeyValuePair::encode_list(&m.parameters, buf);
424            }
425            ControlMessage::GoAway(m) => {
426                if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
427                    return Err(CodecError::GoAwayUriTooLong);
428                }
429                VarInt::from_usize(m.new_session_uri.len()).encode(buf);
430                buf.put_slice(&m.new_session_uri);
431            }
432            ControlMessage::MaxRequestId(m) => {
433                m.request_id.encode(buf);
434            }
435            ControlMessage::RequestsBlocked(m) => {
436                m.maximum_request_id.encode(buf);
437            }
438            ControlMessage::RequestOk(m) => {
439                m.request_id.encode(buf);
440                KeyValuePair::encode_list(&m.parameters, buf);
441            }
442            ControlMessage::RequestError(m) => {
443                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
444                    return Err(CodecError::ReasonPhraseTooLong);
445                }
446                m.request_id.encode(buf);
447                m.error_code.encode(buf);
448                m.retry_interval.encode(buf);
449                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
450                buf.put_slice(&m.reason_phrase);
451            }
452            ControlMessage::Subscribe(m) => {
453                m.request_id.encode(buf);
454                m.track_namespace.encode(buf);
455                VarInt::from_usize(m.track_name.len()).encode(buf);
456                buf.put_slice(&m.track_name);
457                KeyValuePair::encode_list(&m.parameters, buf);
458            }
459            ControlMessage::SubscribeOk(m) => {
460                m.request_id.encode(buf);
461                m.track_alias.encode(buf);
462                KeyValuePair::encode_list(&m.parameters, buf);
463                encode_track_extensions(&m.track_extensions, buf);
464            }
465            ControlMessage::RequestUpdate(m) => {
466                m.request_id.encode(buf);
467                m.existing_request_id.encode(buf);
468                KeyValuePair::encode_list(&m.parameters, buf);
469            }
470            ControlMessage::Unsubscribe(m) => {
471                m.request_id.encode(buf);
472            }
473            ControlMessage::Publish(m) => {
474                m.request_id.encode(buf);
475                m.track_namespace.encode(buf);
476                VarInt::from_usize(m.track_name.len()).encode(buf);
477                buf.put_slice(&m.track_name);
478                m.track_alias.encode(buf);
479                KeyValuePair::encode_list(&m.parameters, buf);
480                encode_track_extensions(&m.track_extensions, buf);
481            }
482            ControlMessage::PublishOk(m) => {
483                m.request_id.encode(buf);
484                KeyValuePair::encode_list(&m.parameters, buf);
485            }
486            ControlMessage::PublishDone(m) => {
487                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
488                    return Err(CodecError::ReasonPhraseTooLong);
489                }
490                m.request_id.encode(buf);
491                m.status_code.encode(buf);
492                m.stream_count.encode(buf);
493                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
494                buf.put_slice(&m.reason_phrase);
495            }
496            ControlMessage::PublishNamespace(m) => {
497                m.request_id.encode(buf);
498                m.track_namespace.encode(buf);
499                KeyValuePair::encode_list(&m.parameters, buf);
500            }
501            ControlMessage::PublishNamespaceDone(m) => {
502                m.request_id.encode(buf);
503            }
504            ControlMessage::PublishNamespaceCancel(m) => {
505                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
506                    return Err(CodecError::ReasonPhraseTooLong);
507                }
508                m.request_id.encode(buf);
509                m.error_code.encode(buf);
510                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
511                buf.put_slice(&m.reason_phrase);
512            }
513            ControlMessage::Namespace(m) => {
514                m.namespace_suffix.encode(buf);
515            }
516            ControlMessage::NamespaceDone(m) => {
517                m.namespace_suffix.encode(buf);
518            }
519            ControlMessage::SubscribeNamespace(m) => {
520                m.request_id.encode(buf);
521                m.namespace_prefix.encode(buf);
522                m.subscribe_options.encode(buf);
523                KeyValuePair::encode_list(&m.parameters, buf);
524            }
525            ControlMessage::TrackStatus(m) => {
526                m.request_id.encode(buf);
527                m.track_namespace.encode(buf);
528                VarInt::from_usize(m.track_name.len()).encode(buf);
529                buf.put_slice(&m.track_name);
530                KeyValuePair::encode_list(&m.parameters, buf);
531            }
532            ControlMessage::Fetch(m) => {
533                m.request_id.encode(buf);
534                VarInt::from_usize(m.fetch_type as usize).encode(buf);
535                match &m.fetch_payload {
536                    FetchPayload::Standalone {
537                        track_namespace,
538                        track_name,
539                        start_group,
540                        start_object,
541                        end_group,
542                        end_object,
543                    } => {
544                        track_namespace.encode(buf);
545                        VarInt::from_usize(track_name.len()).encode(buf);
546                        buf.put_slice(track_name);
547                        start_group.encode(buf);
548                        start_object.encode(buf);
549                        end_group.encode(buf);
550                        end_object.encode(buf);
551                    }
552                    FetchPayload::Joining { joining_request_id, joining_start } => {
553                        joining_request_id.encode(buf);
554                        joining_start.encode(buf);
555                    }
556                }
557                KeyValuePair::encode_list(&m.parameters, buf);
558            }
559            ControlMessage::FetchOk(m) => {
560                m.request_id.encode(buf);
561                m.end_of_track.encode(buf);
562                m.end_group.encode(buf);
563                m.end_object.encode(buf);
564                KeyValuePair::encode_list(&m.parameters, buf);
565                encode_track_extensions(&m.track_extensions, buf);
566            }
567            ControlMessage::FetchCancel(m) => {
568                m.request_id.encode(buf);
569            }
570        }
571        Ok(())
572    }
573
574    fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
575        match msg_type {
576            MessageType::ClientSetup => {
577                let parameters = KeyValuePair::decode_list(buf)?;
578                Ok(ControlMessage::ClientSetup(ClientSetup { parameters }))
579            }
580            MessageType::ServerSetup => {
581                let parameters = KeyValuePair::decode_list(buf)?;
582                Ok(ControlMessage::ServerSetup(ServerSetup { parameters }))
583            }
584            MessageType::GoAway => {
585                let uri_len = VarInt::decode(buf)?.into_inner() as usize;
586                let uri = read_bytes(buf, uri_len)?;
587                Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
588            }
589            MessageType::MaxRequestId => {
590                let request_id = VarInt::decode(buf)?;
591                Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id }))
592            }
593            MessageType::RequestsBlocked => {
594                let maximum_request_id = VarInt::decode(buf)?;
595                Ok(ControlMessage::RequestsBlocked(RequestsBlocked { maximum_request_id }))
596            }
597            MessageType::RequestOk => {
598                let request_id = VarInt::decode(buf)?;
599                let parameters = KeyValuePair::decode_list(buf)?;
600                Ok(ControlMessage::RequestOk(RequestOk { request_id, parameters }))
601            }
602            MessageType::RequestError => {
603                let request_id = VarInt::decode(buf)?;
604                let error_code = VarInt::decode(buf)?;
605                let retry_interval = VarInt::decode(buf)?;
606                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
607                let reason_phrase = read_bytes(buf, reason_len)?;
608                Ok(ControlMessage::RequestError(RequestError {
609                    request_id,
610                    error_code,
611                    retry_interval,
612                    reason_phrase,
613                }))
614            }
615            MessageType::Subscribe => {
616                let request_id = VarInt::decode(buf)?;
617                let track_namespace = TrackNamespace::decode(buf)?;
618                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
619                let track_name = read_bytes(buf, track_name_len)?;
620                let parameters = KeyValuePair::decode_list(buf)?;
621                Ok(ControlMessage::Subscribe(Subscribe {
622                    request_id,
623                    track_namespace,
624                    track_name,
625                    parameters,
626                }))
627            }
628            MessageType::SubscribeOk => {
629                let request_id = VarInt::decode(buf)?;
630                let track_alias = VarInt::decode(buf)?;
631                let parameters = KeyValuePair::decode_list(buf)?;
632                let track_extensions = decode_track_extensions(buf)?;
633                Ok(ControlMessage::SubscribeOk(SubscribeOk {
634                    request_id,
635                    track_alias,
636                    parameters,
637                    track_extensions,
638                }))
639            }
640            MessageType::RequestUpdate => {
641                let request_id = VarInt::decode(buf)?;
642                let existing_request_id = VarInt::decode(buf)?;
643                let parameters = KeyValuePair::decode_list(buf)?;
644                Ok(ControlMessage::RequestUpdate(RequestUpdate {
645                    request_id,
646                    existing_request_id,
647                    parameters,
648                }))
649            }
650            MessageType::Unsubscribe => {
651                let request_id = VarInt::decode(buf)?;
652                Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
653            }
654            MessageType::Publish => {
655                let request_id = VarInt::decode(buf)?;
656                let track_namespace = TrackNamespace::decode(buf)?;
657                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
658                let track_name = read_bytes(buf, track_name_len)?;
659                let track_alias = VarInt::decode(buf)?;
660                let parameters = KeyValuePair::decode_list(buf)?;
661                let track_extensions = decode_track_extensions(buf)?;
662                Ok(ControlMessage::Publish(Publish {
663                    request_id,
664                    track_namespace,
665                    track_name,
666                    track_alias,
667                    parameters,
668                    track_extensions,
669                }))
670            }
671            MessageType::PublishOk => {
672                let request_id = VarInt::decode(buf)?;
673                let parameters = KeyValuePair::decode_list(buf)?;
674                Ok(ControlMessage::PublishOk(PublishOk { request_id, parameters }))
675            }
676            MessageType::PublishDone => {
677                let request_id = VarInt::decode(buf)?;
678                let status_code = VarInt::decode(buf)?;
679                let stream_count = VarInt::decode(buf)?;
680                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
681                let reason_phrase = read_bytes(buf, reason_len)?;
682                Ok(ControlMessage::PublishDone(PublishDone {
683                    request_id,
684                    status_code,
685                    stream_count,
686                    reason_phrase,
687                }))
688            }
689            MessageType::PublishNamespace => {
690                let request_id = VarInt::decode(buf)?;
691                let track_namespace = TrackNamespace::decode(buf)?;
692                let parameters = KeyValuePair::decode_list(buf)?;
693                Ok(ControlMessage::PublishNamespace(PublishNamespace {
694                    request_id,
695                    track_namespace,
696                    parameters,
697                }))
698            }
699            MessageType::PublishNamespaceDone => {
700                let request_id = VarInt::decode(buf)?;
701                Ok(ControlMessage::PublishNamespaceDone(PublishNamespaceDone { request_id }))
702            }
703            MessageType::PublishNamespaceCancel => {
704                let request_id = VarInt::decode(buf)?;
705                let error_code = VarInt::decode(buf)?;
706                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
707                let reason_phrase = read_bytes(buf, reason_len)?;
708                Ok(ControlMessage::PublishNamespaceCancel(PublishNamespaceCancel {
709                    request_id,
710                    error_code,
711                    reason_phrase,
712                }))
713            }
714            MessageType::Namespace => {
715                let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
716                Ok(ControlMessage::Namespace(Namespace { namespace_suffix }))
717            }
718            MessageType::NamespaceDone => {
719                let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
720                Ok(ControlMessage::NamespaceDone(NamespaceDone { namespace_suffix }))
721            }
722            MessageType::SubscribeNamespace => {
723                let request_id = VarInt::decode(buf)?;
724                let namespace_prefix = TrackNamespace::decode(buf)?;
725                let subscribe_options = VarInt::decode(buf)?;
726                let parameters = KeyValuePair::decode_list(buf)?;
727                Ok(ControlMessage::SubscribeNamespace(SubscribeNamespace {
728                    request_id,
729                    namespace_prefix,
730                    subscribe_options,
731                    parameters,
732                }))
733            }
734            MessageType::TrackStatus => {
735                let request_id = VarInt::decode(buf)?;
736                let track_namespace = TrackNamespace::decode(buf)?;
737                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
738                let track_name = read_bytes(buf, track_name_len)?;
739                let parameters = KeyValuePair::decode_list(buf)?;
740                Ok(ControlMessage::TrackStatus(TrackStatus {
741                    request_id,
742                    track_namespace,
743                    track_name,
744                    parameters,
745                }))
746            }
747            MessageType::Fetch => {
748                let request_id = VarInt::decode(buf)?;
749                let fetch_type_val = VarInt::decode(buf)?.into_inner();
750                let fetch_type =
751                    FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
752                let fetch_payload = match fetch_type {
753                    FetchType::Standalone => {
754                        let track_namespace = TrackNamespace::decode(buf)?;
755                        let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
756                        let track_name = read_bytes(buf, track_name_len)?;
757                        let start_group = VarInt::decode(buf)?;
758                        let start_object = VarInt::decode(buf)?;
759                        let end_group = VarInt::decode(buf)?;
760                        let end_object = VarInt::decode(buf)?;
761                        FetchPayload::Standalone {
762                            track_namespace,
763                            track_name,
764                            start_group,
765                            start_object,
766                            end_group,
767                            end_object,
768                        }
769                    }
770                    FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
771                        let joining_request_id = VarInt::decode(buf)?;
772                        let joining_start = VarInt::decode(buf)?;
773                        FetchPayload::Joining { joining_request_id, joining_start }
774                    }
775                };
776                let parameters = KeyValuePair::decode_list(buf)?;
777                Ok(ControlMessage::Fetch(Fetch {
778                    request_id,
779                    fetch_type,
780                    fetch_payload,
781                    parameters,
782                }))
783            }
784            MessageType::FetchOk => {
785                let request_id = VarInt::decode(buf)?;
786                let end_of_track = VarInt::decode(buf)?;
787                let end_group = VarInt::decode(buf)?;
788                let end_object = VarInt::decode(buf)?;
789                let parameters = KeyValuePair::decode_list(buf)?;
790                let track_extensions = decode_track_extensions(buf)?;
791                Ok(ControlMessage::FetchOk(FetchOk {
792                    request_id,
793                    end_of_track,
794                    end_group,
795                    end_object,
796                    parameters,
797                    track_extensions,
798                }))
799            }
800            MessageType::FetchCancel => {
801                let request_id = VarInt::decode(buf)?;
802                Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
803            }
804        }
805    }
806
807    pub fn message_type(&self) -> MessageType {
808        match self {
809            ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
810            ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
811            ControlMessage::GoAway(_) => MessageType::GoAway,
812            ControlMessage::MaxRequestId(_) => MessageType::MaxRequestId,
813            ControlMessage::RequestsBlocked(_) => MessageType::RequestsBlocked,
814            ControlMessage::RequestOk(_) => MessageType::RequestOk,
815            ControlMessage::RequestError(_) => MessageType::RequestError,
816            ControlMessage::Subscribe(_) => MessageType::Subscribe,
817            ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
818            ControlMessage::RequestUpdate(_) => MessageType::RequestUpdate,
819            ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
820            ControlMessage::Publish(_) => MessageType::Publish,
821            ControlMessage::PublishOk(_) => MessageType::PublishOk,
822            ControlMessage::PublishDone(_) => MessageType::PublishDone,
823            ControlMessage::PublishNamespace(_) => MessageType::PublishNamespace,
824            ControlMessage::PublishNamespaceDone(_) => MessageType::PublishNamespaceDone,
825            ControlMessage::PublishNamespaceCancel(_) => MessageType::PublishNamespaceCancel,
826            ControlMessage::Namespace(_) => MessageType::Namespace,
827            ControlMessage::NamespaceDone(_) => MessageType::NamespaceDone,
828            ControlMessage::SubscribeNamespace(_) => MessageType::SubscribeNamespace,
829            ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
830            ControlMessage::Fetch(_) => MessageType::Fetch,
831            ControlMessage::FetchOk(_) => MessageType::FetchOk,
832            ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
833        }
834    }
835}