Data Engineering

Building Data Pipelines to Import and Normalize Client Data at Scale

Genius Cloud Solutions Team

March 22, 2026 · 10 min read

Every new client brings their own data mess. CSV exports with inconsistent headers. JSON from a legacy API that changes shape without warning. Postgres dumps in an unexpected encoding. Salesforce exports that denormalize everything. Flat files sitting on an SFTP server that someone updates manually every Monday. Your pipeline needs to handle all of it without crumbling — and ideally without requiring an engineer to babysit each new source.

After building these systems for multiple clients, we've converged on a set of design principles and a four-layer architecture that scales from a single data source to dozens.

Design Principle: Validate at the Boundary

The most important rule in client data ingestion: never let raw client data into your warehouse unvalidated. The boundary — the ingestion layer — is where you enforce your schema contracts. Everything that enters your system should be explicitly validated, typed, and versioned before it touches your gold layer.

Pydantic is our go-to tool for Python-based validation. Define your expected schema as a model, parse the incoming data, and let validation errors surface immediately with clear messages:

from datetime import date
from decimal import Decimal
from typing import Optional

from pydantic import BaseModel, Field, ValidationError, field_validator

class InvoiceRecord(BaseModel):
    invoice_id: str = Field(..., min_length=1)
    vendor_name: str
    invoice_date: date
    total_amount: Decimal = Field(..., gt=0)
    currency: str = Field(default="CAD", pattern=r"^[A-Z]{3}$")
    po_number: Optional[str] = None

    @field_validator("invoice_id")
    @classmethod
    def normalize_invoice_id(cls, v: str) -> str:
        # Normalize first, then reject IDs that were only whitespace
        # (min_length is checked before this validator runs).
        normalized = v.strip().upper()
        if not normalized:
            raise ValueError("invoice_id must not be blank")
        return normalized

def validate_batch(raw_records: list[dict]) -> tuple[list[InvoiceRecord], list[dict]]:
    """Split a batch into validated models and rejected records with their errors."""
    valid, invalid = [], []
    for record in raw_records:
        try:
            valid.append(InvoiceRecord.model_validate(record))
        except ValidationError as e:
            # Catch only validation failures so real bugs still surface.
            invalid.append({"record": record, "error": str(e)})
    return valid, invalid

Invalid records should never silently drop or corrupt your warehouse. Log them, alert on them, and quarantine them for manual review. This principle alone prevents the majority of data quality incidents we see in client systems.
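A minimal sketch of the quarantine step, assuming the invalid list has the shape returned by validate_batch above. The function name quarantine_records, the JSON Lines file layout, and the logger name are our own illustrative choices, not part of any library; a real deployment would likely write to object storage and hook the warning into an alerting channel.

```python
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

logger = logging.getLogger("ingestion.quarantine")  # hypothetical logger name


def quarantine_records(invalid: list, quarantine_dir: str, source: str) -> Optional[Path]:
    """Append rejected records to a JSON Lines file and log a warning.

    Returns the file written, or None when there was nothing to quarantine.
    """
    if not invalid:
        return None
    out_dir = Path(quarantine_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    out_path = out_dir / f"{source}_{stamp}.jsonl"
    with out_path.open("a", encoding="utf-8") as f:
        for item in invalid:
            # default=str keeps dates and Decimals serializable for manual review.
            f.write(json.dumps({"source": source, **item}, default=str) + "\n")
    logger.warning("Quarantined %d records from %s in %s", len(invalid), source, out_path)
    return out_path
```

Keeping the original record next to its error message means a reviewer can fix the source data or the validation rule and replay the file later.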

The Four-Layer Pipeline Architecture

1. Ingestion Layer

The ingestion layer is responsible for connecting to each data source and landing raw data in a staging area — untouched and unmodified. The key word is untouched. Raw data should land in your staging zone (typically an S3 or GCS landing bucket) exactly as it was received, preserving the original format, encoding, and structure. This gives you a full audit trail and makes it possible to re-process historical data if your validation rules change later.
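As a rough illustration of "untouched", the sketch below writes the payload byte-for-byte and records a checksum in a sidecar manifest. It uses the local filesystem as a stand-in for an S3 or GCS bucket, and the function name land_raw_file, the date-partitioned path, and the manifest fields are assumptions made for this example.

```python
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path


def land_raw_file(payload: bytes, landing_root: str, source: str, filename: str) -> Path:
    """Write the payload exactly as received and record a sidecar manifest."""
    received_at = datetime.now(timezone.utc)
    target_dir = Path(landing_root) / source / received_at.strftime("%Y/%m/%d")
    target_dir.mkdir(parents=True, exist_ok=True)

    target = target_dir / filename
    target.write_bytes(payload)  # no decoding, no parsing, no normalization

    manifest = {
        "source": source,
        "filename": filename,
        "received_at": received_at.isoformat(),
        "size_bytes": len(payload),
        "sha256": hashlib.sha256(payload).hexdigest(),
    }
    manifest_path = target_dir / (filename + ".manifest.json")
    manifest_path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")
    return target
```

The checksum lets a later re-processing run confirm that the file it reads is the file that was originally received.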

For standard sources (databases, REST APIs, common SaaS platforms), Airbyte covers most of the heavy lifting with a library of pre-built connectors. For unusual sources — legacy SFTP feeds, proprietary file formats, one-off exports — custom connectors are worth the investment if the source is long-lived. Write them with retries, backpressure, and checkpointing from day one.
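One way to get checkpointing and a simple form of backpressure in a custom connector is a pull-based generator that only advances its cursor after the caller has finished with the previous page. This is a sketch under assumptions: fetch_page is a hypothetical callable that takes a cursor and returns (records, next_cursor), and the checkpoint is a small JSON file rather than a proper state store.

```python
import json
from pathlib import Path
from typing import Callable, Iterator, Optional


def load_checkpoint(path: Path) -> Optional[str]:
    """Return the last committed cursor, or None on the first run."""
    if not path.exists():
        return None
    return json.loads(path.read_text(encoding="utf-8"))["cursor"]


def save_checkpoint(path: Path, cursor: str) -> None:
    """Write to a temp file, then rename, so a crash never leaves a partial file."""
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps({"cursor": cursor}), encoding="utf-8")
    tmp.replace(path)


def sync_pages(
    fetch_page: Callable[[Optional[str]], tuple],
    checkpoint_path: Path,
) -> Iterator[list]:
    """Yield pages one at a time, resuming from the last checkpoint.

    fetch_page(cursor) is hypothetical: it returns (records, next_cursor),
    with next_cursor set to None when the source is exhausted.
    """
    cursor = load_checkpoint(checkpoint_path)
    while True:
        records, next_cursor = fetch_page(cursor)
        if records:
            # Execution pauses here until the caller asks for the next page,
            # so the source is never read faster than pages are landed.
            yield records
        if next_cursor is None:
            break
        save_checkpoint(checkpoint_path, next_cursor)
        cursor = next_cursor
```

A crash between landing a page and saving the cursor means that page is fetched again on restart, so this gives at-least-once delivery and relies on deduplication downstream.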

2. Validation Layer

Once raw data lands in staging, the validation layer processes it before anything reaches the warehouse. This is where schema validation (Pydantic, Great Expectations, or dbt-expectations), null checks, type coercion, and duplicate detection run. You need a clear strategy for invalid records. Reject-and-alert (fail the batch, page someone) suits low-latency, critical pipelines; quarantine-and-continue (isolate bad records, keep processing the rest) suits bulk imports where partial results are better than none.
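The two strategies can be made an explicit, per-pipeline setting rather than something buried in task code. The sketch below is illustrative only: InvalidRecordPolicy, BatchRejected, and apply_policy are hypothetical names, and the quarantine argument is whatever callable your pipeline uses to isolate bad records.

```python
from enum import Enum
from typing import Callable


class InvalidRecordPolicy(Enum):
    REJECT_AND_ALERT = "reject_and_alert"
    QUARANTINE_AND_CONTINUE = "quarantine_and_continue"


class BatchRejected(Exception):
    """Raised when a batch fails validation under the reject-and-alert policy."""


def apply_policy(
    valid: list,
    invalid: list,
    policy: InvalidRecordPolicy,
    quarantine: Callable[[list], None],
) -> list:
    """Return the records that may proceed, or raise to fail the whole batch."""
    if not invalid:
        return valid
    if policy is InvalidRecordPolicy.REJECT_AND_ALERT:
        # Failing the task lets the orchestrator's alerting page someone.
        total = len(valid) + len(invalid)
        raise BatchRejected(f"{len(invalid)} of {total} records failed validation")
    # Quarantine-and-continue: isolate the bad records, keep the good ones.
    quarantine(invalid)
    return valid
```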

3. Transformation Layer

dbt is the standard here, and for good reason. SQL transformations are accessible to data analysts, version-controlled alongside your warehouse schema, and testable at every step. We use the medallion architecture: bronze (raw validated records, one-to-one with staging), silver (cleaned, normalized, joined), and gold (business-ready aggregates and dimensional models). Each layer has explicit contracts — dbt tests for primary key uniqueness, non-null constraints, accepted values, and referential integrity — so regressions surface before they reach dashboards.

4. Load Layer

The final layer writes to your warehouse — Snowflake, Redshift, or BigQuery — with an explicit loading strategy for each table. Incremental loads (append or merge on a surrogate key) are preferred for large tables; full refresh is acceptable for small reference tables or when the source system doesn't expose change data. Write your dbt models incrementally from the start, even if your current data volume doesn't require it. Retrofitting incremental logic onto a full-refresh model at scale is painful.

Handling Common Client Data Problems

Encoding Issues

UTF-8 vs. Latin-1, BOM characters, Windows line endings — encoding bugs are invisible until they break a downstream system. Always specify encoding explicitly when reading files, and detect it if the source is unknown:

import chardet

def read_csv_safely(file_path: str) -> str:
    with open(file_path, "rb") as f:
        raw = f.read()
    detected = chardet.detect(raw)
    encoding = detected.get("encoding") or "utf-8"
    # Strip BOM if present
    return raw.decode(encoding).lstrip("\ufeff")

Date Format Chaos

Clients send dates as 2026-05-25, 25/05/2026, May 25, 2026, 20260525, and worse. Never try to enumerate every format with a format string. Use dateutil.parser to handle ambiguous formats, but set dayfirst=True or dayfirst=False explicitly based on the client's locale to avoid 05/06 ambiguities silently producing wrong dates.
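A small sketch of that advice, assuming python-dateutil is installed and that each client has a known day/month convention. The helper name parse_client_date is ours. The ISO check comes first so that the dayfirst setting can never affect a string that is already unambiguous.

```python
from datetime import date

from dateutil import parser


def parse_client_date(value: str, dayfirst: bool) -> date:
    """Parse a client-supplied date string with an explicit day/month order."""
    text = value.strip()
    try:
        # Unambiguous ISO 8601 dates (YYYY-MM-DD) need no locale guessing.
        return date.fromisoformat(text)
    except ValueError:
        pass
    # dayfirst is required by the signature, so no caller can forget to choose.
    return parser.parse(text, dayfirst=dayfirst, yearfirst=False).date()
```

With this helper, "05/06/2026" becomes 5 June for a client configured with dayfirst=True and 6 May for one configured with dayfirst=False, which is exactly the ambiguity that needs to be decided per client rather than guessed.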

Duplicate Records

Client systems routinely send the same record twice — full resyncs, retry logic gone wrong, or manual re-exports. Never assume a source is deduplicated. Design every target table with a surrogate key (a hash of the natural key fields) and use upsert semantics at load time. In Snowflake this is a MERGE statement; in dbt it's a unique_key on an incremental model.
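To make the pattern concrete without a warehouse, the sketch below uses SQLite's INSERT ... ON CONFLICT as a stand-in for a Snowflake MERGE (it needs SQLite 3.24 or newer). The table layout, the choice of vendor name plus invoice ID as the natural key, and the function names are assumptions for illustration.

```python
import hashlib
import sqlite3


def surrogate_key(*natural_key_parts: str) -> str:
    """Hash the normalized natural-key fields into a stable surrogate key."""
    normalized = "|".join(part.strip().upper() for part in natural_key_parts)
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def upsert_invoices(conn: sqlite3.Connection, rows: list) -> None:
    """Insert new invoices and update existing ones, keyed on the surrogate key."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS invoices (
            invoice_sk   TEXT PRIMARY KEY,
            vendor_name  TEXT NOT NULL,
            invoice_id   TEXT NOT NULL,
            total_amount TEXT NOT NULL
        )
        """
    )
    conn.executemany(
        """
        INSERT INTO invoices (invoice_sk, vendor_name, invoice_id, total_amount)
        VALUES (:sk, :vendor_name, :invoice_id, :total_amount)
        ON CONFLICT (invoice_sk) DO UPDATE SET
            vendor_name  = excluded.vendor_name,
            total_amount = excluded.total_amount
        """,
        [
            {**row, "sk": surrogate_key(row["vendor_name"], row["invoice_id"])}
            for row in rows
        ],
    )
    conn.commit()
```

Loading the same batch twice leaves one row per invoice, and a corrected re-export overwrites the earlier values instead of adding a second copy.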

Schema Drift

Clients add columns, rename columns, or drop columns without telling anyone. Your pipeline should detect this proactively rather than failing mid-run. A simple check against INFORMATION_SCHEMA before each run can catch drift early:

-- Detect new columns in source that don't exist in warehouse
SELECT src.column_name
FROM source_schema_snapshot AS src
LEFT JOIN information_schema.columns AS wh
  ON wh.table_schema = 'bronze'
  AND wh.table_name  = 'client_invoices'
  AND wh.column_name = src.column_name
WHERE wh.column_name IS NULL;
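The query above only reports new columns. The same comparison can be run in Python before the load to report drift in both directions; this is a sketch that assumes you already have the two column lists in hand, and the names SchemaDrift and detect_drift are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class SchemaDrift:
    added: frozenset    # present in the source, missing from the warehouse
    removed: frozenset  # present in the warehouse, missing from the source

    @property
    def has_drift(self) -> bool:
        return bool(self.added or self.removed)


def detect_drift(source_columns: Iterable[str], warehouse_columns: Iterable[str]) -> SchemaDrift:
    """Compare column names case-insensitively and report both directions."""
    src = {c.strip().lower() for c in source_columns}
    wh = {c.strip().lower() for c in warehouse_columns}
    return SchemaDrift(added=frozenset(src - wh), removed=frozenset(wh - src))
```

A renamed column shows up as one added and one removed name, which is usually enough of a signal to pause the run and ask the client what changed.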

Orchestration with Airflow

Apache Airflow (or Astronomer, or MWAA on AWS) ties the layers together. Each pipeline is a DAG — a directed acyclic graph of tasks. Here's a simplified structure for the four-layer pipeline:

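from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# run_airbyte_sync and validate_staging_data are project-specific callables
# defined elsewhere in your codebase; import them here.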

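default_args = {
    "owner": "data-engineering",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,   # lengthen the wait between retries
    "max_retry_delay": timedelta(minutes=30),
    "email_on_failure": True,
}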

with DAG(
    dag_id="client_data_import",
    start_date=datetime(2026, 1, 1),
    schedule="0 6 * * *",            # 6am daily; use schedule_interval on Airflow < 2.4
    default_args=default_args,
    catchup=False,
    tags=["client-data", "ingestion"],
) as dag:

    ingest = PythonOperator(
        task_id="ingest_to_staging",
        python_callable=run_airbyte_sync,
        op_kwargs={"connection_id": "{{ var.value.client_connection_id }}"},
    )

    validate = PythonOperator(
        task_id="validate_and_quarantine",
        python_callable=validate_staging_data,
    )

    # The selectors assume models are organized in folders named bronze, silver, and gold.
    # No trailing "+" is used, so each step runs only its own layers.
    transform = BashOperator(
        task_id="dbt_run_bronze_silver",
        bash_command="dbt run --select bronze silver --target prod",
    )

    test = BashOperator(
        task_id="dbt_test",
        bash_command="dbt test --select bronze silver --target prod",
    )

    load_gold = BashOperator(
        task_id="dbt_run_gold",
        bash_command="dbt run --select gold --target prod",
    )

    ingest >> validate >> transform >> test >> load_gold

Making It Reliable

A pipeline that works once isn't a pipeline — it's a script. Production reliability comes from a few non-negotiable properties:

  • Idempotency — Running the pipeline twice on the same data should produce the same result. No duplicate rows, no partial updates. Upsert semantics and truncate-and-reload patterns both achieve this.
  • Retry logic — Network failures, API rate limits, and temporary warehouse unavailability are normal. Every task should have exponential backoff retries with a clear maximum.
  • Alerting on failure — PagerDuty for critical pipelines, Slack notifications for non-critical. SLA breaches (a pipeline that hasn't completed by its expected time) should alert just like failures.
  • Data quality checks — Great Expectations or dbt tests on key business metrics after each run. If yesterday's order count drops by 40% overnight, something is wrong — you want to know before your client does.
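For code that runs outside the orchestrator's own retry mechanism, the retry property above can be sketched as a small helper. This is one possible shape, not a prescribed implementation: retry_with_backoff and its parameters are hypothetical, the retryable exception types are an assumption, and the random jitter is our addition to keep many clients from retrying in lockstep.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    retry_on: tuple = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying transient failures with capped exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # clear maximum reached: give up and let alerting take over
            # The delay ceiling doubles each attempt: base, 2x base, 4x base, ...
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, ceiling))
    raise AssertionError("unreachable")
```

Only exceptions listed in retry_on are retried, so a validation bug or a permissions error fails immediately instead of being hidden behind several minutes of waiting.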

Client data ingestion is never a one-time project. Sources change, clients add new data, and business requirements evolve. Build the foundation right — validate at the boundary, separate your layers, automate your quality checks — and you'll spend your time adding value instead of debugging encoding errors at midnight.
