Skip to main content

moqtap_codec/draft15/
message.rs

1//! Draft-15 control message encoding and decoding.
2//!
3//! Key changes from draft-14:
4//! - Version negotiation via ALPN — ClientSetup/ServerSetup have no versions
5//! - Consolidated RequestOk (0x07) and RequestError (0x05)
6//! - Subscribe simplified: request_id + ns + track_name + params
7//! - SubscribeOk simplified: request_id + track_alias + params
8//! - Publish simplified: request_id + ns + track_name + track_alias + params
9//! - PublishOk simplified: request_id + params
10//! - SubscribeUpdate: request_id + subscription_request_id + params
11//! - FetchOk: request_id + end_of_track + end_group + end_object + params
12//! - PublishDone (0x0B) replaces SubscribeDone
13//! - Framing: type_id(vi) + payload_length(16) + payload
14
15pub use crate::error::{
16    CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_NAMESPACE_TUPLE_SIZE,
17    MAX_REASON_PHRASE_LENGTH,
18};
19use crate::kvp::KeyValuePair;
20use crate::types::*;
21use crate::varint::VarInt;
22use bytes::{Buf, BufMut};
23
/// Control message type identifiers.
///
/// Each discriminant is the numeric type ID that prefixes the message on the
/// wire; the enum is `#[repr(u64)]` so casting a variant yields that ID.
/// Not every value in the covered range is assigned (for example 0x08 has no
/// variant here).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum MessageType {
    SubscribeUpdate = 0x02,
    Subscribe = 0x03,
    SubscribeOk = 0x04,
    RequestError = 0x05,
    PublishNamespace = 0x06,
    RequestOk = 0x07,
    PublishNamespaceDone = 0x09,
    Unsubscribe = 0x0A,
    PublishDone = 0x0B,
    PublishNamespaceCancel = 0x0C,
    TrackStatus = 0x0D,
    GoAway = 0x10,
    SubscribeNamespace = 0x11,
    UnsubscribeNamespace = 0x14,
    MaxRequestId = 0x15,
    Fetch = 0x16,
    FetchCancel = 0x17,
    FetchOk = 0x18,
    RequestsBlocked = 0x1A,
    Publish = 0x1D,
    PublishOk = 0x1E,
    ClientSetup = 0x20,
    ServerSetup = 0x21,
}
51
52impl MessageType {
53    pub fn from_id(id: u64) -> Option<Self> {
54        match id {
55            0x02 => Some(MessageType::SubscribeUpdate),
56            0x03 => Some(MessageType::Subscribe),
57            0x04 => Some(MessageType::SubscribeOk),
58            0x05 => Some(MessageType::RequestError),
59            0x06 => Some(MessageType::PublishNamespace),
60            0x07 => Some(MessageType::RequestOk),
61            0x09 => Some(MessageType::PublishNamespaceDone),
62            0x0A => Some(MessageType::Unsubscribe),
63            0x0B => Some(MessageType::PublishDone),
64            0x0C => Some(MessageType::PublishNamespaceCancel),
65            0x0D => Some(MessageType::TrackStatus),
66            0x10 => Some(MessageType::GoAway),
67            0x11 => Some(MessageType::SubscribeNamespace),
68            0x14 => Some(MessageType::UnsubscribeNamespace),
69            0x15 => Some(MessageType::MaxRequestId),
70            0x16 => Some(MessageType::Fetch),
71            0x17 => Some(MessageType::FetchCancel),
72            0x18 => Some(MessageType::FetchOk),
73            0x1A => Some(MessageType::RequestsBlocked),
74            0x1D => Some(MessageType::Publish),
75            0x1E => Some(MessageType::PublishOk),
76            0x20 => Some(MessageType::ClientSetup),
77            0x21 => Some(MessageType::ServerSetup),
78            _ => None,
79        }
80    }
81
82    pub fn id(&self) -> u64 {
83        *self as u64
84    }
85}
86
87// ============================================================
88// Session Lifecycle Messages
89// ============================================================
90
/// CLIENT_SETUP (0x20). Draft-15: no versions, just parameters.
///
/// The payload consists solely of the key-value parameter list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientSetup {
    /// Setup parameters, serialized with `KeyValuePair::encode_list`.
    pub parameters: Vec<KeyValuePair>,
}
96
/// SERVER_SETUP (0x21). Draft-15: no version, just parameters.
///
/// The payload consists solely of the key-value parameter list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerSetup {
    /// Setup parameters, serialized with `KeyValuePair::encode_list`.
    pub parameters: Vec<KeyValuePair>,
}
102
/// GOAWAY (0x10). Payload is a single length-prefixed byte string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoAway {
    /// Raw URI bytes, written as a varint length followed by the bytes.
    /// Encoding fails with `CodecError::GoAwayUriTooLong` when this is longer
    /// than `MAX_GOAWAY_URI_LENGTH`; the decode path in this file does not
    /// apply that limit.
    pub new_session_uri: Vec<u8>,
}
107
/// MAX_REQUEST_ID (0x15). Payload is a single varint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MaxRequestId {
    /// The only field of the message, encoded as a varint.
    pub request_id: VarInt,
}
112
/// REQUESTS_BLOCKED (0x1A). Payload is a single varint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RequestsBlocked {
    /// The only field of the message, encoded as a varint.
    pub maximum_request_id: VarInt,
}
117
118// ============================================================
119// Consolidated Response Messages
120// ============================================================
121
/// REQUEST_OK (0x07). Consolidated OK response for all request types.
///
/// Wire order: `request_id`, then the parameter list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RequestOk {
    /// Varint request ID, written first.
    pub request_id: VarInt,
    /// Key-value parameters, written after `request_id`.
    pub parameters: Vec<KeyValuePair>,
}
128
/// REQUEST_ERROR (0x05). Consolidated error response for all request types.
///
/// Wire order: `request_id`, `error_code`, then the length-prefixed reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RequestError {
    /// Varint request ID, written first.
    pub request_id: VarInt,
    /// Varint error code.
    pub error_code: VarInt,
    /// Raw reason bytes, written as a varint length followed by the bytes.
    /// Encoding fails with `CodecError::ReasonPhraseTooLong` when this is
    /// longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
