Commit 9d559b06
openai/api_resources/file.py
@@ -1,6 +1,7 @@
import json
import os
from typing import cast
+import time
import openai
from openai import api_requestor, util, error
@@ -259,3 +260,20 @@ class File(ListableAPIResource, DeletableAPIResource):
)
).get("data", [])
return cls.__find_matching_files(name, bytes, all_files, purpose)
+
+ @classmethod
+ def wait_for_processing(cls, id, max_wait_seconds=30 * 60):
+ TERMINAL_STATES = ["processed", "error", "deleted"]
+
+ start = time.time()
+ file = cls.retrieve(id=id)
+ while file.status not in TERMINAL_STATES:
+ file = cls.retrieve(id=id)
+ time.sleep(5.0)
+ if time.time() - start > max_wait_seconds:
+ raise openai.error.OpenAIError(
+ message="Giving up on waiting for file {id} to finish processing after {max_wait_seconds} seconds.".format(
+ id=id, max_wait_seconds=max_wait_seconds
+ )
+ )
+ return file.status
openai/cli.py
@@ -606,6 +606,201 @@ class FineTune:
)
+class FineTuningJob:
+ @classmethod
+ def list(cls, args):
+ has_ft_jobs = False
+ for fine_tune_job in openai.FineTuningJob.auto_paging_iter():
+ has_ft_jobs = True
+ print(fine_tune_job)
+ if not has_ft_jobs:
+ print("No fine-tuning jobs found.")
+
+ @classmethod
+ def _is_url(cls, file: str):
+ return file.lower().startswith("http")
+
+ @classmethod
+ def _download_file_from_public_url(cls, url: str) -> Optional[bytes]:
+ resp = requests.get(url)
+ if resp.status_code == 200:
+ return resp.content
+ else:
+ return None
+
+ @classmethod
+ def _maybe_upload_file(
+ cls,
+ file: Optional[str] = None,
+ content: Optional[bytes] = None,
+ user_provided_file: Optional[str] = None,
+ check_if_file_exists: bool = True,
+ ):
+ # Exactly one of `file` or `content` must be provided
+ if (file is None) == (content is None):
+ raise ValueError("Exactly one of `file` or `content` must be provided")
+
+ if content is None:
+ assert file is not None
+ with open(file, "rb") as f:
+ content = f.read()
+
+ if check_if_file_exists:
+ bytes = len(content)
+ matching_files = openai.File.find_matching_files(
+ name=user_provided_file or f.name,
+ bytes=bytes,
+ purpose="fine-tune",
+ )
+ if len(matching_files) > 0:
+ file_ids = [f["id"] for f in matching_files]
+ sys.stdout.write(
+ "Found potentially duplicated files with name '{name}', purpose 'fine-tune', and size {size} bytes\n".format(
+ name=os.path.basename(matching_files[0]["filename"]),
+ size=matching_files[0]["bytes"]
+ if "bytes" in matching_files[0]
+ else matching_files[0]["size"],
+ )
+ )
+ sys.stdout.write("\n".join(file_ids))
+ while True:
+ sys.stdout.write(
+ "\nEnter file ID to reuse an already uploaded file, or an empty string to upload this file anyway: "
+ )
+ inp = sys.stdin.readline().strip()
+ if inp in file_ids:
+ sys.stdout.write(
+ "Reusing already uploaded file: {id}\n".format(id=inp)
+ )
+ return inp
+ elif inp == "":
+ break
+ else:
+ sys.stdout.write(
+ "File id '{id}' is not among the IDs of the potentially duplicated files\n".format(
+ id=inp
+ )
+ )
+
+ buffer_reader = BufferReader(content, desc="Upload progress")
+ resp = openai.File.create(
+ file=buffer_reader,
+ purpose="fine-tune",
+ user_provided_filename=user_provided_file or file,
+ )
+ sys.stdout.write(
+ "Uploaded file from {file}: {id}\n".format(
+ file=user_provided_file or file, id=resp["id"]
+ )
+ )
+ sys.stdout.write("Waiting for file to finish processing before proceeding..\n")
+ sys.stdout.flush()
+ status = openai.File.wait_for_processing(resp["id"])
+ if status != "processed":
+ raise openai.error.OpenAIError(
+ "File {id} failed to process, status={status}.".format(
+ id=resp["id"], status=status
+ )
+ )
+
+ sys.stdout.write(
+ "File {id} finished processing and is ready for use in fine-tuning".format(
+ id=resp["id"]
+ )
+ )
+ sys.stdout.flush()
+ return resp["id"]
+
+ @classmethod
+ def _get_or_upload(cls, file, check_if_file_exists=True):
+ try:
+ # 1. If it's a valid file, use it
+ openai.File.retrieve(file)
+ return file
+ except openai.error.InvalidRequestError:
+ pass
+ if os.path.isfile(file):
+ # 2. If it's a file on the filesystem, upload it
+ return cls._maybe_upload_file(
+ file=file, check_if_file_exists=check_if_file_exists
+ )
+ if cls._is_url(file):
+ # 3. If it's a URL, download it temporarily
+ content = cls._download_file_from_public_url(file)
+ if content is not None:
+ return cls._maybe_upload_file(
+ content=content,
+ check_if_file_exists=check_if_file_exists,
+ user_provided_file=file,
+ )
+ return file
+
+ @classmethod
+ def create(cls, args):
+ create_args = {
+ "training_file": cls._get_or_upload(
+ args.training_file, args.check_if_files_exist
+ ),
+ }
+ if args.validation_file:
+ create_args["validation_file"] = cls._get_or_upload(
+ args.validation_file, args.check_if_files_exist
+ )
+
+ for param in ("model", "suffix"):
+ attr = getattr(args, param)
+ if attr is not None:
+ create_args[param] = attr
+
+ if getattr(args, "n_epochs"):
+ create_args["hyperparameters"] = {
+ "n_epochs": args.n_epochs,
+ }
+
+ resp = openai.FineTuningJob.create(**create_args)
+ print(resp)
+ return
+
+ @classmethod
+ def get(cls, args):
+ resp = openai.FineTuningJob.retrieve(id=args.id)
+ print(resp)
+
+ @classmethod
+ def results(cls, args):
+ fine_tune = openai.FineTuningJob.retrieve(id=args.id)
+ if "result_files" not in fine_tune or len(fine_tune["result_files"]) == 0:
+ raise openai.error.InvalidRequestError(
+ f"No results file available for fine-tune {args.id}", "id"
+ )
+ result_file = openai.FineTuningJob.retrieve(id=args.id)["result_files"][0]
+ resp = openai.File.download(id=result_file)
+ print(resp.decode("utf-8"))
+
+ @classmethod
+ def events(cls, args):
+ seen, has_more = 0, True
+ while has_more:
+ resp = openai.FineTuningJob.list_events(id=args.id) # type: ignore
+ for event in resp["data"]:
+ print(event)
+ seen += 1
+ if args.limit is not None and seen >= args.limit:
+ return
+ has_more = resp["has_more"]
+
+ @classmethod
+ def follow(cls, args):
+ raise openai.error.OpenAIError(
+ message="Event streaming is not yet supported for `fine_tuning.job` events"
+ )
+
+ @classmethod
+ def cancel(cls, args):
+ resp = openai.FineTuningJob.cancel(id=args.id)
+ print(resp)
+
+
class WandbLogger:
@classmethod
def sync(cls, args):
@@ -1098,6 +1293,83 @@ Mutually exclusive with `top_p`.""",
sub.add_argument("--prompt", type=str)
sub.set_defaults(func=Audio.translate)
+ # FineTuning Jobs
+ sub = subparsers.add_parser("fine_tuning.job.list")
+ sub.set_defaults(func=FineTuningJob.list)
+
+ sub = subparsers.add_parser("fine_tuning.job.create")
+ sub.add_argument(
+ "-t",
+ "--training_file",
+ required=True,
+ help="JSONL file containing either chat-completion or prompt-completion examples for training. "
+ "This can be the ID of a file uploaded through the OpenAI API (e.g. file-abcde12345), "
+ 'a local file path, or a URL that starts with "http".',
+ )
+ sub.add_argument(
+ "-v",
+ "--validation_file",
+ help="JSONL file containing either chat-completion or prompt-completion examples for validation. "
+ "This can be the ID of a file uploaded through the OpenAI API (e.g. file-abcde12345), "
+ 'a local file path, or a URL that starts with "http".',
+ )
+ sub.add_argument(
+ "--no_check_if_files_exist",
+ dest="check_if_files_exist",
+ action="store_false",
+ help="If this argument is set and training_file or validation_file are file paths, immediately upload them. If this argument is not set, check if they may be duplicates of already uploaded files before uploading, based on file name and file size.",
+ )
+ sub.add_argument(
+ "-m",
+ "--model",
+ help="The model to start fine-tuning from",
+ )
+ sub.add_argument(
+ "--suffix",
+ help="If set, this argument can be used to customize the generated fine-tuned model name."
+ "All punctuation and whitespace in `suffix` will be replaced with a "
+ "single dash, and the string will be lower cased. The max "
+ "length of `suffix` is 18 chars. "
+ "The generated name will match the form `ft:{base_model}:{org-title}:{suffix}:{rstring}` where `rstring` "
+ "is a random string sortable as a timestamp. "
+ 'For example, `openai api fine_tuning.job.create -t test.jsonl -m gpt-3.5-turbo-0613 --suffix "first finetune!" '
+ "could generate a model with the name "
+ "ft:gpt-3.5-turbo-0613:your-org:first-finetune:7p4PqAoY",
+ )
+ sub.add_argument(
+ "--n_epochs",
+ type=int,
+ help="The number of epochs to train the model for. An epoch refers to one "
+ "full cycle through the training dataset.",
+ )
+ sub.set_defaults(func=FineTuningJob.create)
+
+ sub = subparsers.add_parser("fine_tuning.job.get")
+ sub.add_argument("-i", "--id", required=True, help="The id of the fine-tune job")
+ sub.set_defaults(func=FineTuningJob.get)
+
+ sub = subparsers.add_parser("fine_tuning.job.results")
+ sub.add_argument("-i", "--id", required=True, help="The id of the fine-tune job")
+ sub.set_defaults(func=FineTuningJob.results)
+
+ sub = subparsers.add_parser("fine_tuning.job.events")
+ sub.add_argument("-i", "--id", required=True, help="The id of the fine-tune job")
+ sub.add_argument(
+ "--limit",
+ type=int,
+ required=False,
+ help="The number of events to return, starting from most recent. If not specified, all events will be returned.",
+ )
+ sub.set_defaults(func=FineTuningJob.events)
+
+ sub = subparsers.add_parser("fine_tuning.job.follow")
+ sub.add_argument("-i", "--id", required=True, help="The id of the fine-tune job")
+ sub.set_defaults(func=FineTuningJob.follow)
+
+ sub = subparsers.add_parser("fine_tuning.job.cancel")
+ sub.add_argument("-i", "--id", required=True, help="The id of the fine-tune job")
+ sub.set_defaults(func=FineTuningJob.cancel)
+
def wandb_register(parser):
subparsers = parser.add_subparsers(