import os
import re
import multiprocessing
from pathlib import Path
from typing import Dict, List

from datasets import load_dataset, Dataset
from transformers import AutoTokenizer


os.environ["TOKENIZERS_PARALLELISM"] = "false"


# Non-alphanumeric characters are stripped from `ds_name + ds_config` when
# building folder names under /data.
DATASET_NAME_PATTERN = re.compile(r"[^a-zA-Z0-9]")


def download_dataset(
    ds_name: str,
    ds_config: str = None,
    ds_split: str = "train",
):
    """
    Download a dataset from the HuggingFace Hub and save it to disk in chunks.
    Only the requested split is downloaded and saved.

    Args:
        ds_name (`str`):
            The name of the dataset to load.
        ds_config (`str`, *optional*, Defaults to `None`):
            The configuration of the dataset to load.
        ds_split (`str`, *optional*, Defaults to `"train"`):
            The split of the dataset to load.

    Returns:
        len(ds) (`int`):
            The number of rows in the dataset.
    """
    if ds_name == "wikipedia":
        ds = load_wikipedia(ds_name, ds_config)
    else:
        if ds_config == "":
            ds_config = None
        ds = load_dataset(ds_name, ds_config, split=ds_split)

    chunk_and_save_dataset(
        ds, ds_name=ds_name, ds_config=ds_config, suffix=f"_{ds_split}_raw"
    )

    return len(ds)
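

# Minimal usage sketch for `download_dataset` (the dataset name is illustrative,
# not something this module prescribes):
#
#   n_rows = download_dataset("imdb", ds_split="train")
#   # raw chunks land under /data/imdb/ as chunk_0_train_raw, chunk_1_train_raw, ...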


def load_wikipedia(ds_name, ds_config):
    """
    Stream the wikipedia dataset from the HuggingFace Hub.

    Args:
        ds_name (`str`):
            The name of the dataset to load. Must be `"wikipedia"`.
        ds_config (`str`, *optional*, Defaults to `None`):
            The configuration of the dataset to load.

    Returns:
        ds (`datasets.Dataset`):
            The dataset materialized from the stream, keeping only the `text` column.
    """
    ds = load_dataset(ds_name, ds_config, streaming=True, split="train")

    def gen():
        for example in ds:
            yield {"text": example["text"]}

    # Materialize the stream into a regular Dataset, keeping only the `text`
    # column; any other columns in the source dataset are dropped.
    return Dataset.from_generator(gen)


def chunk_and_save_dataset(
    ds: Dataset,
    chunk_size: int = 20_000,
    ds_name: str = None,
    ds_config: str = None,
    suffix: str = "",
):
    """
    Chunk a dataset into smaller datasets of size `chunk_size`.
    The name of the dataset will be used to create a folder in `/data`.

    Args:
        ds (`Dataset`):
            The dataset to chunk.
        chunk_size (`int`, *optional*, Defaults to `20_000`):
            The maximum number of rows per chunk.
        ds_name (`str`, *optional*, Defaults to `None`):
            The name of the dataset; used to build the output folder name.
        ds_config (`str`, *optional*, Defaults to `None`):
            The configuration of the dataset; also part of the folder name.
        suffix (`str`, *optional*, Defaults to `""`):
            The suffix to append to each chunk's file name.

    Returns:
        None. Each chunk is written to disk as a parquet file.
    """

    if ds_config is None:
        ds_config = ""

    folder = Path("/data") / DATASET_NAME_PATTERN.sub("", ds_name + ds_config)
    folder.mkdir(exist_ok=True, parents=True)

    for chunk_num, start_idx in enumerate(range(0, len(ds), chunk_size)):
        end_idx = min(start_idx + chunk_size, len(ds))

        temp = ds.select(range(start_idx, end_idx))

        temp.to_parquet(str(folder / f"chunk_{chunk_num}{suffix}"))
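        # Note: no ".parquet" extension is added; `tokenize_dataset` and
        # `load_tokenized_dataset` glob for exactly these chunk file names.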


def tokenize_dataset(
    ds_name: str,
    ds_config: str = None,
    ds_split: str = "train",
    model_name: str = None,
    opt_level: str = None,
    column_name: str = "text",
    num2skip: int = 0,
    num2embed: int = -1,
):
    """
    Tokenize the dataset with the model's tokenizer and save the tokenized
    chunks to disk. Unless every example is padded to the maximum length,
    rows are sorted by token length so padding is minimized at batch time.

    Args:
        ds_name (`str`):
            The name of the dataset to load.

        ds_config (`str`, *optional*, Defaults to `None`):
            The configuration of the dataset to load.

        ds_split (`str`, *optional*, Defaults to `"train"`):
            The split of the dataset to load.

        model_name (`str`, *optional*, Defaults to `None`):
            The name of the model to use for tokenization.

        opt_level (`str`, *optional*, Defaults to `None`):
            The optimization level. `"O4"` pads every example to `max_length`;
            any other value applies no padding here (rows are sorted by length
            instead).

        column_name (`str`, *optional*, Defaults to `"text"`):
            The name of the column to tokenize.

        num2skip (`int`, *optional*, Defaults to `0`):
            The number of rows to skip from the start of the dataset.

        num2embed (`int`, *optional*, Defaults to `-1`):
            The number of rows to embed. `-1` means all rows.

    Returns:
        None. The tokenized chunks are written to disk alongside the raw chunks.
    """

    # TODO: option for controlling length for models that can go shorter/longer than 512

    # `ds_config` may be None here; fall back to "" so the folder name matches
    # what `chunk_and_save_dataset` produced.
    folder = Path("/data") / DATASET_NAME_PATTERN.sub("", ds_name + (ds_config or ""))
    files = list(map(str, folder.glob(f"chunk_*_{ds_split}_raw")))

    ds = load_dataset("parquet", data_files=files, split="train")

    if num2embed == -1:
        num2embed = len(ds)
    # clamp the end index so skipping rows never selects past the dataset
    end_idx = min(num2skip + num2embed, len(ds))
    ds = ds.select(range(num2skip, end_idx))

    tokenizer = AutoTokenizer.from_pretrained(model_name)

    padding = "max_length" if opt_level == "O4" else False
    max_length = 512

    def tokenize(
        examples: Dict[str, List[str]],
    ):
        tokenized = tokenizer(
            examples[column_name],
            truncation=True,
            padding=padding,
            max_length=max_length,
        )
        tokenized["length"] = [len(x) for x in tokenized["input_ids"]]

        return tokenized

    tds = ds.map(
        tokenize,
        batched=True,
        batch_size=1000,
        remove_columns=list(set(ds.column_names) - {column_name}),
        num_proc=multiprocessing.cpu_count(),
        desc="Tokenizing",
    )

    # sort to minimize padding
    if padding != "max_length":
        tds = tds.sort("length")

    chunk_and_save_dataset(
        tds, ds_name=ds_name, ds_config=ds_config, suffix=f"_{ds_split}_tokenized"
    )


def load_tokenized_dataset(
    ds_name: str,
    ds_config: str = None,
    ds_split: str = "train",
):
    """
    Load a tokenized dataset from disk.

    Args:
        ds_name (`str`):
            The name of the dataset to load.

        ds_config (`str`, *optional*, Defaults to `None`):
            The configuration of the dataset to load.

        ds_split (`str`, *optional*, Defaults to `"train"`):
            The split of the dataset to load.

    Returns:
        ds (`Dataset`):
            The tokenized dataset, loaded from the saved parquet chunks.
    """

    # fall back to "" if no config was given, matching the saved folder name
    folder = Path("/data") / DATASET_NAME_PATTERN.sub("", ds_name + (ds_config or ""))
    files = list(map(str, folder.glob(f"chunk_*_{ds_split}_tokenized")))

    return load_dataset("parquet", data_files=files, split="train")
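

if __name__ == "__main__":
    # End-to-end sketch of the intended pipeline. The dataset and model names
    # below are placeholders chosen for illustration, not values this module
    # prescribes; /data must exist and be writable.
    name, config, split = "imdb", None, "train"

    n_rows = download_dataset(name, ds_config=config, ds_split=split)
    print(f"Downloaded and chunked {n_rows:,} rows")

    tokenize_dataset(
        ds_name=name,
        ds_config=config,
        ds_split=split,
        model_name="bert-base-uncased",
        opt_level=None,
        column_name="text",
    )

    tokenized_ds = load_tokenized_dataset(name, ds_config=config, ds_split=split)
    print(tokenized_ds)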