main
1#!/usr/bin/env uv run
2####################################################################
3# Sample TUI app with a push to talk interface to the Realtime API #
4# If you have `uv` installed and the `OPENAI_API_KEY` #
5# environment variable set, you can run this example with just #
6# #
7# `./examples/realtime/push_to_talk_app.py` #
8# #
9# On Mac, you'll also need `brew install portaudio ffmpeg` #
10####################################################################
11#
12# /// script
13# requires-python = ">=3.9"
14# dependencies = [
15# "textual",
16# "numpy",
17# "pyaudio",
18# "pydub",
19# "sounddevice",
20# "openai[realtime]",
21# ]
22#
23# [tool.uv.sources]
24# openai = { path = "../../", editable = true }
25# ///
26from __future__ import annotations
27
28import base64
29import asyncio
30from typing import Any, cast
31from typing_extensions import override
32
33from textual import events
34from audio_util import CHANNELS, SAMPLE_RATE, AudioPlayerAsync
35from textual.app import App, ComposeResult
36from textual.widgets import Button, Static, RichLog
37from textual.reactive import reactive
38from textual.containers import Container
39
40from openai import AsyncOpenAI
41from openai.types.realtime.session import Session
42from openai.resources.realtime.realtime import AsyncRealtimeConnection
43
44
class SessionDisplay(Static):
    """Static widget that displays the realtime session ID once it is known."""

    # Reactive string; stays empty until the app receives `session.created`.
    session_id = reactive("")

    @override
    def render(self) -> str:
        if not self.session_id:
            return "Connecting..."
        return f"Session ID: {self.session_id}"
53
54
class AudioStatusIndicator(Static):
    """Static widget showing whether the microphone is currently recording."""

    # Reactive flag; toggled by the app's `K` key handler.
    is_recording = reactive(False)

    @override
    def render(self) -> str:
        if self.is_recording:
            return "🔴 Recording... (Press K to stop)"
        return "⚪ Press K to start recording (Q to quit)"
