main
  1# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
  2
  3from __future__ import annotations
  4
  5import hmac
  6import json
  7import time
  8import base64
  9import hashlib
 10from typing import cast
 11
 12from .._types import HeadersLike
 13from .._utils import get_required_header
 14from .._models import construct_type
 15from .._resource import SyncAPIResource, AsyncAPIResource
 16from .._exceptions import InvalidWebhookSignatureError
 17from ..types.webhooks.unwrap_webhook_event import UnwrapWebhookEvent
 18
 19__all__ = ["Webhooks", "AsyncWebhooks"]
 20
 21
 22class Webhooks(SyncAPIResource):
 23    def unwrap(
 24        self,
 25        payload: str | bytes,
 26        headers: HeadersLike,
 27        *,
 28        secret: str | None = None,
 29    ) -> UnwrapWebhookEvent:
 30        """Validates that the given payload was sent by OpenAI and parses the payload."""
 31        if secret is None:
 32            secret = self._client.webhook_secret
 33
 34        self.verify_signature(payload=payload, headers=headers, secret=secret)
 35
 36        return cast(
 37            UnwrapWebhookEvent,
 38            construct_type(
 39                type_=UnwrapWebhookEvent,
 40                value=json.loads(payload),
 41            ),
 42        )
 43
 44    def verify_signature(
 45        self,
 46        payload: str | bytes,
 47        headers: HeadersLike,
 48        *,
 49        secret: str | None = None,
 50        tolerance: int = 300,
 51    ) -> None:
 52        """Validates whether or not the webhook payload was sent by OpenAI.
 53
 54        Args:
 55            payload: The webhook payload
 56            headers: The webhook headers
 57            secret: The webhook secret (optional, will use client secret if not provided)
 58            tolerance: Maximum age of the webhook in seconds (default: 300 = 5 minutes)
 59        """
 60        if secret is None:
 61            secret = self._client.webhook_secret
 62
 63        if secret is None:
 64            raise ValueError(
 65                "The webhook secret must either be set using the env var, OPENAI_WEBHOOK_SECRET, "
 66                "on the client class, OpenAI(webhook_secret='123'), or passed to this function"
 67            )
 68
 69        signature_header = get_required_header(headers, "webhook-signature")
 70        timestamp = get_required_header(headers, "webhook-timestamp")
 71        webhook_id = get_required_header(headers, "webhook-id")
 72
 73        # Validate timestamp to prevent replay attacks
 74        try:
 75            timestamp_seconds = int(timestamp)
 76        except ValueError:
 77            raise InvalidWebhookSignatureError("Invalid webhook timestamp format") from None
 78
 79        now = int(time.time())
 80
 81        if now - timestamp_seconds > tolerance:
 82            raise InvalidWebhookSignatureError("Webhook timestamp is too old") from None
 83
 84        if timestamp_seconds > now + tolerance:
 85            raise InvalidWebhookSignatureError("Webhook timestamp is too new") from None
 86
 87        # Extract signatures from v1,<base64> format
 88        # The signature header can have multiple values, separated by spaces.
 89        # Each value is in the format v1,<base64>. We should accept if any match.
 90        signatures: list[str] = []
 91        for part in signature_header.split():
 92            if part.startswith("v1,"):
 93                signatures.append(part[3:])
 94            else:
 95                signatures.append(part)
 96
 97        # Decode the secret if it starts with whsec_
 98        if secret.startswith("whsec_"):
 99            decoded_secret = base64.b64decode(secret[6:])
