Building a CSV API: Design Patterns & Best Practices (2025 Developer Guide)
Building a robust CSV API requires careful consideration of performance, scalability, and user experience. Whether you're creating a simple CSV processing service or a complex data management platform, understanding the design patterns and best practices for CSV APIs is crucial for success.
This comprehensive guide covers everything you need to know about building CSV APIs: REST design patterns, streaming and chunking strategies, error handling, performance optimization, and security considerations. Backend developers, API architects, and technical leads alike will find patterns here for building production-ready CSV APIs.
Why CSV APIs Matter
Common Use Cases
Data Processing Services:
- CSV file validation and cleaning
- Format conversion (CSV to JSON, XML, etc.)
- Data transformation and enrichment
- Bulk data import/export operations
Business Applications:
- E-commerce product catalog management
- Customer data synchronization
- Financial transaction processing
- HR and payroll data management
Analytics and Reporting:
- Data aggregation and analysis
- Report generation and export
- Real-time data streaming
- Historical data archiving
Integration Platforms:
- Third-party system integration
- Data pipeline orchestration
- Event-driven data processing
- Microservices communication
Benefits of CSV APIs
Scalability:
- Handle large datasets efficiently
- Support concurrent users
- Scale horizontally as needed
- Optimize resource usage
Flexibility:
- Support multiple data formats
- Customizable processing options
- Extensible architecture
- Easy integration
Reliability:
- Robust error handling
- Data validation and integrity
- Transaction support
- Backup and recovery
Performance:
- Fast processing times
- Memory-efficient operations
- Streaming capabilities
- Caching strategies
REST API Design Patterns
Resource-Based Design
CSV Resource Hierarchy:
/api/v1/csv/
├── files/                     # CSV file management
│   ├── {file_id}              # Individual file operations
│   ├── {file_id}/data         # File data access
│   ├── {file_id}/metadata     # File metadata
│   └── {file_id}/schema       # File schema
├── jobs/                      # Processing jobs
│   ├── {job_id}               # Job status and results
│   └── {job_id}/logs          # Job execution logs
├── templates/                 # CSV templates
│   └── {template_id}          # Template operations
└── health                     # API health check
RESTful Endpoints:
# File Management
GET /api/v1/csv/files # List files
POST /api/v1/csv/files # Upload file
GET /api/v1/csv/files/{file_id} # Get file info
PUT /api/v1/csv/files/{file_id} # Update file
DELETE /api/v1/csv/files/{file_id} # Delete file
# Data Access
GET /api/v1/csv/files/{file_id}/data # Get file data
POST /api/v1/csv/files/{file_id}/data # Append data
PUT /api/v1/csv/files/{file_id}/data # Replace data
DELETE /api/v1/csv/files/{file_id}/data # Clear data
# Processing Operations
POST /api/v1/csv/files/{file_id}/validate # Validate file
POST /api/v1/csv/files/{file_id}/convert # Convert format
POST /api/v1/csv/files/{file_id}/filter # Filter data
POST /api/v1/csv/files/{file_id}/sort # Sort data
# Job Management
GET /api/v1/csv/jobs # List jobs
POST /api/v1/csv/jobs # Create job
GET /api/v1/csv/jobs/{job_id} # Get job status
DELETE /api/v1/csv/jobs/{job_id} # Cancel job
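Before diving into payloads, here is a minimal sketch of how this hierarchy might be wired up with FastAPI routers. The router split and handler names are illustrative assumptions, not part of the endpoint spec above:

import uuid

from fastapi import APIRouter, FastAPI, UploadFile

app = FastAPI()
files_router = APIRouter(prefix="/api/v1/csv/files", tags=["files"])
jobs_router = APIRouter(prefix="/api/v1/csv/jobs", tags=["jobs"])

@files_router.get("")
async def list_files():
    """List uploaded CSV files."""
    return {"success": True, "data": []}

@files_router.post("")
async def upload_file(file: UploadFile):
    """Upload a new CSV file and return its generated id."""
    return {"success": True, "data": {"file_id": str(uuid.uuid4())}}

@jobs_router.get("/{job_id}")
async def get_job(job_id: str):
    """Get processing job status."""
    return {"success": True, "data": {"job_id": job_id, "status": "pending"}}

# Mounting the routers keeps each resource's endpoints in one place
app.include_router(files_router)
app.include_router(jobs_router)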
Request/Response Patterns
Standard Request Format:
# File Upload Request (multipart form: the file part carries the binary
# content; metadata and options travel as JSON-encoded string parts)
POST /api/v1/csv/files
Content-Type: multipart/form-data

file:     <binary CSV content>
metadata: {"name": "sales_data.csv", "description": "Monthly sales data", "tags": ["sales", "monthly", "2025"]}
options:  {"delimiter": ",", "encoding": "utf-8", "has_header": true}

# Processing Request
POST /api/v1/csv/files/{file_id}/convert
Content-Type: application/json

{
  "target_format": "json",
  "options": {
    "pretty_print": true,
    "include_metadata": true
  }
}
Standard Response Format:
# Success Response
{
  "success": true,
  "data": {
    "file_id": "uuid-1234",
    "name": "sales_data.csv",
    "size": 1024000,
    "rows": 1000,
    "columns": 5,
    "created_at": "2025-01-19T10:00:00Z",
    "status": "processed"
  },
  "metadata": {
    "processing_time": "2.5s",
    "api_version": "v1"
  }
}

# Error Response
{
  "success": false,
  "error": {
    "code": "VALIDATION_ERROR",
    "message": "Invalid CSV format",
    "details": {
      "line": 15,
      "column": 3,
      "issue": "Missing closing quote"
    }
  },
  "metadata": {
    "request_id": "req-1234",
    "timestamp": "2025-01-19T10:00:00Z"
  }
}
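To keep every endpoint on this envelope, it helps to codify it once as response models. A minimal sketch with Pydantic; the field names follow the examples above, while the loose `Any`/`dict` typing is an assumption you would tighten per endpoint:

from typing import Any, Optional

from pydantic import BaseModel

class ErrorBody(BaseModel):
    code: str
    message: str
    details: dict = {}

class APIResponse(BaseModel):
    success: bool
    data: Optional[Any] = None         # present on success
    error: Optional[ErrorBody] = None  # present on failure
    metadata: dict = {}                # request_id, timing, api_version, ...

Declaring `response_model=APIResponse` on each route then makes the envelope part of the generated OpenAPI schema.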
Streaming and Chunking Strategies
Streaming Architecture
Streaming CSV Processing:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import csv
import json

app = FastAPI()

@app.post("/api/v1/csv/files/{file_id}/stream")
async def stream_csv_data(file_id: str, chunk_size: int = 1000):
    """Stream CSV data in chunks as server-sent events"""
    async def generate_chunks():
        # Open file stream
        with open(f"files/{file_id}.csv", "r") as file:
            reader = csv.DictReader(file)
            # Accumulate rows into chunks
            chunk = []
            for row in reader:
                chunk.append(row)
                if len(chunk) >= chunk_size:
                    yield f"data: {json.dumps(chunk)}\n\n"
                    chunk = []
                    await asyncio.sleep(0.001)  # Yield control to the event loop
            # Yield remaining rows
            if chunk:
                yield f"data: {json.dumps(chunk)}\n\n"

    return StreamingResponse(
        generate_chunks(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"}
    )
Chunked Processing:
@app.post("/api/v1/csv/files/{file_id}/process")
async def process_csv_chunks(file_id: str, chunk_size: int = 1000):
"""Process CSV file in chunks"""
results = []
chunk_count = 0
with open(f"files/{file_id}.csv", "r") as file:
reader = csv.DictReader(file)
chunk = []
for row in reader:
chunk.append(row)
if len(chunk) >= chunk_size:
# Process chunk
processed_chunk = await process_chunk(chunk)
results.extend(processed_chunk)
chunk_count += 1
chunk = []
# Log progress
print(f"Processed chunk {chunk_count}")
# Process remaining data
if chunk:
processed_chunk = await process_chunk(chunk)
results.extend(processed_chunk)
return {
"success": True,
"data": {
"total_chunks": chunk_count + 1,
"total_rows": len(results),
"results": results
}
}
async def process_chunk(chunk):
"""Process a single chunk of data"""
# Simulate processing
await asyncio.sleep(0.1)
# Apply transformations
processed = []
for row in chunk:
# Example: Clean data
cleaned_row = {
key: value.strip() if isinstance(value, str) else value
for key, value in row.items()
}
processed.append(cleaned_row)
return processed
Memory Management
Memory-Efficient Processing:
import gc
import os

import psutil

class MemoryManager:
    """Memory management for CSV processing"""

    def __init__(self, max_memory_mb=500):
        self.max_memory_mb = max_memory_mb
        self.process = psutil.Process(os.getpid())

    def check_memory_usage(self):
        """Return current RSS memory usage in MB"""
        memory_info = self.process.memory_info()
        return memory_info.rss / 1024 / 1024

    def should_garbage_collect(self):
        """Check if garbage collection is needed"""
        return self.check_memory_usage() > self.max_memory_mb

    def cleanup_memory(self):
        """Force garbage collection"""
        gc.collect()
        print(f"Memory usage after cleanup: {self.check_memory_usage():.2f} MB")

async def process_row(row):
    """Example per-row transform: strip whitespace from string values"""
    return {k: v.strip() if isinstance(v, str) else v for k, v in row.items()}

@app.post("/api/v1/csv/files/{file_id}/process-memory-safe")
async def process_csv_memory_safe(file_id: str):
    """Process CSV with memory management"""
    memory_manager = MemoryManager()
    results = []

    with open(f"files/{file_id}.csv", "r") as file:
        reader = csv.DictReader(file)
        for row in reader:
            # Process row
            processed_row = await process_row(row)
            results.append(processed_row)
            # Check memory usage and collect garbage if over budget
            if memory_manager.should_garbage_collect():
                memory_manager.cleanup_memory()

    return {"success": True, "data": results}
Error Handling and Validation
Comprehensive Error Handling
Error Handling Middleware:
from datetime import datetime

from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse
import logging
import traceback

logger = logging.getLogger(__name__)

class CSVAPIError(Exception):
    """Base exception for CSV API errors"""
    def __init__(self, message: str, error_code: str, details: dict = None):
        self.message = message
        self.error_code = error_code
        self.details = details or {}
        super().__init__(self.message)

class ValidationError(CSVAPIError):
    """CSV validation error"""
    pass

class ProcessingError(CSVAPIError):
    """CSV processing error"""
    pass

class CSVFileNotFoundError(CSVAPIError):
    """File not found error (named so it does not shadow the built-in FileNotFoundError)"""
    pass

@app.exception_handler(CSVAPIError)
async def csv_api_error_handler(request: Request, exc: CSVAPIError):
    """Handle CSV API errors"""
    status_code = 404 if isinstance(exc, CSVFileNotFoundError) else 400
    return JSONResponse(
        status_code=status_code,
        content={
            "success": False,
            "error": {
                "code": exc.error_code,
                "message": exc.message,
                "details": exc.details
            },
            "metadata": {
                "request_id": request.headers.get("X-Request-ID"),
                "timestamp": datetime.utcnow().isoformat()
            }
        }
    )

@app.exception_handler(Exception)
async def general_error_handler(request: Request, exc: Exception):
    """Handle unexpected errors"""
    logger.error(f"Unexpected error: {str(exc)}")
    logger.error(traceback.format_exc())
    return JSONResponse(
        status_code=500,
        content={
            "success": False,
            "error": {
                "code": "INTERNAL_ERROR",
                "message": "An unexpected error occurred",
                "details": {}
            },
            "metadata": {
                "request_id": request.headers.get("X-Request-ID"),
                "timestamp": datetime.utcnow().isoformat()
            }
        }
    )
CSV Validation:
import csv
import io

class CSVValidator:
    """CSV validation utility"""

    def __init__(self):
        self.errors = []
        self.warnings = []

    def validate_file(self, file_content: bytes, options: dict = None):
        """Validate CSV file content"""
        self.errors = []
        self.warnings = []

        # Decode content
        try:
            content = file_content.decode('utf-8')
        except UnicodeDecodeError:
            self.errors.append({
                "type": "ENCODING_ERROR",
                "message": "Invalid UTF-8 encoding",
                "line": 0
            })
            return False

        # Parse CSV
        try:
            reader = csv.reader(io.StringIO(content))
            rows = list(reader)
        except csv.Error as e:
            self.errors.append({
                "type": "PARSING_ERROR",
                "message": str(e),
                "line": 0
            })
            return False

        # Validate structure
        if not rows:
            self.errors.append({
                "type": "EMPTY_FILE",
                "message": "File is empty",
                "line": 0
            })
            return False

        # Check header
        header = rows[0]
        if not header or all(not cell.strip() for cell in header):
            self.errors.append({
                "type": "MISSING_HEADER",
                "message": "File has no header row",
                "line": 1
            })
            return False

        # Validate data rows
        for i, row in enumerate(rows[1:], start=2):
            if len(row) != len(header):
                self.errors.append({
                    "type": "FIELD_COUNT_MISMATCH",
                    "message": f"Row has {len(row)} fields, expected {len(header)}",
                    "line": i
                })
            # Check for empty rows
            if all(not cell.strip() for cell in row):
                self.warnings.append({
                    "type": "EMPTY_ROW",
                    "message": "Empty row detected",
                    "line": i
                })

        return len(self.errors) == 0

    def get_validation_report(self):
        """Get validation report"""
        return {
            "valid": len(self.errors) == 0,
            "errors": self.errors,
            "warnings": self.warnings,
            "error_count": len(self.errors),
            "warning_count": len(self.warnings)
        }

@app.post("/api/v1/csv/files/{file_id}/validate")
async def validate_csv_file(file_id: str):
    """Validate CSV file"""
    try:
        # Read file
        with open(f"files/{file_id}.csv", "rb") as file:
            content = file.read()

        # Validate
        validator = CSVValidator()
        validator.validate_file(content)

        return {
            "success": True,
            "data": {
                "file_id": file_id,
                "validation": validator.get_validation_report()
            }
        }
    except FileNotFoundError:
        # Translate the built-in exception into the API error type
        raise CSVFileNotFoundError(
            message="File not found",
            error_code="FILE_NOT_FOUND",
            details={"file_id": file_id}
        )
    except Exception as e:
        raise ProcessingError(
            message="Validation failed",
            error_code="VALIDATION_ERROR",
            details={"error": str(e)}
        )
Data Quality Validation
Advanced Data Validation:
class DataQualityValidator:
    """Advanced data quality validation"""

    def __init__(self):
        self.rules = []
        self.results = []

    def add_rule(self, rule):
        """Add validation rule"""
        self.rules.append(rule)

    def validate_data(self, data):
        """Validate data against all rules"""
        self.results = []
        for rule in self.rules:
            result = rule.validate(data)
            self.results.append(result)
        return all(result.passed for result in self.results)

    def get_quality_report(self):
        """Get data quality report"""
        return {
            "total_rules": len(self.rules),
            "passed_rules": sum(1 for r in self.results if r.passed),
            "failed_rules": sum(1 for r in self.results if not r.passed),
            "results": [r.to_dict() for r in self.results]
        }

class ValidationRule:
    """Base validation rule"""

    def __init__(self, name, description):
        self.name = name
        self.description = description

    def validate(self, data):
        """Validate data - to be implemented by subclasses"""
        raise NotImplementedError

class RequiredFieldRule(ValidationRule):
    """Rule for required fields"""

    def __init__(self, field_name):
        super().__init__(
            f"required_{field_name}",
            f"Field '{field_name}' is required"
        )
        self.field_name = field_name

    def validate(self, data):
        """Check if required field is present and not empty"""
        if self.field_name not in data:
            return ValidationResult(False, f"Field '{self.field_name}' is missing")
        if not data[self.field_name] or not str(data[self.field_name]).strip():
            return ValidationResult(False, f"Field '{self.field_name}' is empty")
        return ValidationResult(True, f"Field '{self.field_name}' is valid")

class DataTypeRule(ValidationRule):
    """Rule for data type validation"""

    def __init__(self, field_name, expected_type):
        super().__init__(
            f"type_{field_name}",
            f"Field '{field_name}' must be {expected_type.__name__}"
        )
        self.field_name = field_name
        self.expected_type = expected_type

    def validate(self, data):
        """Check if field value can be coerced to the expected type"""
        if self.field_name not in data:
            return ValidationResult(False, f"Field '{self.field_name}' is missing")
        try:
            self.expected_type(data[self.field_name])
            return ValidationResult(True, f"Field '{self.field_name}' has correct type")
        except (ValueError, TypeError):
            return ValidationResult(False, f"Field '{self.field_name}' is not {self.expected_type.__name__}")

class ValidationResult:
    """Validation result"""

    def __init__(self, passed, message):
        self.passed = passed
        self.message = message

    def to_dict(self):
        return {
            "passed": self.passed,
            "message": self.message
        }
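A short usage sketch, assuming rows arrive as dicts from csv.DictReader (the field names here are illustrative):

validator = DataQualityValidator()
validator.add_rule(RequiredFieldRule("email"))
validator.add_rule(DataTypeRule("amount", float))

row = {"email": "jane@example.com", "amount": "19.99"}
if not validator.validate_data(row):
    # Inspect which rules failed and why
    print(validator.get_quality_report())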
Performance Optimization
Caching Strategies
Redis Caching:
import hashlib
import json

import redis

class CSVCache:
    """CSV data caching with Redis"""

    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 3600  # 1 hour

    def get_cache_key(self, file_id, operation, params=None):
        """Generate a deterministic cache key; the plain file_id prefix
        keeps per-file invalidation possible despite the hashed suffix"""
        key_data = {
            "file_id": file_id,
            "operation": operation,
            "params": params or {}
        }
        key_string = json.dumps(key_data, sort_keys=True)
        return f"csv:{file_id}:{hashlib.md5(key_string.encode()).hexdigest()}"

    def get(self, file_id, operation, params=None):
        """Get cached data"""
        key = self.get_cache_key(file_id, operation, params)
        data = self.redis_client.get(key)
        if data:
            return json.loads(data)
        return None

    def set(self, file_id, operation, data, params=None, ttl=None):
        """Set cached data (params must match the ones passed to get())"""
        key = self.get_cache_key(file_id, operation, params)
        ttl = ttl or self.default_ttl
        self.redis_client.setex(key, ttl, json.dumps(data))

    def invalidate(self, file_id):
        """Invalidate all cache entries for a file"""
        pattern = f"csv:{file_id}:*"
        keys = self.redis_client.keys(pattern)
        if keys:
            self.redis_client.delete(*keys)

# Usage in API
cache = CSVCache()

@app.get("/api/v1/csv/files/{file_id}/data")
async def get_csv_data(file_id: str, page: int = 1, size: int = 100):
    """Get CSV data with caching"""
    params = {"page": page, "size": size}

    # Check cache
    cached_data = cache.get(file_id, "data", params)
    if cached_data:
        return {"success": True, "data": cached_data, "cached": True}

    # Load data
    data = await load_csv_data(file_id, page, size)

    # Cache result for 30 minutes
    cache.set(file_id, "data", data, params=params, ttl=1800)

    return {"success": True, "data": data, "cached": False}
Database Optimization
Efficient Data Storage:
from datetime import datetime
import json

from sqlalchemy import create_engine, Column, String, Integer, DateTime, Text
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class CSVFile(Base):
    """CSV file metadata"""
    __tablename__ = "csv_files"

    id = Column(String, primary_key=True)
    name = Column(String, nullable=False)
    size = Column(Integer, nullable=False)
    rows = Column(Integer, nullable=False)
    columns = Column(Integer, nullable=False)
    created_at = Column(DateTime, nullable=False)
    updated_at = Column(DateTime, nullable=False)
    # "metadata" is reserved by SQLAlchemy's declarative base, so use another name
    file_metadata = Column(Text)  # JSON metadata

class CSVData(Base):
    """CSV data storage"""
    __tablename__ = "csv_data"

    id = Column(Integer, primary_key=True)
    file_id = Column(String, nullable=False, index=True)
    row_number = Column(Integer, nullable=False)
    data = Column(Text, nullable=False)  # JSON row data

class CSVDataManager:
    """CSV data management with database"""

    def __init__(self, database_url):
        self.engine = create_engine(database_url)
        self.Session = sessionmaker(bind=self.engine)
        Base.metadata.create_all(self.engine)

    def store_csv_file(self, file_id, name, size, rows, columns, metadata=None):
        """Store CSV file metadata"""
        session = self.Session()
        try:
            csv_file = CSVFile(
                id=file_id,
                name=name,
                size=size,
                rows=rows,
                columns=columns,
                created_at=datetime.utcnow(),
                updated_at=datetime.utcnow(),
                file_metadata=json.dumps(metadata) if metadata else None
            )
            session.add(csv_file)
            session.commit()
        finally:
            session.close()

    def store_csv_data(self, file_id, data):
        """Store CSV rows in the database"""
        session = self.Session()
        try:
            for i, row in enumerate(data):
                csv_data = CSVData(
                    file_id=file_id,
                    row_number=i,
                    data=json.dumps(row)
                )
                session.add(csv_data)
            session.commit()
        finally:
            session.close()

    def get_csv_data(self, file_id, page=1, size=100):
        """Get CSV data with pagination (ordered so pages are deterministic)"""
        session = self.Session()
        try:
            offset = (page - 1) * size
            query = session.query(CSVData).filter(
                CSVData.file_id == file_id
            ).order_by(CSVData.row_number).offset(offset).limit(size)
            return [json.loads(result.data) for result in query.all()]
        finally:
            session.close()
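Per-row session.add() gets slow on large files. One alternative, sketched here as an extra method on CSVDataManager, is SQLAlchemy's bulk insert, which flushes the whole batch at once:

# Inside CSVDataManager: a bulk alternative to store_csv_data
def store_csv_data_bulk(self, file_id, data):
    """Store CSV rows with a single bulk insert instead of per-row adds."""
    session = self.Session()
    try:
        session.bulk_save_objects([
            CSVData(file_id=file_id, row_number=i, data=json.dumps(row))
            for i, row in enumerate(data)
        ])
        session.commit()
    finally:
        session.close()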
API Rate Limiting
Rate Limiting Implementation:
from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import time
import uuid

import redis

class RateLimitMiddleware(BaseHTTPMiddleware):
    """Sliding-window rate limiting middleware backed by Redis"""

    def __init__(self, app, redis_client, max_requests=100, window_seconds=60):
        super().__init__(app)
        self.redis_client = redis_client
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    async def dispatch(self, request: Request, call_next):
        """Check rate limit before processing request"""
        # Identify the client by IP
        client_ip = request.client.host

        # Reject if over the limit; return a response directly, since an
        # HTTPException raised inside middleware bypasses the exception handlers
        if not self.check_rate_limit(client_ip):
            return JSONResponse(
                status_code=429,
                content={
                    "success": False,
                    "error": {"code": "RATE_LIMITED", "message": "Rate limit exceeded"}
                }
            )

        # Process request
        response = await call_next(request)

        # Add rate limit headers
        response.headers["X-RateLimit-Limit"] = str(self.max_requests)
        response.headers["X-RateLimit-Remaining"] = str(self.get_remaining_requests(client_ip))
        response.headers["X-RateLimit-Reset"] = str(int(time.time()) + self.window_seconds)

        return response

    def check_rate_limit(self, client_ip):
        """Check if client has exceeded the rate limit"""
        key = f"rate_limit:{client_ip}"
        current_time = int(time.time())
        window_start = current_time - self.window_seconds

        # Remove entries that fell out of the window
        self.redis_client.zremrangebyscore(key, 0, window_start)

        # Count requests still in the window
        current_requests = self.redis_client.zcard(key)
        if current_requests >= self.max_requests:
            return False

        # Record this request; a unique member prevents same-second requests
        # from collapsing into one sorted-set entry
        self.redis_client.zadd(key, {f"{current_time}:{uuid.uuid4().hex}": current_time})
        self.redis_client.expire(key, self.window_seconds)
        return True

    def get_remaining_requests(self, client_ip):
        """Get remaining requests for client"""
        key = f"rate_limit:{client_ip}"
        current_requests = self.redis_client.zcard(key)
        return max(0, self.max_requests - current_requests)

# Apply rate limiting
redis_client = redis.from_url("redis://localhost:6379")
app.add_middleware(RateLimitMiddleware, redis_client=redis_client)
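Note that the check-then-add sequence above is not atomic: two concurrent requests can both pass the zcard check before either zadd runs, so a busy client may briefly exceed the limit. Wrapping the window logic in a Redis pipeline or a server-side Lua script closes that gap.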
Security Considerations
Input Validation and Sanitization
Security Validation:
import re
import html
from typing import Any, Dict

class SecurityValidator:
    """Security validation for CSV data"""

    def __init__(self):
        self.malicious_patterns = [
            r'<script.*?>.*?</script>',  # Script tags
            r'javascript:',              # JavaScript URLs
            r'data:text/html',           # Data URLs
            r'vbscript:',                # VBScript URLs
            r'on\w+\s*=',                # Event handlers
        ]

    def validate_input(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate and sanitize input data"""
        sanitized_data = {}
        for key, value in data.items():
            if isinstance(value, str):
                # Reject values matching known malicious patterns
                if self.is_malicious(value):
                    raise SecurityError(f"Malicious content detected in field '{key}'")
                # Escape HTML special characters
                sanitized_data[key] = html.escape(value)
            else:
                sanitized_data[key] = value
        return sanitized_data

    def is_malicious(self, content: str) -> bool:
        """Check if content contains malicious patterns"""
        for pattern in self.malicious_patterns:
            if re.search(pattern, content, re.IGNORECASE):
                return True
        return False

class SecurityError(Exception):
    """Security validation error"""
    pass

@app.post("/api/v1/csv/files/{file_id}/validate-security")
async def validate_security(file_id: str):
    """Validate CSV file for security issues"""
    try:
        # Load file data (load_csv_data as sketched in the caching section)
        data = await load_csv_data(file_id)

        # Validate each row
        validator = SecurityValidator()
        security_issues = []
        for i, row in enumerate(data):
            try:
                validator.validate_input(row)
            except SecurityError as e:
                security_issues.append({
                    "row": i + 1,
                    "error": str(e)
                })

        return {
            "success": True,
            "data": {
                "file_id": file_id,
                "security_issues": security_issues,
                "is_secure": len(security_issues) == 0
            }
        }
    except Exception as e:
        raise ProcessingError(
            message="Security validation failed",
            error_code="SECURITY_VALIDATION_ERROR",
            details={"error": str(e)}
        )
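Beyond script injection, CSV output that ends up in spreadsheet applications is exposed to formula injection: cells beginning with =, +, -, or @ can execute as formulas when opened in Excel or Google Sheets. A common mitigation, sketched here, is to prefix risky cells with a single quote on export:

def escape_formula_cell(value: str) -> str:
    """Neutralize spreadsheet formula injection by prefixing risky cells."""
    if value and value[0] in ("=", "+", "-", "@"):
        return "'" + value
    return value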
Authentication and Authorization
JWT Authentication:
from datetime import datetime, timedelta
from typing import Any, Dict, List

from fastapi import Depends, HTTPException, UploadFile, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt  # PyJWT

security = HTTPBearer()

class AuthManager:
    """Authentication and authorization manager"""

    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.algorithm = "HS256"

    def create_token(self, user_id: str, permissions: List[str] = None) -> str:
        """Create JWT token"""
        payload = {
            "user_id": user_id,
            "permissions": permissions or [],
            "exp": datetime.utcnow() + timedelta(hours=24)
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def verify_token(self, token: str) -> Dict[str, Any]:
        """Verify JWT token"""
        try:
            return jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except jwt.ExpiredSignatureError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token has expired"
            )
        except jwt.InvalidTokenError:  # PyJWT's base class for token errors
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token"
            )

    def check_permission(self, user_permissions: List[str], required_permission: str) -> bool:
        """Check if user has required permission"""
        return required_permission in user_permissions

auth_manager = AuthManager("your-secret-key")  # load the secret from the environment in production

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Get current authenticated user"""
    return auth_manager.verify_token(credentials.credentials)

def require_permission(permission: str):
    """Dependency factory for permission-based access control"""
    async def checker(user: dict = Depends(get_current_user)):
        if not auth_manager.check_permission(user.get("permissions", []), permission):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient permissions"
            )
        return user
    return checker

# Protected endpoints
@app.post("/api/v1/csv/files")
async def create_csv_file(file: UploadFile, current_user: dict = Depends(require_permission("csv:create"))):
    """Create CSV file (requires csv:create permission)"""
    # Implementation
    pass

@app.delete("/api/v1/csv/files/{file_id}")
async def delete_csv_file(file_id: str, current_user: dict = Depends(require_permission("csv:delete"))):
    """Delete CSV file (requires csv:delete permission)"""
    # Implementation
    pass
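A quick usage sketch (the user id and permission names are illustrative):

token = auth_manager.create_token("user-123", permissions=["csv:create"])
# Clients then send the token on every request:
#   Authorization: Bearer <token>
payload = auth_manager.verify_token(token)
assert payload["user_id"] == "user-123"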
Monitoring and Logging
Comprehensive Logging
Structured Logging:
import json
import logging
import time
from datetime import datetime
from typing import Any, Dict

class CSVAPILogger:
    """Structured logger for CSV API"""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # Create formatter
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        # Create handler
        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def log_request(self, request_id: str, method: str, url: str, user_id: str = None):
        """Log API request"""
        self.logger.info(json.dumps({
            "event": "api_request",
            "request_id": request_id,
            "method": method,
            "url": url,
            "user_id": user_id,
            "timestamp": datetime.utcnow().isoformat()
        }))

    def log_response(self, request_id: str, status_code: int, response_time: float):
        """Log API response"""
        self.logger.info(json.dumps({
            "event": "api_response",
            "request_id": request_id,
            "status_code": status_code,
            "response_time": response_time,
            "timestamp": datetime.utcnow().isoformat()
        }))

    def log_error(self, request_id: str, error: str, details: Dict[str, Any] = None):
        """Log API error"""
        self.logger.error(json.dumps({
            "event": "api_error",
            "request_id": request_id,
            "error": error,
            "details": details or {},
            "timestamp": datetime.utcnow().isoformat()
        }))

    def log_processing(self, file_id: str, operation: str, status: str, details: Dict[str, Any] = None):
        """Log CSV processing operation"""
        self.logger.info(json.dumps({
            "event": "csv_processing",
            "file_id": file_id,
            "operation": operation,
            "status": status,
            "details": details or {},
            "timestamp": datetime.utcnow().isoformat()
        }))

# Usage
logger = CSVAPILogger("csv_api")

@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    """Logging middleware"""
    request_id = request.headers.get("X-Request-ID", "unknown")
    start_time = time.time()

    # Log request
    logger.log_request(
        request_id=request_id,
        method=request.method,
        url=str(request.url),
        user_id=request.headers.get("X-User-ID")
    )

    try:
        # Process request
        response = await call_next(request)

        # Log response
        logger.log_response(
            request_id=request_id,
            status_code=response.status_code,
            response_time=time.time() - start_time
        )
        return response
    except Exception as e:
        # Log error and re-raise so exception handlers still run
        logger.log_error(
            request_id=request_id,
            error=str(e),
            details={"url": str(request.url)}
        )
        raise
Health Monitoring
Health Check Endpoints:
@app.get("/api/v1/csv/health")
async def health_check():
"""API health check"""
health_status = {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"version": "1.0.0",
"services": {}
}
# Check database
try:
# Test database connection
db_status = "healthy"
except Exception as e:
db_status = f"unhealthy: {str(e)}"
health_status["services"]["database"] = db_status
# Check Redis
try:
# Test Redis connection
redis_status = "healthy"
except Exception as e:
redis_status = f"unhealthy: {str(e)}"
health_status["services"]["redis"] = redis_status
# Check file storage
try:
# Test file storage
storage_status = "healthy"
except Exception as e:
storage_status = f"unhealthy: {str(e)}"
health_status["services"]["storage"] = storage_status
# Overall status
if any("unhealthy" in status for status in health_status["services"].values()):
health_status["status"] = "unhealthy"
return health_status
@app.get("/api/v1/csv/metrics")
async def get_metrics():
"""Get API metrics"""
metrics = {
"timestamp": datetime.utcnow().isoformat(),
"requests": {
"total": 1000,
"successful": 950,
"failed": 50,
"rate_per_minute": 10.5
},
"files": {
"total": 100,
"processed": 95,
"failed": 5
},
"performance": {
"average_response_time": 0.5,
"p95_response_time": 1.2,
"p99_response_time": 2.0
}
}
return metrics
Conclusion
Building a robust CSV API requires careful consideration of performance, scalability, security, and user experience. By following the design patterns and best practices outlined in this guide, you can create production-ready CSV APIs that handle large datasets efficiently while maintaining data integrity and security.
Key Takeaways:
- REST Design: Use resource-based URLs and standard HTTP methods
- Performance: Implement streaming, chunking, and caching strategies
- Error Handling: Provide comprehensive error handling and validation
- Security: Validate inputs, implement authentication, and prevent attacks
- Monitoring: Use structured logging and health checks for observability
Next Steps:
- Choose Technology Stack: Select appropriate frameworks and libraries
- Implement Core Features: Start with basic CRUD operations
- Add Advanced Features: Implement streaming, caching, and security
- Test Thoroughly: Create comprehensive test suites
- Monitor and Optimize: Use metrics and logging to improve performance
For more CSV data processing tools and guides, explore our CSV Tools Hub or try our CSV Validator for instant data validation.