main
1from __future__ import annotations
2
3import sys
4import json
5import time
6import uuid
7import email
8import asyncio
9import inspect
10import logging
11import platform
12import email.utils
13from types import TracebackType
14from random import random
15from typing import (
16 TYPE_CHECKING,
17 Any,
18 Dict,
19 Type,
20 Union,
21 Generic,
22 Mapping,
23 TypeVar,
24 Iterable,
25 Iterator,
26 Optional,
27 Generator,
28 AsyncIterator,
29 cast,
30 overload,
31)
32from typing_extensions import Literal, override, get_origin
33
34import anyio
35import httpx
36import distro
37import pydantic
38from httpx import URL
39from pydantic import PrivateAttr
40
41from . import _exceptions
42from ._qs import Querystring
43from ._files import to_httpx_files, async_to_httpx_files
44from ._types import (
45 Body,
46 Omit,
47 Query,
48 Headers,
49 Timeout,
50 NotGiven,
51 ResponseT,
52 AnyMapping,
53 PostParser,
54 RequestFiles,
55 HttpxSendArgs,
56 RequestOptions,
57 HttpxRequestFiles,
58 ModelBuilderProtocol,
59 not_given,
60)
61from ._utils import SensitiveHeadersFilter, is_dict, is_list, asyncify, is_given, lru_cache, is_mapping
62from ._compat import PYDANTIC_V1, model_copy, model_dump
63from ._models import GenericModel, FinalRequestOptions, validate_type, construct_type
64from ._response import (
65 APIResponse,
66 BaseAPIResponse,
67 AsyncAPIResponse,
68 extract_response_type,
69)
70from ._constants import (
71 DEFAULT_TIMEOUT,
72 MAX_RETRY_DELAY,
73 DEFAULT_MAX_RETRIES,
74 INITIAL_RETRY_DELAY,
75 RAW_RESPONSE_HEADER,
76 OVERRIDE_CAST_TO_HEADER,
77 DEFAULT_CONNECTION_LIMITS,
78)
79from ._streaming import Stream, SSEDecoder, AsyncStream, SSEBytesDecoder
80from ._exceptions import (
81 APIStatusError,
82 APITimeoutError,
83 APIConnectionError,
84 APIResponseValidationError,
85)
86from ._legacy_response import LegacyAPIResponse
87
# Module logger; the filter redacts sensitive header values (e.g. Authorization)
# from any request/response logging emitted in this module.
log: logging.Logger = logging.getLogger(__name__)
log.addFilter(SensitiveHeadersFilter())

# TODO: make base page type vars covariant
SyncPageT = TypeVar("SyncPageT", bound="BaseSyncPage[Any]")
AsyncPageT = TypeVar("AsyncPageT", bound="BaseAsyncPage[Any]")


_T = TypeVar("_T")
_T_co = TypeVar("_T_co", covariant=True)

# Stream classes used to type the `stream_cls` arguments on `request()`.
_StreamT = TypeVar("_StreamT", bound=Stream[Any])
_AsyncStreamT = TypeVar("_AsyncStreamT", bound=AsyncStream[Any])
101
# Resolve httpx's own default timeout so we can detect whether a user-supplied
# client carries a non-default timeout. `httpx._config` is private API, so the
# runtime import is wrapped in a try/except with a hard-coded fallback.
if TYPE_CHECKING:
    from httpx._config import (
        DEFAULT_TIMEOUT_CONFIG,  # pyright: ignore[reportPrivateImportUsage]
    )

    HTTPX_DEFAULT_TIMEOUT = DEFAULT_TIMEOUT_CONFIG
else:
    try:
        from httpx._config import DEFAULT_TIMEOUT_CONFIG as HTTPX_DEFAULT_TIMEOUT
    except ImportError:
        # taken from https://github.com/encode/httpx/blob/3ba5fe0d7ac70222590e759c31442b1cab263791/httpx/_config.py#L366
        HTTPX_DEFAULT_TIMEOUT = Timeout(5.0)
114
115
class PageInfo:
    """Holds the data required to build the request for the next page.

    Exactly one of `url`, `params` or `json` is expected to be provided;
    the others are left as `not_given` sentinels.
    """

    url: URL | NotGiven
    params: Query | NotGiven
    json: Body | NotGiven

    @overload
    def __init__(
        self,
        *,
        url: URL,
    ) -> None: ...

    @overload
    def __init__(
        self,
        *,
        params: Query,
    ) -> None: ...

    @overload
    def __init__(
        self,
        *,
        json: Body,
    ) -> None: ...

    def __init__(
        self,
        *,
        url: URL | NotGiven = not_given,
        json: Body | NotGiven = not_given,
        params: Query | NotGiven = not_given,
    ) -> None:
        self.params = params
        self.json = json
        self.url = url

    @override
    def __repr__(self) -> str:
        # Show only the field that was actually provided.
        name = self.__class__.__name__
        if self.url:
            return f"{name}(url={self.url})"
        if self.json:
            return f"{name}(json={self.json})"
        return f"{name}(params={self.params})"
165
166
class BasePage(GenericModel, Generic[_T]):
    """
    Defines the core interface for pagination.

    Type Args:
        ModelT: The pydantic model that represents an item in the response.

    Methods:
        has_next_page(): Check if there is another page available
        next_page_info(): Get the necessary information to make a request for the next page
    """

    _options: FinalRequestOptions = PrivateAttr()
    _model: Type[_T] = PrivateAttr()

    def has_next_page(self) -> bool:
        """Whether another page can be fetched after this one."""
        # An empty page can never have a successor.
        if not self._get_page_items():
            return False
        return self.next_page_info() is not None

    def next_page_info(self) -> Optional[PageInfo]: ...

    def _get_page_items(self) -> Iterable[_T]:  # type: ignore[empty-body]
        ...

    def _params_from_url(self, url: URL) -> httpx.QueryParams:
        # TODO: do we have to preprocess params here?
        return httpx.QueryParams(cast(Any, self._options.params)).merge(url.params)

    def _info_to_options(self, info: PageInfo) -> FinalRequestOptions:
        """Translate a `PageInfo` into concrete request options for the next page."""
        options = model_copy(self._options)
        options._strip_raw_response_header()

        if not isinstance(info.params, NotGiven):
            options.params = {**options.params, **info.params}
            return options

        if not isinstance(info.url, NotGiven):
            next_url = info.url.copy_with(params=self._params_from_url(info.url))
            options.params = dict(next_url.params)
            options.url = str(next_url)
            return options

        if not isinstance(info.json, NotGiven):
            if not is_mapping(info.json):
                raise TypeError("Pagination is only supported with mappings")

            if not options.json_data:
                options.json_data = {**info.json}
            else:
                if not is_mapping(options.json_data):
                    raise TypeError("Pagination is only supported with mappings")
                options.json_data = {**options.json_data, **info.json}
            return options

        raise ValueError("Unexpected PageInfo state")
226
227
class BaseSyncPage(BasePage[_T], Generic[_T]):
    _client: SyncAPIClient = pydantic.PrivateAttr()

    def _set_private_attributes(
        self,
        client: SyncAPIClient,
        model: Type[_T],
        options: FinalRequestOptions,
    ) -> None:
        # Pydantic v2 creates the private-attribute store lazily; make sure it
        # exists before assigning into it.
        if (not PYDANTIC_V1) and getattr(self, "__pydantic_private__", None) is None:
            self.__pydantic_private__ = {}

        self._model = model
        self._client = client
        self._options = options

    # Pydantic defines a custom `__iter__` so that `dict(model)` works. We
    # deliberately override it so that `for item in page` iterates the page's
    # items instead; both behaviours cannot coexist. This is fine in practice
    # since pydantic still exposes `model.dict()` for dictionary conversion
    # and uses that internally.
    def __iter__(self) -> Iterator[_T]:  # type: ignore
        for page in self.iter_pages():
            yield from page._get_page_items()

    def iter_pages(self: SyncPageT) -> Iterator[SyncPageT]:
        """Yield this page and then every subsequent page, fetching lazily."""
        page = self
        while True:
            yield page
            if not page.has_next_page():
                return
            page = page.get_next_page()

    def get_next_page(self: SyncPageT) -> SyncPageT:
        """Fetch the page after this one; raises if there is none."""
        info = self.next_page_info()
        if not info:
            raise RuntimeError(
                "No next page expected; please check `.has_next_page()` before calling `.get_next_page()`."
            )

        return self._client._request_api_list(
            self._model, page=self.__class__, options=self._info_to_options(info)
        )
