Commit 3bbb2719

Robert Craigie <robert@craigie.dev>
2024-07-18 20:56:56
feat(client): add uploads.upload_file helper
1 parent b143c16
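
This commit wraps the Upload API's three-step flow (`uploads.create`, then `uploads.parts.create` per chunk, then `uploads.complete`) into a single `upload_file_chunked` helper. A minimal sketch of the sequence the helper automates, using the resource methods visible in the diff below (the file name and MIME type here are illustrative, not from the commit):

```py
from pathlib import Path

from openai import OpenAI

client = OpenAI()

path = Path("my-data.jsonl")  # hypothetical input file
data = path.read_bytes()
PART_SIZE = 64 * 1024 * 1024  # matches the helper's DEFAULT_PART_SIZE

# 1. register the upload with the total size and metadata
upload = client.uploads.create(
    bytes=len(data),
    filename=path.name,
    mime_type="text/jsonl",
    purpose="batch",
)

# 2. send each chunk as a part, in order
part_ids = [
    client.uploads.parts.create(upload_id=upload.id, data=data[i : i + PART_SIZE]).id
    for i in range(0, len(data), PART_SIZE)
]

# 3. finalize the upload from the ordered part ids
completed = client.uploads.complete(upload_id=upload.id, part_ids=part_ids)
```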
Changed files (3)
examples/generate_file.sh
@@ -0,0 +1,10 @@
+# generate a text file with random data for testing file uploads
+wanted_size=$((1024*2048*512))
+file_size=$(( ((wanted_size/12)+1)*12 ))
+read_size=$((file_size*3/4))
+
+echo "wanted=$wanted_size file=$file_size read=$read_size"
+
+dd if=/dev/urandom bs=$read_size count=1 | base64 > /tmp/big_test_file.txt
+
+truncate -s "$wanted_size" /tmp/big_test_file.txt
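
The size arithmetic above works because base64 turns every 3 raw bytes into 4 output characters, so the script reads 3/4 of the target size from `/dev/urandom` and then truncates the encoded output to exactly the wanted size. A rough Python equivalent of the script, for reference (same path and target size assumed; a sketch, not part of the commit):

```py
import os
import base64

wanted_size = 1024 * 2048 * 512  # 1 GiB target, as in the script
chunk = 3 * 1024 * 1024  # a multiple of 3, so base64 emits no '=' padding

with open("/tmp/big_test_file.txt", "wb") as f:
    written = 0
    while written < wanted_size:
        encoded = base64.b64encode(os.urandom(chunk))  # 3 raw bytes -> 4 chars
        f.write(encoded)
        written += len(encoded)

# trim to the exact requested size, mirroring the script's `truncate -s`
os.truncate("/tmp/big_test_file.txt", wanted_size)
```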
examples/uploads.py
@@ -0,0 +1,46 @@
+import sys
+from pathlib import Path
+
+import rich
+
+from openai import OpenAI
+
+# generate this file using `./generate_file.sh`
+file = Path("/tmp/big_test_file.txt")
+
+client = OpenAI()
+
+
+def from_disk() -> None:
+    print("uploading file from disk")
+
+    upload = client.uploads.upload_file_chunked(
+        file=file,
+        mime_type="txt",
+        purpose="batch",
+    )
+    rich.print(upload)
+
+
+def from_in_memory() -> None:
+    print("uploading file from memory")
+
+    # read the data into memory ourselves to simulate
+    # it coming from somewhere else
+    data = file.read_bytes()
+    filename = "my_file.txt"
+
+    upload = client.uploads.upload_file_chunked(
+        file=data,
+        filename=filename,
+        bytes=len(data),
+        mime_type="txt",
+        purpose="batch",
+    )
+    rich.print(upload)
+
+
+if "memory" in sys.argv:
+    from_in_memory()
+else:
+    from_disk()
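
The example exercises the defaults; the helper also accepts the optional `part_size` and `md5` arguments added below. A hedged variation of the in-memory path (the hex-digest checksum format is an assumption; the commit itself does not exercise `md5`):

```py
import hashlib

data = file.read_bytes()

upload = client.uploads.upload_file_chunked(
    file=data,
    filename="my_file.txt",
    bytes=len(data),
    mime_type="txt",
    purpose="batch",
    part_size=8 * 1024 * 1024,  # 8 MiB parts instead of the 64 MiB default
    md5=hashlib.md5(data).hexdigest(),  # assumption: hex digest of the full contents
)
rich.print(upload)
```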
src/openai/resources/uploads/uploads.py
@@ -2,8 +2,14 @@
 
 from __future__ import annotations
 
-from typing import List
-
+import io
+import os
+import logging
+import builtins
+from typing import List, overload
+from pathlib import Path
+
+import anyio
 import httpx
 
 from ... import _legacy_response
@@ -31,6 +37,12 @@ from ...types.file_purpose import FilePurpose
 __all__ = ["Uploads", "AsyncUploads"]
 
 
+# 64MB
+DEFAULT_PART_SIZE = 64 * 1024 * 1024
+
+log: logging.Logger = logging.getLogger(__name__)
+
+
 class Uploads(SyncAPIResource):
     @cached_property
     def parts(self) -> Parts:
@@ -44,6 +56,105 @@ class Uploads(SyncAPIResource):
     def with_streaming_response(self) -> UploadsWithStreamingResponse:
         return UploadsWithStreamingResponse(self)
 
+    @overload
+    def upload_file_chunked(
+        self,
+        *,
+        file: os.PathLike[str],
+        mime_type: str,
+        purpose: FilePurpose,
+        bytes: int | None = None,
+        part_size: int | None = None,
+        md5: str | NotGiven = NOT_GIVEN,
+    ) -> Upload:
+        """Splits a file into multiple 64MB parts and uploads them sequentially."""
+
+    @overload
+    def upload_file_chunked(
+        self,
+        *,
+        file: bytes,
+        filename: str,
+        bytes: int,
+        mime_type: str,
+        purpose: FilePurpose,
+        part_size: int | None = None,
+        md5: str | NotGiven = NOT_GIVEN,
+    ) -> Upload:
+        """Splits an in-memory file into multiple 64MB parts and uploads them sequentially."""
+
+    def upload_file_chunked(
+        self,
+        *,
+        file: os.PathLike[str] | bytes,
+        mime_type: str,
+        purpose: FilePurpose,
+        filename: str | None = None,
+        bytes: int | None = None,
+        part_size: int | None = None,
+        md5: str | NotGiven = NOT_GIVEN,
+    ) -> Upload:
+        """Splits the given file into multiple parts and uploads them sequentially.
+
+        ```py
+        from pathlib import Path
+
+        client.uploads.upload_file_chunked(
+            file=Path("my-paper.pdf"),
+            mime_type="pdf",
+            purpose="assistants",
+        )
+        ```
+        """
+        if isinstance(file, builtins.bytes):
+            if filename is None:
+                raise TypeError("The `filename` argument must be given for in-memory files")
+
+            if bytes is None:
+                raise TypeError("The `bytes` argument must be given for in-memory files")
+        else:
+            if not isinstance(file, Path):
+                file = Path(file)
+
+            if not filename:
+                filename = file.name
+
+            if bytes is None:
+                bytes = file.stat().st_size
+
+        upload = self.create(
+            bytes=bytes,
+            filename=filename,
+            mime_type=mime_type,
+            purpose=purpose,
+        )
+
+        part_ids: list[str] = []
+
+        if part_size is None:
+            part_size = DEFAULT_PART_SIZE
+
+        if isinstance(file, builtins.bytes):
+            buf: io.FileIO | io.BytesIO = io.BytesIO(file)
+        else:
+            buf = io.FileIO(file)
+
+        try:
+            while True:
+                data = buf.read(part_size)
+                if not data:
+                    # EOF
+                    break
+
+                part = self.parts.create(upload_id=upload.id, data=data)
+                log.info("Uploaded part %s for upload %s", part.id, upload.id)
+                part_ids.append(part.id)
+        finally:
+            # close the file handle on success and on failure
+            buf.close()
+
+        return self.complete(upload_id=upload.id, part_ids=part_ids, md5=md5)
+
     def create(
         self,
         *,
@@ -227,6 +338,116 @@ class AsyncUploads(AsyncAPIResource):
     def with_streaming_response(self) -> AsyncUploadsWithStreamingResponse:
         return AsyncUploadsWithStreamingResponse(self)
 
+    @overload
+    async def upload_file_chunked(
+        self,
+        *,
+        file: os.PathLike[str],
+        mime_type: str,
+        purpose: FilePurpose,
+        bytes: int | None = None,
+        part_size: int | None = None,
+        md5: str | NotGiven = NOT_GIVEN,
+    ) -> Upload:
+        """Splits a file into multiple 64MB parts and uploads them sequentially."""
+
+    @overload
+    async def upload_file_chunked(
+        self,
+        *,
+        file: bytes,
+        filename: str,
+        bytes: int,
+        mime_type: str,
+        purpose: FilePurpose,
+        part_size: int | None = None,
+        md5: str | NotGiven = NOT_GIVEN,
+    ) -> Upload:
+        """Splits an in-memory file into multiple 64MB parts and uploads them sequentially."""
+
+    async def upload_file_chunked(
+        self,
+        *,
+        file: os.PathLike[str] | bytes,
+        mime_type: str,
+        purpose: FilePurpose,
+        filename: str | None = None,
+        bytes: int | None = None,
+        part_size: int | None = None,
+        md5: str | NotGiven = NOT_GIVEN,
+    ) -> Upload:
+        """Splits the given file into multiple parts and uploads them sequentially.
+
+        ```py
+        from pathlib import Path
+
+        client.uploads.upload_file_chunked(
+            file=Path("my-paper.pdf"),
+            mime_type="pdf",
+            purpose="assistants",
+        )
+        ```
+        """
+        if isinstance(file, builtins.bytes):
+            if filename is None:
+                raise TypeError("The `filename` argument must be given for in-memory files")
+
+            if bytes is None:
+                raise TypeError("The `bytes` argument must be given for in-memory files")
+        else:
+            if not isinstance(file, anyio.Path):
+                file = anyio.Path(file)
+
+            if not filename:
+                filename = file.name
+
+            if bytes is None:
+                stat = await file.stat()
+                bytes = stat.st_size
+
+        upload = await self.create(
+            bytes=bytes,
+            filename=filename,
+            mime_type=mime_type,
+            purpose=purpose,
+        )
+
+        part_ids: list[str] = []
+
+        if part_size is None:
+            part_size = DEFAULT_PART_SIZE
+
+        if isinstance(file, anyio.Path):
+            fd = await file.open("rb")
+            async with fd:
+                while True:
+                    data = await fd.read(part_size)
+                    if not data:
+                        # EOF
+                        break
+
+                    part = await self.parts.create(upload_id=upload.id, data=data)
+                    log.info("Uploaded part %s for upload %s", part.id, upload.id)
+                    part_ids.append(part.id)
+        else:
+            buf = io.BytesIO(file)
+
+            try:
+                while True:
+                    data = buf.read(part_size)
+                    if not data:
+                        # EOF
+                        break
+
+                    part = await self.parts.create(upload_id=upload.id, data=data)
+                    log.info("Uploaded part %s for upload %s", part.id, upload.id)
+                    part_ids.append(part.id)
+            finally:
+                # close the buffer on success and on failure
+                buf.close()
+
+        return await self.complete(upload_id=upload.id, part_ids=part_ids, md5=md5)
+
     async def create(
         self,
         *,
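
Since `AsyncUploads` gains the same helper, an async counterpart to `examples/uploads.py` would look roughly like this (a sketch, not part of the commit):

```py
import asyncio
from pathlib import Path

from openai import AsyncOpenAI

client = AsyncOpenAI()


async def main() -> None:
    upload = await client.uploads.upload_file_chunked(
        file=Path("/tmp/big_test_file.txt"),
        mime_type="txt",
        purpose="batch",
    )
    print(upload.id)


asyncio.run(main())
```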