main
1from __future__ import annotations
2
3import os
4import inspect
5from typing import Any, Union, Mapping, TypeVar, Callable, Awaitable, cast, overload
6from typing_extensions import Self, override
7
8import httpx
9
10from .._types import NOT_GIVEN, Omit, Query, Timeout, NotGiven
11from .._utils import is_given, is_mapping
12from .._client import OpenAI, AsyncOpenAI
13from .._compat import model_copy
14from .._models import FinalRequestOptions
15from .._streaming import Stream, AsyncStream
16from .._exceptions import OpenAIError
17from .._base_client import DEFAULT_MAX_RETRIES, BaseClient
18
# Request paths that `BaseAzureClient` treats as deployment-scoped: requests to
# these paths get a `/deployments/{model}` prefix when the body names a model,
# and they are exempt from the endpoint-based URL rewrite in `_prepare_url`.
_deployments_endpoints = {
    "/completions",
    "/chat/completions",
    "/embeddings",
    "/audio/transcriptions",
    "/audio/translations",
    "/audio/speech",
    "/images/generations",
    "/images/edits",
}
31
32
# Callables that supply an Azure AD token on demand. The synchronous client
# requires the call to return a string; the asynchronous client additionally
# accepts an awaitable that resolves to one.
AzureADTokenProvider = Callable[[], str]
AsyncAzureADTokenProvider = Callable[[], Union[str, Awaitable[str]]]

# Type parameters binding `BaseAzureClient` to a concrete httpx client class and
# its matching default stream class.
_HttpxClientT = TypeVar("_HttpxClientT", bound=Union[httpx.Client, httpx.AsyncClient])
_DefaultStreamT = TypeVar("_DefaultStreamT", bound=Union[Stream[Any], AsyncStream[Any]])
37
38
# Sentinel `api_key` value used when authenticating with Azure AD rather than an
# API key: we don't want to make the `api_key` in the main client Optional, and
# Azure AD tokens may be retrieved on a per-request basis.
# NOTE(review): the string is assembled at runtime and `_prepare_options` compares
# against it by identity (`is not`) - presumably the join exists so the sentinel
# is a distinct object; confirm before replacing it with a plain literal.
API_KEY_SENTINEL = "".join(["<", "missing API key", ">"])
43
44
class MutuallyExclusiveAuthError(OpenAIError):
    """Error stating that the Azure credential arguments cannot be combined."""

    def __init__(self) -> None:
        # The message is fixed; callers construct this error without arguments.
        message = (
            "The `api_key`, `azure_ad_token` and `azure_ad_token_provider` arguments are mutually exclusive; "
            "Only one can be passed at a time"
        )
        super().__init__(message)
50
51
class BaseAzureClient(BaseClient[_HttpxClientT, _DefaultStreamT]):
    """Shared URL handling for the synchronous and asynchronous Azure clients."""

    # Endpoint and deployment the client was configured with; either may be None.
    _azure_endpoint: httpx.URL | None
    _azure_deployment: str | None

    @override
    def _build_request(
        self,
        options: FinalRequestOptions,
        *,
        retries_taken: int = 0,
    ) -> httpx.Request:
        """Build the request, routing deployment-scoped paths through `/deployments/{model}`.

        The prefix is only added when the request path is one of
        `_deployments_endpoints`, the JSON body is a mapping that names a
        `model`, and the client's base URL path does not already contain
        `/deployments`. `options.url` is updated in place.
        """
        payload = options.json_data
        if options.url in _deployments_endpoints and is_mapping(payload):
            deployment = payload.get("model")
            # Keep the short-circuit: the base URL is only inspected once a model is known.
            needs_prefix = deployment is not None and "/deployments" not in str(self.base_url.path)
            if needs_prefix:
                options.url = f"/deployments/{deployment}{options.url}"

        return super()._build_request(options, retries_taken=retries_taken)

    @override
    def _prepare_url(self, url: str) -> httpx.URL:
        """Resolve `url` for clients configured with both an Azure endpoint and a deployment.

        For such clients, a relative path that is **not** one of the
        deployments-based endpoints is joined onto `{azure_endpoint}/openai/`
        instead of the client's base URL, and a non-relative URL is returned
        as given. Every other case is delegated to the parent implementation.
        """
        if self._azure_deployment and self._azure_endpoint and url not in _deployments_endpoints:
            requested = httpx.URL(url)
            if not requested.is_relative_url:
                return requested

            # Normalise the slashes on both sides so exactly one separates each segment.
            endpoint_path = self._azure_endpoint.raw_path.rstrip(b"/")
            relative_path = requested.raw_path.lstrip(b"/")
            return self._azure_endpoint.copy_with(raw_path=endpoint_path + b"/openai/" + relative_path)

        return super()._prepare_url(url)
