1use std::sync::Arc;
2
3use bytes::{Buf, Bytes, BytesMut};
4
5use crate::draft15::endpoint::{Endpoint, EndpointError};
6use crate::draft15::event::{ClientEvent, Direction, StreamKind};
7use crate::draft15::observer::ConnectionObserver;
8use crate::draft15::session::request_id::Role;
9use crate::transport::quic::QuicTransport;
10use crate::transport::{RecvStream, SendStream, Transport, TransportError};
11use moqtap_codec::dispatch::{
12 AnyControlMessage, AnyDatagramHeader, AnyFetchHeader, AnySubgroupHeader,
13};
14use moqtap_codec::draft15::data_stream::{FetchHeader, SubgroupObject, SubgroupObjectReader};
15use moqtap_codec::draft15::message::ControlMessage;
16use moqtap_codec::error::CodecError;
17use moqtap_codec::kvp::KeyValuePair;
18use moqtap_codec::types::*;
19use moqtap_codec::varint::VarInt;
20use moqtap_codec::version::DraftVersion;
21
/// Byte string `moq-00`, exported as this crate's MoQT ALPN identifier.
///
/// NOTE(review): `ClientConfig::alpn` in this file does not read this constant — it asks the
/// configured draft for its QUIC ALPN instead — so confirm which callers still rely on it.
pub const MOQT_ALPN: &[u8] = b"moq-00";
24
25#[derive(Debug, thiserror::Error)]
27pub enum ConnectionError {
28 #[error("endpoint error: {0}")]
30 Endpoint(#[from] EndpointError),
31 #[error("codec error: {0}")]
33 Codec(#[from] CodecError),
34 #[error("transport error: {0}")]
36 Transport(#[from] TransportError),
37 #[error("varint error: {0}")]
39 VarInt(#[from] moqtap_codec::varint::VarIntError),
40 #[error("control stream not open")]
42 NoControlStream,
43 #[error("unexpected end of stream")]
45 UnexpectedEnd,
46 #[error("stream finished")]
48 StreamFinished,
49 #[error("invalid server address: {0}")]
51 InvalidAddress(String),
52 #[error("TLS config error: {0}")]
54 TlsConfig(String),
55 #[error("data stream state error: {0}")]
57 DataStreamState(&'static str),
58}
59
/// Which transport a [`Connection`] is established over.
#[derive(Debug, Clone)]
pub enum TransportType {
    /// Raw QUIC; the server is located by the `addr` argument of `Connection::connect`.
    Quic,
    /// WebTransport; the server is located by `url` instead of the `addr` argument.
    WebTransport {
        /// URL handed to the WebTransport client when connecting.
        url: String,
    },
}
71
72pub struct ClientConfig {
76 pub draft: DraftVersion,
78 pub transport: TransportType,
80 pub skip_cert_verification: bool,
82 pub ca_certs: Vec<Vec<u8>>,
84 pub setup_parameters: Vec<KeyValuePair>,
86}
87
88impl ClientConfig {
89 pub fn alpn(&self) -> Vec<Vec<u8>> {
91 match &self.transport {
92 TransportType::Quic => vec![self.draft.quic_alpn().to_vec()],
93 TransportType::WebTransport { .. } => vec![b"h3".to_vec()],
94 }
95 }
96}
97
98pub struct FramedSendStream {
100 inner: SendStream,
101 draft: DraftVersion,
102 subgroup_io: Option<SubgroupObjectReader>,
106}
107
108impl FramedSendStream {
109 pub fn new(inner: SendStream, draft: DraftVersion) -> Self {
111 Self { inner, draft, subgroup_io: None }
112 }
113
114 pub fn stream_id(&self) -> u64 {
116 self.inner.stream_id()
117 }
118
119 pub async fn write_control(
122 &mut self,
123 msg: &AnyControlMessage,
124 ) -> Result<Vec<u8>, ConnectionError> {
125 let mut buf = Vec::new();
126 msg.encode(&mut buf)?;
127 self.inner.write_all(&buf).await?;
128 Ok(buf)
129 }
130
131 pub async fn write_subgroup_header(
135 &mut self,
136 header: &AnySubgroupHeader,
137 ) -> Result<(), ConnectionError> {
138 let mut buf = Vec::new();
139 header.encode(&mut buf);
140 self.inner.write_all(&buf).await?;
141 if let AnySubgroupHeader::Draft15(ref d15) = header {
142 self.subgroup_io = Some(SubgroupObjectReader::new(d15));
143 }
144 Ok(())
145 }
146
147 pub async fn write_fetch_header(
149 &mut self,
150 header: &AnyFetchHeader,
151 ) -> Result<(), ConnectionError> {
152 let mut buf = Vec::new();
153 header.encode(&mut buf);
154 self.inner.write_all(&buf).await?;
155 Ok(())
156 }
157
158 pub async fn write_subgroup_object(
163 &mut self,
164 object: &SubgroupObject,
165 ) -> Result<(), ConnectionError> {
166 let writer = self
167 .subgroup_io
168 .as_mut()
169 .ok_or(ConnectionError::DataStreamState("subgroup header not written yet"))?;
170 let mut buf = Vec::new();
171 writer.write_object(object, &mut buf)?;
172 self.inner.write_all(&buf).await?;
173 Ok(())
174 }
175
176 pub async fn finish(&mut self) -> Result<(), ConnectionError> {
178 self.inner.finish()?;
179 Ok(())
180 }
181
182 pub fn draft(&self) -> DraftVersion {
184 self.draft
185 }
186}
187
188pub struct FramedRecvStream {
190 inner: RecvStream,
191 buf: BytesMut,
192 draft: DraftVersion,
193 subgroup_io: Option<SubgroupObjectReader>,
196}
197
198impl FramedRecvStream {
199 pub fn new(inner: RecvStream, draft: DraftVersion) -> Self {
201 Self { inner, buf: BytesMut::with_capacity(4096), draft, subgroup_io: None }
202 }
203
204 pub fn stream_id(&self) -> u64 {
206 self.inner.stream_id()
207 }
208
209 async fn fill(&mut self) -> Result<bool, ConnectionError> {
211 let mut tmp = [0u8; 4096];
212 match self.inner.read(&mut tmp).await {
213 Ok(Some(n)) => {
214 self.buf.extend_from_slice(&tmp[..n]);
215 Ok(true)
216 }
217 Ok(None) => Ok(false),
218 Err(e) => Err(ConnectionError::Transport(e)),
219 }
220 }
221
222 async fn ensure(&mut self, n: usize) -> Result<(), ConnectionError> {
224 while self.buf.len() < n {
225 if !self.fill().await? {
226 return Err(ConnectionError::UnexpectedEnd);
227 }
228 }
229 Ok(())
230 }
231
232 pub async fn read_control(
238 &mut self,
239 capture_raw: bool,
240 ) -> Result<(AnyControlMessage, Option<Vec<u8>>), ConnectionError> {
241 self.ensure(1).await?;
243 let type_len = varint_len(self.buf[0]);
244 self.ensure(type_len).await?;
245
246 let mut cursor = &self.buf[..type_len];
247 let _type_id = VarInt::decode(&mut cursor)?;
248
249 let (payload_len, len_field_size) = if self.draft.uses_fixed_length_framing() {
251 self.ensure(type_len + 2).await?;
252 let hi = self.buf[type_len] as usize;
253 let lo = self.buf[type_len + 1] as usize;
254 ((hi << 8) | lo, 2)
255 } else {
256 self.ensure(type_len + 1).await?;
257 let payload_len_start = type_len;
258 let payload_len_varint_len = varint_len(self.buf[payload_len_start]);
259 self.ensure(type_len + payload_len_varint_len).await?;
260 let mut cursor = &self.buf[payload_len_start..type_len + payload_len_varint_len];
261 let payload_len = VarInt::decode(&mut cursor)?.into_inner() as usize;
262 (payload_len, payload_len_varint_len)
263 };
264
265 let total = type_len + len_field_size + payload_len;
267 self.ensure(total).await?;
268
269 let raw = capture_raw.then(|| self.buf[..total].to_vec());
271
272 let mut frame = &self.buf[..total];
274 let msg = AnyControlMessage::decode(self.draft, &mut frame)?;
275 self.buf.advance(total);
276 Ok((msg, raw))
277 }
278
279 pub async fn read_subgroup_header(&mut self) -> Result<AnySubgroupHeader, ConnectionError> {
282 self.ensure(1).await?;
283 loop {
284 let mut cursor = &self.buf[..];
285 match AnySubgroupHeader::decode(self.draft, &mut cursor) {
286 Ok(header) => {
287 let consumed = self.buf.len() - cursor.remaining();
288 self.buf.advance(consumed);
289 if let AnySubgroupHeader::Draft15(ref d15) = header {
290 self.subgroup_io = Some(SubgroupObjectReader::new(d15));
291 }
292 return Ok(header);
293 }
294 Err(CodecError::UnexpectedEnd) => {
295 if !self.fill().await? {
296 return Err(ConnectionError::UnexpectedEnd);
297 }
298 }
299 Err(e) => return Err(ConnectionError::Codec(e)),
300 }
301 }
302 }
303
304 pub async fn read_fetch_header(&mut self) -> Result<AnyFetchHeader, ConnectionError> {
306 self.ensure(1).await?;
307 loop {
308 let mut cursor = &self.buf[..];
309 match AnyFetchHeader::decode(self.draft, &mut cursor) {
310 Ok(header) => {
311 let consumed = self.buf.len() - cursor.remaining();
312 self.buf.advance(consumed);
313 return Ok(header);
314 }
315 Err(CodecError::UnexpectedEnd) => {
316 if !self.fill().await? {
317 return Err(ConnectionError::UnexpectedEnd);
318 }
319 }
320 Err(e) => return Err(ConnectionError::Codec(e)),
321 }
322 }
323 }
324
325 pub async fn read_subgroup_object(&mut self) -> Result<SubgroupObject, ConnectionError> {
332 if self.subgroup_io.is_none() {
333 return Err(ConnectionError::DataStreamState("subgroup header not read yet"));
334 }
335 loop {
336 let reader = self.subgroup_io.as_mut().unwrap();
337 let mut probe = reader.clone();
338 let mut cursor = &self.buf[..];
339 match probe.read_object(&mut cursor) {
340 Ok(obj) => {
341 let consumed = self.buf.len() - cursor.remaining();
342 self.buf.advance(consumed);
343 *reader = probe;
344 return Ok(obj);
345 }
346 Err(CodecError::UnexpectedEnd) => {
347 if !self.fill().await? {
348 return Err(ConnectionError::UnexpectedEnd);
349 }
350 }
351 Err(e) => return Err(ConnectionError::Codec(e)),
352 }
353 }
354 }
355
356 pub async fn read_fetch_stream_header(&mut self) -> Result<FetchHeader, ConnectionError> {
358 loop {
359 let mut cursor = &self.buf[..];
360 match FetchHeader::decode(&mut cursor) {
361 Ok(hdr) => {
362 let consumed = self.buf.len() - cursor.remaining();
363 self.buf.advance(consumed);
364 return Ok(hdr);
365 }
366 Err(CodecError::UnexpectedEnd) => {
367 if !self.fill().await? {
368 return Err(ConnectionError::UnexpectedEnd);
369 }
370 }
371 Err(e) => return Err(ConnectionError::Codec(e)),
372 }
373 }
374 }
375
376 pub fn draft(&self) -> DraftVersion {
378 self.draft
379 }
380}
381
382pub struct Connection {
385 transport: Transport,
386 endpoint: Endpoint,
387 draft: DraftVersion,
388 control_send: Option<FramedSendStream>,
389 control_recv: Option<FramedRecvStream>,
390 observer: Option<Box<dyn ConnectionObserver>>,
391}
392
393impl Connection {
394 pub async fn connect(addr: &str, config: ClientConfig) -> Result<Self, ConnectionError> {
401 let draft = config.draft;
402 let transport = match &config.transport {
403 TransportType::Quic => Self::connect_quic(addr, &config).await?,
404 TransportType::WebTransport { url } => {
405 let url = url.clone();
406 Self::connect_webtransport(&url, &config).await?
407 }
408 };
409
410 let (send, recv) = transport.open_bi().await?;
412 let mut control_send = FramedSendStream::new(send, draft);
413 let mut control_recv = FramedRecvStream::new(recv, draft);
414
415 let mut endpoint = Endpoint::new(Role::Client);
417 endpoint.connect()?;
418 let setup_msg = endpoint.send_client_setup(config.setup_parameters.clone())?;
419 let any_setup = AnyControlMessage::Draft15(setup_msg);
420 let _raw_setup = control_send.write_control(&any_setup).await?;
421
422 let (server_setup, _raw_server_setup) = control_recv.read_control(false).await?;
423 match &server_setup {
425 AnyControlMessage::Draft15(ControlMessage::ServerSetup(ref ss)) => {
426 endpoint.receive_server_setup(ss)?;
427 }
428 _ => {
429 return Err(ConnectionError::Endpoint(EndpointError::NotActive));
430 }
431 }
432
433 let conn = Self {
434 transport,
435 endpoint,
436 draft,
437 control_send: Some(control_send),
438 control_recv: Some(control_recv),
439 observer: None,
440 };
441
442 conn.emit(ClientEvent::SetupComplete { negotiated_version: 0xff000000 + 15 });
444
445 Ok(conn)
446 }
447
448 async fn connect_quic(addr: &str, config: &ClientConfig) -> Result<Transport, ConnectionError> {
450 let server_addr = addr.parse().map_err(|e: std::net::AddrParseError| {
451 ConnectionError::InvalidAddress(e.to_string())
452 })?;
453
454 let mut tls_config = if config.skip_cert_verification {
456 rustls::ClientConfig::builder()
457 .dangerous()
458 .with_custom_certificate_verifier(Arc::new(SkipVerification))
459 .with_no_client_auth()
460 } else {
461 let mut roots = rustls::RootCertStore::empty();
462 roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
463 for der in &config.ca_certs {
464 roots
465 .add(rustls::pki_types::CertificateDer::from(der.clone()))
466 .map_err(|e| ConnectionError::TlsConfig(format!("bad CA cert: {e}")))?;
467 }
468 rustls::ClientConfig::builder().with_root_certificates(roots).with_no_client_auth()
469 };
470
471 tls_config.alpn_protocols = config.alpn();
472
473 let quic_config: quinn::crypto::rustls::QuicClientConfig =
474 tls_config.try_into().map_err(|e| ConnectionError::TlsConfig(format!("{e}")))?;
475 let client_config = quinn::ClientConfig::new(Arc::new(quic_config));
476
477 let mut quinn_endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())
478 .map_err(|e| ConnectionError::InvalidAddress(e.to_string()))?;
479 quinn_endpoint.set_default_client_config(client_config);
480
481 let server_name = addr.split(':').next().unwrap_or("localhost").to_string();
482
483 let quic = quinn_endpoint
484 .connect(server_addr, &server_name)
485 .map_err(TransportError::from)?
486 .await
487 .map_err(TransportError::from)?;
488
489 Ok(Transport::Quic(QuicTransport::new(quic)))
490 }
491
492 #[cfg(feature = "webtransport")]
494 async fn connect_webtransport(
495 url: &str,
496 config: &ClientConfig,
497 ) -> Result<Transport, ConnectionError> {
498 use crate::transport::webtransport::WebTransportTransport;
499
500 let wt_config = if config.skip_cert_verification {
501 wtransport::ClientConfig::builder()
502 .with_bind_default()
503 .with_no_cert_validation()
504 .build()
505 } else {
506 wtransport::ClientConfig::builder().with_bind_default().with_native_certs().build()
507 };
508
509 let endpoint = wtransport::Endpoint::client(wt_config)
510 .map_err(|e| ConnectionError::Transport(TransportError::Connect(e.to_string())))?;
511
512 let connection = endpoint
513 .connect(url)
514 .await
515 .map_err(|e| ConnectionError::Transport(TransportError::Connect(e.to_string())))?;
516
517 Ok(Transport::WebTransport(WebTransportTransport::new(connection)))
518 }
519
520 #[cfg(not(feature = "webtransport"))]
522 async fn connect_webtransport(
523 _url: &str,
524 _config: &ClientConfig,
525 ) -> Result<Transport, ConnectionError> {
526 Err(ConnectionError::Transport(TransportError::Connect(
527 "webtransport feature not enabled".into(),
528 )))
529 }
530
531 pub fn set_observer(&mut self, observer: Box<dyn ConnectionObserver>) {
535 self.observer = Some(observer);
536 }
537
538 pub fn clear_observer(&mut self) {
540 self.observer = None;
541 }
542
543 fn emit(&self, event: ClientEvent) {
545 if let Some(ref obs) = self.observer {
546 obs.on_event_owned(event);
547 }
548 }
549
550 pub async fn send_control(&mut self, msg: &ControlMessage) -> Result<(), ConnectionError> {
557 let any = AnyControlMessage::Draft15(msg.clone());
558 let send = self.control_send.as_mut().ok_or(ConnectionError::NoControlStream)?;
559 let raw = send.write_control(&any).await?;
560 self.emit(ClientEvent::ControlMessage {
561 direction: Direction::Send,
562 message: any,
563 raw: Some(raw),
564 });
565 Ok(())
566 }
567
568 pub async fn recv_control(&mut self) -> Result<ControlMessage, ConnectionError> {
573 let recv = self.control_recv.as_mut().ok_or(ConnectionError::NoControlStream)?;
574 let capture_raw = self.observer.is_some();
575 let (any, raw) = recv.read_control(capture_raw).await?;
576 if capture_raw {
577 self.emit(ClientEvent::ControlMessage {
578 direction: Direction::Receive,
579 message: any.clone(),
580 raw,
581 });
582 }
583 match any {
585 AnyControlMessage::Draft15(msg) => Ok(msg),
586 _ => Err(ConnectionError::Codec(CodecError::UnknownMessageType(0))),
587 }
588 }
589
590 pub async fn recv_and_dispatch(&mut self) -> Result<ControlMessage, ConnectionError> {
593 let msg = self.recv_control().await?;
594 self.endpoint.receive_message(msg.clone())?;
595
596 if let ControlMessage::GoAway(ref ga) = msg {
598 self.emit(ClientEvent::Draining { new_session_uri: ga.new_session_uri.clone() });
599 }
600
601 Ok(msg)
602 }
603
604 pub async fn subscribe(
608 &mut self,
609 track_namespace: TrackNamespace,
610 track_name: Vec<u8>,
611 parameters: Vec<KeyValuePair>,
612 ) -> Result<VarInt, ConnectionError> {
613 let (req_id, msg) = self.endpoint.subscribe(track_namespace, track_name, parameters)?;
614 self.send_control(&msg).await?;
615 Ok(req_id)
616 }
617
618 pub async fn unsubscribe(&mut self, request_id: VarInt) -> Result<(), ConnectionError> {
620 let msg = self.endpoint.unsubscribe(request_id)?;
621 self.send_control(&msg).await
622 }
623
624 pub async fn fetch(
628 &mut self,
629 track_namespace: TrackNamespace,
630 track_name: Vec<u8>,
631 start_group: VarInt,
632 start_object: VarInt,
633 end_group: VarInt,
634 end_object: VarInt,
635 ) -> Result<VarInt, ConnectionError> {
636 let (req_id, msg) = self.endpoint.fetch(
637 track_namespace,
638 track_name,
639 start_group,
640 start_object,
641 end_group,
642 end_object,
643 )?;
644 self.send_control(&msg).await?;
645 Ok(req_id)
646 }
647
648 pub async fn joining_fetch(
650 &mut self,
651 joining_request_id: VarInt,
652 joining_start: VarInt,
653 ) -> Result<VarInt, ConnectionError> {
654 let (req_id, msg) = self.endpoint.joining_fetch(joining_request_id, joining_start)?;
655 self.send_control(&msg).await?;
656 Ok(req_id)
657 }
658
659 pub async fn fetch_cancel(&mut self, request_id: VarInt) -> Result<(), ConnectionError> {
661 let msg = self.endpoint.fetch_cancel(request_id)?;
662 self.send_control(&msg).await
663 }
664
665 pub async fn subscribe_namespace(
669 &mut self,
670 namespace_prefix: TrackNamespace,
671 parameters: Vec<KeyValuePair>,
672 ) -> Result<VarInt, ConnectionError> {
673 let (req_id, msg) = self.endpoint.subscribe_namespace(namespace_prefix, parameters)?;
674 self.send_control(&msg).await?;
675 Ok(req_id)
676 }
677
678 pub async fn publish_namespace(
680 &mut self,
681 track_namespace: TrackNamespace,
682 parameters: Vec<KeyValuePair>,
683 ) -> Result<VarInt, ConnectionError> {
684 let (req_id, msg) = self.endpoint.publish_namespace(track_namespace, parameters)?;
685 self.send_control(&msg).await?;
686 Ok(req_id)
687 }
688
689 pub async fn track_status(
693 &mut self,
694 track_namespace: TrackNamespace,
695 track_name: Vec<u8>,
696 parameters: Vec<KeyValuePair>,
697 ) -> Result<VarInt, ConnectionError> {
698 let (req_id, msg) = self.endpoint.track_status(track_namespace, track_name, parameters)?;
699 self.send_control(&msg).await?;
700 Ok(req_id)
701 }
702
703 pub async fn publish(
707 &mut self,
708 track_namespace: TrackNamespace,
709 track_name: Vec<u8>,
710 track_alias: VarInt,
711 parameters: Vec<KeyValuePair>,
712 ) -> Result<VarInt, ConnectionError> {
713 let (req_id, msg) =
714 self.endpoint.publish(track_namespace, track_name, track_alias, parameters)?;
715 self.send_control(&msg).await?;
716 Ok(req_id)
717 }
718
719 pub async fn publish_done(
721 &mut self,
722 request_id: VarInt,
723 status_code: VarInt,
724 stream_count: VarInt,
725 reason_phrase: Vec<u8>,
726 ) -> Result<(), ConnectionError> {
727 let msg = self.endpoint.send_publish_done(
728 request_id,
729 status_code,
730 stream_count,
731 reason_phrase,
732 )?;
733 self.send_control(&msg).await
734 }
735
736 pub async fn open_subgroup_stream(
740 &self,
741 header: &AnySubgroupHeader,
742 ) -> Result<FramedSendStream, ConnectionError> {
743 let send = self.transport.open_uni().await?;
744 let mut framed = FramedSendStream::new(send, self.draft);
745 let sid = framed.stream_id();
746 framed.write_subgroup_header(header).await?;
747 self.emit(ClientEvent::StreamOpened {
748 direction: Direction::Send,
749 stream_kind: StreamKind::Subgroup,
750 stream_id: sid,
751 });
752 self.emit(ClientEvent::DataStreamHeader {
753 stream_id: sid,
754 direction: Direction::Send,
755 header: header.clone(),
756 });
757 Ok(framed)
758 }
759
760 pub async fn accept_subgroup_stream(
763 &self,
764 ) -> Result<(AnySubgroupHeader, FramedRecvStream), ConnectionError> {
765 let recv = self.transport.accept_uni().await?;
766 let mut framed = FramedRecvStream::new(recv, self.draft);
767 let sid = framed.stream_id();
768 let header = framed.read_subgroup_header().await?;
769 self.emit(ClientEvent::StreamOpened {
770 direction: Direction::Receive,
771 stream_kind: StreamKind::Subgroup,
772 stream_id: sid,
773 });
774 self.emit(ClientEvent::DataStreamHeader {
775 stream_id: sid,
776 direction: Direction::Receive,
777 header: header.clone(),
778 });
779 Ok((header, framed))
780 }
781
782 pub fn send_datagram(
784 &self,
785 header: &AnyDatagramHeader,
786 payload: &[u8],
787 ) -> Result<(), ConnectionError> {
788 let mut buf = Vec::new();
789 header.encode(&mut buf);
790 buf.extend_from_slice(payload);
791 self.emit(ClientEvent::DatagramReceived {
792 direction: Direction::Send,
793 header: header.clone(),
794 payload_len: payload.len(),
795 });
796 self.transport.send_datagram(bytes::Bytes::from(buf))?;
797 Ok(())
798 }
799
800 pub async fn recv_datagram(&self) -> Result<(AnyDatagramHeader, Bytes), ConnectionError> {
802 let data = self.transport.recv_datagram().await?;
803 let mut cursor = &data[..];
804 let header = AnyDatagramHeader::decode(self.draft, &mut cursor)?;
805 let consumed = data.len() - cursor.len();
806 let payload = data.slice(consumed..);
807 self.emit(ClientEvent::DatagramReceived {
808 direction: Direction::Receive,
809 header: header.clone(),
810 payload_len: payload.len(),
811 });
812 Ok((header, payload))
813 }
814
815 pub fn endpoint(&self) -> &Endpoint {
819 &self.endpoint
820 }
821
822 pub fn endpoint_mut(&mut self) -> &mut Endpoint {
824 &mut self.endpoint
825 }
826
827 pub fn draft(&self) -> DraftVersion {
829 self.draft
830 }
831
832 pub fn close(&self, code: u32, reason: &[u8]) {
834 self.emit(ClientEvent::Closed { code, reason: reason.to_vec() });
835 self.transport.close(code, reason);
836 }
837}
838
/// Total encoded size, in bytes, of a varint whose first byte is `first_byte`.
///
/// The two most significant bits select the size: `00` → 1, `01` → 2, `10` → 4, `11` → 8.
fn varint_len(first_byte: u8) -> usize {
    match first_byte >> 6 {
        0 => 1,
        1 => 2,
        2 => 4,
        _ => 8,
    }
}
843
/// Certificate verifier that accepts every server certificate and signature.
///
/// Installed only when `ClientConfig::skip_cert_verification` is set. It provides no
/// protection against impersonation of the server.
#[derive(Debug)]
struct SkipVerification;
847
848impl rustls::client::danger::ServerCertVerifier for SkipVerification {
849 fn verify_server_cert(
850 &self,
851 _end_entity: &rustls::pki_types::CertificateDer<'_>,
852 _intermediates: &[rustls::pki_types::CertificateDer<'_>],
853 _server_name: &rustls::pki_types::ServerName<'_>,
854 _ocsp_response: &[u8],
855 _now: rustls::pki_types::UnixTime,
856 ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
857 Ok(rustls::client::danger::ServerCertVerified::assertion())
858 }
859
860 fn verify_tls12_signature(
861 &self,
862 _message: &[u8],
863 _cert: &rustls::pki_types::CertificateDer<'_>,
864 _dcs: &rustls::DigitallySignedStruct,
865 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
866 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
867 }
868
869 fn verify_tls13_signature(
870 &self,
871 _message: &[u8],
872 _cert: &rustls::pki_types::CertificateDer<'_>,
873 _dcs: &rustls::DigitallySignedStruct,
874 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
875 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
876 }
877
878 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
879 vec![
880 rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
881 rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
882 rustls::SignatureScheme::ED25519,
883 rustls::SignatureScheme::RSA_PSS_SHA256,
884 rustls::SignatureScheme::RSA_PSS_SHA384,
885 rustls::SignatureScheme::RSA_PSS_SHA512,
886 ]
887 }
888}
889
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every listed first byte is reported as an `expected`-byte varint.
    fn assert_varint_len(first_bytes: &[u8], expected: usize) {
        for &byte in first_bytes {
            assert_eq!(varint_len(byte), expected);
        }
    }

    /// Draft-15 config with certificate verification on and no extra CAs or parameters.
    fn draft15_config(transport: TransportType) -> ClientConfig {
        ClientConfig {
            draft: DraftVersion::Draft15,
            transport,
            skip_cert_verification: false,
            ca_certs: Vec::new(),
            setup_parameters: Vec::new(),
        }
    }

    #[test]
    fn varint_len_single_byte() {
        assert_varint_len(&[0x00, 0x3F], 1);
    }

    #[test]
    fn varint_len_two_bytes() {
        assert_varint_len(&[0x40, 0x7F], 2);
    }

    #[test]
    fn varint_len_four_bytes() {
        assert_varint_len(&[0x80, 0xBF], 4);
    }

    #[test]
    fn varint_len_eight_bytes() {
        assert_varint_len(&[0xC0, 0xFF], 8);
    }

    #[test]
    fn client_config_alpn_quic_draft15() {
        let config = draft15_config(TransportType::Quic);
        assert_eq!(config.alpn(), vec![b"moqt-15".to_vec()]);
    }

    #[test]
    fn client_config_alpn_webtransport() {
        let transport = TransportType::WebTransport { url: "https://example.com".to_string() };
        let config = draft15_config(transport);
        assert_eq!(config.alpn(), vec![b"h3".to_vec()]);
    }

    #[test]
    fn moqt_alpn_value() {
        assert_eq!(MOQT_ALPN, b"moq-00");
    }

    #[test]
    fn transport_type_debug() {
        let quic_text = format!("{:?}", TransportType::Quic);
        assert!(quic_text.contains("Quic"));

        let wt = TransportType::WebTransport { url: "https://example.com".to_string() };
        let wt_text = format!("{wt:?}");
        assert!(wt_text.contains("WebTransport"));
    }
}