diff --git a/.gitignore b/.gitignore index 6ba79042..c7c05bb2 100644 --- a/.gitignore +++ b/.gitignore @@ -402,3 +402,11 @@ wandb/ # ignore run log files **/nohup.out + +# auto test +tests/cache +tests/ckpt +.ipynb_checkpoints +tests/dataset_download +"core.python" +tests/dataset_download.ipynb diff --git a/README.md b/README.md index 5c065815..fcb6d794 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,12 @@ Be careful that the ‘core’ is in the PyPI project name. pip install oslo-core ``` +## Installation on CentOS +```console +# sh gcc_install.sh +# pip install oslo-core +``` + ## Administrative Notes ### Citing OSLO diff --git a/gcc_install.sh b/gcc_install.sh new file mode 100644 index 00000000..b315e6f8 --- /dev/null +++ b/gcc_install.sh @@ -0,0 +1,10 @@ +#!/bin/bash +# install the gcc/llvm toolchain and build dependencies +yum install libaio-devel -y +yum install centos-release-scl -y +yum-config-manager --enable rhel-server-rhscl-7-rpms -y +yum install devtoolset-8 -y +yum install llvm-toolset-7 -y +sudo yum -y install llvm-toolset-7-clang-analyzer llvm-toolset-7-clang-tools-extra +sudo yum -y install pdsh +scl enable devtoolset-8 llvm-toolset-7 bash diff --git a/requirements.txt b/requirements.txt index 3618d7b7..ed55ce1e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ pybind11 scipy torch >= 1.11.0 transformers +wandb diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/inference.py b/tests/inference.py new file mode 100644 index 00000000..1be7e349 --- /dev/null +++ b/tests/inference.py @@ -0,0 +1,105 @@ +import os +from argparse import ArgumentParser +from functools import partial + +from transformers import ( + AutoModelForCausalLM, + AutoModelForMaskedLM, + AutoModelForSeq2SeqLM, + AutoModelForSequenceClassification, + AutoTokenizer, +) + +import oslo +from tests.util.oslo import initialize_oslo, print_rank_0 + +os.environ["TOKENIZERS_PARALLELISM"] = "true" + +parser = ArgumentParser() +parser.add_argument("--local-rank", default=0, type=int) +parser.add_argument("--task", required=True, type=str) +parser.add_argument("--model", required=True, type=str) +parser.add_argument("--tokenizer", default=None, type=str) +parser.add_argument("--input", default=None, type=str) +parser.add_argument("--tensor_parallel_size", default=1, type=int) +parser.add_argument("--data_parallel_size", default=1, type=int) +parser.add_argument("--pipeline_parallel_size", default=1, type=int) +parser.add_argument("--tensor_parallel_depth", default=1, type=int) +parser.add_argument("--tensor_parallel_mode", default="1D", type=str) + +args = parser.parse_args() +generation_task = args.task not in ["causal-lm", "seq2seq-lm"] +args.tokenizer = args.tokenizer if args.tokenizer else args.model + +# 1. Create a tokenizer +tokenizer = AutoTokenizer.from_pretrained(args.tokenizer) + +if tokenizer.pad_token is None: + tokenizer.pad_token = tokenizer.eos_token + +# 2. Define tasks and config +TASKS = { + "masked-lm": { + "class": AutoModelForMaskedLM.from_pretrained, + "example": f"Manners maketh man. Do you {tokenizer.mask_token} what that means?", + "output": lambda output: tokenizer.decode(output.logits.argmax(-1)[0]), + }, + "sequence-classification": { + "class": AutoModelForSequenceClassification.from_pretrained, + "example": "I will decide how I feel, I will be happy today.", + "output": lambda output: output.logits.argmax(-1).item(), + }, + "causal-lm": { + "class": AutoModelForCausalLM.from_pretrained, + "example": "I don't want a lot for Christmas. 
There is just one thing", + "output": lambda output: tokenizer.decode(output[0]), + }, + "seq2seq-lm": { + "class": AutoModelForSeq2SeqLM.from_pretrained, + "example": "Life was like a box of chocolates. You never know what you’re gonna get.", + "output": lambda output: tokenizer.decode(output[0]), + }, +} + + +assert args.task in TASKS, ( + f"{args.task} is not supported task. " + f"Please choose one of {list(TASKS.keys())}. " + "If there are no major problems, it will work for other tasks as well, " + "but I haven't tested it, so if you encounter any problems, " + "please report them through the github issue." +) + +make_result = ( + lambda input, before, after: "\n" + f"Result :\n" + f"> Input: {input}\n" + f"> Output (before OSLO): {TASKS[args.task]['output'](before)}\n" + f"> Output (after OSLO): {TASKS[args.task]['output'](after)}\n" +) + +# 3. Create a model and input +model = TASKS[args.task]["class"](args.model) +input = args.input if args.input is not None else TASKS[args.task]["example"] +forward_fn = model.forward if generation_task else partial(model.generate, num_beams=3) + +if args.task == "causal-lm": + input_data = tokenizer(input, return_tensors="pt") + del input_data["attention_mask"] +else: + input_data = tokenizer(input, return_tensors="pt") + +# 4. Get result before parallelization +output_before = forward_fn(**input_data) + +# 5. Parallelize the model +model_oslo, parallel_context = initialize_oslo(args, model) +forward_fn = ( + model_oslo.forward if generation_task else partial(model_oslo.generate, num_beams=3) +) + +# 6. Get result after parallelization +output_after = forward_fn(**input_data.to("cuda")) + +# 7. Print the results +print_rank_0(make_result(input, output_before, output_after), parallel_context) diff --git a/tests/merge.py b/tests/merge.py new file mode 100644 index 00000000..dfa272eb --- /dev/null +++ b/tests/merge.py @@ -0,0 +1,84 @@ +import os +import random +import numpy as np +import torch +import torch.distributed as dist +import transformers +import oslo + +from copy import deepcopy +from tensorboardX import SummaryWriter +from datasets import load_dataset +from torch.optim import AdamW +from torch.utils.data.distributed import DistributedSampler +from torch.utils.data import DataLoader +from transformers import AutoTokenizer + +from tqdm import tqdm +from tests.tasks.model_task import ModelTask +from oslo import ParallelContext, ParallelMode +from oslo.torch.nn.parallel import ( + TensorParallel, + PipelineParallel, + DistributedDataParallel, +) +from tests.util.arg_parser import get_args + +# Define tensor parallel mode +tensor_parallel_mode_map = { + "1D": ParallelMode.TENSOR_1D, + "2D": ParallelMode.TENSOR_2D, + "2D_ROW": ParallelMode.TENSOR_2D_ROW, + "2D_COL": ParallelMode.TENSOR_2D_COL, + "2P5D": ParallelMode.TENSOR_2P5D, + "2P5D_ROW": ParallelMode.TENSOR_2P5D_ROW, + "2P5D_COL": ParallelMode.TENSOR_2P5D_COL, + "2P5D_DEP": ParallelMode.TENSOR_2P5D_DEP, + "2P5D_XZ": ParallelMode.TENSOR_2P5D_XZ, + "3D": ParallelMode.TENSOR_3D, + "3D_INPUT": ParallelMode.TENSOR_3D_INPUT, + "3D_WEIGHT": ParallelMode.TENSOR_3D_WEIGHT, + "3D_OUTPUT": ParallelMode.TENSOR_3D_OUTPUT, +} + + +def main(): + args = get_args() + name = ( + f"{args.model}-{args.task}-" + f"bsz={args.batch_size}-" + f"len={args.sequence_length}" + ) + + args.local_rank = int(os.environ["LOCAL_RANK"]) + print(args.local_rank) + + # 1. 
Create parallelized model + model_tasks = ModelTask() + model_tasks_config = model_tasks.get_model_task(args.task) + model_oslo = model_tasks_config["class"](args.model) + + parallel_context = ParallelContext.from_torch( + data_parallel_size=args.data_parallel_size, + pipeline_parallel_size=args.pipeline_parallel_size, + tensor_parallel_size=args.tensor_parallel_size, + tensor_parallel_mode=tensor_parallel_mode_map[args.tensor_parallel_mode], + tensor_parallel_depth=args.tensor_parallel_depth, + ) + + model_oslo = TensorParallel(model_oslo, parallel_context) + oslo.ready(model_oslo, parallel_context) + + # 2. Load parallelized model + model_oslo.from_parallelized(path=args.merge_dir) + + # 3. Save and merge model checkpoint + saved_merge_dir = args.merge_dir + "_merge" + model_oslo.save_pretrained(save_directory=saved_merge_dir, merge_checkpoints=True) + + if torch.distributed.get_rank() == 0: + print("Complete checkpoint merge") + + +if __name__ == "__main__": + main() diff --git a/tests/tasks/model_task.py b/tests/tasks/model_task.py new file mode 100644 index 00000000..dc306cf1 --- /dev/null +++ b/tests/tasks/model_task.py @@ -0,0 +1,139 @@ +import torch +import os + +from functools import partial +from datasets import load_dataset +from transformers import ( + AutoModelForCausalLM, + AutoModelForSeq2SeqLM, + AutoModelForSequenceClassification, + AutoTokenizer, +) + +os.environ["TOKENIZERS_PARALLELISM"] = "true" + + +class ModelTask: + def __init__(self): + """ + Define model task + """ + self.tasks = { + "sequence-classification": { + "class": partial( + AutoModelForSequenceClassification.from_pretrained, num_labels=3 + ), + "load_dataset": load_dataset( + "pietrolesci/gpt3_nli", split="train", cache_dir="tests/cache" + ), + "preprocessing_map_func": self.mli_task_map_func, + }, + "causal-lm": { + "class": AutoModelForCausalLM.from_pretrained, + "load_dataset": load_dataset( + "squad", + split="train", + cache_dir="tests/cache", + ), + "preprocessing_map_func": self.causal_lm_task_map_func, + }, + "seq2seq": { + "class": AutoModelForSeq2SeqLM.from_pretrained, + "load_dataset": load_dataset( + "squad", + split="train", + cache_dir="tests/cache", + ), + "preprocessing_map_func": self.seq2seq_task_map_func, + }, + } + + def get_model_task(self, task): + + assert task in self.tasks, ( + f"{task} is not supported task. " + f"Please choose one of {list(self.tasks.keys())}. " + "If there are no major problems, it will work for other tasks as well, " + "but I haven't tested it, so if you encounter any problems, " + "please report them through the github issue." 
+ ) + + return self.tasks[task] + + def mli_task_map_func(self, dataset, tokenizer, args): + def preprocess(row_datas): + input_texts = [] + labels = [] + + for text_a, text_b, label in zip( + row_datas["text_a"], row_datas["text_b"], row_datas["label"] + ): + input_texts.append(f"{str(text_a)}\n{str(text_b)}") + labels.append(label) + + input_text = tokenizer( + input_texts, + max_length=args.sequence_length, + return_tensors="pt", + padding="max_length", + truncation=True, + ) + + ret_labels = torch.tensor(labels, dtype=torch.long) + + return {**input_text, "labels": ret_labels} + + dataset = dataset.select(range(args.train_step)) + return dataset.map( + preprocess, + batched=True, + remove_columns=["text_a", "text_b", "label"], + ).with_format("torch") + + def causal_lm_task_map_func(self, dataset, tokenizer, args): + def preprocess(row_datas): + input_text = tokenizer( + row_datas["context"], + max_length=args.sequence_length, + return_tensors="pt", + padding="max_length", + truncation=True, + ) + + return {**input_text, "labels": input_text["input_ids"]} + + dataset = dataset.select(range(args.train_step)) + + return dataset.map( + preprocess, + batched=True, + remove_columns=["id", "title", "context", "question", "answers"], + ).with_format("torch") + + def seq2seq_task_map_func(self, dataset, tokenizer, args): + def preprocess(row_datas): + input_text = tokenizer( + row_datas["context"], + max_length=args.sequence_length, + return_tensors="pt", + padding="max_length", + truncation=True, + ) + + label_text = tokenizer( + row_datas["question"], + max_length=args.sequence_length, + return_tensors="pt", + padding="max_length", + truncation=True, + ) + + return {**input_text, "labels": label_text["input_ids"]} + + dataset = dataset.select(range(args.train_step)) + + return dataset.map( + preprocess, + batched=True, + remove_columns=["id", "title", "context", "question", "answers"], + ).with_format("torch") diff --git a/tests/test_all.py b/tests/test_all.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/test_script/run_inference.sh b/tests/test_script/run_inference.sh new file mode 100644 index 00000000..94627548 --- /dev/null +++ b/tests/test_script/run_inference.sh @@ -0,0 +1,17 @@ +## inference shell code +# EXAMPLE: ``sh ./tests/test_script/run_inference.sh 4 bert-base-cased masked-lm `` +# EXAMPLE: ``sh ./tests/test_script/run_inference.sh 4 ishan/bert-base-uncased-mnli sequence-classification `` +# EXAMPLE: ``sh ./tests/test_script/run_inference.sh 4 gpt2 causal-lm `` +# EXAMPLE: ``sh ./tests/test_script/run_inference.sh 4 EleutherAI/gpt-neo-1.3B causal-lm `` +# EXAMPLE: ``sh ./tests/test_script/run_inference.sh 4 t5-base seq2seq-lm `` + +NUM_GPUS=$1 +MODEL=$2 +TASK=$3 + +python -m torch.distributed.launch \ + --nproc_per_node="$NUM_GPUS" \ + ./tests/inference.py \ + --task=$TASK \ + --model=$MODEL \ + --tensor_parallel_size="$NUM_GPUS" diff --git a/tests/test_script/run_merge.sh b/tests/test_script/run_merge.sh new file mode 100644 index 00000000..f5167c7a --- /dev/null +++ b/tests/test_script/run_merge.sh @@ -0,0 +1,42 @@ +########################################################### +# If you use only two gpu example +# Checkpoint directory : tests/ckpt/checkpoint_0 +# saved merge directory: tests/ckpt/checkpoint_0_merge +########################################################### + +# EXAMPLE merge TP case BERT:`sh ./tests/test_script/run_merge.sh ishan/bert-base-uncased-mnli sequence-classification 2 1 1 2 1` + +# EXAMPLE merge TP case GPT:`sh 
./tests/test_script/run_merge.sh gpt2 causal-lm 2 1 1 2 1` + +# EXAMPLE merge TP case T5:`sh ./tests/test_script/run_merge.sh t5-base seq2seq 2 1 1 2 1` + + +MODEL=$1 +TASK=$2 + +NUM_GPUS=$3 +DATA_PARALLEL_SIZE=$4 +PIPELINE_PARALLEL_SIZE=$5 +TENSOR_PARALLEL_SIZE=$6 +TENSOR_PARALLEL_DEPTH=$7 + +# tensor parallel mode +# "1D", "2D", "2D_ROW", "2D_COL", "2P5D", "2P5D_ROW", "2P5D_COL" +# "2P5D_DEP", "2P5D_XZ", "3D", "3D_INPUT", "3D_WEIGHT", "3D_OUTPUT" +TENSOR_PARALLEL_MODE=1D +MERGE_DIR=tests/ckpt/checkpoint_0 + +run_cmd="torchrun --standalone --nproc_per_node=${NUM_GPUS} \ + ./tests/merge.py \ + --task=$TASK \ + --model=$MODEL \ + --tensor_parallel_size=$TENSOR_PARALLEL_SIZE \ + --data_parallel_size=$DATA_PARALLEL_SIZE \ + --pipeline_parallel_size=$PIPELINE_PARALLEL_SIZE \ + --tensor_parallel_mode=$TENSOR_PARALLEL_MODE \ + --tensor_parallel_depth=$TENSOR_PARALLEL_DEPTH \ + --merge_dir=$MERGE_DIR + " + +echo ${run_cmd} +eval ${run_cmd} diff --git a/tests/test_script/run_train.sh b/tests/test_script/run_train.sh new file mode 100644 index 00000000..d0d0c5d7 --- /dev/null +++ b/tests/test_script/run_train.sh @@ -0,0 +1,71 @@ +# EXAMPLE TP case BERT:`sh ./tests/test_script/run_train.sh ishan/bert-base-uncased-mnli sequence-classification 2 128 128 100 1 1 1 2 1 1 1D` + +# EXAMPLE DP case BERT:`sh ./tests/test_script/run_train.sh ishan/bert-base-uncased-mnli sequence-classification 2 128 128 100 1 2 1 1 1 1 1D` + + +# EXAMPLE TP case GPT:`sh ./tests/test_script/run_train.sh gpt2 causal-lm 2 64 64 100 1 1 1 2 1 1 1D` + +# EXAMPLE TP case T5:`sh ./tests/test_script/run_train.sh t5-base seq2seq 2 64 128 100 1 1 1 2 1 1 1D` + + +# Check a checkpoint result on wandbv + +# Task specific model +# - BERT case +# - Sequence classification +# - ishan/bert-base-uncased-mnli + +# Task specific model +# - GPT case +# - causal-lm +# - gpt2 + +# Task specific model +# - T5 case +# - seq2seq +# - t5-base + +######################################### +# !!Feature still in development +# 1. Pipeline parallelism +# 2. 
Tensor parallelism + data pallelism +######################################### + +MODEL=$1 +TASK=$2 + +# Define variable of parallel model setting +NUM_GPUS=$3 +BATCH_SIZE=$4 +SEQ_LENGTH=$5 +TRAIN_STEP=$6 +TOTAL_TRAIN_STEP=$((TRAIN_STEP*BATCH_SIZE)) +SAVE_INTERVAL=$7 +DATA_PARALLEL_SIZE=$8 +PIPELINE_PARALLEL_SIZE=$9 +TENSOR_PARALLEL_SIZE=${10} +TENSOR_PARALLEL_DEPTH=${11} +EPOCH=${12} +# tensor parallel mode +# "1D", "2D", "2D_ROW", "2D_COL", "2P5D", "2P5D_ROW", "2P5D_COL" +# "2P5D_DEP", "2P5D_XZ", "3D", "3D_INPUT", "3D_WEIGHT", "3D_OUTPUT" +TENSOR_PARALLEL_MODE=${13} + +run_cmd="torchrun --standalone --nproc_per_node=${NUM_GPUS} \ + ./tests/training.py \ + --task=$TASK \ + --model=$MODEL \ + --batch_size=$BATCH_SIZE \ + --sequence_length=$SEQ_LENGTH \ + --train_step=$TOTAL_TRAIN_STEP \ + --save_interval=$SAVE_INTERVAL \ + --epoch=$EPOCH \ + --tensor_parallel_size=$TENSOR_PARALLEL_SIZE \ + --data_parallel_size=$DATA_PARALLEL_SIZE \ + --pipeline_parallel_size=$PIPELINE_PARALLEL_SIZE \ + --tensor_parallel_mode=$TENSOR_PARALLEL_MODE \ + --tensor_parallel_depth=$TENSOR_PARALLEL_DEPTH + " + +echo ${run_cmd} +eval ${run_cmd} diff --git a/tests/torch/_C/__init__.py b/tests/torch/_C/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/_C/test_loading.py b/tests/torch/_C/test_loading.py deleted file mode 100644 index c3640ba2..00000000 --- a/tests/torch/_C/test_loading.py +++ /dev/null @@ -1,113 +0,0 @@ -""" -python3 test_loading.py -""" -from oslo.torch._C import ( - CPUAdamBinder, - CPUAdagradBinder, - FusedAdagradBinder, - FusedAdamBinder, - FusedNovogradBinder, - FusedLambBinder, - FusedLayerNormBinder, - FusedSGDBinder, - FusedMixedPrecisionLambBinder, - FusedMixedPrecisionL2NormBinder, - FusedL2NormBinder, - ExpertParallelBinder, - NgramRepeatBlockBinder, -) - - -def test_cpu_adam_bind(): - print("> Test load CPUAdam...", end="") - CPUAdamBinder().bind() - print("OK") - - -def test_cpu_adagrad_bind(): - print("> Test load CPUAdagrad...", end="") - CPUAdagradBinder().bind() - print("OK") - - -def test_fused_adagrad_bind(): - print("> Test load FusedAdagrad...", end="") - FusedAdagradBinder().bind() - print("OK") - - -def test_fused_adam_bind(): - print("> Test load FusedAdam...", end="") - FusedAdamBinder().bind() - print("OK") - - -def test_fused_novograd_bind(): - print("> Test load FusedNovograd...", end="") - FusedNovogradBinder().bind() - print("OK") - - -def test_fused_lamb_bind(): - print("> Test load FusedLamb...", end="") - FusedLambBinder().bind() - print("OK") - - -def test_fused_layer_norm_bind(): - print("> Test load FusedLayerNorm...", end="") - FusedLayerNormBinder().bind() - print("OK") - - -def test_fused_sgd_bind(): - print("> Test load FusedSGD...", end="") - FusedSGDBinder().bind() - print("OK") - - -def test_fused_mixed_precision_lamb_bind(): - print("> Test load FusedMixedPrecisionLamb...", end="") - FusedMixedPrecisionLambBinder().bind() - print("OK") - - -def test_fused_mixed_precision_l2_norm_bind(): - print("> Test load FusedMixedPrecisionL2Norm...", end="") - FusedMixedPrecisionL2NormBinder().bind() - print("OK") - - -def test_fused_l2_norm_bind(): - print("> Test load FusedL2Norm...", end="") - FusedL2NormBinder().bind() - print("OK") - - -def test_expert_parallel_bind(): - print("> Test load ExpertParallel...", end="") - ExpertParallelBinder().bind() - print("OK") - - -def test_ngram_repeat_block_bind(): - print("> Test load NgramRepeatBlock...", end="") - NgramRepeatBlockBinder().bind() - print("OK") - - -if __name__ == 
"__main__": - print("Test tests/torch/_C/test_loading.py") - test_cpu_adam_bind() - test_cpu_adagrad_bind() - test_fused_adagrad_bind() - test_fused_adam_bind() - test_fused_novograd_bind() - test_fused_lamb_bind() - test_fused_layer_norm_bind() - test_fused_sgd_bind() - test_fused_mixed_precision_lamb_bind() - test_fused_mixed_precision_l2_norm_bind() - test_fused_l2_norm_bind() - test_expert_parallel_bind() - test_ngram_repeat_block_bind() diff --git a/tests/torch/__init__.py b/tests/torch/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/__init__.py b/tests/torch/nn/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/modules/__init__.py b/tests/torch/nn/modules/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/modules/conv/__init__.py b/tests/torch/nn/modules/conv/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/modules/conv/test_conv.py b/tests/torch/nn/modules/conv/test_conv.py deleted file mode 100644 index 7176ce25..00000000 --- a/tests/torch/nn/modules/conv/test_conv.py +++ /dev/null @@ -1,43 +0,0 @@ -""" -python3 test_conv.py -""" -import torch -from transformers.modeling_utils import Conv1D - -import oslo.torch.nn as onn - - -def test_conv1d(): - transformers_conv = Conv1D(10, 10).cuda() - onn_conv = onn.Conv1D(10, 10).cuda() - onn_conv_skip = onn.Conv1D(10, 10, skip_bias_add=True).cuda() - - # make sure the parameters are the same - onn_conv.load_state_dict(transformers_conv.state_dict()) - onn_conv_skip.load_state_dict(transformers_conv.state_dict()) - - print("> Test weight shape...", end="") - assert transformers_conv.weight.shape == onn_conv.weight.shape - print("OK") - - print("> Test bias shape...", end="") - assert transformers_conv.bias.shape == onn_conv.bias.shape - print("OK") - - print("> Test forward...", end="") - input_tensor = torch.randn(1, 10, 10).cuda() - assert torch.allclose(transformers_conv(input_tensor), onn_conv(input_tensor)) - print("OK") - - print("> Test forward skip bias add...", end="") - input_tensor = torch.randn(1, 10, 10).cuda() - transformers_output = transformers_conv(input_tensor) - onn_output, bias = onn_conv_skip(input_tensor) - onn_output += bias - assert torch.allclose(transformers_output, onn_output) - print("OK") - - -if __name__ == "__main__": - print("Test tests/torch/nn/modules/conv/test_conv.py") - test_conv1d() diff --git a/tests/torch/nn/modules/dropout/__init__.py b/tests/torch/nn/modules/dropout/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/modules/dropout/test_dropout.py b/tests/torch/nn/modules/dropout/test_dropout.py deleted file mode 100644 index b5e4d4ea..00000000 --- a/tests/torch/nn/modules/dropout/test_dropout.py +++ /dev/null @@ -1,37 +0,0 @@ -""" -python3 test_dropout.py -""" -import torch - -import oslo.torch.nn as onn - - -def test_fused_bias_dropout(): - # create linear layers - onn_linear = onn.Linear(10, 10, skip_bias_add=True).cuda() - torch_linear = torch.nn.Linear(10, 10).cuda() - - # make sure the parameters are the same - torch_linear.load_state_dict(onn_linear.state_dict()) - - # create dropout layers - onn_dropout = onn.FusedBiasDropout(0.2).cuda() - torch_dropout = torch.nn.Dropout(0.2).cuda() - - # create input - input_tensor = torch.randn(1, 10, requires_grad=True).cuda() - - # forward pass - onn_output = onn_dropout(*onn_linear(input_tensor)).squeeze() - torch_output = 
torch_dropout(torch_linear(input_tensor)).squeeze() - - print("> Test forward...", end="") - for o1, o2 in zip(onn_output, torch_output): - if o1 != 0 and o2 != 0: - assert torch.allclose(o1, o2) - print("OK") - - -if __name__ == "__main__": - print("Test tests/torch/nn/modules/dropout/test_dropout.py") - test_fused_bias_dropout() diff --git a/tests/torch/nn/modules/embedding/__init__.py b/tests/torch/nn/modules/embedding/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/modules/embedding/test_embedding_1d.py b/tests/torch/nn/modules/embedding/test_embedding_1d.py deleted file mode 100644 index 98d8b78a..00000000 --- a/tests/torch/nn/modules/embedding/test_embedding_1d.py +++ /dev/null @@ -1,63 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_embedding_1d.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import Embedding1D -from oslo.torch.nn.parallel.tensor_parallel.utils import split_1d - - -def test_embedding_1d(pc): - batch_size = 2 - seq_len = 4 - hidden_dim = 8 - world_size = pc.get_world_size(ParallelMode.TENSOR_1D) - - input_ = torch.LongTensor([[0, 1, 6, 3], [5, 2, 7, 9]]).cuda() - target = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - embedding = torch.nn.Embedding(16, 8).cuda() - w = deepcopy(embedding.weight.data) - - out = embedding(input_) - optimizer = torch.optim.Adam(embedding.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - out_update = embedding(input_) - - w = split_1d(w, world_size, dim=-1, parallel_context=pc) - embedding_1d = Embedding1D(16, 8, parallel_context=pc) - embedding_1d.weight.data = w - - pout = embedding_1d(input_) - optimizer = torch.optim.Adam(embedding_1d.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, target) - loss.backward() - optimizer.step() - pout_update = embedding_1d(input_) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch(tensor_parallel_size=4) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/embedding/test_embedding_1d.py") - test_embedding_1d(pc) diff --git a/tests/torch/nn/modules/embedding/test_embedding_2d.py b/tests/torch/nn/modules/embedding/test_embedding_2d.py deleted file mode 100644 index 57210582..00000000 --- a/tests/torch/nn/modules/embedding/test_embedding_2d.py +++ /dev/null @@ -1,73 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_embedding_2d.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo.torch.nn.parallel.tensor_parallel.utils import ( - split_2d, - split_embedding_2d, - split_batch_2d, - gather_2d, -) -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import Embedding2D - - -def test_embedding_2d(pc): - summa_dim = pc.get_world_size(ParallelMode.TENSOR_2D_COL) - input_ = torch.LongTensor([[0, 1, 6, 3, 8], [5, 2, 7, 4, 9]]).cuda() - target = torch.randn((2, 5, 8)).cuda() - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - embedding = torch.nn.Embedding(16, 8).cuda() - w = 
deepcopy(embedding.weight.data) - - out = embedding(input_) - optimizer = torch.optim.Adam(embedding.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - out_update = embedding(input_) - - input_ = split_batch_2d(input_, summa_dim, parallel_context=pc) - target = split_2d(target, summa_dim, parallel_context=pc) - w = split_embedding_2d(w, summa_dim, parallel_context=pc) - - embedding_2d = Embedding2D(16, 8, parallel_context=pc) - embedding_2d.weight.data.copy_(w) - - pout = embedding_2d(input_) - optimizer = torch.optim.Adam(embedding_2d.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, target) - loss.backward() - optimizer.step() - - pout_update = embedding_2d(input_) - pout = gather_2d(pout, summa_dim, parallel_context=pc) - pout_update = gather_2d(pout_update, summa_dim, parallel_context=pc) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch( - tensor_parallel_size=4, - tensor_parallel_mode=ParallelMode.TENSOR_2D, - ) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/embedding/test_embedding_2d.py") - test_embedding_2d(pc) diff --git a/tests/torch/nn/modules/embedding/test_embedding_2p5d.py b/tests/torch/nn/modules/embedding/test_embedding_2p5d.py deleted file mode 100644 index a1720a08..00000000 --- a/tests/torch/nn/modules/embedding/test_embedding_2p5d.py +++ /dev/null @@ -1,78 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_embedding_2p5d.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo.torch.nn.parallel.tensor_parallel.utils import ( - split_batch_2p5d, - split_2p5d, - split_embedding_2p5d, - gather_2p5d, -) -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import Embedding2p5D - - -def test_embedding_2p5d(pc): - batch_size = 2 - seq_len = 5 - num_embeddings = 16 - embedding_dim = 8 - tesseract_dim = pc.get_world_size(ParallelMode.TENSOR_2P5D_COL) - - input_ = torch.LongTensor([[0, 1, 6, 3, 8], [5, 2, 7, 4, 9]]).cuda() - target = torch.randn((batch_size, seq_len, embedding_dim)).cuda() - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - embedding = torch.nn.Embedding(num_embeddings, embedding_dim).cuda() - w = deepcopy(embedding.weight.data) - - out = embedding(input_) - optimizer = torch.optim.Adam(embedding.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - out_update = embedding(input_) - - input_ = split_batch_2p5d(input_, tesseract_dim, parallel_context=pc) - target = split_2p5d(target, tesseract_dim, parallel_context=pc) - w = split_embedding_2p5d(w, tesseract_dim, dim=-1, parallel_context=pc) - - embedding_2p5d = Embedding2p5D(num_embeddings, embedding_dim, parallel_context=pc) - embedding_2p5d.weight.data.copy_(w) - - pout = embedding_2p5d(input_) - optimizer = torch.optim.Adam(embedding_2p5d.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, target) - loss.backward() - optimizer.step() - - pout_update = embedding_2p5d(input_) - pout = gather_2p5d(pout, tesseract_dim, parallel_context=pc) - pout_update = gather_2p5d(pout_update, tesseract_dim, parallel_context=pc) - - if pc.get_global_rank() == 0: - print("> Test 
forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch( - tensor_parallel_size=8, - tensor_parallel_depth=2, - tensor_parallel_mode=ParallelMode.TENSOR_2P5D, - ) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/embedding/test_embedding_2p5d.py") - test_embedding_2p5d(pc) diff --git a/tests/torch/nn/modules/embedding/test_embedding_3d.py b/tests/torch/nn/modules/embedding/test_embedding_3d.py deleted file mode 100644 index 38b087a4..00000000 --- a/tests/torch/nn/modules/embedding/test_embedding_3d.py +++ /dev/null @@ -1,80 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_embedding_3d.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo.torch.nn.parallel.tensor_parallel.utils import ( - split_batch_3d, - split_input_3d, - split_embedding_3d, - gather_output_3d, -) -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import Embedding3D - - -def test_embedding_3d(pc): - batch_size = 4 - seq_len = 5 - num_embeddings = 16 - embedding_dim = 8 - cubic_dim = pc.get_world_size(ParallelMode.TENSOR_3D_INPUT) - - input_ = torch.LongTensor( - [[0, 1, 6, 13, 8], [5, 12, 7, 4, 9], [5, 2, 7, 15, 4], [14, 2, 8, 7, 9]] - ).cuda() - target = torch.randn((batch_size, seq_len, embedding_dim)).cuda() - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - embedding = torch.nn.Embedding(num_embeddings, embedding_dim).cuda() - w = deepcopy(embedding.weight.data) - - out = embedding(input_) - optimizer = torch.optim.Adam(embedding.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - - out_update = embedding(input_) - input_ = split_batch_3d(input_, cubic_dim, parallel_context=pc) - target = split_input_3d(target, cubic_dim, parallel_context=pc) - w = split_embedding_3d(w, cubic_dim, parallel_context=pc) - - embedding_3d = Embedding3D(num_embeddings, embedding_dim, parallel_context=pc) - embedding_3d.weight.data.copy_(w) - - pout = embedding_3d(input_) - optimizer = torch.optim.Adam(embedding_3d.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, target) - loss.backward() - optimizer.step() - - pout_update = embedding_3d(input_) - pout = gather_output_3d(pout, cubic_dim, parallel_context=pc) - pout_update = gather_output_3d(pout_update, cubic_dim, parallel_context=pc) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch( - tensor_parallel_size=8, - tensor_parallel_mode=ParallelMode.TENSOR_3D, - ) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/embedding/test_embedding_3d.py") - test_embedding_3d(pc) diff --git a/tests/torch/nn/modules/embedding/test_vocab_embedding_1d.py b/tests/torch/nn/modules/embedding/test_vocab_embedding_1d.py deleted file mode 100644 index 45b83fd2..00000000 --- a/tests/torch/nn/modules/embedding/test_vocab_embedding_1d.py +++ /dev/null @@ -1,63 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_vocab_embedding_1d.py 
-""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import VocabParallelEmbedding1D -from oslo.torch.nn.parallel.tensor_parallel.utils import split_1d - - -def test_vocab_embedding_1d(pc): - batch_size = 2 - seq_len = 4 - hidden_dim = 8 - world_size = pc.get_world_size(ParallelMode.TENSOR_1D) - - input_ = torch.LongTensor([[0, 1, 6, 3], [5, 2, 7, 9]]).cuda() - target = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - vocab_embedding = torch.nn.Embedding(16, 8).cuda() - w = deepcopy(vocab_embedding.weight.data) - - out = vocab_embedding(input_) - optimizer = torch.optim.Adam(vocab_embedding.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - - out_update = vocab_embedding(input_) - w = split_1d(w, world_size, dim=0, parallel_context=pc) - vocab_embedding_1d = VocabParallelEmbedding1D(16, 8, parallel_context=pc) - vocab_embedding_1d.weight.data = w - - pout = vocab_embedding_1d(input_) - optimizer = torch.optim.Adam(vocab_embedding_1d.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, target) - loss.backward() - optimizer.step() - pout_update = vocab_embedding_1d(input_) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch(tensor_parallel_size=4) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/embedding/test_vocab_embedding_1d.py") - test_vocab_embedding_1d(pc) diff --git a/tests/torch/nn/modules/embedding/test_vocab_embedding_2d.py b/tests/torch/nn/modules/embedding/test_vocab_embedding_2d.py deleted file mode 100644 index 4fb402c5..00000000 --- a/tests/torch/nn/modules/embedding/test_vocab_embedding_2d.py +++ /dev/null @@ -1,72 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_vocab_embedding_2d.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import VocabParallelEmbedding2D -from oslo.torch.nn.parallel.tensor_parallel.utils import ( - split_batch_2d, - split_2d, - gather_2d, -) - - -def test_vocab_embedding_2d(pc): - summa_dim = pc.get_world_size(ParallelMode.TENSOR_2D_COL) - input_ = torch.LongTensor([[0, 1, 6, 3, 8], [5, 2, 7, 4, 9]]).cuda() - target = torch.randn((2, 5, 8)).cuda() - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - vocab_embedding = torch.nn.Embedding(16, 8).cuda() - w = deepcopy(vocab_embedding.weight.data) - - out = vocab_embedding(input_) - optimizer = torch.optim.Adam(vocab_embedding.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - - out_update = vocab_embedding(input_) - input_ = split_batch_2d(input_, summa_dim, parallel_context=pc) - target = split_2d(target, summa_dim, parallel_context=pc) - w = split_2d(w, summa_dim, parallel_context=pc) - - vocab_embedding_2d = VocabParallelEmbedding2D(16, 8, parallel_context=pc) - vocab_embedding_2d.weight.data.copy_(w) - - pout = vocab_embedding_2d(input_) - optimizer = torch.optim.Adam(vocab_embedding_2d.parameters(), lr=1e-3) - loss = 
torch.nn.MSELoss()(pout, target) - loss.backward() - optimizer.step() - - pout_update = vocab_embedding_2d(input_) - pout = gather_2d(pout, summa_dim, parallel_context=pc) - pout_update = gather_2d(pout_update, summa_dim, parallel_context=pc) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch( - tensor_parallel_size=4, - tensor_parallel_mode=ParallelMode.TENSOR_2D, - ) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/embedding/test_vocab_embedding_2d.py") - test_vocab_embedding_2d(pc) diff --git a/tests/torch/nn/modules/embedding/test_vocab_embedding_2p5d.py b/tests/torch/nn/modules/embedding/test_vocab_embedding_2p5d.py deleted file mode 100644 index ed11b639..00000000 --- a/tests/torch/nn/modules/embedding/test_vocab_embedding_2p5d.py +++ /dev/null @@ -1,80 +0,0 @@ -""" -torchrun --nproc_per_node=8 test_vocab_embedding_2p5d.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo.torch.nn.parallel.tensor_parallel.utils import ( - split_batch_2p5d, - split_2p5d, - gather_2p5d, -) -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import VocabParallelEmbedding2p5D - - -def test_vocab_embedding_2p5d(pc): - batch_size = 2 - seq_len = 5 - num_embeddings = 16 - embedding_dim = 8 - tesseract_dim = pc.get_world_size(ParallelMode.TENSOR_2P5D_COL) - - input_ = torch.LongTensor([[0, 1, 6, 3, 8], [5, 2, 7, 4, 9]]).cuda() - target = torch.randn((batch_size, seq_len, embedding_dim)).cuda() - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - vocab_embedding = torch.nn.Embedding(num_embeddings, embedding_dim).cuda() - w = deepcopy(vocab_embedding.weight.data) - - out = vocab_embedding(input_) - optimizer = torch.optim.Adam(vocab_embedding.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - - out_update = vocab_embedding(input_) - input_ = split_batch_2p5d(input_, tesseract_dim, parallel_context=pc) - target = split_2p5d(target, tesseract_dim, parallel_context=pc) - w = split_2p5d(w, tesseract_dim, parallel_context=pc) - - vocab_embedding_2p5d = VocabParallelEmbedding2p5D( - num_embeddings, embedding_dim, parallel_context=pc - ) - vocab_embedding_2p5d.weight.data.copy_(w) - - pout = vocab_embedding_2p5d(input_) - optimizer = torch.optim.Adam(vocab_embedding_2p5d.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, target) - loss.backward() - optimizer.step() - - pout_update = vocab_embedding_2p5d(input_) - pout = gather_2p5d(pout, tesseract_dim, parallel_context=pc) - pout_update = gather_2p5d(pout_update, tesseract_dim, parallel_context=pc) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch( - tensor_parallel_size=8, - tensor_parallel_depth=2, - tensor_parallel_mode=ParallelMode.TENSOR_2P5D, - ) - if pc.get_global_rank() == 0: - print("Test 
tests/torch/nn/modules/embedding/test_vocab_embedding_2p5d.py") - test_vocab_embedding_2p5d(pc) diff --git a/tests/torch/nn/modules/embedding/test_vocab_embedding_3d.py b/tests/torch/nn/modules/embedding/test_vocab_embedding_3d.py deleted file mode 100644 index 0e5e13ec..00000000 --- a/tests/torch/nn/modules/embedding/test_vocab_embedding_3d.py +++ /dev/null @@ -1,82 +0,0 @@ -""" -torchrun --nproc_per_node=8 test_vocab_embedding_3d.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo.torch.nn.parallel.tensor_parallel.utils import ( - split_batch_3d, - split_input_3d, - split_weight_3d, - gather_output_3d, -) -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import VocabParallelEmbedding3D - - -def test_vocab_embedding_3d(pc): - batch_size = 4 - seq_len = 5 - num_embeddings = 16 - embedding_dim = 8 - cubic_dim = pc.get_world_size(ParallelMode.TENSOR_3D_INPUT) - - input_ = torch.LongTensor( - [[0, 1, 6, 13, 8], [5, 12, 7, 4, 9], [5, 2, 7, 15, 4], [14, 2, 8, 7, 9]] - ).cuda() - target = torch.randn((batch_size, seq_len, embedding_dim)).cuda() - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - embedding = torch.nn.Embedding(num_embeddings, embedding_dim).cuda() - w = deepcopy(embedding.weight.data) - - out = embedding(input_) - optimizer = torch.optim.Adam(embedding.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - - out_update = embedding(input_) - input_ = split_batch_3d(input_, cubic_dim, parallel_context=pc) - target = split_input_3d(target, cubic_dim, parallel_context=pc) - w = split_weight_3d(w, cubic_dim, parallel_context=pc) - - vocab_embedding_3d = VocabParallelEmbedding3D( - num_embeddings, embedding_dim, parallel_context=pc - ) - vocab_embedding_3d.weight.data.copy_(w) - - pout = vocab_embedding_3d(input_) - optimizer = torch.optim.Adam(vocab_embedding_3d.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, target) - loss.backward() - optimizer.step() - - pout_update = vocab_embedding_3d(input_) - pout = gather_output_3d(pout, cubic_dim, parallel_context=pc) - pout_update = gather_output_3d(pout_update, cubic_dim, parallel_context=pc) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch( - tensor_parallel_size=8, - tensor_parallel_mode=ParallelMode.TENSOR_3D, - ) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/embedding/test_vocab_embedding_3d.py") - test_vocab_embedding_3d(pc) diff --git a/tests/torch/nn/modules/functional/__init__.py b/tests/torch/nn/modules/functional/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/modules/functional/test_gelu.py b/tests/torch/nn/modules/functional/test_gelu.py deleted file mode 100644 index ffb72fb8..00000000 --- a/tests/torch/nn/modules/functional/test_gelu.py +++ /dev/null @@ -1,55 +0,0 @@ -""" -python3 test_gelu.py -""" -import torch -from torch.nn import functional as F -from oslo.torch.nn import fused_gelu, fused_bias_gelu - - -def test_gelu(): - input_fused = torch.randn(4, 4, requires_grad=True).cuda() - input_fused.retain_grad() - input_non_fused = input_fused.clone().detach().requires_grad_(True) 
- input_non_fused.retain_grad() - - output_fused = fused_gelu(input_fused) - output_non_fused = F.gelu(input_non_fused) - - print("> Test forward...", end="") - assert torch.allclose(output_fused, output_non_fused, atol=1e-2) - print("OK") - - print("> Test backward...", end="") - output_non_fused.sum().backward() - output_fused.sum().backward() - assert torch.allclose(input_fused.grad, input_non_fused.grad, atol=1e-2) - print("OK") - - input_fused = torch.randn(4, 4, requires_grad=True).cuda() - input_fused.retain_grad() - input_bias_fused = torch.randn(4, 4, requires_grad=True).cuda() - input_bias_fused.retain_grad() - - input_non_fused = input_fused.clone().detach().requires_grad_(True) - input_non_fused.retain_grad() - input_bias_non_fused = input_bias_fused.clone().detach().requires_grad_(True) - input_bias_non_fused.retain_grad() - - output_fused = fused_bias_gelu(input_fused, input_bias_fused) - output_non_fused = F.gelu(input_non_fused + input_bias_non_fused) - - print("> Test forward with bias...", end="") - assert torch.allclose(output_fused, output_non_fused, atol=1e-2) - print("OK") - - print("> Test backward with bias...", end="") - output_non_fused.sum().backward() - output_fused.sum().backward() - assert torch.allclose(input_fused.grad, input_non_fused.grad, atol=1e-2) - assert torch.allclose(input_bias_fused.grad, input_bias_non_fused.grad, atol=1e-2) - print("OK") - - -if __name__ == "__main__": - print("Test tests/torch/nn/modules/functional/test_gelu.py") - test_gelu() diff --git a/tests/torch/nn/modules/layer_norm/__init__.py b/tests/torch/nn/modules/layer_norm/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/modules/layer_norm/test_fused_layer_norm_autocast.py b/tests/torch/nn/modules/layer_norm/test_fused_layer_norm_autocast.py deleted file mode 100644 index 09e015d8..00000000 --- a/tests/torch/nn/modules/layer_norm/test_fused_layer_norm_autocast.py +++ /dev/null @@ -1,132 +0,0 @@ -# from oslo.torch._C import FusedLayerNormBinder -import itertools -import unittest - -import torch - -import oslo.torch.nn as onn - - -def _prep_layers(normalized_shape, elementwise_affine, dtype): - native = torch.nn.LayerNorm( - normalized_shape=normalized_shape, elementwise_affine=elementwise_affine - ).to(device="cuda", dtype=dtype) - fused = onn.FusedLayerNorm( - normalized_shape=normalized_shape, elementwise_affine=elementwise_affine - ).cuda() - return native, fused - - -def _prep_rms_layers(normalized_shape, elementwise_affine, dtype): - native = onn.FusedRMSNorm( - normalized_shape=normalized_shape, elementwise_affine=elementwise_affine - ) - fused = onn.FusedRMSNorm( - normalized_shape=normalized_shape, elementwise_affine=elementwise_affine - ).cuda() - return native, fused - - -def _prep_inputs(batch_size, normalized_shape, dtype): - shape = (batch_size, *normalized_shape) - fused = torch.randn(shape).cuda().requires_grad_(True) - with torch.no_grad(): - native = fused.clone().to(dtype).requires_grad_(True) - return native, fused - - -autocast_dtypes = ( - (torch.half, torch.bfloat16) if torch.cuda.is_bf16_supported() else (torch.half,) -) - - -class TestAutocastFusedLayerNorm(unittest.TestCase): - bf16_fwd_thresholds = dict(rtol=1.6e-2, atol=3e-4) - bf16_bwd_thresholds = dict(rtol=1.6e-2, atol=3e-3) - - def setUp(self): - self.batch_size = 16 - self.normalized_shape = [32, 16] - - def _run_test(self, dtype, elementwise_affine): - native, fused = _prep_layers(self.normalized_shape, elementwise_affine, dtype) - native_x, fused_x = 
_prep_inputs(self.batch_size, self.normalized_shape, dtype) - - expected = native(native_x) - with torch.cuda.amp.autocast(dtype=dtype): - actual = fused(fused_x) - tols = ( - {"rtol": None, "atol": None} - if dtype == torch.half - else TestAutocastFusedLayerNorm.bf16_fwd_thresholds - ) - torch.testing.assert_allclose(actual, expected, **tols) - - g_native = torch.rand_like(expected) - with torch.no_grad(): - g_fused = g_native.clone() - expected.backward(g_native) - actual.backward(g_fused) - - tols = ( - {"rtol": None, "atol": None} - if dtype == torch.half - else TestAutocastFusedLayerNorm.bf16_bwd_thresholds - ) - torch.testing.assert_allclose(native_x.grad, fused_x.grad, **tols) - - def test_autocast(self): - for (dtype, elementwise_affine) in itertools.product( - autocast_dtypes, (True, False) - ): - with self.subTest(f"{dtype}-{elementwise_affine}"): - self._run_test(dtype, elementwise_affine) - - -class TestAutocastFusedRMSNorm(unittest.TestCase): - bf16_fwd_thresholds = dict(rtol=1.6e-2, atol=3e-4) - bf16_bwd_thresholds = dict(rtol=1.6e-2, atol=3e-3) - - def setUp(self): - self.batch_size = 16 - self.normalized_shape = [32, 16] - - def _run_test(self, dtype, elementwise_affine): - native, fused = _prep_rms_layers( - self.normalized_shape, elementwise_affine, dtype - ) - native_x, fused_x = _prep_inputs(self.batch_size, self.normalized_shape, dtype) - - expected = native(native_x.cpu()) - with torch.cuda.amp.autocast(dtype=dtype): - actual = fused(fused_x) - tols = ( - {"rtol": None, "atol": None} - if dtype == torch.half - else TestAutocastFusedRMSNorm.bf16_fwd_thresholds - ) - torch.testing.assert_allclose(actual, expected.detach().clone().cuda(), **tols) - - g_native = torch.rand_like(expected) - with torch.no_grad(): - g_fused = g_native.detach().clone().cuda() - expected.backward(g_native) - actual.backward(g_fused) - - tols = ( - {"rtol": None, "atol": None} - if dtype == torch.half - else TestAutocastFusedRMSNorm.bf16_bwd_thresholds - ) - torch.testing.assert_allclose(native_x.grad.cuda(), fused_x.grad, **tols) - - def test_autocast(self): - for (dtype, elementwise_affine) in itertools.product( - autocast_dtypes, (True, False) - ): - with self.subTest(f"{dtype}-{elementwise_affine}"): - self._run_test(dtype, elementwise_affine) - - -if __name__ == "__main__": - unittest.main(verbosity=2) diff --git a/tests/torch/nn/modules/layer_norm/test_fused_layer_norm_fusedlayernorm.py b/tests/torch/nn/modules/layer_norm/test_fused_layer_norm_fusedlayernorm.py deleted file mode 100644 index eaf998b4..00000000 --- a/tests/torch/nn/modules/layer_norm/test_fused_layer_norm_fusedlayernorm.py +++ /dev/null @@ -1,119 +0,0 @@ -import unittest - -import torch - -import oslo.torch.nn as onn - - -# Test FusedLayerNorm -class TestFusedLayerNorm(unittest.TestCase): - dtype = torch.float - elementwise_affine = False - normalized_shape = [32, 16] - rtol, atol = None, None - fwd_thresholds = dict(rtol=1e-2, atol=1e-2) # rtol=1, atol=1 to pass the test - bwd_thresholds = dict(rtol=1e-2, atol=1e-2) # rtol=1, atol=1 to pass the test - mixed_fused = False - - def setUp(self) -> None: - # bias and weight are set to 0 and 1 respectively, so no need to copy parameters from cpu module to the gpu one - if not self.mixed_fused: - self.module_cpu_ = onn.FusedLayerNorm( - normalized_shape=self.normalized_shape, - elementwise_affine=self.elementwise_affine, - ).cpu() - self.module_cuda_ = onn.FusedLayerNorm( - normalized_shape=self.normalized_shape, - elementwise_affine=self.elementwise_affine, - 
).to(device="cuda", dtype=self.dtype) - else: - assert self.elementwise_affine - self.module_cpu_ = onn.MixedFusedLayerNorm( - normalized_shape=self.normalized_shape - ).cpu() - self.module_cuda_ = onn.MixedFusedLayerNorm( - normalized_shape=self.normalized_shape - ).to(device="cuda", dtype=self.dtype) - - if not self.mixed_fused: - self.module_cuda_ = onn.FusedLayerNorm( - normalized_shape=self.normalized_shape, - elementwise_affine=self.elementwise_affine, - ).to(device="cuda", dtype=self.dtype) - - def _check_same_output(self, batch_size, contiguous): - torch.cuda.manual_seed(42) - if contiguous: - input_shape = [batch_size] + self.normalized_shape - input_ = torch.randn(input_shape, device="cpu").requires_grad_(True) - input_cuda_ = ( - input_.to(device="cuda", dtype=self.dtype).detach().requires_grad_(True) - ) - self.assertTrue(input_.is_contiguous()) - self.assertTrue(input_cuda_.is_contiguous()) - else: - input_shape = [batch_size * 3] + [ - self.normalized_shape[0] * 5, - self.normalized_shape[1] * 3, - ] - input_src_ = torch.randn(input_shape, device="cpu") - input_ = input_src_[::3, ::5, ::3].detach().requires_grad_(True) - input_cuda_ = ( - input_src_.to(device="cuda", dtype=self.dtype)[::3, ::5, ::3] - .detach() - .requires_grad_(True) - ) - # make sure that tensors are NOT contiguous. - self.assertFalse(input_.is_contiguous()) - self.assertFalse(input_cuda_.is_contiguous()) - out_cpu_ = self.module_cpu_(input_) - gO = torch.rand_like(out_cpu_) - out_cpu_.backward(gO) - out_cuda_ = self.module_cuda_(input_cuda_) - # TODO (mkozuki): `torch.testing.assert_close` is deprecated. - # Use `torch.testing.assert_close`. - # See https://github.com/pytorch/pytorch/issues/61844 - torch.testing.assert_close( - out_cpu_.to(device="cuda", dtype=self.dtype), - out_cuda_.clone().detach(), - **self.fwd_thresholds, - ) - gO = gO.to(device="cuda", dtype=self.dtype) - out_cuda_.backward(gO) - self.assertFalse(out_cpu_.is_cuda) - self.assertTrue(out_cuda_.is_cuda) - torch.testing.assert_close( - input_.grad.to(device="cuda", dtype=self.dtype), - input_cuda_.grad, - **self.bwd_thresholds, - ) - if self.elementwise_affine: - torch.testing.assert_close( - self.module_cpu_.weight.grad.to(device="cuda", dtype=self.dtype), - self.module_cuda_.weight.grad, - **self.bwd_thresholds, - ) - - def _test_same_output(self, batch_size): - for contiguous in (True, False): - with self.subTest(contiguous=contiguous): - self._check_same_output(batch_size, contiguous) - - def test_layer_norm(self): - self._test_same_output(16) - - def test_large_batch(self): - self._test_same_output(65536) - - -class TestFusedLayerNormElemWise(TestFusedLayerNorm): - elementwise_affine = True - - -class TestMixedFusedLayerNormElemWise(TestFusedLayerNorm): - elementwise_affine = True - mixed_fused = True - - -if __name__ == "__main__": - unittest.main(verbosity=2) diff --git a/tests/torch/nn/modules/layer_norm/test_fused_layer_norm_rms.py b/tests/torch/nn/modules/layer_norm/test_fused_layer_norm_rms.py deleted file mode 100644 index 66a915e5..00000000 --- a/tests/torch/nn/modules/layer_norm/test_fused_layer_norm_rms.py +++ /dev/null @@ -1,113 +0,0 @@ -# from oslo.torch._C import FusedLayerNormBinder -import unittest - -import torch - -import oslo.torch.nn as onn - - -class TestFusedRMSNorm(unittest.TestCase): - dtype = torch.float - elementwise_affine = False - normalized_shape = [32, 16] - rtol, atol = None, None - fwd_thresholds = dict(rtol=1e-2, atol=1e-2) - bwd_thresholds = dict(rtol=1e-2, atol=1e-2) - mixed_fused = False - - def 
setUp(self): - # bias and weight are set to 0 and 1 respectively, so no need to copy parameters from cpu module to the gpu one - if not self.mixed_fused: - self.module_cpu_ = onn.FusedRMSNorm( - normalized_shape=self.normalized_shape, - elementwise_affine=self.elementwise_affine, - ).cpu() - self.module_cuda_ = onn.FusedRMSNorm( - normalized_shape=self.normalized_shape, - elementwise_affine=self.elementwise_affine, - ).to(device="cuda", dtype=self.dtype) - else: - assert self.elementwise_affine - self.module_cpu_ = onn.MixedFusedRMSNorm( - normalized_shape=self.normalized_shape - ).cpu() - self.module_cuda_ = onn.MixedFusedRMSNorm( - normalized_shape=self.normalized_shape - ).to(device="cuda", dtype=self.dtype) - - def _check_same_output(self, batch_size, contiguous): - torch.cuda.manual_seed(42) - if contiguous: - input_shape = [batch_size] + self.normalized_shape - input_ = torch.randn(input_shape, device="cpu").requires_grad_(True) - input_cuda_ = ( - input_.to(device="cuda", dtype=self.dtype).detach().requires_grad_(True) - ) - self.assertTrue(input_.is_contiguous()) - self.assertTrue(input_cuda_.is_contiguous()) - else: - input_shape = [batch_size * 3] + [ - self.normalized_shape[0] * 5, - self.normalized_shape[1] * 3, - ] - input_src_ = torch.randn(input_shape, device="cpu") - input_ = input_src_[::3, ::5, ::3].detach().requires_grad_(True) - input_cuda_ = ( - input_src_.to(device="cuda", dtype=self.dtype)[::3, ::5, ::3] - .detach() - .requires_grad_(True) - ) - # make sure that tensors are NOT contiguous. - self.assertFalse(input_.is_contiguous()) - self.assertFalse(input_cuda_.is_contiguous()) - out_cpu_ = self.module_cpu_(input_) - gO = torch.rand_like(out_cpu_) - out_cpu_.backward(gO) - out_cuda_ = self.module_cuda_(input_cuda_) - # TODO (mkozuki): `torch.testing.assert_close` is deprecated. - # Use `torch.testing.assert_close`. 
- # See https://github.com/pytorch/pytorch/issues/61844 - torch.testing.assert_close( - out_cpu_.to(device="cuda", dtype=self.dtype), - out_cuda_.clone().detach(), - **self.fwd_thresholds, - ) - gO = gO.to(device="cuda", dtype=self.dtype) - out_cuda_.backward(gO) - self.assertFalse(out_cpu_.is_cuda) - self.assertTrue(out_cuda_.is_cuda) - torch.testing.assert_close( - input_.grad.to(device="cuda", dtype=self.dtype), - input_cuda_.grad, - **self.bwd_thresholds, - ) - if self.elementwise_affine: - torch.testing.assert_close( - self.module_cpu_.weight.grad.to(device="cuda", dtype=self.dtype), - self.module_cuda_.weight.grad, - **self.bwd_thresholds, - ) - - def _test_same_output(self, batch_size): - for contiguous in (True, False): - with self.subTest(contiguous=contiguous): - self._check_same_output(batch_size, contiguous) - - def test_layer_norm(self): - self._test_same_output(16) - - def test_large_batch(self): - self._test_same_output(65536) - - -class TestFusedRMSNormElemWise(TestFusedRMSNorm): - elementwise_affine = True - - -class TestMixedFusedRMSNormElemWise(TestFusedRMSNorm): - elementwise_affine = True - mixed_fused = True - - -if __name__ == "__main__": - unittest.main(verbosity=True) diff --git a/tests/torch/nn/modules/layer_norm/test_layer_norm_1d.py b/tests/torch/nn/modules/layer_norm/test_layer_norm_1d.py deleted file mode 100644 index 81de4124..00000000 --- a/tests/torch/nn/modules/layer_norm/test_layer_norm_1d.py +++ /dev/null @@ -1,62 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_layer_norm_1d.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo.torch.distributed import ParallelContext -from oslo.torch.nn import LayerNorm1D - - -def test_layer_norm_1d(pc): - batch_size = 2 - seq_len = 4 - hidden_dim = 8 - - input_ = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - target = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - layernorm = torch.nn.LayerNorm(hidden_dim).cuda() - w = deepcopy(layernorm.weight.data) - b = deepcopy(layernorm.bias.data) - - out = layernorm(input_) - optimizer = torch.optim.Adam(layernorm.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - - out_update = layernorm(input_) - layernorm_1d = LayerNorm1D(hidden_dim, parallel_context=pc) - layernorm_1d.weight.data = w - layernorm_1d.bias.data = b - - pout = layernorm_1d(input_) - optimizer = torch.optim.Adam(layernorm_1d.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, target) - loss.backward() - optimizer.step() - pout_update = layernorm_1d(input_) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch(tensor_parallel_size=4) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/layer_norm/test_layer_norm_1d.py") - test_layer_norm_1d(pc) diff --git a/tests/torch/nn/modules/layer_norm/test_layer_norm_2d.py b/tests/torch/nn/modules/layer_norm/test_layer_norm_2d.py deleted file mode 100644 index c906127e..00000000 --- a/tests/torch/nn/modules/layer_norm/test_layer_norm_2d.py +++ /dev/null @@ -1,83 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_layer_norm_2d.py -""" -from copy import 
deepcopy - -import torch -import torch.distributed as dist - -from oslo import ParallelMode -from oslo.torch.distributed import ParallelContext -from oslo.torch.nn import LayerNorm2D -from oslo.torch.nn.parallel.tensor_parallel.utils import ( - split_2d, - split_layernorm_2d, - split_bias_2d, - gather_2d, -) - - -def test_layer_norm_2d(pc): - batch_size = 2 - seq_len = 2 - hidden_dim = 8 - - summa_dim = pc.get_world_size(ParallelMode.TENSOR_2D_COL) - input_ = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - target = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - layernorm = torch.nn.LayerNorm(hidden_dim).cuda() - w = deepcopy(layernorm.weight.data) - b = deepcopy(layernorm.bias.data) - - out = layernorm(input_) - optimizer = torch.optim.Adam(layernorm.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - - out_update = layernorm(input_) - dist.barrier() - - input_ = split_2d(input_, summa_dim, parallel_context=pc) - target = split_2d(target, summa_dim, parallel_context=pc) - w = split_layernorm_2d(w, summa_dim, parallel_context=pc) - b = split_bias_2d(b, summa_dim, parallel_context=pc) - - layernorm_2d = LayerNorm2D(hidden_dim, parallel_context=pc) - layernorm_2d.weight.data.copy_(w) - layernorm_2d.bias.data.copy_(b) - - pout = layernorm_2d(input_) - optimizer = torch.optim.Adam(layernorm_2d.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, target) - loss.backward() - optimizer.step() - - pout_update = layernorm_2d(input_) - pout = gather_2d(pout, summa_dim, parallel_context=pc) - pout_update = gather_2d(pout_update, summa_dim, parallel_context=pc) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch( - tensor_parallel_size=4, tensor_parallel_mode=ParallelMode.TENSOR_2D - ) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/layer_norm/test_layer_norm_2d.py") - test_layer_norm_2d(pc) diff --git a/tests/torch/nn/modules/layer_norm/test_layer_norm_2p5d.py b/tests/torch/nn/modules/layer_norm/test_layer_norm_2p5d.py deleted file mode 100644 index 224cfd4d..00000000 --- a/tests/torch/nn/modules/layer_norm/test_layer_norm_2p5d.py +++ /dev/null @@ -1,86 +0,0 @@ -""" -torchrun --nproc_per_node=8 test_layer_norm_2p5d.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo import ParallelMode -from oslo.torch.distributed import ParallelContext -from oslo.torch.nn import LayerNorm2p5D -from oslo.torch.nn.parallel.tensor_parallel.utils import ( - gather_2p5d, - split_2p5d, - split_layernorm_2p5d, - split_bias_2p5d, -) - - -def test_layer_norm_2p5d(pc): - batch_size = 2 - seq_len = 2 - hidden_dim = 8 - - tesseract_dim = pc.get_world_size(ParallelMode.TENSOR_2P5D_COL) - input_ = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - target = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - layernorm = torch.nn.LayerNorm(hidden_dim).cuda() - w = deepcopy(layernorm.weight.data) - b = deepcopy(layernorm.bias.data) - - out = layernorm(input_) - optimizer = torch.optim.Adam(layernorm.parameters(), lr=1e-3) - loss 
= torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - - out_update = layernorm(input_) - dist.barrier() - - input_ = split_2p5d(input_, tesseract_dim, parallel_context=pc) - target = split_2p5d(target, tesseract_dim, parallel_context=pc) - - w = split_layernorm_2p5d(w, tesseract_dim, parallel_context=pc) - b = split_bias_2p5d(b, tesseract_dim, parallel_context=pc) - - layernorm_2p5d = LayerNorm2p5D(hidden_dim, parallel_context=pc) - layernorm_2p5d.weight.data.copy_(w) - layernorm_2p5d.bias.data.copy_(b) - - pout = layernorm_2p5d(input_) - optimizer = torch.optim.Adam(layernorm_2p5d.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, target) - loss.backward() - optimizer.step() - - pout_update = layernorm_2p5d(input_) - pout = gather_2p5d(pout, tesseract_dim, parallel_context=pc) - pout_update = gather_2p5d(pout_update, tesseract_dim, parallel_context=pc) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch( - tensor_parallel_size=8, - tensor_parallel_depth=2, - tensor_parallel_mode=ParallelMode.TENSOR_2P5D, - ) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/layer_norm/test_layer_norm_2p5d.py") - test_layer_norm_2p5d(pc) diff --git a/tests/torch/nn/modules/layer_norm/test_layer_norm_3d.py b/tests/torch/nn/modules/layer_norm/test_layer_norm_3d.py deleted file mode 100644 index 7e7f20a9..00000000 --- a/tests/torch/nn/modules/layer_norm/test_layer_norm_3d.py +++ /dev/null @@ -1,84 +0,0 @@ -""" -torchrun --nproc_per_node=8 test_layer_norm_3d.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo import ParallelMode -from oslo.torch.distributed import ParallelContext -from oslo.torch.nn import LayerNorm3D -from oslo.torch.nn.parallel.tensor_parallel.utils import ( - split_input_3d, - split_layernorm_3d, - split_bias_3d, - gather_output_3d, -) - - -def test_layer_norm_3d(pc): - batch_size = 4 - seq_len = 2 - hidden_dim = 8 - - cubic_dim = pc.get_world_size(ParallelMode.TENSOR_3D_INPUT) - input_ = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - target = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - layernorm = torch.nn.LayerNorm(hidden_dim).cuda() - w = deepcopy(layernorm.weight.data) - b = deepcopy(layernorm.bias.data) - - out = layernorm(input_) - optimizer = torch.optim.Adam(layernorm.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - - out_update = layernorm(input_) - dist.barrier() - - input_ = split_input_3d(input_, cubic_dim, parallel_context=pc) - target = split_input_3d(target, cubic_dim, parallel_context=pc) - w = split_layernorm_3d(w, cubic_dim, parallel_context=pc) - b = split_bias_3d(b, cubic_dim, parallel_context=pc) - - layernorm_3d = LayerNorm3D(hidden_dim, parallel_context=pc) - layernorm_3d.weight.data.copy_(w) - layernorm_3d.bias.data.copy_(b) - - pout = layernorm_3d(input_) - optimizer = torch.optim.Adam(layernorm_3d.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, target) - loss.backward() - optimizer.step() - - pout_update = layernorm_3d(input_) - pout = gather_output_3d(pout, cubic_dim, parallel_context=pc) - 
pout_update = gather_output_3d(pout_update, cubic_dim, parallel_context=pc) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch( - tensor_parallel_size=8, - tensor_parallel_mode=ParallelMode.TENSOR_3D, - ) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/layer_norm/test_layer_norm_3d.py") - test_layer_norm_3d(pc) diff --git a/tests/torch/nn/modules/linear/__init__.py b/tests/torch/nn/modules/linear/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/modules/linear/test_linear.py b/tests/torch/nn/modules/linear/test_linear.py deleted file mode 100644 index 5a230a15..00000000 --- a/tests/torch/nn/modules/linear/test_linear.py +++ /dev/null @@ -1,43 +0,0 @@ -""" -python3 test_linear.py -""" -import torch -from torch.nn import Linear - -import oslo.torch.nn as onn - - -def test_linear(): - torch_linear = Linear(10, 10).cuda() - onn_linear = onn.Linear(10, 10).cuda() - onn_linear_skip = onn.Linear(10, 10, skip_bias_add=True).cuda() - - # make sure the parameters are the same - onn_linear.load_state_dict(torch_linear.state_dict()) - onn_linear_skip.load_state_dict(torch_linear.state_dict()) - - print("> Test weight shape...", end="") - assert torch_linear.weight.shape == onn_linear.weight.shape - print("OK") - - print("> Test bias shape...", end="") - assert torch_linear.bias.shape == onn_linear.bias.shape - print("OK") - - print("> Test forward...", end="") - input_tensor = torch.randn(1, 10, 10).cuda() - assert torch.allclose(torch_linear(input_tensor), onn_linear(input_tensor)) - print("OK") - - print("> Test forward skip bias add...", end="") - input_tensor = torch.randn(1, 10, 10).cuda() - torch_output = torch_linear(input_tensor) - onn_output, bias = onn_linear_skip(input_tensor) - onn_output += bias - assert torch.allclose(torch_output, onn_output) - print("OK") - - -if __name__ == "__main__": - print("Test tests/torch/nn/modules/linear/test_linear.py") - test_linear() diff --git a/tests/torch/nn/modules/linear/test_linear_1d_col.py b/tests/torch/nn/modules/linear/test_linear_1d_col.py deleted file mode 100644 index 484bfaa7..00000000 --- a/tests/torch/nn/modules/linear/test_linear_1d_col.py +++ /dev/null @@ -1,72 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_linear_1d_col.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import ColLinear1D -from oslo.torch.nn.parallel.tensor_parallel.utils import gather_1d, split_1d - - -def test_linear_1d_col(pc): - batch_size = 2 - seq_len = 4 - input_dim = 4 - hidden_dim = 8 - world_size = pc.get_world_size(ParallelMode.TENSOR_1D) - - input_ = torch.randn((batch_size, seq_len, input_dim)).cuda() - target = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - linear = torch.nn.Linear(input_dim, hidden_dim).cuda() - w = deepcopy(linear.weight.data) - b = deepcopy(linear.bias.data) - - out = linear(input_) - optimizer = torch.optim.Adam(linear.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - out_update = linear(input_) - - 
target = split_1d(target, world_size, dim=-1, parallel_context=pc) - w = split_1d(w, world_size, dim=0, parallel_context=pc) - b = split_1d(b, world_size, dim=0, parallel_context=pc) - - col_linear = ColLinear1D(input_dim, hidden_dim, parallel_context=pc).cuda() - col_linear.weight.data.copy_(w) - col_linear.bias.data.copy_(b) - - pout = col_linear(input_) - optimizer = torch.optim.Adam(col_linear.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, target) - loss.backward() - optimizer.step() - - pout_update = col_linear(input_) - pout = gather_1d(pout, world_size, dim=-1, parallel_context=pc) - pout_update = gather_1d(pout_update, world_size, dim=-1, parallel_context=pc) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch(tensor_parallel_size=4) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/linear/test_linear_1d_col.py") - test_linear_1d_col(pc) diff --git a/tests/torch/nn/modules/linear/test_linear_1d_row.py b/tests/torch/nn/modules/linear/test_linear_1d_row.py deleted file mode 100644 index b9fdccfc..00000000 --- a/tests/torch/nn/modules/linear/test_linear_1d_row.py +++ /dev/null @@ -1,68 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_linear_1d_row.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import RowLinear1D -from oslo.torch.nn.parallel.tensor_parallel.utils import split_1d - - -def test_linear_1d_row(pc): - batch_size = 2 - seq_len = 4 - input_dim = 4 - hidden_dim = 8 - world_size = pc.get_world_size(ParallelMode.TENSOR_1D) - - input_ = torch.randn((batch_size, seq_len, input_dim)).cuda() - target = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - linear = torch.nn.Linear(input_dim, hidden_dim).cuda() - w = deepcopy(linear.weight.data) - b = deepcopy(linear.bias.data) - - out = linear(input_) - optimizer = torch.optim.Adam(linear.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - - out_update = linear(input_) - input_ = split_1d(input_, world_size, dim=-1, parallel_context=pc) - w = split_1d(w, world_size, dim=-1, parallel_context=pc) - - row_linear = RowLinear1D(input_dim, hidden_dim, parallel_context=pc).cuda() - row_linear.weight.data.copy_(w) - row_linear.bias.data.copy_(b) - - pout = row_linear(input_) - optimizer = torch.optim.Adam(row_linear.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, target) - loss.backward() - optimizer.step() - pout_update = row_linear(input_) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch(tensor_parallel_size=4) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/linear/test_linear_1d_row.py") - test_linear_1d_row(pc) diff --git a/tests/torch/nn/modules/linear/test_linear_2d.py 
b/tests/torch/nn/modules/linear/test_linear_2d.py deleted file mode 100644 index a9e12068..00000000 --- a/tests/torch/nn/modules/linear/test_linear_2d.py +++ /dev/null @@ -1,79 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_linear_2d.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import Linear2D -from oslo.torch.nn.parallel.tensor_parallel.utils import ( - split_2d, - gather_2d, - split_bias_2d, -) - - -def test_linear_2d(pc): - batch_size = 2 - seq_len = 2 - input_dim = 4 - hidden_dim = 8 - summa_dim = pc.get_world_size(ParallelMode.TENSOR_2D_COL) - - input_ = torch.randn((batch_size, seq_len, input_dim)).cuda() - target = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - linear = torch.nn.Linear(input_dim, hidden_dim).cuda() - w = deepcopy(linear.weight.data) - b = deepcopy(linear.bias.data) - - out = linear(input_) - optimizer = torch.optim.Adam(linear.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - out_update = linear(input_) - - input_ = split_2d(input_, summa_dim, parallel_context=pc) - ptarget = split_2d(target, summa_dim, parallel_context=pc) - w = split_2d(w, summa_dim, parallel_context=pc) - b = split_bias_2d(b, summa_dim, parallel_context=pc) - - linear_2d = Linear2D(input_dim, hidden_dim, parallel_context=pc).cuda() - linear_2d.weight.data.copy_(w) - linear_2d.bias.data.copy_(b) - - pout = linear_2d(input_) - optimizer = torch.optim.Adam(linear_2d.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, ptarget) - loss.backward() - optimizer.step() - - pout_update = linear_2d(input_) - pout = gather_2d(pout, summa_dim, parallel_context=pc) - pout_update = gather_2d(pout_update, summa_dim, parallel_context=pc) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch( - tensor_parallel_size=4, tensor_parallel_mode=ParallelMode.TENSOR_2D - ) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/linear/test_linear_2d.py") - test_linear_2d(pc) diff --git a/tests/torch/nn/modules/linear/test_linear_2p5d.py b/tests/torch/nn/modules/linear/test_linear_2p5d.py deleted file mode 100644 index d9fe117a..00000000 --- a/tests/torch/nn/modules/linear/test_linear_2p5d.py +++ /dev/null @@ -1,78 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_linear_2p5d.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import Linear2p5D -from oslo.torch.nn.parallel.tensor_parallel._2p5d._ops import split_2d, gather_2d - - -def test_linear_2p5d(pc): - batch_size = 4 - seq_len = 2 - input_dim = 4 - hidden_dim = 8 - tesseract_dim = pc.get_world_size(ParallelMode.TENSOR_2P5D_COL) - - input_ = torch.randn((batch_size, seq_len, input_dim)).cuda() - target = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - linear = torch.nn.Linear(input_dim, hidden_dim).cuda() - w = deepcopy(linear.weight.data) - b = deepcopy(linear.bias.data) - - out = 
linear(input_) - optimizer = torch.optim.Adam(linear.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - - out_update = linear(input_) - input_ = split_2d(pc, input_, tesseract_dim) - ptarget = split_2d(pc, target, tesseract_dim) - - w = split_2d(pc, w, tesseract_dim, col_first=False) - b = b.chunk(tesseract_dim, dim=0)[pc.get_local_rank(ParallelMode.TENSOR_2P5D_ROW)] - - linear_2p5d = Linear2p5D(4, 4, parallel_context=pc).cuda() - linear_2p5d.weight.data.copy_(w) - linear_2p5d.bias.data.copy_(b) - - pout = linear_2p5d(input_) - optimizer = torch.optim.Adam(linear_2p5d.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, ptarget) - loss.backward() - optimizer.step() - - pout_update = linear_2p5d(input_) - pout = gather_2d(pc, pout, tesseract_dim, False) - pout_update = gather_2d(pc, pout_update, tesseract_dim, False) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch( - tensor_parallel_size=8, - tensor_parallel_depth=2, - tensor_parallel_mode=ParallelMode.TENSOR_2P5D, - ) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/linear/test_linear_2p5d.py") - test_linear_2p5d(pc) diff --git a/tests/torch/nn/modules/linear/test_linear_3d.py b/tests/torch/nn/modules/linear/test_linear_3d.py deleted file mode 100644 index 0b9a22bd..00000000 --- a/tests/torch/nn/modules/linear/test_linear_3d.py +++ /dev/null @@ -1,82 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_linear_3d.py -""" -from copy import deepcopy - -import torch -import torch.distributed as dist - -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import Linear3D -from oslo.torch.nn.parallel.tensor_parallel.utils import ( - split_input_3d, - split_weight_3d, - split_bias_3d, - gather_output_3d, -) - - -def test_linear_3d(pc): - batch_size = 4 - seq_len = 2 - input_dim = 4 - hidden_dim = 8 - cubic_dim = pc.get_world_size(ParallelMode.TENSOR_3D_INPUT) - - input_ = torch.randn((batch_size, seq_len, input_dim)).cuda() - target = torch.randn((batch_size, seq_len, hidden_dim)).cuda() - dist.broadcast(input_, src=0) - dist.broadcast(target, src=0) - - linear = torch.nn.Linear(input_dim, hidden_dim).cuda() - w = deepcopy(linear.weight.data) - b = deepcopy(linear.bias.data) - - out = linear(input_) - optimizer = torch.optim.Adam(linear.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(out, target) - loss.backward() - optimizer.step() - - out_update = linear(input_) - input_ = split_input_3d(input_, cubic_dim, parallel_context=pc) - ptarget = split_input_3d(target, cubic_dim, parallel_context=pc) - - w = split_weight_3d(w, cubic_dim, parallel_context=pc) - b = split_bias_3d(b, cubic_dim, parallel_context=pc) - - linear_3d = Linear3D(input_dim, hidden_dim, parallel_context=pc).cuda() - linear_3d.weight.data.copy_(w) - linear_3d.bias.data.copy_(b) - - pout = linear_3d(input_) - optimizer = torch.optim.Adam(linear_3d.parameters(), lr=1e-3) - loss = torch.nn.MSELoss()(pout, ptarget) - loss.backward() - optimizer.step() - - pout_update = linear_3d(input_) - pout = gather_output_3d(pout, cubic_dim, parallel_context=pc) - pout_update = gather_output_3d(pout_update, cubic_dim, parallel_context=pc) - - if 
pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(out, pout) - if pc.get_global_rank() == 0: - print("OK") - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(out_update, pout_update) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch( - tensor_parallel_size=8, - tensor_parallel_mode=ParallelMode.TENSOR_3D, - ) - if pc.get_global_rank() == 0: - print("Test tests/torch/nn/modules/linear/test_linear_3d.py") - test_linear_3d(pc) diff --git a/tests/torch/nn/modules/loss/__init__.py b/tests/torch/nn/modules/loss/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/modules/loss/test_vocab_parallel_cross_entropy_1d.py b/tests/torch/nn/modules/loss/test_vocab_parallel_cross_entropy_1d.py deleted file mode 100644 index bffd4f47..00000000 --- a/tests/torch/nn/modules/loss/test_vocab_parallel_cross_entropy_1d.py +++ /dev/null @@ -1,66 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_vocab_parallel_cross_entropy_1d.py -""" - -import torch -import torch.distributed as dist - -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import VocabParallelCrossEntropyLoss1D -from oslo.torch.nn.parallel.tensor_parallel.utils import split_1d - - -def test_vocab_parallel_cross_entropy_1d(pc): - criterion_master = torch.nn.CrossEntropyLoss() - criterion = VocabParallelCrossEntropyLoss1D(parallel_context=pc) - - batch_size = 2 - seq_len = 4 - num_classes = 8 - world_size = pc.get_world_size(ParallelMode.TENSOR_1D) - - out_master = torch.randn(batch_size, seq_len, num_classes).cuda() - target = torch.randint( - num_classes, size=(batch_size, seq_len), dtype=torch.long - ).cuda() - dist.broadcast(out_master, src=0) - dist.broadcast(target, src=0) - - out = split_1d(out_master.clone(), world_size, dim=-1, parallel_context=pc) - out = out.clone() - out.requires_grad = True - loss = criterion(out, target) - - out_master = out_master.clone() - out_master.requires_grad = True - loss_master = criterion_master( - out_master.view(-1, out_master.size(-1)), target.view(-1) - ) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(loss_master, loss) - if pc.get_global_rank() == 0: - print("OK") - - loss_master.backward() - loss.backward() - - grad_master = out_master.grad - grad_master = split_1d(grad_master, world_size, dim=-1, parallel_context=pc) - grad = out.grad - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(grad_master, grad) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch(tensor_parallel_size=4) - if pc.get_global_rank() == 0: - print( - "Test tests/torch/nn/modules/linear/test_vocab_parallel_cross_entropy_1d.py" - ) - test_vocab_parallel_cross_entropy_1d(pc) diff --git a/tests/torch/nn/modules/loss/test_vocab_parallel_cross_entropy_2d.py b/tests/torch/nn/modules/loss/test_vocab_parallel_cross_entropy_2d.py deleted file mode 100644 index 12c6261d..00000000 --- a/tests/torch/nn/modules/loss/test_vocab_parallel_cross_entropy_2d.py +++ /dev/null @@ -1,68 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_vocab_parallel_cross_entropy_2d.py -""" - -import torch -import torch.distributed as dist - -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import VocabParallelCrossEntropyLoss2D -from oslo.torch.nn.parallel.tensor_parallel.utils 
import split_2d - - -def test_vocab_parallel_cross_entropy_2d(pc): - criterion_master = torch.nn.CrossEntropyLoss() - criterion = VocabParallelCrossEntropyLoss2D(parallel_context=pc) - - batch_size = 4 - seq_len = 6 - num_classes = 8 - summa_dim = pc.get_world_size(ParallelMode.TENSOR_2D_COL) - - out_master = torch.randn(batch_size, seq_len, num_classes).cuda() - target = torch.randint( - num_classes, size=(batch_size, seq_len), dtype=torch.long - ).cuda() - dist.broadcast(out_master, src=0) - dist.broadcast(target, src=0) - - out = split_2d(out_master.clone(), summa_dim, parallel_context=pc) - out = out.clone() - out.requires_grad = True - loss = criterion(out, target) - - out_master = out_master.clone() - out_master.requires_grad = True - loss_master = criterion_master( - out_master.view(-1, out_master.size(-1)), target.view(-1) - ) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(loss_master, loss) - if pc.get_global_rank() == 0: - print("OK") - - loss_master.backward() - loss.backward() - - grad_master = out_master.grad - grad_master = split_2d(grad_master, summa_dim, parallel_context=pc) - grad = out.grad - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert torch.allclose(grad_master, grad) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch( - tensor_parallel_size=4, tensor_parallel_mode=ParallelMode.TENSOR_2D - ) - if pc.get_global_rank() == 0: - print( - "Test tests/torch/nn/modules/linear/test_vocab_parallel_cross_entropy_2d.py" - ) - test_vocab_parallel_cross_entropy_2d(pc) diff --git a/tests/torch/nn/modules/loss/test_vocab_parallel_cross_entropy_2p5d.py b/tests/torch/nn/modules/loss/test_vocab_parallel_cross_entropy_2p5d.py deleted file mode 100644 index 05588589..00000000 --- a/tests/torch/nn/modules/loss/test_vocab_parallel_cross_entropy_2p5d.py +++ /dev/null @@ -1,73 +0,0 @@ -""" -torchrun --nproc_per_node=4 test_vocab_parallel_cross_entropy_2p5d.py -""" - -import torch -import torch.distributed as dist - -from oslo.torch.distributed import ParallelContext, ParallelMode -from oslo.torch.nn import ( - VocabParallelCrossEntropyLoss2p5D, -) -from oslo.torch.nn.parallel.tensor_parallel.utils import split_2p5d - - -def test_vocab_parallel_cross_entropy_2p5d(pc): - criterion_master = torch.nn.CrossEntropyLoss() - criterion = VocabParallelCrossEntropyLoss2p5D(parallel_context=pc) - - batch_size = 4 - seq_len = 6 - num_classes = 8 - - tesseract_dim = pc.get_world_size(ParallelMode.TENSOR_2P5D_COL) - out_master = torch.randn(batch_size, seq_len, num_classes).cuda() - target = torch.randint( - num_classes, size=(batch_size, seq_len), dtype=torch.long - ).cuda() - dist.broadcast(out_master, src=0) - dist.broadcast(target, src=0) - - out = split_2p5d(out_master.clone(), tesseract_dim, parallel_context=pc) - out = out.clone() - out.requires_grad = True - - loss = criterion(out, target) - - out_master = out_master.clone() - out_master.requires_grad = True - loss_master = criterion_master( - out_master.view(-1, out_master.size(-1)), target.view(-1) - ) - - if pc.get_global_rank() == 0: - print("> Test forward...", end="") - assert torch.allclose(loss_master, loss) - if pc.get_global_rank() == 0: - print("OK") - - loss_master.backward() - loss.backward() - - grad_master = out_master.grad - grad_master = split_2p5d(grad_master, tesseract_dim, parallel_context=pc) - grad = out.grad - - if pc.get_global_rank() == 0: - print("> Test backward...", end="") - assert 
torch.allclose(grad_master, grad) - if pc.get_global_rank() == 0: - print("OK") - - -if __name__ == "__main__": - pc = ParallelContext.from_torch( - tensor_parallel_size=8, - tensor_parallel_depth=2, - tensor_parallel_mode=ParallelMode.TENSOR_2P5D, - ) - if pc.get_global_rank() == 0: - print( - "Test tests/torch/nn/modules/linear/test_vocab_parallel_cross_entropy_2p5d.py" - ) - test_vocab_parallel_cross_entropy_2p5d(pc) diff --git a/tests/torch/nn/modules/ngram_repeat_block/__init__.py b/tests/torch/nn/modules/ngram_repeat_block/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/modules/ngram_repeat_block/test_sequence_generator.py b/tests/torch/nn/modules/ngram_repeat_block/test_sequence_generator.py deleted file mode 100644 index 50bab798..00000000 --- a/tests/torch/nn/modules/ngram_repeat_block/test_sequence_generator.py +++ /dev/null @@ -1,123 +0,0 @@ -import math -import unittest - -import numpy as np -import torch - -from oslo.torch.nn import NGramRepeatBlock - -DEFAULT_TEST_VOCAB_SIZE = 100 - - -JIT_MSG = "Targeting OSS scriptability for the 1.6 release" - - -class TestSequenceGeneratorBase(unittest.TestCase): - def assertHypoTokens(self, hypo, tokens): - self.assertTensorEqual(hypo["tokens"], torch.LongTensor(tokens)) - - def assertHypoScore(self, hypo, pos_probs, normalized=True, lenpen=1.0): - pos_scores = torch.FloatTensor(pos_probs).log() - self.assertAlmostEqual(hypo["positional_scores"], pos_scores) - self.assertEqual(pos_scores.numel(), hypo["tokens"].numel()) - score = pos_scores.sum() - if normalized: - score /= pos_scores.numel() ** lenpen - self.assertLess(abs(score - hypo["score"]), 1e-6) - - def assertAlmostEqual(self, t1, t2): - self.assertEqual(t1.size(), t2.size(), "size mismatch") - self.assertLess((t1 - t2).abs().max(), 1e-4) - - def assertTensorEqual(self, t1, t2): - self.assertEqual(t1.size(), t2.size(), "size mismatch") - self.assertEqual(t1.ne(t2).long().sum(), 0) - - -@unittest.skipUnless(torch.cuda.is_available(), "") -class TestRepeatNgramBlocking(TestSequenceGeneratorBase): - def test_finds_repetitive_tokens(self): - bsz, vocab_size, beam_size, step = 2, 4, 1, 3 - generated_tok = torch.tensor( - [[2, 2, 2, 2], [3, 3, 3, 3]], dtype=torch.long, device="cuda" - ) - lprobs = torch.zeros((beam_size * bsz, vocab_size), device="cuda") - desired_result = lprobs.new_tensor( - [[0.0, 0.0, -math.inf, 0.0], [0.0, 0.0, 0.0, -math.inf]] - ) - - cuda_ext_result, baseline_result = self._compare_cuda_ext_to_default_implem( - bsz, beam_size, generated_tok, lprobs, step, 2 - ) - self.assertTensorEqual(cuda_ext_result, desired_result) - self.assertTensorEqual(baseline_result, desired_result) - - @unittest.skipIf(torch.__version__ < "1.6.0", JIT_MSG) - def test_jit_no_extension(self): - bsz, vocab_size, beam_size, step = 2, 4, 1, 3 - generated_tok = torch.tensor( - [[2, 2, 2, 2], [3, 3, 3, 3]], dtype=torch.long, device="cuda" - ) - lprobs = torch.zeros((beam_size * bsz, vocab_size), device="cuda") - blocker = NGramRepeatBlock(2, use_extension=False) - base_result = blocker(generated_tok, lprobs.clone(), bsz, beam_size, step) - scripted_blocker = torch.jit.script(blocker) - jit_result = scripted_blocker( - generated_tok, lprobs.clone(), bsz, beam_size, step - ) - self.assertTensorEqual(base_result, jit_result) - - def test_ngram_blocking_same_as_default_implem(self): - """Test that cuda extension returns same things as default impl in many settings.""" - vocab_size = 4 - step = 6 - for _ in range(2): - block_param = np.random.choice([1, 2, 
3, 4]) - batch_size = np.random.randint(1, 8) - beam_size = np.random.choice([1, 2, 4, 8]) - lprobs = torch.zeros((beam_size * batch_size, vocab_size), device="cuda") - - generated_tok = torch.tensor( - np.random.randint( - 0, vocab_size, size=(batch_size * beam_size, step + 1) - ), - device="cuda", - dtype=torch.long, - ) - self._compare_cuda_ext_to_default_implem( - batch_size, - beam_size, - generated_tok, - lprobs, - step, - block_param, - ) - - def _compare_cuda_ext_to_default_implem( - self, bsz, beam_size, generated_tok, lprobs, step, block_param - ): - """Assert that cuda extension and default implem return the same thing.""" - blocker = NGramRepeatBlock(block_param) - assert blocker.use_extension, "Extension not compiled" - cuda_ext_result = blocker( - generated_tok, - lprobs.clone(), - bsz, - beam_size, - step, - ) - blocker.use_extension = False - baseline_result = blocker( - generated_tok, - lprobs.clone(), - bsz, - beam_size, - step, - ) - self.assertTensorEqual(cuda_ext_result, baseline_result) - blocker.use_extension = True - return cuda_ext_result, baseline_result - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/torch/nn/parallel/__init__.py b/tests/torch/nn/parallel/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/parallel/models/__init__.py b/tests/torch/nn/parallel/models/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/parallel/tasks/__init__.py b/tests/torch/nn/parallel/tasks/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/parallel/tasks/abstract_task.py b/tests/torch/nn/parallel/tasks/abstract_task.py deleted file mode 100644 index c8d5121e..00000000 --- a/tests/torch/nn/parallel/tasks/abstract_task.py +++ /dev/null @@ -1,44 +0,0 @@ -from abc import ABC - - -class AbstractTask(ABC): - @staticmethod - def get_model_class(): - raise NotImplementedError - - @staticmethod - def get_inference_sample(tokenizer): - raise NotImplementedError - - @staticmethod - def get_inference_output(tokenizer, output): - raise NotImplementedError - - @staticmethod - def get_training_dataset(): - raise NotImplementedError - - @staticmethod - def get_training_preprocessing(train_step, dataset): - raise NotImplementedError - - def get_training_inputs(self, sample, batch_size, max_length, tokenizer): - raise NotImplementedError - - @staticmethod - def name(): - raise NotImplementedError - - @staticmethod - def forward(model): - raise NotImplementedError - - @staticmethod - def tokenize(sample, batch_size, max_length, tokenizer): - return tokenizer( - [str(sample)] * batch_size, - return_tensors="pt", - padding="max_length", - truncation=True, - max_length=max_length, - ).to("cuda") diff --git a/tests/torch/nn/parallel/tasks/causal_lm_task.py b/tests/torch/nn/parallel/tasks/causal_lm_task.py deleted file mode 100644 index 55fa1f21..00000000 --- a/tests/torch/nn/parallel/tasks/causal_lm_task.py +++ /dev/null @@ -1,44 +0,0 @@ -from functools import partial - -from datasets import load_dataset -from tasks.abstract_task import AbstractTask -from transformers import AutoModelForCausalLM - - -class CausalLMTask(AbstractTask): - @staticmethod - def get_model_class(): - return AutoModelForCausalLM.from_pretrained - - @staticmethod - def get_inference_sample(tokenizer): - return "I don't want a lot for Christmas. 
There is just one thing" - - @staticmethod - def get_inference_output(tokenizer, output): - return tokenizer.decode(output[0]) - - @staticmethod - def get_training_dataset(): - return load_dataset("squad").data["train"]["context"] - - @staticmethod - def get_training_preprocessing(train_step, dataset): - return dataset[:train_step] - - def get_training_inputs(self, sample, batch_size, max_length, tokenizer): - inputs = self.tokenize(sample, batch_size, max_length, tokenizer) - - return { - "input_ids": inputs["input_ids"], - "attention_mask": inputs["attention_mask"], - "labels": inputs["input_ids"], - } - - @staticmethod - def name(): - return "causal_lm" - - @staticmethod - def forward(model): - return partial(model.generate, num_beams=3) diff --git a/tests/torch/nn/parallel/tasks/masked_lm_task.py b/tests/torch/nn/parallel/tasks/masked_lm_task.py deleted file mode 100644 index 9c68708c..00000000 --- a/tests/torch/nn/parallel/tasks/masked_lm_task.py +++ /dev/null @@ -1,35 +0,0 @@ -from tasks.abstract_task import AbstractTask -from transformers import AutoModelForMaskedLM - - -class MaskedLMTask(AbstractTask): - @staticmethod - def get_model_class(): - return AutoModelForMaskedLM.from_pretrained - - @staticmethod - def get_inference_sample(tokenizer): - return f"Manners maketh man. Do you {tokenizer.mask_token} what that means?" - - @staticmethod - def get_inference_output(tokenizer, output): - return tokenizer.decode(output.logits.argmax(-1)[0]) - - @staticmethod - def get_training_dataset(): - raise NotImplementedError - - @staticmethod - def get_training_preprocessing(train_step, dataset): - raise NotImplementedError - - def get_training_inputs(self, sample, batch_size, max_length, tokenizer): - raise NotImplementedError - - @staticmethod - def name(): - return "masked_lm" - - @staticmethod - def forward(model): - return model.forward diff --git a/tests/torch/nn/parallel/tasks/seq2seq_lm_task.py b/tests/torch/nn/parallel/tasks/seq2seq_lm_task.py deleted file mode 100644 index 716c3810..00000000 --- a/tests/torch/nn/parallel/tasks/seq2seq_lm_task.py +++ /dev/null @@ -1,47 +0,0 @@ -from functools import partial - -from datasets import load_dataset -from tasks.abstract_task import AbstractTask -from transformers import AutoModelForSeq2SeqLM - - -class Seq2SeqLMTask(AbstractTask): - @staticmethod - def get_model_class(): - return AutoModelForSeq2SeqLM.from_pretrained - - @staticmethod - def get_inference_sample(tokenizer): - return ( - "Life was like a box of chocolates. You never know what you’re gonna get." 
- ) - - @staticmethod - def get_inference_output(tokenizer, output): - return tokenizer.decode(output[0]) - - @staticmethod - def get_training_dataset(): - return load_dataset("wmt14", "de-en").data["train"][0] - - @staticmethod - def get_training_preprocessing(train_step, dataset): - return [(str(data[1]), str(data[0])) for data in dataset[:train_step]] - - def get_training_inputs(self, sample, batch_size, max_length, tokenizer): - src = self.tokenize(sample[0], batch_size, max_length, tokenizer) - tgt = self.tokenize(sample[1], batch_size, max_length, tokenizer) - - return { - "input_ids": src["input_ids"], - "attention_mask": src["attention_mask"], - "labels": tgt["input_ids"], - } - - @staticmethod - def name(): - return "seq2seq_lm" - - @staticmethod - def forward(model): - return partial(model.generate, num_beams=3) diff --git a/tests/torch/nn/parallel/tasks/sequence_classification_task.py b/tests/torch/nn/parallel/tasks/sequence_classification_task.py deleted file mode 100644 index 9e87e46b..00000000 --- a/tests/torch/nn/parallel/tasks/sequence_classification_task.py +++ /dev/null @@ -1,50 +0,0 @@ -from functools import partial - -import torch -from datasets import load_dataset -from tasks.abstract_task import AbstractTask -from transformers import AutoModelForSequenceClassification - - -class SequenceClassificationTask(AbstractTask): - @staticmethod - def get_model_class(): - return partial(AutoModelForSequenceClassification.from_pretrained, num_labels=3) - - @staticmethod - def get_inference_sample(tokenizer): - return "I will decide how I feel, I will be happy today." - - @staticmethod - def get_inference_output(tokenizer, output): - return output.logits.argmax(-1).item() - - @staticmethod - def get_training_dataset(): - return load_dataset("multi_nli").data["train"] - - @staticmethod - def get_training_preprocessing(train_step, dataset): - return [ - (f"{str(p)}\n{str(h)}", l.as_py()) - for p, h, l in list(zip(dataset[2], dataset[5], dataset[9]))[:train_step] - ] - - def get_training_inputs(self, sample, batch_size, max_length, tokenizer): - inputs = self.tokenize(sample[0], batch_size, max_length, tokenizer) - return { - "input_ids": inputs["input_ids"], - "attention_mask": inputs["attention_mask"], - "labels": torch.tensor(sample[1]) - .unsqueeze(0) - .repeat(batch_size, 1) - .to("cuda"), - } - - @staticmethod - def name(): - return "sequence_classification" - - @staticmethod - def forward(model): - return model.forward diff --git a/tests/torch/nn/parallel/test_inference.py b/tests/torch/nn/parallel/test_inference.py deleted file mode 100644 index 72478061..00000000 --- a/tests/torch/nn/parallel/test_inference.py +++ /dev/null @@ -1,72 +0,0 @@ -import os -from argparse import ArgumentParser - -from transformers import AutoTokenizer - -from tasks.causal_lm_task import CausalLMTask -from tasks.masked_lm_task import MaskedLMTask -from tasks.seq2seq_lm_task import Seq2SeqLMTask -from tasks.sequence_classification_task import SequenceClassificationTask -from utils import initialize_oslo, print_rank_0 - -os.environ["TOKENIZERS_PARALLELISM"] = "true" - -inference_tasks = { - task.name(): task - for task in [ - Seq2SeqLMTask(), - CausalLMTask(), - MaskedLMTask(), - SequenceClassificationTask(), - ] -} - -parser = ArgumentParser() -parser.add_argument("--local_rank", type=int, default=0) -parser.add_argument("--task", type=str, required=True) -parser.add_argument("--model", type=str, required=True) -parser.add_argument("--tokenizer", type=str, default=None) 
-parser.add_argument("--input", type=str, default=None) -parser.add_argument("--data_parallel_size", type=int, default=1) -parser.add_argument("--pipeline_parallel_size", type=int, default=1) -parser.add_argument("--tensor_parallel_size", type=int, default=1) -parser.add_argument("--tensor_parallel_depth", type=int, default=1) -parser.add_argument("--tensor_parallel_mode", type=str, default="1D") -args = parser.parse_args() - - -assert args.task in inference_tasks, ( - f"{args.task} is not supported task. " - f"Please choose one of {inference_tasks}. " - "If there are no major problems, it will work for other tasks as well, " - "but I haven't tested it, so if you encounter any problems, " - "please report them through the github issue." -) - -task = inference_tasks[args.task] - -if args.tokenizer is None: - args.tokenizer = args.model - -tokenizer = AutoTokenizer.from_pretrained(args.tokenizer) -model = task.get_model_class()(args.model) -input = args.input if args.input is not None else task.get_inference_sample(tokenizer) - -output_before = task.get_inference_output( - tokenizer, task.forward(model)(**tokenizer(input, return_tensors="pt")) -) - -model, pc = initialize_oslo(args, model) - -output_after = task.get_inference_output( - tokenizer, task.forward(model)(**tokenizer(input, return_tensors="pt").to("cuda")) -) - -print_rank_0( - message=f""" -Result: -> Input: {input} -> Output before: {output_before} -> Output after: {output_after}""", - pc=pc, -) diff --git a/tests/torch/nn/parallel/test_merging.py b/tests/torch/nn/parallel/test_merging.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/nn/parallel/test_training.py b/tests/torch/nn/parallel/test_training.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/optim/__init__.py b/tests/torch/optim/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/torch/utils/__init__.py b/tests/torch/utils/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/training.py b/tests/training.py new file mode 100644 index 00000000..69f189cc --- /dev/null +++ b/tests/training.py @@ -0,0 +1,218 @@ +import os +import random +import numpy as np +import torch +import torch.distributed as dist +import transformers +import oslo +import wandb +import math + +from copy import deepcopy +from datasets import load_dataset +from torch.optim import AdamW +from torch.utils.data.distributed import DistributedSampler +from torch.utils.data import DataLoader +from transformers import AutoTokenizer + +from tqdm import tqdm +from tests.tasks.model_task import ModelTask +from oslo import ParallelContext, ParallelMode +from oslo.torch.nn.parallel import TensorParallel, PipelineParallel +from oslo.torch.nn.parallel.data_parallel import DistributedDataParallel as DDP + + +from tests.util.arg_parser import get_args + +# Define tensor parallel mode +tensor_parallel_mode_map = { + "1D": ParallelMode.TENSOR_1D, + "2D": ParallelMode.TENSOR_2D, + "2D_ROW": ParallelMode.TENSOR_2D_ROW, + "2D_COL": ParallelMode.TENSOR_2D_COL, + "2P5D": ParallelMode.TENSOR_2P5D, + "2P5D_ROW": ParallelMode.TENSOR_2P5D_ROW, + "2P5D_COL": ParallelMode.TENSOR_2P5D_COL, + "2P5D_DEP": ParallelMode.TENSOR_2P5D_DEP, + "2P5D_XZ": ParallelMode.TENSOR_2P5D_XZ, + "3D": ParallelMode.TENSOR_3D, + "3D_INPUT": ParallelMode.TENSOR_3D_INPUT, + "3D_WEIGHT": ParallelMode.TENSOR_3D_WEIGHT, + "3D_OUTPUT": ParallelMode.TENSOR_3D_OUTPUT, +} + + +def torch_ddp_dataloader(dataset, batch_size, parallel_context, args): + 
"""DDP func""" + num_workers = 1 + if args.tensor_parallel_size > 1: + rank_group = tensor_parallel_mode_map[args.tensor_parallel_mode] + else: + rank_group = ParallelMode.DATA + + num_replicas = parallel_context.get_world_size(rank_group) + + rank = parallel_context.get_local_rank(rank_group) + + batch_sampler = DistributedSampler(dataset, num_replicas=num_replicas, rank=rank) + + d_loader = torch.utils.data.DataLoader( + dataset=dataset, + batch_size=batch_size // num_replicas, + pin_memory=True, + shuffle=False, + num_workers=num_workers, + sampler=batch_sampler, + ) + return d_loader + + +def main(): + + args = get_args() + name = ( + f"{args.model}-{args.task}-" + f"bsz={args.batch_size}-" + f"len={args.sequence_length}" + ) + + # 1. set tokenizer + + tokenizer = AutoTokenizer.from_pretrained( + args.model, model_max_length=args.sequence_length + ) + if tokenizer.pad_token is None: + tokenizer.pad_token = tokenizer.eos_token + + # 2. set seed + torch.manual_seed(42) + random.seed(42) + np.random.seed(42) + + args.local_rank = int(os.environ["LOCAL_RANK"]) + print(args.local_rank) + + # 3. Create parallelized model and optimizer + model_tasks = ModelTask() + model_tasks_config = model_tasks.get_model_task(args.task) + + model_oslo = model_tasks_config["class"](args.model) + optimizer_oslo = AdamW(model_oslo.parameters(), lr=3e-5) + + model_no_oslo = model_tasks_config["class"](args.model) + optimizer_no_oslo = AdamW(model_no_oslo.parameters(), lr=3e-5) + + parallel_context = ParallelContext.from_torch( + data_parallel_size=args.data_parallel_size, + pipeline_parallel_size=args.pipeline_parallel_size, + tensor_parallel_size=args.tensor_parallel_size, + tensor_parallel_mode=tensor_parallel_mode_map[args.tensor_parallel_mode], + ) + + if args.tensor_parallel_size > 1: + model_oslo = TensorParallel(model_oslo, parallel_context) + + if args.data_parallel_size > 1: + model_oslo = DDP(model_oslo, parallel_context) + + assert ( + args.tensor_parallel_size > 1 or args.data_parallel_size > 1 + ), "Check the parallel strategy" + + oslo.ready(model_oslo, parallel_context) + + if args.tensor_parallel_size == 1 and args.data_parallel_size > 1: + torch.cuda.set_device(dist.get_rank()) + model_no_oslo = model_no_oslo.cuda(dist.get_rank()) + model_no_oslo = torch.nn.parallel.DistributedDataParallel( + model_no_oslo, device_ids=[dist.get_rank()], find_unused_parameters=False + ) + + # 4. Initialize wandb and create folders + if not dist.is_initialized() or dist.get_rank() == 0: + wandb.init(project="test", name=name) + os.makedirs("tests/ckpt", exist_ok=True) + os.makedirs("tests/cache", exist_ok=True) + + dist.barrier() + + # 5. Load dataset and do preprocessing + dataset = model_tasks_config["load_dataset"] + torch_dataset = model_tasks_config["preprocessing_map_func"]( + dataset, tokenizer, args + ) + + if args.data_parallel_size > 1: + oslo_model_dataloader = torch_ddp_dataloader( + torch_dataset, args.batch_size, parallel_context, args + ) + else: + oslo_model_dataloader = DataLoader( + torch_dataset, batch_size=args.batch_size, shuffle=False, drop_last=True + ) + + # 6. Train model + step = 0 + model_no_oslo.cuda() + + for ep in range(args.epoch): + save_model_dir = f"tests/ckpt/checkpoint_{str(ep)}" + + if dist.get_rank() == 0: + print(f"Start training epoch: {ep}") + + for _, sample in enumerate(tqdm(oslo_model_dataloader)): + model_oslo.zero_grad() + model_no_oslo.zero_grad() + inputs = {k: v.cuda() for k, v in sample.items() if k != "guid"} + + # 7. 
Run oslo model + oslo_loss = model_oslo(**inputs).loss + + # 8. Run no-oslo model + no_oslo_loss = model_no_oslo(**inputs).loss + if dist.get_rank() == 0: + print( + f"[oslo loss/no_oslo loss]: {oslo_loss.item():.4f} / {no_oslo_loss.item():.4f}" + ) + wandb.log( + { + "oslo_loss": oslo_loss.item(), + "no_oslo_loss": no_oslo_loss.item(), + "time": step, + } + ) + + step += 1 + + oslo_loss.backward() + optimizer_oslo.step() + + no_oslo_loss.backward() + optimizer_no_oslo.step() + + dist.barrier() + # 9. Save oslo model + if ep % args.save_interval == 0: + if not dist.is_initialized() or dist.get_rank() == 0: + os.makedirs(save_model_dir, exist_ok=True) + + model_oslo.save_pretrained( + save_directory=save_model_dir, merge_checkpoints=False + ) + + dist.barrier() + # 10. Save the last oslo model if it was not saved in the final epoch + if ep % args.save_interval != 0: + if not dist.is_initialized() or dist.get_rank() == 0: + os.makedirs(save_model_dir, exist_ok=True) + + model_oslo.save_pretrained( + save_directory=save_model_dir, merge_checkpoints=False + ) + + wandb.finish() + + +if __name__ == "__main__": + main() diff --git a/tests/util/arg_parser.py b/tests/util/arg_parser.py new file mode 100644 index 00000000..62f5a507 --- /dev/null +++ b/tests/util/arg_parser.py @@ -0,0 +1,23 @@ +import argparse + + +def get_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--local-rank", default=0, type=int) + # parser.add_argument("--config", required=True, type=str) + parser.add_argument("--task", required=True, type=str) + parser.add_argument("--model", required=True, type=str) + parser.add_argument("--tokenizer", default=None, type=str) + parser.add_argument("--batch_size", required=False, type=int) + parser.add_argument("--sequence_length", required=False, type=int) + parser.add_argument("--train_step", required=False, type=int) + parser.add_argument("--save_interval", required=False, type=int) + parser.add_argument("--tensor_parallel_size", default=1, type=int) + parser.add_argument("--data_parallel_size", default=1, type=int) + parser.add_argument("--pipeline_parallel_size", default=1, type=int) + parser.add_argument("--tensor_parallel_depth", default=1, type=int) + parser.add_argument("--epoch", default=1, type=int) + parser.add_argument("--tensor_parallel_mode", default="1D", type=str) + parser.add_argument("--merge_dir", required=False, type=str) + args = parser.parse_args() + return args diff --git a/tests/torch/nn/parallel/utils.py b/tests/util/oslo.py similarity index 83% rename from tests/torch/nn/parallel/utils.py rename to tests/util/oslo.py index 3e0cc8a3..3f95a351 100644 --- a/tests/torch/nn/parallel/utils.py +++ b/tests/util/oslo.py @@ -1,11 +1,11 @@ import oslo -from oslo import ParallelMode +from oslo.torch.distributed.parallel_context import ParallelContext, ParallelMode from oslo.torch.nn.parallel import TensorParallel, PipelineParallel def initialize_oslo(args, model): try: - pc = oslo.ParallelContext.from_torch( + pc = ParallelContext.from_torch( data_parallel_size=args.data_parallel_size, pipeline_parallel_size=args.pipeline_parallel_size, tensor_parallel_size=args.tensor_parallel_size, @@ -22,10 +22,10 @@ def initialize_oslo(args, model): model = TensorParallel(model, pc) if pc.get_world_size(ParallelMode.PIPELINE) > 1: model = PipelineParallel(model, pc) - oslo.ready(model, pc) - except: + except Exception as e: + print(e) pc = None model = model.cuda() @@ -36,4 +36,5 @@ def print_rank_0(message, pc): if pc is None: print(message) elif pc.get_global_rank()
== 0: + print(f"Rank: {pc.get_global_rank()}") print(message)