Rust audio ALSA lib pcm.c:8306 (snd_pcm_recover) underrun occurred

  Kiến thức lập trình

I’m working on a Rust program that’s essentially a LAN walkie talkie, where I can speak into one computer and my voice will play out another on the same network. I’ve been using the cpal crate for audio recording/playback, audiopus for Opus encoding and decoding, ringbuf for a ring buffer, mpsc channels for communication between threads, and rtp_rs for packetization to RTP.

However, I have this issue where my voice is garbled and I get underruns on the output computer that appear repeatedly as the program runs:

ALSA lib pcm.c:8306:(snd_pcm_recover) underrun occurred
ALSA lib pcm.c:8306:(snd_pcm_recover) underrun occurred
ALSA lib pcm.c:8306:(snd_pcm_recover) underrun occurred
ALSA lib pcm.c:8306:(snd_pcm_recover) underrun occurred

I wrote 2 programs, one for recording and sending audio and another for receiving and playing audio.

Record/send side:

use audiopus::coder::Encoder;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use ringbuf::Rb;
use ringbuf::StaticRb;
use rtp_rs::*;
use std::net::UdpSocket;
use std::sync::mpsc;
use std::sync::mpsc::Receiver;
use std::thread;

/// A packet sent over the socket -> sent over thread channels
enum PacketData {
    Packet(Vec<u8>),
}

/// A raw audio sample that is passed to the buffering thread.
enum BufferData {
    Sample(Vec<f32>),
}

const OPUS_SAMPLE_RATE: audiopus::SampleRate = audiopus::SampleRate::Hz48000;
const OPUS_CHANNEL: audiopus::Channels = audiopus::Channels::Mono;
const OPUS_APPLICATION: audiopus::Application = audiopus::Application::LowDelay;
const OPUS_FRAME_SIZE: usize = 960;

// Record and Send Side

fn initialize_recording(
    buffer_thread_sender: mpsc::Sender<BufferData>,
) -> Result<cpal::Stream, cpal::StreamError> {
    let host = cpal::default_host();
    let input_device = host.default_input_device().unwrap();
    let config = cpal::StreamConfig {
        channels: 1,
        sample_rate: cpal::SampleRate(48000),
        buffer_size: cpal::BufferSize::Default,
    };
    println!(
        "Input Device: {}, Using config: {:?}",
        input_device.name().unwrap(),
        config
    );

    let stream = input_device
        .build_input_stream(
            &config.into(),
            move |data: &[f32], _: &_| {
                if let Err(err) = buffer_thread_sender.send(BufferData::Sample(data.to_vec())) {
                    eprintln!(
                        "Couldn't pass raw audio frame to the buffering thread: {}",
                        err
                    );
                    // std::process::exit(1)
                };
            },
            move |err| {
                eprintln!("There was a Stream Error: {:?}", err);
            },
            None,
        )
        .unwrap();

    Ok(stream)
}

fn encode_to_opus(samples: &[f32], encoder: &mut Encoder) -> Vec<u8> {
    let mut output: [u8; OPUS_FRAME_SIZE] = [0; OPUS_FRAME_SIZE];
    // let frame_size = 480;

    encoder.encode_float(&samples, &mut output).unwrap();
    output.to_vec()
}

fn packetize_rtp(opus_data: Vec<u8>, sequence_number: u16, timestamp: u32) -> Vec<u8> {
    let result: Vec<u8> = RtpPacketBuilder::new()
        .payload_type(111)
        .ssrc(1337)
        .sequence(Seq::from(sequence_number))
        .timestamp(timestamp)
        .padded(Pad::none())
        .marked(true)
        .payload(&opus_data)
        .build()
        .expect("Couldn't build packet");

    result
}

// Threading Code