136
137// ============================================================
138// Subscribe Messages
139// ============================================================
140
/// SUBSCRIBE (0x03). Simplified: fields moved to parameters.
///
/// Wire order: `request_id`, `track_namespace`, length-prefixed `track_name`,
/// then the parameter list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscribe {
    /// Varint request ID, written first.
    pub request_id: VarInt,
    /// Namespace, serialized by `TrackNamespace::encode`.
    pub track_namespace: TrackNamespace,
    /// Raw track name bytes, written as a varint length followed by the bytes.
    pub track_name: Vec<u8>,
    /// Key-value parameters, written last.
    pub parameters: Vec<KeyValuePair>,
}
149
/// SUBSCRIBE_OK (0x04). Simplified: most fields moved to parameters.
///
/// Wire order: `request_id`, `track_alias`, then the parameter list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeOk {
    /// Varint request ID, written first.
    pub request_id: VarInt,
    /// Varint track alias.
    pub track_alias: VarInt,
    /// Key-value parameters, written last.
    pub parameters: Vec<KeyValuePair>,
}
157
/// SUBSCRIBE_UPDATE (0x02). request_id + subscription_request_id + params.
///
/// The fields are written to the wire in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeUpdate {
    /// Varint request ID of this update message, written first.
    pub request_id: VarInt,
    /// Varint written second.
    /// NOTE(review): presumably the request ID of the subscription being
    /// updated — confirm against the draft.
    pub subscription_request_id: VarInt,
    /// Key-value parameters, written last.
    pub parameters: Vec<KeyValuePair>,
}
165
/// UNSUBSCRIBE (0x0A). Payload is a single varint request ID.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unsubscribe {
    /// The only field of the message, encoded as a varint.
    pub request_id: VarInt,
}
170
171// ============================================================
172// Publish Messages
173// ============================================================
174
/// PUBLISH (0x1D). Simplified: request_id + ns + name + alias + params.
///
/// The fields are written to the wire in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Publish {
    /// Varint request ID, written first.
    pub request_id: VarInt,
    /// Namespace, serialized by `TrackNamespace::encode`.
    pub track_namespace: TrackNamespace,
    /// Raw track name bytes, written as a varint length followed by the bytes.
    pub track_name: Vec<u8>,
    /// Varint track alias, written after the track name.
    pub track_alias: VarInt,
    /// Key-value parameters, written last.
    pub parameters: Vec<KeyValuePair>,
}
184
/// PUBLISH_OK (0x1E). Simplified: request_id + params.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishOk {
    /// Varint request ID, written first.
    pub request_id: VarInt,
    /// Key-value parameters, written after `request_id`.
    pub parameters: Vec<KeyValuePair>,
}
191
/// PUBLISH_DONE (0x0B). Replaces SubscribeDone.
///
/// Wire order: `request_id`, `status_code`, `stream_count`, then the
/// length-prefixed reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishDone {
    /// Varint request ID, written first.
    pub request_id: VarInt,
    /// Varint status code.
    pub status_code: VarInt,
    /// Varint stream count.
    pub stream_count: VarInt,
    /// Raw reason bytes, written as a varint length followed by the bytes.
    /// Encoding fails with `CodecError::ReasonPhraseTooLong` when this is
    /// longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
