188 lines
5.0 KiB
Python
188 lines
5.0 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
|
||
|
import array
|
||
|
import ctypes
|
||
|
|
||
|
from opus.api import libopus, c_int_pointer, c_int16_pointer, c_float_pointer
|
||
|
from opus.exceptions import OpusError
|
||
|
|
||
|
|
||
|
class Decoder(ctypes.Structure):
    """Opaque handle type for an Opus decoder.

    No fields are declared on the Python side; instances are only ever
    handled through `DecoderPointer`.
    """


# Pointer type used in the argtypes/restype of the decoder bindings below.
DecoderPointer = ctypes.POINTER(Decoder)
|
||
|
|
||
|
|
||
|
# Direct binding of libopus' opus_decoder_get_size: one C int in, C int out.
# It is exposed without a Python wrapper.
get_size = libopus.opus_decoder_get_size
get_size.restype = ctypes.c_int
get_size.argtypes = (ctypes.c_int,)
get_size.__doc__ = 'Gets the size of an OpusDecoder structure'
|
||
|
|
||
|
|
||
|
# Raw binding of opus_decoder_create: two C ints plus a c_int_pointer
# out-parameter in, decoder pointer out.  Use create() for error handling.
_create = libopus.opus_decoder_create
_create.restype = DecoderPointer
_create.argtypes = (ctypes.c_int, ctypes.c_int, c_int_pointer)
|
||
|
|
||
|
|
||
|
def create(fs, channels):
    """Allocates and initializes a decoder state.

    Args:
        fs: Forwarded as the first int argument of `opus_decoder_create`
            (the sampling rate in Hz according to the libopus API).
        channels: Forwarded as the second int argument (channel count).

    Returns:
        The decoder pointer returned by `opus_decoder_create`.

    Raises:
        OpusError: If libopus reports a non-zero error code.
    """
    result_code = ctypes.c_int()

    result = _create(fs, channels, ctypes.byref(result_code))
    # Compare by value: the previous `is not 0` was an identity check that
    # only worked thanks to CPython's small-int cache and triggers a
    # SyntaxWarning on Python 3.8+.
    if result_code.value != 0:
        raise OpusError(result_code.value)

    return result
|
||
|
|
||
|
|
||
|
# Raw binding of opus_packet_get_bandwidth: char pointer in, C int out.
_packet_get_bandwidth = libopus.opus_packet_get_bandwidth
_packet_get_bandwidth.restype = ctypes.c_int
_packet_get_bandwidth.argtypes = (ctypes.c_char_p,)
|
||
|
|
||
|
|
||
|
def packet_get_bandwidth(data):
    """Return the bandwidth value libopus reports for an Opus packet.

    Args:
        data: Packet bytes, wrapped in a `c_char_p` before the call.

    Raises:
        OpusError: If the library returns a negative value.
    """
    bandwidth = _packet_get_bandwidth(ctypes.c_char_p(data))
    if bandwidth >= 0:
        return bandwidth

    # Negative return values are treated as error codes.
    raise OpusError(bandwidth)
|
||
|
|
||
|
|
||
|
# Raw binding of opus_packet_get_nb_channels: char pointer in, C int out.
_packet_get_nb_channels = libopus.opus_packet_get_nb_channels
_packet_get_nb_channels.restype = ctypes.c_int
_packet_get_nb_channels.argtypes = (ctypes.c_char_p,)
|
||
|
|
||
|
|
||
|
def packet_get_nb_channels(data):
    """Return the channel count libopus reports for an Opus packet.

    Args:
        data: Packet bytes, wrapped in a `c_char_p` before the call.

    Raises:
        OpusError: If the library returns a negative value.
    """
    channel_count = _packet_get_nb_channels(ctypes.c_char_p(data))
    if channel_count >= 0:
        return channel_count

    # Negative return values are treated as error codes.
    raise OpusError(channel_count)
