main
  1from __future__ import annotations
  2
  3import io
  4import base64
  5import asyncio
  6import threading
  7from typing import Callable, Awaitable
  8
  9import numpy as np
 10import pyaudio
 11import sounddevice as sd
 12from pydub import AudioSegment
 13
 14from openai.resources.realtime.realtime import AsyncRealtimeConnection
 15
 16CHUNK_LENGTH_S = 0.05  # 50 ms: length of one output block in seconds (sizes the player's blocksize)
 17SAMPLE_RATE = 24000  # samples per second (24 kHz)
 18FORMAT = pyaudio.paInt16  # PyAudio sample-format constant for 16-bit signed integers
 19CHANNELS = 1  # mono
 20
 21# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false
 22
 23
 24def audio_to_pcm16_base64(audio_bytes: bytes) -> bytes:
 25    # load the audio file from the byte stream
 26    audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
 27    print(f"Loaded audio: {audio.frame_rate=} {audio.channels=} {audio.sample_width=} {audio.frame_width=}")
 28    # resample to 24kHz mono pcm16
 29    pcm_audio = audio.set_frame_rate(SAMPLE_RATE).set_channels(CHANNELS).set_sample_width(2).raw_data
 30    return pcm_audio
 31
 32
 33class AudioPlayerAsync:
 34    def __init__(self):
 35        self.queue = []
 36        self.lock = threading.Lock()
 37        self.stream = sd.OutputStream(
 38            callback=self.callback,
 39            samplerate=SAMPLE_RATE,
 40            channels=CHANNELS,
 41            dtype=np.int16,
 42            blocksize=int(CHUNK_LENGTH_S * SAMPLE_RATE),
 43        )
 44        self.playing = False
 45        self._frame_count = 0
 46
 47    def callback(self, outdata, frames, time, status):  # noqa
 48        with self.lock:
 49            data = np.empty(0, dtype=np.int16)
 50
 51            # get next item from queue if there is still space in the buffer
 52            while len(data) < frames and len(self.queue) > 0:
 53                item = self.queue.pop(0)
 54                frames_needed = frames - len(data)
 55                data = np.concatenate((data, item[:frames_needed]))
 56                if len(item) > frames_needed:
 57                    self.queue.insert(0, item[frames_needed:])
 58
 59            self._frame_count += len(data)
 60
 61            # fill the rest of the frames with zeros if there is no more data
 62            if len(data) < frames:
 63                data = np.concatenate((data, np.zeros(frames - len(data), dtype=np.int16)))
 64
 65        outdata[:] = data.reshape(-1, 1)
 66
 67    def reset_frame_count(self):
 68        self._frame_count = 0
 69
 70    def get_frame_count(self):
 71        return self._frame_count
 72
 73    def add_data(self, data: bytes):
 74        with self.lock:
 75            # bytes is pcm16 single channel audio data, convert to numpy array
 76            np_data = np.frombuffer(data, dtype=np.int16)
 77            self.queue.append(np_data)
 78            if not self.playing:
 79                self.start()
 80
 81    def start(self):
 82        self.playing = True
 83        self.stream.start()
 84
 85    def stop(self):
 86        self.playing = False
 87        self.stream.stop()
 88        with self.lock:
 89            self.queue = []
 90
 91    def terminate(self):
 92        self.stream.close()
 93
 94
 95async def send_audio_worker_sounddevice(
 96    connection: AsyncRealtimeConnection,
 97    should_send: Callable[[], bool] | None = None,
 98    start_send: Callable[[], Awaitable[None]] | None = None,
 99):
100    sent_audio = False
101
102    device_info = sd.query_devices()
103    print(device_info)
104
105    read_size = int(SAMPLE_RATE * 0.02)
106
107    stream = sd.InputStream(
108        channels=CHANNELS,
109        samplerate=SAMPLE_RATE,
110        dtype="int16",
111    )
112    stream.start()
113
114    try:
115        while True:
116            if stream.read_available < read_size:
117                await asyncio.sleep(0)
118                continue
119
120            data, _ = stream.read(read_size)
121
122            if should_send() if should_send else True:
123                if not sent_audio and start_send:
124                    await start_send()
125                await connection.send(
126                    {"type": "input_audio_buffer.append", "audio": base64.b64encode(data).decode("utf-8")}
127                )
128                sent_audio = True
129
130            elif sent_audio:
131                print("Done, triggering inference")
132                await connection.send({"type": "input_audio_buffer.commit"})
133                await connection.send({"type": "response.create", "response": {}})
134                sent_audio = False
135
136            await asyncio.sleep(0)
137
138    except KeyboardInterrupt:
139        pass
140    finally:
141        stream.stop()
142        stream.close()