Skip to main content

moqtap_trace/
event.rs

1use ciborium::Value;
2
3use crate::error::MoqTraceError;
4
/// Direction of a message or stream relative to the recording endpoint.
///
/// Encoded in the trace as an unsigned integer: `0` for
/// [`Direction::Send`] and `1` for [`Direction::Receive`]. Any other value
/// is rejected when decoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// Sent (outgoing). Wire value: `0`.
    Send,
    /// Received (incoming). Wire value: `1`.
    Receive,
}
13
14impl Direction {
15    fn from_cbor(v: &Value) -> Result<Self, MoqTraceError> {
16        match v.as_integer().and_then(|i| u64::try_from(i).ok()) {
17            Some(0) => Ok(Direction::Send),
18            Some(1) => Ok(Direction::Receive),
19            _ => Err(MoqTraceError::InvalidEvent("invalid direction value".into())),
20        }
21    }
22}
23
/// Data stream type.
///
/// Encoded in the trace as an unsigned integer: `0` for
/// [`StreamType::Subgroup`], `1` for [`StreamType::Datagram`] and `2` for
/// [`StreamType::Fetch`]. Any other value is rejected when decoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamType {
    /// Subgroup stream. Wire value: `0`.
    Subgroup,
    /// Datagram. Wire value: `1`.
    Datagram,
    /// Fetch stream. Wire value: `2`.
    Fetch,
}
34
35impl StreamType {
36    fn from_cbor(v: &Value) -> Result<Self, MoqTraceError> {
37        match v.as_integer().and_then(|i| u64::try_from(i).ok()) {
38            Some(0) => Ok(StreamType::Subgroup),
39            Some(1) => Ok(StreamType::Datagram),
40            Some(2) => Ok(StreamType::Fetch),
41            _ => Err(MoqTraceError::InvalidEvent("invalid stream type value".into())),
42        }
43    }
44}
45
// Event type discriminants, stored under the `"e"` key of every serialized
// event. Matches FORMAT.md `"e"` values.

