1pub use crate::error::{
23 CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_NAMESPACE_TUPLE_SIZE,
24 MAX_REASON_PHRASE_LENGTH,
25};
26use crate::kvp::KeyValuePair;
27use crate::types::{self, *};
28use crate::varint::VarInt;
29use bytes::{Buf, BufMut};
30
/// Type identifiers of the control messages handled by this module.
///
/// Each discriminant is the numeric identifier that precedes the message
/// payload on the wire. [`MessageType::id`] and [`MessageType::from_id`]
/// convert between a variant and its identifier. Note that the identifier
/// space has gaps (for example there is no variant for `0x0F`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum MessageType {
    SubscribeUpdate = 0x02,
    Subscribe = 0x03,
    SubscribeOk = 0x04,
    SubscribeError = 0x05,
    Announce = 0x06,
    AnnounceOk = 0x07,
    AnnounceError = 0x08,
    Unannounce = 0x09,
    Unsubscribe = 0x0A,
    SubscribeDone = 0x0B,
    AnnounceCancel = 0x0C,
    TrackStatusRequest = 0x0D,
    TrackStatus = 0x0E,
    GoAway = 0x10,
    SubscribeAnnounces = 0x11,
    SubscribeAnnouncesOk = 0x12,
    SubscribeAnnouncesError = 0x13,
    UnsubscribeAnnounces = 0x14,
    MaxRequestId = 0x15,
    Fetch = 0x16,
    FetchCancel = 0x17,
    FetchOk = 0x18,
    FetchError = 0x19,
    RequestsBlocked = 0x1A,
    ClientSetup = 0x20,
    ServerSetup = 0x21,
}

impl MessageType {
    /// Every variant of the enum, in discriminant order. `from_id` resolves
    /// identifiers by scanning this table, so it must list all variants.
    const KNOWN: [MessageType; 26] = [
        MessageType::SubscribeUpdate,
        MessageType::Subscribe,
        MessageType::SubscribeOk,
        MessageType::SubscribeError,
        MessageType::Announce,
        MessageType::AnnounceOk,
        MessageType::AnnounceError,
        MessageType::Unannounce,
        MessageType::Unsubscribe,
        MessageType::SubscribeDone,
        MessageType::AnnounceCancel,
        MessageType::TrackStatusRequest,
        MessageType::TrackStatus,
        MessageType::GoAway,
        MessageType::SubscribeAnnounces,
        MessageType::SubscribeAnnouncesOk,
        MessageType::SubscribeAnnouncesError,
        MessageType::UnsubscribeAnnounces,
        MessageType::MaxRequestId,
        MessageType::Fetch,
        MessageType::FetchCancel,
        MessageType::FetchOk,
        MessageType::FetchError,
        MessageType::RequestsBlocked,
        MessageType::ClientSetup,
        MessageType::ServerSetup,
    ];

    /// Returns the message type whose wire identifier equals `id`, or
    /// `None` when no variant uses that identifier.
    pub fn from_id(id: u64) -> Option<Self> {
        Self::KNOWN.iter().copied().find(|kind| kind.id() == id)
    }

