Skip to main content

moqtap_codec/draft17/
message.rs

1//! Draft-17 control message encoding and decoding.
2//!
3//! Key differences from draft-16:
4//! - Framing: Type (varint) + Length (16-bit fixed) + Payload.
5//! - Unified SETUP (0x2F00) with delta-encoded KVP options (even/odd).
6//! - Parameters: count-prefixed, delta-encoded types, type-specific value encoding.
7//! - RequestOk/RequestError/PublishOk/PublishDone/FetchOk: no request_id.
8//! - Request messages gain required_request_id_delta.
9//! - New: PublishBlocked. FetchType gains AbsoluteJoining.
10//! - SubscribeOk/Publish/FetchOk gain track_properties after parameters.
11//! - Removed: ClientSetup, ServerSetup, MaxRequestId, RequestsBlocked, Unsubscribe,
12//!   PublishNamespaceDone, PublishNamespaceCancel, FetchCancel.
13
14pub use crate::error::{
15    CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_NAMESPACE_TUPLE_SIZE,
16    MAX_REASON_PHRASE_LENGTH,
17};
18use crate::kvp::{KeyValuePair, KvpValue};
19use crate::types::*;
20use crate::varint::VarInt;
21use bytes::{Buf, BufMut};
22
23// ============================================================
24// Parameter encoding helpers for draft-17
25// ============================================================
26
/// Wire representation used for the value of a known parameter.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ParamEncoding {
    /// A single varint with no length prefix.
    Varint,
    /// Exactly one byte (uint8).
    Uint8,
    /// A (group, object) pair written as two back-to-back varints.
    Location,
    /// A varint byte count followed by that many raw bytes.
    LengthPrefixed,
}

