from __future__ import annotations

import io
import base64
import asyncio
import threading
from typing import Callable, Awaitable

import numpy as np
import pyaudio
import sounddevice as sd
from pydub import AudioSegment

from openai.resources.realtime.realtime import AsyncRealtimeConnection

CHUNK_LENGTH_S = 0.05  # 50ms
SAMPLE_RATE = 24000
FORMAT = pyaudio.paInt16
CHANNELS = 1

# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false


def audio_to_pcm16_base64(audio_bytes: bytes) -> bytes:
    # load the audio file from the byte stream
    audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
    print(f"Loaded audio: {audio.frame_rate=} {audio.channels=} {audio.sample_width=} {audio.frame_width=}")
    # resample to 24kHz mono pcm16; despite the name, this returns raw PCM16 bytes
    # (base64 encoding is left to the caller)
    pcm_audio = audio.set_frame_rate(SAMPLE_RATE).set_channels(CHANNELS).set_sample_width(2).raw_data
    return pcm_audio
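
# Illustrative sketch (not called anywhere in this module; `wav_bytes` is a hypothetical
# variable holding an encoded audio file read into memory): the raw PCM16 bytes returned
# above are typically base64-encoded before being sent to the Realtime API, mirroring the
# "input_audio_buffer.append" event built in send_audio_worker_sounddevice below.
#
#     pcm = audio_to_pcm16_base64(wav_bytes)
#     event = {"type": "input_audio_buffer.append", "audio": base64.b64encode(pcm).decode("utf-8")}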


class AudioPlayerAsync:
    """Plays queued PCM16 chunks through a sounddevice OutputStream, refilled from a thread-safe queue."""

    def __init__(self):
        self.queue = []
        self.lock = threading.Lock()
        self.stream = sd.OutputStream(
            callback=self.callback,
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype=np.int16,
            blocksize=int(CHUNK_LENGTH_S * SAMPLE_RATE),
        )
        self.playing = False
        self._frame_count = 0

    def callback(self, outdata, frames, time, status):  # noqa
        with self.lock:
            data = np.empty(0, dtype=np.int16)

            # get next item from queue if there is still space in the buffer
            while len(data) < frames and len(self.queue) > 0:
                item = self.queue.pop(0)
                frames_needed = frames - len(data)
                data = np.concatenate((data, item[:frames_needed]))
                if len(item) > frames_needed:
                    self.queue.insert(0, item[frames_needed:])

            self._frame_count += len(data)

            # fill the rest of the frames with zeros if there is no more data
            if len(data) < frames:
                data = np.concatenate((data, np.zeros(frames - len(data), dtype=np.int16)))

        outdata[:] = data.reshape(-1, 1)

    def reset_frame_count(self):
        self._frame_count = 0

    def get_frame_count(self):
        return self._frame_count

    def add_data(self, data: bytes):
        with self.lock:
            # bytes is pcm16 single channel audio data, convert to numpy array
            np_data = np.frombuffer(data, dtype=np.int16)
            self.queue.append(np_data)
            if not self.playing:
                self.start()

    def start(self):
        self.playing = True
        self.stream.start()

    def stop(self):
        self.playing = False
        self.stream.stop()
        with self.lock:
            self.queue = []

    def terminate(self):
        self.stream.close()
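
# Typical consumption pattern (a sketch only; `connection` is an AsyncRealtimeConnection and
# the "response.audio.delta" event name is an assumption that may differ by API version):
# decode each audio delta from base64 and hand the raw PCM16 bytes to the player, which
# starts the output stream as soon as the first chunk arrives.
#
#     player = AudioPlayerAsync()
#     async for event in connection:
#         if event.type == "response.audio.delta":
#             player.add_data(base64.b64decode(event.delta))
#     player.terminate()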


async def send_audio_worker_sounddevice(
    connection: AsyncRealtimeConnection,
    should_send: Callable[[], bool] | None = None,
    start_send: Callable[[], Awaitable[None]] | None = None,
):
    """Streams microphone audio to the Realtime connection in 20ms chunks while should_send()
    is true, then commits the input buffer and requests a response once sending stops."""
    sent_audio = False

    device_info = sd.query_devices()
    print(device_info)

    read_size = int(SAMPLE_RATE * 0.02)  # 20ms of audio per read

    stream = sd.InputStream(
        channels=CHANNELS,
        samplerate=SAMPLE_RATE,
        dtype="int16",
    )
    stream.start()

    try:
        while True:
            if stream.read_available < read_size:
                await asyncio.sleep(0)
                continue

            data, _ = stream.read(read_size)

            if should_send is None or should_send():
                if not sent_audio and start_send:
                    await start_send()
                await connection.send(
                    {"type": "input_audio_buffer.append", "audio": base64.b64encode(data).decode("utf-8")}
                )
                sent_audio = True

            elif sent_audio:
                print("Done, triggering inference")
                await connection.send({"type": "input_audio_buffer.commit"})
                await connection.send({"type": "response.create", "response": {}})
                sent_audio = False

            await asyncio.sleep(0)

    except KeyboardInterrupt:
        pass
    finally:
        stream.stop()
        stream.close()
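
# Launch sketch (assumptions: `AsyncOpenAI().realtime.connect(...)` yields the
# AsyncRealtimeConnection imported above, and the model name is a placeholder):
#
#     from openai import AsyncOpenAI
#
#     async with AsyncOpenAI().realtime.connect(model="...") as connection:
#         await send_audio_worker_sounddevice(connection)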