// moqtap_codec/draft10/message.rs

//! Draft-10 control message encoding and decoding.
//!
//! Wire format is identical to draft-08 except `filter_type=1`
//! (NextGroupStart/LatestGroup) is removed from SUBSCRIBE.
5
6use crate::error::{
7    CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_REASON_PHRASE_LENGTH,
8};
9use crate::kvp::KeyValuePair;
10use crate::types::read_bytes;
11use crate::types::*;
12use crate::varint::VarInt;
13use bytes::{Buf, BufMut};
14
/// Control message type IDs (draft-10).
///
/// Each variant's discriminant is its wire ID, so [`MessageType::id`] is a plain cast and
/// [`MessageType::from_id`] is its inverse for every ID listed here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum MessageType {
    /// SubscribeUpdate (type 0x02).
    SubscribeUpdate = 0x02,
    /// Subscribe (type 0x03).
    Subscribe = 0x03,
    /// SubscribeOk (type 0x04).
    SubscribeOk = 0x04,
    /// SubscribeError (type 0x05).
    SubscribeError = 0x05,
    /// Announce (type 0x06).
    Announce = 0x06,
    /// AnnounceOk (type 0x07).
    AnnounceOk = 0x07,
    /// AnnounceError (type 0x08).
    AnnounceError = 0x08,
    /// Unannounce (type 0x09).
    Unannounce = 0x09,
    /// Unsubscribe (type 0x0A).
    Unsubscribe = 0x0A,
    /// SubscribeDone (type 0x0B).
    SubscribeDone = 0x0B,
    /// AnnounceCancel (type 0x0C).
    AnnounceCancel = 0x0C,
    /// TrackStatusRequest (type 0x0D).
    TrackStatusRequest = 0x0D,
    /// TrackStatus (type 0x0E).
    TrackStatus = 0x0E,
    /// GoAway (type 0x10).
    GoAway = 0x10,
    /// SubscribeAnnounces (type 0x11).
    SubscribeAnnounces = 0x11,
    /// SubscribeAnnouncesOk (type 0x12).
    SubscribeAnnouncesOk = 0x12,
    /// SubscribeAnnouncesError (type 0x13).
    SubscribeAnnouncesError = 0x13,
    /// UnsubscribeAnnounces (type 0x14).
    UnsubscribeAnnounces = 0x14,
    /// MaxSubscribeId (type 0x15).
    MaxSubscribeId = 0x15,
    /// Fetch (type 0x16).
    Fetch = 0x16,
    /// FetchCancel (type 0x17).
    FetchCancel = 0x17,
    /// FetchOk (type 0x18).
    FetchOk = 0x18,
    /// FetchError (type 0x19).
    FetchError = 0x19,
    /// SubscribesBlocked (type 0x1A).
    SubscribesBlocked = 0x1A,
    /// ClientSetup (type 0x40).
    ClientSetup = 0x40,
    /// ServerSetup (type 0x41).
    ServerSetup = 0x41,
}

impl MessageType {
    /// Look up a message type by its wire ID.
    ///
    /// Returns `None` for any ID that is not the discriminant of a variant of this enum
    /// (for example `0x0F`, which sits in the gap between `TrackStatus` and `GoAway`).
    pub fn from_id(id: u64) -> Option<Self> {
        let kind = match id {
            0x02 => Self::SubscribeUpdate,
            0x03 => Self::Subscribe,
            0x04 => Self::SubscribeOk,
            0x05 => Self::SubscribeError,
            0x06 => Self::Announce,
            0x07 => Self::AnnounceOk,
            0x08 => Self::AnnounceError,
            0x09 => Self::Unannounce,
            0x0A => Self::Unsubscribe,
            0x0B => Self::SubscribeDone,
            0x0C => Self::AnnounceCancel,
            0x0D => Self::TrackStatusRequest,
            0x0E => Self::TrackStatus,
            0x10 => Self::GoAway,
            0x11 => Self::SubscribeAnnounces,
            0x12 => Self::SubscribeAnnouncesOk,
            0x13 => Self::SubscribeAnnouncesError,
            0x14 => Self::UnsubscribeAnnounces,
            0x15 => Self::MaxSubscribeId,
            0x16 => Self::Fetch,
            0x17 => Self::FetchCancel,
            0x18 => Self::FetchOk,
            0x19 => Self::FetchError,
            0x1A => Self::SubscribesBlocked,
            0x40 => Self::ClientSetup,
            0x41 => Self::ServerSetup,
            _ => return None,
        };
        Some(kind)
    }

    /// Return the wire ID for this message type.
    ///
    /// This is the `#[repr(u64)]` discriminant of the variant.
    pub fn id(&self) -> u64 {
        *self as u64
    }
}
112
// ============================================================
// Session Lifecycle Messages
// ============================================================
116
117/// CLIENT_SETUP message (type 0x40).
118#[derive(Debug, Clone, PartialEq, Eq)]
119pub struct ClientSetup {
120    /// The list of MoQT versions supported by the client.
121    pub supported_versions: Vec<VarInt>,
122    /// Setup parameters sent by the client.
123    pub parameters: Vec<KeyValuePair>,
124}
125
126/// SERVER_SETUP message (type 0x41).
127#[derive(Debug, Clone, PartialEq, Eq)]
128pub struct ServerSetup {
129    /// The MoQT version selected by the server.
130    pub selected_version: VarInt,
131    /// Setup parameters sent by the server.
132    pub parameters: Vec<KeyValuePair>,
133}
134
/// GOAWAY message (type 0x10).
///
/// The payload is a single length-prefixed byte string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoAway {
    /// The URI for the new session the client should connect to.
    ///
    /// Encoding fails with `CodecError::GoAwayUriTooLong` when this is longer than
    /// `MAX_GOAWAY_URI_LENGTH` bytes.
    pub new_session_uri: Vec<u8>,
}
141
142/// MAX_SUBSCRIBE_ID message (type 0x15).
143#[derive(Debug, Clone, PartialEq, Eq)]
144pub struct MaxSubscribeId {
145    /// The maximum subscribe ID the peer is willing to accept.
146    pub subscribe_id: VarInt,
147}
148
149/// SUBSCRIBES_BLOCKED message (type 0x1A).
150#[derive(Debug, Clone, PartialEq, Eq)]
151pub struct SubscribesBlocked {
152    /// The maximum subscribe ID advertised by the peer.
153    pub maximum_subscribe_id: VarInt,
154}
155
// ============================================================
// Subscribe Messages
// ============================================================
159
160/// SUBSCRIBE message (type 0x03).
161///
162/// Draft-10: filter_type=1 (NextGroupStart) is rejected on decode.
163#[derive(Debug, Clone, PartialEq, Eq)]
164pub struct Subscribe {
165    /// The subscribe ID for this request.
166    pub subscribe_id: VarInt,
167    /// The track alias assigned by the subscriber.
168    pub track_alias: VarInt,
169    /// The track namespace to subscribe to.
170    pub track_namespace: TrackNamespace,
171    /// The track name within the namespace.
172    pub track_name: Vec<u8>,
173    /// The priority of this subscriber relative to others.
174    pub subscriber_priority: u8,
175    /// The requested group delivery order.
176    pub group_order: GroupOrder,
177    /// The filter type controlling which objects are delivered.
178    pub filter_type: FilterType,
179    /// Present only for AbsoluteStart and AbsoluteRange filter types.
180    pub start_location: Option<Location>,
181    /// Present only for AbsoluteRange filter type (end_group only, no end_object).
182    pub end_group: Option<VarInt>,
183    /// Subscribe parameters.
184    pub parameters: Vec<KeyValuePair>,
185}
186
187/// SUBSCRIBE_OK message (type 0x04).
188#[derive(Debug, Clone, PartialEq, Eq)]
189pub struct SubscribeOk {
190    /// The subscribe ID this response corresponds to.
191    pub subscribe_id: VarInt,
192    /// The expiration time for this subscription in milliseconds.
193    pub expires: VarInt,
194    /// The group delivery order chosen by the publisher.
195    pub group_order: GroupOrder,
196    /// Whether the largest location is included.
197    pub content_exists: ContentExists,
198    /// Present only when content_exists == HasLargestLocation.
199    pub largest_group_id: Option<VarInt>,
200    /// Present only when content_exists == HasLargestLocation.
201    pub largest_object_id: Option<VarInt>,
202    /// Subscribe OK parameters.
203    pub parameters: Vec<KeyValuePair>,
204}
205
206/// SUBSCRIBE_ERROR message (type 0x05).
207#[derive(Debug, Clone, PartialEq, Eq)]
208pub struct SubscribeError {
209    /// The subscribe ID this error corresponds to.
210    pub subscribe_id: VarInt,
211    /// The error code indicating the reason for failure.
212    pub error_code: VarInt,
213    /// A human-readable reason for the error.
214    pub reason_phrase: Vec<u8>,
215    /// The track alias from the original subscribe request.
216    pub track_alias: VarInt,
217}
218
219/// SUBSCRIBE_UPDATE message (type 0x02).
220#[derive(Debug, Clone, PartialEq, Eq)]
221pub struct SubscribeUpdate {
222    /// The subscribe ID to update.
223    pub subscribe_id: VarInt,
224    /// The new start group.
225    pub start_group: VarInt,
226    /// The new start object.
227    pub start_object: VarInt,
228    /// The new end group.
229    pub end_group: VarInt,
230    /// The updated subscriber priority.
231    pub subscriber_priority: u8,
232    /// Updated subscribe parameters.
233    pub parameters: Vec<KeyValuePair>,
234}
235
236/// SUBSCRIBE_DONE message (type 0x0B).
237#[derive(Debug, Clone, PartialEq, Eq)]
238pub struct SubscribeDone {
239    /// The subscribe ID this message refers to.
240    pub subscribe_id: VarInt,
241    /// The status code for the subscription completion.
242    pub status_code: VarInt,
243    /// Number of streams delivered.
244    pub stream_count: VarInt,
245    /// A human-readable reason phrase.
246    pub reason_phrase: Vec<u8>,
247}
248
249/// UNSUBSCRIBE message (type 0x0A).
250#[derive(Debug, Clone, PartialEq, Eq)]
251pub struct Unsubscribe {
252    /// The subscribe ID to unsubscribe from.
253    pub subscribe_id: VarInt,
254}
255
// ============================================================
// Announce Messages
// ============================================================
259
260/// ANNOUNCE message (type 0x06).
261#[derive(Debug, Clone, PartialEq, Eq)]
262pub struct Announce {
263    /// The track namespace being announced.
264    pub track_namespace: TrackNamespace,
265    /// Announce parameters.
266    pub parameters: Vec<KeyValuePair>,
267}
268
269/// ANNOUNCE_OK message (type 0x07).
270#[derive(Debug, Clone, PartialEq, Eq)]
271pub struct AnnounceOk {
272    /// The track namespace that was accepted.
273    pub track_namespace: TrackNamespace,
274}
275
276/// ANNOUNCE_ERROR message (type 0x08).
277#[derive(Debug, Clone, PartialEq, Eq)]
278pub struct AnnounceError {
279    /// The track namespace that was rejected.
280    pub track_namespace: TrackNamespace,
281    /// The error code indicating the reason for failure.
282    pub error_code: VarInt,
283    /// A human-readable reason for the error.
284    pub reason_phrase: Vec<u8>,
285}
286
287/// ANNOUNCE_CANCEL message (type 0x0C).
288#[derive(Debug, Clone, PartialEq, Eq)]
289pub struct AnnounceCancel {
290    /// The track namespace being cancelled.
291    pub track_namespace: TrackNamespace,
292    /// The error code indicating the reason for cancellation.
293    pub error_code: VarInt,
294    /// A human-readable reason for the cancellation.
295    pub reason_phrase: Vec<u8>,
296}
297
298/// UNANNOUNCE message (type 0x09).
299#[derive(Debug, Clone, PartialEq, Eq)]
300pub struct Unannounce {
301    /// The track namespace being unannounced.
302    pub track_namespace: TrackNamespace,
303}
304
// ============================================================
// Subscribe Announces Messages
// ============================================================
308
309/// SUBSCRIBE_ANNOUNCES message (type 0x11).
310#[derive(Debug, Clone, PartialEq, Eq)]
311pub struct SubscribeAnnounces {
312    /// The track namespace prefix to subscribe to announcements for.
313    pub track_namespace_prefix: TrackNamespace,
314    /// Subscribe announces parameters.
315    pub parameters: Vec<KeyValuePair>,
316}
317
318/// SUBSCRIBE_ANNOUNCES_OK message (type 0x12).
319#[derive(Debug, Clone, PartialEq, Eq)]
320pub struct SubscribeAnnouncesOk {
321    /// The track namespace prefix that was accepted.
322    pub track_namespace_prefix: TrackNamespace,
323}
324
325/// SUBSCRIBE_ANNOUNCES_ERROR message (type 0x13).
326#[derive(Debug, Clone, PartialEq, Eq)]
327pub struct SubscribeAnnouncesError {
328    /// The track namespace prefix that was rejected.
329    pub track_namespace_prefix: TrackNamespace,
330    /// The error code indicating the reason for failure.
331    pub error_code: VarInt,
332    /// A human-readable reason for the error.
333    pub reason_phrase: Vec<u8>,
334}
335
336/// UNSUBSCRIBE_ANNOUNCES message (type 0x14).
337#[derive(Debug, Clone, PartialEq, Eq)]
338pub struct UnsubscribeAnnounces {
339    /// The track namespace prefix to unsubscribe from.
340    pub track_namespace_prefix: TrackNamespace,
341}
342
// ============================================================
// Track Status Messages
// ============================================================
346
347/// TRACK_STATUS_REQUEST message (type 0x0D).
348#[derive(Debug, Clone, PartialEq, Eq)]
349pub struct TrackStatusRequest {
350    /// The track namespace to query status for.
351    pub track_namespace: TrackNamespace,
352    /// The track name to query status for.
353    pub track_name: Vec<u8>,
354}
355
356/// TRACK_STATUS message (type 0x0E).
357#[derive(Debug, Clone, PartialEq, Eq)]
358pub struct TrackStatus {
359    /// The track namespace this status is for.
360    pub track_namespace: TrackNamespace,
361    /// The track name this status is for.
362    pub track_name: Vec<u8>,
363    /// The status code for the track.
364    pub status_code: VarInt,
365    /// The last group ID available on this track.
366    pub last_group_id: VarInt,
367    /// The last object ID available on this track.
368    pub last_object_id: VarInt,
369}
370
// ============================================================
// Fetch Messages
// ============================================================
374
/// Fetch type for FETCH message (draft-10).
///
/// The discriminant is the value carried on the wire.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum FetchType {
    /// Standalone fetch with explicit track and range.
    Standalone = 1,
    /// Joining fetch referencing an existing subscription.
    Joining = 2,
}

