1pub use crate::error::{
15 CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_NAMESPACE_TUPLE_SIZE,
16 MAX_REASON_PHRASE_LENGTH,
17};
18use crate::kvp::{KeyValuePair, KvpValue};
19use crate::types::*;
20use crate::varint::VarInt;
21use bytes::{Buf, BufMut};
22
/// Wire layout of a message parameter's value, selected by the parameter key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ParamEncoding {
    /// The value is a single variable-length integer.
    Varint,
    /// The value is exactly one byte.
    Uint8,
    /// The value is two consecutive variable-length integers (group, then object).
    Location,
    /// The value is a variable-length byte count followed by that many bytes.
    LengthPrefixed,
}

/// Looks up how the value of the parameter with absolute key `key` is laid out.
///
/// Returns `None` for every key that is not listed in the table below.
fn param_encoding(key: u64) -> Option<ParamEncoding> {
    use ParamEncoding::{LengthPrefixed, Location, Uint8, Varint};

    let layout = match key {
        0x03 | 0x21 => LengthPrefixed,
        0x09 => Location,
        0x10 | 0x20 | 0x22 => Uint8,
        0x02 | 0x04 | 0x08 | 0x32 => Varint,
        _ => return None,
    };
    Some(layout)
}
49
50fn decode_parameters(buf: &mut impl Buf) -> Result<Vec<KeyValuePair>, CodecError> {
52 let count = VarInt::decode(buf)?.into_inner() as usize;
53 let mut params = Vec::with_capacity(count);
54 let mut prev_key: u64 = 0;
55
56 for _ in 0..count {
57 let delta = VarInt::decode(buf)?.into_inner();
58 let abs_key = prev_key + delta;
59 prev_key = abs_key;
60
61 let encoding = param_encoding(abs_key).ok_or(CodecError::InvalidField)?;
62
63 let value = match encoding {
64 ParamEncoding::Varint => {
65 let v = VarInt::decode(buf)?;
66 KvpValue::Varint(v)
67 }
68 ParamEncoding::Uint8 => {
69 if buf.remaining() < 1 {
70 return Err(CodecError::UnexpectedEnd);
71 }
72 let byte = buf.get_u8();
73 KvpValue::Varint(VarInt::from_u64(byte as u64).unwrap())
74 }
75 ParamEncoding::Location => {
76 let group = VarInt::decode(buf)?;
77 let object = VarInt::decode(buf)?;
78 let mut encoded = Vec::new();
79 group.encode(&mut encoded);
80 object.encode(&mut encoded);
81 KvpValue::Bytes(encoded)
82 }
83 ParamEncoding::LengthPrefixed => {
84 let len = VarInt::decode(buf)?.into_inner() as usize;
85 let data = read_bytes(buf, len)?;
86 KvpValue::Bytes(data)
87 }
88 };
89
90 params.push(KeyValuePair { key: VarInt::from_u64(abs_key).unwrap(), value });
91 }
92 Ok(params)
93}
94
95fn encode_parameters(params: &[KeyValuePair], buf: &mut impl BufMut) {
97 VarInt::from_usize(params.len()).encode(buf);
98 let mut prev_key: u64 = 0;
99
100 for p in params {
101 let abs_key = p.key.into_inner();
102 let delta = abs_key - prev_key;
103 prev_key = abs_key;
104 VarInt::from_u64(delta).unwrap().encode(buf);
105
106 let encoding = param_encoding(abs_key);
107 match (&p.value, encoding) {
108 (KvpValue::Varint(v), Some(ParamEncoding::Varint)) => {
109 v.encode(buf);
110 }
111 (KvpValue::Varint(v), Some(ParamEncoding::Uint8)) => {
112 buf.put_u8(v.into_inner() as u8);
113 }
114 (KvpValue::Bytes(b), Some(ParamEncoding::Location)) => {
115 buf.put_slice(b);
116 }
117 (KvpValue::Bytes(b), Some(ParamEncoding::LengthPrefixed)) => {
118 VarInt::from_usize(b.len()).encode(buf);
119 buf.put_slice(b);
120 }
121 _ => {
122 match &p.value {
124 KvpValue::Varint(v) => v.encode(buf),
125 KvpValue::Bytes(b) => {
126 VarInt::from_usize(b.len()).encode(buf);
127 buf.put_slice(b);
128 }
129 }
130 }
131 }
132 }
133}
134
135fn decode_kvp_delta(buf: &mut impl Buf) -> Result<Vec<KeyValuePair>, CodecError> {
138 let mut pairs = Vec::new();
139 let mut prev_key: u64 = 0;
140
141 while buf.has_remaining() {
142 let delta = VarInt::decode(buf)?.into_inner();
143 let abs_key = prev_key + delta;
144 prev_key = abs_key;
145
146 let value = if abs_key % 2 == 0 {
147 let v = VarInt::decode(buf)?;
148 KvpValue::Varint(v)
149 } else {
150 let len = VarInt::decode(buf)?.into_inner() as usize;
151 let data = read_bytes(buf, len)?;
152 KvpValue::Bytes(data)
153 };
154
155 pairs.push(KeyValuePair { key: VarInt::from_u64(abs_key).unwrap(), value });
156 }
157 Ok(pairs)
158}
159
160fn encode_kvp_delta(pairs: &[KeyValuePair], buf: &mut impl BufMut) {
162 let mut prev_key: u64 = 0;
163 for p in pairs {
164 let abs_key = p.key.into_inner();
165 let delta = abs_key - prev_key;
166 prev_key = abs_key;
167 VarInt::from_u64(delta).unwrap().encode(buf);
168 match &p.value {
169 KvpValue::Varint(v) => v.encode(buf),
170 KvpValue::Bytes(b) => {
171 VarInt::from_usize(b.len()).encode(buf);
172 buf.put_slice(b);
173 }
174 }
175 }
176}
177
/// Identifier of a control message, as written at the start of each frame.
///
/// The discriminant of every variant is its numeric wire identifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum MessageType {
    RequestUpdate = 0x02,
    Subscribe = 0x03,
    SubscribeOk = 0x04,
    RequestError = 0x05,
    PublishNamespace = 0x06,
    RequestOk = 0x07,
    Namespace = 0x08,
    PublishDone = 0x0B,
    TrackStatus = 0x0D,
    NamespaceDone = 0x0E,
    PublishBlocked = 0x0F,
    GoAway = 0x10,
    SubscribeNamespace = 0x11,
    Fetch = 0x16,
    FetchOk = 0x18,
    Publish = 0x1D,
    PublishOk = 0x1E,
    Setup = 0x2F00,
}

