Part 2. Continuing the Data Validation Pipeline with YAML and Polars


Context

In addition to the data validation checks, sometimes we want to perform data conversions so that the downstream process can actually work.

The idea is to add an additional step that checks for aliases of a variable, if they're specified in the .yaml schema file.

Updated YAML schema.yaml

schema:
  u_id:
    type: integer
    # aliases: ["id", "user_id"]
    range: [1, 200]
  mssubclass_v1:
    type: integer
    aliases: ["mssubclass"]
    allowed_value: [60, 20, 70, 80]

A few things:

  • By default, if there’s an alias, the rename and data type conversion will be performed.
    • Simply comment it out, and the conversion won’t happen.
  • Obviously, there can be another step here to validate the .yaml schema file itself.
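To make the alias behavior concrete, here is a small sketch (not from the original post) of how the schema above reads back after `yaml.safe_load`: a commented-out `aliases` line is simply absent from the parsed dict, so a `.get("aliases", [])` lookup gives a safe default and no rename is attempted.

```python
import yaml

# inline copy of the schema above (u_id's aliases line is commented out)
schema_text = """
schema:
  u_id:
    type: integer
    # aliases: ["id", "user_id"]
    range: [1, 200]
  mssubclass_v1:
    type: integer
    aliases: ["mssubclass"]
    allowed_value: [60, 20, 70, 80]
"""

schema = yaml.safe_load(schema_text)["schema"]

# commented-out aliases never reach the parsed dict,
# so .get() with a default keeps the lookup safe
print(schema["u_id"].get("aliases", []))       # []
print(schema["mssubclass_v1"].get("aliases"))  # ['mssubclass']
```

This is also why "commenting it off" is enough to disable the conversion: the downstream code only acts when the key is present.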

A detailed yaml file with a few other validation scenarios can be found: here

Updated validate.py

In the validation script, we will add a class to perform the column renaming and type conversion.

class DataHarmonizer:
    def __init__(self, schema_path: str):
        self.schema = self._load_schema(schema_path).get("schema", {})
        # when the schema says it's a `string`,
        # the column will be checked against `pl.String`
        self.type_map = {
            "string": pl.String,
            "float": pl.Float64,
            "integer": pl.Int64,
            "date": pl.Date,
            "boolean": pl.Boolean,
        }
    ...
    def _load_schema(self, schema_path):
        with open(schema_path) as f:
            return yaml.safe_load(f)

    def harmonize(self, df: pl.DataFrame) -> pl.DataFrame:
        ...
        rename_map = {}
        df_cols = set(df.columns)
        for col, props in self.schema.items():
            # this only happens when the column is not found
            if col not in df_cols:
                aliases = props.get("aliases", [])
                for alias in aliases:
                    # if any of the aliases is found in the df columns, the rename will happen
                    if alias in df_cols:
                        rename_map[alias] = col
                        break  # stop after finding the first match
        if rename_map:
            df = df.rename(rename_map)
        # here the type conversion happens
        for col, props in self.schema.items():
            if col in df.columns:
                # find the pl type
                target_type_str = props.get("type")
                target_pl_type = self.type_map.get(target_type_str)
                # compare whether the current type and the target pl type are the same
                if target_pl_type:
                    current_type = df[col].dtype
                    if current_type != target_pl_type:
                        # before the conversion happens, track the null count as our failure criterion
                        nulls_before = df[col].null_count()
                        # strict=False makes sure the cast runs without crashing;
                        # values that fail to cast become null
                        df = df.with_columns(pl.col(col).cast(target_pl_type, strict=False))
                        nulls_after = df[col].null_count()
                        # check the failure count
                        failed_rows = nulls_after - nulls_before
                        if failed_rows > 0:
                            ...  # log here
        return df

The new class looks for columns that need to be renamed, based on the aliases attribute in the schema file, and then casts them to their target types.

After this, the validator class stays mostly the same. The DataHarmonizer class actually makes it simpler.

The full Python file can be found: here

Next Steps

  • Use a utility to check the validity of the schema.yaml file itself
  • Bring in argparse if needed