    /// Returns the numeric wire identifier of this message type.
    pub fn id(&self) -> u64 {
        let kind: MessageType = *self;
        kind as u64
    }
}
128
/// Payload of the `ClientSetup` control message (type `0x20`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientSetup {
    /// Supported versions, written on the wire as a count followed by each
    /// version. The decoder rejects a count of zero.
    pub supported_versions: Vec<VarInt>,
    /// Key-value parameters that follow the version list on the wire.
    pub parameters: Vec<KeyValuePair>,
}
141
/// Payload of the `ServerSetup` control message (type `0x21`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerSetup {
    /// Selected version; the first field on the wire.
    pub selected_version: VarInt,
    /// Key-value parameters that follow the selected version.
    pub parameters: Vec<KeyValuePair>,
}
150
/// Payload of the `GoAway` control message (type `0x10`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoAway {
    /// Raw URI bytes, written with a variable-length length prefix.
    /// Encoding fails with `CodecError::GoAwayUriTooLong` when this is
    /// longer than `MAX_GOAWAY_URI_LENGTH`.
    pub new_session_uri: Vec<u8>,
}
157
/// Payload of the `MaxRequestId` control message (type `0x15`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MaxRequestId {
    /// The only field of the message, encoded as a variable-length integer.
    pub request_id: VarInt,
}
164
/// Payload of the `RequestsBlocked` control message (type `0x1A`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RequestsBlocked {
    /// The only field of the message, encoded as a variable-length integer.
    pub maximum_request_id: VarInt,
}
171
/// Payload of the `Subscribe` control message (type `0x03`).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscribe {
    /// Request identifier; first field on the wire.
    pub request_id: VarInt,
    /// Track alias, encoded right after the request identifier.
    pub track_alias: VarInt,
    /// Namespace of the track.
    pub track_namespace: TrackNamespace,
    /// Track name bytes, written with a variable-length length prefix.
    pub track_name: Vec<u8>,
    /// Priority, carried as a single raw byte (not a variable-length integer).
    pub subscriber_priority: u8,
    /// Group order value, encoded as a variable-length integer.
    pub group_order: VarInt,
    /// Forward value, encoded as a variable-length integer.
    pub forward: VarInt,
    /// Filter type. The decoder accepts only the values 1 through 4 and
    /// uses this value to decide which of the optional fields below are
    /// present.
    pub filter_type: VarInt,
    /// Decoded only when `filter_type` is 3 or 4. The encoder writes it
    /// whenever it is `Some`, without consulting `filter_type`, so callers
    /// must keep the two consistent.
    pub start_group: Option<VarInt>,
    /// Decoded only when `filter_type` is 3 or 4; written whenever `Some`.
    pub start_object: Option<VarInt>,
    /// Decoded only when `filter_type` is 4; written whenever `Some`.
    pub end_group: Option<VarInt>,
    /// Key-value parameters; last field on the wire.
    pub parameters: Vec<KeyValuePair>,
}
204
/// Payload of the `SubscribeOk` control message (type `0x04`).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeOk {
    /// Request identifier; first field on the wire.
    pub request_id: VarInt,
    /// Expiry value, encoded as a variable-length integer.
    pub expires: VarInt,
    /// Group order value, encoded as a variable-length integer.
    pub group_order: VarInt,
    /// Flag controlling `largest_location`: the decoder reads a location
    /// only when this value is non-zero.
    pub content_exists: VarInt,
    /// Location that follows `content_exists`. The encoder writes it
    /// whenever it is `Some`, without consulting `content_exists`, so
    /// callers must keep the two consistent.
    pub largest_location: Option<Location>,
    /// Key-value parameters; last field on the wire.
    pub parameters: Vec<KeyValuePair>,
}
221
/// Payload of the `SubscribeError` control message (type `0x05`).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeError {
    /// Request identifier; first field on the wire.
    pub request_id: VarInt,
    /// Error code, encoded as a variable-length integer.
    pub error_code: VarInt,
    /// Reason bytes, written with a variable-length length prefix.
    /// Encoding fails with `CodecError::ReasonPhraseTooLong` when this is
    /// longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
    /// Track alias; written after the reason phrase.
    pub track_alias: VarInt,
}
234
/// Payload of the `SubscribeUpdate` control message (type `0x02`).
///
/// Fields are listed in the order they appear on the wire. Unlike
/// `Subscribe`, the start and end fields are always present.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeUpdate {
    /// Request identifier; first field on the wire.
    pub request_id: VarInt,
    /// Start group, encoded as a variable-length integer.
    pub start_group: VarInt,
    /// Start object, encoded as a variable-length integer.
    pub start_object: VarInt,
    /// End group, encoded as a variable-length integer.
    pub end_group: VarInt,
    /// Priority, carried as a single raw byte.
    pub subscriber_priority: u8,
    /// Forward value, encoded as a variable-length integer.
    pub forward: VarInt,
    /// Key-value parameters; last field on the wire.
    pub parameters: Vec<KeyValuePair>,
}
253
/// Payload of the `SubscribeDone` control message (type `0x0B`).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeDone {
    /// Request identifier; first field on the wire.
    pub request_id: VarInt,
    /// Status code, encoded as a variable-length integer.
    pub status_code: VarInt,
    /// Stream count, encoded as a variable-length integer.
    pub stream_count: VarInt,
    /// Reason bytes, written with a variable-length length prefix.
    /// Encoding fails with `CodecError::ReasonPhraseTooLong` when this is
    /// longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
266
/// Payload of the `Unsubscribe` control message (type `0x0A`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unsubscribe {
    /// The only field of the message, encoded as a variable-length integer.
    pub request_id: VarInt,
}
273
/// Payload of the `Announce` control message (type `0x06`).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Announce {
    /// Request identifier; first field on the wire.
    pub request_id: VarInt,
    /// Track namespace carried by the message.
    pub track_namespace: TrackNamespace,
    /// Key-value parameters; last field on the wire.
    pub parameters: Vec<KeyValuePair>,
}
288
/// Payload of the `AnnounceOk` control message (type `0x07`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceOk {
    /// The only field of the message, encoded as a variable-length integer.
    pub request_id: VarInt,
}
295
/// Payload of the `AnnounceError` control message (type `0x08`).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceError {
    /// Request identifier; first field on the wire.
    pub request_id: VarInt,
    /// Error code, encoded as a variable-length integer.
    pub error_code: VarInt,
    /// Reason bytes, written with a variable-length length prefix.
    /// Encoding fails with `CodecError::ReasonPhraseTooLong` when this is
    /// longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
