main
1# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
2
3from __future__ import annotations
4
5import asyncio
6from typing import Dict, Iterable, Optional
7from typing_extensions import Union, Literal
8from concurrent.futures import Future, ThreadPoolExecutor, as_completed
9
10import httpx
11import sniffio
12
13from ... import _legacy_response
14from ...types import FileChunkingStrategyParam
15from ..._types import Body, Omit, Query, Headers, NotGiven, FileTypes, SequenceNotStr, omit, not_given
16from ..._utils import is_given, maybe_transform, async_maybe_transform
17from ..._compat import cached_property
18from ..._resource import SyncAPIResource, AsyncAPIResource
19from ..._response import to_streamed_response_wrapper, async_to_streamed_response_wrapper
20from ...pagination import SyncCursorPage, AsyncCursorPage
21from ..._base_client import AsyncPaginator, make_request_options
22from ...types.file_object import FileObject
23from ...types.vector_stores import file_batch_create_params, file_batch_list_files_params
24from ...types.file_chunking_strategy_param import FileChunkingStrategyParam
25from ...types.vector_stores.vector_store_file import VectorStoreFile
26from ...types.vector_stores.vector_store_file_batch import VectorStoreFileBatch
27
28__all__ = ["FileBatches", "AsyncFileBatches"]
29
30
class FileBatches(SyncAPIResource):
    @cached_property
    def with_raw_response(self) -> FileBatchesWithRawResponse:
        """
        This property can be used as a prefix for any HTTP method call to return
        the raw response object instead of the parsed content.

        For more information, see https://www.github.com/openai/openai-python#accessing-raw-response-data-eg-headers
        """
        return FileBatchesWithRawResponse(self)

    @cached_property
    def with_streaming_response(self) -> FileBatchesWithStreamingResponse:
        """
        An alternative to `.with_raw_response` that doesn't eagerly read the response body.

        For more information, see https://www.github.com/openai/openai-python#with_streaming_response
        """
        return FileBatchesWithStreamingResponse(self)

    def create(
        self,
        vector_store_id: str,
        *,
        attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit,
        chunking_strategy: FileChunkingStrategyParam | Omit = omit,
        file_ids: SequenceNotStr[str] | Omit = omit,
        files: Iterable[file_batch_create_params.File] | Omit = omit,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> VectorStoreFileBatch:
        """
        Create a vector store file batch.

        Args:
          attributes: Set of 16 key-value pairs that can be attached to an object. This can be useful
              for storing additional information about the object in a structured format, and
              querying for objects via API or the dashboard. Keys are strings with a maximum
              length of 64 characters. Values are strings with a maximum length of 512
              characters, booleans, or numbers.

          chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will use the `auto`
              strategy. Only applicable if `file_ids` is non-empty.

          file_ids: A list of [File](https://platform.openai.com/docs/api-reference/files) IDs that
              the vector store should use. Useful for tools like `file_search` that can access
              files. If `attributes` or `chunking_strategy` are provided, they will be applied
              to all files in the batch. Mutually exclusive with `files`.

          files: A list of objects that each include a `file_id` plus optional `attributes` or
              `chunking_strategy`. Use this when you need to override metadata for specific
              files. The global `attributes` or `chunking_strategy` will be ignored and must
              be specified for each file. Mutually exclusive with `file_ids`.

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._post(
            f"/vector_stores/{vector_store_id}/file_batches",
            body=maybe_transform(
                {
                    "attributes": attributes,
                    "chunking_strategy": chunking_strategy,
                    "file_ids": file_ids,
                    "files": files,
                },
                file_batch_create_params.FileBatchCreateParams,
            ),
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    def retrieve(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> VectorStoreFileBatch:
        """
        Retrieves a vector store file batch.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._get(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}",
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    def cancel(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> VectorStoreFileBatch:
        """Cancel a vector store file batch.

        This attempts to cancel the processing of
        files in this batch as soon as possible.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._post(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}/cancel",
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    def create_and_poll(
        self,
        vector_store_id: str,
        *,
        file_ids: SequenceNotStr[str],
        poll_interval_ms: int | Omit = omit,
        chunking_strategy: FileChunkingStrategyParam | Omit = omit,
    ) -> VectorStoreFileBatch:
        """Create a vector store batch and poll until all files have been processed."""
        batch = self.create(
            vector_store_id=vector_store_id,
            file_ids=file_ids,
            chunking_strategy=chunking_strategy,
        )
        # TODO: don't poll unless necessary??
        return self.poll(
            batch.id,
            vector_store_id=vector_store_id,
            poll_interval_ms=poll_interval_ms,
        )

    def list_files(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        after: str | Omit = omit,
        before: str | Omit = omit,
        filter: Literal["in_progress", "completed", "failed", "cancelled"] | Omit = omit,
        limit: int | Omit = omit,
        order: Literal["asc", "desc"] | Omit = omit,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> SyncCursorPage[VectorStoreFile]:
        """
        Returns a list of vector store files in a batch.

        Args:
          after: A cursor for use in pagination. `after` is an object ID that defines your place
              in the list. For instance, if you make a list request and receive 100 objects,
              ending with obj_foo, your subsequent call can include after=obj_foo in order to
              fetch the next page of the list.

          before: A cursor for use in pagination. `before` is an object ID that defines your place
              in the list. For instance, if you make a list request and receive 100 objects,
              starting with obj_foo, your subsequent call can include before=obj_foo in order
              to fetch the previous page of the list.

          filter: Filter by file status. One of `in_progress`, `completed`, `failed`, `cancelled`.

          limit: A limit on the number of objects to be returned. Limit can range between 1 and
              100, and the default is 20.

          order: Sort order by the `created_at` timestamp of the objects. `asc` for ascending
              order and `desc` for descending order.

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._get_api_list(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}/files",
            page=SyncCursorPage[VectorStoreFile],
            options=make_request_options(
                extra_headers=extra_headers,
                extra_query=extra_query,
                extra_body=extra_body,
                timeout=timeout,
                query=maybe_transform(
                    {
                        "after": after,
                        "before": before,
                        "filter": filter,
                        "limit": limit,
                        "order": order,
                    },
                    file_batch_list_files_params.FileBatchListFilesParams,
                ),
            ),
            model=VectorStoreFile,
        )

    def poll(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        poll_interval_ms: int | Omit = omit,
    ) -> VectorStoreFileBatch:
        """Wait for the given file batch to be processed.

        Note: this will return even if one of the files failed to process, you need to
        check batch.file_counts.failed_count to handle this case.
        """
        headers: dict[str, str] = {"X-Stainless-Poll-Helper": "true"}
        if is_given(poll_interval_ms):
            headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms)

        while True:
            response = self.with_raw_response.retrieve(
                batch_id,
                vector_store_id=vector_store_id,
                extra_headers=headers,
            )

            batch = response.parse()
            if batch.file_counts.in_progress > 0:
                if not is_given(poll_interval_ms):
                    # Prefer the server-suggested interval, falling back to 1s.
                    from_header = response.headers.get("openai-poll-after-ms")
                    if from_header is not None:
                        poll_interval_ms = int(from_header)
                    else:
                        poll_interval_ms = 1000

                self._sleep(poll_interval_ms / 1000)
                continue

            return batch

    def upload_and_poll(
        self,
        vector_store_id: str,
        *,
        files: Iterable[FileTypes],
        max_concurrency: int = 5,
        file_ids: SequenceNotStr[str] = (),
        poll_interval_ms: int | Omit = omit,
        chunking_strategy: FileChunkingStrategyParam | Omit = omit,
    ) -> VectorStoreFileBatch:
        """Uploads the given files concurrently and then creates a vector store file batch.

        If you've already uploaded certain files that you want to include in this batch
        then you can pass their IDs through the `file_ids` argument.

        By default, if any file upload fails then an exception will be eagerly raised.

        The number of concurrent uploads is configurable using the `max_concurrency`
        parameter.

        Note: uploads are performed on a `ThreadPoolExecutor`; no async runtime is
        required for this synchronous client.
        """
        results: list[FileObject] = []

        with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
            futures: list[Future[FileObject]] = [
                executor.submit(
                    self._client.files.create,
                    file=file,
                    purpose="assistants",
                )
                for file in files
            ]

            for future in as_completed(futures):
                # Fail fast on the first upload error; remaining in-flight uploads
                # are awaited by the executor's context-manager exit.
                exc = future.exception()
                if exc:
                    raise exc

                results.append(future.result())

        batch = self.create_and_poll(
            vector_store_id=vector_store_id,
            file_ids=[*file_ids, *(f.id for f in results)],
            poll_interval_ms=poll_interval_ms,
            chunking_strategy=chunking_strategy,
        )
        return batch
