1pub use crate::error::{
13 CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_NAMESPACE_TUPLE_SIZE,
14 MAX_REASON_PHRASE_LENGTH,
15};
16use crate::kvp::KeyValuePair;
17use crate::types::{self, *};
18use crate::varint::VarInt;
19use bytes::{Buf, BufMut};
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23#[repr(u64)]
24pub enum MessageType {
25 SubscribeUpdate = 0x02,
26 Subscribe = 0x03,
27 SubscribeOk = 0x04,
28 SubscribeError = 0x05,
29 Announce = 0x06,
30 AnnounceOk = 0x07,
31 AnnounceError = 0x08,
32 Unannounce = 0x09,
33 Unsubscribe = 0x0A,
34 SubscribeDone = 0x0B,
35 AnnounceCancel = 0x0C,
36 TrackStatusRequest = 0x0D,
37 TrackStatus = 0x0E,
38 GoAway = 0x10,
39 SubscribeAnnounces = 0x11,
40 SubscribeAnnouncesOk = 0x12,
41 SubscribeAnnouncesError = 0x13,
42 UnsubscribeAnnounces = 0x14,
43 MaxRequestId = 0x15,
44 Fetch = 0x16,
45 FetchCancel = 0x17,
46 FetchOk = 0x18,
47 FetchError = 0x19,
48 RequestsBlocked = 0x1A,
49 Publish = 0x1D,
50 PublishOk = 0x1E,
51 PublishError = 0x1F,
52 ClientSetup = 0x20,
53 ServerSetup = 0x21,
54}
55
56impl MessageType {
57 pub fn from_id(id: u64) -> Option<Self> {
58 match id {
59 0x02 => Some(MessageType::SubscribeUpdate),
60 0x03 => Some(MessageType::Subscribe),
61 0x04 => Some(MessageType::SubscribeOk),
62 0x05 => Some(MessageType::SubscribeError),
63 0x06 => Some(MessageType::Announce),
64 0x07 => Some(MessageType::AnnounceOk),
65 0x08 => Some(MessageType::AnnounceError),
66 0x09 => Some(MessageType::Unannounce),
67 0x0A => Some(MessageType::Unsubscribe),
68 0x0B => Some(MessageType::SubscribeDone),
69 0x0C => Some(MessageType::AnnounceCancel),
70 0x0D => Some(MessageType::TrackStatusRequest),
71 0x0E => Some(MessageType::TrackStatus),
72 0x10 => Some(MessageType::GoAway),
73 0x11 => Some(MessageType::SubscribeAnnounces),
74 0x12 => Some(MessageType::SubscribeAnnouncesOk),
75 0x13 => Some(MessageType::SubscribeAnnouncesError),
76 0x14 => Some(MessageType::UnsubscribeAnnounces),
77 0x15 => Some(MessageType::MaxRequestId),
78 0x16 => Some(MessageType::Fetch),
79 0x17 => Some(MessageType::FetchCancel),
80 0x18 => Some(MessageType::FetchOk),
81 0x19 => Some(MessageType::FetchError),
82 0x1A => Some(MessageType::RequestsBlocked),
83 0x1D => Some(MessageType::Publish),
84 0x1E => Some(MessageType::PublishOk),
85 0x1F => Some(MessageType::PublishError),
86 0x20 => Some(MessageType::ClientSetup),
87 0x21 => Some(MessageType::ServerSetup),
88 _ => None,
89 }
90 }
91
92 pub fn id(&self) -> u64 {
93 *self as u64
94 }
95}
96
97#[derive(Debug, Clone, PartialEq, Eq)]
102pub struct ClientSetup {
103 pub supported_versions: Vec<VarInt>,
104 pub parameters: Vec<KeyValuePair>,
105}
106
107#[derive(Debug, Clone, PartialEq, Eq)]
108pub struct ServerSetup {
109 pub selected_version: VarInt,
110 pub parameters: Vec<KeyValuePair>,
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct GoAway {
115 pub new_session_uri: Vec<u8>,
116}
117
118#[derive(Debug, Clone, PartialEq, Eq)]
119pub struct MaxRequestId {
120 pub request_id: VarInt,
121}
122
123#[derive(Debug, Clone, PartialEq, Eq)]
124pub struct RequestsBlocked {
125 pub maximum_request_id: VarInt,
126}
127
128#[derive(Debug, Clone, PartialEq, Eq)]
134pub struct Subscribe {
135 pub request_id: VarInt,
136 pub track_namespace: TrackNamespace,
137 pub track_name: Vec<u8>,
138 pub subscriber_priority: u8,
139 pub group_order: VarInt,
140 pub forward: VarInt,
141 pub filter_type: VarInt,
142 pub start_group: Option<VarInt>,
143 pub start_object: Option<VarInt>,
144 pub end_group: Option<VarInt>,
145 pub parameters: Vec<KeyValuePair>,
146}
147
148#[derive(Debug, Clone, PartialEq, Eq)]
150pub struct SubscribeOk {
151 pub request_id: VarInt,
152 pub track_alias: VarInt,
153 pub expires: VarInt,
154 pub group_order: VarInt,
155 pub content_exists: VarInt,
156 pub largest_location: Option<Location>,
157 pub parameters: Vec<KeyValuePair>,
158}
159
160#[derive(Debug, Clone, PartialEq, Eq)]
162pub struct SubscribeError {
163 pub request_id: VarInt,
164 pub error_code: VarInt,
165 pub reason_phrase: Vec<u8>,
166}
167
168#[derive(Debug, Clone, PartialEq, Eq)]
169pub struct SubscribeUpdate {
170 pub request_id: VarInt,
171 pub start_group: VarInt,
172 pub start_object: VarInt,
173 pub end_group: VarInt,
174 pub subscriber_priority: u8,
175 pub forward: VarInt,
176 pub parameters: Vec<KeyValuePair>,
177}
178
179#[derive(Debug, Clone, PartialEq, Eq)]
180pub struct SubscribeDone {
181 pub request_id: VarInt,
182 pub status_code: VarInt,
183 pub stream_count: VarInt,
184 pub reason_phrase: Vec<u8>,
185}
186
187#[derive(Debug, Clone, PartialEq, Eq)]
188pub struct Unsubscribe {
189 pub request_id: VarInt,
190}
191
192#[derive(Debug, Clone, PartialEq, Eq)]
197pub struct Announce {
198 pub request_id: VarInt,
199 pub track_namespace: TrackNamespace,
200 pub parameters: Vec<KeyValuePair>,
201}
202
203#[derive(Debug, Clone, PartialEq, Eq)]
204pub struct AnnounceOk {
205 pub request_id: VarInt,
206}
207
208#[derive(Debug, Clone, PartialEq, Eq)]
209pub struct AnnounceError {
210 pub request_id: VarInt,
211 pub error_code: VarInt,
212 pub reason_phrase: Vec<u8>,
213}
214
215#[derive(Debug, Clone, PartialEq, Eq)]
216pub struct AnnounceCancel {
217 pub track_namespace: TrackNamespace,
218 pub error_code: VarInt,
219 pub reason_phrase: Vec<u8>,
220}
221
222#[derive(Debug, Clone, PartialEq, Eq)]
223pub struct Unannounce {
224 pub track_namespace: TrackNamespace,
225}
226
227#[derive(Debug, Clone, PartialEq, Eq)]
232pub struct SubscribeAnnounces {
233 pub request_id: VarInt,
234 pub track_namespace_prefix: TrackNamespace,
235 pub parameters: Vec<KeyValuePair>,
236}
237
238#[derive(Debug, Clone, PartialEq, Eq)]
239pub struct SubscribeAnnouncesOk {
240 pub request_id: VarInt,
241}
242
243#[derive(Debug, Clone, PartialEq, Eq)]
244pub struct SubscribeAnnouncesError {
245 pub request_id: VarInt,
246 pub error_code: VarInt,
247 pub reason_phrase: Vec<u8>,
248}
249
250#[derive(Debug, Clone, PartialEq, Eq)]
251pub struct UnsubscribeAnnounces {
252 pub track_namespace_prefix: TrackNamespace,
253}
254
255#[derive(Debug, Clone, PartialEq, Eq)]
260pub struct TrackStatusRequest {
261 pub request_id: VarInt,
262 pub track_namespace: TrackNamespace,
263 pub track_name: Vec<u8>,
264 pub parameters: Vec<KeyValuePair>,
265}
266
267#[derive(Debug, Clone, PartialEq, Eq)]
268pub struct TrackStatus {
269 pub request_id: VarInt,
270 pub status_code: VarInt,
271 pub largest_location: Location,
272 pub parameters: Vec<KeyValuePair>,
273}
274
275#[derive(Debug, Clone, Copy, PartialEq, Eq)]
280#[repr(u64)]
281pub enum FetchType {
282 Standalone = 1,
283 RelativeJoining = 2,
284 AbsoluteJoining = 3,
285}
286
287impl FetchType {
288 pub fn from_u64(v: u64) -> Option<Self> {
289 match v {
290 1 => Some(FetchType::Standalone),
291 2 => Some(FetchType::RelativeJoining),
292 3 => Some(FetchType::AbsoluteJoining),
293 _ => None,
294 }
295 }
296}
297
298#[derive(Debug, Clone, PartialEq, Eq)]
299pub struct Fetch {
300 pub request_id: VarInt,
301 pub subscriber_priority: u8,
302 pub group_order: VarInt,
303 pub fetch_type: FetchType,
304 pub fetch_payload: FetchPayload,
305 pub parameters: Vec<KeyValuePair>,
306}
307
308#[derive(Debug, Clone, PartialEq, Eq)]
309pub enum FetchPayload {
310 Standalone {
311 track_namespace: TrackNamespace,
312 track_name: Vec<u8>,
313 start_group: VarInt,
314 start_object: VarInt,
315 end_group: VarInt,
316 end_object: VarInt,
317 },
318 Joining {
319 joining_subscribe_id: VarInt,
320 joining_start: VarInt,
321 },
322}
323
324#[derive(Debug, Clone, PartialEq, Eq)]
325pub struct FetchOk {
326 pub request_id: VarInt,
327 pub group_order: VarInt,
328 pub end_of_track: VarInt,
329 pub end_location: Location,
330 pub parameters: Vec<KeyValuePair>,
331}
332
333#[derive(Debug, Clone, PartialEq, Eq)]
334pub struct FetchError {
335 pub request_id: VarInt,
336 pub error_code: VarInt,
337 pub reason_phrase: Vec<u8>,
338}
339
340#[derive(Debug, Clone, PartialEq, Eq)]
341pub struct FetchCancel {
342 pub request_id: VarInt,
343}
344
345#[derive(Debug, Clone, PartialEq, Eq)]
351pub struct Publish {
352 pub request_id: VarInt,
353 pub track_namespace: TrackNamespace,
354 pub track_name: Vec<u8>,
355 pub track_alias: VarInt,
356 pub group_order: VarInt,
357 pub content_exists: VarInt,
358 pub largest_location: Option<Location>,
359 pub forward: VarInt,
360 pub parameters: Vec<KeyValuePair>,
361}
362
363#[derive(Debug, Clone, PartialEq, Eq)]
365pub struct PublishOk {
366 pub request_id: VarInt,
367 pub forward: VarInt,
368 pub subscriber_priority: u8,
369 pub group_order: VarInt,
370 pub filter_type: VarInt,
371 pub start_group: Option<VarInt>,
372 pub start_object: Option<VarInt>,
373 pub end_group: Option<VarInt>,
374 pub parameters: Vec<KeyValuePair>,
375}
376
377#[derive(Debug, Clone, PartialEq, Eq)]
379pub struct PublishError {
380 pub request_id: VarInt,
381 pub error_code: VarInt,
382 pub reason_phrase: Vec<u8>,
383}
384
385#[derive(Debug, Clone, PartialEq, Eq)]
390pub enum ControlMessage {
391 ClientSetup(ClientSetup),
392 ServerSetup(ServerSetup),
393 GoAway(GoAway),
394 MaxRequestId(MaxRequestId),
395 RequestsBlocked(RequestsBlocked),
396 Subscribe(Subscribe),
397 SubscribeOk(SubscribeOk),
398 SubscribeError(SubscribeError),
399 SubscribeUpdate(SubscribeUpdate),
400 SubscribeDone(SubscribeDone),
401 Unsubscribe(Unsubscribe),
402 Announce(Announce),
403 AnnounceOk(AnnounceOk),
404 AnnounceError(AnnounceError),
405 AnnounceCancel(AnnounceCancel),
406 Unannounce(Unannounce),
407 SubscribeAnnounces(SubscribeAnnounces),
408 SubscribeAnnouncesOk(SubscribeAnnouncesOk),
409 SubscribeAnnouncesError(SubscribeAnnouncesError),
410 UnsubscribeAnnounces(UnsubscribeAnnounces),
411 TrackStatusRequest(TrackStatusRequest),
412 TrackStatus(TrackStatus),
413 Fetch(Fetch),
414 FetchOk(FetchOk),
415 FetchError(FetchError),
416 FetchCancel(FetchCancel),
417 Publish(Publish),
418 PublishOk(PublishOk),
419 PublishError(PublishError),
420}
421
422impl ControlMessage {
423 pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
427 let mut payload = Vec::with_capacity(256);
428 self.encode_payload(&mut payload)?;
429
430 if payload.len() > MAX_MESSAGE_LENGTH {
431 return Err(CodecError::MessageTooLong(payload.len()));
432 }
433
434 VarInt::from_usize(self.message_type().id() as usize).encode(buf);
435 buf.put_u16(payload.len() as u16);
437 buf.put_slice(&payload);
438 Ok(())
439 }
440
441 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
445 let type_id = VarInt::decode(buf)?.into_inner();
446 let msg_type =
447 MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
448 if buf.remaining() < 2 {
450 return Err(CodecError::UnexpectedEnd);
451 }
452 let payload_len = buf.get_u16() as usize;
453 if buf.remaining() < payload_len {
454 return Err(CodecError::UnexpectedEnd);
455 }
456 let payload_bytes = buf.copy_to_bytes(payload_len);
457 let mut payload = &payload_bytes[..];
458 Self::decode_payload(msg_type, &mut payload)
459 }
460
461 fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
462 match self {
463 ControlMessage::ClientSetup(m) => {
464 VarInt::from_usize(m.supported_versions.len()).encode(buf);
465 for v in &m.supported_versions {
466 v.encode(buf);
467 }
468 KeyValuePair::encode_list(&m.parameters, buf);
469 }
470 ControlMessage::ServerSetup(m) => {
471 m.selected_version.encode(buf);
472 KeyValuePair::encode_list(&m.parameters, buf);
473 }
474 ControlMessage::GoAway(m) => {
475 if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
476 return Err(CodecError::GoAwayUriTooLong);
477 }
478 VarInt::from_usize(m.new_session_uri.len()).encode(buf);
479 buf.put_slice(&m.new_session_uri);
480 }
481 ControlMessage::MaxRequestId(m) => {
482 m.request_id.encode(buf);
483 }
484 ControlMessage::RequestsBlocked(m) => {
485 m.maximum_request_id.encode(buf);
486 }
487 ControlMessage::Subscribe(m) => {
488 m.request_id.encode(buf);
489 m.track_namespace.encode(buf);
490 VarInt::from_usize(m.track_name.len()).encode(buf);
491 buf.put_slice(&m.track_name);
492 buf.put_u8(m.subscriber_priority);
493 m.group_order.encode(buf);
494 m.forward.encode(buf);
495 m.filter_type.encode(buf);
496 if let Some(sg) = &m.start_group {
497 sg.encode(buf);
498 }
499 if let Some(so) = &m.start_object {
500 so.encode(buf);
501 }
502 if let Some(eg) = &m.end_group {
503 eg.encode(buf);
504 }
505 KeyValuePair::encode_list(&m.parameters, buf);
506 }
507 ControlMessage::SubscribeOk(m) => {
508 m.request_id.encode(buf);
509 m.track_alias.encode(buf);
510 m.expires.encode(buf);
511 m.group_order.encode(buf);
512 m.content_exists.encode(buf);
513 if let Some(loc) = &m.largest_location {
514 loc.encode(buf);
515 }
516 KeyValuePair::encode_list(&m.parameters, buf);
517 }
518 ControlMessage::SubscribeError(m) => {
519 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
520 return Err(CodecError::ReasonPhraseTooLong);
521 }
522 m.request_id.encode(buf);
523 m.error_code.encode(buf);
524 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
525 buf.put_slice(&m.reason_phrase);
526 }
527 ControlMessage::SubscribeUpdate(m) => {
528 m.request_id.encode(buf);
529 m.start_group.encode(buf);
530 m.start_object.encode(buf);
531 m.end_group.encode(buf);
532 buf.put_u8(m.subscriber_priority);
533 m.forward.encode(buf);
534 KeyValuePair::encode_list(&m.parameters, buf);
535 }
536 ControlMessage::SubscribeDone(m) => {
537 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
538 return Err(CodecError::ReasonPhraseTooLong);
539 }
540 m.request_id.encode(buf);
541 m.status_code.encode(buf);
542 m.stream_count.encode(buf);
543 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
544 buf.put_slice(&m.reason_phrase);
545 }
546 ControlMessage::Unsubscribe(m) => {
547 m.request_id.encode(buf);
548 }
549 ControlMessage::Announce(m) => {
550 m.request_id.encode(buf);
551 m.track_namespace.encode(buf);
552 KeyValuePair::encode_list(&m.parameters, buf);
553 }
554 ControlMessage::AnnounceOk(m) => {
555 m.request_id.encode(buf);
556 }
557 ControlMessage::AnnounceError(m) => {
558 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
559 return Err(CodecError::ReasonPhraseTooLong);
560 }
561 m.request_id.encode(buf);
562 m.error_code.encode(buf);
563 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
564 buf.put_slice(&m.reason_phrase);
565 }
566 ControlMessage::AnnounceCancel(m) => {
567 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
568 return Err(CodecError::ReasonPhraseTooLong);
569 }
570 m.track_namespace.encode(buf);
571 m.error_code.encode(buf);
572 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
573 buf.put_slice(&m.reason_phrase);
574 }
575 ControlMessage::Unannounce(m) => {
576 m.track_namespace.encode(buf);
577 }
578 ControlMessage::SubscribeAnnounces(m) => {
579 m.request_id.encode(buf);
580 m.track_namespace_prefix.encode(buf);
581 KeyValuePair::encode_list(&m.parameters, buf);
582 }
583 ControlMessage::SubscribeAnnouncesOk(m) => {
584 m.request_id.encode(buf);
585 }
586 ControlMessage::SubscribeAnnouncesError(m) => {
587 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
588 return Err(CodecError::ReasonPhraseTooLong);
589 }
590 m.request_id.encode(buf);
591 m.error_code.encode(buf);
592 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
593 buf.put_slice(&m.reason_phrase);
594 }
595 ControlMessage::UnsubscribeAnnounces(m) => {
596 m.track_namespace_prefix.encode(buf);
597 }
598 ControlMessage::TrackStatusRequest(m) => {
599 m.request_id.encode(buf);
600 m.track_namespace.encode(buf);
601 VarInt::from_usize(m.track_name.len()).encode(buf);
602 buf.put_slice(&m.track_name);
603 KeyValuePair::encode_list(&m.parameters, buf);
604 }
605 ControlMessage::TrackStatus(m) => {
606 m.request_id.encode(buf);
607 m.status_code.encode(buf);
608 m.largest_location.encode(buf);
609 KeyValuePair::encode_list(&m.parameters, buf);
610 }
611 ControlMessage::Fetch(m) => {
612 m.request_id.encode(buf);
613 buf.put_u8(m.subscriber_priority);
614 m.group_order.encode(buf);
615 VarInt::from_usize(m.fetch_type as usize).encode(buf);
616 match &m.fetch_payload {
617 FetchPayload::Standalone {
618 track_namespace,
619 track_name,
620 start_group,
621 start_object,
622 end_group,
623 end_object,
624 } => {
625 track_namespace.encode(buf);
626 VarInt::from_usize(track_name.len()).encode(buf);
627 buf.put_slice(track_name);
628 start_group.encode(buf);
629 start_object.encode(buf);
630 end_group.encode(buf);
631 end_object.encode(buf);
632 }
633 FetchPayload::Joining { joining_subscribe_id, joining_start } => {
634 joining_subscribe_id.encode(buf);
635 joining_start.encode(buf);
636 }
637 }
638 KeyValuePair::encode_list(&m.parameters, buf);
639 }
640 ControlMessage::FetchOk(m) => {
641 m.request_id.encode(buf);
642 m.group_order.encode(buf);
643 m.end_of_track.encode(buf);
644 m.end_location.encode(buf);
645 KeyValuePair::encode_list(&m.parameters, buf);
646 }
647 ControlMessage::FetchError(m) => {
648 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
649 return Err(CodecError::ReasonPhraseTooLong);
650 }
651 m.request_id.encode(buf);
652 m.error_code.encode(buf);
653 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
654 buf.put_slice(&m.reason_phrase);
655 }
656 ControlMessage::FetchCancel(m) => {
657 m.request_id.encode(buf);
658 }
659 ControlMessage::Publish(m) => {
660 m.request_id.encode(buf);
661 m.track_namespace.encode(buf);
662 VarInt::from_usize(m.track_name.len()).encode(buf);
663 buf.put_slice(&m.track_name);
664 m.track_alias.encode(buf);
665 m.group_order.encode(buf);
666 m.content_exists.encode(buf);
667 if let Some(loc) = &m.largest_location {
668 loc.encode(buf);
669 }
670 m.forward.encode(buf);
671 KeyValuePair::encode_list(&m.parameters, buf);
672 }
673 ControlMessage::PublishOk(m) => {
674 m.request_id.encode(buf);
675 m.forward.encode(buf);
676 buf.put_u8(m.subscriber_priority);
677 m.group_order.encode(buf);
678 m.filter_type.encode(buf);
679 if let Some(sg) = &m.start_group {
680 sg.encode(buf);
681 }
682 if let Some(so) = &m.start_object {
683 so.encode(buf);
684 }
685 if let Some(eg) = &m.end_group {
686 eg.encode(buf);
687 }
688 KeyValuePair::encode_list(&m.parameters, buf);
689 }
690 ControlMessage::PublishError(m) => {
691 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
692 return Err(CodecError::ReasonPhraseTooLong);
693 }
694 m.request_id.encode(buf);
695 m.error_code.encode(buf);
696 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
697 buf.put_slice(&m.reason_phrase);
698 }
699 }
700 Ok(())
701 }
702
703 fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
704 match msg_type {
705 MessageType::ClientSetup => {
706 let num_versions = VarInt::decode(buf)?.into_inner() as usize;
707 if num_versions == 0 {
708 return Err(CodecError::InvalidField);
709 }
710 let mut supported_versions = Vec::with_capacity(num_versions);
711 for _ in 0..num_versions {
712 supported_versions.push(VarInt::decode(buf)?);
713 }
714 let parameters = KeyValuePair::decode_list(buf)?;
715 Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
716 }
717 MessageType::ServerSetup => {
718 let selected_version = VarInt::decode(buf)?;
719 let parameters = KeyValuePair::decode_list(buf)?;
720 Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
721 }
722 MessageType::GoAway => {
723 let uri_len = VarInt::decode(buf)?.into_inner() as usize;
724 let uri = types::read_bytes(buf, uri_len)?;
725 Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
726 }
727 MessageType::MaxRequestId => {
728 let request_id = VarInt::decode(buf)?;
729 Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id }))
730 }
731 MessageType::RequestsBlocked => {
732 let maximum_request_id = VarInt::decode(buf)?;
733 Ok(ControlMessage::RequestsBlocked(RequestsBlocked { maximum_request_id }))
734 }
735 MessageType::Subscribe => {
736 let request_id = VarInt::decode(buf)?;
737 let track_namespace = TrackNamespace::decode(buf)?;
738 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
739 let track_name = types::read_bytes(buf, track_name_len)?;
740 if buf.remaining() < 1 {
741 return Err(CodecError::UnexpectedEnd);
742 }
743 let subscriber_priority = buf.get_u8();
744 let group_order = VarInt::decode(buf)?;
745 let forward = VarInt::decode(buf)?;
746 let filter_type = VarInt::decode(buf)?;
747 let ft_val = filter_type.into_inner();
748 if ft_val == 0 || ft_val > 4 {
749 return Err(CodecError::InvalidField);
750 }
751 let (start_group, start_object) = if ft_val == 3 || ft_val == 4 {
752 (Some(VarInt::decode(buf)?), Some(VarInt::decode(buf)?))
753 } else {
754 (None, None)
755 };
756 let end_group = if ft_val == 4 { Some(VarInt::decode(buf)?) } else { None };
757 let parameters = KeyValuePair::decode_list(buf)?;
758 Ok(ControlMessage::Subscribe(Subscribe {
759 request_id,
760 track_namespace,
761 track_name,
762 subscriber_priority,
763 group_order,
764 forward,
765 filter_type,
766 start_group,
767 start_object,
768 end_group,
769 parameters,
770 }))
771 }
772 MessageType::SubscribeOk => {
773 let request_id = VarInt::decode(buf)?;
774 let track_alias = VarInt::decode(buf)?;
775 let expires = VarInt::decode(buf)?;
776 let group_order = VarInt::decode(buf)?;
777 let content_exists = VarInt::decode(buf)?;
778 let largest_location = if content_exists.into_inner() != 0 {
779 Some(Location::decode(buf)?)
780 } else {
781 None
782 };
783 let parameters = KeyValuePair::decode_list(buf)?;
784 Ok(ControlMessage::SubscribeOk(SubscribeOk {
785 request_id,
786 track_alias,
787 expires,
788 group_order,
789 content_exists,
790 largest_location,
791 parameters,
792 }))
793 }
794 MessageType::SubscribeError => {
795 let request_id = VarInt::decode(buf)?;
796 let error_code = VarInt::decode(buf)?;
797 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
798 let reason_phrase = types::read_bytes(buf, reason_len)?;
799 Ok(ControlMessage::SubscribeError(SubscribeError {
800 request_id,
801 error_code,
802 reason_phrase,
803 }))
804 }
805 MessageType::SubscribeUpdate => {
806 let request_id = VarInt::decode(buf)?;
807 let start_group = VarInt::decode(buf)?;
808 let start_object = VarInt::decode(buf)?;
809 let end_group = VarInt::decode(buf)?;
810 if buf.remaining() < 1 {
811 return Err(CodecError::UnexpectedEnd);
812 }
813 let subscriber_priority = buf.get_u8();
814 let forward = VarInt::decode(buf)?;
815 let parameters = KeyValuePair::decode_list(buf)?;
816 Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
817 request_id,
818 start_group,
819 start_object,
820 end_group,
821 subscriber_priority,
822 forward,
823 parameters,
824 }))
825 }
826 MessageType::SubscribeDone => {
827 let request_id = VarInt::decode(buf)?;
828 let status_code = VarInt::decode(buf)?;
829 let stream_count = VarInt::decode(buf)?;
830 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
831 let reason_phrase = types::read_bytes(buf, reason_len)?;
832 Ok(ControlMessage::SubscribeDone(SubscribeDone {
833 request_id,
834 status_code,
835 stream_count,
836 reason_phrase,
837 }))
838 }
839 MessageType::Unsubscribe => {
840 let request_id = VarInt::decode(buf)?;
841 Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
842 }
843 MessageType::Announce => {
844 let request_id = VarInt::decode(buf)?;
845 let track_namespace = TrackNamespace::decode(buf)?;
846 let parameters = KeyValuePair::decode_list(buf)?;
847 Ok(ControlMessage::Announce(Announce { request_id, track_namespace, parameters }))
848 }
849 MessageType::AnnounceOk => {
850 let request_id = VarInt::decode(buf)?;
851 Ok(ControlMessage::AnnounceOk(AnnounceOk { request_id }))
852 }
853 MessageType::AnnounceError => {
854 let request_id = VarInt::decode(buf)?;
855 let error_code = VarInt::decode(buf)?;
856 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
857 let reason_phrase = types::read_bytes(buf, reason_len)?;
858 Ok(ControlMessage::AnnounceError(AnnounceError {
859 request_id,
860 error_code,
861 reason_phrase,
862 }))
863 }
864 MessageType::AnnounceCancel => {
865 let track_namespace = TrackNamespace::decode(buf)?;
866 let error_code = VarInt::decode(buf)?;
867 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
868 let reason_phrase = types::read_bytes(buf, reason_len)?;
869 Ok(ControlMessage::AnnounceCancel(AnnounceCancel {
870 track_namespace,
871 error_code,
872 reason_phrase,
873 }))
874 }
875 MessageType::Unannounce => {
876 let track_namespace = TrackNamespace::decode(buf)?;
877 Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
878 }
879 MessageType::SubscribeAnnounces => {
880 let request_id = VarInt::decode(buf)?;
881 let track_namespace_prefix = TrackNamespace::decode(buf)?;
882 let parameters = KeyValuePair::decode_list(buf)?;
883 Ok(ControlMessage::SubscribeAnnounces(SubscribeAnnounces {
884 request_id,
885 track_namespace_prefix,
886 parameters,
887 }))
888 }
889 MessageType::SubscribeAnnouncesOk => {
890 let request_id = VarInt::decode(buf)?;
891 Ok(ControlMessage::SubscribeAnnouncesOk(SubscribeAnnouncesOk { request_id }))
892 }
893 MessageType::SubscribeAnnouncesError => {
894 let request_id = VarInt::decode(buf)?;
895 let error_code = VarInt::decode(buf)?;
896 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
897 let reason_phrase = types::read_bytes(buf, reason_len)?;
898 Ok(ControlMessage::SubscribeAnnouncesError(SubscribeAnnouncesError {
899 request_id,
900 error_code,
901 reason_phrase,
902 }))
903 }
904 MessageType::UnsubscribeAnnounces => {
905 let track_namespace_prefix = TrackNamespace::decode(buf)?;
906 Ok(ControlMessage::UnsubscribeAnnounces(UnsubscribeAnnounces {
907 track_namespace_prefix,
908 }))
909 }
910 MessageType::TrackStatusRequest => {
911 let request_id = VarInt::decode(buf)?;
912 let track_namespace = TrackNamespace::decode(buf)?;
913 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
914 let track_name = types::read_bytes(buf, track_name_len)?;
915 let parameters = KeyValuePair::decode_list(buf)?;
916 Ok(ControlMessage::TrackStatusRequest(TrackStatusRequest {
917 request_id,
918 track_namespace,
919 track_name,
920 parameters,
921 }))
922 }
923 MessageType::TrackStatus => {
924 let request_id = VarInt::decode(buf)?;
925 let status_code = VarInt::decode(buf)?;
926 let largest_location = Location::decode(buf)?;
927 let parameters = KeyValuePair::decode_list(buf)?;
928 Ok(ControlMessage::TrackStatus(TrackStatus {
929 request_id,
930 status_code,
931 largest_location,
932 parameters,
933 }))
934 }
935 MessageType::Fetch => {
936 let request_id = VarInt::decode(buf)?;
937 if buf.remaining() < 1 {
938 return Err(CodecError::UnexpectedEnd);
939 }
940 let subscriber_priority = buf.get_u8();
941 let group_order = VarInt::decode(buf)?;
942 let fetch_type_val = VarInt::decode(buf)?.into_inner();
943 let fetch_type =
944 FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
945 let fetch_payload = match fetch_type {
946 FetchType::Standalone => {
947 let track_namespace = TrackNamespace::decode(buf)?;
948 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
949 let track_name = types::read_bytes(buf, track_name_len)?;
950 let start_group = VarInt::decode(buf)?;
951 let start_object = VarInt::decode(buf)?;
952 let end_group = VarInt::decode(buf)?;
953 let end_object = VarInt::decode(buf)?;
954 FetchPayload::Standalone {
955 track_namespace,
956 track_name,
957 start_group,
958 start_object,
959 end_group,
960 end_object,
961 }
962 }
963 FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
964 let joining_subscribe_id = VarInt::decode(buf)?;
965 let joining_start = VarInt::decode(buf)?;
966 FetchPayload::Joining { joining_subscribe_id, joining_start }
967 }
968 };
969 let parameters = KeyValuePair::decode_list(buf)?;
970 Ok(ControlMessage::Fetch(Fetch {
971 request_id,
972 subscriber_priority,
973 group_order,
974 fetch_type,
975 fetch_payload,
976 parameters,
977 }))
978 }
979 MessageType::FetchOk => {
980 let request_id = VarInt::decode(buf)?;
981 let group_order = VarInt::decode(buf)?;
982 let end_of_track = VarInt::decode(buf)?;
983 let end_location = Location::decode(buf)?;
984 let parameters = KeyValuePair::decode_list(buf)?;
985 Ok(ControlMessage::FetchOk(FetchOk {
986 request_id,
987 group_order,
988 end_of_track,
989 end_location,
990 parameters,
991 }))
992 }
993 MessageType::FetchError => {
994 let request_id = VarInt::decode(buf)?;
995 let error_code = VarInt::decode(buf)?;
996 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
997 let reason_phrase = types::read_bytes(buf, reason_len)?;
998 Ok(ControlMessage::FetchError(FetchError { request_id, error_code, reason_phrase }))
999 }
1000 MessageType::FetchCancel => {
1001 let request_id = VarInt::decode(buf)?;
1002 Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
1003 }
1004 MessageType::Publish => {
1005 let request_id = VarInt::decode(buf)?;
1006 let track_namespace = TrackNamespace::decode(buf)?;
1007 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1008 let track_name = types::read_bytes(buf, track_name_len)?;
1009 let track_alias = VarInt::decode(buf)?;
1010 let group_order = VarInt::decode(buf)?;
1011 let content_exists = VarInt::decode(buf)?;
1012 let largest_location = if content_exists.into_inner() != 0 {
1013 Some(Location::decode(buf)?)
1014 } else {
1015 None
1016 };
1017 let forward = VarInt::decode(buf)?;
1018 let parameters = KeyValuePair::decode_list(buf)?;
1019 Ok(ControlMessage::Publish(Publish {
1020 request_id,
1021 track_namespace,
1022 track_name,
1023 track_alias,
1024 group_order,
1025 content_exists,
1026 largest_location,
1027 forward,
1028 parameters,
1029 }))
1030 }
1031 MessageType::PublishOk => {
1032 let request_id = VarInt::decode(buf)?;
1033 let forward = VarInt::decode(buf)?;
1034 if buf.remaining() < 1 {
1035 return Err(CodecError::UnexpectedEnd);
1036 }
1037 let subscriber_priority = buf.get_u8();
1038 let group_order = VarInt::decode(buf)?;
1039 let filter_type = VarInt::decode(buf)?;
1040 let ft_val = filter_type.into_inner();
1041 if ft_val == 0 || ft_val > 4 {
1042 return Err(CodecError::InvalidField);
1043 }
1044 let (start_group, start_object) = if ft_val == 3 || ft_val == 4 {
1045 (Some(VarInt::decode(buf)?), Some(VarInt::decode(buf)?))
1046 } else {
1047 (None, None)
1048 };
1049 let end_group = if ft_val == 4 { Some(VarInt::decode(buf)?) } else { None };
1050 let parameters = KeyValuePair::decode_list(buf)?;
1051 Ok(ControlMessage::PublishOk(PublishOk {
1052 request_id,
1053 forward,
1054 subscriber_priority,
1055 group_order,
1056 filter_type,
1057 start_group,
1058 start_object,
1059 end_group,
1060 parameters,
1061 }))
1062 }
1063 MessageType::PublishError => {
1064 let request_id = VarInt::decode(buf)?;
1065 let error_code = VarInt::decode(buf)?;
1066 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1067 let reason_phrase = types::read_bytes(buf, reason_len)?;
1068 Ok(ControlMessage::PublishError(PublishError {
1069 request_id,
1070 error_code,
1071 reason_phrase,
1072 }))
1073 }
1074 }
1075 }
1076
1077 pub fn message_type(&self) -> MessageType {
1078 match self {
1079 ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
1080 ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
1081 ControlMessage::GoAway(_) => MessageType::GoAway,
1082 ControlMessage::MaxRequestId(_) => MessageType::MaxRequestId,
1083 ControlMessage::RequestsBlocked(_) => MessageType::RequestsBlocked,
1084 ControlMessage::Subscribe(_) => MessageType::Subscribe,
1085 ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
1086 ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
1087 ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
1088 ControlMessage::SubscribeDone(_) => MessageType::SubscribeDone,
1089 ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
1090 ControlMessage::Announce(_) => MessageType::Announce,
1091 ControlMessage::AnnounceOk(_) => MessageType::AnnounceOk,
1092 ControlMessage::AnnounceError(_) => MessageType::AnnounceError,
1093 ControlMessage::AnnounceCancel(_) => MessageType::AnnounceCancel,
1094 ControlMessage::Unannounce(_) => MessageType::Unannounce,
1095 ControlMessage::SubscribeAnnounces(_) => MessageType::SubscribeAnnounces,
1096 ControlMessage::SubscribeAnnouncesOk(_) => MessageType::SubscribeAnnouncesOk,
1097 ControlMessage::SubscribeAnnouncesError(_) => MessageType::SubscribeAnnouncesError,
1098 ControlMessage::UnsubscribeAnnounces(_) => MessageType::UnsubscribeAnnounces,
1099 ControlMessage::TrackStatusRequest(_) => MessageType::TrackStatusRequest,
1100 ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
1101 ControlMessage::Fetch(_) => MessageType::Fetch,
1102 ControlMessage::FetchOk(_) => MessageType::FetchOk,
1103 ControlMessage::FetchError(_) => MessageType::FetchError,
1104 ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
1105 ControlMessage::Publish(_) => MessageType::Publish,
1106 ControlMessage::PublishOk(_) => MessageType::PublishOk,
1107 ControlMessage::PublishError(_) => MessageType::PublishError,
1108 }
1109 }
1110}