main
1from __future__ import annotations
2
3import os
4import inspect
5import logging
6import datetime
7import functools
8from types import TracebackType
9from typing import (
10 TYPE_CHECKING,
11 Any,
12 Union,
13 Generic,
14 TypeVar,
15 Callable,
16 Iterator,
17 AsyncIterator,
18 cast,
19 overload,
20)
21from typing_extensions import Awaitable, ParamSpec, override, get_origin
22
23import anyio
24import httpx
25import pydantic
26
27from ._types import NoneType
28from ._utils import is_given, extract_type_arg, is_annotated_type, is_type_alias_type, extract_type_var_from_base
29from ._models import BaseModel, is_basemodel, add_request_id
30from ._constants import RAW_RESPONSE_HEADER, OVERRIDE_CAST_TO_HEADER
31from ._streaming import Stream, AsyncStream, is_stream_class_type, extract_stream_chunk_type
32from ._exceptions import OpenAIError, APIResponseValidationError
33
34if TYPE_CHECKING:
35 from ._models import FinalRequestOptions
36 from ._base_client import BaseClient
37
38
39P = ParamSpec("P")
40R = TypeVar("R")
41_T = TypeVar("_T")
42_APIResponseT = TypeVar("_APIResponseT", bound="APIResponse[Any]")
43_AsyncAPIResponseT = TypeVar("_AsyncAPIResponseT", bound="AsyncAPIResponse[Any]")
44
45log: logging.Logger = logging.getLogger(__name__)
46
47
48class BaseAPIResponse(Generic[R]):
49 _cast_to: type[R]
50 _client: BaseClient[Any, Any]
51 _parsed_by_type: dict[type[Any], Any]
52 _is_sse_stream: bool
53 _stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None
54 _options: FinalRequestOptions
55
56 http_response: httpx.Response
57
58 retries_taken: int
59 """The number of retries made. If no retries happened this will be `0`"""
60
61 def __init__(
62 self,
63 *,
64 raw: httpx.Response,
65 cast_to: type[R],
66 client: BaseClient[Any, Any],
67 stream: bool,
68 stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
69 options: FinalRequestOptions,
70 retries_taken: int = 0,
71 ) -> None:
72 self._cast_to = cast_to
73 self._client = client
74 self._parsed_by_type = {}
75 self._is_sse_stream = stream
76 self._stream_cls = stream_cls
77 self._options = options
78 self.http_response = raw
79 self.retries_taken = retries_taken
80
81 @property
82 def headers(self) -> httpx.Headers:
83 return self.http_response.headers
84
85 @property
86 def http_request(self) -> httpx.Request:
87 """Returns the httpx Request instance associated with the current response."""
88 return self.http_response.request
89
90 @property
91 def status_code(self) -> int:
92 return self.http_response.status_code
93
94 @property
95 def url(self) -> httpx.URL:
96 """Returns the URL for which the request was made."""
97 return self.http_response.url
98
99 @property
100 def method(self) -> str:
101 return self.http_request.method
102
103 @property
104 def http_version(self) -> str:
105 return self.http_response.http_version
106
107 @property
108 def elapsed(self) -> datetime.timedelta:
109 """The time taken for the complete request/response cycle to complete."""
110 return self.http_response.elapsed
111
112 @property
113 def is_closed(self) -> bool:
114 """Whether or not the response body has been closed.
115
116 If this is False then there is response data that has not been read yet.
117 You must either fully consume the response body or call `.close()`
118 before discarding the response to prevent resource leaks.
119 """
120 return self.http_response.is_closed
121
122 @override
123 def __repr__(self) -> str:
124 return (
125 f"<{self.__class__.__name__} [{self.status_code} {self.http_response.reason_phrase}] type={self._cast_to}>"
126 )
127
128 def _parse(self, *, to: type[_T] | None = None) -> R | _T:
129 cast_to = to if to is not None else self._cast_to
130
131 # unwrap `TypeAlias('Name', T)` -> `T`
132 if is_type_alias_type(cast_to):
133 cast_to = cast_to.__value__ # type: ignore[unreachable]
134
135 # unwrap `Annotated[T, ...]` -> `T`
136 if cast_to and is_annotated_type(cast_to):
137 cast_to = extract_type_arg(cast_to, 0)
138
139 origin = get_origin(cast_to) or cast_to
140
141 if self._is_sse_stream:
142 if to:
143 if not is_stream_class_type(to):
144 raise TypeError(f"Expected custom parse type to be a subclass of {Stream} or {AsyncStream}")
145
146 return cast(
147 _T,
148 to(
149 cast_to=extract_stream_chunk_type(
150 to,
151 failure_message="Expected custom stream type to be passed with a type argument, e.g. Stream[ChunkType]",
152 ),
153 response=self.http_response,
154 client=cast(Any, self._client),
155 ),
156 )
157
158 if self._stream_cls:
159 return cast(
160 R,
161 self._stream_cls(
162 cast_to=extract_stream_chunk_type(self._stream_cls),
163 response=self.http_response,
164 client=cast(Any, self._client),
165 ),
166 )
167
168 stream_cls = cast("type[Stream[Any]] | type[AsyncStream[Any]] | None", self._client._default_stream_cls)
169 if stream_cls is None:
170 raise MissingStreamClassError()
171
172 return cast(
173 R,
174 stream_cls(
175 cast_to=cast_to,
176 response=self.http_response,
177 client=cast(Any, self._client),
178 ),
179 )
180
181 if cast_to is NoneType:
182 return cast(R, None)
183
184 response = self.http_response
185 if cast_to == str:
186 return cast(R, response.text)
187
188 if cast_to == bytes:
189 return cast(R, response.content)
190
191 if cast_to == int:
192 return cast(R, int(response.text))
193
194 if cast_to == float:
195 return cast(R, float(response.text))
196
197 if cast_to == bool:
198 return cast(R, response.text.lower() == "true")
199
200 # handle the legacy binary response case
201 if inspect.isclass(cast_to) and cast_to.__name__ == "HttpxBinaryResponseContent":
202 return cast(R, cast_to(response)) # type: ignore
203
204 if origin == APIResponse:
205 raise RuntimeError("Unexpected state - cast_to is `APIResponse`")
206
207 if inspect.isclass(origin) and issubclass(origin, httpx.Response):
208 # Because of the invariance of our ResponseT TypeVar, users can subclass httpx.Response
209 # and pass that class to our request functions. We cannot change the variance to be either
210 # covariant or contravariant as that makes our usage of ResponseT illegal. We could construct
211 # the response class ourselves but that is something that should be supported directly in httpx
212 # as it would be easy to incorrectly construct the Response object due to the multitude of arguments.
213 if cast_to != httpx.Response:
214 raise ValueError(f"Subclasses of httpx.Response cannot be passed to `cast_to`")
215 return cast(R, response)
216
217 if (
218 inspect.isclass(
219 origin # pyright: ignore[reportUnknownArgumentType]
220 )
221 and not issubclass(origin, BaseModel)
222 and issubclass(origin, pydantic.BaseModel)
223 ):
224 raise TypeError("Pydantic models must subclass our base model type, e.g. `from openai import BaseModel`")
225
226 if (
227 cast_to is not object
228 and not origin is list
229 and not origin is dict
230 and not origin is Union
231 and not issubclass(origin, BaseModel)
232 ):
233 raise RuntimeError(
234 f"Unsupported type, expected {cast_to} to be a subclass of {BaseModel}, {dict}, {list}, {Union}, {NoneType}, {str} or {httpx.Response}."
235 )
236
237 # split is required to handle cases where additional information is included
238 # in the response, e.g. application/json; charset=utf-8
239 content_type, *_ = response.headers.get("content-type", "*").split(";")
240 if not content_type.endswith("json"):
241 if is_basemodel(cast_to):
242 try:
243 data = response.json()
244 except Exception as exc:
245 log.debug("Could not read JSON from response data due to %s - %s", type(exc), exc)
246 else:
247 return self._client._process_response_data(
248 data=data,
249 cast_to=cast_to, # type: ignore
250 response=response,
251 )
252
253 if self._client._strict_response_validation:
254 raise APIResponseValidationError(
255 response=response,
256 message=f"Expected Content-Type response header to be `application/json` but received `{content_type}` instead.",
257 body=response.text,
258 )
259
260 # If the API responds with content that isn't JSON then we just return
261 # the (decoded) text without performing any parsing so that you can still
262 # handle the response however you need to.
263 return response.text # type: ignore
264
265 data = response.json()
266
267 return self._client._process_response_data(
268 data=data,
269 cast_to=cast_to, # type: ignore
270 response=response,
271 )
272
273
274class APIResponse(BaseAPIResponse[R]):
275 @property
276 def request_id(self) -> str | None:
277 return self.http_response.headers.get("x-request-id") # type: ignore[no-any-return]
278
279 @overload
280 def parse(self, *, to: type[_T]) -> _T: ...
281
282 @overload
283 def parse(self) -> R: ...
284
285 def parse(self, *, to: type[_T] | None = None) -> R | _T:
286 """Returns the rich python representation of this response's data.
287
288 For lower-level control, see `.read()`, `.json()`, `.iter_bytes()`.
289
290 You can customise the type that the response is parsed into through
291 the `to` argument, e.g.
292
293 ```py
294 from openai import BaseModel
295
296
297 class MyModel(BaseModel):
298 foo: str
299
300
301 obj = response.parse(to=MyModel)
302 print(obj.foo)
303 ```
304
305 We support parsing:
306 - `BaseModel`
307 - `dict`
308 - `list`
309 - `Union`
310 - `str`
311 - `int`
312 - `float`
313 - `httpx.Response`
314 """
315 cache_key = to if to is not None else self._cast_to
316 cached = self._parsed_by_type.get(cache_key)
317 if cached is not None:
318 return cached # type: ignore[no-any-return]
319
320 if not self._is_sse_stream:
321 self.read()
322
323 parsed = self._parse(to=to)
324 if is_given(self._options.post_parser):
325 parsed = self._options.post_parser(parsed)
326
327 if isinstance(parsed, BaseModel):
328 add_request_id(parsed, self.request_id)
329
330 self._parsed_by_type[cache_key] = parsed
331 return cast(R, parsed)
332
333 def read(self) -> bytes:
334 """Read and return the binary response content."""
335 try:
336 return self.http_response.read()
337 except httpx.StreamConsumed as exc:
338 # The default error raised by httpx isn't very
339 # helpful in our case so we re-raise it with
340 # a different error message.
341 raise StreamAlreadyConsumed() from exc
342
343 def text(self) -> str:
344 """Read and decode the response content into a string."""
345 self.read()
346 return self.http_response.text
347
348 def json(self) -> object:
349 """Read and decode the JSON response content."""
350 self.read()
351 return self.http_response.json()
352
353 def close(self) -> None:
354 """Close the response and release the connection.
355
356 Automatically called if the response body is read to completion.
357 """
358 self.http_response.close()
359
360 def iter_bytes(self, chunk_size: int | None = None) -> Iterator[bytes]:
361 """
362 A byte-iterator over the decoded response content.
363
364 This automatically handles gzip, deflate and brotli encoded responses.
365 """
366 for chunk in self.http_response.iter_bytes(chunk_size):
367 yield chunk
368
369 def iter_text(self, chunk_size: int | None = None) -> Iterator[str]:
370 """A str-iterator over the decoded response content
371 that handles both gzip, deflate, etc but also detects the content's
372 string encoding.
373 """
374 for chunk in self.http_response.iter_text(chunk_size):
375 yield chunk
376
377 def iter_lines(self) -> Iterator[str]:
378 """Like `iter_text()` but will only yield chunks for each line"""
379 for chunk in self.http_response.iter_lines():
380 yield chunk
381
382
383class AsyncAPIResponse(BaseAPIResponse[R]):
384 @property
385 def request_id(self) -> str | None:
386 return self.http_response.headers.get("x-request-id") # type: ignore[no-any-return]
387
388 @overload
389 async def parse(self, *, to: type[_T]) -> _T: ...
390
391 @overload
392 async def parse(self) -> R: ...
393
394 async def parse(self, *, to: type[_T] | None = None) -> R | _T:
395 """Returns the rich python representation of this response's data.
396
397 For lower-level control, see `.read()`, `.json()`, `.iter_bytes()`.
398
399 You can customise the type that the response is parsed into through
400 the `to` argument, e.g.
401
402 ```py
403 from openai import BaseModel
404
405
406 class MyModel(BaseModel):
407 foo: str
408
409
410 obj = response.parse(to=MyModel)
411 print(obj.foo)
412 ```
413
414 We support parsing:
415 - `BaseModel`
416 - `dict`
417 - `list`
418 - `Union`
419 - `str`
420 - `httpx.Response`
421 """
422 cache_key = to if to is not None else self._cast_to
423 cached = self._parsed_by_type.get(cache_key)
424 if cached is not None:
425 return cached # type: ignore[no-any-return]
426
427 if not self._is_sse_stream:
428 await self.read()
429
430 parsed = self._parse(to=to)
431 if is_given(self._options.post_parser):
432 parsed = self._options.post_parser(parsed)
433
434 if isinstance(parsed, BaseModel):
435 add_request_id(parsed, self.request_id)
436
437 self._parsed_by_type[cache_key] = parsed
438 return cast(R, parsed)
439
440 async def read(self) -> bytes:
441 """Read and return the binary response content."""
442 try:
443 return await self.http_response.aread()
444 except httpx.StreamConsumed as exc:
445 # the default error raised by httpx isn't very
446 # helpful in our case so we re-raise it with
447 # a different error message
448 raise StreamAlreadyConsumed() from exc
449
450 async def text(self) -> str:
451 """Read and decode the response content into a string."""
452 await self.read()
453 return self.http_response.text
454
455 async def json(self) -> object:
456 """Read and decode the JSON response content."""
457 await self.read()
458 return self.http_response.json()
459
460 async def close(self) -> None:
461 """Close the response and release the connection.
462
463 Automatically called if the response body is read to completion.
464 """
465 await self.http_response.aclose()
466
467 async def iter_bytes(self, chunk_size: int | None = None) -> AsyncIterator[bytes]:
468 """
469 A byte-iterator over the decoded response content.
470
471 This automatically handles gzip, deflate and brotli encoded responses.
472 """
473 async for chunk in self.http_response.aiter_bytes(chunk_size):
474 yield chunk
475
476 async def iter_text(self, chunk_size: int | None = None) -> AsyncIterator[str]:
477 """A str-iterator over the decoded response content
478 that handles both gzip, deflate, etc but also detects the content's
479 string encoding.
480 """
481 async for chunk in self.http_response.aiter_text(chunk_size):
482 yield chunk
483
484 async def iter_lines(self) -> AsyncIterator[str]:
485 """Like `iter_text()` but will only yield chunks for each line"""
486 async for chunk in self.http_response.aiter_lines():
487 yield chunk
488
489
490class BinaryAPIResponse(APIResponse[bytes]):
491 """Subclass of APIResponse providing helpers for dealing with binary data.
492
493 Note: If you want to stream the response data instead of eagerly reading it
494 all at once then you should use `.with_streaming_response` when making
495 the API request, e.g. `.with_streaming_response.get_binary_response()`
496 """
497
498 def write_to_file(
499 self,
500 file: str | os.PathLike[str],
501 ) -> None:
502 """Write the output to the given file.
503
504 Accepts a filename or any path-like object, e.g. pathlib.Path
505
506 Note: if you want to stream the data to the file instead of writing
507 all at once then you should use `.with_streaming_response` when making
508 the API request, e.g. `.with_streaming_response.get_binary_response()`
509 """
510 with open(file, mode="wb") as f:
511 for data in self.iter_bytes():
512 f.write(data)
513
514
515class AsyncBinaryAPIResponse(AsyncAPIResponse[bytes]):
516 """Subclass of APIResponse providing helpers for dealing with binary data.
517
518 Note: If you want to stream the response data instead of eagerly reading it
519 all at once then you should use `.with_streaming_response` when making
520 the API request, e.g. `.with_streaming_response.get_binary_response()`
521 """
522
523 async def write_to_file(
524 self,
525 file: str | os.PathLike[str],
526 ) -> None:
527 """Write the output to the given file.
528
529 Accepts a filename or any path-like object, e.g. pathlib.Path
530
531 Note: if you want to stream the data to the file instead of writing
532 all at once then you should use `.with_streaming_response` when making
533 the API request, e.g. `.with_streaming_response.get_binary_response()`
534 """
535 path = anyio.Path(file)
536 async with await path.open(mode="wb") as f:
537 async for data in self.iter_bytes():
538 await f.write(data)
539
540
541class StreamedBinaryAPIResponse(APIResponse[bytes]):
542 def stream_to_file(
543 self,
544 file: str | os.PathLike[str],
545 *,
546 chunk_size: int | None = None,
547 ) -> None:
548 """Streams the output to the given file.
549
550 Accepts a filename or any path-like object, e.g. pathlib.Path
551 """
552 with open(file, mode="wb") as f:
553 for data in self.iter_bytes(chunk_size):
554 f.write(data)
555
556
557class AsyncStreamedBinaryAPIResponse(AsyncAPIResponse[bytes]):
558 async def stream_to_file(
559 self,
560 file: str | os.PathLike[str],
561 *,
562 chunk_size: int | None = None,
563 ) -> None:
564 """Streams the output to the given file.
565
566 Accepts a filename or any path-like object, e.g. pathlib.Path
567 """
568 path = anyio.Path(file)
569 async with await path.open(mode="wb") as f:
570 async for data in self.iter_bytes(chunk_size):
571 await f.write(data)
572
573
574class MissingStreamClassError(TypeError):
575 def __init__(self) -> None:
576 super().__init__(
577 "The `stream` argument was set to `True` but the `stream_cls` argument was not given. See `openai._streaming` for reference",
578 )
579
580
581class StreamAlreadyConsumed(OpenAIError):
582 """
583 Attempted to read or stream content, but the content has already
584 been streamed.
585
586 This can happen if you use a method like `.iter_lines()` and then attempt
587 to read th entire response body afterwards, e.g.
588
589 ```py
590 response = await client.post(...)
591 async for line in response.iter_lines():
592 ... # do something with `line`
593
594 content = await response.read()
595 # ^ error
596 ```
597
598 If you want this behaviour you'll need to either manually accumulate the response
599 content or call `await response.read()` before iterating over the stream.
600 """
601
602 def __init__(self) -> None:
603 message = (
604 "Attempted to read or stream some content, but the content has "
605 "already been streamed. "
606 "This could be due to attempting to stream the response "
607 "content more than once."
608 "\n\n"
609 "You can fix this by manually accumulating the response content while streaming "
610 "or by calling `.read()` before starting to stream."
611 )
612 super().__init__(message)
613
614
615class ResponseContextManager(Generic[_APIResponseT]):
616 """Context manager for ensuring that a request is not made
617 until it is entered and that the response will always be closed
618 when the context manager exits
619 """
620
621 def __init__(self, request_func: Callable[[], _APIResponseT]) -> None:
622 self._request_func = request_func
623 self.__response: _APIResponseT | None = None
624
625 def __enter__(self) -> _APIResponseT:
626 self.__response = self._request_func()
627 return self.__response
628
629 def __exit__(
630 self,
631 exc_type: type[BaseException] | None,
632 exc: BaseException | None,
633 exc_tb: TracebackType | None,
634 ) -> None:
635 if self.__response is not None:
636 self.__response.close()
637
638
639class AsyncResponseContextManager(Generic[_AsyncAPIResponseT]):
640 """Context manager for ensuring that a request is not made
641 until it is entered and that the response will always be closed
642 when the context manager exits
643 """
644
645 def __init__(self, api_request: Awaitable[_AsyncAPIResponseT]) -> None:
646 self._api_request = api_request
647 self.__response: _AsyncAPIResponseT | None = None
648
649 async def __aenter__(self) -> _AsyncAPIResponseT:
650 self.__response = await self._api_request
651 return self.__response
652
653 async def __aexit__(
654 self,
655 exc_type: type[BaseException] | None,
656 exc: BaseException | None,
657 exc_tb: TracebackType | None,
658 ) -> None:
659 if self.__response is not None:
660 await self.__response.close()
661
662
663def to_streamed_response_wrapper(func: Callable[P, R]) -> Callable[P, ResponseContextManager[APIResponse[R]]]:
664 """Higher order function that takes one of our bound API methods and wraps it
665 to support streaming and returning the raw `APIResponse` object directly.
666 """
667
668 @functools.wraps(func)
669 def wrapped(*args: P.args, **kwargs: P.kwargs) -> ResponseContextManager[APIResponse[R]]:
670 extra_headers: dict[str, str] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
671 extra_headers[RAW_RESPONSE_HEADER] = "stream"
672
673 kwargs["extra_headers"] = extra_headers
674
675 make_request = functools.partial(func, *args, **kwargs)
676
677 return ResponseContextManager(cast(Callable[[], APIResponse[R]], make_request))
678
679 return wrapped
680
681
682def async_to_streamed_response_wrapper(
683 func: Callable[P, Awaitable[R]],
684) -> Callable[P, AsyncResponseContextManager[AsyncAPIResponse[R]]]:
685 """Higher order function that takes one of our bound API methods and wraps it
686 to support streaming and returning the raw `APIResponse` object directly.
687 """
688
689 @functools.wraps(func)
690 def wrapped(*args: P.args, **kwargs: P.kwargs) -> AsyncResponseContextManager[AsyncAPIResponse[R]]:
691 extra_headers: dict[str, str] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
692 extra_headers[RAW_RESPONSE_HEADER] = "stream"
693
694 kwargs["extra_headers"] = extra_headers
695
696 make_request = func(*args, **kwargs)
697
698 return AsyncResponseContextManager(cast(Awaitable[AsyncAPIResponse[R]], make_request))
699
700 return wrapped
701
702
703def to_custom_streamed_response_wrapper(
704 func: Callable[P, object],
705 response_cls: type[_APIResponseT],
706) -> Callable[P, ResponseContextManager[_APIResponseT]]:
707 """Higher order function that takes one of our bound API methods and an `APIResponse` class
708 and wraps the method to support streaming and returning the given response class directly.
709
710 Note: the given `response_cls` *must* be concrete, e.g. `class BinaryAPIResponse(APIResponse[bytes])`
711 """
712
713 @functools.wraps(func)
714 def wrapped(*args: P.args, **kwargs: P.kwargs) -> ResponseContextManager[_APIResponseT]:
715 extra_headers: dict[str, Any] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
716 extra_headers[RAW_RESPONSE_HEADER] = "stream"
717 extra_headers[OVERRIDE_CAST_TO_HEADER] = response_cls
718
719 kwargs["extra_headers"] = extra_headers
720
721 make_request = functools.partial(func, *args, **kwargs)
722
723 return ResponseContextManager(cast(Callable[[], _APIResponseT], make_request))
724
725 return wrapped
726
727
728def async_to_custom_streamed_response_wrapper(
729 func: Callable[P, Awaitable[object]],
730 response_cls: type[_AsyncAPIResponseT],
731) -> Callable[P, AsyncResponseContextManager[_AsyncAPIResponseT]]:
732 """Higher order function that takes one of our bound API methods and an `APIResponse` class
733 and wraps the method to support streaming and returning the given response class directly.
734
735 Note: the given `response_cls` *must* be concrete, e.g. `class BinaryAPIResponse(APIResponse[bytes])`
736 """
737
738 @functools.wraps(func)
739 def wrapped(*args: P.args, **kwargs: P.kwargs) -> AsyncResponseContextManager[_AsyncAPIResponseT]:
740 extra_headers: dict[str, Any] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
741 extra_headers[RAW_RESPONSE_HEADER] = "stream"
742 extra_headers[OVERRIDE_CAST_TO_HEADER] = response_cls
743
744 kwargs["extra_headers"] = extra_headers
745
746 make_request = func(*args, **kwargs)
747
748 return AsyncResponseContextManager(cast(Awaitable[_AsyncAPIResponseT], make_request))
749
750 return wrapped
751
752
753def to_raw_response_wrapper(func: Callable[P, R]) -> Callable[P, APIResponse[R]]:
754 """Higher order function that takes one of our bound API methods and wraps it
755 to support returning the raw `APIResponse` object directly.
756 """
757
758 @functools.wraps(func)
759 def wrapped(*args: P.args, **kwargs: P.kwargs) -> APIResponse[R]:
760 extra_headers: dict[str, str] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
761 extra_headers[RAW_RESPONSE_HEADER] = "raw"
762
763 kwargs["extra_headers"] = extra_headers
764
765 return cast(APIResponse[R], func(*args, **kwargs))
766
767 return wrapped
768
769
770def async_to_raw_response_wrapper(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[AsyncAPIResponse[R]]]:
771 """Higher order function that takes one of our bound API methods and wraps it
772 to support returning the raw `APIResponse` object directly.
773 """
774
775 @functools.wraps(func)
776 async def wrapped(*args: P.args, **kwargs: P.kwargs) -> AsyncAPIResponse[R]:
777 extra_headers: dict[str, str] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
778 extra_headers[RAW_RESPONSE_HEADER] = "raw"
779
780 kwargs["extra_headers"] = extra_headers
781
782 return cast(AsyncAPIResponse[R], await func(*args, **kwargs))
783
784 return wrapped
785
786
787def to_custom_raw_response_wrapper(
788 func: Callable[P, object],
789 response_cls: type[_APIResponseT],
790) -> Callable[P, _APIResponseT]:
791 """Higher order function that takes one of our bound API methods and an `APIResponse` class
792 and wraps the method to support returning the given response class directly.
793
794 Note: the given `response_cls` *must* be concrete, e.g. `class BinaryAPIResponse(APIResponse[bytes])`
795 """
796
797 @functools.wraps(func)
798 def wrapped(*args: P.args, **kwargs: P.kwargs) -> _APIResponseT:
799 extra_headers: dict[str, Any] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
800 extra_headers[RAW_RESPONSE_HEADER] = "raw"
801 extra_headers[OVERRIDE_CAST_TO_HEADER] = response_cls
802
803 kwargs["extra_headers"] = extra_headers
804
805 return cast(_APIResponseT, func(*args, **kwargs))
806
807 return wrapped
808
809
810def async_to_custom_raw_response_wrapper(
811 func: Callable[P, Awaitable[object]],
812 response_cls: type[_AsyncAPIResponseT],
813) -> Callable[P, Awaitable[_AsyncAPIResponseT]]:
814 """Higher order function that takes one of our bound API methods and an `APIResponse` class
815 and wraps the method to support returning the given response class directly.
816
817 Note: the given `response_cls` *must* be concrete, e.g. `class BinaryAPIResponse(APIResponse[bytes])`
818 """
819
820 @functools.wraps(func)
821 def wrapped(*args: P.args, **kwargs: P.kwargs) -> Awaitable[_AsyncAPIResponseT]:
822 extra_headers: dict[str, Any] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
823 extra_headers[RAW_RESPONSE_HEADER] = "raw"
824 extra_headers[OVERRIDE_CAST_TO_HEADER] = response_cls
825
826 kwargs["extra_headers"] = extra_headers
827
828 return cast(Awaitable[_AsyncAPIResponseT], func(*args, **kwargs))
829
830 return wrapped
831
832
833def extract_response_type(typ: type[BaseAPIResponse[Any]]) -> type:
834 """Given a type like `APIResponse[T]`, returns the generic type variable `T`.
835
836 This also handles the case where a concrete subclass is given, e.g.
837 ```py
838 class MyResponse(APIResponse[bytes]):
839 ...
840
841 extract_response_type(MyResponse) -> bytes
842 ```
843 """
844 return extract_type_var_from_base(
845 typ,
846 generic_bases=cast("tuple[type, ...]", (BaseAPIResponse, APIResponse, AsyncAPIResponse)),
847 index=0,
848 )