275
276
class AsyncPaginator(Generic[_T, AsyncPageT]):
    """Awaitable handle for a paginated list request.

    Await it to receive the first page, or use `async for` to iterate over
    every item across all pages.
    """

    def __init__(
        self,
        client: AsyncAPIClient,
        options: FinalRequestOptions,
        page_cls: Type[AsyncPageT],
        model: Type[_T],
    ) -> None:
        self._page_cls = page_cls
        self._options = options
        self._client = client
        self._model = model

    def __await__(self) -> Generator[Any, None, AsyncPageT]:
        return self._get_page().__await__()

    async def _get_page(self) -> AsyncPageT:
        def _parser(resp: AsyncPageT) -> AsyncPageT:
            # Attach the client/model/options so the page can fetch successors.
            resp._set_private_attributes(
                model=self._model,
                options=self._options,
                client=self._client,
            )
            return resp

        self._options.post_parser = _parser

        return await self._client.request(self._page_cls, self._options)

    async def __aiter__(self) -> AsyncIterator[_T]:
        # https://github.com/microsoft/pyright/issues/3464
        first_page = cast(
            AsyncPageT,
            await self,  # type: ignore
        )
        async for item in first_page:
            yield item
314
315
class BaseAsyncPage(BasePage[_T], Generic[_T]):
    _client: AsyncAPIClient = pydantic.PrivateAttr()

    def _set_private_attributes(
        self,
        model: Type[_T],
        client: AsyncAPIClient,
        options: FinalRequestOptions,
    ) -> None:
        # Pydantic v2 creates the private-attribute store lazily; make sure it
        # exists before assigning into it.
        if (not PYDANTIC_V1) and getattr(self, "__pydantic_private__", None) is None:
            self.__pydantic_private__ = {}

        self._model = model
        self._client = client
        self._options = options

    async def __aiter__(self) -> AsyncIterator[_T]:
        async for page in self.iter_pages():
            for item in page._get_page_items():
                yield item

    async def iter_pages(self: AsyncPageT) -> AsyncIterator[AsyncPageT]:
        """Yield this page and then every subsequent page, fetching lazily."""
        page = self
        while True:
            yield page
            if not page.has_next_page():
                return
            page = await page.get_next_page()

    async def get_next_page(self: AsyncPageT) -> AsyncPageT:
        """Fetch the page after this one; raises if there is none."""
        info = self.next_page_info()
        if not info:
            raise RuntimeError(
                "No next page expected; please check `.has_next_page()` before calling `.get_next_page()`."
            )

        return await self._client._request_api_list(
            self._model, page=self.__class__, options=self._info_to_options(info)
        )
