1use std::collections::HashMap;
2
3use crate::draft12::fetch::{FetchError, FetchStateMachine};
4use crate::draft12::namespace::{
5 AnnounceStateMachine, NamespaceError, SubscribeAnnouncesStateMachine,
6};
7use crate::draft12::session::request_id::{RequestIdAllocator, RequestIdError};
8use crate::draft12::session::setup::{self, SetupError};
9use crate::draft12::session::state::{SessionError, SessionState, SessionStateMachine};
10use crate::draft12::subscription::{SubscriptionError, SubscriptionStateMachine};
11use crate::draft12::track_status::{TrackStatusError, TrackStatusStateMachine};
12use moqtap_codec::draft12::message::{
13 self, Announce, AnnounceCancel, AnnounceError, AnnounceOk, ClientSetup, ControlMessage, Fetch,
14 FetchCancel, FetchPayload, FetchType, GoAway, MaxRequestId, Publish, PublishError, PublishOk,
15 RequestsBlocked, ServerSetup, Subscribe, SubscribeAnnounces, SubscribeAnnouncesError,
16 SubscribeAnnouncesOk, SubscribeDone, SubscribeError, SubscribeOk, SubscribeUpdate, TrackStatus,
17 TrackStatusRequest, Unannounce, Unsubscribe, UnsubscribeAnnounces,
18};
19use moqtap_codec::kvp::{KeyValuePair, KvpValue};
20use moqtap_codec::types::*;
21use moqtap_codec::varint::VarInt;
22
/// Hashable lookup key for a track namespace: the ordered list of its raw
/// byte-string elements.
///
/// Used to map a namespace (or namespace prefix) back to the request ID under
/// which it was registered.
type NamespaceKey = Vec<Vec<u8>>;
25
26#[derive(Debug, thiserror::Error)]
28pub enum EndpointError {
29 #[error("session error: {0}")]
31 Session(#[from] SessionError),
32 #[error("request ID error: {0}")]
34 RequestId(#[from] RequestIdError),
35 #[error("subscription error: {0}")]
37 Subscription(#[from] SubscriptionError),
38 #[error("fetch error: {0}")]
40 Fetch(#[from] FetchError),
41 #[error("namespace error: {0}")]
43 Namespace(#[from] NamespaceError),
44 #[error("track status error: {0}")]
46 TrackStatus(#[from] TrackStatusError),
47 #[error("setup error: {0}")]
49 Setup(#[from] SetupError),
50 #[error("unknown request ID: {0}")]
52 UnknownRequest(u64),
53 #[error("unknown namespace")]
55 UnknownNamespace,
56 #[error("session not active")]
58 NotActive,
59 #[error("session is draining, no new requests allowed")]
61 Draining,
62}
63
64pub struct Endpoint {
68 session: SessionStateMachine,
69 request_ids: RequestIdAllocator,
70 advertised_max_id: u64,
72 subscriptions: HashMap<u64, SubscriptionStateMachine>,
73 fetches: HashMap<u64, FetchStateMachine>,
74 subscribe_announces: HashMap<u64, SubscribeAnnouncesStateMachine>,
75 announces: HashMap<u64, AnnounceStateMachine>,
76 announce_ids: HashMap<NamespaceKey, u64>,
79 subscribe_announces_ids: HashMap<NamespaceKey, u64>,
81 track_statuses: HashMap<u64, TrackStatusStateMachine>,
82 subscribe_track_aliases: HashMap<u64, VarInt>,
85 inbound_publishes: HashMap<u64, Publish>,
89 negotiated_version: Option<VarInt>,
90 offered_versions: Vec<VarInt>,
91 goaway_uri: Option<Vec<u8>>,
92 peer_reported_max_request_id: Option<VarInt>,
95}
96
97impl Default for Endpoint {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103impl Endpoint {
104 pub fn new() -> Self {
106 Self {
107 session: SessionStateMachine::new(),
108 request_ids: RequestIdAllocator::new(),
109 advertised_max_id: 0,
110 subscriptions: HashMap::new(),
111 fetches: HashMap::new(),
112 subscribe_announces: HashMap::new(),
113 announces: HashMap::new(),
114 announce_ids: HashMap::new(),
115 subscribe_announces_ids: HashMap::new(),
116 track_statuses: HashMap::new(),
117 subscribe_track_aliases: HashMap::new(),
118 inbound_publishes: HashMap::new(),
119 negotiated_version: None,
120 offered_versions: Vec::new(),
121 goaway_uri: None,
122 peer_reported_max_request_id: None,
123 }
124 }
125
126 pub fn session_state(&self) -> SessionState {
130 self.session.state()
131 }
132
133 pub fn negotiated_version(&self) -> Option<VarInt> {
135 self.negotiated_version
136 }
137
138 pub fn goaway_uri(&self) -> Option<&[u8]> {
140 self.goaway_uri.as_deref()
141 }
142
143 pub fn is_blocked(&self) -> bool {
145 self.request_ids.is_blocked()
146 }
147
148 pub fn active_subscription_count(&self) -> usize {
150 self.subscriptions.len()
151 }
152
153 pub fn active_fetch_count(&self) -> usize {
155 self.fetches.len()
156 }
157
158 pub fn active_subscribe_announces_count(&self) -> usize {
160 self.subscribe_announces.len()
161 }
162
163 pub fn active_announce_count(&self) -> usize {
165 self.announces.len()
166 }
167
168 pub fn active_track_status_count(&self) -> usize {
170 self.track_statuses.len()
171 }
172
173 pub fn connect(&mut self) -> Result<(), EndpointError> {
177 self.session.on_connect()?;
178 Ok(())
179 }
180
181 pub fn close(&mut self) -> Result<(), EndpointError> {
183 self.session.on_close()?;
184 Ok(())
185 }
186
187 pub fn send_client_setup(
191 &mut self,
192 versions: Vec<VarInt>,
193 parameters: Vec<KeyValuePair>,
194 ) -> Result<ControlMessage, EndpointError> {
195 self.offered_versions = versions.clone();
196 let msg = ClientSetup { supported_versions: versions, parameters };
197 setup::validate_client_setup(&msg)?;
198 Ok(ControlMessage::ClientSetup(msg))
199 }
200
201 pub fn receive_server_setup(&mut self, msg: &ServerSetup) -> Result<(), EndpointError> {
205 setup::validate_server_setup(msg)?;
206 let version = setup::negotiate_version(&self.offered_versions, msg.selected_version)?;
207 self.negotiated_version = Some(version);
208 self.session.on_setup_complete()?;
209 for param in &msg.parameters {
211 if param.key == VarInt::from_u64(0x02).unwrap() {
212 if let KvpValue::Varint(v) = ¶m.value {
213 self.request_ids.update_max(v.into_inner())?;
214 }
215 }
216 }
217 Ok(())
218 }
219
220 pub fn receive_client_setup_and_respond(
224 &mut self,
225 client_setup: &ClientSetup,
226 selected_version: VarInt,
227 ) -> Result<ControlMessage, EndpointError> {
228 setup::validate_client_setup(client_setup)?;
229 let version = setup::negotiate_version(&client_setup.supported_versions, selected_version)?;
230 self.negotiated_version = Some(version);
231 self.session.on_setup_complete()?;
232 let msg = ServerSetup { selected_version: version, parameters: vec![] };
233 Ok(ControlMessage::ServerSetup(msg))
234 }
235
236 pub fn receive_max_request_id(&mut self, msg: &MaxRequestId) -> Result<(), EndpointError> {
240 self.request_ids.update_max(msg.request_id.into_inner())?;
241 Ok(())
242 }
243
244 pub fn send_max_request_id(&mut self, max_id: VarInt) -> Result<ControlMessage, EndpointError> {
247 let new_val = max_id.into_inner();
248 if new_val <= self.advertised_max_id && self.advertised_max_id > 0 {
249 return Err(EndpointError::RequestId(RequestIdError::Decreased(
250 self.advertised_max_id,
251 new_val,
252 )));
253 }
254 self.advertised_max_id = new_val;
255 Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id: max_id }))
256 }
257
258 pub fn receive_goaway(&mut self, msg: &GoAway) -> Result<(), EndpointError> {
262 self.session.on_goaway()?;
263 self.goaway_uri = Some(msg.new_session_uri.clone());
264 Ok(())
265 }
266
267 fn require_active_or_err(&self) -> Result<(), EndpointError> {
270 match self.session.state() {
271 SessionState::Active => Ok(()),
272 SessionState::Draining => Err(EndpointError::Draining),
273 _ => Err(EndpointError::NotActive),
274 }
275 }
276
277 #[allow(clippy::too_many_arguments)]
286 pub fn subscribe(
287 &mut self,
288 track_namespace: TrackNamespace,
289 track_name: Vec<u8>,
290 subscriber_priority: u8,
291 group_order: VarInt,
292 filter_type: VarInt,
293 ) -> Result<(VarInt, ControlMessage), EndpointError> {
294 self.require_active_or_err()?;
295 let req_id = self.request_ids.allocate()?;
296
297 let mut sm = SubscriptionStateMachine::new();
298 sm.on_subscribe_sent()?;
299 self.subscriptions.insert(req_id.into_inner(), sm);
300
301 let msg = ControlMessage::Subscribe(Subscribe {
302 request_id: req_id,
303 track_namespace,
304 track_name,
305 subscriber_priority,
306 group_order,
307 forward: VarInt::from_u64(1).unwrap(),
308 filter_type,
309 start_group: None,
310 start_object: None,
311 end_group: None,
312 parameters: vec![],
313 });
314 Ok((req_id, msg))
315 }
316
317 pub fn receive_subscribe_ok(&mut self, msg: &SubscribeOk) -> Result<(), EndpointError> {
323 let id = msg.request_id.into_inner();
324 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
325 sm.on_subscribe_ok()?;
326 self.subscribe_track_aliases.insert(id, msg.track_alias);
327 Ok(())
328 }
329
330 pub fn track_alias_for(&self, request_id: VarInt) -> Option<VarInt> {
333 self.subscribe_track_aliases.get(&request_id.into_inner()).copied()
334 }
335
336 pub fn receive_subscribe_error(&mut self, msg: &SubscribeError) -> Result<(), EndpointError> {
338 let id = msg.request_id.into_inner();
339 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
340 sm.on_subscribe_error()?;
341 Ok(())
342 }
343
344 pub fn unsubscribe(&mut self, request_id: VarInt) -> Result<ControlMessage, EndpointError> {
346 let id = request_id.into_inner();
347 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
348 sm.on_unsubscribe()?;
349 Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
350 }
351
352 pub fn receive_subscribe_update(&mut self, msg: &SubscribeUpdate) -> Result<(), EndpointError> {
354 let id = msg.request_id.into_inner();
355 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
356 sm.on_subscribe_update()?;
357 Ok(())
358 }
359
360 pub fn receive_subscribe_done(&mut self, msg: &SubscribeDone) -> Result<(), EndpointError> {
362 let id = msg.request_id.into_inner();
363 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
364 sm.on_subscribe_done()?;
365 Ok(())
366 }
367
368 #[allow(clippy::too_many_arguments)]
373 pub fn fetch(
374 &mut self,
375 track_namespace: TrackNamespace,
376 track_name: Vec<u8>,
377 subscriber_priority: u8,
378 group_order: VarInt,
379 start_group: VarInt,
380 start_object: VarInt,
381 end_group: VarInt,
382 end_object: VarInt,
383 ) -> Result<(VarInt, ControlMessage), EndpointError> {
384 self.require_active_or_err()?;
385 let req_id = self.request_ids.allocate()?;
386
387 let mut sm = FetchStateMachine::new();
388 sm.on_fetch_sent()?;
389 self.fetches.insert(req_id.into_inner(), sm);
390
391 let msg = ControlMessage::Fetch(Fetch {
392 request_id: req_id,
393 subscriber_priority,
394 group_order,
395 fetch_type: FetchType::Standalone,
396 fetch_payload: FetchPayload::Standalone {
397 track_namespace,
398 track_name,
399 start_group,
400 start_object,
401 end_group,
402 end_object,
403 },
404 parameters: vec![],
405 });
406 Ok((req_id, msg))
407 }
408
409 pub fn joining_fetch(
414 &mut self,
415 subscriber_priority: u8,
416 group_order: VarInt,
417 fetch_type: FetchType,
418 joining_subscribe_id: VarInt,
419 joining_start: VarInt,
420 ) -> Result<(VarInt, ControlMessage), EndpointError> {
421 self.require_active_or_err()?;
422 if !matches!(fetch_type, FetchType::RelativeJoining | FetchType::AbsoluteJoining) {
423 return Err(EndpointError::NotActive);
425 }
426 let req_id = self.request_ids.allocate()?;
427
428 let mut sm = FetchStateMachine::new();
429 sm.on_fetch_sent()?;
430 self.fetches.insert(req_id.into_inner(), sm);
431
432 let msg = ControlMessage::Fetch(Fetch {
433 request_id: req_id,
434 subscriber_priority,
435 group_order,
436 fetch_type,
437 fetch_payload: FetchPayload::Joining { joining_subscribe_id, joining_start },
438 parameters: vec![],
439 });
440 Ok((req_id, msg))
441 }
442
443 pub fn receive_fetch_ok(&mut self, msg: &message::FetchOk) -> Result<(), EndpointError> {
445 let id = msg.request_id.into_inner();
446 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
447 sm.on_fetch_ok()?;
448 Ok(())
449 }
450
451 pub fn receive_fetch_error(&mut self, msg: &message::FetchError) -> Result<(), EndpointError> {
453 let id = msg.request_id.into_inner();
454 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
455 sm.on_fetch_error()?;
456 Ok(())
457 }
458
459 pub fn fetch_cancel(&mut self, request_id: VarInt) -> Result<ControlMessage, EndpointError> {
461 let id = request_id.into_inner();
462 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
463 sm.on_fetch_cancel()?;
464 Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
465 }
466
467 pub fn on_fetch_stream_fin(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
469 let id = request_id.into_inner();
470 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
471 sm.on_stream_fin()?;
472 Ok(())
473 }
474
475 pub fn on_fetch_stream_reset(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
477 let id = request_id.into_inner();
478 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
479 sm.on_stream_reset()?;
480 Ok(())
481 }
482
483 pub fn subscribe_announces(
488 &mut self,
489 track_namespace_prefix: TrackNamespace,
490 ) -> Result<(VarInt, ControlMessage), EndpointError> {
491 self.require_active_or_err()?;
492 let req_id = self.request_ids.allocate()?;
493 let key = track_namespace_prefix.0.clone();
494 let mut sm = SubscribeAnnouncesStateMachine::new();
495 sm.on_subscribe_announces_sent()?;
496 self.subscribe_announces.insert(req_id.into_inner(), sm);
497 self.subscribe_announces_ids.insert(key, req_id.into_inner());
498 Ok((
499 req_id,
500 ControlMessage::SubscribeAnnounces(SubscribeAnnounces {
501 request_id: req_id,
502 track_namespace_prefix,
503 parameters: vec![],
504 }),
505 ))
506 }
507
508 pub fn receive_subscribe_announces_ok(
510 &mut self,
511 msg: &SubscribeAnnouncesOk,
512 ) -> Result<(), EndpointError> {
513 let id = msg.request_id.into_inner();
514 let sm = self.subscribe_announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
515 sm.on_subscribe_announces_ok()?;
516 Ok(())
517 }
518
519 pub fn receive_subscribe_announces_error(
521 &mut self,
522 msg: &SubscribeAnnouncesError,
523 ) -> Result<(), EndpointError> {
524 let id = msg.request_id.into_inner();
525 let sm = self.subscribe_announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
526 sm.on_subscribe_announces_error()?;
527 Ok(())
528 }
529
530 pub fn unsubscribe_announces(
532 &mut self,
533 track_namespace_prefix: TrackNamespace,
534 ) -> Result<ControlMessage, EndpointError> {
535 let id = *self
536 .subscribe_announces_ids
537 .get(&track_namespace_prefix.0)
538 .ok_or(EndpointError::UnknownNamespace)?;
539 let sm = self.subscribe_announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
540 sm.on_unsubscribe_announces()?;
541 Ok(ControlMessage::UnsubscribeAnnounces(UnsubscribeAnnounces { track_namespace_prefix }))
542 }
543
544 pub fn announce(
549 &mut self,
550 track_namespace: TrackNamespace,
551 ) -> Result<(VarInt, ControlMessage), EndpointError> {
552 self.require_active_or_err()?;
553 let req_id = self.request_ids.allocate()?;
554 let key = track_namespace.0.clone();
555 let mut sm = AnnounceStateMachine::new();
556 sm.on_announce_sent()?;
557 self.announces.insert(req_id.into_inner(), sm);
558 self.announce_ids.insert(key, req_id.into_inner());
559 Ok((
560 req_id,
561 ControlMessage::Announce(Announce {
562 request_id: req_id,
563 track_namespace,
564 parameters: vec![],
565 }),
566 ))
567 }
568
569 pub fn receive_announce_ok(&mut self, msg: &AnnounceOk) -> Result<(), EndpointError> {
571 let id = msg.request_id.into_inner();
572 let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
573 sm.on_announce_ok()?;
574 Ok(())
575 }
576
577 pub fn receive_announce_error(&mut self, msg: &AnnounceError) -> Result<(), EndpointError> {
579 let id = msg.request_id.into_inner();
580 let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
581 sm.on_announce_error()?;
582 Ok(())
583 }
584
585 pub fn receive_announce_cancel(&mut self, msg: &AnnounceCancel) -> Result<(), EndpointError> {
587 let id = *self
588 .announce_ids
589 .get(&msg.track_namespace.0)
590 .ok_or(EndpointError::UnknownNamespace)?;
591 let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
592 sm.on_announce_cancel()?;
593 Ok(())
594 }
595
596 pub fn unannounce(
598 &mut self,
599 track_namespace: TrackNamespace,
600 ) -> Result<ControlMessage, EndpointError> {
601 let id =
602 *self.announce_ids.get(&track_namespace.0).ok_or(EndpointError::UnknownNamespace)?;
603 let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
604 sm.on_unannounce()?;
605 Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
606 }
607
608 pub fn track_status_request(
613 &mut self,
614 track_namespace: TrackNamespace,
615 track_name: Vec<u8>,
616 ) -> Result<(VarInt, ControlMessage), EndpointError> {
617 self.require_active_or_err()?;
618 let req_id = self.request_ids.allocate()?;
619 let mut sm = TrackStatusStateMachine::new();
620 sm.on_track_status_request_sent()?;
621 self.track_statuses.insert(req_id.into_inner(), sm);
622 Ok((
623 req_id,
624 ControlMessage::TrackStatusRequest(TrackStatusRequest {
625 request_id: req_id,
626 track_namespace,
627 track_name,
628 parameters: vec![],
629 }),
630 ))
631 }
632
633 pub fn receive_track_status(&mut self, msg: &TrackStatus) -> Result<(), EndpointError> {
635 let id = msg.request_id.into_inner();
636 let sm = self.track_statuses.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
637 sm.on_track_status()?;
638 Ok(())
639 }
640
641 pub fn receive_requests_blocked(&mut self, msg: &RequestsBlocked) -> Result<(), EndpointError> {
650 self.peer_reported_max_request_id = Some(msg.maximum_request_id);
651 Ok(())
652 }
653
654 pub fn peer_reported_max_request_id(&self) -> Option<VarInt> {
657 self.peer_reported_max_request_id
658 }
659
660 pub fn receive_publish(&mut self, msg: &Publish) -> Result<(), EndpointError> {
668 self.require_active_or_err()?;
669 self.inbound_publishes.insert(msg.request_id.into_inner(), msg.clone());
670 Ok(())
671 }
672
673 pub fn pending_publish(&self, request_id: VarInt) -> Option<&Publish> {
675 self.inbound_publishes.get(&request_id.into_inner())
676 }
677
678 pub fn pending_publish_count(&self) -> usize {
680 self.inbound_publishes.len()
681 }
682
683 #[allow(clippy::too_many_arguments)]
686 pub fn send_publish_ok(
687 &mut self,
688 request_id: VarInt,
689 forward: VarInt,
690 subscriber_priority: u8,
691 group_order: VarInt,
692 filter_type: VarInt,
693 start_group: Option<VarInt>,
694 start_object: Option<VarInt>,
695 end_group: Option<VarInt>,
696 ) -> Result<ControlMessage, EndpointError> {
697 let id = request_id.into_inner();
698 if !self.inbound_publishes.contains_key(&id) {
699 return Err(EndpointError::UnknownRequest(id));
700 }
701 self.inbound_publishes.remove(&id);
702 Ok(ControlMessage::PublishOk(PublishOk {
703 request_id,
704 forward,
705 subscriber_priority,
706 group_order,
707 filter_type,
708 start_group,
709 start_object,
710 end_group,
711 parameters: vec![],
712 }))
713 }
714
715 pub fn send_publish_error(
718 &mut self,
719 request_id: VarInt,
720 error_code: VarInt,
721 reason_phrase: Vec<u8>,
722 ) -> Result<ControlMessage, EndpointError> {
723 let id = request_id.into_inner();
724 if !self.inbound_publishes.contains_key(&id) {
725 return Err(EndpointError::UnknownRequest(id));
726 }
727 self.inbound_publishes.remove(&id);
728 Ok(ControlMessage::PublishError(PublishError { request_id, error_code, reason_phrase }))
729 }
730
731 pub fn receive_publish_ok(&mut self, _msg: &PublishOk) -> Result<(), EndpointError> {
734 Ok(())
735 }
736
737 pub fn receive_publish_error(&mut self, _msg: &PublishError) -> Result<(), EndpointError> {
740 Ok(())
741 }
742
743 pub fn receive_message(&mut self, msg: ControlMessage) -> Result<(), EndpointError> {
747 match msg {
748 ControlMessage::GoAway(ref m) => self.receive_goaway(m),
749 ControlMessage::MaxRequestId(ref m) => self.receive_max_request_id(m),
750 ControlMessage::RequestsBlocked(ref m) => self.receive_requests_blocked(m),
751 ControlMessage::SubscribeOk(ref m) => self.receive_subscribe_ok(m),
752 ControlMessage::SubscribeError(ref m) => self.receive_subscribe_error(m),
753 ControlMessage::SubscribeUpdate(ref m) => self.receive_subscribe_update(m),
754 ControlMessage::SubscribeDone(ref m) => self.receive_subscribe_done(m),
755 ControlMessage::FetchOk(ref m) => self.receive_fetch_ok(m),
756 ControlMessage::FetchError(ref m) => self.receive_fetch_error(m),
757 ControlMessage::SubscribeAnnouncesOk(ref m) => self.receive_subscribe_announces_ok(m),
758 ControlMessage::SubscribeAnnouncesError(ref m) => {
759 self.receive_subscribe_announces_error(m)
760 }
761 ControlMessage::AnnounceOk(ref m) => self.receive_announce_ok(m),
762 ControlMessage::AnnounceError(ref m) => self.receive_announce_error(m),
763 ControlMessage::AnnounceCancel(ref m) => self.receive_announce_cancel(m),
764 ControlMessage::TrackStatus(ref m) => self.receive_track_status(m),
765 ControlMessage::Publish(ref m) => self.receive_publish(m),
766 ControlMessage::PublishOk(ref m) => self.receive_publish_ok(m),
767 ControlMessage::PublishError(ref m) => self.receive_publish_error(m),
768 _ => Ok(()),
769 }
770 }
771}