1use ciborium::Value;
2
3use crate::error::MoqTraceError;
4
/// Direction tag carried by control-message and stream-opened trace events.
///
/// In the serialized form the variants map to the integers `0` (`Send`) and
/// `1` (`Receive`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Direction {
    /// Encoded as the integer `0`.
    Send,
    /// Encoded as the integer `1`.
    Receive,
}
13
14impl Direction {
15 fn from_cbor(v: &Value) -> Result<Self, MoqTraceError> {
16 match v.as_integer().and_then(|i| u64::try_from(i).ok()) {
17 Some(0) => Ok(Direction::Send),
18 Some(1) => Ok(Direction::Receive),
19 _ => Err(MoqTraceError::InvalidEvent("invalid direction value".into())),
20 }
21 }
22}
23
/// Stream-type tag carried by stream-opened trace events.
///
/// In the serialized form the variants map to the integers `0` (`Subgroup`),
/// `1` (`Datagram`) and `2` (`Fetch`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamType {
    /// Encoded as the integer `0`.
    Subgroup,
    /// Encoded as the integer `1`.
    Datagram,
    /// Encoded as the integer `2`.
    Fetch,
}
34
35impl StreamType {
36 fn from_cbor(v: &Value) -> Result<Self, MoqTraceError> {
37 match v.as_integer().and_then(|i| u64::try_from(i).ok()) {
38 Some(0) => Ok(StreamType::Subgroup),
39 Some(1) => Ok(StreamType::Datagram),
40 Some(2) => Ok(StreamType::Fetch),
41 _ => Err(MoqTraceError::InvalidEvent("invalid stream type value".into())),
42 }
43 }
44}
45
// Discriminators written under the `e` key of a serialized trace event and
// matched against when decoding one.

