Part 2. Continuing the Data Validation Pipeline with YAML and Polars
Context
In addition to the data validation checks, sometimes we want to perform data conversions so that the downstream process can actually work.
The idea is to add an additional step that checks for aliases of a variable when they are specified in the .yaml schema file.
Updated YAML schema.yaml
```yaml
schema:
  u_id:
    type: integer
    # aliases: ["id", "user_id"]
    range: [1, 200]
  mssubclass_v1:
    type: integer
    aliases: ["mssubclass"]
    allowed_value: [60, 20, 70, 80]
```

A few things:
- By default, if an alias is specified, the data type conversion will be performed.
- Simply comment it out, and the conversion won't happen.
- Obviously, there can be another step here to validate the `schema.yaml` file itself (a quick sketch follows this list).
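As a minimal sketch of that extra step, assuming the same type names as the pipeline's type map (the `check_schema` helper is hypothetical, not part of the pipeline yet):

```python
import yaml

# the type names the pipeline understands (mirrors DataHarmonizer.type_map)
KNOWN_TYPES = {"string", "float", "integer", "date", "boolean"}


def check_schema(schema_path: str) -> list[str]:
    """Return a list of problems found in the schema file."""
    with open(schema_path) as f:
        schema = yaml.safe_load(f).get("schema", {})

    problems = []
    for col, props in schema.items():
        # every column must declare a known type
        if props.get("type") not in KNOWN_TYPES:
            problems.append(f"{col}: unknown or missing type {props.get('type')!r}")
        # aliases, when present, must be a list of strings
        aliases = props.get("aliases", [])
        if not isinstance(aliases, list) or not all(isinstance(a, str) for a in aliases):
            problems.append(f"{col}: aliases must be a list of strings")
    return problems
```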
A detailed YAML file with a few other validation scenarios can be found here.
Updated validate.py
In the validation script, we will add a class to perform the column renaming and type conversion.
```python
import polars as pl
import yaml


class DataHarmonizer:
    def __init__(self, schema_path: str):
        self.schema = self._load_schema(schema_path).get("schema", {})

        # when the schema says it's a `string`,
        # then it will check if it's `pl.String`
        self.type_map = {
            "string": pl.String,
            "float": pl.Float64,
            "integer": pl.Int64,
            "date": pl.Date,
            "boolean": pl.Boolean,
        }

    def _load_schema(self, schema_path):
        with open(schema_path) as f:
            return yaml.safe_load(f)

    def harmonize(self, df: pl.DataFrame) -> pl.DataFrame:
        rename_map = {}
        df_cols = set(df.columns)

        for col, props in self.schema.items():
            # this only happens when there's a column not found
            if col not in df_cols:
                aliases = props.get("aliases", [])
                for alias in aliases:
                    # if any of the aliases is found in the df columns,
                    # then the rename will happen
                    if alias in df_cols:
                        rename_map[alias] = col
                        break  # stops after finding the first match

        if rename_map:
            df = df.rename(rename_map)

        # here the type conversion happens
        for col, props in self.schema.items():
            if col in df.columns:
                # find the pl type
                target_type_str = props.get("type")
                target_pl_type = self.type_map.get(target_type_str)

                # compare if current type and target pl type are the same
                if target_pl_type:
                    current_type = df[col].dtype
                    if current_type != target_pl_type:
                        # before conversion happens, we track the null count:
                        # values that fail to cast become null, which is our
                        # failure criterion
                        nulls_before = df[col].null_count()
                        # strict=False will make sure it runs without crashing
                        df = df.with_columns(
                            pl.col(col).cast(target_pl_type, strict=False)
                        )
                        nulls_after = df[col].null_count()

                        # check failure count
                        failed_rows = nulls_after - nulls_before
                        if failed_rows > 0:
                            ...  # log here

        return df
```

The new class looks for columns that need to be renamed and converted, based on the aliases attribute in the schema file.
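A quick usage sketch against the schema above (the sample values are made up; `"not-a-number"` is there to show a failed cast):

```python
import polars as pl

df = pl.DataFrame({
    "u_id": [1, 2, 3],
    "mssubclass": ["60", "20", "not-a-number"],  # alias of mssubclass_v1
})

harmonizer = DataHarmonizer("schema.yaml")
df = harmonizer.harmonize(df)

print(df.columns)                 # ['u_id', 'mssubclass_v1'] after renaming
print(df["mssubclass_v1"].dtype)  # Int64 after the cast
# "not-a-number" fails the cast, becomes null, and counts as one failed row
```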
After this, the validator class can stay much the same; the DataHarmonizer class actually makes its job easier.
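Wiring it together, the harmonizer simply runs before validation. The validator name and interface below are placeholders for whatever Part 1 defined:

```python
harmonizer = DataHarmonizer("schema.yaml")
df = harmonizer.harmonize(df)

# hypothetical interface from Part 1
validator = DataValidator("schema.yaml")
validator.validate(df)
```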
The full Python file can be found here.
Next Steps
- Use a utility to check the validity of the `schema.yaml` file
- Bring in `argparse` if I need to (sketched below)
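For the `argparse` step, a minimal sketch (the flag names and CSV input are assumptions):

```python
import argparse

import polars as pl


def main():
    parser = argparse.ArgumentParser(description="Harmonize a dataset against a YAML schema")
    parser.add_argument("data", help="path to the input CSV file")
    parser.add_argument("--schema", default="schema.yaml", help="path to the YAML schema file")
    args = parser.parse_args()

    df = pl.read_csv(args.data)
    df = DataHarmonizer(args.schema).harmonize(df)
    print(df.head())


if __name__ == "__main__":
    main()
```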