impl MessageType {
    /// Every message type, in ascending order of wire identifier.
    const ALL: [MessageType; 18] = [
        MessageType::RequestUpdate,
        MessageType::Subscribe,
        MessageType::SubscribeOk,
        MessageType::RequestError,
        MessageType::PublishNamespace,
        MessageType::RequestOk,
        MessageType::Namespace,
        MessageType::PublishDone,
        MessageType::TrackStatus,
        MessageType::NamespaceDone,
        MessageType::PublishBlocked,
        MessageType::GoAway,
        MessageType::SubscribeNamespace,
        MessageType::Fetch,
        MessageType::FetchOk,
        MessageType::Publish,
        MessageType::PublishOk,
        MessageType::Setup,
    ];

    /// Returns the message type whose wire identifier is `id`, or `None` when
    /// no variant uses that identifier.
    pub fn from_id(id: u64) -> Option<Self> {
        Self::ALL.iter().copied().find(|kind| kind.id() == id)
    }

    /// Returns the numeric wire identifier of this message type.
    pub fn id(&self) -> u64 {
        *self as u64
    }
}
234
/// Payload of the `Setup` control message (type `0x2F00`).
///
/// On the wire the payload consists solely of `options`, written as delta-keyed
/// key-value pairs that run to the end of the payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Setup {
    /// Setup options carried by the message.
    pub options: Vec<KeyValuePair>,
}
244
/// Payload of the `GoAway` control message (type `0x10`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoAway {
    /// Raw bytes of the new session URI, sent with a varint length prefix.
    /// Encoding fails with `CodecError::GoAwayUriTooLong` when this is longer
    /// than `MAX_GOAWAY_URI_LENGTH`.
    pub new_session_uri: Vec<u8>,
    /// Timeout value, sent as a varint after the URI.
    /// NOTE(review): the unit is not established by this file; confirm against the spec.
    pub timeout: VarInt,
}
250
/// Payload of the `RequestOk` control message (type `0x07`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RequestOk {
    /// Message parameters, sent as a count-prefixed, delta-keyed list.
    pub parameters: Vec<KeyValuePair>,
}
260
/// Payload of the `RequestError` control message (type `0x05`).
///
/// Fields are sent in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RequestError {
    /// Error code, sent as a varint.
    pub error_code: VarInt,
    /// Retry interval, sent as a varint.
    /// NOTE(review): the unit is not established by this file; confirm against the spec.
    pub retry_interval: VarInt,
    /// Raw bytes of the reason phrase, sent with a varint length prefix.
    /// Encoding fails with `CodecError::ReasonPhraseTooLong` when this is
    /// longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
