Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@
**/.idea
**/venv
**/.DS_Store
**/data
**/runs
**/model.pt
31 changes: 31 additions & 0 deletions hugging-face-accelerate/manifests/checkpointing/manifest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# AIchor manifest for the checkpointing sample: builds the image from
# ./Dockerfile and launches main.py through torchrun on a single worker.
kind: AIchorManifest
apiVersion: 0.2.2

builder:
image: image
dockerfile: ./Dockerfile
context: .

spec:
operator: pytorch
image: image
# Checkpointing is enabled with an interval of 10 epochs; "gpt2_chkpt" is the
# checkpoint directory name (main.py uses it for both saving and loading).
command: "torchrun --nproc_per_node 2 main.py --batch_size 400 --enable_checkpointing true --checkpoint_dir gpt2_chkpt --checkpoint_interval 10 --num_epochs 400 --model openai-community/gpt2" # --nproc_per_node must equal the number of GPUs requested below (accelerators.gpu.count)

tensorboard:
enabled: true

types:
Worker:
count: 1
resources:
cpus: 16
ramRatio: 4
# Optional RDMA settings, left disabled in this sample:
# rdma: # new
# # list of network devices to mount on the container
# devices: ["sriov_a", "sriov_b", "sriov_c", "sriov_d"]
accelerators:
gpu:
count: 2
type: gpu
product: NVIDIA-A100-SXM4-80GB

7 changes: 7 additions & 0 deletions hugging-face-accelerate/src/constant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Names of the environment variables read by the sample.
# Each constant holds the variable *name*; callers look the value up with
# os.environ.get(<constant>).

# Endpoint URL handed to s3fs.S3FileSystem(endpoint_url=...).
AICHOR_AWS_ENDPOINT_URL: str = "AWS_ENDPOINT_URL"
# Path prefix under which the cached dataset, model and tokenizer are stored.
AICHOR_INPUT_PATH: str = "AICHOR_INPUT_PATH"
# Destination the final trained model is uploaded to.
AICHOR_OUTPUT_PATH: str = "AICHOR_OUTPUT_PATH"
# Bucket name used to build the s3:// URL of training checkpoints.
AICHOR_OUTPUT_BUCKET_NAME: str = "AICHOR_OUTPUT_BUCKET_NAME"
# Logging directory for the tensorboard tracker. Note that the variable is
# called AICHOR_LOGS_PATH, not AICHOR_TENSORBOARD_PATH.
AICHOR_TENSORBOARD_PATH: str = "AICHOR_LOGS_PATH"

# Hugging Face access token passed as `token=` to from_pretrained().
HF_TOKEN: str = "HF_TOKEN"
85 changes: 85 additions & 0 deletions hugging-face-accelerate/src/data/checkpointing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import os
import shutil

from transformers import AutoTokenizer, PreTrainedTokenizer, PreTrainedTokenizerFast, AutoModelForSequenceClassification
from accelerate import Accelerator
from datasets import load_dataset, load_from_disk, Dataset, DatasetDict

from slugify import slugify
from s3fs import S3FileSystem

from constant import AICHOR_OUTPUT_BUCKET_NAME

# Save
def save_checkpoint(accelerator: Accelerator, epoch: int, checkpoint_dir: str, s3: S3FileSystem):
    """Save the current training state, uploading it to S3 when S3 is available.

    Args:
        accelerator: Accelerator whose state is saved.
        epoch: Epoch that just finished; used to name the checkpoint on S3.
        checkpoint_dir: Checkpoint directory name inside the output bucket.
        s3: S3 filesystem, or ``None`` to keep the checkpoint on local disk only.
    """
    if s3 is None:
        # Local run: save to disk and stop here. Without this early return the
        # code fell through to the S3 branch and called ``s3.put`` on ``None``.
        accelerator.save_state()
        return
    save_checkpoint_s3(accelerator=accelerator, epoch=epoch, checkpoint_dir=checkpoint_dir, s3=s3)

