1use crate::error::{
7 CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_REASON_PHRASE_LENGTH,
8};
9use crate::kvp::KeyValuePair;
10use crate::types::read_bytes;
11use crate::types::*;
12use crate::varint::VarInt;
13use bytes::{Buf, BufMut};
14
/// Identifier of a control message.
///
/// Each discriminant is the numeric value that [`ControlMessage::encode`]
/// writes (as a varint) in front of the message payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum MessageType {
    SubscribeUpdate = 0x02,
    Subscribe = 0x03,
    SubscribeOk = 0x04,
    SubscribeError = 0x05,
    Announce = 0x06,
    AnnounceOk = 0x07,
    AnnounceError = 0x08,
    Unannounce = 0x09,
    Unsubscribe = 0x0A,
    SubscribeDone = 0x0B,
    AnnounceCancel = 0x0C,
    TrackStatusRequest = 0x0D,
    TrackStatus = 0x0E,
    GoAway = 0x10,
    SubscribeAnnounces = 0x11,
    SubscribeAnnouncesOk = 0x12,
    SubscribeAnnouncesError = 0x13,
    UnsubscribeAnnounces = 0x14,
    MaxSubscribeId = 0x15,
    Fetch = 0x16,
    FetchCancel = 0x17,
    FetchOk = 0x18,
    FetchError = 0x19,
    SubscribesBlocked = 0x1A,
    ClientSetup = 0x40,
    ServerSetup = 0x41,
}

impl MessageType {
    /// Maps a numeric identifier to its message type.
    ///
    /// Returns `None` for any value that is not one of the enum's
    /// discriminants (for example `0x0F`, which is unassigned here).
    pub fn from_id(id: u64) -> Option<Self> {
        let message_type = match id {
            0x02 => Self::SubscribeUpdate,
            0x03 => Self::Subscribe,
            0x04 => Self::SubscribeOk,
            0x05 => Self::SubscribeError,
            0x06 => Self::Announce,
            0x07 => Self::AnnounceOk,
            0x08 => Self::AnnounceError,
            0x09 => Self::Unannounce,
            0x0A => Self::Unsubscribe,
            0x0B => Self::SubscribeDone,
            0x0C => Self::AnnounceCancel,
            0x0D => Self::TrackStatusRequest,
            0x0E => Self::TrackStatus,
            0x10 => Self::GoAway,
            0x11 => Self::SubscribeAnnounces,
            0x12 => Self::SubscribeAnnouncesOk,
            0x13 => Self::SubscribeAnnouncesError,
            0x14 => Self::UnsubscribeAnnounces,
            0x15 => Self::MaxSubscribeId,
            0x16 => Self::Fetch,
            0x17 => Self::FetchCancel,
            0x18 => Self::FetchOk,
            0x19 => Self::FetchError,
            0x1A => Self::SubscribesBlocked,
            0x40 => Self::ClientSetup,
            0x41 => Self::ServerSetup,
            // Anything else is not a message type this enum knows about.
            _ => return None,
        };
        Some(message_type)
    }

