Packet Inspection
ruopus exposes two types for reading the structure of raw Opus packet bytes
without decoding audio: Toc (the single header byte) and
Packet (the full parsed packet including its frames).
The TOC Byte
Every Opus packet starts with a table-of-contents (TOC) byte that encodes the mode, bandwidth, frame size, channel count, and framing code in 8 bits (RFC 6716 §3.1).
Toc wraps that byte:
import ruopus
# From a raw packet
raw = b"\x78\x01\x02\x03\x04"
toc = ruopus.Toc(raw[0])
print(toc)
# Or from the parsed packet:
pkt = ruopus.Packet(raw)
toc = pkt.toc
Reading TOC Fields
toc = ruopus.Toc(raw[0])
print(f"Mode: {toc.mode}") # Mode.CeltOnly / Mode.Hybrid / Mode.SilkOnly
print(f"Bandwidth: {toc.bandwidth}") # Bandwidth.FullBand, etc.
print(f"Frame size: {toc.frame_size}") # FrameSize.Ms20, etc.
print(f"Channels: {toc.channels}") # 1 or 2
print(f"Stereo: {toc.stereo}") # bool
print(f"Config: {toc.config}") # raw 5-bit config number (0-31)
print(f"FCC: {toc.frame_count_code}") # frame framing code (0-3)
print(f"Raw byte: {toc.byte:#04x}")
Building a TOC from Parts
from_parts() constructs a TOC from its three bitfields, useful
for building test vectors or inspecting the config space:
toc = ruopus.Toc.from_parts(config=16, stereo=True, frame_count_code=0)
print(toc.mode, toc.bandwidth, toc.frame_size)
Hashing and Equality
Toc is hashable and supports equality, so you can count unique
configurations in a stream:
from collections import Counter
counts = Counter(ruopus.Toc(pkt[0]).config for pkt in raw_packets)
print(counts.most_common(5))
Parsing a Full Packet
Packet parses the complete RFC 6716 §3 structure: the TOC byte
plus the compressed audio frames. It validates all R1-R7 bitstream constraints.
pkt = ruopus.Packet(raw_bytes)
print(f"Frames: {len(pkt)}") # number of encoded frames
print(f"Duration: {pkt.duration}") # datetime.timedelta
print(f"Padding: {pkt.padding} bytes") # code-3 padding only
for i, frame in enumerate(pkt.frames):
print(f" frame[{i}]: {len(frame)} bytes")
# Sequence protocol: pkt[0] == pkt.frames[0]
first_frame = pkt[0] # bytes
last_frame = pkt[-1] # negative indexing supported
Empty frames (zero bytes) indicate a DTX (discontinuous transmission) packet; the decoder treats them as a short silence.
Self-Delimited Parsing
Multistream payloads use self-delimited framing (RFC 6716 Appendix B) so
that each elementary stream’s extent is unambiguous. Parse with
parse_self_delimited():
payload = b"..." # concatenated self-delimited streams
offset = 0
while offset < len(payload):
pkt, consumed = ruopus.Packet.parse_self_delimited(payload[offset:])
print(f"Stream at {offset}: {len(pkt)} frames, {consumed} bytes")
offset += consumed
Inspecting a Live Stream
A typical monitoring loop that logs packet stats without touching audio:
import ruopus
from collections import defaultdict
mode_counts = defaultdict(int)
bw_counts = defaultdict(int)
total_bytes = 0
for raw_pkt in incoming_packets():
pkt = ruopus.Packet(raw_pkt)
mode_counts[pkt.toc.mode] += 1
bw_counts[pkt.toc.bandwidth] += 1
total_bytes += len(raw_pkt)
print("Mode distribution:", dict(mode_counts))
print("Bandwidth distribution:", dict(bw_counts))
print(f"Total compressed data: {total_bytes / 1024:.1f} KB")
Bit-Exactness Check
After a round-trip encode → decode, compare the encoder’s
final_range with the decoder’s
final_range. A conformant implementation produces
identical values:
import numpy as np
import ruopus
enc = ruopus.OpusEncoder(1)
dec = ruopus.OpusDecoder(1)
frame = np.zeros(960, dtype=np.float32)
packet = enc.encode_auto(frame)
_pcm = dec.decode_packet(packet)
assert enc.final_range == dec.final_range, "Bit-exactness violation!"
print(f"Range coder state: {enc.final_range:#010x}")