def save_checkpoint_s3(accelerator: Accelerator, epoch: int, checkpoint_dir: str, s3: S3FileSystem):
    """Save the accelerator state locally, then mirror it to the output bucket.

    Every process calls ``accelerator.save_state()``; only the main process
    uploads the resulting directory. All processes meet at a barrier before
    returning.

    Args:
        accelerator: Accelerator whose state is saved.
        epoch: Epoch number embedded in the remote checkpoint name.
        checkpoint_dir: Checkpoint directory name inside the output bucket.
        s3: S3 filesystem used for the upload.
    """
    bucket_name = os.environ.get(AICHOR_OUTPUT_BUCKET_NAME)
    remote_checkpoint = f"s3://{bucket_name}/{checkpoint_dir}/checkpoint_epoch_{epoch}"
    local_checkpoint = accelerator.save_state()

    if accelerator.is_main_process:
        s3.put(local_checkpoint, remote_checkpoint, recursive=True)
        # The "valid" marker is written last, so its presence means the whole
        # checkpoint directory was uploaded.
        with s3.open(f"{remote_checkpoint}/valid", "w") as marker:
            marker.write("1")
            marker.flush()
        print(f"Checkpoint saved at {remote_checkpoint}")

    accelerator.wait_for_everyone()

# Load
def load_checkpoint(accelerator: Accelerator, s3: S3FileSystem, args) -> int:
    """Restore training state and return the epoch training should resume from.

    Args:
        accelerator: Accelerator whose state is restored.
        s3: S3 filesystem, or ``None`` to load from local disk.
        args: Parsed CLI arguments; ``load_checkpoint_name`` and
            ``checkpoint_dir`` are read.

    Returns:
        The epoch to resume from. ``0`` when no checkpoint could be resolved on
        S3, and also on the local path when no checkpoint name was given.
    """
    checkpoint_path = args.load_checkpoint_name

    if s3 is None:
        # S3 not initialized, using local checkpoint.
        if checkpoint_path is not None:
            accelerator.load_state(checkpoint_path)
            return get_epoch_from_path(checkpoint_path)
        # No explicit name: let the accelerator pick the checkpoint itself.
        # NOTE(review): the epoch counter restarts at 0 here even though state
        # was restored -- confirm this is intended.
        accelerator.load_state()
        return 0

    # Retrieve from S3: fall back to the most recent valid checkpoint when no
    # explicit name was requested.
    if checkpoint_path is None:
        checkpoint_path = get_last_checkpoint_path(checkpoint_dir=args.checkpoint_dir, s3=s3)
    if checkpoint_path is None:
        return 0
    return load_checkpoint_s3(accelerator=accelerator, checkpoint_path=checkpoint_path, s3=s3)

def load_checkpoint_s3(accelerator: Accelerator, checkpoint_path: str, s3: S3FileSystem):
    """Download a checkpoint from S3, load it, and return the epoch to resume from.

    The local main process downloads the checkpoint into a scratch directory;
    every process loads from that directory, and the scratch directory is
    removed once all of them are done.

    Args:
        accelerator: Accelerator whose state is restored.
        checkpoint_path: Remote path of the checkpoint directory.
        s3: S3 filesystem used for the download.

    Returns:
        The value of ``get_epoch_from_path(checkpoint_path)``.
    """
    scratch_dir = "tmp_checkpoint"
    handles_files = accelerator.is_local_main_process

    if handles_files:
        print(f"Loading checkpoint from {checkpoint_path}")
        s3.get(checkpoint_path, scratch_dir, recursive=True)
    accelerator.wait_for_everyone()

    accelerator.load_state(scratch_dir)
    # Nobody may delete the scratch directory until every process has loaded.
    accelerator.wait_for_everyone()
    if handles_files:
        shutil.rmtree(scratch_dir)

    # The resume epoch is derived from the checkpoint name.
    return get_epoch_from_path(checkpoint_path)

