Update data_utils.py

This commit is contained in:
hoshi-hiyouga 2024-07-15 00:54:34 +08:00 committed by GitHub
parent a5b809516e
commit 97a0e291c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 25 additions and 26 deletions

View File

@ -13,16 +13,15 @@
# limitations under the License. # limitations under the License.
from enum import Enum, unique from enum import Enum, unique
from typing import TYPE_CHECKING, Dict, List, Sequence, Set, Union from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Set, TypedDict, Union
from datasets import concatenate_datasets, interleave_datasets from datasets import DatasetDict, concatenate_datasets, interleave_datasets
from ..extras.logging import get_logger from ..extras.logging import get_logger
if TYPE_CHECKING: if TYPE_CHECKING:
from datasets import Dataset, IterableDataset from datasets import Dataset, IterableDataset
from transformers import Seq2SeqTrainingArguments
from ..hparams import DataArguments from ..hparams import DataArguments
@ -42,24 +41,29 @@ class Role(str, Enum):
OBSERVATION = "observation" OBSERVATION = "observation"
class DatasetModule(TypedDict):
train_dataset: Optional[Union["Dataset", "IterableDataset"]]
eval_dataset: Optional[Union["Dataset", "IterableDataset"]]
def merge_dataset( def merge_dataset(
all_datasets: List[Union["Dataset", "IterableDataset"]], all_datasets: List[Union["Dataset", "IterableDataset"]], data_args: "DataArguments", seed: int
data_args: "DataArguments",
training_args: "Seq2SeqTrainingArguments",
) -> Union["Dataset", "IterableDataset"]: ) -> Union["Dataset", "IterableDataset"]:
if len(all_datasets) == 1: if len(all_datasets) == 1:
return all_datasets[0] return all_datasets[0]
elif data_args.mix_strategy == "concat": elif data_args.mix_strategy == "concat":
if data_args.streaming: if data_args.streaming:
logger.warning("The samples between different datasets will not be mixed in streaming mode.") logger.warning("The samples between different datasets will not be mixed in streaming mode.")
return concatenate_datasets(all_datasets) return concatenate_datasets(all_datasets)
elif data_args.mix_strategy.startswith("interleave"): elif data_args.mix_strategy.startswith("interleave"):
if not data_args.streaming: if not data_args.streaming:
logger.warning("We recommend using `mix_strategy=concat` in non-streaming mode.") logger.warning("We recommend using `mix_strategy=concat` in non-streaming mode.")
return interleave_datasets( return interleave_datasets(
datasets=all_datasets, datasets=all_datasets,
probabilities=data_args.interleave_probs, probabilities=data_args.interleave_probs,
seed=training_args.seed, seed=seed,
stopping_strategy="first_exhausted" if data_args.mix_strategy.endswith("under") else "all_exhausted", stopping_strategy="first_exhausted" if data_args.mix_strategy.endswith("under") else "all_exhausted",
) )
else: else:
@ -67,22 +71,17 @@ def merge_dataset(
def split_dataset( def split_dataset(
dataset: Union["Dataset", "IterableDataset"], data_args: "DataArguments", training_args: "Seq2SeqTrainingArguments" dataset: Union["Dataset", "IterableDataset"], data_args: "DataArguments", seed: int
) -> Dict[str, "Dataset"]: ) -> "DatasetDict":
if training_args.do_train: r"""
if data_args.val_size > 1e-6: # Split the dataset Splits the dataset and returns a dataset dict containing train set (required) and validation set (optional).
"""
if data_args.streaming: if data_args.streaming:
dataset = dataset.shuffle(buffer_size=data_args.buffer_size, seed=training_args.seed) dataset = dataset.shuffle(buffer_size=data_args.buffer_size, seed=seed)
val_set = dataset.take(int(data_args.val_size)) val_set = dataset.take(int(data_args.val_size))
train_set = dataset.skip(int(data_args.val_size)) train_set = dataset.skip(int(data_args.val_size))
return {"train_dataset": train_set, "eval_dataset": val_set} return DatasetDict({"train": train_set, "validation": val_set})
else: else:
val_size = int(data_args.val_size) if data_args.val_size > 1 else data_args.val_size val_size = int(data_args.val_size) if data_args.val_size > 1 else data_args.val_size
dataset = dataset.train_test_split(test_size=val_size, seed=training_args.seed) dataset = dataset.train_test_split(test_size=val_size, seed=seed)
return {"train_dataset": dataset["train"], "eval_dataset": dataset["test"]} return DatasetDict({"train": dataset["train"], "validation": dataset["test"]})
else:
if data_args.streaming:
dataset = dataset.shuffle(buffer_size=data_args.buffer_size, seed=training_args.seed)
return {"train_dataset": dataset}
else: # do_eval or do_predict
return {"eval_dataset": dataset}