|
||
|
|
||
|
|
||
|
# Raw binding of opus_packet_get_nb_frames: (char pointer, C int) in,
# C int out.
_packet_get_nb_frames = libopus.opus_packet_get_nb_frames
_packet_get_nb_frames.restype = ctypes.c_int
_packet_get_nb_frames.argtypes = (ctypes.c_char_p, ctypes.c_int)
|
||
|
|
||
|
|
||
|
def packet_get_nb_frames(data, length=None):
    """Return the number of frames libopus reports for an Opus packet.

    Args:
        data: Packet bytes, wrapped in a `c_char_p` before the call.
        length: Value passed as the packet length; defaults to `len(data)`
            when omitted.

    Raises:
        OpusError: If the library returns a negative value.
    """
    packet = ctypes.c_char_p(data)
    size = len(data) if length is None else length

    frame_count = _packet_get_nb_frames(packet, ctypes.c_int(size))
    if frame_count >= 0:
        return frame_count

    # Negative return values are treated as error codes.
    raise OpusError(frame_count)
|
||
|
|
||
|
|
||
|
# Raw binding of opus_packet_get_samples_per_frame: (char pointer, C int)
# in, C int out.
_packet_get_samples_per_frame = libopus.opus_packet_get_samples_per_frame
_packet_get_samples_per_frame.restype = ctypes.c_int
_packet_get_samples_per_frame.argtypes = (ctypes.c_char_p, ctypes.c_int)
|
||
|
|
||
|
|
||
|
def packet_get_samples_per_frame(data, fs):
    """Gets the number of samples per frame from an Opus packet.

    Args:
        data: Packet bytes, wrapped in a `c_char_p` before the call.
        fs: Passed to libopus as a C int (the sampling rate in Hz according
            to the libopus API).

    Returns:
        The non-negative int returned by `opus_packet_get_samples_per_frame`.

    Raises:
        OpusError: If the library returns a negative value.
    """
    data_pointer = ctypes.c_char_p(data)

    # This used to call _packet_get_nb_frames by mistake, which interpreted
    # `fs` as the packet length and returned a frame count instead.
    result = _packet_get_samples_per_frame(data_pointer, ctypes.c_int(fs))
    if result < 0:
        raise OpusError(result)

    return result
|
||
|
|
||
|
|
||
|
# Raw binding of opus_decoder_get_nb_samples: (decoder pointer, char
# pointer, 32-bit int) in, C int out.
_get_nb_samples = libopus.opus_decoder_get_nb_samples
_get_nb_samples.restype = ctypes.c_int
_get_nb_samples.argtypes = (DecoderPointer, ctypes.c_char_p, ctypes.c_int32)
|
||
|
|
||
|
|
||
|
def get_nb_samples(decoder, packet, length):
    """Return the sample count libopus reports for `packet`.

    Args:
        decoder: Decoder pointer passed through to libopus.
        packet: Packet bytes passed as a char pointer.
        length: Passed as the 32-bit length argument.

    Raises:
        OpusError: If the library returns a negative value.
    """
    sample_count = _get_nb_samples(decoder, packet, length)
    if sample_count >= 0:
        return sample_count

    # Negative return values are treated as error codes.
    raise OpusError(sample_count)
|
||
|
|
||
|
|
||
|
# Raw binding of opus_decode.  Arguments, in order: decoder pointer, packet
# char pointer, 32-bit length, int16 output pointer, and two C ints
# (decode() passes frame_size and the FEC flag there).  Returns a C int.
_decode = libopus.opus_decode
_decode.restype = ctypes.c_int
_decode.argtypes = (
    DecoderPointer,
    ctypes.c_char_p,
    ctypes.c_int32,
    c_int16_pointer,
    ctypes.c_int,
    ctypes.c_int,
)
|
||
|
|
||
|
|
||
|
def decode(decoder, data, length, frame_size, decode_fec, channels=2):
    """Decode an Opus frame into 16-bit PCM.

    Unlike the `opus_decode` function, this function takes an additional
    parameter `channels`, which indicates the number of channels in the frame.

    Args:
        decoder: Decoder pointer passed through to `opus_decode`.
        data: Encoded packet bytes.
        length: Passed as the 32-bit length argument for `data`.
        frame_size: Per-channel sample capacity of the output buffer; also
            forwarded to `opus_decode`.
        decode_fec: Any value; coerced to 0 or 1 by truthiness.
        channels: Number of interleaved channels.  The output buffer holds
            exactly `frame_size * channels` samples, so this must not be
            smaller than the channel count the decoder writes.

    Returns:
        A byte string with the decoded samples (`result * channels` int16
        values in native byte order).

    Raises:
        OpusError: If `opus_decode` returns a negative value.
    """
    # ctypes array lengths are element counts, not byte counts, so the
    # element size must not be multiplied in here.
    pcm = (ctypes.c_int16 * (frame_size * channels))()
    pcm_pointer = ctypes.cast(pcm, c_int16_pointer)

    # Converting from a boolean to int
    decode_fec = int(bool(decode_fec))

    result = _decode(decoder, data, length, pcm_pointer, frame_size, decode_fec)
    if result < 0:
        raise OpusError(result)

    # Copy only the decoded part of the buffer straight out of memory.
    # array.tostring() was removed in Python 3.9; string_at works on both
    # Python 2 and 3 and yields the same native-endian bytes.
    decoded_bytes = result * channels * ctypes.sizeof(ctypes.c_int16)
    return ctypes.string_at(pcm, decoded_bytes)
