Skip to main content

moqtap_client/draft12/
endpoint.rs

1use std::collections::HashMap;
2
3use crate::draft12::fetch::{FetchError, FetchStateMachine};
4use crate::draft12::namespace::{
5    AnnounceStateMachine, NamespaceError, SubscribeAnnouncesStateMachine,
6};
7use crate::draft12::session::request_id::{RequestIdAllocator, RequestIdError};
8use crate::draft12::session::setup::{self, SetupError};
9use crate::draft12::session::state::{SessionError, SessionState, SessionStateMachine};
10use crate::draft12::subscription::{SubscriptionError, SubscriptionStateMachine};
11use crate::draft12::track_status::{TrackStatusError, TrackStatusStateMachine};
12use moqtap_codec::draft12::message::{
13    self, Announce, AnnounceCancel, AnnounceError, AnnounceOk, ClientSetup, ControlMessage, Fetch,
14    FetchCancel, FetchPayload, FetchType, GoAway, MaxRequestId, Publish, PublishError, PublishOk,
15    RequestsBlocked, ServerSetup, Subscribe, SubscribeAnnounces, SubscribeAnnouncesError,
16    SubscribeAnnouncesOk, SubscribeDone, SubscribeError, SubscribeOk, SubscribeUpdate, TrackStatus,
17    TrackStatusRequest, Unannounce, Unsubscribe, UnsubscribeAnnounces,
18};
19use moqtap_codec::kvp::{KeyValuePair, KvpValue};
20use moqtap_codec::types::*;
21use moqtap_codec::varint::VarInt;
22
23/// Key identifying a namespace (used for Announce maps).
24type NamespaceKey = Vec<Vec<u8>>;
25
26/// Errors that can occur during draft-12 endpoint operations.
27#[derive(Debug, thiserror::Error)]
28pub enum EndpointError {
29    /// A session-level state machine error.
30    #[error("session error: {0}")]
31    Session(#[from] SessionError),
32    /// A request ID allocation or validation error.
33    #[error("request ID error: {0}")]
34    RequestId(#[from] RequestIdError),
35    /// A subscription state machine error.
36    #[error("subscription error: {0}")]
37    Subscription(#[from] SubscriptionError),
38    /// A fetch state machine error.
39    #[error("fetch error: {0}")]
40    Fetch(#[from] FetchError),
41    /// A namespace state machine error.
42    #[error("namespace error: {0}")]
43    Namespace(#[from] NamespaceError),
44    /// A track status state machine error.
45    #[error("track status error: {0}")]
46    TrackStatus(#[from] TrackStatusError),
47    /// A setup negotiation error.
48    #[error("setup error: {0}")]
49    Setup(#[from] SetupError),
50    /// The request ID does not match any known state machine.
51    #[error("unknown request ID: {0}")]
52    UnknownRequest(u64),
53    /// The track namespace does not match any known state machine.
54    #[error("unknown namespace")]
55    UnknownNamespace,
56    /// The session is not in the Active state.
57    #[error("session not active")]
58    NotActive,
59    /// The session is draining and cannot accept new requests.
60    #[error("session is draining, no new requests allowed")]
61    Draining,
62}
63
64/// Unified draft-12 MoQT endpoint wrapping session lifecycle, request ID
65/// allocation, and all per-flow state machines (subscriptions, fetches,
66/// announces, subscribe-announces, track statuses).
67pub struct Endpoint {
68    session: SessionStateMachine,
69    request_ids: RequestIdAllocator,
70    /// Tracks the MAX_REQUEST_ID we have advertised to the peer.
71    advertised_max_id: u64,
72    subscriptions: HashMap<u64, SubscriptionStateMachine>,
73    fetches: HashMap<u64, FetchStateMachine>,
74    subscribe_announces: HashMap<u64, SubscribeAnnouncesStateMachine>,
75    announces: HashMap<u64, AnnounceStateMachine>,
76    /// Maps namespace tuple -> request_id, so callers can UNANNOUNCE / cancel
77    /// by namespace without threading the id through every API.
78    announce_ids: HashMap<NamespaceKey, u64>,
79    /// Maps namespace prefix tuple -> request_id for subscribe-announces.
80    subscribe_announces_ids: HashMap<NamespaceKey, u64>,
81    track_statuses: HashMap<u64, TrackStatusStateMachine>,
82    /// Track aliases assigned by the publisher in SUBSCRIBE_OK, keyed by
83    /// the subscriber-chosen request id.
84    subscribe_track_aliases: HashMap<u64, VarInt>,
85    /// Inbound PUBLISH requests received from the peer, keyed by their
86    /// request id. Tracked so the application can respond with
87    /// PUBLISH_OK / PUBLISH_ERROR using the correct id.
88    inbound_publishes: HashMap<u64, Publish>,
89    negotiated_version: Option<VarInt>,
90    offered_versions: Vec<VarInt>,
91    goaway_uri: Option<Vec<u8>>,
92    /// The most recent `maximum_request_id` reported by the peer via a
93    /// `REQUESTS_BLOCKED` message.
94    peer_reported_max_request_id: Option<VarInt>,
95}
96
97impl Default for Endpoint {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103impl Endpoint {
104    /// Create a new draft-12 endpoint.
105    pub fn new() -> Self {
106        Self {
107            session: SessionStateMachine::new(),
108            request_ids: RequestIdAllocator::new(),
109            advertised_max_id: 0,
110            subscriptions: HashMap::new(),
111            fetches: HashMap::new(),
112            subscribe_announces: HashMap::new(),
113            announces: HashMap::new(),
114            announce_ids: HashMap::new(),
115            subscribe_announces_ids: HashMap::new(),
116            track_statuses: HashMap::new(),
117            subscribe_track_aliases: HashMap::new(),
118            inbound_publishes: HashMap::new(),
119            negotiated_version: None,
120            offered_versions: Vec::new(),
121            goaway_uri: None,
122            peer_reported_max_request_id: None,
123        }
124    }
125
126    // ── Accessors ──────────────────────────────────────────────
127
128    /// Returns the current session state.
129    pub fn session_state(&self) -> SessionState {
130        self.session.state()
131    }
132
133    /// Returns the negotiated MoQT version, if setup is complete.
134    pub fn negotiated_version(&self) -> Option<VarInt> {
135        self.negotiated_version
136    }
137
138    /// Returns the URI from a received GOAWAY message, if any.
139    pub fn goaway_uri(&self) -> Option<&[u8]> {
140        self.goaway_uri.as_deref()
141    }
142
143    /// Returns whether this endpoint is blocked on request ID allocation.
144    pub fn is_blocked(&self) -> bool {
145        self.request_ids.is_blocked()
146    }
147
148    /// Returns the number of active subscription state machines.
149    pub fn active_subscription_count(&self) -> usize {
150        self.subscriptions.len()
151    }
152
153    /// Returns the number of active fetch state machines.
154    pub fn active_fetch_count(&self) -> usize {
155        self.fetches.len()
156    }
157
158    /// Returns the number of active subscribe-announces state machines.
159    pub fn active_subscribe_announces_count(&self) -> usize {
160        self.subscribe_announces.len()
161    }
162
163    /// Returns the number of active announce state machines.
164    pub fn active_announce_count(&self) -> usize {
165        self.announces.len()
166    }
167
168    /// Returns the number of active track status state machines.
169    pub fn active_track_status_count(&self) -> usize {
170        self.track_statuses.len()
171    }
172
173    // ── Session lifecycle ──────────────────────────────────────
174
175    /// Transition from Connecting to SetupExchange.
176    pub fn connect(&mut self) -> Result<(), EndpointError> {
177        self.session.on_connect()?;
178        Ok(())
179    }
180
181    /// Close the session (Active or Draining -> Closed).
182    pub fn close(&mut self) -> Result<(), EndpointError> {
183        self.session.on_close()?;
184        Ok(())
185    }
186
187    // ── Client setup ───────────────────────────────────────────
188
189    /// Generate a CLIENT_SETUP message (client-side).
190    pub fn send_client_setup(
191        &mut self,
192        versions: Vec<VarInt>,
193        parameters: Vec<KeyValuePair>,
194    ) -> Result<ControlMessage, EndpointError> {
195        self.offered_versions = versions.clone();
196        let msg = ClientSetup { supported_versions: versions, parameters };
197        setup::validate_client_setup(&msg)?;
198        Ok(ControlMessage::ClientSetup(msg))
199    }
200
201    /// Process a SERVER_SETUP message (client-side). Transitions to Active.
202    /// If the server includes a MAX_REQUEST_ID parameter (key 0x02), the
203    /// request ID allocator is initialized with that value.
204    pub fn receive_server_setup(&mut self, msg: &ServerSetup) -> Result<(), EndpointError> {
205        setup::validate_server_setup(msg)?;
206        let version = setup::negotiate_version(&self.offered_versions, msg.selected_version)?;
207        self.negotiated_version = Some(version);
208        self.session.on_setup_complete()?;
209        // Extract MAX_REQUEST_ID (key 0x02) from setup parameters if present
210        for param in &msg.parameters {
211            if param.key == VarInt::from_u64(0x02).unwrap() {
212                if let KvpValue::Varint(v) = &param.value {
213                    self.request_ids.update_max(v.into_inner())?;
214                }
215            }
216        }
217        Ok(())
218    }
219
220    // ── Server setup ───────────────────────────────────────────
221
222    /// Process CLIENT_SETUP and generate SERVER_SETUP (server-side).
223    pub fn receive_client_setup_and_respond(
224        &mut self,
225        client_setup: &ClientSetup,
226        selected_version: VarInt,
227    ) -> Result<ControlMessage, EndpointError> {
228        setup::validate_client_setup(client_setup)?;
229        let version = setup::negotiate_version(&client_setup.supported_versions, selected_version)?;
230        self.negotiated_version = Some(version);
231        self.session.on_setup_complete()?;
232        let msg = ServerSetup { selected_version: version, parameters: vec![] };
233        Ok(ControlMessage::ServerSetup(msg))
234    }
235
236    // ── MAX_REQUEST_ID ─────────────────────────────────────────
237
238    /// Process an incoming MAX_REQUEST_ID message.
239    pub fn receive_max_request_id(&mut self, msg: &MaxRequestId) -> Result<(), EndpointError> {
240        self.request_ids.update_max(msg.request_id.into_inner())?;
241        Ok(())
242    }
243
244    /// Generate a MAX_REQUEST_ID message (typically server-side).
245    /// The value must strictly increase over previous sends.
246    pub fn send_max_request_id(&mut self, max_id: VarInt) -> Result<ControlMessage, EndpointError> {
247        let new_val = max_id.into_inner();
248        if new_val <= self.advertised_max_id && self.advertised_max_id > 0 {
249            return Err(EndpointError::RequestId(RequestIdError::Decreased(
250                self.advertised_max_id,
251                new_val,
252            )));
253        }
254        self.advertised_max_id = new_val;
255        Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id: max_id }))
256    }
257
258    // ── GoAway ─────────────────────────────────────────────────
259
260    /// Process an incoming GOAWAY message. Transitions to Draining.
261    pub fn receive_goaway(&mut self, msg: &GoAway) -> Result<(), EndpointError> {
262        self.session.on_goaway()?;
263        self.goaway_uri = Some(msg.new_session_uri.clone());
264        Ok(())
265    }
266
267    // ── Subscribe flow ─────────────────────────────────────────
268
269    fn require_active_or_err(&self) -> Result<(), EndpointError> {
270        match self.session.state() {
271            SessionState::Active => Ok(()),
272            SessionState::Draining => Err(EndpointError::Draining),
273            _ => Err(EndpointError::NotActive),
274        }
275    }
276
277    /// Send a SUBSCRIBE message. Allocates a request ID and creates a
278    /// subscription state machine. The `filter_type` must be a valid
279    /// varint filter-type discriminant (see the draft-12 codec for
280    /// definitions). `LargestObject` (2) is a reasonable default.
281    ///
282    /// Draft-12 note: the track alias is no longer chosen by the subscriber
283    /// — it is returned by the publisher in `SUBSCRIBE_OK`. Callers that
284    /// need the alias should inspect `track_alias_for` after the OK arrives.
285    #[allow(clippy::too_many_arguments)]
286    pub fn subscribe(
287        &mut self,
288        track_namespace: TrackNamespace,
289        track_name: Vec<u8>,
290        subscriber_priority: u8,
291        group_order: VarInt,
292        filter_type: VarInt,
293    ) -> Result<(VarInt, ControlMessage), EndpointError> {
294        self.require_active_or_err()?;
295        let req_id = self.request_ids.allocate()?;
296
297        let mut sm = SubscriptionStateMachine::new();
298        sm.on_subscribe_sent()?;
299        self.subscriptions.insert(req_id.into_inner(), sm);
300
301        let msg = ControlMessage::Subscribe(Subscribe {
302            request_id: req_id,
303            track_namespace,
304            track_name,
305            subscriber_priority,
306            group_order,
307            forward: VarInt::from_u64(1).unwrap(),
308            filter_type,
309            start_group: None,
310            start_object: None,
311            end_group: None,
312            parameters: vec![],
313        });
314        Ok((req_id, msg))
315    }
316
317    /// Process an incoming SUBSCRIBE_OK.
318    ///
319    /// Draft-12 carries the publisher-assigned `track_alias` on SUBSCRIBE_OK,
320    /// which is recorded here so callers can retrieve it via
321    /// [`Endpoint::track_alias_for`].
322    pub fn receive_subscribe_ok(&mut self, msg: &SubscribeOk) -> Result<(), EndpointError> {
323        let id = msg.request_id.into_inner();
324        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
325        sm.on_subscribe_ok()?;
326        self.subscribe_track_aliases.insert(id, msg.track_alias);
327        Ok(())
328    }
329
330    /// Returns the publisher-assigned track alias for a subscription, if
331    /// SUBSCRIBE_OK has been processed for the given request id.
332    pub fn track_alias_for(&self, request_id: VarInt) -> Option<VarInt> {
333        self.subscribe_track_aliases.get(&request_id.into_inner()).copied()
334    }
335
336    /// Process an incoming SUBSCRIBE_ERROR.
337    pub fn receive_subscribe_error(&mut self, msg: &SubscribeError) -> Result<(), EndpointError> {
338        let id = msg.request_id.into_inner();
339        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
340        sm.on_subscribe_error()?;
341        Ok(())
342    }
343
344    /// Send an UNSUBSCRIBE message for an active subscription.
345    pub fn unsubscribe(&mut self, request_id: VarInt) -> Result<ControlMessage, EndpointError> {
346        let id = request_id.into_inner();
347        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
348        sm.on_unsubscribe()?;
349        Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
350    }
351
352    /// Process an incoming SUBSCRIBE_UPDATE.
353    pub fn receive_subscribe_update(&mut self, msg: &SubscribeUpdate) -> Result<(), EndpointError> {
354        let id = msg.request_id.into_inner();
355        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
356        sm.on_subscribe_update()?;
357        Ok(())
358    }
359
360    /// Process an incoming SUBSCRIBE_DONE (subscriber side — publisher finished).
361    pub fn receive_subscribe_done(&mut self, msg: &SubscribeDone) -> Result<(), EndpointError> {
362        let id = msg.request_id.into_inner();
363        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
364        sm.on_subscribe_done()?;
365        Ok(())
366    }
367
368    // ── Fetch flow ─────────────────────────────────────────────
369
370    /// Send a standalone FETCH message. Allocates a request ID and creates a
371    /// fetch state machine.
372    #[allow(clippy::too_many_arguments)]
373    pub fn fetch(
374        &mut self,
375        track_namespace: TrackNamespace,
376        track_name: Vec<u8>,
377        subscriber_priority: u8,
378        group_order: VarInt,
379        start_group: VarInt,
380        start_object: VarInt,
381        end_group: VarInt,
382        end_object: VarInt,
383    ) -> Result<(VarInt, ControlMessage), EndpointError> {
384        self.require_active_or_err()?;
385        let req_id = self.request_ids.allocate()?;
386
387        let mut sm = FetchStateMachine::new();
388        sm.on_fetch_sent()?;
389        self.fetches.insert(req_id.into_inner(), sm);
390
391        let msg = ControlMessage::Fetch(Fetch {
392            request_id: req_id,
393            subscriber_priority,
394            group_order,
395            fetch_type: FetchType::Standalone,
396            fetch_payload: FetchPayload::Standalone {
397                track_namespace,
398                track_name,
399                start_group,
400                start_object,
401                end_group,
402                end_object,
403            },
404            parameters: vec![],
405        });
406        Ok((req_id, msg))
407    }
408
409    /// Send a joining FETCH message that attaches to an existing subscription.
410    /// Allocates a new request ID for the fetch and tracks it in its own
411    /// fetch state machine. `joining_start` is interpreted per `fetch_type`
412    /// (relative offset vs absolute group id).
413    pub fn joining_fetch(
414        &mut self,
415        subscriber_priority: u8,
416        group_order: VarInt,
417        fetch_type: FetchType,
418        joining_subscribe_id: VarInt,
419        joining_start: VarInt,
420    ) -> Result<(VarInt, ControlMessage), EndpointError> {
421        self.require_active_or_err()?;
422        if !matches!(fetch_type, FetchType::RelativeJoining | FetchType::AbsoluteJoining) {
423            // Caller used the wrong API for a standalone fetch.
424            return Err(EndpointError::NotActive);
425        }
426        let req_id = self.request_ids.allocate()?;
427
428        let mut sm = FetchStateMachine::new();
429        sm.on_fetch_sent()?;
430        self.fetches.insert(req_id.into_inner(), sm);
431
432        let msg = ControlMessage::Fetch(Fetch {
433            request_id: req_id,
434            subscriber_priority,
435            group_order,
436            fetch_type,
437            fetch_payload: FetchPayload::Joining { joining_subscribe_id, joining_start },
438            parameters: vec![],
439        });
440        Ok((req_id, msg))
441    }
442
443    /// Process an incoming FETCH_OK.
444    pub fn receive_fetch_ok(&mut self, msg: &message::FetchOk) -> Result<(), EndpointError> {
445        let id = msg.request_id.into_inner();
446        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
447        sm.on_fetch_ok()?;
448        Ok(())
449    }
450
451    /// Process an incoming FETCH_ERROR.
452    pub fn receive_fetch_error(&mut self, msg: &message::FetchError) -> Result<(), EndpointError> {
453        let id = msg.request_id.into_inner();
454        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
455        sm.on_fetch_error()?;
456        Ok(())
457    }
458
459    /// Send a FETCH_CANCEL message.
460    pub fn fetch_cancel(&mut self, request_id: VarInt) -> Result<ControlMessage, EndpointError> {
461        let id = request_id.into_inner();
462        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
463        sm.on_fetch_cancel()?;
464        Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
465    }
466
467    /// Notify that a fetch data stream received FIN.
468    pub fn on_fetch_stream_fin(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
469        let id = request_id.into_inner();
470        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
471        sm.on_stream_fin()?;
472        Ok(())
473    }
474
475    /// Notify that a fetch data stream was reset.
476    pub fn on_fetch_stream_reset(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
477        let id = request_id.into_inner();
478        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
479        sm.on_stream_reset()?;
480        Ok(())
481    }
482
483    // ── Subscribe Announces flow ───────────────────────────────
484
485    /// Send a SUBSCRIBE_ANNOUNCES message. Returns the allocated request ID
486    /// alongside the control message so the caller can correlate replies.
487    pub fn subscribe_announces(
488        &mut self,
489        track_namespace_prefix: TrackNamespace,
490    ) -> Result<(VarInt, ControlMessage), EndpointError> {
491        self.require_active_or_err()?;
492        let req_id = self.request_ids.allocate()?;
493        let key = track_namespace_prefix.0.clone();
494        let mut sm = SubscribeAnnouncesStateMachine::new();
495        sm.on_subscribe_announces_sent()?;
496        self.subscribe_announces.insert(req_id.into_inner(), sm);
497        self.subscribe_announces_ids.insert(key, req_id.into_inner());
498        Ok((
499            req_id,
500            ControlMessage::SubscribeAnnounces(SubscribeAnnounces {
501                request_id: req_id,
502                track_namespace_prefix,
503                parameters: vec![],
504            }),
505        ))
506    }
507
508    /// Process an incoming SUBSCRIBE_ANNOUNCES_OK.
509    pub fn receive_subscribe_announces_ok(
510        &mut self,
511        msg: &SubscribeAnnouncesOk,
512    ) -> Result<(), EndpointError> {
513        let id = msg.request_id.into_inner();
514        let sm = self.subscribe_announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
515        sm.on_subscribe_announces_ok()?;
516        Ok(())
517    }
518
519    /// Process an incoming SUBSCRIBE_ANNOUNCES_ERROR.
520    pub fn receive_subscribe_announces_error(
521        &mut self,
522        msg: &SubscribeAnnouncesError,
523    ) -> Result<(), EndpointError> {
524        let id = msg.request_id.into_inner();
525        let sm = self.subscribe_announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
526        sm.on_subscribe_announces_error()?;
527        Ok(())
528    }
529
530    /// Send an UNSUBSCRIBE_ANNOUNCES message.
531    pub fn unsubscribe_announces(
532        &mut self,
533        track_namespace_prefix: TrackNamespace,
534    ) -> Result<ControlMessage, EndpointError> {
535        let id = *self
536            .subscribe_announces_ids
537            .get(&track_namespace_prefix.0)
538            .ok_or(EndpointError::UnknownNamespace)?;
539        let sm = self.subscribe_announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
540        sm.on_unsubscribe_announces()?;
541        Ok(ControlMessage::UnsubscribeAnnounces(UnsubscribeAnnounces { track_namespace_prefix }))
542    }
543
544    // ── Announce flow ──────────────────────────────────────────
545
546    /// Send an ANNOUNCE message. Returns the allocated request ID alongside
547    /// the control message.
548    pub fn announce(
549        &mut self,
550        track_namespace: TrackNamespace,
551    ) -> Result<(VarInt, ControlMessage), EndpointError> {
552        self.require_active_or_err()?;
553        let req_id = self.request_ids.allocate()?;
554        let key = track_namespace.0.clone();
555        let mut sm = AnnounceStateMachine::new();
556        sm.on_announce_sent()?;
557        self.announces.insert(req_id.into_inner(), sm);
558        self.announce_ids.insert(key, req_id.into_inner());
559        Ok((
560            req_id,
561            ControlMessage::Announce(Announce {
562                request_id: req_id,
563                track_namespace,
564                parameters: vec![],
565            }),
566        ))
567    }
568
569    /// Process an incoming ANNOUNCE_OK.
570    pub fn receive_announce_ok(&mut self, msg: &AnnounceOk) -> Result<(), EndpointError> {
571        let id = msg.request_id.into_inner();
572        let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
573        sm.on_announce_ok()?;
574        Ok(())
575    }
576
577    /// Process an incoming ANNOUNCE_ERROR.
578    pub fn receive_announce_error(&mut self, msg: &AnnounceError) -> Result<(), EndpointError> {
579        let id = msg.request_id.into_inner();
580        let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
581        sm.on_announce_error()?;
582        Ok(())
583    }
584
585    /// Process an incoming ANNOUNCE_CANCEL.
586    pub fn receive_announce_cancel(&mut self, msg: &AnnounceCancel) -> Result<(), EndpointError> {
587        let id = *self
588            .announce_ids
589            .get(&msg.track_namespace.0)
590            .ok_or(EndpointError::UnknownNamespace)?;
591        let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
592        sm.on_announce_cancel()?;
593        Ok(())
594    }
595
596    /// Send an UNANNOUNCE message (publisher withdrawing).
597    pub fn unannounce(
598        &mut self,
599        track_namespace: TrackNamespace,
600    ) -> Result<ControlMessage, EndpointError> {
601        let id =
602            *self.announce_ids.get(&track_namespace.0).ok_or(EndpointError::UnknownNamespace)?;
603        let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
604        sm.on_unannounce()?;
605        Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
606    }
607
608    // ── Track Status flow ──────────────────────────────────────
609
610    /// Send a TRACK_STATUS_REQUEST message. Returns the allocated request ID
611    /// alongside the control message so the caller can correlate replies.
612    pub fn track_status_request(
613        &mut self,
614        track_namespace: TrackNamespace,
615        track_name: Vec<u8>,
616    ) -> Result<(VarInt, ControlMessage), EndpointError> {
617        self.require_active_or_err()?;
618        let req_id = self.request_ids.allocate()?;
619        let mut sm = TrackStatusStateMachine::new();
620        sm.on_track_status_request_sent()?;
621        self.track_statuses.insert(req_id.into_inner(), sm);
622        Ok((
623            req_id,
624            ControlMessage::TrackStatusRequest(TrackStatusRequest {
625                request_id: req_id,
626                track_namespace,
627                track_name,
628                parameters: vec![],
629            }),
630        ))
631    }
632
633    /// Process an incoming TRACK_STATUS reply.
634    pub fn receive_track_status(&mut self, msg: &TrackStatus) -> Result<(), EndpointError> {
635        let id = msg.request_id.into_inner();
636        let sm = self.track_statuses.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
637        sm.on_track_status()?;
638        Ok(())
639    }
640
641    // ── Requests blocked ───────────────────────────────────────
642
643    /// Process an incoming REQUESTS_BLOCKED message.
644    ///
645    /// Draft-12 renames draft-12's SUBSCRIBES_BLOCKED to REQUESTS_BLOCKED so
646    /// the peer can explicitly report that a new request id would exceed our
647    /// advertised maximum. The endpoint records the peer's reported maximum;
648    /// acting on it (issuing a new `MAX_REQUEST_ID`) is up to the caller.
649    pub fn receive_requests_blocked(&mut self, msg: &RequestsBlocked) -> Result<(), EndpointError> {
650        self.peer_reported_max_request_id = Some(msg.maximum_request_id);
651        Ok(())
652    }
653
654    /// The maximum request id that the peer most recently reported in a
655    /// `REQUESTS_BLOCKED` message, if any.
656    pub fn peer_reported_max_request_id(&self) -> Option<VarInt> {
657        self.peer_reported_max_request_id
658    }
659
660    // ── Publish flow (draft-12) ────────────────────────────────
661
662    /// Process an incoming PUBLISH message. The message is recorded so the
663    /// application can respond with [`Endpoint::send_publish_ok`] or
664    /// [`Endpoint::send_publish_error`] using the same request id. This is a
665    /// pass-through: no full state machine is modelled here — the codec
666    /// struct carries everything the application needs to decide.
667    pub fn receive_publish(&mut self, msg: &Publish) -> Result<(), EndpointError> {
668        self.require_active_or_err()?;
669        self.inbound_publishes.insert(msg.request_id.into_inner(), msg.clone());
670        Ok(())
671    }
672
673    /// Returns the inbound PUBLISH message for a given request id, if any.
674    pub fn pending_publish(&self, request_id: VarInt) -> Option<&Publish> {
675        self.inbound_publishes.get(&request_id.into_inner())
676    }
677
678    /// Number of PUBLISH requests received but not yet responded to.
679    pub fn pending_publish_count(&self) -> usize {
680        self.inbound_publishes.len()
681    }
682
683    /// Generate a PUBLISH_OK response for a previously received PUBLISH.
684    /// Removes the pending publish entry on success.
685    #[allow(clippy::too_many_arguments)]
686    pub fn send_publish_ok(
687        &mut self,
688        request_id: VarInt,
689        forward: VarInt,
690        subscriber_priority: u8,
691        group_order: VarInt,
692        filter_type: VarInt,
693        start_group: Option<VarInt>,
694        start_object: Option<VarInt>,
695        end_group: Option<VarInt>,
696    ) -> Result<ControlMessage, EndpointError> {
697        let id = request_id.into_inner();
698        if !self.inbound_publishes.contains_key(&id) {
699            return Err(EndpointError::UnknownRequest(id));
700        }
701        self.inbound_publishes.remove(&id);
702        Ok(ControlMessage::PublishOk(PublishOk {
703            request_id,
704            forward,
705            subscriber_priority,
706            group_order,
707            filter_type,
708            start_group,
709            start_object,
710            end_group,
711            parameters: vec![],
712        }))
713    }
714
715    /// Generate a PUBLISH_ERROR response for a previously received PUBLISH.
716    /// Removes the pending publish entry on success.
717    pub fn send_publish_error(
718        &mut self,
719        request_id: VarInt,
720        error_code: VarInt,
721        reason_phrase: Vec<u8>,
722    ) -> Result<ControlMessage, EndpointError> {
723        let id = request_id.into_inner();
724        if !self.inbound_publishes.contains_key(&id) {
725            return Err(EndpointError::UnknownRequest(id));
726        }
727        self.inbound_publishes.remove(&id);
728        Ok(ControlMessage::PublishError(PublishError { request_id, error_code, reason_phrase }))
729    }
730
731    /// Process an incoming PUBLISH_OK (publisher side — peer accepted our
732    /// PUBLISH offer). Pass-through: no state machine is currently modelled.
733    pub fn receive_publish_ok(&mut self, _msg: &PublishOk) -> Result<(), EndpointError> {
734        Ok(())
735    }
736
737    /// Process an incoming PUBLISH_ERROR (publisher side — peer rejected our
738    /// PUBLISH offer). Pass-through: no state machine is currently modelled.
739    pub fn receive_publish_error(&mut self, _msg: &PublishError) -> Result<(), EndpointError> {
740        Ok(())
741    }
742
743    // ── Unified message dispatch ───────────────────────────────
744
745    /// Dispatch an incoming control message to the appropriate handler.
746    pub fn receive_message(&mut self, msg: ControlMessage) -> Result<(), EndpointError> {
747        match msg {
748            ControlMessage::GoAway(ref m) => self.receive_goaway(m),
749            ControlMessage::MaxRequestId(ref m) => self.receive_max_request_id(m),
750            ControlMessage::RequestsBlocked(ref m) => self.receive_requests_blocked(m),
751            ControlMessage::SubscribeOk(ref m) => self.receive_subscribe_ok(m),
752            ControlMessage::SubscribeError(ref m) => self.receive_subscribe_error(m),
753            ControlMessage::SubscribeUpdate(ref m) => self.receive_subscribe_update(m),
754            ControlMessage::SubscribeDone(ref m) => self.receive_subscribe_done(m),
755            ControlMessage::FetchOk(ref m) => self.receive_fetch_ok(m),
756            ControlMessage::FetchError(ref m) => self.receive_fetch_error(m),
757            ControlMessage::SubscribeAnnouncesOk(ref m) => self.receive_subscribe_announces_ok(m),
758            ControlMessage::SubscribeAnnouncesError(ref m) => {
759                self.receive_subscribe_announces_error(m)
760            }
761            ControlMessage::AnnounceOk(ref m) => self.receive_announce_ok(m),
762            ControlMessage::AnnounceError(ref m) => self.receive_announce_error(m),
763            ControlMessage::AnnounceCancel(ref m) => self.receive_announce_cancel(m),
764            ControlMessage::TrackStatus(ref m) => self.receive_track_status(m),
765            ControlMessage::Publish(ref m) => self.receive_publish(m),
766            ControlMessage::PublishOk(ref m) => self.receive_publish_ok(m),
767            ControlMessage::PublishError(ref m) => self.receive_publish_error(m),
768            _ => Ok(()),
769        }
770    }
771}