Seeded preprocessing by le1nux · Pull Request #295 · Modalities/modalities
Merged
18 commits merged on Feb 20, 2025

109 changes: 92 additions & 17 deletions src/modalities/__main__.py
@@ -5,25 +5,26 @@
import shutil
from datetime import datetime
from pathlib import Path
from typing import Type
from typing import Optional, Type

import click
import click_pathlib
from pydantic import BaseModel, FilePath

from modalities.api import (
FileExistencePolicy,
convert_pytorch_to_hf_checkpoint,
create_raw_data_index,
create_shuffled_dataset_chunk,
generate_text,
merge_packed_data_files,
pack_encoded_data,
shuffle_tokenized_data,
)
from modalities.batch import EvaluationResultBatch
from modalities.config.component_factory import ComponentFactory
from modalities.config.config import ProcessGroupBackendType, load_app_config_dict
from modalities.config.instantiation_models import TrainingComponentsInstantiationModel, TrainingReportGenerator
from modalities.dataloader.shuffle_tokenized_data import shuffle_tokenized_data
from modalities.evaluator import Evaluator
from modalities.gym import Gym
from modalities.logging_broker.message_broker import MessageBroker
@@ -133,7 +134,13 @@ def data():
default=None,
help="output path for index. will use parent directory of src_path if none.",
)
def CMD_entry_point_data_create_raw_index(src_path: Path, index_path: Path):
@click.option(
"--file_existence_policy",
type=click.Choice([policy.value for policy in FileExistencePolicy]),
default=FileExistencePolicy.ERROR.value,
help="Policy for handling existing files.",
)
def CMD_entry_point_data_create_raw_index(src_path: Path, index_path: Path, file_existence_policy: FileExistencePolicy):
"""Utility CMD for indexing the content of a large jsonl-file.
Background is the ability to further process the respective file without loading it,
while splitting its content line-based. This step is necessary in advance of further processing like tokenization.
@@ -142,27 +149,37 @@ def CMD_entry_point_data_create_raw_index(src_path: Path, index_path: Path):
Args:
src_path (Path): The path to the jsonl-file.
index_path (Path): The path to the index file, that will be created.
file_existence_policy (FileExistencePolicy): Policy for handling existing files.

Raises:
ValueError: If the index file already exists.
"""
create_raw_data_index(src_path=src_path, index_path=index_path)
file_existence_policy = FileExistencePolicy(file_existence_policy)
create_raw_data_index(src_path=src_path, index_path=index_path, file_existence_policy=file_existence_policy)
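
The pattern introduced here recurs in every entry point of this PR: the policy's string values are exposed through `click.Choice` and converted back into a `FileExistencePolicy` at the top of the command body. Below is a minimal, self-contained sketch of that pattern, not the Modalities implementation; only the ERROR member is visible in this diff, so SKIP and OVERRIDE are hypothetical stand-ins, and the real enum lives in `modalities.api`.

```python
from enum import Enum
from pathlib import Path

import click


class FileExistencePolicy(Enum):
    """Stand-in for modalities.api.FileExistencePolicy; only ERROR is visible in this diff."""

    ERROR = "error"        # default used by the entry points above
    SKIP = "skip"          # hypothetical member, for illustration only
    OVERRIDE = "override"  # hypothetical member, for illustration only


@click.command()
@click.argument("src_path", type=Path)
@click.option(
    "--file_existence_policy",
    type=click.Choice([policy.value for policy in FileExistencePolicy]),
    default=FileExistencePolicy.ERROR.value,
    help="Policy for handling existing files.",
)
def demo_create_index(src_path: Path, file_existence_policy: str) -> None:
    # click passes the option as a plain string; convert it back into the enum,
    # exactly as CMD_entry_point_data_create_raw_index does above.
    policy = FileExistencePolicy(file_existence_policy)
    click.echo(f"indexing {src_path} with policy {policy.name}")


if __name__ == "__main__":
    demo_create_index()
```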


@data.command(name="pack_encoded_data")
@click.argument("config_path", type=FilePath)
def CMD_entry_point_pack_encoded_data(config_path: FilePath):
@click.option(
"--file_existence_policy",
type=click.Choice([policy.value for policy in FileExistencePolicy]),
default=FileExistencePolicy.ERROR.value,
help="Policy for handling existing files.",
)
def CMD_entry_point_pack_encoded_data(config_path: FilePath, file_existence_policy: FileExistencePolicy):
"""Utility to encode an indexed, large jsonl-file.
(see also `create_index` for more information)
Returns .pbin-file, which can be inserted into a training process directly
and does not require its original jsonl-file or the respective index file anymore.

Args:
config_path (FilePath): Path to the config file describing the tokenization setup.
file_existence_policy (FileExistencePolicy): Policy for handling existing files.
"""
file_existence_policy = FileExistencePolicy(file_existence_policy)
config_dict = load_app_config_dict(config_path)

pack_encoded_data(config_dict=config_dict)
pack_encoded_data(config_dict=config_dict, file_existence_policy=file_existence_policy)
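
How the policy is acted on is not part of this file; it happens inside `modalities.api`. As a rough sketch under that assumption, handling an already existing output file could look like the helper below, where the skip and override behaviors are guesses and only the erroring default is confirmed by this diff.

```python
from pathlib import Path


def should_write(target_path: Path, policy: str) -> bool:
    """Decide whether writing target_path may proceed under the given policy.

    Illustrative sketch only: "error" is the default visible in this PR;
    "skip" and "override" are hypothetical behaviors added to round out the example.
    """
    if not target_path.exists():
        return True
    if policy == "error":
        raise FileExistsError(f"{target_path} already exists.")
    if policy == "skip":
        return False  # keep the existing file and do nothing
    # hypothetical "override": drop the stale file so the caller can rewrite it
    target_path.unlink()
    return True
```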


@data.command(name="create_shuffled_dataset_chunk")
@@ -172,6 +189,12 @@ def CMD_entry_point_pack_encoded_data(config_path: FilePath):
required=True,
help="Path to the file containing the list of files to be chunked.",
)
@click.option(
"--input_data_root_path",
type=Path,
required=True,
help="Directory path to the root of the input data.",
)
@click.option(
"--output_chunk_file_path",
type=Path,
@@ -191,24 +214,53 @@ def CMD_entry_point_pack_encoded_data(config_path: FilePath):
help="The number of chunks to create.",
)
@click.option(
"--vocab_size",
"--file_existence_policy",
type=click.Choice([policy.value for policy in FileExistencePolicy]),
default=FileExistencePolicy.ERROR.value,
help="Policy for handling existing files.",
)
@click.option(
"--global_seed",
type=int,
required=True,
help="The size of the vocabulary.",
default=None,
help="The global seed to use for shuffling.",
)
def CMD_create_shuffled_dataset_chunk(
input_file_list_path: Path, output_chunk_file_path: Path, chunk_id: int, num_chunks: int, vocab_size: int
input_file_list_path: Path,
input_data_root_path: Path,
output_chunk_file_path: Path,
chunk_id: int,
num_chunks: int,
file_existence_policy: FileExistencePolicy,
global_seed: Optional[int],
):
"""Utility to create a dataset chunk from a list of shuffled and tokenized pbin files.

Args:
input_file_list_path (Path): Path to file that contains relative paths of
pbin files to be chunked (one per line).
input_data_root_path (Path): Path to the root directory that contains the files to be chunked.
output_chunk_file_path (Path): File path to the chunked dataset.
chunk_id (int): The id of the chunk to be created.
num_chunks (int): Number of chunks in total.
file_existence_policy (FileExistencePolicy): Policy for handling existing files.
global_seed (Optional[int]): The global seed to use for shuffling.
"""
file_existence_policy = FileExistencePolicy(file_existence_policy)

with open(input_file_list_path, "r", encoding="utf-8") as f:
file_path_list = f.readlines()
file_path_list = [Path(file_path.strip()) for file_path in file_path_list]
file_path_list = [
input_data_root_path / Path(file_path.strip()).with_suffix(".pbin") for file_path in file_path_list
]

create_shuffled_dataset_chunk(
file_path_list=file_path_list,
output_chunk_file_path=output_chunk_file_path,
chunk_id=chunk_id,
num_chunks=num_chunks,
vocab_size=vocab_size,
file_existence_policy=file_existence_policy,
global_seed=global_seed,
)
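
The path handling in this command is visible above: the file list contains one relative path per line, each of which is prefixed with `input_data_root_path` and forced to a `.pbin` suffix. How the files are then distributed over chunks is not shown here; the sketch below reproduces the visible path resolution and adds a hedged guess at a seeded, round-robin chunk assignment, while the actual logic lives in `modalities.api.create_shuffled_dataset_chunk` and may differ.

```python
import random
from pathlib import Path
from typing import Optional


def resolve_file_list(input_file_list_path: Path, input_data_root_path: Path) -> list[Path]:
    # Mirrors the entry point above: one relative path per line, rooted and given a .pbin suffix.
    with open(input_file_list_path, "r", encoding="utf-8") as f:
        lines = f.readlines()
    return [input_data_root_path / Path(line.strip()).with_suffix(".pbin") for line in lines]


def files_for_chunk(
    file_path_list: list[Path], chunk_id: int, num_chunks: int, global_seed: Optional[int]
) -> list[Path]:
    # Hypothetical chunk assignment: shuffle once with the global seed so that every
    # chunk job agrees on the same order, then assign files round-robin by index.
    files = list(file_path_list)
    if global_seed is not None:
        random.Random(global_seed).shuffle(files)
    return [f for idx, f in enumerate(files) if idx % num_chunks == chunk_id]
```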


@@ -244,20 +296,43 @@ def CMD_entry_point_merge_packed_data(src_paths: list[Path], target_path: Path):
help="Path to write the shuffled tokenized data (.pbin).",
)
@click.option(
"--batch-size", type=int, default=100, show_default=True, help="Number of documents to process per batch."
"--batch_size", type=int, default=100, show_default=True, help="Number of documents to process per batch."
)
@click.option(
"--file_existence_policy",
type=click.Choice([policy.value for policy in FileExistencePolicy]),
default=FileExistencePolicy.ERROR.value,
help="Policy for handling existing files.",
)
def CMD_shuffle_tokenized_data(input_data_path: Path, output_data_path: Path, batch_size: int) -> None:
@click.option(
"--seed",
type=int,
default=None,
help="The seed for shuffling the data.",
)
def CMD_shuffle_tokenized_data(
input_data_path: Path, output_data_path: Path, batch_size: int, file_existence_policy, seed: int
) -> None:
"""Entrypoint for shuffling tokenized data.

Args:
input_data_path (Path): The path to the input tokenized data (.pbin).
output_data_path (Path): Path to write the shuffled tokenized data (.pbin).
output_data_path (Path): File path to write the shuffled tokenized data (.pbin).
batch_size (int): The size of the batches to shuffle.
file_existence_policy (FileExistencePolicy): Policy for handling existing files.
seed (int): The seed for shuffling the data.

Returns:
None
"""
shuffle_tokenized_data(input_data_path=input_data_path, output_data_path=output_data_path, batch_size=batch_size)
file_existence_policy = FileExistencePolicy(file_existence_policy)

shuffle_tokenized_data(
input_data_path=input_data_path,
output_data_path=output_data_path,
batch_size=batch_size,
file_existence_policy=file_existence_policy,
seed=seed,
)
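
The new `--seed` option is what makes this step part of seeded preprocessing: with the same seed the shuffle order is reproducible across runs. The sketch below only illustrates that property, assuming a shuffle over document indices that is then grouped into batches of `batch_size`; the real `shuffle_tokenized_data` works on `.pbin` files and its internals are not shown in this diff.

```python
import random
from typing import Optional


def shuffled_index_batches(num_documents: int, batch_size: int, seed: Optional[int]) -> list[list[int]]:
    # Same seed -> same permutation on every run; seed=None falls back to OS entropy.
    rng = random.Random(seed)
    indices = list(range(num_documents))
    rng.shuffle(indices)
    # Group the shuffled indices into batches of batch_size for downstream processing.
    return [indices[i : i + batch_size] for i in range(0, num_documents, batch_size)]


if __name__ == "__main__":
    # Two runs with the same seed yield identical batches.
    assert shuffled_index_batches(10, 3, seed=42) == shuffled_index_batches(10, 3, seed=42)
```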


class Main: