1use crate::error::{
4 CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_REASON_PHRASE_LENGTH,
5};
6use crate::kvp::KeyValuePair;
7use crate::types::read_bytes;
8use crate::types::*;
9use crate::varint::VarInt;
10use bytes::{Buf, BufMut};
11
/// Numeric type tags of the control messages handled by this module.
///
/// The discriminant of each variant is the value that is written as the
/// message type on the wire (see `MessageType::id`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum MessageType {
    SubscribeUpdate = 0x02,
    Subscribe = 0x03,
    SubscribeOk = 0x04,
    SubscribeError = 0x05,
    Announce = 0x06,
    AnnounceOk = 0x07,
    AnnounceError = 0x08,
    Unannounce = 0x09,
    Unsubscribe = 0x0A,
    SubscribeDone = 0x0B,
    AnnounceCancel = 0x0C,
    TrackStatusRequest = 0x0D,
    TrackStatus = 0x0E,
    GoAway = 0x10,
    SubscribeAnnounces = 0x11,
    SubscribeAnnouncesOk = 0x12,
    SubscribeAnnouncesError = 0x13,
    UnsubscribeAnnounces = 0x14,
    MaxSubscribeId = 0x15,
    Fetch = 0x16,
    FetchCancel = 0x17,
    FetchOk = 0x18,
    FetchError = 0x19,
    SubscribesBlocked = 0x1A,
    ClientSetup = 0x40,
    ServerSetup = 0x41,
}

impl MessageType {
    /// Looks up the variant whose wire tag equals `id`.
    ///
    /// Returns `None` for every value that is not one of the tags listed on
    /// the enum (for example `0x0F`, which is unassigned here).
    pub fn from_id(id: u64) -> Option<Self> {
        let known = match id {
            0x02 => Self::SubscribeUpdate,
            0x03 => Self::Subscribe,
            0x04 => Self::SubscribeOk,
            0x05 => Self::SubscribeError,
            0x06 => Self::Announce,
            0x07 => Self::AnnounceOk,
            0x08 => Self::AnnounceError,
            0x09 => Self::Unannounce,
            0x0A => Self::Unsubscribe,
            0x0B => Self::SubscribeDone,
            0x0C => Self::AnnounceCancel,
            0x0D => Self::TrackStatusRequest,
            0x0E => Self::TrackStatus,
            0x10 => Self::GoAway,
            0x11 => Self::SubscribeAnnounces,
            0x12 => Self::SubscribeAnnouncesOk,
            0x13 => Self::SubscribeAnnouncesError,
            0x14 => Self::UnsubscribeAnnounces,
            0x15 => Self::MaxSubscribeId,
            0x16 => Self::Fetch,
            0x17 => Self::FetchCancel,
            0x18 => Self::FetchOk,
            0x19 => Self::FetchError,
            0x1A => Self::SubscribesBlocked,
            0x40 => Self::ClientSetup,
            0x41 => Self::ServerSetup,
            _ => return None,
        };
        Some(known)
    }

