Skip to content

Commit

Permalink
Generalize the astype operation to be version-agnostic
Browse files Browse the repository at this point in the history
  • Loading branch information
rjzamora committed Jul 19, 2024
1 parent 9c8f216 commit e94f719
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions tests/unit/io/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def test_io_partitions_push(tmpdir):
# Generate random csv files
files = [os.path.join(tmpdir, f"csv/day_{i}") for i in range(23)]
for file in files:
with open(file, "w") as f:
with open(file, "w", encoding="utf-8") as f:
f.write("0,1,2,3,a,b,c\n" * 1000)

# Load csv files
Expand Down Expand Up @@ -672,6 +672,7 @@ def test_hive_partitioned_data(tmpdir, cpu):
# Make sure the directory structure is hive-like
df_expect = ddf.compute()
df_expect = df_expect.sort_values(["id", "x", "y"]).reset_index(drop=True)
ts_dtype = df_expect["timestamp"].dtype
timestamp_check = df_expect["timestamp"].iloc[0]
name_check = df_expect["name"].iloc[0]
result_paths = glob.glob(
Expand All @@ -689,7 +690,7 @@ def test_hive_partitioned_data(tmpdir, cpu):
# Read back with dask.dataframe and check the data
df_check = dd.read_parquet(path, engine="pyarrow").compute()
df_check["name"] = df_check["name"].astype("object")
df_check["timestamp"] = df_check["timestamp"].astype("int64")
df_check["timestamp"] = df_check["timestamp"].astype(ts_dtype)
df_check = df_check.sort_values(["id", "x", "y"]).reset_index(drop=True)
for col in df_expect:
# Order of columns can change after round-trip partitioning
Expand All @@ -698,7 +699,7 @@ def test_hive_partitioned_data(tmpdir, cpu):
# Read back with NVT and check the data
df_check = merlin.io.Dataset(path, engine="parquet").to_ddf().compute()
df_check["name"] = df_check["name"].astype("object")
df_check["timestamp"] = df_check["timestamp"].astype("int64")
df_check["timestamp"] = df_check["timestamp"].astype(ts_dtype)
df_check = df_check.sort_values(["id", "x", "y"]).reset_index(drop=True)
for col in df_expect:
# Order of columns can change after round-trip partitioning
Expand Down

0 comments on commit e94f719

Please sign in to comment.