268
/// Payload of the `Subscribe` control message (type `0x03`).
///
/// Fields are sent in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscribe {
    /// Request identifier, sent as a varint.
    pub request_id: VarInt,
    /// Sent as a varint immediately after `request_id`.
    pub required_request_id_delta: VarInt,
    /// Namespace of the track, decoded with `TrackNamespace::decode`.
    pub track_namespace: TrackNamespace,
    /// Raw bytes of the track name, sent with a varint length prefix.
    pub track_name: Vec<u8>,
    /// Message parameters, sent as a count-prefixed, delta-keyed list.
    pub parameters: Vec<KeyValuePair>,
}
281
/// Payload of the `SubscribeOk` control message (type `0x04`).
///
/// Fields are sent in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeOk {
    /// Track alias, sent as a varint.
    pub track_alias: VarInt,
    /// Message parameters, sent as a count-prefixed, delta-keyed list.
    pub parameters: Vec<KeyValuePair>,
    /// Track properties, sent as delta-keyed key-value pairs that occupy the
    /// rest of the payload.
    pub track_properties: Vec<KeyValuePair>,
}
289
/// Payload of the `RequestUpdate` control message (type `0x02`).
///
/// Fields are sent in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RequestUpdate {
    /// Request identifier, sent as a varint.
    pub request_id: VarInt,
    /// Sent as a varint immediately after `request_id`.
    pub required_request_id_delta: VarInt,
    /// Message parameters, sent as a count-prefixed, delta-keyed list.
    pub parameters: Vec<KeyValuePair>,
}
296
/// Payload of the `Publish` control message (type `0x1D`).
///
/// Fields are sent in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Publish {
    /// Request identifier, sent as a varint.
    pub request_id: VarInt,
    /// Sent as a varint immediately after `request_id`.
    pub required_request_id_delta: VarInt,
    /// Namespace of the track, decoded with `TrackNamespace::decode`.
    pub track_namespace: TrackNamespace,
    /// Raw bytes of the track name, sent with a varint length prefix.
    pub track_name: Vec<u8>,
    /// Track alias, sent as a varint.
    pub track_alias: VarInt,
    /// Message parameters, sent as a count-prefixed, delta-keyed list.
    pub parameters: Vec<KeyValuePair>,
    /// Track properties, sent as delta-keyed key-value pairs that occupy the
    /// rest of the payload.
    pub track_properties: Vec<KeyValuePair>,
}
311
/// Payload of the `PublishOk` control message (type `0x1E`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishOk {
    /// Message parameters, sent as a count-prefixed, delta-keyed list.
    pub parameters: Vec<KeyValuePair>,
}
317
/// Payload of the `PublishDone` control message (type `0x0B`).
///
/// Fields are sent in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishDone {
    /// Status code, sent as a varint.
    pub status_code: VarInt,
    /// Stream count, sent as a varint.
    pub stream_count: VarInt,
    /// Raw bytes of the reason phrase, sent with a varint length prefix.
    /// Encoding fails with `CodecError::ReasonPhraseTooLong` when this is
    /// longer than `MAX_REASON_PHRASE_LENGTH`.
    pub reason_phrase: Vec<u8>,
}
325
/// Payload of the `PublishNamespace` control message (type `0x06`).
///
/// Fields are sent in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishNamespace {
    /// Request identifier, sent as a varint.
    pub request_id: VarInt,
    /// Sent as a varint immediately after `request_id`.
    pub required_request_id_delta: VarInt,
    /// Namespace being published, decoded with `TrackNamespace::decode`.
    pub track_namespace: TrackNamespace,
    /// Message parameters, sent as a count-prefixed, delta-keyed list.
    pub parameters: Vec<KeyValuePair>,
}
337
/// Payload of the `Namespace` control message (type `0x08`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
    /// Namespace suffix, decoded with `TrackNamespace::decode_allow_empty`.
    pub namespace_suffix: TrackNamespace,
}
346
/// Payload of the `NamespaceDone` control message (type `0x0E`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NamespaceDone {
    /// Namespace suffix, decoded with `TrackNamespace::decode_allow_empty`.
    pub namespace_suffix: TrackNamespace,
}
351
/// Payload of the `SubscribeNamespace` control message (type `0x11`).
///
/// Fields are sent in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeNamespace {
    /// Request identifier, sent as a varint.
    pub request_id: VarInt,
    /// Sent as a varint immediately after `request_id`.
    pub required_request_id_delta: VarInt,
    /// Namespace prefix, decoded with `TrackNamespace::decode_allow_empty`.
    pub namespace_prefix: TrackNamespace,
    /// Subscribe options, sent as a single varint.
    /// NOTE(review): the meaning of the value is not established by this file.
    pub subscribe_options: VarInt,
    /// Message parameters, sent as a count-prefixed, delta-keyed list.
    pub parameters: Vec<KeyValuePair>,
}
364
/// Payload of the `TrackStatus` control message (type `0x0D`).
///
/// Fields are sent in declaration order; the layout matches `Subscribe`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatus {
    /// Request identifier, sent as a varint.
    pub request_id: VarInt,
    /// Sent as a varint immediately after `request_id`.
    pub required_request_id_delta: VarInt,
    /// Namespace of the track, decoded with `TrackNamespace::decode`.
    pub track_namespace: TrackNamespace,
    /// Raw bytes of the track name, sent with a varint length prefix.
    pub track_name: Vec<u8>,
    /// Message parameters, sent as a count-prefixed, delta-keyed list.
    pub parameters: Vec<KeyValuePair>,
}
377
/// Kind of fetch carried by a `Fetch` message.
///
/// The discriminant of every variant is the numeric value used on the wire.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum FetchType {
    Standalone = 1,
    RelativeJoining = 2,
    AbsoluteJoining = 3,
}