fn network_receiver_thread(receiver: Receiver<PacketData>) {
    // Function that receives encoded audio sample messages from the other realtime thread.

    let mut sequence_number = 1;
    let mut timestamp = 1;

    let socket = UdpSocket::bind("0.0.0.0:37069").expect("Couldn't bind!");

    for message in receiver.iter() {
        match message {
            PacketData::Packet(packet) => {
                let rtp_packet: Vec<u8> = packetize_rtp(packet, sequence_number, timestamp);
                send_packet_over_udp(&socket, rtp_packet, "192.168.1.168:37069")
                    .expect("Couldn't send audio packet");
                sequence_number += 1;
                timestamp += 1
            }
        }
    }
}

fn buffer_receiver_thread(
    receiver: Receiver<BufferData>,
    network_thread_sender: mpsc::Sender<PacketData>,
) {
    // Function that receives raw audio samples directly from audio thread and fills the ringbuffer,
    // then passes it to the network thread to be sent

    let mut encoder = Encoder::new(OPUS_SAMPLE_RATE, OPUS_CHANNEL, OPUS_APPLICATION).unwrap();

    let mut rb: ringbuf::SharedRb<f32, [std::mem::MaybeUninit<f32>; OPUS_FRAME_SIZE]> =
        StaticRb::<f32, OPUS_FRAME_SIZE>::default(); // thread-safe ringbuffer
    let (mut prod, mut cons) = rb.split_ref();

    for message in receiver.iter() {
        match message {
            BufferData::Sample(sample) => {
                for number in sample {
                    if let Err(err) = prod.push(number) {
                        eprintln!("Couldn't push sample to the ring buffer: {:?}", err);
                        break;
                    }
                    if cons.is_full() {
                        let encoded_data: Vec<u8> = encode_to_opus(
                            &cons.rb().iter().cloned().collect::<Vec<f32>>(),
                            &mut encoder,
                        );
                        network_thread_sender
                            .send(PacketData::Packet(encoded_data))
                            .expect("Couldn't pass audio frame to the network thread");
                        cons.clear();
                    }
                }
            }
        }
    }
}

fn send_packet_over_udp(
    socket: &UdpSocket,
    packet: Vec<u8>,
    destination_ip: &str,
) -> Result<(), std::io::Error> {
    socket.send_to(&packet, destination_ip)?;

    Ok(())
}

fn main() {
    let (network_thread_sender, network_thread_receiver) = mpsc::channel::<PacketData>();
    let (buffer_thread_sender, buffer_thread_receiver) = mpsc::channel::<BufferData>();

    // Buffer processing thread
    thread::spawn(move || {
        buffer_receiver_thread(buffer_thread_receiver, network_thread_sender);
    });

    // Network thread
    thread::spawn(move || {
        network_receiver_thread(network_thread_receiver);
    });

    let recording_stream = initialize_recording(buffer_thread_sender).unwrap();
    recording_stream
        .play()
        .expect("Couldn't start recording...");

    thread::sleep(std::time::Duration::from_secs(100));
}

There are 3 threads at work here. the real time thread created by cpal‘s recording callback, then a buffer_receiver_thread that accepts samples and encodes them, then a network_receiver_thread that receives the samples from the previous thread and sends them over the socket. So we record a chunk -> encode -> packetize -> send.

And the play/receive side:

use audiopus::Result;
use audiopus::coder::Decoder;
use circular_buffer::CircularBuffer;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use std::net::UdpSocket;
 
use std::sync::mpsc;
 
 
enum PacketData {
    Packet(Vec<u8>)
}
 
const OPUS_SAMPLE_RATE: audiopus::SampleRate = audiopus::SampleRate::Hz48000;
const OPUS_CHANNEL: audiopus::Channels = audiopus::Channels::Mono;
 
const OPUS_RECIEVED_FRAME_SIZE: usize = 960;
 