355
356
# Type variables that specialize `BaseClient` for the sync/async variants.
_HttpxClientT = TypeVar("_HttpxClientT", bound=Union[httpx.Client, httpx.AsyncClient])
_DefaultStreamT = TypeVar("_DefaultStreamT", bound=Union[Stream[Any], AsyncStream[Any]])
359
360
class BaseClient(Generic[_HttpxClientT, _DefaultStreamT]):
    """Functionality shared by the sync and async API clients.

    Responsible for building requests (URL resolution, headers, query params,
    JSON/multipart bodies), parsing/validating response data, and computing
    the retry policy. Subclasses provide the concrete httpx client and the
    request-sending loop.
    """

    _client: _HttpxClientT
    _version: str
    _base_url: URL
    max_retries: int
    timeout: Union[float, Timeout, None]
    _strict_response_validation: bool
    _idempotency_header: str | None
    _default_stream_cls: type[_DefaultStreamT] | None = None

    def __init__(
        self,
        *,
        version: str,
        base_url: str | URL,
        _strict_response_validation: bool,
        max_retries: int = DEFAULT_MAX_RETRIES,
        timeout: float | Timeout | None = DEFAULT_TIMEOUT,
        custom_headers: Mapping[str, str] | None = None,
        custom_query: Mapping[str, object] | None = None,
    ) -> None:
        self._version = version
        self._base_url = self._enforce_trailing_slash(URL(base_url))
        self.max_retries = max_retries
        self.timeout = timeout
        self._custom_headers = custom_headers or {}
        self._custom_query = custom_query or {}
        self._strict_response_validation = _strict_response_validation
        self._idempotency_header = None
        self._platform: Platform | None = None

        # Guard against `max_retries=None` slipping through type checking.
        if max_retries is None:  # pyright: ignore[reportUnnecessaryComparison]
            raise TypeError(
                "max_retries cannot be None. If you want to disable retries, pass `0`; if you want unlimited retries, pass `math.inf` or a very high number; if you want the default behavior, pass `openai.DEFAULT_MAX_RETRIES`"
            )

    def _enforce_trailing_slash(self, url: URL) -> URL:
        """Ensure the base URL ends with `/` so relative paths join correctly."""
        if url.raw_path.endswith(b"/"):
            return url
        return url.copy_with(raw_path=url.raw_path + b"/")

    def _make_status_error_from_response(
        self,
        response: httpx.Response,
    ) -> APIStatusError:
        """Build the status error for a 4xx/5xx response, parsing the body when possible."""
        if response.is_closed and not response.is_stream_consumed:
            # We can't read the response body as it has been closed
            # before it was read. This can happen if an event hook
            # raises a status error.
            body = None
            err_msg = f"Error code: {response.status_code}"
        else:
            err_text = response.text.strip()
            body = err_text

            try:
                # Prefer the parsed JSON body when the server returned one.
                body = json.loads(err_text)
                err_msg = f"Error code: {response.status_code} - {body}"
            except Exception:
                err_msg = err_text or f"Error code: {response.status_code}"

        return self._make_status_error(err_msg, body=body, response=response)

    def _make_status_error(
        self,
        err_msg: str,
        *,
        body: object,
        response: httpx.Response,
    ) -> _exceptions.APIStatusError:
        """Hook for subclasses to map a response to a concrete status error type."""
        raise NotImplementedError()

    def _build_headers(self, options: FinalRequestOptions, *, retries_taken: int = 0) -> httpx.Headers:
        """Merge default, auth, platform and per-request headers into one set."""
        custom_headers = options.headers or {}
        headers_dict = _merge_mappings(self.default_headers, custom_headers)
        self._validate_headers(headers_dict, custom_headers)

        # headers are case-insensitive while dictionaries are not.
        headers = httpx.Headers(headers_dict)

        idempotency_header = self._idempotency_header
        if idempotency_header and options.idempotency_key and idempotency_header not in headers:
            headers[idempotency_header] = options.idempotency_key

        # Don't set these headers if they were already set or removed by the caller. We check
        # `custom_headers`, which can contain `Omit()`, instead of `headers` to account for the removal case.
        lower_custom_headers = [header.lower() for header in custom_headers]
        if "x-stainless-retry-count" not in lower_custom_headers:
            headers["x-stainless-retry-count"] = str(retries_taken)
        if "x-stainless-read-timeout" not in lower_custom_headers:
            timeout = self.timeout if isinstance(options.timeout, NotGiven) else options.timeout
            if isinstance(timeout, Timeout):
                timeout = timeout.read
            if timeout is not None:
                headers["x-stainless-read-timeout"] = str(timeout)

        return headers

    def _prepare_url(self, url: str) -> URL:
        """
        Merge a URL argument together with any 'base_url' on the client,
        to create the URL used for the outgoing request.
        """
        # Copied from httpx's `_merge_url` method.
        merge_url = URL(url)
        if merge_url.is_relative_url:
            merge_raw_path = self.base_url.raw_path + merge_url.raw_path.lstrip(b"/")
            return self.base_url.copy_with(raw_path=merge_raw_path)

        return merge_url

    def _make_sse_decoder(self) -> SSEDecoder | SSEBytesDecoder:
        """Hook for subclasses to customise server-sent-event decoding."""
        return SSEDecoder()

    def _build_request(
        self,
        options: FinalRequestOptions,
        *,
        retries_taken: int = 0,
    ) -> httpx.Request:
        """Translate `FinalRequestOptions` into a concrete `httpx.Request`."""
        if log.isEnabledFor(logging.DEBUG):
            log.debug("Request options: %s", model_dump(options, exclude_unset=True))

        kwargs: dict[str, Any] = {}

        json_data = options.json_data
        if options.extra_json is not None:
            if json_data is None:
                json_data = cast(Body, options.extra_json)
            elif is_mapping(json_data):
                json_data = _merge_mappings(json_data, options.extra_json)
            else:
                raise RuntimeError(f"Unexpected JSON data type, {type(json_data)}, cannot merge with `extra_body`")

        headers = self._build_headers(options, retries_taken=retries_taken)
        params = _merge_mappings(self.default_query, options.params)
        content_type = headers.get("Content-Type")
        files = options.files

        # If the given Content-Type header is multipart/form-data then it
        # has to be removed so that httpx can generate the header with
        # additional information for us as it has to be in this form
        # for the server to be able to correctly parse the request:
        # multipart/form-data; boundary=---abc--
        if content_type is not None and content_type.startswith("multipart/form-data"):
            if "boundary" not in content_type:
                # only remove the header if the boundary hasn't been explicitly set
                # as the caller doesn't want httpx to come up with their own boundary
                headers.pop("Content-Type")

            # As we are now sending multipart/form-data instead of application/json
            # we need to tell httpx to use it, https://www.python-httpx.org/advanced/clients/#multipart-file-encoding
            if json_data:
                if not is_dict(json_data):
                    raise TypeError(
                        f"Expected query input to be a dictionary for multipart requests but got {type(json_data)} instead."
                    )
                kwargs["data"] = self._serialize_multipartform(json_data)

            # httpx determines whether or not to send a "multipart/form-data"
            # request based on the truthiness of the "files" argument.
            # This gets around that issue by generating a dict value that
            # evaluates to true.
            #
            # https://github.com/encode/httpx/discussions/2399#discussioncomment-3814186
            if not files:
                files = cast(HttpxRequestFiles, ForceMultipartDict())

        prepared_url = self._prepare_url(options.url)
        if "_" in prepared_url.host:
            # work around https://github.com/encode/httpx/discussions/2880
            kwargs["extensions"] = {"sni_hostname": prepared_url.host.replace("_", "-")}

        is_body_allowed = options.method.lower() != "get"

        if is_body_allowed:
            if isinstance(json_data, bytes):
                kwargs["content"] = json_data
            else:
                kwargs["json"] = json_data if is_given(json_data) else None
            kwargs["files"] = files
        else:
            headers.pop("Content-Type", None)
            kwargs.pop("data", None)

        # TODO: report this error to httpx
        return self._client.build_request(  # pyright: ignore[reportUnknownMemberType]
            headers=headers,
            timeout=self.timeout if isinstance(options.timeout, NotGiven) else options.timeout,
            method=options.method,
            url=prepared_url,
            # the `Query` type that we use is incompatible with qs'
            # `Params` type as it needs to be typed as `Mapping[str, object]`
            # so that passing a `TypedDict` doesn't cause an error.
            # https://github.com/microsoft/pyright/issues/3526#event-6715453066
            params=self.qs.stringify(cast(Mapping[str, Any], params)) if params else None,
            **kwargs,
        )

    def _serialize_multipartform(self, data: Mapping[object, object]) -> dict[str, object]:
        """Flatten nested mapping data into the key/value form httpx expects for multipart bodies."""
        items = self.qs.stringify_items(
            # TODO: type ignore is required as stringify_items is well typed but we can't be
            # well typed without heavy validation.
            data,  # type: ignore
            array_format="brackets",
        )
        serialized: dict[str, object] = {}
        for key, value in items:
            existing = serialized.get(key)

            if not existing:
                serialized[key] = value
                continue

            # If a value has already been set for this key then that
            # means we're sending data like `array[]=[1, 2, 3]` and we
            # need to tell httpx that we want to send multiple values with
            # the same key which is done by using a list or a tuple.
            #
            # Note: 2d arrays should never result in the same key at both
            # levels so it's safe to assume that if the value is a list,
            # it was because we changed it to be a list.
            if is_list(existing):
                existing.append(value)
            else:
                serialized[key] = [existing, value]

        return serialized

    def _maybe_override_cast_to(self, cast_to: type[ResponseT], options: FinalRequestOptions) -> type[ResponseT]:
        """Allow an internal header to override the response type (used by raw/streaming wrappers)."""
        if not is_given(options.headers):
            return cast_to

        # make a copy of the headers so we don't mutate user-input
        headers = dict(options.headers)

        # we internally support defining a temporary header to override the
        # default `cast_to` type for use with `.with_raw_response` and `.with_streaming_response`
        # see _response.py for implementation details
        override_cast_to = headers.pop(OVERRIDE_CAST_TO_HEADER, not_given)
        if is_given(override_cast_to):
            options.headers = headers
            return cast(Type[ResponseT], override_cast_to)

        return cast_to

    def _should_stream_response_body(self, request: httpx.Request) -> bool:
        # The raw-response header set to "stream" signals that the caller wants
        # the body streamed rather than buffered.
        return request.headers.get(RAW_RESPONSE_HEADER) == "stream"  # type: ignore[no-any-return]

    def _process_response_data(
        self,
        *,
        data: object,
        cast_to: type[ResponseT],
        response: httpx.Response,
    ) -> ResponseT:
        """Convert parsed response data into the requested type.

        Uses strict validation when `_strict_response_validation` is set,
        otherwise constructs the model leniently. Raises
        `APIResponseValidationError` if pydantic validation fails.
        """
        if data is None:
            return cast(ResponseT, None)

        if cast_to is object:
            return cast(ResponseT, data)

        try:
            if inspect.isclass(cast_to) and issubclass(cast_to, ModelBuilderProtocol):
                return cast(ResponseT, cast_to.build(response=response, data=data))

            if self._strict_response_validation:
                return cast(ResponseT, validate_type(type_=cast_to, value=data))

            return cast(ResponseT, construct_type(type_=cast_to, value=data))
        except pydantic.ValidationError as err:
            raise APIResponseValidationError(response=response, body=data) from err

    @property
    def qs(self) -> Querystring:
        """The query-string serializer used for params and multipart data."""
        return Querystring()

    @property
    def custom_auth(self) -> httpx.Auth | None:
        """Optional httpx auth flow; subclasses may override."""
        return None

    @property
    def auth_headers(self) -> dict[str, str]:
        """Authentication headers; subclasses override to supply credentials."""
        return {}

    @property
    def default_headers(self) -> dict[str, str | Omit]:
        return {
            "Accept": "application/json",
            "Content-Type": "application/json",
            "User-Agent": self.user_agent,
            **self.platform_headers(),
            **self.auth_headers,
            **self._custom_headers,
        }

    @property
    def default_query(self) -> dict[str, object]:
        return {
            **self._custom_query,
        }

    def _validate_headers(
        self,
        headers: Headers,  # noqa: ARG002
        custom_headers: Headers,  # noqa: ARG002
    ) -> None:
        """Validate the given default headers and custom headers.

        Does nothing by default.
        """
        return

    @property
    def user_agent(self) -> str:
        return f"{self.__class__.__name__}/Python {self._version}"

    @property
    def base_url(self) -> URL:
        return self._base_url

    @base_url.setter
    def base_url(self, url: URL | str) -> None:
        self._base_url = self._enforce_trailing_slash(url if isinstance(url, URL) else URL(url))

    def platform_headers(self) -> Dict[str, str]:
        # the actual implementation is in a separate `lru_cache` decorated
        # function because adding `lru_cache` to methods will leak memory
        # https://github.com/python/cpython/issues/88476
        return platform_headers(self._version, platform=self._platform)

    def _parse_retry_after_header(self, response_headers: Optional[httpx.Headers] = None) -> float | None:
        """Returns a float of the number of seconds (not milliseconds) to wait after retrying, or None if unspecified.

        About the Retry-After header: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
        See also https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After#syntax
        """
        if response_headers is None:
            return None

        # First, try the non-standard `retry-after-ms` header for milliseconds,
        # which is more precise than integer-seconds `retry-after`
        try:
            retry_ms_header = response_headers.get("retry-after-ms", None)
            return float(retry_ms_header) / 1000
        except (TypeError, ValueError):
            pass

        # Next, try parsing `retry-after` header as seconds (allowing nonstandard floats).
        retry_header = response_headers.get("retry-after")
        try:
            # note: the spec indicates that this should only ever be an integer
            # but if someone sends a float there's no reason for us to not respect it
            return float(retry_header)
        except (TypeError, ValueError):
            pass

        # Last, try parsing `retry-after` as a date.
        retry_date_tuple = email.utils.parsedate_tz(retry_header)
        if retry_date_tuple is None:
            return None

        retry_date = email.utils.mktime_tz(retry_date_tuple)
        return float(retry_date - time.time())

    def _calculate_retry_timeout(
        self,
        remaining_retries: int,
        options: FinalRequestOptions,
        response_headers: Optional[httpx.Headers] = None,
    ) -> float:
        """Compute how long to sleep before the next retry attempt, in seconds."""
        max_retries = options.get_max_retries(self.max_retries)

        # If the API asks us to wait a certain amount of time (and it's a reasonable amount), just do what it says.
        retry_after = self._parse_retry_after_header(response_headers)
        if retry_after is not None and 0 < retry_after <= 60:
            return retry_after

        # Also cap retry count to 1000 to avoid any potential overflows with `pow`
        nb_retries = min(max_retries - remaining_retries, 1000)

        # Apply exponential backoff, but not more than the max.
        sleep_seconds = min(INITIAL_RETRY_DELAY * pow(2.0, nb_retries), MAX_RETRY_DELAY)

        # Apply jitter: scale the delay by a random factor in (0.75, 1.0] so
        # that concurrent clients don't all retry at the same instant.
        jitter = 1 - 0.25 * random()
        timeout = sleep_seconds * jitter
        return timeout if timeout >= 0 else 0

    def _should_retry(self, response: httpx.Response) -> bool:
        """Decide whether a failed response is retryable."""
        # Note: this is not a standard header
        should_retry_header = response.headers.get("x-should-retry")

        # If the server explicitly says whether or not to retry, obey.
        if should_retry_header == "true":
            log.debug("Retrying as header `x-should-retry` is set to `true`")
            return True
        if should_retry_header == "false":
            log.debug("Not retrying as header `x-should-retry` is set to `false`")
            return False

        # Retry on request timeouts.
        if response.status_code == 408:
            log.debug("Retrying due to status code %i", response.status_code)
            return True

        # Retry on lock timeouts.
        if response.status_code == 409:
            log.debug("Retrying due to status code %i", response.status_code)
            return True

        # Retry on rate limits.
        if response.status_code == 429:
            log.debug("Retrying due to status code %i", response.status_code)
            return True

        # Retry internal errors.
        if response.status_code >= 500:
            log.debug("Retrying due to status code %i", response.status_code)
            return True

        log.debug("Not retrying")
        return False

    def _idempotency_key(self) -> str:
        """Generate a fresh idempotency key for retried non-GET requests."""
        return f"stainless-python-retry-{uuid.uuid4()}"