    /// Returns the wire tag of this message type (its enum discriminant).
    pub fn id(&self) -> u64 {
        let tag = *self;
        tag as u64
    }
}
109
/// Payload of the `ClientSetup` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientSetup {
    /// Versions listed in the message, in wire order (a varint count followed
    /// by one varint per version). Decoding rejects a message that lists none.
    pub supported_versions: Vec<VarInt>,
    /// Key-value parameters carried at the end of the payload.
    pub parameters: Vec<KeyValuePair>,
}
122
/// Payload of the `ServerSetup` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerSetup {
    /// The single version value carried by the message (one varint).
    pub selected_version: VarInt,
    /// Key-value parameters carried at the end of the payload.
    pub parameters: Vec<KeyValuePair>,
}
131
/// Payload of the `GoAway` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoAway {
    /// Raw URI bytes, written with a varint length prefix. Encoding fails
    /// with `GoAwayUriTooLong` when longer than `MAX_GOAWAY_URI_LENGTH`.
    pub new_session_uri: Vec<u8>,
}
138
/// Payload of the `MaxSubscribeId` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MaxSubscribeId {
    /// The only field of the message, encoded as one varint.
    pub subscribe_id: VarInt,
}
145
/// Payload of the `SubscribesBlocked` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribesBlocked {
    /// The only field of the message, encoded as one varint.
    pub maximum_subscribe_id: VarInt,
}
152
/// Payload of the `Subscribe` control message.
///
/// Fields are listed in wire order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscribe {
    /// Encoded as a varint.
    pub subscribe_id: VarInt,
    /// Encoded as a varint.
    pub track_alias: VarInt,
    pub track_namespace: TrackNamespace,
    /// Raw bytes, written with a varint length prefix.
    pub track_name: Vec<u8>,
    /// Encoded as a single byte.
    pub subscriber_priority: u8,
    /// Encoded as a single byte; decoding fails with `InvalidField` for a
    /// byte `GroupOrder::from_u8` does not accept.
    pub group_order: GroupOrder,
    /// Encoded as a varint. When decoding, this value decides whether
    /// `start_location` and `end_group` are read from the wire.
    pub filter_type: FilterType,
    /// Read by the decoder only for the `AbsoluteStart` and `AbsoluteRange`
    /// filter types; `None` otherwise.
    pub start_location: Option<Location>,
    /// Read by the decoder only for the `AbsoluteRange` filter type; `None`
    /// otherwise.
    pub end_group: Option<VarInt>,
    /// Key-value parameters carried at the end of the payload.
    pub parameters: Vec<KeyValuePair>,
}
183
/// Payload of the `SubscribeOk` control message.
///
/// Fields are listed in wire order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeOk {
    /// Encoded as a varint.
    pub subscribe_id: VarInt,
    /// Encoded as a varint.
    pub expires: VarInt,
    /// Encoded as a single byte.
    pub group_order: GroupOrder,
    /// Encoded as a single byte; the decoder accepts only `0`
    /// (`NoLargestLocation`) and `1` (`HasLargestLocation`).
    pub content_exists: ContentExists,
    /// Read by the decoder only when `content_exists` is
    /// `HasLargestLocation`; `None` otherwise.
    pub largest_group_id: Option<VarInt>,
    /// Read by the decoder only when `content_exists` is
    /// `HasLargestLocation`; `None` otherwise.
    pub largest_object_id: Option<VarInt>,
    /// Key-value parameters carried at the end of the payload.
    pub parameters: Vec<KeyValuePair>,
}
202
/// Payload of the `SubscribeError` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeError {
    /// Encoded as a varint.
    pub subscribe_id: VarInt,
    /// Encoded as a varint.
    pub error_code: VarInt,
    /// Raw bytes with a varint length prefix. Encoding fails with
    /// `ReasonPhraseTooLong` when longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
    /// Encoded as a varint; on the wire it follows the reason phrase.
    pub track_alias: VarInt,
}
215
/// Payload of the `SubscribeUpdate` control message.
///
/// Fields are listed in wire order; unlike `Subscribe`, none of them is
/// optional.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeUpdate {
    /// Encoded as a varint.
    pub subscribe_id: VarInt,
    /// Encoded as a varint.
    pub start_group: VarInt,
    /// Encoded as a varint.
    pub start_object: VarInt,
    /// Encoded as a varint.
    pub end_group: VarInt,
    /// Encoded as a single byte.
    pub subscriber_priority: u8,
    /// Key-value parameters carried at the end of the payload.
    pub parameters: Vec<KeyValuePair>,
}
234
/// Payload of the `SubscribeDone` control message.
///
/// Fields are listed in wire order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeDone {
    /// Encoded as a varint.
    pub subscribe_id: VarInt,
    /// Encoded as a varint.
    pub status_code: VarInt,
    /// Encoded as a varint.
    pub stream_count: VarInt,
    /// Raw bytes with a varint length prefix. Encoding fails with
    /// `ReasonPhraseTooLong` when longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
249
/// Payload of the `Unsubscribe` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unsubscribe {
    /// The only field of the message, encoded as one varint.
    pub subscribe_id: VarInt,
}
256
/// Payload of the `Announce` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Announce {
    /// Written first, using `TrackNamespace`'s own encoding.
    pub track_namespace: TrackNamespace,
    /// Key-value parameters carried at the end of the payload.
    pub parameters: Vec<KeyValuePair>,
}
269
/// Payload of the `AnnounceOk` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceOk {
    /// The only field of the message.
    pub track_namespace: TrackNamespace,
}
276
/// Payload of the `AnnounceError` control message.
///
/// Fields are listed in wire order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceError {
    pub track_namespace: TrackNamespace,
    /// Encoded as a varint.
    pub error_code: VarInt,
    /// Raw bytes with a varint length prefix. Encoding fails with
    /// `ReasonPhraseTooLong` when longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
287
/// Payload of the `AnnounceCancel` control message.
///
/// Has the same field layout as `AnnounceError`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceCancel {
    pub track_namespace: TrackNamespace,
    /// Encoded as a varint.
    pub error_code: VarInt,
    /// Raw bytes with a varint length prefix. Encoding fails with
    /// `ReasonPhraseTooLong` when longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
298
/// Payload of the `Unannounce` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unannounce {
    /// The only field of the message.
    pub track_namespace: TrackNamespace,
}
305
/// Payload of the `SubscribeAnnounces` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnounces {
    /// Written first, using `TrackNamespace`'s own encoding.
    pub track_namespace_prefix: TrackNamespace,
    /// Key-value parameters carried at the end of the payload.
    pub parameters: Vec<KeyValuePair>,
}
318
/// Payload of the `SubscribeAnnouncesOk` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnouncesOk {
    /// The only field of the message.
    pub track_namespace_prefix: TrackNamespace,
}
325
/// Payload of the `SubscribeAnnouncesError` control message.
///
/// Fields are listed in wire order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnouncesError {
    pub track_namespace_prefix: TrackNamespace,
    /// Encoded as a varint.
    pub error_code: VarInt,
    /// Raw bytes with a varint length prefix. Encoding fails with
    /// `ReasonPhraseTooLong` when longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
