main
1# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
2
3from __future__ import annotations
4
5import hmac
6import json
7import time
8import base64
9import hashlib
10from typing import cast
11
12from .._types import HeadersLike
13from .._utils import get_required_header
14from .._models import construct_type
15from .._resource import SyncAPIResource, AsyncAPIResource
16from .._exceptions import InvalidWebhookSignatureError
17from ..types.webhooks.unwrap_webhook_event import UnwrapWebhookEvent
18
19__all__ = ["Webhooks", "AsyncWebhooks"]
20
21
class Webhooks(SyncAPIResource):
    """Helpers for verifying and parsing webhook deliveries (built on `SyncAPIResource`)."""

    def unwrap(
        self,
        payload: str | bytes,
        headers: HeadersLike,
        *,
        secret: str | None = None,
    ) -> UnwrapWebhookEvent:
        """Validates that the given payload was sent by OpenAI and parses the payload.

        Args:
            payload: The raw webhook request body, exactly as it was received
            headers: The webhook request headers
            secret: The webhook secret (optional, will use client secret if not provided)

        Returns:
            The JSON payload constructed as an `UnwrapWebhookEvent`.

        Raises:
            ValueError: If no secret was passed and none is configured on the client.
            InvalidWebhookSignatureError: If the timestamp or signature check fails.
        """
        # `verify_signature` itself falls back to the client's secret when `secret` is None,
        # so there is no need to resolve it a second time here.
        self.verify_signature(payload=payload, headers=headers, secret=secret)

        return cast(
            UnwrapWebhookEvent,
            construct_type(
                type_=UnwrapWebhookEvent,
                value=json.loads(payload),
            ),
        )

    def verify_signature(
        self,
        payload: str | bytes,
        headers: HeadersLike,
        *,
        secret: str | None = None,
        tolerance: int = 300,
    ) -> None:
        """Validates whether or not the webhook payload was sent by OpenAI.

        Args:
            payload: The webhook payload
            headers: The webhook headers
            secret: The webhook secret (optional, will use client secret if not provided)
            tolerance: Maximum age of the webhook in seconds (default: 300 = 5 minutes)

        Raises:
            ValueError: If no secret was passed and none is configured on the client.
            InvalidWebhookSignatureError: If the timestamp is not an integer, lies more than
                `tolerance` seconds in the past or future, or no signature in the header
                matches the expected one.
        """
        if secret is None:
            secret = self._client.webhook_secret

        if secret is None:
            raise ValueError(
                "The webhook secret must either be set using the env var, OPENAI_WEBHOOK_SECRET, "
                "on the client class, OpenAI(webhook_secret='123'), or passed to this function"
            )

        signature_header = get_required_header(headers, "webhook-signature")
        timestamp = get_required_header(headers, "webhook-timestamp")
        webhook_id = get_required_header(headers, "webhook-id")

        # Validate timestamp to prevent replay attacks
        try:
            timestamp_seconds = int(timestamp)
        except ValueError:
            raise InvalidWebhookSignatureError("Invalid webhook timestamp format") from None

        now = int(time.time())

        if now - timestamp_seconds > tolerance:
            raise InvalidWebhookSignatureError("Webhook timestamp is too old") from None

        if timestamp_seconds > now + tolerance:
            raise InvalidWebhookSignatureError("Webhook timestamp is too new") from None

        # Extract signatures from v1,<base64> format
        # The signature header can have multiple values, separated by spaces.
        # Each value is in the format v1,<base64>. We should accept if any match.
        signatures = [part[3:] if part.startswith("v1,") else part for part in signature_header.split()]

        # Decode the secret if it starts with whsec_
        if secret.startswith("whsec_"):
            decoded_secret = base64.b64decode(secret[6:])
        else:
            decoded_secret = secret.encode()

        # Sign the body bytes exactly as received. For valid UTF-8 this is byte-identical to
        # decoding and re-encoding, but a body that is not valid UTF-8 now simply fails the
        # signature comparison instead of raising UnicodeDecodeError before it.
        body = payload.encode() if isinstance(payload, str) else payload

        # Prepare the signed payload (OpenAI uses webhookId.timestamp.payload format)
        signed_payload = f"{webhook_id}.{timestamp}.".encode() + body
        expected_signature = base64.b64encode(
            hmac.new(decoded_secret, signed_payload, hashlib.sha256).digest()
        ).decode()

        # Accept if any signature matches.
        # `hmac.compare_digest` raises TypeError for str arguments containing non-ASCII
        # characters; a base64 signature is always ASCII, so a non-ASCII candidate can never
        # match and is rejected up front rather than surfacing as a TypeError.
        if not any(sig.isascii() and hmac.compare_digest(expected_signature, sig) for sig in signatures):
            raise InvalidWebhookSignatureError(
                "The given webhook signature does not match the expected signature"
            ) from None