def get_epoch_from_path(path: str) -> int:
    """Return the epoch to resume from for a ``checkpoint_epoch_<N>`` path.

    Args:
        path: Checkpoint path whose last component is ``checkpoint_epoch_<N>``.
            Trailing slashes are ignored.

    Returns:
        ``N + 1``: the checkpoint is written after epoch ``N`` finishes, so
        training resumes at the next epoch.

    Raises:
        ValueError: If the last path component does not end in an integer.
    """
    # Strip trailing slashes first; otherwise the last segment is empty and
    # int('') raises for paths such as ".../checkpoint_epoch_9/".
    checkpoint_name = path.rstrip('/').split('/')[-1]
    return int(checkpoint_name.replace("checkpoint_epoch_", "")) + 1

def _checkpoint_epoch(key: str):
    """Return the epoch encoded in a ``checkpoint_epoch_<N>`` key, or ``None``."""
    name = key.rstrip("/").split("/")[-1]
    _, marker, suffix = name.rpartition("checkpoint_epoch_")
    if marker and suffix.isdecimal():
        return int(suffix)
    return None


def get_last_checkpoint_path(checkpoint_dir: str, s3: S3FileSystem):
    """Return the S3 path of the most recent fully uploaded checkpoint.

    Checkpoints are tried from the highest epoch down; one is accepted only if
    it contains the ``valid`` marker file.

    Args:
        checkpoint_dir: Checkpoint directory name inside the output bucket.
        s3: S3 filesystem used for listing.

    Returns:
        ``"s3://<key>"`` of the newest valid checkpoint, or ``None`` when the
        directory does not exist or holds no valid checkpoint.
    """
    checkpoint_dir_full = f"s3://{os.environ.get(AICHOR_OUTPUT_BUCKET_NAME)}/{checkpoint_dir}"
    try:
        entries = s3.listdir(checkpoint_dir_full)
    except FileNotFoundError:
        print(f"Couldn't find checkpoint at {checkpoint_dir_full}, starting from epoch 0")
        return None

    # Keep only entries named checkpoint_epoch_<N>. Previously any other object
    # under the prefix made the int() sort key raise and aborted the resume.
    candidates = []
    for entry in entries:
        epoch = _checkpoint_epoch(entry['Key'])
        if epoch is not None:
            candidates.append((epoch, entry['Key']))
    candidates.sort(key=lambda candidate: candidate[0], reverse=True)

    for _, directory_key in candidates:
        files_in_dir = s3.listdir(f"s3://{directory_key}")
        # Only a checkpoint carrying the "valid" marker was fully uploaded.
        if any(file['Key'].endswith('/valid') for file in files_in_dir):
            return f"s3://{directory_key}"

    return None
30 changes: 30 additions & 0 deletions hugging-face-accelerate/src/data/dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import os

from accelerate import Accelerator
from datasets import load_dataset, load_from_disk, Dataset, DatasetDict

from s3fs import S3FileSystem

from constant import AICHOR_INPUT_PATH

def get_dataset(accelerator: Accelerator, s3: S3FileSystem) -> (Dataset | DatasetDict):
    """Return the training dataset, going through the S3 cache when S3 is available.

    Args:
        accelerator: Accelerator used for process coordination on the S3 path.
        s3: S3 filesystem, or ``None`` to load the dataset directly.
    """
    if s3 is not None:
        return get_dataset_s3(accelerator=accelerator, s3=s3)
    return get_dataset_from_remote()

def get_dataset_from_remote() -> (Dataset | DatasetDict):
    """Load the GLUE ``mrpc`` dataset through ``datasets.load_dataset``."""
    glue_mrpc = load_dataset("glue", "mrpc")
    return glue_mrpc

def get_dataset_s3(accelerator: Accelerator, s3: S3FileSystem) -> (Dataset | DatasetDict):
    """Load the dataset from the S3 cache, populating the cache on first use.

    Args:
        accelerator: Accelerator used to synchronise the processes.
        s3: S3 filesystem used to check for the cached copy.

    Returns:
        The dataset, loaded from S3 when cached there and from the remote hub
        otherwise.
    """
    s3_path = os.environ.get(AICHOR_INPUT_PATH) + "glue-mrpc"
    dataset: Dataset | DatasetDict

    is_cached = s3.exists(s3_path)
    # Every process must finish its existence check before the main process
    # starts writing to the same path. Otherwise a slow process could see a
    # partially written dataset and take a different branch from its peers.
    accelerator.wait_for_everyone()

    if is_cached:
        dataset = load_from_disk(s3_path)  # accepts S3 paths
    else:
        dataset = get_dataset_from_remote()
        if accelerator.is_main_process:
            dataset.save_to_disk(s3_path)  # accepts S3 paths

    # Reached by every process on both branches, so the barrier always matches.
    accelerator.wait_for_everyone()

    return dataset
