Data Mesh Architecture: Decentralized Data Platforms (2025)
Data mesh shifts data ownership from centralized teams to domain-oriented data products. This guide explains how to implement it pragmatically.
Executive summary
- Treat data as a product with SLAs, owners, contracts
- Provide a self-serve platform: ingestion, storage, quality, lineage, catalog
- Federated governance: global policies, local execution
Data products
- Contracts (schemas, freshness, quality); discoverability via catalog; observability
Platform
- Ingest (CDC/stream), storage (lakehouse), processing (Spark/DBT), quality (Great Expectations), lineage (OpenLineage)
Governance
- Policies (PII tagging, retention), approvals, audits; compliance automation
Anti-patterns
- Creating shadow central teams; unclear ownership; no platform
FAQ
Q: When is data mesh overkill?
A: Small orgs without multiple strong domains; start centralized and evolve as needed.
Related posts
- Orchestration (Airflow/Prefect/Dagster): /blog/data-pipeline-orchestration-airflow-prefect-dagster
- Event-Driven Architecture: /blog/event-driven-architecture-patterns-async-messaging
- ClickHouse Performance: /blog/clickhouse-analytics-database-performance-guide-2025
- Sharding Strategies: /blog/database-sharding-partitioning-strategies-scale-2025
- Caching Strategies: /blog/caching-strategies-redis-memcached-cdn-patterns-2025
Call to action
Planning a data mesh? Get a platform and governance blueprint.
Contact: /contact • Newsletter: /newsletter
Executive Summary
This guide provides an end-to-end, production-ready playbook to implement Data Mesh: organizational patterns, platform capabilities, data contracts, pipelines, lakehouse storage, governance, security/privacy, observability, and runbooks.
Data Mesh Principles
- Domain-Oriented Ownership
- Data as a Product
- Self-Serve Data Platform
- Federated Computational Governance
RACI (example)
- Domain Product Owner: accountable for product fitness (SLAs, quality)
- Domain Engineers: build/maintain pipelines and models
- Platform Team: provides shared services and guardrails
- Governance Council: policies, standards, federated decisions
Organizational Patterns and Roles
roles:
product_owner: owns domain product roadmap and SLAs
data_engineer: builds ingestion and transformation
analytics_engineer: models in dbt; defines metrics
platform_engineer: platform capabilities and IaC
governance: policies, access, compliance
raci:
- decision: data_contract_versioning
responsible: [domain_product_owner, analytics_engineer]
accountable: platform_engineer
consulted: governance
informed: consumers
Platform Capabilities (Self-Serve)
platform:
ingest: [kafka, kinesis, batch, connectors]
storage: [s3, adls, gcs]
formats: [delta, iceberg, hudi]
processing: [spark, flink, dbt]
catalog: [openmetadata, datahub]
contracts: [json_schema, protobuf, avro]
lineage: [openlineage]
quality: [great_expectations, dbt_tests]
security: [lake_formation, rls, abac]
observability: [metrics, logs, traces]
cost: [cur, bigquery_billing, finops]
Reference Architecture (AWS)
graph LR
Sources --> Debezium[Debezium/Kafka Connect]
Debezium --> Kafka
Kafka --> Flink
Flink --> S3["Data Lake (S3)"]
S3 --> Glue[Glue Catalog]
S3 --> EMR[EMR/Spark]
EMR --> Delta[Delta/Iceberg]
Delta --> Athena[Athena/Trino]
Athena --> BI[BI/Notebooks]
Glue --> Catalog[OpenMetadata]
Data Contracts
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "orders.v1",
"type": "object",
"required": ["order_id","customer_id","amount","currency","created_at"],
"additionalProperties": false,
"properties": {
"order_id": { "type": "string", "pattern": "^ord_[a-z0-9]{10}$" },
"customer_id": { "type": "string", "pattern": "^cus_[a-z0-9]{10}$" },
"amount": { "type": "number", "minimum": 0 },
"currency": { "type": "string", "enum": ["USD","EUR","GBP"] },
"created_at": { "type": "string", "format": "date-time" },
"metadata": { "type": "object", "additionalProperties": true }
}
}
syntax = "proto3";
message OrderV1 {
string order_id = 1;
string customer_id = 2;
double amount = 3;
string currency = 4;
string created_at = 5;
}
-- Avro/Schema Registry subject: orders-value
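For that subject, a minimal Avro equivalent of the v1 contract could look like the following; the namespace and the enum encoding of currency are assumptions of this sketch:
{
  "type": "record",
  "name": "OrderV1",
  "namespace": "com.example.orders",
  "fields": [
    { "name": "order_id", "type": "string" },
    { "name": "customer_id", "type": "string" },
    { "name": "amount", "type": "double" },
    { "name": "currency", "type": { "type": "enum", "name": "Currency", "symbols": ["USD", "EUR", "GBP"] } },
    { "name": "created_at", "type": "string" }
  ]
}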
Contract Validation
import json
import jsonschema

# Validate each record against the published contract; invalid records go to the
# dead-letter queue with their failure reasons (stream/dead_letter/publish are pipeline helpers).
schema = json.load(open('contracts/orders.v1.json'))
validator = jsonschema.Draft202012Validator(schema)
for rec in stream:
    errors = sorted(validator.iter_errors(rec), key=lambda e: list(e.path))
    if errors:
        dead_letter.append({'rec': rec, 'errors': [e.message for e in errors]})
    else:
        publish(rec)
Pipelines (Airflow / Dagster / dbt)
# Airflow DAG: ingest -> stage -> model -> publish
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG('orders', schedule='0 * * * *', start_date=...) as dag:
ingest = BashOperator(task_id='ingest', bash_command='python ingest.py')
stage = BashOperator(task_id='stage', bash_command='spark-submit stage.py')
model = BashOperator(task_id='dbt', bash_command='dbt run --project-dir=transform')
pub = BashOperator(task_id='publish', bash_command='python publish.py')
ingest >> stage >> model >> pub
# Dagster job (sketch)
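A minimal Dagster equivalent of the Airflow DAG above could look like this (op bodies elided; names mirror the Airflow tasks):
from dagster import job, op

@op
def ingest_orders():
    ...  # pull raw orders from the source

@op
def stage_orders(raw):
    ...  # land staged records in the lake

@op
def run_dbt_models(staged):
    ...  # e.g. shell out to `dbt run --project-dir=transform`

@op
def publish_orders(modeled):
    ...  # register/refresh the data product

@job
def orders_job():
    publish_orders(run_dbt_models(stage_orders(ingest_orders())))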
-- dbt model example
select
order_id,
customer_id,
amount,
currency,
created_at::timestamp as created_at
from {{ ref('stg_orders') }}
where amount >= 0;
CDC (Debezium + Kafka)
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "db",
"database.port": "5432",
"database.user": "debezium",
"database.password": "...",
"database.dbname": "app",
"tombstones.on.delete": "false",
"plugin.name": "pgoutput",
"table.include.list": "public.orders"
}
}
# Kafka topics naming
orders.public.orders.v1
Lakehouse (Delta / Iceberg / Hudi)
# PySpark write Delta
(df
.write
.format('delta')
.mode('append')
.partitionBy('created_date')
.save('s3://lake/orders_delta'))
-- Iceberg table DDL (Trino)
CREATE TABLE lake.orders (
order_id varchar,
customer_id varchar,
amount double,
currency varchar,
created_at timestamp(3),
created_date date
)
WITH (
format = 'PARQUET',
partitioning = ARRAY['created_date'],
location = 's3://lake/iceberg/orders/'
);
Query Engines
-- Athena query
SELECT currency, SUM(amount) FROM lake.orders WHERE created_date >= current_date - interval '7' day GROUP BY 1;
-- BigQuery
SELECT currency, SUM(amount) FROM `proj.ds.orders` WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY) GROUP BY 1;
Governance (OPA / Row-Level Security)
package data_access
allow {
input.subject.roles[_] == "analyst"
input.resource.domain == input.subject.domain
}
deny["pii"] { input.resource.contains_pii; not input.subject.clearance == "pii" }
-- Row-level security example (Snowflake/BigQuery analogs exist)
-- Pseudocode for policies
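As a concrete sketch, Postgres-style row-level security can pin analysts to rows of their own domain; the domain column and the app.current_domain session setting are assumptions of this example (Snowflake row access policies and BigQuery row-level access policies are the managed analogs):
-- Postgres-style RLS sketch; assumes the application sets app.current_domain per session
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
CREATE POLICY domain_isolation ON orders
  FOR SELECT
  USING (domain = current_setting('app.current_domain'));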
Privacy and PII Handling
import re
PII = [r"\b\d{3}-\d{2}-\d{4}\b", r"\b\d{16}\b", r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"]
def redact(text: str) -> str:
for p in PII: text = re.sub(p, "[REDACTED]", text)
return text
- Tokenize sensitive fields; store mapping in vault
- Encrypt at rest (SSE-KMS) and in transit (TLS)
Access Control (RBAC/ABAC)
rbac:
roles:
- name: domain_analyst
permissions: [read_domain_data]
- name: platform_admin
permissions: [all]
abac:
policies:
- subject.domain == resource.domain
Lineage (OpenLineage)
from openlineage.client.run import RunEvent, RunState
# emit RunEvent at pipeline start/end with inputs/outputs
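A minimal sketch with the OpenLineage Python client; the Marquez URL and the producer URI are placeholders:
from datetime import datetime, timezone
from uuid import uuid4

from openlineage.client import OpenLineageClient
from openlineage.client.run import Job, Run, RunEvent, RunState

client = OpenLineageClient(url="http://marquez:5000")  # assumed lineage backend endpoint
job = Job(namespace="airflow", name="orders.stage")
run = Run(runId=str(uuid4()))

def emit(state: RunState) -> None:
    # one event at START and one at COMPLETE ties the run together in the backend
    client.emit(RunEvent(
        eventType=state,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=run,
        job=job,
        producer="https://example.com/data-platform",  # placeholder producer URI
    ))

emit(RunState.START)
# ... run the pipeline stage ...
emit(RunState.COMPLETE)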
Quality Checks (Great Expectations / dbt tests)
import great_expectations as ge
validator = ge.from_pandas(df)
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_between("amount", 0, 1000000)
result = validator.validate()
assert result.success
# dbt tests (schema.yml)
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests: [unique]
      - name: customer_id
        tests: [not_null]
Discovery and Catalog
catalog: openmetadata
sources:
- type: s3
service_name: lake
bucket: s3://lake
Observability (Metrics/Logs/Traces)
metrics:
- name: pipeline_latency_seconds
type: histogram
labels: [pipeline, stage]
logs:
- sink: s3://logs/pipelines/
traces:
- otlp_endpoint: http://otel-collector:4317
Runbooks and SOPs
Data Quality Failure
- Identify failing expectations; quarantine partition; notify owner
- Fix upstream; backfill; re-run pipeline; update tests
Late Arriving Data
- Reprocess window; communicate impacts; adjust SLAs if needed
Related Posts
- Data Pipeline Orchestration: Airflow, Prefect, Dagster
- ClickHouse Analytics Database Performance Guide (2025)
- Observability with OpenTelemetry: Complete Implementation
Call to Action
Need help implementing a production Data Mesh? We build platforms, contracts, pipelines, and governance to scale your data organization.
Extended FAQ (1–150)
- How many domains should we start with? Begin with 2–3 high-impact domains.
- Who owns data quality? Domain product teams, with platform-provided tooling.
- Central vs federated governance? Federated decisions with strong defaults.
- Data contracts versioning? Semver; deprecate old versions with notice.
- Storage format? Delta/Iceberg for ACID tables; Parquet for raw.
- Discoverability? Central catalog (OpenMetadata/DataHub) with ownership and SLAs.
- Access model? Tag- and role-based ABAC; audit everything.
- Privacy? Tokenize and redact PII; enforce purpose limitation.
- Lineage? OpenLineage across orchestrators and jobs.
- Who builds the platform? Dedicated platform team with IaC.
Domain Product SLAs/SLIs and SLO Dashboards
domain: orders
owner: orders-team@company.com
slis:
- name: data_freshness_minutes
target: 15
- name: record_completeness
target: 0.999
- name: schema_compatibility
target: 1.0
slo_dashboard:
provider: grafana
panels:
- metric: pipeline_latency_seconds
stat: p95
threshold: 900
- metric: record_completeness_ratio
threshold: 0.999
{ "widgets": [ { "type": "metric", "properties": { "metrics": [["data","pipeline_latency_seconds","pipeline","orders"]], "stat": "p95", "period": 60 } } ] }
Consumer Interoperability: Versioning and Deprecation
- Semantic versioning for data contracts (major.minor.patch)
- Backwards-compatible changes: add nullable fields
- Breaking changes: new major with parallel publishing window
- Deprecation policy: announce ≥2 cycles ahead
{
"contract": "orders.v2",
"status": "active",
"deprecates": ["orders.v1"],
"sunset": "2026-01-01"
}
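A small CI check can keep consumers honest about these windows; the metadata file path and the 90-day warning threshold below are assumptions of the sketch:
# Hypothetical sunset check: surface contracts whose deprecated versions are near sunset.
import datetime as dt
import json
import sys

meta = json.load(open("contracts/orders.v2.meta.json"))  # assumed contract metadata file
sunset = dt.date.fromisoformat(meta["sunset"])
days_left = (sunset - dt.date.today()).days

if days_left < 0:
    print(f"{meta['contract']}: sunset passed; deprecated versions {meta['deprecates']} must be removed")
    sys.exit(1)
if days_left < 90:
    print(f"{meta['contract']}: {days_left} days until sunset of {meta['deprecates']}; plan migrations")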
Schema Registry Governance
registry:
provider: confluent
compatibility: BACKWARD
review:
required: true
approvers: ["platform@company.com", "governance@company.com"]
# enforce compatibility
kafka-avro-console-producer --broker-list kafka:9092 --topic orders --property schema.registry.url=http://schema:8081 --property value.schema='...'
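Compatibility can also be checked explicitly against the registry's REST API before merging (endpoint shape per Confluent Schema Registry; the host, subject, and candidate schema are placeholders):
# Check a candidate schema against the latest registered version of the subject.
import json
import requests

candidate = {
    "type": "record",
    "name": "OrderV2",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "customer_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "currency", "type": ["null", "string"], "default": None},  # backward-compatible addition
        {"name": "created_at", "type": "string"},
    ],
}

resp = requests.post(
    "http://schema:8081/compatibility/subjects/orders-value/versions/latest",
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"schema": json.dumps(candidate)},
)
resp.raise_for_status()
print(resp.json())  # e.g. {"is_compatible": true}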
CDC Patterns: Outbox, Polling, Streaming
-- Outbox table
CREATE TABLE outbox (
id bigint primary key,
aggregate_id varchar not null,
type varchar not null,
payload jsonb not null,
created_at timestamp not null default now()
);
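The producer side writes the business row and the outbox event in one transaction, which is what makes the pattern atomic; the orders columns and the outbox_id_seq sequence in this sketch are assumptions:
-- Write the state change and its event atomically; the poller/CDC picks up the outbox row.
BEGIN;
INSERT INTO orders (order_id, customer_id, amount, currency, created_at)
VALUES ('ord_a1b2c3d4e5', 'cus_a1b2c3d4e5', 42.50, 'USD', now());
INSERT INTO outbox (id, aggregate_id, type, payload)
VALUES (nextval('outbox_id_seq'),  -- assumed sequence backing outbox.id
        'ord_a1b2c3d4e5',
        'OrderCreated',
        '{"order_id":"ord_a1b2c3d4e5","amount":42.50,"currency":"USD"}'::jsonb);
COMMIT;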
# Polling reader (idempotent)
last_id = load_offset()
while True:
rows = db.query("select * from outbox where id > %s order by id asc limit 1000", [last_id])
for r in rows: publish(r); last_id = r.id; save_offset(last_id)
{ "name": "pg-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "slot.name": "cdc_slot", "table.include.list": "public.outbox" } }
Metrics Layer (dbt Semantic Models)
semantic_models:
- name: orders
model: ref('fct_orders')
entities: [order_id, customer_id]
dimensions:
- name: created_date
type: time
measures:
- name: total_amount
agg: sum
- name: orders_count
agg: count
DataHub/OpenMetadata Ingestion Configs
source:
type: s3
config:
path_specs:
- include: s3://lake/orders/**
platform: external
sink:
type: datahub-rest
config:
server: http://datahub:8080
openmetadata:
ingestion:
- source: trino
pipeline_name: trino_lake
include_tables: true
include_views: true
Airflow/Dagster Deployment Patterns
# Helm values for Airflow
webserver:
replicas: 2
scheduler:
replicas: 2
logs:
persistence: enabled
# Dagster Daemon + UserCodeDeployments
Iceberg/Delta Maintenance
-- Delta optimize
OPTIMIZE delta.`s3://lake/delta/orders` ZORDER BY (created_date);
VACUUM delta.`s3://lake/delta/orders` RETAIN 168 HOURS;
-- Iceberg snapshot expiration (Spark SQL procedure; Trino exposes this via ALTER TABLE ... EXECUTE expire_snapshots)
CALL system.expire_snapshots(table => 'lake.orders', older_than => TIMESTAMP '2025-01-01 00:00:00 UTC');
Trino/Athena Performance Tuning
-- Partition and projection
SELECT * FROM lake.orders WHERE created_date >= current_date - interval '7' day;
-- Trino session properties (exact property names vary by connector and version)
SET SESSION hive.pushdown_filter_enabled = true;
SET SESSION iceberg.dynamic_filtering_wait_timeout = '10s';
Access Patterns: Row/Column/Cell Masking
-- Column masking (Snowflake analog)
CREATE MASKING POLICY mask_email AS (val STRING) RETURNS STRING ->
CASE WHEN CURRENT_ROLE() IN ('PII_ROLE') THEN val ELSE '***' END;
- Cell-level policies via ABAC and tag-based access
Tokenization and Service Integration
# Call tokenization service (sketch; store() persists the token in place of the raw value)
import requests

resp = requests.post('https://tokens/tokenize', json={'field': 'email', 'value': email})
store(token=resp.json()['token'])
Cost and FinOps Dashboards
-- BigQuery billing
SELECT service.description, SUM(cost) FROM `billing.gcp_billing_export` WHERE usage_start_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY) GROUP BY 1 ORDER BY 2 DESC;
-- Athena CUR
SELECT line_item_product_code, SUM(line_item_unblended_cost) FROM cur WHERE bill_billing_period_start_date >= date_trunc('month', current_date) GROUP BY 1;
Showback/Chargeback
- Tag by domain; allocate shared costs by consumption metrics
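For shared platform costs (catalog, orchestration, lineage), proportional allocation by consumption is usually enough; the cost figure and per-domain scan volumes in this sketch are illustrative:
# Hypothetical showback sketch: split a shared platform cost across domains by scanned volume.
shared_cost_usd = 12_000.0
scanned_tb = {"orders": 40.0, "customers": 25.0, "payments": 35.0}

total_tb = sum(scanned_tb.values())
allocation = {domain: shared_cost_usd * tb / total_tb for domain, tb in scanned_tb.items()}
for domain, cost in sorted(allocation.items()):
    print(f"{domain}: ${cost:,.2f}")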
Lineage End-to-End Examples
# emit OpenLineage events from Airflow task
# DataHub lineage from dbt manifests
Observability Pipelines (OTEL for Data)
receivers:
  otlp: { protocols: { grpc: {}, http: {} } }
processors:
  batch: {}
exporters:
  awsemf: {}
service:
  pipelines:
    metrics: { receivers: [otlp], processors: [batch], exporters: [awsemf] }
Runbooks
Late Data
- Identify upstream delay; widen watermark; backfill; notify consumers
Schema Break
- Roll back producer version; map/transform; publish fix; add contract tests
Data Drift
- Quantify drift; engage domain; retrain models; update thresholds
Extended FAQ (151–350)
- Who approves schema changes? Domain owner + platform review for breaking changes.
- Can we mix Delta and Iceberg? Yes, but avoid within the same table; standardize per domain.
- How to enforce contracts? Registry + CI validation + runtime checks.
- How to deprecate old tables? Mark in catalog; announce; redirect queries; delete after window.
- How to handle GDPR deletes? Tokenize IDs; build delete pipelines; audit proofs.
- What about ACID? Use Delta/Iceberg/Hudi for transactional tables.
- Trino vs Athena? Trino for persistent clusters and performance tuning; Athena serverless.
- Data quality ownership? Domain product teams with platform tools.
- SLAs for freshness? Set per domain; monitor and alert.
- Catalog choice? OpenMetadata/DataHub; ensure ownership and SLAs.
- Pipeline retries? Exponential backoff; idempotency keys.
- Backfill strategies? Partition-based; downstream invalidation.
- CDC consistency? Outbox for strong guarantees; Debezium for minimal code.
- Multi-cloud? Abstract contracts; unify catalog; replicate lakes.
- Cost guardrails? Budgets and alerts; optimize storage scans.
- Partition evolution? Supported by Iceberg/Delta; plan changes carefully.
- Data masking? Policy-driven with tags and roles.
- Streaming joins? Watermarks and late-data handling.
- Schema registry downtime? Cache schemas; fail closed on unknown subjects.
- Who monitors lineage? Platform auto-ingests; domains review.
- Query SLAs? Set per product; pre-aggregate where needed.
- Vendor lock-in? Open table formats; open catalog.
- Kafka retention? Align to reprocessing windows; tiered storage.
- Data contracts for ML? Yes: features schema with ranges and types.
- PII discovery? Classifiers + manual reviews.
- Row vs column encryption? Depends on use; KMS integration.
- Tags for governance? LF-Tags/Glue; enforce at query time.
- Business metrics governance? Semantic layer with approvals.
- Cache layer? Presto/Trino result caching; materialized views.
- Final readiness? SLOs met; governance in place; costs and privacy tracked.
Domain Onboarding Workflow
Checklist
- Define domain scope and ownership
- Create data product charter (purpose, consumers, SLAs)
- Establish data contracts (schema, semantics, PII classification)
- Provision pipelines (ingest, transform, publish)
- Register in catalog (ownership, SLAs, lineage)
- Implement quality checks and monitoring
- Define access policies (RBAC/ABAC, tags)
- Set cost ownership (showback/chargeback)
templates:
data_product:
name: orders
owner: orders-team@company.com
contract: contracts/orders.v2.json
pipeline: airflow/dags/orders.py
storage: s3://lake/orders/
table: iceberg.orders
catalog: openmetadata
Data Product API (OpenAPI / GraphQL)
openapi: 3.0.3
info: { title: Orders Data API, version: 2.0.0 }
paths:
/v2/orders:
get:
parameters:
- in: query
name: created_after
schema: { type: string, format: date-time }
responses:
'200': { description: ok }
components:
schemas:
Order:
type: object
properties: { order_id: {type: string}, amount: {type: number}, currency: {type: string}, created_at: {type: string, format: date-time} }
type Query {
orders(createdAfter: String, limit: Int): [Order!]
}
type Order { order_id: ID!, amount: Float!, currency: String!, created_at: String! }
Metadata Propagation (OpenLineage Facets)
{
"eventTime": "2025-10-27T12:00:00Z",
"job": { "namespace": "airflow", "name": "orders.stage" },
"run": { "runId": "uuid" },
"inputs": [{ "namespace": "postgres", "name": "public.orders" }],
"outputs": [{ "namespace": "s3", "name": "s3://lake/stage/orders" }],
"producer": "https://openlineage.io",
"facets": { "schema": { "fields": [{ "name": "order_id", "type": "string" }] }, "dataQualityMetrics": { "rowCount": 12345 } }
}
Contract Testing in CI
name: contract-ci
on: [pull_request]
jobs:
validate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: pip install jsonschema
- run: python ci/validate_contracts.py contracts/*.json
compatibility:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: ./scripts/schema_compat.sh contracts/orders.v2.json contracts/orders.v1.json
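The validate step above assumes a small helper script; a minimal sketch of ci/validate_contracts.py that checks each contract is itself a well-formed Draft 2020-12 schema could look like this:
import json
import sys

from jsonschema import Draft202012Validator

def main(paths: list[str]) -> int:
    failures = 0
    for path in paths:
        with open(path) as f:
            schema = json.load(f)
        try:
            Draft202012Validator.check_schema(schema)  # is the contract a valid JSON Schema?
            print(f"OK   {path}")
        except Exception as exc:
            failures += 1
            print(f"FAIL {path}: {exc}")
    return 1 if failures else 0

if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))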
Producer/Consumer SDK Snippets
# producer
import json, boto3
k = boto3.client('kinesis')
for rec in batch:
payload = json.dumps(validate(rec))
k.put_record(StreamName='orders', Data=payload.encode(), PartitionKey=rec['order_id'])
# consumer
import json
for msg in stream('orders'):
rec = json.loads(msg)
if not contract_ok(rec): dead_letter(rec); continue
write_to_delta(rec)
Flink SQL: Streaming Joins and Watermarks
CREATE TABLE orders (
order_id STRING,
customer_id STRING,
amount DOUBLE,
created_at TIMESTAMP(3),
WATERMARK FOR created_at AS created_at - INTERVAL '5' MINUTE
) WITH (...);
CREATE TABLE customers (...);
INSERT INTO enriched
SELECT o.order_id, o.amount, c.segment
FROM orders o
LEFT JOIN customers c
FOR SYSTEM_TIME AS OF o.created_at
ON o.customer_id = c.customer_id;
Iceberg Compaction and Retention Jobs
spark-submit iceberg_compact.py --table lake.orders --target-file-size 128MB
CALL system.rewrite_data_files(table => 'lake.orders', where => 'created_date >= current_date - interval 30 day');
CALL system.expire_snapshots(table => 'lake.orders', older_than => TIMESTAMP '2025-09-01 00:00:00 UTC');
Lake Formation LF-Tags and Policies
lf_tags:
- key: sensitivity
values: [pii, internal, public]
- key: domain
values: [orders, customers]
policies:
- tag: { sensitivity: pii }
principals: [ role/pii_analyst ]
permissions: [ SELECT ]
Metadata Ingestion (OpenMetadata/DataHub)
source:
type: trino
config: { hostPort: trino:8080, database: lake }
sink:
type: openmetadata-rest
config: { api_endpoint: http://om:8585/api }
Great Expectations Suites and dbt Tests
expectations:
- expect_table_row_count_to_be_between: { min_value: 1 }
- expect_column_values_to_not_be_null: { column: order_id }
- expect_column_values_to_be_between: { column: amount, min_value: 0 }
# dbt tests (schema.yml)
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests: [unique]
      - name: customer_id
        tests:
          - relationships: { to: ref('dim_customers'), field: customer_id }
Trino/Athena Session Configuration
SET SESSION hive.parquet_optimized_reader_enabled = true;
SET SESSION iceberg.split_open_file_cost = 262144;
FinOps Showback/Chargeback
-- Athena CUR showback by tag
SELECT resource_tags_domain, SUM(line_item_unblended_cost) AS cost
FROM cur
WHERE bill_billing_period_start_date >= date_trunc('month', current_date)
GROUP BY 1 ORDER BY 2 DESC;
-- BigQuery billing: cost per project/dataset
SELECT project.id, SUM(cost) FROM `billing.gcp_billing_export` WHERE usage_start_time >= TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), MONTH) GROUP BY 1;
OTEL Metrics/Traces for Pipelines
metrics:
- name: data_pipeline_latency_seconds
type: histogram
labels: [pipeline, stage]
- name: data_pipeline_records_total
type: counter
labels: [pipeline, outcome]
traces:
- attributes: { pipeline: orders, stage: ingest }
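If pipelines are instrumented in Python, the OpenTelemetry API can emit exactly these series; the snippet below is a sketch (it falls back to no-op providers unless an SDK and exporter are configured, and the attribute values are illustrative):
import time

from opentelemetry import metrics, trace

tracer = trace.get_tracer("orders-pipeline")
meter = metrics.get_meter("orders-pipeline")

latency = meter.create_histogram("data_pipeline_latency_seconds")
records = meter.create_counter("data_pipeline_records_total")

def run_stage(batch):
    start = time.monotonic()
    with tracer.start_as_current_span("ingest"):
        pass  # ... ingest the batch ...
    latency.record(time.monotonic() - start, {"pipeline": "orders", "stage": "ingest"})
    records.add(len(batch), {"pipeline": "orders", "outcome": "success"})

run_stage(batch=[{"order_id": "ord_a1b2c3d4e5"}])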
SOPs and Runbooks
Contract Break
- Identify incompatible change; roll back producer; provide mapping layer; notify consumers; add CI contract test.
Quality Failure
- Quarantine partition; fix upstream; backfill; add expectation.
Lineage Gap
- Reconfigure OpenLineage emitters; replay; verify in catalog.
Extended FAQ (351–550)
- Who defines SLAs? Domain owners with consumer input.
- Can quality tests block publish? Yes: treat as a gate.
- How to handle late events? Watermarks and reprocessing windows.
- Should we use CDC everywhere? Use where needed; batch for bulk/static sources.
- Delta vs Iceberg? Iceberg open governance; Delta strong ecosystem.
- Multi-cloud catalog? Unify with OpenMetadata/DataHub.
- Data drift monitoring? Statistical tests per feature; alert thresholds.
- Sensitive data sandbox? Tokenize; synthetic data for dev.
- Encryption keys? Per domain or dataset; rotate via KMS.
- Table evolution? Use add/drop columns with compatibility plans.
- Version pinning? Contracts and pipelines; migrations tracked.
- Orchestration standard? Airflow/Dagster; team choice but common contracts.
- COGS tracking? FinOps dashboards by domain.
- Data mesh anti-patterns? No platform; chaos in standards; lack of ownership.
- Data product SLAs violated? Incident process; postmortems.
- Catalog sprawl? One catalog; clear ownership fields.
- Data sampling for QA? Stratified samples per partition.
- Query quotas? Per domain budgets; throttle abusive queries.
- Governance w/o slowdown? Automate checks and use defaults.
- Template repos? Yes: scaffold domain products quickly.
- How to sunset products? Deprecate, migrate, archive with lineage.
- Schema naming? Consistent snake_case; namespaces.
- File layout? Partition by date; small files compacted.
- Streaming backfills? Dual pipelines; careful dedup.
- Latency targets? Domain specific; dashboards.
- Cold storage? Glacier/Archive tiers.
- Testing strategy? Unit, integration, contract, quality.
- Who audits privacy? Governance with internal audit.
- MLOps integration? Feature stores and lineage to models.
- Incident severity? Based on consumer impact.
- SLIs definitions? Freshness, completeness, availability, compatibility.
- Domain budgets? Allocations with showback.
- Rollbacks? Keep previous version; blue/green publish.
- Cross-domain joins? Contracts and common dimensions.
- Downstream breaking? Communicate, provide adapters.
- Tag propagation? Carry tags in lineage.
- PII scanning cadence? Nightly; on changes.
- Data retention? Per policy; lifecycle rules.
- API for data products? Yes: OpenAPI/GraphQL.
- Final readiness? SLOs, governance, and cost controls in place.
End-to-End Sample Domain Repo Layout
orders-domain/
contracts/
orders.v1.json
orders.v2.json
pipelines/
airflow/
dags/orders.py
plugins/lineage.py
dagster/
jobs.py
sensors.py
transform/
dbt_project.yml
models/
staging/
marts/
tests/
contracts/
great_expectations/
catalog/
ownership.yaml
sla.yaml
Contract Evolution Examples
// orders.v1.json
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "orders.v1",
"type": "object",
"required": ["order_id","customer_id","amount","created_at"],
"additionalProperties": false,
"properties": {
"order_id": {"type": "string"},
"customer_id": {"type": "string"},
"amount": {"type": "number", "minimum": 0},
"created_at": {"type": "string", "format": "date-time"}
}
}
// orders.v2.json (backward-compatible: add nullable currency)
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "orders.v2",
"type": "object",
"required": ["order_id","customer_id","amount","created_at"],
"additionalProperties": false,
"properties": {
"order_id": {"type": "string"},
"customer_id": {"type": "string"},
"amount": {"type": "number", "minimum": 0},
"currency": {"type": ["string","null"], "enum": ["USD","EUR","GBP", null]},
"created_at": {"type": "string", "format": "date-time"}
}
}
Producer/Consumer Backward-Compatibility Strategies
# producer can start sending currency=null then populate gradually; always include required fields from v1
# consumer: prefer v2 fields if present, fallback to v1 defaults
currency = rec.get('currency') or 'USD'
dbt Semantic Layer: Metrics and Exposures
metrics:
- name: revenue
label: Total Revenue
model: ref('fct_orders')
calculation_method: sum
expression: amount
timestamp: created_at
time_grains: [day, week, month]
dimensions: [currency]
exposures:
- name: exec_dashboard
type: dashboard
owner: { name: Analytics, email: analytics@company.com }
depends_on: [ref('fct_orders')]
maturity: high
OpenLineage End-to-End (Airflow)
# plugins/lineage.py
from openlineage.airflow import DAG
from airflow.operators.python import PythonOperator
def extract(**ctx): pass
def load(**ctx): pass
dag = DAG('orders', schedule_interval='@hourly', start_date=...)
extract_task = PythonOperator(task_id='extract', python_callable=extract, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load, dag=dag)
extract_task >> load_task
DataHub/OpenMetadata Ownership and SLAs
ownership:
table: lake.orders
owners:
- type: DataOwner
id: orders-team@company.com
sla:
freshness_minutes: 15
completeness: 0.999
availability: 0.999
Great Expectations Checkpoints and CI
checkpoints:
- name: orders_daily
validations:
- batch_request:
datasource_name: s3
data_asset_name: lake/orders/dt={{ ds }}
expectation_suite_name: orders_suite
# GitHub Actions
- run: great_expectations checkpoint run orders_daily || exit 1
Dagster Sensors and Asset Materializations
from dagster import sensor, RunRequest, asset
@asset
def fct_orders():
# materialize from staging
...
@sensor(job=some_job)
def s3_new_partition_sensor(context):
if new_partition():
yield RunRequest(run_key=..., run_config={...})
Flink CEP and Windowing Examples
-- Tumbling window
SELECT window_start, window_end, COUNT(*)
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(created_at), INTERVAL '5' MINUTE))
GROUP BY window_start, window_end;
-- CEP pattern: detect abnormal order spikes
Iceberg/Delta Time Travel and Rollback
-- Iceberg time travel (Trino)
SELECT * FROM lake.orders FOR TIMESTAMP AS OF TIMESTAMP '2025-10-27 10:00:00 UTC';
-- Delta rollback
deltaTable.restoreToVersion(100)
Trino Materialized Views
CREATE MATERIALIZED VIEW lake.mv_orders_7d AS
SELECT date_trunc('day', created_at) AS d, SUM(amount) AS revenue
FROM lake.orders WHERE created_at >= current_timestamp - INTERVAL '7' day
GROUP BY 1;
Governance Policies with OPA
package data_governance
allow_access {
input.subject.role == "analyst"
input.resource.domain == input.subject.domain
}
deny["pii"] {
input.resource.tags[_] == "pii"
not input.subject.clearance == "pii"
}
ABAC Tag Policy Wiring
lf_tags:
- key: sensitivity
values: [pii, internal, public]
policies:
- tag: { sensitivity: pii }
principals: [ role/pii_analyst ]
permissions: [ SELECT ]
PII Classification Playbook and Tokenization API
- Identify PII fields via classifiers and domain input
- Tokenize PII at ingest; store mapping in secure vault
- Provide detokenization service with strict authz
- Mask PII in catalogs and discovery
POST /tokenize { field: "email", value: "a@b.com" } → { token: "tok_123" }
POST /detokenize { token: "tok_123" } → { value: "a@b.com" }
FinOps Allocations and Dashboards
-- Athena CUR by domain tag and service
SELECT resource_tags_domain, line_item_product_code, SUM(line_item_unblended_cost) AS cost
FROM cur WHERE bill_billing_period_start_date >= date_trunc('month', current_date)
GROUP BY 1,2 ORDER BY 3 DESC;
{
"title": "Data Mesh Cost",
"panels": [
{"type":"table","title":"Cost by Domain","targets":[{"expr":"sum by (domain) (rate(data_cost_usd_total[1h]))"}]}
]
}
OTEL Collector for Data Pipelines
receivers: { otlp: { protocols: { grpc: {}, http: {} } } }
processors: { batch: {} }
exporters: { prometheus: { endpoint: ":9464" } }
service:
  pipelines:
    metrics: { receivers: [otlp], processors: [batch], exporters: [prometheus] }
SLO Burn Alerts (PromQL)
# freshness burn: fire only while the pipeline is still receiving records
(avg_over_time(data_freshness_minutes[1h]) > 15) and on() (sum(rate(records_total{outcome="success"}[1h])) > 0)
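The same expression can be wrapped in a Prometheus alerting rule; the group and alert names, labels, and the 15m hold below are illustrative:
groups:
  - name: data-mesh-slo
    rules:
      - alert: OrdersFreshnessSLOBurn
        expr: (avg_over_time(data_freshness_minutes[1h]) > 15) and on() (sum(rate(records_total{outcome="success"}[1h])) > 0)
        for: 15m
        labels:
          severity: page
          domain: orders
        annotations:
          summary: "Orders data freshness SLO is burning over the last hour"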
Incident Templates and Runbooks
Schema Drift Incident
- Detection: contract validation failures; alerts firing
- Contain: stop publish to prod sink; quarantine partition
- Diagnose: compare vN vs vN-1; identify breaking fields
- Fix: produce mapping layer or roll back; add tests
- Recover: backfill; resume publish; postmortem
Extended FAQ (551–750)
- Contract negotiation? Propose changes; impact analysis; timelines.
- Consumer notifications? Catalog announcements + email.
- Can contracts include metrics? Yes: define semantic metrics and ownership.
- Who approves PII tagging? Governance and domain owner.
- Data throttling? Rate-limit sources; buffer; backpressure.
- Out-of-order events? Watermarks + allowed lateness.
- Upserts vs append-only? Prefer append + corrections; ACID for gold tables.
- Delete handling? Soft delete flags or CDC tombstones.
- Multi-tenant datasets? Tag-based access; isolate sensitive slices.
- Query SLAs per domain? Yes: dashboards with SLOs.
- Notebook governance? Mask data; audit exports.
- Sampling policies? For debugging; document bias.
- Synthetic data usage? Mark; avoid training leakage.
- Materialized view refresh? On schedule or threshold.
- Backfill costs? Budget and monitor.
- Who owns costs? Domain owner via showback.
- Resource quotas? Per domain limits.
- Alert noise? Tune and group; runbooks.
- Catalog updates cadence? Continuous ingestion.
- Table ACLs or tags? Tags scale better.
- Data contracts for realtime? Yes: Avro/Protobuf with schemas.
- Data layouts? Partition + clustering.
- Streaming OLAP? Consider Pinot/Druid; integrate with mesh.
- Audit logs? Immutable storage; retention policy.
- Escalations? Domain → platform → governance.
- Can we skip lineage? No: vital for trust and audits.
- Test flakiness? Stabilize; seed; isolation.
- Multi-cloud transfer? Compress; replicate; test.
- Who approves runbooks? Platform and governance.
- Final acceptance? SLOs green; contracts enforced; costs in bounds.