|
||
|
|
||
|
|
||
|
# Raw binding of opus_decode_float.  Same layout as the opus_decode binding,
# except that the output pointer is a c_float_pointer.  Returns a C int.
_decode_float = libopus.opus_decode_float
_decode_float.restype = ctypes.c_int
_decode_float.argtypes = (
    DecoderPointer,
    ctypes.c_char_p,
    ctypes.c_int32,
    c_float_pointer,
    ctypes.c_int,
    ctypes.c_int,
)
|
||
|
|
||
|
|
||
|
def decode_float(decoder, data, length, frame_size, decode_fec, channels=2):
    """Decode an Opus frame into floating-point PCM.

    Args:
        decoder: Decoder pointer passed through to `opus_decode_float`.
        data: Encoded packet bytes.
        length: Passed as the 32-bit length argument for `data`.
        frame_size: Per-channel sample capacity of the output buffer; also
            forwarded to `opus_decode_float`.
        decode_fec: Any value; coerced to 0 or 1 by truthiness.
        channels: Number of interleaved channels.  The output buffer holds
            exactly `frame_size * channels` samples, so this must not be
            smaller than the channel count the decoder writes.

    Returns:
        A byte string with the decoded samples (`result * channels` C floats
        in native byte order).

    Raises:
        OpusError: If `opus_decode_float` returns a negative value.
    """
    # ctypes array lengths are element counts, not byte counts, so the
    # element size must not be multiplied in here.
    pcm = (ctypes.c_float * (frame_size * channels))()
    pcm_pointer = ctypes.cast(pcm, c_float_pointer)

    # Converting from a boolean to int
    decode_fec = int(bool(decode_fec))

    result = _decode_float(decoder, data, length, pcm_pointer, frame_size, decode_fec)
    if result < 0:
        raise OpusError(result)

    # Copy only the decoded part of the buffer straight out of memory.
    # array.tostring() was removed in Python 3.9; string_at works on both
    # Python 2 and 3 and yields the same native-endian bytes.
    decoded_bytes = result * channels * ctypes.sizeof(ctypes.c_float)
    return ctypes.string_at(pcm, decoded_bytes)
|
||
|
|
||
|
|
||
|
# Raw binding of opus_decoder_ctl.  Only the return type is declared; no
# argtypes are set, and the arguments are supplied by the `request`
# callables that ctl() hands this function to.
_ctl = libopus.opus_decoder_ctl
_ctl.restype = ctypes.c_int
|
||
|
|
||
|
|
||
|
def ctl(decoder, request, value=None):
    """Run a CTL request against `decoder`.

    Args:
        decoder: Decoder pointer handed to `request`.
        request: Callable invoked as `request(_ctl, decoder)` or, when a
            value is given, `request(_ctl, decoder, value)`.
        value: Optional argument for the request; `None` means "no value".

    Returns:
        Whatever `request` returns.
    """
    if value is None:
        return request(_ctl, decoder)

    return request(_ctl, decoder, value)
|
||
|
|
||
|
|
||
|
# Direct binding of opus_decoder_destroy: takes a decoder pointer and
# returns nothing.  It is exposed without a Python wrapper.
destroy = libopus.opus_decoder_destroy
destroy.restype = None
destroy.argtypes = (DecoderPointer,)
destroy.__doc__ = 'Frees an OpusDecoder allocated by opus_decoder_create()'
|