diff --git a/training/colossalai/run_sft.py b/training/colossalai/run_sft.py new file mode 100644 index 0000000..c9ad7f0 --- /dev/null +++ b/training/colossalai/run_sft.py @@ -0,0 +1,733 @@ +#!/usr/bin/env python +# coding=utf-8 +# Copyright 2021 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Fine-tuning the library models for causal language modeling (GPT, GPT-2, CTRL, ...) +on a text file or a dataset without using HuggingFace Trainer. + +Here is the full list of checkpoints on the hub that can be fine-tuned by this script: +https://huggingface.co/models?filter=text-generation +""" +# You can also adapt this script on your own causal language modeling task. Pointers for this are left as comments. + +import datetime +import math +import os +import signal +import time +from itertools import chain + +import datasets +import torch +import torch.distributed as dist +from accelerate.utils import set_seed +from context import barrier_context +from datasets import load_dataset +from packaging import version +from torch.utils.data import DataLoader +from tqdm.auto import tqdm + +import colossalai +import transformers +from colossalai.context import ParallelMode +from colossalai.core import global_context as gpc +from colossalai.logging import disable_existing_loggers, get_dist_logger +from colossalai.nn.optimizer import HybridAdam +from colossalai.nn.optimizer.zero_optimizer import ZeroOptimizer +from colossalai.nn.parallel import ZeroDDP +from colossalai.tensor import ProcessGroup +from colossalai.utils import get_current_device, get_dataloader, save_checkpoint +from colossalai.utils.model.colo_init_context import ColoInitContext +from transformers import ( + CONFIG_MAPPING, + MODEL_MAPPING, + AutoConfig, + AutoTokenizer, + GPT2Tokenizer, + AutoModelForCausalLM, + SchedulerType, + default_data_collator, + get_scheduler, +) +from transformers.utils.versions import require_version + +# Explanation: "AutoModelForCausalLM" will instantiate the proper subclass after +# ColossalAI has attempted to do a bunch of meta-programming trickery, so it +# crashes due to missing attributes. To work around that, we need to import the +# subclass - even if we don't use it - so ColossalAI properly patches the inner +# modules. 
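# For example, the EleutherAI/pythia-* checkpoint that supervised-finetune.bash
# passes in resolves to GPTNeoXForCausalLM, which is why that class appears in
# the explicit imports below. An illustrative way to check which concrete class
# a given checkpoint needs (not part of the training flow, and assuming
# transformers' auto-mapping API):
#
#   from transformers import AutoConfig, MODEL_FOR_CAUSAL_LM_MAPPING
#   config = AutoConfig.from_pretrained("EleutherAI/pythia-1.3b-deduped")
#   print(MODEL_FOR_CAUSAL_LM_MAPPING[type(config)])  # -> GPTNeoXForCausalLM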
+from transformers import ( + BloomForCausalLM, + OPTForCausalLM, + GPTNeoXForCausalLM, +) + +import re + +# haru SFT stuff +from harubaru_convogpt.dataset import SFTDataset +from harubaru_convogpt.sft import sft_forward + +require_version("datasets>=1.8.0", "To fix: pip install -r examples/pytorch/language-modeling/requirements.txt") + +MODEL_CONFIG_CLASSES = list(MODEL_MAPPING.keys()) +MODEL_TYPES = tuple(conf.model_type for conf in MODEL_CONFIG_CLASSES) + + +def get_time_stamp(): + torch.cuda.synchronize() + return time.time() + + +def parse_args(): + parser = colossalai.get_default_parser() + parser.add_argument( + "--dataset_name", + type=str, + default=None, + help="The name of the dataset to use (via the datasets library).", + ) + parser.add_argument( + "--dataset_config_name", + type=str, + default=None, + help="The configuration name of the dataset to use (via the datasets library).", + ) + parser.add_argument("--train_file", + type=str, + default=None, + help="A csv or a json file containing the training data.") + parser.add_argument("--validation_file", + type=str, + default=None, + help="A csv or a json file containing the validation data.") + parser.add_argument( + "--validation_split_percentage", + default=5, + help="The percentage of the train set used as validation set in case there's no validation split", + ) + parser.add_argument( + "--model_name_or_path", + type=str, + help="Path to pretrained model or model identifier from huggingface.co/models.", + required=True, + ) + parser.add_argument( + "--config_name", + type=str, + default=None, + help="Pretrained config name or path if not the same as model_name", + ) + parser.add_argument( + "--tokenizer_name", + type=str, + default=None, + help="Pretrained tokenizer name or path if not the same as model_name", + ) + parser.add_argument( + "--use_slow_tokenizer", + action="store_true", + help="If passed, will use a slow tokenizer (not backed by the 🤗 Tokenizers library).", + ) + parser.add_argument( + "--per_device_train_batch_size", + type=int, + default=8, + help="Batch size (per device) for the training dataloader.", + ) + parser.add_argument( + "--per_device_eval_batch_size", + type=int, + default=8, + help="Batch size (per device) for the evaluation dataloader.", + ) + parser.add_argument( + "--learning_rate", + type=float, + default=5e-5, + help="Initial learning rate (after the potential warmup period) to use.", + ) + parser.add_argument("--weight_decay", type=float, default=0.0, help="Weight decay to use.") + parser.add_argument("--num_train_epochs", type=int, default=3, help="Total number of training epochs to perform.") + parser.add_argument( + "--max_train_steps", + type=int, + default=None, + help="Total number of training steps to perform. 
If provided, overrides num_train_epochs.", + ) + parser.add_argument( + "--gradient_accumulation_steps", + type=int, + default=1, + help="Number of updates steps to accumulate before performing a backward/update pass.", + ) + parser.add_argument( + "--lr_scheduler_type", + type=SchedulerType, + default="linear", + help="The scheduler type to use.", + choices=["linear", "cosine", "cosine_with_restarts", "polynomial", "constant", "constant_with_warmup"], + ) + parser.add_argument("--num_warmup_steps", + type=int, + default=0, + help="Number of steps for the warmup in the lr scheduler.") + parser.add_argument("--output_dir", type=str, default=None, help="Where to store the final model.") + parser.add_argument("--seed", type=int, default=None, help="A seed for reproducible training.") + parser.add_argument( + "--model_type", + type=str, + default=None, + help="Model type to use if training from scratch.", + choices=MODEL_TYPES, + ) + parser.add_argument( + "--block_size", + type=int, + default=None, + help=("Optional input sequence length after tokenization. The training dataset will be truncated in block of" + " this size for training. Default to the model max input length for single sentence inputs (take into" + " account special tokens)."), + ) + parser.add_argument( + "--preprocessing_num_workers", + type=int, + default=None, + help="The number of processes to use for the preprocessing.", + ) + parser.add_argument("--overwrite_cache", + type=bool, + default=False, + help="Overwrite the cached training and evaluation sets") + parser.add_argument("--no_keep_linebreaks", + action="store_true", + help="Do not keep line breaks when using TXT files.") + parser.add_argument("--push_to_hub", action="store_true", help="Whether or not to push the model to the Hub.") + parser.add_argument("--hub_model_id", + type=str, + help="The name of the repository to keep in sync with the local `output_dir`.") + parser.add_argument("--hub_token", type=str, help="The token to use to push to the Model Hub.") + parser.add_argument( + "--checkpointing_steps", + type=str, + default=None, + help="Whether the various states should be saved at the end of every n steps, or 'epoch' for each epoch.", + ) + parser.add_argument("-r", + "--resume_from_checkpoint", + type=str, + default=None, + help="If the training should continue from a checkpoint folder.", + ) + parser.add_argument( + "--comment", type=str, help="Experiment comment for the Tensorboard writer." + ) + # NOTE(11b): These last two are useless. + parser.add_argument( + "--with_tracking", + action="store_true", + help="Whether to enable experiment trackers for logging.", + ) + parser.add_argument( + "--report_to", + type=str, + default="all", + help=('The integration to report the results and logs to. Supported platforms are `"tensorboard"`,' + ' `"wandb"` and `"comet_ml"`. Use `"all"` (default) to report to all integrations.' 
+ "Only applicable when `--with_tracking` is passed."), + ) + + parser.add_argument("--mem_cap", type=int, default=0, help="use mem cap") + parser.add_argument("--init_in_cpu", action='store_true', default=False, help="init training model in cpu") + args = parser.parse_args() + + # Sanity checks + if args.dataset_name is None and args.train_file is None and args.validation_file is None: + raise ValueError("Need either a dataset name or a training/validation file.") + else: + if args.train_file is not None: + extension = args.train_file.split(".")[-1] + assert extension in ["csv", "json", "txt"], "`train_file` should be a csv, json or txt file." + if args.validation_file is not None: + extension = args.validation_file.split(".")[-1] + assert extension in ["csv", "json", "txt"], "`validation_file` should be a csv, json or txt file." + + if args.push_to_hub: + assert args.output_dir is not None, "Need an `output_dir` to create a repo when `--push_to_hub` is passed." + + return args + + +def colo_memory_cap(size_in_GB): + from colossalai.utils import colo_device_memory_capacity, colo_set_process_memory_fraction, get_current_device + cuda_capacity = colo_device_memory_capacity(get_current_device()) + if size_in_GB * (1024**3) < cuda_capacity: + colo_set_process_memory_fraction(size_in_GB * (1024**3) / cuda_capacity) + print("Using {} GB of GPU memory".format(size_in_GB)) + + +def main(): + args = parse_args() + disable_existing_loggers() + colossalai.launch_from_torch(config=dict()) + logger = get_dist_logger() + is_main_process = dist.get_rank() == 0 + + if is_main_process: + datasets.utils.logging.set_verbosity_warning() + transformers.utils.logging.set_verbosity_info() + else: + datasets.utils.logging.set_verbosity_error() + transformers.utils.logging.set_verbosity_error() + + if args.mem_cap > 0: + colo_memory_cap(args.mem_cap) + + # If passed along, set the training seed now. + if args.seed is not None: + set_seed(args.seed) + logger.info(f"Rank {dist.get_rank()}: random seed is set to {args.seed}") + + # Handle the repository creation + with barrier_context(): + if args.output_dir is not None: + os.makedirs(args.output_dir, exist_ok=True) + + # Get the datasets: you can either provide your own CSV/JSON/TXT training and evaluation files (see below) + # or just provide the name of one of the public datasets available on the hub at https://huggingface.co/datasets/ + # (the dataset will be downloaded automatically from the datasets Hub). + # + # For CSV/JSON files, this script will use the column called 'text' or the first column if no column called + # 'text' is found. You can easily tweak this behavior (see below). + # + # In distributed training, the load_dataset function guarantee that only one local process can concurrently + # download the dataset. + ''' + logger.info("Start preparing dataset", ranks=[0]) + if args.dataset_name is not None: + # Downloading and loading a dataset from the hub. 
+ raw_datasets = load_dataset(args.dataset_name, args.dataset_config_name) + if "validation" not in raw_datasets.keys(): + raw_datasets["validation"] = load_dataset( + args.dataset_name, + args.dataset_config_name, + split=f"train[:{args.validation_split_percentage}%]", + ) + raw_datasets["train"] = load_dataset( + args.dataset_name, + args.dataset_config_name, + split=f"train[{args.validation_split_percentage}%:]", + ) + else: + data_files = {} + dataset_args = {} + if args.train_file is not None: + data_files["train"] = args.train_file + if args.validation_file is not None: + data_files["validation"] = args.validation_file + extension = args.train_file.split(".")[-1] + if extension == "txt": + extension = "text" + dataset_args["keep_linebreaks"] = not args.no_keep_linebreaks + raw_datasets = load_dataset(extension, data_files=data_files, **dataset_args) + # If no validation data is there, validation_split_percentage will be used to divide the dataset. + if "validation" not in raw_datasets.keys(): + raw_datasets["validation"] = load_dataset( + extension, + data_files=data_files, + split=f"train[:{args.validation_split_percentage}%]", + **dataset_args, + ) + raw_datasets["train"] = load_dataset( + extension, + data_files=data_files, + split=f"train[{args.validation_split_percentage}%:]", + **dataset_args, + ) + logger.info("Dataset is prepared", ranks=[0]) + ''' + + # See more about loading any type of standard or custom dataset (from files, python dict, pandas DataFrame, etc) at + # https://huggingface.co/docs/datasets/loading_datasets.html. + + # Load pretrained model and tokenizer + # + # In distributed training, the .from_pretrained methods guarantee that only one local process can concurrently + # download model & vocab. + if args.config_name: + config = AutoConfig.from_pretrained(args.config_name) + elif args.model_name_or_path: + config = AutoConfig.from_pretrained(args.model_name_or_path) + else: + config = CONFIG_MAPPING[args.model_type]() + logger.warning("You are instantiating a new config instance from scratch.") + logger.info("Model config has been created", ranks=[0]) + + if args.model_name_or_path == 'facebook/opt-13b': + tokenizer = GPT2Tokenizer.from_pretrained(args.model_name_or_path) + else: + print(f'load model from {args.model_name_or_path}') + tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path, use_fast=not args.use_slow_tokenizer) + logger.info(f"{tokenizer.__class__.__name__} has been created", ranks=[0]) + + if args.init_in_cpu: + init_dev = torch.device('cpu') + else: + init_dev = get_current_device() + + # build model + if args.model_name_or_path is None or args.model_name_or_path == 'facebook/opt-13b': + # currently, there has a bug in pretrained opt-13b + # we can not import it until huggingface fix it + logger.info("Train a new model from scratch", ranks=[0]) + with ColoInitContext(device=init_dev): + model = AutoModelForCausalLM(config) + else: + logger.info("Finetune a pre-trained model", ranks=[0]) + with ColoInitContext(device=init_dev): + model = AutoModelForCausalLM.from_pretrained(args.model_name_or_path, + from_tf=bool(".ckpt" in args.model_name_or_path), + config=config, + local_files_only=False) + + # enable graident checkpointing + model.gradient_checkpointing_enable() + + PLACEMENT_POLICY = 'auto' + cai_version = colossalai.__version__ + logger.info(f'using Colossal-AI version {cai_version}') + if version.parse(cai_version) > version.parse("0.1.10"): + from colossalai.nn.parallel import GeminiDDP + model = GeminiDDP(model, 
device=get_current_device(), placement_policy=PLACEMENT_POLICY, pin_memory=True) + elif version.parse(cai_version) <= version.parse("0.1.10") and version.parse(cai_version) >= version.parse("0.1.9"): + from colossalai.gemini import ChunkManager, GeminiManager + pg = ProcessGroup() + chunk_size = ChunkManager.search_chunk_size(model, 64 * 1024**2, 32) + chunk_manager = ChunkManager(chunk_size, + pg, + enable_distributed_storage=True, + init_device=GeminiManager.get_default_device(PLACEMENT_POLICY)) + gemini_manager = GeminiManager(PLACEMENT_POLICY, chunk_manager) + model = ZeroDDP(model, gemini_manager) + + logger.info(f'{model.__class__.__name__} has been created', ranks=[0]) + + ''' + # Preprocessing the datasets. + # First we tokenize all the texts. + column_names = raw_datasets["train"].column_names + text_column_name = "text" if "text" in column_names else column_names[0] + + def tokenize_function(examples): + return tokenizer(examples[text_column_name]) + + with barrier_context(executor_rank=0, parallel_mode=ParallelMode.DATA): + tokenized_datasets = raw_datasets.map( + tokenize_function, + batched=True, + num_proc=args.preprocessing_num_workers, + remove_columns=column_names, + load_from_cache_file=not args.overwrite_cache, + desc="Running tokenizer on dataset", + ) + ''' + + if args.block_size is None: + block_size = tokenizer.model_max_length + if block_size > 1024: + logger.warning( + f"The tokenizer picked seems to have a very large `model_max_length` ({tokenizer.model_max_length}). " + "Picking 1024 instead. You can change that default value by passing --block_size xxx.") + block_size = 1024 + else: + if args.block_size > tokenizer.model_max_length: + logger.warning(f"The block_size passed ({args.block_size}) is larger than the maximum length for the model" + f"({tokenizer.model_max_length}). Using block_size={tokenizer.model_max_length}.") + block_size = min(args.block_size, tokenizer.model_max_length) + + ''' + # Main data processing function that will concatenate all texts from our dataset and generate chunks of block_size. + def group_texts(examples): + # Concatenate all texts. + concatenated_examples = {k: list(chain(*examples[k])) for k in examples.keys()} + total_length = len(concatenated_examples[list(examples.keys())[0]]) + # We drop the small remainder, we could add padding if the model supported it instead of this drop, you can + # customize this part to your needs. + if total_length >= block_size: + total_length = (total_length // block_size) * block_size + # Split by chunks of max_len. + result = { + k: [t[i:i + block_size] for i in range(0, total_length, block_size) + ] for k, t in concatenated_examples.items() + } + result["labels"] = result["input_ids"].copy() + return result + + # Note that with `batched=True`, this map processes 1,000 texts together, so group_texts throws away a remainder + # for each of those groups of 1,000 texts. You can adjust that batch_size here but a higher value might be slower + # to preprocess. + # + # To speed up this part, we use multiprocessing. 
See the documentation of the map method for more information: + # https://huggingface.co/docs/datasets/package_reference/main_classes.html#datasets.Dataset.map + + with barrier_context(executor_rank=0, parallel_mode=ParallelMode.DATA): + lm_datasets = tokenized_datasets.map( + group_texts, + batched=True, + num_proc=args.preprocessing_num_workers, + load_from_cache_file=not args.overwrite_cache, + desc=f"Grouping texts in chunks of {block_size}", + ) + ''' + + # train_dataset = lm_datasets["train"] + # eval_dataset = lm_datasets["validation"] + + tokenizer.pad_token = tokenizer.eos_token + # tokenizer.add_special_tokens({'pad_token': '[PAD]'}) + + train_dataset = SFTDataset(args.train_file, tokenizer) + eval_dataset = SFTDataset(args.validation_file, tokenizer) + + # Log a few random samples from the training set: + # for index in random.sample(range(len(train_dataset)), 3): + # logger.info(f"Sample {index} of the training set: {train_dataset[index]}.") + + def collate_fn(batches): + input_ids = [ + batch["input_ids"].squeeze(0) for batch in batches + ] + # padded_tokens = {"input_ids": input_ids} + padded_tokens = tokenizer.pad( + {"input_ids": input_ids}, return_tensors="pt", padding=True + ) + start_positions = torch.stack( + [batch["start_positions"] for batch in batches] + ) + end_positions = torch.stack( + [batch["end_positions"] for batch in batches] + ) + return { + "input_ids": padded_tokens["input_ids"], + "attention_mask": padded_tokens["attention_mask"], + "start_positions": start_positions, + "end_positions": end_positions, + } + + # DataLoaders creation: + train_dataloader = get_dataloader(train_dataset, + shuffle=True, + add_sampler=True, + collate_fn=collate_fn, + batch_size=args.per_device_train_batch_size) + eval_dataloader = DataLoader(eval_dataset, + collate_fn=collate_fn, + batch_size=args.per_device_eval_batch_size) + logger.info("Dataloaders have been created", ranks=[0]) + + # Optimizer + # Split weights in two groups, one with weight decay and the other not. + no_decay = ["bias", "LayerNorm.weight"] + optimizer_grouped_parameters = [ + { + "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)], + "weight_decay": args.weight_decay, + }, + { + "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)], + "weight_decay": 0.0, + }, + ] + + optimizer = HybridAdam(optimizer_grouped_parameters, lr=args.learning_rate) + optimizer = ZeroOptimizer(optimizer, model, initial_scale=2**14) + + # Scheduler and math around the number of training steps. + overrode_max_train_steps = False + num_update_steps_per_epoch = math.ceil(len(train_dataloader) / args.gradient_accumulation_steps) + if args.max_train_steps is None: + args.max_train_steps = args.num_train_epochs * num_update_steps_per_epoch + overrode_max_train_steps = True + + lr_scheduler = get_scheduler( + name=args.lr_scheduler_type, + optimizer=optimizer, + num_warmup_steps=args.num_warmup_steps, + num_training_steps=args.max_train_steps, + ) + + if args.resume_from_checkpoint is not None: + # FIXME(11b): Implement this properly. Need to save/restore all the other + # state as well (optimizer, LR scheduler, dataloader position via step counter...) + logger.info(f"Resuming from checkpoint {args.resume_from_checkpoint}", ranks=[0]) + colossalai.utils.load_checkpoint(args.resume_from_checkpoint, model, optimizer, lr_scheduler) + + # We need to recalculate our total training steps as the size of the training dataloader may have changed. 
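    # (Illustrative arithmetic with a hypothetical dataset of 10,000 examples:
    # sharded across NUMBER_OF_GPUS=1 process at per_device_train_batch_size=2,
    # len(train_dataloader) is 5,000 batches; with gradient_accumulation_steps=1
    # that gives num_update_steps_per_epoch = ceil(5000 / 1) = 5000, and with
    # num_train_epochs=1 the default max_train_steps is also 5000. Note that
    # len(train_dataloader) is a per-process count, since get_dataloader() was
    # called with add_sampler=True and therefore shards the dataset across the
    # data-parallel ranks.)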
+ num_update_steps_per_epoch = math.ceil(len(train_dataloader) / args.gradient_accumulation_steps) + if overrode_max_train_steps: + args.max_train_steps = args.num_train_epochs * num_update_steps_per_epoch + # Afterwards we recalculate our number of training epochs + args.num_train_epochs = math.ceil(args.max_train_steps / num_update_steps_per_epoch) + + # Train! + total_batch_size = args.per_device_train_batch_size * gpc.get_world_size(ParallelMode.DATA) + + logger.info("***** Running training *****", ranks=[0]) + logger.info(f" Num examples = {len(train_dataset)}", ranks=[0]) + logger.info(f" Num Epochs = {args.num_train_epochs}", ranks=[0]) + logger.info(f" Instantaneous batch size per device = {args.per_device_train_batch_size}", ranks=[0]) + logger.info(f" Total train batch size (w. parallel, distributed & accumulation) = {total_batch_size}", ranks=[0]) + logger.info(f" Gradient Accumulation steps = {args.gradient_accumulation_steps}", ranks=[0]) + logger.info(f" Total optimization steps = {args.max_train_steps}", ranks=[0]) + + now = datetime.datetime.now() + run_name = now.strftime("%Y-%m-%dT_%H-%M-%S%z") + writer = torch.utils.tensorboard.SummaryWriter(log_dir=f"{args.output_dir}/runs/{run_name}", comment=args.comment) + + # Only show the progress bar once on each machine. + progress_bar = tqdm(range(args.max_train_steps), disable=not is_main_process) + completed_steps = 0 + starting_epoch = 0 + global_step = 0 + + step_from_checkpoint = 0 + if args.resume_from_checkpoint is not None: + step_from_checkpoint = int(re.findall(r"epoch_\d+_step_(\d+).pt", args.resume_from_checkpoint)[0]) + + # Add supervised finetuning forward method to model + model.sft_forward = sft_forward.__get__(model) + + for epoch in range(starting_epoch, args.num_train_epochs): + + if completed_steps >= args.max_train_steps: + break + + model.train() + for step, batch in enumerate(train_dataloader): + if step < step_from_checkpoint: + completed_steps += 1 + global_step += 1 + progress_bar.update(1) + + # Apparently ColossalAI's checkpoint utilities don't work + # correctly for saving/restore the LR scheduler? So we "step" it + # manually here. + lr_scheduler.step() + continue + + batch = {k: v.cuda() for k, v in batch.items()} + # outputs = model.sft_forward(use_cache=False, **batch) # Caching is incompatible with gradient checkpointing. 
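            # sft_forward (defined in harubaru_convogpt/sft.py and bound to the
            # model just before the training loop) computes the LM cross-entropy
            # loss only over the answer span delimited by
            # start_positions/end_positions, i.e. the output tokens appended by
            # SFTDataset, so no loss is taken on the prompt tokens.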
+ outputs = model.sft_forward( + input_ids=batch["input_ids"], + attention_mask=batch["attention_mask"], + start_positions=batch["start_positions"], + end_positions=batch["end_positions"], + ) + loss = outputs['loss'] + optimizer.backward(loss) + + if step % args.gradient_accumulation_steps == 0 or step == len(train_dataloader) - 1: + optimizer.step() + lr_scheduler.step() + optimizer.zero_grad() + progress_bar.update(1) + completed_steps += 1 + + global_step += 1 + logger.info("Global step {} finished".format(global_step + 1), ranks=[0]) + + try: + train_perplexity = math.exp(loss) + except OverflowError: + train_perplexity = float("inf") + writer.add_scalar("Train/Perplexity (Step)", train_perplexity, global_step) + writer.add_scalar("Train/Loss (Step)", loss, global_step) + writer.add_scalar("Train/Learning Rate (Step)", lr_scheduler.get_last_lr()[-1], global_step) + + if args.output_dir is not None and args.checkpointing_steps is not None: + if args.checkpointing_steps != "epoch" and completed_steps % int(args.checkpointing_steps) == 0: + checkpoint_path = f'{args.output_dir}/epoch_{epoch}_step_{completed_steps}.pt' + logger.info(f" Saving iter checkpoint...", ranks=[0]) + save_checkpoint(checkpoint_path, epoch, model, optimizer, lr_scheduler) + logger.info(f" Saved checkpoint to {checkpoint_path}!", ranks=[0]) + + if True and completed_steps % (int(args.checkpointing_steps) * 8) == 0: + # Evaluate every X checkpoints. + model.eval() + losses = [] + for step, batch in enumerate(eval_dataloader): + with torch.no_grad(): + batch = {k: v.cuda() for k, v in batch.items()} + outputs = model.sft_forward(**batch) + + loss = outputs['loss'].unsqueeze(0) + losses.append(loss) + + losses = torch.cat(losses) + losses = losses[:len(eval_dataset)] + try: + eval_loss = torch.mean(losses) + perplexity = math.exp(eval_loss) + except OverflowError: + perplexity = float("inf") + logger.info(f"Step {global_step}: perplexity: {perplexity} eval_loss: {eval_loss}", ranks=[0]) + model.train() + + if completed_steps >= args.max_train_steps: + break + + # Evaluate per epoch. + if False: + model.eval() + losses = [] + for step, batch in enumerate(eval_dataloader): + with torch.no_grad(): + batch = {k: v.cuda() for k, v in batch.items()} + outputs = model(**batch) + + loss = outputs['loss'].unsqueeze(0) + losses.append(loss) + + losses = torch.cat(losses) + losses = losses[:len(eval_dataset)] + try: + eval_loss = torch.mean(losses) + perplexity = math.exp(eval_loss) + except OverflowError: + perplexity = float("inf") + + logger.info(f"Epoch {epoch}: perplexity: {perplexity} eval_loss: {eval_loss}", ranks=[0]) + # TODO(11b): This messes up the intra-epoch graphs. Apparently I need to + # read up on the Tensorboard docs to do this properly. Ignoring for now. 
+ # writer.add_scalar("Eval/Loss (Global Step)", eval_loss, completed_steps) + # writer.add_scalar("Eval/Perplexity (Global Step)", perplexity, completed_steps) + + if args.output_dir is not None and args.checkpointing_steps == "epoch": + checkpoint_path = f'{args.output_dir}/epoch_{epoch}_step_{completed_steps}.pt' + logger.info(f" Saving epoch checkpoint...", ranks=[0]) + save_checkpoint(checkpoint_path, epoch, model, optimizer, lr_scheduler) + logger.info(f" Saved checkpoint to {checkpoint_path}!", ranks=[0]) + + if args.output_dir is not None: + checkpoint_path = f'{args.output_dir}/epoch_{epoch}_step_{completed_steps}.pt' + logger.info(f" Saving final checkpoint...", ranks=[0]) + save_checkpoint(checkpoint_path, epoch, model, optimizer, lr_scheduler) + logger.info(f" Saved checkpoint to {checkpoint_path}!", ranks=[0]) + + logger.info("Training finished", ranks=[0]) + + +if __name__ == "__main__": + main() diff --git a/training/harubaru-convogpt/dataset.py b/training/harubaru-convogpt/dataset.py new file mode 100644 index 0000000..e63b653 --- /dev/null +++ b/training/harubaru-convogpt/dataset.py @@ -0,0 +1,137 @@ +import os +import struct +import torch +import argparse +import numpy as np +import transformers +import json +from typing import Tuple + +def decode(in_file: str, out_file: str, tokenizer: transformers.AutoTokenizer) -> int: + mem = np.memmap(in_file, mode="r", dtype="uint16") + tokens = len(mem) + with open(out_file, "a") as f: + for token in mem: + f.write(tokenizer.decode([token])) + return tokens + +def encode(in_file: str, out_file: str, tokenizer: transformers.AutoTokenizer) -> int: + with open(in_file, "r", encoding="utf-8") as f: + text = f.read() + tokens = tokenizer.encode(text) + with open(out_file, "wb") as f: + for token in tokens: + f.write(np.uint16(token)) + return len(tokens) + +class TokenizedDataset(torch.utils.data.Dataset): + """ + Consumes a flat binary file containing 16-bit token serialization, aligned + along `context_length` chunks. 
+ """ + + def __init__(self, path: str, context_length: int = 2048): + file_stat = os.stat(path) + self.file = open(path, 'rb') + self.length = int(file_stat.st_size / 2 / context_length) + self.formatstr = '%sH' % context_length + self.context_length = context_length + length_mb = os.stat(path).st_size / 1024.0 / 1024.0 + num_tokens = self.length * context_length + print(f"DATASET: {path}") + print(f"DATASET SIZE: {length_mb:,.2f}mb, {num_tokens:,} tokens, " + f"{self.length:,} contexts") + + def __len__(self) -> int: + return self.length + + def load(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]: + self.seek(idx) + input_ids = torch.tensor( + struct.unpack(self.formatstr, + self.file.read(self.context_length * 2))) + mask = torch.zeros(self.context_length) + return input_ids, mask + + def seek(self, idx): + self.file.seek(self.context_length * idx * 2) + + def __getitem__(self, idx) -> Tuple[torch.Tensor, torch.Tensor]: + return self.load(idx) + +class FeedbackDataset(torch.utils.data.Dataset): + def __init__(self, feedback_file: str, tokenizer: transformers.AutoTokenizer, max_length: int = 512): + self.tokenizer = tokenizer + self.max_length = max_length + self.feedback_file = feedback_file + + with open(feedback_file) as f: + self.feedback = [json.loads(line) for line in f] + + def __len__(self): + return len(self.feedback) + + def __getitem__(self, idx): + feedback = self.feedback[idx] + feedback_input = '\n'.join(feedback["input"].split("\n")[-2:]) + feedback_str = f'{feedback_input} {feedback["output"].lstrip().rstrip()}' + seq = self.tokenizer( + feedback_str, + padding="max_length", + truncation=True, + return_tensors="pt" + ) + reward = torch.tensor([feedback["reward"]]).unsqueeze(0) + return seq, reward + +# sft file example +# { +# "input": "Anonymous: Hi, how are you?\nGPT:", +# "output": " I'm good, how are you?\n", +# "reward": 0.0 +# } +import tqdm +class SFTDataset(torch.utils.data.Dataset): + def __init__(self, sft_file: str, tokenizer: transformers.AutoTokenizer, max_length: int = 2048): + self.tokenizer = tokenizer + self.max_length = max_length + self.sft_file = sft_file + + with open(sft_file) as f: + self.sft = [json.loads(line) for line in f] + + # iterate over sft, removing any that have a reward of 0 + self.sft = [sft for sft in self.sft if sft["reward"] != 0.0] + + # iterate over sft, removing any that have too many tokens + for feedback in tqdm.tqdm(self.sft, desc="Validating SFT"): + inputs = feedback["input"] + f' {feedback["output"].lstrip().rstrip()}\n' + if len(self.tokenizer(inputs).input_ids) > self.max_length: + self.sft.remove(feedback) + print(f"Removed {feedback['output']} due to length") + + def __len__(self): + return len(self.sft) + + def __getitem__(self, idx): + sft = self.sft[idx] + sft_input_tokens = self.tokenizer(sft["input"], return_tensors="pt").input_ids + sft_output_tokens = self.tokenizer(f' {sft["output"].lstrip().rstrip()}\n', return_tensors="pt").input_ids + input_ids = torch.cat([sft_input_tokens, sft_output_tokens], dim=-1) + start_positions = torch.tensor([len(sft_input_tokens[0])]) + end_positions = torch.tensor([len(sft_input_tokens[0]) + len(sft_output_tokens[0]) - 1]) + return { + "input_ids": input_ids, + "start_positions": start_positions, + "end_positions": end_positions, + } + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Dataset Creator') + parser.add_argument('--in_file', type=str, help='input file to use', required=True) + parser.add_argument('--out_file', type=str, help='output 
file to use', required=True) + parser.add_argument('--model', type=str, help='model tokenizer to use', required=True) + args = parser.parse_args() + + encode(args.in_file, args.out_file, transformers.AutoTokenizer.from_pretrained(args.model)) diff --git a/training/harubaru-convogpt/sft.py b/training/harubaru-convogpt/sft.py new file mode 100644 index 0000000..3ed583d --- /dev/null +++ b/training/harubaru-convogpt/sft.py @@ -0,0 +1,276 @@ +import os +import torch +import accelerate +import tqdm +import time +import argparse +import wandb + +from dataset import TokenizedDataset, FeedbackDataset, SFTDataset + +from transformers import AutoModelForCausalLM, AutoTokenizer +from transformers.modeling_outputs import CausalLMOutput + +from typing import Union, Optional + +# Supervised Finetuning: Compute loss between model output and target using start_positions and end_positions +def sft_forward( + self, + input_ids: Optional[torch.LongTensor] = None, + attention_mask: Optional[torch.FloatTensor] = None, + token_type_ids: Optional[torch.LongTensor] = None, + position_ids: Optional[torch.LongTensor] = None, + head_mask: Optional[torch.FloatTensor] = None, + inputs_embeds: Optional[torch.FloatTensor] = None, + start_positions: Optional[torch.LongTensor] = None, + end_positions: Optional[torch.LongTensor] = None, + output_attentions: Optional[bool] = None, + output_hidden_states: Optional[bool] = None, + return_dict: Optional[bool] = None, +) -> Union[torch.Tensor, CausalLMOutput]: + try: + return_dict = return_dict if return_dict is not None else self.config.use_return_dict + except AttributeError: + return_dict = True + + outputs = self.transformer( + input_ids, + attention_mask=attention_mask, + token_type_ids=token_type_ids, + position_ids=position_ids, + head_mask=head_mask, + inputs_embeds=inputs_embeds, + output_attentions=output_attentions, + output_hidden_states=output_hidden_states, + return_dict=return_dict, + ) + + sequence_output = outputs[0] + + logits = self.lm_head(sequence_output) + + answer_logits = logits[:, start_positions[0]:end_positions[0]+1] + answer_input_ids = input_ids[:, start_positions[0]:end_positions[0]+1] + + # compute loss for prompt and answer + loss_fct = torch.nn.CrossEntropyLoss(ignore_index=-1) + shift_answer_logits = answer_logits[..., :-1, :].contiguous() + shift_answer_labels = answer_input_ids[..., 1:].contiguous() + answer_loss = loss_fct(shift_answer_logits.view(-1, answer_logits.size(-1)), shift_answer_labels.view(-1)) + + loss = answer_loss + + if not return_dict: + output = (loss,) + outputs[2:] + return ((loss,) + outputs[2:]) if return_dict else output + + return CausalLMOutput( + loss=loss, + logits=logits, + hidden_states=outputs.hidden_states, + attentions=outputs.attentions, + ) + +class SFT_Trainer: + def __init__( + self, + accelerator: accelerate.Accelerator, + model: AutoModelForCausalLM, + tokenizer: AutoTokenizer, + train_dataloader: torch.utils.data.DataLoader, + optimizer: torch.optim.Optimizer, + weight_dtype: torch.dtype, + args: argparse.Namespace, + ) -> None: + self.accelerator = accelerator + self.model = model + self.tokenizer = tokenizer + self.train_dataloader = train_dataloader + self.optimizer = optimizer + self.weight_dtype = weight_dtype + self.args = args + + if accelerator.is_main_process: + self.progress_bar = tqdm.tqdm( + total=self.args.epochs*len(train_dataloader), + desc="Total Steps", + leave=False, + ) + + self.run = wandb.init( + project="convogpt-sftlm", + 
name=f'{self.args.model}-{self.args.epochs}-{self.args.batch_size}-{self.args.learning_rate}--{int(time.time())}', + config=self.args, + ) + + self.global_step = 0 + + def save_model(self) -> None: + self.accelerator.wait_for_everyone() + if self.accelerator.is_main_process: + path = f'{self.args.output_dir}/{self.run.name}' + os.makedirs(path, exist_ok=True) + unwrapped_model = self.accelerator.unwrap_model(self.model) + unwrapped_model.save_pretrained(path, save_function=self.accelerator.save) + + def step(self, batch: dict) -> None: + with self.accelerator.accumulate(self.model): + input_ids = batch['input_ids'] + attention_mask = batch['attention_mask'] + start_positions = batch['start_positions'] + end_positions = batch['end_positions'] + + try: + outputs = sft_forward( + self.model, + input_ids=input_ids, + attention_mask=attention_mask, + start_positions=start_positions, + end_positions=end_positions, + ) + + loss = outputs.loss + self.accelerator.backward(loss) + if self.accelerator.sync_gradients: + self.accelerator.clip_grad_norm_(self.model.parameters(), 1.0) + self.optimizer.step() + self.optimizer.zero_grad() + except RuntimeError as e: + print(f"RuntimeError: {e}") + print(f"input_ids: {input_ids}") + print(f"attention_mask: {attention_mask}") + print(f"start_positions: {start_positions}") + print(f"end_positions: {end_positions}") + print('Skipping batch...') + loss = torch.tensor(float('nan'), device=self.accelerator.device) + + return { + "train/loss": loss.detach().item(), + } + + def train(self) -> None: + self.model.train() + for epoch in range(self.args.epochs): + for _, batch in enumerate(self.train_dataloader): + step_start = time.perf_counter() + + #print(f"####\n{self.tokenizer.decode(batch['input_ids'][0])}\n#{batch['start_positions'][0]}:{batch['end_positions'][0]}\n####") + + metrics = self.step(batch) + + step_end = time.perf_counter() + + if self.accelerator.is_main_process: + rank_samples_per_second = self.args.batch_size / (step_end - step_start) + world_samples_per_second = rank_samples_per_second * self.accelerator.num_processes + + metrics.update({ + "perf/rank_samples_per_second": rank_samples_per_second, + "perf/world_samples_per_second": world_samples_per_second, + "train/epoch": epoch, + "train/step": self.global_step, + "train/samples_seen": self.global_step * self.args.batch_size, + }) + + self.global_step += 1 + + self.progress_bar.update(1) + self.progress_bar.set_postfix(**metrics) + + self.run.log(metrics, step=self.global_step) + + if self.global_step % self.args.save_steps == 0: + self.save_model() + self.accelerator.wait_for_everyone() + self.save_model() + +def main() -> None: + + parser = argparse.ArgumentParser(description="Supervised GPT finetuning") + parser.add_argument("--model", type=str, default="hakurei/gpt-j-random-tinier", help="Model name") + parser.add_argument("--dataset", type=str, default="train.jsonl", help="Training file") + parser.add_argument("--output_dir", type=str, default="output", help="Output directory") + parser.add_argument("--epochs", type=int, default=1, help="Number of epochs") + parser.add_argument("--batch_size", type=int, default=1, help="Batch size") + parser.add_argument("--save_steps", type=int, default=1000, help="Save model every x steps") + parser.add_argument("--learning_rate", type=float, default=1e-4, help="Learning rate") + args = parser.parse_args() + + accelerator = accelerate.Accelerator() + accelerate.utils.set_seed(42) + + tokenizer = AutoTokenizer.from_pretrained(args.model) + 
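    # GPT-style tokenizers (GPT-NeoX, GPT-J, OPT, ...) typically ship without a
    # pad token, but collate_fn below relies on tokenizer.pad() to batch
    # variable-length examples, so one has to exist; reusing eos_token is the
    # usual workaround, and the padded positions are masked out through the
    # returned attention_mask.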
tokenizer.pad_token = tokenizer.eos_token + + def collate_fn(batches): + input_ids = [ + batch["input_ids"].squeeze(0) for batch in batches + ] + padded_tokens = tokenizer.pad( + {"input_ids": input_ids}, return_tensors="pt", padding=True + ) + start_positions = torch.stack( + [batch["start_positions"] for batch in batches] + ) + end_positions = torch.stack( + [batch["end_positions"] for batch in batches] + ) + return { + "input_ids": padded_tokens["input_ids"], + "attention_mask": padded_tokens["attention_mask"], + "start_positions": start_positions, + "end_positions": end_positions, + } + + train_dataset = SFTDataset(args.dataset, tokenizer) + + train_dataloader = torch.utils.data.DataLoader( + train_dataset, + batch_size=args.batch_size, + shuffle=True, + collate_fn=collate_fn, + ) + + model = AutoModelForCausalLM.from_pretrained(args.model) + optimizer = torch.optim.AdamW(model.parameters(), lr=args.learning_rate) + + model, optimizer, train_dataloader = accelerator.prepare( + model, optimizer, train_dataloader + ) + + trainer = SFT_Trainer( + accelerator=accelerator, + model=model, + tokenizer=tokenizer, + train_dataloader=train_dataloader, + optimizer=optimizer, + weight_dtype=None, + args=args, + ) + + trainer.train() + +if __name__ == '__main__': + """ + # Load model and tokenizer + model = AutoModelForCausalLM.from_pretrained('distilgpt2') + tokenizer = AutoTokenizer.from_pretrained('distilgpt2') + + # Add supervised finetuning forward method to model + model.forward = sft_forward.__get__(model) + + # Create input tensors + question = 'What is the capital of France?' + answer = 'The capital of France is Paris.' + question_tokens = tokenizer.encode(question, return_tensors='pt') + answer_tokens = tokenizer.encode(answer, return_tensors='pt') + input_ids = torch.cat([question_tokens, answer_tokens], dim=-1) + + start_positions = torch.tensor([len(question_tokens[0])]) + end_positions = torch.tensor([len(question_tokens[0]) + len(answer_tokens[0]) - 1]) + + # Compute loss + loss = model(input_ids, start_positions=start_positions, end_positions=end_positions).loss + print(loss) + """ + main() diff --git a/training/supervised-finetune.bash b/training/supervised-finetune.bash new file mode 100644 index 0000000..79104c1 --- /dev/null +++ b/training/supervised-finetune.bash @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +set -x + +export BATCH_SIZE=2 +export MODEL="EleutherAI/pythia-1.3b-deduped" +export NUMBER_OF_GPUS=1 +export OUTPUT_DIR="checkpoints" +LOG_NAME=$(date "+%Y-%m-%d_%H-%M-%S") + +# Set HuggingFace Datasets to offline mode by default: since we're using local +# JSON files, hitting their servers means something went wrong. If you're doing +# something else, adjust this accordingly. +export HF_DATASETS_OFFLINE=1 + +# HuggingFace transformers should be allowed to hit their servers though, to +# download pre-trained models during the first execution for example. 
+# export TRANSFORMERS_OFFLINE=1 + +mkdir -p "$OUTPUT_DIR/logs" +mkdir -p "$OUTPUT_DIR/runs" + +torchrun \ + --nproc_per_node ${NUMBER_OF_GPUS} \ + --master_port 19198 \ + ./colossalai/run_sft.py \ + --train_file "./data/train.json" \ + --validation_file "./data/eval.json" \ + --learning_rate "5.0e-5" \ + --checkpointing_steps 64 \ + --block_size 1024 \ + --mem_cap 0 \ + --lr_scheduler_type "cosine" \ + --num_warmup_steps 100 \ + --model_name_or_path "$MODEL" \ + --output_dir "$OUTPUT_DIR" \ + --num_train_epochs 1 \ + --per_device_eval_batch_size "$BATCH_SIZE" \ + --per_device_train_batch_size "$BATCH_SIZE" "$@" \ + 2>&1 | tee "$OUTPUT_DIR/logs/$LOG_NAME.log"
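
For reference, a minimal sketch of how the ./data/train.json and ./data/eval.json files consumed above can be produced. The record layout follows the "sft file example" comment in dataset.py: one JSON object per line with "input", "output" and "reward" keys. The conversation text below is purely illustrative, and the reward is set to a non-zero value because SFTDataset drops any row whose reward is exactly 0.0.

```python
import json
import os

# Illustrative records only; real data comes from your own conversation logs.
examples = [
    {
        "input": "Anonymous: Hi, how are you?\nGPT:",
        "output": " I'm good, how are you?\n",
        "reward": 1.0,  # must be non-zero or SFTDataset filters the row out
    },
]

os.makedirs("data", exist_ok=True)
with open("data/train.json", "w", encoding="utf-8") as f:
    for example in examples:
        f.write(json.dumps(example) + "\n")
```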