787
788
class _DefaultHttpxClient(httpx.Client):
    """`httpx.Client` pre-configured with this SDK's defaults.

    The defaults (timeout, connection limits, redirect following) are only
    applied when the caller did not pass their own value for that option.
    """

    def __init__(self, **kwargs: Any) -> None:
        defaults: dict[str, Any] = {
            "timeout": DEFAULT_TIMEOUT,
            "limits": DEFAULT_CONNECTION_LIMITS,
            "follow_redirects": True,
        }
        for option, value in defaults.items():
            kwargs.setdefault(option, value)
        super().__init__(**kwargs)
795
796
# For type checkers the alias is plain `httpx.Client` so annotations stay
# interchangeable; at runtime it is the subclass above so our defaults apply.
if TYPE_CHECKING:
    DefaultHttpxClient = httpx.Client
    """An alias to `httpx.Client` that provides the same defaults that this SDK
    uses internally.

    This is useful because overriding the `http_client` with your own instance of
    `httpx.Client` will result in httpx's defaults being used, not ours.
    """
else:
    DefaultHttpxClient = _DefaultHttpxClient
807
808
class SyncHttpxClientWrapper(DefaultHttpxClient):
    """Internal default client that best-effort closes itself when garbage collected."""

    def __del__(self) -> None:
        if not self.is_closed:
            try:
                self.close()
            except Exception:
                # Never raise from a finalizer (may run during interpreter shutdown).
                pass
818
819
820class SyncAPIClient(BaseClient[httpx.Client, Stream[Any]]):
821 _client: httpx.Client
822 _default_stream_cls: type[Stream[Any]] | None = None
823
824 def __init__(
825 self,
826 *,
827 version: str,
828 base_url: str | URL,
829 max_retries: int = DEFAULT_MAX_RETRIES,
830 timeout: float | Timeout | None | NotGiven = not_given,
831 http_client: httpx.Client | None = None,
832 custom_headers: Mapping[str, str] | None = None,
833 custom_query: Mapping[str, object] | None = None,
834 _strict_response_validation: bool,
835 ) -> None:
836 if not is_given(timeout):
837 # if the user passed in a custom http client with a non-default
838 # timeout set then we use that timeout.
839 #
840 # note: there is an edge case here where the user passes in a client
841 # where they've explicitly set the timeout to match the default timeout
842 # as this check is structural, meaning that we'll think they didn't
843 # pass in a timeout and will ignore it
844 if http_client and http_client.timeout != HTTPX_DEFAULT_TIMEOUT:
845 timeout = http_client.timeout
846 else:
847 timeout = DEFAULT_TIMEOUT
848
849 if http_client is not None and not isinstance(http_client, httpx.Client): # pyright: ignore[reportUnnecessaryIsInstance]
850 raise TypeError(
851 f"Invalid `http_client` argument; Expected an instance of `httpx.Client` but got {type(http_client)}"
852 )
853
854 super().__init__(
855 version=version,
856 # cast to a valid type because mypy doesn't understand our type narrowing
857 timeout=cast(Timeout, timeout),
858 base_url=base_url,
859 max_retries=max_retries,
860 custom_query=custom_query,
861 custom_headers=custom_headers,
862 _strict_response_validation=_strict_response_validation,
863 )
864 self._client = http_client or SyncHttpxClientWrapper(
865 base_url=base_url,
866 # cast to a valid type because mypy doesn't understand our type narrowing
867 timeout=cast(Timeout, timeout),
868 )
869
    def is_closed(self) -> bool:
        """Whether the underlying HTTPX client has been closed."""
        return self._client.is_closed
872
    def close(self) -> None:
        """Close the underlying HTTPX client.

        The client will *not* be usable after this.
        """
        # If an error is thrown while constructing a client, self._client
        # may not be present
        if hasattr(self, "_client"):
            self._client.close()
882
    def __enter__(self: _T) -> _T:
        """Support `with client:`; returns the client itself."""
        return self
885
    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Always close on context-manager exit, regardless of exceptions.
        self.close()
893
    def _prepare_options(
        self,
        options: FinalRequestOptions,  # noqa: ARG002
    ) -> FinalRequestOptions:
        """Hook for mutating the given options"""
        return options
900
    def _prepare_request(
        self,
        request: httpx.Request,  # noqa: ARG002
    ) -> None:
        """This method is used as a callback for mutating the `Request` object
        after it has been constructed.
        This is useful for cases where you want to add certain headers based off of
        the request properties, e.g. `url`, `method` etc.
        """
        return None
911
    # `request` overloads: with `stream=True` the return value is the given
    # stream class, otherwise it is the parsed response type.
    @overload
    def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: Literal[True],
        stream_cls: Type[_StreamT],
    ) -> _StreamT: ...

    @overload
    def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: Literal[False] = False,
    ) -> ResponseT: ...

    @overload
    def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool = False,
        stream_cls: Type[_StreamT] | None = None,
    ) -> ResponseT | _StreamT: ...
