Building a CSV API: Design Patterns & Best Practices (2025 Developer Guide)

Jan 19, 2025
csv, api, rest, streaming

Building a robust CSV API requires careful consideration of performance, scalability, and user experience. Whether you're creating a simple CSV processing service or a complex data management platform, understanding the design patterns and best practices for CSV APIs is crucial for success.

This comprehensive guide covers everything you need to know about building CSV APIs, including REST design patterns, streaming and chunking strategies, error handling, performance optimization, and security considerations. For backend developers, API architects, and technical leads alike, it lays out the path to production-ready CSV APIs.

Why CSV APIs Matter

Common Use Cases

Data Processing Services:

  • CSV file validation and cleaning
  • Format conversion (CSV to JSON, XML, etc.)
  • Data transformation and enrichment
  • Bulk data import/export operations

Business Applications:

  • E-commerce product catalog management
  • Customer data synchronization
  • Financial transaction processing
  • HR and payroll data management

Analytics and Reporting:

  • Data aggregation and analysis
  • Report generation and export
  • Real-time data streaming
  • Historical data archiving

Integration Platforms:

  • Third-party system integration
  • Data pipeline orchestration
  • Event-driven data processing
  • Microservices communication

Benefits of CSV APIs

Scalability:

  • Handle large datasets efficiently
  • Support concurrent users
  • Scale horizontally as needed
  • Optimize resource usage

Flexibility:

  • Support multiple data formats
  • Customizable processing options
  • Extensible architecture
  • Easy integration

Reliability:

  • Robust error handling
  • Data validation and integrity
  • Transaction support
  • Backup and recovery

Performance:

  • Fast processing times
  • Memory-efficient operations
  • Streaming capabilities
  • Caching strategies

REST API Design Patterns

Resource-Based Design

CSV Resource Hierarchy:

/api/v1/csv/
├── files/                    # CSV file management
│   ├── {file_id}            # Individual file operations
│   ├── {file_id}/data       # File data access
│   ├── {file_id}/metadata   # File metadata
│   └── {file_id}/schema     # File schema
├── jobs/                     # Processing jobs
│   ├── {job_id}             # Job status and results
│   └── {job_id}/logs        # Job execution logs
├── templates/                # CSV templates
│   └── {template_id}        # Template operations
└── health                    # API health check

RESTful Endpoints:

# File Management
GET    /api/v1/csv/files                    # List files
POST   /api/v1/csv/files                    # Upload file
GET    /api/v1/csv/files/{file_id}          # Get file info
PUT    /api/v1/csv/files/{file_id}          # Update file
DELETE /api/v1/csv/files/{file_id}          # Delete file

# Data Access
GET    /api/v1/csv/files/{file_id}/data     # Get file data
POST   /api/v1/csv/files/{file_id}/data     # Append data
PUT    /api/v1/csv/files/{file_id}/data     # Replace data
DELETE /api/v1/csv/files/{file_id}/data     # Clear data

# Processing Operations
POST   /api/v1/csv/files/{file_id}/validate # Validate file
POST   /api/v1/csv/files/{file_id}/convert  # Convert format
POST   /api/v1/csv/files/{file_id}/filter   # Filter data
POST   /api/v1/csv/files/{file_id}/sort     # Sort data

# Job Management
GET    /api/v1/csv/jobs                     # List jobs
POST   /api/v1/csv/jobs                     # Create job
GET    /api/v1/csv/jobs/{job_id}            # Get job status
DELETE /api/v1/csv/jobs/{job_id}            # Cancel job

Request/Response Patterns

Standard Request Format:

# File Upload Request (fields shown as JSON for readability; in practice they
# are sent as multipart form fields)
POST /api/v1/csv/files
Content-Type: multipart/form-data

{
    "file": <binary_data>,
    "metadata": {
        "name": "sales_data.csv",
        "description": "Monthly sales data",
        "tags": ["sales", "monthly", "2025"]
    },
    "options": {
        "delimiter": ",",
        "encoding": "utf-8",
        "has_header": true
    }
}

# Processing Request
POST /api/v1/csv/files/{file_id}/convert
Content-Type: application/json

{
    "target_format": "json",
    "options": {
        "pretty_print": true,
        "include_metadata": true
    }
}

Standard Response Format:

# Success Response
{
    "success": true,
    "data": {
        "file_id": "uuid-1234",
        "name": "sales_data.csv",
        "size": 1024000,
        "rows": 1000,
        "columns": 5,
        "created_at": "2025-01-19T10:00:00Z",
        "status": "processed"
    },
    "metadata": {
        "processing_time": "2.5s",
        "api_version": "v1"
    }
}

# Error Response
{
    "success": false,
    "error": {
        "code": "VALIDATION_ERROR",
        "message": "Invalid CSV format",
        "details": {
            "line": 15,
            "column": 3,
            "issue": "Missing closing quote"
        }
    },
    "metadata": {
        "request_id": "req-1234",
        "timestamp": "2025-01-19T10:00:00Z"
    }
}
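
To make the response contract concrete, here is a sketch of the upload endpoint producing this envelope. It assumes a local files/ directory and uses uuid for IDs; the row/column counting is a naive single pass that real parsing (covered later) would replace:

from fastapi import FastAPI, UploadFile
import csv
import io
import uuid
from datetime import datetime

app = FastAPI()

@app.post("/api/v1/csv/files")
async def upload_csv_file(file: UploadFile):
    """Upload a CSV file and return the standard response envelope"""
    content = await file.read()
    file_id = str(uuid.uuid4())

    # Naive shape detection; validation is covered in a later section
    rows = list(csv.reader(io.StringIO(content.decode("utf-8"))))

    with open(f"files/{file_id}.csv", "wb") as out:
        out.write(content)

    return {
        "success": True,
        "data": {
            "file_id": file_id,
            "name": file.filename,
            "size": len(content),
            "rows": max(len(rows) - 1, 0),  # data rows, excluding the header
            "columns": len(rows[0]) if rows else 0,
            "created_at": datetime.utcnow().isoformat(),
            "status": "uploaded"
        },
        "metadata": {"api_version": "v1"}
    }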

Streaming and Chunking Strategies

Streaming Architecture

Streaming CSV Processing:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import csv
import io
import json
import asyncio

app = FastAPI()

@app.post("/api/v1/csv/files/{file_id}/stream")
async def stream_csv_data(file_id: str, chunk_size: int = 1000):
    """Stream CSV data in chunks"""
    
    async def generate_chunks():
        # Open file stream
        with open(f"files/{file_id}.csv", "r") as file:
            reader = csv.DictReader(file)
            
            # Process in chunks
            chunk = []
            for row in reader:
                chunk.append(row)
                
                if len(chunk) >= chunk_size:
                    yield f"data: {json.dumps(chunk)}\n\n"
                    chunk = []
                    await asyncio.sleep(0.001)  # Yield control
            
            # Yield remaining data
            if chunk:
                yield f"data: {json.dumps(chunk)}\n\n"
    
    return StreamingResponse(
        generate_chunks(),
        media_type="text/event-stream",  # chunks are emitted in SSE "data:" format
        headers={"Cache-Control": "no-cache"}
    )
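
For completeness, a sketch of a client consuming the stream above with httpx (an assumption; any HTTP client that can iterate response lines works). Each SSE-style "data:" line carries one JSON-encoded chunk of rows:

import httpx
import json

def consume_csv_stream(file_id: str, base_url: str = "http://localhost:8000"):
    """Read chunks off the streaming endpoint as they arrive"""
    url = f"{base_url}/api/v1/csv/files/{file_id}/stream"

    with httpx.stream("POST", url, timeout=None) as response:
        for line in response.iter_lines():
            if line.startswith("data: "):
                chunk = json.loads(line[len("data: "):])
                print(f"Received chunk of {len(chunk)} rows")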

Chunked Processing:

@app.post("/api/v1/csv/files/{file_id}/process")
async def process_csv_chunks(file_id: str, chunk_size: int = 1000):
    """Process CSV file in chunks"""
    
    results = []
    chunk_count = 0
    
    with open(f"files/{file_id}.csv", "r") as file:
        reader = csv.DictReader(file)
        
        chunk = []
        for row in reader:
            chunk.append(row)
            
            if len(chunk) >= chunk_size:
                # Process chunk
                processed_chunk = await process_chunk(chunk)
                results.extend(processed_chunk)
                
                chunk_count += 1
                chunk = []
                
                # Log progress
                print(f"Processed chunk {chunk_count}")
        
        # Process remaining data
        if chunk:
            processed_chunk = await process_chunk(chunk)
            results.extend(processed_chunk)
    
    return {
        "success": True,
        "data": {
            "total_chunks": chunk_count + 1,
            "total_rows": len(results),
            "results": results
        }
    }

async def process_chunk(chunk):
    """Process a single chunk of data"""
    # Simulate processing
    await asyncio.sleep(0.1)
    
    # Apply transformations
    processed = []
    for row in chunk:
        # Example: Clean data
        cleaned_row = {
            key: value.strip() if isinstance(value, str) else value
            for key, value in row.items()
        }
        processed.append(cleaned_row)
    
    return processed

Memory Management

Memory-Efficient Processing:

import gc
import psutil
import os

class MemoryManager:
    """Memory management for CSV processing"""
    
    def __init__(self, max_memory_mb=500):
        self.max_memory_mb = max_memory_mb
        self.process = psutil.Process(os.getpid())
    
    def check_memory_usage(self):
        """Check current memory usage"""
        memory_info = self.process.memory_info()
        memory_mb = memory_info.rss / 1024 / 1024
        return memory_mb
    
    def should_garbage_collect(self):
        """Check if garbage collection is needed"""
        return self.check_memory_usage() > self.max_memory_mb
    
    def cleanup_memory(self):
        """Force garbage collection"""
        gc.collect()
        print(f"Memory usage after cleanup: {self.check_memory_usage():.2f} MB")

@app.post("/api/v1/csv/files/{file_id}/process-memory-safe")
async def process_csv_memory_safe(file_id: str):
    """Process CSV with memory management"""
    
    memory_manager = MemoryManager()
    results = []
    
    with open(f"files/{file_id}.csv", "r") as file:
        reader = csv.DictReader(file)
        
        for row in reader:
            # Process row (process_row is an app-specific async transform, assumed defined elsewhere)
            processed_row = await process_row(row)
            results.append(processed_row)
            
            # Check memory usage
            if memory_manager.should_garbage_collect():
                memory_manager.cleanup_memory()
    
    return {"success": True, "data": results}
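
Note that the endpoint above still accumulates every processed row in results, so peak memory grows with file size no matter how often the garbage collector runs. A sketch of a variant that streams processed rows out as NDJSON instead of buffering them (reusing the StreamingResponse import from the streaming section and the hypothetical process_row helper):

@app.post("/api/v1/csv/files/{file_id}/process-streaming")
async def process_csv_streaming(file_id: str):
    """Process rows one at a time and stream them out, keeping memory flat"""

    async def generate_rows():
        with open(f"files/{file_id}.csv", "r") as file:
            reader = csv.DictReader(file)
            for row in reader:
                processed_row = await process_row(row)  # hypothetical transform
                yield json.dumps(processed_row) + "\n"

    return StreamingResponse(generate_rows(), media_type="application/x-ndjson")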

Error Handling and Validation

Comprehensive Error Handling

Error Handling Middleware:

from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse
from datetime import datetime
import logging
import traceback

logger = logging.getLogger(__name__)

class CSVAPIError(Exception):
    """Base exception for CSV API errors"""
    def __init__(self, message: str, error_code: str, details: dict = None):
        self.message = message
        self.error_code = error_code
        self.details = details or {}
        super().__init__(self.message)

class ValidationError(CSVAPIError):
    """CSV validation error"""
    pass

class ProcessingError(CSVAPIError):
    """CSV processing error"""
    pass

class CSVFileNotFoundError(CSVAPIError):
    """File not found error (named to avoid shadowing the built-in FileNotFoundError)"""
    pass

@app.exception_handler(CSVAPIError)
async def csv_api_error_handler(request: Request, exc: CSVAPIError):
    """Handle CSV API errors"""
    # Map not-found errors to 404; everything else is a 400
    status_code = 404 if isinstance(exc, CSVFileNotFoundError) else 400
    return JSONResponse(
        status_code=status_code,
        content={
            "success": False,
            "error": {
                "code": exc.error_code,
                "message": exc.message,
                "details": exc.details
            },
            "metadata": {
                "request_id": request.headers.get("X-Request-ID"),
                "timestamp": datetime.utcnow().isoformat()
            }
        }
    )

@app.exception_handler(Exception)
async def general_error_handler(request: Request, exc: Exception):
    """Handle general errors"""
    logger.error(f"Unexpected error: {str(exc)}")
    logger.error(traceback.format_exc())
    
    return JSONResponse(
        status_code=500,
        content={
            "success": False,
            "error": {
                "code": "INTERNAL_ERROR",
                "message": "An unexpected error occurred",
                "details": {}
            },
            "metadata": {
                "request_id": request.headers.get("X-Request-ID"),
                "timestamp": datetime.utcnow().isoformat()
            }
        }
    )

CSV Validation:

import csv
import io

class CSVValidator:
    """CSV validation utility"""
    
    def __init__(self):
        self.errors = []
        self.warnings = []
    
    def validate_file(self, file_content: bytes, options: dict = None):
        """Validate CSV file content"""
        self.errors = []
        self.warnings = []
        
        try:
            # Decode content
            content = file_content.decode('utf-8')
        except UnicodeDecodeError:
            self.errors.append({
                "type": "ENCODING_ERROR",
                "message": "Invalid UTF-8 encoding",
                "line": 0
            })
            return False
        
        # Parse CSV
        try:
            reader = csv.reader(io.StringIO(content))
            rows = list(reader)
        except csv.Error as e:
            self.errors.append({
                "type": "PARSING_ERROR",
                "message": str(e),
                "line": 0
            })
            return False
        
        # Validate structure
        if not rows:
            self.errors.append({
                "type": "EMPTY_FILE",
                "message": "File is empty",
                "line": 0
            })
            return False
        
        # Check header
        header = rows[0]
        if not header or all(not cell.strip() for cell in header):
            self.errors.append({
                "type": "MISSING_HEADER",
                "message": "File has no header row",
                "line": 1
            })
            return False
        
        # Validate data rows
        for i, row in enumerate(rows[1:], start=2):
            if len(row) != len(header):
                self.errors.append({
                    "type": "FIELD_COUNT_MISMATCH",
                    "message": f"Row has {len(row)} fields, expected {len(header)}",
                    "line": i
                })
            
            # Check for empty rows
            if all(not cell.strip() for cell in row):
                self.warnings.append({
                    "type": "EMPTY_ROW",
                    "message": "Empty row detected",
                    "line": i
                })
        
        return len(self.errors) == 0
    
    def get_validation_report(self):
        """Get validation report"""
        return {
            "valid": len(self.errors) == 0,
            "errors": self.errors,
            "warnings": self.warnings,
            "error_count": len(self.errors),
            "warning_count": len(self.warnings)
        }

@app.post("/api/v1/csv/files/{file_id}/validate")
async def validate_csv_file(file_id: str):
    """Validate CSV file"""
    
    try:
        # Read file (in production, validate file_id to block path traversal
        # before building this path)
        with open(f"files/{file_id}.csv", "rb") as file:
            content = file.read()
        
        # Validate
        validator = CSVValidator()
        is_valid = validator.validate_file(content)
        
        return {
            "success": True,
            "data": {
                "file_id": file_id,
                "validation": validator.get_validation_report()
            }
        }
    
    except FileNotFoundError:
        # open() raises the built-in FileNotFoundError; translate it to an API error
        raise CSVFileNotFoundError(
            message="File not found",
            error_code="FILE_NOT_FOUND",
            details={"file_id": file_id}
        )
    except Exception as e:
        raise ProcessingError(
            message="Validation failed",
            error_code="VALIDATION_ERROR",
            details={"error": str(e)}
        )

Data Quality Validation

Advanced Data Validation:

class DataQualityValidator:
    """Advanced data quality validation"""
    
    def __init__(self):
        self.rules = []
        self.results = []
    
    def add_rule(self, rule):
        """Add validation rule"""
        self.rules.append(rule)
    
    def validate_data(self, data):
        """Validate data against rules"""
        self.results = []
        
        for rule in self.rules:
            result = rule.validate(data)
            self.results.append(result)
        
        return all(result.passed for result in self.results)
    
    def get_quality_report(self):
        """Get data quality report"""
        return {
            "total_rules": len(self.rules),
            "passed_rules": sum(1 for r in self.results if r.passed),
            "failed_rules": sum(1 for r in self.results if not r.passed),
            "results": [r.to_dict() for r in self.results]
        }

class ValidationRule:
    """Base validation rule"""
    
    def __init__(self, name, description):
        self.name = name
        self.description = description
    
    def validate(self, data):
        """Validate data - to be implemented by subclasses"""
        raise NotImplementedError

class RequiredFieldRule(ValidationRule):
    """Rule for required fields"""
    
    def __init__(self, field_name):
        super().__init__(
            f"required_{field_name}",
            f"Field '{field_name}' is required"
        )
        self.field_name = field_name
    
    def validate(self, data):
        """Check if required field is present and not empty"""
        if self.field_name not in data:
            return ValidationResult(False, f"Field '{self.field_name}' is missing")
        
        if not data[self.field_name] or not str(data[self.field_name]).strip():
            return ValidationResult(False, f"Field '{self.field_name}' is empty")
        
        return ValidationResult(True, f"Field '{self.field_name}' is valid")

class DataTypeRule(ValidationRule):
    """Rule for data type validation"""
    
    def __init__(self, field_name, expected_type):
        super().__init__(
            f"type_{field_name}",
            f"Field '{field_name}' must be {expected_type.__name__}"
        )
        self.field_name = field_name
        self.expected_type = expected_type
    
    def validate(self, data):
        """Check if field has correct data type"""
        if self.field_name not in data:
            return ValidationResult(False, f"Field '{self.field_name}' is missing")
        
        try:
            self.expected_type(data[self.field_name])
            return ValidationResult(True, f"Field '{self.field_name}' has correct type")
        except (ValueError, TypeError):
            return ValidationResult(False, f"Field '{self.field_name}' is not {self.expected_type.__name__}")

class ValidationResult:
    """Validation result"""
    
    def __init__(self, passed, message):
        self.passed = passed
        self.message = message
    
    def to_dict(self):
        return {
            "passed": self.passed,
            "message": self.message
        }
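
Wiring these pieces together, a short example of running rules against parsed rows; the field names (email, amount) are purely illustrative:

validator = DataQualityValidator()
validator.add_rule(RequiredFieldRule("email"))     # illustrative field
validator.add_rule(DataTypeRule("amount", float))  # illustrative field

rows = [
    {"email": "a@example.com", "amount": "19.99"},
    {"email": "", "amount": "not-a-number"},
]

for i, row in enumerate(rows, start=1):
    if not validator.validate_data(row):
        report = validator.get_quality_report()
        print(f"Row {i} failed {report['failed_rules']} rule(s)")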

Performance Optimization

Caching Strategies

Redis Caching:

import redis
import json
import hashlib

class CSVCache:
    """CSV data caching with Redis"""
    
    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 3600  # 1 hour
    
    def get_cache_key(self, file_id, operation, params=None):
        """Generate cache key (file_id stays in the key so invalidation can match it)"""
        key_data = {
            "file_id": file_id,
            "operation": operation,
            "params": params or {}
        }
        key_string = json.dumps(key_data, sort_keys=True)
        return f"csv:{file_id}:{hashlib.md5(key_string.encode()).hexdigest()}"

    def get(self, file_id, operation, params=None):
        """Get cached data"""
        key = self.get_cache_key(file_id, operation, params)
        data = self.redis_client.get(key)

        if data:
            return json.loads(data)
        return None

    def set(self, file_id, operation, data, params=None, ttl=None):
        """Set cached data (params must match the ones passed to get())"""
        key = self.get_cache_key(file_id, operation, params)
        ttl = ttl or self.default_ttl

        self.redis_client.setex(
            key,
            ttl,
            json.dumps(data)
        )

    def invalidate(self, file_id):
        """Invalidate all cache entries for file"""
        pattern = f"csv:{file_id}:*"
        keys = self.redis_client.keys(pattern)

        if keys:
            self.redis_client.delete(*keys)

# Usage in API
cache = CSVCache()

@app.get("/api/v1/csv/files/{file_id}/data")
async def get_csv_data(file_id: str, page: int = 1, size: int = 100):
    """Get CSV data with caching"""
    
    # Check cache
    params = {"page": page, "size": size}
    cached_data = cache.get(file_id, "data", params)

    if cached_data:
        return {"success": True, "data": cached_data, "cached": True}

    # Load data (load_csv_data is an app-specific loader, assumed defined elsewhere)
    data = await load_csv_data(file_id, page, size)

    # Cache result under the same params so subsequent reads hit
    cache.set(file_id, "data", data, params=params, ttl=1800)  # 30 minutes
    
    return {"success": True, "data": data, "cached": False}
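
When file data changes, cached pages must be dropped so readers never see stale rows. A minimal sketch of a write endpoint that does this, assuming a hypothetical save_csv_data writer:

@app.put("/api/v1/csv/files/{file_id}/data")
async def replace_csv_data(file_id: str, rows: list):
    """Replace file data and invalidate every cached page for the file"""
    await save_csv_data(file_id, rows)  # app-specific writer, assumed defined elsewhere

    # Drop all cache entries for this file (matches the csv:{file_id}:* key prefix)
    cache.invalidate(file_id)

    return {"success": True, "data": {"file_id": file_id, "rows": len(rows)}}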

Database Optimization

Efficient Data Storage:

from sqlalchemy import create_engine, Column, String, Integer, DateTime, Text
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime
import json

Base = declarative_base()

class CSVFile(Base):
    """CSV file metadata"""
    __tablename__ = "csv_files"
    
    id = Column(String, primary_key=True)
    name = Column(String, nullable=False)
    size = Column(Integer, nullable=False)
    rows = Column(Integer, nullable=False)
    columns = Column(Integer, nullable=False)
    created_at = Column(DateTime, nullable=False)
    updated_at = Column(DateTime, nullable=False)
    file_metadata = Column(Text)  # JSON metadata ("metadata" itself is reserved by SQLAlchemy)

class CSVData(Base):
    """CSV data storage"""
    __tablename__ = "csv_data"
    
    id = Column(Integer, primary_key=True)
    file_id = Column(String, nullable=False)
    row_number = Column(Integer, nullable=False)
    data = Column(Text, nullable=False)  # JSON row data

class CSVDataManager:
    """CSV data management with database"""
    
    def __init__(self, database_url):
        self.engine = create_engine(database_url)
        self.Session = sessionmaker(bind=self.engine)
        Base.metadata.create_all(self.engine)
    
    def store_csv_file(self, file_id, name, size, rows, columns, metadata=None):
        """Store CSV file metadata"""
        session = self.Session()
        
        try:
            csv_file = CSVFile(
                id=file_id,
                name=name,
                size=size,
                rows=rows,
                columns=columns,
                created_at=datetime.utcnow(),
                updated_at=datetime.utcnow(),
                file_metadata=json.dumps(metadata) if metadata else None
            )
            
            session.add(csv_file)
            session.commit()
            
        finally:
            session.close()
    
    def store_csv_data(self, file_id, data):
        """Store CSV data in database"""
        session = self.Session()
        
        try:
            for i, row in enumerate(data):
                csv_data = CSVData(
                    file_id=file_id,
                    row_number=i,
                    data=json.dumps(row)
                )
                session.add(csv_data)
            
            session.commit()
            
        finally:
            session.close()
    
    def get_csv_data(self, file_id, page=1, size=100):
        """Get CSV data with pagination"""
        session = self.Session()
        
        try:
            offset = (page - 1) * size
            
            query = session.query(CSVData).filter(
                CSVData.file_id == file_id
            ).offset(offset).limit(size)
            
            results = query.all()
            
            data = []
            for result in results:
                row_data = json.loads(result.data)
                data.append(row_data)
            
            return data
            
        finally:
            session.close()
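
Inserting one ORM object per row is slow for large files. A sketch of a bulk-loading method that could be added to CSVDataManager; bulk_insert_mappings skips per-object ORM overhead, and committing per batch keeps transactions and memory bounded:

    def store_csv_data_bulk(self, file_id, data, batch_size=5000):
        """Insert rows in batches rather than one ORM object at a time"""
        session = self.Session()

        try:
            for start in range(0, len(data), batch_size):
                batch = data[start:start + batch_size]
                session.bulk_insert_mappings(CSVData, [
                    {
                        "file_id": file_id,
                        "row_number": start + i,
                        "data": json.dumps(row),
                    }
                    for i, row in enumerate(batch)
                ])
                session.commit()  # commit per batch to bound transaction size

        finally:
            session.close()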

API Rate Limiting

Rate Limiting Implementation:

from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import time
import uuid
import redis

class RateLimitMiddleware(BaseHTTPMiddleware):
    """Rate limiting middleware"""
    
    def __init__(self, app, redis_client, max_requests=100, window_seconds=60):
        super().__init__(app)
        self.redis_client = redis_client
        self.max_requests = max_requests
        self.window_seconds = window_seconds
    
    async def dispatch(self, request: Request, call_next):
        """Check rate limit before processing request"""
        
        # Get client IP
        client_ip = request.client.host
        
        # Check rate limit (return a 429 response directly; an HTTPException raised
        # inside BaseHTTPMiddleware bypasses FastAPI's exception handlers)
        if not self.check_rate_limit(client_ip):
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"}
            )
        
        # Process request
        response = await call_next(request)
        
        # Add rate limit headers
        response.headers["X-RateLimit-Limit"] = str(self.max_requests)
        response.headers["X-RateLimit-Remaining"] = str(self.get_remaining_requests(client_ip))
        response.headers["X-RateLimit-Reset"] = str(int(time.time()) + self.window_seconds)
        
        return response
    
    def check_rate_limit(self, client_ip):
        """Check if client has exceeded rate limit"""
        key = f"rate_limit:{client_ip}"
        current_time = int(time.time())
        window_start = current_time - self.window_seconds
        
        # Remove old entries
        self.redis_client.zremrangebyscore(key, 0, window_start)
        
        # Count current requests
        current_requests = self.redis_client.zcard(key)
        
        if current_requests >= self.max_requests:
            return False
        
        # Add current request (unique member so same-second requests all count)
        member = f"{current_time}:{uuid.uuid4().hex}"
        self.redis_client.zadd(key, {member: current_time})
        self.redis_client.expire(key, self.window_seconds)
        
        return True
    
    def get_remaining_requests(self, client_ip):
        """Get remaining requests for client"""
        key = f"rate_limit:{client_ip}"
        current_requests = self.redis_client.zcard(key)
        return max(0, self.max_requests - current_requests)

# Apply rate limiting
redis_client = redis.from_url("redis://localhost:6379")
app.add_middleware(RateLimitMiddleware, redis_client=redis_client)
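
A quick sanity check that the limiter works: hammer an endpoint until the window is exhausted and expect a 429. A sketch using FastAPI's TestClient, assuming a reachable Redis and the health endpoint from the monitoring section below:

from fastapi.testclient import TestClient

client = TestClient(app)

def test_rate_limit_kicks_in():
    # A request inside the window succeeds and carries the rate limit headers
    response = client.get("/api/v1/csv/health")
    assert response.status_code == 200
    assert "X-RateLimit-Remaining" in response.headers

    # Exhausting the window (max_requests=100 by default) yields a 429
    for _ in range(100):
        client.get("/api/v1/csv/health")
    assert client.get("/api/v1/csv/health").status_code == 429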

Security Considerations

Input Validation and Sanitization

Security Validation:

import re
import html
from typing import List, Dict, Any

class SecurityValidator:
    """Security validation for CSV data"""
    
    def __init__(self):
        self.malicious_patterns = [
            r'<script.*?>.*?</script>',  # Script tags
            r'javascript:',              # JavaScript URLs
            r'data:text/html',           # Data URLs
            r'vbscript:',                # VBScript URLs
            r'on\w+\s*=',                # Event handlers
        ]
    
    def validate_input(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate and sanitize input data"""
        sanitized_data = {}
        
        for key, value in data.items():
            if isinstance(value, str):
                # Check for malicious patterns
                if self.is_malicious(value):
                    raise SecurityError(f"Malicious content detected in field '{key}'")
                
                # Sanitize HTML
                sanitized_value = html.escape(value)
                sanitized_data[key] = sanitized_value
            else:
                sanitized_data[key] = value
        
        return sanitized_data
    
    def is_malicious(self, content: str) -> bool:
        """Check if content contains malicious patterns"""
        content_lower = content.lower()
        
        for pattern in self.malicious_patterns:
            if re.search(pattern, content_lower, re.IGNORECASE):
                return True
        
        return False

class SecurityError(Exception):
    """Security validation error"""
    pass

@app.post("/api/v1/csv/files/{file_id}/validate-security")
async def validate_security(file_id: str):
    """Validate CSV file for security issues"""
    
    try:
        # Load file data
        data = await load_csv_data(file_id)
        
        # Validate security
        validator = SecurityValidator()
        security_issues = []
        
        for i, row in enumerate(data):
            try:
                validator.validate_input(row)
            except SecurityError as e:
                security_issues.append({
                    "row": i + 1,
                    "error": str(e)
                })
        
        return {
            "success": True,
            "data": {
                "file_id": file_id,
                "security_issues": security_issues,
                "is_secure": len(security_issues) == 0
            }
        }
    
    except Exception as e:
        raise ProcessingError(
            message="Security validation failed",
            error_code="SECURITY_VALIDATION_ERROR",
            details={"error": str(e)}
        )

Authentication and Authorization

JWT Authentication:

from fastapi import Depends, HTTPException, UploadFile, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from datetime import datetime, timedelta
from typing import Any, Dict, List
import functools
import jwt  # PyJWT

security = HTTPBearer()

class AuthManager:
    """Authentication and authorization manager"""
    
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.algorithm = "HS256"
    
    def create_token(self, user_id: str, permissions: List[str] = None) -> str:
        """Create JWT token"""
        payload = {
            "user_id": user_id,
            "permissions": permissions or [],
            "exp": datetime.utcnow() + timedelta(hours=24)
        }
        
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
    
    def verify_token(self, token: str) -> Dict[str, Any]:
        """Verify JWT token"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            return payload
        except jwt.ExpiredSignatureError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token has expired"
            )
        except jwt.InvalidTokenError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token"
            )
    
    def check_permission(self, user_permissions: List[str], required_permission: str) -> bool:
        """Check if user has required permission"""
        return required_permission in user_permissions

auth_manager = AuthManager("your-secret-key")  # load the real key from configuration/env in production

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Get current authenticated user"""
    token = credentials.credentials
    payload = auth_manager.verify_token(token)
    return payload

def require_permission(permission: str):
    """Decorator for permission-based access control"""
    def decorator(func):
        @functools.wraps(func)  # preserve the signature so FastAPI's DI still works
        async def wrapper(*args, **kwargs):
            # The endpoint must declare current_user: dict = Depends(get_current_user);
            # the resolved user is then available here via kwargs
            user = kwargs.get("current_user") or {}

            if not auth_manager.check_permission(user.get("permissions", []), permission):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Insufficient permissions"
                )

            return await func(*args, **kwargs)
        return wrapper
    return decorator

# Protected endpoints
@app.post("/api/v1/csv/files")
@require_permission("csv:create")
async def create_csv_file(file: UploadFile, current_user: dict = Depends(get_current_user)):
    """Create CSV file (requires csv:create permission)"""
    # Implementation
    pass

@app.delete("/api/v1/csv/files/{file_id}")
@require_permission("csv:delete")
async def delete_csv_file(file_id: str, current_user: dict = Depends(get_current_user)):
    """Delete CSV file (requires csv:delete permission)"""
    # Implementation
    pass
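
Clients still need a way to obtain tokens in the first place. A minimal sketch of a login endpoint that issues one; verify_credentials is a hypothetical stand-in for your real user store:

from pydantic import BaseModel

class LoginRequest(BaseModel):
    username: str
    password: str

@app.post("/api/v1/auth/token")
async def issue_token(request: LoginRequest):
    """Exchange credentials for a JWT"""
    # verify_credentials is app-specific and assumed defined elsewhere
    user = await verify_credentials(request.username, request.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials"
        )

    token = auth_manager.create_token(
        user_id=user["id"],
        permissions=user.get("permissions", [])
    )
    return {"access_token": token, "token_type": "bearer"}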

Monitoring and Logging

Comprehensive Logging

Structured Logging:

import logging
import json
import time
from datetime import datetime
from typing import Dict, Any
from fastapi import Request

class CSVAPILogger:
    """Structured logger for CSV API"""
    
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # Create formatter
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        
        # Create handler
        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
    
    def log_request(self, request_id: str, method: str, url: str, user_id: str = None):
        """Log API request"""
        self.logger.info(json.dumps({
            "event": "api_request",
            "request_id": request_id,
            "method": method,
            "url": url,
            "user_id": user_id,
            "timestamp": datetime.utcnow().isoformat()
        }))
    
    def log_response(self, request_id: str, status_code: int, response_time: float):
        """Log API response"""
        self.logger.info(json.dumps({
            "event": "api_response",
            "request_id": request_id,
            "status_code": status_code,
            "response_time": response_time,
            "timestamp": datetime.utcnow().isoformat()
        }))
    
    def log_error(self, request_id: str, error: str, details: Dict[str, Any] = None):
        """Log API error"""
        self.logger.error(json.dumps({
            "event": "api_error",
            "request_id": request_id,
            "error": error,
            "details": details or {},
            "timestamp": datetime.utcnow().isoformat()
        }))
    
    def log_processing(self, file_id: str, operation: str, status: str, details: Dict[str, Any] = None):
        """Log CSV processing operation"""
        self.logger.info(json.dumps({
            "event": "csv_processing",
            "file_id": file_id,
            "operation": operation,
            "status": status,
            "details": details or {},
            "timestamp": datetime.utcnow().isoformat()
        }))

# Usage
logger = CSVAPILogger("csv_api")

@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    """Logging middleware"""
    request_id = request.headers.get("X-Request-ID", "unknown")
    start_time = time.time()
    
    # Log request
    logger.log_request(
        request_id=request_id,
        method=request.method,
        url=str(request.url),
        user_id=request.headers.get("X-User-ID")
    )
    
    try:
        # Process request
        response = await call_next(request)
        
        # Log response
        response_time = time.time() - start_time
        logger.log_response(
            request_id=request_id,
            status_code=response.status_code,
            response_time=response_time
        )
        
        return response
    
    except Exception as e:
        # Log error
        logger.log_error(
            request_id=request_id,
            error=str(e),
            details={"url": str(request.url)}
        )
        raise

Health Monitoring

Health Check Endpoints:

@app.get("/api/v1/csv/health")
async def health_check():
    """API health check"""
    
    health_status = {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "version": "1.0.0",
        "services": {}
    }
    
    # Check database
    try:
        # Placeholder: run a trivial query (e.g. SELECT 1) against your engine here
        db_status = "healthy"
    except Exception as e:
        db_status = f"unhealthy: {str(e)}"

    health_status["services"]["database"] = db_status

    # Check Redis
    try:
        redis_client.ping()  # redis_client from the rate limiting section
        redis_status = "healthy"
    except Exception as e:
        redis_status = f"unhealthy: {str(e)}"

    health_status["services"]["redis"] = redis_status

    # Check file storage
    try:
        # Placeholder: verify the files/ directory exists and is writable here
        storage_status = "healthy"
    except Exception as e:
        storage_status = f"unhealthy: {str(e)}"

    health_status["services"]["storage"] = storage_status
    
    # Overall status
    if any("unhealthy" in status for status in health_status["services"].values()):
        health_status["status"] = "unhealthy"
    
    return health_status

@app.get("/api/v1/csv/metrics")
async def get_metrics():
    """Get API metrics"""
    
    metrics = {
        "timestamp": datetime.utcnow().isoformat(),
        "requests": {
            "total": 1000,
            "successful": 950,
            "failed": 50,
            "rate_per_minute": 10.5
        },
        "files": {
            "total": 100,
            "processed": 95,
            "failed": 5
        },
        "performance": {
            "average_response_time": 0.5,
            "p95_response_time": 1.2,
            "p99_response_time": 2.0
        }
    }
    
    return metrics

Conclusion

Building a robust CSV API requires careful consideration of performance, scalability, security, and user experience. By following the design patterns and best practices outlined in this guide, you can create production-ready CSV APIs that handle large datasets efficiently while maintaining data integrity and security.

Key Takeaways:

  1. REST Design: Use resource-based URLs and standard HTTP methods
  2. Performance: Implement streaming, chunking, and caching strategies
  3. Error Handling: Provide comprehensive error handling and validation
  4. Security: Validate inputs, implement authentication, and prevent attacks
  5. Monitoring: Use structured logging and health checks for observability

Next Steps:

  1. Choose Technology Stack: Select appropriate frameworks and libraries
  2. Implement Core Features: Start with basic CRUD operations
  3. Add Advanced Features: Implement streaming, caching, and security
  4. Test Thoroughly: Create comprehensive test suites
  5. Monitor and Optimize: Use metrics and logging to improve performance

For more CSV data processing tools and guides, explore our CSV Tools Hub or try our CSV Validator for instant data validation.
