main
  1from __future__ import annotations
  2
  3import os
  4import inspect
  5import logging
  6import datetime
  7import functools
  8from types import TracebackType
  9from typing import (
 10    TYPE_CHECKING,
 11    Any,
 12    Union,
 13    Generic,
 14    TypeVar,
 15    Callable,
 16    Iterator,
 17    AsyncIterator,
 18    cast,
 19    overload,
 20)
 21from typing_extensions import Awaitable, ParamSpec, override, get_origin
 22
 23import anyio
 24import httpx
 25import pydantic
 26
 27from ._types import NoneType
 28from ._utils import is_given, extract_type_arg, is_annotated_type, is_type_alias_type, extract_type_var_from_base
 29from ._models import BaseModel, is_basemodel, add_request_id
 30from ._constants import RAW_RESPONSE_HEADER, OVERRIDE_CAST_TO_HEADER
 31from ._streaming import Stream, AsyncStream, is_stream_class_type, extract_stream_chunk_type
 32from ._exceptions import OpenAIError, APIResponseValidationError
 33
 34if TYPE_CHECKING:
 35    from ._models import FinalRequestOptions
 36    from ._base_client import BaseClient
 37
 38
 39P = ParamSpec("P")
 40R = TypeVar("R")
 41_T = TypeVar("_T")
 42_APIResponseT = TypeVar("_APIResponseT", bound="APIResponse[Any]")
 43_AsyncAPIResponseT = TypeVar("_AsyncAPIResponseT", bound="AsyncAPIResponse[Any]")
 44
 45log: logging.Logger = logging.getLogger(__name__)
 46
 47
 48class BaseAPIResponse(Generic[R]):
 49    _cast_to: type[R]
 50    _client: BaseClient[Any, Any]
 51    _parsed_by_type: dict[type[Any], Any]
 52    _is_sse_stream: bool
 53    _stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None
 54    _options: FinalRequestOptions
 55
 56    http_response: httpx.Response
 57
 58    retries_taken: int
 59    """The number of retries made. If no retries happened this will be `0`"""
 60
 61    def __init__(
 62        self,
 63        *,
 64        raw: httpx.Response,
 65        cast_to: type[R],
 66        client: BaseClient[Any, Any],
 67        stream: bool,
 68        stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
 69        options: FinalRequestOptions,
 70        retries_taken: int = 0,
 71    ) -> None:
 72        self._cast_to = cast_to
 73        self._client = client
 74        self._parsed_by_type = {}
 75        self._is_sse_stream = stream
 76        self._stream_cls = stream_cls
 77        self._options = options
 78        self.http_response = raw
 79        self.retries_taken = retries_taken
 80
 81    @property
 82    def headers(self) -> httpx.Headers:
 83        return self.http_response.headers
 84
 85    @property
 86    def http_request(self) -> httpx.Request:
 87        """Returns the httpx Request instance associated with the current response."""
 88        return self.http_response.request
 89
 90    @property
 91    def status_code(self) -> int:
 92        return self.http_response.status_code
 93
 94    @property
 95    def url(self) -> httpx.URL:
 96        """Returns the URL for which the request was made."""
 97        return self.http_response.url
 98
 99    @property