940
    def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool = False,
        stream_cls: type[_StreamT] | None = None,
    ) -> ResponseT | _StreamT:
        """Send the request described by `options`, retrying on retryable failures.

        Retries connection errors, timeouts and retryable HTTP status codes
        (per `_should_retry`) up to the configured maximum, sleeping between
        attempts. Raises `APITimeoutError` / `APIConnectionError` /
        `APIStatusError` subclasses once retries are exhausted.
        """
        cast_to = self._maybe_override_cast_to(cast_to, options)

        # create a copy of the options we were given so that if the
        # options are mutated later & we then retry, the retries are
        # given the original options
        input_options = model_copy(options)
        if input_options.idempotency_key is None and input_options.method.lower() != "get":
            # ensure the idempotency key is reused between requests
            input_options.idempotency_key = self._idempotency_key()

        response: httpx.Response | None = None
        max_retries = input_options.get_max_retries(self.max_retries)

        retries_taken = 0
        for retries_taken in range(max_retries + 1):
            # Each attempt starts from a fresh copy so per-attempt mutations
            # (e.g. in `_prepare_options`) don't accumulate across retries.
            options = model_copy(input_options)
            options = self._prepare_options(options)

            remaining_retries = max_retries - retries_taken
            request = self._build_request(options, retries_taken=retries_taken)
            self._prepare_request(request)

            kwargs: HttpxSendArgs = {}
            if self.custom_auth is not None:
                kwargs["auth"] = self.custom_auth

            if options.follow_redirects is not None:
                kwargs["follow_redirects"] = options.follow_redirects

            log.debug("Sending HTTP Request: %s %s", request.method, request.url)

            response = None
            try:
                response = self._client.send(
                    request,
                    stream=stream or self._should_stream_response_body(request=request),
                    **kwargs,
                )
            except httpx.TimeoutException as err:
                log.debug("Encountered httpx.TimeoutException", exc_info=True)

                # Timeouts are always considered retryable.
                if remaining_retries > 0:
                    self._sleep_for_retry(
                        retries_taken=retries_taken,
                        max_retries=max_retries,
                        options=input_options,
                        response=None,
                    )
                    continue

                log.debug("Raising timeout error")
                raise APITimeoutError(request=request) from err
            except Exception as err:
                log.debug("Encountered Exception", exc_info=True)

                # Any other transport-level failure is treated as a
                # (retryable) connection error.
                if remaining_retries > 0:
                    self._sleep_for_retry(
                        retries_taken=retries_taken,
                        max_retries=max_retries,
                        options=input_options,
                        response=None,
                    )
                    continue

                log.debug("Raising connection error")
                raise APIConnectionError(request=request) from err

            log.debug(
                'HTTP Response: %s %s "%i %s" %s',
                request.method,
                request.url,
                response.status_code,
                response.reason_phrase,
                response.headers,
            )
            log.debug("request_id: %s", response.headers.get("x-request-id"))

            try:
                response.raise_for_status()
            except httpx.HTTPStatusError as err:  # thrown on 4xx and 5xx status code
                log.debug("Encountered httpx.HTTPStatusError", exc_info=True)

                if remaining_retries > 0 and self._should_retry(err.response):
                    err.response.close()
                    self._sleep_for_retry(
                        retries_taken=retries_taken,
                        max_retries=max_retries,
                        options=input_options,
                        response=response,
                    )
                    continue

                # If the response is streamed then we need to explicitly read the response
                # to completion before attempting to access the response text.
                if not err.response.is_closed:
                    err.response.read()

                log.debug("Re-raising status error")
                raise self._make_status_error_from_response(err.response) from None

            # Success: leave the retry loop with `response` set.
            break

        assert response is not None, "could not resolve response (should never happen)"
        return self._process_response(
            cast_to=cast_to,
            options=options,
            response=response,
            stream=stream,
            stream_cls=stream_cls,
            retries_taken=retries_taken,
        )
1060
1061 def _sleep_for_retry(
1062 self, *, retries_taken: int, max_retries: int, options: FinalRequestOptions, response: httpx.Response | None
1063 ) -> None:
1064 remaining_retries = max_retries - retries_taken
1065 if remaining_retries == 1:
1066 log.debug("1 retry left")
1067 else:
1068 log.debug("%i retries left", remaining_retries)
1069
1070 timeout = self._calculate_retry_timeout(remaining_retries, options, response.headers if response else None)
1071 log.info("Retrying request to %s in %f seconds", options.url, timeout)
1072
1073 time.sleep(timeout)
1074
    def _process_response(
        self,
        *,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        response: httpx.Response,
        stream: bool,
        stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
        retries_taken: int = 0,
    ) -> ResponseT:
        """Convert a raw httpx response into the type requested via `cast_to`.

        Resolution order:
        1. raw-response header == "true" -> wrap in `LegacyAPIResponse`.
        2. `cast_to` is an `APIResponse` subclass -> instantiate that class.
        3. `cast_to` is `httpx.Response` -> return the response unchanged.
        4. otherwise -> wrap in `APIResponse`; return the wrapper if the
           raw-response header is present, else its parsed contents.
        """
        if response.request.headers.get(RAW_RESPONSE_HEADER) == "true":
            return cast(
                ResponseT,
                LegacyAPIResponse(
                    raw=response,
                    client=self,
                    cast_to=cast_to,
                    stream=stream,
                    stream_cls=stream_cls,
                    options=options,
                    retries_taken=retries_taken,
                ),
            )

        # unwrap generic aliases (e.g. `APIResponse[Foo]`) to their origin class
        origin = get_origin(cast_to) or cast_to

        if (
            inspect.isclass(origin)
            and issubclass(origin, BaseAPIResponse)
            # we only want to actually return the custom BaseAPIResponse class if we're
            # returning the raw response, or if we're not streaming SSE, as if we're streaming
            # SSE then `cast_to` doesn't actively reflect the type we need to parse into
            and (not stream or bool(response.request.headers.get(RAW_RESPONSE_HEADER)))
        ):
            if not issubclass(origin, APIResponse):
                raise TypeError(f"API Response types must subclass {APIResponse}; Received {origin}")

            response_cls = cast("type[BaseAPIResponse[Any]]", cast_to)
            return cast(
                ResponseT,
                response_cls(
                    raw=response,
                    client=self,
                    cast_to=extract_response_type(response_cls),
                    stream=stream,
                    stream_cls=stream_cls,
                    options=options,
                    retries_taken=retries_taken,
                ),
            )

        if cast_to == httpx.Response:
            return cast(ResponseT, response)

        api_response = APIResponse(
            raw=response,
            client=self,
            cast_to=cast("type[ResponseT]", cast_to),  # pyright: ignore[reportUnnecessaryCast]
            stream=stream,
            stream_cls=stream_cls,
            options=options,
            retries_taken=retries_taken,
        )
        if bool(response.request.headers.get(RAW_RESPONSE_HEADER)):
            return cast(ResponseT, api_response)

        return api_response.parse()
1142
1143 def _request_api_list(
1144 self,
1145 model: Type[object],
1146 page: Type[SyncPageT],
1147 options: FinalRequestOptions,
1148 ) -> SyncPageT:
1149 def _parser(resp: SyncPageT) -> SyncPageT:
1150 resp._set_private_attributes(
1151 client=self,
1152 model=model,
1153 options=options,
1154 )
1155 return resp
1156
1157 options.post_parser = _parser
1158
1159 return self.request(page, options, stream=False)
1160
    # `get` overloads: the return type depends on `stream` — a parsed
    # `ResponseT` by default, a `_StreamT` when `stream=True`.
    @overload
    def get(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        options: RequestOptions = {},
        stream: Literal[False] = False,
    ) -> ResponseT: ...

    @overload
    def get(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        options: RequestOptions = {},
        stream: Literal[True],
        stream_cls: type[_StreamT],
    ) -> _StreamT: ...

    @overload
    def get(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        options: RequestOptions = {},
        stream: bool,
        stream_cls: type[_StreamT] | None = None,
    ) -> ResponseT | _StreamT: ...
1192
1193 def get(
1194 self,
1195 path: str,
1196 *,
1197 cast_to: Type[ResponseT],
1198 options: RequestOptions = {},
1199 stream: bool = False,
1200 stream_cls: type[_StreamT] | None = None,
1201 ) -> ResponseT | _StreamT:
1202 opts = FinalRequestOptions.construct(method="get", url=path, **options)
1203 # cast is required because mypy complains about returning Any even though
1204 # it understands the type variables
1205 return cast(ResponseT, self.request(cast_to, opts, stream=stream, stream_cls=stream_cls))
1206
    # `post` overloads: the return type depends on `stream` — a parsed
    # `ResponseT` by default, a `_StreamT` when `stream=True`.
    @overload
    def post(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        options: RequestOptions = {},
        files: RequestFiles | None = None,
        stream: Literal[False] = False,
    ) -> ResponseT: ...

    @overload
    def post(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        options: RequestOptions = {},
        files: RequestFiles | None = None,
        stream: Literal[True],
        stream_cls: type[_StreamT],
    ) -> _StreamT: ...

    @overload
    def post(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        options: RequestOptions = {},
        files: RequestFiles | None = None,
        stream: bool,
        stream_cls: type[_StreamT] | None = None,
    ) -> ResponseT | _StreamT: ...