200
201// ============================================================
202// Publish Namespace Messages (renamed from Announce)
203// ============================================================
204
/// PUBLISH_NAMESPACE (0x06). request_id + namespace + params.
///
/// The fields are written to the wire in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishNamespace {
    /// Varint request ID, written first.
    pub request_id: VarInt,
    /// Namespace, serialized by `TrackNamespace::encode`.
    pub track_namespace: TrackNamespace,
    /// Key-value parameters, written last.
    pub parameters: Vec<KeyValuePair>,
}
212
/// PUBLISH_NAMESPACE_DONE (0x09). Just namespace.
///
/// Unlike most messages in this module, the payload carries no request ID.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishNamespaceDone {
    /// Namespace, serialized by `TrackNamespace::encode`; the whole payload.
    pub track_namespace: TrackNamespace,
}
218
/// PUBLISH_NAMESPACE_CANCEL (0x0C). namespace + error_code + reason.
///
/// The fields are written to the wire in declaration order; there is no
/// request ID in this message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishNamespaceCancel {
    /// Namespace, serialized by `TrackNamespace::encode`, written first.
    pub track_namespace: TrackNamespace,
    /// Varint error code.
    pub error_code: VarInt,
    /// Raw reason bytes, written as a varint length followed by the bytes.
    /// Encoding fails with `CodecError::ReasonPhraseTooLong` when this is
    /// longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