100    def method(self) -> str:
101        return self.http_request.method
102
103    @property
104    def http_version(self) -> str:
105        return self.http_response.http_version
106
107    @property
108    def elapsed(self) -> datetime.timedelta:
109        """The time taken for the complete request/response cycle to complete."""
110        return self.http_response.elapsed
111
112    @property
113    def is_closed(self) -> bool:
114        """Whether or not the response body has been closed.
115
116        If this is False then there is response data that has not been read yet.
117        You must either fully consume the response body or call `.close()`
118        before discarding the response to prevent resource leaks.
119        """
120        return self.http_response.is_closed
121
122    @override
123    def __repr__(self) -> str:
124        return (
125            f"<{self.__class__.__name__} [{self.status_code} {self.http_response.reason_phrase}] type={self._cast_to}>"
126        )
127
128    def _parse(self, *, to: type[_T] | None = None) -> R | _T:
129        cast_to = to if to is not None else self._cast_to
130
131        # unwrap `TypeAlias('Name', T)` -> `T`
132        if is_type_alias_type(cast_to):
133            cast_to = cast_to.__value__  # type: ignore[unreachable]
134
135        # unwrap `Annotated[T, ...]` -> `T`
136        if cast_to and is_annotated_type(cast_to):
137            cast_to = extract_type_arg(cast_to, 0)
138
139        origin = get_origin(cast_to) or cast_to
140
141        if self._is_sse_stream:
142            if to:
143                if not is_stream_class_type(to):
144                    raise TypeError(f"Expected custom parse type to be a subclass of {Stream} or {AsyncStream}")
145
146                return cast(
147                    _T,
148                    to(
149                        cast_to=extract_stream_chunk_type(
150                            to,
151                            failure_message="Expected custom stream type to be passed with a type argument, e.g. Stream[ChunkType]",
152                        ),
153                        response=self.http_response,
154                        client=cast(Any, self._client),
155                    ),
156                )
157
158            if self._stream_cls:
159                return cast(
160                    R,
161                    self._stream_cls(
162                        cast_to=extract_stream_chunk_type(self._stream_cls),
163                        response=self.http_response,
164                        client=cast(Any, self._client),
165                    ),
166                )
167
168            stream_cls = cast("type[Stream[Any]] | type[AsyncStream[Any]] | None", self._client._default_stream_cls)
169            if stream_cls is None:
170                raise MissingStreamClassError()
171
172            return cast(
173                R,
174                stream_cls(
175                    cast_to=cast_to,
176                    response=self.http_response,
177                    client=cast(Any, self._client),
178                ),
179            )
180
181        if cast_to is NoneType:
182            return cast(R, None)
183
184        response = self.http_response
185        if cast_to == str:
186            return cast(R, response.text)
187
188        if cast_to == bytes:
189            return cast(R, response.content)
190
191        if cast_to == int:
192            return cast(R, int(response.text))
193
194        if cast_to == float:
195            return cast(R, float(response.text))
196
197        if cast_to == bool:
198            return cast(R, response.text.lower() == "true")
199
200        # handle the legacy binary response case
201        if inspect.isclass(cast_to) and cast_to.__name__ == "HttpxBinaryResponseContent":
202            return cast(R, cast_to(response))  # type: ignore
203
204        if origin == APIResponse:
205            raise RuntimeError("Unexpected state - cast_to is `APIResponse`")
206
207        if inspect.isclass(origin) and issubclass(origin, httpx.Response):
208            # Because of the invariance of our ResponseT TypeVar, users can subclass httpx.Response
209            # and pass that class to our request functions. We cannot change the variance to be either
210            # covariant or contravariant as that makes our usage of ResponseT illegal. We could construct
211            # the response class ourselves but that is something that should be supported directly in httpx
212            # as it would be easy to incorrectly construct the Response object due to the multitude of arguments.
213            if cast_to != httpx.Response:
214                raise ValueError(f"Subclasses of httpx.Response cannot be passed to `cast_to`")
215            return cast(R, response)
216
217        if (
218            inspect.isclass(
219                origin  # pyright: ignore[reportUnknownArgumentType]
220            )
221            and not issubclass(origin, BaseModel)
222            and issubclass(origin, pydantic.BaseModel)
223        ):
224            raise TypeError("Pydantic models must subclass our base model type, e.g. `from openai import BaseModel`")
225
226        if (
227            cast_to is not object
228            and not origin is list
229            and not origin is dict
230            and not origin is Union
231            and not issubclass(origin, BaseModel)
232        ):
233            raise RuntimeError(
234                f"Unsupported type, expected {cast_to} to be a subclass of {BaseModel}, {dict}, {list}, {Union}, {NoneType}, {str} or {httpx.Response}."
235            )
236
237        # split is required to handle cases where additional information is included
238        # in the response, e.g. application/json; charset=utf-8
239        content_type, *_ = response.headers.get("content-type", "*").split(";")
240        if not content_type.endswith("json"):
241            if is_basemodel(cast_to):
242                try:
243                    data = response.json()
244                except Exception as exc:
245                    log.debug("Could not read JSON from response data due to %s - %s", type(exc), exc)
246                else:
247                    return self._client._process_response_data(
248                        data=data,
249                        cast_to=cast_to,  # type: ignore
250                        response=response,
251                    )
252
253            if self._client._strict_response_validation:
254                raise APIResponseValidationError(
255                    response=response,
256                    message=f"Expected Content-Type response header to be `application/json` but received `{content_type}` instead.",
257                    body=response.text,
258                )
259
260            # If the API responds with content that isn't JSON then we just return
261            # the (decoded) text without performing any parsing so that you can still
262            # handle the response however you need to.
263            return response.text  # type: ignore
264
265        data = response.json()
266
267        return self._client._process_response_data(
268            data=data,
269            cast_to=cast_to,  # type: ignore
270            response=response,
271        )
272
273
274class APIResponse(BaseAPIResponse[R]):
275    @property
276    def request_id(self) -> str | None:
277        return self.http_response.headers.get("x-request-id")  # type: ignore[no-any-return]
278
279    @overload
280    def parse(self, *, to: type[_T]) -> _T: ...
281
282    @overload
283    def parse(self) -> R: ...
284
285    def parse(self, *, to: type[_T] | None = None) -> R | _T:
286        """Returns the rich python representation of this response's data.
287
288        For lower-level control, see `.read()`, `.json()`, `.iter_bytes()`.
289
290        You can customise the type that the response is parsed into through
291        the `to` argument, e.g.
292
293        ```py
294        from openai import BaseModel
295
296
297        class MyModel(BaseModel):
298            foo: str
299
300
301        obj = response.parse(to=MyModel)
302        print(obj.foo)
303        ```
304
305        We support parsing:
306          - `BaseModel`
307          - `dict`
308          - `list`
309          - `Union`
310          - `str`
311          - `int`
312          - `float`
313          - `httpx.Response`
314        """
315        cache_key = to if to is not None else self._cast_to
316        cached = self._parsed_by_type.get(cache_key)
317        if cached is not None:
318            return cached  # type: ignore[no-any-return]
319
320        if not self._is_sse_stream:
321            self.read()
322
323        parsed = self._parse(to=to)
324        if is_given(self._options.post_parser):
325            parsed = self._options.post_parser(parsed)
326
327        if isinstance(parsed, BaseModel):
328            add_request_id(parsed, self.request_id)
329
330        self._parsed_by_type[cache_key] = parsed
331        return cast(R, parsed)
332
333    def read(self) -> bytes:
334        """Read and return the binary response content."""
335        try:
336            return self.http_response.read()
337        except httpx.StreamConsumed as exc:
338            # The default error raised by httpx isn't very
339            # helpful in our case so we re-raise it with
340            # a different error message.
341            raise StreamAlreadyConsumed() from exc
342
343    def text(self) -> str:
344        """Read and decode the response content into a string."""
345        self.read()
346        return self.http_response.text
347
348    def json(self) -> object:
349        """Read and decode the JSON response content."""
350        self.read()
351        return self.http_response.json()
352
353    def close(self) -> None:
354        """Close the response and release the connection.
355
356        Automatically called if the response body is read to completion.
357        """
358        self.http_response.close()
359
360    def iter_bytes(self, chunk_size: int | None = None) -> Iterator[bytes]:
361        """
362        A byte-iterator over the decoded response content.
363
364        This automatically handles gzip, deflate and brotli encoded responses.
365        """
366        for chunk in self.http_response.iter_bytes(chunk_size):
367            yield chunk
368
369    def iter_text(self, chunk_size: int | None = None) -> Iterator[str]:
370        """A str-iterator over the decoded response content
371        that handles both gzip, deflate, etc but also detects the content's
372        string encoding.
373        """
374        for chunk in self.http_response.iter_text(chunk_size):
375            yield chunk
376
377    def iter_lines(self) -> Iterator[str]:
378        """Like `iter_text()` but will only yield chunks for each line"""
379        for chunk in self.http_response.iter_lines():
380            yield chunk
381
382
383class AsyncAPIResponse(BaseAPIResponse[R]):
384    @property
385    def request_id(self) -> str | None:
386        return self.http_response.headers.get("x-request-id")  # type: ignore[no-any-return]
387
388    @overload
389    async def parse(self, *, to: type[_T]) -> _T: ...
390
391    @overload
392    async def parse(self) -> R: ...
393
394    async def parse(self, *, to: type[_T] | None = None) -> R | _T:
395        """Returns the rich python representation of this response's data.
396
397        For lower-level control, see `.read()`, `.json()`, `.iter_bytes()`.
398
399        You can customise the type that the response is parsed into through
400        the `to` argument, e.g.
401
402        ```py
403        from openai import BaseModel
404
405
406        class MyModel(BaseModel):
407            foo: str
408
409
410        obj = response.parse(to=MyModel)
411        print(obj.foo)
412        ```
413
414        We support parsing:
415          - `BaseModel`
416          - `dict`
417          - `list`
418          - `Union`
419          - `str`
420          - `httpx.Response`
421        """
422        cache_key = to if to is not None else self._cast_to
423        cached = self._parsed_by_type.get(cache_key)
424        if cached is not None:
425            return cached  # type: ignore[no-any-return]
426
427        if not self._is_sse_stream:
428            await self.read()
429
430        parsed = self._parse(to=to)
431        if is_given(self._options.post_parser):
432            parsed = self._options.post_parser(parsed)
433
434        if isinstance(parsed, BaseModel):
435            add_request_id(parsed, self.request_id)
436
437        self._parsed_by_type[cache_key] = parsed
438        return cast(R, parsed)
439
440    async def read(self) -> bytes:
441        """Read and return the binary response content."""
442        try:
443            return await self.http_response.aread()
444        except httpx.StreamConsumed as exc:
445            # the default error raised by httpx isn't very
446            # helpful in our case so we re-raise it with
447            # a different error message
448            raise StreamAlreadyConsumed() from exc
449
450    async def text(self) -> str:
451        """Read and decode the response content into a string."""
452        await self.read()
453        return self.http_response.text
454
455    async def json(self) -> object:
456        """Read and decode the JSON response content."""
457        await self.read()
458        return self.http_response.json()
459
460    async def close(self) -> None:
461        """Close the response and release the connection.
462
463        Automatically called if the response body is read to completion.
464        """
465        await self.http_response.aclose()
466
467    async def iter_bytes(self, chunk_size: int | None = None) -> AsyncIterator[bytes]:
468        """
469        A byte-iterator over the decoded response content.
470
471        This automatically handles gzip, deflate and brotli encoded responses.
472        """
473        async for chunk in self.http_response.aiter_bytes(chunk_size):
474            yield chunk
475
476    async def iter_text(self, chunk_size: int | None = None) -> AsyncIterator[str]:
477        """A str-iterator over the decoded response content
478        that handles both gzip, deflate, etc but also detects the content's
479        string encoding.
480        """
481        async for chunk in self.http_response.aiter_text(chunk_size):
482            yield chunk
483
484    async def iter_lines(self) -> AsyncIterator[str]:
485        """Like `iter_text()` but will only yield chunks for each line"""
486        async for chunk in self.http_response.aiter_lines():
487            yield chunk
488
489
490class BinaryAPIResponse(APIResponse[bytes]):
491    """Subclass of APIResponse providing helpers for dealing with binary data.
492
493    Note: If you want to stream the response data instead of eagerly reading it
494    all at once then you should use `.with_streaming_response` when making
495    the API request, e.g. `.with_streaming_response.get_binary_response()`
496    """
497
498    def write_to_file(
499        self,
500        file: str | os.PathLike[str],
501    ) -> None:
502        """Write the output to the given file.
503
504        Accepts a filename or any path-like object, e.g. pathlib.Path
505
506        Note: if you want to stream the data to the file instead of writing
507        all at once then you should use `.with_streaming_response` when making
508        the API request, e.g. `.with_streaming_response.get_binary_response()`
509        """
510        with open(file, mode="wb") as f:
511            for data in self.iter_bytes():
512                f.write(data)
513
514
515class AsyncBinaryAPIResponse(AsyncAPIResponse[bytes]):
516    """Subclass of APIResponse providing helpers for dealing with binary data.
517
518    Note: If you want to stream the response data instead of eagerly reading it
519    all at once then you should use `.with_streaming_response` when making
520    the API request, e.g. `.with_streaming_response.get_binary_response()`
521    """
522
523    async def write_to_file(
524        self,
525        file: str | os.PathLike[str],
526    ) -> None:
527        """Write the output to the given file.
528
529        Accepts a filename or any path-like object, e.g. pathlib.Path
530
531        Note: if you want to stream the data to the file instead of writing
532        all at once then you should use `.with_streaming_response` when making
533        the API request, e.g. `.with_streaming_response.get_binary_response()`
534        """
535        path = anyio.Path(file)
536        async with await path.open(mode="wb") as f:
537            async for data in self.iter_bytes():
538                await f.write(data)
539
540
541class StreamedBinaryAPIResponse(APIResponse[bytes]):
542    def stream_to_file(
543        self,
544        file: str | os.PathLike[str],
545        *,
546        chunk_size: int | None = None,
547    ) -> None:
548        """Streams the output to the given file.
549
550        Accepts a filename or any path-like object, e.g. pathlib.Path
551        """
552        with open(file, mode="wb") as f:
553            for data in self.iter_bytes(chunk_size):
554                f.write(data)
555
556
557class AsyncStreamedBinaryAPIResponse(AsyncAPIResponse[bytes]):
558    async def stream_to_file(
559        self,
560        file: str | os.PathLike[str],
561        *,
562        chunk_size: int | None = None,
563    ) -> None:
564        """Streams the output to the given file.
565
566        Accepts a filename or any path-like object, e.g. pathlib.Path
567        """
568        path = anyio.Path(file)
569        async with await path.open(mode="wb") as f:
570            async for data in self.iter_bytes(chunk_size):
571                await f.write(data)
572
573
574class MissingStreamClassError(TypeError):
575    def __init__(self) -> None:
576        super().__init__(
577            "The `stream` argument was set to `True` but the `stream_cls` argument was not given. See `openai._streaming` for reference",
578        )
579
580
581class StreamAlreadyConsumed(OpenAIError):
582    """
583    Attempted to read or stream content, but the content has already
584    been streamed.
585
586    This can happen if you use a method like `.iter_lines()` and then attempt
587    to read th entire response body afterwards, e.g.
588
589    ```py
590    response = await client.post(...)
591    async for line in response.iter_lines():
592        ...  # do something with `line`
593
594    content = await response.read()
595    # ^ error
596    ```
597
598    If you want this behaviour you'll need to either manually accumulate the response
599    content or call `await response.read()` before iterating over the stream.
600    """
601
602    def __init__(self) -> None:
603        message = (
604            "Attempted to read or stream some content, but the content has "
605            "already been streamed. "
606            "This could be due to attempting to stream the response "
607            "content more than once."
608            "\n\n"
609            "You can fix this by manually accumulating the response content while streaming "
610            "or by calling `.read()` before starting to stream."
611        )
612        super().__init__(message)
613
614
615class ResponseContextManager(Generic[_APIResponseT]):
616    """Context manager for ensuring that a request is not made
617    until it is entered and that the response will always be closed
618    when the context manager exits
619    """
620
621    def __init__(self, request_func: Callable[[], _APIResponseT]) -> None:
622        self._request_func = request_func
623        self.__response: _APIResponseT | None = None
624
625    def __enter__(self) -> _APIResponseT:
626        self.__response = self._request_func()
627        return self.__response
628
629    def __exit__(
630        self,
631        exc_type: type[BaseException] | None,
632        exc: BaseException | None,
633        exc_tb: TracebackType | None,
634    ) -> None:
635        if self.__response is not None:
636            self.__response.close()
637
638
639class AsyncResponseContextManager(Generic[_AsyncAPIResponseT]):
640    """Context manager for ensuring that a request is not made
641    until it is entered and that the response will always be closed
642    when the context manager exits
643    """
644
645    def __init__(self, api_request: Awaitable[_AsyncAPIResponseT]) -> None:
646        self._api_request = api_request
647        self.__response: _AsyncAPIResponseT | None = None
648
649    async def __aenter__(self) -> _AsyncAPIResponseT:
650        self.__response = await self._api_request
651        return self.__response
652
653    async def __aexit__(
654        self,
655        exc_type: type[BaseException] | None,
656        exc: BaseException | None,
657        exc_tb: TracebackType | None,
658    ) -> None:
659        if self.__response is not None:
660            await self.__response.close()
661
662
663def to_streamed_response_wrapper(func: Callable[P, R]) -> Callable[P, ResponseContextManager[APIResponse[R]]]:
664    """Higher order function that takes one of our bound API methods and wraps it
665    to support streaming and returning the raw `APIResponse` object directly.
666    """
667
668    @functools.wraps(func)
669    def wrapped(*args: P.args, **kwargs: P.kwargs) -> ResponseContextManager[APIResponse[R]]:
670        extra_headers: dict[str, str] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
671        extra_headers[RAW_RESPONSE_HEADER] = "stream"
672
673        kwargs["extra_headers"] = extra_headers
674
675        make_request = functools.partial(func, *args, **kwargs)
676
677        return ResponseContextManager(cast(Callable[[], APIResponse[R]], make_request))
678
679    return wrapped
680
681
682def async_to_streamed_response_wrapper(
683    func: Callable[P, Awaitable[R]],
684) -> Callable[P, AsyncResponseContextManager[AsyncAPIResponse[R]]]:
685    """Higher order function that takes one of our bound API methods and wraps it
686    to support streaming and returning the raw `APIResponse` object directly.
687    """
688
689    @functools.wraps(func)
690    def wrapped(*args: P.args, **kwargs: P.kwargs) -> AsyncResponseContextManager[AsyncAPIResponse[R]]:
691        extra_headers: dict[str, str] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
692        extra_headers[RAW_RESPONSE_HEADER] = "stream"
693
694        kwargs["extra_headers"] = extra_headers
695
696        make_request = func(*args, **kwargs)
697
698        return AsyncResponseContextManager(cast(Awaitable[AsyncAPIResponse[R]], make_request))
699
700    return wrapped
701
702
703def to_custom_streamed_response_wrapper(
704    func: Callable[P, object],
705    response_cls: type[_APIResponseT],
706) -> Callable[P, ResponseContextManager[_APIResponseT]]:
707    """Higher order function that takes one of our bound API methods and an `APIResponse` class
708    and wraps the method to support streaming and returning the given response class directly.
709
710    Note: the given `response_cls` *must* be concrete, e.g. `class BinaryAPIResponse(APIResponse[bytes])`
711    """
712
713    @functools.wraps(func)
714    def wrapped(*args: P.args, **kwargs: P.kwargs) -> ResponseContextManager[_APIResponseT]:
715        extra_headers: dict[str, Any] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
716        extra_headers[RAW_RESPONSE_HEADER] = "stream"
717        extra_headers[OVERRIDE_CAST_TO_HEADER] = response_cls
718
719        kwargs["extra_headers"] = extra_headers
720
721        make_request = functools.partial(func, *args, **kwargs)
722
723        return ResponseContextManager(cast(Callable[[], _APIResponseT], make_request))
724
725    return wrapped
726
727
728def async_to_custom_streamed_response_wrapper(
729    func: Callable[P, Awaitable[object]],
730    response_cls: type[_AsyncAPIResponseT],
731) -> Callable[P, AsyncResponseContextManager[_AsyncAPIResponseT]]:
732    """Higher order function that takes one of our bound API methods and an `APIResponse` class
733    and wraps the method to support streaming and returning the given response class directly.
734
735    Note: the given `response_cls` *must* be concrete, e.g. `class BinaryAPIResponse(APIResponse[bytes])`
736    """
737
738    @functools.wraps(func)
739    def wrapped(*args: P.args, **kwargs: P.kwargs) -> AsyncResponseContextManager[_AsyncAPIResponseT]:
740        extra_headers: dict[str, Any] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
741        extra_headers[RAW_RESPONSE_HEADER] = "stream"
742        extra_headers[OVERRIDE_CAST_TO_HEADER] = response_cls
743
744        kwargs["extra_headers"] = extra_headers
745
746        make_request = func(*args, **kwargs)
747
748        return AsyncResponseContextManager(cast(Awaitable[_AsyncAPIResponseT], make_request))
749
750    return wrapped
751
752
753def to_raw_response_wrapper(func: Callable[P, R]) -> Callable[P, APIResponse[R]]:
754    """Higher order function that takes one of our bound API methods and wraps it
755    to support returning the raw `APIResponse` object directly.
756    """
757
758    @functools.wraps(func)
759    def wrapped(*args: P.args, **kwargs: P.kwargs) -> APIResponse[R]:
760        extra_headers: dict[str, str] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
761        extra_headers[RAW_RESPONSE_HEADER] = "raw"
762
763        kwargs["extra_headers"] = extra_headers
764
765        return cast(APIResponse[R], func(*args, **kwargs))
766
767    return wrapped
768
769
770def async_to_raw_response_wrapper(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[AsyncAPIResponse[R]]]:
771    """Higher order function that takes one of our bound API methods and wraps it
772    to support returning the raw `APIResponse` object directly.
773    """
774
775    @functools.wraps(func)
776    async def wrapped(*args: P.args, **kwargs: P.kwargs) -> AsyncAPIResponse[R]:
777        extra_headers: dict[str, str] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
778        extra_headers[RAW_RESPONSE_HEADER] = "raw"
779
780        kwargs["extra_headers"] = extra_headers
781
782        return cast(AsyncAPIResponse[R], await func(*args, **kwargs))
783
784    return wrapped
785
786
787def to_custom_raw_response_wrapper(
788    func: Callable[P, object],
789    response_cls: type[_APIResponseT],
790) -> Callable[P, _APIResponseT]:
791    """Higher order function that takes one of our bound API methods and an `APIResponse` class
792    and wraps the method to support returning the given response class directly.
793
794    Note: the given `response_cls` *must* be concrete, e.g. `class BinaryAPIResponse(APIResponse[bytes])`
795    """
796
797    @functools.wraps(func)
798    def wrapped(*args: P.args, **kwargs: P.kwargs) -> _APIResponseT:
799        extra_headers: dict[str, Any] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
800        extra_headers[RAW_RESPONSE_HEADER] = "raw"
801        extra_headers[OVERRIDE_CAST_TO_HEADER] = response_cls
802
803        kwargs["extra_headers"] = extra_headers
804
805        return cast(_APIResponseT, func(*args, **kwargs))
806
807    return wrapped
808
809
810def async_to_custom_raw_response_wrapper(
811    func: Callable[P, Awaitable[object]],
812    response_cls: type[_AsyncAPIResponseT],
813) -> Callable[P, Awaitable[_AsyncAPIResponseT]]:
814    """Higher order function that takes one of our bound API methods and an `APIResponse` class
815    and wraps the method to support returning the given response class directly.
816
817    Note: the given `response_cls` *must* be concrete, e.g. `class BinaryAPIResponse(APIResponse[bytes])`
818    """
819
820    @functools.wraps(func)
821    def wrapped(*args: P.args, **kwargs: P.kwargs) -> Awaitable[_AsyncAPIResponseT]:
822        extra_headers: dict[str, Any] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
823        extra_headers[RAW_RESPONSE_HEADER] = "raw"
824        extra_headers[OVERRIDE_CAST_TO_HEADER] = response_cls
825
826        kwargs["extra_headers"] = extra_headers
827
828        return cast(Awaitable[_AsyncAPIResponseT], func(*args, **kwargs))
829
830    return wrapped
831
832
833def extract_response_type(typ: type[BaseAPIResponse[Any]]) -> type:
834    """Given a type like `APIResponse[T]`, returns the generic type variable `T`.
835
836    This also handles the case where a concrete subclass is given, e.g.
837    ```py
838    class MyResponse(APIResponse[bytes]):
839        ...
840
841    extract_response_type(MyResponse) -> bytes
842    ```
843    """
844    return extract_type_var_from_base(
845        typ,
846        generic_bases=cast("tuple[type, ...]", (BaseAPIResponse, APIResponse, AsyncAPIResponse)),
847        index=0,
848    )