/// Looks up how the value of the parameter identified by `key` is laid out
/// on the wire.
///
/// Returns `None` when `key` is not one of the parameter types listed here.
fn param_encoding(key: u64) -> Option<ParamEncoding> {
    let encoding = match key {
        0x03 | 0x21 => ParamEncoding::LengthPrefixed,
        0x09 => ParamEncoding::Location,
        0x10 | 0x20 | 0x22 => ParamEncoding::Uint8,
        0x02 | 0x04 | 0x08 | 0x32 => ParamEncoding::Varint,
        _ => return None,
    };
    Some(encoding)
}
49
50/// Decode a count-prefixed list of parameters with delta-encoded types.
51fn decode_parameters(buf: &mut impl Buf) -> Result<Vec<KeyValuePair>, CodecError> {
52    let count = VarInt::decode(buf)?.into_inner() as usize;
53    let mut params = Vec::with_capacity(count);
54    let mut prev_key: u64 = 0;
55
56    for _ in 0..count {
57        let delta = VarInt::decode(buf)?.into_inner();
58        let abs_key = prev_key + delta;
59        prev_key = abs_key;
60
61        let encoding = param_encoding(abs_key).ok_or(CodecError::InvalidField)?;
62
63        let value = match encoding {
64            ParamEncoding::Varint => {
65                let v = VarInt::decode(buf)?;
66                KvpValue::Varint(v)
67            }
68            ParamEncoding::Uint8 => {
69                if buf.remaining() < 1 {
70                    return Err(CodecError::UnexpectedEnd);
71                }
72                let byte = buf.get_u8();
73                KvpValue::Varint(VarInt::from_u64(byte as u64).unwrap())
74            }
75            ParamEncoding::Location => {
76                let group = VarInt::decode(buf)?;
77                let object = VarInt::decode(buf)?;
78                let mut encoded = Vec::new();
79                group.encode(&mut encoded);
80                object.encode(&mut encoded);
81                KvpValue::Bytes(encoded)
82            }
83            ParamEncoding::LengthPrefixed => {
84                let len = VarInt::decode(buf)?.into_inner() as usize;
85                let data = read_bytes(buf, len)?;
86                KvpValue::Bytes(data)
87            }
88        };
89
90        params.push(KeyValuePair { key: VarInt::from_u64(abs_key).unwrap(), value });
91    }
92    Ok(params)
93}
94
95/// Encode a count-prefixed list of parameters with delta-encoded types.
96fn encode_parameters(params: &[KeyValuePair], buf: &mut impl BufMut) {
97    VarInt::from_usize(params.len()).encode(buf);
98    let mut prev_key: u64 = 0;
99
100    for p in params {
101        let abs_key = p.key.into_inner();
102        let delta = abs_key - prev_key;
103        prev_key = abs_key;
104        VarInt::from_u64(delta).unwrap().encode(buf);
105
106        let encoding = param_encoding(abs_key);
107        match (&p.value, encoding) {
108            (KvpValue::Varint(v), Some(ParamEncoding::Varint)) => {
109                v.encode(buf);
110            }
111            (KvpValue::Varint(v), Some(ParamEncoding::Uint8)) => {
112                buf.put_u8(v.into_inner() as u8);
113            }
114            (KvpValue::Bytes(b), Some(ParamEncoding::Location)) => {
115                buf.put_slice(b);
116            }
117            (KvpValue::Bytes(b), Some(ParamEncoding::LengthPrefixed)) => {
118                VarInt::from_usize(b.len()).encode(buf);
119                buf.put_slice(b);
120            }
121            _ => {
122                // Fallback: encode as KVP even/odd
123                match &p.value {
124                    KvpValue::Varint(v) => v.encode(buf),
125                    KvpValue::Bytes(b) => {
126                        VarInt::from_usize(b.len()).encode(buf);
127                        buf.put_slice(b);
128                    }
129                }
130            }
131        }
132    }
133}
134
135/// Decode delta-encoded KVPs with even/odd convention (for setup options
136/// and track properties). Read until buffer is exhausted.
137fn decode_kvp_delta(buf: &mut impl Buf) -> Result<Vec<KeyValuePair>, CodecError> {
138    let mut pairs = Vec::new();
139    let mut prev_key: u64 = 0;
140
141    while buf.has_remaining() {
142        let delta = VarInt::decode(buf)?.into_inner();
143        let abs_key = prev_key + delta;
144        prev_key = abs_key;
145
146        let value = if abs_key % 2 == 0 {
147            let v = VarInt::decode(buf)?;
148            KvpValue::Varint(v)
149        } else {
150            let len = VarInt::decode(buf)?.into_inner() as usize;
151            let data = read_bytes(buf, len)?;
152            KvpValue::Bytes(data)
153        };
154
155        pairs.push(KeyValuePair { key: VarInt::from_u64(abs_key).unwrap(), value });
156    }
157    Ok(pairs)
158}
159
160/// Encode delta-encoded KVPs with even/odd convention.
161fn encode_kvp_delta(pairs: &[KeyValuePair], buf: &mut impl BufMut) {
162    let mut prev_key: u64 = 0;
163    for p in pairs {
164        let abs_key = p.key.into_inner();
165        let delta = abs_key - prev_key;
166        prev_key = abs_key;
167        VarInt::from_u64(delta).unwrap().encode(buf);
168        match &p.value {
169            KvpValue::Varint(v) => v.encode(buf),
170            KvpValue::Bytes(b) => {
171                VarInt::from_usize(b.len()).encode(buf);
172                buf.put_slice(b);
173            }
174        }
175    }
176}
177
178// ============================================================
179// Message Types
180// ============================================================
181
/// Numeric type identifiers of the draft-17 control messages handled by this
/// module. The discriminant of each variant is its wire type id.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u64)]
pub enum MessageType {
    /// Type id 0x02.
    RequestUpdate = 0x02,
    /// Type id 0x03.
    Subscribe = 0x03,
    /// Type id 0x04.
    SubscribeOk = 0x04,
    /// Type id 0x05.
    RequestError = 0x05,
    /// Type id 0x06.
    PublishNamespace = 0x06,
    /// Type id 0x07.
    RequestOk = 0x07,
    /// Type id 0x08.
    Namespace = 0x08,
    /// Type id 0x0B.
    PublishDone = 0x0B,
    /// Type id 0x0D.
    TrackStatus = 0x0D,
    /// Type id 0x0E.
    NamespaceDone = 0x0E,
    /// Type id 0x0F.
    PublishBlocked = 0x0F,
    /// Type id 0x10.
    GoAway = 0x10,
    /// Type id 0x11.
    SubscribeNamespace = 0x11,
    /// Type id 0x16.
    Fetch = 0x16,
    /// Type id 0x18.
    FetchOk = 0x18,
    /// Type id 0x1D.
    Publish = 0x1D,
    /// Type id 0x1E.
    PublishOk = 0x1E,
    /// Type id 0x2F00.
    Setup = 0x2F00,
}