374
375
class AsyncFileBatches(AsyncAPIResource):
    @cached_property
    def with_raw_response(self) -> AsyncFileBatchesWithRawResponse:
        """
        This property can be used as a prefix for any HTTP method call to return
        the raw response object instead of the parsed content.

        For more information, see https://www.github.com/openai/openai-python#accessing-raw-response-data-eg-headers
        """
        return AsyncFileBatchesWithRawResponse(self)

    @cached_property
    def with_streaming_response(self) -> AsyncFileBatchesWithStreamingResponse:
        """
        An alternative to `.with_raw_response` that doesn't eagerly read the response body.

        For more information, see https://www.github.com/openai/openai-python#with_streaming_response
        """
        return AsyncFileBatchesWithStreamingResponse(self)

    async def create(
        self,
        vector_store_id: str,
        *,
        attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit,
        chunking_strategy: FileChunkingStrategyParam | Omit = omit,
        file_ids: SequenceNotStr[str] | Omit = omit,
        files: Iterable[file_batch_create_params.File] | Omit = omit,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> VectorStoreFileBatch:
        """
        Create a vector store file batch.

        Args:
          attributes: Set of 16 key-value pairs that can be attached to an object. This can be useful
              for storing additional information about the object in a structured format, and
              querying for objects via API or the dashboard. Keys are strings with a maximum
              length of 64 characters. Values are strings with a maximum length of 512
              characters, booleans, or numbers.

          chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will use the `auto`
              strategy. Only applicable if `file_ids` is non-empty.

          file_ids: A list of [File](https://platform.openai.com/docs/api-reference/files) IDs that
              the vector store should use. Useful for tools like `file_search` that can access
              files. If `attributes` or `chunking_strategy` are provided, they will be applied
              to all files in the batch. Mutually exclusive with `files`.

          files: A list of objects that each include a `file_id` plus optional `attributes` or
              `chunking_strategy`. Use this when you need to override metadata for specific
              files. The global `attributes` or `chunking_strategy` will be ignored and must
              be specified for each file. Mutually exclusive with `file_ids`.

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return await self._post(
            f"/vector_stores/{vector_store_id}/file_batches",
            body=await async_maybe_transform(
                {
                    "attributes": attributes,
                    "chunking_strategy": chunking_strategy,
                    "file_ids": file_ids,
                    "files": files,
                },
                file_batch_create_params.FileBatchCreateParams,
            ),
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    async def retrieve(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> VectorStoreFileBatch:
        """
        Retrieves a vector store file batch.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return await self._get(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}",
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    async def cancel(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> VectorStoreFileBatch:
        """Cancel a vector store file batch.

        This attempts to cancel the processing of
        files in this batch as soon as possible.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return await self._post(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}/cancel",
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    async def create_and_poll(
        self,
        vector_store_id: str,
        *,
        file_ids: SequenceNotStr[str],
        poll_interval_ms: int | Omit = omit,
        chunking_strategy: FileChunkingStrategyParam | Omit = omit,
    ) -> VectorStoreFileBatch:
        """Create a vector store batch and poll until all files have been processed."""
        batch = await self.create(
            vector_store_id=vector_store_id,
            file_ids=file_ids,
            chunking_strategy=chunking_strategy,
        )
        # TODO: don't poll unless necessary??
        return await self.poll(
            batch.id,
            vector_store_id=vector_store_id,
            poll_interval_ms=poll_interval_ms,
        )

    def list_files(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        after: str | Omit = omit,
        before: str | Omit = omit,
        filter: Literal["in_progress", "completed", "failed", "cancelled"] | Omit = omit,
        limit: int | Omit = omit,
        order: Literal["asc", "desc"] | Omit = omit,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> AsyncPaginator[VectorStoreFile, AsyncCursorPage[VectorStoreFile]]:
        """
        Returns a list of vector store files in a batch.

        Args:
          after: A cursor for use in pagination. `after` is an object ID that defines your place
              in the list. For instance, if you make a list request and receive 100 objects,
              ending with obj_foo, your subsequent call can include after=obj_foo in order to
              fetch the next page of the list.

          before: A cursor for use in pagination. `before` is an object ID that defines your place
              in the list. For instance, if you make a list request and receive 100 objects,
              starting with obj_foo, your subsequent call can include before=obj_foo in order
              to fetch the previous page of the list.

          filter: Filter by file status. One of `in_progress`, `completed`, `failed`, `cancelled`.

          limit: A limit on the number of objects to be returned. Limit can range between 1 and
              100, and the default is 20.

          order: Sort order by the `created_at` timestamp of the objects. `asc` for ascending
              order and `desc` for descending order.

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._get_api_list(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}/files",
            page=AsyncCursorPage[VectorStoreFile],
            options=make_request_options(
                extra_headers=extra_headers,
                extra_query=extra_query,
                extra_body=extra_body,
                timeout=timeout,
                query=maybe_transform(
                    {
                        "after": after,
                        "before": before,
                        "filter": filter,
                        "limit": limit,
                        "order": order,
                    },
                    file_batch_list_files_params.FileBatchListFilesParams,
                ),
            ),
            model=VectorStoreFile,
        )

    async def poll(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        poll_interval_ms: int | Omit = omit,
    ) -> VectorStoreFileBatch:
        """Wait for the given file batch to be processed.

        Note: this will return even if one of the files failed to process, you need to
        check batch.file_counts.failed_count to handle this case.
        """
        headers: dict[str, str] = {"X-Stainless-Poll-Helper": "true"}
        if is_given(poll_interval_ms):
            headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms)

        while True:
            response = await self.with_raw_response.retrieve(
                batch_id,
                vector_store_id=vector_store_id,
                extra_headers=headers,
            )

            batch = response.parse()
            if batch.file_counts.in_progress > 0:
                if not is_given(poll_interval_ms):
                    # Prefer the server-suggested interval, falling back to 1s.
                    from_header = response.headers.get("openai-poll-after-ms")
                    if from_header is not None:
                        poll_interval_ms = int(from_header)
                    else:
                        poll_interval_ms = 1000

                await self._sleep(poll_interval_ms / 1000)
                continue

            return batch

    async def upload_and_poll(
        self,
        vector_store_id: str,
        *,
        files: Iterable[FileTypes],
        max_concurrency: int = 5,
        file_ids: SequenceNotStr[str] = (),
        poll_interval_ms: int | Omit = omit,
        chunking_strategy: FileChunkingStrategyParam | Omit = omit,
    ) -> VectorStoreFileBatch:
        """Uploads the given files concurrently and then creates a vector store file batch.

        If you've already uploaded certain files that you want to include in this batch
        then you can pass their IDs through the `file_ids` argument.

        By default, if any file upload fails then an exception will be eagerly raised.

        The number of concurrent uploads is configurable using the `max_concurrency`
        parameter.

        Note: this method only supports `asyncio` or `trio` as the backing async
        runtime.
        """
        uploaded_files: list[FileObject] = []

        # Detect which async runtime we're running under so we can use its
        # native concurrency-limiting primitives.
        async_library = sniffio.current_async_library()

        if async_library == "asyncio":

            async def asyncio_upload_file(semaphore: asyncio.Semaphore, file: FileTypes) -> None:
                # The semaphore caps the number of uploads in flight at once.
                async with semaphore:
                    file_obj = await self._client.files.create(
                        file=file,
                        purpose="assistants",
                    )
                    uploaded_files.append(file_obj)

            semaphore = asyncio.Semaphore(max_concurrency)

            tasks = [asyncio_upload_file(semaphore, file) for file in files]

            await asyncio.gather(*tasks)
        elif async_library == "trio":
            # Imported lazily so trio is only required when it is the active runtime.
            # NOTE: older trio versions shipped without type information, hence the ignores.
            import trio  # type: ignore # pyright: ignore[reportMissingTypeStubs]

            async def trio_upload_file(limiter: trio.CapacityLimiter, file: FileTypes) -> None:
                # The capacity limiter caps the number of uploads in flight at once.
                async with limiter:
                    file_obj = await self._client.files.create(
                        file=file,
                        purpose="assistants",
                    )
                    uploaded_files.append(file_obj)

            limiter = trio.CapacityLimiter(max_concurrency)

            async with trio.open_nursery() as nursery:
                for file in files:
                    nursery.start_soon(trio_upload_file, limiter, file)  # pyright: ignore [reportUnknownMemberType]
        else:
            raise RuntimeError(
                f"Async runtime {async_library} is not supported yet. Only asyncio or trio is supported",
            )

        batch = await self.create_and_poll(
            vector_store_id=vector_store_id,
            file_ids=[*file_ids, *(f.id for f in uploaded_files)],
            poll_interval_ms=poll_interval_ms,
            chunking_strategy=chunking_strategy,
        )
        return batch