87
88
class AzureOpenAI(BaseAzureClient[httpx.Client, Stream[Any]], OpenAI):
    """Synchronous client for Azure-hosted deployments, authenticated by API key or Azure AD token."""

    @overload
    def __init__(
        self,
        *,
        azure_endpoint: str,
        azure_deployment: str | None = None,
        api_version: str | None = None,
        api_key: str | Callable[[], str] | None = None,
        azure_ad_token: str | None = None,
        azure_ad_token_provider: AzureADTokenProvider | None = None,
        organization: str | None = None,
        project: str | None = None,
        webhook_secret: str | None = None,
        websocket_base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        http_client: httpx.Client | None = None,
        _strict_response_validation: bool = False,
    ) -> None: ...

    @overload
    def __init__(
        self,
        *,
        azure_deployment: str | None = None,
        api_version: str | None = None,
        api_key: str | Callable[[], str] | None = None,
        azure_ad_token: str | None = None,
        azure_ad_token_provider: AzureADTokenProvider | None = None,
        organization: str | None = None,
        project: str | None = None,
        webhook_secret: str | None = None,
        websocket_base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        http_client: httpx.Client | None = None,
        _strict_response_validation: bool = False,
    ) -> None: ...

    @overload
    def __init__(
        self,
        *,
        base_url: str,
        api_version: str | None = None,
        api_key: str | Callable[[], str] | None = None,
        azure_ad_token: str | None = None,
        azure_ad_token_provider: AzureADTokenProvider | None = None,
        organization: str | None = None,
        project: str | None = None,
        webhook_secret: str | None = None,
        websocket_base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        http_client: httpx.Client | None = None,
        _strict_response_validation: bool = False,
    ) -> None: ...

    def __init__(
        self,
        *,
        api_version: str | None = None,
        azure_endpoint: str | None = None,
        azure_deployment: str | None = None,
        api_key: str | Callable[[], str] | None = None,
        azure_ad_token: str | None = None,
        azure_ad_token_provider: AzureADTokenProvider | None = None,
        organization: str | None = None,
        project: str | None = None,
        webhook_secret: str | None = None,
        websocket_base_url: str | httpx.URL | None = None,
        base_url: str | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        http_client: httpx.Client | None = None,
        _strict_response_validation: bool = False,
    ) -> None:
        """Construct a new synchronous azure openai client instance.

        This automatically infers the following arguments from their corresponding environment variables if they are not provided:
        - `api_key` from `AZURE_OPENAI_API_KEY`
        - `organization` from `OPENAI_ORG_ID`
        - `project` from `OPENAI_PROJECT_ID`
        - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN`
        - `api_version` from `OPENAI_API_VERSION`
        - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT`

        Args:
            azure_endpoint: Your Azure endpoint, including the resource, e.g. `https://example-resource.azure.openai.com/`

            azure_ad_token: Your Azure Active Directory token, https://www.microsoft.com/en-us/security/business/identity-access/microsoft-entra-id

            azure_ad_token_provider: A function that returns an Azure Active Directory token, will be invoked on every request.

            azure_deployment: A model deployment, if given with `azure_endpoint`, sets the base client URL to include `/deployments/{azure_deployment}`.
                Not supported with Assistants APIs.

            api_version: Sent as the `api-version` query parameter on every request; it overrides any
                `api-version` entry supplied through `default_query`.

            base_url: A fully formed base URL, used as given. Mutually exclusive with `azure_endpoint`.

        Raises:
            OpenAIError: If none of `api_key`, `azure_ad_token` or `azure_ad_token_provider` is available.
            ValueError: If no API version is available, if neither `base_url` nor an Azure endpoint
                is available, or if both `base_url` and `azure_endpoint` are passed.
        """
        if api_key is None:
            api_key = os.environ.get("AZURE_OPENAI_API_KEY")

        if azure_ad_token is None:
            azure_ad_token = os.environ.get("AZURE_OPENAI_AD_TOKEN")

        if api_key is None and azure_ad_token is None and azure_ad_token_provider is None:
            raise OpenAIError(
                "Missing credentials. Please pass one of `api_key`, `azure_ad_token`, `azure_ad_token_provider`, or the `AZURE_OPENAI_API_KEY` or `AZURE_OPENAI_AD_TOKEN` environment variables."
            )

        if api_version is None:
            api_version = os.environ.get("OPENAI_API_VERSION")

        if api_version is None:
            raise ValueError(
                "Must provide either the `api_version` argument or the `OPENAI_API_VERSION` environment variable"
            )

        # `api-version` is placed last so it wins over a caller-supplied entry of the same name.
        if default_query is None:
            default_query = {"api-version": api_version}
        else:
            default_query = {**default_query, "api-version": api_version}

        if base_url is None:
            # The environment variable is only consulted when no explicit `base_url` was given.
            if azure_endpoint is None:
                azure_endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT")

            if azure_endpoint is None:
                raise ValueError(
                    "Must provide one of the `base_url` or `azure_endpoint` arguments, or the `AZURE_OPENAI_ENDPOINT` environment variable"
                )

            if azure_deployment is not None:
                base_url = f"{azure_endpoint.rstrip('/')}/openai/deployments/{azure_deployment}"
            else:
                base_url = f"{azure_endpoint.rstrip('/')}/openai"
        else:
            if azure_endpoint is not None:
                raise ValueError("base_url and azure_endpoint are mutually exclusive")

        if api_key is None:
            # define a sentinel value to avoid any typing issues
            api_key = API_KEY_SENTINEL

        super().__init__(
            api_key=api_key,
            organization=organization,
            project=project,
            webhook_secret=webhook_secret,
            base_url=base_url,
            timeout=timeout,
            max_retries=max_retries,
            default_headers=default_headers,
            default_query=default_query,
            http_client=http_client,
            websocket_base_url=websocket_base_url,
            _strict_response_validation=_strict_response_validation,
        )
        self._api_version = api_version
        self._azure_ad_token = azure_ad_token
        self._azure_ad_token_provider = azure_ad_token_provider
        # The deployment is only remembered when an endpoint is in use; with `base_url` it is dropped.
        self._azure_deployment = azure_deployment if azure_endpoint else None
        self._azure_endpoint = httpx.URL(azure_endpoint) if azure_endpoint else None

    @override
    def copy(
        self,
        *,
        api_key: str | Callable[[], str] | None = None,
        organization: str | None = None,
        project: str | None = None,
        webhook_secret: str | None = None,
        websocket_base_url: str | httpx.URL | None = None,
        api_version: str | None = None,
        azure_ad_token: str | None = None,
        azure_ad_token_provider: AzureADTokenProvider | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        http_client: httpx.Client | None = None,
        max_retries: int | NotGiven = NOT_GIVEN,
        default_headers: Mapping[str, str] | None = None,
        set_default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        set_default_query: Mapping[str, object] | None = None,
        _extra_kwargs: Mapping[str, Any] = {},
    ) -> Self:
        """
        Create a new client instance re-using the same options given to the current client with optional overriding.

        `api_version`, `azure_ad_token` and `azure_ad_token_provider` fall back to this client's
        values when omitted (or falsy) and are forwarded through `_extra_kwargs`; entries in the
        caller's `_extra_kwargs` take precedence over them.
        """
        return super().copy(
            api_key=api_key,
            organization=organization,
            project=project,
            webhook_secret=webhook_secret,
            websocket_base_url=websocket_base_url,
            base_url=base_url,
            timeout=timeout,
            http_client=http_client,
            max_retries=max_retries,
            default_headers=default_headers,
            set_default_headers=set_default_headers,
            default_query=default_query,
            set_default_query=set_default_query,
            _extra_kwargs={
                "api_version": api_version or self._api_version,
                "azure_ad_token": azure_ad_token or self._azure_ad_token,
                "azure_ad_token_provider": azure_ad_token_provider or self._azure_ad_token_provider,
                **_extra_kwargs,
            },
        )

    with_options = copy

    def _get_azure_ad_token(self) -> str | None:
        """Return the Azure AD token to authenticate with, or None when none is configured.

        A static `azure_ad_token` takes priority; otherwise the token provider, if any, is invoked.

        Raises:
            ValueError: If the provider returns an empty value or something other than a string.
        """
        if self._azure_ad_token is not None:
            return self._azure_ad_token

        provider = self._azure_ad_token_provider
        if provider is not None:
            token = provider()
            if not token or not isinstance(token, str):  # pyright: ignore[reportUnnecessaryIsInstance]
                raise ValueError(
                    f"Expected `azure_ad_token_provider` argument to return a string but it returned {token}",
                )
            return token

        return None

    @override
    def _prepare_options(self, options: FinalRequestOptions) -> FinalRequestOptions:
        """Return a copy of `options` with the Azure authentication header filled in.

        An Azure AD token, when available, is sent as `Authorization: Bearer ...`; otherwise the
        API key is sent as `api-key`. A header already present in `options.headers` with a
        non-None value is left untouched.

        Raises:
            ValueError: If neither an Azure AD token nor a real API key is available.
        """
        headers: dict[str, str | Omit] = {**options.headers} if is_given(options.headers) else {}

        # Work on a copy so the caller's options object is not modified.
        options = model_copy(options)
        options.headers = headers

        azure_ad_token = self._get_azure_ad_token()
        if azure_ad_token is not None:
            if headers.get("Authorization") is None:
                headers["Authorization"] = f"Bearer {azure_ad_token}"
        elif self.api_key is not API_KEY_SENTINEL:
            # Identity comparison against the sentinel object that `__init__` substitutes for a missing key.
            if headers.get("api-key") is None:
                headers["api-key"] = self.api_key
        else:
            # should never be hit
            raise ValueError("Unable to handle auth")

        return options

    def _configure_realtime(self, model: str, extra_query: Query) -> tuple[httpx.URL, dict[str, str]]:
        """Build the realtime connection URL and its authentication headers.

        Args:
            model: Used as the `deployment` query value when the client has no Azure deployment.
            extra_query: Extra query parameters; `api-version` and `deployment` override same-named entries.

        Returns:
            A `(url, auth_headers)` pair. `auth_headers` carries `api-key` when a real API key is
            set, otherwise a bearer `Authorization` header when an Azure AD token is available,
            otherwise it is empty.
        """
        auth_headers: dict[str, str] = {}
        query = {
            **extra_query,
            "api-version": self._api_version,
            "deployment": self._azure_deployment or model,
        }
        # Compare against the shared sentinel constant rather than repeating its literal value.
        if self.api_key and self.api_key != API_KEY_SENTINEL:
            auth_headers = {"api-key": self.api_key}
        else:
            token = self._get_azure_ad_token()
            if token:
                auth_headers = {"Authorization": f"Bearer {token}"}

        if self.websocket_base_url is not None:
            base_url = httpx.URL(self.websocket_base_url)
            merge_raw_path = base_url.raw_path.rstrip(b"/") + b"/realtime"
            realtime_url = base_url.copy_with(raw_path=merge_raw_path)
        else:
            # No explicit websocket URL: derive it from the HTTP URL and switch the scheme.
            base_url = self._prepare_url("/realtime")
            realtime_url = base_url.copy_with(scheme="wss")

        url = realtime_url.copy_with(params={**query})
        return url, auth_headers