51 changes: 51 additions & 0 deletions hugging-face-accelerate/src/data/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import os
import shutil

from transformers import AutoModelForSequenceClassification
from accelerate import Accelerator

from slugify import slugify
from s3fs import S3FileSystem

from constant import HF_TOKEN, AICHOR_INPUT_PATH

def get_model(accelerator: Accelerator, s3: S3FileSystem, model_name: str):
    """Return the model, going through the S3 cache when S3 is available.

    Args:
        accelerator: Accelerator used for process coordination on the S3 path.
        s3: S3 filesystem, or ``None`` to load the model directly by name.
        model_name: Model identifier to load.
    """
    if s3 is not None:
        return get_model_s3(accelerator=accelerator, s3=s3, model_name=model_name)
    return get_model_from(model_name)

def get_model_from(load_from: str):
    """Load a sequence-classification model from a model name or local directory.

    The Hugging Face token is read from the environment variable named by
    ``HF_TOKEN``.
    """
    hf_token = os.environ.get(HF_TOKEN)
    return AutoModelForSequenceClassification.from_pretrained(load_from, token=hf_token)

def get_model_s3(accelerator: Accelerator, s3: S3FileSystem, model_name: str):
    """Load the model from the S3 cache, populating the cache on first use.

    When a cached copy exists on S3, the local main process downloads it and
    every process loads from the local directory. Otherwise the model is loaded
    by name and the main process uploads it to S3.

    Args:
        accelerator: Accelerator used to synchronise the processes.
        s3: S3 filesystem holding the cache.
        model_name: Model identifier; its slug names the cache entry.

    Returns:
        The loaded sequence-classification model.
    """
    model_slug = f"{slugify(model_name)}-model"
    local_path = model_slug
    s3_path = os.environ.get(AICHOR_INPUT_PATH) + model_slug

    cached_on_s3 = s3.exists(s3_path)
    if cached_on_s3:
        # Only the local main process downloads from S3.
        if accelerator.is_local_main_process:
            s3.get(s3_path, local_path, recursive=True)
        load_from = local_path
    else:
        # Not cached yet: load by name, then cache it on S3 below.
        load_from = model_name

    accelerator.wait_for_everyone()  # wait for the local main process to finish downloading the model
    model = get_model_from(load_from)

    # Wait until every process has loaded before touching local_path. The
    # cleanup used to run before this barrier, so the directory could be
    # deleted while sibling processes were still reading it.
    accelerator.wait_for_everyone()

    if cached_on_s3:
        # Remove the copy downloaded from S3.
        if accelerator.is_local_main_process:
            shutil.rmtree(local_path)
    elif accelerator.is_main_process:
        # Upload the freshly loaded model to the S3 cache.
        model.save_pretrained(local_path)
        s3.put(local_path, s3_path, recursive=True)
        shutil.rmtree(local_path)

    accelerator.wait_for_everyone()  # wait for cleanup / upload to finish
    return model
24 changes: 24 additions & 0 deletions hugging-face-accelerate/src/data/save_final_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os
import shutil

from accelerate import Accelerator

from s3fs import S3FileSystem

from constant import AICHOR_OUTPUT_PATH

def save_final_model(accelerator: Accelerator, model, s3: S3FileSystem):
    """Save the trained model from the main process and upload it when S3 is set.

    Without S3 the model stays in the local ``final_model`` directory. With S3
    the directory is uploaded to the output path and then removed. All
    processes meet at a barrier before returning.

    Args:
        accelerator: Accelerator used to save the model and synchronise.
        model: Model to save.
        s3: S3 filesystem, or ``None`` to keep the model local.
    """
    local_path = "final_model"
    output_path = os.environ.get(AICHOR_OUTPUT_PATH)
    should_upload = s3 is not None

    if accelerator.is_main_process:
        print(f"Saving trained model at: {output_path} from main process")
        accelerator.save_model(model, local_path)
        print(f"Model saved at {local_path}")

        if should_upload:
            print(f"Uploading model to {output_path}")
            s3.put(local_path, output_path, recursive=True)
            # The local copy is only removed once it has been uploaded.
            shutil.rmtree(local_path)
            print("Uploaded")

    accelerator.wait_for_everyone()