116
117
class AsyncWebhooks(AsyncAPIResource):
    """Helpers for verifying and parsing webhook deliveries (built on `AsyncAPIResource`).

    Both methods are plain (non-coroutine) functions.
    """

    def unwrap(
        self,
        payload: str | bytes,
        headers: HeadersLike,
        *,
        secret: str | None = None,
    ) -> UnwrapWebhookEvent:
        """Validates that the given payload was sent by OpenAI and parses the payload.

        Args:
            payload: The raw webhook request body, exactly as it was received
            headers: The webhook request headers
            secret: The webhook secret (optional, will use client secret if not provided)

        Returns:
            The JSON payload constructed as an `UnwrapWebhookEvent`.

        Raises:
            ValueError: If no secret was passed and none is configured on the client.
            InvalidWebhookSignatureError: If the timestamp or signature check fails.
        """
        # `verify_signature` itself falls back to the client's secret when `secret` is None,
        # so there is no need to resolve it a second time here.
        self.verify_signature(payload=payload, headers=headers, secret=secret)

        body = payload.decode("utf-8") if isinstance(payload, bytes) else payload
        return cast(
            UnwrapWebhookEvent,
            construct_type(
                type_=UnwrapWebhookEvent,
                value=json.loads(body),
            ),
        )

    def verify_signature(
        self,
        payload: str | bytes,
        headers: HeadersLike,
        *,
        secret: str | None = None,
        tolerance: int = 300,
    ) -> None:
        """Validates whether or not the webhook payload was sent by OpenAI.

        Args:
            payload: The webhook payload
            headers: The webhook headers
            secret: The webhook secret (optional, will use client secret if not provided)
            tolerance: Maximum age of the webhook in seconds (default: 300 = 5 minutes)

        Raises:
            ValueError: If no secret was passed and none is configured on the client.
            InvalidWebhookSignatureError: If the timestamp is not an integer, lies more than
                `tolerance` seconds in the past or future, or no signature in the header
                matches the expected one.
        """
        if secret is None:
            secret = self._client.webhook_secret

        if secret is None:
            raise ValueError(
                "The webhook secret must either be set using the env var, OPENAI_WEBHOOK_SECRET, "
                "on the client class, OpenAI(webhook_secret='123'), or passed to this function"
            ) from None

        signature_header = get_required_header(headers, "webhook-signature")
        timestamp = get_required_header(headers, "webhook-timestamp")
        webhook_id = get_required_header(headers, "webhook-id")

        # Validate timestamp to prevent replay attacks
        try:
            timestamp_seconds = int(timestamp)
        except ValueError:
            raise InvalidWebhookSignatureError("Invalid webhook timestamp format") from None

        now = int(time.time())

        if now - timestamp_seconds > tolerance:
            raise InvalidWebhookSignatureError("Webhook timestamp is too old") from None

        if timestamp_seconds > now + tolerance:
            raise InvalidWebhookSignatureError("Webhook timestamp is too new") from None

        # Extract signatures from v1,<base64> format
        # The signature header can have multiple values, separated by spaces.
        # Each value is in the format v1,<base64>. We should accept if any match.
        signatures = [part[3:] if part.startswith("v1,") else part for part in signature_header.split()]

        # Decode the secret if it starts with whsec_
        if secret.startswith("whsec_"):
            decoded_secret = base64.b64decode(secret[6:])
        else:
            decoded_secret = secret.encode()

        # Sign the body bytes exactly as received. For valid UTF-8 this is byte-identical to
        # decoding and re-encoding, but a body that is not valid UTF-8 now simply fails the
        # signature comparison instead of raising UnicodeDecodeError before it.
        signed_body = payload.encode() if isinstance(payload, str) else payload

        # Prepare the signed payload (OpenAI uses webhookId.timestamp.payload format)
        signed_payload = f"{webhook_id}.{timestamp}.".encode() + signed_body
        expected_signature = base64.b64encode(
            hmac.new(decoded_secret, signed_payload, hashlib.sha256).digest()
        ).decode()

        # Accept if any signature matches.
        # `hmac.compare_digest` raises TypeError for str arguments containing non-ASCII
        # characters; a base64 signature is always ASCII, so a non-ASCII candidate can never
        # match and is rejected up front rather than surfacing as a TypeError.
        if not any(sig.isascii() and hmac.compare_digest(expected_signature, sig) for sig in signatures):
            raise InvalidWebhookSignatureError("The given webhook signature does not match the expected signature")