    /// Returns the numeric identifier (the enum discriminant) of this type.
    pub fn id(&self) -> u64 {
        let kind: MessageType = *self;
        kind as u64
    }
}
112
/// Payload of the `ClientSetup` control message (type `0x40`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientSetup {
    /// Versions offered by the client. Encoded as a varint count followed by
    /// each version; decoding rejects a count of zero with `InvalidField`.
    pub supported_versions: Vec<VarInt>,
    /// Setup parameters, (de)serialized with the `KeyValuePair` `*_list_d07` helpers.
    pub parameters: Vec<KeyValuePair>,
}
125
/// Payload of the `ServerSetup` control message (type `0x41`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerSetup {
    /// The single version carried in the message, encoded as a varint.
    pub selected_version: VarInt,
    /// Setup parameters, (de)serialized with the `KeyValuePair` `*_list_d07` helpers.
    pub parameters: Vec<KeyValuePair>,
}
134
/// Payload of the `GoAway` control message (type `0x10`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoAway {
    /// Raw URI bytes, written with a varint length prefix. Encoding fails with
    /// `GoAwayUriTooLong` when this is longer than `MAX_GOAWAY_URI_LENGTH`.
    pub new_session_uri: Vec<u8>,
}
141
/// Payload of the `MaxSubscribeId` control message (type `0x15`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MaxSubscribeId {
    /// The only field of the message, encoded as a varint.
    pub subscribe_id: VarInt,
}
148
/// Payload of the `SubscribesBlocked` control message (type `0x1A`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribesBlocked {
    /// The only field of the message, encoded as a varint.
    pub maximum_subscribe_id: VarInt,
}
155
/// Payload of the `Subscribe` control message (type `0x03`).
///
/// Fields are declared in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscribe {
    /// Varint identifier of this subscription.
    pub subscribe_id: VarInt,
    /// Varint alias for the track.
    pub track_alias: VarInt,
    /// Namespace of the track, (de)serialized by `TrackNamespace`.
    pub track_namespace: TrackNamespace,
    /// Track name bytes, written with a varint length prefix.
    pub track_name: Vec<u8>,
    /// Priority, carried as a single byte.
    pub subscriber_priority: u8,
    /// Group ordering, carried as a single byte.
    pub group_order: GroupOrder,
    /// Filter selector, carried as a varint. Decoding rejects the raw value `1`.
    pub filter_type: FilterType,
    /// Decoded as `Some` only when `filter_type` is `AbsoluteStart` or
    /// `AbsoluteRange`; `None` for every other filter.
    pub start_location: Option<Location>,
    /// Decoded as `Some` only when `filter_type` is `AbsoluteRange`.
    pub end_group: Option<VarInt>,
    /// Trailing parameters, (de)serialized with the `KeyValuePair` `*_list_d07` helpers.
    pub parameters: Vec<KeyValuePair>,
}
186
/// Payload of the `SubscribeOk` control message (type `0x04`).
///
/// Fields are declared in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeOk {
    /// Varint identifier of the subscription being acknowledged.
    pub subscribe_id: VarInt,
    /// Varint expiry value (units are not defined in this file).
    pub expires: VarInt,
    /// Group ordering, carried as a single byte.
    pub group_order: GroupOrder,
    /// Single-byte flag; decoding accepts only `0` (`NoLargestLocation`) and
    /// `1` (`HasLargestLocation`).
    pub content_exists: ContentExists,
    /// Decoded as `Some` only when `content_exists` is `HasLargestLocation`.
    pub largest_group_id: Option<VarInt>,
    /// Decoded as `Some` only when `content_exists` is `HasLargestLocation`.
    pub largest_object_id: Option<VarInt>,
    /// Trailing parameters, (de)serialized with the `KeyValuePair` `*_list_d07` helpers.
    pub parameters: Vec<KeyValuePair>,
}
205
/// Payload of the `SubscribeError` control message (type `0x05`).
///
/// Fields are declared in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeError {
    /// Varint identifier of the failed subscription.
    pub subscribe_id: VarInt,
    /// Varint error code.
    pub error_code: VarInt,
    /// Reason bytes with a varint length prefix. Encoding fails with
    /// `ReasonPhraseTooLong` when longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
    /// Varint track alias, written after the reason phrase.
    pub track_alias: VarInt,
}
218
/// Payload of the `SubscribeUpdate` control message (type `0x02`).
///
/// Fields are declared in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeUpdate {
    /// Varint identifier of the subscription being updated.
    pub subscribe_id: VarInt,
    /// Varint start group.
    pub start_group: VarInt,
    /// Varint start object.
    pub start_object: VarInt,
    /// Varint end group.
    pub end_group: VarInt,
    /// Priority, carried as a single byte.
    pub subscriber_priority: u8,
    /// Trailing parameters, (de)serialized with the `KeyValuePair` `*_list_d07` helpers.
    pub parameters: Vec<KeyValuePair>,
}
235
/// Payload of the `SubscribeDone` control message (type `0x0B`).
///
/// Fields are declared in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeDone {
    /// Varint identifier of the finished subscription.
    pub subscribe_id: VarInt,
    /// Varint status code.
    pub status_code: VarInt,
    /// Varint stream count.
    pub stream_count: VarInt,
    /// Reason bytes with a varint length prefix. Encoding fails with
    /// `ReasonPhraseTooLong` when longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
248
/// Payload of the `Unsubscribe` control message (type `0x0A`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unsubscribe {
    /// The only field of the message, encoded as a varint.
    pub subscribe_id: VarInt,
}
255
/// Payload of the `Announce` control message (type `0x06`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Announce {
    /// Namespace being announced, (de)serialized by `TrackNamespace`.
    pub track_namespace: TrackNamespace,
    /// Trailing parameters, (de)serialized with the `KeyValuePair` `*_list_d07` helpers.
    pub parameters: Vec<KeyValuePair>,
}
268
/// Payload of the `AnnounceOk` control message (type `0x07`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceOk {
    /// The only field of the message, (de)serialized by `TrackNamespace`.
    pub track_namespace: TrackNamespace,
}
275
/// Payload of the `AnnounceError` control message (type `0x08`).
///
/// Fields are declared in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceError {
    /// Namespace the error refers to, (de)serialized by `TrackNamespace`.
    pub track_namespace: TrackNamespace,
    /// Varint error code.
    pub error_code: VarInt,
    /// Reason bytes with a varint length prefix. Encoding fails with
    /// `ReasonPhraseTooLong` when longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
286
/// Payload of the `AnnounceCancel` control message (type `0x0C`).
///
/// Has the same field layout as `AnnounceError`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceCancel {
    /// Namespace the cancellation refers to, (de)serialized by `TrackNamespace`.
    pub track_namespace: TrackNamespace,
    /// Varint error code.
    pub error_code: VarInt,
    /// Reason bytes with a varint length prefix. Encoding fails with
    /// `ReasonPhraseTooLong` when longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