/// `e` value for `EventData::ControlMessage`.
const EVENT_CONTROL_MESSAGE: u64 = 0;
/// `e` value for `EventData::StreamOpened`.
const EVENT_STREAM_OPENED: u64 = 1;
/// `e` value for `EventData::StreamClosed`.
const EVENT_STREAM_CLOSED: u64 = 2;
/// `e` value for `EventData::ObjectHeader`.
const EVENT_OBJECT_HEADER: u64 = 3;
/// `e` value for `EventData::ObjectPayload`.
const EVENT_OBJECT_PAYLOAD: u64 = 4;
/// `e` value for `EventData::StateChange`.
const EVENT_STATE_CHANGE: u64 = 5;
/// `e` value for `EventData::Error`.
const EVENT_ERROR: u64 = 6;
/// `e` value for `EventData::Annotation`.
const EVENT_ANNOTATION: u64 = 7;
55
56#[derive(Debug, Clone, PartialEq)]
58pub struct TraceEvent {
59 pub seq: u64,
61 pub timestamp: i64,
63 pub data: EventData,
65}
66
67#[derive(Debug, Clone, PartialEq)]
69pub enum EventData {
70 ControlMessage {
72 direction: Direction,
74 message_type: u64,
76 message: Value,
78 raw: Option<Vec<u8>>,
80 },
81 StreamOpened {
83 stream_id: u64,
85 direction: Direction,
87 stream_type: StreamType,
89 },
90 StreamClosed {
92 stream_id: u64,
94 error_code: u64,
96 },
97 ObjectHeader {
99 stream_id: u64,
101 group: u64,
103 object: u64,
105 publisher_priority: u64,
107 object_status: u64,
109 },
110 ObjectPayload {
112 stream_id: u64,
114 group: u64,
116 object: u64,
118 size: u64,
120 payload: Option<Vec<u8>>,
122 },
123 StateChange {
125 from: String,
127 to: String,
129 },
130 Error {
132 error_code: u64,
134 reason: String,
136 },
137 Annotation {
139 label: String,
141 data: Value,
143 },
144}
145
146impl TraceEvent {
147 pub fn request_id(&self) -> Option<u64> {
153 if let EventData::ControlMessage { message: Value::Map(ref pairs), .. } = self.data {
154 for (k, v) in pairs {
155 if k.as_text() == Some("requestId") {
156 return v.as_integer().and_then(|i| u64::try_from(i).ok());
157 }
158 }
159 }
160 None
161 }
162
163 pub fn message_type(&self) -> Option<u64> {
165 if let EventData::ControlMessage { message_type, .. } = self.data {
166 Some(message_type)
167 } else {
168 None
169 }
170 }
171
172 pub fn direction(&self) -> Option<Direction> {
174 match &self.data {
175 EventData::ControlMessage { direction, .. }
176 | EventData::StreamOpened { direction, .. } => Some(*direction),
177 _ => None,
178 }
179 }
180}
181
182struct ByteStr<'a>(&'a [u8]);
188
189impl serde::Serialize for ByteStr<'_> {
190 #[inline]
191 fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
192 s.serialize_bytes(self.0)
193 }
194}
195
196impl Direction {
197 #[inline]
198 fn to_u64(self) -> u64 {
199 match self {
200 Direction::Send => 0,
201 Direction::Receive => 1,
202 }
203 }
204}
205
206impl StreamType {
207 #[inline]
208 fn to_u64(self) -> u64 {
209 match self {
210 StreamType::Subgroup => 0,
211 StreamType::Datagram => 1,
212 StreamType::Fetch => 2,
213 }
214 }
215}
216
217impl From<&TraceEvent> for Value {
222 fn from(event: &TraceEvent) -> Self {
223 Value::serialized(event).expect("TraceEvent serialization is infallible")
224 }
225}
226
227impl serde::Serialize for TraceEvent {
228 fn serialize<S: serde::Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
229 use serde::ser::SerializeMap;
230
231 let entries = 3 + match &self.data {
233 EventData::ControlMessage { raw, .. } => 3 + usize::from(raw.is_some()),
234 EventData::StreamOpened { .. } => 3,
235 EventData::StreamClosed { .. } => 2,
236 EventData::ObjectHeader { .. } => 5,
237 EventData::ObjectPayload { payload, .. } => 4 + usize::from(payload.is_some()),
238 EventData::StateChange { .. } => 2,
239 EventData::Error { .. } => 2,
240 EventData::Annotation { .. } => 2,
241 };
242
243 let mut map = ser.serialize_map(Some(entries))?;
244 map.serialize_entry("n", &self.seq)?;
245 map.serialize_entry("t", &self.timestamp)?;
246
247 match &self.data {
248 EventData::ControlMessage { direction, message_type, message, raw } => {
249 map.serialize_entry("e", &EVENT_CONTROL_MESSAGE)?;
250 map.serialize_entry("d", &direction.to_u64())?;
251 map.serialize_entry("mt", message_type)?;
252 map.serialize_entry("msg", message)?;
253 if let Some(raw) = raw {
254 map.serialize_entry("raw", &ByteStr(raw))?;
255 }
256 }
257 EventData::StreamOpened { stream_id, direction, stream_type } => {
258 map.serialize_entry("e", &EVENT_STREAM_OPENED)?;
259 map.serialize_entry("sid", stream_id)?;
260 map.serialize_entry("d", &direction.to_u64())?;
261 map.serialize_entry("st", &stream_type.to_u64())?;
262 }
263 EventData::StreamClosed { stream_id, error_code } => {
264 map.serialize_entry("e", &EVENT_STREAM_CLOSED)?;
265 map.serialize_entry("sid", stream_id)?;
266 map.serialize_entry("ec", error_code)?;
267 }
268 EventData::ObjectHeader {
269 stream_id,
270 group,
271 object,
272 publisher_priority,
273 object_status,
274 } => {
275 map.serialize_entry("e", &EVENT_OBJECT_HEADER)?;
276 map.serialize_entry("sid", stream_id)?;
277 map.serialize_entry("g", group)?;
278 map.serialize_entry("o", object)?;
279 map.serialize_entry("pp", publisher_priority)?;
280 map.serialize_entry("os", object_status)?;
281 }
282 EventData::ObjectPayload { stream_id, group, object, size, payload } => {
283 map.serialize_entry("e", &EVENT_OBJECT_PAYLOAD)?;
284 map.serialize_entry("sid", stream_id)?;
285 map.serialize_entry("g", group)?;
286 map.serialize_entry("o", object)?;
287 map.serialize_entry("sz", size)?;
288 if let Some(pl) = payload {
289 map.serialize_entry("pl", &ByteStr(pl))?;
290 }
291 }
292 EventData::StateChange { from, to } => {
293 map.serialize_entry("e", &EVENT_STATE_CHANGE)?;
294 map.serialize_entry("from", from)?;
295 map.serialize_entry("to", to)?;
296 }
297 EventData::Error { error_code, reason } => {
298 map.serialize_entry("e", &EVENT_ERROR)?;
299 map.serialize_entry("ec", error_code)?;
300 map.serialize_entry("reason", reason)?;
301 }
302 EventData::Annotation { label, data } => {
303 map.serialize_entry("e", &EVENT_ANNOTATION)?;
304 map.serialize_entry("label", label)?;
305 map.serialize_entry("data", data)?;
306 }
307 }
308
309 map.end()
310 }
311}
312
313fn get_uint(pairs: &[(Value, Value)], key: &str) -> Option<u64> {
315 pairs.iter().find_map(|(k, v)| {
316 if k.as_text() == Some(key) {
317 v.as_integer().and_then(|i| u64::try_from(i).ok())
318 } else {
319 None
320 }
321 })
322}
323
324fn get_int(pairs: &[(Value, Value)], key: &str) -> Option<i64> {
326 pairs.iter().find_map(|(k, v)| {
327 if k.as_text() == Some(key) {
328 v.as_integer().and_then(|i| i64::try_from(i).ok())
329 } else {
330 None
331 }
332 })
333}
334
335fn get_text(pairs: &[(Value, Value)], key: &str) -> Option<String> {
337 pairs.iter().find_map(|(k, v)| {
338 if k.as_text() == Some(key) {
339 v.as_text().map(|s| s.to_string())
340 } else {
341 None
342 }
343 })
344}
345
346fn get_value(pairs: &[(Value, Value)], key: &str) -> Option<Value> {
348 pairs.iter().find_map(|(k, v)| if k.as_text() == Some(key) { Some(v.clone()) } else { None })
349}
350
351fn get_bytes(pairs: &[(Value, Value)], key: &str) -> Option<Vec<u8>> {
353 pairs.iter().find_map(|(k, v)| {
354 if k.as_text() == Some(key) {
355 v.as_bytes().map(|b| b.to_vec())
356 } else {
357 None
358 }
359 })
360}
361
362fn require_uint(pairs: &[(Value, Value)], key: &str) -> Result<u64, MoqTraceError> {
363 get_uint(pairs, key).ok_or_else(|| MoqTraceError::InvalidEvent(format!("missing '{key}'")))
364}
365
366fn require_int(pairs: &[(Value, Value)], key: &str) -> Result<i64, MoqTraceError> {
367 get_int(pairs, key).ok_or_else(|| MoqTraceError::InvalidEvent(format!("missing '{key}'")))
368}
369
370fn require_text(pairs: &[(Value, Value)], key: &str) -> Result<String, MoqTraceError> {
371 get_text(pairs, key).ok_or_else(|| MoqTraceError::InvalidEvent(format!("missing '{key}'")))
372}
373
374fn require_value(pairs: &[(Value, Value)], key: &str) -> Result<Value, MoqTraceError> {
375 get_value(pairs, key).ok_or_else(|| MoqTraceError::InvalidEvent(format!("missing '{key}'")))
376}
377
378fn require_direction(pairs: &[(Value, Value)], key: &str) -> Result<Direction, MoqTraceError> {
379 let v = require_value(pairs, key)?;
380 Direction::from_cbor(&v)
381}
382
383impl TryFrom<Value> for TraceEvent {
384 type Error = MoqTraceError;
385
386 fn try_from(value: Value) -> Result<Self, MoqTraceError> {
387 let pairs = match value {
388 Value::Map(pairs) => pairs,
389 _ => return Err(MoqTraceError::InvalidEvent("event is not a CBOR map".into())),
390 };
391
392 let seq = require_uint(&pairs, "n")?;
393 let timestamp = require_int(&pairs, "t")?;
394 let event_type = require_uint(&pairs, "e")?;
395
396 let data = match event_type {
397 EVENT_CONTROL_MESSAGE => EventData::ControlMessage {
398 direction: require_direction(&pairs, "d")?,
399 message_type: require_uint(&pairs, "mt")?,
400 message: require_value(&pairs, "msg")?,
401 raw: get_bytes(&pairs, "raw"),
402 },
403 EVENT_STREAM_OPENED => {
404 let st_val = require_value(&pairs, "st")?;
405 EventData::StreamOpened {
406 stream_id: require_uint(&pairs, "sid")?,
407 direction: require_direction(&pairs, "d")?,
408 stream_type: StreamType::from_cbor(&st_val)?,
409 }
410 }
411 EVENT_STREAM_CLOSED => EventData::StreamClosed {
412 stream_id: require_uint(&pairs, "sid")?,
413 error_code: require_uint(&pairs, "ec")?,
414 },
415 EVENT_OBJECT_HEADER => EventData::ObjectHeader {
416 stream_id: require_uint(&pairs, "sid")?,
417 group: require_uint(&pairs, "g")?,
418 object: require_uint(&pairs, "o")?,
419 publisher_priority: require_uint(&pairs, "pp")?,
420 object_status: require_uint(&pairs, "os")?,
421 },
422 EVENT_OBJECT_PAYLOAD => EventData::ObjectPayload {
423 stream_id: require_uint(&pairs, "sid")?,
424 group: require_uint(&pairs, "g")?,
425 object: require_uint(&pairs, "o")?,
426 size: require_uint(&pairs, "sz")?,
427 payload: get_bytes(&pairs, "pl"),
428 },
429 EVENT_STATE_CHANGE => EventData::StateChange {
430 from: require_text(&pairs, "from")?,
431 to: require_text(&pairs, "to")?,
432 },
433 EVENT_ERROR => EventData::Error {
434 error_code: require_uint(&pairs, "ec")?,
435 reason: require_text(&pairs, "reason")?,
436 },
437 EVENT_ANNOTATION => EventData::Annotation {
438 label: require_text(&pairs, "label")?,
439 data: require_value(&pairs, "data")?,
440 },
441 other => {
442 return Err(MoqTraceError::InvalidEvent(format!("unknown event type: {other}")))
443 }
444 };
445
446 Ok(TraceEvent { seq, timestamp, data })
447 }
448}