Data Mesh Architecture: Decentralized Data Platforms (2025)

Oct 26, 2025
data-mesh, data-platform, governance, ownership

Data mesh shifts ownership from centralized data teams to domain-oriented data products. This guide explains how to implement it pragmatically.

Executive summary

  • Treat data as a product, with SLAs, owners, and contracts
  • Provide a self-serve platform: ingestion, storage, quality, lineage, catalog
  • Apply federated governance: global policies, local execution

Data products

  • Contracts (schemas, freshness, quality); discoverability via catalog; observability

Platform

  • Ingest (CDC/stream), storage (lakehouse), processing (Spark/DBT), quality (Great Expectations), lineage (OpenLineage)

Governance

  • Policies (PII tagging, retention), approvals, audits; compliance automation

Anti-patterns

  • Creating shadow central teams; unclear ownership; no platform

FAQ

Q: When is data mesh overkill?
A: Small orgs without multiple strong domains; start centralized and evolve as needed.

Related posts

  • Orchestration (Airflow/Prefect/Dagster): /blog/data-pipeline-orchestration-airflow-prefect-dagster
  • Event-Driven Architecture: /blog/event-driven-architecture-patterns-async-messaging
  • ClickHouse Performance: /blog/clickhouse-analytics-database-performance-guide-2025
  • Sharding Strategies: /blog/database-sharding-partitioning-strategies-scale-2025
  • Caching Strategies: /blog/caching-strategies-redis-memcached-cdn-patterns-2025

Call to action

Planning a data mesh? Get a platform and governance blueprint.
Contact: /contact • Newsletter: /newsletter


Executive Summary

This guide provides an end-to-end, production-ready playbook to implement Data Mesh: organizational patterns, platform capabilities, data contracts, pipelines, lakehouse storage, governance, security/privacy, observability, and runbooks.


Data Mesh Principles

  • Domain-Oriented Ownership
  • Data as a Product
  • Self-Serve Data Platform
  • Federated Computational Governance

RACI (example)
- Domain Product Owner: accountable for product fitness (SLAs, quality)
- Domain Engineers: build/maintain pipelines and models
- Platform Team: provides shared services and guardrails
- Governance Council: policies, standards, federated decisions

Organizational Patterns and Roles

roles:
  domain_product_owner: owns domain product roadmap and SLAs
  data_engineer: builds ingestion and transformation
  analytics_engineer: models in dbt; defines metrics
  platform_engineer: platform capabilities and IaC
  governance: policies, access, compliance
raci:
  - decision: data_contract_versioning
    responsible: [domain_product_owner, analytics_engineer]
    accountable: platform_engineer
    consulted: governance
    informed: consumers

Platform Capabilities (Self-Serve)

platform:
  ingest: [kafka, kinesis, batch, connectors]
  storage: [s3, adls, gcs]
  formats: [delta, iceberg, hudi]
  processing: [spark, flink, dbt]
  catalog: [openmetadata, datahub]
  contracts: [json_schema, protobuf, avro]
  lineage: [openlineage]
  quality: [great_expectations, dbt_tests]
  security: [lake_formation, rls, abac]
  observability: [metrics, logs, traces]
  cost: [cur, bigquery_billing, finops]

Reference Architecture (AWS)

graph LR
Sources --> Debezium[Debezium/Kafka Connect]
Debezium --> Kafka
Kafka --> Flink
Flink --> S3["Data Lake (S3)"]
S3 --> Glue[Glue Catalog]
S3 --> EMR[EMR/Spark]
EMR --> Delta[Delta/Iceberg]
Delta --> Athena[Athena/Trino]
Athena --> BI[BI/Notebooks]
Glue --> Catalog[OpenMetadata]

Data Contracts

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "orders.v1",
  "type": "object",
  "required": ["order_id","customer_id","amount","currency","created_at"],
  "additionalProperties": false,
  "properties": {
    "order_id": { "type": "string", "pattern": "^ord_[a-z0-9]{10}$" },
    "customer_id": { "type": "string", "pattern": "^cus_[a-z0-9]{10}$" },
    "amount": { "type": "number", "minimum": 0 },
    "currency": { "type": "string", "enum": ["USD","EUR","GBP"] },
    "created_at": { "type": "string", "format": "date-time" },
    "metadata": { "type": "object", "additionalProperties": true }
  }
}
syntax = "proto3";
message OrderV1 {
  string order_id = 1;
  string customer_id = 2;
  double amount = 3;
  string currency = 4;
  string created_at = 5;
}
// Avro/Schema Registry subject: orders-value

Contract Validation

import json
import jsonschema

schema = json.load(open('contracts/orders.v1.json'))
validator = jsonschema.Draft202012Validator(schema)

for rec in stream:
    errors = sorted(validator.iter_errors(rec), key=lambda e: e.path)
    if errors:
        # quarantine invalid records together with their validation messages
        dead_letter.append({'rec': rec, 'errors': [e.message for e in errors]})
    else:
        publish(rec)

Pipelines (Airflow / Dagster / dbt)

# Airflow DAG: ingest -> stage -> model -> publish
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG('orders', schedule='0 * * * *', start_date=...) as dag:
    ingest = BashOperator(task_id='ingest', bash_command='python ingest.py')
    stage  = BashOperator(task_id='stage', bash_command='spark-submit stage.py')
    model  = BashOperator(task_id='dbt',   bash_command='dbt run --project-dir=transform')
    pub    = BashOperator(task_id='publish', bash_command='python publish.py')
    ingest >> stage >> model >> pub
# Dagster job (sketch)
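# A minimal Dagster equivalent of the Airflow flow above (op/job names are illustrative):
from dagster import job, op

@op
def ingest_orders():
    ...  # pull from the source system

@op
def stage_orders(ingest_result):
    ...  # land raw records in the lake

@op
def model_orders(stage_result):
    ...  # run dbt / transformations

@op
def publish_orders(model_result):
    ...  # publish the data product

@job
def orders_job():
    publish_orders(model_orders(stage_orders(ingest_orders())))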
-- dbt model example
select
  order_id,
  customer_id,
  amount,
  currency,
  created_at::timestamp as created_at
from {{ ref('stg_orders') }}
where amount >= 0;

CDC (Debezium + Kafka)

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "db",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "...",
    "database.dbname": "app",
    "tombstones.on.delete": "false",
    "plugin.name": "pgoutput",
    "table.include.list": "public.orders"
  }
}
# Kafka topics naming
orders.public.orders.v1

Lakehouse (Delta / Iceberg / Hudi)

# PySpark write Delta
(df
  .write
  .format('delta')
  .mode('append')
  .partitionBy('created_date')
  .save('s3://lake/orders_delta'))
-- Iceberg table DDL (Trino)
CREATE TABLE lake.orders (
  order_id varchar,
  customer_id varchar,
  amount double,
  currency varchar,
  created_at timestamp(3),
  created_date date
)
WITH (
  format = 'PARQUET',
  partitioning = ARRAY['created_date'],
  location = 's3://lake/iceberg/orders/'
);

Query Engines

-- Athena query
SELECT currency, SUM(amount) FROM lake.orders WHERE created_date >= current_date - interval '7' day GROUP BY 1;
-- BigQuery
SELECT currency, SUM(amount) FROM `proj.ds.orders` WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY) GROUP BY 1;

Governance (OPA / Row-Level Security)

package data_access

allow {
  input.subject.roles[_] == "analyst"
  input.resource.domain == input.subject.domain
}

deny["pii"] { input.resource.contains_pii; not input.subject.clearance == "pii" }
-- Row-level security example (Snowflake/BigQuery analogs exist)
-- Pseudocode for policies
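A sketch of how a service could consult this policy through OPA's REST data API before running a query (the OPA endpoint is illustrative):

import requests

def can_access(subject: dict, resource: dict, opa_url: str = "http://opa:8181") -> bool:
    # POST the input document to the data_access package; OPA answers {"result": <allow value>}
    resp = requests.post(
        f"{opa_url}/v1/data/data_access/allow",
        json={"input": {"subject": subject, "resource": resource}},
        timeout=5,
    )
    resp.raise_for_status()
    return bool(resp.json().get("result", False))

# Example: an orders analyst reading non-PII orders data
can_access({"roles": ["analyst"], "domain": "orders"}, {"domain": "orders", "contains_pii": False})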

Privacy and PII Handling

import re
PII = [r"\b\d{3}-\d{2}-\d{4}\b", r"\b\d{16}\b", r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"]

def redact(text: str) -> str:
    for p in PII:
        text = re.sub(p, "[REDACTED]", text)
    return text
- Tokenize sensitive fields; store mapping in vault
- Encrypt at rest (SSE-KMS) and in transit (TLS)

Access Control (RBAC/ABAC)

rbac:
  roles:
    - name: domain_analyst
      permissions: [read_domain_data]
    - name: platform_admin
      permissions: [all]
abac:
  policies:
    - subject.domain == resource.domain
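A minimal in-process sketch of the same checks, assuming subject and resource attributes mirror the YAML above:

ROLE_PERMISSIONS = {
    "domain_analyst": {"read_domain_data"},
    "platform_admin": {"all"},
}

def rbac_allows(subject: dict, permission: str) -> bool:
    # union the permissions of every role held by the subject
    perms = set()
    for role in subject.get("roles", []):
        perms |= ROLE_PERMISSIONS.get(role, set())
    return "all" in perms or permission in perms

def abac_allows(subject: dict, resource: dict) -> bool:
    # subject.domain == resource.domain
    return subject.get("domain") is not None and subject.get("domain") == resource.get("domain")

assert rbac_allows({"roles": ["domain_analyst"]}, "read_domain_data")
assert abac_allows({"domain": "orders"}, {"domain": "orders"})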

Lineage (OpenLineage)

from openlineage.client.run import RunEvent, RunState
# emit RunEvent at pipeline start/end with inputs/outputs
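Expanding the stub above, a minimal sketch with the openlineage-python client (the Marquez endpoint, job, and dataset names are illustrative):

from datetime import datetime, timezone
from uuid import uuid4

from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job, Dataset

client = OpenLineageClient(url="http://marquez:5000")  # illustrative endpoint
job = Job(namespace="airflow", name="orders.stage")
run = Run(runId=str(uuid4()))

def emit(state: RunState):
    # emit one event per state transition with the run's inputs and outputs
    client.emit(RunEvent(
        eventType=state,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=run,
        job=job,
        producer="https://company.com/data-platform",
        inputs=[Dataset(namespace="postgres", name="public.orders")],
        outputs=[Dataset(namespace="s3", name="s3://lake/stage/orders")],
    ))

emit(RunState.START)
# ... run the pipeline step ...
emit(RunState.COMPLETE)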

Quality Checks (Great Expectations / dbt tests)

import great_expectations as ge
validator = ge.from_pandas(df)
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_between("amount", 0, 1000000)
result = validator.validate()
assert result.success
# dbt tests (schema.yml)
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests: [unique]
      - name: customer_id
        tests: [not_null]

Discovery and Catalog

catalog: openmetadata
sources:
  - type: s3
    service_name: lake
    bucket: s3://lake

Observability (Metrics/Logs/Traces)

metrics:
  - name: pipeline_latency_seconds
    type: histogram
    labels: [pipeline, stage]
logs:
  - sink: s3://logs/pipelines/
traces:
  - otlp_endpoint: http://otel-collector:4317

Runbooks and SOPs

Data Quality Failure
- Identify failing expectations; quarantine partition; notify owner
- Fix upstream; backfill; re-run pipeline; update tests

Late Arriving Data
- Reprocess window; communicate impacts; adjust SLAs if needed


Call to Action

Need help implementing a production Data Mesh? We build platforms, contracts, pipelines, and governance to scale your data organization.


Extended FAQ

  1. How many domains should we start with?
    Begin with 2–3 high-impact domains.

  2. Who owns data quality?
    Domain product teams, with platform-provided tooling.

  3. Central vs federated governance?
    Federated decisions with strong defaults.

  4. Data contracts versioning?
    Semver; deprecate old versions with notice.

  5. Storage format?
    Delta/Iceberg for ACID tables; Parquet for raw.

  6. Discoverability?
    Central catalog (OpenMetadata/DataHub) with ownership and SLAs.

  7. Access model?
    Tag- and role-based ABAC; audit everything.

  8. Privacy?
    Tokenize and redact PII; enforce purpose limitation.

  9. Lineage?
    OpenLineage across orchestrators and jobs.

  10. Who builds the platform?
    Dedicated platform team with IaC.



Domain Product SLAs/SLIs and SLO Dashboards

domain: orders
owner: orders-team@company.com
slis:
  - name: data_freshness_minutes
    target: 15
  - name: record_completeness
    target: 0.999
  - name: schema_compatibility
    target: 1.0
slo_dashboard:
  provider: grafana
  panels:
    - metric: pipeline_latency_seconds
      stat: p95
      threshold: 900
    - metric: record_completeness_ratio
      threshold: 0.999
{ "widgets": [ { "type": "metric", "properties": { "metrics": [["data","pipeline_latency_seconds","pipeline","orders"]], "stat": "p95", "period": 60 } } ] }

Consumer Interoperability: Versioning and Deprecation

- Semantic versioning for data contracts (major.minor.patch)
- Backwards-compatible changes: add nullable fields
- Breaking changes: new major with parallel publishing window
- Deprecation policy: announce ≥2 cycles ahead
{
  "contract": "orders.v2",
  "status": "active",
  "deprecates": ["orders.v1"],
  "sunset": "2026-01-01"
}
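A small sketch of the compatibility check these rules imply, comparing two contract files (heuristics only; file paths are illustrative):

import json

def load(path):
    with open(path) as f:
        return json.load(f)

def breaking_changes(old_path: str, new_path: str) -> list:
    old, new = load(old_path), load(new_path)
    issues = []
    # consumers break if new required fields appear
    added_required = set(new.get("required", [])) - set(old.get("required", []))
    if added_required:
        issues.append(f"new required fields: {sorted(added_required)}")
    # consumers break if existing properties disappear or change type
    removed = set(old.get("properties", {})) - set(new.get("properties", {}))
    if removed:
        issues.append(f"removed properties: {sorted(removed)}")
    for name, spec in old.get("properties", {}).items():
        new_spec = new.get("properties", {}).get(name)
        if new_spec is not None and spec.get("type") != new_spec.get("type"):
            issues.append(f"type change on {name}: {spec.get('type')} -> {new_spec.get('type')}")
    return issues

print(breaking_changes("contracts/orders.v1.json", "contracts/orders.v2.json"))  # [] means backward-compatible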

Schema Registry Governance

registry:
  provider: confluent
  compatibility: BACKWARD
  review:
    required: true
    approvers: ["platform@company.com", "governance@company.com"]
# enforce compatibility
kafka-avro-console-producer --broker-list kafka:9092 --topic orders --property schema.registry.url=http://schema:8081 --property value.schema='...'
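To enforce compatibility in CI rather than at produce time, one option is the registry's compatibility endpoint; a sketch against the Confluent Schema Registry REST API (URL and subject are illustrative, and non-Avro schemas also need a schemaType field):

import json
import requests

def is_compatible(registry_url: str, subject: str, schema: dict) -> bool:
    # asks whether `schema` is compatible with the latest registered version of `subject`
    resp = requests.post(
        f"{registry_url}/compatibility/subjects/{subject}/versions/latest",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        json={"schema": json.dumps(schema)},
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json().get("is_compatible", False)

# usage (illustrative):
# is_compatible("http://schema:8081", "orders-value", json.load(open("contracts/orders.v2.json")))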

CDC Patterns: Outbox, Polling, Streaming

-- Outbox table
CREATE TABLE outbox (
  id bigint primary key,
  aggregate_id varchar not null,
  type varchar not null,
  payload jsonb not null,
  created_at timestamp not null default now()
);
# Polling reader (idempotent): publish outbox rows in id order, committing the offset as we go
last_id = load_offset()
while True:
    rows = db.query("select * from outbox where id > %s order by id asc limit 1000", [last_id])
    for r in rows:
        publish(r)
        last_id = r.id
        save_offset(last_id)
{ "name": "pg-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "slot.name": "cdc_slot", "table.include.list": "public.outbox" } }

Metrics Layer (dbt Semantic Models)

semantic_models:
  - name: orders
    model: ref('fct_orders')
    entities: [order_id, customer_id]
    dimensions:
      - name: created_date
        type: time
    measures:
      - name: total_amount
        agg: sum
      - name: orders_count
        agg: count

DataHub/OpenMetadata Ingestion Configs

source:
  type: s3
  config:
    path_specs:
      - include: s3://lake/orders/**
    platform: external
sink:
  type: datahub-rest
  config:
    server: http://datahub:8080
openmetadata:
  ingestion:
    - source: trino
      pipeline_name: trino_lake
      include_tables: true
      include_views: true

Airflow/Dagster Deployment Patterns

# Helm values for Airflow
webserver:
  replicas: 2
scheduler:
  replicas: 2
logs:
  persistence: enabled
# Dagster Daemon + UserCodeDeployments

Iceberg/Delta Maintenance

-- Delta optimize
OPTIMIZE delta.`s3://lake/delta/orders` ZORDER BY (customer_id);  -- Z-order on a high-cardinality non-partition column
VACUUM delta.`s3://lake/delta/orders` RETAIN 168 HOURS;
-- Iceberg snapshot expiration (Spark SQL procedure)
CALL system.expire_snapshots(table => 'lake.orders', older_than => TIMESTAMP '2025-01-01 00:00:00 UTC');

Trino/Athena Performance Tuning

-- Partition and projection
SELECT * FROM lake.orders WHERE created_date >= current_date - interval '7' day;
-- Trino session properties (illustrative; property names vary by connector and version, see SHOW SESSION)
SET SESSION hive.pushdown_filter_enabled = true;
SET SESSION iceberg.dynamic_filtering_wait_timeout = '10s';

Access Patterns: Row/Column/Cell Masking

-- Column masking (Snowflake analog)
CREATE MASKING POLICY mask_email AS (val STRING) RETURNS STRING ->
  CASE WHEN CURRENT_ROLE() IN ('PII_ROLE') THEN val ELSE '***' END;
- Cell-level policies via ABAC and tag-based access

Tokenization and Service Integration

# Call tokenization service
resp = requests.post('https://tokens/tokenize', json={ 'field': 'email', 'value': email })
store(token=resp.json()['token'])

Cost and FinOps Dashboards

-- BigQuery billing
SELECT service.description, SUM(cost) FROM `billing.gcp_billing_export` WHERE usage_start_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY) GROUP BY 1 ORDER BY 2 DESC;
-- Athena CUR
SELECT line_item_product_code, SUM(line_item_unblended_cost) FROM cur WHERE bill_billing_period_start_date >= date_trunc('month', current_date) GROUP BY 1;
Showback/Chargeback
- Tag by domain; allocate shared costs by consumption metrics
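One way to implement the allocation bullet above: a sketch that splits shared platform cost across domains in proportion to measured consumption (all numbers illustrative):

# allocate shared platform cost (catalog, orchestration, registry) by bytes scanned per domain
shared_platform_cost = 12_000.00  # monthly shared infra, USD
consumption = {"orders": 6_400, "customers": 2_100, "payments": 1_500}  # GB scanned per domain

total = sum(consumption.values())
allocation = {
    domain: round(shared_platform_cost * gb / total, 2)
    for domain, gb in consumption.items()
}
print(allocation)  # {'orders': 7680.0, 'customers': 2520.0, 'payments': 1800.0}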

Lineage End-to-End Examples

# emit OpenLineage events from Airflow task
# DataHub lineage from dbt manifests
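For the dbt side, a sketch that pulls coarse table-level lineage edges out of dbt's manifest.json, which a catalog ingestion job could then consume (manifest path is illustrative):

import json

with open("transform/target/manifest.json") as f:
    manifest = json.load(f)

# each dbt node lists its parents under depends_on.nodes
edges = []
for node_id, node in manifest.get("nodes", {}).items():
    for parent in node.get("depends_on", {}).get("nodes", []):
        edges.append((parent, node_id))

for parent, child in edges:
    print(f"{parent} -> {child}")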

Observability Pipelines (OTEL for Data)

receivers:
  otlp: { protocols: { grpc: {}, http: {} } }
processors:
  batch: {}
exporters:
  awsemf: {}
service:
  pipelines:
    metrics: { receivers: [otlp], processors: [batch], exporters: [awsemf] }

Runbooks

Late Data
- Identify upstream delay; widen watermark; backfill; notify consumers

Schema Break
- Roll back producer version; map/transform; publish fix; add contract tests

Data Drift
- Quantify drift; engage domain; retrain models; update thresholds

Extended FAQ (continued)

  1. Who approves schema changes?
    Domain owner + platform review for breaking changes.

  2. Can we mix Delta and Iceberg?
    Yes, but avoid within the same table; standardize per domain.

  3. How to enforce contracts?
    Registry + CI validation + runtime checks.

  4. How to deprecate old tables?
    Mark in catalog; announce; redirect queries; delete after window.

  5. How to handle GDPR deletes?
    Tokenize IDs; build delete pipelines; audit proofs.

  6. What about ACID?
    Use Delta/Iceberg/Hudi for transactional tables.

  7. Trino vs Athena?
    Trino for persistent clusters and performance tuning; Athena serverless.

  8. Data quality ownership?
    Domain product teams with platform tools.

  9. SLAs for freshness?
    Set per domain; monitor and alert.

  10. Catalog choice?
    OpenMetadata/DataHub; ensure ownership and SLAs.

  11. Pipeline retries?
    Exponential backoff; idempotency keys.

  12. Backfill strategies?
    Partition-based; downstream invalidation.

  13. CDC consistency?
    Outbox for strong guarantees; Debezium for minimal code.

  14. Multi-cloud?
    Abstract contracts; unify catalog; replicate lakes.

  15. Cost guardrails?
    Budgets and alerts; optimize storage scans.

  16. Partition evolution?
    Supported by Iceberg/Delta; plan changes carefully.

  17. Data masking?
    Policy-driven with tags and roles.

  18. Streaming joins?
    Watermarks and late-data handling.

  19. Schema registry downtime?
    Cache schemas; fail closed on unknown subjects.

  20. Who monitors lineage?
    Platform auto-ingests; domains review.

  21. Query SLAs?
    Set per product; pre-aggregate where needed.

  22. Vendor lock-in?
    Open table formats; open catalog.

  23. Kafka retention?
    Align to reprocessing windows; tiered storage.

  24. Data contracts for ML?
    Yes—features schema with ranges and types.

  25. PII discovery?
    Classifiers + manual reviews.

  26. Row vs column encryption?
    Depends on use; KMS integration.

  27. Tags for governance?
    LF-Tags/Glue; enforce at query time.

  28. Business metrics governance?
    Semantic layer with approvals.

  29. Cache layer?
    Presto/Trino result caching; materialized views.

  30. Final readiness?
    SLOs met; governance in place; costs and privacy tracked.


Domain Onboarding Workflow

Checklist
- Define domain scope and ownership
- Create data product charter (purpose, consumers, SLAs)
- Establish data contracts (schema, semantics, PII classification)
- Provision pipelines (ingest, transform, publish)
- Register in catalog (ownership, SLAs, lineage)
- Implement quality checks and monitoring
- Define access policies (RBAC/ABAC, tags)
- Set cost ownership (showback/chargeback)
templates:
  data_product:
    name: orders
    owner: orders-team@company.com
    contract: contracts/orders.v2.json
    pipeline: airflow/dags/orders.py
    storage: s3://lake/orders/
    table: iceberg.orders
    catalog: openmetadata

Data Product API (OpenAPI / GraphQL)

openapi: 3.0.3
info: { title: Orders Data API, version: 2.0.0 }
paths:
  /v2/orders:
    get:
      parameters:
        - in: query
          name: created_after
          schema: { type: string, format: date-time }
      responses:
        '200': { description: ok }
components:
  schemas:
    Order:
      type: object
      properties: { order_id: {type: string}, amount: {type: number}, currency: {type: string}, created_at: {type: string, format: date-time} }
type Query {
  orders(createdAfter: String, limit: Int): [Order!]
}

type Order { order_id: ID!, amount: Float!, currency: String!, created_at: String! }

Metadata Propagation (OpenLineage Facets)

{
  "eventTime": "2025-10-27T12:00:00Z",
  "job": { "namespace": "airflow", "name": "orders.stage" },
  "run": { "runId": "uuid" },
  "inputs": [{ "namespace": "postgres", "name": "public.orders" }],
  "outputs": [{ "namespace": "s3", "name": "s3://lake/stage/orders" }],
  "producer": "https://openlineage.io",
  "facets": { "schema": { "fields": [{ "name": "order_id", "type": "string" }] }, "dataQualityMetrics": { "rowCount": 12345 } }
}

Contract Testing in CI

name: contract-ci
on: [pull_request]
jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install jsonschema
      - run: python ci/validate_contracts.py contracts/*.json
  compatibility:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: ./scripts/schema_compat.sh contracts/orders.v2.json contracts/orders.v1.json
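The workflow above references ci/validate_contracts.py; a minimal sketch of what such a script might do (the implementation is an assumption, only the filename comes from the workflow):

# ci/validate_contracts.py (sketch): check that each contract file is itself a valid JSON Schema
import sys
import json
from jsonschema import Draft202012Validator

def main(paths):
    failures = 0
    for path in paths:
        with open(path) as f:
            schema = json.load(f)
        try:
            Draft202012Validator.check_schema(schema)  # validates the schema document itself
            print(f"OK   {path}")
        except Exception as exc:
            print(f"FAIL {path}: {exc}")
            failures += 1
    sys.exit(1 if failures else 0)

if __name__ == "__main__":
    main(sys.argv[1:])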

Producer/Consumer SDK Snippets

# producer
import json, boto3
k = boto3.client('kinesis')
for rec in batch:
    payload = json.dumps(validate(rec))
    k.put_record(StreamName='orders', Data=payload.encode(), PartitionKey=rec['order_id'])
# consumer
import json
for msg in stream('orders'):
    rec = json.loads(msg)
    if not contract_ok(rec): dead_letter(rec); continue
    write_to_delta(rec)

Streaming Enrichment (Flink SQL)

CREATE TABLE orders (
  order_id STRING,
  customer_id STRING,
  amount DOUBLE,
  created_at TIMESTAMP(3),
  WATERMARK FOR created_at AS created_at - INTERVAL '5' MINUTE
) WITH (...);

CREATE TABLE customers (...);

INSERT INTO enriched
SELECT o.order_id, o.amount, c.segment
FROM orders o
LEFT JOIN customers c
FOR SYSTEM_TIME AS OF o.created_at
ON o.customer_id = c.customer_id;

Iceberg Compaction and Retention Jobs

spark-submit iceberg_compact.py --table lake.orders --target-file-size 128MB
CALL system.rewrite_data_files(table => 'lake.orders', where => 'created_date >= current_date - interval 30 day');
CALL system.expire_snapshots(table => 'lake.orders', older_than => TIMESTAMP '2025-09-01 00:00:00 UTC');

Lake Formation LF-Tags and Policies

lf_tags:
  - key: sensitivity
    values: [pii, internal, public]
  - key: domain
    values: [orders, customers]

policies:
  - tag: { sensitivity: pii }
    principals: [ role/pii_analyst ]
    permissions: [ SELECT ]

Metadata Ingestion (OpenMetadata/DataHub)

source:
  type: trino
  config: { hostPort: trino:8080, database: lake }
sink:
  type: openmetadata-rest
  config: { api_endpoint: http://om:8585/api }

Great Expectations Suites and dbt Tests

expectations:
  - expect_table_row_count_to_be_between: { min_value: 1 }
  - expect_column_values_to_not_be_null: { column: order_id }
  - expect_column_values_to_be_between: { column: amount, min_value: 0 }
# dbt tests (schema.yml)
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests: [unique]
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_id

Trino/Athena Session Configuration

-- Illustrative; run SHOW SESSION to see the properties available in your engine version
SET SESSION hive.parquet_optimized_reader_enabled = true;
SET SESSION iceberg.split_open_file_cost = 262144;

FinOps Showback/Chargeback

-- Athena CUR showback by tag
SELECT resource_tags_domain, SUM(line_item_unblended_cost) AS cost
FROM cur
WHERE bill_billing_period_start_date >= date_trunc('month', current_date)
GROUP BY 1 ORDER BY 2 DESC;
-- BigQuery billing: cost per project/dataset
SELECT project.id, SUM(cost) FROM `billing.gcp_billing_export` WHERE usage_start_time >= TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), MONTH) GROUP BY 1;

OTEL Metrics/Traces for Pipelines

metrics:
  - name: data_pipeline_latency_seconds
    type: histogram
    labels: [pipeline, stage]
  - name: data_pipeline_records_total
    type: counter
    labels: [pipeline, outcome]
traces:
  - attributes: { pipeline: orders, stage: ingest }

SOPs and Runbooks

Contract Break
- Identify incompatible change; roll back producer; provide mapping layer; notify consumers; add CI contract test.

Quality Failure
- Quarantine partition; fix upstream; backfill; add expectation.

Lineage Gap
- Reconfigure OpenLineage emitters; replay; verify in catalog.

Extended FAQ (continued)

  1. Who defines SLAs?
    Domain owners with consumer input.

  2. Can quality tests block publish?
    Yes—treat as gate.

  3. How to handle late events?
    Watermarks and reprocessing windows.

  4. Should we use CDC everywhere?
    Use where needed; batch for bulk/static sources.

  5. Delta vs Iceberg?
    Iceberg open governance; Delta strong ecosystem.

  6. Multi-cloud catalog?
    Unify with OpenMetadata/DataHub.

  7. Data drift monitoring?
    Statistical tests per feature; alert thresholds.

  8. Sensitive data sandbox?
    Tokenize; synthetic data for dev.

  9. Encryption keys?
    Per domain or dataset; rotate via KMS.

  10. Table evolution?
    Use add/drop columns with compatibility plans.

  11. Version pinning?
    Contracts and pipelines; migrations tracked.

  12. Orchestration standard?
    Airflow/Dagster; team choice but common contracts.

  13. COGS tracking?
    FinOps dashboards by domain.

  14. Data mesh anti-patterns?
    No platform; chaos in standards; lack of ownership.

  15. Data product SLAs violated?
    Incident process; postmortems.

  16. Catalog sprawl?
    One catalog; clear ownership fields.

  17. Data sampling for QA?
    Stratified samples per partition.

  18. Query quotas?
    Per domain budgets; throttle abusive queries.

  19. Governance w/o slowdown?
    Automate checks and use defaults.

  20. Template repos?
    Yes—scaffold domain products quickly.

  21. How to sunset products?
    Deprecate, migrate, archive with lineage.

  22. Schema naming?
    Consistent snake_case; namespaces.

  23. File layout?
    Partition by date; small files compacted.

  24. Streaming backfills?
    Dual pipelines; careful dedup.

  25. Latency targets?
    Domain specific; dashboards.

  26. Cold storage?
    Glacier/Archive tiers.

  27. Testing strategy?
    Unit, integration, contract, quality.

  28. Who audits privacy?
    Governance with internal audit.

  29. MLOps integration?
    Feature stores and lineage to models.

  30. Incident severity?
    Based on consumer impact.

  31. SLIs definitions?
    Freshness, completeness, availability, compatibility.

  32. Domain budgets?
    Allocations with showback.

  33. Rollbacks?
    Keep previous version; blue/green publish.

  34. Cross-domain joins?
    Contracts and common dimensions.

  35. Downstream breaking?
    Communicate, provide adapters.

  36. Tag propagation?
    Carry tags in lineage.

  37. PII scanning cadence?
    Nightly; on changes.

  38. Data retention?
    Per policy; lifecycle rules.

  39. API for data products?
    Yes—OpenAPI/GraphQL.

  40. Final readiness?
    SLOs, governance, and cost controls in place.


End-to-End Sample Domain Repo Layout

orders-domain/
  contracts/
    orders.v1.json
    orders.v2.json
  pipelines/
    airflow/
      dags/orders.py
      plugins/lineage.py
    dagster/
      jobs.py
      sensors.py
  transform/
    dbt_project.yml
    models/
      staging/
      marts/
  tests/
    contracts/
    great_expectations/
  catalog/
    ownership.yaml
    sla.yaml

Contract Evolution Examples

// orders.v1.json
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "orders.v1",
  "type": "object",
  "required": ["order_id","customer_id","amount","created_at"],
  "additionalProperties": false,
  "properties": {
    "order_id": {"type": "string"},
    "customer_id": {"type": "string"},
    "amount": {"type": "number", "minimum": 0},
    "created_at": {"type": "string", "format": "date-time"}
  }
}
// orders.v2.json (backward-compatible: add nullable currency)
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "orders.v2",
  "type": "object",
  "required": ["order_id","customer_id","amount","created_at"],
  "additionalProperties": false,
  "properties": {
    "order_id": {"type": "string"},
    "customer_id": {"type": "string"},
    "amount": {"type": "number", "minimum": 0},
    "currency": {"type": ["string","null"], "enum": ["USD","EUR","GBP", null]},
    "created_at": {"type": "string", "format": "date-time"}
  }
}

Producer/Consumer Backward-Compatibility Strategies

# producer can start sending currency=null then populate gradually; always include required fields from v1
# consumer: prefer v2 fields if present, fallback to v1 defaults
currency = rec.get('currency') or 'USD'

dbt Semantic Layer: Metrics and Exposures

metrics:
  - name: revenue
    label: Total Revenue
    model: ref('fct_orders')
    calculation_method: sum
    expression: amount
    timestamp: created_at
    time_grains: [day, week, month]
    dimensions: [currency]
exposures:
  - name: exec_dashboard
    type: dashboard
    owner: { name: Analytics, email: analytics@company.com }
    depends_on: [ref('fct_orders')]
    maturity: high

OpenLineage End-to-End (Airflow)

# plugins/lineage.py
from openlineage.airflow import DAG
from airflow.operators.python import PythonOperator

def extract(**ctx): pass

def load(**ctx): pass

dag = DAG('orders', schedule_interval='@hourly', start_date=...)
extract_task = PythonOperator(task_id='extract', python_callable=extract, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load, dag=dag)
extract_task >> load_task

DataHub/OpenMetadata Ownership and SLAs

ownership:
  table: lake.orders
  owners:
    - type: DataOwner
      id: orders-team@company.com
sla:
  freshness_minutes: 15
  completeness: 0.999
  availability: 0.999

Great Expectations Checkpoints and CI

checkpoints:
  - name: orders_daily
    validations:
      - batch_request:
          datasource_name: s3
          data_asset_name: lake/orders/dt={{ ds }}
        expectation_suite_name: orders_suite
# GitHub Actions
- run: great_expectations checkpoint run orders_daily || exit 1

Dagster Sensors and Asset Materializations

from dagster import sensor, RunRequest, asset

@asset
def fct_orders():
    # materialize from staging
    ...

@sensor(job=some_job)
def s3_new_partition_sensor(context):
    if new_partition():
        yield RunRequest(run_key=..., run_config={...})

Flink SQL Windowing and CEP

-- Tumbling window
SELECT window_start, window_end, COUNT(*)
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(created_at), INTERVAL '5' MINUTE))
GROUP BY window_start, window_end;
-- CEP pattern: detect abnormal order spikes

Iceberg/Delta Time Travel and Rollback

-- Iceberg time travel (Trino)
SELECT * FROM lake.orders FOR TIMESTAMP AS OF TIMESTAMP '2025-10-27 10:00:00 UTC';
-- Delta rollback
deltaTable.restoreToVersion(100)

Trino Materialized Views

CREATE MATERIALIZED VIEW lake.mv_orders_7d AS
SELECT date_trunc('day', created_at) AS d, SUM(amount) AS revenue
FROM lake.orders WHERE created_at >= current_timestamp - INTERVAL '7' day
GROUP BY 1;

Governance Policies with OPA

package data_governance

allow_access {
  input.subject.role == "analyst"
  input.resource.domain == input.subject.domain
}

deny["pii"] {
  input.resource.tags[_] == "pii"
  not input.subject.clearance == "pii"
}

ABAC Tag Policy Wiring

lf_tags:
  - key: sensitivity
    values: [pii, internal, public]
policies:
  - tag: { sensitivity: pii }
    principals: [ role/pii_analyst ]
    permissions: [ SELECT ]

PII Classification Playbook and Tokenization API

- Identify PII fields via classifiers and domain input
- Tokenize PII at ingest; store mapping in secure vault
- Provide detokenization service with strict authz
- Mask PII in catalogs and discovery
POST /tokenize { field: "email", value: "a@b.com" } → { token: "tok_123" }
POST /detokenize { token: "tok_123" } → { value: "a@b.com" }
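A sketch of the first step, classifying sampled column values with simple regexes before tagging them in the catalog (patterns and threshold are illustrative):

import re

PII_PATTERNS = {
    "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
    "card": r"\b\d{16}\b",
    "email": r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}",
}

def classify_column(values, threshold=0.1):
    """Return the PII tags whose pattern matches at least `threshold` of sampled string values."""
    tags = []
    sample = [v for v in values if isinstance(v, str)]
    if not sample:
        return tags
    for tag, pattern in PII_PATTERNS.items():
        hits = sum(1 for v in sample if re.search(pattern, v))
        if hits / len(sample) >= threshold:
            tags.append(tag)
    return tags

print(classify_column(["a@b.com", "c@d.org", "n/a"]))  # ['email']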

FinOps Allocations and Dashboards

-- Athena CUR by domain tag and service
SELECT resource_tags_domain, line_item_product_code, SUM(line_item_unblended_cost) AS cost
FROM cur WHERE bill_billing_period_start_date >= date_trunc('month', current_date)
GROUP BY 1,2 ORDER BY 3 DESC;
{
  "title": "Data Mesh Cost",
  "panels": [
    {"type":"table","title":"Cost by Domain","targets":[{"expr":"sum by (domain) (rate(data_cost_usd_total[1h]))"}]}
  ]
}

OTEL Collector for Data Pipelines

receivers: { otlp: { protocols: { grpc: {}, http: {} } } }
processors: { batch: {} }
exporters: { prometheus: { endpoint: ":9464" } }
service:
  pipelines:
    metrics: { receivers: [otlp], processors: [batch], exporters: [prometheus] }

SLO Burn Alerts (PromQL)

# freshness burn: freshness SLO violated while the pipeline is still successfully processing records
(avg_over_time(data_freshness_minutes[1h]) > 15)
  and on()
(sum(rate(records_total{outcome="success"}[1h])) > 0)

Incident Templates and Runbooks

Schema Drift Incident
- Detection: contract validation failures; alerts firing
- Contain: stop publish to prod sink; quarantine partition
- Diagnose: compare vN vs vN-1; identify breaking fields
- Fix: produce mapping layer or roll back; add tests
- Recover: backfill; resume publish; postmortem

Extended FAQ (continued)

  1. Contract negotiation?
    Propose changes; impact analysis; timelines.

  2. Consumer notifications?
    Catalog announcements + email.

  3. Can contracts include metrics?
    Yes—define semantic metrics and ownership.

  4. Who approves PII tagging?
    Governance and domain owner.

  5. Data throttling?
    Rate-limit sources; buffer; backpressure.

  6. Out-of-order events?
    Watermarks + allowed lateness.

  7. Upserts vs append-only?
    Prefer append + corrections; ACID for gold tables.

  8. Delete handling?
    Soft delete flags or CDC tombstones.

  9. Multi-tenant datasets?
    Tag-based access; isolate sensitive slices.

  10. Query SLAs per domain?
    Yes—dashboards with SLOs.

  11. Notebook governance?
    Mask data; audit exports.

  12. Sampling policies?
    For debugging; document bias.

  13. Synthetic data usage?
    Mark; avoid training leakage.

  14. Materialized view refresh?
    On schedule or threshold.

  15. Backfill costs?
    Budget and monitor.

  16. Who owns costs?
    Domain owner via showback.

  17. Resource quotas?
    Per domain limits.

  18. Alert noise?
    Tune and group; runbooks.

  19. Catalog updates cadence?
    Continuous ingestion.

  20. Table ACLs or tags?
    Tags scale better.

  21. Data contracts for realtime?
    Yes—Avro/Protobuf with schemas.

  22. Data layouts?
    Partition + clustering.

  23. Streaming OLAP?
    Consider Pinot/Druid; integrate with mesh.

  24. Audit logs?
    Immutable storage; retention policy.

  25. Escalations?
    Domain → platform → governance.

  26. Can we skip lineage?
    No—vital for trust and audits.

  27. Test flakiness?
    Stabilize; seed; isolation.

  28. Multi-cloud transfer?
    Compress; replicate; test.

  29. Who approves runbooks?
    Platform and governance.

  30. Final acceptance?
    SLOs green; contracts enforced; costs in bounds.
