Skip to main content

moqtap_client/draft10/
connection.rs

1use std::sync::Arc;
2
3use bytes::{Buf, Bytes, BytesMut};
4
5use crate::draft10::endpoint::{Endpoint, EndpointError};
6use crate::draft10::event::{ClientEvent, Direction, FetchObject, StreamKind, SubgroupObject};
7use crate::draft10::observer::ConnectionObserver;
8use crate::transport::quic::QuicTransport;
9use crate::transport::{RecvStream, SendStream, Transport, TransportError};
10use moqtap_codec::dispatch::{
11    AnyControlMessage, AnyDatagramHeader, AnyFetchHeader, AnySubgroupHeader,
12};
13use moqtap_codec::draft10::data_stream::{FetchObjectHeader, ObjectHeader};
14use moqtap_codec::draft10::message::ControlMessage;
15use moqtap_codec::error::CodecError;
16use moqtap_codec::types::*;
17use moqtap_codec::varint::VarInt;
18use moqtap_codec::version::DraftVersion;
19
20/// MoQT ALPN identifier (used by raw QUIC transport).
21pub const MOQT_ALPN: &[u8] = b"moq-00";
22
23/// Errors from the draft-10 connection layer.
24#[derive(Debug, thiserror::Error)]
25pub enum ConnectionError {
26    /// Endpoint state machine error.
27    #[error("endpoint error: {0}")]
28    Endpoint(#[from] EndpointError),
29    /// Wire codec error.
30    #[error("codec error: {0}")]
31    Codec(#[from] CodecError),
32    /// Transport-level error.
33    #[error("transport error: {0}")]
34    Transport(#[from] TransportError),
35    /// Variable-length integer decoding error.
36    #[error("varint error: {0}")]
37    VarInt(#[from] moqtap_codec::varint::VarIntError),
38    /// Control stream was not opened.
39    #[error("control stream not open")]
40    NoControlStream,
41    /// Stream ended before a complete message was read.
42    #[error("unexpected end of stream")]
43    UnexpectedEnd,
44    /// Stream was finished by the peer.
45    #[error("stream finished")]
46    StreamFinished,
47    /// Invalid server address string.
48    #[error("invalid server address: {0}")]
49    InvalidAddress(String),
50    /// TLS configuration error.
51    #[error("TLS config error: {0}")]
52    TlsConfig(String),
53}
54
55/// Transport type for the connection.
56#[derive(Debug, Clone)]
57pub enum TransportType {
58    /// Raw QUIC via quinn. The `addr` field should be `host:port`.
59    Quic,
60    /// WebTransport via wtransport. The `url` field is the WebTransport URL.
61    WebTransport {
62        /// The WebTransport endpoint URL (e.g., `https://host:port/path`).
63        url: String,
64    },
65}
66
67/// Configuration for a draft-10 MoQT client connection.
68pub struct ClientConfig {
69    /// Additional draft versions to offer in CLIENT_SETUP (draft-10 is always
70    /// offered first).
71    pub additional_versions: Vec<DraftVersion>,
72    /// The transport type (QUIC or WebTransport).
73    pub transport: TransportType,
74    /// Whether to skip TLS certificate verification (for testing).
75    pub skip_cert_verification: bool,
76    /// Custom CA certificates to trust (DER-encoded).
77    pub ca_certs: Vec<Vec<u8>>,
78    /// Setup parameters to include in CLIENT_SETUP (e.g., auth tokens).
79    pub setup_parameters: Vec<moqtap_codec::kvp::KeyValuePair>,
80}
81
82impl ClientConfig {
83    /// Returns the MoQT version varints for the CLIENT_SETUP message.
84    /// Draft-10 first, then any additional versions.
85    pub fn supported_versions(&self) -> Vec<VarInt> {
86        let mut versions = vec![DraftVersion::Draft10.version_varint()];
87        for v in &self.additional_versions {
88            let varint = v.version_varint();
89            if !versions.contains(&varint) {
90                versions.push(varint);
91            }
92        }
93        versions
94    }
95
96    /// Returns the ALPN protocol identifiers for the transport.
97    pub fn alpn(&self) -> Vec<Vec<u8>> {
98        match &self.transport {
99            TransportType::Quic => vec![DraftVersion::Draft10.quic_alpn().to_vec()],
100            TransportType::WebTransport { .. } => vec![b"h3".to_vec()],
101        }
102    }
103}
104
105/// A framed writer for a send stream. Handles MoQT length-prefixed framing.
106pub struct FramedSendStream {
107    inner: SendStream,
108}
109
110impl FramedSendStream {
111    /// Create a new framed send stream.
112    pub fn new(inner: SendStream) -> Self {
113        Self { inner }
114    }
115
116    /// Get the transport-level stream ID.
117    pub fn stream_id(&self) -> u64 {
118        self.inner.stream_id()
119    }
120
121    /// Write a control message to the stream with type+length framing.
122    /// Returns the raw bytes that were written (for event capture).
123    pub async fn write_control(
124        &mut self,
125        msg: &AnyControlMessage,
126    ) -> Result<Vec<u8>, ConnectionError> {
127        let mut buf = Vec::new();
128        msg.encode(&mut buf)?;
129        self.inner.write_all(&buf).await?;
130        Ok(buf)
131    }
132
133    /// Write a subgroup stream header.
134    pub async fn write_subgroup_header(
135        &mut self,
136        header: &AnySubgroupHeader,
137    ) -> Result<(), ConnectionError> {
138        let mut buf = Vec::new();
139        header.encode(&mut buf);
140        self.inner.write_all(&buf).await?;
141        Ok(())
142    }
143
144    /// Write a fetch response header.
145    pub async fn write_fetch_header(
146        &mut self,
147        header: &AnyFetchHeader,
148    ) -> Result<(), ConnectionError> {
149        let mut buf = Vec::new();
150        header.encode(&mut buf);
151        self.inner.write_all(&buf).await?;
152        Ok(())
153    }
154
155    /// Append a draft-10 subgroup object (header + payload) to the stream.
156    /// Draft-10 subgroup objects are stateless — no header-dependent bookkeeping.
157    pub async fn write_subgroup_object(
158        &mut self,
159        object: &SubgroupObject,
160    ) -> Result<(), ConnectionError> {
161        let mut buf = Vec::new();
162        object.header.encode(&mut buf);
163        buf.extend_from_slice(&object.payload);
164        self.inner.write_all(&buf).await?;
165        Ok(())
166    }
167
168    /// Append a draft-10 fetch object (header + payload) to the stream.
169    pub async fn write_fetch_object(
170        &mut self,
171        object: &FetchObject,
172    ) -> Result<(), ConnectionError> {
173        let mut buf = Vec::new();
174        object.header.encode(&mut buf);
175        buf.extend_from_slice(&object.payload);
176        self.inner.write_all(&buf).await?;
177        Ok(())
178    }
179
180    /// Finish the stream (send FIN).
181    pub async fn finish(&mut self) -> Result<(), ConnectionError> {
182        self.inner.finish()?;
183        Ok(())
184    }
185}
186
187/// A framed reader for a recv stream. Handles MoQT varint-length decoding.
188pub struct FramedRecvStream {
189    inner: RecvStream,
190    buf: BytesMut,
191}
192
193impl FramedRecvStream {
194    /// Create a new framed receive stream.
195    pub fn new(inner: RecvStream) -> Self {
196        Self { inner, buf: BytesMut::with_capacity(4096) }
197    }
198
199    /// Get the transport-level stream ID.
200    pub fn stream_id(&self) -> u64 {
201        self.inner.stream_id()
202    }
203
204    /// Read more data from the stream into the internal buffer.
205    async fn fill(&mut self) -> Result<bool, ConnectionError> {
206        let mut tmp = [0u8; 4096];
207        match self.inner.read(&mut tmp).await {
208            Ok(Some(n)) => {
209                self.buf.extend_from_slice(&tmp[..n]);
210                Ok(true)
211            }
212            Ok(None) => Ok(false),
213            Err(e) => Err(ConnectionError::Transport(e)),
214        }
215    }
216
217    /// Ensure at least `n` bytes are available in the buffer.
218    async fn ensure(&mut self, n: usize) -> Result<(), ConnectionError> {
219        while self.buf.len() < n {
220            if !self.fill().await? {
221                return Err(ConnectionError::UnexpectedEnd);
222            }
223        }
224        Ok(())
225    }
226
227    /// Read a control message from the stream.
228    ///
229    /// When `capture_raw` is true, the returned tuple includes a clone of the
230    /// framed wire bytes (for observer emission). When false, the second
231    /// element is `None` and the payload clone is skipped.
232    pub async fn read_control(
233        &mut self,
234        capture_raw: bool,
235    ) -> Result<(AnyControlMessage, Option<Vec<u8>>), ConnectionError> {
236        // Read type ID varint
237        self.ensure(1).await?;
238        let type_len = varint_len(self.buf[0]);
239        self.ensure(type_len).await?;
240
241        let mut cursor = &self.buf[..type_len];
242        let _type_id = VarInt::decode(&mut cursor)?;
243
244        // Draft-10 uses varint length framing.
245        self.ensure(type_len + 1).await?;
246        let payload_len_start = type_len;
247        let payload_len_varint_len = varint_len(self.buf[payload_len_start]);
248        self.ensure(type_len + payload_len_varint_len).await?;
249        let mut cursor = &self.buf[payload_len_start..type_len + payload_len_varint_len];
250        let payload_len = VarInt::decode(&mut cursor)?.into_inner() as usize;
251        let len_field_size = payload_len_varint_len;
252
253        // Read full payload
254        let total = type_len + len_field_size + payload_len;
255        self.ensure(total).await?;
256
257        // Capture raw bytes only if requested (observer attached).
258        let raw = capture_raw.then(|| self.buf[..total].to_vec());
259
260        // Now decode the whole message using the draft-10 dispatcher
261        let mut frame = &self.buf[..total];
262        let msg = AnyControlMessage::decode(DraftVersion::Draft10, &mut frame)?;
263        self.buf.advance(total);
264        Ok((msg, raw))
265    }
266
267    /// Read a subgroup stream header.
268    pub async fn read_subgroup_header(&mut self) -> Result<AnySubgroupHeader, ConnectionError> {
269        self.ensure(1).await?;
270        loop {
271            let mut cursor = &self.buf[..];
272            match AnySubgroupHeader::decode(DraftVersion::Draft10, &mut cursor) {
273                Ok(header) => {
274                    let consumed = self.buf.len() - cursor.remaining();
275                    self.buf.advance(consumed);
276                    return Ok(header);
277                }
278                Err(CodecError::UnexpectedEnd) => {
279                    if !self.fill().await? {
280                        return Err(ConnectionError::UnexpectedEnd);
281                    }
282                }
283                Err(e) => return Err(ConnectionError::Codec(e)),
284            }
285        }
286    }
287
288    /// Read a fetch response header.
289    pub async fn read_fetch_header(&mut self) -> Result<AnyFetchHeader, ConnectionError> {
290        self.ensure(1).await?;
291        loop {
292            let mut cursor = &self.buf[..];
293            match AnyFetchHeader::decode(DraftVersion::Draft10, &mut cursor) {
294                Ok(header) => {
295                    let consumed = self.buf.len() - cursor.remaining();
296                    self.buf.advance(consumed);
297                    return Ok(header);
298                }
299                Err(CodecError::UnexpectedEnd) => {
300                    if !self.fill().await? {
301                        return Err(ConnectionError::UnexpectedEnd);
302                    }
303                }
304                Err(e) => return Err(ConnectionError::Codec(e)),
305            }
306        }
307    }
308
309    /// Read the next draft-10 subgroup object (header + payload). Since
310    /// draft-10 subgroup objects are stateless, this does not require any
311    /// prior header-decoding state.
312    pub async fn read_subgroup_object(&mut self) -> Result<SubgroupObject, ConnectionError> {
313        loop {
314            let mut cursor = &self.buf[..];
315            match ObjectHeader::decode(&mut cursor) {
316                Ok(header) => {
317                    let header_consumed = self.buf.len() - cursor.remaining();
318                    let payload_len = header.payload_length.into_inner() as usize;
319                    let total = header_consumed + payload_len;
320                    if self.buf.len() < total {
321                        if !self.fill().await? {
322                            return Err(ConnectionError::UnexpectedEnd);
323                        }
324                        continue;
325                    }
326                    let payload = self.buf[header_consumed..total].to_vec();
327                    self.buf.advance(total);
328                    return Ok(SubgroupObject { header, payload });
329                }
330                Err(CodecError::UnexpectedEnd) => {
331                    if !self.fill().await? {
332                        return Err(ConnectionError::UnexpectedEnd);
333                    }
334                }
335                Err(e) => return Err(ConnectionError::Codec(e)),
336            }
337        }
338    }
339
340    /// Read the next draft-10 fetch object (header + payload).
341    pub async fn read_fetch_object(&mut self) -> Result<FetchObject, ConnectionError> {
342        loop {
343            let mut cursor = &self.buf[..];
344            match FetchObjectHeader::decode(&mut cursor) {
345                Ok(header) => {
346                    let header_consumed = self.buf.len() - cursor.remaining();
347                    let payload_len = header.payload_length.into_inner() as usize;
348                    let total = header_consumed + payload_len;
349                    if self.buf.len() < total {
350                        if !self.fill().await? {
351                            return Err(ConnectionError::UnexpectedEnd);
352                        }
353                        continue;
354                    }
355                    let payload = self.buf[header_consumed..total].to_vec();
356                    self.buf.advance(total);
357                    return Ok(FetchObject { header, payload });
358                }
359                Err(CodecError::UnexpectedEnd) => {
360                    if !self.fill().await? {
361                        return Err(ConnectionError::UnexpectedEnd);
362                    }
363                }
364                Err(e) => return Err(ConnectionError::Codec(e)),
365            }
366        }
367    }
368}
369
370/// A live draft-10 MoQT connection over QUIC or WebTransport.
371pub struct Connection {
372    transport: Transport,
373    endpoint: Endpoint,
374    control_send: Option<FramedSendStream>,
375    control_recv: Option<FramedRecvStream>,
376    observer: Option<Box<dyn ConnectionObserver>>,
377}
378
379impl Connection {
380    /// Connect to a draft-10 MoQT server as a client.
381    pub async fn connect(addr: &str, config: ClientConfig) -> Result<Self, ConnectionError> {
382        let transport = match &config.transport {
383            TransportType::Quic => Self::connect_quic(addr, &config).await?,
384            TransportType::WebTransport { url } => {
385                let url = url.clone();
386                Self::connect_webtransport(&url, &config).await?
387            }
388        };
389
390        // Open bidirectional control stream
391        let (send, recv) = transport.open_bi().await?;
392        let mut control_send = FramedSendStream::new(send);
393        let mut control_recv = FramedRecvStream::new(recv);
394
395        // Perform setup handshake
396        let mut endpoint = Endpoint::new();
397        endpoint.connect()?;
398        let setup_msg = endpoint
399            .send_client_setup(config.supported_versions(), config.setup_parameters.clone())?;
400        let any_setup = AnyControlMessage::Draft10(setup_msg);
401        let _raw_setup = control_send.write_control(&any_setup).await?;
402
403        let (server_setup, _raw_server_setup) = control_recv.read_control(false).await?;
404        match &server_setup {
405            AnyControlMessage::Draft10(ControlMessage::ServerSetup(ref ss)) => {
406                endpoint.receive_server_setup(ss)?;
407            }
408            _ => {
409                return Err(ConnectionError::Endpoint(EndpointError::NotActive));
410            }
411        }
412
413        let conn = Self {
414            transport,
415            endpoint,
416            control_send: Some(control_send),
417            control_recv: Some(control_recv),
418            observer: None,
419        };
420
421        if let Some(v) = conn.endpoint.negotiated_version() {
422            conn.emit(ClientEvent::SetupComplete { negotiated_version: v.into_inner() });
423        }
424
425        Ok(conn)
426    }
427
428    /// Establish a raw QUIC connection.
429    async fn connect_quic(addr: &str, config: &ClientConfig) -> Result<Transport, ConnectionError> {
430        let server_addr = addr.parse().map_err(|e: std::net::AddrParseError| {
431            ConnectionError::InvalidAddress(e.to_string())
432        })?;
433
434        let mut tls_config = if config.skip_cert_verification {
435            rustls::ClientConfig::builder()
436                .dangerous()
437                .with_custom_certificate_verifier(Arc::new(SkipVerification))
438                .with_no_client_auth()
439        } else {
440            let mut roots = rustls::RootCertStore::empty();
441            roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
442            for der in &config.ca_certs {
443                roots
444                    .add(rustls::pki_types::CertificateDer::from(der.clone()))
445                    .map_err(|e| ConnectionError::TlsConfig(format!("bad CA cert: {e}")))?;
446            }
447            rustls::ClientConfig::builder().with_root_certificates(roots).with_no_client_auth()
448        };
449
450        tls_config.alpn_protocols = config.alpn();
451
452        let quic_config: quinn::crypto::rustls::QuicClientConfig =
453            tls_config.try_into().map_err(|e| ConnectionError::TlsConfig(format!("{e}")))?;
454        let client_config = quinn::ClientConfig::new(Arc::new(quic_config));
455
456        let mut quinn_endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())
457            .map_err(|e| ConnectionError::InvalidAddress(e.to_string()))?;
458        quinn_endpoint.set_default_client_config(client_config);
459
460        let server_name = addr.split(':').next().unwrap_or("localhost").to_string();
461
462        let quic = quinn_endpoint
463            .connect(server_addr, &server_name)
464            .map_err(TransportError::from)?
465            .await
466            .map_err(TransportError::from)?;
467
468        Ok(Transport::Quic(QuicTransport::new(quic)))
469    }
470
471    /// Establish a WebTransport connection.
472    #[cfg(feature = "webtransport")]
473    async fn connect_webtransport(
474        url: &str,
475        config: &ClientConfig,
476    ) -> Result<Transport, ConnectionError> {
477        use crate::transport::webtransport::WebTransportTransport;
478
479        let wt_config = if config.skip_cert_verification {
480            wtransport::ClientConfig::builder()
481                .with_bind_default()
482                .with_no_cert_validation()
483                .build()
484        } else {
485            wtransport::ClientConfig::builder().with_bind_default().with_native_certs().build()
486        };
487
488        let endpoint = wtransport::Endpoint::client(wt_config)
489            .map_err(|e| ConnectionError::Transport(TransportError::Connect(e.to_string())))?;
490
491        let connection = endpoint
492            .connect(url)
493            .await
494            .map_err(|e| ConnectionError::Transport(TransportError::Connect(e.to_string())))?;
495
496        Ok(Transport::WebTransport(WebTransportTransport::new(connection)))
497    }
498
499    /// Stub for when the webtransport feature is not enabled.
500    #[cfg(not(feature = "webtransport"))]
501    async fn connect_webtransport(
502        _url: &str,
503        _config: &ClientConfig,
504    ) -> Result<Transport, ConnectionError> {
505        Err(ConnectionError::Transport(TransportError::Connect(
506            "webtransport feature not enabled".into(),
507        )))
508    }
509
510    // ── Observer ───────────────────────────────────────────────
511
512    /// Attach an observer to receive connection events.
513    pub fn set_observer(&mut self, observer: Box<dyn ConnectionObserver>) {
514        self.observer = Some(observer);
515    }
516
517    /// Remove the observer.
518    pub fn clear_observer(&mut self) {
519        self.observer = None;
520    }
521
522    /// Emit an event to the observer, if one is attached.
523    fn emit(&self, event: ClientEvent) {
524        if let Some(ref obs) = self.observer {
525            obs.on_event_owned(event);
526        }
527    }
528
529    // ── Control message I/O ─────────────────────────────────
530
531    /// Send a control message on the control stream.
532    pub async fn send_control(&mut self, msg: &ControlMessage) -> Result<(), ConnectionError> {
533        let any = AnyControlMessage::Draft10(msg.clone());
534        let send = self.control_send.as_mut().ok_or(ConnectionError::NoControlStream)?;
535        let raw = send.write_control(&any).await?;
536        self.emit(ClientEvent::ControlMessage {
537            direction: Direction::Send,
538            message: any,
539            raw: Some(raw),
540        });
541        Ok(())
542    }
543
544    /// Read the next control message from the control stream.
545    pub async fn recv_control(&mut self) -> Result<ControlMessage, ConnectionError> {
546        let recv = self.control_recv.as_mut().ok_or(ConnectionError::NoControlStream)?;
547        let capture_raw = self.observer.is_some();
548        let (any, raw) = recv.read_control(capture_raw).await?;
549        if capture_raw {
550            self.emit(ClientEvent::ControlMessage {
551                direction: Direction::Receive,
552                message: any.clone(),
553                raw,
554            });
555        }
556        match any {
557            AnyControlMessage::Draft10(msg) => Ok(msg),
558            _ => Err(ConnectionError::Codec(CodecError::UnknownMessageType(0))),
559        }
560    }
561
562    /// Read and dispatch the next incoming control message through the endpoint
563    /// state machine. Returns the decoded message for inspection.
564    pub async fn recv_and_dispatch(&mut self) -> Result<ControlMessage, ConnectionError> {
565        let msg = self.recv_control().await?;
566        self.endpoint.receive_message(msg.clone())?;
567
568        if let ControlMessage::GoAway(ref ga) = msg {
569            self.emit(ClientEvent::Draining { new_session_uri: ga.new_session_uri.clone() });
570        }
571
572        Ok(msg)
573    }
574
575    // ── Subscribe flow ──────────────────────────────────────
576
577    /// Send a SUBSCRIBE and return the allocated subscribe ID.
578    #[allow(clippy::too_many_arguments)]
579    pub async fn subscribe(
580        &mut self,
581        track_alias: VarInt,
582        track_namespace: TrackNamespace,
583        track_name: Vec<u8>,
584        subscriber_priority: u8,
585        group_order: GroupOrder,
586        filter_type: FilterType,
587    ) -> Result<VarInt, ConnectionError> {
588        let (sub_id, msg) = self.endpoint.subscribe(
589            track_alias,
590            track_namespace,
591            track_name,
592            subscriber_priority,
593            group_order,
594            filter_type,
595        )?;
596        self.send_control(&msg).await?;
597        Ok(sub_id)
598    }
599
600    /// Send an UNSUBSCRIBE for the given subscribe ID.
601    pub async fn unsubscribe(&mut self, subscribe_id: VarInt) -> Result<(), ConnectionError> {
602        let msg = self.endpoint.unsubscribe(subscribe_id)?;
603        self.send_control(&msg).await
604    }
605
606    // ── Fetch flow ──────────────────────────────────────────
607
608    /// Send a FETCH and return the allocated subscribe ID.
609    #[allow(clippy::too_many_arguments)]
610    pub async fn fetch(
611        &mut self,
612        track_namespace: TrackNamespace,
613        track_name: Vec<u8>,
614        subscriber_priority: u8,
615        group_order: GroupOrder,
616        start_group: VarInt,
617        start_object: VarInt,
618        end_group: VarInt,
619        end_object: VarInt,
620    ) -> Result<VarInt, ConnectionError> {
621        let (sub_id, msg) = self.endpoint.fetch(
622            track_namespace,
623            track_name,
624            subscriber_priority,
625            group_order,
626            start_group,
627            start_object,
628            end_group,
629            end_object,
630        )?;
631        self.send_control(&msg).await?;
632        Ok(sub_id)
633    }
634
635    /// Send a FETCH_CANCEL for the given subscribe ID.
636    pub async fn fetch_cancel(&mut self, subscribe_id: VarInt) -> Result<(), ConnectionError> {
637        let msg = self.endpoint.fetch_cancel(subscribe_id)?;
638        self.send_control(&msg).await
639    }
640
641    // ── Namespace flows ─────────────────────────────────────
642
643    /// Send a SUBSCRIBE_ANNOUNCES.
644    pub async fn subscribe_announces(
645        &mut self,
646        track_namespace_prefix: TrackNamespace,
647    ) -> Result<(), ConnectionError> {
648        let msg = self.endpoint.subscribe_announces(track_namespace_prefix)?;
649        self.send_control(&msg).await
650    }
651
652    /// Send an ANNOUNCE.
653    pub async fn announce(
654        &mut self,
655        track_namespace: TrackNamespace,
656    ) -> Result<(), ConnectionError> {
657        let msg = self.endpoint.announce(track_namespace)?;
658        self.send_control(&msg).await
659    }
660
661    /// Send an UNANNOUNCE.
662    pub async fn unannounce(
663        &mut self,
664        track_namespace: TrackNamespace,
665    ) -> Result<(), ConnectionError> {
666        let msg = self.endpoint.unannounce(track_namespace)?;
667        self.send_control(&msg).await
668    }
669
670    // ── Track Status flow ────────────────────────────────────
671
672    /// Send a TRACK_STATUS_REQUEST.
673    pub async fn track_status_request(
674        &mut self,
675        track_namespace: TrackNamespace,
676        track_name: Vec<u8>,
677    ) -> Result<(), ConnectionError> {
678        let msg = self.endpoint.track_status_request(track_namespace, track_name)?;
679        self.send_control(&msg).await
680    }
681
682    // ── Data streams ────────────────────────────────────────
683
684    /// Open a new unidirectional stream for sending subgroup data.
685    pub async fn open_subgroup_stream(
686        &self,
687        header: &AnySubgroupHeader,
688    ) -> Result<FramedSendStream, ConnectionError> {
689        let send = self.transport.open_uni().await?;
690        let mut framed = FramedSendStream::new(send);
691        let sid = framed.stream_id();
692        framed.write_subgroup_header(header).await?;
693        self.emit(ClientEvent::StreamOpened {
694            direction: Direction::Send,
695            stream_kind: StreamKind::Subgroup,
696            stream_id: sid,
697        });
698        self.emit(ClientEvent::DataStreamHeader {
699            stream_id: sid,
700            direction: Direction::Send,
701            header: header.clone(),
702        });
703        Ok(framed)
704    }
705
706    /// Accept an incoming unidirectional data stream and read its subgroup
707    /// header.
708    pub async fn accept_subgroup_stream(
709        &self,
710    ) -> Result<(AnySubgroupHeader, FramedRecvStream), ConnectionError> {
711        let recv = self.transport.accept_uni().await?;
712        let mut framed = FramedRecvStream::new(recv);
713        let sid = framed.stream_id();
714        let header = framed.read_subgroup_header().await?;
715        self.emit(ClientEvent::StreamOpened {
716            direction: Direction::Receive,
717            stream_kind: StreamKind::Subgroup,
718            stream_id: sid,
719        });
720        self.emit(ClientEvent::DataStreamHeader {
721            stream_id: sid,
722            direction: Direction::Receive,
723            header: header.clone(),
724        });
725        Ok((header, framed))
726    }
727
728    /// Send an object via datagram.
729    pub fn send_datagram(
730        &self,
731        header: &AnyDatagramHeader,
732        payload: &[u8],
733    ) -> Result<(), ConnectionError> {
734        let mut buf = Vec::new();
735        header.encode(&mut buf);
736        buf.extend_from_slice(payload);
737        self.emit(ClientEvent::DatagramReceived {
738            direction: Direction::Send,
739            header: header.clone(),
740            payload_len: payload.len(),
741        });
742        self.transport.send_datagram(bytes::Bytes::from(buf))?;
743        Ok(())
744    }
745
746    /// Receive a datagram and decode its header.
747    pub async fn recv_datagram(&self) -> Result<(AnyDatagramHeader, Bytes), ConnectionError> {
748        let data = self.transport.recv_datagram().await?;
749        let mut cursor = &data[..];
750        let header = AnyDatagramHeader::decode(DraftVersion::Draft10, &mut cursor)?;
751        let consumed = data.len() - cursor.len();
752        let payload = data.slice(consumed..);
753        self.emit(ClientEvent::DatagramReceived {
754            direction: Direction::Receive,
755            header: header.clone(),
756            payload_len: payload.len(),
757        });
758        Ok((header, payload))
759    }
760
761    // ── Accessors ───────────────────────────────────────────
762
763    /// Access the underlying endpoint state machine.
764    pub fn endpoint(&self) -> &Endpoint {
765        &self.endpoint
766    }
767
768    /// Mutable access to the endpoint state machine.
769    pub fn endpoint_mut(&mut self) -> &mut Endpoint {
770        &mut self.endpoint
771    }
772
773    /// Get the negotiated MoQT version.
774    pub fn negotiated_version(&self) -> Option<VarInt> {
775        self.endpoint.negotiated_version()
776    }
777
778    /// Close the connection.
779    pub fn close(&self, code: u32, reason: &[u8]) {
780        self.emit(ClientEvent::Closed { code, reason: reason.to_vec() });
781        self.transport.close(code, reason);
782    }
783}
784
785/// Determine the encoded length of a varint from its first byte.
786fn varint_len(first_byte: u8) -> usize {
787    1 << (first_byte >> 6)
788}
789
790/// TLS certificate verifier that skips all verification (for testing only).
791#[derive(Debug)]
792struct SkipVerification;
793
794impl rustls::client::danger::ServerCertVerifier for SkipVerification {
795    fn verify_server_cert(
796        &self,
797        _end_entity: &rustls::pki_types::CertificateDer<'_>,
798        _intermediates: &[rustls::pki_types::CertificateDer<'_>],
799        _server_name: &rustls::pki_types::ServerName<'_>,
800        _ocsp_response: &[u8],
801        _now: rustls::pki_types::UnixTime,
802    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
803        Ok(rustls::client::danger::ServerCertVerified::assertion())
804    }
805
806    fn verify_tls12_signature(
807        &self,
808        _message: &[u8],
809        _cert: &rustls::pki_types::CertificateDer<'_>,
810        _dcs: &rustls::DigitallySignedStruct,
811    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
812        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
813    }
814
815    fn verify_tls13_signature(
816        &self,
817        _message: &[u8],
818        _cert: &rustls::pki_types::CertificateDer<'_>,
819        _dcs: &rustls::DigitallySignedStruct,
820    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
821        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
822    }
823
824    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
825        vec![
826            rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
827            rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
828            rustls::SignatureScheme::ED25519,
829            rustls::SignatureScheme::RSA_PSS_SHA256,
830            rustls::SignatureScheme::RSA_PSS_SHA384,
831            rustls::SignatureScheme::RSA_PSS_SHA512,
832        ]
833    }
834}
835
836#[cfg(test)]
837mod tests {
838    use super::*;
839
840    #[test]
841    fn client_config_supported_versions_default() {
842        let config = ClientConfig {
843            additional_versions: Vec::new(),
844            transport: TransportType::Quic,
845            skip_cert_verification: false,
846            ca_certs: Vec::new(),
847            setup_parameters: Vec::new(),
848        };
849        let versions = config.supported_versions();
850        assert_eq!(versions.len(), 1);
851        assert_eq!(versions[0].into_inner(), 0xff000000 + 10);
852    }
853
854    #[test]
855    fn client_config_alpn_quic() {
856        let config = ClientConfig {
857            additional_versions: Vec::new(),
858            transport: TransportType::Quic,
859            skip_cert_verification: false,
860            ca_certs: Vec::new(),
861            setup_parameters: Vec::new(),
862        };
863        assert_eq!(config.alpn(), vec![DraftVersion::Draft10.quic_alpn().to_vec()]);
864    }
865
866    #[test]
867    fn moqt_alpn_value() {
868        assert_eq!(MOQT_ALPN, b"moq-00");
869    }
870}