1pub use crate::error::{
16 CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_NAMESPACE_TUPLE_SIZE,
17 MAX_REASON_PHRASE_LENGTH,
18};
19use crate::kvp::KeyValuePair;
20use crate::types::*;
21use crate::varint::VarInt;
22use bytes::{Buf, BufMut};
23
/// Numeric type identifiers of the control messages handled by this module.
///
/// Each discriminant is the value written (as a varint) at the start of an
/// encoded control message.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u64)]
pub enum MessageType {
    SubscribeUpdate = 0x02,
    Subscribe = 0x03,
    SubscribeOk = 0x04,
    RequestError = 0x05,
    PublishNamespace = 0x06,
    RequestOk = 0x07,
    PublishNamespaceDone = 0x09,
    Unsubscribe = 0x0A,
    PublishDone = 0x0B,
    PublishNamespaceCancel = 0x0C,
    TrackStatus = 0x0D,
    GoAway = 0x10,
    SubscribeNamespace = 0x11,
    UnsubscribeNamespace = 0x14,
    MaxRequestId = 0x15,
    Fetch = 0x16,
    FetchCancel = 0x17,
    FetchOk = 0x18,
    RequestsBlocked = 0x1A,
    Publish = 0x1D,
    PublishOk = 0x1E,
    ClientSetup = 0x20,
    ServerSetup = 0x21,
}

impl MessageType {
    /// Every known message type, in ascending order of identifier.
    const ALL: [MessageType; 23] = [
        MessageType::SubscribeUpdate,
        MessageType::Subscribe,
        MessageType::SubscribeOk,
        MessageType::RequestError,
        MessageType::PublishNamespace,
        MessageType::RequestOk,
        MessageType::PublishNamespaceDone,
        MessageType::Unsubscribe,
        MessageType::PublishDone,
        MessageType::PublishNamespaceCancel,
        MessageType::TrackStatus,
        MessageType::GoAway,
        MessageType::SubscribeNamespace,
        MessageType::UnsubscribeNamespace,
        MessageType::MaxRequestId,
        MessageType::Fetch,
        MessageType::FetchCancel,
        MessageType::FetchOk,
        MessageType::RequestsBlocked,
        MessageType::Publish,
        MessageType::PublishOk,
        MessageType::ClientSetup,
        MessageType::ServerSetup,
    ];

    /// Looks up the message type whose identifier equals `id`.
    ///
    /// Returns `None` when `id` does not correspond to any variant.
    pub fn from_id(id: u64) -> Option<Self> {
        // Comparing through `id()` keeps this lookup tied to the enum's own
        // discriminants instead of a second hand-written table of numbers.
        Self::ALL.iter().copied().find(|kind| kind.id() == id)
    }

