Commit abc25966
Changed files (1)
src
openai
src/openai/_streaming.py
@@ -60,42 +60,42 @@ class Stream(Generic[_T]):
if sse.data.startswith("[DONE]"):
break
- # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data
- if sse.event and sse.event.startswith("thread."):
- data = sse.json()
-
- if sse.event == "error" and is_mapping(data) and data.get("error"):
- message = None
- error = data.get("error")
- if is_mapping(error):
- message = error.get("message")
- if not message or not isinstance(message, str):
- message = "An error occurred during streaming"
-
- raise APIError(
- message=message,
- request=self.response.request,
- body=data["error"],
- )
-
- yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)
- else:
- data = sse.json()
- if is_mapping(data) and data.get("error"):
- message = None
- error = data.get("error")
- if is_mapping(error):
- message = error.get("message")
- if not message or not isinstance(message, str):
- message = "An error occurred during streaming"
-
- raise APIError(
- message=message,
- request=self.response.request,
- body=data["error"],
- )
-
- yield process_data(data=data, cast_to=cast_to, response=response)
+ # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data
+ if sse.event and sse.event.startswith("thread."):
+ data = sse.json()
+
+ if sse.event == "error" and is_mapping(data) and data.get("error"):
+ message = None
+ error = data.get("error")
+ if is_mapping(error):
+ message = error.get("message")
+ if not message or not isinstance(message, str):
+ message = "An error occurred during streaming"
+
+ raise APIError(
+ message=message,
+ request=self.response.request,
+ body=data["error"],
+ )
+
+ yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)
+ else:
+ data = sse.json()
+ if is_mapping(data) and data.get("error"):
+ message = None
+ error = data.get("error")
+ if is_mapping(error):
+ message = error.get("message")
+ if not message or not isinstance(message, str):
+ message = "An error occurred during streaming"
+
+ raise APIError(
+ message=message,
+ request=self.response.request,
+ body=data["error"],
+ )
+
+ yield process_data(data=data, cast_to=cast_to, response=response)
finally:
# Ensure the response is closed even if the consumer doesn't read all data
@@ -163,42 +163,42 @@ class AsyncStream(Generic[_T]):
if sse.data.startswith("[DONE]"):
break
- # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data
- if sse.event and sse.event.startswith("thread."):
- data = sse.json()
-
- if sse.event == "error" and is_mapping(data) and data.get("error"):
- message = None
- error = data.get("error")
- if is_mapping(error):
- message = error.get("message")
- if not message or not isinstance(message, str):
- message = "An error occurred during streaming"
-
- raise APIError(
- message=message,
- request=self.response.request,
- body=data["error"],
- )
-
- yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)
- else:
- data = sse.json()
- if is_mapping(data) and data.get("error"):
- message = None
- error = data.get("error")
- if is_mapping(error):
- message = error.get("message")
- if not message or not isinstance(message, str):
- message = "An error occurred during streaming"
-
- raise APIError(
- message=message,
- request=self.response.request,
- body=data["error"],
- )
-
- yield process_data(data=data, cast_to=cast_to, response=response)
+ # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data
+ if sse.event and sse.event.startswith("thread."):
+ data = sse.json()
+
+ if sse.event == "error" and is_mapping(data) and data.get("error"):
+ message = None
+ error = data.get("error")
+ if is_mapping(error):
+ message = error.get("message")
+ if not message or not isinstance(message, str):
+ message = "An error occurred during streaming"
+
+ raise APIError(
+ message=message,
+ request=self.response.request,
+ body=data["error"],
+ )
+
+ yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)
+ else:
+ data = sse.json()
+ if is_mapping(data) and data.get("error"):
+ message = None
+ error = data.get("error")
+ if is_mapping(error):
+ message = error.get("message")
+ if not message or not isinstance(message, str):
+ message = "An error occurred during streaming"
+
+ raise APIError(
+ message=message,
+ request=self.response.request,
+ body=data["error"],
+ )
+
+ yield process_data(data=data, cast_to=cast_to, response=response)
finally:
# Ensure the response is closed even if the consumer doesn't read all data