fn initialize_playback(thread_receiver: mpsc::Receiver::<PacketData>) -> Result<cpal::Stream> {
    let host = cpal::default_host();
    let output_device = host.default_output_device().unwrap();
    let config = output_device.default_output_config().unwrap();
    
    println!("Output Device: {}, Using config: {:?}", output_device.name().unwrap(), config);
 
    let mut decoder = Decoder::new(OPUS_SAMPLE_RATE, OPUS_CHANNEL).expect("Couldn't make the decoder");
 
    // Use a ring buffer to store decoded audio data
    let mut ring_buffer: CircularBuffer<10000, f32> = CircularBuffer::<10000, f32>::new(); // 10000 just in case
 
    let stream = output_device.build_output_stream(
        &config.into(),
        move |data: &mut [f32], _: &_| {
 
            loop {
                if ring_buffer.len() > data.len() {
                    for (r, d) in ring_buffer.drain(..data.len()).zip(data){
                        *d = r;
                    }
                    break;
                }
 
                let received_packet: PacketData = thread_receiver.recv().expect("Couldn't get audio sample from network thread.");
 
            
                match received_packet {
                    PacketData::Packet(packet) => {
                        let decoded_data = decode_opus(&mut decoder, &packet).unwrap();
 
                        ring_buffer.extend_from_slice(&decoded_data);
                    }
                }
                
            }
 
        },
        move |err| {
            eprintln!("Audio streaming error: {:?}", err);
        },
        None
    ).unwrap();
 
    Ok(stream)
}
 
fn receive_packet_from_udp(socket: &UdpSocket) -> Result<Vec<u8>>{
 
 
    let mut buffer = [0; 4096];
    let (size, _) = socket.recv_from(&mut buffer).expect("Couldn't get anything from the socket");
 
    let rtp_packet = rtp_rs::RtpReader::new(&mut buffer).expect("Couldn't parse RTP");
    let opus_data = rtp_packet.payload();
 
    Ok(opus_data[..size].to_vec())
 
}
 
fn decode_opus(decoder: &mut Decoder, opus_data: &[u8]) -> std::io::Result<Vec<f32>> {
    let mut output = vec![0f32; OPUS_RECIEVED_FRAME_SIZE];
    let samples_decoded = decoder.decode_float(Some(opus_data), &mut output, false).expect("Couldn't decode");
    output.truncate(samples_decoded);
    Ok(output)
}
 
fn network_sender_thread(sender: mpsc::Sender<PacketData>, socket: &UdpSocket) {
 
    // Receives the audio data and sends the data to the realtime thread
 
    loop {
        let audio_data = receive_packet_from_udp(&socket).unwrap();
        sender.send(PacketData::Packet(audio_data.to_vec())).expect("Couldn't send to the playback thread");
    }
 
 
}
 
fn main() {
 
    let (network_sender, network_receiver) = mpsc::channel::<PacketData>();
 
    let socket: UdpSocket = UdpSocket::bind("0.0.0.0:37069").expect("Couldn't bind properly");
 
    std::thread::spawn(move || {
        network_sender_thread(network_sender, &socket);
    });
 
    let output_stream: cpal::Stream = initialize_playback(network_receiver).expect("Something went wrong with playback");
    output_stream.play().expect("Couldn't play. ");
    std::thread::sleep(std::time::Duration::from_secs(100));
}

here, the process is inverted. We have a network_sender_thread that receives audio data from the socket and puts it into the ring buffer. once the ring buffer is full, cpal‘s output thread decodes it and plays. So we receive sample -> decode -> play.

I made sure sample rates and buffer sizes are adjusted properly, and I also made sure that audio data was being encoded properly as well.

I’ve tested this on an M1 Mac as the recorder side and a HP Elitebook running Ubuntu as the player side.

What could be causing the garbled audio on the receiving side? How does one go about troubleshooting the “ALSA lib pcm.c:8306 (snd_pcm_recover) underrun occurred” error? Is there anything within my implementation of passing data between threads via mpsc channel that is causing this to go wrong?

LEAVE A COMMENT