impl FetchType {
    /// Convert a raw value to a `FetchType`, if valid.
    ///
    /// Only `1` and `2` are valid; every other value yields `None`.
    pub fn from_u64(v: u64) -> Option<Self> {
        match v {
            1 => Some(Self::Standalone),
            2 => Some(Self::Joining),
            _ => None,
        }
    }
}
395
396/// FETCH message (type 0x16).
397#[derive(Debug, Clone, PartialEq, Eq)]
398pub struct Fetch {
399    /// The subscribe ID for this fetch request.
400    pub subscribe_id: VarInt,
401    /// The priority of this subscriber relative to others.
402    pub subscriber_priority: u8,
403    /// The requested group delivery order.
404    pub group_order: GroupOrder,
405    /// The fetch type (standalone or joining).
406    pub fetch_type: FetchType,
407    /// Track namespace (standalone only).
408    pub track_namespace: Option<TrackNamespace>,
409    /// Track name (standalone only).
410    pub track_name: Option<Vec<u8>>,
411    /// Start group (standalone only).
412    pub start_group: Option<VarInt>,
413    /// Start object (standalone only).
414    pub start_object: Option<VarInt>,
415    /// End group (standalone only).
416    pub end_group: Option<VarInt>,
417    /// End object (standalone only).
418    pub end_object: Option<VarInt>,
419    /// Joining subscribe ID (joining only).
420    pub joining_subscribe_id: Option<VarInt>,
421    /// Preceding group offset (joining only).
422    pub preceding_group_offset: Option<VarInt>,
423    /// Fetch parameters.
424    pub parameters: Vec<KeyValuePair>,
425}
426
427/// FETCH_OK message (type 0x18).
428#[derive(Debug, Clone, PartialEq, Eq)]
429pub struct FetchOk {
430    /// The subscribe ID this response corresponds to.
431    pub subscribe_id: VarInt,
432    /// The group delivery order chosen by the publisher.
433    pub group_order: GroupOrder,
434    /// Whether this fetch reaches the end of the track (1 = yes).
435    pub end_of_track: u8,
436    /// The largest group ID available.
437    pub largest_group_id: VarInt,
438    /// The largest object ID available.
439    pub largest_object_id: VarInt,
440    /// Fetch OK parameters.
441    pub parameters: Vec<KeyValuePair>,
442}
443
444/// FETCH_ERROR message (type 0x19).
445#[derive(Debug, Clone, PartialEq, Eq)]
446pub struct FetchError {
447    /// The subscribe ID this error corresponds to.
448    pub subscribe_id: VarInt,
449    /// The error code indicating the reason for failure.
450    pub error_code: VarInt,
451    /// A human-readable reason for the error.
452    pub reason_phrase: Vec<u8>,
453}
454
455/// FETCH_CANCEL message (type 0x17).
456#[derive(Debug, Clone, PartialEq, Eq)]
457pub struct FetchCancel {
458    /// The subscribe ID for the fetch to cancel.
459    pub subscribe_id: VarInt,
460}
461
// ============================================================
// Unified Message Enum
// ============================================================
465
466/// A decoded MoQT control message (draft-10).
467#[derive(Debug, Clone, PartialEq, Eq)]
468pub enum ControlMessage {
469    /// ClientSetup (type 0x40).
470    ClientSetup(ClientSetup),
471    /// ServerSetup (type 0x41).
472    ServerSetup(ServerSetup),
473    /// GoAway (type 0x10).
474    GoAway(GoAway),
475    /// MaxSubscribeId (type 0x15).
476    MaxSubscribeId(MaxSubscribeId),
477    /// SubscribesBlocked (type 0x1A).
478    SubscribesBlocked(SubscribesBlocked),
479    /// Subscribe (type 0x03).
480    Subscribe(Subscribe),
481    /// SubscribeOk (type 0x04).
482    SubscribeOk(SubscribeOk),
483    /// SubscribeError (type 0x05).
484    SubscribeError(SubscribeError),
485    /// SubscribeUpdate (type 0x02).
486    SubscribeUpdate(SubscribeUpdate),
487    /// SubscribeDone (type 0x0B).
488    SubscribeDone(SubscribeDone),
489    /// Unsubscribe (type 0x0A).
490    Unsubscribe(Unsubscribe),
491    /// Announce (type 0x06).
492    Announce(Announce),
493    /// AnnounceOk (type 0x07).
494    AnnounceOk(AnnounceOk),
495    /// AnnounceError (type 0x08).
496    AnnounceError(AnnounceError),
497    /// AnnounceCancel (type 0x0C).
498    AnnounceCancel(AnnounceCancel),
499    /// Unannounce (type 0x09).
500    Unannounce(Unannounce),
501    /// SubscribeAnnounces (type 0x11).
502    SubscribeAnnounces(SubscribeAnnounces),
503    /// SubscribeAnnouncesOk (type 0x12).
504    SubscribeAnnouncesOk(SubscribeAnnouncesOk),
505    /// SubscribeAnnouncesError (type 0x13).
506    SubscribeAnnouncesError(SubscribeAnnouncesError),
507    /// UnsubscribeAnnounces (type 0x14).
508    UnsubscribeAnnounces(UnsubscribeAnnounces),
509    /// TrackStatusRequest (type 0x0D).
510    TrackStatusRequest(TrackStatusRequest),
511    /// TrackStatus (type 0x0E).
512    TrackStatus(TrackStatus),
513    /// Fetch (type 0x16).
514    Fetch(Fetch),
515    /// FetchOk (type 0x18).
516    FetchOk(FetchOk),
517    /// FetchError (type 0x19).
518    FetchError(FetchError),
519    /// FetchCancel (type 0x17).
520    FetchCancel(FetchCancel),
521}
522
523impl ControlMessage {
524    /// Return the message type for this control message.
525    pub fn message_type(&self) -> MessageType {
526        match self {
527            ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
528            ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
529            ControlMessage::GoAway(_) => MessageType::GoAway,
530            ControlMessage::MaxSubscribeId(_) => MessageType::MaxSubscribeId,
531            ControlMessage::SubscribesBlocked(_) => MessageType::SubscribesBlocked,
532            ControlMessage::Subscribe(_) => MessageType::Subscribe,
533            ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
534            ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
535            ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
536            ControlMessage::SubscribeDone(_) => MessageType::SubscribeDone,
537            ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
538            ControlMessage::Announce(_) => MessageType::Announce,
539            ControlMessage::AnnounceOk(_) => MessageType::AnnounceOk,
540            ControlMessage::AnnounceError(_) => MessageType::AnnounceError,
541            ControlMessage::AnnounceCancel(_) => MessageType::AnnounceCancel,
542            ControlMessage::Unannounce(_) => MessageType::Unannounce,
543            ControlMessage::SubscribeAnnounces(_) => MessageType::SubscribeAnnounces,
544            ControlMessage::SubscribeAnnouncesOk(_) => MessageType::SubscribeAnnouncesOk,
545            ControlMessage::SubscribeAnnouncesError(_) => MessageType::SubscribeAnnouncesError,
546            ControlMessage::UnsubscribeAnnounces(_) => MessageType::UnsubscribeAnnounces,
547            ControlMessage::TrackStatusRequest(_) => MessageType::TrackStatusRequest,
548            ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
549            ControlMessage::Fetch(_) => MessageType::Fetch,
550            ControlMessage::FetchOk(_) => MessageType::FetchOk,
551            ControlMessage::FetchError(_) => MessageType::FetchError,
552            ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
553        }
554    }
555
556    /// Encode this control message (type ID + length prefix + payload).
557    pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
558        let mut payload = Vec::with_capacity(256);
559        self.encode_payload(&mut payload)?;
560
561        if payload.len() > MAX_MESSAGE_LENGTH {
562            return Err(CodecError::MessageTooLong(payload.len()));
563        }
564
565        VarInt::from_usize(self.message_type().id() as usize).encode(buf);
566        VarInt::from_usize(payload.len()).encode(buf);
567        buf.put_slice(&payload);
568        Ok(())
569    }
570
571    /// Decode a control message from bytes.
572    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
573        let type_id = VarInt::decode(buf)?.into_inner();
574        let msg_type =
575            MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
576        let payload_len = VarInt::decode(buf)?.into_inner() as usize;
577        if buf.remaining() < payload_len {
578            return Err(CodecError::UnexpectedEnd);
579        }
580        let payload_bytes = buf.copy_to_bytes(payload_len);
581        let mut payload = &payload_bytes[..];
582        Self::decode_payload(msg_type, &mut payload)
583    }
584
585    fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
586        match self {
587            ControlMessage::ClientSetup(m) => {
588                VarInt::from_usize(m.supported_versions.len()).encode(buf);
589                for v in &m.supported_versions {
590                    v.encode(buf);
591                }
592                KeyValuePair::encode_list_d07(&m.parameters, buf);
593            }
594            ControlMessage::ServerSetup(m) => {
595                m.selected_version.encode(buf);
596                KeyValuePair::encode_list_d07(&m.parameters, buf);
597            }
598            ControlMessage::GoAway(m) => {
599                if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
600                    return Err(CodecError::GoAwayUriTooLong);
601                }
602                VarInt::from_usize(m.new_session_uri.len()).encode(buf);
603                buf.put_slice(&m.new_session_uri);
604            }
605            ControlMessage::MaxSubscribeId(m) => {
606                m.subscribe_id.encode(buf);
607            }
608            ControlMessage::SubscribesBlocked(m) => {
609                m.maximum_subscribe_id.encode(buf);
610            }
611            ControlMessage::Subscribe(m) => {
612                m.subscribe_id.encode(buf);
613                m.track_alias.encode(buf);
614                m.track_namespace.encode(buf);
615                VarInt::from_usize(m.track_name.len()).encode(buf);
616                buf.put_slice(&m.track_name);
617                buf.put_u8(m.subscriber_priority);
618                buf.put_u8(m.group_order as u8);
619                VarInt::from_usize(m.filter_type as usize).encode(buf);
620                if let Some(loc) = &m.start_location {
621                    loc.encode(buf);
622                }
623                if let Some(eg) = &m.end_group {
624                    eg.encode(buf);
625                }
626                KeyValuePair::encode_list_d07(&m.parameters, buf);
627            }
628            ControlMessage::SubscribeOk(m) => {
629                m.subscribe_id.encode(buf);
630                m.expires.encode(buf);
631                buf.put_u8(m.group_order as u8);
632                buf.put_u8(m.content_exists as u8);
633                if let Some(gid) = &m.largest_group_id {
634                    gid.encode(buf);
635                }
636                if let Some(oid) = &m.largest_object_id {
637                    oid.encode(buf);
638                }
639                KeyValuePair::encode_list_d07(&m.parameters, buf);
640            }
641            ControlMessage::SubscribeError(m) => {
642                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
643                    return Err(CodecError::ReasonPhraseTooLong);
644                }
645                m.subscribe_id.encode(buf);
646                m.error_code.encode(buf);
647                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
648                buf.put_slice(&m.reason_phrase);
649                m.track_alias.encode(buf);
650            }
651            ControlMessage::SubscribeUpdate(m) => {
652                m.subscribe_id.encode(buf);
653                m.start_group.encode(buf);
654                m.start_object.encode(buf);
655                m.end_group.encode(buf);
656                buf.put_u8(m.subscriber_priority);
657                KeyValuePair::encode_list_d07(&m.parameters, buf);
658            }
659            ControlMessage::SubscribeDone(m) => {
660                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
661                    return Err(CodecError::ReasonPhraseTooLong);
662                }
663                m.subscribe_id.encode(buf);
664                m.status_code.encode(buf);
665                m.stream_count.encode(buf);
666                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
667                buf.put_slice(&m.reason_phrase);
668            }
669            ControlMessage::Unsubscribe(m) => {
670                m.subscribe_id.encode(buf);
671            }
672            ControlMessage::Announce(m) => {
673                m.track_namespace.encode(buf);
674                KeyValuePair::encode_list_d07(&m.parameters, buf);
675            }
676            ControlMessage::AnnounceOk(m) => {
677                m.track_namespace.encode(buf);
678            }
679            ControlMessage::AnnounceError(m) => {
680                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
681                    return Err(CodecError::ReasonPhraseTooLong);
682                }
683                m.track_namespace.encode(buf);
684                m.error_code.encode(buf);
685                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
686                buf.put_slice(&m.reason_phrase);
687            }
688            ControlMessage::AnnounceCancel(m) => {
689                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
690                    return Err(CodecError::ReasonPhraseTooLong);
691                }
692                m.track_namespace.encode(buf);
693                m.error_code.encode(buf);
694                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
695                buf.put_slice(&m.reason_phrase);
696            }
697            ControlMessage::Unannounce(m) => {
698                m.track_namespace.encode(buf);
699            }
700            ControlMessage::SubscribeAnnounces(m) => {
701                m.track_namespace_prefix.encode(buf);
702                KeyValuePair::encode_list_d07(&m.parameters, buf);
703            }
704            ControlMessage::SubscribeAnnouncesOk(m) => {
705                m.track_namespace_prefix.encode(buf);
706            }
707            ControlMessage::SubscribeAnnouncesError(m) => {
708                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
709                    return Err(CodecError::ReasonPhraseTooLong);
710                }
711                m.track_namespace_prefix.encode(buf);
712                m.error_code.encode(buf);
713                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
714                buf.put_slice(&m.reason_phrase);
715            }
716            ControlMessage::UnsubscribeAnnounces(m) => {
717                m.track_namespace_prefix.encode(buf);
718            }
719            ControlMessage::TrackStatusRequest(m) => {
720                m.track_namespace.encode(buf);
721                VarInt::from_usize(m.track_name.len()).encode(buf);
722                buf.put_slice(&m.track_name);
723            }
724            ControlMessage::TrackStatus(m) => {
725                m.track_namespace.encode(buf);
726                VarInt::from_usize(m.track_name.len()).encode(buf);
727                buf.put_slice(&m.track_name);
728                m.status_code.encode(buf);
729                m.last_group_id.encode(buf);
730                m.last_object_id.encode(buf);
731            }
732            ControlMessage::Fetch(m) => {
733                m.subscribe_id.encode(buf);
734                buf.put_u8(m.subscriber_priority);
735                buf.put_u8(m.group_order as u8);
736                VarInt::from_usize(m.fetch_type as usize).encode(buf);
737                match m.fetch_type {
738                    FetchType::Standalone => {
739                        if let Some(ns) = &m.track_namespace {
740                            ns.encode(buf);
741                        }
742                        if let Some(name) = &m.track_name {
743                            VarInt::from_usize(name.len()).encode(buf);
744                            buf.put_slice(name);
745                        }
746                        if let Some(sg) = &m.start_group {
747                            sg.encode(buf);
748                        }
749                        if let Some(so) = &m.start_object {
750                            so.encode(buf);
751                        }
752                        if let Some(eg) = &m.end_group {
753                            eg.encode(buf);
754                        }
755                        if let Some(eo) = &m.end_object {
756                            eo.encode(buf);
757                        }
758                    }
759                    FetchType::Joining => {
760                        if let Some(jsi) = &m.joining_subscribe_id {
761                            jsi.encode(buf);
762                        }
763                        if let Some(pgo) = &m.preceding_group_offset {
764                            pgo.encode(buf);
765                        }
766                    }
767                }
768                KeyValuePair::encode_list_d07(&m.parameters, buf);
769            }
770            ControlMessage::FetchOk(m) => {
771                m.subscribe_id.encode(buf);
772                buf.put_u8(m.group_order as u8);
773                buf.put_u8(m.end_of_track);
774                m.largest_group_id.encode(buf);
775                m.largest_object_id.encode(buf);
776                KeyValuePair::encode_list_d07(&m.parameters, buf);
777            }
778            ControlMessage::FetchError(m) => {
779                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
780                    return Err(CodecError::ReasonPhraseTooLong);
781                }
782                m.subscribe_id.encode(buf);
783                m.error_code.encode(buf);
784                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
785                buf.put_slice(&m.reason_phrase);
786            }
787            ControlMessage::FetchCancel(m) => {
788                m.subscribe_id.encode(buf);
789            }
790        }
791        Ok(())
792    }
793
794    fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
795        match msg_type {
796            MessageType::ClientSetup => {
797                let num_versions = VarInt::decode(buf)?.into_inner() as usize;
798                if num_versions == 0 {
799                    return Err(CodecError::InvalidField);
800                }
801                let mut supported_versions = Vec::with_capacity(num_versions);
802                for _ in 0..num_versions {
803                    supported_versions.push(VarInt::decode(buf)?);
804                }
805                let parameters = KeyValuePair::decode_list_d07(buf)?;
806                Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
807            }
808            MessageType::ServerSetup => {
809                let selected_version = VarInt::decode(buf)?;
810                let parameters = KeyValuePair::decode_list_d07(buf)?;
811                Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
812            }
813            MessageType::GoAway => {
814                let uri_len = VarInt::decode(buf)?.into_inner() as usize;
815                let uri = read_bytes(buf, uri_len)?;
816                Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
817            }
818            MessageType::MaxSubscribeId => {
819                let subscribe_id = VarInt::decode(buf)?;
820                Ok(ControlMessage::MaxSubscribeId(MaxSubscribeId { subscribe_id }))
821            }
822            MessageType::SubscribesBlocked => {
823                let maximum_subscribe_id = VarInt::decode(buf)?;
824                Ok(ControlMessage::SubscribesBlocked(SubscribesBlocked { maximum_subscribe_id }))
825            }
826            MessageType::Subscribe => {
827                let subscribe_id = VarInt::decode(buf)?;
828                let track_alias = VarInt::decode(buf)?;
829                let track_namespace = TrackNamespace::decode(buf)?;
830                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
831                let track_name = read_bytes(buf, track_name_len)?;
832                if buf.remaining() < 2 {
833                    return Err(CodecError::UnexpectedEnd);
834                }
835                let subscriber_priority = buf.get_u8();
836                let group_order =
837                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
838                let filter_val = VarInt::decode(buf)?.into_inner();
839                // Draft-10: filter_type=1 (NextGroupStart/LatestGroup) is removed.
840                if filter_val == 1 {
841                    return Err(CodecError::InvalidField);
842                }
843                let filter_type =
844                    FilterType::from_u64(filter_val).ok_or(CodecError::InvalidField)?;
845                let start_location = match filter_type {
846                    FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
847                        Some(Location::decode(buf)?)
848                    }
849                    _ => None,
850                };
851                let end_group = match filter_type {
852                    FilterType::AbsoluteRange => Some(VarInt::decode(buf)?),
853                    _ => None,
854                };
855                let parameters = KeyValuePair::decode_list_d07(buf)?;
856                Ok(ControlMessage::Subscribe(Subscribe {
857                    subscribe_id,
858                    track_alias,
859                    track_namespace,
860                    track_name,
861                    subscriber_priority,
862                    group_order,
863                    filter_type,
864                    start_location,
865                    end_group,
866                    parameters,
867                }))
868            }
869            MessageType::SubscribeOk => {
870                let subscribe_id = VarInt::decode(buf)?;
871                let expires = VarInt::decode(buf)?;
872                if buf.remaining() < 2 {
873                    return Err(CodecError::UnexpectedEnd);
874                }
875                let group_order =
876                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
877                let content_exists_val = buf.get_u8();
878                let content_exists = match content_exists_val {
879                    0 => ContentExists::NoLargestLocation,
880                    1 => ContentExists::HasLargestLocation,
881                    _ => return Err(CodecError::InvalidField),
882                };
883                let (largest_group_id, largest_object_id) =
884                    if content_exists == ContentExists::HasLargestLocation {
885                        let gid = VarInt::decode(buf)?;
886                        let oid = VarInt::decode(buf)?;
887                        (Some(gid), Some(oid))
888                    } else {
889                        (None, None)
890                    };
891                let parameters = KeyValuePair::decode_list_d07(buf)?;
892                Ok(ControlMessage::SubscribeOk(SubscribeOk {
893                    subscribe_id,
894                    expires,
895                    group_order,
896                    content_exists,
897                    largest_group_id,
898                    largest_object_id,
899                    parameters,
900                }))
901            }
902            MessageType::SubscribeError => {
903                let subscribe_id = VarInt::decode(buf)?;
904                let error_code = VarInt::decode(buf)?;
905                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
906                let reason_phrase = read_bytes(buf, reason_len)?;
907                let track_alias = VarInt::decode(buf)?;
908                Ok(ControlMessage::SubscribeError(SubscribeError {
909                    subscribe_id,
910                    error_code,
911                    reason_phrase,
912                    track_alias,
913                }))
914            }
915            MessageType::SubscribeUpdate => {
916                let subscribe_id = VarInt::decode(buf)?;
917                let start_group = VarInt::decode(buf)?;
918                let start_object = VarInt::decode(buf)?;
919                let end_group = VarInt::decode(buf)?;
920                if buf.remaining() < 1 {
921                    return Err(CodecError::UnexpectedEnd);
922                }
923                let subscriber_priority = buf.get_u8();
924                let parameters = KeyValuePair::decode_list_d07(buf)?;
925                Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
926                    subscribe_id,
927                    start_group,
928                    start_object,
929                    end_group,
930                    subscriber_priority,
931                    parameters,
932                }))
933            }
934            MessageType::SubscribeDone => {
935                let subscribe_id = VarInt::decode(buf)?;
936                let status_code = VarInt::decode(buf)?;
937                let stream_count = VarInt::decode(buf)?;
938                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
939                let reason_phrase = read_bytes(buf, reason_len)?;
940                Ok(ControlMessage::SubscribeDone(SubscribeDone {
941                    subscribe_id,
942                    status_code,
943                    stream_count,
944                    reason_phrase,
945                }))
946            }
947            MessageType::Unsubscribe => {
948                let subscribe_id = VarInt::decode(buf)?;
949                Ok(ControlMessage::Unsubscribe(Unsubscribe { subscribe_id }))
950            }
951            MessageType::Announce => {
952                let track_namespace = TrackNamespace::decode(buf)?;
953                let parameters = KeyValuePair::decode_list_d07(buf)?;
954                Ok(ControlMessage::Announce(Announce { track_namespace, parameters }))
955            }
956            MessageType::AnnounceOk => {
957                let track_namespace = TrackNamespace::decode(buf)?;
958                Ok(ControlMessage::AnnounceOk(AnnounceOk { track_namespace }))
959            }
960            MessageType::AnnounceError => {
961                let track_namespace = TrackNamespace::decode(buf)?;
962                let error_code = VarInt::decode(buf)?;
963                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
964                let reason_phrase = read_bytes(buf, reason_len)?;
965                Ok(ControlMessage::AnnounceError(AnnounceError {
966                    track_namespace,
967                    error_code,
968                    reason_phrase,
969                }))
970            }
971            MessageType::AnnounceCancel => {
972                let track_namespace = TrackNamespace::decode(buf)?;
973                let error_code = VarInt::decode(buf)?;
974                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
975                let reason_phrase = read_bytes(buf, reason_len)?;
976                Ok(ControlMessage::AnnounceCancel(AnnounceCancel {
977                    track_namespace,
978                    error_code,
979                    reason_phrase,
980                }))
981            }
982            MessageType::Unannounce => {
983                let track_namespace = TrackNamespace::decode(buf)?;
984                Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
985            }
986            MessageType::SubscribeAnnounces => {
987                let track_namespace_prefix = TrackNamespace::decode(buf)?;
988                let parameters = KeyValuePair::decode_list_d07(buf)?;
989                Ok(ControlMessage::SubscribeAnnounces(SubscribeAnnounces {
990                    track_namespace_prefix,
991                    parameters,
992                }))
993            }
994            MessageType::SubscribeAnnouncesOk => {
995                let track_namespace_prefix = TrackNamespace::decode(buf)?;
996                Ok(ControlMessage::SubscribeAnnouncesOk(SubscribeAnnouncesOk {
997                    track_namespace_prefix,
998                }))
999            }
1000            MessageType::SubscribeAnnouncesError => {
1001                let track_namespace_prefix = TrackNamespace::decode(buf)?;
1002                let error_code = VarInt::decode(buf)?;
1003                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1004                let reason_phrase = read_bytes(buf, reason_len)?;
1005                Ok(ControlMessage::SubscribeAnnouncesError(SubscribeAnnouncesError {
1006                    track_namespace_prefix,
1007                    error_code,
1008                    reason_phrase,
1009                }))
1010            }
1011            MessageType::UnsubscribeAnnounces => {
1012                let track_namespace_prefix = TrackNamespace::decode(buf)?;
1013                Ok(ControlMessage::UnsubscribeAnnounces(UnsubscribeAnnounces {
1014                    track_namespace_prefix,
1015                }))
1016            }
1017            MessageType::TrackStatusRequest => {
1018                let track_namespace = TrackNamespace::decode(buf)?;
1019                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1020                let track_name = read_bytes(buf, track_name_len)?;
1021                Ok(ControlMessage::TrackStatusRequest(TrackStatusRequest {
1022                    track_namespace,
1023                    track_name,
1024                }))
1025            }
1026            MessageType::TrackStatus => {
1027                let track_namespace = TrackNamespace::decode(buf)?;
1028                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1029                let track_name = read_bytes(buf, track_name_len)?;
1030                let status_code = VarInt::decode(buf)?;
1031                let last_group_id = VarInt::decode(buf)?;
1032                let last_object_id = VarInt::decode(buf)?;
1033                Ok(ControlMessage::TrackStatus(TrackStatus {
1034                    track_namespace,
1035                    track_name,
1036                    status_code,
1037                    last_group_id,
1038                    last_object_id,
1039                }))
1040            }
1041            MessageType::Fetch => {
1042                let subscribe_id = VarInt::decode(buf)?;
1043                if buf.remaining() < 2 {
1044                    return Err(CodecError::UnexpectedEnd);
1045                }
1046                let subscriber_priority = buf.get_u8();
1047                let group_order =
1048                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1049                let fetch_type_val = VarInt::decode(buf)?.into_inner();
1050                let fetch_type =
1051                    FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
1052                let (
1053                    track_namespace,
1054                    track_name,
1055                    start_group,
1056                    start_object,
1057                    end_group,
1058                    end_object,
1059                    joining_subscribe_id,
1060                    preceding_group_offset,
1061                ) = match fetch_type {
1062                    FetchType::Standalone => {
1063                        let ns = TrackNamespace::decode(buf)?;
1064                        let name_len = VarInt::decode(buf)?.into_inner() as usize;
1065                        let name = read_bytes(buf, name_len)?;
1066                        let sg = VarInt::decode(buf)?;
1067                        let so = VarInt::decode(buf)?;
1068                        let eg = VarInt::decode(buf)?;
1069                        let eo = VarInt::decode(buf)?;
1070                        (Some(ns), Some(name), Some(sg), Some(so), Some(eg), Some(eo), None, None)
1071                    }
1072                    FetchType::Joining => {
1073                        let jsi = VarInt::decode(buf)?;
1074                        let pgo = VarInt::decode(buf)?;
1075                        (None, None, None, None, None, None, Some(jsi), Some(pgo))
1076                    }
1077                };
1078                let parameters = KeyValuePair::decode_list_d07(buf)?;
1079                Ok(ControlMessage::Fetch(Fetch {
1080                    subscribe_id,
1081                    subscriber_priority,
1082                    group_order,
1083                    fetch_type,
1084                    track_namespace,
1085                    track_name,
1086                    start_group,
1087                    start_object,
1088                    end_group,
1089                    end_object,
1090                    joining_subscribe_id,
1091                    preceding_group_offset,
1092                    parameters,
1093                }))
1094            }
1095            MessageType::FetchOk => {
1096                let subscribe_id = VarInt::decode(buf)?;
1097                if buf.remaining() < 2 {
1098                    return Err(CodecError::UnexpectedEnd);
1099                }
1100                let group_order =
1101                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1102                let end_of_track = buf.get_u8();
1103                let largest_group_id = VarInt::decode(buf)?;
1104                let largest_object_id = VarInt::decode(buf)?;
1105                let parameters = KeyValuePair::decode_list_d07(buf)?;
1106                Ok(ControlMessage::FetchOk(FetchOk {
1107                    subscribe_id,
1108                    group_order,
1109                    end_of_track,
1110                    largest_group_id,
1111                    largest_object_id,
1112                    parameters,
1113                }))
1114            }
1115            MessageType::FetchError => {
1116                let subscribe_id = VarInt::decode(buf)?;
1117                let error_code = VarInt::decode(buf)?;
1118                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1119                let reason_phrase = read_bytes(buf, reason_len)?;
1120                Ok(ControlMessage::FetchError(FetchError {
1121                    subscribe_id,
1122                    error_code,
1123                    reason_phrase,
1124                }))
1125            }
1126            MessageType::FetchCancel => {
1127                let subscribe_id = VarInt::decode(buf)?;
1128                Ok(ControlMessage::FetchCancel(FetchCancel { subscribe_id }))
1129            }
1130        }
1131    }
1132}