1use crate::error::{
2 CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_REASON_PHRASE_LENGTH,
3};
4use crate::kvp::KeyValuePair;
5use crate::types::read_bytes;
6use crate::types::*;
7use crate::varint::VarInt;
8use bytes::{Buf, BufMut};
9
/// Numeric tag identifying the kind of a control message.
///
/// Each variant's discriminant is the id used on the wire; convert with
/// [`MessageType::id`] and [`MessageType::from_id`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum MessageType {
    SubscribeUpdate = 0x02,
    Subscribe = 0x03,
    SubscribeOk = 0x04,
    SubscribeError = 0x05,
    Announce = 0x06,
    AnnounceOk = 0x07,
    AnnounceError = 0x08,
    Unannounce = 0x09,
    Unsubscribe = 0x0A,
    SubscribeDone = 0x0B,
    AnnounceCancel = 0x0C,
    TrackStatusRequest = 0x0D,
    TrackStatus = 0x0E,
    GoAway = 0x10,
    SubscribeAnnounces = 0x11,
    SubscribeAnnouncesOk = 0x12,
    SubscribeAnnouncesError = 0x13,
    UnsubscribeAnnounces = 0x14,
    MaxSubscribeId = 0x15,
    Fetch = 0x16,
    FetchCancel = 0x17,
    FetchOk = 0x18,
    FetchError = 0x19,
    ClientSetup = 0x40,
    ServerSetup = 0x41,
}

