#!/usr/bin/env uv run
####################################################################
# Sample TUI app with a push to talk interface to the Realtime API #
# If you have `uv` installed and the `OPENAI_API_KEY`              #
# environment variable set, you can run this example with just     #
#                                                                  #
# `./examples/realtime/push_to_talk_app.py`                        #
#                                                                  #
# On Mac, you'll also need `brew install portaudio ffmpeg`         #
####################################################################
#
# /// script
# requires-python = ">=3.9"
# dependencies = [
#     "textual",
#     "numpy",
#     "pyaudio",
#     "pydub",
#     "sounddevice",
#     "openai[realtime]",
# ]
#
# [tool.uv.sources]
# openai = { path = "../../", editable = true }
# ///
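#
# Without `uv`, you can instead install the dependencies listed above yourself
# (for example, `pip install textual numpy pyaudio pydub sounddevice "openai[realtime]"`)
# and run this file with `python` directly.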
from __future__ import annotations

import base64
import asyncio
from typing import Any, cast
from typing_extensions import override

from textual import events
from audio_util import CHANNELS, SAMPLE_RATE, AudioPlayerAsync
from textual.app import App, ComposeResult
from textual.widgets import Button, Static, RichLog
from textual.reactive import reactive
from textual.containers import Container

from openai import AsyncOpenAI
from openai.types.realtime.session import Session
from openai.resources.realtime.realtime import AsyncRealtimeConnection


class SessionDisplay(Static):
    """A widget that shows the current session ID."""

    session_id = reactive("")

    @override
    def render(self) -> str:
        return f"Session ID: {self.session_id}" if self.session_id else "Connecting..."


class AudioStatusIndicator(Static):
    """A widget that shows the current audio recording status."""

    is_recording = reactive(False)

    @override
    def render(self) -> str:
        status = (
            "🔴 Recording... (Press K to stop)" if self.is_recording else "⚪ Press K to start recording (Q to quit)"
        )
        return status


class RealtimeApp(App[None]):
    CSS = """
        Screen {
            background: #1a1b26;  /* Dark blue-grey background */
        }

        Container {
            border: double rgb(91, 164, 91);
        }

        Horizontal {
            width: 100%;
        }

        #input-container {
            height: 5;  /* Explicit height for input container */
            margin: 1 1;
            padding: 1 2;
        }

        Input {
            width: 80%;
            height: 3;  /* Explicit height for input */
        }

        Button {
            width: 20%;
            height: 3;  /* Explicit height for button */
        }

        #bottom-pane {
            width: 100%;
            height: 82%;  /* Reduced to make room for session display */
            border: round rgb(205, 133, 63);
            content-align: center middle;
        }

        #status-indicator {
            height: 3;
            content-align: center middle;
            background: #2a2b36;
            border: solid rgb(91, 164, 91);
            margin: 1 1;
        }

        #session-display {
            height: 3;
            content-align: center middle;
            background: #2a2b36;
            border: solid rgb(91, 164, 91);
            margin: 1 1;
        }

        Static {
            color: white;
        }
    """

    client: AsyncOpenAI
    should_send_audio: asyncio.Event
    audio_player: AudioPlayerAsync
    last_audio_item_id: str | None
    connection: AsyncRealtimeConnection | None
    session: Session | None
    connected: asyncio.Event

    def __init__(self) -> None:
        super().__init__()
        self.connection = None
        self.session = None
        self.client = AsyncOpenAI()
        self.audio_player = AudioPlayerAsync()
        self.last_audio_item_id = None
        self.should_send_audio = asyncio.Event()
        self.connected = asyncio.Event()

    @override
    def compose(self) -> ComposeResult:
        """Create child widgets for the app."""
        with Container():
            yield SessionDisplay(id="session-display")
            yield AudioStatusIndicator(id="status-indicator")
            yield RichLog(id="bottom-pane", wrap=True, highlight=True, markup=True)

    async def on_mount(self) -> None:
        # Run the realtime connection handler and the microphone sender
        # concurrently as Textual workers.
        self.run_worker(self.handle_realtime_connection())
        self.run_worker(self.send_mic_audio())

    async def handle_realtime_connection(self) -> None:
        async with self.client.realtime.connect(model="gpt-realtime") as conn:
            self.connection = conn
            self.connected.set()

            # note: this is the default and can be omitted
            # if you want to handle voice activity detection (VAD) manually,
            # set `'turn_detection': None` instead
            await conn.session.update(
                session={
                    "audio": {
                        "input": {"turn_detection": {"type": "server_vad"}},
                    },
                    "model": "gpt-realtime",
                    "type": "realtime",
                }
            )
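            # For example, a push to talk only setup with server VAD disabled
            # would look roughly like the sketch below, mirroring where
            # `turn_detection` is enabled above (illustrative, not an exhaustive
            # config):
            #
            #     await conn.session.update(
            #         session={
            #             "audio": {"input": {"turn_detection": None}},
            #             "model": "gpt-realtime",
            #             "type": "realtime",
            #         }
            #     )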

            # Accumulated transcript text, keyed by response item ID.
            acc_items: dict[str, Any] = {}

            async for event in conn:
                if event.type == "session.created":
                    self.session = event.session
                    session_display = self.query_one(SessionDisplay)
                    assert event.session.id is not None
                    session_display.session_id = event.session.id
                    continue

                if event.type == "session.updated":
                    self.session = event.session
                    continue

                if event.type == "response.output_audio.delta":
                    if event.item_id != self.last_audio_item_id:
                        self.audio_player.reset_frame_count()
                        self.last_audio_item_id = event.item_id

                    bytes_data = base64.b64decode(event.delta)
                    self.audio_player.add_data(bytes_data)
                    continue

                if event.type == "response.output_audio_transcript.delta":
                    try:
                        text = acc_items[event.item_id]
                    except KeyError:
                        acc_items[event.item_id] = event.delta
                    else:
                        acc_items[event.item_id] = text + event.delta

                    # Clear and update the entire content because RichLog otherwise treats each delta as a new line
                    bottom_pane = self.query_one("#bottom-pane", RichLog)
                    bottom_pane.clear()
                    bottom_pane.write(acc_items[event.item_id])
                    continue

    async def _get_connection(self) -> AsyncRealtimeConnection:
        await self.connected.wait()
        assert self.connection is not None
        return self.connection

    async def send_mic_audio(self) -> None:
        import sounddevice as sd  # type: ignore

        sent_audio = False

        device_info = sd.query_devices()
        print(device_info)

        # Read roughly 20 ms of audio per chunk.
        read_size = int(SAMPLE_RATE * 0.02)

        stream = sd.InputStream(
            channels=CHANNELS,
            samplerate=SAMPLE_RATE,
            dtype="int16",
        )
        stream.start()

        status_indicator = self.query_one(AudioStatusIndicator)

        try:
            while True:
                if stream.read_available < read_size:
                    await asyncio.sleep(0)
                    continue

                await self.should_send_audio.wait()
                status_indicator.is_recording = True

                data, _ = stream.read(read_size)

                connection = await self._get_connection()
                if not sent_audio:
                    # Cancel any response that is already in progress before
                    # streaming new user audio.
                    asyncio.create_task(connection.send({"type": "response.cancel"}))
                    sent_audio = True

                await connection.input_audio_buffer.append(audio=base64.b64encode(cast(Any, data)).decode("utf-8"))

                await asyncio.sleep(0)
        except KeyboardInterrupt:
            pass
        finally:
            stream.stop()
            stream.close()

    async def on_key(self, event: events.Key) -> None:
        """Handle key press events."""
        if event.key == "enter":
            self.query_one(Button).press()
            return

        if event.key == "q":
            self.exit()
            return

        if event.key == "k":
            status_indicator = self.query_one(AudioStatusIndicator)
            if status_indicator.is_recording:
                self.should_send_audio.clear()
                status_indicator.is_recording = False

                if self.session and self.session.turn_detection is None:
                    # The default in the API is that the model will automatically detect when the user has
                    # stopped talking and then start responding itself.
                    #
                    # However, if we're in manual `turn_detection` mode, we need to
                    # manually tell the model to commit the audio buffer and start responding.
                    conn = await self._get_connection()
                    await conn.input_audio_buffer.commit()
                    await conn.response.create()
            else:
                self.should_send_audio.set()
                status_indicator.is_recording = True

if __name__ == "__main__":
    app = RealtimeApp()
    app.run()