HamRemote/opus/api/decoder.py

188 lines
5.0 KiB
Python
Raw Normal View History

2023-02-09 08:33:01 +01:00
# -*- coding: utf-8 -*-
import array
import ctypes
from opus.api import libopus, c_int_pointer, c_int16_pointer, c_float_pointer
from opus.exceptions import OpusError
class Decoder(ctypes.Structure):
"""Opus decoder state.
This contains the complete state of an Opus decoder.
"""
pass
DecoderPointer = ctypes.POINTER(Decoder)
get_size = libopus.opus_decoder_get_size
get_size.argtypes = (ctypes.c_int,)
get_size.restype = ctypes.c_int
get_size.__doc__ = 'Gets the size of an OpusDecoder structure'
_create = libopus.opus_decoder_create
_create.argtypes = (ctypes.c_int, ctypes.c_int, c_int_pointer)
_create.restype = DecoderPointer
def create(fs, channels):
"""Allocates and initializes a decoder state"""
result_code = ctypes.c_int()
result = _create(fs, channels, ctypes.byref(result_code))
if result_code.value is not 0:
raise OpusError(result_code.value)
return result
_packet_get_bandwidth = libopus.opus_packet_get_bandwidth
_packet_get_bandwidth.argtypes = (ctypes.c_char_p,)
_packet_get_bandwidth.restype = ctypes.c_int
def packet_get_bandwidth(data):
"""Gets the bandwidth of an Opus packet."""
data_pointer = ctypes.c_char_p(data)
result = _packet_get_bandwidth(data_pointer)
if result < 0:
raise OpusError(result)
return result
_packet_get_nb_channels = libopus.opus_packet_get_nb_channels
_packet_get_nb_channels.argtypes = (ctypes.c_char_p,)
_packet_get_nb_channels.restype = ctypes.c_int
def packet_get_nb_channels(data):
"""Gets the number of channels from an Opus packet"""
data_pointer = ctypes.c_char_p(data)
result = _packet_get_nb_channels(data_pointer)
if result < 0:
raise OpusError(result)
return result
_packet_get_nb_frames = libopus.opus_packet_get_nb_frames
_packet_get_nb_frames.argtypes = (ctypes.c_char_p, ctypes.c_int)
_packet_get_nb_frames.restype = ctypes.c_int
def packet_get_nb_frames(data, length=None):
"""Gets the number of frames in an Opus packet"""
data_pointer = ctypes.c_char_p(data)
if length is None:
length = len(data)
result = _packet_get_nb_frames(data_pointer, ctypes.c_int(length))
if result < 0:
raise OpusError(result)
return result
_packet_get_samples_per_frame = libopus.opus_packet_get_samples_per_frame
_packet_get_samples_per_frame.argtypes = (ctypes.c_char_p, ctypes.c_int)
_packet_get_samples_per_frame.restype = ctypes.c_int
def packet_get_samples_per_frame(data, fs):
"""Gets the number of samples per frame from an Opus packet"""
data_pointer = ctypes.c_char_p(data)
result = _packet_get_nb_frames(data_pointer, ctypes.c_int(fs))
if result < 0:
raise OpusError(result)
return result
_get_nb_samples = libopus.opus_decoder_get_nb_samples
_get_nb_samples.argtypes = (DecoderPointer, ctypes.c_char_p, ctypes.c_int32)
_get_nb_samples.restype = ctypes.c_int
def get_nb_samples(decoder, packet, length):
result = _get_nb_samples(decoder, packet, length)
if result < 0:
raise OpusError(result)
return result
_decode = libopus.opus_decode
_decode.argtypes = (DecoderPointer, ctypes.c_char_p, ctypes.c_int32, c_int16_pointer, ctypes.c_int, ctypes.c_int)
_decode.restype = ctypes.c_int
def decode(decoder, data, length, frame_size, decode_fec, channels=2):
"""Decode an Opus frame
Unlike the `opus_decode` function , this function takes an additional parameter `channels`,
which indicates the number of channels in the frame
"""
pcm_size = frame_size * channels * ctypes.sizeof(ctypes.c_int16)
pcm = (ctypes.c_int16 * pcm_size)()
pcm_pointer = ctypes.cast(pcm, c_int16_pointer)
# Converting from a boolean to int
decode_fec = int(bool(decode_fec))
result = _decode(decoder, data, length, pcm_pointer, frame_size, decode_fec)
if result < 0:
raise OpusError(result)
return array.array('h', pcm[ :result * channels ]).tostring()
_decode_float = libopus.opus_decode_float
_decode_float.argtypes = (DecoderPointer, ctypes.c_char_p, ctypes.c_int32, c_float_pointer, ctypes.c_int, ctypes.c_int)
_decode_float.restype = ctypes.c_int
def decode_float(decoder, data, length, frame_size, decode_fec, channels=2):
pcm_size = frame_size * channels * ctypes.sizeof(ctypes.c_float)
pcm = (ctypes.c_float * pcm_size)()
pcm_pointer = ctypes.cast(pcm, c_float_pointer)
# Converting from a boolean to int
decode_fec = int(bool(decode_fec))
result = _decode_float(decoder, data, length, pcm_pointer, frame_size, decode_fec)
if result < 0:
raise OpusError(result)
return array.array('f', pcm[ : result * channels ]).tostring()
_ctl = libopus.opus_decoder_ctl
_ctl.restype = ctypes.c_int
def ctl(decoder, request, value=None):
if value is not None:
return request(_ctl, decoder, value)
return request(_ctl, decoder)
destroy = libopus.opus_decoder_destroy
destroy.argtypes = (DecoderPointer,)
destroy.restype = None
destroy.__doc__ = 'Frees an OpusDecoder allocated by opus_decoder_create()'