impl MessageType {
    /// Every variant of the enum; `from_id` searches this table.
    const ALL: [MessageType; 18] = [
        MessageType::RequestUpdate,
        MessageType::Subscribe,
        MessageType::SubscribeOk,
        MessageType::RequestError,
        MessageType::PublishNamespace,
        MessageType::RequestOk,
        MessageType::Namespace,
        MessageType::PublishDone,
        MessageType::TrackStatus,
        MessageType::NamespaceDone,
        MessageType::PublishBlocked,
        MessageType::GoAway,
        MessageType::SubscribeNamespace,
        MessageType::Fetch,
        MessageType::FetchOk,
        MessageType::Publish,
        MessageType::PublishOk,
        MessageType::Setup,
    ];

    /// Maps a wire type id to its `MessageType`, or `None` if the id is not
    /// one of the variants.
    pub fn from_id(id: u64) -> Option<Self> {
        Self::ALL.iter().copied().find(|kind| kind.id() == id)
    }

    /// Returns the wire type id (the enum discriminant).
    pub fn id(&self) -> u64 {
        let kind = *self;
        kind as u64
    }
}
234
235// ============================================================
236// Session Lifecycle Messages
237// ============================================================
238
239/// Unified SETUP (0x2F00). Replaces ClientSetup/ServerSetup.
240#[derive(Debug, Clone, PartialEq, Eq)]
241pub struct Setup {
242    pub options: Vec<KeyValuePair>,
243}
244
245#[derive(Debug, Clone, PartialEq, Eq)]
246pub struct GoAway {
247    pub new_session_uri: Vec<u8>,
248    pub timeout: VarInt,
249}
250
251// ============================================================
252// Consolidated Response Messages
253// ============================================================
254
255/// REQUEST_OK (0x07). No request_id in draft-17.
256#[derive(Debug, Clone, PartialEq, Eq)]
257pub struct RequestOk {
258    pub parameters: Vec<KeyValuePair>,
259}
260
261/// REQUEST_ERROR (0x05). No request_id in draft-17.
262#[derive(Debug, Clone, PartialEq, Eq)]
263pub struct RequestError {
264    pub error_code: VarInt,
265    pub retry_interval: VarInt,
266    pub reason_phrase: Vec<u8>,
267}
268
269// ============================================================
270// Subscribe Messages
271// ============================================================
272
273#[derive(Debug, Clone, PartialEq, Eq)]
274pub struct Subscribe {
275    pub request_id: VarInt,
276    pub required_request_id_delta: VarInt,
277    pub track_namespace: TrackNamespace,
278    pub track_name: Vec<u8>,
279    pub parameters: Vec<KeyValuePair>,
280}
281
282/// SUBSCRIBE_OK (0x04). No request_id in draft-17. Gains track_properties.
283#[derive(Debug, Clone, PartialEq, Eq)]
284pub struct SubscribeOk {
285    pub track_alias: VarInt,
286    pub parameters: Vec<KeyValuePair>,
287    pub track_properties: Vec<KeyValuePair>,
288}
289
290#[derive(Debug, Clone, PartialEq, Eq)]
291pub struct RequestUpdate {
292    pub request_id: VarInt,
293    pub required_request_id_delta: VarInt,
294    pub parameters: Vec<KeyValuePair>,
295}
296
297// ============================================================
298// Publish Messages
299// ============================================================
300
301#[derive(Debug, Clone, PartialEq, Eq)]
302pub struct Publish {
303    pub request_id: VarInt,
304    pub required_request_id_delta: VarInt,
305    pub track_namespace: TrackNamespace,
306    pub track_name: Vec<u8>,
307    pub track_alias: VarInt,
308    pub parameters: Vec<KeyValuePair>,
309    pub track_properties: Vec<KeyValuePair>,
310}
311
312/// PUBLISH_OK (0x1E). No request_id in draft-17.
313#[derive(Debug, Clone, PartialEq, Eq)]
314pub struct PublishOk {
315    pub parameters: Vec<KeyValuePair>,
316}
317
318/// PUBLISH_DONE (0x0B). No request_id in draft-17.
319#[derive(Debug, Clone, PartialEq, Eq)]
320pub struct PublishDone {
321    pub status_code: VarInt,
322    pub stream_count: VarInt,
323    pub reason_phrase: Vec<u8>,
324}
325
326// ============================================================
327// Publish Namespace Messages
328// ============================================================
329
330#[derive(Debug, Clone, PartialEq, Eq)]
331pub struct PublishNamespace {
332    pub request_id: VarInt,
333    pub required_request_id_delta: VarInt,
334    pub track_namespace: TrackNamespace,
335    pub parameters: Vec<KeyValuePair>,
336}
337
338// ============================================================
339// Namespace Messages
340// ============================================================
341
342#[derive(Debug, Clone, PartialEq, Eq)]
343pub struct Namespace {
344    pub namespace_suffix: TrackNamespace,
345}
346
347#[derive(Debug, Clone, PartialEq, Eq)]
348pub struct NamespaceDone {
349    pub namespace_suffix: TrackNamespace,
350}
351
352// ============================================================
353// Subscribe Namespace Messages
354// ============================================================
355
356#[derive(Debug, Clone, PartialEq, Eq)]
357pub struct SubscribeNamespace {
358    pub request_id: VarInt,
359    pub required_request_id_delta: VarInt,
360    pub namespace_prefix: TrackNamespace,
361    pub subscribe_options: VarInt,
362    pub parameters: Vec<KeyValuePair>,
363}
364
365// ============================================================
366// Track Status Messages
367// ============================================================
368
369#[derive(Debug, Clone, PartialEq, Eq)]
370pub struct TrackStatus {
371    pub request_id: VarInt,
372    pub required_request_id_delta: VarInt,
373    pub track_namespace: TrackNamespace,
374    pub track_name: Vec<u8>,
375    pub parameters: Vec<KeyValuePair>,
376}
377
378// ============================================================
379// Fetch Messages
380// ============================================================
381
/// Kind of FETCH request. The discriminant is the value carried on the wire.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u64)]
pub enum FetchType {
    /// Wire value 1.
    Standalone = 1,
    /// Wire value 2.
    RelativeJoining = 2,
    /// Wire value 3.
    AbsoluteJoining = 3,
}