1244
1245 def post(
1246 self,
1247 path: str,
1248 *,
1249 cast_to: Type[ResponseT],
1250 body: Body | None = None,
1251 options: RequestOptions = {},
1252 files: RequestFiles | None = None,
1253 stream: bool = False,
1254 stream_cls: type[_StreamT] | None = None,
1255 ) -> ResponseT | _StreamT:
1256 opts = FinalRequestOptions.construct(
1257 method="post", url=path, json_data=body, files=to_httpx_files(files), **options
1258 )
1259 return cast(ResponseT, self.request(cast_to, opts, stream=stream, stream_cls=stream_cls))
1260
1261 def patch(
1262 self,
1263 path: str,
1264 *,
1265 cast_to: Type[ResponseT],
1266 body: Body | None = None,
1267 options: RequestOptions = {},
1268 ) -> ResponseT:
1269 opts = FinalRequestOptions.construct(method="patch", url=path, json_data=body, **options)
1270 return self.request(cast_to, opts)
1271
1272 def put(
1273 self,
1274 path: str,
1275 *,
1276 cast_to: Type[ResponseT],
1277 body: Body | None = None,
1278 files: RequestFiles | None = None,
1279 options: RequestOptions = {},
1280 ) -> ResponseT:
1281 opts = FinalRequestOptions.construct(
1282 method="put", url=path, json_data=body, files=to_httpx_files(files), **options
1283 )
1284 return self.request(cast_to, opts)
1285
1286 def delete(
1287 self,
1288 path: str,
1289 *,
1290 cast_to: Type[ResponseT],
1291 body: Body | None = None,
1292 options: RequestOptions = {},
1293 ) -> ResponseT:
1294 opts = FinalRequestOptions.construct(method="delete", url=path, json_data=body, **options)
1295 return self.request(cast_to, opts)
1296
1297 def get_api_list(
1298 self,
1299 path: str,
1300 *,
1301 model: Type[object],
1302 page: Type[SyncPageT],
1303 body: Body | None = None,
1304 options: RequestOptions = {},
1305 method: str = "get",
1306 ) -> SyncPageT:
1307 opts = FinalRequestOptions.construct(method=method, url=path, json_data=body, **options)
1308 return self._request_api_list(model, page, opts)
1309
1310
class _DefaultAsyncHttpxClient(httpx.AsyncClient):
    """`httpx.AsyncClient` preconfigured with this SDK's defaults.

    Explicit keyword arguments always take precedence over the defaults.
    """

    def __init__(self, **kwargs: Any) -> None:
        defaults: Dict[str, Any] = {
            "timeout": DEFAULT_TIMEOUT,
            "limits": DEFAULT_CONNECTION_LIMITS,
            "follow_redirects": True,
        }
        for key, value in defaults.items():
            kwargs.setdefault(key, value)
        super().__init__(**kwargs)
1317
1318
# the aiohttp-backed client is an optional extra; fall back to a stub that
# raises an actionable error when `httpx_aiohttp` is not installed
try:
    import httpx_aiohttp
except ImportError:

    class _DefaultAioHttpClient(httpx.AsyncClient):
        # placeholder: constructing it always fails so the user learns how to
        # install the optional dependency
        def __init__(self, **_kwargs: Any) -> None:
            raise RuntimeError("To use the aiohttp client you must have installed the package with the `aiohttp` extra")
else:

    class _DefaultAioHttpClient(httpx_aiohttp.HttpxAiohttpClient):  # type: ignore
        # aiohttp transport with the same SDK defaults as `_DefaultAsyncHttpxClient`
        def __init__(self, **kwargs: Any) -> None:
            kwargs.setdefault("timeout", DEFAULT_TIMEOUT)
            kwargs.setdefault("limits", DEFAULT_CONNECTION_LIMITS)
            kwargs.setdefault("follow_redirects", True)

            super().__init__(**kwargs)
1335
1336
if TYPE_CHECKING:
    DefaultAsyncHttpxClient = httpx.AsyncClient
    """An alias to `httpx.AsyncClient` that provides the same defaults that this SDK
    uses internally.

    This is useful because overriding the `http_client` with your own instance of
    `httpx.AsyncClient` will result in httpx's defaults being used, not ours.
    """

    DefaultAioHttpClient = httpx.AsyncClient
    """An alias to `httpx.AsyncClient` that changes the default HTTP transport to `aiohttp`."""
else:
    # at runtime the aliases point at the subclasses that apply our defaults;
    # type checkers see plain `httpx.AsyncClient` so user-provided clients type-check
    DefaultAsyncHttpxClient = _DefaultAsyncHttpxClient
    DefaultAioHttpClient = _DefaultAioHttpClient
1351
1352
class AsyncHttpxClientWrapper(DefaultAsyncHttpxClient):
    # Internal default client: schedules a best-effort close of itself when it
    # is garbage collected, so connections aren't leaked if the user never
    # called `.close()`.
    def __del__(self) -> None:
        if self.is_closed:
            return

        try:
            # TODO(someday): support non asyncio runtimes here
            asyncio.get_running_loop().create_task(self.aclose())
        except Exception:
            # deliberate swallow: there may be no running event loop (e.g.
            # during interpreter shutdown) and `__del__` must never raise
            pass
1363
1364
1365class AsyncAPIClient(BaseClient[httpx.AsyncClient, AsyncStream[Any]]):
1366 _client: httpx.AsyncClient
1367 _default_stream_cls: type[AsyncStream[Any]] | None = None
1368
    def __init__(
        self,
        *,
        version: str,
        base_url: str | URL,
        _strict_response_validation: bool,
        max_retries: int = DEFAULT_MAX_RETRIES,
        timeout: float | Timeout | None | NotGiven = not_given,
        http_client: httpx.AsyncClient | None = None,
        custom_headers: Mapping[str, str] | None = None,
        custom_query: Mapping[str, object] | None = None,
    ) -> None:
        """Construct the async client.

        When `timeout` is not given it is inherited from a custom `http_client`
        whose timeout differs from httpx's default, otherwise `DEFAULT_TIMEOUT`.
        """
        if not is_given(timeout):
            # if the user passed in a custom http client with a non-default
            # timeout set then we use that timeout.
            #
            # note: there is an edge case here where the user passes in a client
            # where they've explicitly set the timeout to match the default timeout
            # as this check is structural, meaning that we'll think they didn't
            # pass in a timeout and will ignore it
            if http_client and http_client.timeout != HTTPX_DEFAULT_TIMEOUT:
                timeout = http_client.timeout
            else:
                timeout = DEFAULT_TIMEOUT

        if http_client is not None and not isinstance(http_client, httpx.AsyncClient):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise TypeError(
                f"Invalid `http_client` argument; Expected an instance of `httpx.AsyncClient` but got {type(http_client)}"
            )

        super().__init__(
            version=version,
            base_url=base_url,
            # cast to a valid type because mypy doesn't understand our type narrowing
            timeout=cast(Timeout, timeout),
            max_retries=max_retries,
            custom_query=custom_query,
            custom_headers=custom_headers,
            _strict_response_validation=_strict_response_validation,
        )
        # fall back to our wrapper client (which closes itself on GC) when the
        # user didn't supply their own
        self._client = http_client or AsyncHttpxClientWrapper(
            base_url=base_url,
            # cast to a valid type because mypy doesn't understand our type narrowing
            timeout=cast(Timeout, timeout),
        )
1414
1415 def is_closed(self) -> bool:
1416 return self._client.is_closed
1417
1418 async def close(self) -> None:
1419 """Close the underlying HTTPX client.
1420
1421 The client will *not* be usable after this.
1422 """
1423 await self._client.aclose()
1424
1425 async def __aenter__(self: _T) -> _T:
1426 return self
1427
1428 async def __aexit__(
1429 self,
1430 exc_type: type[BaseException] | None,
1431 exc: BaseException | None,
1432 exc_tb: TracebackType | None,
1433 ) -> None:
1434 await self.close()
1435
1436 async def _prepare_options(
1437 self,
1438 options: FinalRequestOptions, # noqa: ARG002
1439 ) -> FinalRequestOptions:
1440 """Hook for mutating the given options"""
1441 return options
1442
1443 async def _prepare_request(
1444 self,
1445 request: httpx.Request, # noqa: ARG002
1446 ) -> None:
1447 """This method is used as a callback for mutating the `Request` object
1448 after it has been constructed.
1449 This is useful for cases where you want to add certain headers based off of
1450 the request properties, e.g. `url`, `method` etc.
1451 """
1452 return None
1453
    # `request` overloads: the return type depends on `stream` — a parsed
    # `ResponseT` by default, an `_AsyncStreamT` when `stream=True`.
    @overload
    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: Literal[False] = False,
    ) -> ResponseT: ...

    @overload
    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: Literal[True],
        stream_cls: type[_AsyncStreamT],
    ) -> _AsyncStreamT: ...

    @overload
    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None = None,
    ) -> ResponseT | _AsyncStreamT: ...