impl MessageType {
    /// Every known message type, in ascending id order.
    const ALL: &'static [MessageType] = &[
        MessageType::SubscribeUpdate,
        MessageType::Subscribe,
        MessageType::SubscribeOk,
        MessageType::SubscribeError,
        MessageType::Announce,
        MessageType::AnnounceOk,
        MessageType::AnnounceError,
        MessageType::Unannounce,
        MessageType::Unsubscribe,
        MessageType::SubscribeDone,
        MessageType::AnnounceCancel,
        MessageType::TrackStatusRequest,
        MessageType::TrackStatus,
        MessageType::GoAway,
        MessageType::SubscribeAnnounces,
        MessageType::SubscribeAnnouncesOk,
        MessageType::SubscribeAnnouncesError,
        MessageType::UnsubscribeAnnounces,
        MessageType::MaxSubscribeId,
        MessageType::Fetch,
        MessageType::FetchCancel,
        MessageType::FetchOk,
        MessageType::FetchError,
        MessageType::ClientSetup,
        MessageType::ServerSetup,
    ];

    /// Looks up the message type whose id equals `id`.
    ///
    /// Returns `None` when `id` does not belong to any known message type.
    pub fn from_id(id: u64) -> Option<Self> {
        Self::ALL.iter().copied().find(|kind| kind.id() == id)
    }

    /// Returns the numeric id of this message type (its enum discriminant).
    pub fn id(&self) -> u64 {
        (*self) as u64
    }
}
104
/// Body of the `ClientSetup` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientSetup {
    /// Version numbers, serialized as a count followed by one varint per
    /// entry. The decoder rejects a count of zero.
    pub supported_versions: Vec<VarInt>,
    /// Key/value parameters, serialized after the version list.
    pub parameters: Vec<KeyValuePair>,
}
117
/// Body of the `ServerSetup` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerSetup {
    /// Single version number, serialized first as a varint.
    pub selected_version: VarInt,
    /// Key/value parameters, serialized after the version.
    pub parameters: Vec<KeyValuePair>,
}
126
/// Body of the `GoAway` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoAway {
    /// Raw URI bytes, serialized with a varint length prefix. The encoder
    /// refuses values longer than `MAX_GOAWAY_URI_LENGTH`.
    pub new_session_uri: Vec<u8>,
}
133
/// Body of the `MaxSubscribeId` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MaxSubscribeId {
    /// The only field of the message, serialized as a varint.
    pub subscribe_id: VarInt,
}
140
/// Body of the `Subscribe` control message.
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscribe {
    /// Varint identifier of the subscription.
    pub subscribe_id: VarInt,
    /// Varint alias for the track.
    pub track_alias: VarInt,
    /// Namespace of the track.
    pub track_namespace: TrackNamespace,
    /// Raw track name bytes, serialized with a varint length prefix.
    pub track_name: Vec<u8>,
    /// Single-byte priority value.
    pub subscriber_priority: u8,
    /// Single-byte group order selector.
    pub group_order: GroupOrder,
    /// Single-byte filter selector; it decides which of the optional
    /// fields below the decoder reads.
    pub filter_type: FilterType,
    /// Read by the decoder only for `FilterType::AbsoluteStart` and
    /// `FilterType::AbsoluteRange`; `None` for every other filter.
    pub start_location: Option<Location>,
    /// Read by the decoder only for `FilterType::AbsoluteRange`.
    pub end_group: Option<VarInt>,
    /// Read by the decoder only for `FilterType::AbsoluteRange`.
    pub end_object: Option<VarInt>,
    /// Key/value parameters, serialized last.
    pub parameters: Vec<KeyValuePair>,
}
171
/// Body of the `SubscribeOk` control message.
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeOk {
    /// Varint identifier of the subscription being answered.
    pub subscribe_id: VarInt,
    /// Varint expiry value.
    pub expires: VarInt,
    /// Single-byte group order selector.
    pub group_order: GroupOrder,
    /// Single-byte flag (0 or 1 on the wire); it decides whether the two
    /// "largest" fields below are read by the decoder.
    pub content_exists: ContentExists,
    /// Read by the decoder only when `content_exists` is
    /// `ContentExists::HasLargestLocation`.
    pub largest_group_id: Option<VarInt>,
    /// Read by the decoder only when `content_exists` is
    /// `ContentExists::HasLargestLocation`.
    pub largest_object_id: Option<VarInt>,
    /// Key/value parameters, serialized last.
    pub parameters: Vec<KeyValuePair>,
}
190
/// Body of the `SubscribeError` control message.
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeError {
    /// Varint identifier of the subscription being answered.
    pub subscribe_id: VarInt,
    /// Varint error code.
    pub error_code: VarInt,
    /// Raw reason bytes with a varint length prefix. The encoder refuses
    /// values longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
    /// Varint track alias, serialized after the reason phrase.
    pub track_alias: VarInt,
}
203
/// Body of the `SubscribeUpdate` control message.
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeUpdate {
    /// Varint identifier of the subscription being updated.
    pub subscribe_id: VarInt,
    /// Varint start group.
    pub start_group: VarInt,
    /// Varint start object.
    pub start_object: VarInt,
    /// Varint end group.
    pub end_group: VarInt,
    /// Varint end object.
    pub end_object: VarInt,
    /// Single-byte priority value.
    pub subscriber_priority: u8,
    /// Key/value parameters, serialized last.
    pub parameters: Vec<KeyValuePair>,
}
222
/// Body of the `SubscribeDone` control message.
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeDone {
    /// Varint identifier of the subscription that finished.
    pub subscribe_id: VarInt,
    /// Varint status code.
    pub status_code: VarInt,
    /// Raw reason bytes with a varint length prefix. The encoder refuses
    /// values longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
    /// Single-byte flag (0 or 1 on the wire); it decides whether the two
    /// "final" fields below are read by the decoder.
    pub content_exists: ContentExists,
    /// Read by the decoder only when `content_exists` is
    /// `ContentExists::HasLargestLocation`.
    pub final_group: Option<VarInt>,
    /// Read by the decoder only when `content_exists` is
    /// `ContentExists::HasLargestLocation`.
    pub final_object: Option<VarInt>,
}
239
/// Body of the `Unsubscribe` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unsubscribe {
    /// The only field of the message, serialized as a varint.
    pub subscribe_id: VarInt,
}
246
/// Body of the `Announce` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Announce {
    /// Namespace, serialized first.
    pub track_namespace: TrackNamespace,
    /// Key/value parameters, serialized after the namespace.
    pub parameters: Vec<KeyValuePair>,
}
259
/// Body of the `AnnounceOk` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceOk {
    /// The only field of the message.
    pub track_namespace: TrackNamespace,
}
266
/// Body of the `AnnounceError` control message.
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceError {
    /// Namespace the error refers to.
    pub track_namespace: TrackNamespace,
    /// Varint error code.
    pub error_code: VarInt,
    /// Raw reason bytes with a varint length prefix. The encoder refuses
    /// values longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
277
/// Body of the `AnnounceCancel` control message.
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceCancel {
    /// Namespace the cancellation refers to.
    pub track_namespace: TrackNamespace,
    /// Varint error code.
    pub error_code: VarInt,
    /// Raw reason bytes with a varint length prefix. The encoder refuses
    /// values longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
288
/// Body of the `Unannounce` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unannounce {
    /// The only field of the message.
    pub track_namespace: TrackNamespace,
}
295
/// Body of the `SubscribeAnnounces` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnounces {
    /// Namespace prefix, serialized first.
    pub track_namespace_prefix: TrackNamespace,
    /// Key/value parameters, serialized after the prefix.
    pub parameters: Vec<KeyValuePair>,
}
308
/// Body of the `SubscribeAnnouncesOk` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnouncesOk {
    /// The only field of the message.
    pub track_namespace_prefix: TrackNamespace,
}
315
/// Body of the `SubscribeAnnouncesError` control message.
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnouncesError {
    /// Namespace prefix the error refers to.
    pub track_namespace_prefix: TrackNamespace,
    /// Varint error code.
    pub error_code: VarInt,
    /// Raw reason bytes with a varint length prefix. The encoder refuses
    /// values longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
326
/// Body of the `UnsubscribeAnnounces` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsubscribeAnnounces {
    /// The only field of the message.
    pub track_namespace_prefix: TrackNamespace,
}
333
/// Body of the `TrackStatusRequest` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatusRequest {
    /// Namespace of the track, serialized first.
    pub track_namespace: TrackNamespace,
    /// Raw track name bytes, serialized with a varint length prefix.
    pub track_name: Vec<u8>,
}
346
/// Body of the `TrackStatus` control message.
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatus {
    /// Namespace of the track.
    pub track_namespace: TrackNamespace,
    /// Raw track name bytes, serialized with a varint length prefix.
    pub track_name: Vec<u8>,
    /// Varint status code.
    pub status_code: VarInt,
    /// Varint group id.
    pub last_group_id: VarInt,
    /// Varint object id.
    pub last_object_id: VarInt,
}
361
/// Body of the `Fetch` control message.
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Fetch {
    /// Varint identifier of the request.
    pub subscribe_id: VarInt,
    /// Namespace of the track.
    pub track_namespace: TrackNamespace,
    /// Raw track name bytes, serialized with a varint length prefix.
    pub track_name: Vec<u8>,
    /// Single-byte priority value.
    pub subscriber_priority: u8,
    /// Single-byte group order selector.
    pub group_order: GroupOrder,
    /// Varint start group.
    pub start_group: VarInt,
    /// Varint start object.
    pub start_object: VarInt,
    /// Varint end group.
    pub end_group: VarInt,
    /// Varint end object.
    pub end_object: VarInt,
    /// Key/value parameters, serialized last.
    pub parameters: Vec<KeyValuePair>,
}
390
/// Body of the `FetchOk` control message.
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchOk {
    /// Varint identifier of the request being answered.
    pub subscribe_id: VarInt,
    /// Single-byte group order selector.
    pub group_order: GroupOrder,
    /// Single byte, carried through unmodified by the codec.
    pub end_of_track: u8,
    /// The decoder reads this field unconditionally, so a decoded message
    /// always holds `Some`.
    pub largest_group_id: Option<VarInt>,
    /// The decoder reads this field unconditionally, so a decoded message
    /// always holds `Some`.
    pub largest_object_id: Option<VarInt>,
    /// Key/value parameters, serialized last.
    pub parameters: Vec<KeyValuePair>,
}
407
/// Body of the `FetchError` control message.
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchError {
    /// Varint identifier of the request being answered.
    pub subscribe_id: VarInt,
    /// Varint error code.
    pub error_code: VarInt,
    /// Raw reason bytes with a varint length prefix. The encoder refuses
    /// values longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
418
/// Body of the `FetchCancel` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchCancel {
    /// The only field of the message, serialized as a varint.
    pub subscribe_id: VarInt,
}
425
/// A fully parsed control message: one variant per [`MessageType`], each
/// wrapping the struct that holds that message's fields.
///
/// Use [`ControlMessage::encode`] / [`ControlMessage::decode`] to convert to
/// and from the framed wire form (type id, payload length, payload).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ControlMessage {
    // Session setup and teardown.
    ClientSetup(ClientSetup),
    ServerSetup(ServerSetup),
    GoAway(GoAway),
    MaxSubscribeId(MaxSubscribeId),
    // Subscriptions.
    Subscribe(Subscribe),
    SubscribeOk(SubscribeOk),
    SubscribeError(SubscribeError),
    SubscribeUpdate(SubscribeUpdate),
    SubscribeDone(SubscribeDone),
    Unsubscribe(Unsubscribe),
    // Namespace announcements.
    Announce(Announce),
    AnnounceOk(AnnounceOk),
    AnnounceError(AnnounceError),
    AnnounceCancel(AnnounceCancel),
    Unannounce(Unannounce),
    // Subscriptions to announcements.
    SubscribeAnnounces(SubscribeAnnounces),
    SubscribeAnnouncesOk(SubscribeAnnouncesOk),
    SubscribeAnnouncesError(SubscribeAnnouncesError),
    UnsubscribeAnnounces(UnsubscribeAnnounces),
    // Track status.
    TrackStatusRequest(TrackStatusRequest),
    TrackStatus(TrackStatus),
    // Fetch.
    Fetch(Fetch),
    FetchOk(FetchOk),
    FetchError(FetchError),
    FetchCancel(FetchCancel),
}
484
485impl ControlMessage {
486 pub fn message_type(&self) -> MessageType {
488 match self {
489 ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
490 ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
491 ControlMessage::GoAway(_) => MessageType::GoAway,
492 ControlMessage::MaxSubscribeId(_) => MessageType::MaxSubscribeId,
493 ControlMessage::Subscribe(_) => MessageType::Subscribe,
494 ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
495 ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
496 ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
497 ControlMessage::SubscribeDone(_) => MessageType::SubscribeDone,
498 ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
499 ControlMessage::Announce(_) => MessageType::Announce,
500 ControlMessage::AnnounceOk(_) => MessageType::AnnounceOk,
501 ControlMessage::AnnounceError(_) => MessageType::AnnounceError,
502 ControlMessage::AnnounceCancel(_) => MessageType::AnnounceCancel,
503 ControlMessage::Unannounce(_) => MessageType::Unannounce,
504 ControlMessage::SubscribeAnnounces(_) => MessageType::SubscribeAnnounces,
505 ControlMessage::SubscribeAnnouncesOk(_) => MessageType::SubscribeAnnouncesOk,
506 ControlMessage::SubscribeAnnouncesError(_) => MessageType::SubscribeAnnouncesError,
507 ControlMessage::UnsubscribeAnnounces(_) => MessageType::UnsubscribeAnnounces,
508 ControlMessage::TrackStatusRequest(_) => MessageType::TrackStatusRequest,
509 ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
510 ControlMessage::Fetch(_) => MessageType::Fetch,
511 ControlMessage::FetchOk(_) => MessageType::FetchOk,
512 ControlMessage::FetchError(_) => MessageType::FetchError,
513 ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
514 }
515 }
516
517 pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
519 let mut payload = Vec::with_capacity(256);
520 self.encode_payload(&mut payload)?;
521
522 if payload.len() > MAX_MESSAGE_LENGTH {
523 return Err(CodecError::MessageTooLong(payload.len()));
524 }
525
526 VarInt::from_usize(self.message_type().id() as usize).encode(buf);
527 VarInt::from_usize(payload.len()).encode(buf);
528 buf.put_slice(&payload);
529 Ok(())
530 }
531
532 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
534 let type_id = VarInt::decode(buf)?.into_inner();
535 let msg_type =
536 MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
537 let payload_len = VarInt::decode(buf)?.into_inner() as usize;
538 if buf.remaining() < payload_len {
539 return Err(CodecError::UnexpectedEnd);
540 }
541 let payload_bytes = buf.copy_to_bytes(payload_len);
542 let mut payload = &payload_bytes[..];
543 Self::decode_payload(msg_type, &mut payload)
544 }
545
546 fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
547 match self {
548 ControlMessage::ClientSetup(m) => {
549 VarInt::from_usize(m.supported_versions.len()).encode(buf);
550 for v in &m.supported_versions {
551 v.encode(buf);
552 }
553 KeyValuePair::encode_list_d07(&m.parameters, buf);
554 }
555 ControlMessage::ServerSetup(m) => {
556 m.selected_version.encode(buf);
557 KeyValuePair::encode_list_d07(&m.parameters, buf);
558 }
559 ControlMessage::GoAway(m) => {
560 if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
561 return Err(CodecError::GoAwayUriTooLong);
562 }
563 VarInt::from_usize(m.new_session_uri.len()).encode(buf);
564 buf.put_slice(&m.new_session_uri);
565 }
566 ControlMessage::MaxSubscribeId(m) => {
567 m.subscribe_id.encode(buf);
568 }
569 ControlMessage::Subscribe(m) => {
570 m.subscribe_id.encode(buf);
571 m.track_alias.encode(buf);
572 m.track_namespace.encode(buf);
573 VarInt::from_usize(m.track_name.len()).encode(buf);
574 buf.put_slice(&m.track_name);
575 buf.put_u8(m.subscriber_priority);
576 buf.put_u8(m.group_order as u8);
577 buf.put_u8(m.filter_type as u8);
578 if let Some(loc) = &m.start_location {
579 loc.encode(buf);
580 }
581 if let Some(eg) = &m.end_group {
582 eg.encode(buf);
583 }
584 if let Some(eo) = &m.end_object {
585 eo.encode(buf);
586 }
587 KeyValuePair::encode_list_d07(&m.parameters, buf);
588 }
589 ControlMessage::SubscribeOk(m) => {
590 m.subscribe_id.encode(buf);
591 m.expires.encode(buf);
592 buf.put_u8(m.group_order as u8);
593 buf.put_u8(m.content_exists as u8);
594 if let Some(gid) = &m.largest_group_id {
595 gid.encode(buf);
596 }
597 if let Some(oid) = &m.largest_object_id {
598 oid.encode(buf);
599 }
600 KeyValuePair::encode_list_d07(&m.parameters, buf);
601 }
602 ControlMessage::SubscribeError(m) => {
603 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
604 return Err(CodecError::ReasonPhraseTooLong);
605 }
606 m.subscribe_id.encode(buf);
607 m.error_code.encode(buf);
608 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
609 buf.put_slice(&m.reason_phrase);
610 m.track_alias.encode(buf);
611 }
612 ControlMessage::SubscribeUpdate(m) => {
613 m.subscribe_id.encode(buf);
614 m.start_group.encode(buf);
615 m.start_object.encode(buf);
616 m.end_group.encode(buf);
617 m.end_object.encode(buf);
618 buf.put_u8(m.subscriber_priority);
619 KeyValuePair::encode_list_d07(&m.parameters, buf);
620 }
621 ControlMessage::SubscribeDone(m) => {
622 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
623 return Err(CodecError::ReasonPhraseTooLong);
624 }
625 m.subscribe_id.encode(buf);
626 m.status_code.encode(buf);
627 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
628 buf.put_slice(&m.reason_phrase);
629 buf.put_u8(m.content_exists as u8);
630 if let Some(fg) = &m.final_group {
631 fg.encode(buf);
632 }
633 if let Some(fo) = &m.final_object {
634 fo.encode(buf);
635 }
636 }
637 ControlMessage::Unsubscribe(m) => {
638 m.subscribe_id.encode(buf);
639 }
640 ControlMessage::Announce(m) => {
641 m.track_namespace.encode(buf);
642 KeyValuePair::encode_list_d07(&m.parameters, buf);
643 }
644 ControlMessage::AnnounceOk(m) => {
645 m.track_namespace.encode(buf);
646 }
647 ControlMessage::AnnounceError(m) => {
648 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
649 return Err(CodecError::ReasonPhraseTooLong);
650 }
651 m.track_namespace.encode(buf);
652 m.error_code.encode(buf);
653 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
654 buf.put_slice(&m.reason_phrase);
655 }
656 ControlMessage::AnnounceCancel(m) => {
657 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
658 return Err(CodecError::ReasonPhraseTooLong);
659 }
660 m.track_namespace.encode(buf);
661 m.error_code.encode(buf);
662 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
663 buf.put_slice(&m.reason_phrase);
664 }
665 ControlMessage::Unannounce(m) => {
666 m.track_namespace.encode(buf);
667 }
668 ControlMessage::SubscribeAnnounces(m) => {
669 m.track_namespace_prefix.encode(buf);
670 KeyValuePair::encode_list_d07(&m.parameters, buf);
671 }
672 ControlMessage::SubscribeAnnouncesOk(m) => {
673 m.track_namespace_prefix.encode(buf);
674 }
675 ControlMessage::SubscribeAnnouncesError(m) => {
676 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
677 return Err(CodecError::ReasonPhraseTooLong);
678 }
679 m.track_namespace_prefix.encode(buf);
680 m.error_code.encode(buf);
681 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
682 buf.put_slice(&m.reason_phrase);
683 }
684 ControlMessage::UnsubscribeAnnounces(m) => {
685 m.track_namespace_prefix.encode(buf);
686 }
687 ControlMessage::TrackStatusRequest(m) => {
688 m.track_namespace.encode(buf);
689 VarInt::from_usize(m.track_name.len()).encode(buf);
690 buf.put_slice(&m.track_name);
691 }
692 ControlMessage::TrackStatus(m) => {
693 m.track_namespace.encode(buf);
694 VarInt::from_usize(m.track_name.len()).encode(buf);
695 buf.put_slice(&m.track_name);
696 m.status_code.encode(buf);
697 m.last_group_id.encode(buf);
698 m.last_object_id.encode(buf);
699 }
700 ControlMessage::Fetch(m) => {
701 m.subscribe_id.encode(buf);
702 m.track_namespace.encode(buf);
703 VarInt::from_usize(m.track_name.len()).encode(buf);
704 buf.put_slice(&m.track_name);
705 buf.put_u8(m.subscriber_priority);
706 buf.put_u8(m.group_order as u8);
707 m.start_group.encode(buf);
708 m.start_object.encode(buf);
709 m.end_group.encode(buf);
710 m.end_object.encode(buf);
711 KeyValuePair::encode_list_d07(&m.parameters, buf);
712 }
713 ControlMessage::FetchOk(m) => {
714 m.subscribe_id.encode(buf);
715 buf.put_u8(m.group_order as u8);
716 buf.put_u8(m.end_of_track);
717 if let Some(gid) = &m.largest_group_id {
718 gid.encode(buf);
719 }
720 if let Some(oid) = &m.largest_object_id {
721 oid.encode(buf);
722 }
723 KeyValuePair::encode_list_d07(&m.parameters, buf);
724 }
725 ControlMessage::FetchError(m) => {
726 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
727 return Err(CodecError::ReasonPhraseTooLong);
728 }
729 m.subscribe_id.encode(buf);
730 m.error_code.encode(buf);
731 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
732 buf.put_slice(&m.reason_phrase);
733 }
734 ControlMessage::FetchCancel(m) => {
735 m.subscribe_id.encode(buf);
736 }
737 }
738 Ok(())
739 }
740
741 fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
742 match msg_type {
743 MessageType::ClientSetup => {
744 let num_versions = VarInt::decode(buf)?.into_inner() as usize;
745 if num_versions == 0 {
746 return Err(CodecError::InvalidField);
747 }
748 let mut supported_versions = Vec::with_capacity(num_versions);
749 for _ in 0..num_versions {
750 supported_versions.push(VarInt::decode(buf)?);
751 }
752 let parameters = KeyValuePair::decode_list_d07(buf)?;
753 Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
754 }
755 MessageType::ServerSetup => {
756 let selected_version = VarInt::decode(buf)?;
757 let parameters = KeyValuePair::decode_list_d07(buf)?;
758 Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
759 }
760 MessageType::GoAway => {
761 let uri_len = VarInt::decode(buf)?.into_inner() as usize;
762 let uri = read_bytes(buf, uri_len)?;
763 Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
764 }
765 MessageType::MaxSubscribeId => {
766 let subscribe_id = VarInt::decode(buf)?;
767 Ok(ControlMessage::MaxSubscribeId(MaxSubscribeId { subscribe_id }))
768 }
769 MessageType::Subscribe => {
770 let subscribe_id = VarInt::decode(buf)?;
771 let track_alias = VarInt::decode(buf)?;
772 let track_namespace = TrackNamespace::decode(buf)?;
773 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
774 let track_name = read_bytes(buf, track_name_len)?;
775 if buf.remaining() < 3 {
776 return Err(CodecError::UnexpectedEnd);
777 }
778 let subscriber_priority = buf.get_u8();
779 let group_order =
780 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
781 let filter_val = buf.get_u8();
782 let filter_type =
783 FilterType::from_u8(filter_val).ok_or(CodecError::InvalidField)?;
784 let start_location = match filter_type {
785 FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
786 Some(Location::decode(buf)?)
787 }
788 _ => None,
789 };
790 let (end_group, end_object) = match filter_type {
791 FilterType::AbsoluteRange => {
792 let eg = VarInt::decode(buf)?;
793 let eo = VarInt::decode(buf)?;
794 (Some(eg), Some(eo))
795 }
796 _ => (None, None),
797 };
798 let parameters = KeyValuePair::decode_list_d07(buf)?;
799 Ok(ControlMessage::Subscribe(Subscribe {
800 subscribe_id,
801 track_alias,
802 track_namespace,
803 track_name,
804 subscriber_priority,
805 group_order,
806 filter_type,
807 start_location,
808 end_group,
809 end_object,
810 parameters,
811 }))
812 }
813 MessageType::SubscribeOk => {
814 let subscribe_id = VarInt::decode(buf)?;
815 let expires = VarInt::decode(buf)?;
816 if buf.remaining() < 2 {
817 return Err(CodecError::UnexpectedEnd);
818 }
819 let group_order =
820 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
821 let content_exists_val = buf.get_u8();
822 let content_exists = match content_exists_val {
823 0 => ContentExists::NoLargestLocation,
824 1 => ContentExists::HasLargestLocation,
825 _ => return Err(CodecError::InvalidField),
826 };
827 let (largest_group_id, largest_object_id) =
828 if content_exists == ContentExists::HasLargestLocation {
829 let gid = VarInt::decode(buf)?;
830 let oid = VarInt::decode(buf)?;
831 (Some(gid), Some(oid))
832 } else {
833 (None, None)
834 };
835 let parameters = KeyValuePair::decode_list_d07(buf)?;
836 Ok(ControlMessage::SubscribeOk(SubscribeOk {
837 subscribe_id,
838 expires,
839 group_order,
840 content_exists,
841 largest_group_id,
842 largest_object_id,
843 parameters,
844 }))
845 }
846 MessageType::SubscribeError => {
847 let subscribe_id = VarInt::decode(buf)?;
848 let error_code = VarInt::decode(buf)?;
849 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
850 let reason_phrase = read_bytes(buf, reason_len)?;
851 let track_alias = VarInt::decode(buf)?;
852 Ok(ControlMessage::SubscribeError(SubscribeError {
853 subscribe_id,
854 error_code,
855 reason_phrase,
856 track_alias,
857 }))
858 }
859 MessageType::SubscribeUpdate => {
860 let subscribe_id = VarInt::decode(buf)?;
861 let start_group = VarInt::decode(buf)?;
862 let start_object = VarInt::decode(buf)?;
863 let end_group = VarInt::decode(buf)?;
864 let end_object = VarInt::decode(buf)?;
865 if buf.remaining() < 1 {
866 return Err(CodecError::UnexpectedEnd);
867 }
868 let subscriber_priority = buf.get_u8();
869 let parameters = KeyValuePair::decode_list_d07(buf)?;
870 Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
871 subscribe_id,
872 start_group,
873 start_object,
874 end_group,
875 end_object,
876 subscriber_priority,
877 parameters,
878 }))
879 }
880 MessageType::SubscribeDone => {
881 let subscribe_id = VarInt::decode(buf)?;
882 let status_code = VarInt::decode(buf)?;
883 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
884 let reason_phrase = read_bytes(buf, reason_len)?;
885 if buf.remaining() < 1 {
886 return Err(CodecError::UnexpectedEnd);
887 }
888 let content_exists_val = buf.get_u8();
889 let content_exists = match content_exists_val {
890 0 => ContentExists::NoLargestLocation,
891 1 => ContentExists::HasLargestLocation,
892 _ => return Err(CodecError::InvalidField),
893 };
894 let (final_group, final_object) =
895 if content_exists == ContentExists::HasLargestLocation {
896 let fg = VarInt::decode(buf)?;
897 let fo = VarInt::decode(buf)?;
898 (Some(fg), Some(fo))
899 } else {
900 (None, None)
901 };
902 Ok(ControlMessage::SubscribeDone(SubscribeDone {
903 subscribe_id,
904 status_code,
905 reason_phrase,
906 content_exists,
907 final_group,
908 final_object,
909 }))
910 }
911 MessageType::Unsubscribe => {
912 let subscribe_id = VarInt::decode(buf)?;
913 Ok(ControlMessage::Unsubscribe(Unsubscribe { subscribe_id }))
914 }
915 MessageType::Announce => {
916 let track_namespace = TrackNamespace::decode(buf)?;
917 let parameters = KeyValuePair::decode_list_d07(buf)?;
918 Ok(ControlMessage::Announce(Announce { track_namespace, parameters }))
919 }
920 MessageType::AnnounceOk => {
921 let track_namespace = TrackNamespace::decode(buf)?;
922 Ok(ControlMessage::AnnounceOk(AnnounceOk { track_namespace }))
923 }
924 MessageType::AnnounceError => {
925 let track_namespace = TrackNamespace::decode(buf)?;
926 let error_code = VarInt::decode(buf)?;
927 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
928 let reason_phrase = read_bytes(buf, reason_len)?;
929 Ok(ControlMessage::AnnounceError(AnnounceError {
930 track_namespace,
931 error_code,
932 reason_phrase,
933 }))
934 }
935 MessageType::AnnounceCancel => {
936 let track_namespace = TrackNamespace::decode(buf)?;
937 let error_code = VarInt::decode(buf)?;
938 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
939 let reason_phrase = read_bytes(buf, reason_len)?;
940 Ok(ControlMessage::AnnounceCancel(AnnounceCancel {
941 track_namespace,
942 error_code,
943 reason_phrase,
944 }))
945 }
946 MessageType::Unannounce => {
947 let track_namespace = TrackNamespace::decode(buf)?;
948 Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
949 }
950 MessageType::SubscribeAnnounces => {
951 let track_namespace_prefix = TrackNamespace::decode(buf)?;
952 let parameters = KeyValuePair::decode_list_d07(buf)?;
953 Ok(ControlMessage::SubscribeAnnounces(SubscribeAnnounces {
954 track_namespace_prefix,
955 parameters,
956 }))
957 }
958 MessageType::SubscribeAnnouncesOk => {
959 let track_namespace_prefix = TrackNamespace::decode(buf)?;
960 Ok(ControlMessage::SubscribeAnnouncesOk(SubscribeAnnouncesOk {
961 track_namespace_prefix,
962 }))
963 }
964 MessageType::SubscribeAnnouncesError => {
965 let track_namespace_prefix = TrackNamespace::decode(buf)?;
966 let error_code = VarInt::decode(buf)?;
967 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
968 let reason_phrase = read_bytes(buf, reason_len)?;
969 Ok(ControlMessage::SubscribeAnnouncesError(SubscribeAnnouncesError {
970 track_namespace_prefix,
971 error_code,
972 reason_phrase,
973 }))
974 }
975 MessageType::UnsubscribeAnnounces => {
976 let track_namespace_prefix = TrackNamespace::decode(buf)?;
977 Ok(ControlMessage::UnsubscribeAnnounces(UnsubscribeAnnounces {
978 track_namespace_prefix,
979 }))
980 }
981 MessageType::TrackStatusRequest => {
982 let track_namespace = TrackNamespace::decode(buf)?;
983 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
984 let track_name = read_bytes(buf, track_name_len)?;
985 Ok(ControlMessage::TrackStatusRequest(TrackStatusRequest {
986 track_namespace,
987 track_name,
988 }))
989 }
990 MessageType::TrackStatus => {
991 let track_namespace = TrackNamespace::decode(buf)?;
992 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
993 let track_name = read_bytes(buf, track_name_len)?;
994 let status_code = VarInt::decode(buf)?;
995 let last_group_id = VarInt::decode(buf)?;
996 let last_object_id = VarInt::decode(buf)?;
997 Ok(ControlMessage::TrackStatus(TrackStatus {
998 track_namespace,
999 track_name,
1000 status_code,
1001 last_group_id,
1002 last_object_id,
1003 }))
1004 }
1005 MessageType::Fetch => {
1006 let subscribe_id = VarInt::decode(buf)?;
1007 let track_namespace = TrackNamespace::decode(buf)?;
1008 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1009 let track_name = read_bytes(buf, track_name_len)?;
1010 if buf.remaining() < 2 {
1011 return Err(CodecError::UnexpectedEnd);
1012 }
1013 let subscriber_priority = buf.get_u8();
1014 let group_order =
1015 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1016 let start_group = VarInt::decode(buf)?;
1017 let start_object = VarInt::decode(buf)?;
1018 let end_group = VarInt::decode(buf)?;
1019 let end_object = VarInt::decode(buf)?;
1020 let parameters = KeyValuePair::decode_list_d07(buf)?;
1021 Ok(ControlMessage::Fetch(Fetch {
1022 subscribe_id,
1023 track_namespace,
1024 track_name,
1025 subscriber_priority,
1026 group_order,
1027 start_group,
1028 start_object,
1029 end_group,
1030 end_object,
1031 parameters,
1032 }))
1033 }
1034 MessageType::FetchOk => {
1035 let subscribe_id = VarInt::decode(buf)?;
1036 if buf.remaining() < 2 {
1037 return Err(CodecError::UnexpectedEnd);
1038 }
1039 let group_order =
1040 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1041 let end_of_track = buf.get_u8();
1042 let largest_group_id = Some(VarInt::decode(buf)?);
1043 let largest_object_id = Some(VarInt::decode(buf)?);
1044 let parameters = KeyValuePair::decode_list_d07(buf)?;
1045 Ok(ControlMessage::FetchOk(FetchOk {
1046 subscribe_id,
1047 group_order,
1048 end_of_track,
1049 largest_group_id,
1050 largest_object_id,
1051 parameters,
1052 }))
1053 }
1054 MessageType::FetchError => {
1055 let subscribe_id = VarInt::decode(buf)?;
1056 let error_code = VarInt::decode(buf)?;
1057 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1058 let reason_phrase = read_bytes(buf, reason_len)?;
1059 Ok(ControlMessage::FetchError(FetchError {
1060 subscribe_id,
1061 error_code,
1062 reason_phrase,
1063 }))
1064 }
1065 MessageType::FetchCancel => {
1066 let subscribe_id = VarInt::decode(buf)?;
1067 Ok(ControlMessage::FetchCancel(FetchCancel { subscribe_id }))
1068 }
1069 }
1070 }
1071}