365
366
class AsyncAzureOpenAI(BaseAzureClient[httpx.AsyncClient, AsyncStream[Any]], AsyncOpenAI):
    """Asynchronous client for Azure-hosted deployments, authenticated by API key or Azure AD token."""

    @overload
    def __init__(
        self,
        *,
        azure_endpoint: str,
        azure_deployment: str | None = None,
        api_version: str | None = None,
        api_key: str | Callable[[], Awaitable[str]] | None = None,
        azure_ad_token: str | None = None,
        azure_ad_token_provider: AsyncAzureADTokenProvider | None = None,
        organization: str | None = None,
        project: str | None = None,
        webhook_secret: str | None = None,
        websocket_base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        http_client: httpx.AsyncClient | None = None,
        _strict_response_validation: bool = False,
    ) -> None: ...

    @overload
    def __init__(
        self,
        *,
        azure_deployment: str | None = None,
        api_version: str | None = None,
        api_key: str | Callable[[], Awaitable[str]] | None = None,
        azure_ad_token: str | None = None,
        azure_ad_token_provider: AsyncAzureADTokenProvider | None = None,
        organization: str | None = None,
        project: str | None = None,
        webhook_secret: str | None = None,
        websocket_base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        http_client: httpx.AsyncClient | None = None,
        _strict_response_validation: bool = False,
    ) -> None: ...

    @overload
    def __init__(
        self,
        *,
        base_url: str,
        api_version: str | None = None,
        api_key: str | Callable[[], Awaitable[str]] | None = None,
        azure_ad_token: str | None = None,
        azure_ad_token_provider: AsyncAzureADTokenProvider | None = None,
        organization: str | None = None,
        project: str | None = None,
        webhook_secret: str | None = None,
        websocket_base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        http_client: httpx.AsyncClient | None = None,
        _strict_response_validation: bool = False,
    ) -> None: ...

    def __init__(
        self,
        *,
        azure_endpoint: str | None = None,
        azure_deployment: str | None = None,
        api_version: str | None = None,
        api_key: str | Callable[[], Awaitable[str]] | None = None,
        azure_ad_token: str | None = None,
        azure_ad_token_provider: AsyncAzureADTokenProvider | None = None,
        organization: str | None = None,
        project: str | None = None,
        webhook_secret: str | None = None,
        base_url: str | None = None,
        websocket_base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        http_client: httpx.AsyncClient | None = None,
        _strict_response_validation: bool = False,
    ) -> None:
        """Construct a new asynchronous azure openai client instance.

        This automatically infers the following arguments from their corresponding environment variables if they are not provided:
        - `api_key` from `AZURE_OPENAI_API_KEY`
        - `organization` from `OPENAI_ORG_ID`
        - `project` from `OPENAI_PROJECT_ID`
        - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN`
        - `api_version` from `OPENAI_API_VERSION`
        - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT`

        Args:
            azure_endpoint: Your Azure endpoint, including the resource, e.g. `https://example-resource.azure.openai.com/`

            azure_ad_token: Your Azure Active Directory token, https://www.microsoft.com/en-us/security/business/identity-access/microsoft-entra-id

            azure_ad_token_provider: A function that returns an Azure Active Directory token, will be invoked on every request.

            azure_deployment: A model deployment, if given with `azure_endpoint`, sets the base client URL to include `/deployments/{azure_deployment}`.
                Not supported with Assistants APIs.

            api_version: Sent as the `api-version` query parameter on every request; it overrides any
                `api-version` entry supplied through `default_query`.

            base_url: A fully formed base URL, used as given. Mutually exclusive with `azure_endpoint`.

        Raises:
            OpenAIError: If none of `api_key`, `azure_ad_token` or `azure_ad_token_provider` is available.
            ValueError: If no API version is available, if neither `base_url` nor an Azure endpoint
                is available, or if both `base_url` and `azure_endpoint` are passed.
        """
        if api_key is None:
            api_key = os.environ.get("AZURE_OPENAI_API_KEY")

        if azure_ad_token is None:
            azure_ad_token = os.environ.get("AZURE_OPENAI_AD_TOKEN")

        if api_key is None and azure_ad_token is None and azure_ad_token_provider is None:
            raise OpenAIError(
                "Missing credentials. Please pass one of `api_key`, `azure_ad_token`, `azure_ad_token_provider`, or the `AZURE_OPENAI_API_KEY` or `AZURE_OPENAI_AD_TOKEN` environment variables."
            )

        if api_version is None:
            api_version = os.environ.get("OPENAI_API_VERSION")

        if api_version is None:
            raise ValueError(
                "Must provide either the `api_version` argument or the `OPENAI_API_VERSION` environment variable"
            )

        # `api-version` is placed last so it wins over a caller-supplied entry of the same name.
        if default_query is None:
            default_query = {"api-version": api_version}
        else:
            default_query = {**default_query, "api-version": api_version}

        if base_url is None:
            # The environment variable is only consulted when no explicit `base_url` was given.
            if azure_endpoint is None:
                azure_endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT")

            if azure_endpoint is None:
                raise ValueError(
                    "Must provide one of the `base_url` or `azure_endpoint` arguments, or the `AZURE_OPENAI_ENDPOINT` environment variable"
                )

            if azure_deployment is not None:
                base_url = f"{azure_endpoint.rstrip('/')}/openai/deployments/{azure_deployment}"
            else:
                base_url = f"{azure_endpoint.rstrip('/')}/openai"
        else:
            if azure_endpoint is not None:
                raise ValueError("base_url and azure_endpoint are mutually exclusive")

        if api_key is None:
            # define a sentinel value to avoid any typing issues
            api_key = API_KEY_SENTINEL

        super().__init__(
            api_key=api_key,
            organization=organization,
            project=project,
            webhook_secret=webhook_secret,
            base_url=base_url,
            timeout=timeout,
            max_retries=max_retries,
            default_headers=default_headers,
            default_query=default_query,
            http_client=http_client,
            websocket_base_url=websocket_base_url,
            _strict_response_validation=_strict_response_validation,
        )
        self._api_version = api_version
        self._azure_ad_token = azure_ad_token
        self._azure_ad_token_provider = azure_ad_token_provider
        # The deployment is only remembered when an endpoint is in use; with `base_url` it is dropped.
        self._azure_deployment = azure_deployment if azure_endpoint else None
        self._azure_endpoint = httpx.URL(azure_endpoint) if azure_endpoint else None

    @override
    def copy(
        self,
        *,
        api_key: str | Callable[[], Awaitable[str]] | None = None,
        organization: str | None = None,
        project: str | None = None,
        webhook_secret: str | None = None,
        websocket_base_url: str | httpx.URL | None = None,
        api_version: str | None = None,
        azure_ad_token: str | None = None,
        azure_ad_token_provider: AsyncAzureADTokenProvider | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        http_client: httpx.AsyncClient | None = None,
        max_retries: int | NotGiven = NOT_GIVEN,
        default_headers: Mapping[str, str] | None = None,
        set_default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        set_default_query: Mapping[str, object] | None = None,
        _extra_kwargs: Mapping[str, Any] = {},
    ) -> Self:
        """
        Create a new client instance re-using the same options given to the current client with optional overriding.

        `api_version`, `azure_ad_token` and `azure_ad_token_provider` fall back to this client's
        values when omitted (or falsy) and are forwarded through `_extra_kwargs`; entries in the
        caller's `_extra_kwargs` take precedence over them.
        """
        return super().copy(
            api_key=api_key,
            organization=organization,
            project=project,
            webhook_secret=webhook_secret,
            websocket_base_url=websocket_base_url,
            base_url=base_url,
            timeout=timeout,
            http_client=http_client,
            max_retries=max_retries,
            default_headers=default_headers,
            set_default_headers=set_default_headers,
            default_query=default_query,
            set_default_query=set_default_query,
            _extra_kwargs={
                "api_version": api_version or self._api_version,
                "azure_ad_token": azure_ad_token or self._azure_ad_token,
                "azure_ad_token_provider": azure_ad_token_provider or self._azure_ad_token_provider,
                **_extra_kwargs,
            },
        )

    with_options = copy

    async def _get_azure_ad_token(self) -> str | None:
        """Return the Azure AD token to authenticate with, or None when none is configured.

        A static `azure_ad_token` takes priority; otherwise the token provider, if any, is
        invoked and its result awaited when it is awaitable.

        Raises:
            ValueError: If the provider yields an empty value or something other than a string.
        """
        if self._azure_ad_token is not None:
            return self._azure_ad_token

        provider = self._azure_ad_token_provider
        if provider is not None:
            token = provider()
            # The provider may be a plain function or a coroutine function.
            if inspect.isawaitable(token):
                token = await token
            if not token or not isinstance(cast(Any, token), str):
                raise ValueError(
                    f"Expected `azure_ad_token_provider` argument to return a string but it returned {token}",
                )
            return str(token)

        return None

    @override
    async def _prepare_options(self, options: FinalRequestOptions) -> FinalRequestOptions:
        """Return a copy of `options` with the Azure authentication header filled in.

        An Azure AD token, when available, is sent as `Authorization: Bearer ...`; otherwise the
        API key is sent as `api-key`. A header already present in `options.headers` with a
        non-None value is left untouched.

        Raises:
            ValueError: If neither an Azure AD token nor a real API key is available.
        """
        headers: dict[str, str | Omit] = {**options.headers} if is_given(options.headers) else {}

        # Work on a copy so the caller's options object is not modified.
        options = model_copy(options)
        options.headers = headers

        azure_ad_token = await self._get_azure_ad_token()
        if azure_ad_token is not None:
            if headers.get("Authorization") is None:
                headers["Authorization"] = f"Bearer {azure_ad_token}"
        elif self.api_key is not API_KEY_SENTINEL:
            # Identity comparison against the sentinel object that `__init__` substitutes for a missing key.
            if headers.get("api-key") is None:
                headers["api-key"] = self.api_key
        else:
            # should never be hit
            raise ValueError("Unable to handle auth")

        return options

    async def _configure_realtime(self, model: str, extra_query: Query) -> tuple[httpx.URL, dict[str, str]]:
        """Build the realtime connection URL and its authentication headers.

        Args:
            model: Used as the `deployment` query value when the client has no Azure deployment.
            extra_query: Extra query parameters; `api-version` and `deployment` override same-named entries.

        Returns:
            A `(url, auth_headers)` pair. `auth_headers` carries `api-key` when a real API key is
            set, otherwise a bearer `Authorization` header when an Azure AD token is available,
            otherwise it is empty.
        """
        auth_headers: dict[str, str] = {}
        query = {
            **extra_query,
            "api-version": self._api_version,
            "deployment": self._azure_deployment or model,
        }
        # Compare against the shared sentinel constant rather than repeating its literal value.
        if self.api_key and self.api_key != API_KEY_SENTINEL:
            auth_headers = {"api-key": self.api_key}
        else:
            token = await self._get_azure_ad_token()
            if token:
                auth_headers = {"Authorization": f"Bearer {token}"}

        if self.websocket_base_url is not None:
            base_url = httpx.URL(self.websocket_base_url)
            merge_raw_path = base_url.raw_path.rstrip(b"/") + b"/realtime"
            realtime_url = base_url.copy_with(raw_path=merge_raw_path)
        else:
            # No explicit websocket URL: derive it from the HTTP URL and switch the scheme.
            base_url = self._prepare_url("/realtime")
            realtime_url = base_url.copy_with(scheme="wss")

        url = realtime_url.copy_with(params={**query})
        return url, auth_headers
647 return url, auth_headers