Only 3 microphone streams are open on Raspberry Pi 4

81 Views Asked by At

I wrote code that listens to 8 USB microphone inputs simultaneously on a Raspberry Pi 4 Model B (4 GB).
It runs fine on a PC, but only partially on the Raspberry Pi.
I confirmed that the multiprocessing and multithreading parts work, but only 3 microphone streams actually deliver data at any one time.
When I switch off one of the working microphones, one of the other microphones starts streaming, so the total stays at three streams.

The 8 USB microphones are connected to the Raspberry Pi through two 4-port hubs.
The code is written in Python 3.9.11 and uses the sounddevice library.
Because the sound arriving at the microphones has to be captured almost simultaneously, 3 processes are started and 3 threads are run in each process.
Each thread opens a sounddevice input stream on one microphone and reads the data from it.
Please tell me how I can increase the number of streams to 8. I desperately need help.

import sounddevice as sd
from threading import Thread
from multiprocessing import Process, Queue
from collections import deque
import time
import copy

# Sampling rate passed to every input stream (the `samplerate` argument).
SAMPLE_RATE = 48000
# Number of channels requested from every input stream.
CHANNELS = 1
# Multiplied by SAMPLE_RATE to get the number of frames fetched per blocking
# read (presumably the chunk length in seconds - confirm).
SPLIT_TIME = 1
# Size of the groups the microphone indices are split into; each group is
# handed to one worker process, which runs one thread per microphone.
MAX_MIC = 2

def initialize():
    """Locate the microphones to record from and split them into groups.

    Every device reported by ``sd.query_devices()`` is kept when its name
    contains ``'AB13X USB Audio'``, its host API is 0 and it offers at least
    one input channel.

    Returns:
        A tuple ``(groups, mic_indices)`` where ``mic_indices`` is the flat
        list of matching device indices and ``groups`` holds the same indices
        cut into consecutive slices of at most ``MAX_MIC`` entries.
    """
    def is_target(device):
        # Same three conditions, evaluated in the same order
        return (
            'AB13X USB Audio' in device['name']
            and device['hostapi'] == 0
            and device['max_input_channels'] > 0
        )

    mic_indices = [
        device['index'] for device in sd.query_devices() if is_target(device)
    ]

    groups = []
    for start in range(0, len(mic_indices), MAX_MIC):
        groups.append(mic_indices[start:start + MAX_MIC])

    return groups, mic_indices

def run(que, mic_list):
    """Record from every microphone in *mic_list* and forward chunks to *que*.

    One daemon thread running ``open_stream`` is started per microphone, each
    with its own deque. The function then loops forever: whenever every deque
    holds at least one entry, the oldest entry of each is moved to *que*
    (in ``mic_list`` order). It never returns.

    Args:
        que: queue-like object; entries are delivered through ``que.put``.
        mic_list: device indices handled by this worker.
    """
    # One buffer per microphone
    buffers = [deque() for _ in mic_list]

    # One reader thread per microphone, each bound to its own buffer
    workers = [
        Thread(target=open_stream, args=(device, buffer), daemon=True)
        for device, buffer in zip(mic_list, buffers)
    ]

    # Start them back to back so the recordings begin as close together
    # as possible
    for worker in workers:
        worker.start()

    while True:
        # Forward only once every microphone has a chunk waiting
        if all(buffers):
            for buffer in buffers:
                que.put(buffer.popleft())
        time.sleep(0.1)

def open_stream(device_index, dqu):
    """Record from one input device forever, appending each chunk to *dqu*.

    An int16 input stream with ``CHANNELS`` channels at ``SAMPLE_RATE`` is
    opened on *device_index*. The function then blocks in a loop that reads
    ``int(SAMPLE_RATE * SPLIT_TIME)`` frames at a time, flattens them to one
    dimension and appends ``[device_index, audio_data, timestamp]`` to *dqu*,
    where ``timestamp`` is ``time.time()`` taken right after the read.

    The function only returns by raising; in that case the stream is stopped
    and closed before the exception propagates.

    Args:
        device_index: index of the input device to open.
        dqu: container receiving the entries through ``dqu.append``.
    """
    frames_per_chunk = int(SAMPLE_RATE * SPLIT_TIME)

    # NOTE(review): latency=True is kept from the original code. sounddevice
    # documents this argument as a float or 'low'/'high' - confirm that a
    # bool is really what is intended here.
    #
    # Using the stream as a context manager starts it on entry and stops and
    # closes it on exit, so the device is released if read() raises.
    with sd.InputStream(device=device_index, samplerate=SAMPLE_RATE,
                        channels=CHANNELS, dtype='int16',
                        latency=True) as stream:
        while True:
            # The overflow flag is not acted upon
            audio_data, _overflowed = stream.read(frames_per_chunk)
            audio_data = audio_data.reshape(-1,)
            now = time.time()
            dqu.append([device_index, audio_data, now])

async def main():
    """Start the recording workers and collect one chunk per microphone.

    The microphone groups returned by ``initialize`` are each given to a
    daemon process running ``run``; all processes share one queue. The main
    loop then runs at most 600 iterations: when the queue reports at least as
    many entries as there are microphones, that many entries are taken and
    indexed by device index; otherwise the loop sleeps for 0.1 s.

    Although declared ``async``, the body contains no ``await`` and uses
    blocking calls. A ``KeyboardInterrupt`` ends the loop and prints
    ``'System off'``.
    """
    mic_slice, mic_indices = initialize()

    # Queue shared by all worker processes
    que = Queue()
    procs = [
        Process(target=run, args=(que, mic_list), daemon=True)
        for mic_list in mic_slice
    ]

    # Start the processes back to back so they begin as close together
    # as possible
    for proc in procs:
        proc.start()

    expected = len(mic_indices)

    # Main loop
    try:
        # TODO Iteration limit for stopping, delete after development
        for _ in range(600):
            # TODO Code to be improved in the future
            if que.qsize() < expected:
                time.sleep(0.1)
                continue

            # Take one queue entry per microphone and index it by device
            mic_data = {}
            time_data = {}
            for _ in range(expected):
                # Entries have the form [device_index, samples, timestamp]
                device, samples, stamp = que.get()
                mic_data[device] = samples
                time_data[device] = stamp

            # Proceed only when every entry came from a different device
            if len(mic_data) == len(time_data) == expected:
                for mic_index in mic_indices:
                    samples_as_list = mic_data[mic_index].tolist()

                    result = {mic_index: copy.deepcopy(samples_as_list)}

                # Data post-processing, omitted below
    except KeyboardInterrupt:
        print('System off')
0

There are 0 best solutions below