66
67
class RealtimeApp(App[None]):
    """Push-to-talk TUI client for the Realtime API.

    Two background workers are started in ``on_mount``: one holds the
    realtime websocket connection open and renders incoming events, the
    other streams microphone audio whenever recording is enabled. The
    ``K`` key toggles recording and ``Q`` quits.
    """

    CSS = """
    Screen {
        background: #1a1b26;  /* Dark blue-grey background */
    }

    Container {
        border: double rgb(91, 164, 91);
    }

    Horizontal {
        width: 100%;
    }

    #input-container {
        height: 5;  /* Explicit height for input container */
        margin: 1 1;
        padding: 1 2;
    }

    Input {
        width: 80%;
        height: 3;  /* Explicit height for input */
    }

    Button {
        width: 20%;
        height: 3;  /* Explicit height for button */
    }

    #bottom-pane {
        width: 100%;
        height: 82%;  /* Reduced to make room for session display */
        border: round rgb(205, 133, 63);
        content-align: center middle;
    }

    #status-indicator {
        height: 3;
        content-align: center middle;
        background: #2a2b36;
        border: solid rgb(91, 164, 91);
        margin: 1 1;
    }

    #session-display {
        height: 3;
        content-align: center middle;
        background: #2a2b36;
        border: solid rgb(91, 164, 91);
        margin: 1 1;
    }

    Static {
        color: white;
    }
    """

    client: AsyncOpenAI
    # Set while recording so `send_mic_audio` streams chunks; cleared to pause.
    should_send_audio: asyncio.Event
    audio_player: AudioPlayerAsync
    # Item ID of the audio response currently playing; used to detect when a
    # new response starts so the player's frame count can be reset.
    last_audio_item_id: str | None
    connection: AsyncRealtimeConnection | None
    session: Session | None
    # Set once the realtime connection is established; awaited by senders.
    connected: asyncio.Event

    def __init__(self) -> None:
        super().__init__()
        self.connection = None
        self.session = None
        self.client = AsyncOpenAI()
        self.audio_player = AudioPlayerAsync()
        self.last_audio_item_id = None
        self.should_send_audio = asyncio.Event()
        self.connected = asyncio.Event()

    @override
    def compose(self) -> ComposeResult:
        """Create child widgets for the app."""
        with Container():
            yield SessionDisplay(id="session-display")
            yield AudioStatusIndicator(id="status-indicator")
            yield RichLog(id="bottom-pane", wrap=True, highlight=True, markup=True)

    async def on_mount(self) -> None:
        # Run the connection handler and the mic streamer as background workers
        # so the UI event loop stays responsive.
        self.run_worker(self.handle_realtime_connection())
        self.run_worker(self.send_mic_audio())

    async def handle_realtime_connection(self) -> None:
        """Open the realtime connection and render incoming events until it closes."""
        async with self.client.realtime.connect(model="gpt-realtime") as conn:
            self.connection = conn
            # Unblock anything waiting in `_get_connection`.
            self.connected.set()

            # note: this is the default and can be omitted
            # if you want to manually handle VAD yourself, then set `'turn_detection': None`
            await conn.session.update(
                session={
                    "audio": {
                        "input": {"turn_detection": {"type": "server_vad"}},
                    },
                    "model": "gpt-realtime",
                    "type": "realtime",
                }
            )

            # Accumulated transcript text per response item ID.
            acc_items: dict[str, Any] = {}

            async for event in conn:
                if event.type == "session.created":
                    self.session = event.session
                    session_display = self.query_one(SessionDisplay)
                    assert event.session.id is not None
                    session_display.session_id = event.session.id
                    continue

                if event.type == "session.updated":
                    self.session = event.session
                    continue

                if event.type == "response.output_audio.delta":
                    # A new item ID means a new response started playing;
                    # reset the player's frame counter before queueing audio.
                    if event.item_id != self.last_audio_item_id:
                        self.audio_player.reset_frame_count()
                        self.last_audio_item_id = event.item_id

                    bytes_data = base64.b64decode(event.delta)
                    self.audio_player.add_data(bytes_data)
                    continue

                if event.type == "response.output_audio_transcript.delta":
                    try:
                        text = acc_items[event.item_id]
                    except KeyError:
                        acc_items[event.item_id] = event.delta
                    else:
                        acc_items[event.item_id] = text + event.delta

                    # Clear and update the entire content because RichLog otherwise treats each delta as a new line
                    bottom_pane = self.query_one("#bottom-pane", RichLog)
                    bottom_pane.clear()
                    bottom_pane.write(acc_items[event.item_id])
                    continue

    async def _get_connection(self) -> AsyncRealtimeConnection:
        """Return the live connection, waiting until it has been established."""
        await self.connected.wait()
        assert self.connection is not None
        return self.connection

    async def send_mic_audio(self) -> None:
        """Continuously read the microphone and stream audio while recording is on."""
        import sounddevice as sd  # type: ignore

        sent_audio = False

        device_info = sd.query_devices()
        # NOTE(review): printing inside a running Textual app may interfere
        # with the terminal display — consider logging instead.
        print(device_info)

        # 20ms of audio per read, matching the configured sample rate.
        read_size = int(SAMPLE_RATE * 0.02)

        stream = sd.InputStream(
            channels=CHANNELS,
            samplerate=SAMPLE_RATE,
            dtype="int16",
        )
        stream.start()

        status_indicator = self.query_one(AudioStatusIndicator)

        try:
            while True:
                # Yield to the event loop until a full chunk is available.
                if stream.read_available < read_size:
                    await asyncio.sleep(0)
                    continue

                # Block here (dropping buffered audio checks) until the user
                # has pressed K to start recording.
                await self.should_send_audio.wait()
                status_indicator.is_recording = True

                data, _ = stream.read(read_size)

                connection = await self._get_connection()
                if not sent_audio:
                    # Cancel any in-progress model response before sending the
                    # first audio chunk. Awaiting the send (rather than a
                    # fire-and-forget `asyncio.create_task`, whose unreferenced
                    # task could be garbage-collected before running) guarantees
                    # the cancel is delivered ahead of the audio.
                    await connection.send({"type": "response.cancel"})
                    sent_audio = True

                await connection.input_audio_buffer.append(audio=base64.b64encode(cast(Any, data)).decode("utf-8"))

                await asyncio.sleep(0)
        except KeyboardInterrupt:
            pass
        finally:
            # Always release the audio device, even on cancellation/error.
            stream.stop()
            stream.close()

    async def on_key(self, event: events.Key) -> None:
        """Handle key press events."""
        if event.key == "enter":
            # NOTE(review): `compose` never yields a Button, so this query
            # raises if Enter is pressed — likely leftover from an earlier
            # version of this example; confirm before relying on it.
            self.query_one(Button).press()
            return

        if event.key == "q":
            self.exit()
            return

        if event.key == "k":
            status_indicator = self.query_one(AudioStatusIndicator)
            if status_indicator.is_recording:
                self.should_send_audio.clear()
                status_indicator.is_recording = False

                if self.session and self.session.turn_detection is None:
                    # The default in the API is that the model will automatically detect when the user has
                    # stopped talking and then start responding itself.
                    #
                    # However if we're in manual `turn_detection` mode then we need to
                    # manually tell the model to commit the audio buffer and start responding.
                    conn = await self._get_connection()
                    await conn.input_audio_buffer.commit()
                    await conn.response.create()
            else:
                self.should_send_audio.set()
                status_indicator.is_recording = True
287
288
if __name__ == "__main__":
    # Launch the TUI when run directly as a script.
    RealtimeApp().run()