336
/// Payload of the `UnsubscribeAnnounces` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsubscribeAnnounces {
    /// The only field of the message.
    pub track_namespace_prefix: TrackNamespace,
}
343
/// Payload of the `TrackStatusRequest` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatusRequest {
    pub track_namespace: TrackNamespace,
    /// Raw bytes, written with a varint length prefix.
    pub track_name: Vec<u8>,
}
356
/// Payload of the `TrackStatus` control message.
///
/// Fields are listed in wire order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatus {
    pub track_namespace: TrackNamespace,
    /// Raw bytes, written with a varint length prefix.
    pub track_name: Vec<u8>,
    /// Encoded as a varint.
    pub status_code: VarInt,
    /// Encoded as a varint.
    pub last_group_id: VarInt,
    /// Encoded as a varint.
    pub last_object_id: VarInt,
}
371
/// Selector carried in a `Fetch` message that determines which group of
/// fields follows it on the wire.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum FetchType {
    Standalone = 1,
    Joining = 2,
}

impl FetchType {
    /// Converts a raw wire value into a `FetchType`.
    ///
    /// Only `1` (`Standalone`) and `2` (`Joining`) are recognised; every
    /// other value yields `None`.
    pub fn from_u64(v: u64) -> Option<Self> {
        if v == Self::Standalone as u64 {
            return Some(Self::Standalone);
        }
        if v == Self::Joining as u64 {
            return Some(Self::Joining);
        }
        None
    }
}
396
/// Payload of the `Fetch` control message.
///
/// The first four fields are always present. `fetch_type` then selects which
/// group of optional fields the decoder fills in; the other group is left as
/// `None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Fetch {
    /// Encoded as a varint.
    pub subscribe_id: VarInt,
    /// Encoded as a single byte.
    pub subscriber_priority: u8,
    /// Encoded as a single byte.
    pub group_order: GroupOrder,
    /// Encoded as a varint.
    pub fetch_type: FetchType,
    /// Filled in by the decoder for `FetchType::Standalone` only.
    pub track_namespace: Option<TrackNamespace>,
    /// Filled in by the decoder for `FetchType::Standalone` only; raw bytes
    /// with a varint length prefix.
    pub track_name: Option<Vec<u8>>,
    /// Filled in by the decoder for `FetchType::Standalone` only.
    pub start_group: Option<VarInt>,
    /// Filled in by the decoder for `FetchType::Standalone` only.
    pub start_object: Option<VarInt>,
    /// Filled in by the decoder for `FetchType::Standalone` only.
    pub end_group: Option<VarInt>,
    /// Filled in by the decoder for `FetchType::Standalone` only.
    pub end_object: Option<VarInt>,
    /// Filled in by the decoder for `FetchType::Joining` only.
    pub joining_subscribe_id: Option<VarInt>,
    /// Filled in by the decoder for `FetchType::Joining` only.
    pub preceding_group_offset: Option<VarInt>,
    /// Key-value parameters carried at the end of the payload.
    pub parameters: Vec<KeyValuePair>,
}
429
/// Payload of the `FetchOk` control message.
///
/// Fields are listed in wire order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchOk {
    /// Encoded as a varint.
    pub subscribe_id: VarInt,
    /// Encoded as a single byte.
    pub group_order: GroupOrder,
    /// Encoded as a single byte; the decoder stores the byte as received
    /// without validating its value.
    pub end_of_track: u8,
    /// Encoded as a varint.
    pub largest_group_id: VarInt,
    /// Encoded as a varint.
    pub largest_object_id: VarInt,
    /// Key-value parameters carried at the end of the payload.
    pub parameters: Vec<KeyValuePair>,
}
448
/// Payload of the `FetchError` control message.
///
/// Fields are listed in wire order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchError {
    /// Encoded as a varint.
    pub subscribe_id: VarInt,
    /// Encoded as a varint.
    pub error_code: VarInt,
    /// Raw bytes with a varint length prefix. Encoding fails with
    /// `ReasonPhraseTooLong` when longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