52 changes: 52 additions & 0 deletions hugging-face-accelerate/src/data/tokenizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import os
import shutil

from transformers import AutoTokenizer, PreTrainedTokenizer, PreTrainedTokenizerFast
from accelerate import Accelerator
from slugify import slugify
from s3fs import S3FileSystem

from constant import HF_TOKEN, AICHOR_INPUT_PATH

def get_tokenizer(accelerator: Accelerator, s3: S3FileSystem, model_name: str) -> (PreTrainedTokenizer | PreTrainedTokenizerFast):
    """Return the tokenizer, going through the S3 cache when S3 is available.

    Args:
        accelerator: Accelerator used for process coordination on the S3 path.
        s3: S3 filesystem, or ``None`` to load the tokenizer directly by name.
        model_name: Model identifier whose tokenizer is loaded.
    """
    if s3 is not None:
        return get_tokenizer_s3(accelerator=accelerator, s3=s3, model_name=model_name)
    return get_tokenizer_from(model_name)

def get_tokenizer_from(load_from: str) -> (PreTrainedTokenizer | PreTrainedTokenizerFast):
    """Load a tokenizer from a model name or local directory.

    The Hugging Face token is read from the environment variable named by
    ``HF_TOKEN``.
    """
    hf_token = os.environ.get(HF_TOKEN)
    return AutoTokenizer.from_pretrained(load_from, token=hf_token)

def get_tokenizer_s3(accelerator: Accelerator, s3: S3FileSystem, model_name: str) -> (PreTrainedTokenizer | PreTrainedTokenizerFast):
    """Load the tokenizer from the S3 cache, populating the cache on first use.

    When a cached copy exists on S3, the local main process downloads it and
    every process loads from the local directory. Otherwise the tokenizer is
    loaded by name and the main process uploads it to S3.

    Args:
        accelerator: Accelerator used to synchronise the processes.
        s3: S3 filesystem holding the cache.
        model_name: Model identifier; its slug names the cache entry.

    Returns:
        The loaded tokenizer.
    """
    cache_name = f"{slugify(model_name)}-tokenizer"
    local_dir = cache_name
    remote_dir = os.environ.get(AICHOR_INPUT_PATH) + cache_name

    found_on_s3 = s3.exists(remote_dir)
    if found_on_s3:
        # Only the local main process downloads from S3.
        if accelerator.is_local_main_process:
            s3.get(remote_dir, local_dir, recursive=True)
        source = local_dir
    else:
        # Not cached yet: load by name, then cache it on S3 below.
        source = model_name

    accelerator.wait_for_everyone()  # the download must finish before anyone loads
    tokenizer = get_tokenizer_from(source)

    accelerator.wait_for_everyone()  # every process has loaded; local files may now change

    if found_on_s3:
        # Remove the copy downloaded from S3.
        if accelerator.is_local_main_process:
            shutil.rmtree(local_dir)
    elif accelerator.is_main_process:
        # Upload the freshly loaded tokenizer to the S3 cache.
        tokenizer.save_pretrained(local_dir)
        s3.put(local_dir, remote_dir, recursive=True)
        shutil.rmtree(local_dir)

    accelerator.wait_for_everyone()  # wait for cleanup / upload to finish
    return tokenizer
40 changes: 31 additions & 9 deletions hugging-face-accelerate/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
from accelerate import Accelerator
from accelerate.utils import ProjectConfiguration

from utils import get_dataset, get_tokenizer, get_model, save_final_model, TENSORBOARD_PATH, AWS_ENDPOINT_URL
from data.tokenizer import get_tokenizer
from data.dataset import get_dataset
from data.model import get_model
from data.checkpointing import save_checkpoint, load_checkpoint
from data.save_final_model import save_final_model
from constant import AICHOR_TENSORBOARD_PATH, AICHOR_AWS_ENDPOINT_URL