1482
    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool = False,
        stream_cls: type[_AsyncStreamT] | None = None,
    ) -> ResponseT | _AsyncStreamT:
        """Send the request described by `options` and return the parsed result.

        Async mirror of the sync client's `request`: retries failed attempts
        (timeouts, connection errors, retryable status codes) up to the
        configured maximum, sleeping between attempts.

        Raises:
            APITimeoutError: the request timed out with no retries remaining.
            APIConnectionError: the request failed with no retries remaining.
            The status error built by `_make_status_error_from_response` for a
            4xx/5xx response that was not retried.
        """
        if self._platform is None:
            # `get_platform` can make blocking IO calls so we
            # execute it earlier while we are in an async context
            self._platform = await asyncify(get_platform)()

        cast_to = self._maybe_override_cast_to(cast_to, options)

        # create a copy of the options we were given so that if the
        # options are mutated later & we then retry, the retries are
        # given the original options
        input_options = model_copy(options)
        if input_options.idempotency_key is None and input_options.method.lower() != "get":
            # ensure the idempotency key is reused between requests
            input_options.idempotency_key = self._idempotency_key()

        response: httpx.Response | None = None
        max_retries = input_options.get_max_retries(self.max_retries)

        # `retries_taken` is read after the loop (passed to `_process_response`),
        # hence the explicit initialisation
        retries_taken = 0
        for retries_taken in range(max_retries + 1):
            # fresh copy per attempt so `_prepare_options` mutations don't leak
            # into subsequent retries
            options = model_copy(input_options)
            options = await self._prepare_options(options)

            remaining_retries = max_retries - retries_taken
            request = self._build_request(options, retries_taken=retries_taken)
            await self._prepare_request(request)

            kwargs: HttpxSendArgs = {}
            if self.custom_auth is not None:
                kwargs["auth"] = self.custom_auth

            if options.follow_redirects is not None:
                kwargs["follow_redirects"] = options.follow_redirects

            log.debug("Sending HTTP Request: %s %s", request.method, request.url)

            # reset so a failed send doesn't leave a stale response from a
            # previous attempt
            response = None
            try:
                response = await self._client.send(
                    request,
                    stream=stream or self._should_stream_response_body(request=request),
                    **kwargs,
                )
            except httpx.TimeoutException as err:
                log.debug("Encountered httpx.TimeoutException", exc_info=True)

                if remaining_retries > 0:
                    await self._sleep_for_retry(
                        retries_taken=retries_taken,
                        max_retries=max_retries,
                        options=input_options,
                        response=None,
                    )
                    continue

                log.debug("Raising timeout error")
                raise APITimeoutError(request=request) from err
            except Exception as err:
                log.debug("Encountered Exception", exc_info=True)

                if remaining_retries > 0:
                    await self._sleep_for_retry(
                        retries_taken=retries_taken,
                        max_retries=max_retries,
                        options=input_options,
                        response=None,
                    )
                    continue

                log.debug("Raising connection error")
                raise APIConnectionError(request=request) from err

            log.debug(
                'HTTP Response: %s %s "%i %s" %s',
                request.method,
                request.url,
                response.status_code,
                response.reason_phrase,
                response.headers,
            )
            log.debug("request_id: %s", response.headers.get("x-request-id"))

            try:
                response.raise_for_status()
            except httpx.HTTPStatusError as err:  # thrown on 4xx and 5xx status code
                log.debug("Encountered httpx.HTTPStatusError", exc_info=True)

                if remaining_retries > 0 and self._should_retry(err.response):
                    await err.response.aclose()
                    await self._sleep_for_retry(
                        retries_taken=retries_taken,
                        max_retries=max_retries,
                        options=input_options,
                        response=response,
                    )
                    continue

                # If the response is streamed then we need to explicitly read the response
                # to completion before attempting to access the response text.
                if not err.response.is_closed:
                    await err.response.aread()

                log.debug("Re-raising status error")
                raise self._make_status_error_from_response(err.response) from None

            # success: leave the retry loop with `response` set
            break

        assert response is not None, "could not resolve response (should never happen)"
        return await self._process_response(
            cast_to=cast_to,
            options=options,
            response=response,
            stream=stream,
            stream_cls=stream_cls,
            retries_taken=retries_taken,
        )
1607
1608 async def _sleep_for_retry(
1609 self, *, retries_taken: int, max_retries: int, options: FinalRequestOptions, response: httpx.Response | None
1610 ) -> None:
1611 remaining_retries = max_retries - retries_taken
1612 if remaining_retries == 1:
1613 log.debug("1 retry left")
1614 else:
1615 log.debug("%i retries left", remaining_retries)
1616
1617 timeout = self._calculate_retry_timeout(remaining_retries, options, response.headers if response else None)
1618 log.info("Retrying request to %s in %f seconds", options.url, timeout)
1619
1620 await anyio.sleep(timeout)
1621
    async def _process_response(
        self,
        *,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        response: httpx.Response,
        stream: bool,
        stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
        retries_taken: int = 0,
    ) -> ResponseT:
        """Convert a raw httpx response into the type requested via `cast_to`.

        Resolution order (async mirror of the sync `_process_response`):
        1. raw-response header == "true" -> wrap in `LegacyAPIResponse`.
        2. `cast_to` is an `AsyncAPIResponse` subclass -> instantiate that class.
        3. `cast_to` is `httpx.Response` -> return the response unchanged.
        4. otherwise -> wrap in `AsyncAPIResponse`; return the wrapper if the
           raw-response header is present, else its parsed contents.
        """
        if response.request.headers.get(RAW_RESPONSE_HEADER) == "true":
            return cast(
                ResponseT,
                LegacyAPIResponse(
                    raw=response,
                    client=self,
                    cast_to=cast_to,
                    stream=stream,
                    stream_cls=stream_cls,
                    options=options,
                    retries_taken=retries_taken,
                ),
            )

        # unwrap generic aliases (e.g. `AsyncAPIResponse[Foo]`) to their origin class
        origin = get_origin(cast_to) or cast_to

        if (
            inspect.isclass(origin)
            and issubclass(origin, BaseAPIResponse)
            # we only want to actually return the custom BaseAPIResponse class if we're
            # returning the raw response, or if we're not streaming SSE, as if we're streaming
            # SSE then `cast_to` doesn't actively reflect the type we need to parse into
            and (not stream or bool(response.request.headers.get(RAW_RESPONSE_HEADER)))
        ):
            if not issubclass(origin, AsyncAPIResponse):
                raise TypeError(f"API Response types must subclass {AsyncAPIResponse}; Received {origin}")

            response_cls = cast("type[BaseAPIResponse[Any]]", cast_to)
            return cast(
                "ResponseT",
                response_cls(
                    raw=response,
                    client=self,
                    cast_to=extract_response_type(response_cls),
                    stream=stream,
                    stream_cls=stream_cls,
                    options=options,
                    retries_taken=retries_taken,
                ),
            )

        if cast_to == httpx.Response:
            return cast(ResponseT, response)

        api_response = AsyncAPIResponse(
            raw=response,
            client=self,
            cast_to=cast("type[ResponseT]", cast_to),  # pyright: ignore[reportUnnecessaryCast]
            stream=stream,
            stream_cls=stream_cls,
            options=options,
            retries_taken=retries_taken,
        )
        if bool(response.request.headers.get(RAW_RESPONSE_HEADER)):
            return cast(ResponseT, api_response)

        return await api_response.parse()
1689
1690 def _request_api_list(
1691 self,
1692 model: Type[_T],
1693 page: Type[AsyncPageT],
1694 options: FinalRequestOptions,
1695 ) -> AsyncPaginator[_T, AsyncPageT]:
1696 return AsyncPaginator(client=self, options=options, page_cls=page, model=model)
1697
    # `get` overloads: the return type depends on `stream` — a parsed
    # `ResponseT` by default, an `_AsyncStreamT` when `stream=True`.
    @overload
    async def get(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        options: RequestOptions = {},
        stream: Literal[False] = False,
    ) -> ResponseT: ...

    @overload
    async def get(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        options: RequestOptions = {},
        stream: Literal[True],
        stream_cls: type[_AsyncStreamT],
    ) -> _AsyncStreamT: ...

    @overload
    async def get(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        options: RequestOptions = {},
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None = None,
    ) -> ResponseT | _AsyncStreamT: ...
1729
1730 async def get(
1731 self,
1732 path: str,
1733 *,
1734 cast_to: Type[ResponseT],
1735 options: RequestOptions = {},
1736 stream: bool = False,
1737 stream_cls: type[_AsyncStreamT] | None = None,
1738 ) -> ResponseT | _AsyncStreamT:
1739 opts = FinalRequestOptions.construct(method="get", url=path, **options)
1740 return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)
1741
    # `post` overloads: the return type depends on `stream` — a parsed
    # `ResponseT` by default, an `_AsyncStreamT` when `stream=True`.
    @overload
    async def post(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        files: RequestFiles | None = None,
        options: RequestOptions = {},
        stream: Literal[False] = False,
    ) -> ResponseT: ...

    @overload
    async def post(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        files: RequestFiles | None = None,
        options: RequestOptions = {},
        stream: Literal[True],
        stream_cls: type[_AsyncStreamT],
    ) -> _AsyncStreamT: ...

    @overload
    async def post(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        files: RequestFiles | None = None,
        options: RequestOptions = {},
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None = None,
    ) -> ResponseT | _AsyncStreamT: ...
