1pub use crate::error::{
13 CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_NAMESPACE_TUPLE_SIZE,
14 MAX_REASON_PHRASE_LENGTH,
15};
16use crate::kvp::KeyValuePair;
17use crate::types::*;
18use crate::varint::VarInt;
19use bytes::{Buf, BufMut};
20
/// Numeric type identifiers of the control messages handled in this file.
///
/// Each variant's discriminant is the identifier that `ControlMessage::encode`
/// writes in front of the payload and that `ControlMessage::decode` reads back.
/// The values `0x1B` and `0x1C` are not assigned to any variant.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u64)]
pub enum MessageType {
    SubscribeUpdate = 0x02,
    Subscribe = 0x03,
    SubscribeOk = 0x04,
    SubscribeError = 0x05,
    Announce = 0x06,
    AnnounceOk = 0x07,
    AnnounceError = 0x08,
    Unannounce = 0x09,
    Unsubscribe = 0x0A,
    SubscribeDone = 0x0B,
    AnnounceCancel = 0x0C,
    TrackStatus = 0x0D,
    TrackStatusOk = 0x0E,
    TrackStatusError = 0x0F,
    GoAway = 0x10,
    SubscribeNamespace = 0x11,
    SubscribeNamespaceOk = 0x12,
    SubscribeNamespaceError = 0x13,
    UnsubscribeNamespace = 0x14,
    MaxRequestId = 0x15,
    Fetch = 0x16,
    FetchCancel = 0x17,
    FetchOk = 0x18,
    FetchError = 0x19,
    RequestsBlocked = 0x1A,
    Publish = 0x1D,
    PublishOk = 0x1E,
    PublishError = 0x1F,
    ClientSetup = 0x20,
    ServerSetup = 0x21,
}

impl MessageType {
    /// Looks up the variant whose discriminant equals `id`.
    ///
    /// Returns `None` when `id` is not one of the declared discriminants.
    pub fn from_id(id: u64) -> Option<Self> {
        let kind = match id {
            0x02 => Self::SubscribeUpdate,
            0x03 => Self::Subscribe,
            0x04 => Self::SubscribeOk,
            0x05 => Self::SubscribeError,
            0x06 => Self::Announce,
            0x07 => Self::AnnounceOk,
            0x08 => Self::AnnounceError,
            0x09 => Self::Unannounce,
            0x0A => Self::Unsubscribe,
            0x0B => Self::SubscribeDone,
            0x0C => Self::AnnounceCancel,
            0x0D => Self::TrackStatus,
            0x0E => Self::TrackStatusOk,
            0x0F => Self::TrackStatusError,
            0x10 => Self::GoAway,
            0x11 => Self::SubscribeNamespace,
            0x12 => Self::SubscribeNamespaceOk,
            0x13 => Self::SubscribeNamespaceError,
            0x14 => Self::UnsubscribeNamespace,
            0x15 => Self::MaxRequestId,
            0x16 => Self::Fetch,
            0x17 => Self::FetchCancel,
            0x18 => Self::FetchOk,
            0x19 => Self::FetchError,
            0x1A => Self::RequestsBlocked,
            0x1D => Self::Publish,
            0x1E => Self::PublishOk,
            0x1F => Self::PublishError,
            0x20 => Self::ClientSetup,
            0x21 => Self::ServerSetup,
            _ => return None,
        };
        Some(kind)
    }