742
743
class FileBatchesWithRawResponse:
    """Proxy over `FileBatches` whose methods return the raw HTTP response."""

    def __init__(self, file_batches: FileBatches) -> None:
        self._file_batches = file_batches

        # Wrap each public endpoint method so it yields the raw response
        # object instead of the parsed model.
        wrap = _legacy_response.to_raw_response_wrapper
        self.create = wrap(file_batches.create)
        self.retrieve = wrap(file_batches.retrieve)
        self.cancel = wrap(file_batches.cancel)
        self.list_files = wrap(file_batches.list_files)
760
761
class AsyncFileBatchesWithRawResponse:
    """Proxy over `AsyncFileBatches` whose methods return the raw HTTP response."""

    def __init__(self, file_batches: AsyncFileBatches) -> None:
        self._file_batches = file_batches

        # Wrap each public endpoint coroutine so it yields the raw response
        # object instead of the parsed model.
        wrap = _legacy_response.async_to_raw_response_wrapper
        self.create = wrap(file_batches.create)
        self.retrieve = wrap(file_batches.retrieve)
        self.cancel = wrap(file_batches.cancel)
        self.list_files = wrap(file_batches.list_files)
778
779
class FileBatchesWithStreamingResponse:
    """Proxy over `FileBatches` whose methods stream the response body lazily."""

    def __init__(self, file_batches: FileBatches) -> None:
        self._file_batches = file_batches

        # Wrap each public endpoint method so the response body is not read
        # eagerly; callers consume it as a stream.
        wrap = to_streamed_response_wrapper
        self.create = wrap(file_batches.create)
        self.retrieve = wrap(file_batches.retrieve)
        self.cancel = wrap(file_batches.cancel)
        self.list_files = wrap(file_batches.list_files)
796
797
class AsyncFileBatchesWithStreamingResponse:
    """Proxy over `AsyncFileBatches` whose methods stream the response body lazily."""

    def __init__(self, file_batches: AsyncFileBatches) -> None:
        self._file_batches = file_batches

        # Wrap each public endpoint coroutine so the response body is not read
        # eagerly; callers consume it as a stream.
        wrap = async_to_streamed_response_wrapper
        self.create = wrap(file_batches.create)
        self.retrieve = wrap(file_batches.retrieve)
        self.cancel = wrap(file_batches.cancel)
        self.list_files = wrap(file_batches.list_files)