100        else:
101            decoded_secret = secret.encode()
102
103        body = payload.decode("utf-8") if isinstance(payload, bytes) else payload
104
105        # Prepare the signed payload (OpenAI uses webhookId.timestamp.payload format)
106        signed_payload = f"{webhook_id}.{timestamp}.{body}"
107        expected_signature = base64.b64encode(
108            hmac.new(decoded_secret, signed_payload.encode(), hashlib.sha256).digest()
109        ).decode()
110
111        # Accept if any signature matches
112        if not any(hmac.compare_digest(expected_signature, sig) for sig in signatures):
113            raise InvalidWebhookSignatureError(
114                "The given webhook signature does not match the expected signature"
115            ) from None
116
117
class AsyncWebhooks(AsyncAPIResource):
    """Helpers for verifying and parsing incoming webhook requests (async client).

    The methods perform no I/O and are therefore regular, non-awaitable functions.
    """

    def unwrap(
        self,
        payload: str | bytes,
        headers: HeadersLike,
        *,
        secret: str | None = None,
    ) -> UnwrapWebhookEvent:
        """Validates that the given payload was sent by OpenAI and parses the payload.

        Args:
            payload: The raw webhook request body, exactly as it was received
            headers: The webhook headers
            secret: The webhook secret (optional, will use client secret if not provided)

        Returns:
            The event constructed from the JSON-decoded payload.

        Raises:
            ValueError: If no webhook secret is available.
            InvalidWebhookSignatureError: If the timestamp or the signature is rejected.
        """
        if secret is None:
            secret = self._client.webhook_secret

        # Verification runs first so that unauthenticated input is never parsed.
        self.verify_signature(payload=payload, headers=headers, secret=secret)

        body = payload.decode("utf-8") if isinstance(payload, bytes) else payload
        return cast(
            UnwrapWebhookEvent,
            construct_type(
                type_=UnwrapWebhookEvent,
                value=json.loads(body),
            ),
        )

    def verify_signature(
        self,
        payload: str | bytes,
        headers: HeadersLike,
        *,
        secret: str | None = None,
        tolerance: int = 300,
    ) -> None:
        """Validates whether or not the webhook payload was sent by OpenAI.

        Args:
            payload: The raw webhook request body, exactly as it was received
            headers: The webhook headers; `webhook-signature`, `webhook-timestamp`
                and `webhook-id` are read from it
            secret: The webhook secret (optional, will use client secret if not provided)
            tolerance: Maximum difference, in seconds, allowed between the webhook
                timestamp and the current time, in either direction
                (default: 300 = 5 minutes)

        Raises:
            ValueError: If no webhook secret is available.
            InvalidWebhookSignatureError: If the timestamp is malformed or outside
                the tolerance window, or if no signature in the header matches.
        """
        if secret is None:
            secret = self._client.webhook_secret

        if secret is None:
            raise ValueError(
                "The webhook secret must either be set using the env var, OPENAI_WEBHOOK_SECRET, "
                "on the client class, OpenAI(webhook_secret='123'), or passed to this function"
            ) from None

        signature_header = get_required_header(headers, "webhook-signature")
        timestamp = get_required_header(headers, "webhook-timestamp")
        webhook_id = get_required_header(headers, "webhook-id")

        # Validate timestamp to prevent replay attacks
        try:
            timestamp_seconds = int(timestamp)
        except ValueError:
            raise InvalidWebhookSignatureError("Invalid webhook timestamp format") from None

        now = int(time.time())

        if now - timestamp_seconds > tolerance:
            raise InvalidWebhookSignatureError("Webhook timestamp is too old") from None

        if timestamp_seconds > now + tolerance:
            raise InvalidWebhookSignatureError("Webhook timestamp is too new") from None

        # The signature header can have multiple values, separated by spaces.
        # Each value is in the format v1,<base64>; values without the prefix are
        # compared as-is. We accept if any match.
        #
        # Candidates are kept as bytes: hmac.compare_digest() raises TypeError for
        # str arguments containing non-ASCII characters, and this header is
        # attacker-controlled, so a str comparison would let a forged request
        # surface as TypeError instead of InvalidWebhookSignatureError.
        signatures: list[bytes] = [
            (part[3:] if part.startswith("v1,") else part).encode()
            for part in signature_header.split()
        ]

        # Decode the secret if it starts with whsec_
        if secret.startswith("whsec_"):
            decoded_secret = base64.b64decode(secret[len("whsec_") :])
        else:
            decoded_secret = secret.encode()

        # Sign the body bytes as received. Decoding bytes to str only to re-encode
        # them is a no-op for valid UTF-8 and raises UnicodeDecodeError for anything
        # else, which would bypass the signature check's own error reporting.
        body = payload if isinstance(payload, bytes) else payload.encode()

        # Prepare the signed payload (OpenAI uses webhookId.timestamp.payload format)
        signed_payload = f"{webhook_id}.{timestamp}.".encode() + body
        expected_signature = base64.b64encode(
            hmac.new(decoded_secret, signed_payload, hashlib.sha256).digest()
        )

        # Accept if any signature matches (constant-time comparison)
        if not any(hmac.compare_digest(expected_signature, sig) for sig in signatures):
            raise InvalidWebhookSignatureError("The given webhook signature does not match the expected signature")