    /// Returns the numeric identifier (the `u64` discriminant) of this variant.
    pub fn id(&self) -> u64 {
        *self as u64
    }
}
97
98#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct ClientSetup {
104 pub supported_versions: Vec<VarInt>,
105 pub parameters: Vec<KeyValuePair>,
106}
107
108#[derive(Debug, Clone, PartialEq, Eq)]
109pub struct ServerSetup {
110 pub selected_version: VarInt,
111 pub parameters: Vec<KeyValuePair>,
112}
113
/// Payload of the `GoAway` control message (type id `0x10`).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GoAway {
    /// Raw URI bytes, written as a varint length followed by the bytes.
    /// The encoder rejects values longer than `MAX_GOAWAY_URI_LENGTH`.
    pub new_session_uri: Vec<u8>,
}
118
119#[derive(Debug, Clone, PartialEq, Eq)]
120pub struct MaxRequestId {
121 pub request_id: VarInt,
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct RequestsBlocked {
126 pub maximum_request_id: VarInt,
127}
128
129#[derive(Debug, Clone, PartialEq, Eq)]
134pub struct Subscribe {
135 pub request_id: VarInt,
136 pub track_namespace: TrackNamespace,
137 pub track_name: Vec<u8>,
138 pub subscriber_priority: u8,
139 pub group_order: VarInt,
140 pub forward: VarInt,
141 pub filter_type: VarInt,
142 pub start_group: Option<VarInt>,
143 pub start_object: Option<VarInt>,
144 pub end_group: Option<VarInt>,
145 pub parameters: Vec<KeyValuePair>,
146}
147
148#[derive(Debug, Clone, PartialEq, Eq)]
149pub struct SubscribeOk {
150 pub request_id: VarInt,
151 pub track_alias: VarInt,
152 pub expires: VarInt,
153 pub group_order: VarInt,
154 pub content_exists: VarInt,
155 pub largest_location: Option<Location>,
156 pub parameters: Vec<KeyValuePair>,
157}
158
159#[derive(Debug, Clone, PartialEq, Eq)]
160pub struct SubscribeError {
161 pub request_id: VarInt,
162 pub error_code: VarInt,
163 pub reason_phrase: Vec<u8>,
164}
165
166#[derive(Debug, Clone, PartialEq, Eq)]
167pub struct SubscribeUpdate {
168 pub request_id: VarInt,
169 pub start_group: VarInt,
170 pub start_object: VarInt,
171 pub end_group: VarInt,
172 pub subscriber_priority: u8,
173 pub forward: VarInt,
174 pub parameters: Vec<KeyValuePair>,
175}
176
177#[derive(Debug, Clone, PartialEq, Eq)]
178pub struct SubscribeDone {
179 pub request_id: VarInt,
180 pub status_code: VarInt,
181 pub stream_count: VarInt,
182 pub reason_phrase: Vec<u8>,
183}
184
185#[derive(Debug, Clone, PartialEq, Eq)]
186pub struct Unsubscribe {
187 pub request_id: VarInt,
188}
189
190#[derive(Debug, Clone, PartialEq, Eq)]
195pub struct Announce {
196 pub request_id: VarInt,
197 pub track_namespace: TrackNamespace,
198 pub parameters: Vec<KeyValuePair>,
199}
200
201#[derive(Debug, Clone, PartialEq, Eq)]
202pub struct AnnounceOk {
203 pub request_id: VarInt,
204}
205
206#[derive(Debug, Clone, PartialEq, Eq)]
207pub struct AnnounceError {
208 pub request_id: VarInt,
209 pub error_code: VarInt,
210 pub reason_phrase: Vec<u8>,
211}
212
213#[derive(Debug, Clone, PartialEq, Eq)]
214pub struct AnnounceCancel {
215 pub track_namespace: TrackNamespace,
216 pub error_code: VarInt,
217 pub reason_phrase: Vec<u8>,
218}
219
220#[derive(Debug, Clone, PartialEq, Eq)]
221pub struct Unannounce {
222 pub track_namespace: TrackNamespace,
223}
224
225#[derive(Debug, Clone, PartialEq, Eq)]
230pub struct SubscribeNamespace {
231 pub request_id: VarInt,
232 pub track_namespace_prefix: TrackNamespace,
233 pub parameters: Vec<KeyValuePair>,
234}
235
236#[derive(Debug, Clone, PartialEq, Eq)]
237pub struct SubscribeNamespaceOk {
238 pub request_id: VarInt,
239}
240
241#[derive(Debug, Clone, PartialEq, Eq)]
242pub struct SubscribeNamespaceError {
243 pub request_id: VarInt,
244 pub error_code: VarInt,
245 pub reason_phrase: Vec<u8>,
246}
247
248#[derive(Debug, Clone, PartialEq, Eq)]
249pub struct UnsubscribeNamespace {
250 pub track_namespace_prefix: TrackNamespace,
251}
252
253#[derive(Debug, Clone, PartialEq, Eq)]
259pub struct TrackStatus {
260 pub request_id: VarInt,
261 pub track_namespace: TrackNamespace,
262 pub track_name: Vec<u8>,
263 pub subscriber_priority: u8,
264 pub group_order: VarInt,
265 pub forward: VarInt,
266 pub filter_type: VarInt,
267 pub parameters: Vec<KeyValuePair>,
268}
269
270#[derive(Debug, Clone, PartialEq, Eq)]
272pub struct TrackStatusOk {
273 pub request_id: VarInt,
274 pub track_alias: VarInt,
275 pub expires: VarInt,
276 pub group_order: VarInt,
277 pub content_exists: VarInt,
278 pub largest_location: Option<Location>,
279 pub parameters: Vec<KeyValuePair>,
280}
281
282#[derive(Debug, Clone, PartialEq, Eq)]
284pub struct TrackStatusError {
285 pub request_id: VarInt,
286 pub error_code: VarInt,
287 pub reason_phrase: Vec<u8>,
288}
289
/// Discriminator carried in a `Fetch` message that selects the payload layout.
///
/// The discriminant of each variant is the value written on the wire.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u64)]
pub enum FetchType {
    Standalone = 1,
    RelativeJoining = 2,
    AbsoluteJoining = 3,
}

impl FetchType {
    /// Maps a wire value to its variant; `None` for anything other than 1, 2 or 3.
    pub fn from_u64(v: u64) -> Option<Self> {
        [Self::Standalone, Self::RelativeJoining, Self::AbsoluteJoining]
            .iter()
            .copied()
            .find(|kind| *kind as u64 == v)
    }
}
312
313#[derive(Debug, Clone, PartialEq, Eq)]
314pub struct Fetch {
315 pub request_id: VarInt,
316 pub subscriber_priority: u8,
317 pub group_order: VarInt,
318 pub fetch_type: FetchType,
319 pub fetch_payload: FetchPayload,
320 pub parameters: Vec<KeyValuePair>,
321}
322
323#[derive(Debug, Clone, PartialEq, Eq)]
324pub enum FetchPayload {
325 Standalone {
326 track_namespace: TrackNamespace,
327 track_name: Vec<u8>,
328 start_group: VarInt,
329 start_object: VarInt,
330 end_group: VarInt,
331 end_object: VarInt,
332 },
333 Joining {
334 joining_subscribe_id: VarInt,
335 joining_start: VarInt,
336 },
337}
338
339#[derive(Debug, Clone, PartialEq, Eq)]
340pub struct FetchOk {
341 pub request_id: VarInt,
342 pub group_order: VarInt,
343 pub end_of_track: VarInt,
344 pub end_location: Location,
345 pub parameters: Vec<KeyValuePair>,
346}
347
348#[derive(Debug, Clone, PartialEq, Eq)]
349pub struct FetchError {
350 pub request_id: VarInt,
351 pub error_code: VarInt,
352 pub reason_phrase: Vec<u8>,
353}
354
355#[derive(Debug, Clone, PartialEq, Eq)]
356pub struct FetchCancel {
357 pub request_id: VarInt,
358}
359
360#[derive(Debug, Clone, PartialEq, Eq)]
365pub struct Publish {
366 pub request_id: VarInt,
367 pub track_namespace: TrackNamespace,
368 pub track_name: Vec<u8>,
369 pub track_alias: VarInt,
370 pub group_order: VarInt,
371 pub content_exists: VarInt,
372 pub largest_location: Option<Location>,
373 pub forward: VarInt,
374 pub parameters: Vec<KeyValuePair>,
375}
376
377#[derive(Debug, Clone, PartialEq, Eq)]
378pub struct PublishOk {
379 pub request_id: VarInt,
380 pub forward: VarInt,
381 pub subscriber_priority: u8,
382 pub group_order: VarInt,
383 pub filter_type: VarInt,
384 pub start_group: Option<VarInt>,
385 pub start_object: Option<VarInt>,
386 pub end_group: Option<VarInt>,
387 pub parameters: Vec<KeyValuePair>,
388}
389
390#[derive(Debug, Clone, PartialEq, Eq)]
391pub struct PublishError {
392 pub request_id: VarInt,
393 pub error_code: VarInt,
394 pub reason_phrase: Vec<u8>,
395}
396
397#[derive(Debug, Clone, PartialEq, Eq)]
402pub enum ControlMessage {
403 ClientSetup(ClientSetup),
404 ServerSetup(ServerSetup),
405 GoAway(GoAway),
406 MaxRequestId(MaxRequestId),
407 RequestsBlocked(RequestsBlocked),
408 Subscribe(Subscribe),
409 SubscribeOk(SubscribeOk),
410 SubscribeError(SubscribeError),
411 SubscribeUpdate(SubscribeUpdate),
412 SubscribeDone(SubscribeDone),
413 Unsubscribe(Unsubscribe),
414 Announce(Announce),
415 AnnounceOk(AnnounceOk),
416 AnnounceError(AnnounceError),
417 AnnounceCancel(AnnounceCancel),
418 Unannounce(Unannounce),
419 SubscribeNamespace(SubscribeNamespace),
420 SubscribeNamespaceOk(SubscribeNamespaceOk),
421 SubscribeNamespaceError(SubscribeNamespaceError),
422 UnsubscribeNamespace(UnsubscribeNamespace),
423 TrackStatus(TrackStatus),
424 TrackStatusOk(TrackStatusOk),
425 TrackStatusError(TrackStatusError),
426 Fetch(Fetch),
427 FetchOk(FetchOk),
428 FetchError(FetchError),
429 FetchCancel(FetchCancel),
430 Publish(Publish),
431 PublishOk(PublishOk),
432 PublishError(PublishError),
433}
434
435impl ControlMessage {
436 pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
440 let mut payload = Vec::with_capacity(256);
441 self.encode_payload(&mut payload)?;
442
443 if payload.len() > MAX_MESSAGE_LENGTH {
444 return Err(CodecError::MessageTooLong(payload.len()));
445 }
446
447 VarInt::from_usize(self.message_type().id() as usize).encode(buf);
448 buf.put_u16(payload.len() as u16);
450 buf.put_slice(&payload);
451 Ok(())
452 }
453
454 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
458 let type_id = VarInt::decode(buf)?.into_inner();
459 let msg_type =
460 MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
461 if buf.remaining() < 2 {
463 return Err(CodecError::UnexpectedEnd);
464 }
465 let payload_len = buf.get_u16() as usize;
466 if buf.remaining() < payload_len {
467 return Err(CodecError::UnexpectedEnd);
468 }
469 let payload_bytes = buf.copy_to_bytes(payload_len);
470 let mut payload = &payload_bytes[..];
471 Self::decode_payload(msg_type, &mut payload)
472 }
473
474 fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
475 match self {
476 ControlMessage::ClientSetup(m) => {
477 VarInt::from_usize(m.supported_versions.len()).encode(buf);
478 for v in &m.supported_versions {
479 v.encode(buf);
480 }
481 KeyValuePair::encode_list(&m.parameters, buf);
482 }
483 ControlMessage::ServerSetup(m) => {
484 m.selected_version.encode(buf);
485 KeyValuePair::encode_list(&m.parameters, buf);
486 }
487 ControlMessage::GoAway(m) => {
488 if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
489 return Err(CodecError::GoAwayUriTooLong);
490 }
491 VarInt::from_usize(m.new_session_uri.len()).encode(buf);
492 buf.put_slice(&m.new_session_uri);
493 }
494 ControlMessage::MaxRequestId(m) => {
495 m.request_id.encode(buf);
496 }
497 ControlMessage::RequestsBlocked(m) => {
498 m.maximum_request_id.encode(buf);
499 }
500 ControlMessage::Subscribe(m) => {
501 m.request_id.encode(buf);
502 m.track_namespace.encode(buf);
503 VarInt::from_usize(m.track_name.len()).encode(buf);
504 buf.put_slice(&m.track_name);
505 buf.put_u8(m.subscriber_priority);
506 m.group_order.encode(buf);
507 m.forward.encode(buf);
508 m.filter_type.encode(buf);
509 if let Some(sg) = &m.start_group {
510 sg.encode(buf);
511 }
512 if let Some(so) = &m.start_object {
513 so.encode(buf);
514 }
515 if let Some(eg) = &m.end_group {
516 eg.encode(buf);
517 }
518 KeyValuePair::encode_list(&m.parameters, buf);
519 }
520 ControlMessage::SubscribeOk(m) => {
521 m.request_id.encode(buf);
522 m.track_alias.encode(buf);
523 m.expires.encode(buf);
524 m.group_order.encode(buf);
525 m.content_exists.encode(buf);
526 if let Some(loc) = &m.largest_location {
527 loc.encode(buf);
528 }
529 KeyValuePair::encode_list(&m.parameters, buf);
530 }
531 ControlMessage::SubscribeError(m) => {
532 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
533 return Err(CodecError::ReasonPhraseTooLong);
534 }
535 m.request_id.encode(buf);
536 m.error_code.encode(buf);
537 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
538 buf.put_slice(&m.reason_phrase);
539 }
540 ControlMessage::SubscribeUpdate(m) => {
541 m.request_id.encode(buf);
542 m.start_group.encode(buf);
543 m.start_object.encode(buf);
544 m.end_group.encode(buf);
545 buf.put_u8(m.subscriber_priority);
546 m.forward.encode(buf);
547 KeyValuePair::encode_list(&m.parameters, buf);
548 }
549 ControlMessage::SubscribeDone(m) => {
550 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
551 return Err(CodecError::ReasonPhraseTooLong);
552 }
553 m.request_id.encode(buf);
554 m.status_code.encode(buf);
555 m.stream_count.encode(buf);
556 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
557 buf.put_slice(&m.reason_phrase);
558 }
559 ControlMessage::Unsubscribe(m) => {
560 m.request_id.encode(buf);
561 }
562 ControlMessage::Announce(m) => {
563 m.request_id.encode(buf);
564 m.track_namespace.encode(buf);
565 KeyValuePair::encode_list(&m.parameters, buf);
566 }
567 ControlMessage::AnnounceOk(m) => {
568 m.request_id.encode(buf);
569 }
570 ControlMessage::AnnounceError(m) => {
571 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
572 return Err(CodecError::ReasonPhraseTooLong);
573 }
574 m.request_id.encode(buf);
575 m.error_code.encode(buf);
576 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
577 buf.put_slice(&m.reason_phrase);
578 }
579 ControlMessage::AnnounceCancel(m) => {
580 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
581 return Err(CodecError::ReasonPhraseTooLong);
582 }
583 m.track_namespace.encode(buf);
584 m.error_code.encode(buf);
585 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
586 buf.put_slice(&m.reason_phrase);
587 }
588 ControlMessage::Unannounce(m) => {
589 m.track_namespace.encode(buf);
590 }
591 ControlMessage::SubscribeNamespace(m) => {
592 m.request_id.encode(buf);
593 m.track_namespace_prefix.encode(buf);
594 KeyValuePair::encode_list(&m.parameters, buf);
595 }
596 ControlMessage::SubscribeNamespaceOk(m) => {
597 m.request_id.encode(buf);
598 }
599 ControlMessage::SubscribeNamespaceError(m) => {
600 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
601 return Err(CodecError::ReasonPhraseTooLong);
602 }
603 m.request_id.encode(buf);
604 m.error_code.encode(buf);
605 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
606 buf.put_slice(&m.reason_phrase);
607 }
608 ControlMessage::UnsubscribeNamespace(m) => {
609 m.track_namespace_prefix.encode(buf);
610 }
611 ControlMessage::TrackStatus(m) => {
612 m.request_id.encode(buf);
613 m.track_namespace.encode(buf);
614 VarInt::from_usize(m.track_name.len()).encode(buf);
615 buf.put_slice(&m.track_name);
616 buf.put_u8(m.subscriber_priority);
617 m.group_order.encode(buf);
618 m.forward.encode(buf);
619 m.filter_type.encode(buf);
620 KeyValuePair::encode_list(&m.parameters, buf);
621 }
622 ControlMessage::TrackStatusOk(m) => {
623 m.request_id.encode(buf);
624 m.track_alias.encode(buf);
625 m.expires.encode(buf);
626 m.group_order.encode(buf);
627 m.content_exists.encode(buf);
628 if let Some(loc) = &m.largest_location {
629 loc.encode(buf);
630 }
631 KeyValuePair::encode_list(&m.parameters, buf);
632 }
633 ControlMessage::TrackStatusError(m) => {
634 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
635 return Err(CodecError::ReasonPhraseTooLong);
636 }
637 m.request_id.encode(buf);
638 m.error_code.encode(buf);
639 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
640 buf.put_slice(&m.reason_phrase);
641 }
642 ControlMessage::Fetch(m) => {
643 m.request_id.encode(buf);
644 buf.put_u8(m.subscriber_priority);
645 m.group_order.encode(buf);
646 VarInt::from_usize(m.fetch_type as usize).encode(buf);
647 match &m.fetch_payload {
648 FetchPayload::Standalone {
649 track_namespace,
650 track_name,
651 start_group,
652 start_object,
653 end_group,
654 end_object,
655 } => {
656 track_namespace.encode(buf);
657 VarInt::from_usize(track_name.len()).encode(buf);
658 buf.put_slice(track_name);
659 start_group.encode(buf);
660 start_object.encode(buf);
661 end_group.encode(buf);
662 end_object.encode(buf);
663 }
664 FetchPayload::Joining { joining_subscribe_id, joining_start } => {
665 joining_subscribe_id.encode(buf);
666 joining_start.encode(buf);
667 }
668 }
669 KeyValuePair::encode_list(&m.parameters, buf);
670 }
671 ControlMessage::FetchOk(m) => {
672 m.request_id.encode(buf);
673 m.group_order.encode(buf);
674 m.end_of_track.encode(buf);
675 m.end_location.encode(buf);
676 KeyValuePair::encode_list(&m.parameters, buf);
677 }
678 ControlMessage::FetchError(m) => {
679 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
680 return Err(CodecError::ReasonPhraseTooLong);
681 }
682 m.request_id.encode(buf);
683 m.error_code.encode(buf);
684 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
685 buf.put_slice(&m.reason_phrase);
686 }
687 ControlMessage::FetchCancel(m) => {
688 m.request_id.encode(buf);
689 }
690 ControlMessage::Publish(m) => {
691 m.request_id.encode(buf);
692 m.track_namespace.encode(buf);
693 VarInt::from_usize(m.track_name.len()).encode(buf);
694 buf.put_slice(&m.track_name);
695 m.track_alias.encode(buf);
696 m.group_order.encode(buf);
697 m.content_exists.encode(buf);
698 if let Some(loc) = &m.largest_location {
699 loc.encode(buf);
700 }
701 m.forward.encode(buf);
702 KeyValuePair::encode_list(&m.parameters, buf);
703 }
704 ControlMessage::PublishOk(m) => {
705 m.request_id.encode(buf);
706 m.forward.encode(buf);
707 buf.put_u8(m.subscriber_priority);
708 m.group_order.encode(buf);
709 m.filter_type.encode(buf);
710 if let Some(sg) = &m.start_group {
711 sg.encode(buf);
712 }
713 if let Some(so) = &m.start_object {
714 so.encode(buf);
715 }
716 if let Some(eg) = &m.end_group {
717 eg.encode(buf);
718 }
719 KeyValuePair::encode_list(&m.parameters, buf);
720 }
721 ControlMessage::PublishError(m) => {
722 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
723 return Err(CodecError::ReasonPhraseTooLong);
724 }
725 m.request_id.encode(buf);
726 m.error_code.encode(buf);
727 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
728 buf.put_slice(&m.reason_phrase);
729 }
730 }
731 Ok(())
732 }
733
734 fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
735 match msg_type {
736 MessageType::ClientSetup => {
737 let num_versions = VarInt::decode(buf)?.into_inner() as usize;
738 if num_versions == 0 {
739 return Err(CodecError::InvalidField);
740 }
741 let mut supported_versions = Vec::with_capacity(num_versions);
742 for _ in 0..num_versions {
743 supported_versions.push(VarInt::decode(buf)?);
744 }
745 let parameters = KeyValuePair::decode_list(buf)?;
746 Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
747 }
748 MessageType::ServerSetup => {
749 let selected_version = VarInt::decode(buf)?;
750 let parameters = KeyValuePair::decode_list(buf)?;
751 Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
752 }
753 MessageType::GoAway => {
754 let uri_len = VarInt::decode(buf)?.into_inner() as usize;
755 let uri = read_bytes(buf, uri_len)?;
756 Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
757 }
758 MessageType::MaxRequestId => {
759 let request_id = VarInt::decode(buf)?;
760 Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id }))
761 }
762 MessageType::RequestsBlocked => {
763 let maximum_request_id = VarInt::decode(buf)?;
764 Ok(ControlMessage::RequestsBlocked(RequestsBlocked { maximum_request_id }))
765 }
766 MessageType::Subscribe => {
767 let request_id = VarInt::decode(buf)?;
768 let track_namespace = TrackNamespace::decode(buf)?;
769 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
770 let track_name = read_bytes(buf, track_name_len)?;
771 if buf.remaining() < 1 {
772 return Err(CodecError::UnexpectedEnd);
773 }
774 let subscriber_priority = buf.get_u8();
775 let group_order = VarInt::decode(buf)?;
776 let forward = VarInt::decode(buf)?;
777 let filter_type = VarInt::decode(buf)?;
778 let ft_val = filter_type.into_inner();
779 if ft_val == 0 || ft_val > 4 {
780 return Err(CodecError::InvalidField);
781 }
782 let (start_group, start_object) = if ft_val == 3 || ft_val == 4 {
783 (Some(VarInt::decode(buf)?), Some(VarInt::decode(buf)?))
784 } else {
785 (None, None)
786 };
787 let end_group = if ft_val == 4 { Some(VarInt::decode(buf)?) } else { None };
788 let parameters = KeyValuePair::decode_list(buf)?;
789 Ok(ControlMessage::Subscribe(Subscribe {
790 request_id,
791 track_namespace,
792 track_name,
793 subscriber_priority,
794 group_order,
795 forward,
796 filter_type,
797 start_group,
798 start_object,
799 end_group,
800 parameters,
801 }))
802 }
803 MessageType::SubscribeOk => {
804 let request_id = VarInt::decode(buf)?;
805 let track_alias = VarInt::decode(buf)?;
806 let expires = VarInt::decode(buf)?;
807 let group_order = VarInt::decode(buf)?;
808 let content_exists = VarInt::decode(buf)?;
809 let largest_location = if content_exists.into_inner() != 0 {
810 Some(Location::decode(buf)?)
811 } else {
812 None
813 };
814 let parameters = KeyValuePair::decode_list(buf)?;
815 Ok(ControlMessage::SubscribeOk(SubscribeOk {
816 request_id,
817 track_alias,
818 expires,
819 group_order,
820 content_exists,
821 largest_location,
822 parameters,
823 }))
824 }
825 MessageType::SubscribeError => {
826 let request_id = VarInt::decode(buf)?;
827 let error_code = VarInt::decode(buf)?;
828 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
829 let reason_phrase = read_bytes(buf, reason_len)?;
830 Ok(ControlMessage::SubscribeError(SubscribeError {
831 request_id,
832 error_code,
833 reason_phrase,
834 }))
835 }
836 MessageType::SubscribeUpdate => {
837 let request_id = VarInt::decode(buf)?;
838 let start_group = VarInt::decode(buf)?;
839 let start_object = VarInt::decode(buf)?;
840 let end_group = VarInt::decode(buf)?;
841 if buf.remaining() < 1 {
842 return Err(CodecError::UnexpectedEnd);
843 }
844 let subscriber_priority = buf.get_u8();
845 let forward = VarInt::decode(buf)?;
846 let parameters = KeyValuePair::decode_list(buf)?;
847 Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
848 request_id,
849 start_group,
850 start_object,
851 end_group,
852 subscriber_priority,
853 forward,
854 parameters,
855 }))
856 }
857 MessageType::SubscribeDone => {
858 let request_id = VarInt::decode(buf)?;
859 let status_code = VarInt::decode(buf)?;
860 let stream_count = VarInt::decode(buf)?;
861 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
862 let reason_phrase = read_bytes(buf, reason_len)?;
863 Ok(ControlMessage::SubscribeDone(SubscribeDone {
864 request_id,
865 status_code,
866 stream_count,
867 reason_phrase,
868 }))
869 }
870 MessageType::Unsubscribe => {
871 let request_id = VarInt::decode(buf)?;
872 Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
873 }
874 MessageType::Announce => {
875 let request_id = VarInt::decode(buf)?;
876 let track_namespace = TrackNamespace::decode(buf)?;
877 let parameters = KeyValuePair::decode_list(buf)?;
878 Ok(ControlMessage::Announce(Announce { request_id, track_namespace, parameters }))
879 }
880 MessageType::AnnounceOk => {
881 let request_id = VarInt::decode(buf)?;
882 Ok(ControlMessage::AnnounceOk(AnnounceOk { request_id }))
883 }
884 MessageType::AnnounceError => {
885 let request_id = VarInt::decode(buf)?;
886 let error_code = VarInt::decode(buf)?;
887 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
888 let reason_phrase = read_bytes(buf, reason_len)?;
889 Ok(ControlMessage::AnnounceError(AnnounceError {
890 request_id,
891 error_code,
892 reason_phrase,
893 }))
894 }
895 MessageType::AnnounceCancel => {
896 let track_namespace = TrackNamespace::decode(buf)?;
897 let error_code = VarInt::decode(buf)?;
898 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
899 let reason_phrase = read_bytes(buf, reason_len)?;
900 Ok(ControlMessage::AnnounceCancel(AnnounceCancel {
901 track_namespace,
902 error_code,
903 reason_phrase,
904 }))
905 }
906 MessageType::Unannounce => {
907 let track_namespace = TrackNamespace::decode(buf)?;
908 Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
909 }
910 MessageType::SubscribeNamespace => {
911 let request_id = VarInt::decode(buf)?;
912 let track_namespace_prefix = TrackNamespace::decode(buf)?;
913 let parameters = KeyValuePair::decode_list(buf)?;
914 Ok(ControlMessage::SubscribeNamespace(SubscribeNamespace {
915 request_id,
916 track_namespace_prefix,
917 parameters,
918 }))
919 }
920 MessageType::SubscribeNamespaceOk => {
921 let request_id = VarInt::decode(buf)?;
922 Ok(ControlMessage::SubscribeNamespaceOk(SubscribeNamespaceOk { request_id }))
923 }
924 MessageType::SubscribeNamespaceError => {
925 let request_id = VarInt::decode(buf)?;
926 let error_code = VarInt::decode(buf)?;
927 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
928 let reason_phrase = read_bytes(buf, reason_len)?;
929 Ok(ControlMessage::SubscribeNamespaceError(SubscribeNamespaceError {
930 request_id,
931 error_code,
932 reason_phrase,
933 }))
934 }
935 MessageType::UnsubscribeNamespace => {
936 let track_namespace_prefix = TrackNamespace::decode(buf)?;
937 Ok(ControlMessage::UnsubscribeNamespace(UnsubscribeNamespace {
938 track_namespace_prefix,
939 }))
940 }
941 MessageType::TrackStatus => {
942 let request_id = VarInt::decode(buf)?;
943 let track_namespace = TrackNamespace::decode(buf)?;
944 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
945 let track_name = read_bytes(buf, track_name_len)?;
946 if buf.remaining() < 1 {
947 return Err(CodecError::UnexpectedEnd);
948 }
949 let subscriber_priority = buf.get_u8();
950 let group_order = VarInt::decode(buf)?;
951 let forward = VarInt::decode(buf)?;
952 let filter_type = VarInt::decode(buf)?;
953 let parameters = KeyValuePair::decode_list(buf)?;
954 Ok(ControlMessage::TrackStatus(TrackStatus {
955 request_id,
956 track_namespace,
957 track_name,
958 subscriber_priority,
959 group_order,
960 forward,
961 filter_type,
962 parameters,
963 }))
964 }
965 MessageType::TrackStatusOk => {
966 let request_id = VarInt::decode(buf)?;
967 let track_alias = VarInt::decode(buf)?;
968 let expires = VarInt::decode(buf)?;
969 let group_order = VarInt::decode(buf)?;
970 let content_exists = VarInt::decode(buf)?;
971 let largest_location = if content_exists.into_inner() != 0 {
972 Some(Location::decode(buf)?)
973 } else {
974 None
975 };
976 let parameters = KeyValuePair::decode_list(buf)?;
977 Ok(ControlMessage::TrackStatusOk(TrackStatusOk {
978 request_id,
979 track_alias,
980 expires,
981 group_order,
982 content_exists,
983 largest_location,
984 parameters,
985 }))
986 }
987 MessageType::TrackStatusError => {
988 let request_id = VarInt::decode(buf)?;
989 let error_code = VarInt::decode(buf)?;
990 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
991 let reason_phrase = read_bytes(buf, reason_len)?;
992 Ok(ControlMessage::TrackStatusError(TrackStatusError {
993 request_id,
994 error_code,
995 reason_phrase,
996 }))
997 }
998 MessageType::Fetch => {
999 let request_id = VarInt::decode(buf)?;
1000 if buf.remaining() < 1 {
1001 return Err(CodecError::UnexpectedEnd);
1002 }
1003 let subscriber_priority = buf.get_u8();
1004 let group_order = VarInt::decode(buf)?;
1005 let fetch_type_val = VarInt::decode(buf)?.into_inner();
1006 let fetch_type =
1007 FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
1008 let fetch_payload = match fetch_type {
1009 FetchType::Standalone => {
1010 let track_namespace = TrackNamespace::decode(buf)?;
1011 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1012 let track_name = read_bytes(buf, track_name_len)?;
1013 let start_group = VarInt::decode(buf)?;
1014 let start_object = VarInt::decode(buf)?;
1015 let end_group = VarInt::decode(buf)?;
1016 let end_object = VarInt::decode(buf)?;
1017 FetchPayload::Standalone {
1018 track_namespace,
1019 track_name,
1020 start_group,
1021 start_object,
1022 end_group,
1023 end_object,
1024 }
1025 }
1026 FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
1027 let joining_subscribe_id = VarInt::decode(buf)?;
1028 let joining_start = VarInt::decode(buf)?;
1029 FetchPayload::Joining { joining_subscribe_id, joining_start }
1030 }
1031 };
1032 let parameters = KeyValuePair::decode_list(buf)?;
1033 Ok(ControlMessage::Fetch(Fetch {
1034 request_id,
1035 subscriber_priority,
1036 group_order,
1037 fetch_type,
1038 fetch_payload,
1039 parameters,
1040 }))
1041 }
1042 MessageType::FetchOk => {
1043 let request_id = VarInt::decode(buf)?;
1044 let group_order = VarInt::decode(buf)?;
1045 let end_of_track = VarInt::decode(buf)?;
1046 let end_location = Location::decode(buf)?;
1047 let parameters = KeyValuePair::decode_list(buf)?;
1048 Ok(ControlMessage::FetchOk(FetchOk {
1049 request_id,
1050 group_order,
1051 end_of_track,
1052 end_location,
1053 parameters,
1054 }))
1055 }
1056 MessageType::FetchError => {
1057 let request_id = VarInt::decode(buf)?;
1058 let error_code = VarInt::decode(buf)?;
1059 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1060 let reason_phrase = read_bytes(buf, reason_len)?;
1061 Ok(ControlMessage::FetchError(FetchError { request_id, error_code, reason_phrase }))
1062 }
1063 MessageType::FetchCancel => {
1064 let request_id = VarInt::decode(buf)?;
1065 Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
1066 }
1067 MessageType::Publish => {
1068 let request_id = VarInt::decode(buf)?;
1069 let track_namespace = TrackNamespace::decode(buf)?;
1070 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1071 let track_name = read_bytes(buf, track_name_len)?;
1072 let track_alias = VarInt::decode(buf)?;
1073 let group_order = VarInt::decode(buf)?;
1074 let content_exists = VarInt::decode(buf)?;
1075 let largest_location = if content_exists.into_inner() != 0 {
1076 Some(Location::decode(buf)?)
1077 } else {
1078 None
1079 };
1080 let forward = VarInt::decode(buf)?;
1081 let parameters = KeyValuePair::decode_list(buf)?;
1082 Ok(ControlMessage::Publish(Publish {
1083 request_id,
1084 track_namespace,
1085 track_name,
1086 track_alias,
1087 group_order,
1088 content_exists,
1089 largest_location,
1090 forward,
1091 parameters,
1092 }))
1093 }
1094 MessageType::PublishOk => {
1095 let request_id = VarInt::decode(buf)?;
1096 let forward = VarInt::decode(buf)?;
1097 if buf.remaining() < 1 {
1098 return Err(CodecError::UnexpectedEnd);
1099 }
1100 let subscriber_priority = buf.get_u8();
1101 let group_order = VarInt::decode(buf)?;
1102 let filter_type = VarInt::decode(buf)?;
1103 let ft_val = filter_type.into_inner();
1104 if ft_val == 0 || ft_val > 4 {
1105 return Err(CodecError::InvalidField);
1106 }
1107 let (start_group, start_object) = if ft_val == 3 || ft_val == 4 {
1108 (Some(VarInt::decode(buf)?), Some(VarInt::decode(buf)?))
1109 } else {
1110 (None, None)
1111 };
1112 let end_group = if ft_val == 4 { Some(VarInt::decode(buf)?) } else { None };
1113 let parameters = KeyValuePair::decode_list(buf)?;
1114 Ok(ControlMessage::PublishOk(PublishOk {
1115 request_id,
1116 forward,
1117 subscriber_priority,
1118 group_order,
1119 filter_type,
1120 start_group,
1121 start_object,
1122 end_group,
1123 parameters,
1124 }))
1125 }
1126 MessageType::PublishError => {
1127 let request_id = VarInt::decode(buf)?;
1128 let error_code = VarInt::decode(buf)?;
1129 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1130 let reason_phrase = read_bytes(buf, reason_len)?;
1131 Ok(ControlMessage::PublishError(PublishError {
1132 request_id,
1133 error_code,
1134 reason_phrase,
1135 }))
1136 }
1137 }
1138 }
1139
1140 pub fn message_type(&self) -> MessageType {
1141 match self {
1142 ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
1143 ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
1144 ControlMessage::GoAway(_) => MessageType::GoAway,
1145 ControlMessage::MaxRequestId(_) => MessageType::MaxRequestId,
1146 ControlMessage::RequestsBlocked(_) => MessageType::RequestsBlocked,
1147 ControlMessage::Subscribe(_) => MessageType::Subscribe,
1148 ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
1149 ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
1150 ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
1151 ControlMessage::SubscribeDone(_) => MessageType::SubscribeDone,
1152 ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
1153 ControlMessage::Announce(_) => MessageType::Announce,
1154 ControlMessage::AnnounceOk(_) => MessageType::AnnounceOk,
1155 ControlMessage::AnnounceError(_) => MessageType::AnnounceError,
1156 ControlMessage::AnnounceCancel(_) => MessageType::AnnounceCancel,
1157 ControlMessage::Unannounce(_) => MessageType::Unannounce,
1158 ControlMessage::SubscribeNamespace(_) => MessageType::SubscribeNamespace,
1159 ControlMessage::SubscribeNamespaceOk(_) => MessageType::SubscribeNamespaceOk,
1160 ControlMessage::SubscribeNamespaceError(_) => MessageType::SubscribeNamespaceError,
1161 ControlMessage::UnsubscribeNamespace(_) => MessageType::UnsubscribeNamespace,
1162 ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
1163 ControlMessage::TrackStatusOk(_) => MessageType::TrackStatusOk,
1164 ControlMessage::TrackStatusError(_) => MessageType::TrackStatusError,
1165 ControlMessage::Fetch(_) => MessageType::Fetch,
1166 ControlMessage::FetchOk(_) => MessageType::FetchOk,
1167 ControlMessage::FetchError(_) => MessageType::FetchError,
1168 ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
1169 ControlMessage::Publish(_) => MessageType::Publish,
1170 ControlMessage::PublishOk(_) => MessageType::PublishOk,
1171 ControlMessage::PublishError(_) => MessageType::PublishError,
1172 }
1173 }
1174}