1pub use crate::error::{
2 CodecError, MAX_FULL_TRACK_NAME_LENGTH, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH,
3 MAX_NAMESPACE_TUPLE_SIZE, MAX_REASON_PHRASE_LENGTH,
4};
5use crate::kvp::KeyValuePair;
6use crate::types::*;
7use crate::varint::VarInt;
8use bytes::{Buf, BufMut};
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12#[repr(u64)]
13pub enum MessageType {
14 SubscribeUpdate = 0x02,
16 Subscribe = 0x03,
18 SubscribeOk = 0x04,
20 SubscribeError = 0x05,
22 PublishNamespace = 0x06,
24 PublishNamespaceOk = 0x07,
26 PublishNamespaceError = 0x08,
28 PublishNamespaceDone = 0x09,
30 Unsubscribe = 0x0A,
32 PublishDone = 0x0B,
34 PublishNamespaceCancel = 0x0C,
36 TrackStatus = 0x0D,
38 TrackStatusOk = 0x0E,
40 TrackStatusError = 0x0F,
42 GoAway = 0x10,
44 SubscribeNamespace = 0x11,
46 SubscribeNamespaceOk = 0x12,
48 SubscribeNamespaceError = 0x13,
50 UnsubscribeNamespace = 0x14,
52 MaxRequestId = 0x15,
54 Fetch = 0x16,
56 FetchCancel = 0x17,
58 FetchOk = 0x18,
60 FetchError = 0x19,
62 RequestsBlocked = 0x1A,
64 Publish = 0x1D,
66 PublishOk = 0x1E,
68 PublishError = 0x1F,
70 ClientSetup = 0x20,
72 ServerSetup = 0x21,
74}
75
76impl MessageType {
77 pub fn from_id(id: u64) -> Option<Self> {
79 match id {
80 0x02 => Some(MessageType::SubscribeUpdate),
81 0x03 => Some(MessageType::Subscribe),
82 0x04 => Some(MessageType::SubscribeOk),
83 0x05 => Some(MessageType::SubscribeError),
84 0x06 => Some(MessageType::PublishNamespace),
85 0x07 => Some(MessageType::PublishNamespaceOk),
86 0x08 => Some(MessageType::PublishNamespaceError),
87 0x09 => Some(MessageType::PublishNamespaceDone),
88 0x0A => Some(MessageType::Unsubscribe),
89 0x0B => Some(MessageType::PublishDone),
90 0x0C => Some(MessageType::PublishNamespaceCancel),
91 0x0D => Some(MessageType::TrackStatus),
92 0x0E => Some(MessageType::TrackStatusOk),
93 0x0F => Some(MessageType::TrackStatusError),
94 0x10 => Some(MessageType::GoAway),
95 0x11 => Some(MessageType::SubscribeNamespace),
96 0x12 => Some(MessageType::SubscribeNamespaceOk),
97 0x13 => Some(MessageType::SubscribeNamespaceError),
98 0x14 => Some(MessageType::UnsubscribeNamespace),
99 0x15 => Some(MessageType::MaxRequestId),
100 0x16 => Some(MessageType::Fetch),
101 0x17 => Some(MessageType::FetchCancel),
102 0x18 => Some(MessageType::FetchOk),
103 0x19 => Some(MessageType::FetchError),
104 0x1A => Some(MessageType::RequestsBlocked),
105 0x1D => Some(MessageType::Publish),
106 0x1E => Some(MessageType::PublishOk),
107 0x1F => Some(MessageType::PublishError),
108 0x20 => Some(MessageType::ClientSetup),
109 0x21 => Some(MessageType::ServerSetup),
110 _ => None,
111 }
112 }
113
114 pub fn id(&self) -> u64 {
116 *self as u64
117 }
118}
119
120#[derive(Debug, Clone, PartialEq, Eq)]
126pub struct ClientSetup {
127 pub supported_versions: Vec<VarInt>,
129 pub parameters: Vec<KeyValuePair>,
131}
132
133#[derive(Debug, Clone, PartialEq, Eq)]
135pub struct ServerSetup {
136 pub selected_version: VarInt,
138 pub parameters: Vec<KeyValuePair>,
140}
141
142#[derive(Debug, Clone, PartialEq, Eq)]
144pub struct GoAway {
145 pub new_session_uri: Vec<u8>,
147}
148
149#[derive(Debug, Clone, PartialEq, Eq)]
151pub struct MaxRequestId {
152 pub request_id: VarInt,
154}
155
156#[derive(Debug, Clone, PartialEq, Eq)]
158pub struct RequestsBlocked {
159 pub maximum_request_id: VarInt,
161}
162
163#[derive(Debug, Clone, PartialEq, Eq)]
169pub struct Subscribe {
170 pub request_id: VarInt,
172 pub track_namespace: TrackNamespace,
174 pub track_name: Vec<u8>,
176 pub subscriber_priority: u8,
178 pub group_order: GroupOrder,
180 pub forward: Forward,
182 pub filter_type: FilterType,
184 pub start_location: Option<Location>,
186 pub end_group: Option<VarInt>,
188 pub parameters: Vec<KeyValuePair>,
190}
191
192#[derive(Debug, Clone, PartialEq, Eq)]
194pub struct SubscribeOk {
195 pub request_id: VarInt,
197 pub track_alias: VarInt,
199 pub expires: VarInt,
201 pub group_order: GroupOrder,
203 pub content_exists: ContentExists,
205 pub largest_location: Option<Location>,
207 pub parameters: Vec<KeyValuePair>,
209}
210
211#[derive(Debug, Clone, PartialEq, Eq)]
213pub struct SubscribeError {
214 pub request_id: VarInt,
216 pub error_code: VarInt,
218 pub reason_phrase: Vec<u8>,
220}
221
222#[derive(Debug, Clone, PartialEq, Eq)]
224pub struct SubscribeUpdate {
225 pub request_id: VarInt,
227 pub subscription_request_id: VarInt,
229 pub start_location: Location,
231 pub end_group: VarInt,
233 pub subscriber_priority: u8,
235 pub forward: Forward,
237 pub parameters: Vec<KeyValuePair>,
239}
240
241#[derive(Debug, Clone, PartialEq, Eq)]
243pub struct Unsubscribe {
244 pub request_id: VarInt,
246}
247
248#[derive(Debug, Clone, PartialEq, Eq)]
254pub struct Publish {
255 pub request_id: VarInt,
257 pub track_namespace: TrackNamespace,
259 pub track_name: Vec<u8>,
261 pub track_alias: VarInt,
263 pub group_order: GroupOrder,
265 pub content_exists: ContentExists,
267 pub largest_location: Option<Location>,
269 pub forward: Forward,
271 pub parameters: Vec<KeyValuePair>,
273}
274
275#[derive(Debug, Clone, PartialEq, Eq)]
277pub struct PublishOk {
278 pub request_id: VarInt,
280 pub forward: Forward,
282 pub subscriber_priority: u8,
284 pub group_order: GroupOrder,
286 pub filter_type: FilterType,
288 pub start_location: Option<Location>,
290 pub end_group: Option<VarInt>,
292 pub parameters: Vec<KeyValuePair>,
294}
295
296#[derive(Debug, Clone, PartialEq, Eq)]
298pub struct PublishError {
299 pub request_id: VarInt,
301 pub error_code: VarInt,
303 pub reason_phrase: Vec<u8>,
305}
306
307#[derive(Debug, Clone, PartialEq, Eq)]
309pub struct PublishDone {
310 pub request_id: VarInt,
312 pub status_code: VarInt,
314 pub stream_count: VarInt,
316 pub reason_phrase: Vec<u8>,
318}
319
320#[derive(Debug, Clone, PartialEq, Eq)]
326pub struct PublishNamespace {
327 pub request_id: VarInt,
329 pub track_namespace: TrackNamespace,
331 pub parameters: Vec<KeyValuePair>,
333}
334
335#[derive(Debug, Clone, PartialEq, Eq)]
337pub struct PublishNamespaceOk {
338 pub request_id: VarInt,
340 pub parameters: Vec<KeyValuePair>,
342}
343
344#[derive(Debug, Clone, PartialEq, Eq)]
346pub struct PublishNamespaceError {
347 pub request_id: VarInt,
349 pub error_code: VarInt,
351 pub reason_phrase: Vec<u8>,
353}
354
355#[derive(Debug, Clone, PartialEq, Eq)]
357pub struct PublishNamespaceDone {
358 pub track_namespace: TrackNamespace,
360}
361
362#[derive(Debug, Clone, PartialEq, Eq)]
364pub struct PublishNamespaceCancel {
365 pub track_namespace: TrackNamespace,
367 pub error_code: VarInt,
369 pub reason_phrase: Vec<u8>,
371}
372
373#[derive(Debug, Clone, PartialEq, Eq)]
379pub struct SubscribeNamespace {
380 pub request_id: VarInt,
382 pub track_namespace: TrackNamespace,
384 pub parameters: Vec<KeyValuePair>,
386}
387
388#[derive(Debug, Clone, PartialEq, Eq)]
390pub struct SubscribeNamespaceOk {
391 pub request_id: VarInt,
393 pub parameters: Vec<KeyValuePair>,
395}
396
397#[derive(Debug, Clone, PartialEq, Eq)]
399pub struct SubscribeNamespaceError {
400 pub request_id: VarInt,
402 pub error_code: VarInt,
404 pub reason_phrase: Vec<u8>,
406}
407
408#[derive(Debug, Clone, PartialEq, Eq)]
410pub struct UnsubscribeNamespace {
411 pub track_namespace_prefix: TrackNamespace,
413}
414
415#[derive(Debug, Clone, Copy, PartialEq, Eq)]
421#[repr(u64)]
422pub enum FetchType {
423 Standalone = 1,
425 RelativeJoining = 2,
427 AbsoluteJoining = 3,
429}
430
431impl FetchType {
432 pub fn from_u64(v: u64) -> Option<Self> {
434 match v {
435 1 => Some(FetchType::Standalone),
436 2 => Some(FetchType::RelativeJoining),
437 3 => Some(FetchType::AbsoluteJoining),
438 _ => None,
439 }
440 }
441}
442
443#[derive(Debug, Clone, PartialEq, Eq)]
445pub enum FetchPayload {
446 Standalone {
448 track_namespace: TrackNamespace,
450 track_name: Vec<u8>,
452 start_group: VarInt,
454 start_object: VarInt,
456 end_group: VarInt,
458 end_object: VarInt,
460 },
461 Joining {
463 joining_request_id: VarInt,
465 joining_start: VarInt,
467 },
468}
469
470#[derive(Debug, Clone, PartialEq, Eq)]
472pub struct Fetch {
473 pub request_id: VarInt,
475 pub subscriber_priority: u8,
477 pub group_order: GroupOrder,
479 pub fetch_type: FetchType,
481 pub fetch_payload: FetchPayload,
483 pub parameters: Vec<KeyValuePair>,
485}
486
487#[derive(Debug, Clone, PartialEq, Eq)]
489pub struct FetchOk {
490 pub request_id: VarInt,
492 pub group_order: GroupOrder,
494 pub end_of_track: VarInt,
496 pub end_location: Location,
498 pub parameters: Vec<KeyValuePair>,
500}
501
502#[derive(Debug, Clone, PartialEq, Eq)]
504pub struct FetchError {
505 pub request_id: VarInt,
507 pub error_code: VarInt,
509 pub reason_phrase: Vec<u8>,
511}
512
513#[derive(Debug, Clone, PartialEq, Eq)]
515pub struct FetchCancel {
516 pub request_id: VarInt,
518}
519
520#[derive(Debug, Clone, PartialEq, Eq)]
526pub struct TrackStatus {
527 pub request_id: VarInt,
529 pub track_namespace: TrackNamespace,
531 pub track_name: Vec<u8>,
533 pub subscriber_priority: u8,
535 pub group_order: GroupOrder,
537 pub forward: Forward,
539 pub filter_type: FilterType,
541 pub start_location: Option<Location>,
543 pub end_group: Option<VarInt>,
545 pub parameters: Vec<KeyValuePair>,
547}
548
549#[derive(Debug, Clone, PartialEq, Eq)]
551pub struct TrackStatusOk {
552 pub request_id: VarInt,
554 pub track_alias: VarInt,
556 pub expires: VarInt,
558 pub group_order: GroupOrder,
560 pub content_exists: ContentExists,
562 pub largest_location: Option<Location>,
564 pub parameters: Vec<KeyValuePair>,
566}
567
568#[derive(Debug, Clone, PartialEq, Eq)]
570pub struct TrackStatusError {
571 pub request_id: VarInt,
573 pub error_code: VarInt,
575 pub reason_phrase: Vec<u8>,
577}
578
579#[derive(Debug, Clone, PartialEq, Eq)]
585pub enum ControlMessage {
586 ClientSetup(ClientSetup),
588 ServerSetup(ServerSetup),
590 GoAway(GoAway),
592 MaxRequestId(MaxRequestId),
594 RequestsBlocked(RequestsBlocked),
596 Subscribe(Subscribe),
598 SubscribeOk(SubscribeOk),
600 SubscribeError(SubscribeError),
602 SubscribeUpdate(SubscribeUpdate),
604 Unsubscribe(Unsubscribe),
606 Publish(Publish),
608 PublishOk(PublishOk),
610 PublishError(PublishError),
612 PublishDone(PublishDone),
614 PublishNamespace(PublishNamespace),
616 PublishNamespaceOk(PublishNamespaceOk),
618 PublishNamespaceError(PublishNamespaceError),
620 PublishNamespaceDone(PublishNamespaceDone),
622 PublishNamespaceCancel(PublishNamespaceCancel),
624 SubscribeNamespace(SubscribeNamespace),
626 SubscribeNamespaceOk(SubscribeNamespaceOk),
628 SubscribeNamespaceError(SubscribeNamespaceError),
630 UnsubscribeNamespace(UnsubscribeNamespace),
632 Fetch(Fetch),
634 FetchOk(FetchOk),
636 FetchError(FetchError),
638 FetchCancel(FetchCancel),
640 TrackStatus(TrackStatus),
642 TrackStatusOk(TrackStatusOk),
644 TrackStatusError(TrackStatusError),
646}
647
648impl ControlMessage {
649 pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
653 let mut payload = Vec::with_capacity(256);
654 self.encode_payload(&mut payload)?;
655
656 if payload.len() > MAX_MESSAGE_LENGTH {
657 return Err(CodecError::MessageTooLong(payload.len()));
658 }
659
660 VarInt::from_usize(self.message_type().id() as usize).encode(buf);
661 buf.put_u16(payload.len() as u16);
663 buf.put_slice(&payload);
664 Ok(())
665 }
666
667 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
671 let type_id = VarInt::decode(buf)?.into_inner();
672 let msg_type =
673 MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
674 if buf.remaining() < 2 {
676 return Err(CodecError::UnexpectedEnd);
677 }
678 let payload_len = buf.get_u16() as usize;
679 if buf.remaining() < payload_len {
680 return Err(CodecError::UnexpectedEnd);
681 }
682 let payload_bytes = buf.copy_to_bytes(payload_len);
683 let mut payload = &payload_bytes[..];
684 Self::decode_payload(msg_type, &mut payload)
685 }
686
687 fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
688 match self {
689 ControlMessage::ClientSetup(m) => {
690 VarInt::from_usize(m.supported_versions.len()).encode(buf);
691 for v in &m.supported_versions {
692 v.encode(buf);
693 }
694 KeyValuePair::encode_list(&m.parameters, buf);
695 }
696 ControlMessage::ServerSetup(m) => {
697 m.selected_version.encode(buf);
698 KeyValuePair::encode_list(&m.parameters, buf);
699 }
700 ControlMessage::GoAway(m) => {
701 if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
702 return Err(CodecError::GoAwayUriTooLong);
703 }
704 VarInt::from_usize(m.new_session_uri.len()).encode(buf);
705 buf.put_slice(&m.new_session_uri);
706 }
707 ControlMessage::MaxRequestId(m) => {
708 m.request_id.encode(buf);
709 }
710 ControlMessage::RequestsBlocked(m) => {
711 m.maximum_request_id.encode(buf);
712 }
713 ControlMessage::Subscribe(m) => {
714 m.request_id.encode(buf);
715 m.track_namespace.encode(buf);
716 VarInt::from_usize(m.track_name.len()).encode(buf);
717 buf.put_slice(&m.track_name);
718 buf.put_u8(m.subscriber_priority);
719 buf.put_u8(m.group_order as u8);
720 buf.put_u8(m.forward as u8);
721 buf.put_u8(m.filter_type as u8);
722 if let Some(loc) = &m.start_location {
723 loc.encode(buf);
724 }
725 if let Some(eg) = &m.end_group {
726 eg.encode(buf);
727 }
728 KeyValuePair::encode_list(&m.parameters, buf);
729 }
730 ControlMessage::SubscribeOk(m) => {
731 m.request_id.encode(buf);
732 m.track_alias.encode(buf);
733 m.expires.encode(buf);
734 buf.put_u8(m.group_order as u8);
735 buf.put_u8(m.content_exists as u8);
736 if let Some(loc) = &m.largest_location {
737 loc.encode(buf);
738 }
739 KeyValuePair::encode_list(&m.parameters, buf);
740 }
741 ControlMessage::SubscribeError(m) => {
742 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
743 return Err(CodecError::ReasonPhraseTooLong);
744 }
745 m.request_id.encode(buf);
746 m.error_code.encode(buf);
747 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
748 buf.put_slice(&m.reason_phrase);
749 }
750 ControlMessage::SubscribeUpdate(m) => {
751 m.request_id.encode(buf);
752 m.subscription_request_id.encode(buf);
753 m.start_location.encode(buf);
754 m.end_group.encode(buf);
755 buf.put_u8(m.subscriber_priority);
756 buf.put_u8(m.forward as u8);
757 KeyValuePair::encode_list(&m.parameters, buf);
758 }
759 ControlMessage::Unsubscribe(m) => {
760 m.request_id.encode(buf);
761 }
762 ControlMessage::Publish(m) => {
763 m.request_id.encode(buf);
764 m.track_namespace.encode(buf);
765 VarInt::from_usize(m.track_name.len()).encode(buf);
766 buf.put_slice(&m.track_name);
767 m.track_alias.encode(buf);
768 buf.put_u8(m.group_order as u8);
769 buf.put_u8(m.content_exists as u8);
770 if let Some(loc) = &m.largest_location {
771 loc.encode(buf);
772 }
773 buf.put_u8(m.forward as u8);
774 KeyValuePair::encode_list(&m.parameters, buf);
775 }
776 ControlMessage::PublishOk(m) => {
777 m.request_id.encode(buf);
778 buf.put_u8(m.forward as u8);
779 buf.put_u8(m.subscriber_priority);
780 buf.put_u8(m.group_order as u8);
781 buf.put_u8(m.filter_type as u8);
782 if let Some(loc) = &m.start_location {
783 loc.encode(buf);
784 }
785 if let Some(eg) = &m.end_group {
786 eg.encode(buf);
787 }
788 KeyValuePair::encode_list(&m.parameters, buf);
789 }
790 ControlMessage::PublishError(m) => {
791 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
792 return Err(CodecError::ReasonPhraseTooLong);
793 }
794 m.request_id.encode(buf);
795 m.error_code.encode(buf);
796 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
797 buf.put_slice(&m.reason_phrase);
798 }
799 ControlMessage::PublishDone(m) => {
800 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
801 return Err(CodecError::ReasonPhraseTooLong);
802 }
803 m.request_id.encode(buf);
804 m.status_code.encode(buf);
805 m.stream_count.encode(buf);
806 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
807 buf.put_slice(&m.reason_phrase);
808 }
809 ControlMessage::PublishNamespace(m) => {
810 m.request_id.encode(buf);
811 m.track_namespace.encode(buf);
812 KeyValuePair::encode_list(&m.parameters, buf);
813 }
814 ControlMessage::PublishNamespaceOk(m) => {
815 m.request_id.encode(buf);
816 KeyValuePair::encode_list(&m.parameters, buf);
817 }
818 ControlMessage::PublishNamespaceError(m) => {
819 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
820 return Err(CodecError::ReasonPhraseTooLong);
821 }
822 m.request_id.encode(buf);
823 m.error_code.encode(buf);
824 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
825 buf.put_slice(&m.reason_phrase);
826 }
827 ControlMessage::PublishNamespaceDone(m) => {
828 m.track_namespace.encode(buf);
829 }
830 ControlMessage::PublishNamespaceCancel(m) => {
831 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
832 return Err(CodecError::ReasonPhraseTooLong);
833 }
834 m.track_namespace.encode(buf);
835 m.error_code.encode(buf);
836 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
837 buf.put_slice(&m.reason_phrase);
838 }
839 ControlMessage::SubscribeNamespace(m) => {
840 m.request_id.encode(buf);
841 m.track_namespace.encode(buf);
842 KeyValuePair::encode_list(&m.parameters, buf);
843 }
844 ControlMessage::SubscribeNamespaceOk(m) => {
845 m.request_id.encode(buf);
846 KeyValuePair::encode_list(&m.parameters, buf);
847 }
848 ControlMessage::SubscribeNamespaceError(m) => {
849 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
850 return Err(CodecError::ReasonPhraseTooLong);
851 }
852 m.request_id.encode(buf);
853 m.error_code.encode(buf);
854 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
855 buf.put_slice(&m.reason_phrase);
856 }
857 ControlMessage::UnsubscribeNamespace(m) => {
858 m.track_namespace_prefix.encode(buf);
859 }
860 ControlMessage::Fetch(m) => {
861 m.request_id.encode(buf);
862 buf.put_u8(m.subscriber_priority);
863 buf.put_u8(m.group_order as u8);
864 VarInt::from_usize(m.fetch_type as usize).encode(buf);
865 match &m.fetch_payload {
866 FetchPayload::Standalone {
867 track_namespace,
868 track_name,
869 start_group,
870 start_object,
871 end_group,
872 end_object,
873 } => {
874 track_namespace.encode(buf);
875 VarInt::from_usize(track_name.len()).encode(buf);
876 buf.put_slice(track_name);
877 start_group.encode(buf);
878 start_object.encode(buf);
879 end_group.encode(buf);
880 end_object.encode(buf);
881 }
882 FetchPayload::Joining { joining_request_id, joining_start } => {
883 joining_request_id.encode(buf);
884 joining_start.encode(buf);
885 }
886 }
887 KeyValuePair::encode_list(&m.parameters, buf);
888 }
889 ControlMessage::FetchOk(m) => {
890 m.request_id.encode(buf);
891 buf.put_u8(m.group_order as u8);
892 m.end_of_track.encode(buf);
893 m.end_location.encode(buf);
894 KeyValuePair::encode_list(&m.parameters, buf);
895 }
896 ControlMessage::FetchError(m) => {
897 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
898 return Err(CodecError::ReasonPhraseTooLong);
899 }
900 m.request_id.encode(buf);
901 m.error_code.encode(buf);
902 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
903 buf.put_slice(&m.reason_phrase);
904 }
905 ControlMessage::FetchCancel(m) => {
906 m.request_id.encode(buf);
907 }
908 ControlMessage::TrackStatus(m) => {
909 m.request_id.encode(buf);
910 m.track_namespace.encode(buf);
911 VarInt::from_usize(m.track_name.len()).encode(buf);
912 buf.put_slice(&m.track_name);
913 buf.put_u8(m.subscriber_priority);
914 buf.put_u8(m.group_order as u8);
915 buf.put_u8(m.forward as u8);
916 buf.put_u8(m.filter_type as u8);
917 if let Some(loc) = &m.start_location {
918 loc.encode(buf);
919 }
920 if let Some(eg) = &m.end_group {
921 eg.encode(buf);
922 }
923 KeyValuePair::encode_list(&m.parameters, buf);
924 }
925 ControlMessage::TrackStatusOk(m) => {
926 m.request_id.encode(buf);
927 m.track_alias.encode(buf);
928 m.expires.encode(buf);
929 buf.put_u8(m.group_order as u8);
930 buf.put_u8(m.content_exists as u8);
931 if let Some(loc) = &m.largest_location {
932 loc.encode(buf);
933 }
934 KeyValuePair::encode_list(&m.parameters, buf);
935 }
936 ControlMessage::TrackStatusError(m) => {
937 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
938 return Err(CodecError::ReasonPhraseTooLong);
939 }
940 m.request_id.encode(buf);
941 m.error_code.encode(buf);
942 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
943 buf.put_slice(&m.reason_phrase);
944 }
945 }
946 Ok(())
947 }
948
949 fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
950 match msg_type {
951 MessageType::ClientSetup => {
952 let num_versions = VarInt::decode(buf)?.into_inner() as usize;
953 if num_versions == 0 {
954 return Err(CodecError::InvalidField);
955 }
956 let mut supported_versions = Vec::with_capacity(num_versions);
957 for _ in 0..num_versions {
958 supported_versions.push(VarInt::decode(buf)?);
959 }
960 let parameters = KeyValuePair::decode_list(buf)?;
961 Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
962 }
963 MessageType::ServerSetup => {
964 let selected_version = VarInt::decode(buf)?;
965 let parameters = KeyValuePair::decode_list(buf)?;
966 Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
967 }
968 MessageType::GoAway => {
969 let uri_len = VarInt::decode(buf)?.into_inner() as usize;
970 let uri = read_bytes(buf, uri_len)?;
971 Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
972 }
973 MessageType::MaxRequestId => {
974 let request_id = VarInt::decode(buf)?;
975 Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id }))
976 }
977 MessageType::RequestsBlocked => {
978 let maximum_request_id = VarInt::decode(buf)?;
979 Ok(ControlMessage::RequestsBlocked(RequestsBlocked { maximum_request_id }))
980 }
981 MessageType::Subscribe => {
982 let request_id = VarInt::decode(buf)?;
983 let track_namespace = TrackNamespace::decode(buf)?;
984 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
985 let track_name = read_bytes(buf, track_name_len)?;
986 if buf.remaining() < 4 {
987 return Err(CodecError::UnexpectedEnd);
988 }
989 let subscriber_priority = buf.get_u8();
990 let group_order =
991 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
992 let forward_val = buf.get_u8();
993 let forward = match forward_val {
994 0 => Forward::DontForward,
995 1 => Forward::Forward,
996 _ => return Err(CodecError::InvalidField),
997 };
998 let filter_val = buf.get_u8();
999 let filter_type =
1000 FilterType::from_u8(filter_val).ok_or(CodecError::InvalidField)?;
1001 let start_location = match filter_type {
1002 FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
1003 Some(Location::decode(buf)?)
1004 }
1005 _ => None,
1006 };
1007 let end_group = match filter_type {
1008 FilterType::AbsoluteRange => Some(VarInt::decode(buf)?),
1009 _ => None,
1010 };
1011 let parameters = KeyValuePair::decode_list(buf)?;
1012 Ok(ControlMessage::Subscribe(Subscribe {
1013 request_id,
1014 track_namespace,
1015 track_name,
1016 subscriber_priority,
1017 group_order,
1018 forward,
1019 filter_type,
1020 start_location,
1021 end_group,
1022 parameters,
1023 }))
1024 }
1025 MessageType::SubscribeOk => {
1026 let request_id = VarInt::decode(buf)?;
1027 let track_alias = VarInt::decode(buf)?;
1028 let expires = VarInt::decode(buf)?;
1029 if buf.remaining() < 2 {
1030 return Err(CodecError::UnexpectedEnd);
1031 }
1032 let group_order =
1033 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1034 let content_exists_val = buf.get_u8();
1035 let content_exists = match content_exists_val {
1036 0 => ContentExists::NoLargestLocation,
1037 1 => ContentExists::HasLargestLocation,
1038 _ => return Err(CodecError::InvalidField),
1039 };
1040 let largest_location = if content_exists == ContentExists::HasLargestLocation {
1041 Some(Location::decode(buf)?)
1042 } else {
1043 None
1044 };
1045 let parameters = KeyValuePair::decode_list(buf)?;
1046 Ok(ControlMessage::SubscribeOk(SubscribeOk {
1047 request_id,
1048 track_alias,
1049 expires,
1050 group_order,
1051 content_exists,
1052 largest_location,
1053 parameters,
1054 }))
1055 }
1056 MessageType::SubscribeError => {
1057 let request_id = VarInt::decode(buf)?;
1058 let error_code = VarInt::decode(buf)?;
1059 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1060 let reason_phrase = read_bytes(buf, reason_len)?;
1061 Ok(ControlMessage::SubscribeError(SubscribeError {
1062 request_id,
1063 error_code,
1064 reason_phrase,
1065 }))
1066 }
1067 MessageType::SubscribeUpdate => {
1068 let request_id = VarInt::decode(buf)?;
1069 let subscription_request_id = VarInt::decode(buf)?;
1070 let start_location = Location::decode(buf)?;
1071 let end_group = VarInt::decode(buf)?;
1072 if buf.remaining() < 2 {
1073 return Err(CodecError::UnexpectedEnd);
1074 }
1075 let subscriber_priority = buf.get_u8();
1076 let forward_val = buf.get_u8();
1077 let forward = match forward_val {
1078 0 => Forward::DontForward,
1079 1 => Forward::Forward,
1080 _ => return Err(CodecError::InvalidField),
1081 };
1082 let parameters = KeyValuePair::decode_list(buf)?;
1083 Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
1084 request_id,
1085 subscription_request_id,
1086 start_location,
1087 end_group,
1088 subscriber_priority,
1089 forward,
1090 parameters,
1091 }))
1092 }
1093 MessageType::Unsubscribe => {
1094 let request_id = VarInt::decode(buf)?;
1095 Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
1096 }
1097 MessageType::Publish => {
1098 let request_id = VarInt::decode(buf)?;
1099 let track_namespace = TrackNamespace::decode(buf)?;
1100 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1101 let track_name = read_bytes(buf, track_name_len)?;
1102 let track_alias = VarInt::decode(buf)?;
1103 if buf.remaining() < 2 {
1104 return Err(CodecError::UnexpectedEnd);
1105 }
1106 let group_order =
1107 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1108 let content_exists_val = buf.get_u8();
1109 let content_exists = match content_exists_val {
1110 0 => ContentExists::NoLargestLocation,
1111 1 => ContentExists::HasLargestLocation,
1112 _ => return Err(CodecError::InvalidField),
1113 };
1114 let largest_location = if content_exists == ContentExists::HasLargestLocation {
1115 Some(Location::decode(buf)?)
1116 } else {
1117 None
1118 };
1119 if buf.remaining() < 1 {
1120 return Err(CodecError::UnexpectedEnd);
1121 }
1122 let forward_val = buf.get_u8();
1123 let forward = match forward_val {
1124 0 => Forward::DontForward,
1125 1 => Forward::Forward,
1126 _ => return Err(CodecError::InvalidField),
1127 };
1128 let parameters = KeyValuePair::decode_list(buf)?;
1129 Ok(ControlMessage::Publish(Publish {
1130 request_id,
1131 track_namespace,
1132 track_name,
1133 track_alias,
1134 group_order,
1135 content_exists,
1136 largest_location,
1137 forward,
1138 parameters,
1139 }))
1140 }
1141 MessageType::PublishOk => {
1142 let request_id = VarInt::decode(buf)?;
1143 if buf.remaining() < 4 {
1144 return Err(CodecError::UnexpectedEnd);
1145 }
1146 let forward_val = buf.get_u8();
1147 let forward = match forward_val {
1148 0 => Forward::DontForward,
1149 1 => Forward::Forward,
1150 _ => return Err(CodecError::InvalidField),
1151 };
1152 let subscriber_priority = buf.get_u8();
1153 let group_order =
1154 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1155 let filter_val = buf.get_u8();
1156 let filter_type =
1157 FilterType::from_u8(filter_val).ok_or(CodecError::InvalidField)?;
1158 let start_location = match filter_type {
1159 FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
1160 Some(Location::decode(buf)?)
1161 }
1162 _ => None,
1163 };
1164 let end_group = match filter_type {
1165 FilterType::AbsoluteRange => Some(VarInt::decode(buf)?),
1166 _ => None,
1167 };
1168 let parameters = KeyValuePair::decode_list(buf)?;
1169 Ok(ControlMessage::PublishOk(PublishOk {
1170 request_id,
1171 forward,
1172 subscriber_priority,
1173 group_order,
1174 filter_type,
1175 start_location,
1176 end_group,
1177 parameters,
1178 }))
1179 }
1180 MessageType::PublishError => {
1181 let request_id = VarInt::decode(buf)?;
1182 let error_code = VarInt::decode(buf)?;
1183 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1184 let reason_phrase = read_bytes(buf, reason_len)?;
1185 Ok(ControlMessage::PublishError(PublishError {
1186 request_id,
1187 error_code,
1188 reason_phrase,
1189 }))
1190 }
1191 MessageType::PublishDone => {
1192 let request_id = VarInt::decode(buf)?;
1193 let status_code = VarInt::decode(buf)?;
1194 let stream_count = VarInt::decode(buf)?;
1195 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1196 let reason_phrase = read_bytes(buf, reason_len)?;
1197 Ok(ControlMessage::PublishDone(PublishDone {
1198 request_id,
1199 status_code,
1200 stream_count,
1201 reason_phrase,
1202 }))
1203 }
1204 MessageType::PublishNamespace => {
1205 let request_id = VarInt::decode(buf)?;
1206 let track_namespace = TrackNamespace::decode(buf)?;
1207 let parameters = KeyValuePair::decode_list(buf)?;
1208 Ok(ControlMessage::PublishNamespace(PublishNamespace {
1209 request_id,
1210 track_namespace,
1211 parameters,
1212 }))
1213 }
1214 MessageType::PublishNamespaceOk => {
1215 let request_id = VarInt::decode(buf)?;
1216 let parameters = KeyValuePair::decode_list(buf)?;
1217 Ok(ControlMessage::PublishNamespaceOk(PublishNamespaceOk {
1218 request_id,
1219 parameters,
1220 }))
1221 }
1222 MessageType::PublishNamespaceError => {
1223 let request_id = VarInt::decode(buf)?;
1224 let error_code = VarInt::decode(buf)?;
1225 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1226 let reason_phrase = read_bytes(buf, reason_len)?;
1227 Ok(ControlMessage::PublishNamespaceError(PublishNamespaceError {
1228 request_id,
1229 error_code,
1230 reason_phrase,
1231 }))
1232 }
1233 MessageType::PublishNamespaceDone => {
1234 let track_namespace = TrackNamespace::decode(buf)?;
1235 Ok(ControlMessage::PublishNamespaceDone(PublishNamespaceDone { track_namespace }))
1236 }
1237 MessageType::PublishNamespaceCancel => {
1238 let track_namespace = TrackNamespace::decode(buf)?;
1239 let error_code = VarInt::decode(buf)?;
1240 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1241 let reason_phrase = read_bytes(buf, reason_len)?;
1242 Ok(ControlMessage::PublishNamespaceCancel(PublishNamespaceCancel {
1243 track_namespace,
1244 error_code,
1245 reason_phrase,
1246 }))
1247 }
1248 MessageType::SubscribeNamespace => {
1249 let request_id = VarInt::decode(buf)?;
1250 let track_namespace = TrackNamespace::decode(buf)?;
1251 let parameters = KeyValuePair::decode_list(buf)?;
1252 Ok(ControlMessage::SubscribeNamespace(SubscribeNamespace {
1253 request_id,
1254 track_namespace,
1255 parameters,
1256 }))
1257 }
1258 MessageType::SubscribeNamespaceOk => {
1259 let request_id = VarInt::decode(buf)?;
1260 let parameters = KeyValuePair::decode_list(buf)?;
1261 Ok(ControlMessage::SubscribeNamespaceOk(SubscribeNamespaceOk {
1262 request_id,
1263 parameters,
1264 }))
1265 }
1266 MessageType::SubscribeNamespaceError => {
1267 let request_id = VarInt::decode(buf)?;
1268 let error_code = VarInt::decode(buf)?;
1269 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1270 let reason_phrase = read_bytes(buf, reason_len)?;
1271 Ok(ControlMessage::SubscribeNamespaceError(SubscribeNamespaceError {
1272 request_id,
1273 error_code,
1274 reason_phrase,
1275 }))
1276 }
1277 MessageType::UnsubscribeNamespace => {
1278 let track_namespace_prefix = TrackNamespace::decode(buf)?;
1279 Ok(ControlMessage::UnsubscribeNamespace(UnsubscribeNamespace {
1280 track_namespace_prefix,
1281 }))
1282 }
1283 MessageType::Fetch => {
1284 let request_id = VarInt::decode(buf)?;
1285 if buf.remaining() < 2 {
1286 return Err(CodecError::UnexpectedEnd);
1287 }
1288 let subscriber_priority = buf.get_u8();
1289 let group_order =
1290 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1291 let fetch_type_val = VarInt::decode(buf)?.into_inner();
1292 let fetch_type =
1293 FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
1294 let fetch_payload = match fetch_type {
1295 FetchType::Standalone => {
1296 let track_namespace = TrackNamespace::decode(buf)?;
1297 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1298 let track_name = read_bytes(buf, track_name_len)?;
1299 let start_group = VarInt::decode(buf)?;
1300 let start_object = VarInt::decode(buf)?;
1301 let end_group = VarInt::decode(buf)?;
1302 let end_object = VarInt::decode(buf)?;
1303 FetchPayload::Standalone {
1304 track_namespace,
1305 track_name,
1306 start_group,
1307 start_object,
1308 end_group,
1309 end_object,
1310 }
1311 }
1312 FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
1313 let joining_request_id = VarInt::decode(buf)?;
1314 let joining_start = VarInt::decode(buf)?;
1315 FetchPayload::Joining { joining_request_id, joining_start }
1316 }
1317 };
1318 let parameters = KeyValuePair::decode_list(buf)?;
1319 Ok(ControlMessage::Fetch(Fetch {
1320 request_id,
1321 subscriber_priority,
1322 group_order,
1323 fetch_type,
1324 fetch_payload,
1325 parameters,
1326 }))
1327 }
1328 MessageType::FetchOk => {
1329 let request_id = VarInt::decode(buf)?;
1330 if buf.remaining() < 1 {
1331 return Err(CodecError::UnexpectedEnd);
1332 }
1333 let group_order =
1334 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1335 let end_of_track = VarInt::decode(buf)?;
1336 let end_location = Location::decode(buf)?;
1337 let parameters = KeyValuePair::decode_list(buf)?;
1338 Ok(ControlMessage::FetchOk(FetchOk {
1339 request_id,
1340 group_order,
1341 end_of_track,
1342 end_location,
1343 parameters,
1344 }))
1345 }
1346 MessageType::FetchError => {
1347 let request_id = VarInt::decode(buf)?;
1348 let error_code = VarInt::decode(buf)?;
1349 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1350 let reason_phrase = read_bytes(buf, reason_len)?;
1351 Ok(ControlMessage::FetchError(FetchError { request_id, error_code, reason_phrase }))
1352 }
1353 MessageType::FetchCancel => {
1354 let request_id = VarInt::decode(buf)?;
1355 Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
1356 }
1357 MessageType::TrackStatus => {
1358 let request_id = VarInt::decode(buf)?;
1359 let track_namespace = TrackNamespace::decode(buf)?;
1360 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1361 let track_name = read_bytes(buf, track_name_len)?;
1362 if buf.remaining() < 4 {
1363 return Err(CodecError::UnexpectedEnd);
1364 }
1365 let subscriber_priority = buf.get_u8();
1366 let group_order =
1367 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1368 let forward_val = buf.get_u8();
1369 let forward = match forward_val {
1370 0 => Forward::DontForward,
1371 1 => Forward::Forward,
1372 _ => return Err(CodecError::InvalidField),
1373 };
1374 let filter_val = buf.get_u8();
1375 let filter_type =
1376 FilterType::from_u8(filter_val).ok_or(CodecError::InvalidField)?;
1377 let start_location = match filter_type {
1378 FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
1379 Some(Location::decode(buf)?)
1380 }
1381 _ => None,
1382 };
1383 let end_group = match filter_type {
1384 FilterType::AbsoluteRange => Some(VarInt::decode(buf)?),
1385 _ => None,
1386 };
1387 let parameters = KeyValuePair::decode_list(buf)?;
1388 Ok(ControlMessage::TrackStatus(TrackStatus {
1389 request_id,
1390 track_namespace,
1391 track_name,
1392 subscriber_priority,
1393 group_order,
1394 forward,
1395 filter_type,
1396 start_location,
1397 end_group,
1398 parameters,
1399 }))
1400 }
1401 MessageType::TrackStatusOk => {
1402 let request_id = VarInt::decode(buf)?;
1403 let track_alias = VarInt::decode(buf)?;
1404 let expires = VarInt::decode(buf)?;
1405 if buf.remaining() < 2 {
1406 return Err(CodecError::UnexpectedEnd);
1407 }
1408 let group_order =
1409 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1410 let content_exists_val = buf.get_u8();
1411 let content_exists = match content_exists_val {
1412 0 => ContentExists::NoLargestLocation,
1413 1 => ContentExists::HasLargestLocation,
1414 _ => return Err(CodecError::InvalidField),
1415 };
1416 let largest_location = if content_exists == ContentExists::HasLargestLocation {
1417 Some(Location::decode(buf)?)
1418 } else {
1419 None
1420 };
1421 let parameters = KeyValuePair::decode_list(buf)?;
1422 Ok(ControlMessage::TrackStatusOk(TrackStatusOk {
1423 request_id,
1424 track_alias,
1425 expires,
1426 group_order,
1427 content_exists,
1428 largest_location,
1429 parameters,
1430 }))
1431 }
1432 MessageType::TrackStatusError => {
1433 let request_id = VarInt::decode(buf)?;
1434 let error_code = VarInt::decode(buf)?;
1435 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1436 let reason_phrase = read_bytes(buf, reason_len)?;
1437 Ok(ControlMessage::TrackStatusError(TrackStatusError {
1438 request_id,
1439 error_code,
1440 reason_phrase,
1441 }))
1442 }
1443 }
1444 }
1445
1446 pub fn message_type(&self) -> MessageType {
1448 match self {
1449 ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
1450 ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
1451 ControlMessage::GoAway(_) => MessageType::GoAway,
1452 ControlMessage::MaxRequestId(_) => MessageType::MaxRequestId,
1453 ControlMessage::RequestsBlocked(_) => MessageType::RequestsBlocked,
1454 ControlMessage::Subscribe(_) => MessageType::Subscribe,
1455 ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
1456 ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
1457 ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
1458 ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
1459 ControlMessage::Publish(_) => MessageType::Publish,
1460 ControlMessage::PublishOk(_) => MessageType::PublishOk,
1461 ControlMessage::PublishError(_) => MessageType::PublishError,
1462 ControlMessage::PublishDone(_) => MessageType::PublishDone,
1463 ControlMessage::PublishNamespace(_) => MessageType::PublishNamespace,
1464 ControlMessage::PublishNamespaceOk(_) => MessageType::PublishNamespaceOk,
1465 ControlMessage::PublishNamespaceError(_) => MessageType::PublishNamespaceError,
1466 ControlMessage::PublishNamespaceDone(_) => MessageType::PublishNamespaceDone,
1467 ControlMessage::PublishNamespaceCancel(_) => MessageType::PublishNamespaceCancel,
1468 ControlMessage::SubscribeNamespace(_) => MessageType::SubscribeNamespace,
1469 ControlMessage::SubscribeNamespaceOk(_) => MessageType::SubscribeNamespaceOk,
1470 ControlMessage::SubscribeNamespaceError(_) => MessageType::SubscribeNamespaceError,
1471 ControlMessage::UnsubscribeNamespace(_) => MessageType::UnsubscribeNamespace,
1472 ControlMessage::Fetch(_) => MessageType::Fetch,
1473 ControlMessage::FetchOk(_) => MessageType::FetchOk,
1474 ControlMessage::FetchError(_) => MessageType::FetchError,
1475 ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
1476 ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
1477 ControlMessage::TrackStatusOk(_) => MessageType::TrackStatusOk,
1478 ControlMessage::TrackStatusError(_) => MessageType::TrackStatusError,
1479 }
1480 }
1481}