impl FetchType {
    /// Returns the fetch type whose wire value is `v`, or `None` when `v` is
    /// not one of the three defined values.
    pub fn from_u64(v: u64) -> Option<Self> {
        [Self::Standalone, Self::RelativeJoining, Self::AbsoluteJoining]
            .iter()
            .copied()
            .find(|kind| *kind as u64 == v)
    }
}
400
/// Payload of the `Fetch` control message (type `0x16`).
///
/// Fields are sent in declaration order. When decoding, the payload variant is
/// selected by `fetch_type`: `Standalone` yields `FetchPayload::Standalone`,
/// both joining types yield `FetchPayload::Joining`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Fetch {
    /// Request identifier, sent as a varint.
    pub request_id: VarInt,
    /// Sent as a varint immediately after `request_id`.
    pub required_request_id_delta: VarInt,
    /// Kind of fetch, sent as a varint holding the enum's numeric value.
    pub fetch_type: FetchType,
    /// Type-specific fields that follow `fetch_type` on the wire.
    pub fetch_payload: FetchPayload,
    /// Message parameters, sent as a count-prefixed, delta-keyed list.
    pub parameters: Vec<KeyValuePair>,
}
409
/// Type-specific portion of a `Fetch` message.
///
/// Fields of each variant are sent in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FetchPayload {
    /// Decoded when the fetch type is `FetchType::Standalone`.
    Standalone {
        /// Namespace of the track, decoded with `TrackNamespace::decode`.
        track_namespace: TrackNamespace,
        /// Raw bytes of the track name, sent with a varint length prefix.
        track_name: Vec<u8>,
        /// Sent as a varint.
        start_group: VarInt,
        /// Sent as a varint.
        start_object: VarInt,
        /// Sent as a varint.
        end_group: VarInt,
        /// Sent as a varint.
        end_object: VarInt,
    },
    /// Decoded when the fetch type is `FetchType::RelativeJoining` or
    /// `FetchType::AbsoluteJoining`.
    Joining {
        /// Sent as a varint.
        joining_request_id: VarInt,
        /// Sent as a varint.
        joining_start: VarInt,
    },
}
425
/// Payload of the `FetchOk` control message (type `0x18`).
///
/// Fields are sent in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchOk {
    /// Sent as one raw byte; the codec does not restrict its value.
    pub end_of_track: u8,
    /// Sent as a varint.
    pub end_group: VarInt,
    /// Sent as a varint.
    pub end_object: VarInt,
    /// Message parameters, sent as a count-prefixed, delta-keyed list.
    pub parameters: Vec<KeyValuePair>,
    /// Track properties, sent as delta-keyed key-value pairs that occupy the
    /// rest of the payload.
    pub track_properties: Vec<KeyValuePair>,
}
435
/// Payload of the `PublishBlocked` control message (type `0x0F`).
///
/// Fields are sent in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishBlocked {
    /// Namespace suffix, decoded with `TrackNamespace::decode_allow_empty`.
    pub namespace_suffix: TrackNamespace,
    /// Raw bytes of the track name, sent with a varint length prefix.
    pub track_name: Vec<u8>,
}
445
/// A decoded control message: one variant per `MessageType`, each wrapping the
/// struct that holds that message's payload fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ControlMessage {
    Setup(Setup),
    GoAway(GoAway),
    RequestOk(RequestOk),
    RequestError(RequestError),
    Subscribe(Subscribe),
    SubscribeOk(SubscribeOk),
    RequestUpdate(RequestUpdate),
    Publish(Publish),
    PublishOk(PublishOk),
    PublishDone(PublishDone),
    PublishNamespace(PublishNamespace),
    Namespace(Namespace),
    NamespaceDone(NamespaceDone),
    SubscribeNamespace(SubscribeNamespace),
    TrackStatus(TrackStatus),
    Fetch(Fetch),
    FetchOk(FetchOk),
    PublishBlocked(PublishBlocked),
}
471
472impl ControlMessage {
473 pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
474 let mut payload = Vec::with_capacity(256);
475 self.encode_payload(&mut payload)?;
476
477 if payload.len() > MAX_MESSAGE_LENGTH {
478 return Err(CodecError::MessageTooLong(payload.len()));
479 }
480
481 let msg_type = self.message_type();
482 VarInt::from_usize(msg_type.id() as usize).encode(buf);
483 buf.put_u16(payload.len() as u16);
485 buf.put_slice(&payload);
486 Ok(())
487 }
488
489 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
490 let type_id = VarInt::decode(buf)?.into_inner();
491 let msg_type =
492 MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
493 if buf.remaining() < 2 {
495 return Err(CodecError::UnexpectedEnd);
496 }
497 let payload_len = buf.get_u16() as usize;
498 if buf.remaining() < payload_len {
499 return Err(CodecError::UnexpectedEnd);
500 }
501 let payload_bytes = buf.copy_to_bytes(payload_len);
502 let mut payload = &payload_bytes[..];
503 Self::decode_payload(msg_type, &mut payload)
504 }
505
506 fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
507 match self {
508 ControlMessage::Setup(m) => {
509 encode_kvp_delta(&m.options, buf);
510 }
511 ControlMessage::GoAway(m) => {
512 if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
513 return Err(CodecError::GoAwayUriTooLong);
514 }
515 VarInt::from_usize(m.new_session_uri.len()).encode(buf);
516 buf.put_slice(&m.new_session_uri);
517 m.timeout.encode(buf);
518 }
519 ControlMessage::RequestOk(m) => {
520 encode_parameters(&m.parameters, buf);
521 }
522 ControlMessage::RequestError(m) => {
523 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
524 return Err(CodecError::ReasonPhraseTooLong);
525 }
526 m.error_code.encode(buf);
527 m.retry_interval.encode(buf);
528 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
529 buf.put_slice(&m.reason_phrase);
530 }
531 ControlMessage::Subscribe(m) => {
532 m.request_id.encode(buf);
533 m.required_request_id_delta.encode(buf);
534 m.track_namespace.encode(buf);
535 VarInt::from_usize(m.track_name.len()).encode(buf);
536 buf.put_slice(&m.track_name);
537 encode_parameters(&m.parameters, buf);
538 }
539 ControlMessage::SubscribeOk(m) => {
540 m.track_alias.encode(buf);
541 encode_parameters(&m.parameters, buf);
542 encode_kvp_delta(&m.track_properties, buf);
543 }
544 ControlMessage::RequestUpdate(m) => {
545 m.request_id.encode(buf);
546 m.required_request_id_delta.encode(buf);
547 encode_parameters(&m.parameters, buf);
548 }
549 ControlMessage::Publish(m) => {
550 m.request_id.encode(buf);
551 m.required_request_id_delta.encode(buf);
552 m.track_namespace.encode(buf);
553 VarInt::from_usize(m.track_name.len()).encode(buf);
554 buf.put_slice(&m.track_name);
555 m.track_alias.encode(buf);
556 encode_parameters(&m.parameters, buf);
557 encode_kvp_delta(&m.track_properties, buf);
558 }
559 ControlMessage::PublishOk(m) => {
560 encode_parameters(&m.parameters, buf);
561 }
562 ControlMessage::PublishDone(m) => {
563 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
564 return Err(CodecError::ReasonPhraseTooLong);
565 }
566 m.status_code.encode(buf);
567 m.stream_count.encode(buf);
568 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
569 buf.put_slice(&m.reason_phrase);
570 }
571 ControlMessage::PublishNamespace(m) => {
572 m.request_id.encode(buf);
573 m.required_request_id_delta.encode(buf);
574 m.track_namespace.encode(buf);
575 encode_parameters(&m.parameters, buf);
576 }
577 ControlMessage::Namespace(m) => {
578 m.namespace_suffix.encode(buf);
579 }
580 ControlMessage::NamespaceDone(m) => {
581 m.namespace_suffix.encode(buf);
582 }
583 ControlMessage::SubscribeNamespace(m) => {
584 m.request_id.encode(buf);
585 m.required_request_id_delta.encode(buf);
586 m.namespace_prefix.encode(buf);
587 m.subscribe_options.encode(buf);
588 encode_parameters(&m.parameters, buf);
589 }
590 ControlMessage::TrackStatus(m) => {
591 m.request_id.encode(buf);
592 m.required_request_id_delta.encode(buf);
593 m.track_namespace.encode(buf);
594 VarInt::from_usize(m.track_name.len()).encode(buf);
595 buf.put_slice(&m.track_name);
596 encode_parameters(&m.parameters, buf);
597 }
598 ControlMessage::Fetch(m) => {
599 m.request_id.encode(buf);
600 m.required_request_id_delta.encode(buf);
601 VarInt::from_usize(m.fetch_type as usize).encode(buf);
602 match &m.fetch_payload {
603 FetchPayload::Standalone {
604 track_namespace,
605 track_name,
606 start_group,
607 start_object,
608 end_group,
609 end_object,
610 } => {
611 track_namespace.encode(buf);
612 VarInt::from_usize(track_name.len()).encode(buf);
613 buf.put_slice(track_name);
614 start_group.encode(buf);
615 start_object.encode(buf);
616 end_group.encode(buf);
617 end_object.encode(buf);
618 }
619 FetchPayload::Joining { joining_request_id, joining_start } => {
620 joining_request_id.encode(buf);
621 joining_start.encode(buf);
622 }
623 }
624 encode_parameters(&m.parameters, buf);
625 }
626 ControlMessage::FetchOk(m) => {
627 buf.put_u8(m.end_of_track);
628 m.end_group.encode(buf);
629 m.end_object.encode(buf);
630 encode_parameters(&m.parameters, buf);
631 encode_kvp_delta(&m.track_properties, buf);
632 }
633 ControlMessage::PublishBlocked(m) => {
634 m.namespace_suffix.encode(buf);
635 VarInt::from_usize(m.track_name.len()).encode(buf);
636 buf.put_slice(&m.track_name);
637 }
638 }
639 Ok(())
640 }
641
642 fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
643 match msg_type {
644 MessageType::Setup => {
645 let options = decode_kvp_delta(buf)?;
646 Ok(ControlMessage::Setup(Setup { options }))
647 }
648 MessageType::GoAway => {
649 let uri_len = VarInt::decode(buf)?.into_inner() as usize;
650 let uri = read_bytes(buf, uri_len)?;
651 let timeout = VarInt::decode(buf)?;
652 Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri, timeout }))
653 }
654 MessageType::RequestOk => {
655 let parameters = decode_parameters(buf)?;
656 Ok(ControlMessage::RequestOk(RequestOk { parameters }))
657 }
658 MessageType::RequestError => {
659 let error_code = VarInt::decode(buf)?;
660 let retry_interval = VarInt::decode(buf)?;
661 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
662 let reason_phrase = read_bytes(buf, reason_len)?;
663 Ok(ControlMessage::RequestError(RequestError {
664 error_code,
665 retry_interval,
666 reason_phrase,
667 }))
668 }
669 MessageType::Subscribe => {
670 let request_id = VarInt::decode(buf)?;
671 let required_request_id_delta = VarInt::decode(buf)?;
672 let track_namespace = TrackNamespace::decode(buf)?;
673 let tn_len = VarInt::decode(buf)?.into_inner() as usize;
674 let track_name = read_bytes(buf, tn_len)?;
675 let parameters = decode_parameters(buf)?;
676 Ok(ControlMessage::Subscribe(Subscribe {
677 request_id,
678 required_request_id_delta,
679 track_namespace,
680 track_name,
681 parameters,
682 }))
683 }
684 MessageType::SubscribeOk => {
685 let track_alias = VarInt::decode(buf)?;
686 let parameters = decode_parameters(buf)?;
687 let track_properties = decode_kvp_delta(buf)?;
688 Ok(ControlMessage::SubscribeOk(SubscribeOk {
689 track_alias,
690 parameters,
691 track_properties,
692 }))
693 }
694 MessageType::RequestUpdate => {
695 let request_id = VarInt::decode(buf)?;
696 let required_request_id_delta = VarInt::decode(buf)?;
697 let parameters = decode_parameters(buf)?;
698 Ok(ControlMessage::RequestUpdate(RequestUpdate {
699 request_id,
700 required_request_id_delta,
701 parameters,
702 }))
703 }
704 MessageType::Publish => {
705 let request_id = VarInt::decode(buf)?;
706 let required_request_id_delta = VarInt::decode(buf)?;
707 let track_namespace = TrackNamespace::decode(buf)?;
708 let tn_len = VarInt::decode(buf)?.into_inner() as usize;
709 let track_name = read_bytes(buf, tn_len)?;
710 let track_alias = VarInt::decode(buf)?;
711 let parameters = decode_parameters(buf)?;
712 let track_properties = decode_kvp_delta(buf)?;
713 Ok(ControlMessage::Publish(Publish {
714 request_id,
715 required_request_id_delta,
716 track_namespace,
717 track_name,
718 track_alias,
719 parameters,
720 track_properties,
721 }))
722 }
723 MessageType::PublishOk => {
724 let parameters = decode_parameters(buf)?;
725 Ok(ControlMessage::PublishOk(PublishOk { parameters }))
726 }
727 MessageType::PublishDone => {
728 let status_code = VarInt::decode(buf)?;
729 let stream_count = VarInt::decode(buf)?;
730 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
731 let reason_phrase = read_bytes(buf, reason_len)?;
732 Ok(ControlMessage::PublishDone(PublishDone {
733 status_code,
734 stream_count,
735 reason_phrase,
736 }))
737 }
738 MessageType::PublishNamespace => {
739 let request_id = VarInt::decode(buf)?;
740 let required_request_id_delta = VarInt::decode(buf)?;
741 let track_namespace = TrackNamespace::decode(buf)?;
742 let parameters = decode_parameters(buf)?;
743 Ok(ControlMessage::PublishNamespace(PublishNamespace {
744 request_id,
745 required_request_id_delta,
746 track_namespace,
747 parameters,
748 }))
749 }
750 MessageType::Namespace => {
751 let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
752 Ok(ControlMessage::Namespace(Namespace { namespace_suffix }))
753 }
754 MessageType::NamespaceDone => {
755 let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
756 Ok(ControlMessage::NamespaceDone(NamespaceDone { namespace_suffix }))
757 }
758 MessageType::SubscribeNamespace => {
759 let request_id = VarInt::decode(buf)?;
760 let required_request_id_delta = VarInt::decode(buf)?;
761 let namespace_prefix = TrackNamespace::decode_allow_empty(buf)?;
762 let subscribe_options = VarInt::decode(buf)?;
763 let parameters = decode_parameters(buf)?;
764 Ok(ControlMessage::SubscribeNamespace(SubscribeNamespace {
765 request_id,
766 required_request_id_delta,
767 namespace_prefix,
768 subscribe_options,
769 parameters,
770 }))
771 }
772 MessageType::TrackStatus => {
773 let request_id = VarInt::decode(buf)?;
774 let required_request_id_delta = VarInt::decode(buf)?;
775 let track_namespace = TrackNamespace::decode(buf)?;
776 let tn_len = VarInt::decode(buf)?.into_inner() as usize;
777 let track_name = read_bytes(buf, tn_len)?;
778 let parameters = decode_parameters(buf)?;
779 Ok(ControlMessage::TrackStatus(TrackStatus {
780 request_id,
781 required_request_id_delta,
782 track_namespace,
783 track_name,
784 parameters,
785 }))
786 }
787 MessageType::Fetch => {
788 let request_id = VarInt::decode(buf)?;
789 let required_request_id_delta = VarInt::decode(buf)?;
790 let fetch_type_val = VarInt::decode(buf)?.into_inner();
791 let fetch_type =
792 FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
793 let fetch_payload = match fetch_type {
794 FetchType::Standalone => {
795 let track_namespace = TrackNamespace::decode(buf)?;
796 let tn_len = VarInt::decode(buf)?.into_inner() as usize;
797 let track_name = read_bytes(buf, tn_len)?;
798 let start_group = VarInt::decode(buf)?;
799 let start_object = VarInt::decode(buf)?;
800 let end_group = VarInt::decode(buf)?;
801 let end_object = VarInt::decode(buf)?;
802 FetchPayload::Standalone {
803 track_namespace,
804 track_name,
805 start_group,
806 start_object,
807 end_group,
808 end_object,
809 }
810 }
811 FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
812 let joining_request_id = VarInt::decode(buf)?;
813 let joining_start = VarInt::decode(buf)?;
814 FetchPayload::Joining { joining_request_id, joining_start }
815 }
816 };
817 let parameters = decode_parameters(buf)?;
818 Ok(ControlMessage::Fetch(Fetch {
819 request_id,
820 required_request_id_delta,
821 fetch_type,
822 fetch_payload,
823 parameters,
824 }))
825 }
826 MessageType::FetchOk => {
827 if buf.remaining() < 1 {
828 return Err(CodecError::UnexpectedEnd);
829 }
830 let end_of_track = buf.get_u8();
831 let end_group = VarInt::decode(buf)?;
832 let end_object = VarInt::decode(buf)?;
833 let parameters = decode_parameters(buf)?;
834 let track_properties = decode_kvp_delta(buf)?;
835 Ok(ControlMessage::FetchOk(FetchOk {
836 end_of_track,
837 end_group,
838 end_object,
839 parameters,
840 track_properties,
841 }))
842 }
843 MessageType::PublishBlocked => {
844 let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
845 let tn_len = VarInt::decode(buf)?.into_inner() as usize;
846 let track_name = read_bytes(buf, tn_len)?;
847 Ok(ControlMessage::PublishBlocked(PublishBlocked { namespace_suffix, track_name }))
848 }
849 }
850 }
851
852 pub fn message_type(&self) -> MessageType {
853 match self {
854 ControlMessage::Setup(_) => MessageType::Setup,
855 ControlMessage::GoAway(_) => MessageType::GoAway,
856 ControlMessage::RequestOk(_) => MessageType::RequestOk,
857 ControlMessage::RequestError(_) => MessageType::RequestError,
858 ControlMessage::Subscribe(_) => MessageType::Subscribe,
859 ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
860 ControlMessage::RequestUpdate(_) => MessageType::RequestUpdate,
861 ControlMessage::Publish(_) => MessageType::Publish,
862 ControlMessage::PublishOk(_) => MessageType::PublishOk,
863 ControlMessage::PublishDone(_) => MessageType::PublishDone,
864 ControlMessage::PublishNamespace(_) => MessageType::PublishNamespace,
865 ControlMessage::Namespace(_) => MessageType::Namespace,
866 ControlMessage::NamespaceDone(_) => MessageType::NamespaceDone,
867 ControlMessage::SubscribeNamespace(_) => MessageType::SubscribeNamespace,
868 ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
869 ControlMessage::Fetch(_) => MessageType::Fetch,
870 ControlMessage::FetchOk(_) => MessageType::FetchOk,
871 ControlMessage::PublishBlocked(_) => MessageType::PublishBlocked,
872 }
873 }
874}