297
/// Payload of the `Unannounce` control message (type `0x09`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unannounce {
    /// The only field of the message, (de)serialized by `TrackNamespace`.
    pub track_namespace: TrackNamespace,
}
304
/// Payload of the `SubscribeAnnounces` control message (type `0x11`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnounces {
    /// Namespace prefix, (de)serialized by `TrackNamespace`.
    pub track_namespace_prefix: TrackNamespace,
    /// Trailing parameters, (de)serialized with the `KeyValuePair` `*_list_d07` helpers.
    pub parameters: Vec<KeyValuePair>,
}
317
/// Payload of the `SubscribeAnnouncesOk` control message (type `0x12`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnouncesOk {
    /// The only field of the message, (de)serialized by `TrackNamespace`.
    pub track_namespace_prefix: TrackNamespace,
}
324
/// Payload of the `SubscribeAnnouncesError` control message (type `0x13`).
///
/// Fields are declared in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnouncesError {
    /// Namespace prefix the error refers to, (de)serialized by `TrackNamespace`.
    pub track_namespace_prefix: TrackNamespace,
    /// Varint error code.
    pub error_code: VarInt,
    /// Reason bytes with a varint length prefix. Encoding fails with
    /// `ReasonPhraseTooLong` when longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
335
/// Payload of the `UnsubscribeAnnounces` control message (type `0x14`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsubscribeAnnounces {
    /// The only field of the message, (de)serialized by `TrackNamespace`.
    pub track_namespace_prefix: TrackNamespace,
}
342
/// Payload of the `TrackStatusRequest` control message (type `0x0D`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatusRequest {
    /// Namespace of the track, (de)serialized by `TrackNamespace`.
    pub track_namespace: TrackNamespace,
    /// Track name bytes, written with a varint length prefix.
    pub track_name: Vec<u8>,
}
355
/// Payload of the `TrackStatus` control message (type `0x0E`).
///
/// Fields are declared in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatus {
    /// Namespace of the track, (de)serialized by `TrackNamespace`.
    pub track_namespace: TrackNamespace,
    /// Track name bytes, written with a varint length prefix.
    pub track_name: Vec<u8>,
    /// Varint status code.
    pub status_code: VarInt,
    /// Varint last group id.
    pub last_group_id: VarInt,
    /// Varint last object id.
    pub last_object_id: VarInt,
}
370
/// Selects which set of fields a `Fetch` message carries.
///
/// The discriminant is the value written on the wire as a varint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum FetchType {
    Standalone = 1,
    Joining = 2,
}

impl FetchType {
    /// Converts a raw wire value into a fetch type.
    ///
    /// Returns `None` for anything other than `1` or `2`.
    pub fn from_u64(v: u64) -> Option<Self> {
        if v == Self::Standalone as u64 {
            Some(Self::Standalone)
        } else if v == Self::Joining as u64 {
            Some(Self::Joining)
        } else {
            None
        }
    }
}
395
/// Payload of the `Fetch` control message (type `0x16`).
///
/// The common header (`subscribe_id`, `subscriber_priority`, `group_order`,
/// `fetch_type`) is followed by a group of fields selected by `fetch_type`,
/// then by the parameter list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Fetch {
    /// Varint identifier of this fetch.
    pub subscribe_id: VarInt,
    /// Priority, carried as a single byte.
    pub subscriber_priority: u8,
    /// Group ordering, carried as a single byte.
    pub group_order: GroupOrder,
    /// Selects which of the optional fields below are on the wire.
    pub fetch_type: FetchType,
    /// Decoded as `Some` only for `FetchType::Standalone`.
    pub track_namespace: Option<TrackNamespace>,
    /// Decoded as `Some` only for `FetchType::Standalone`; length-prefixed bytes.
    pub track_name: Option<Vec<u8>>,
    /// Decoded as `Some` only for `FetchType::Standalone`.
    pub start_group: Option<VarInt>,
    /// Decoded as `Some` only for `FetchType::Standalone`.
    pub start_object: Option<VarInt>,
    /// Decoded as `Some` only for `FetchType::Standalone`.
    pub end_group: Option<VarInt>,
    /// Decoded as `Some` only for `FetchType::Standalone`.
    pub end_object: Option<VarInt>,
    /// Decoded as `Some` only for `FetchType::Joining`.
    pub joining_subscribe_id: Option<VarInt>,
    /// Decoded as `Some` only for `FetchType::Joining`.
    pub preceding_group_offset: Option<VarInt>,
    /// Trailing parameters, (de)serialized with the `KeyValuePair` `*_list_d07` helpers.
    pub parameters: Vec<KeyValuePair>,
}
426
/// Payload of the `FetchOk` control message (type `0x18`).
///
/// Fields are declared in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchOk {
    /// Varint identifier of the fetch being acknowledged.
    pub subscribe_id: VarInt,
    /// Group ordering, carried as a single byte.
    pub group_order: GroupOrder,
    /// Single byte stored exactly as read; decoding does not validate its value.
    pub end_of_track: u8,
    /// Varint largest group id.
    pub largest_group_id: VarInt,
    /// Varint largest object id.
    pub largest_object_id: VarInt,
    /// Trailing parameters, (de)serialized with the `KeyValuePair` `*_list_d07` helpers.
    pub parameters: Vec<KeyValuePair>,
}
443
/// Payload of the `FetchError` control message (type `0x19`).
///
/// Fields are declared in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchError {
    /// Varint identifier of the failed fetch.
    pub subscribe_id: VarInt,
    /// Varint error code.
    pub error_code: VarInt,
    /// Reason bytes with a varint length prefix. Encoding fails with
    /// `ReasonPhraseTooLong` when longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
454
/// Payload of the `FetchCancel` control message (type `0x17`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchCancel {
    /// The only field of the message, encoded as a varint.
    pub subscribe_id: VarInt,
}
461
/// Any control message this codec can frame.
///
/// Each variant wraps the payload struct of the same name, and
/// `message_type` maps it to the `MessageType` of the same name. Use
/// `encode` / `decode` to convert to and from the framed wire form
/// (type varint, length varint, payload).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ControlMessage {
    ClientSetup(ClientSetup),
    ServerSetup(ServerSetup),
    GoAway(GoAway),
    MaxSubscribeId(MaxSubscribeId),
    SubscribesBlocked(SubscribesBlocked),
    Subscribe(Subscribe),
    SubscribeOk(SubscribeOk),
    SubscribeError(SubscribeError),
    SubscribeUpdate(SubscribeUpdate),
    SubscribeDone(SubscribeDone),
    Unsubscribe(Unsubscribe),
    Announce(Announce),
    AnnounceOk(AnnounceOk),
    AnnounceError(AnnounceError),
    AnnounceCancel(AnnounceCancel),
    Unannounce(Unannounce),
    SubscribeAnnounces(SubscribeAnnounces),
    SubscribeAnnouncesOk(SubscribeAnnouncesOk),
    SubscribeAnnouncesError(SubscribeAnnouncesError),
    UnsubscribeAnnounces(UnsubscribeAnnounces),
    TrackStatusRequest(TrackStatusRequest),
    TrackStatus(TrackStatus),
    Fetch(Fetch),
    FetchOk(FetchOk),
    FetchError(FetchError),
    FetchCancel(FetchCancel),
}
522
523impl ControlMessage {
524 pub fn message_type(&self) -> MessageType {
526 match self {
527 ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
528 ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
529 ControlMessage::GoAway(_) => MessageType::GoAway,
530 ControlMessage::MaxSubscribeId(_) => MessageType::MaxSubscribeId,
531 ControlMessage::SubscribesBlocked(_) => MessageType::SubscribesBlocked,
532 ControlMessage::Subscribe(_) => MessageType::Subscribe,
533 ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
534 ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
535 ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
536 ControlMessage::SubscribeDone(_) => MessageType::SubscribeDone,
537 ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
538 ControlMessage::Announce(_) => MessageType::Announce,
539 ControlMessage::AnnounceOk(_) => MessageType::AnnounceOk,
540 ControlMessage::AnnounceError(_) => MessageType::AnnounceError,
541 ControlMessage::AnnounceCancel(_) => MessageType::AnnounceCancel,
542 ControlMessage::Unannounce(_) => MessageType::Unannounce,
543 ControlMessage::SubscribeAnnounces(_) => MessageType::SubscribeAnnounces,
544 ControlMessage::SubscribeAnnouncesOk(_) => MessageType::SubscribeAnnouncesOk,
545 ControlMessage::SubscribeAnnouncesError(_) => MessageType::SubscribeAnnouncesError,
546 ControlMessage::UnsubscribeAnnounces(_) => MessageType::UnsubscribeAnnounces,
547 ControlMessage::TrackStatusRequest(_) => MessageType::TrackStatusRequest,
548 ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
549 ControlMessage::Fetch(_) => MessageType::Fetch,
550 ControlMessage::FetchOk(_) => MessageType::FetchOk,
551 ControlMessage::FetchError(_) => MessageType::FetchError,
552 ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
553 }
554 }
555
556 pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
558 let mut payload = Vec::with_capacity(256);
559 self.encode_payload(&mut payload)?;
560
561 if payload.len() > MAX_MESSAGE_LENGTH {
562 return Err(CodecError::MessageTooLong(payload.len()));
563 }
564
565 VarInt::from_usize(self.message_type().id() as usize).encode(buf);
566 VarInt::from_usize(payload.len()).encode(buf);
567 buf.put_slice(&payload);
568 Ok(())
569 }
570
571 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
573 let type_id = VarInt::decode(buf)?.into_inner();
574 let msg_type =
575 MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
576 let payload_len = VarInt::decode(buf)?.into_inner() as usize;
577 if buf.remaining() < payload_len {
578 return Err(CodecError::UnexpectedEnd);
579 }
580 let payload_bytes = buf.copy_to_bytes(payload_len);
581 let mut payload = &payload_bytes[..];
582 Self::decode_payload(msg_type, &mut payload)
583 }
584
585 fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
586 match self {
587 ControlMessage::ClientSetup(m) => {
588 VarInt::from_usize(m.supported_versions.len()).encode(buf);
589 for v in &m.supported_versions {
590 v.encode(buf);
591 }
592 KeyValuePair::encode_list_d07(&m.parameters, buf);
593 }
594 ControlMessage::ServerSetup(m) => {
595 m.selected_version.encode(buf);
596 KeyValuePair::encode_list_d07(&m.parameters, buf);
597 }
598 ControlMessage::GoAway(m) => {
599 if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
600 return Err(CodecError::GoAwayUriTooLong);
601 }
602 VarInt::from_usize(m.new_session_uri.len()).encode(buf);
603 buf.put_slice(&m.new_session_uri);
604 }
605 ControlMessage::MaxSubscribeId(m) => {
606 m.subscribe_id.encode(buf);
607 }
608 ControlMessage::SubscribesBlocked(m) => {
609 m.maximum_subscribe_id.encode(buf);
610 }
611 ControlMessage::Subscribe(m) => {
612 m.subscribe_id.encode(buf);
613 m.track_alias.encode(buf);
614 m.track_namespace.encode(buf);
615 VarInt::from_usize(m.track_name.len()).encode(buf);
616 buf.put_slice(&m.track_name);
617 buf.put_u8(m.subscriber_priority);
618 buf.put_u8(m.group_order as u8);
619 VarInt::from_usize(m.filter_type as usize).encode(buf);
620 if let Some(loc) = &m.start_location {
621 loc.encode(buf);
622 }
623 if let Some(eg) = &m.end_group {
624 eg.encode(buf);
625 }
626 KeyValuePair::encode_list_d07(&m.parameters, buf);
627 }
628 ControlMessage::SubscribeOk(m) => {
629 m.subscribe_id.encode(buf);
630 m.expires.encode(buf);
631 buf.put_u8(m.group_order as u8);
632 buf.put_u8(m.content_exists as u8);
633 if let Some(gid) = &m.largest_group_id {
634 gid.encode(buf);
635 }
636 if let Some(oid) = &m.largest_object_id {
637 oid.encode(buf);
638 }
639 KeyValuePair::encode_list_d07(&m.parameters, buf);
640 }
641 ControlMessage::SubscribeError(m) => {
642 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
643 return Err(CodecError::ReasonPhraseTooLong);
644 }
645 m.subscribe_id.encode(buf);
646 m.error_code.encode(buf);
647 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
648 buf.put_slice(&m.reason_phrase);
649 m.track_alias.encode(buf);
650 }
651 ControlMessage::SubscribeUpdate(m) => {
652 m.subscribe_id.encode(buf);
653 m.start_group.encode(buf);
654 m.start_object.encode(buf);
655 m.end_group.encode(buf);
656 buf.put_u8(m.subscriber_priority);
657 KeyValuePair::encode_list_d07(&m.parameters, buf);
658 }
659 ControlMessage::SubscribeDone(m) => {
660 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
661 return Err(CodecError::ReasonPhraseTooLong);
662 }
663 m.subscribe_id.encode(buf);
664 m.status_code.encode(buf);
665 m.stream_count.encode(buf);
666 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
667 buf.put_slice(&m.reason_phrase);
668 }
669 ControlMessage::Unsubscribe(m) => {
670 m.subscribe_id.encode(buf);
671 }
672 ControlMessage::Announce(m) => {
673 m.track_namespace.encode(buf);
674 KeyValuePair::encode_list_d07(&m.parameters, buf);
675 }
676 ControlMessage::AnnounceOk(m) => {
677 m.track_namespace.encode(buf);
678 }
679 ControlMessage::AnnounceError(m) => {
680 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
681 return Err(CodecError::ReasonPhraseTooLong);
682 }
683 m.track_namespace.encode(buf);
684 m.error_code.encode(buf);
685 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
686 buf.put_slice(&m.reason_phrase);
687 }
688 ControlMessage::AnnounceCancel(m) => {
689 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
690 return Err(CodecError::ReasonPhraseTooLong);
691 }
692 m.track_namespace.encode(buf);
693 m.error_code.encode(buf);
694 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
695 buf.put_slice(&m.reason_phrase);
696 }
697 ControlMessage::Unannounce(m) => {
698 m.track_namespace.encode(buf);
699 }
700 ControlMessage::SubscribeAnnounces(m) => {
701 m.track_namespace_prefix.encode(buf);
702 KeyValuePair::encode_list_d07(&m.parameters, buf);
703 }
704 ControlMessage::SubscribeAnnouncesOk(m) => {
705 m.track_namespace_prefix.encode(buf);
706 }
707 ControlMessage::SubscribeAnnouncesError(m) => {
708 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
709 return Err(CodecError::ReasonPhraseTooLong);
710 }
711 m.track_namespace_prefix.encode(buf);
712 m.error_code.encode(buf);
713 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
714 buf.put_slice(&m.reason_phrase);
715 }
716 ControlMessage::UnsubscribeAnnounces(m) => {
717 m.track_namespace_prefix.encode(buf);
718 }
719 ControlMessage::TrackStatusRequest(m) => {
720 m.track_namespace.encode(buf);
721 VarInt::from_usize(m.track_name.len()).encode(buf);
722 buf.put_slice(&m.track_name);
723 }
724 ControlMessage::TrackStatus(m) => {
725 m.track_namespace.encode(buf);
726 VarInt::from_usize(m.track_name.len()).encode(buf);
727 buf.put_slice(&m.track_name);
728 m.status_code.encode(buf);
729 m.last_group_id.encode(buf);
730 m.last_object_id.encode(buf);
731 }
732 ControlMessage::Fetch(m) => {
733 m.subscribe_id.encode(buf);
734 buf.put_u8(m.subscriber_priority);
735 buf.put_u8(m.group_order as u8);
736 VarInt::from_usize(m.fetch_type as usize).encode(buf);
737 match m.fetch_type {
738 FetchType::Standalone => {
739 if let Some(ns) = &m.track_namespace {
740 ns.encode(buf);
741 }
742 if let Some(name) = &m.track_name {
743 VarInt::from_usize(name.len()).encode(buf);
744 buf.put_slice(name);
745 }
746 if let Some(sg) = &m.start_group {
747 sg.encode(buf);
748 }
749 if let Some(so) = &m.start_object {
750 so.encode(buf);
751 }
752 if let Some(eg) = &m.end_group {
753 eg.encode(buf);
754 }
755 if let Some(eo) = &m.end_object {
756 eo.encode(buf);
757 }
758 }
759 FetchType::Joining => {
760 if let Some(jsi) = &m.joining_subscribe_id {
761 jsi.encode(buf);
762 }
763 if let Some(pgo) = &m.preceding_group_offset {
764 pgo.encode(buf);
765 }
766 }
767 }
768 KeyValuePair::encode_list_d07(&m.parameters, buf);
769 }
770 ControlMessage::FetchOk(m) => {
771 m.subscribe_id.encode(buf);
772 buf.put_u8(m.group_order as u8);
773 buf.put_u8(m.end_of_track);
774 m.largest_group_id.encode(buf);
775 m.largest_object_id.encode(buf);
776 KeyValuePair::encode_list_d07(&m.parameters, buf);
777 }
778 ControlMessage::FetchError(m) => {
779 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
780 return Err(CodecError::ReasonPhraseTooLong);
781 }
782 m.subscribe_id.encode(buf);
783 m.error_code.encode(buf);
784 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
785 buf.put_slice(&m.reason_phrase);
786 }
787 ControlMessage::FetchCancel(m) => {
788 m.subscribe_id.encode(buf);
789 }
790 }
791 Ok(())
792 }
793
794 fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
795 match msg_type {
796 MessageType::ClientSetup => {
797 let num_versions = VarInt::decode(buf)?.into_inner() as usize;
798 if num_versions == 0 {
799 return Err(CodecError::InvalidField);
800 }
801 let mut supported_versions = Vec::with_capacity(num_versions);
802 for _ in 0..num_versions {
803 supported_versions.push(VarInt::decode(buf)?);
804 }
805 let parameters = KeyValuePair::decode_list_d07(buf)?;
806 Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
807 }
808 MessageType::ServerSetup => {
809 let selected_version = VarInt::decode(buf)?;
810 let parameters = KeyValuePair::decode_list_d07(buf)?;
811 Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
812 }
813 MessageType::GoAway => {
814 let uri_len = VarInt::decode(buf)?.into_inner() as usize;
815 let uri = read_bytes(buf, uri_len)?;
816 Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
817 }
818 MessageType::MaxSubscribeId => {
819 let subscribe_id = VarInt::decode(buf)?;
820 Ok(ControlMessage::MaxSubscribeId(MaxSubscribeId { subscribe_id }))
821 }
822 MessageType::SubscribesBlocked => {
823 let maximum_subscribe_id = VarInt::decode(buf)?;
824 Ok(ControlMessage::SubscribesBlocked(SubscribesBlocked { maximum_subscribe_id }))
825 }
826 MessageType::Subscribe => {
827 let subscribe_id = VarInt::decode(buf)?;
828 let track_alias = VarInt::decode(buf)?;
829 let track_namespace = TrackNamespace::decode(buf)?;
830 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
831 let track_name = read_bytes(buf, track_name_len)?;
832 if buf.remaining() < 2 {
833 return Err(CodecError::UnexpectedEnd);
834 }
835 let subscriber_priority = buf.get_u8();
836 let group_order =
837 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
838 let filter_val = VarInt::decode(buf)?.into_inner();
839 if filter_val == 1 {
841 return Err(CodecError::InvalidField);
842 }
843 let filter_type =
844 FilterType::from_u64(filter_val).ok_or(CodecError::InvalidField)?;
845 let start_location = match filter_type {
846 FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
847 Some(Location::decode(buf)?)
848 }
849 _ => None,
850 };
851 let end_group = match filter_type {
852 FilterType::AbsoluteRange => Some(VarInt::decode(buf)?),
853 _ => None,
854 };
855 let parameters = KeyValuePair::decode_list_d07(buf)?;
856 Ok(ControlMessage::Subscribe(Subscribe {
857 subscribe_id,
858 track_alias,
859 track_namespace,
860 track_name,
861 subscriber_priority,
862 group_order,
863 filter_type,
864 start_location,
865 end_group,
866 parameters,
867 }))
868 }
869 MessageType::SubscribeOk => {
870 let subscribe_id = VarInt::decode(buf)?;
871 let expires = VarInt::decode(buf)?;
872 if buf.remaining() < 2 {
873 return Err(CodecError::UnexpectedEnd);
874 }
875 let group_order =
876 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
877 let content_exists_val = buf.get_u8();
878 let content_exists = match content_exists_val {
879 0 => ContentExists::NoLargestLocation,
880 1 => ContentExists::HasLargestLocation,
881 _ => return Err(CodecError::InvalidField),
882 };
883 let (largest_group_id, largest_object_id) =
884 if content_exists == ContentExists::HasLargestLocation {
885 let gid = VarInt::decode(buf)?;
886 let oid = VarInt::decode(buf)?;
887 (Some(gid), Some(oid))
888 } else {
889 (None, None)
890 };
891 let parameters = KeyValuePair::decode_list_d07(buf)?;
892 Ok(ControlMessage::SubscribeOk(SubscribeOk {
893 subscribe_id,
894 expires,
895 group_order,
896 content_exists,
897 largest_group_id,
898 largest_object_id,
899 parameters,
900 }))
901 }
902 MessageType::SubscribeError => {
903 let subscribe_id = VarInt::decode(buf)?;
904 let error_code = VarInt::decode(buf)?;
905 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
906 let reason_phrase = read_bytes(buf, reason_len)?;
907 let track_alias = VarInt::decode(buf)?;
908 Ok(ControlMessage::SubscribeError(SubscribeError {
909 subscribe_id,
910 error_code,
911 reason_phrase,
912 track_alias,
913 }))
914 }
915 MessageType::SubscribeUpdate => {
916 let subscribe_id = VarInt::decode(buf)?;
917 let start_group = VarInt::decode(buf)?;
918 let start_object = VarInt::decode(buf)?;
919 let end_group = VarInt::decode(buf)?;
920 if buf.remaining() < 1 {
921 return Err(CodecError::UnexpectedEnd);
922 }
923 let subscriber_priority = buf.get_u8();
924 let parameters = KeyValuePair::decode_list_d07(buf)?;
925 Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
926 subscribe_id,
927 start_group,
928 start_object,
929 end_group,
930 subscriber_priority,
931 parameters,
932 }))
933 }
934 MessageType::SubscribeDone => {
935 let subscribe_id = VarInt::decode(buf)?;
936 let status_code = VarInt::decode(buf)?;
937 let stream_count = VarInt::decode(buf)?;
938 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
939 let reason_phrase = read_bytes(buf, reason_len)?;
940 Ok(ControlMessage::SubscribeDone(SubscribeDone {
941 subscribe_id,
942 status_code,
943 stream_count,
944 reason_phrase,
945 }))
946 }
947 MessageType::Unsubscribe => {
948 let subscribe_id = VarInt::decode(buf)?;
949 Ok(ControlMessage::Unsubscribe(Unsubscribe { subscribe_id }))
950 }
951 MessageType::Announce => {
952 let track_namespace = TrackNamespace::decode(buf)?;
953 let parameters = KeyValuePair::decode_list_d07(buf)?;
954 Ok(ControlMessage::Announce(Announce { track_namespace, parameters }))
955 }
956 MessageType::AnnounceOk => {
957 let track_namespace = TrackNamespace::decode(buf)?;
958 Ok(ControlMessage::AnnounceOk(AnnounceOk { track_namespace }))
959 }
960 MessageType::AnnounceError => {
961 let track_namespace = TrackNamespace::decode(buf)?;
962 let error_code = VarInt::decode(buf)?;
963 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
964 let reason_phrase = read_bytes(buf, reason_len)?;
965 Ok(ControlMessage::AnnounceError(AnnounceError {
966 track_namespace,
967 error_code,
968 reason_phrase,
969 }))
970 }
971 MessageType::AnnounceCancel => {
972 let track_namespace = TrackNamespace::decode(buf)?;
973 let error_code = VarInt::decode(buf)?;
974 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
975 let reason_phrase = read_bytes(buf, reason_len)?;
976 Ok(ControlMessage::AnnounceCancel(AnnounceCancel {
977 track_namespace,
978 error_code,
979 reason_phrase,
980 }))
981 }
982 MessageType::Unannounce => {
983 let track_namespace = TrackNamespace::decode(buf)?;
984 Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
985 }
986 MessageType::SubscribeAnnounces => {
987 let track_namespace_prefix = TrackNamespace::decode(buf)?;
988 let parameters = KeyValuePair::decode_list_d07(buf)?;
989 Ok(ControlMessage::SubscribeAnnounces(SubscribeAnnounces {
990 track_namespace_prefix,
991 parameters,
992 }))
993 }
994 MessageType::SubscribeAnnouncesOk => {
995 let track_namespace_prefix = TrackNamespace::decode(buf)?;
996 Ok(ControlMessage::SubscribeAnnouncesOk(SubscribeAnnouncesOk {
997 track_namespace_prefix,
998 }))
999 }
1000 MessageType::SubscribeAnnouncesError => {
1001 let track_namespace_prefix = TrackNamespace::decode(buf)?;
1002 let error_code = VarInt::decode(buf)?;
1003 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1004 let reason_phrase = read_bytes(buf, reason_len)?;
1005 Ok(ControlMessage::SubscribeAnnouncesError(SubscribeAnnouncesError {
1006 track_namespace_prefix,
1007 error_code,
1008 reason_phrase,
1009 }))
1010 }
1011 MessageType::UnsubscribeAnnounces => {
1012 let track_namespace_prefix = TrackNamespace::decode(buf)?;
1013 Ok(ControlMessage::UnsubscribeAnnounces(UnsubscribeAnnounces {
1014 track_namespace_prefix,
1015 }))
1016 }
1017 MessageType::TrackStatusRequest => {
1018 let track_namespace = TrackNamespace::decode(buf)?;
1019 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1020 let track_name = read_bytes(buf, track_name_len)?;
1021 Ok(ControlMessage::TrackStatusRequest(TrackStatusRequest {
1022 track_namespace,
1023 track_name,
1024 }))
1025 }
1026 MessageType::TrackStatus => {
1027 let track_namespace = TrackNamespace::decode(buf)?;
1028 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1029 let track_name = read_bytes(buf, track_name_len)?;
1030 let status_code = VarInt::decode(buf)?;
1031 let last_group_id = VarInt::decode(buf)?;
1032 let last_object_id = VarInt::decode(buf)?;
1033 Ok(ControlMessage::TrackStatus(TrackStatus {
1034 track_namespace,
1035 track_name,
1036 status_code,
1037 last_group_id,
1038 last_object_id,
1039 }))
1040 }
1041 MessageType::Fetch => {
1042 let subscribe_id = VarInt::decode(buf)?;
1043 if buf.remaining() < 2 {
1044 return Err(CodecError::UnexpectedEnd);
1045 }
1046 let subscriber_priority = buf.get_u8();
1047 let group_order =
1048 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1049 let fetch_type_val = VarInt::decode(buf)?.into_inner();
1050 let fetch_type =
1051 FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
1052 let (
1053 track_namespace,
1054 track_name,
1055 start_group,
1056 start_object,
1057 end_group,
1058 end_object,
1059 joining_subscribe_id,
1060 preceding_group_offset,
1061 ) = match fetch_type {
1062 FetchType::Standalone => {
1063 let ns = TrackNamespace::decode(buf)?;
1064 let name_len = VarInt::decode(buf)?.into_inner() as usize;
1065 let name = read_bytes(buf, name_len)?;
1066 let sg = VarInt::decode(buf)?;
1067 let so = VarInt::decode(buf)?;
1068 let eg = VarInt::decode(buf)?;
1069 let eo = VarInt::decode(buf)?;
1070 (Some(ns), Some(name), Some(sg), Some(so), Some(eg), Some(eo), None, None)
1071 }
1072 FetchType::Joining => {
1073 let jsi = VarInt::decode(buf)?;
1074 let pgo = VarInt::decode(buf)?;
1075 (None, None, None, None, None, None, Some(jsi), Some(pgo))
1076 }
1077 };
1078 let parameters = KeyValuePair::decode_list_d07(buf)?;
1079 Ok(ControlMessage::Fetch(Fetch {
1080 subscribe_id,
1081 subscriber_priority,
1082 group_order,
1083 fetch_type,
1084 track_namespace,
1085 track_name,
1086 start_group,
1087 start_object,
1088 end_group,
1089 end_object,
1090 joining_subscribe_id,
1091 preceding_group_offset,
1092 parameters,
1093 }))
1094 }
1095 MessageType::FetchOk => {
1096 let subscribe_id = VarInt::decode(buf)?;
1097 if buf.remaining() < 2 {
1098 return Err(CodecError::UnexpectedEnd);
1099 }
1100 let group_order =
1101 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1102 let end_of_track = buf.get_u8();
1103 let largest_group_id = VarInt::decode(buf)?;
1104 let largest_object_id = VarInt::decode(buf)?;
1105 let parameters = KeyValuePair::decode_list_d07(buf)?;
1106 Ok(ControlMessage::FetchOk(FetchOk {
1107 subscribe_id,
1108 group_order,
1109 end_of_track,
1110 largest_group_id,
1111 largest_object_id,
1112 parameters,
1113 }))
1114 }
1115 MessageType::FetchError => {
1116 let subscribe_id = VarInt::decode(buf)?;
1117 let error_code = VarInt::decode(buf)?;
1118 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1119 let reason_phrase = read_bytes(buf, reason_len)?;
1120 Ok(ControlMessage::FetchError(FetchError {
1121 subscribe_id,
1122 error_code,
1123 reason_phrase,
1124 }))
1125 }
1126 MessageType::FetchCancel => {
1127 let subscribe_id = VarInt::decode(buf)?;
1128 Ok(ControlMessage::FetchCancel(FetchCancel { subscribe_id }))
1129 }
1130 }
1131 }
1132}