1pub use crate::error::{
13 CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_NAMESPACE_TUPLE_SIZE,
14 MAX_REASON_PHRASE_LENGTH,
15};
16use crate::kvp::KeyValuePair;
17use crate::types::*;
18use crate::varint::VarInt;
19use bytes::{Buf, BufMut};
20
21fn decode_track_extensions(buf: &mut impl Buf) -> Result<Vec<KeyValuePair>, CodecError> {
25 let mut out = Vec::new();
26 while buf.has_remaining() {
27 out.push(KeyValuePair::decode(buf)?);
28 }
29 Ok(out)
30}
31
32fn encode_track_extensions(exts: &[KeyValuePair], buf: &mut impl BufMut) {
34 for kvp in exts {
35 kvp.encode(buf);
36 }
37}
38
/// Numeric type identifiers of the control messages handled by this module.
///
/// The discriminant of each variant is the value written as the message type
/// on the wire (see `MessageType::id` / `MessageType::from_id`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u64)]
pub enum MessageType {
    RequestUpdate = 0x02,
    Subscribe = 0x03,
    SubscribeOk = 0x04,
    RequestError = 0x05,
    PublishNamespace = 0x06,
    RequestOk = 0x07,
    Namespace = 0x08,
    PublishNamespaceDone = 0x09,
    Unsubscribe = 0x0A,
    PublishDone = 0x0B,
    PublishNamespaceCancel = 0x0C,
    TrackStatus = 0x0D,
    NamespaceDone = 0x0E,
    GoAway = 0x10,
    SubscribeNamespace = 0x11,
    MaxRequestId = 0x15,
    Fetch = 0x16,
    FetchCancel = 0x17,
    FetchOk = 0x18,
    RequestsBlocked = 0x1A,
    Publish = 0x1D,
    PublishOk = 0x1E,
    ClientSetup = 0x20,
    ServerSetup = 0x21,
}
67
68impl MessageType {
69 pub fn from_id(id: u64) -> Option<Self> {
70 match id {
71 0x02 => Some(MessageType::RequestUpdate),
72 0x03 => Some(MessageType::Subscribe),
73 0x04 => Some(MessageType::SubscribeOk),
74 0x05 => Some(MessageType::RequestError),
75 0x06 => Some(MessageType::PublishNamespace),
76 0x07 => Some(MessageType::RequestOk),
77 0x08 => Some(MessageType::Namespace),
78 0x09 => Some(MessageType::PublishNamespaceDone),
79 0x0A => Some(MessageType::Unsubscribe),
80 0x0B => Some(MessageType::PublishDone),
81 0x0C => Some(MessageType::PublishNamespaceCancel),
82 0x0D => Some(MessageType::TrackStatus),
83 0x0E => Some(MessageType::NamespaceDone),
84 0x10 => Some(MessageType::GoAway),
85 0x11 => Some(MessageType::SubscribeNamespace),
86 0x15 => Some(MessageType::MaxRequestId),
87 0x16 => Some(MessageType::Fetch),
88 0x17 => Some(MessageType::FetchCancel),
89 0x18 => Some(MessageType::FetchOk),
90 0x1A => Some(MessageType::RequestsBlocked),
91 0x1D => Some(MessageType::Publish),
92 0x1E => Some(MessageType::PublishOk),
93 0x20 => Some(MessageType::ClientSetup),
94 0x21 => Some(MessageType::ServerSetup),
95 _ => None,
96 }
97 }
98
99 pub fn id(&self) -> u64 {
100 *self as u64
101 }
102}
103
104#[derive(Debug, Clone, PartialEq, Eq)]
109pub struct ClientSetup {
110 pub parameters: Vec<KeyValuePair>,
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct ServerSetup {
115 pub parameters: Vec<KeyValuePair>,
116}
117
/// Payload of the `GoAway` control message (type id `0x10`).
///
/// On the wire the payload is a varint length followed by the URI bytes.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GoAway {
    /// Raw bytes of the new session URI. Encoding fails with
    /// `CodecError::GoAwayUriTooLong` when this is longer than
    /// `MAX_GOAWAY_URI_LENGTH`.
    pub new_session_uri: Vec<u8>,
}
122
123#[derive(Debug, Clone, PartialEq, Eq)]
124pub struct MaxRequestId {
125 pub request_id: VarInt,
126}
127
128#[derive(Debug, Clone, PartialEq, Eq)]
129pub struct RequestsBlocked {
130 pub maximum_request_id: VarInt,
131}
132
133#[derive(Debug, Clone, PartialEq, Eq)]
138pub struct RequestOk {
139 pub request_id: VarInt,
140 pub parameters: Vec<KeyValuePair>,
141}
142
143#[derive(Debug, Clone, PartialEq, Eq)]
145pub struct RequestError {
146 pub request_id: VarInt,
147 pub error_code: VarInt,
148 pub retry_interval: VarInt,
149 pub reason_phrase: Vec<u8>,
150}
151
152#[derive(Debug, Clone, PartialEq, Eq)]
157pub struct Subscribe {
158 pub request_id: VarInt,
159 pub track_namespace: TrackNamespace,
160 pub track_name: Vec<u8>,
161 pub parameters: Vec<KeyValuePair>,
162}
163
164#[derive(Debug, Clone, PartialEq, Eq)]
165pub struct SubscribeOk {
166 pub request_id: VarInt,
167 pub track_alias: VarInt,
168 pub parameters: Vec<KeyValuePair>,
169 pub track_extensions: Vec<KeyValuePair>,
172}
173
174#[derive(Debug, Clone, PartialEq, Eq)]
176pub struct RequestUpdate {
177 pub request_id: VarInt,
178 pub existing_request_id: VarInt,
179 pub parameters: Vec<KeyValuePair>,
180}
181
182#[derive(Debug, Clone, PartialEq, Eq)]
183pub struct Unsubscribe {
184 pub request_id: VarInt,
185}
186
187#[derive(Debug, Clone, PartialEq, Eq)]
192pub struct Publish {
193 pub request_id: VarInt,
194 pub track_namespace: TrackNamespace,
195 pub track_name: Vec<u8>,
196 pub track_alias: VarInt,
197 pub parameters: Vec<KeyValuePair>,
198 pub track_extensions: Vec<KeyValuePair>,
201}
202
203#[derive(Debug, Clone, PartialEq, Eq)]
204pub struct PublishOk {
205 pub request_id: VarInt,
206 pub parameters: Vec<KeyValuePair>,
207}
208
209#[derive(Debug, Clone, PartialEq, Eq)]
210pub struct PublishDone {
211 pub request_id: VarInt,
212 pub status_code: VarInt,
213 pub stream_count: VarInt,
214 pub reason_phrase: Vec<u8>,
215}
216
217#[derive(Debug, Clone, PartialEq, Eq)]
222pub struct PublishNamespace {
223 pub request_id: VarInt,
224 pub track_namespace: TrackNamespace,
225 pub parameters: Vec<KeyValuePair>,
226}
227
228#[derive(Debug, Clone, PartialEq, Eq)]
230pub struct PublishNamespaceDone {
231 pub request_id: VarInt,
232}
233
234#[derive(Debug, Clone, PartialEq, Eq)]
236pub struct PublishNamespaceCancel {
237 pub request_id: VarInt,
238 pub error_code: VarInt,
239 pub reason_phrase: Vec<u8>,
240}
241
242#[derive(Debug, Clone, PartialEq, Eq)]
248pub struct Namespace {
249 pub namespace_suffix: TrackNamespace,
250}
251
252#[derive(Debug, Clone, PartialEq, Eq)]
254pub struct NamespaceDone {
255 pub namespace_suffix: TrackNamespace,
256}
257
258#[derive(Debug, Clone, PartialEq, Eq)]
264pub struct SubscribeNamespace {
265 pub request_id: VarInt,
266 pub namespace_prefix: TrackNamespace,
267 pub subscribe_options: VarInt,
268 pub parameters: Vec<KeyValuePair>,
269}
270
271#[derive(Debug, Clone, PartialEq, Eq)]
276pub struct TrackStatus {
277 pub request_id: VarInt,
278 pub track_namespace: TrackNamespace,
279 pub track_name: Vec<u8>,
280 pub parameters: Vec<KeyValuePair>,
281}
282
/// Discriminator written in a `Fetch` message to say which payload layout
/// follows. The enum discriminant is the value written on the wire.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u64)]
pub enum FetchType {
    /// Followed by the `FetchPayload::Standalone` layout.
    Standalone = 1,
    /// Followed by the `FetchPayload::Joining` layout.
    RelativeJoining = 2,
    /// Followed by the `FetchPayload::Joining` layout.
    AbsoluteJoining = 3,
}
297
298impl FetchType {
299 pub fn from_u64(v: u64) -> Option<Self> {
301 match v {
302 1 => Some(FetchType::Standalone),
303 2 => Some(FetchType::RelativeJoining),
304 3 => Some(FetchType::AbsoluteJoining),
305 _ => None,
306 }
307 }
308}
309
310#[derive(Debug, Clone, PartialEq, Eq)]
311pub struct Fetch {
312 pub request_id: VarInt,
313 pub fetch_type: FetchType,
314 pub fetch_payload: FetchPayload,
315 pub parameters: Vec<KeyValuePair>,
316}
317
318#[derive(Debug, Clone, PartialEq, Eq)]
319pub enum FetchPayload {
320 Standalone {
321 track_namespace: TrackNamespace,
322 track_name: Vec<u8>,
323 start_group: VarInt,
324 start_object: VarInt,
325 end_group: VarInt,
326 end_object: VarInt,
327 },
328 Joining {
329 joining_request_id: VarInt,
330 joining_start: VarInt,
331 },
332}
333
334#[derive(Debug, Clone, PartialEq, Eq)]
335pub struct FetchOk {
336 pub request_id: VarInt,
337 pub end_of_track: VarInt,
338 pub end_group: VarInt,
339 pub end_object: VarInt,
340 pub parameters: Vec<KeyValuePair>,
341 pub track_extensions: Vec<KeyValuePair>,
344}
345
346#[derive(Debug, Clone, PartialEq, Eq)]
347pub struct FetchCancel {
348 pub request_id: VarInt,
349}
350
351#[derive(Debug, Clone, PartialEq, Eq)]
356pub enum ControlMessage {
357 ClientSetup(ClientSetup),
358 ServerSetup(ServerSetup),
359 GoAway(GoAway),
360 MaxRequestId(MaxRequestId),
361 RequestsBlocked(RequestsBlocked),
362 RequestOk(RequestOk),
363 RequestError(RequestError),
364 Subscribe(Subscribe),
365 SubscribeOk(SubscribeOk),
366 RequestUpdate(RequestUpdate),
367 Unsubscribe(Unsubscribe),
368 Publish(Publish),
369 PublishOk(PublishOk),
370 PublishDone(PublishDone),
371 PublishNamespace(PublishNamespace),
372 PublishNamespaceDone(PublishNamespaceDone),
373 PublishNamespaceCancel(PublishNamespaceCancel),
374 Namespace(Namespace),
375 NamespaceDone(NamespaceDone),
376 SubscribeNamespace(SubscribeNamespace),
377 TrackStatus(TrackStatus),
378 Fetch(Fetch),
379 FetchOk(FetchOk),
380 FetchCancel(FetchCancel),
381}
382
383impl ControlMessage {
384 pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
385 let mut payload = Vec::with_capacity(256);
386 self.encode_payload(&mut payload)?;
387
388 if payload.len() > MAX_MESSAGE_LENGTH {
389 return Err(CodecError::MessageTooLong(payload.len()));
390 }
391
392 let msg_type = self.message_type();
393 VarInt::from_usize(msg_type.id() as usize).encode(buf);
394 buf.put_u16(payload.len() as u16);
396 buf.put_slice(&payload);
397 Ok(())
398 }
399
400 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
401 let type_id = VarInt::decode(buf)?.into_inner();
402 let msg_type =
403 MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
404 if buf.remaining() < 2 {
406 return Err(CodecError::UnexpectedEnd);
407 }
408 let payload_len = buf.get_u16() as usize;
409 if buf.remaining() < payload_len {
410 return Err(CodecError::UnexpectedEnd);
411 }
412 let payload_bytes = buf.copy_to_bytes(payload_len);
413 let mut payload = &payload_bytes[..];
414 Self::decode_payload(msg_type, &mut payload)
415 }
416
417 fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
418 match self {
419 ControlMessage::ClientSetup(m) => {
420 KeyValuePair::encode_list(&m.parameters, buf);
421 }
422 ControlMessage::ServerSetup(m) => {
423 KeyValuePair::encode_list(&m.parameters, buf);
424 }
425 ControlMessage::GoAway(m) => {
426 if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
427 return Err(CodecError::GoAwayUriTooLong);
428 }
429 VarInt::from_usize(m.new_session_uri.len()).encode(buf);
430 buf.put_slice(&m.new_session_uri);
431 }
432 ControlMessage::MaxRequestId(m) => {
433 m.request_id.encode(buf);
434 }
435 ControlMessage::RequestsBlocked(m) => {
436 m.maximum_request_id.encode(buf);
437 }
438 ControlMessage::RequestOk(m) => {
439 m.request_id.encode(buf);
440 KeyValuePair::encode_list(&m.parameters, buf);
441 }
442 ControlMessage::RequestError(m) => {
443 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
444 return Err(CodecError::ReasonPhraseTooLong);
445 }
446 m.request_id.encode(buf);
447 m.error_code.encode(buf);
448 m.retry_interval.encode(buf);
449 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
450 buf.put_slice(&m.reason_phrase);
451 }
452 ControlMessage::Subscribe(m) => {
453 m.request_id.encode(buf);
454 m.track_namespace.encode(buf);
455 VarInt::from_usize(m.track_name.len()).encode(buf);
456 buf.put_slice(&m.track_name);
457 KeyValuePair::encode_list(&m.parameters, buf);
458 }
459 ControlMessage::SubscribeOk(m) => {
460 m.request_id.encode(buf);
461 m.track_alias.encode(buf);
462 KeyValuePair::encode_list(&m.parameters, buf);
463 encode_track_extensions(&m.track_extensions, buf);
464 }
465 ControlMessage::RequestUpdate(m) => {
466 m.request_id.encode(buf);
467 m.existing_request_id.encode(buf);
468 KeyValuePair::encode_list(&m.parameters, buf);
469 }
470 ControlMessage::Unsubscribe(m) => {
471 m.request_id.encode(buf);
472 }
473 ControlMessage::Publish(m) => {
474 m.request_id.encode(buf);
475 m.track_namespace.encode(buf);
476 VarInt::from_usize(m.track_name.len()).encode(buf);
477 buf.put_slice(&m.track_name);
478 m.track_alias.encode(buf);
479 KeyValuePair::encode_list(&m.parameters, buf);
480 encode_track_extensions(&m.track_extensions, buf);
481 }
482 ControlMessage::PublishOk(m) => {
483 m.request_id.encode(buf);
484 KeyValuePair::encode_list(&m.parameters, buf);
485 }
486 ControlMessage::PublishDone(m) => {
487 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
488 return Err(CodecError::ReasonPhraseTooLong);
489 }
490 m.request_id.encode(buf);
491 m.status_code.encode(buf);
492 m.stream_count.encode(buf);
493 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
494 buf.put_slice(&m.reason_phrase);
495 }
496 ControlMessage::PublishNamespace(m) => {
497 m.request_id.encode(buf);
498 m.track_namespace.encode(buf);
499 KeyValuePair::encode_list(&m.parameters, buf);
500 }
501 ControlMessage::PublishNamespaceDone(m) => {
502 m.request_id.encode(buf);
503 }
504 ControlMessage::PublishNamespaceCancel(m) => {
505 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
506 return Err(CodecError::ReasonPhraseTooLong);
507 }
508 m.request_id.encode(buf);
509 m.error_code.encode(buf);
510 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
511 buf.put_slice(&m.reason_phrase);
512 }
513 ControlMessage::Namespace(m) => {
514 m.namespace_suffix.encode(buf);
515 }
516 ControlMessage::NamespaceDone(m) => {
517 m.namespace_suffix.encode(buf);
518 }
519 ControlMessage::SubscribeNamespace(m) => {
520 m.request_id.encode(buf);
521 m.namespace_prefix.encode(buf);
522 m.subscribe_options.encode(buf);
523 KeyValuePair::encode_list(&m.parameters, buf);
524 }
525 ControlMessage::TrackStatus(m) => {
526 m.request_id.encode(buf);
527 m.track_namespace.encode(buf);
528 VarInt::from_usize(m.track_name.len()).encode(buf);
529 buf.put_slice(&m.track_name);
530 KeyValuePair::encode_list(&m.parameters, buf);
531 }
532 ControlMessage::Fetch(m) => {
533 m.request_id.encode(buf);
534 VarInt::from_usize(m.fetch_type as usize).encode(buf);
535 match &m.fetch_payload {
536 FetchPayload::Standalone {
537 track_namespace,
538 track_name,
539 start_group,
540 start_object,
541 end_group,
542 end_object,
543 } => {
544 track_namespace.encode(buf);
545 VarInt::from_usize(track_name.len()).encode(buf);
546 buf.put_slice(track_name);
547 start_group.encode(buf);
548 start_object.encode(buf);
549 end_group.encode(buf);
550 end_object.encode(buf);
551 }
552 FetchPayload::Joining { joining_request_id, joining_start } => {
553 joining_request_id.encode(buf);
554 joining_start.encode(buf);
555 }
556 }
557 KeyValuePair::encode_list(&m.parameters, buf);
558 }
559 ControlMessage::FetchOk(m) => {
560 m.request_id.encode(buf);
561 m.end_of_track.encode(buf);
562 m.end_group.encode(buf);
563 m.end_object.encode(buf);
564 KeyValuePair::encode_list(&m.parameters, buf);
565 encode_track_extensions(&m.track_extensions, buf);
566 }
567 ControlMessage::FetchCancel(m) => {
568 m.request_id.encode(buf);
569 }
570 }
571 Ok(())
572 }
573
574 fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
575 match msg_type {
576 MessageType::ClientSetup => {
577 let parameters = KeyValuePair::decode_list(buf)?;
578 Ok(ControlMessage::ClientSetup(ClientSetup { parameters }))
579 }
580 MessageType::ServerSetup => {
581 let parameters = KeyValuePair::decode_list(buf)?;
582 Ok(ControlMessage::ServerSetup(ServerSetup { parameters }))
583 }
584 MessageType::GoAway => {
585 let uri_len = VarInt::decode(buf)?.into_inner() as usize;
586 let uri = read_bytes(buf, uri_len)?;
587 Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
588 }
589 MessageType::MaxRequestId => {
590 let request_id = VarInt::decode(buf)?;
591 Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id }))
592 }
593 MessageType::RequestsBlocked => {
594 let maximum_request_id = VarInt::decode(buf)?;
595 Ok(ControlMessage::RequestsBlocked(RequestsBlocked { maximum_request_id }))
596 }
597 MessageType::RequestOk => {
598 let request_id = VarInt::decode(buf)?;
599 let parameters = KeyValuePair::decode_list(buf)?;
600 Ok(ControlMessage::RequestOk(RequestOk { request_id, parameters }))
601 }
602 MessageType::RequestError => {
603 let request_id = VarInt::decode(buf)?;
604 let error_code = VarInt::decode(buf)?;
605 let retry_interval = VarInt::decode(buf)?;
606 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
607 let reason_phrase = read_bytes(buf, reason_len)?;
608 Ok(ControlMessage::RequestError(RequestError {
609 request_id,
610 error_code,
611 retry_interval,
612 reason_phrase,
613 }))
614 }
615 MessageType::Subscribe => {
616 let request_id = VarInt::decode(buf)?;
617 let track_namespace = TrackNamespace::decode(buf)?;
618 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
619 let track_name = read_bytes(buf, track_name_len)?;
620 let parameters = KeyValuePair::decode_list(buf)?;
621 Ok(ControlMessage::Subscribe(Subscribe {
622 request_id,
623 track_namespace,
624 track_name,
625 parameters,
626 }))
627 }
628 MessageType::SubscribeOk => {
629 let request_id = VarInt::decode(buf)?;
630 let track_alias = VarInt::decode(buf)?;
631 let parameters = KeyValuePair::decode_list(buf)?;
632 let track_extensions = decode_track_extensions(buf)?;
633 Ok(ControlMessage::SubscribeOk(SubscribeOk {
634 request_id,
635 track_alias,
636 parameters,
637 track_extensions,
638 }))
639 }
640 MessageType::RequestUpdate => {
641 let request_id = VarInt::decode(buf)?;
642 let existing_request_id = VarInt::decode(buf)?;
643 let parameters = KeyValuePair::decode_list(buf)?;
644 Ok(ControlMessage::RequestUpdate(RequestUpdate {
645 request_id,
646 existing_request_id,
647 parameters,
648 }))
649 }
650 MessageType::Unsubscribe => {
651 let request_id = VarInt::decode(buf)?;
652 Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
653 }
654 MessageType::Publish => {
655 let request_id = VarInt::decode(buf)?;
656 let track_namespace = TrackNamespace::decode(buf)?;
657 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
658 let track_name = read_bytes(buf, track_name_len)?;
659 let track_alias = VarInt::decode(buf)?;
660 let parameters = KeyValuePair::decode_list(buf)?;
661 let track_extensions = decode_track_extensions(buf)?;
662 Ok(ControlMessage::Publish(Publish {
663 request_id,
664 track_namespace,
665 track_name,
666 track_alias,
667 parameters,
668 track_extensions,
669 }))
670 }
671 MessageType::PublishOk => {
672 let request_id = VarInt::decode(buf)?;
673 let parameters = KeyValuePair::decode_list(buf)?;
674 Ok(ControlMessage::PublishOk(PublishOk { request_id, parameters }))
675 }
676 MessageType::PublishDone => {
677 let request_id = VarInt::decode(buf)?;
678 let status_code = VarInt::decode(buf)?;
679 let stream_count = VarInt::decode(buf)?;
680 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
681 let reason_phrase = read_bytes(buf, reason_len)?;
682 Ok(ControlMessage::PublishDone(PublishDone {
683 request_id,
684 status_code,
685 stream_count,
686 reason_phrase,
687 }))
688 }
689 MessageType::PublishNamespace => {
690 let request_id = VarInt::decode(buf)?;
691 let track_namespace = TrackNamespace::decode(buf)?;
692 let parameters = KeyValuePair::decode_list(buf)?;
693 Ok(ControlMessage::PublishNamespace(PublishNamespace {
694 request_id,
695 track_namespace,
696 parameters,
697 }))
698 }
699 MessageType::PublishNamespaceDone => {
700 let request_id = VarInt::decode(buf)?;
701 Ok(ControlMessage::PublishNamespaceDone(PublishNamespaceDone { request_id }))
702 }
703 MessageType::PublishNamespaceCancel => {
704 let request_id = VarInt::decode(buf)?;
705 let error_code = VarInt::decode(buf)?;
706 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
707 let reason_phrase = read_bytes(buf, reason_len)?;
708 Ok(ControlMessage::PublishNamespaceCancel(PublishNamespaceCancel {
709 request_id,
710 error_code,
711 reason_phrase,
712 }))
713 }
714 MessageType::Namespace => {
715 let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
716 Ok(ControlMessage::Namespace(Namespace { namespace_suffix }))
717 }
718 MessageType::NamespaceDone => {
719 let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
720 Ok(ControlMessage::NamespaceDone(NamespaceDone { namespace_suffix }))
721 }
722 MessageType::SubscribeNamespace => {
723 let request_id = VarInt::decode(buf)?;
724 let namespace_prefix = TrackNamespace::decode(buf)?;
725 let subscribe_options = VarInt::decode(buf)?;
726 let parameters = KeyValuePair::decode_list(buf)?;
727 Ok(ControlMessage::SubscribeNamespace(SubscribeNamespace {
728 request_id,
729 namespace_prefix,
730 subscribe_options,
731 parameters,
732 }))
733 }
734 MessageType::TrackStatus => {
735 let request_id = VarInt::decode(buf)?;
736 let track_namespace = TrackNamespace::decode(buf)?;
737 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
738 let track_name = read_bytes(buf, track_name_len)?;
739 let parameters = KeyValuePair::decode_list(buf)?;
740 Ok(ControlMessage::TrackStatus(TrackStatus {
741 request_id,
742 track_namespace,
743 track_name,
744 parameters,
745 }))
746 }
747 MessageType::Fetch => {
748 let request_id = VarInt::decode(buf)?;
749 let fetch_type_val = VarInt::decode(buf)?.into_inner();
750 let fetch_type =
751 FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
752 let fetch_payload = match fetch_type {
753 FetchType::Standalone => {
754 let track_namespace = TrackNamespace::decode(buf)?;
755 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
756 let track_name = read_bytes(buf, track_name_len)?;
757 let start_group = VarInt::decode(buf)?;
758 let start_object = VarInt::decode(buf)?;
759 let end_group = VarInt::decode(buf)?;
760 let end_object = VarInt::decode(buf)?;
761 FetchPayload::Standalone {
762 track_namespace,
763 track_name,
764 start_group,
765 start_object,
766 end_group,
767 end_object,
768 }
769 }
770 FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
771 let joining_request_id = VarInt::decode(buf)?;
772 let joining_start = VarInt::decode(buf)?;
773 FetchPayload::Joining { joining_request_id, joining_start }
774 }
775 };
776 let parameters = KeyValuePair::decode_list(buf)?;
777 Ok(ControlMessage::Fetch(Fetch {
778 request_id,
779 fetch_type,
780 fetch_payload,
781 parameters,
782 }))
783 }
784 MessageType::FetchOk => {
785 let request_id = VarInt::decode(buf)?;
786 let end_of_track = VarInt::decode(buf)?;
787 let end_group = VarInt::decode(buf)?;
788 let end_object = VarInt::decode(buf)?;
789 let parameters = KeyValuePair::decode_list(buf)?;
790 let track_extensions = decode_track_extensions(buf)?;
791 Ok(ControlMessage::FetchOk(FetchOk {
792 request_id,
793 end_of_track,
794 end_group,
795 end_object,
796 parameters,
797 track_extensions,
798 }))
799 }
800 MessageType::FetchCancel => {
801 let request_id = VarInt::decode(buf)?;
802 Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
803 }
804 }
805 }
806
807 pub fn message_type(&self) -> MessageType {
808 match self {
809 ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
810 ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
811 ControlMessage::GoAway(_) => MessageType::GoAway,
812 ControlMessage::MaxRequestId(_) => MessageType::MaxRequestId,
813 ControlMessage::RequestsBlocked(_) => MessageType::RequestsBlocked,
814 ControlMessage::RequestOk(_) => MessageType::RequestOk,
815 ControlMessage::RequestError(_) => MessageType::RequestError,
816 ControlMessage::Subscribe(_) => MessageType::Subscribe,
817 ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
818 ControlMessage::RequestUpdate(_) => MessageType::RequestUpdate,
819 ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
820 ControlMessage::Publish(_) => MessageType::Publish,
821 ControlMessage::PublishOk(_) => MessageType::PublishOk,
822 ControlMessage::PublishDone(_) => MessageType::PublishDone,
823 ControlMessage::PublishNamespace(_) => MessageType::PublishNamespace,
824 ControlMessage::PublishNamespaceDone(_) => MessageType::PublishNamespaceDone,
825 ControlMessage::PublishNamespaceCancel(_) => MessageType::PublishNamespaceCancel,
826 ControlMessage::Namespace(_) => MessageType::Namespace,
827 ControlMessage::NamespaceDone(_) => MessageType::NamespaceDone,
828 ControlMessage::SubscribeNamespace(_) => MessageType::SubscribeNamespace,
829 ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
830 ControlMessage::Fetch(_) => MessageType::Fetch,
831 ControlMessage::FetchOk(_) => MessageType::FetchOk,
832 ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
833 }
834 }
835}