Skip to main content

moqtap_client/draft13/
endpoint.rs

1use std::collections::HashMap;
2
3use crate::draft13::fetch::{FetchError, FetchStateMachine};
4use crate::draft13::namespace::{
5    AnnounceStateMachine, NamespaceError, SubscribeAnnouncesStateMachine,
6};
7use crate::draft13::session::request_id::{RequestIdAllocator, RequestIdError};
8use crate::draft13::session::setup::{self, SetupError};
9use crate::draft13::session::state::{SessionError, SessionState, SessionStateMachine};
10use crate::draft13::subscription::{SubscriptionError, SubscriptionStateMachine};
11use crate::draft13::track_status::{TrackStatusError, TrackStatusStateMachine};
12use moqtap_codec::draft13::message::{
13    self, Announce, AnnounceCancel, AnnounceError, AnnounceOk, ClientSetup, ControlMessage, Fetch,
14    FetchCancel, FetchPayload, FetchType, GoAway, MaxRequestId, Publish, PublishError, PublishOk,
15    RequestsBlocked, ServerSetup, Subscribe, SubscribeDone, SubscribeError, SubscribeNamespace,
16    SubscribeNamespaceError, SubscribeNamespaceOk, SubscribeOk, SubscribeUpdate, TrackStatus,
17    TrackStatusError as TrackStatusErrorMsg, TrackStatusOk, Unannounce, Unsubscribe,
18    UnsubscribeNamespace,
19};
20use moqtap_codec::kvp::{KeyValuePair, KvpValue};
21use moqtap_codec::types::*;
22use moqtap_codec::varint::VarInt;
23
/// Namespace tuple used as a lookup key: one byte string per tuple element.
///
/// Keys both the announce map and the subscribe-namespace map that translate
/// a namespace (or namespace prefix) back to the request id it was sent under.
type NamespaceKey = Vec<Vec<u8>>;
26
27/// Errors that can occur during draft-13 endpoint operations.
28#[derive(Debug, thiserror::Error)]
29pub enum EndpointError {
30    /// A session-level state machine error.
31    #[error("session error: {0}")]
32    Session(#[from] SessionError),
33    /// A request ID allocation or validation error.
34    #[error("request ID error: {0}")]
35    RequestId(#[from] RequestIdError),
36    /// A subscription state machine error.
37    #[error("subscription error: {0}")]
38    Subscription(#[from] SubscriptionError),
39    /// A fetch state machine error.
40    #[error("fetch error: {0}")]
41    Fetch(#[from] FetchError),
42    /// A namespace state machine error.
43    #[error("namespace error: {0}")]
44    Namespace(#[from] NamespaceError),
45    /// A track status state machine error.
46    #[error("track status error: {0}")]
47    TrackStatus(#[from] TrackStatusError),
48    /// A setup negotiation error.
49    #[error("setup error: {0}")]
50    Setup(#[from] SetupError),
51    /// The request ID does not match any known state machine.
52    #[error("unknown request ID: {0}")]
53    UnknownRequest(u64),
54    /// The track namespace does not match any known state machine.
55    #[error("unknown namespace")]
56    UnknownNamespace,
57    /// The session is not in the Active state.
58    #[error("session not active")]
59    NotActive,
60    /// The session is draining and cannot accept new requests.
61    #[error("session is draining, no new requests allowed")]
62    Draining,
63}
64
65/// Unified draft-13 MoQT endpoint wrapping session lifecycle, request ID
66/// allocation, and all per-flow state machines (subscriptions, fetches,
67/// announces, subscribe-namespaces, track statuses).
68pub struct Endpoint {
69    session: SessionStateMachine,
70    request_ids: RequestIdAllocator,
71    /// Tracks the MAX_REQUEST_ID we have advertised to the peer.
72    advertised_max_id: u64,
73    subscriptions: HashMap<u64, SubscriptionStateMachine>,
74    fetches: HashMap<u64, FetchStateMachine>,
75    subscribe_namespaces: HashMap<u64, SubscribeAnnouncesStateMachine>,
76    announces: HashMap<u64, AnnounceStateMachine>,
77    /// Maps namespace tuple -> request_id, so callers can UNANNOUNCE / cancel
78    /// by namespace without threading the id through every API.
79    announce_ids: HashMap<NamespaceKey, u64>,
80    /// Maps namespace prefix tuple -> request_id for subscribe-namespaces.
81    subscribe_namespace_ids: HashMap<NamespaceKey, u64>,
82    track_statuses: HashMap<u64, TrackStatusStateMachine>,
83    /// Track aliases assigned by the publisher in SUBSCRIBE_OK, keyed by
84    /// the subscriber-chosen request id.
85    subscribe_track_aliases: HashMap<u64, VarInt>,
86    /// Inbound PUBLISH requests received from the peer, keyed by their
87    /// request id. Tracked so the application can respond with
88    /// PUBLISH_OK / PUBLISH_ERROR using the correct id.
89    inbound_publishes: HashMap<u64, Publish>,
90    negotiated_version: Option<VarInt>,
91    offered_versions: Vec<VarInt>,
92    goaway_uri: Option<Vec<u8>>,
93    /// The most recent `maximum_request_id` reported by the peer via a
94    /// `REQUESTS_BLOCKED` message.
95    peer_reported_max_request_id: Option<VarInt>,
96}
97
98impl Default for Endpoint {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104impl Endpoint {
105    /// Create a new draft-13 endpoint.
106    pub fn new() -> Self {
107        Self {
108            session: SessionStateMachine::new(),
109            request_ids: RequestIdAllocator::new(),
110            advertised_max_id: 0,
111            subscriptions: HashMap::new(),
112            fetches: HashMap::new(),
113            subscribe_namespaces: HashMap::new(),
114            announces: HashMap::new(),
115            announce_ids: HashMap::new(),
116            subscribe_namespace_ids: HashMap::new(),
117            track_statuses: HashMap::new(),
118            subscribe_track_aliases: HashMap::new(),
119            inbound_publishes: HashMap::new(),
120            negotiated_version: None,
121            offered_versions: Vec::new(),
122            goaway_uri: None,
123            peer_reported_max_request_id: None,
124        }
125    }
126
127    // ── Accessors ──────────────────────────────────────────────
128
129    /// Returns the current session state.
130    pub fn session_state(&self) -> SessionState {
131        self.session.state()
132    }
133
134    /// Returns the negotiated MoQT version, if setup is complete.
135    pub fn negotiated_version(&self) -> Option<VarInt> {
136        self.negotiated_version
137    }
138
139    /// Returns the URI from a received GOAWAY message, if any.
140    pub fn goaway_uri(&self) -> Option<&[u8]> {
141        self.goaway_uri.as_deref()
142    }
143
144    /// Returns whether this endpoint is blocked on request ID allocation.
145    pub fn is_blocked(&self) -> bool {
146        self.request_ids.is_blocked()
147    }
148
149    /// Returns the number of active subscription state machines.
150    pub fn active_subscription_count(&self) -> usize {
151        self.subscriptions.len()
152    }
153
154    /// Returns the number of active fetch state machines.
155    pub fn active_fetch_count(&self) -> usize {
156        self.fetches.len()
157    }
158
159    /// Returns the number of active subscribe-namespace state machines.
160    pub fn active_subscribe_namespace_count(&self) -> usize {
161        self.subscribe_namespaces.len()
162    }
163
164    /// Returns the number of active announce state machines.
165    pub fn active_announce_count(&self) -> usize {
166        self.announces.len()
167    }
168
169    /// Returns the number of active track status state machines.
170    pub fn active_track_status_count(&self) -> usize {
171        self.track_statuses.len()
172    }
173
174    // ── Session lifecycle ──────────────────────────────────────
175
176    /// Transition from Connecting to SetupExchange.
177    pub fn connect(&mut self) -> Result<(), EndpointError> {
178        self.session.on_connect()?;
179        Ok(())
180    }
181
182    /// Close the session (Active or Draining -> Closed).
183    pub fn close(&mut self) -> Result<(), EndpointError> {
184        self.session.on_close()?;
185        Ok(())
186    }
187
188    // ── Client setup ───────────────────────────────────────────
189
190    /// Generate a CLIENT_SETUP message (client-side).
191    pub fn send_client_setup(
192        &mut self,
193        versions: Vec<VarInt>,
194        parameters: Vec<KeyValuePair>,
195    ) -> Result<ControlMessage, EndpointError> {
196        self.offered_versions = versions.clone();
197        let msg = ClientSetup { supported_versions: versions, parameters };
198        setup::validate_client_setup(&msg)?;
199        Ok(ControlMessage::ClientSetup(msg))
200    }
201
202    /// Process a SERVER_SETUP message (client-side). Transitions to Active.
203    /// If the server includes a MAX_REQUEST_ID parameter (key 0x02), the
204    /// request ID allocator is initialized with that value.
205    pub fn receive_server_setup(&mut self, msg: &ServerSetup) -> Result<(), EndpointError> {
206        setup::validate_server_setup(msg)?;
207        let version = setup::negotiate_version(&self.offered_versions, msg.selected_version)?;
208        self.negotiated_version = Some(version);
209        self.session.on_setup_complete()?;
210        // Extract MAX_REQUEST_ID (key 0x02) from setup parameters if present
211        for param in &msg.parameters {
212            if param.key == VarInt::from_u64(0x02).unwrap() {
213                if let KvpValue::Varint(v) = &param.value {
214                    self.request_ids.update_max(v.into_inner())?;
215                }
216            }
217        }
218        Ok(())
219    }
220
221    // ── Server setup ───────────────────────────────────────────
222
223    /// Process CLIENT_SETUP and generate SERVER_SETUP (server-side).
224    pub fn receive_client_setup_and_respond(
225        &mut self,
226        client_setup: &ClientSetup,
227        selected_version: VarInt,
228    ) -> Result<ControlMessage, EndpointError> {
229        setup::validate_client_setup(client_setup)?;
230        let version = setup::negotiate_version(&client_setup.supported_versions, selected_version)?;
231        self.negotiated_version = Some(version);
232        self.session.on_setup_complete()?;
233        let msg = ServerSetup { selected_version: version, parameters: vec![] };
234        Ok(ControlMessage::ServerSetup(msg))
235    }
236
237    // ── MAX_REQUEST_ID ─────────────────────────────────────────
238
239    /// Process an incoming MAX_REQUEST_ID message.
240    pub fn receive_max_request_id(&mut self, msg: &MaxRequestId) -> Result<(), EndpointError> {
241        self.request_ids.update_max(msg.request_id.into_inner())?;
242        Ok(())
243    }
244
245    /// Generate a MAX_REQUEST_ID message (typically server-side).
246    /// The value must strictly increase over previous sends.
247    pub fn send_max_request_id(&mut self, max_id: VarInt) -> Result<ControlMessage, EndpointError> {
248        let new_val = max_id.into_inner();
249        if new_val <= self.advertised_max_id && self.advertised_max_id > 0 {
250            return Err(EndpointError::RequestId(RequestIdError::Decreased(
251                self.advertised_max_id,
252                new_val,
253            )));
254        }
255        self.advertised_max_id = new_val;
256        Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id: max_id }))
257    }
258
259    // ── GoAway ─────────────────────────────────────────────────
260
261    /// Process an incoming GOAWAY message. Transitions to Draining.
262    pub fn receive_goaway(&mut self, msg: &GoAway) -> Result<(), EndpointError> {
263        self.session.on_goaway()?;
264        self.goaway_uri = Some(msg.new_session_uri.clone());
265        Ok(())
266    }
267
268    // ── Subscribe flow ─────────────────────────────────────────
269
270    fn require_active_or_err(&self) -> Result<(), EndpointError> {
271        match self.session.state() {
272            SessionState::Active => Ok(()),
273            SessionState::Draining => Err(EndpointError::Draining),
274            _ => Err(EndpointError::NotActive),
275        }
276    }
277
278    /// Send a SUBSCRIBE message. Allocates a request ID and creates a
279    /// subscription state machine. The `filter_type` must be a valid
280    /// varint filter-type discriminant (see the draft-13 codec for
281    /// definitions). `LargestObject` (2) is a reasonable default.
282    ///
283    /// Draft-13 note: the track alias is no longer chosen by the subscriber
284    /// — it is returned by the publisher in `SUBSCRIBE_OK`. Callers that
285    /// need the alias should inspect `track_alias_for` after the OK arrives.
286    #[allow(clippy::too_many_arguments)]
287    pub fn subscribe(
288        &mut self,
289        track_namespace: TrackNamespace,
290        track_name: Vec<u8>,
291        subscriber_priority: u8,
292        group_order: VarInt,
293        filter_type: VarInt,
294    ) -> Result<(VarInt, ControlMessage), EndpointError> {
295        self.require_active_or_err()?;
296        let req_id = self.request_ids.allocate()?;
297
298        let mut sm = SubscriptionStateMachine::new();
299        sm.on_subscribe_sent()?;
300        self.subscriptions.insert(req_id.into_inner(), sm);
301
302        let msg = ControlMessage::Subscribe(Subscribe {
303            request_id: req_id,
304            track_namespace,
305            track_name,
306            subscriber_priority,
307            group_order,
308            forward: VarInt::from_u64(1).unwrap(),
309            filter_type,
310            start_group: None,
311            start_object: None,
312            end_group: None,
313            parameters: vec![],
314        });
315        Ok((req_id, msg))
316    }
317
318    /// Process an incoming SUBSCRIBE_OK.
319    ///
320    /// Draft-13 carries the publisher-assigned `track_alias` on SUBSCRIBE_OK,
321    /// which is recorded here so callers can retrieve it via
322    /// [`Endpoint::track_alias_for`].
323    pub fn receive_subscribe_ok(&mut self, msg: &SubscribeOk) -> Result<(), EndpointError> {
324        let id = msg.request_id.into_inner();
325        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
326        sm.on_subscribe_ok()?;
327        self.subscribe_track_aliases.insert(id, msg.track_alias);
328        Ok(())
329    }
330
331    /// Returns the publisher-assigned track alias for a subscription, if
332    /// SUBSCRIBE_OK has been processed for the given request id.
333    pub fn track_alias_for(&self, request_id: VarInt) -> Option<VarInt> {
334        self.subscribe_track_aliases.get(&request_id.into_inner()).copied()
335    }
336
337    /// Process an incoming SUBSCRIBE_ERROR.
338    pub fn receive_subscribe_error(&mut self, msg: &SubscribeError) -> Result<(), EndpointError> {
339        let id = msg.request_id.into_inner();
340        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
341        sm.on_subscribe_error()?;
342        Ok(())
343    }
344
345    /// Send an UNSUBSCRIBE message for an active subscription.
346    pub fn unsubscribe(&mut self, request_id: VarInt) -> Result<ControlMessage, EndpointError> {
347        let id = request_id.into_inner();
348        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
349        sm.on_unsubscribe()?;
350        Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
351    }
352
353    /// Process an incoming SUBSCRIBE_UPDATE.
354    pub fn receive_subscribe_update(&mut self, msg: &SubscribeUpdate) -> Result<(), EndpointError> {
355        let id = msg.request_id.into_inner();
356        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
357        sm.on_subscribe_update()?;
358        Ok(())
359    }
360
361    /// Process an incoming SUBSCRIBE_DONE (subscriber side — publisher finished).
362    pub fn receive_subscribe_done(&mut self, msg: &SubscribeDone) -> Result<(), EndpointError> {
363        let id = msg.request_id.into_inner();
364        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
365        sm.on_subscribe_done()?;
366        Ok(())
367    }
368
369    // ── Fetch flow ─────────────────────────────────────────────
370
371    /// Send a standalone FETCH message. Allocates a request ID and creates a
372    /// fetch state machine.
373    #[allow(clippy::too_many_arguments)]
374    pub fn fetch(
375        &mut self,
376        track_namespace: TrackNamespace,
377        track_name: Vec<u8>,
378        subscriber_priority: u8,
379        group_order: VarInt,
380        start_group: VarInt,
381        start_object: VarInt,
382        end_group: VarInt,
383        end_object: VarInt,
384    ) -> Result<(VarInt, ControlMessage), EndpointError> {
385        self.require_active_or_err()?;
386        let req_id = self.request_ids.allocate()?;
387
388        let mut sm = FetchStateMachine::new();
389        sm.on_fetch_sent()?;
390        self.fetches.insert(req_id.into_inner(), sm);
391
392        let msg = ControlMessage::Fetch(Fetch {
393            request_id: req_id,
394            subscriber_priority,
395            group_order,
396            fetch_type: FetchType::Standalone,
397            fetch_payload: FetchPayload::Standalone {
398                track_namespace,
399                track_name,
400                start_group,
401                start_object,
402                end_group,
403                end_object,
404            },
405            parameters: vec![],
406        });
407        Ok((req_id, msg))
408    }
409
410    /// Send a joining FETCH message that attaches to an existing subscription.
411    /// Allocates a new request ID for the fetch and tracks it in its own
412    /// fetch state machine. `joining_start` is interpreted per `fetch_type`
413    /// (relative offset vs absolute group id).
414    pub fn joining_fetch(
415        &mut self,
416        subscriber_priority: u8,
417        group_order: VarInt,
418        fetch_type: FetchType,
419        joining_subscribe_id: VarInt,
420        joining_start: VarInt,
421    ) -> Result<(VarInt, ControlMessage), EndpointError> {
422        self.require_active_or_err()?;
423        if !matches!(fetch_type, FetchType::RelativeJoining | FetchType::AbsoluteJoining) {
424            // Caller used the wrong API for a standalone fetch.
425            return Err(EndpointError::NotActive);
426        }
427        let req_id = self.request_ids.allocate()?;
428
429        let mut sm = FetchStateMachine::new();
430        sm.on_fetch_sent()?;
431        self.fetches.insert(req_id.into_inner(), sm);
432
433        let msg = ControlMessage::Fetch(Fetch {
434            request_id: req_id,
435            subscriber_priority,
436            group_order,
437            fetch_type,
438            fetch_payload: FetchPayload::Joining { joining_subscribe_id, joining_start },
439            parameters: vec![],
440        });
441        Ok((req_id, msg))
442    }
443
444    /// Process an incoming FETCH_OK.
445    pub fn receive_fetch_ok(&mut self, msg: &message::FetchOk) -> Result<(), EndpointError> {
446        let id = msg.request_id.into_inner();
447        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
448        sm.on_fetch_ok()?;
449        Ok(())
450    }
451
452    /// Process an incoming FETCH_ERROR.
453    pub fn receive_fetch_error(&mut self, msg: &message::FetchError) -> Result<(), EndpointError> {
454        let id = msg.request_id.into_inner();
455        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
456        sm.on_fetch_error()?;
457        Ok(())
458    }
459
460    /// Send a FETCH_CANCEL message.
461    pub fn fetch_cancel(&mut self, request_id: VarInt) -> Result<ControlMessage, EndpointError> {
462        let id = request_id.into_inner();
463        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
464        sm.on_fetch_cancel()?;
465        Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
466    }
467
468    /// Notify that a fetch data stream received FIN.
469    pub fn on_fetch_stream_fin(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
470        let id = request_id.into_inner();
471        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
472        sm.on_stream_fin()?;
473        Ok(())
474    }
475
476    /// Notify that a fetch data stream was reset.
477    pub fn on_fetch_stream_reset(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
478        let id = request_id.into_inner();
479        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
480        sm.on_stream_reset()?;
481        Ok(())
482    }
483
484    // ── Subscribe Namespace flow ──────────────────────────────
485
486    /// Send a SUBSCRIBE_NAMESPACE message. Returns the allocated request ID
487    /// alongside the control message so the caller can correlate replies.
488    pub fn subscribe_namespace(
489        &mut self,
490        track_namespace_prefix: TrackNamespace,
491    ) -> Result<(VarInt, ControlMessage), EndpointError> {
492        self.require_active_or_err()?;
493        let req_id = self.request_ids.allocate()?;
494        let key = track_namespace_prefix.0.clone();
495        let mut sm = SubscribeAnnouncesStateMachine::new();
496        sm.on_subscribe_announces_sent()?;
497        self.subscribe_namespaces.insert(req_id.into_inner(), sm);
498        self.subscribe_namespace_ids.insert(key, req_id.into_inner());
499        Ok((
500            req_id,
501            ControlMessage::SubscribeNamespace(SubscribeNamespace {
502                request_id: req_id,
503                track_namespace_prefix,
504                parameters: vec![],
505            }),
506        ))
507    }
508
509    /// Process an incoming SUBSCRIBE_NAMESPACE_OK.
510    pub fn receive_subscribe_namespace_ok(
511        &mut self,
512        msg: &SubscribeNamespaceOk,
513    ) -> Result<(), EndpointError> {
514        let id = msg.request_id.into_inner();
515        let sm = self.subscribe_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
516        sm.on_subscribe_announces_ok()?;
517        Ok(())
518    }
519
520    /// Process an incoming SUBSCRIBE_NAMESPACE_ERROR.
521    pub fn receive_subscribe_namespace_error(
522        &mut self,
523        msg: &SubscribeNamespaceError,
524    ) -> Result<(), EndpointError> {
525        let id = msg.request_id.into_inner();
526        let sm = self.subscribe_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
527        sm.on_subscribe_announces_error()?;
528        Ok(())
529    }
530
531    /// Send an UNSUBSCRIBE_NAMESPACE message.
532    pub fn unsubscribe_namespace(
533        &mut self,
534        track_namespace_prefix: TrackNamespace,
535    ) -> Result<ControlMessage, EndpointError> {
536        let id = *self
537            .subscribe_namespace_ids
538            .get(&track_namespace_prefix.0)
539            .ok_or(EndpointError::UnknownNamespace)?;
540        let sm = self.subscribe_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
541        sm.on_unsubscribe_announces()?;
542        Ok(ControlMessage::UnsubscribeNamespace(UnsubscribeNamespace { track_namespace_prefix }))
543    }
544
545    // ── Announce flow ──────────────────────────────────────────
546
547    /// Send an ANNOUNCE message. Returns the allocated request ID alongside
548    /// the control message.
549    pub fn announce(
550        &mut self,
551        track_namespace: TrackNamespace,
552    ) -> Result<(VarInt, ControlMessage), EndpointError> {
553        self.require_active_or_err()?;
554        let req_id = self.request_ids.allocate()?;
555        let key = track_namespace.0.clone();
556        let mut sm = AnnounceStateMachine::new();
557        sm.on_announce_sent()?;
558        self.announces.insert(req_id.into_inner(), sm);
559        self.announce_ids.insert(key, req_id.into_inner());
560        Ok((
561            req_id,
562            ControlMessage::Announce(Announce {
563                request_id: req_id,
564                track_namespace,
565                parameters: vec![],
566            }),
567        ))
568    }
569
570    /// Process an incoming ANNOUNCE_OK.
571    pub fn receive_announce_ok(&mut self, msg: &AnnounceOk) -> Result<(), EndpointError> {
572        let id = msg.request_id.into_inner();
573        let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
574        sm.on_announce_ok()?;
575        Ok(())
576    }
577
578    /// Process an incoming ANNOUNCE_ERROR.
579    pub fn receive_announce_error(&mut self, msg: &AnnounceError) -> Result<(), EndpointError> {
580        let id = msg.request_id.into_inner();
581        let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
582        sm.on_announce_error()?;
583        Ok(())
584    }
585
586    /// Process an incoming ANNOUNCE_CANCEL.
587    pub fn receive_announce_cancel(&mut self, msg: &AnnounceCancel) -> Result<(), EndpointError> {
588        let id = *self
589            .announce_ids
590            .get(&msg.track_namespace.0)
591            .ok_or(EndpointError::UnknownNamespace)?;
592        let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
593        sm.on_announce_cancel()?;
594        Ok(())
595    }
596
597    /// Send an UNANNOUNCE message (publisher withdrawing).
598    pub fn unannounce(
599        &mut self,
600        track_namespace: TrackNamespace,
601    ) -> Result<ControlMessage, EndpointError> {
602        let id =
603            *self.announce_ids.get(&track_namespace.0).ok_or(EndpointError::UnknownNamespace)?;
604        let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
605        sm.on_unannounce()?;
606        Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
607    }
608
609    // ── Track Status flow ──────────────────────────────────────
610
611    /// Send a TRACK_STATUS message (draft-13: subscribe-like request).
612    /// Returns the allocated request ID alongside the control message.
613    #[allow(clippy::too_many_arguments)]
614    pub fn track_status_request(
615        &mut self,
616        track_namespace: TrackNamespace,
617        track_name: Vec<u8>,
618        subscriber_priority: u8,
619        group_order: VarInt,
620        forward: VarInt,
621        filter_type: VarInt,
622    ) -> Result<(VarInt, ControlMessage), EndpointError> {
623        self.require_active_or_err()?;
624        let req_id = self.request_ids.allocate()?;
625        let mut sm = TrackStatusStateMachine::new();
626        sm.on_track_status_request_sent()?;
627        self.track_statuses.insert(req_id.into_inner(), sm);
628        Ok((
629            req_id,
630            ControlMessage::TrackStatus(TrackStatus {
631                request_id: req_id,
632                track_namespace,
633                track_name,
634                subscriber_priority,
635                group_order,
636                forward,
637                filter_type,
638                parameters: vec![],
639            }),
640        ))
641    }
642
643    /// Process an incoming TRACK_STATUS_OK reply (draft-13).
644    pub fn receive_track_status_ok(&mut self, msg: &TrackStatusOk) -> Result<(), EndpointError> {
645        let id = msg.request_id.into_inner();
646        let sm = self.track_statuses.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
647        sm.on_track_status()?;
648        Ok(())
649    }
650
651    /// Process an incoming TRACK_STATUS_ERROR reply (draft-13).
652    pub fn receive_track_status_error(
653        &mut self,
654        msg: &TrackStatusErrorMsg,
655    ) -> Result<(), EndpointError> {
656        let id = msg.request_id.into_inner();
657        let sm = self.track_statuses.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
658        sm.on_track_status()?;
659        Ok(())
660    }
661
662    // ── Requests blocked ───────────────────────────────────────
663
664    /// Process an incoming REQUESTS_BLOCKED message.
665    ///
666    /// The peer explicitly reports that a new request id would exceed our
667    /// advertised maximum. The endpoint records the peer's reported maximum;
668    /// acting on it (issuing a new `MAX_REQUEST_ID`) is up to the caller.
669    pub fn receive_requests_blocked(&mut self, msg: &RequestsBlocked) -> Result<(), EndpointError> {
670        self.peer_reported_max_request_id = Some(msg.maximum_request_id);
671        Ok(())
672    }
673
674    /// The maximum request id that the peer most recently reported in a
675    /// `REQUESTS_BLOCKED` message, if any.
676    pub fn peer_reported_max_request_id(&self) -> Option<VarInt> {
677        self.peer_reported_max_request_id
678    }
679
680    // ── Publish flow (draft-13) ────────────────────────────────
681
682    /// Process an incoming PUBLISH message. The message is recorded so the
683    /// application can respond with [`Endpoint::send_publish_ok`] or
684    /// [`Endpoint::send_publish_error`] using the same request id. This is a
685    /// pass-through: no full state machine is modelled here — the codec
686    /// struct carries everything the application needs to decide.
687    pub fn receive_publish(&mut self, msg: &Publish) -> Result<(), EndpointError> {
688        self.require_active_or_err()?;
689        self.inbound_publishes.insert(msg.request_id.into_inner(), msg.clone());
690        Ok(())
691    }
692
693    /// Returns the inbound PUBLISH message for a given request id, if any.
694    pub fn pending_publish(&self, request_id: VarInt) -> Option<&Publish> {
695        self.inbound_publishes.get(&request_id.into_inner())
696    }
697
698    /// Number of PUBLISH requests received but not yet responded to.
699    pub fn pending_publish_count(&self) -> usize {
700        self.inbound_publishes.len()
701    }
702
703    /// Generate a PUBLISH_OK response for a previously received PUBLISH.
704    /// Removes the pending publish entry on success.
705    #[allow(clippy::too_many_arguments)]
706    pub fn send_publish_ok(
707        &mut self,
708        request_id: VarInt,
709        forward: VarInt,
710        subscriber_priority: u8,
711        group_order: VarInt,
712        filter_type: VarInt,
713        start_group: Option<VarInt>,
714        start_object: Option<VarInt>,
715        end_group: Option<VarInt>,
716    ) -> Result<ControlMessage, EndpointError> {
717        let id = request_id.into_inner();
718        if !self.inbound_publishes.contains_key(&id) {
719            return Err(EndpointError::UnknownRequest(id));
720        }
721        self.inbound_publishes.remove(&id);
722        Ok(ControlMessage::PublishOk(PublishOk {
723            request_id,
724            forward,
725            subscriber_priority,
726            group_order,
727            filter_type,
728            start_group,
729            start_object,
730            end_group,
731            parameters: vec![],
732        }))
733    }
734
735    /// Generate a PUBLISH_ERROR response for a previously received PUBLISH.
736    /// Removes the pending publish entry on success.
737    pub fn send_publish_error(
738        &mut self,
739        request_id: VarInt,
740        error_code: VarInt,
741        reason_phrase: Vec<u8>,
742    ) -> Result<ControlMessage, EndpointError> {
743        let id = request_id.into_inner();
744        if !self.inbound_publishes.contains_key(&id) {
745            return Err(EndpointError::UnknownRequest(id));
746        }
747        self.inbound_publishes.remove(&id);
748        Ok(ControlMessage::PublishError(PublishError { request_id, error_code, reason_phrase }))
749    }
750
751    /// Process an incoming PUBLISH_OK (publisher side — peer accepted our
752    /// PUBLISH offer). Pass-through: no state machine is currently modelled.
753    pub fn receive_publish_ok(&mut self, _msg: &PublishOk) -> Result<(), EndpointError> {
754        Ok(())
755    }
756
757    /// Process an incoming PUBLISH_ERROR (publisher side — peer rejected our
758    /// PUBLISH offer). Pass-through: no state machine is currently modelled.
759    pub fn receive_publish_error(&mut self, _msg: &PublishError) -> Result<(), EndpointError> {
760        Ok(())
761    }
762
763    // ── Unified message dispatch ───────────────────────────────
764
765    /// Dispatch an incoming control message to the appropriate handler.
766    pub fn receive_message(&mut self, msg: ControlMessage) -> Result<(), EndpointError> {
767        match msg {
768            ControlMessage::GoAway(ref m) => self.receive_goaway(m),
769            ControlMessage::MaxRequestId(ref m) => self.receive_max_request_id(m),
770            ControlMessage::RequestsBlocked(ref m) => self.receive_requests_blocked(m),
771            ControlMessage::SubscribeOk(ref m) => self.receive_subscribe_ok(m),
772            ControlMessage::SubscribeError(ref m) => self.receive_subscribe_error(m),
773            ControlMessage::SubscribeUpdate(ref m) => self.receive_subscribe_update(m),
774            ControlMessage::SubscribeDone(ref m) => self.receive_subscribe_done(m),
775            ControlMessage::FetchOk(ref m) => self.receive_fetch_ok(m),
776            ControlMessage::FetchError(ref m) => self.receive_fetch_error(m),
777            ControlMessage::SubscribeNamespaceOk(ref m) => self.receive_subscribe_namespace_ok(m),
778            ControlMessage::SubscribeNamespaceError(ref m) => {
779                self.receive_subscribe_namespace_error(m)
780            }
781            ControlMessage::AnnounceOk(ref m) => self.receive_announce_ok(m),
782            ControlMessage::AnnounceError(ref m) => self.receive_announce_error(m),
783            ControlMessage::AnnounceCancel(ref m) => self.receive_announce_cancel(m),
784            ControlMessage::TrackStatusOk(ref m) => self.receive_track_status_ok(m),
785            ControlMessage::TrackStatusError(ref m) => self.receive_track_status_error(m),
786            ControlMessage::Publish(ref m) => self.receive_publish(m),
787            ControlMessage::PublishOk(ref m) => self.receive_publish_ok(m),
788            ControlMessage::PublishError(ref m) => self.receive_publish_error(m),
789            _ => Ok(()),
790        }
791    }
792}