226
227// ============================================================
228// Subscribe Namespace Messages
229// ============================================================
230
/// SUBSCRIBE_NAMESPACE (0x11). request_id + namespace prefix + params.
///
/// The fields are written to the wire in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeNamespace {
    /// Varint request ID, written first.
    pub request_id: VarInt,
    /// Namespace prefix, serialized by `TrackNamespace::encode`.
    pub namespace_prefix: TrackNamespace,
    /// Key-value parameters, written last.
    pub parameters: Vec<KeyValuePair>,
}
237
/// UNSUBSCRIBE_NAMESPACE (0x14). Just request_id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsubscribeNamespace {
    /// The only field of the message, encoded as a varint.
    pub request_id: VarInt,
}
243
244// ============================================================
245// Track Status Messages
246// ============================================================
247
/// TRACK_STATUS (0x0D). Same structure as Subscribe.
///
/// Wire order: `request_id`, `track_namespace`, length-prefixed `track_name`,
/// then the parameter list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatus {
    /// Varint request ID, written first.
    pub request_id: VarInt,
    /// Namespace, serialized by `TrackNamespace::encode`.
    pub track_namespace: TrackNamespace,
    /// Raw track name bytes, written as a varint length followed by the bytes.
    pub track_name: Vec<u8>,
    /// Key-value parameters, written last.
    pub parameters: Vec<KeyValuePair>,
}
256
257// ============================================================
258// Fetch Messages
259// ============================================================
260
/// Discriminator for the kind of FETCH request.
///
/// The discriminant is the value carried as a varint right after the request
/// ID of a FETCH message, and it selects which payload layout follows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum FetchType {
    /// Standalone fetch with explicit track + range.
    Standalone = 1,
    /// Joining fetch using a relative group offset.
    RelativeJoining = 2,
    /// Joining fetch using an absolute group.
    AbsoluteJoining = 3,
}
271
272impl FetchType {
273    /// Map a varint value to a FetchType, returning None for unknown values.
274    pub fn from_u64(v: u64) -> Option<Self> {
275        match v {
276            1 => Some(FetchType::Standalone),
277            2 => Some(FetchType::RelativeJoining),
278            3 => Some(FetchType::AbsoluteJoining),
279            _ => None,
280        }
281    }
282}
283
/// FETCH (0x16). request_id + fetch_type + type-specific payload + params.
///
/// When decoding, `fetch_type` selects which `FetchPayload` layout is read.
/// When encoding, the layout written is taken from the `fetch_payload`
/// variant itself, so callers must keep the two fields consistent.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Fetch {
    /// Varint request ID, written first.
    pub request_id: VarInt,
    /// Kind of fetch, written as a varint after `request_id`.
    pub fetch_type: FetchType,
    /// Type-specific fields, written after `fetch_type`.
    pub fetch_payload: FetchPayload,
    /// Key-value parameters, written last.
    pub parameters: Vec<KeyValuePair>,
}
291
/// Type-specific portion of a FETCH message.
///
/// Within each variant the fields are written to the wire in declaration
/// order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FetchPayload {
    /// Layout read when the fetch type is `FetchType::Standalone`.
    Standalone {
        // Namespace, serialized by `TrackNamespace::encode`.
        track_namespace: TrackNamespace,
        // Raw track name bytes, written as a varint length plus the bytes.
        track_name: Vec<u8>,
        // The four range fields below are each a single varint.
        start_group: VarInt,
        start_object: VarInt,
        end_group: VarInt,
        end_object: VarInt,
    },
    /// Layout read for both `FetchType::RelativeJoining` and
    /// `FetchType::AbsoluteJoining`: two varints.
    Joining {
        joining_request_id: VarInt,
        joining_start: VarInt,
    },
}
307
/// FETCH_OK (0x18). request_id + end_of_track + end_group + end_object + params.
///
/// The fields are written to the wire in declaration order; the first four
/// are each a single varint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchOk {
    /// Varint request ID, written first.
    pub request_id: VarInt,
    /// Carried as a varint by this codec (not converted to a `bool`).
    pub end_of_track: VarInt,
    /// Varint written after `end_of_track`.
    pub end_group: VarInt,
    /// Varint written after `end_group`.
    pub end_object: VarInt,
    /// Key-value parameters, written last.
    pub parameters: Vec<KeyValuePair>,
}
316
/// FETCH_CANCEL (0x17). Payload is a single varint request ID.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchCancel {
    /// The only field of the message, encoded as a varint.
    pub request_id: VarInt,
}
321
322// ============================================================
323// Unified Message Enum
324// ============================================================
325
/// Any draft-15 control message.
///
/// There is exactly one variant per `MessageType`, each wrapping the struct
/// that holds that message's fields. `ControlMessage::encode` and
/// `ControlMessage::decode` convert between this enum and the framed wire
/// representation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ControlMessage {
    ClientSetup(ClientSetup),
    ServerSetup(ServerSetup),
    GoAway(GoAway),
    MaxRequestId(MaxRequestId),
    RequestsBlocked(RequestsBlocked),
    RequestOk(RequestOk),
    RequestError(RequestError),
    Subscribe(Subscribe),
    SubscribeOk(SubscribeOk),
    SubscribeUpdate(SubscribeUpdate),
    Unsubscribe(Unsubscribe),
    Publish(Publish),
    PublishOk(PublishOk),
    PublishDone(PublishDone),
    PublishNamespace(PublishNamespace),
    PublishNamespaceDone(PublishNamespaceDone),
    PublishNamespaceCancel(PublishNamespaceCancel),
    SubscribeNamespace(SubscribeNamespace),
    UnsubscribeNamespace(UnsubscribeNamespace),
    TrackStatus(TrackStatus),
    Fetch(Fetch),
    FetchOk(FetchOk),
    FetchCancel(FetchCancel),
}
352
353impl ControlMessage {
354    pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
355        let mut payload = Vec::with_capacity(256);
356        self.encode_payload(&mut payload)?;
357
358        if payload.len() > MAX_MESSAGE_LENGTH {
359            return Err(CodecError::MessageTooLong(payload.len()));
360        }
361
362        let msg_type = self.message_type();
363        VarInt::from_usize(msg_type.id() as usize).encode(buf);
364        // Draft-15: 16-bit length (big-endian)
365        buf.put_u16(payload.len() as u16);
366        buf.put_slice(&payload);
367        Ok(())
368    }
369
370    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
371        let type_id = VarInt::decode(buf)?.into_inner();
372        let msg_type =
373            MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
374        // Draft-15: 16-bit length (big-endian)
375        if buf.remaining() < 2 {
376            return Err(CodecError::UnexpectedEnd);
377        }
378        let payload_len = buf.get_u16() as usize;
379        if buf.remaining() < payload_len {
380            return Err(CodecError::UnexpectedEnd);
381        }
382        let payload_bytes = buf.copy_to_bytes(payload_len);
383        let mut payload = &payload_bytes[..];
384        Self::decode_payload(msg_type, &mut payload)
385    }
386
387    fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
388        match self {
389            ControlMessage::ClientSetup(m) => {
390                KeyValuePair::encode_list(&m.parameters, buf);
391            }
392            ControlMessage::ServerSetup(m) => {
393                KeyValuePair::encode_list(&m.parameters, buf);
394            }
395            ControlMessage::GoAway(m) => {
396                if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
397                    return Err(CodecError::GoAwayUriTooLong);
398                }
399                VarInt::from_usize(m.new_session_uri.len()).encode(buf);
400                buf.put_slice(&m.new_session_uri);
401            }
402            ControlMessage::MaxRequestId(m) => {
403                m.request_id.encode(buf);
404            }
405            ControlMessage::RequestsBlocked(m) => {
406                m.maximum_request_id.encode(buf);
407            }
408            ControlMessage::RequestOk(m) => {
409                m.request_id.encode(buf);
410                KeyValuePair::encode_list(&m.parameters, buf);
411            }
412            ControlMessage::RequestError(m) => {
413                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
414                    return Err(CodecError::ReasonPhraseTooLong);
415                }
416                m.request_id.encode(buf);
417                m.error_code.encode(buf);
418                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
419                buf.put_slice(&m.reason_phrase);
420            }
421            ControlMessage::Subscribe(m) => {
422                m.request_id.encode(buf);
423                m.track_namespace.encode(buf);
424                VarInt::from_usize(m.track_name.len()).encode(buf);
425                buf.put_slice(&m.track_name);
426                KeyValuePair::encode_list(&m.parameters, buf);
427            }
428            ControlMessage::SubscribeOk(m) => {
429                m.request_id.encode(buf);
430                m.track_alias.encode(buf);
431                KeyValuePair::encode_list(&m.parameters, buf);
432            }
433            ControlMessage::SubscribeUpdate(m) => {
434                m.request_id.encode(buf);
435                m.subscription_request_id.encode(buf);
436                KeyValuePair::encode_list(&m.parameters, buf);
437            }
438            ControlMessage::Unsubscribe(m) => {
439                m.request_id.encode(buf);
440            }
441            ControlMessage::Publish(m) => {
442                m.request_id.encode(buf);
443                m.track_namespace.encode(buf);
444                VarInt::from_usize(m.track_name.len()).encode(buf);
445                buf.put_slice(&m.track_name);
446                m.track_alias.encode(buf);
447                KeyValuePair::encode_list(&m.parameters, buf);
448            }
449            ControlMessage::PublishOk(m) => {
450                m.request_id.encode(buf);
451                KeyValuePair::encode_list(&m.parameters, buf);
452            }
453            ControlMessage::PublishDone(m) => {
454                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
455                    return Err(CodecError::ReasonPhraseTooLong);
456                }
457                m.request_id.encode(buf);
458                m.status_code.encode(buf);
459                m.stream_count.encode(buf);
460                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
461                buf.put_slice(&m.reason_phrase);
462            }
463            ControlMessage::PublishNamespace(m) => {
464                m.request_id.encode(buf);
465                m.track_namespace.encode(buf);
466                KeyValuePair::encode_list(&m.parameters, buf);
467            }
468            ControlMessage::PublishNamespaceDone(m) => {
469                m.track_namespace.encode(buf);
470            }
471            ControlMessage::PublishNamespaceCancel(m) => {
472                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
473                    return Err(CodecError::ReasonPhraseTooLong);
474                }
475                m.track_namespace.encode(buf);
476                m.error_code.encode(buf);
477                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
478                buf.put_slice(&m.reason_phrase);
479            }
480            ControlMessage::SubscribeNamespace(m) => {
481                m.request_id.encode(buf);
482                m.namespace_prefix.encode(buf);
483                KeyValuePair::encode_list(&m.parameters, buf);
484            }
485            ControlMessage::UnsubscribeNamespace(m) => {
486                m.request_id.encode(buf);
487            }
488            ControlMessage::TrackStatus(m) => {
489                m.request_id.encode(buf);
490                m.track_namespace.encode(buf);
491                VarInt::from_usize(m.track_name.len()).encode(buf);
492                buf.put_slice(&m.track_name);
493                KeyValuePair::encode_list(&m.parameters, buf);
494            }
495            ControlMessage::Fetch(m) => {
496                m.request_id.encode(buf);
497                VarInt::from_usize(m.fetch_type as usize).encode(buf);
498                match &m.fetch_payload {
499                    FetchPayload::Standalone {
500                        track_namespace,
501                        track_name,
502                        start_group,
503                        start_object,
504                        end_group,
505                        end_object,
506                    } => {
507                        track_namespace.encode(buf);
508                        VarInt::from_usize(track_name.len()).encode(buf);
509                        buf.put_slice(track_name);
510                        start_group.encode(buf);
511                        start_object.encode(buf);
512                        end_group.encode(buf);
513                        end_object.encode(buf);
514                    }
515                    FetchPayload::Joining { joining_request_id, joining_start } => {
516                        joining_request_id.encode(buf);
517                        joining_start.encode(buf);
518                    }
519                }
520                KeyValuePair::encode_list(&m.parameters, buf);
521            }
522            ControlMessage::FetchOk(m) => {
523                m.request_id.encode(buf);
524                m.end_of_track.encode(buf);
525                m.end_group.encode(buf);
526                m.end_object.encode(buf);
527                KeyValuePair::encode_list(&m.parameters, buf);
528            }
529            ControlMessage::FetchCancel(m) => {
530                m.request_id.encode(buf);
531            }
532        }
533        Ok(())
534    }
535
536    fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
537        match msg_type {
538            MessageType::ClientSetup => {
539                let parameters = KeyValuePair::decode_list(buf)?;
540                Ok(ControlMessage::ClientSetup(ClientSetup { parameters }))
541            }
542            MessageType::ServerSetup => {
543                let parameters = KeyValuePair::decode_list(buf)?;
544                Ok(ControlMessage::ServerSetup(ServerSetup { parameters }))
545            }
546            MessageType::GoAway => {
547                let uri_len = VarInt::decode(buf)?.into_inner() as usize;
548                let uri = read_bytes(buf, uri_len)?;
549                Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
550            }
551            MessageType::MaxRequestId => {
552                let request_id = VarInt::decode(buf)?;
553                Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id }))
554            }
555            MessageType::RequestsBlocked => {
556                let maximum_request_id = VarInt::decode(buf)?;
557                Ok(ControlMessage::RequestsBlocked(RequestsBlocked { maximum_request_id }))
558            }
559            MessageType::RequestOk => {
560                let request_id = VarInt::decode(buf)?;
561                let parameters = KeyValuePair::decode_list(buf)?;
562                Ok(ControlMessage::RequestOk(RequestOk { request_id, parameters }))
563            }
564            MessageType::RequestError => {
565                let request_id = VarInt::decode(buf)?;
566                let error_code = VarInt::decode(buf)?;
567                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
568                let reason_phrase = read_bytes(buf, reason_len)?;
569                Ok(ControlMessage::RequestError(RequestError {
570                    request_id,
571                    error_code,
572                    reason_phrase,
573                }))
574            }
575            MessageType::Subscribe => {
576                let request_id = VarInt::decode(buf)?;
577                let track_namespace = TrackNamespace::decode(buf)?;
578                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
579                let track_name = read_bytes(buf, track_name_len)?;
580                let parameters = KeyValuePair::decode_list(buf)?;
581                Ok(ControlMessage::Subscribe(Subscribe {
582                    request_id,
583                    track_namespace,
584                    track_name,
585                    parameters,
586                }))
587            }
588            MessageType::SubscribeOk => {
589                let request_id = VarInt::decode(buf)?;
590                let track_alias = VarInt::decode(buf)?;
591                let parameters = KeyValuePair::decode_list(buf)?;
592                Ok(ControlMessage::SubscribeOk(SubscribeOk { request_id, track_alias, parameters }))
593            }
594            MessageType::SubscribeUpdate => {
595                let request_id = VarInt::decode(buf)?;
596                let subscription_request_id = VarInt::decode(buf)?;
597                let parameters = KeyValuePair::decode_list(buf)?;
598                Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
599                    request_id,
600                    subscription_request_id,
601                    parameters,
602                }))
603            }
604            MessageType::Unsubscribe => {
605                let request_id = VarInt::decode(buf)?;
606                Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
607            }
608            MessageType::Publish => {
609                let request_id = VarInt::decode(buf)?;
610                let track_namespace = TrackNamespace::decode(buf)?;
611                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
612                let track_name = read_bytes(buf, track_name_len)?;
613                let track_alias = VarInt::decode(buf)?;
614                let parameters = KeyValuePair::decode_list(buf)?;
615                Ok(ControlMessage::Publish(Publish {
616                    request_id,
617                    track_namespace,
618                    track_name,
619                    track_alias,
620                    parameters,
621                }))
622            }
623            MessageType::PublishOk => {
624                let request_id = VarInt::decode(buf)?;
625                let parameters = KeyValuePair::decode_list(buf)?;
626                Ok(ControlMessage::PublishOk(PublishOk { request_id, parameters }))
627            }
628            MessageType::PublishDone => {
629                let request_id = VarInt::decode(buf)?;
630                let status_code = VarInt::decode(buf)?;
631                let stream_count = VarInt::decode(buf)?;
632                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
633                let reason_phrase = read_bytes(buf, reason_len)?;
634                Ok(ControlMessage::PublishDone(PublishDone {
635                    request_id,
636                    status_code,
637                    stream_count,
638                    reason_phrase,
639                }))
640            }
641            MessageType::PublishNamespace => {
642                let request_id = VarInt::decode(buf)?;
643                let track_namespace = TrackNamespace::decode(buf)?;
644                let parameters = KeyValuePair::decode_list(buf)?;
645                Ok(ControlMessage::PublishNamespace(PublishNamespace {
646                    request_id,
647                    track_namespace,
648                    parameters,
649                }))
650            }
651            MessageType::PublishNamespaceDone => {
652                let track_namespace = TrackNamespace::decode(buf)?;
653                Ok(ControlMessage::PublishNamespaceDone(PublishNamespaceDone { track_namespace }))
654            }
655            MessageType::PublishNamespaceCancel => {
656                let track_namespace = TrackNamespace::decode(buf)?;
657                let error_code = VarInt::decode(buf)?;
658                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
659                let reason_phrase = read_bytes(buf, reason_len)?;
660                Ok(ControlMessage::PublishNamespaceCancel(PublishNamespaceCancel {
661                    track_namespace,
662                    error_code,
663                    reason_phrase,
664                }))
665            }
666            MessageType::SubscribeNamespace => {
667                let request_id = VarInt::decode(buf)?;
668                let namespace_prefix = TrackNamespace::decode(buf)?;
669                let parameters = KeyValuePair::decode_list(buf)?;
670                Ok(ControlMessage::SubscribeNamespace(SubscribeNamespace {
671                    request_id,
672                    namespace_prefix,
673                    parameters,
674                }))
675            }
676            MessageType::UnsubscribeNamespace => {
677                let request_id = VarInt::decode(buf)?;
678                Ok(ControlMessage::UnsubscribeNamespace(UnsubscribeNamespace { request_id }))
679            }
680            MessageType::TrackStatus => {
681                let request_id = VarInt::decode(buf)?;
682                let track_namespace = TrackNamespace::decode(buf)?;
683                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
684                let track_name = read_bytes(buf, track_name_len)?;
685                let parameters = KeyValuePair::decode_list(buf)?;
686                Ok(ControlMessage::TrackStatus(TrackStatus {
687                    request_id,
688                    track_namespace,
689                    track_name,
690                    parameters,
691                }))
692            }
693            MessageType::Fetch => {
694                let request_id = VarInt::decode(buf)?;
695                let fetch_type_val = VarInt::decode(buf)?.into_inner();
696                let fetch_type =
697                    FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
698                let fetch_payload = match fetch_type {
699                    FetchType::Standalone => {
700                        let track_namespace = TrackNamespace::decode(buf)?;
701                        let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
702                        let track_name = read_bytes(buf, track_name_len)?;
703                        let start_group = VarInt::decode(buf)?;
704                        let start_object = VarInt::decode(buf)?;
705                        let end_group = VarInt::decode(buf)?;
706                        let end_object = VarInt::decode(buf)?;
707                        FetchPayload::Standalone {
708                            track_namespace,
709                            track_name,
710                            start_group,
711                            start_object,
712                            end_group,
713                            end_object,
714                        }
715                    }
716                    FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
717                        let joining_request_id = VarInt::decode(buf)?;
718                        let joining_start = VarInt::decode(buf)?;
719                        FetchPayload::Joining { joining_request_id, joining_start }
720                    }
721                };
722                let parameters = KeyValuePair::decode_list(buf)?;
723                Ok(ControlMessage::Fetch(Fetch {
724                    request_id,
725                    fetch_type,
726                    fetch_payload,
727                    parameters,
728                }))
729            }
730            MessageType::FetchOk => {
731                let request_id = VarInt::decode(buf)?;
732                let end_of_track = VarInt::decode(buf)?;
733                let end_group = VarInt::decode(buf)?;
734                let end_object = VarInt::decode(buf)?;
735                let parameters = KeyValuePair::decode_list(buf)?;
736                Ok(ControlMessage::FetchOk(FetchOk {
737                    request_id,
738                    end_of_track,
739                    end_group,
740                    end_object,
741                    parameters,
742                }))
743            }
744            MessageType::FetchCancel => {
745                let request_id = VarInt::decode(buf)?;
746                Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
747            }
748        }
749    }
750
751    pub fn message_type(&self) -> MessageType {
752        match self {
753            ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
754            ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
755            ControlMessage::GoAway(_) => MessageType::GoAway,
756            ControlMessage::MaxRequestId(_) => MessageType::MaxRequestId,
757            ControlMessage::RequestsBlocked(_) => MessageType::RequestsBlocked,
758            ControlMessage::RequestOk(_) => MessageType::RequestOk,
759            ControlMessage::RequestError(_) => MessageType::RequestError,
760            ControlMessage::Subscribe(_) => MessageType::Subscribe,
761            ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
762            ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
763            ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
764            ControlMessage::Publish(_) => MessageType::Publish,
765            ControlMessage::PublishOk(_) => MessageType::PublishOk,
766            ControlMessage::PublishDone(_) => MessageType::PublishDone,
767            ControlMessage::PublishNamespace(_) => MessageType::PublishNamespace,
768            ControlMessage::PublishNamespaceDone(_) => MessageType::PublishNamespaceDone,
769            ControlMessage::PublishNamespaceCancel(_) => MessageType::PublishNamespaceCancel,
770            ControlMessage::SubscribeNamespace(_) => MessageType::SubscribeNamespace,
771            ControlMessage::UnsubscribeNamespace(_) => MessageType::UnsubscribeNamespace,
772            ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
773            ControlMessage::Fetch(_) => MessageType::Fetch,
774            ControlMessage::FetchOk(_) => MessageType::FetchOk,
775            ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
776        }
777    }
778}