1779
1780 async def post(
1781 self,
1782 path: str,
1783 *,
1784 cast_to: Type[ResponseT],
1785 body: Body | None = None,
1786 files: RequestFiles | None = None,
1787 options: RequestOptions = {},
1788 stream: bool = False,
1789 stream_cls: type[_AsyncStreamT] | None = None,
1790 ) -> ResponseT | _AsyncStreamT:
1791 opts = FinalRequestOptions.construct(
1792 method="post", url=path, json_data=body, files=await async_to_httpx_files(files), **options
1793 )
1794 return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)
1795
1796 async def patch(
1797 self,
1798 path: str,
1799 *,
1800 cast_to: Type[ResponseT],
1801 body: Body | None = None,
1802 options: RequestOptions = {},
1803 ) -> ResponseT:
1804 opts = FinalRequestOptions.construct(method="patch", url=path, json_data=body, **options)
1805 return await self.request(cast_to, opts)
1806
1807 async def put(
1808 self,
1809 path: str,
1810 *,
1811 cast_to: Type[ResponseT],
1812 body: Body | None = None,
1813 files: RequestFiles | None = None,
1814 options: RequestOptions = {},
1815 ) -> ResponseT:
1816 opts = FinalRequestOptions.construct(
1817 method="put", url=path, json_data=body, files=await async_to_httpx_files(files), **options
1818 )
1819 return await self.request(cast_to, opts)
1820
1821 async def delete(
1822 self,
1823 path: str,
1824 *,
1825 cast_to: Type[ResponseT],
1826 body: Body | None = None,
1827 options: RequestOptions = {},
1828 ) -> ResponseT:
1829 opts = FinalRequestOptions.construct(method="delete", url=path, json_data=body, **options)
1830 return await self.request(cast_to, opts)
1831
1832 def get_api_list(
1833 self,
1834 path: str,
1835 *,
1836 model: Type[_T],
1837 page: Type[AsyncPageT],
1838 body: Body | None = None,
1839 options: RequestOptions = {},
1840 method: str = "get",
1841 ) -> AsyncPaginator[_T, AsyncPageT]:
1842 opts = FinalRequestOptions.construct(method=method, url=path, json_data=body, **options)
1843 return self._request_api_list(model, page, opts)
1844
1845
def make_request_options(
    *,
    query: Query | None = None,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    idempotency_key: str | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = not_given,
    post_parser: PostParser | NotGiven = not_given,
) -> RequestOptions:
    """Create a dict of type RequestOptions without keys of NotGiven values.

    `extra_query` is merged over `query` when both are supplied; every other
    argument maps to a single key and is only included when provided.
    """
    options: RequestOptions = {}
    if extra_headers is not None:
        options["headers"] = extra_headers

    if extra_body is not None:
        options["extra_json"] = cast(AnyMapping, extra_body)

    if query is not None:
        options["params"] = query

    if extra_query is not None:
        # later keys win, so explicit `extra_query` entries override `query`
        options["params"] = {**options.get("params", {}), **extra_query}

    # use `is_given` for both not-given sentinels, rather than mixing it with a
    # direct `isinstance(..., NotGiven)` check as before
    if is_given(timeout):
        options["timeout"] = timeout

    if idempotency_key is not None:
        options["idempotency_key"] = idempotency_key

    if is_given(post_parser):
        # internal
        options["post_parser"] = post_parser  # type: ignore

    return options
1881
1882
class ForceMultipartDict(Dict[str, None]):
    """A dict subclass that is truthy even when empty."""

    def __bool__(self) -> bool:
        # an empty dict is normally falsy; force truthiness so this sentinel
        # survives `if data:`-style checks
        return True
1886
1887
class OtherPlatform:
    """Wrapper for a platform name outside the known set, rendered as `Other:<name>`."""

    def __init__(self, name: str) -> None:
        self.name = name

    @override
    def __str__(self) -> str:
        return "Other:{}".format(self.name)
1895
1896
# A recognised OS name, or `OtherPlatform` for anything we don't recognise.
Platform = Union[
    OtherPlatform,
    Literal[
        "MacOS",
        "Linux",
        "Windows",
        "FreeBSD",
        "OpenBSD",
        "iOS",
        "Android",
        "Unknown",
    ],
]
1910
1911
def get_platform() -> Platform:
    """Best-effort classification of the current OS into a `Platform` value."""
    try:
        system = platform.system().lower()
        platform_name = platform.platform().lower()
    except Exception:
        return "Unknown"

    # iOS: system is Darwin with a platform string such as
    # Darwin-21.6.0-iPhone12,1-64bit or Darwin-21.6.0-iPad7,11-64bit
    # (tested using Python3IDE on an iPhone 11 and Pythonista on an iPad 7)
    if "iphone" in platform_name or "ipad" in platform_name:
        return "iOS"

    if system == "darwin":
        return "MacOS"

    if system == "windows":
        return "Windows"

    # Android: system is Linux with a platform string like
    # 'Linux-5.10.81-android12-9-00001-geba40aecb3b7-ab8534902-aarch64-with-libc'
    # (tested using Pydroid 3)
    if "android" in platform_name:
        return "Android"

    if system == "linux":
        # distinguish the BSDs from regular Linux distros via the distro id
        # https://distro.readthedocs.io/en/latest/#distro.id
        distro_id = distro.id()
        if distro_id == "freebsd":
            return "FreeBSD"
        if distro_id == "openbsd":
            return "OpenBSD"
        return "Linux"

    if platform_name:
        return OtherPlatform(platform_name)

    return "Unknown"
1952
1953
@lru_cache(maxsize=None)
def platform_headers(version: str, *, platform: Platform | None) -> Dict[str, str]:
    """Build the cached `X-Stainless-*` telemetry headers for outgoing requests."""
    headers: Dict[str, str] = {
        "X-Stainless-Lang": "python",
        "X-Stainless-Package-Version": version,
        "X-Stainless-OS": str(platform or get_platform()),
        "X-Stainless-Arch": str(get_architecture()),
        "X-Stainless-Runtime": get_python_runtime(),
        "X-Stainless-Runtime-Version": get_python_version(),
    }
    return headers
1964
1965
class OtherArch:
    """Wrapper for a machine architecture outside the known set, rendered as `other:<name>`."""

    def __init__(self, name: str) -> None:
        self.name = name

    @override
    def __str__(self) -> str:
        return "other:{}".format(self.name)
1973
1974
# A recognised CPU architecture, or `OtherArch` for anything we don't recognise.
Arch = Union[OtherArch, Literal["x32", "x64", "arm", "arm64", "unknown"]]
1976
1977
def get_python_runtime() -> str:
    """Return the Python implementation name (e.g. "CPython"), or "unknown"."""
    try:
        implementation = platform.python_implementation()
    except Exception:
        return "unknown"
    return implementation
1983
1984
def get_python_version() -> str:
    """Return the running Python version string, or "unknown"."""
    try:
        version = platform.python_version()
    except Exception:
        return "unknown"
    return version
1990
1991
def get_architecture() -> Arch:
    """Classify the machine architecture reported by `platform.machine()`."""
    try:
        machine = platform.machine().lower()
    except Exception:
        return "unknown"

    # directly recognised machine strings (values here are `Arch` literals)
    known = {"arm64": "arm64", "aarch64": "arm64", "arm": "arm", "x86_64": "x64"}
    if machine in known:
        # NOTE: "arm" is untested (TODO upstream)
        return known[machine]

    # TODO: untested — 32-bit pointer width implies a 32-bit x86 build
    if sys.maxsize <= 2**32:
        return "x32"

    if machine:
        return OtherArch(machine)

    return "unknown"
2016
2017
def _merge_mappings(
    obj1: Mapping[_T_co, Union[_T, Omit]],
    obj2: Mapping[_T_co, Union[_T, Omit]],
) -> Dict[_T_co, _T]:
    """Merge two mappings of the same type, dropping entries whose value is `Omit`.

    In cases with duplicate keys the second mapping takes precedence.
    """
    combined: Dict[_T_co, Union[_T, Omit]] = dict(obj1)
    combined.update(obj2)
    return {key: val for key, val in combined.items() if not isinstance(val, Omit)}