459
/// Payload of the `FetchCancel` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchCancel {
    /// The only field of the message, encoded as one varint.
    pub subscribe_id: VarInt,
}
466
/// A control message of any supported type.
///
/// There is exactly one variant per `MessageType`, and each variant wraps the
/// payload struct of the same name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ControlMessage {
    ClientSetup(ClientSetup),
    ServerSetup(ServerSetup),
    GoAway(GoAway),
    MaxSubscribeId(MaxSubscribeId),
    SubscribesBlocked(SubscribesBlocked),
    Subscribe(Subscribe),
    SubscribeOk(SubscribeOk),
    SubscribeError(SubscribeError),
    SubscribeUpdate(SubscribeUpdate),
    SubscribeDone(SubscribeDone),
    Unsubscribe(Unsubscribe),
    Announce(Announce),
    AnnounceOk(AnnounceOk),
    AnnounceError(AnnounceError),
    AnnounceCancel(AnnounceCancel),
    Unannounce(Unannounce),
    SubscribeAnnounces(SubscribeAnnounces),
    SubscribeAnnouncesOk(SubscribeAnnouncesOk),
    SubscribeAnnouncesError(SubscribeAnnouncesError),
    UnsubscribeAnnounces(UnsubscribeAnnounces),
    TrackStatusRequest(TrackStatusRequest),
    TrackStatus(TrackStatus),
    Fetch(Fetch),
    FetchOk(FetchOk),
    FetchError(FetchError),
    FetchCancel(FetchCancel),
}
527
528impl ControlMessage {
529 pub fn message_type(&self) -> MessageType {
531 match self {
532 ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
533 ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
534 ControlMessage::GoAway(_) => MessageType::GoAway,
535 ControlMessage::MaxSubscribeId(_) => MessageType::MaxSubscribeId,
536 ControlMessage::SubscribesBlocked(_) => MessageType::SubscribesBlocked,
537 ControlMessage::Subscribe(_) => MessageType::Subscribe,
538 ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
539 ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
540 ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
541 ControlMessage::SubscribeDone(_) => MessageType::SubscribeDone,
542 ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
543 ControlMessage::Announce(_) => MessageType::Announce,
544 ControlMessage::AnnounceOk(_) => MessageType::AnnounceOk,
545 ControlMessage::AnnounceError(_) => MessageType::AnnounceError,
546 ControlMessage::AnnounceCancel(_) => MessageType::AnnounceCancel,
547 ControlMessage::Unannounce(_) => MessageType::Unannounce,
548 ControlMessage::SubscribeAnnounces(_) => MessageType::SubscribeAnnounces,
549 ControlMessage::SubscribeAnnouncesOk(_) => MessageType::SubscribeAnnouncesOk,
550 ControlMessage::SubscribeAnnouncesError(_) => MessageType::SubscribeAnnouncesError,
551 ControlMessage::UnsubscribeAnnounces(_) => MessageType::UnsubscribeAnnounces,
552 ControlMessage::TrackStatusRequest(_) => MessageType::TrackStatusRequest,
553 ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
554 ControlMessage::Fetch(_) => MessageType::Fetch,
555 ControlMessage::FetchOk(_) => MessageType::FetchOk,
556 ControlMessage::FetchError(_) => MessageType::FetchError,
557 ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
558 }
559 }
560
561 pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
563 let mut payload = Vec::with_capacity(256);
564 self.encode_payload(&mut payload)?;
565
566 if payload.len() > MAX_MESSAGE_LENGTH {
567 return Err(CodecError::MessageTooLong(payload.len()));
568 }
569
570 VarInt::from_usize(self.message_type().id() as usize).encode(buf);
571 VarInt::from_usize(payload.len()).encode(buf);
572 buf.put_slice(&payload);
573 Ok(())
574 }
575
576 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
578 let type_id = VarInt::decode(buf)?.into_inner();
579 let msg_type =
580 MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
581 let payload_len = VarInt::decode(buf)?.into_inner() as usize;
582 if buf.remaining() < payload_len {
583 return Err(CodecError::UnexpectedEnd);
584 }
585 let payload_bytes = buf.copy_to_bytes(payload_len);
586 let mut payload = &payload_bytes[..];
587 Self::decode_payload(msg_type, &mut payload)
588 }
589
590 fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
591 match self {
592 ControlMessage::ClientSetup(m) => {
593 VarInt::from_usize(m.supported_versions.len()).encode(buf);
594 for v in &m.supported_versions {
595 v.encode(buf);
596 }
597 KeyValuePair::encode_list_d07(&m.parameters, buf);
598 }
599 ControlMessage::ServerSetup(m) => {
600 m.selected_version.encode(buf);
601 KeyValuePair::encode_list_d07(&m.parameters, buf);
602 }
603 ControlMessage::GoAway(m) => {
604 if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
605 return Err(CodecError::GoAwayUriTooLong);
606 }
607 VarInt::from_usize(m.new_session_uri.len()).encode(buf);
608 buf.put_slice(&m.new_session_uri);
609 }
610 ControlMessage::MaxSubscribeId(m) => {
611 m.subscribe_id.encode(buf);
612 }
613 ControlMessage::SubscribesBlocked(m) => {
614 m.maximum_subscribe_id.encode(buf);
615 }
616 ControlMessage::Subscribe(m) => {
617 m.subscribe_id.encode(buf);
618 m.track_alias.encode(buf);
619 m.track_namespace.encode(buf);
620 VarInt::from_usize(m.track_name.len()).encode(buf);
621 buf.put_slice(&m.track_name);
622 buf.put_u8(m.subscriber_priority);
623 buf.put_u8(m.group_order as u8);
624 VarInt::from_usize(m.filter_type as usize).encode(buf);
625 if let Some(loc) = &m.start_location {
626 loc.encode(buf);
627 }
628 if let Some(eg) = &m.end_group {
629 eg.encode(buf);
630 }
631 KeyValuePair::encode_list_d07(&m.parameters, buf);
632 }
633 ControlMessage::SubscribeOk(m) => {
634 m.subscribe_id.encode(buf);
635 m.expires.encode(buf);
636 buf.put_u8(m.group_order as u8);
637 buf.put_u8(m.content_exists as u8);
638 if let Some(gid) = &m.largest_group_id {
639 gid.encode(buf);
640 }
641 if let Some(oid) = &m.largest_object_id {
642 oid.encode(buf);
643 }
644 KeyValuePair::encode_list_d07(&m.parameters, buf);
645 }
646 ControlMessage::SubscribeError(m) => {
647 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
648 return Err(CodecError::ReasonPhraseTooLong);
649 }
650 m.subscribe_id.encode(buf);
651 m.error_code.encode(buf);
652 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
653 buf.put_slice(&m.reason_phrase);
654 m.track_alias.encode(buf);
655 }
656 ControlMessage::SubscribeUpdate(m) => {
657 m.subscribe_id.encode(buf);
658 m.start_group.encode(buf);
659 m.start_object.encode(buf);
660 m.end_group.encode(buf);
661 buf.put_u8(m.subscriber_priority);
662 KeyValuePair::encode_list_d07(&m.parameters, buf);
663 }
664 ControlMessage::SubscribeDone(m) => {
665 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
666 return Err(CodecError::ReasonPhraseTooLong);
667 }
668 m.subscribe_id.encode(buf);
669 m.status_code.encode(buf);
670 m.stream_count.encode(buf);
671 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
672 buf.put_slice(&m.reason_phrase);
673 }
674 ControlMessage::Unsubscribe(m) => {
675 m.subscribe_id.encode(buf);
676 }
677 ControlMessage::Announce(m) => {
678 m.track_namespace.encode(buf);
679 KeyValuePair::encode_list_d07(&m.parameters, buf);
680 }
681 ControlMessage::AnnounceOk(m) => {
682 m.track_namespace.encode(buf);
683 }
684 ControlMessage::AnnounceError(m) => {
685 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
686 return Err(CodecError::ReasonPhraseTooLong);
687 }
688 m.track_namespace.encode(buf);
689 m.error_code.encode(buf);
690 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
691 buf.put_slice(&m.reason_phrase);
692 }
693 ControlMessage::AnnounceCancel(m) => {
694 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
695 return Err(CodecError::ReasonPhraseTooLong);
696 }
697 m.track_namespace.encode(buf);
698 m.error_code.encode(buf);
699 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
700 buf.put_slice(&m.reason_phrase);
701 }
702 ControlMessage::Unannounce(m) => {
703 m.track_namespace.encode(buf);
704 }
705 ControlMessage::SubscribeAnnounces(m) => {
706 m.track_namespace_prefix.encode(buf);
707 KeyValuePair::encode_list_d07(&m.parameters, buf);
708 }
709 ControlMessage::SubscribeAnnouncesOk(m) => {
710 m.track_namespace_prefix.encode(buf);
711 }
712 ControlMessage::SubscribeAnnouncesError(m) => {
713 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
714 return Err(CodecError::ReasonPhraseTooLong);
715 }
716 m.track_namespace_prefix.encode(buf);
717 m.error_code.encode(buf);
718 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
719 buf.put_slice(&m.reason_phrase);
720 }
721 ControlMessage::UnsubscribeAnnounces(m) => {
722 m.track_namespace_prefix.encode(buf);
723 }
724 ControlMessage::TrackStatusRequest(m) => {
725 m.track_namespace.encode(buf);
726 VarInt::from_usize(m.track_name.len()).encode(buf);
727 buf.put_slice(&m.track_name);
728 }
729 ControlMessage::TrackStatus(m) => {
730 m.track_namespace.encode(buf);
731 VarInt::from_usize(m.track_name.len()).encode(buf);
732 buf.put_slice(&m.track_name);
733 m.status_code.encode(buf);
734 m.last_group_id.encode(buf);
735 m.last_object_id.encode(buf);
736 }
737 ControlMessage::Fetch(m) => {
738 m.subscribe_id.encode(buf);
739 buf.put_u8(m.subscriber_priority);
740 buf.put_u8(m.group_order as u8);
741 VarInt::from_usize(m.fetch_type as usize).encode(buf);
742 match m.fetch_type {
743 FetchType::Standalone => {
744 if let Some(ns) = &m.track_namespace {
745 ns.encode(buf);
746 }
747 if let Some(name) = &m.track_name {
748 VarInt::from_usize(name.len()).encode(buf);
749 buf.put_slice(name);
750 }
751 if let Some(sg) = &m.start_group {
752 sg.encode(buf);
753 }
754 if let Some(so) = &m.start_object {
755 so.encode(buf);
756 }
757 if let Some(eg) = &m.end_group {
758 eg.encode(buf);
759 }
760 if let Some(eo) = &m.end_object {
761 eo.encode(buf);
762 }
763 }
764 FetchType::Joining => {
765 if let Some(jsi) = &m.joining_subscribe_id {
766 jsi.encode(buf);
767 }
768 if let Some(pgo) = &m.preceding_group_offset {
769 pgo.encode(buf);
770 }
771 }
772 }
773 KeyValuePair::encode_list_d07(&m.parameters, buf);
774 }
775 ControlMessage::FetchOk(m) => {
776 m.subscribe_id.encode(buf);
777 buf.put_u8(m.group_order as u8);
778 buf.put_u8(m.end_of_track);
779 m.largest_group_id.encode(buf);
780 m.largest_object_id.encode(buf);
781 KeyValuePair::encode_list_d07(&m.parameters, buf);
782 }
783 ControlMessage::FetchError(m) => {
784 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
785 return Err(CodecError::ReasonPhraseTooLong);
786 }
787 m.subscribe_id.encode(buf);
788 m.error_code.encode(buf);
789 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
790 buf.put_slice(&m.reason_phrase);
791 }
792 ControlMessage::FetchCancel(m) => {
793 m.subscribe_id.encode(buf);
794 }
795 }
796 Ok(())
797 }
798
799 fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
800 match msg_type {
801 MessageType::ClientSetup => {
802 let num_versions = VarInt::decode(buf)?.into_inner() as usize;
803 if num_versions == 0 {
804 return Err(CodecError::InvalidField);
805 }
806 let mut supported_versions = Vec::with_capacity(num_versions);
807 for _ in 0..num_versions {
808 supported_versions.push(VarInt::decode(buf)?);
809 }
810 let parameters = KeyValuePair::decode_list_d07(buf)?;
811 Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
812 }
813 MessageType::ServerSetup => {
814 let selected_version = VarInt::decode(buf)?;
815 let parameters = KeyValuePair::decode_list_d07(buf)?;
816 Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
817 }
818 MessageType::GoAway => {
819 let uri_len = VarInt::decode(buf)?.into_inner() as usize;
820 let uri = read_bytes(buf, uri_len)?;
821 Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
822 }
823 MessageType::MaxSubscribeId => {
824 let subscribe_id = VarInt::decode(buf)?;
825 Ok(ControlMessage::MaxSubscribeId(MaxSubscribeId { subscribe_id }))
826 }
827 MessageType::SubscribesBlocked => {
828 let maximum_subscribe_id = VarInt::decode(buf)?;
829 Ok(ControlMessage::SubscribesBlocked(SubscribesBlocked { maximum_subscribe_id }))
830 }
831 MessageType::Subscribe => {
832 let subscribe_id = VarInt::decode(buf)?;
833 let track_alias = VarInt::decode(buf)?;
834 let track_namespace = TrackNamespace::decode(buf)?;
835 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
836 let track_name = read_bytes(buf, track_name_len)?;
837 if buf.remaining() < 2 {
838 return Err(CodecError::UnexpectedEnd);
839 }
840 let subscriber_priority = buf.get_u8();
841 let group_order =
842 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
843 let filter_val = VarInt::decode(buf)?.into_inner();
844 let filter_type =
845 FilterType::from_u64(filter_val).ok_or(CodecError::InvalidField)?;
846 let start_location = match filter_type {
847 FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
848 Some(Location::decode(buf)?)
849 }
850 _ => None,
851 };
852 let end_group = match filter_type {
853 FilterType::AbsoluteRange => Some(VarInt::decode(buf)?),
854 _ => None,
855 };
856 let parameters = KeyValuePair::decode_list_d07(buf)?;
857 Ok(ControlMessage::Subscribe(Subscribe {
858 subscribe_id,
859 track_alias,
860 track_namespace,
861 track_name,
862 subscriber_priority,
863 group_order,
864 filter_type,
865 start_location,
866 end_group,
867 parameters,
868 }))
869 }
870 MessageType::SubscribeOk => {
871 let subscribe_id = VarInt::decode(buf)?;
872 let expires = VarInt::decode(buf)?;
873 if buf.remaining() < 2 {
874 return Err(CodecError::UnexpectedEnd);
875 }
876 let group_order =
877 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
878 let content_exists_val = buf.get_u8();
879 let content_exists = match content_exists_val {
880 0 => ContentExists::NoLargestLocation,
881 1 => ContentExists::HasLargestLocation,
882 _ => return Err(CodecError::InvalidField),
883 };
884 let (largest_group_id, largest_object_id) =
885 if content_exists == ContentExists::HasLargestLocation {
886 let gid = VarInt::decode(buf)?;
887 let oid = VarInt::decode(buf)?;
888 (Some(gid), Some(oid))
889 } else {
890 (None, None)
891 };
892 let parameters = KeyValuePair::decode_list_d07(buf)?;
893 Ok(ControlMessage::SubscribeOk(SubscribeOk {
894 subscribe_id,
895 expires,
896 group_order,
897 content_exists,
898 largest_group_id,
899 largest_object_id,
900 parameters,
901 }))
902 }
903 MessageType::SubscribeError => {
904 let subscribe_id = VarInt::decode(buf)?;
905 let error_code = VarInt::decode(buf)?;
906 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
907 let reason_phrase = read_bytes(buf, reason_len)?;
908 let track_alias = VarInt::decode(buf)?;
909 Ok(ControlMessage::SubscribeError(SubscribeError {
910 subscribe_id,
911 error_code,
912 reason_phrase,
913 track_alias,
914 }))
915 }
916 MessageType::SubscribeUpdate => {
917 let subscribe_id = VarInt::decode(buf)?;
918 let start_group = VarInt::decode(buf)?;
919 let start_object = VarInt::decode(buf)?;
920 let end_group = VarInt::decode(buf)?;
921 if buf.remaining() < 1 {
922 return Err(CodecError::UnexpectedEnd);
923 }
924 let subscriber_priority = buf.get_u8();
925 let parameters = KeyValuePair::decode_list_d07(buf)?;
926 Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
927 subscribe_id,
928 start_group,
929 start_object,
930 end_group,
931 subscriber_priority,
932 parameters,
933 }))
934 }
935 MessageType::SubscribeDone => {
936 let subscribe_id = VarInt::decode(buf)?;
937 let status_code = VarInt::decode(buf)?;
938 let stream_count = VarInt::decode(buf)?;
939 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
940 let reason_phrase = read_bytes(buf, reason_len)?;
941 Ok(ControlMessage::SubscribeDone(SubscribeDone {
942 subscribe_id,
943 status_code,
944 stream_count,
945 reason_phrase,
946 }))
947 }
948 MessageType::Unsubscribe => {
949 let subscribe_id = VarInt::decode(buf)?;
950 Ok(ControlMessage::Unsubscribe(Unsubscribe { subscribe_id }))
951 }
952 MessageType::Announce => {
953 let track_namespace = TrackNamespace::decode(buf)?;
954 let parameters = KeyValuePair::decode_list_d07(buf)?;
955 Ok(ControlMessage::Announce(Announce { track_namespace, parameters }))
956 }
957 MessageType::AnnounceOk => {
958 let track_namespace = TrackNamespace::decode(buf)?;
959 Ok(ControlMessage::AnnounceOk(AnnounceOk { track_namespace }))
960 }
961 MessageType::AnnounceError => {
962 let track_namespace = TrackNamespace::decode(buf)?;
963 let error_code = VarInt::decode(buf)?;
964 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
965 let reason_phrase = read_bytes(buf, reason_len)?;
966 Ok(ControlMessage::AnnounceError(AnnounceError {
967 track_namespace,
968 error_code,
969 reason_phrase,
970 }))
971 }
972 MessageType::AnnounceCancel => {
973 let track_namespace = TrackNamespace::decode(buf)?;
974 let error_code = VarInt::decode(buf)?;
975 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
976 let reason_phrase = read_bytes(buf, reason_len)?;
977 Ok(ControlMessage::AnnounceCancel(AnnounceCancel {
978 track_namespace,
979 error_code,
980 reason_phrase,
981 }))
982 }
983 MessageType::Unannounce => {
984 let track_namespace = TrackNamespace::decode(buf)?;
985 Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
986 }
987 MessageType::SubscribeAnnounces => {
988 let track_namespace_prefix = TrackNamespace::decode(buf)?;
989 let parameters = KeyValuePair::decode_list_d07(buf)?;
990 Ok(ControlMessage::SubscribeAnnounces(SubscribeAnnounces {
991 track_namespace_prefix,
992 parameters,
993 }))
994 }
995 MessageType::SubscribeAnnouncesOk => {
996 let track_namespace_prefix = TrackNamespace::decode(buf)?;
997 Ok(ControlMessage::SubscribeAnnouncesOk(SubscribeAnnouncesOk {
998 track_namespace_prefix,
999 }))
1000 }
1001 MessageType::SubscribeAnnouncesError => {
1002 let track_namespace_prefix = TrackNamespace::decode(buf)?;
1003 let error_code = VarInt::decode(buf)?;
1004 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1005 let reason_phrase = read_bytes(buf, reason_len)?;
1006 Ok(ControlMessage::SubscribeAnnouncesError(SubscribeAnnouncesError {
1007 track_namespace_prefix,
1008 error_code,
1009 reason_phrase,
1010 }))
1011 }
1012 MessageType::UnsubscribeAnnounces => {
1013 let track_namespace_prefix = TrackNamespace::decode(buf)?;
1014 Ok(ControlMessage::UnsubscribeAnnounces(UnsubscribeAnnounces {
1015 track_namespace_prefix,
1016 }))
1017 }
1018 MessageType::TrackStatusRequest => {
1019 let track_namespace = TrackNamespace::decode(buf)?;
1020 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1021 let track_name = read_bytes(buf, track_name_len)?;
1022 Ok(ControlMessage::TrackStatusRequest(TrackStatusRequest {
1023 track_namespace,
1024 track_name,
1025 }))
1026 }
1027 MessageType::TrackStatus => {
1028 let track_namespace = TrackNamespace::decode(buf)?;
1029 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1030 let track_name = read_bytes(buf, track_name_len)?;
1031 let status_code = VarInt::decode(buf)?;
1032 let last_group_id = VarInt::decode(buf)?;
1033 let last_object_id = VarInt::decode(buf)?;
1034 Ok(ControlMessage::TrackStatus(TrackStatus {
1035 track_namespace,
1036 track_name,
1037 status_code,
1038 last_group_id,
1039 last_object_id,
1040 }))
1041 }
1042 MessageType::Fetch => {
1043 let subscribe_id = VarInt::decode(buf)?;
1044 if buf.remaining() < 2 {
1045 return Err(CodecError::UnexpectedEnd);
1046 }
1047 let subscriber_priority = buf.get_u8();
1048 let group_order =
1049 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1050 let fetch_type_val = VarInt::decode(buf)?.into_inner();
1051 let fetch_type =
1052 FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
1053 let (
1054 track_namespace,
1055 track_name,
1056 start_group,
1057 start_object,
1058 end_group,
1059 end_object,
1060 joining_subscribe_id,
1061 preceding_group_offset,
1062 ) = match fetch_type {
1063 FetchType::Standalone => {
1064 let ns = TrackNamespace::decode(buf)?;
1065 let name_len = VarInt::decode(buf)?.into_inner() as usize;
1066 let name = read_bytes(buf, name_len)?;
1067 let sg = VarInt::decode(buf)?;
1068 let so = VarInt::decode(buf)?;
1069 let eg = VarInt::decode(buf)?;
1070 let eo = VarInt::decode(buf)?;
1071 (Some(ns), Some(name), Some(sg), Some(so), Some(eg), Some(eo), None, None)
1072 }
1073 FetchType::Joining => {
1074 let jsi = VarInt::decode(buf)?;
1075 let pgo = VarInt::decode(buf)?;
1076 (None, None, None, None, None, None, Some(jsi), Some(pgo))
1077 }
1078 };
1079 let parameters = KeyValuePair::decode_list_d07(buf)?;
1080 Ok(ControlMessage::Fetch(Fetch {
1081 subscribe_id,
1082 subscriber_priority,
1083 group_order,
1084 fetch_type,
1085 track_namespace,
1086 track_name,
1087 start_group,
1088 start_object,
1089 end_group,
1090 end_object,
1091 joining_subscribe_id,
1092 preceding_group_offset,
1093 parameters,
1094 }))
1095 }
1096 MessageType::FetchOk => {
1097 let subscribe_id = VarInt::decode(buf)?;
1098 if buf.remaining() < 2 {
1099 return Err(CodecError::UnexpectedEnd);
1100 }
1101 let group_order =
1102 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1103 let end_of_track = buf.get_u8();
1104 let largest_group_id = VarInt::decode(buf)?;
1105 let largest_object_id = VarInt::decode(buf)?;
1106 let parameters = KeyValuePair::decode_list_d07(buf)?;
1107 Ok(ControlMessage::FetchOk(FetchOk {
1108 subscribe_id,
1109 group_order,
1110 end_of_track,
1111 largest_group_id,
1112 largest_object_id,
1113 parameters,
1114 }))
1115 }
1116 MessageType::FetchError => {
1117 let subscribe_id = VarInt::decode(buf)?;
1118 let error_code = VarInt::decode(buf)?;
1119 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1120 let reason_phrase = read_bytes(buf, reason_len)?;
1121 Ok(ControlMessage::FetchError(FetchError {
1122 subscribe_id,
1123 error_code,
1124 reason_phrase,
1125 }))
1126 }
1127 MessageType::FetchCancel => {
1128 let subscribe_id = VarInt::decode(buf)?;
1129 Ok(ControlMessage::FetchCancel(FetchCancel { subscribe_id }))
1130 }
1131 }
1132 }
1133}