Skip to main content

moqtap_client/draft11/
endpoint.rs

1use std::collections::HashMap;
2
3use crate::draft11::fetch::{FetchError, FetchStateMachine};
4use crate::draft11::namespace::{
5    AnnounceStateMachine, NamespaceError, SubscribeAnnouncesStateMachine,
6};
7use crate::draft11::session::request_id::{RequestIdAllocator, RequestIdError};
8use crate::draft11::session::setup::{self, SetupError};
9use crate::draft11::session::state::{SessionError, SessionState, SessionStateMachine};
10use crate::draft11::subscription::{SubscriptionError, SubscriptionStateMachine};
11use crate::draft11::track_status::{TrackStatusError, TrackStatusStateMachine};
12use moqtap_codec::draft11::message::{
13    self, Announce, AnnounceCancel, AnnounceError, AnnounceOk, ClientSetup, ControlMessage, Fetch,
14    FetchCancel, FetchPayload, FetchType, GoAway, MaxRequestId, RequestsBlocked, ServerSetup,
15    Subscribe, SubscribeAnnounces, SubscribeAnnouncesError, SubscribeAnnouncesOk, SubscribeDone,
16    SubscribeError, SubscribeOk, SubscribeUpdate, TrackStatus, TrackStatusRequest, Unannounce,
17    Unsubscribe, UnsubscribeAnnounces,
18};
19use moqtap_codec::kvp::{KeyValuePair, KvpValue};
20use moqtap_codec::types::*;
21use moqtap_codec::varint::VarInt;
22
/// Key identifying a track namespace: the namespace tuple as a list of byte
/// strings. Used as the key of both namespace-to-request-id indexes on
/// `Endpoint` (`announce_ids` and `subscribe_announces_ids`).
type NamespaceKey = Vec<Vec<u8>>;
25
26/// Errors that can occur during draft-11 endpoint operations.
27#[derive(Debug, thiserror::Error)]
28pub enum EndpointError {
29    /// A session-level state machine error.
30    #[error("session error: {0}")]
31    Session(#[from] SessionError),
32    /// A request ID allocation or validation error.
33    #[error("request ID error: {0}")]
34    RequestId(#[from] RequestIdError),
35    /// A subscription state machine error.
36    #[error("subscription error: {0}")]
37    Subscription(#[from] SubscriptionError),
38    /// A fetch state machine error.
39    #[error("fetch error: {0}")]
40    Fetch(#[from] FetchError),
41    /// A namespace state machine error.
42    #[error("namespace error: {0}")]
43    Namespace(#[from] NamespaceError),
44    /// A track status state machine error.
45    #[error("track status error: {0}")]
46    TrackStatus(#[from] TrackStatusError),
47    /// A setup negotiation error.
48    #[error("setup error: {0}")]
49    Setup(#[from] SetupError),
50    /// The request ID does not match any known state machine.
51    #[error("unknown request ID: {0}")]
52    UnknownRequest(u64),
53    /// The track namespace does not match any known state machine.
54    #[error("unknown namespace")]
55    UnknownNamespace,
56    /// The session is not in the Active state.
57    #[error("session not active")]
58    NotActive,
59    /// The session is draining and cannot accept new requests.
60    #[error("session is draining, no new requests allowed")]
61    Draining,
62}
63
64/// Unified draft-11 MoQT endpoint wrapping session lifecycle, request ID
65/// allocation, and all per-flow state machines (subscriptions, fetches,
66/// announces, subscribe-announces, track statuses).
67pub struct Endpoint {
68    session: SessionStateMachine,
69    request_ids: RequestIdAllocator,
70    /// Tracks the MAX_REQUEST_ID we have advertised to the peer.
71    advertised_max_id: u64,
72    subscriptions: HashMap<u64, SubscriptionStateMachine>,
73    fetches: HashMap<u64, FetchStateMachine>,
74    subscribe_announces: HashMap<u64, SubscribeAnnouncesStateMachine>,
75    announces: HashMap<u64, AnnounceStateMachine>,
76    /// Maps namespace tuple -> request_id, so callers can UNANNOUNCE / cancel
77    /// by namespace without threading the id through every API.
78    announce_ids: HashMap<NamespaceKey, u64>,
79    /// Maps namespace prefix tuple -> request_id for subscribe-announces.
80    subscribe_announces_ids: HashMap<NamespaceKey, u64>,
81    track_statuses: HashMap<u64, TrackStatusStateMachine>,
82    negotiated_version: Option<VarInt>,
83    offered_versions: Vec<VarInt>,
84    goaway_uri: Option<Vec<u8>>,
85    /// The most recent `maximum_request_id` reported by the peer via a
86    /// `REQUESTS_BLOCKED` message.
87    peer_reported_max_request_id: Option<VarInt>,
88}
89
90impl Default for Endpoint {
91    fn default() -> Self {
92        Self::new()
93    }
94}
95
96impl Endpoint {
97    /// Create a new draft-11 endpoint.
98    pub fn new() -> Self {
99        Self {
100            session: SessionStateMachine::new(),
101            request_ids: RequestIdAllocator::new(),
102            advertised_max_id: 0,
103            subscriptions: HashMap::new(),
104            fetches: HashMap::new(),
105            subscribe_announces: HashMap::new(),
106            announces: HashMap::new(),
107            announce_ids: HashMap::new(),
108            subscribe_announces_ids: HashMap::new(),
109            track_statuses: HashMap::new(),
110            negotiated_version: None,
111            offered_versions: Vec::new(),
112            goaway_uri: None,
113            peer_reported_max_request_id: None,
114        }
115    }
116
117    // ── Accessors ──────────────────────────────────────────────
118
119    /// Returns the current session state.
120    pub fn session_state(&self) -> SessionState {
121        self.session.state()
122    }
123
124    /// Returns the negotiated MoQT version, if setup is complete.
125    pub fn negotiated_version(&self) -> Option<VarInt> {
126        self.negotiated_version
127    }
128
129    /// Returns the URI from a received GOAWAY message, if any.
130    pub fn goaway_uri(&self) -> Option<&[u8]> {
131        self.goaway_uri.as_deref()
132    }
133
134    /// Returns whether this endpoint is blocked on request ID allocation.
135    pub fn is_blocked(&self) -> bool {
136        self.request_ids.is_blocked()
137    }
138
139    /// Returns the number of active subscription state machines.
140    pub fn active_subscription_count(&self) -> usize {
141        self.subscriptions.len()
142    }
143
144    /// Returns the number of active fetch state machines.
145    pub fn active_fetch_count(&self) -> usize {
146        self.fetches.len()
147    }
148
149    /// Returns the number of active subscribe-announces state machines.
150    pub fn active_subscribe_announces_count(&self) -> usize {
151        self.subscribe_announces.len()
152    }
153
154    /// Returns the number of active announce state machines.
155    pub fn active_announce_count(&self) -> usize {
156        self.announces.len()
157    }
158
159    /// Returns the number of active track status state machines.
160    pub fn active_track_status_count(&self) -> usize {
161        self.track_statuses.len()
162    }
163
164    // ── Session lifecycle ──────────────────────────────────────
165
166    /// Transition from Connecting to SetupExchange.
167    pub fn connect(&mut self) -> Result<(), EndpointError> {
168        self.session.on_connect()?;
169        Ok(())
170    }
171
172    /// Close the session (Active or Draining -> Closed).
173    pub fn close(&mut self) -> Result<(), EndpointError> {
174        self.session.on_close()?;
175        Ok(())
176    }
177
178    // ── Client setup ───────────────────────────────────────────
179
180    /// Generate a CLIENT_SETUP message (client-side).
181    pub fn send_client_setup(
182        &mut self,
183        versions: Vec<VarInt>,
184        parameters: Vec<KeyValuePair>,
185    ) -> Result<ControlMessage, EndpointError> {
186        self.offered_versions = versions.clone();
187        let msg = ClientSetup { supported_versions: versions, parameters };
188        setup::validate_client_setup(&msg)?;
189        Ok(ControlMessage::ClientSetup(msg))
190    }
191
192    /// Process a SERVER_SETUP message (client-side). Transitions to Active.
193    /// If the server includes a MAX_REQUEST_ID parameter (key 0x02), the
194    /// request ID allocator is initialized with that value.
195    pub fn receive_server_setup(&mut self, msg: &ServerSetup) -> Result<(), EndpointError> {
196        setup::validate_server_setup(msg)?;
197        let version = setup::negotiate_version(&self.offered_versions, msg.selected_version)?;
198        self.negotiated_version = Some(version);
199        self.session.on_setup_complete()?;
200        // Extract MAX_REQUEST_ID (key 0x02) from setup parameters if present
201        for param in &msg.parameters {
202            if param.key == VarInt::from_u64(0x02).unwrap() {
203                if let KvpValue::Varint(v) = &param.value {
204                    self.request_ids.update_max(v.into_inner())?;
205                }
206            }
207        }
208        Ok(())
209    }
210
211    // ── Server setup ───────────────────────────────────────────
212
213    /// Process CLIENT_SETUP and generate SERVER_SETUP (server-side).
214    pub fn receive_client_setup_and_respond(
215        &mut self,
216        client_setup: &ClientSetup,
217        selected_version: VarInt,
218    ) -> Result<ControlMessage, EndpointError> {
219        setup::validate_client_setup(client_setup)?;
220        let version = setup::negotiate_version(&client_setup.supported_versions, selected_version)?;
221        self.negotiated_version = Some(version);
222        self.session.on_setup_complete()?;
223        let msg = ServerSetup { selected_version: version, parameters: vec![] };
224        Ok(ControlMessage::ServerSetup(msg))
225    }
226
227    // ── MAX_REQUEST_ID ─────────────────────────────────────────
228
229    /// Process an incoming MAX_REQUEST_ID message.
230    pub fn receive_max_request_id(&mut self, msg: &MaxRequestId) -> Result<(), EndpointError> {
231        self.request_ids.update_max(msg.request_id.into_inner())?;
232        Ok(())
233    }
234
235    /// Generate a MAX_REQUEST_ID message (typically server-side).
236    /// The value must strictly increase over previous sends.
237    pub fn send_max_request_id(&mut self, max_id: VarInt) -> Result<ControlMessage, EndpointError> {
238        let new_val = max_id.into_inner();
239        if new_val <= self.advertised_max_id && self.advertised_max_id > 0 {
240            return Err(EndpointError::RequestId(RequestIdError::Decreased(
241                self.advertised_max_id,
242                new_val,
243            )));
244        }
245        self.advertised_max_id = new_val;
246        Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id: max_id }))
247    }
248
249    // ── GoAway ─────────────────────────────────────────────────
250
251    /// Process an incoming GOAWAY message. Transitions to Draining.
252    pub fn receive_goaway(&mut self, msg: &GoAway) -> Result<(), EndpointError> {
253        self.session.on_goaway()?;
254        self.goaway_uri = Some(msg.new_session_uri.clone());
255        Ok(())
256    }
257
258    // ── Subscribe flow ─────────────────────────────────────────
259
260    fn require_active_or_err(&self) -> Result<(), EndpointError> {
261        match self.session.state() {
262            SessionState::Active => Ok(()),
263            SessionState::Draining => Err(EndpointError::Draining),
264            _ => Err(EndpointError::NotActive),
265        }
266    }
267
268    /// Send a SUBSCRIBE message. Allocates a request ID and creates a
269    /// subscription state machine. The `filter_type` must be a valid
270    /// varint filter-type discriminant (see the draft-11 codec for
271    /// definitions). `LargestObject` (2) is a reasonable default.
272    #[allow(clippy::too_many_arguments)]
273    pub fn subscribe(
274        &mut self,
275        track_alias: VarInt,
276        track_namespace: TrackNamespace,
277        track_name: Vec<u8>,
278        subscriber_priority: u8,
279        group_order: VarInt,
280        filter_type: VarInt,
281    ) -> Result<(VarInt, ControlMessage), EndpointError> {
282        self.require_active_or_err()?;
283        let req_id = self.request_ids.allocate()?;
284
285        let mut sm = SubscriptionStateMachine::new();
286        sm.on_subscribe_sent()?;
287        self.subscriptions.insert(req_id.into_inner(), sm);
288
289        let msg = ControlMessage::Subscribe(Subscribe {
290            request_id: req_id,
291            track_alias,
292            track_namespace,
293            track_name,
294            subscriber_priority,
295            group_order,
296            forward: VarInt::from_u64(1).unwrap(),
297            filter_type,
298            start_group: None,
299            start_object: None,
300            end_group: None,
301            parameters: vec![],
302        });
303        Ok((req_id, msg))
304    }
305
306    /// Process an incoming SUBSCRIBE_OK.
307    pub fn receive_subscribe_ok(&mut self, msg: &SubscribeOk) -> Result<(), EndpointError> {
308        let id = msg.request_id.into_inner();
309        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
310        sm.on_subscribe_ok()?;
311        Ok(())
312    }
313
314    /// Process an incoming SUBSCRIBE_ERROR.
315    pub fn receive_subscribe_error(&mut self, msg: &SubscribeError) -> Result<(), EndpointError> {
316        let id = msg.request_id.into_inner();
317        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
318        sm.on_subscribe_error()?;
319        Ok(())
320    }
321
322    /// Send an UNSUBSCRIBE message for an active subscription.
323    pub fn unsubscribe(&mut self, request_id: VarInt) -> Result<ControlMessage, EndpointError> {
324        let id = request_id.into_inner();
325        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
326        sm.on_unsubscribe()?;
327        Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
328    }
329
330    /// Process an incoming SUBSCRIBE_UPDATE.
331    pub fn receive_subscribe_update(&mut self, msg: &SubscribeUpdate) -> Result<(), EndpointError> {
332        let id = msg.request_id.into_inner();
333        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
334        sm.on_subscribe_update()?;
335        Ok(())
336    }
337
338    /// Process an incoming SUBSCRIBE_DONE (subscriber side — publisher finished).
339    pub fn receive_subscribe_done(&mut self, msg: &SubscribeDone) -> Result<(), EndpointError> {
340        let id = msg.request_id.into_inner();
341        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
342        sm.on_subscribe_done()?;
343        Ok(())
344    }
345
346    // ── Fetch flow ─────────────────────────────────────────────
347
348    /// Send a standalone FETCH message. Allocates a request ID and creates a
349    /// fetch state machine.
350    #[allow(clippy::too_many_arguments)]
351    pub fn fetch(
352        &mut self,
353        track_namespace: TrackNamespace,
354        track_name: Vec<u8>,
355        subscriber_priority: u8,
356        group_order: VarInt,
357        start_group: VarInt,
358        start_object: VarInt,
359        end_group: VarInt,
360        end_object: VarInt,
361    ) -> Result<(VarInt, ControlMessage), EndpointError> {
362        self.require_active_or_err()?;
363        let req_id = self.request_ids.allocate()?;
364
365        let mut sm = FetchStateMachine::new();
366        sm.on_fetch_sent()?;
367        self.fetches.insert(req_id.into_inner(), sm);
368
369        let msg = ControlMessage::Fetch(Fetch {
370            request_id: req_id,
371            subscriber_priority,
372            group_order,
373            fetch_type: FetchType::Standalone,
374            fetch_payload: FetchPayload::Standalone {
375                track_namespace,
376                track_name,
377                start_group,
378                start_object,
379                end_group,
380                end_object,
381            },
382            parameters: vec![],
383        });
384        Ok((req_id, msg))
385    }
386
387    /// Send a joining FETCH message that attaches to an existing subscription.
388    /// Allocates a new request ID for the fetch and tracks it in its own
389    /// fetch state machine. `joining_start` is interpreted per `fetch_type`
390    /// (relative offset vs absolute group id).
391    pub fn joining_fetch(
392        &mut self,
393        subscriber_priority: u8,
394        group_order: VarInt,
395        fetch_type: FetchType,
396        joining_subscribe_id: VarInt,
397        joining_start: VarInt,
398    ) -> Result<(VarInt, ControlMessage), EndpointError> {
399        self.require_active_or_err()?;
400        if !matches!(fetch_type, FetchType::RelativeJoining | FetchType::AbsoluteJoining) {
401            // Caller used the wrong API for a standalone fetch.
402            return Err(EndpointError::NotActive);
403        }
404        let req_id = self.request_ids.allocate()?;
405
406        let mut sm = FetchStateMachine::new();
407        sm.on_fetch_sent()?;
408        self.fetches.insert(req_id.into_inner(), sm);
409
410        let msg = ControlMessage::Fetch(Fetch {
411            request_id: req_id,
412            subscriber_priority,
413            group_order,
414            fetch_type,
415            fetch_payload: FetchPayload::Joining { joining_subscribe_id, joining_start },
416            parameters: vec![],
417        });
418        Ok((req_id, msg))
419    }
420
421    /// Process an incoming FETCH_OK.
422    pub fn receive_fetch_ok(&mut self, msg: &message::FetchOk) -> Result<(), EndpointError> {
423        let id = msg.request_id.into_inner();
424        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
425        sm.on_fetch_ok()?;
426        Ok(())
427    }
428
429    /// Process an incoming FETCH_ERROR.
430    pub fn receive_fetch_error(&mut self, msg: &message::FetchError) -> Result<(), EndpointError> {
431        let id = msg.request_id.into_inner();
432        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
433        sm.on_fetch_error()?;
434        Ok(())
435    }
436
437    /// Send a FETCH_CANCEL message.
438    pub fn fetch_cancel(&mut self, request_id: VarInt) -> Result<ControlMessage, EndpointError> {
439        let id = request_id.into_inner();
440        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
441        sm.on_fetch_cancel()?;
442        Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
443    }
444
445    /// Notify that a fetch data stream received FIN.
446    pub fn on_fetch_stream_fin(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
447        let id = request_id.into_inner();
448        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
449        sm.on_stream_fin()?;
450        Ok(())
451    }
452
453    /// Notify that a fetch data stream was reset.
454    pub fn on_fetch_stream_reset(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
455        let id = request_id.into_inner();
456        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
457        sm.on_stream_reset()?;
458        Ok(())
459    }
460
461    // ── Subscribe Announces flow ───────────────────────────────
462
463    /// Send a SUBSCRIBE_ANNOUNCES message. Returns the allocated request ID
464    /// alongside the control message so the caller can correlate replies.
465    pub fn subscribe_announces(
466        &mut self,
467        track_namespace_prefix: TrackNamespace,
468    ) -> Result<(VarInt, ControlMessage), EndpointError> {
469        self.require_active_or_err()?;
470        let req_id = self.request_ids.allocate()?;
471        let key = track_namespace_prefix.0.clone();
472        let mut sm = SubscribeAnnouncesStateMachine::new();
473        sm.on_subscribe_announces_sent()?;
474        self.subscribe_announces.insert(req_id.into_inner(), sm);
475        self.subscribe_announces_ids.insert(key, req_id.into_inner());
476        Ok((
477            req_id,
478            ControlMessage::SubscribeAnnounces(SubscribeAnnounces {
479                request_id: req_id,
480                track_namespace_prefix,
481                parameters: vec![],
482            }),
483        ))
484    }
485
486    /// Process an incoming SUBSCRIBE_ANNOUNCES_OK.
487    pub fn receive_subscribe_announces_ok(
488        &mut self,
489        msg: &SubscribeAnnouncesOk,
490    ) -> Result<(), EndpointError> {
491        let id = msg.request_id.into_inner();
492        let sm = self.subscribe_announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
493        sm.on_subscribe_announces_ok()?;
494        Ok(())
495    }
496
497    /// Process an incoming SUBSCRIBE_ANNOUNCES_ERROR.
498    pub fn receive_subscribe_announces_error(
499        &mut self,
500        msg: &SubscribeAnnouncesError,
501    ) -> Result<(), EndpointError> {
502        let id = msg.request_id.into_inner();
503        let sm = self.subscribe_announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
504        sm.on_subscribe_announces_error()?;
505        Ok(())
506    }
507
508    /// Send an UNSUBSCRIBE_ANNOUNCES message.
509    pub fn unsubscribe_announces(
510        &mut self,
511        track_namespace_prefix: TrackNamespace,
512    ) -> Result<ControlMessage, EndpointError> {
513        let id = *self
514            .subscribe_announces_ids
515            .get(&track_namespace_prefix.0)
516            .ok_or(EndpointError::UnknownNamespace)?;
517        let sm = self.subscribe_announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
518        sm.on_unsubscribe_announces()?;
519        Ok(ControlMessage::UnsubscribeAnnounces(UnsubscribeAnnounces { track_namespace_prefix }))
520    }
521
522    // ── Announce flow ──────────────────────────────────────────
523
524    /// Send an ANNOUNCE message. Returns the allocated request ID alongside
525    /// the control message.
526    pub fn announce(
527        &mut self,
528        track_namespace: TrackNamespace,
529    ) -> Result<(VarInt, ControlMessage), EndpointError> {
530        self.require_active_or_err()?;
531        let req_id = self.request_ids.allocate()?;
532        let key = track_namespace.0.clone();
533        let mut sm = AnnounceStateMachine::new();
534        sm.on_announce_sent()?;
535        self.announces.insert(req_id.into_inner(), sm);
536        self.announce_ids.insert(key, req_id.into_inner());
537        Ok((
538            req_id,
539            ControlMessage::Announce(Announce {
540                request_id: req_id,
541                track_namespace,
542                parameters: vec![],
543            }),
544        ))
545    }
546
547    /// Process an incoming ANNOUNCE_OK.
548    pub fn receive_announce_ok(&mut self, msg: &AnnounceOk) -> Result<(), EndpointError> {
549        let id = msg.request_id.into_inner();
550        let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
551        sm.on_announce_ok()?;
552        Ok(())
553    }
554
555    /// Process an incoming ANNOUNCE_ERROR.
556    pub fn receive_announce_error(&mut self, msg: &AnnounceError) -> Result<(), EndpointError> {
557        let id = msg.request_id.into_inner();
558        let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
559        sm.on_announce_error()?;
560        Ok(())
561    }
562
563    /// Process an incoming ANNOUNCE_CANCEL.
564    pub fn receive_announce_cancel(&mut self, msg: &AnnounceCancel) -> Result<(), EndpointError> {
565        let id = *self
566            .announce_ids
567            .get(&msg.track_namespace.0)
568            .ok_or(EndpointError::UnknownNamespace)?;
569        let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
570        sm.on_announce_cancel()?;
571        Ok(())
572    }
573
574    /// Send an UNANNOUNCE message (publisher withdrawing).
575    pub fn unannounce(
576        &mut self,
577        track_namespace: TrackNamespace,
578    ) -> Result<ControlMessage, EndpointError> {
579        let id =
580            *self.announce_ids.get(&track_namespace.0).ok_or(EndpointError::UnknownNamespace)?;
581        let sm = self.announces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
582        sm.on_unannounce()?;
583        Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
584    }
585
586    // ── Track Status flow ──────────────────────────────────────
587
588    /// Send a TRACK_STATUS_REQUEST message. Returns the allocated request ID
589    /// alongside the control message so the caller can correlate replies.
590    pub fn track_status_request(
591        &mut self,
592        track_namespace: TrackNamespace,
593        track_name: Vec<u8>,
594    ) -> Result<(VarInt, ControlMessage), EndpointError> {
595        self.require_active_or_err()?;
596        let req_id = self.request_ids.allocate()?;
597        let mut sm = TrackStatusStateMachine::new();
598        sm.on_track_status_request_sent()?;
599        self.track_statuses.insert(req_id.into_inner(), sm);
600        Ok((
601            req_id,
602            ControlMessage::TrackStatusRequest(TrackStatusRequest {
603                request_id: req_id,
604                track_namespace,
605                track_name,
606                parameters: vec![],
607            }),
608        ))
609    }
610
611    /// Process an incoming TRACK_STATUS reply.
612    pub fn receive_track_status(&mut self, msg: &TrackStatus) -> Result<(), EndpointError> {
613        let id = msg.request_id.into_inner();
614        let sm = self.track_statuses.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
615        sm.on_track_status()?;
616        Ok(())
617    }
618
619    // ── Requests blocked ───────────────────────────────────────
620
621    /// Process an incoming REQUESTS_BLOCKED message.
622    ///
623    /// Draft-11 renames draft-10's SUBSCRIBES_BLOCKED to REQUESTS_BLOCKED so
624    /// the peer can explicitly report that a new request id would exceed our
625    /// advertised maximum. The endpoint records the peer's reported maximum;
626    /// acting on it (issuing a new `MAX_REQUEST_ID`) is up to the caller.
627    pub fn receive_requests_blocked(&mut self, msg: &RequestsBlocked) -> Result<(), EndpointError> {
628        self.peer_reported_max_request_id = Some(msg.maximum_request_id);
629        Ok(())
630    }
631
632    /// The maximum request id that the peer most recently reported in a
633    /// `REQUESTS_BLOCKED` message, if any.
634    pub fn peer_reported_max_request_id(&self) -> Option<VarInt> {
635        self.peer_reported_max_request_id
636    }
637
638    // ── Unified message dispatch ───────────────────────────────
639
640    /// Dispatch an incoming control message to the appropriate handler.
641    pub fn receive_message(&mut self, msg: ControlMessage) -> Result<(), EndpointError> {
642        match msg {
643            ControlMessage::GoAway(ref m) => self.receive_goaway(m),
644            ControlMessage::MaxRequestId(ref m) => self.receive_max_request_id(m),
645            ControlMessage::RequestsBlocked(ref m) => self.receive_requests_blocked(m),
646            ControlMessage::SubscribeOk(ref m) => self.receive_subscribe_ok(m),
647            ControlMessage::SubscribeError(ref m) => self.receive_subscribe_error(m),
648            ControlMessage::SubscribeUpdate(ref m) => self.receive_subscribe_update(m),
649            ControlMessage::SubscribeDone(ref m) => self.receive_subscribe_done(m),
650            ControlMessage::FetchOk(ref m) => self.receive_fetch_ok(m),
651            ControlMessage::FetchError(ref m) => self.receive_fetch_error(m),
652            ControlMessage::SubscribeAnnouncesOk(ref m) => self.receive_subscribe_announces_ok(m),
653            ControlMessage::SubscribeAnnouncesError(ref m) => {
654                self.receive_subscribe_announces_error(m)
655            }
656            ControlMessage::AnnounceOk(ref m) => self.receive_announce_ok(m),
657            ControlMessage::AnnounceError(ref m) => self.receive_announce_error(m),
658            ControlMessage::AnnounceCancel(ref m) => self.receive_announce_cancel(m),
659            ControlMessage::TrackStatus(ref m) => self.receive_track_status(m),
660            _ => Ok(()),
661        }
662    }
663}