    /// Returns the numeric identifier of this message type.
    pub fn id(&self) -> u64 {
        let kind = *self;
        kind as u64
    }
}
86
87#[derive(Debug, Clone, PartialEq, Eq)]
93pub struct ClientSetup {
94 pub parameters: Vec<KeyValuePair>,
95}
96
97#[derive(Debug, Clone, PartialEq, Eq)]
99pub struct ServerSetup {
100 pub parameters: Vec<KeyValuePair>,
101}
102
/// Body of the control message identified by [`MessageType::GoAway`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct GoAway {
    /// Raw URI bytes, written with a varint length prefix. Encoding fails
    /// when this is longer than `MAX_GOAWAY_URI_LENGTH`.
    pub new_session_uri: Vec<u8>,
}
107
108#[derive(Debug, Clone, PartialEq, Eq)]
109pub struct MaxRequestId {
110 pub request_id: VarInt,
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct RequestsBlocked {
115 pub maximum_request_id: VarInt,
116}
117
118#[derive(Debug, Clone, PartialEq, Eq)]
124pub struct RequestOk {
125 pub request_id: VarInt,
126 pub parameters: Vec<KeyValuePair>,
127}
128
129#[derive(Debug, Clone, PartialEq, Eq)]
131pub struct RequestError {
132 pub request_id: VarInt,
133 pub error_code: VarInt,
134 pub reason_phrase: Vec<u8>,
135}
136
137#[derive(Debug, Clone, PartialEq, Eq)]
143pub struct Subscribe {
144 pub request_id: VarInt,
145 pub track_namespace: TrackNamespace,
146 pub track_name: Vec<u8>,
147 pub parameters: Vec<KeyValuePair>,
148}
149
150#[derive(Debug, Clone, PartialEq, Eq)]
152pub struct SubscribeOk {
153 pub request_id: VarInt,
154 pub track_alias: VarInt,
155 pub parameters: Vec<KeyValuePair>,
156}
157
158#[derive(Debug, Clone, PartialEq, Eq)]
160pub struct SubscribeUpdate {
161 pub request_id: VarInt,
162 pub subscription_request_id: VarInt,
163 pub parameters: Vec<KeyValuePair>,
164}
165
166#[derive(Debug, Clone, PartialEq, Eq)]
167pub struct Unsubscribe {
168 pub request_id: VarInt,
169}
170
171#[derive(Debug, Clone, PartialEq, Eq)]
177pub struct Publish {
178 pub request_id: VarInt,
179 pub track_namespace: TrackNamespace,
180 pub track_name: Vec<u8>,
181 pub track_alias: VarInt,
182 pub parameters: Vec<KeyValuePair>,
183}
184
185#[derive(Debug, Clone, PartialEq, Eq)]
187pub struct PublishOk {
188 pub request_id: VarInt,
189 pub parameters: Vec<KeyValuePair>,
190}
191
192#[derive(Debug, Clone, PartialEq, Eq)]
194pub struct PublishDone {
195 pub request_id: VarInt,
196 pub status_code: VarInt,
197 pub stream_count: VarInt,
198 pub reason_phrase: Vec<u8>,
199}
200
201#[derive(Debug, Clone, PartialEq, Eq)]
207pub struct PublishNamespace {
208 pub request_id: VarInt,
209 pub track_namespace: TrackNamespace,
210 pub parameters: Vec<KeyValuePair>,
211}
212
213#[derive(Debug, Clone, PartialEq, Eq)]
215pub struct PublishNamespaceDone {
216 pub track_namespace: TrackNamespace,
217}
218
219#[derive(Debug, Clone, PartialEq, Eq)]
221pub struct PublishNamespaceCancel {
222 pub track_namespace: TrackNamespace,
223 pub error_code: VarInt,
224 pub reason_phrase: Vec<u8>,
225}
226
227#[derive(Debug, Clone, PartialEq, Eq)]
232pub struct SubscribeNamespace {
233 pub request_id: VarInt,
234 pub namespace_prefix: TrackNamespace,
235 pub parameters: Vec<KeyValuePair>,
236}
237
238#[derive(Debug, Clone, PartialEq, Eq)]
240pub struct UnsubscribeNamespace {
241 pub request_id: VarInt,
242}
243
244#[derive(Debug, Clone, PartialEq, Eq)]
250pub struct TrackStatus {
251 pub request_id: VarInt,
252 pub track_namespace: TrackNamespace,
253 pub track_name: Vec<u8>,
254 pub parameters: Vec<KeyValuePair>,
255}
256
/// Discriminator carried in a fetch message; its numeric value is what is
/// written on the wire.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u64)]
pub enum FetchType {
    Standalone = 1,
    RelativeJoining = 2,
    AbsoluteJoining = 3,
}

impl FetchType {
    /// Converts a raw value into a fetch type, or `None` if `v` is not one of
    /// the defined discriminants.
    pub fn from_u64(v: u64) -> Option<Self> {
        [Self::Standalone, Self::RelativeJoining, Self::AbsoluteJoining]
            .iter()
            .copied()
            .find(|kind| (*kind as u64) == v)
    }
}
283
284#[derive(Debug, Clone, PartialEq, Eq)]
285pub struct Fetch {
286 pub request_id: VarInt,
287 pub fetch_type: FetchType,
288 pub fetch_payload: FetchPayload,
289 pub parameters: Vec<KeyValuePair>,
290}
291
292#[derive(Debug, Clone, PartialEq, Eq)]
293pub enum FetchPayload {
294 Standalone {
295 track_namespace: TrackNamespace,
296 track_name: Vec<u8>,
297 start_group: VarInt,
298 start_object: VarInt,
299 end_group: VarInt,
300 end_object: VarInt,
301 },
302 Joining {
303 joining_request_id: VarInt,
304 joining_start: VarInt,
305 },
306}
307
308#[derive(Debug, Clone, PartialEq, Eq)]
309pub struct FetchOk {
310 pub request_id: VarInt,
311 pub end_of_track: VarInt,
312 pub end_group: VarInt,
313 pub end_object: VarInt,
314 pub parameters: Vec<KeyValuePair>,
315}
316
317#[derive(Debug, Clone, PartialEq, Eq)]
318pub struct FetchCancel {
319 pub request_id: VarInt,
320}
321
322#[derive(Debug, Clone, PartialEq, Eq)]
327pub enum ControlMessage {
328 ClientSetup(ClientSetup),
329 ServerSetup(ServerSetup),
330 GoAway(GoAway),
331 MaxRequestId(MaxRequestId),
332 RequestsBlocked(RequestsBlocked),
333 RequestOk(RequestOk),
334 RequestError(RequestError),
335 Subscribe(Subscribe),
336 SubscribeOk(SubscribeOk),
337 SubscribeUpdate(SubscribeUpdate),
338 Unsubscribe(Unsubscribe),
339 Publish(Publish),
340 PublishOk(PublishOk),
341 PublishDone(PublishDone),
342 PublishNamespace(PublishNamespace),
343 PublishNamespaceDone(PublishNamespaceDone),
344 PublishNamespaceCancel(PublishNamespaceCancel),
345 SubscribeNamespace(SubscribeNamespace),
346 UnsubscribeNamespace(UnsubscribeNamespace),
347 TrackStatus(TrackStatus),
348 Fetch(Fetch),
349 FetchOk(FetchOk),
350 FetchCancel(FetchCancel),
351}
352
353impl ControlMessage {
354 pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
355 let mut payload = Vec::with_capacity(256);
356 self.encode_payload(&mut payload)?;
357
358 if payload.len() > MAX_MESSAGE_LENGTH {
359 return Err(CodecError::MessageTooLong(payload.len()));
360 }
361
362 let msg_type = self.message_type();
363 VarInt::from_usize(msg_type.id() as usize).encode(buf);
364 buf.put_u16(payload.len() as u16);
366 buf.put_slice(&payload);
367 Ok(())
368 }
369
370 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
371 let type_id = VarInt::decode(buf)?.into_inner();
372 let msg_type =
373 MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
374 if buf.remaining() < 2 {
376 return Err(CodecError::UnexpectedEnd);
377 }
378 let payload_len = buf.get_u16() as usize;
379 if buf.remaining() < payload_len {
380 return Err(CodecError::UnexpectedEnd);
381 }
382 let payload_bytes = buf.copy_to_bytes(payload_len);
383 let mut payload = &payload_bytes[..];
384 Self::decode_payload(msg_type, &mut payload)
385 }
386
387 fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
388 match self {
389 ControlMessage::ClientSetup(m) => {
390 KeyValuePair::encode_list(&m.parameters, buf);
391 }
392 ControlMessage::ServerSetup(m) => {
393 KeyValuePair::encode_list(&m.parameters, buf);
394 }
395 ControlMessage::GoAway(m) => {
396 if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
397 return Err(CodecError::GoAwayUriTooLong);
398 }
399 VarInt::from_usize(m.new_session_uri.len()).encode(buf);
400 buf.put_slice(&m.new_session_uri);
401 }
402 ControlMessage::MaxRequestId(m) => {
403 m.request_id.encode(buf);
404 }
405 ControlMessage::RequestsBlocked(m) => {
406 m.maximum_request_id.encode(buf);
407 }
408 ControlMessage::RequestOk(m) => {
409 m.request_id.encode(buf);
410 KeyValuePair::encode_list(&m.parameters, buf);
411 }
412 ControlMessage::RequestError(m) => {
413 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
414 return Err(CodecError::ReasonPhraseTooLong);
415 }
416 m.request_id.encode(buf);
417 m.error_code.encode(buf);
418 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
419 buf.put_slice(&m.reason_phrase);
420 }
421 ControlMessage::Subscribe(m) => {
422 m.request_id.encode(buf);
423 m.track_namespace.encode(buf);
424 VarInt::from_usize(m.track_name.len()).encode(buf);
425 buf.put_slice(&m.track_name);
426 KeyValuePair::encode_list(&m.parameters, buf);
427 }
428 ControlMessage::SubscribeOk(m) => {
429 m.request_id.encode(buf);
430 m.track_alias.encode(buf);
431 KeyValuePair::encode_list(&m.parameters, buf);
432 }
433 ControlMessage::SubscribeUpdate(m) => {
434 m.request_id.encode(buf);
435 m.subscription_request_id.encode(buf);
436 KeyValuePair::encode_list(&m.parameters, buf);
437 }
438 ControlMessage::Unsubscribe(m) => {
439 m.request_id.encode(buf);
440 }
441 ControlMessage::Publish(m) => {
442 m.request_id.encode(buf);
443 m.track_namespace.encode(buf);
444 VarInt::from_usize(m.track_name.len()).encode(buf);
445 buf.put_slice(&m.track_name);
446 m.track_alias.encode(buf);
447 KeyValuePair::encode_list(&m.parameters, buf);
448 }
449 ControlMessage::PublishOk(m) => {
450 m.request_id.encode(buf);
451 KeyValuePair::encode_list(&m.parameters, buf);
452 }
453 ControlMessage::PublishDone(m) => {
454 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
455 return Err(CodecError::ReasonPhraseTooLong);
456 }
457 m.request_id.encode(buf);
458 m.status_code.encode(buf);
459 m.stream_count.encode(buf);
460 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
461 buf.put_slice(&m.reason_phrase);
462 }
463 ControlMessage::PublishNamespace(m) => {
464 m.request_id.encode(buf);
465 m.track_namespace.encode(buf);
466 KeyValuePair::encode_list(&m.parameters, buf);
467 }
468 ControlMessage::PublishNamespaceDone(m) => {
469 m.track_namespace.encode(buf);
470 }
471 ControlMessage::PublishNamespaceCancel(m) => {
472 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
473 return Err(CodecError::ReasonPhraseTooLong);
474 }
475 m.track_namespace.encode(buf);
476 m.error_code.encode(buf);
477 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
478 buf.put_slice(&m.reason_phrase);
479 }
480 ControlMessage::SubscribeNamespace(m) => {
481 m.request_id.encode(buf);
482 m.namespace_prefix.encode(buf);
483 KeyValuePair::encode_list(&m.parameters, buf);
484 }
485 ControlMessage::UnsubscribeNamespace(m) => {
486 m.request_id.encode(buf);
487 }
488 ControlMessage::TrackStatus(m) => {
489 m.request_id.encode(buf);
490 m.track_namespace.encode(buf);
491 VarInt::from_usize(m.track_name.len()).encode(buf);
492 buf.put_slice(&m.track_name);
493 KeyValuePair::encode_list(&m.parameters, buf);
494 }
495 ControlMessage::Fetch(m) => {
496 m.request_id.encode(buf);
497 VarInt::from_usize(m.fetch_type as usize).encode(buf);
498 match &m.fetch_payload {
499 FetchPayload::Standalone {
500 track_namespace,
501 track_name,
502 start_group,
503 start_object,
504 end_group,
505 end_object,
506 } => {
507 track_namespace.encode(buf);
508 VarInt::from_usize(track_name.len()).encode(buf);
509 buf.put_slice(track_name);
510 start_group.encode(buf);
511 start_object.encode(buf);
512 end_group.encode(buf);
513 end_object.encode(buf);
514 }
515 FetchPayload::Joining { joining_request_id, joining_start } => {
516 joining_request_id.encode(buf);
517 joining_start.encode(buf);
518 }
519 }
520 KeyValuePair::encode_list(&m.parameters, buf);
521 }
522 ControlMessage::FetchOk(m) => {
523 m.request_id.encode(buf);
524 m.end_of_track.encode(buf);
525 m.end_group.encode(buf);
526 m.end_object.encode(buf);
527 KeyValuePair::encode_list(&m.parameters, buf);
528 }
529 ControlMessage::FetchCancel(m) => {
530 m.request_id.encode(buf);
531 }
532 }
533 Ok(())
534 }
535
536 fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
537 match msg_type {
538 MessageType::ClientSetup => {
539 let parameters = KeyValuePair::decode_list(buf)?;
540 Ok(ControlMessage::ClientSetup(ClientSetup { parameters }))
541 }
542 MessageType::ServerSetup => {
543 let parameters = KeyValuePair::decode_list(buf)?;
544 Ok(ControlMessage::ServerSetup(ServerSetup { parameters }))
545 }
546 MessageType::GoAway => {
547 let uri_len = VarInt::decode(buf)?.into_inner() as usize;
548 let uri = read_bytes(buf, uri_len)?;
549 Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
550 }
551 MessageType::MaxRequestId => {
552 let request_id = VarInt::decode(buf)?;
553 Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id }))
554 }
555 MessageType::RequestsBlocked => {
556 let maximum_request_id = VarInt::decode(buf)?;
557 Ok(ControlMessage::RequestsBlocked(RequestsBlocked { maximum_request_id }))
558 }
559 MessageType::RequestOk => {
560 let request_id = VarInt::decode(buf)?;
561 let parameters = KeyValuePair::decode_list(buf)?;
562 Ok(ControlMessage::RequestOk(RequestOk { request_id, parameters }))
563 }
564 MessageType::RequestError => {
565 let request_id = VarInt::decode(buf)?;
566 let error_code = VarInt::decode(buf)?;
567 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
568 let reason_phrase = read_bytes(buf, reason_len)?;
569 Ok(ControlMessage::RequestError(RequestError {
570 request_id,
571 error_code,
572 reason_phrase,
573 }))
574 }
575 MessageType::Subscribe => {
576 let request_id = VarInt::decode(buf)?;
577 let track_namespace = TrackNamespace::decode(buf)?;
578 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
579 let track_name = read_bytes(buf, track_name_len)?;
580 let parameters = KeyValuePair::decode_list(buf)?;
581 Ok(ControlMessage::Subscribe(Subscribe {
582 request_id,
583 track_namespace,
584 track_name,
585 parameters,
586 }))
587 }
588 MessageType::SubscribeOk => {
589 let request_id = VarInt::decode(buf)?;
590 let track_alias = VarInt::decode(buf)?;
591 let parameters = KeyValuePair::decode_list(buf)?;
592 Ok(ControlMessage::SubscribeOk(SubscribeOk { request_id, track_alias, parameters }))
593 }
594 MessageType::SubscribeUpdate => {
595 let request_id = VarInt::decode(buf)?;
596 let subscription_request_id = VarInt::decode(buf)?;
597 let parameters = KeyValuePair::decode_list(buf)?;
598 Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
599 request_id,
600 subscription_request_id,
601 parameters,
602 }))
603 }
604 MessageType::Unsubscribe => {
605 let request_id = VarInt::decode(buf)?;
606 Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
607 }
608 MessageType::Publish => {
609 let request_id = VarInt::decode(buf)?;
610 let track_namespace = TrackNamespace::decode(buf)?;
611 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
612 let track_name = read_bytes(buf, track_name_len)?;
613 let track_alias = VarInt::decode(buf)?;
614 let parameters = KeyValuePair::decode_list(buf)?;
615 Ok(ControlMessage::Publish(Publish {
616 request_id,
617 track_namespace,
618 track_name,
619 track_alias,
620 parameters,
621 }))
622 }
623 MessageType::PublishOk => {
624 let request_id = VarInt::decode(buf)?;
625 let parameters = KeyValuePair::decode_list(buf)?;
626 Ok(ControlMessage::PublishOk(PublishOk { request_id, parameters }))
627 }
628 MessageType::PublishDone => {
629 let request_id = VarInt::decode(buf)?;
630 let status_code = VarInt::decode(buf)?;
631 let stream_count = VarInt::decode(buf)?;
632 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
633 let reason_phrase = read_bytes(buf, reason_len)?;
634 Ok(ControlMessage::PublishDone(PublishDone {
635 request_id,
636 status_code,
637 stream_count,
638 reason_phrase,
639 }))
640 }
641 MessageType::PublishNamespace => {
642 let request_id = VarInt::decode(buf)?;
643 let track_namespace = TrackNamespace::decode(buf)?;
644 let parameters = KeyValuePair::decode_list(buf)?;
645 Ok(ControlMessage::PublishNamespace(PublishNamespace {
646 request_id,
647 track_namespace,
648 parameters,
649 }))
650 }
651 MessageType::PublishNamespaceDone => {
652 let track_namespace = TrackNamespace::decode(buf)?;
653 Ok(ControlMessage::PublishNamespaceDone(PublishNamespaceDone { track_namespace }))
654 }
655 MessageType::PublishNamespaceCancel => {
656 let track_namespace = TrackNamespace::decode(buf)?;
657 let error_code = VarInt::decode(buf)?;
658 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
659 let reason_phrase = read_bytes(buf, reason_len)?;
660 Ok(ControlMessage::PublishNamespaceCancel(PublishNamespaceCancel {
661 track_namespace,
662 error_code,
663 reason_phrase,
664 }))
665 }
666 MessageType::SubscribeNamespace => {
667 let request_id = VarInt::decode(buf)?;
668 let namespace_prefix = TrackNamespace::decode(buf)?;
669 let parameters = KeyValuePair::decode_list(buf)?;
670 Ok(ControlMessage::SubscribeNamespace(SubscribeNamespace {
671 request_id,
672 namespace_prefix,
673 parameters,
674 }))
675 }
676 MessageType::UnsubscribeNamespace => {
677 let request_id = VarInt::decode(buf)?;
678 Ok(ControlMessage::UnsubscribeNamespace(UnsubscribeNamespace { request_id }))
679 }
680 MessageType::TrackStatus => {
681 let request_id = VarInt::decode(buf)?;
682 let track_namespace = TrackNamespace::decode(buf)?;
683 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
684 let track_name = read_bytes(buf, track_name_len)?;
685 let parameters = KeyValuePair::decode_list(buf)?;
686 Ok(ControlMessage::TrackStatus(TrackStatus {
687 request_id,
688 track_namespace,
689 track_name,
690 parameters,
691 }))
692 }
693 MessageType::Fetch => {
694 let request_id = VarInt::decode(buf)?;
695 let fetch_type_val = VarInt::decode(buf)?.into_inner();
696 let fetch_type =
697 FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
698 let fetch_payload = match fetch_type {
699 FetchType::Standalone => {
700 let track_namespace = TrackNamespace::decode(buf)?;
701 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
702 let track_name = read_bytes(buf, track_name_len)?;
703 let start_group = VarInt::decode(buf)?;
704 let start_object = VarInt::decode(buf)?;
705 let end_group = VarInt::decode(buf)?;
706 let end_object = VarInt::decode(buf)?;
707 FetchPayload::Standalone {
708 track_namespace,
709 track_name,
710 start_group,
711 start_object,
712 end_group,
713 end_object,
714 }
715 }
716 FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
717 let joining_request_id = VarInt::decode(buf)?;
718 let joining_start = VarInt::decode(buf)?;
719 FetchPayload::Joining { joining_request_id, joining_start }
720 }
721 };
722 let parameters = KeyValuePair::decode_list(buf)?;
723 Ok(ControlMessage::Fetch(Fetch {
724 request_id,
725 fetch_type,
726 fetch_payload,
727 parameters,
728 }))
729 }
730 MessageType::FetchOk => {
731 let request_id = VarInt::decode(buf)?;
732 let end_of_track = VarInt::decode(buf)?;
733 let end_group = VarInt::decode(buf)?;
734 let end_object = VarInt::decode(buf)?;
735 let parameters = KeyValuePair::decode_list(buf)?;
736 Ok(ControlMessage::FetchOk(FetchOk {
737 request_id,
738 end_of_track,
739 end_group,
740 end_object,
741 parameters,
742 }))
743 }
744 MessageType::FetchCancel => {
745 let request_id = VarInt::decode(buf)?;
746 Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
747 }
748 }
749 }
750
751 pub fn message_type(&self) -> MessageType {
752 match self {
753 ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
754 ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
755 ControlMessage::GoAway(_) => MessageType::GoAway,
756 ControlMessage::MaxRequestId(_) => MessageType::MaxRequestId,
757 ControlMessage::RequestsBlocked(_) => MessageType::RequestsBlocked,
758 ControlMessage::RequestOk(_) => MessageType::RequestOk,
759 ControlMessage::RequestError(_) => MessageType::RequestError,
760 ControlMessage::Subscribe(_) => MessageType::Subscribe,
761 ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
762 ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
763 ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
764 ControlMessage::Publish(_) => MessageType::Publish,
765 ControlMessage::PublishOk(_) => MessageType::PublishOk,
766 ControlMessage::PublishDone(_) => MessageType::PublishDone,
767 ControlMessage::PublishNamespace(_) => MessageType::PublishNamespace,
768 ControlMessage::PublishNamespaceDone(_) => MessageType::PublishNamespaceDone,
769 ControlMessage::PublishNamespaceCancel(_) => MessageType::PublishNamespaceCancel,
770 ControlMessage::SubscribeNamespace(_) => MessageType::SubscribeNamespace,
771 ControlMessage::UnsubscribeNamespace(_) => MessageType::UnsubscribeNamespace,
772 ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
773 ControlMessage::Fetch(_) => MessageType::Fetch,
774 ControlMessage::FetchOk(_) => MessageType::FetchOk,
775 ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
776 }
777 }
778}