SEED = 42

Expand All @@ -23,11 +28,14 @@
def training_function(args: argparse.Namespace):
# Initialize accelerator
accelerator = Accelerator(
cpu=False,
cpu=args.cpu,
mixed_precision=args.mixed_precision,
project_dir=LOCAL_PROJECT_DIR,
project_config=ProjectConfiguration(
logging_dir=os.environ.get(TENSORBOARD_PATH),
automatic_checkpoint_naming=True,
project_dir=LOCAL_PROJECT_DIR,
total_limit=1,
logging_dir=os.environ.get(AICHOR_TENSORBOARD_PATH) if args.local is not None else None,
),
log_with="tensorboard",
)
Expand All @@ -36,7 +44,9 @@ def training_function(args: argparse.Namespace):
run = os.path.split(__file__)[-1].split(".")[0]
accelerator.init_trackers(run, {"lr": args.learning_rate, "num_epochs": args.num_epochs, "seed": SEED, "batch_size": args.batch_size})

s3 = s3fs.S3FileSystem(endpoint_url=os.environ.get(AWS_ENDPOINT_URL))
s3 = None
if not args.local:
s3 = s3fs.S3FileSystem(endpoint_url=os.environ.get(AICHOR_AWS_ENDPOINT_URL))

tokenizer = get_tokenizer(accelerator=accelerator, s3=s3, model_name=args.model)
datasets = get_dataset(accelerator=accelerator, s3=s3)
Expand Down Expand Up @@ -114,15 +124,19 @@ def collate_fn(examples):
model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
)

# We need to keep track of how many total steps we have iterated over
overall_step = 0
starting_epoch = 0

if args.enable_checkpointing:
starting_epoch = load_checkpoint(accelerator=accelerator, s3=s3, args=args)

accelerator.wait_for_everyone()

# Now we train the model
if accelerator.is_main_process:
print("Start training")
start_time = time.time()

for epoch in range(0, args.num_epochs):
for epoch in range(starting_epoch, args.num_epochs):
model.train()
total_loss = 0
for step, batch in enumerate(train_dataloader):
Expand All @@ -138,8 +152,6 @@ def collate_fn(examples):
lr_scheduler.step()
optimizer.zero_grad()

overall_step += 1

model.eval()
for step, batch in enumerate(eval_dataloader):
# We could avoid this line since we set the accelerator with `device_placement=True`.
Expand All @@ -166,6 +178,10 @@ def collate_fn(examples):
step=epoch,
)

# Save checkpoint if enabled
if args.enable_checkpointing and (epoch + 1) % args.checkpoint_interval == 0:
save_checkpoint(accelerator=accelerator, epoch=epoch, checkpoint_dir=args.checkpoint_dir, s3=s3)

accelerator.wait_for_everyone()
if accelerator.is_main_process:
# display execution time
Expand Down Expand Up @@ -193,6 +209,12 @@ def main():
parser.add_argument("--batch_size", type=int, default=32, help="Batch size. Adjust depending on GPU memory available")
parser.add_argument("--num_epochs", type=int, default=12)
parser.add_argument("--learning_rate", type=float, default=2e-5)
parser.add_argument("--enable_checkpointing", type=bool, default=False, help="enable automatic checkpointing")
parser.add_argument("--checkpoint_interval", type=int, default=50, help="automatic checkpoint epoch interval")
parser.add_argument("--checkpoint_dir", type=str, default="checkpoints", help="checkpoint dir name on aichor output bucket. Used for both loading and saving.")
parser.add_argument("--load_checkpoint_name", type=str, default=None, help="Checkpoint name to load. Leave this unset to automatically load from latest checkpoint.")
parser.add_argument("--local", action='store_true', help="Run locally, disable dependency to AIchor S3 and environment variables.")
parser.add_argument("--cpu", action='store_true', help="Run on CPU. Disabled by default")
args = parser.parse_args()

training_function(args)
Expand Down
Loading