Spaces:
Running
Running
""" | |
Download, preprocess and serve the TinyStories dataset as a DataLoader. | |
""" | |
import argparse | |
import glob | |
import json | |
import os | |
import random | |
from typing import List | |
from concurrent.futures import ProcessPoolExecutor | |
from functools import partial | |
import numpy as np | |
import requests | |
import sentencepiece as spm | |
import torch | |
import torch.distributed as dist | |
from tqdm import tqdm | |
from tokenizer import Tokenizer | |
DATA_CACHE_DIR = "data" | |
def download_file(url: str, fname: str, chunk_size=1024):
    """Download `url` to the local path `fname`, showing a tqdm progress bar.

    The body is streamed in `chunk_size`-byte pieces into a temporary
    `<fname>.part` file, which is renamed to `fname` only after the whole
    transfer has completed. An interrupted download therefore never leaves a
    truncated file under the final name.

    Args:
        url: address of the file to fetch.
        fname: destination path on disk.
        chunk_size: number of bytes requested per streamed chunk.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    part_fname = fname + ".part"
    # the context manager releases the connection even if the transfer fails
    with requests.get(url, stream=True) as resp:
        # fail loudly instead of saving an HTML error page as the dataset
        resp.raise_for_status()
        # content-length may be missing; 0 makes tqdm show an open-ended bar
        total = int(resp.headers.get("content-length", 0))
        with open(part_fname, "wb") as file, tqdm(
            desc=fname,
            total=total,
            unit="iB",
            unit_scale=True,
            unit_divisor=1024,
        ) as bar:
            for data in resp.iter_content(chunk_size=chunk_size):
                size = file.write(data)
                bar.update(size)
    # only expose the file under its final name once it is complete
    os.replace(part_fname, fname)
def download():
    """Downloads the TinyStories dataset to DATA_CACHE_DIR and unpacks it.

    Both steps are skipped when their output (the .tar.gz archive, resp. the
    TinyStories_all_data directory) already exists. Afterwards the number of
    shards and one example story are printed as a sanity check.

    Raises:
        subprocess.CalledProcessError: if `tar` exits with a non-zero status.
        FileNotFoundError: if no *.json shards are found after unpacking.
    """
    # local imports: only this stage shells out to tar
    import shutil
    import subprocess

    os.makedirs(DATA_CACHE_DIR, exist_ok=True)

    # download the TinyStories dataset, unless it's already downloaded
    data_url = "https://huggingface.co/datasets/roneneldan/TinyStories/resolve/main/TinyStories_all_data.tar.gz"
    data_filename = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data.tar.gz")
    if not os.path.exists(data_filename):
        print(f"Downloading {data_url} to {data_filename}...")
        download_file(data_url, data_filename)
    else:
        print(f"{data_filename} already exists, skipping download...")

    # unpack the tar.gz file into all the data shards (json files)
    data_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data")
    if not os.path.exists(data_dir):
        os.makedirs(data_dir, exist_ok=True)
        print(f"Unpacking {data_filename}...")
        try:
            # argument list (no shell) + check=True so a failed extraction raises
            subprocess.run(["tar", "-xzf", data_filename, "-C", data_dir], check=True)
        except (subprocess.CalledProcessError, OSError):
            # data_dir was created just above; remove it so the next run
            # retries the unpacking instead of skipping it
            shutil.rmtree(data_dir, ignore_errors=True)
            raise
    else:
        print(f"{data_dir} already exists, skipping unpacking...")

    # print a single example just for debugging and such
    shard_filenames = sorted(glob.glob(os.path.join(data_dir, "*.json")))
    if not shard_filenames:
        raise FileNotFoundError(
            f"No .json shards found in {data_dir}; delete that directory and re-run the download stage"
        )
    with open(shard_filenames[0], "r", encoding="utf-8") as f:
        data = json.load(f)
    print("Download done.")
    print(f"Number of shards: {len(shard_filenames)}")
    print(f"Example story:\n{data[0]}")
def train_vocab(vocab_size):
    """
    Trains a custom sentencepiece tokenizer on the TinyStories dataset.
    The custom tokenizer files will be saved in DATA_CACHE_DIR/tok{N} directories,
    where N is the vocab size. This is also where the pretok .bin files will go.

    The stories of the first few shards are written, one per line, to a
    temporary DATA_CACHE_DIR/tiny.txt that sentencepiece trains on; the user is
    asked interactively whether to delete that file afterwards.

    Raises:
        AssertionError: if vocab_size is not positive.
        FileNotFoundError: if no *.json shards exist (run the download stage first).
    """
    assert vocab_size > 0, "Vocab size must be positive"

    # output file prefix path for sentencepiece
    prefix = os.path.join(DATA_CACHE_DIR, f"tok{vocab_size}")

    # how many shards we'll use for vocab training, kept low for efficiency
    num_shards = 10

    # 1) export a large chunk of text as a single text file tiny.txt
    tiny_file = os.path.join(DATA_CACHE_DIR, "tiny.txt")
    data_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data")
    shard_filenames = sorted(glob.glob(os.path.join(data_dir, "*.json")))
    if not shard_filenames:
        # otherwise sentencepiece would be handed an empty training file
        raise FileNotFoundError(f"No .json shards found in {data_dir}")

    print(f"Writing temporary file {tiny_file} with {num_shards} shards...")
    with open(tiny_file, "w", encoding="utf-8") as of:
        for shard in tqdm(shard_filenames[:num_shards]):
            # read with an explicit encoding, matching the utf-8 output file
            with open(shard, "r", encoding="utf-8") as f:
                data = json.load(f)
            for example in data:
                text = example["story"]
                text = text.strip()
                of.write(text + "\n")
    print(f"Size is: {os.path.getsize(tiny_file) / 1024 / 1024:.2f} MB")

    # 2) train the sentencepiece model
    print("Will now train the vocab...")
    spm.SentencePieceTrainer.train(input=tiny_file,
                                   model_prefix=prefix,
                                   model_type="bpe",
                                   vocab_size=vocab_size,
                                   self_test_sample_size=0,
                                   input_format="text",
                                   character_coverage=1.0,
                                   num_threads=os.cpu_count(),
                                   split_digits=True,
                                   allow_whitespace_only_pieces=True,
                                   byte_fallback=True,
                                   unk_surface=r" \342\201\207 ",
                                   normalization_rule_name="identity")

    # 3) optional cleanup, ask the user if they'd like to delete tiny.txt
    dec = input(f"Delete the temporary file {tiny_file}? [y/N] ")
    if dec.lower() == "y":
        os.remove(tiny_file)
        print(f"Deleted {tiny_file}")

    print(f"Trained tokenizer is in {prefix}.model")
    print("Done.")
def process_shard(args, vocab_size):
    """Tokenize one JSON shard and save the token ids as a flat uint16 .bin file.

    Args:
        args: a `(shard_id, shard)` pair; `shard` is the path of the .json
            shard and `shard_id` is used as the row of this shard's tqdm bar.
        vocab_size: 0 writes the .bin file next to the .json shard; any other
            value writes it into DATA_CACHE_DIR/tok{vocab_size}.
    """
    shard_id, shard = args
    tokenizer_model = get_tokenizer_model_path(vocab_size)
    enc = Tokenizer(tokenizer_model)
    # explicit encoding so the result does not depend on the platform default
    with open(shard, "r", encoding="utf-8") as f:
        data = json.load(f)
    all_tokens = []
    for example in tqdm(data, position=shard_id):
        text = example["story"]
        text = text.strip()  # get rid of leading/trailing whitespace
        tokens = enc.encode(text, bos=True, eos=False)  # encode the text, use BOS
        all_tokens.extend(tokens)
    # convert to uint16 nparray
    all_tokens = np.array(all_tokens, dtype=np.uint16)
    # calculate the output filename
    if vocab_size == 0:
        # if we're using Llama 2, just save the tokenized file in the same dir
        tokenized_filename = shard.replace(".json", ".bin")
    else:
        # save .bin files into a new tok{N} directory
        bin_dir = os.path.join(DATA_CACHE_DIR, f"tok{vocab_size}")
        shard_basename = os.path.basename(shard)
        bin_basename = shard_basename.replace(".json", ".bin")
        tokenized_filename = os.path.join(bin_dir, bin_basename)
    # write the bytes
    with open(tokenized_filename, "wb") as f:
        f.write(all_tokens.tobytes())
    # calculate the average sequence length (they are separated by BOS=1)
    num_sequences = int((all_tokens == 1).sum())
    # guard against a shard without any BOS token (e.g. an empty shard)
    avg_seq_len = all_tokens.size / num_sequences if num_sequences else 0.0
    print(f"Saved {tokenized_filename}, average seqlen: {avg_seq_len:.2f}")
def pretokenize(vocab_size):
    """Tokenize every TinyStories shard in parallel, one worker task per shard.

    Args:
        vocab_size: 0 selects the default Llama 2 tokenizer; a positive value
            selects the custom tokenizer of that size, whose .bin files go
            into DATA_CACHE_DIR/tok{vocab_size}.

    Any exception raised while processing a shard is re-raised here.
    """
    # iterate the shards and tokenize all of them one by one
    data_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data")
    shard_filenames = sorted(glob.glob(os.path.join(data_dir, "*.json")))
    if vocab_size > 0:
        # .bin files will be saved into tok{N} directory, create it once here
        bin_dir = os.path.join(DATA_CACHE_DIR, f"tok{vocab_size}")
        os.makedirs(bin_dir, exist_ok=True)

    # process all the shards in a process pool
    fun = partial(process_shard, vocab_size=vocab_size)
    with ProcessPoolExecutor() as executor:
        # executor.map is lazy about errors: a worker's exception only
        # surfaces when its result is retrieved, so drain the iterator
        for _ in executor.map(fun, enumerate(shard_filenames)):
            pass
    print("Done.")
class PretokDataset(torch.utils.data.IterableDataset):
    """Loads pretokenized examples from disk and yields them as PyTorch tensors."""

    def __init__(self, split, max_seq_len, vocab_size, vocab_source):
        """Store the dataset configuration; no files are touched until iteration.

        Args:
            split: "train" uses every shard except the first one; any other
                value uses only the first shard.
            max_seq_len: length of each yielded x / y sequence, in tokens.
            vocab_size: vocab size N of the custom tokenizer; selects the
                tok{N} directory when vocab_source is "custom".
            vocab_source: "llama2" or "custom"; decides where .bin shards are read from.
        """
        super().__init__()
        self.split = split
        self.max_seq_len = max_seq_len
        self.vocab_size = vocab_size
        self.vocab_source = vocab_source

    def __iter__(self):
        """Yield an endless stream of `(x, y)` int64 tensor pairs.

        Both tensors hold `max_seq_len` tokens and `y` is `x` shifted by one
        position. Shards, and the chunks inside each shard, are visited in an
        order shuffled by an rng seeded from the DataLoader worker id and the
        distributed rank.

        Raises:
            ValueError: if `vocab_source` is neither "llama2" nor "custom".
        """
        # get worker info within a DataLoader
        worker_info = torch.utils.data.get_worker_info()
        worker_id = worker_info.id if worker_info else 0
        # get DDP rank info
        rank = dist.get_rank() if dist.is_initialized() else 0
        # combine the worker_id and worker_rank to create a unique seed for rng
        seed = 42 + worker_id + 1337 * rank
        rng = random.Random(seed)
        print(f"Created a PretokDataset with rng seed {seed}")
        if self.vocab_source == "llama2":
            # the .bin files are right along the .json files
            bin_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data")
            shard_filenames = sorted(glob.glob(os.path.join(bin_dir, "*.bin")))
        elif self.vocab_source == "custom":
            # the .bin files are in tok{N} directory
            bin_dir = os.path.join(DATA_CACHE_DIR, f"tok{self.vocab_size}")
            shard_filenames = sorted(glob.glob(os.path.join(bin_dir, "*.bin")))
        else:
            # without this branch the code below would hit an unbound local
            raise ValueError(
                f"Unknown vocab_source {self.vocab_source!r}, expected 'llama2' or 'custom'"
            )
        # train/test split. let's use only shard 0 for test split, rest train
        shard_filenames = shard_filenames[1:] if self.split == "train" else shard_filenames[:1]
        assert len(shard_filenames)>0, f"No bin files found in {bin_dir}"
        while True:
            rng.shuffle(shard_filenames)
            for shard in shard_filenames:
                # open the dataset for reading but keep it on disk with memmap
                m = np.memmap(shard, dtype=np.uint16, mode="r")
                num_batches = len(m) // self.max_seq_len
                # drop the last partial batch; this also keeps every
                # (max_seq_len + 1)-token chunk read below inside the shard
                num_batches -= 1
                assert num_batches > 0, "this shard is way too small? investigate."
                ixs = list(range(num_batches))
                rng.shuffle(ixs)
                for ix in ixs:
                    start = ix * self.max_seq_len
                    # one extra token so that y can be x shifted by one
                    end = start + self.max_seq_len + 1
                    # calling .astype will copy the data into a new numpy array, now in RAM
                    chunk = torch.from_numpy((m[start:end]).astype(np.int64))
                    x = chunk[:-1]
                    y = chunk[1:]
                    yield x, y
# ----------------------------------------------------------------------------- | |
# public interface functions | |
def get_tokenizer_model_path(vocab_size):
    """
    Maps a vocab size to the path of its sentencepiece tokenizer model,
    DATA_CACHE_DIR/tok{vocab_size}.model.
    A vocab_size of 0 stands for the default Llama 2 tokenizer, which has no
    model file here, so None is returned in that case.
    """
    if vocab_size != 0:
        model_name = f"tok{vocab_size}.model"
        return os.path.join(DATA_CACHE_DIR, model_name)
    return None
class Task:
    """Namespace for the batch iterator built on top of PretokDataset."""

    @staticmethod
    def iter_batches(batch_size, device, num_workers=0, **dataset_kwargs):
        """Yield `(x, y)` batches from a PretokDataset, moved to `device`.

        Declared as a staticmethod because it takes no `self`: it can be
        called as `Task.iter_batches(...)` or on an instance alike.

        Args:
            batch_size: number of sequences per batch.
            device: target device passed to `Tensor.to`.
            num_workers: number of DataLoader worker processes.
            **dataset_kwargs: forwarded unchanged to `PretokDataset(...)`.
        """
        ds = PretokDataset(**dataset_kwargs)
        dl = torch.utils.data.DataLoader(
            ds, batch_size=batch_size, pin_memory=True, num_workers=num_workers
        )
        for x, y in dl:
            # pinned host memory + non_blocking lets the copy be asynchronous
            x = x.to(device, non_blocking=True)
            y = y.to(device, non_blocking=True)
            yield x, y
# ----------------------------------------------------------------------------- | |
# CLI for constructing the dataset | |
if __name__ == "__main__":
    # The stages below are meant to be run in order.
    #
    # Tokenizing the data with the Llama 2 tokenizer:
    #   python tinystories.py download
    #   python tinystories.py pretokenize
    #
    # Tokenizing the data with a custom sentencepiece tokenizer we train ourselves, e.g.:
    #   python tinystories.py download
    #   python tinystories.py train_vocab --vocab_size=2048
    #   python tinystories.py pretokenize --vocab_size=2048
    cli = argparse.ArgumentParser()
    cli.add_argument("stage", type=str, choices=["download", "pretokenize", "train_vocab"])
    cli.add_argument("--vocab_size", type=int, default=0, help="pretokenization vocab size. 0 = use Llama 2 tokenizer.")
    opts = cli.parse_args()

    # map each stage name to the call that carries it out
    stage_handlers = {
        "download": lambda: download(),
        "train_vocab": lambda: train_vocab(vocab_size=opts.vocab_size),
        "pretokenize": lambda: pretokenize(vocab_size=opts.vocab_size),
    }
    run_stage = stage_handlers.get(opts.stage)
    if run_stage is None:
        raise ValueError(f"Unknown stage {opts.stage}")
    run_stage()