I typed a code that listens to 8 USB microphone inputs simultaneously on the Raspberry Pi 4 B 4GB.
It ran fine on the PC, but only partially ran on the Raspberry Pi.
I confirmed that multiprocess and multithreading worked well, but only 3 microphones were streamed in each thread.
And when I turned off the working microphone, the other microphone started running, maintaining three streams.
8 USB microphones were connected to the Raspberry Pi using 2 4-port hubs.
Coding was done in Python 3.9.11 and the sounddevice was used.
Since the sound input to the microphone had to be received almost simultaneously,
3 multiprocesses were run and 3 multithreads were run in each process.
Each thread opened a stream with a sounddevice to the microphone's input and read the data.
Please tell me how I can increase the number of streams to 8. I desperately need help.
import sounddevice as sd
from threading import Thread
from multiprocessing import Process, Queue
from collections import deque
import time
import copy
SAMPLE_RATE = 48000
CHANNELS = 1
SPLIT_TIME = 1
MAX_MIC = 2
def initialize():
# find mic to use
mic_all = sd.query_devices()
mic_indices = []
for mic in mic_all:
if 'AB13X USB Audio' in mic['name'] and mic['hostapi'] == 0 and mic['max_input_channels'] > 0:
mic_indices.append(mic['index']) # mic index
return [mic_indices[i:i+MAX_MIC] for i in range(0, len(mic_indices), MAX_MIC)], mic_indices
def run(que, mic_list):
# Deque declaration according to number of microphones
dques = []
for mic_index in mic_list:
dques.append(deque())
# Thread declaration according to number of microphones
thrds = []
for mic_index, dqu in zip(mic_list, dques):
thrd = Thread(target=open_stream, args=(mic_index, dqu), daemon=True)
thrds.append(thrd)
# Run threads as simultaneously as possible
for thrd in thrds:
thrd.start()
while True:
is_all_existed = True
# Check deque
for dqu in dques:
if not dqu:
is_all_existed = False
if is_all_existed:
for dqu in dques:
data = dqu.popleft()
que.put(data)
time.sleep(0.1)
def open_stream(device_index, dqu):
stream = sd.InputStream(device=device_index, samplerate= SAMPLE_RATE, channels=CHANNELS, dtype='int16', latency=True)
stream.start()
while True:
audio_data, overflowed = stream.read(int(SAMPLE_RATE * SPLIT_TIME))
audio_data = audio_data.reshape(-1,)
now = time.time()
dqu.append([device_index, audio_data, now])
async def main():
mic_slice, mic_indices = initialize()
# Queue declaration for multiprocessing
que = Queue()
procs = []
for mic_list in mic_slice:
proc = Process(target=run, args=(que, mic_list), daemon=True)
procs.append(proc)
# Run processes as simultaneously as possible
for proc in procs:
proc.start()
# Main Loop
try:
stop_cnt = 0 # TODO Counter for stopping, deleting after development
while True:
stop_cnt += 1
if stop_cnt > 600:
break
# Get data from queue
if que.qsize() >= len(mic_indices): # TODO Code to be improved in the future
mic_data = {}
time_data = {}
for i in range(len(mic_indices)):
data = que.get()
mic_data[data[0]] = data[1]
time_data[data[0]] = data[2]
if len(mic_data.keys()) == len(time_data.keys()) == len(mic_indices):
for mic_index in mic_indices:
mic_data_list = mic_data[mic_index].tolist()
result = {mic_index: copy.deepcopy(mic_data_list)}
# Data post-processing, omitted below
else:
time.sleep(0.1)
except KeyboardInterrupt:
print('System off')