/// Discriminant for [`EventData::ControlMessage`].
const EVENT_CONTROL_MESSAGE: u64 = 0;
/// Discriminant for [`EventData::StreamOpened`].
const EVENT_STREAM_OPENED: u64 = 1;
/// Discriminant for [`EventData::StreamClosed`].
const EVENT_STREAM_CLOSED: u64 = 2;
/// Discriminant for [`EventData::ObjectHeader`].
const EVENT_OBJECT_HEADER: u64 = 3;
/// Discriminant for [`EventData::ObjectPayload`].
const EVENT_OBJECT_PAYLOAD: u64 = 4;
/// Discriminant for [`EventData::StateChange`].
const EVENT_STATE_CHANGE: u64 = 5;
/// Discriminant for [`EventData::Error`].
const EVENT_ERROR: u64 = 6;
/// Discriminant for [`EventData::Annotation`].
const EVENT_ANNOTATION: u64 = 7;
55
/// A single event in a `.moqtrace` file.
///
/// On the wire an event is one CBOR map holding the common keys `"n"`
/// (sequence number), `"t"` (timestamp) and `"e"` (event type), followed by
/// the keys specific to the [`EventData`] variant.
#[derive(Debug, Clone, PartialEq)]
pub struct TraceEvent {
    /// Monotonically increasing sequence number (0-based). Wire key: `"n"`.
    pub seq: u64,
    /// Timestamp in microseconds since the header's `startTime`.
    /// Wire key: `"t"`.
    pub timestamp: i64,
    /// Event-specific data. Its variant determines the `"e"` discriminant.
    pub data: EventData,
}
66
/// Event-specific payload, discriminated by type.
///
/// Each variant corresponds to one value of the `"e"` key in the serialized
/// event map; the wire key of every field is noted on the field itself.
#[derive(Debug, Clone, PartialEq)]
pub enum EventData {
    /// A control-stream message was sent or received (event type 0).
    ControlMessage {
        /// Send or receive. Wire key: `"d"`.
        direction: Direction,
        /// Wire message type ID (e.g. `0x03` for SUBSCRIBE). Wire key: `"mt"`.
        message_type: u64,
        /// Decoded message fields as an opaque CBOR value. Wire key: `"msg"`.
        message: Value,
        /// Raw wire bytes (only at `full` detail level). Wire key: `"raw"`;
        /// the key is omitted when this is `None`.
        raw: Option<Vec<u8>>,
    },
    /// A QUIC stream was opened (event type 1).
    StreamOpened {
        /// QUIC stream ID. Wire key: `"sid"`.
        stream_id: u64,
        /// Outgoing or incoming. Wire key: `"d"`.
        direction: Direction,
        /// Stream type. Wire key: `"st"`.
        stream_type: StreamType,
    },
    /// A QUIC stream was closed (event type 2).
    StreamClosed {
        /// QUIC stream ID. Wire key: `"sid"`.
        stream_id: u64,
        /// Error code (0 = clean close). Wire key: `"ec"`.
        error_code: u64,
    },
    /// An object header was parsed from a data stream (event type 3).
    ObjectHeader {
        /// Stream ID this object arrived on. Wire key: `"sid"`.
        stream_id: u64,
        /// Group ID. Wire key: `"g"`.
        group: u64,
        /// Object ID. Wire key: `"o"`.
        object: u64,
        /// Publisher priority. Wire key: `"pp"`.
        publisher_priority: u64,
        /// Object status (0=normal, 1=end-of-group, etc.). Wire key: `"os"`.
        object_status: u64,
    },
    /// Object payload bytes were received or sent (event type 4).
    ObjectPayload {
        /// Stream ID. Wire key: `"sid"`.
        stream_id: u64,
        /// Group ID. Wire key: `"g"`.
        group: u64,
        /// Object ID. Wire key: `"o"`.
        object: u64,
        /// Payload size in bytes. Wire key: `"sz"`.
        size: u64,
        /// Payload bytes (only at `headers+data` or `full` level).
        /// Wire key: `"pl"`; the key is omitted when this is `None`.
        payload: Option<Vec<u8>>,
    },
    /// Session FSM phase transition (event type 5).
    StateChange {
        /// Previous session phase. Wire key: `"from"`.
        from: String,
        /// New session phase. Wire key: `"to"`.
        to: String,
    },
    /// Protocol or transport error (event type 6).
    Error {
        /// Error code. Wire key: `"ec"`.
        error_code: u64,
        /// Human-readable reason. Wire key: `"reason"`.
        reason: String,
    },
    /// User-defined annotation (event type 7).
    Annotation {
        /// User-defined label. Wire key: `"label"`.
        label: String,
        /// User-defined data (any CBOR type). Wire key: `"data"`.
        data: Value,
    },
}
145
146impl TraceEvent {
147    /// Extract the `request_id` from a control message's decoded `"msg"`
148    /// field, if present.
149    ///
150    /// Returns `None` for non-control-message events or if the `"msg"` map
151    /// does not contain a `"requestId"` key.
152    pub fn request_id(&self) -> Option<u64> {
153        if let EventData::ControlMessage { message: Value::Map(ref pairs), .. } = self.data {
154            for (k, v) in pairs {
155                if k.as_text() == Some("requestId") {
156                    return v.as_integer().and_then(|i| u64::try_from(i).ok());
157                }
158            }
159        }
160        None
161    }
162
163    /// Return the message type for control message events.
164    pub fn message_type(&self) -> Option<u64> {
165        if let EventData::ControlMessage { message_type, .. } = self.data {
166            Some(message_type)
167        } else {
168            None
169        }
170    }
171
172    /// Return the direction for events that have one.
173    pub fn direction(&self) -> Option<Direction> {
174        match &self.data {
175            EventData::ControlMessage { direction, .. }
176            | EventData::StreamOpened { direction, .. } => Some(*direction),
177            _ => None,
178        }
179    }
180}
181
182// ── CBOR conversion ────────────────────────────────────────
183
184/// Wrapper that serializes as a CBOR byte string (major type 2). Without this
185/// wrapper, serializing `&[u8]` via generic `Serialize` produces a CBOR array
186/// of u8.
187struct ByteStr<'a>(&'a [u8]);
188
189impl serde::Serialize for ByteStr<'_> {
190    #[inline]
191    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
192        s.serialize_bytes(self.0)
193    }
194}
195
196impl Direction {
197    #[inline]
198    fn to_u64(self) -> u64 {
199        match self {
200            Direction::Send => 0,
201            Direction::Receive => 1,
202        }
203    }
204}
205
206impl StreamType {
207    #[inline]
208    fn to_u64(self) -> u64 {
209        match self {
210            StreamType::Subgroup => 0,
211            StreamType::Datagram => 1,
212            StreamType::Fetch => 2,
213        }
214    }
215}
216
217/// Build a CBOR [`Value`] from a [`TraceEvent`] by running it through the
218/// crate's `Serialize` impl. Convenience for tests and inspection — the
219/// hot write path in [`MoqTraceWriter`](crate::writer::MoqTraceWriter) uses
220/// `Serialize` directly and never materializes a `Value`.
221impl From<&TraceEvent> for Value {
222    fn from(event: &TraceEvent) -> Self {
223        Value::serialized(event).expect("TraceEvent serialization is infallible")
224    }
225}
226
227impl serde::Serialize for TraceEvent {
228    fn serialize<S: serde::Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
229        use serde::ser::SerializeMap;
230
231        // Pre-count entries so the CBOR map is encoded with a definite length.
232        let entries = 3 /* n, t, e */ + match &self.data {
233            EventData::ControlMessage { raw, .. } => 3 + usize::from(raw.is_some()),
234            EventData::StreamOpened { .. } => 3,
235            EventData::StreamClosed { .. } => 2,
236            EventData::ObjectHeader { .. } => 5,
237            EventData::ObjectPayload { payload, .. } => 4 + usize::from(payload.is_some()),
238            EventData::StateChange { .. } => 2,
239            EventData::Error { .. } => 2,
240            EventData::Annotation { .. } => 2,
241        };
242
243        let mut map = ser.serialize_map(Some(entries))?;
244        map.serialize_entry("n", &self.seq)?;
245        map.serialize_entry("t", &self.timestamp)?;
246
247        match &self.data {
248            EventData::ControlMessage { direction, message_type, message, raw } => {
249                map.serialize_entry("e", &EVENT_CONTROL_MESSAGE)?;
250                map.serialize_entry("d", &direction.to_u64())?;
251                map.serialize_entry("mt", message_type)?;
252                map.serialize_entry("msg", message)?;
253                if let Some(raw) = raw {
254                    map.serialize_entry("raw", &ByteStr(raw))?;
255                }
256            }
257            EventData::StreamOpened { stream_id, direction, stream_type } => {
258                map.serialize_entry("e", &EVENT_STREAM_OPENED)?;
259                map.serialize_entry("sid", stream_id)?;
260                map.serialize_entry("d", &direction.to_u64())?;
261                map.serialize_entry("st", &stream_type.to_u64())?;
262            }
263            EventData::StreamClosed { stream_id, error_code } => {
264                map.serialize_entry("e", &EVENT_STREAM_CLOSED)?;
265                map.serialize_entry("sid", stream_id)?;
266                map.serialize_entry("ec", error_code)?;
267            }
268            EventData::ObjectHeader {
269                stream_id,
270                group,
271                object,
272                publisher_priority,
273                object_status,
274            } => {
275                map.serialize_entry("e", &EVENT_OBJECT_HEADER)?;
276                map.serialize_entry("sid", stream_id)?;
277                map.serialize_entry("g", group)?;
278                map.serialize_entry("o", object)?;
279                map.serialize_entry("pp", publisher_priority)?;
280                map.serialize_entry("os", object_status)?;
281            }
282            EventData::ObjectPayload { stream_id, group, object, size, payload } => {
283                map.serialize_entry("e", &EVENT_OBJECT_PAYLOAD)?;
284                map.serialize_entry("sid", stream_id)?;
285                map.serialize_entry("g", group)?;
286                map.serialize_entry("o", object)?;
287                map.serialize_entry("sz", size)?;
288                if let Some(pl) = payload {
289                    map.serialize_entry("pl", &ByteStr(pl))?;
290                }
291            }
292            EventData::StateChange { from, to } => {
293                map.serialize_entry("e", &EVENT_STATE_CHANGE)?;
294                map.serialize_entry("from", from)?;
295                map.serialize_entry("to", to)?;
296            }
297            EventData::Error { error_code, reason } => {
298                map.serialize_entry("e", &EVENT_ERROR)?;
299                map.serialize_entry("ec", error_code)?;
300                map.serialize_entry("reason", reason)?;
301            }
302            EventData::Annotation { label, data } => {
303                map.serialize_entry("e", &EVENT_ANNOTATION)?;
304                map.serialize_entry("label", label)?;
305                map.serialize_entry("data", data)?;
306            }
307        }
308
309        map.end()
310    }
311}
312
313/// Helper to extract a u64 from a CBOR map by key.
314fn get_uint(pairs: &[(Value, Value)], key: &str) -> Option<u64> {
315    pairs.iter().find_map(|(k, v)| {
316        if k.as_text() == Some(key) {
317            v.as_integer().and_then(|i| u64::try_from(i).ok())
318        } else {
319            None
320        }
321    })
322}
323
324/// Helper to extract an i64 from a CBOR map by key.
325fn get_int(pairs: &[(Value, Value)], key: &str) -> Option<i64> {
326    pairs.iter().find_map(|(k, v)| {
327        if k.as_text() == Some(key) {
328            v.as_integer().and_then(|i| i64::try_from(i).ok())
329        } else {
330            None
331        }
332    })
333}
334
335/// Helper to extract a text string from a CBOR map by key.
336fn get_text(pairs: &[(Value, Value)], key: &str) -> Option<String> {
337    pairs.iter().find_map(|(k, v)| {
338        if k.as_text() == Some(key) {
339            v.as_text().map(|s| s.to_string())
340        } else {
341            None
342        }
343    })
344}
345
346/// Helper to extract a value from a CBOR map by key.
347fn get_value(pairs: &[(Value, Value)], key: &str) -> Option<Value> {
348    pairs.iter().find_map(|(k, v)| if k.as_text() == Some(key) { Some(v.clone()) } else { None })
349}
350
351/// Helper to extract byte string from a CBOR map by key.
352fn get_bytes(pairs: &[(Value, Value)], key: &str) -> Option<Vec<u8>> {
353    pairs.iter().find_map(|(k, v)| {
354        if k.as_text() == Some(key) {
355            v.as_bytes().map(|b| b.to_vec())
356        } else {
357            None
358        }
359    })
360}
361
362fn require_uint(pairs: &[(Value, Value)], key: &str) -> Result<u64, MoqTraceError> {
363    get_uint(pairs, key).ok_or_else(|| MoqTraceError::InvalidEvent(format!("missing '{key}'")))
364}
365
366fn require_int(pairs: &[(Value, Value)], key: &str) -> Result<i64, MoqTraceError> {
367    get_int(pairs, key).ok_or_else(|| MoqTraceError::InvalidEvent(format!("missing '{key}'")))
368}
369
370fn require_text(pairs: &[(Value, Value)], key: &str) -> Result<String, MoqTraceError> {
371    get_text(pairs, key).ok_or_else(|| MoqTraceError::InvalidEvent(format!("missing '{key}'")))
372}
373
374fn require_value(pairs: &[(Value, Value)], key: &str) -> Result<Value, MoqTraceError> {
375    get_value(pairs, key).ok_or_else(|| MoqTraceError::InvalidEvent(format!("missing '{key}'")))
376}
377
378fn require_direction(pairs: &[(Value, Value)], key: &str) -> Result<Direction, MoqTraceError> {
379    let v = require_value(pairs, key)?;
380    Direction::from_cbor(&v)
381}
382
383impl TryFrom<Value> for TraceEvent {
384    type Error = MoqTraceError;
385
386    fn try_from(value: Value) -> Result<Self, MoqTraceError> {
387        let pairs = match value {
388            Value::Map(pairs) => pairs,
389            _ => return Err(MoqTraceError::InvalidEvent("event is not a CBOR map".into())),
390        };
391
392        let seq = require_uint(&pairs, "n")?;
393        let timestamp = require_int(&pairs, "t")?;
394        let event_type = require_uint(&pairs, "e")?;
395
396        let data = match event_type {
397            EVENT_CONTROL_MESSAGE => EventData::ControlMessage {
398                direction: require_direction(&pairs, "d")?,
399                message_type: require_uint(&pairs, "mt")?,
400                message: require_value(&pairs, "msg")?,
401                raw: get_bytes(&pairs, "raw"),
402            },
403            EVENT_STREAM_OPENED => {
404                let st_val = require_value(&pairs, "st")?;
405                EventData::StreamOpened {
406                    stream_id: require_uint(&pairs, "sid")?,
407                    direction: require_direction(&pairs, "d")?,
408                    stream_type: StreamType::from_cbor(&st_val)?,
409                }
410            }
411            EVENT_STREAM_CLOSED => EventData::StreamClosed {
412                stream_id: require_uint(&pairs, "sid")?,
413                error_code: require_uint(&pairs, "ec")?,
414            },
415            EVENT_OBJECT_HEADER => EventData::ObjectHeader {
416                stream_id: require_uint(&pairs, "sid")?,
417                group: require_uint(&pairs, "g")?,
418                object: require_uint(&pairs, "o")?,
419                publisher_priority: require_uint(&pairs, "pp")?,
420                object_status: require_uint(&pairs, "os")?,
421            },
422            EVENT_OBJECT_PAYLOAD => EventData::ObjectPayload {
423                stream_id: require_uint(&pairs, "sid")?,
424                group: require_uint(&pairs, "g")?,
425                object: require_uint(&pairs, "o")?,
426                size: require_uint(&pairs, "sz")?,
427                payload: get_bytes(&pairs, "pl"),
428            },
429            EVENT_STATE_CHANGE => EventData::StateChange {
430                from: require_text(&pairs, "from")?,
431                to: require_text(&pairs, "to")?,
432            },
433            EVENT_ERROR => EventData::Error {
434                error_code: require_uint(&pairs, "ec")?,
435                reason: require_text(&pairs, "reason")?,
436            },
437            EVENT_ANNOTATION => EventData::Annotation {
438                label: require_text(&pairs, "label")?,
439                data: require_value(&pairs, "data")?,
440            },
441            other => {
442                return Err(MoqTraceError::InvalidEvent(format!("unknown event type: {other}")))
443            }
444        };
445
446        Ok(TraceEvent { seq, timestamp, data })
447    }
448}