[WIP] Remove validate_dataset and regenerate_dataset #373

Merged · 4 commits · Jul 25, 2024
99 changes: 3 additions & 96 deletions merlin/io/dataset.py
```diff
@@ -1130,103 +1130,10 @@ def npartitions(self):
         return self.to_ddf().npartitions

     def validate_dataset(self, **kwargs):
-        """Validate for efficient processing.
+        raise NotImplementedError(""" validate_dataset is not supported for merlin >23.08 """)

-        The purpose of this method is to validate that the Dataset object
-        meets the minimal requirements for efficient NVTabular processing.
-        For now, this criteria requires the data to be in parquet format.
-
-        Example Usage::
-
-            dataset = Dataset("/path/to/data_pq", engine="parquet")
-            assert validate_dataset(dataset)
-
-        Parameters
-        -----------
-        **kwargs :
-            Key-word arguments to pass down to the engine's validate_dataset
-            method. For the recommended parquet format, these arguments
-            include `add_metadata_file`, `row_group_max_size`, `file_min_size`,
-            and `require_metadata_file`. For more information, see
-            `ParquetDatasetEngine.validate_dataset`.
-
-        Returns
-        -------
-        valid : bool
-            `True` if the input dataset is valid for efficient NVTabular
-            processing.
-        """
-
-        # Check that the dataset format is Parquet
-        if not isinstance(self.engine, ParquetDatasetEngine):
-            msg = (
-                "NVTabular is optimized for the parquet format. Please use "
-                "the to_parquet method to convert your dataset."
-            )
-            warnings.warn(msg)
-            return False  # Early return
-
-        return self.engine.validate_dataset(**kwargs)
-
-    def regenerate_dataset(
-        self,
-        output_path,
-        columns=None,
-        output_format="parquet",
-        compute=True,
-        **kwargs,
-    ):
-        """EXPERIMENTAL:
-        Regenerate an NVTabular Dataset for efficient processing by writing
-        out new Parquet files. In contrast to default ``to_parquet`` behavior,
-        this method preserves the original ordering.
-
-        Example Usage::
-
-            dataset = Dataset("/path/to/data_pq", engine="parquet")
-            dataset.regenerate_dataset(
-                out_path, part_size="1MiB", file_size="10MiB"
-            )
-
-        Parameters
-        -----------
-        output_path : string
-            Root directory path to use for the new (regenerated) dataset.
-        columns : list(string), optional
-            Subset of columns to include in the regenerated dataset.
-        output_format : string, optional
-            Format to use for regenerated dataset. Only "parquet" (default)
-            is currently supported.
-        compute : bool, optional
-            Whether to compute the task graph or to return a Delayed object.
-            By default, the graph will be executed.
-        **kwargs :
-            Key-word arguments to pass down to the engine's regenerate_dataset
-            method. See `ParquetDatasetEngine.regenerate_dataset` for more
-            information.
-
-        Returns
-        -------
-        result : int or Delayed
-            If `compute=True` (default), the return value will be an integer
-            corresponding to the number of generated data files. If `False`,
-            the returned value will be a `Delayed` object.
-        """
-
-        # Check that the desired output format is Parquet
-        if output_format not in ["parquet"]:
-            msg = (
-                f"NVTabular is optimized for the parquet format. "
-                f"{output_format} is not yet a supported output format for "
-                f"regenerate_dataset."
-            )
-            raise ValueError(msg)
-
-        result = ParquetDatasetEngine.regenerate_dataset(self, output_path, columns=None, **kwargs)
-        if compute:
-            return result.compute()
-        else:
-            return result
+    def regenerate_dataset(self, *args, **kwargs):
+        raise NotImplementedError(""" regenerate_dataset is not supported for merlin >23.08 """)

     def infer_schema(self, n=1):
         """Create a schema containing the column names and inferred dtypes of the Dataset
```
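Because the method names survive as stubs, pipelines written against merlin <=23.08 now fail loudly with `NotImplementedError` instead of silently skipping validation. A minimal caller-side migration sketch, reusing the parquet path from the removed docstring example; the try/except guard and the output path are illustrative, not part of this PR:

```python
from merlin.io import Dataset

dataset = Dataset("/path/to/data_pq", engine="parquet")

try:
    # Stubbed by this PR: always raises on merlin >23.08.
    dataset.validate_dataset()
except NotImplementedError:
    # The removed implementation steered non-parquet data toward
    # to_parquet; rewriting with to_parquet remains the supported
    # route (this output path is hypothetical).
    dataset.to_parquet("/path/to/regenerated_pq")
```

Worth noting for reviewers: nothing in this PR replaces the one behavior `regenerate_dataset` offered over plain `to_parquet`, which, per the removed docstring, was preserving the original row ordering.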
7 changes: 0 additions & 7 deletions merlin/io/dataset_engine.py
```diff
@@ -51,13 +51,6 @@ def _path_partition_map(self):
     def num_rows(self):
         raise NotImplementedError(""" Returns the number of rows in the dataset """)

-    def validate_dataset(self, **kwargs):
-        raise NotImplementedError(""" Returns True if the raw data is efficient for NVTabular """)
-
-    @classmethod
-    def regenerate_dataset(cls, dataset, output_path, columns=None, **kwargs):
-        raise NotImplementedError(""" Regenerate a dataset with optimal properties """)
-
     def sample_data(self, n=1):
         """Return a sample of real data from the dataset

```
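One side effect of deleting the base-class hooks outright rather than stubbing them: on the engine base class the attributes are gone entirely, so feature probes now see `AttributeError` where they previously got a callable that raised `NotImplementedError`. A small sketch of that asymmetry against the `Dataset` stubs above, assuming the class defined in `merlin/io/dataset_engine.py` is named `DatasetEngine`:

```python
from merlin.io import Dataset
from merlin.io.dataset_engine import DatasetEngine

# Dataset keeps the names as stubs that raise only when called...
assert hasattr(Dataset, "validate_dataset")
assert hasattr(Dataset, "regenerate_dataset")

# ...while DatasetEngine drops them outright, so custom engine
# subclasses no longer inherit (or need to override) these hooks.
assert not hasattr(DatasetEngine, "validate_dataset")
assert not hasattr(DatasetEngine, "regenerate_dataset")
```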