306
/// Payload of the `AnnounceCancel` control message (type `0x0C`).
///
/// Fields are listed in the order they appear on the wire. This message is
/// keyed by namespace and carries no request identifier.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceCancel {
    /// Track namespace; first field on the wire.
    pub track_namespace: TrackNamespace,
    /// Error code, encoded as a variable-length integer.
    pub error_code: VarInt,
    /// Reason bytes, written with a variable-length length prefix.
    /// Encoding fails with `CodecError::ReasonPhraseTooLong` when this is
    /// longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
317
/// Payload of the `Unannounce` control message (type `0x09`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unannounce {
    /// The only field of the message.
    pub track_namespace: TrackNamespace,
}
324
/// Payload of the `SubscribeAnnounces` control message (type `0x11`).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnounces {
    /// Request identifier; first field on the wire.
    pub request_id: VarInt,
    /// Namespace prefix, encoded with the same `TrackNamespace` codec used
    /// for full namespaces.
    pub track_namespace_prefix: TrackNamespace,
    /// Key-value parameters; last field on the wire.
    pub parameters: Vec<KeyValuePair>,
}
339
/// Payload of the `SubscribeAnnouncesOk` control message (type `0x12`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnouncesOk {
    /// The only field of the message, encoded as a variable-length integer.
    pub request_id: VarInt,
}
346
/// Payload of the `SubscribeAnnouncesError` control message (type `0x13`).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnouncesError {
    /// Request identifier; first field on the wire.
    pub request_id: VarInt,
    /// Error code, encoded as a variable-length integer.
    pub error_code: VarInt,
    /// Reason bytes, written with a variable-length length prefix.
    /// Encoding fails with `CodecError::ReasonPhraseTooLong` when this is
    /// longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
357
/// Payload of the `UnsubscribeAnnounces` control message (type `0x14`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsubscribeAnnounces {
    /// The only field of the message.
    pub track_namespace_prefix: TrackNamespace,
}
364
/// Payload of the `TrackStatusRequest` control message (type `0x0D`).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatusRequest {
    /// Request identifier; first field on the wire.
    pub request_id: VarInt,
    /// Namespace of the track.
    pub track_namespace: TrackNamespace,
    /// Track name bytes, written with a variable-length length prefix.
    pub track_name: Vec<u8>,
    /// Key-value parameters; last field on the wire.
    pub parameters: Vec<KeyValuePair>,
}
381
/// Payload of the `TrackStatus` control message (type `0x0E`).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatus {
    /// Request identifier; first field on the wire.
    pub request_id: VarInt,
    /// Status code, encoded as a variable-length integer.
    pub status_code: VarInt,
    /// Location; always present on the wire (unlike `SubscribeOk`, where
    /// the location is optional).
    pub largest_location: Location,
    /// Key-value parameters; last field on the wire.
    pub parameters: Vec<KeyValuePair>,
}
394
/// Kind of a `Fetch` request, identified on the wire by its discriminant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum FetchType {
    Standalone = 1,
    RelativeJoining = 2,
    AbsoluteJoining = 3,
}

impl FetchType {
    /// Returns the fetch type whose discriminant equals `v`, or `None` when
    /// `v` is not 1, 2 or 3.
    pub fn from_u64(v: u64) -> Option<Self> {
        // All variants; the lookup compares each discriminant against `v`.
        const KNOWN: [FetchType; 3] = [
            FetchType::Standalone,
            FetchType::RelativeJoining,
            FetchType::AbsoluteJoining,
        ];
        KNOWN.iter().copied().find(|kind| *kind as u64 == v)
    }
}
422
/// Payload of the `Fetch` control message (type `0x16`).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Fetch {
    /// Request identifier; first field on the wire.
    pub request_id: VarInt,
    /// Priority, carried as a single raw byte.
    pub subscriber_priority: u8,
    /// Group order value, encoded as a variable-length integer.
    pub group_order: VarInt,
    /// Selects the payload layout when decoding: `Standalone` yields
    /// `FetchPayload::Standalone`, both joining types yield
    /// `FetchPayload::Joining`.
    pub fetch_type: FetchType,
    /// Type-specific fields. The encoder writes `fetch_type` and this
    /// payload independently, so callers must keep the two consistent.
    pub fetch_payload: FetchPayload,
    /// Key-value parameters; last field on the wire.
    pub parameters: Vec<KeyValuePair>,
}
439
/// Type-specific portion of a `Fetch` message.
///
/// Fields of each variant are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FetchPayload {
    /// Layout decoded when the fetch type is `FetchType::Standalone`.
    Standalone {
        /// Namespace of the track.
        track_namespace: TrackNamespace,
        /// Track name bytes, written with a variable-length length prefix.
        track_name: Vec<u8>,
        /// Start group, encoded as a variable-length integer.
        start_group: VarInt,
        /// Start object, encoded as a variable-length integer.
        start_object: VarInt,
        /// End group, encoded as a variable-length integer.
        end_group: VarInt,
        /// End object, encoded as a variable-length integer.
        end_object: VarInt,
    },
    /// Layout decoded when the fetch type is `FetchType::RelativeJoining`
    /// or `FetchType::AbsoluteJoining`.
    Joining {
        /// Joining subscribe identifier, encoded as a variable-length integer.
        joining_subscribe_id: VarInt,
        /// Joining start value, encoded as a variable-length integer.
        joining_start: VarInt,
    },
}
466
/// Payload of the `FetchOk` control message (type `0x18`).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchOk {
    /// Request identifier; first field on the wire.
    pub request_id: VarInt,
    /// Group order value, encoded as a variable-length integer.
    pub group_order: VarInt,
    /// End-of-track value, encoded as a variable-length integer.
    pub end_of_track: VarInt,
    /// End location; always present on the wire.
    pub end_location: Location,
    /// Key-value parameters; last field on the wire.
    pub parameters: Vec<KeyValuePair>,
}
481
/// Payload of the `FetchError` control message (type `0x19`).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchError {
    /// Request identifier; first field on the wire.
    pub request_id: VarInt,
    /// Error code, encoded as a variable-length integer.
    pub error_code: VarInt,
    /// Reason bytes, written with a variable-length length prefix.
    /// Encoding fails with `CodecError::ReasonPhraseTooLong` when this is
    /// longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
