1use std::collections::HashMap;
2
3use crate::draft13::fetch::{FetchError, FetchStateMachine};
4use crate::draft13::namespace::{
5 AnnounceStateMachine, NamespaceError, SubscribeAnnouncesStateMachine,
6};
7use crate::draft13::session::request_id::{RequestIdAllocator, RequestIdError};
8use crate::draft13::session::setup::{self, SetupError};
9use crate::draft13::session::state::{SessionError, SessionState, SessionStateMachine};
10use crate::draft13::subscription::{SubscriptionError, SubscriptionStateMachine};
11use crate::draft13::track_status::{TrackStatusError, TrackStatusStateMachine};
12use moqtap_codec::draft13::message::{
13 self, Announce, AnnounceCancel, AnnounceError, AnnounceOk, ClientSetup, ControlMessage, Fetch,
14 FetchCancel, FetchPayload, FetchType, GoAway, MaxRequestId, Publish, PublishError, PublishOk,
15 RequestsBlocked, ServerSetup, Subscribe, SubscribeDone, SubscribeError, SubscribeNamespace,
16 SubscribeNamespaceError, SubscribeNamespaceOk, SubscribeOk, SubscribeUpdate, TrackStatus,
17 TrackStatusError as TrackStatusErrorMsg, TrackStatusOk, Unannounce, Unsubscribe,
18 UnsubscribeNamespace,
19};
20use moqtap_codec::kvp::{KeyValuePair, KvpValue};
21use moqtap_codec::types::*;
22use moqtap_codec::varint::VarInt;
23
24type NamespaceKey = Vec<Vec<u8>>;
26
27#[derive(Debug, thiserror::Error)]
29pub enum EndpointError {
30 #[error("session error: {0}")]
32 Session(#[from] SessionError),
33 #[error("request ID error: {0}")]
35 RequestId(#[from] RequestIdError),
36 #[error("subscription error: {0}")]
38 Subscription(#[from] SubscriptionError),
39 #[error("fetch error: {0}")]
41 Fetch(#[from] FetchError),
42 #[error("namespace error: {0}")]
44 Namespace(#[from] NamespaceError),
45 #[error("track status error: {0}")]
47 TrackStatus(#[from] TrackStatusError),
48 #[error("setup error: {0}")]
50 Setup(#[from] SetupError),
51 #[error("unknown request ID: {0}")]
53 UnknownRequest(u64),
54 #[error("unknown namespace")]
56 UnknownNamespace,
57 #[error("session not active")]
59 NotActive,
60 #[error("session is draining, no new requests allowed")]
62 Draining,
63}
64
65pub struct Endpoint {
69 session: SessionStateMachine,
70 request_ids: RequestIdAllocator,
71 advertised_max_id: u64,
73 subscriptions: HashMap<u64, SubscriptionStateMachine>,
74 fetches: HashMap<u64, FetchStateMachine>,
75 subscribe_namespaces: HashMap<u64, SubscribeAnnouncesStateMachine>,
76 announces: HashMap<u64, AnnounceStateMachine>,
77 announce_ids: HashMap<NamespaceKey, u64>,
80 subscribe_namespace_ids: HashMap<NamespaceKey, u64>,
82 track_statuses: HashMap<u64, TrackStatusStateMachine>,
83 subscribe_track_aliases: HashMap<u64, VarInt>,
86 inbound_publishes: HashMap<u64, Publish>,
90 negotiated_version: Option<VarInt>,
91 offered_versions: Vec<VarInt>,
92 goaway_uri: Option<Vec<u8>>,
93 peer_reported_max_request_id: Option<VarInt>,
96}
97
98impl Default for Endpoint {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104impl Endpoint {
105 pub fn new() -> Self {
107 Self {
108 session: SessionStateMachine::new(),
109 request_ids: RequestIdAllocator::new(),
110 advertised_max_id: 0,
111 subscriptions: HashMap::new(),
112 fetches: HashMap::new(),
113 subscribe_namespaces: HashMap::new(),
114 announces: HashMap::new(),
115 announce_ids: HashMap::new(),
116 subscribe_namespace_ids: HashMap::new(),
117 track_statuses: HashMap::new(),
118 subscribe_track_aliases: HashMap::new(),
119 inbound_publishes: HashMap::new(),
120 negotiated_version: None,
121 offered_versions: Vec::new(),
122 goaway_uri: None,
123 peer_reported_max_request_id: None,
124 }
125 }
126
127 pub fn session_state(&self) -> SessionState {
131 self.session.state()
132 }
133
134 pub fn negotiated_version(&self) -> Option<VarInt> {
136 self.negotiated_version
137 }
138
139 pub fn goaway_uri(&self) -> Option<&[u8]> {
141 self.goaway_uri.as_deref()
142 }
143
144 pub fn is_blocked(&self) -> bool {
146 self.request_ids.is_blocked()
147 }
148
149 pub fn active_subscription_count(&self) -> usize {
151 self.subscriptions.len()
152 }
153
154 pub fn active_fetch_count(&self) -> usize {
156 self.fetches.len()
157 }
158
159 pub fn active_subscribe_namespace_count(&self) -> usize {
161 self.subscribe_namespaces.len()
162 }
163
164 pub fn active_announce_count(&self) -> usize {
166 self.announces.len()
167 }
168
169 pub fn active_track_status_count(&self) -> usize {
171 self.track_statuses.len()
172 }
173
174 pub fn connect(&mut self) -> Result<(), EndpointError> {
178 self.session.on_connect()?;
179 Ok(())
180 }
181
182 pub fn close(&mut self) -> Result<(), EndpointError> {
184 self.session.on_close()?;
185 Ok(())
186 }
187
188 pub fn send_client_setup(
192 &mut self,
193 versions: Vec<VarInt>,
194 parameters: Vec<KeyValuePair>,
195 ) -> Result<ControlMessage, EndpointError> {
196 self.offered_versions = versions.clone();
197 let msg = ClientSetup { supported_versions: versions, parameters };
198 setup::validate_client_setup(&msg)?;
199 Ok(ControlMessage::ClientSetup(msg))
200 }
201
202 pub fn receive_server_setup(&mut self, msg: &ServerSetup) -> Result<(), EndpointError> {
206 setup::validate_server_setup(msg)?;
207 let version = setup::negotiate_version(&self.offered_versions, msg.selected_version)?;
208 self.negotiated_version = Some(version);
209 self.session.on_setup_complete()?;
210 for param in &msg.parameters {
212 if param.key == VarInt::from_u64(0x02).unwrap() {
213 if let KvpValue::Varint(v) = ¶m.value {
214 self.request_ids.update_max(v.into_inner())?;
215 }
216 }
217 }
218 Ok(())
219 }
220
221 pub fn receive_client_setup_and_respond(
225 &mut self,
226 client_setup: &ClientSetup,
227 selected_version: VarInt,
228 ) -> Result<ControlMessage, EndpointError> {
229 setup::validate_client_setup(client_setup)?;
230 let version = setup::negotiate_version(&client_setup.supported_versions, selected_version)?;
231 self.negotiated_version = Some(version);
232 self.session.on_setup_complete()?;
233 let msg = ServerSetup { selected_version: version, parameters: vec![] };
234 Ok(ControlMessage::ServerSetup(msg))
235 }
236
237 pub fn receive_max_request_id(&mut self, msg: &MaxRequestId) -> Result<(), EndpointError> {
241 self.request_ids.update_max(msg.request_id.into_inner())?;
242 Ok(())
243 }
244
245 pub fn send_max_request_id(&mut self, max_id: VarInt) -> Result<ControlMessage, EndpointError> {
248 let new_val = max_id.into_inner();
249 if new_val <= self.advertised_max_id && self.advertised_max_id > 0 {
250 return Err(EndpointError::RequestId(RequestIdError::Decreased(
251 self.advertised_max_id,
252 new_val,
253 )));
254 }
255 self.advertised_max_id = new_val;
256 Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id: max_id }))
257 }
258
259 pub fn receive_goaway(&mut self, msg: &GoAway) -> Result<(), EndpointError> {
263 self.session.on_goaway()?;
264 self.goaway_uri = Some(msg.new_session_uri.clone());
265 Ok(())
266 }
267
268 fn require_active_or_err(&self) -> Result<(), EndpointError> {
271 match self.session.state() {
272 SessionState::Active => Ok(()),
273 SessionState::Draining => Err(EndpointError::Draining),
274 _ => Err(EndpointError::NotActive),
275 }
276 }
277
278 #[allow(clippy::too_many_arguments)]
287 pub fn subscribe(
288 &mut self,
289 track_namespace: TrackNamespace,
290 track_name: Vec<u8>,
291 subscriber_priority: u8,
292 group_order: VarInt,
293 filter_type: VarInt,
294 ) -> Result<(VarInt, ControlMessage), EndpointError> {
295 self.require_active_or_err()?;
296 let req_id = self.request_ids.allocate()?;
297
298 let mut sm = SubscriptionStateMachine::new();
299 sm.on_subscribe_sent()?;
300 self.subscriptions.insert(req_id.into_inner(), sm);
301
302 let msg = ControlMessage::Subscribe(Subscribe {
303 request_id: req_id,
304 track_namespace,
305 track_name,
306 subscriber_priority,
307 group_order,
308 forward: VarInt::from_u64(1).unwrap(),
309 filter_type,
310 start_group: None,
311 start_object: None,
312 end_group: None,
313 parameters: vec![],
314 });
315 Ok((req_id, msg))
316 }
317
318 pub fn receive_subscribe_ok(&mut self, msg: &SubscribeOk) -> Result<(), EndpointError> {
324 let id = msg.request_id.into_inner();
325 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
326 sm.on_subscribe_ok()?;
327 self.subscribe_track_aliases.insert(id, msg.track_alias);
328 Ok(())
329 }
330
331 pub fn track_alias_for(&self, request_id: VarInt) -> Option<VarInt> {
334 self.subscribe_track_aliases.get(&request_id.into_inner()).copied()
335 }
336
337 pub fn receive_subscribe_error(&mut self, msg: &SubscribeError) -> Result<(), EndpointError> {
339 let id = msg.request_id.into_inner();
340 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
341 sm.on_subscribe_error()?;
342 Ok(())
343 }
344
345 pub fn unsubscribe(&mut self, request_id: VarInt) -> Result<ControlMessage, EndpointError> {
347 let id = request_id.into_inner();
348 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
349 sm.on_unsubscribe()?;
350 Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
351 }
352
353 pub fn receive_subscribe_update(&mut self, msg: &SubscribeUpdate) -> Result<(), EndpointError> {
355 let id = msg.request_id.into_inner();
356 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
357 sm.on_subscribe_update()?;
358 Ok(())
359 }
360
361 pub fn receive_subscribe_done(&mut self, msg: &SubscribeDone) -> Result<(), EndpointError> {
363 let id = msg.request_id.into_inner();
364 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
365 sm.on_subscribe_done()?;
366 Ok(())
367 }
368
369 #[allow(clippy::too_many_arguments)]
374 pub fn fetch(
375 &mut self,
376 track_namespace: TrackNamespace,
377 track_name: Vec<u8>,
378 subscriber_priority: u8,
379 group_order: VarInt,
380 start_group: VarInt,
381 start_object: VarInt,
382 end_group: VarInt,
383 end_object: VarInt,
384 ) -> Result<(VarInt, ControlMessage), EndpointError> {
385 self.require_active_or_err()?;
386 let req_id = self.request_ids.allocate()?;
387
388 let mut sm = FetchStateMachine::new();
389 sm.on_fetch_sent()?;
390 self.fetches.insert(req_id.into_inner(), sm);
391
392 let msg = ControlMessage::Fetch(Fetch {
393 request_id: req_id,
394 subscriber_priority,
395 group_order,
396 fetch_type: FetchType::Standalone,
397 fetch_payload: FetchPayload::Standalone {
398 track_namespace,
399 track_name,
400 start_group,
401 start_object,
402 end_group,
403 end_object,
404 },
405 parameters: vec![],
406 });
407 Ok((req_id, msg))
408 }
409
410 pub fn joining_fetch(
415 &mut self,
416 subscriber_priority: u8,
417 group_order: VarInt,
418 fetch_type: FetchType,
419 joining_subscribe_id: VarInt,
420 joining_start: VarInt,
421 ) -> Result<(VarInt, ControlMessage), EndpointError> {
422 self.require_active_or_err()?;
423 if !matches!(fetch_type, FetchType::RelativeJoining | FetchType::AbsoluteJoining) {
424 return Err(EndpointError::NotActive);
426 }
427 let req_id = self.request_ids.allocate()?;
428
429 let mut sm = FetchStateMachine::new();
430 sm.on_fetch_sent()?;
431 self.fetches.insert(req_id.into_inner(), sm);
432
433 let msg = ControlMessage::Fetch(Fetch {
434 request_id: req_id,
435 subscriber_priority,
436 group_order,
437 fetch_type,
438 fetch_payload: FetchPayload::Joining { joining_subscribe_id, joining_start },
439 parameters: vec![],
440 });
441 Ok((req_id, msg))
442 }
443
444 pub fn receive_fetch_ok(&mut self, msg: &message::FetchOk) -> Result<(), EndpointError> {
446 let id = msg.request_id.into_inner();
447 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
448 sm.on_fetch_ok()?;
449 Ok(())
450 }
451
452 pub fn receive_fetch_error(&mut self, msg: &message::FetchError) -> Result<(), EndpointError> {
454 let id = msg.request_id.into_inner();
455 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
456 sm.on_fetch_error()?;
457 Ok(())
458 }
459
460 pub fn fetch_cancel(&mut self, request_id: VarInt) -> Result<ControlMessage, EndpointError> {
462 let id = request_id.into_inner();
463 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
464 sm.on_fetch_cancel()?;
465 Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
466 }
467
468 pub fn on_fetch_stream_fin(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
470 let id = request_id.into_inner();
471 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
472 sm.on_stream_fin()?;
473 Ok(())
474 }
475
476 pub fn on_fetch_stream_reset(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
478 let id = request_id.into_inner();
479 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
480 sm.on_stream_reset()?;
481 Ok(())
482 }
483
484 pub fn subscribe_namespace(
489 &mut self,
490 track_namespace_prefix: TrackNamespace,
491 ) -> Result<(VarInt, ControlMessage), EndpointError> {
492 self.require_active_or_err()?;
493 let req_id = self.request_ids.allocate()?;
494 let key = track_namespace_prefix.0.clone();
495 let mut sm = SubscribeAnnouncesStateMachine::new();
496 sm.on_subscribe_announces_sent()?;
497 self.subscribe_namespaces.insert(req_id.into_inner(), sm);
498 self.subscribe_namespace_ids.insert(key, req_id.into_inner());
499 Ok((
500 req_id,
501 ControlMessage::SubscribeNamespace(SubscribeNamespace {
502 request_id: req_id,
503 track_namespace_prefix,
504 parameters: vec![],
505 }),
506 ))
507 }
508
509 pub fn receive_subscribe_namespace_ok(
511 &mut self,
512 msg: &SubscribeNamespaceOk,
513 ) -> Result<(), EndpointError> {
514 let id = msg.request_id.into_inner();
515 let sm = self.subscribe_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
516 sm.on_subscribe_announces_ok()?;
517 Ok(())
518 }
519
520 pub fn receive_subscribe_namespace_error(
522 &mut self,
523 msg: &SubscribeNamespaceError,
524 ) -> Result<(), EndpointError> {
525 let id = msg.request_id.into_inner();
526 let sm = self.subscribe_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
527 sm.on_subscribe_announces_error()?;
528 Ok(())
529 }
530
531 pub fn unsubscribe_namespace(
533 &mut self,
534 track_namespace_prefix: TrackNamespace,
535 ) -> Result<ControlMessage, EndpointError> {
536 let id = *self
537 .subscribe_namespace_ids
538 .get(&track_namespace_prefix.0)
539 .ok_or(EndpointError::UnknownNamespace)?;
540 let sm = self.subscribe_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
541 sm.on_unsubscribe_announces()?;
542 Ok(ControlMessage::UnsubscribeNamespace(UnsubscribeNamespace { track_namespace_prefix }))
543 }
544
545 pub fn announce(
550 &mut self,
551 track_namespace: TrackNamespace,
552 ) -> Result<(VarInt, ControlMessage), EndpointError> {
553 self.require_active_or_err()?;
554 let req_id = self.request_ids.allocate()?;
555 let key = track_namespace.0.clone();
556 let mut sm = AnnounceStateMachine::new();
557 sm.on_announce_sent()?;
558 self.announces.insert(req_id.into_inner(), sm);
559 self.announce_ids.insert(key, req_id.into_inner());
560 Ok((
561 req_id,
562 ControlMessage::Announce(Announce {
563 request_id: req_id,
564 track_namespace,
565 parameters: vec![],
566 }),
567 ))
568 }
569
570 pub fn receive_announce_ok(&mut self, msg: &AnnounceOk) -> Result<(), EndpointError> {
572 let id = msg.request_id.into_inner();
573 let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
574 sm.on_announce_ok()?;
575 Ok(())
576 }
577
578 pub fn receive_announce_error(&mut self, msg: &AnnounceError) -> Result<(), EndpointError> {
580 let id = msg.request_id.into_inner();
581 let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
582 sm.on_announce_error()?;
583 Ok(())
584 }
585
586 pub fn receive_announce_cancel(&mut self, msg: &AnnounceCancel) -> Result<(), EndpointError> {
588 let id = *self
589 .announce_ids
590 .get(&msg.track_namespace.0)
591 .ok_or(EndpointError::UnknownNamespace)?;
592 let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
593 sm.on_announce_cancel()?;
594 Ok(())
595 }
596
597 pub fn unannounce(
599 &mut self,
600 track_namespace: TrackNamespace,
601 ) -> Result<ControlMessage, EndpointError> {
602 let id =
603 *self.announce_ids.get(&track_namespace.0).ok_or(EndpointError::UnknownNamespace)?;
604 let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
605 sm.on_unannounce()?;
606 Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
607 }
608
609 #[allow(clippy::too_many_arguments)]
614 pub fn track_status_request(
615 &mut self,
616 track_namespace: TrackNamespace,
617 track_name: Vec<u8>,
618 subscriber_priority: u8,
619 group_order: VarInt,
620 forward: VarInt,
621 filter_type: VarInt,
622 ) -> Result<(VarInt, ControlMessage), EndpointError> {
623 self.require_active_or_err()?;
624 let req_id = self.request_ids.allocate()?;
625 let mut sm = TrackStatusStateMachine::new();
626 sm.on_track_status_request_sent()?;
627 self.track_statuses.insert(req_id.into_inner(), sm);
628 Ok((
629 req_id,
630 ControlMessage::TrackStatus(TrackStatus {
631 request_id: req_id,
632 track_namespace,
633 track_name,
634 subscriber_priority,
635 group_order,
636 forward,
637 filter_type,
638 parameters: vec![],
639 }),
640 ))
641 }
642
643 pub fn receive_track_status_ok(&mut self, msg: &TrackStatusOk) -> Result<(), EndpointError> {
645 let id = msg.request_id.into_inner();
646 let sm = self.track_statuses.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
647 sm.on_track_status()?;
648 Ok(())
649 }
650
651 pub fn receive_track_status_error(
653 &mut self,
654 msg: &TrackStatusErrorMsg,
655 ) -> Result<(), EndpointError> {
656 let id = msg.request_id.into_inner();
657 let sm = self.track_statuses.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
658 sm.on_track_status()?;
659 Ok(())
660 }
661
662 pub fn receive_requests_blocked(&mut self, msg: &RequestsBlocked) -> Result<(), EndpointError> {
670 self.peer_reported_max_request_id = Some(msg.maximum_request_id);
671 Ok(())
672 }
673
674 pub fn peer_reported_max_request_id(&self) -> Option<VarInt> {
677 self.peer_reported_max_request_id
678 }
679
680 pub fn receive_publish(&mut self, msg: &Publish) -> Result<(), EndpointError> {
688 self.require_active_or_err()?;
689 self.inbound_publishes.insert(msg.request_id.into_inner(), msg.clone());
690 Ok(())
691 }
692
693 pub fn pending_publish(&self, request_id: VarInt) -> Option<&Publish> {
695 self.inbound_publishes.get(&request_id.into_inner())
696 }
697
698 pub fn pending_publish_count(&self) -> usize {
700 self.inbound_publishes.len()
701 }
702
703 #[allow(clippy::too_many_arguments)]
706 pub fn send_publish_ok(
707 &mut self,
708 request_id: VarInt,
709 forward: VarInt,
710 subscriber_priority: u8,
711 group_order: VarInt,
712 filter_type: VarInt,
713 start_group: Option<VarInt>,
714 start_object: Option<VarInt>,
715 end_group: Option<VarInt>,
716 ) -> Result<ControlMessage, EndpointError> {
717 let id = request_id.into_inner();
718 if !self.inbound_publishes.contains_key(&id) {
719 return Err(EndpointError::UnknownRequest(id));
720 }
721 self.inbound_publishes.remove(&id);
722 Ok(ControlMessage::PublishOk(PublishOk {
723 request_id,
724 forward,
725 subscriber_priority,
726 group_order,
727 filter_type,
728 start_group,
729 start_object,
730 end_group,
731 parameters: vec![],
732 }))
733 }
734
735 pub fn send_publish_error(
738 &mut self,
739 request_id: VarInt,
740 error_code: VarInt,
741 reason_phrase: Vec<u8>,
742 ) -> Result<ControlMessage, EndpointError> {
743 let id = request_id.into_inner();
744 if !self.inbound_publishes.contains_key(&id) {
745 return Err(EndpointError::UnknownRequest(id));
746 }
747 self.inbound_publishes.remove(&id);
748 Ok(ControlMessage::PublishError(PublishError { request_id, error_code, reason_phrase }))
749 }
750
751 pub fn receive_publish_ok(&mut self, _msg: &PublishOk) -> Result<(), EndpointError> {
754 Ok(())
755 }
756
757 pub fn receive_publish_error(&mut self, _msg: &PublishError) -> Result<(), EndpointError> {
760 Ok(())
761 }
762
763 pub fn receive_message(&mut self, msg: ControlMessage) -> Result<(), EndpointError> {
767 match msg {
768 ControlMessage::GoAway(ref m) => self.receive_goaway(m),
769 ControlMessage::MaxRequestId(ref m) => self.receive_max_request_id(m),
770 ControlMessage::RequestsBlocked(ref m) => self.receive_requests_blocked(m),
771 ControlMessage::SubscribeOk(ref m) => self.receive_subscribe_ok(m),
772 ControlMessage::SubscribeError(ref m) => self.receive_subscribe_error(m),
773 ControlMessage::SubscribeUpdate(ref m) => self.receive_subscribe_update(m),
774 ControlMessage::SubscribeDone(ref m) => self.receive_subscribe_done(m),
775 ControlMessage::FetchOk(ref m) => self.receive_fetch_ok(m),
776 ControlMessage::FetchError(ref m) => self.receive_fetch_error(m),
777 ControlMessage::SubscribeNamespaceOk(ref m) => self.receive_subscribe_namespace_ok(m),
778 ControlMessage::SubscribeNamespaceError(ref m) => {
779 self.receive_subscribe_namespace_error(m)
780 }
781 ControlMessage::AnnounceOk(ref m) => self.receive_announce_ok(m),
782 ControlMessage::AnnounceError(ref m) => self.receive_announce_error(m),
783 ControlMessage::AnnounceCancel(ref m) => self.receive_announce_cancel(m),
784 ControlMessage::TrackStatusOk(ref m) => self.receive_track_status_ok(m),
785 ControlMessage::TrackStatusError(ref m) => self.receive_track_status_error(m),
786 ControlMessage::Publish(ref m) => self.receive_publish(m),
787 ControlMessage::PublishOk(ref m) => self.receive_publish_ok(m),
788 ControlMessage::PublishError(ref m) => self.receive_publish_error(m),
789 _ => Ok(()),
790 }
791 }
792}