Commit f6552d76

stainless-app[bot] <142633134+stainless-app[bot]@users.noreply.github.com>
2025-12-02 00:56:43
fix: ensure streams are always closed
1 parent fc48250
Changed files (1)
src
src/openai/_streaming.py
@@ -55,9 +55,10 @@ class Stream(Generic[_T]):
         process_data = self._client._process_response_data
         iterator = self._iter_events()
 
-        for sse in iterator:
-            if sse.data.startswith("[DONE]"):
-                break
+        try:
+            for sse in iterator:
+                if sse.data.startswith("[DONE]"):
+                    break
 
             # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data
             if sse.event and sse.event.startswith("thread."):
@@ -96,8 +97,9 @@ class Stream(Generic[_T]):
 
                 yield process_data(data=data, cast_to=cast_to, response=response)
 
-        # As we might not fully consume the response stream, we need to close it explicitly
-        response.close()
+        finally:
+            # Ensure the response is closed even if the consumer doesn't read all data
+            response.close()
 
     def __enter__(self) -> Self:
         return self
@@ -156,9 +158,10 @@ class AsyncStream(Generic[_T]):
         process_data = self._client._process_response_data
         iterator = self._iter_events()
 
-        async for sse in iterator:
-            if sse.data.startswith("[DONE]"):
-                break
+        try:
+            async for sse in iterator:
+                if sse.data.startswith("[DONE]"):
+                    break
 
             # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data
             if sse.event and sse.event.startswith("thread."):
@@ -197,8 +200,9 @@ class AsyncStream(Generic[_T]):
 
                 yield process_data(data=data, cast_to=cast_to, response=response)
 
-        # As we might not fully consume the response stream, we need to close it explicitly
-        await response.aclose()
+        finally:
+            # Ensure the response is closed even if the consumer doesn't read all data
+            await response.aclose()
 
     async def __aenter__(self) -> Self:
         return self