impl FetchType {
    /// Maps a wire value to its `FetchType`, or `None` if the value is not
    /// 1, 2 or 3.
    pub fn from_u64(v: u64) -> Option<Self> {
        [Self::Standalone, Self::RelativeJoining, Self::AbsoluteJoining]
            .iter()
            .copied()
            .find(|kind| *kind as u64 == v)
    }
}
400
401#[derive(Debug, Clone, PartialEq, Eq)]
402pub struct Fetch {
403    pub request_id: VarInt,
404    pub required_request_id_delta: VarInt,
405    pub fetch_type: FetchType,
406    pub fetch_payload: FetchPayload,
407    pub parameters: Vec<KeyValuePair>,
408}
409
410#[derive(Debug, Clone, PartialEq, Eq)]
411pub enum FetchPayload {
412    Standalone {
413        track_namespace: TrackNamespace,
414        track_name: Vec<u8>,
415        start_group: VarInt,
416        start_object: VarInt,
417        end_group: VarInt,
418        end_object: VarInt,
419    },
420    Joining {
421        joining_request_id: VarInt,
422        joining_start: VarInt,
423    },
424}
425
426/// FETCH_OK (0x18). No request_id in draft-17. end_of_track is uint8.
427#[derive(Debug, Clone, PartialEq, Eq)]
428pub struct FetchOk {
429    pub end_of_track: u8,
430    pub end_group: VarInt,
431    pub end_object: VarInt,
432    pub parameters: Vec<KeyValuePair>,
433    pub track_properties: Vec<KeyValuePair>,
434}
435
436// ============================================================
437// Publish Blocked (new in draft-17)
438// ============================================================
439
440#[derive(Debug, Clone, PartialEq, Eq)]
441pub struct PublishBlocked {
442    pub namespace_suffix: TrackNamespace,
443    pub track_name: Vec<u8>,
444}
445
446// ============================================================
447// Unified Message Enum
448// ============================================================
449
450#[derive(Debug, Clone, PartialEq, Eq)]
451pub enum ControlMessage {
452    Setup(Setup),
453    GoAway(GoAway),
454    RequestOk(RequestOk),
455    RequestError(RequestError),
456    Subscribe(Subscribe),
457    SubscribeOk(SubscribeOk),
458    RequestUpdate(RequestUpdate),
459    Publish(Publish),
460    PublishOk(PublishOk),
461    PublishDone(PublishDone),
462    PublishNamespace(PublishNamespace),
463    Namespace(Namespace),
464    NamespaceDone(NamespaceDone),
465    SubscribeNamespace(SubscribeNamespace),
466    TrackStatus(TrackStatus),
467    Fetch(Fetch),
468    FetchOk(FetchOk),
469    PublishBlocked(PublishBlocked),
470}
471
472impl ControlMessage {
473    pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
474        let mut payload = Vec::with_capacity(256);
475        self.encode_payload(&mut payload)?;
476
477        if payload.len() > MAX_MESSAGE_LENGTH {
478            return Err(CodecError::MessageTooLong(payload.len()));
479        }
480
481        let msg_type = self.message_type();
482        VarInt::from_usize(msg_type.id() as usize).encode(buf);
483        // Draft-17: 16-bit length (big-endian)
484        buf.put_u16(payload.len() as u16);
485        buf.put_slice(&payload);
486        Ok(())
487    }
488
489    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
490        let type_id = VarInt::decode(buf)?.into_inner();
491        let msg_type =
492            MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
493        // Draft-17: 16-bit length (big-endian)
494        if buf.remaining() < 2 {
495            return Err(CodecError::UnexpectedEnd);
496        }
497        let payload_len = buf.get_u16() as usize;
498        if buf.remaining() < payload_len {
499            return Err(CodecError::UnexpectedEnd);
500        }
501        let payload_bytes = buf.copy_to_bytes(payload_len);
502        let mut payload = &payload_bytes[..];
503        Self::decode_payload(msg_type, &mut payload)
504    }
505
506    fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
507        match self {
508            ControlMessage::Setup(m) => {
509                encode_kvp_delta(&m.options, buf);
510            }
511            ControlMessage::GoAway(m) => {
512                if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
513                    return Err(CodecError::GoAwayUriTooLong);
514                }
515                VarInt::from_usize(m.new_session_uri.len()).encode(buf);
516                buf.put_slice(&m.new_session_uri);
517                m.timeout.encode(buf);
518            }
519            ControlMessage::RequestOk(m) => {
520                encode_parameters(&m.parameters, buf);
521            }
522            ControlMessage::RequestError(m) => {
523                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
524                    return Err(CodecError::ReasonPhraseTooLong);
525                }
526                m.error_code.encode(buf);
527                m.retry_interval.encode(buf);
528                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
529                buf.put_slice(&m.reason_phrase);
530            }
531            ControlMessage::Subscribe(m) => {
532                m.request_id.encode(buf);
533                m.required_request_id_delta.encode(buf);
534                m.track_namespace.encode(buf);
535                VarInt::from_usize(m.track_name.len()).encode(buf);
536                buf.put_slice(&m.track_name);
537                encode_parameters(&m.parameters, buf);
538            }
539            ControlMessage::SubscribeOk(m) => {
540                m.track_alias.encode(buf);
541                encode_parameters(&m.parameters, buf);
542                encode_kvp_delta(&m.track_properties, buf);
543            }
544            ControlMessage::RequestUpdate(m) => {
545                m.request_id.encode(buf);
546                m.required_request_id_delta.encode(buf);
547                encode_parameters(&m.parameters, buf);
548            }
549            ControlMessage::Publish(m) => {
550                m.request_id.encode(buf);
551                m.required_request_id_delta.encode(buf);
552                m.track_namespace.encode(buf);
553                VarInt::from_usize(m.track_name.len()).encode(buf);
554                buf.put_slice(&m.track_name);
555                m.track_alias.encode(buf);
556                encode_parameters(&m.parameters, buf);
557                encode_kvp_delta(&m.track_properties, buf);
558            }
559            ControlMessage::PublishOk(m) => {
560                encode_parameters(&m.parameters, buf);
561            }
562            ControlMessage::PublishDone(m) => {
563                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
564                    return Err(CodecError::ReasonPhraseTooLong);
565                }
566                m.status_code.encode(buf);
567                m.stream_count.encode(buf);
568                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
569                buf.put_slice(&m.reason_phrase);
570            }
571            ControlMessage::PublishNamespace(m) => {
572                m.request_id.encode(buf);
573                m.required_request_id_delta.encode(buf);
574                m.track_namespace.encode(buf);
575                encode_parameters(&m.parameters, buf);
576            }
577            ControlMessage::Namespace(m) => {
578                m.namespace_suffix.encode(buf);
579            }
580            ControlMessage::NamespaceDone(m) => {
581                m.namespace_suffix.encode(buf);
582            }
583            ControlMessage::SubscribeNamespace(m) => {
584                m.request_id.encode(buf);
585                m.required_request_id_delta.encode(buf);
586                m.namespace_prefix.encode(buf);
587                m.subscribe_options.encode(buf);
588                encode_parameters(&m.parameters, buf);
589            }
590            ControlMessage::TrackStatus(m) => {
591                m.request_id.encode(buf);
592                m.required_request_id_delta.encode(buf);
593                m.track_namespace.encode(buf);
594                VarInt::from_usize(m.track_name.len()).encode(buf);
595                buf.put_slice(&m.track_name);
596                encode_parameters(&m.parameters, buf);
597            }
598            ControlMessage::Fetch(m) => {
599                m.request_id.encode(buf);
600                m.required_request_id_delta.encode(buf);
601                VarInt::from_usize(m.fetch_type as usize).encode(buf);
602                match &m.fetch_payload {
603                    FetchPayload::Standalone {
604                        track_namespace,
605                        track_name,
606                        start_group,
607                        start_object,
608                        end_group,
609                        end_object,
610                    } => {
611                        track_namespace.encode(buf);
612                        VarInt::from_usize(track_name.len()).encode(buf);
613                        buf.put_slice(track_name);
614                        start_group.encode(buf);
615                        start_object.encode(buf);
616                        end_group.encode(buf);
617                        end_object.encode(buf);
618                    }
619                    FetchPayload::Joining { joining_request_id, joining_start } => {
620                        joining_request_id.encode(buf);
621                        joining_start.encode(buf);
622                    }
623                }
624                encode_parameters(&m.parameters, buf);
625            }
626            ControlMessage::FetchOk(m) => {
627                buf.put_u8(m.end_of_track);
628                m.end_group.encode(buf);
629                m.end_object.encode(buf);
630                encode_parameters(&m.parameters, buf);
631                encode_kvp_delta(&m.track_properties, buf);
632            }
633            ControlMessage::PublishBlocked(m) => {
634                m.namespace_suffix.encode(buf);
635                VarInt::from_usize(m.track_name.len()).encode(buf);
636                buf.put_slice(&m.track_name);
637            }
638        }
639        Ok(())
640    }
641
642    fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
643        match msg_type {
644            MessageType::Setup => {
645                let options = decode_kvp_delta(buf)?;
646                Ok(ControlMessage::Setup(Setup { options }))
647            }
648            MessageType::GoAway => {
649                let uri_len = VarInt::decode(buf)?.into_inner() as usize;
650                let uri = read_bytes(buf, uri_len)?;
651                let timeout = VarInt::decode(buf)?;
652                Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri, timeout }))
653            }
654            MessageType::RequestOk => {
655                let parameters = decode_parameters(buf)?;
656                Ok(ControlMessage::RequestOk(RequestOk { parameters }))
657            }
658            MessageType::RequestError => {
659                let error_code = VarInt::decode(buf)?;
660                let retry_interval = VarInt::decode(buf)?;
661                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
662                let reason_phrase = read_bytes(buf, reason_len)?;
663                Ok(ControlMessage::RequestError(RequestError {
664                    error_code,
665                    retry_interval,
666                    reason_phrase,
667                }))
668            }
669            MessageType::Subscribe => {
670                let request_id = VarInt::decode(buf)?;
671                let required_request_id_delta = VarInt::decode(buf)?;
672                let track_namespace = TrackNamespace::decode(buf)?;
673                let tn_len = VarInt::decode(buf)?.into_inner() as usize;
674                let track_name = read_bytes(buf, tn_len)?;
675                let parameters = decode_parameters(buf)?;
676                Ok(ControlMessage::Subscribe(Subscribe {
677                    request_id,
678                    required_request_id_delta,
679                    track_namespace,
680                    track_name,
681                    parameters,
682                }))
683            }
684            MessageType::SubscribeOk => {
685                let track_alias = VarInt::decode(buf)?;
686                let parameters = decode_parameters(buf)?;
687                let track_properties = decode_kvp_delta(buf)?;
688                Ok(ControlMessage::SubscribeOk(SubscribeOk {
689                    track_alias,
690                    parameters,
691                    track_properties,
692                }))
693            }
694            MessageType::RequestUpdate => {
695                let request_id = VarInt::decode(buf)?;
696                let required_request_id_delta = VarInt::decode(buf)?;
697                let parameters = decode_parameters(buf)?;
698                Ok(ControlMessage::RequestUpdate(RequestUpdate {
699                    request_id,
700                    required_request_id_delta,
701                    parameters,
702                }))
703            }
704            MessageType::Publish => {
705                let request_id = VarInt::decode(buf)?;
706                let required_request_id_delta = VarInt::decode(buf)?;
707                let track_namespace = TrackNamespace::decode(buf)?;
708                let tn_len = VarInt::decode(buf)?.into_inner() as usize;
709                let track_name = read_bytes(buf, tn_len)?;
710                let track_alias = VarInt::decode(buf)?;
711                let parameters = decode_parameters(buf)?;
712                let track_properties = decode_kvp_delta(buf)?;
713                Ok(ControlMessage::Publish(Publish {
714                    request_id,
715                    required_request_id_delta,
716                    track_namespace,
717                    track_name,
718                    track_alias,
719                    parameters,
720                    track_properties,
721                }))
722            }
723            MessageType::PublishOk => {
724                let parameters = decode_parameters(buf)?;
725                Ok(ControlMessage::PublishOk(PublishOk { parameters }))
726            }
727            MessageType::PublishDone => {
728                let status_code = VarInt::decode(buf)?;
729                let stream_count = VarInt::decode(buf)?;
730                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
731                let reason_phrase = read_bytes(buf, reason_len)?;
732                Ok(ControlMessage::PublishDone(PublishDone {
733                    status_code,
734                    stream_count,
735                    reason_phrase,
736                }))
737            }
738            MessageType::PublishNamespace => {
739                let request_id = VarInt::decode(buf)?;
740                let required_request_id_delta = VarInt::decode(buf)?;
741                let track_namespace = TrackNamespace::decode(buf)?;
742                let parameters = decode_parameters(buf)?;
743                Ok(ControlMessage::PublishNamespace(PublishNamespace {
744                    request_id,
745                    required_request_id_delta,
746                    track_namespace,
747                    parameters,
748                }))
749            }
750            MessageType::Namespace => {
751                let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
752                Ok(ControlMessage::Namespace(Namespace { namespace_suffix }))
753            }
754            MessageType::NamespaceDone => {
755                let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
756                Ok(ControlMessage::NamespaceDone(NamespaceDone { namespace_suffix }))
757            }
758            MessageType::SubscribeNamespace => {
759                let request_id = VarInt::decode(buf)?;
760                let required_request_id_delta = VarInt::decode(buf)?;
761                let namespace_prefix = TrackNamespace::decode_allow_empty(buf)?;
762                let subscribe_options = VarInt::decode(buf)?;
763                let parameters = decode_parameters(buf)?;
764                Ok(ControlMessage::SubscribeNamespace(SubscribeNamespace {
765                    request_id,
766                    required_request_id_delta,
767                    namespace_prefix,
768                    subscribe_options,
769                    parameters,
770                }))
771            }
772            MessageType::TrackStatus => {
773                let request_id = VarInt::decode(buf)?;
774                let required_request_id_delta = VarInt::decode(buf)?;
775                let track_namespace = TrackNamespace::decode(buf)?;
776                let tn_len = VarInt::decode(buf)?.into_inner() as usize;
777                let track_name = read_bytes(buf, tn_len)?;
778                let parameters = decode_parameters(buf)?;
779                Ok(ControlMessage::TrackStatus(TrackStatus {
780                    request_id,
781                    required_request_id_delta,
782                    track_namespace,
783                    track_name,
784                    parameters,
785                }))
786            }
787            MessageType::Fetch => {
788                let request_id = VarInt::decode(buf)?;
789                let required_request_id_delta = VarInt::decode(buf)?;
790                let fetch_type_val = VarInt::decode(buf)?.into_inner();
791                let fetch_type =
792                    FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
793                let fetch_payload = match fetch_type {
794                    FetchType::Standalone => {
795                        let track_namespace = TrackNamespace::decode(buf)?;
796                        let tn_len = VarInt::decode(buf)?.into_inner() as usize;
797                        let track_name = read_bytes(buf, tn_len)?;
798                        let start_group = VarInt::decode(buf)?;
799                        let start_object = VarInt::decode(buf)?;
800                        let end_group = VarInt::decode(buf)?;
801                        let end_object = VarInt::decode(buf)?;
802                        FetchPayload::Standalone {
803                            track_namespace,
804                            track_name,
805                            start_group,
806                            start_object,
807                            end_group,
808                            end_object,
809                        }
810                    }
811                    FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
812                        let joining_request_id = VarInt::decode(buf)?;
813                        let joining_start = VarInt::decode(buf)?;
814                        FetchPayload::Joining { joining_request_id, joining_start }
815                    }
816                };
817                let parameters = decode_parameters(buf)?;
818                Ok(ControlMessage::Fetch(Fetch {
819                    request_id,
820                    required_request_id_delta,
821                    fetch_type,
822                    fetch_payload,
823                    parameters,
824                }))
825            }
826            MessageType::FetchOk => {
827                if buf.remaining() < 1 {
828                    return Err(CodecError::UnexpectedEnd);
829                }
830                let end_of_track = buf.get_u8();
831                let end_group = VarInt::decode(buf)?;
832                let end_object = VarInt::decode(buf)?;
833                let parameters = decode_parameters(buf)?;
834                let track_properties = decode_kvp_delta(buf)?;
835                Ok(ControlMessage::FetchOk(FetchOk {
836                    end_of_track,
837                    end_group,
838                    end_object,
839                    parameters,
840                    track_properties,
841                }))
842            }
843            MessageType::PublishBlocked => {
844                let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
845                let tn_len = VarInt::decode(buf)?.into_inner() as usize;
846                let track_name = read_bytes(buf, tn_len)?;
847                Ok(ControlMessage::PublishBlocked(PublishBlocked { namespace_suffix, track_name }))
848            }
849        }
850    }
851
852    pub fn message_type(&self) -> MessageType {
853        match self {
854            ControlMessage::Setup(_) => MessageType::Setup,
855            ControlMessage::GoAway(_) => MessageType::GoAway,
856            ControlMessage::RequestOk(_) => MessageType::RequestOk,
857            ControlMessage::RequestError(_) => MessageType::RequestError,
858            ControlMessage::Subscribe(_) => MessageType::Subscribe,
859            ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
860            ControlMessage::RequestUpdate(_) => MessageType::RequestUpdate,
861            ControlMessage::Publish(_) => MessageType::Publish,
862            ControlMessage::PublishOk(_) => MessageType::PublishOk,
863            ControlMessage::PublishDone(_) => MessageType::PublishDone,
864            ControlMessage::PublishNamespace(_) => MessageType::PublishNamespace,
865            ControlMessage::Namespace(_) => MessageType::Namespace,
866            ControlMessage::NamespaceDone(_) => MessageType::NamespaceDone,
867            ControlMessage::SubscribeNamespace(_) => MessageType::SubscribeNamespace,
868            ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
869            ControlMessage::Fetch(_) => MessageType::Fetch,
870            ControlMessage::FetchOk(_) => MessageType::FetchOk,
871            ControlMessage::PublishBlocked(_) => MessageType::PublishBlocked,
872        }
873    }
874}