492
/// Payload of the `FetchCancel` control message (type `0x17`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchCancel {
    /// The only field of the message, encoded as a variable-length integer.
    pub request_id: VarInt,
}
499
/// A control message together with its payload.
///
/// There is exactly one variant per [`MessageType`]; each wraps the struct
/// holding that message's fields. Use [`ControlMessage::encode`] and
/// [`ControlMessage::decode`] to convert to and from the framed wire form,
/// and [`ControlMessage::message_type`] to obtain the matching identifier.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ControlMessage {
    // Session setup and session-level control.
    ClientSetup(ClientSetup),
    ServerSetup(ServerSetup),
    GoAway(GoAway),
    MaxRequestId(MaxRequestId),
    RequestsBlocked(RequestsBlocked),
    // Subscriptions.
    Subscribe(Subscribe),
    SubscribeOk(SubscribeOk),
    SubscribeError(SubscribeError),
    SubscribeUpdate(SubscribeUpdate),
    SubscribeDone(SubscribeDone),
    Unsubscribe(Unsubscribe),
    // Namespace announcements.
    Announce(Announce),
    AnnounceOk(AnnounceOk),
    AnnounceError(AnnounceError),
    AnnounceCancel(AnnounceCancel),
    Unannounce(Unannounce),
    // Subscriptions to announcements.
    SubscribeAnnounces(SubscribeAnnounces),
    SubscribeAnnouncesOk(SubscribeAnnouncesOk),
    SubscribeAnnouncesError(SubscribeAnnouncesError),
    UnsubscribeAnnounces(UnsubscribeAnnounces),
    // Track status.
    TrackStatusRequest(TrackStatusRequest),
    TrackStatus(TrackStatus),
    // Fetch.
    Fetch(Fetch),
    FetchOk(FetchOk),
    FetchError(FetchError),
    FetchCancel(FetchCancel),
}
560
561impl ControlMessage {
562 pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
566 let mut payload = Vec::with_capacity(256);
567 self.encode_payload(&mut payload)?;
568
569 if payload.len() > MAX_MESSAGE_LENGTH {
570 return Err(CodecError::MessageTooLong(payload.len()));
571 }
572
573 VarInt::from_usize(self.message_type().id() as usize).encode(buf);
574 buf.put_u16(payload.len() as u16);
576 buf.put_slice(&payload);
577 Ok(())
578 }
579
580 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
584 let type_id = VarInt::decode(buf)?.into_inner();
585 let msg_type =
586 MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
587 if buf.remaining() < 2 {
589 return Err(CodecError::UnexpectedEnd);
590 }
591 let payload_len = buf.get_u16() as usize;
592 if buf.remaining() < payload_len {
593 return Err(CodecError::UnexpectedEnd);
594 }
595 let payload_bytes = buf.copy_to_bytes(payload_len);
596 let mut payload = &payload_bytes[..];
597 Self::decode_payload(msg_type, &mut payload)
598 }
599
600 fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
601 match self {
602 ControlMessage::ClientSetup(m) => {
603 VarInt::from_usize(m.supported_versions.len()).encode(buf);
604 for v in &m.supported_versions {
605 v.encode(buf);
606 }
607 KeyValuePair::encode_list(&m.parameters, buf);
608 }
609 ControlMessage::ServerSetup(m) => {
610 m.selected_version.encode(buf);
611 KeyValuePair::encode_list(&m.parameters, buf);
612 }
613 ControlMessage::GoAway(m) => {
614 if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
615 return Err(CodecError::GoAwayUriTooLong);
616 }
617 VarInt::from_usize(m.new_session_uri.len()).encode(buf);
618 buf.put_slice(&m.new_session_uri);
619 }
620 ControlMessage::MaxRequestId(m) => {
621 m.request_id.encode(buf);
622 }
623 ControlMessage::RequestsBlocked(m) => {
624 m.maximum_request_id.encode(buf);
625 }
626 ControlMessage::Subscribe(m) => {
627 m.request_id.encode(buf);
628 m.track_alias.encode(buf);
629 m.track_namespace.encode(buf);
630 VarInt::from_usize(m.track_name.len()).encode(buf);
631 buf.put_slice(&m.track_name);
632 buf.put_u8(m.subscriber_priority);
633 m.group_order.encode(buf);
634 m.forward.encode(buf);
635 m.filter_type.encode(buf);
636 if let Some(sg) = &m.start_group {
637 sg.encode(buf);
638 }
639 if let Some(so) = &m.start_object {
640 so.encode(buf);
641 }
642 if let Some(eg) = &m.end_group {
643 eg.encode(buf);
644 }
645 KeyValuePair::encode_list(&m.parameters, buf);
646 }
647 ControlMessage::SubscribeOk(m) => {
648 m.request_id.encode(buf);
649 m.expires.encode(buf);
650 m.group_order.encode(buf);
651 m.content_exists.encode(buf);
652 if let Some(loc) = &m.largest_location {
653 loc.encode(buf);
654 }
655 KeyValuePair::encode_list(&m.parameters, buf);
656 }
657 ControlMessage::SubscribeError(m) => {
658 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
659 return Err(CodecError::ReasonPhraseTooLong);
660 }
661 m.request_id.encode(buf);
662 m.error_code.encode(buf);
663 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
664 buf.put_slice(&m.reason_phrase);
665 m.track_alias.encode(buf);
666 }
667 ControlMessage::SubscribeUpdate(m) => {
668 m.request_id.encode(buf);
669 m.start_group.encode(buf);
670 m.start_object.encode(buf);
671 m.end_group.encode(buf);
672 buf.put_u8(m.subscriber_priority);
673 m.forward.encode(buf);
674 KeyValuePair::encode_list(&m.parameters, buf);
675 }
676 ControlMessage::SubscribeDone(m) => {
677 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
678 return Err(CodecError::ReasonPhraseTooLong);
679 }
680 m.request_id.encode(buf);
681 m.status_code.encode(buf);
682 m.stream_count.encode(buf);
683 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
684 buf.put_slice(&m.reason_phrase);
685 }
686 ControlMessage::Unsubscribe(m) => {
687 m.request_id.encode(buf);
688 }
689 ControlMessage::Announce(m) => {
690 m.request_id.encode(buf);
691 m.track_namespace.encode(buf);
692 KeyValuePair::encode_list(&m.parameters, buf);
693 }
694 ControlMessage::AnnounceOk(m) => {
695 m.request_id.encode(buf);
696 }
697 ControlMessage::AnnounceError(m) => {
698 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
699 return Err(CodecError::ReasonPhraseTooLong);
700 }
701 m.request_id.encode(buf);
702 m.error_code.encode(buf);
703 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
704 buf.put_slice(&m.reason_phrase);
705 }
706 ControlMessage::AnnounceCancel(m) => {
707 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
708 return Err(CodecError::ReasonPhraseTooLong);
709 }
710 m.track_namespace.encode(buf);
711 m.error_code.encode(buf);
712 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
713 buf.put_slice(&m.reason_phrase);
714 }
715 ControlMessage::Unannounce(m) => {
716 m.track_namespace.encode(buf);
717 }
718 ControlMessage::SubscribeAnnounces(m) => {
719 m.request_id.encode(buf);
720 m.track_namespace_prefix.encode(buf);
721 KeyValuePair::encode_list(&m.parameters, buf);
722 }
723 ControlMessage::SubscribeAnnouncesOk(m) => {
724 m.request_id.encode(buf);
725 }
726 ControlMessage::SubscribeAnnouncesError(m) => {
727 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
728 return Err(CodecError::ReasonPhraseTooLong);
729 }
730 m.request_id.encode(buf);
731 m.error_code.encode(buf);
732 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
733 buf.put_slice(&m.reason_phrase);
734 }
735 ControlMessage::UnsubscribeAnnounces(m) => {
736 m.track_namespace_prefix.encode(buf);
737 }
738 ControlMessage::TrackStatusRequest(m) => {
739 m.request_id.encode(buf);
740 m.track_namespace.encode(buf);
741 VarInt::from_usize(m.track_name.len()).encode(buf);
742 buf.put_slice(&m.track_name);
743 KeyValuePair::encode_list(&m.parameters, buf);
744 }
745 ControlMessage::TrackStatus(m) => {
746 m.request_id.encode(buf);
747 m.status_code.encode(buf);
748 m.largest_location.encode(buf);
749 KeyValuePair::encode_list(&m.parameters, buf);
750 }
751 ControlMessage::Fetch(m) => {
752 m.request_id.encode(buf);
753 buf.put_u8(m.subscriber_priority);
754 m.group_order.encode(buf);
755 VarInt::from_usize(m.fetch_type as usize).encode(buf);
756 match &m.fetch_payload {
757 FetchPayload::Standalone {
758 track_namespace,
759 track_name,
760 start_group,
761 start_object,
762 end_group,
763 end_object,
764 } => {
765 track_namespace.encode(buf);
766 VarInt::from_usize(track_name.len()).encode(buf);
767 buf.put_slice(track_name);
768 start_group.encode(buf);
769 start_object.encode(buf);
770 end_group.encode(buf);
771 end_object.encode(buf);
772 }
773 FetchPayload::Joining { joining_subscribe_id, joining_start } => {
774 joining_subscribe_id.encode(buf);
775 joining_start.encode(buf);
776 }
777 }
778 KeyValuePair::encode_list(&m.parameters, buf);
779 }
780 ControlMessage::FetchOk(m) => {
781 m.request_id.encode(buf);
782 m.group_order.encode(buf);
783 m.end_of_track.encode(buf);
784 m.end_location.encode(buf);
785 KeyValuePair::encode_list(&m.parameters, buf);
786 }
787 ControlMessage::FetchError(m) => {
788 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
789 return Err(CodecError::ReasonPhraseTooLong);
790 }
791 m.request_id.encode(buf);
792 m.error_code.encode(buf);
793 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
794 buf.put_slice(&m.reason_phrase);
795 }
796 ControlMessage::FetchCancel(m) => {
797 m.request_id.encode(buf);
798 }
799 }
800 Ok(())
801 }
802
803 fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
804 match msg_type {
805 MessageType::ClientSetup => {
806 let num_versions = VarInt::decode(buf)?.into_inner() as usize;
807 if num_versions == 0 {
808 return Err(CodecError::InvalidField);
809 }
810 let mut supported_versions = Vec::with_capacity(num_versions);
811 for _ in 0..num_versions {
812 supported_versions.push(VarInt::decode(buf)?);
813 }
814 let parameters = KeyValuePair::decode_list(buf)?;
815 Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
816 }
817 MessageType::ServerSetup => {
818 let selected_version = VarInt::decode(buf)?;
819 let parameters = KeyValuePair::decode_list(buf)?;
820 Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
821 }
822 MessageType::GoAway => {
823 let uri_len = VarInt::decode(buf)?.into_inner() as usize;
824 let uri = types::read_bytes(buf, uri_len)?;
825 Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
826 }
827 MessageType::MaxRequestId => {
828 let request_id = VarInt::decode(buf)?;
829 Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id }))
830 }
831 MessageType::RequestsBlocked => {
832 let maximum_request_id = VarInt::decode(buf)?;
833 Ok(ControlMessage::RequestsBlocked(RequestsBlocked { maximum_request_id }))
834 }
835 MessageType::Subscribe => {
836 let request_id = VarInt::decode(buf)?;
837 let track_alias = VarInt::decode(buf)?;
838 let track_namespace = TrackNamespace::decode(buf)?;
839 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
840 let track_name = types::read_bytes(buf, track_name_len)?;
841 if buf.remaining() < 1 {
842 return Err(CodecError::UnexpectedEnd);
843 }
844 let subscriber_priority = buf.get_u8();
845 let group_order = VarInt::decode(buf)?;
846 let forward = VarInt::decode(buf)?;
847 let filter_type = VarInt::decode(buf)?;
848 let ft_val = filter_type.into_inner();
849 if ft_val == 0 || ft_val > 4 {
850 return Err(CodecError::InvalidField);
851 }
852 let (start_group, start_object) = if ft_val == 3 || ft_val == 4 {
853 (Some(VarInt::decode(buf)?), Some(VarInt::decode(buf)?))
854 } else {
855 (None, None)
856 };
857 let end_group = if ft_val == 4 { Some(VarInt::decode(buf)?) } else { None };
858 let parameters = KeyValuePair::decode_list(buf)?;
859 Ok(ControlMessage::Subscribe(Subscribe {
860 request_id,
861 track_alias,
862 track_namespace,
863 track_name,
864 subscriber_priority,
865 group_order,
866 forward,
867 filter_type,
868 start_group,
869 start_object,
870 end_group,
871 parameters,
872 }))
873 }
874 MessageType::SubscribeOk => {
875 let request_id = VarInt::decode(buf)?;
876 let expires = VarInt::decode(buf)?;
877 let group_order = VarInt::decode(buf)?;
878 let content_exists = VarInt::decode(buf)?;
879 let largest_location = if content_exists.into_inner() != 0 {
880 Some(Location::decode(buf)?)
881 } else {
882 None
883 };
884 let parameters = KeyValuePair::decode_list(buf)?;
885 Ok(ControlMessage::SubscribeOk(SubscribeOk {
886 request_id,
887 expires,
888 group_order,
889 content_exists,
890 largest_location,
891 parameters,
892 }))
893 }
894 MessageType::SubscribeError => {
895 let request_id = VarInt::decode(buf)?;
896 let error_code = VarInt::decode(buf)?;
897 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
898 let reason_phrase = types::read_bytes(buf, reason_len)?;
899 let track_alias = VarInt::decode(buf)?;
900 Ok(ControlMessage::SubscribeError(SubscribeError {
901 request_id,
902 error_code,
903 reason_phrase,
904 track_alias,
905 }))
906 }
907 MessageType::SubscribeUpdate => {
908 let request_id = VarInt::decode(buf)?;
909 let start_group = VarInt::decode(buf)?;
910 let start_object = VarInt::decode(buf)?;
911 let end_group = VarInt::decode(buf)?;
912 if buf.remaining() < 1 {
913 return Err(CodecError::UnexpectedEnd);
914 }
915 let subscriber_priority = buf.get_u8();
916 let forward = VarInt::decode(buf)?;
917 let parameters = KeyValuePair::decode_list(buf)?;
918 Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
919 request_id,
920 start_group,
921 start_object,
922 end_group,
923 subscriber_priority,
924 forward,
925 parameters,
926 }))
927 }
928 MessageType::SubscribeDone => {
929 let request_id = VarInt::decode(buf)?;
930 let status_code = VarInt::decode(buf)?;
931 let stream_count = VarInt::decode(buf)?;
932 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
933 let reason_phrase = types::read_bytes(buf, reason_len)?;
934 Ok(ControlMessage::SubscribeDone(SubscribeDone {
935 request_id,
936 status_code,
937 stream_count,
938 reason_phrase,
939 }))
940 }
941 MessageType::Unsubscribe => {
942 let request_id = VarInt::decode(buf)?;
943 Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
944 }
945 MessageType::Announce => {
946 let request_id = VarInt::decode(buf)?;
947 let track_namespace = TrackNamespace::decode(buf)?;
948 let parameters = KeyValuePair::decode_list(buf)?;
949 Ok(ControlMessage::Announce(Announce { request_id, track_namespace, parameters }))
950 }
951 MessageType::AnnounceOk => {
952 let request_id = VarInt::decode(buf)?;
953 Ok(ControlMessage::AnnounceOk(AnnounceOk { request_id }))
954 }
955 MessageType::AnnounceError => {
956 let request_id = VarInt::decode(buf)?;
957 let error_code = VarInt::decode(buf)?;
958 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
959 let reason_phrase = types::read_bytes(buf, reason_len)?;
960 Ok(ControlMessage::AnnounceError(AnnounceError {
961 request_id,
962 error_code,
963 reason_phrase,
964 }))
965 }
966 MessageType::AnnounceCancel => {
967 let track_namespace = TrackNamespace::decode(buf)?;
968 let error_code = VarInt::decode(buf)?;
969 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
970 let reason_phrase = types::read_bytes(buf, reason_len)?;
971 Ok(ControlMessage::AnnounceCancel(AnnounceCancel {
972 track_namespace,
973 error_code,
974 reason_phrase,
975 }))
976 }
977 MessageType::Unannounce => {
978 let track_namespace = TrackNamespace::decode(buf)?;
979 Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
980 }
981 MessageType::SubscribeAnnounces => {
982 let request_id = VarInt::decode(buf)?;
983 let track_namespace_prefix = TrackNamespace::decode(buf)?;
984 let parameters = KeyValuePair::decode_list(buf)?;
985 Ok(ControlMessage::SubscribeAnnounces(SubscribeAnnounces {
986 request_id,
987 track_namespace_prefix,
988 parameters,
989 }))
990 }
991 MessageType::SubscribeAnnouncesOk => {
992 let request_id = VarInt::decode(buf)?;
993 Ok(ControlMessage::SubscribeAnnouncesOk(SubscribeAnnouncesOk { request_id }))
994 }
995 MessageType::SubscribeAnnouncesError => {
996 let request_id = VarInt::decode(buf)?;
997 let error_code = VarInt::decode(buf)?;
998 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
999 let reason_phrase = types::read_bytes(buf, reason_len)?;
1000 Ok(ControlMessage::SubscribeAnnouncesError(SubscribeAnnouncesError {
1001 request_id,
1002 error_code,
1003 reason_phrase,
1004 }))
1005 }
1006 MessageType::UnsubscribeAnnounces => {
1007 let track_namespace_prefix = TrackNamespace::decode(buf)?;
1008 Ok(ControlMessage::UnsubscribeAnnounces(UnsubscribeAnnounces {
1009 track_namespace_prefix,
1010 }))
1011 }
1012 MessageType::TrackStatusRequest => {
1013 let request_id = VarInt::decode(buf)?;
1014 let track_namespace = TrackNamespace::decode(buf)?;
1015 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1016 let track_name = types::read_bytes(buf, track_name_len)?;
1017 let parameters = KeyValuePair::decode_list(buf)?;
1018 Ok(ControlMessage::TrackStatusRequest(TrackStatusRequest {
1019 request_id,
1020 track_namespace,
1021 track_name,
1022 parameters,
1023 }))
1024 }
1025 MessageType::TrackStatus => {
1026 let request_id = VarInt::decode(buf)?;
1027 let status_code = VarInt::decode(buf)?;
1028 let largest_location = Location::decode(buf)?;
1029 let parameters = KeyValuePair::decode_list(buf)?;
1030 Ok(ControlMessage::TrackStatus(TrackStatus {
1031 request_id,
1032 status_code,
1033 largest_location,
1034 parameters,
1035 }))
1036 }
1037 MessageType::Fetch => {
1038 let request_id = VarInt::decode(buf)?;
1039 if buf.remaining() < 1 {
1040 return Err(CodecError::UnexpectedEnd);
1041 }
1042 let subscriber_priority = buf.get_u8();
1043 let group_order = VarInt::decode(buf)?;
1044 let fetch_type_val = VarInt::decode(buf)?.into_inner();
1045 let fetch_type =
1046 FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
1047 let fetch_payload = match fetch_type {
1048 FetchType::Standalone => {
1049 let track_namespace = TrackNamespace::decode(buf)?;
1050 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1051 let track_name = types::read_bytes(buf, track_name_len)?;
1052 let start_group = VarInt::decode(buf)?;
1053 let start_object = VarInt::decode(buf)?;
1054 let end_group = VarInt::decode(buf)?;
1055 let end_object = VarInt::decode(buf)?;
1056 FetchPayload::Standalone {
1057 track_namespace,
1058 track_name,
1059 start_group,
1060 start_object,
1061 end_group,
1062 end_object,
1063 }
1064 }
1065 FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
1066 let joining_subscribe_id = VarInt::decode(buf)?;
1067 let joining_start = VarInt::decode(buf)?;
1068 FetchPayload::Joining { joining_subscribe_id, joining_start }
1069 }
1070 };
1071 let parameters = KeyValuePair::decode_list(buf)?;
1072 Ok(ControlMessage::Fetch(Fetch {
1073 request_id,
1074 subscriber_priority,
1075 group_order,
1076 fetch_type,
1077 fetch_payload,
1078 parameters,
1079 }))
1080 }
1081 MessageType::FetchOk => {
1082 let request_id = VarInt::decode(buf)?;
1083 let group_order = VarInt::decode(buf)?;
1084 let end_of_track = VarInt::decode(buf)?;
1085 let end_location = Location::decode(buf)?;
1086 let parameters = KeyValuePair::decode_list(buf)?;
1087 Ok(ControlMessage::FetchOk(FetchOk {
1088 request_id,
1089 group_order,
1090 end_of_track,
1091 end_location,
1092 parameters,
1093 }))
1094 }
1095 MessageType::FetchError => {
1096 let request_id = VarInt::decode(buf)?;
1097 let error_code = VarInt::decode(buf)?;
1098 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1099 let reason_phrase = types::read_bytes(buf, reason_len)?;
1100 Ok(ControlMessage::FetchError(FetchError { request_id, error_code, reason_phrase }))
1101 }
1102 MessageType::FetchCancel => {
1103 let request_id = VarInt::decode(buf)?;
1104 Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
1105 }
1106 }
1107 }
1108
1109 pub fn message_type(&self) -> MessageType {
1111 match self {
1112 ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
1113 ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
1114 ControlMessage::GoAway(_) => MessageType::GoAway,
1115 ControlMessage::MaxRequestId(_) => MessageType::MaxRequestId,
1116 ControlMessage::RequestsBlocked(_) => MessageType::RequestsBlocked,
1117 ControlMessage::Subscribe(_) => MessageType::Subscribe,
1118 ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
1119 ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
1120 ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
1121 ControlMessage::SubscribeDone(_) => MessageType::SubscribeDone,
1122 ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
1123 ControlMessage::Announce(_) => MessageType::Announce,
1124 ControlMessage::AnnounceOk(_) => MessageType::AnnounceOk,
1125 ControlMessage::AnnounceError(_) => MessageType::AnnounceError,
1126 ControlMessage::AnnounceCancel(_) => MessageType::AnnounceCancel,
1127 ControlMessage::Unannounce(_) => MessageType::Unannounce,
1128 ControlMessage::SubscribeAnnounces(_) => MessageType::SubscribeAnnounces,
1129 ControlMessage::SubscribeAnnouncesOk(_) => MessageType::SubscribeAnnouncesOk,
1130 ControlMessage::SubscribeAnnouncesError(_) => MessageType::SubscribeAnnouncesError,
1131 ControlMessage::UnsubscribeAnnounces(_) => MessageType::UnsubscribeAnnounces,
1132 ControlMessage::TrackStatusRequest(_) => MessageType::TrackStatusRequest,
1133 ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
1134 ControlMessage::Fetch(_) => MessageType::Fetch,
1135 ControlMessage::FetchOk(_) => MessageType::FetchOk,
1136 ControlMessage::FetchError(_) => MessageType::FetchError,
1137 ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
1138 }
1139 }
1140}