CSV Performance Optimization: Speed & Memory (2025 Developer Guide)

Jan 19, 2025
csv, performance, optimization, memory

CSV performance optimization is crucial for handling large datasets efficiently. Whether you're processing millions of rows, building real-time data pipelines, or trying to keep memory usage under control, the right techniques can significantly improve your application's speed and scalability.

This comprehensive guide covers everything you need to know about CSV performance optimization, including streaming vs loading strategies, memory management techniques, indexing approaches, compression methods, and advanced optimization patterns. Whether you're a data engineer, backend developer, or performance specialist, this guide will help you optimize your CSV processing workflows.

Why CSV Performance Matters

Common Performance Challenges

Memory Limitations:

  • Large files exceeding available RAM
  • Memory leaks during processing
  • Inefficient data structures
  • Garbage collection overhead

Processing Speed:

  • Slow file I/O operations
  • Inefficient parsing algorithms
  • CPU-intensive operations
  • Network latency for remote files

Scalability Issues:

  • Single-threaded processing
  • Blocking operations
  • Resource contention
  • Poor concurrency handling

Storage Optimization:

  • Large file sizes
  • Redundant data storage
  • Inefficient compression
  • Poor data organization

Performance Impact Factors

File Size:

  • Small files (< 1MB): Minimal impact
  • Medium files (1-100MB): Moderate impact
  • Large files (100MB-1GB): Significant impact
  • Very large files (> 1GB): Critical impact

Data Characteristics:

  • Number of columns
  • Data types and complexity
  • Encoding and character sets
  • Compression ratio

System Resources:

  • Available RAM
  • CPU cores and speed
  • Storage type (SSD vs HDD)
  • Network bandwidth
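
A quick way to act on these factors is to compare the file's size against the memory you can actually spare before deciding how to read it. The sketch below is a rough heuristic rather than a hard rule; the 25% threshold and the psutil dependency are illustrative assumptions:

import os
import psutil

def choose_strategy(file_path: str, max_load_fraction: float = 0.25) -> str:
    """Pick 'load' or 'stream' based on file size and available RAM (heuristic)."""
    file_size_mb = os.path.getsize(file_path) / 1024 / 1024
    available_mb = psutil.virtual_memory().available / 1024 / 1024
    
    # Loading into a DataFrame often needs several times the on-disk size,
    # so only load files that fit comfortably in a fraction of free RAM.
    if file_size_mb < available_mb * max_load_fraction:
        return 'load'
    return 'stream'

# Example: choose_strategy('large_file.csv') -> 'stream' for very large files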

Streaming vs Loading Strategies

Streaming Processing

Benefits of Streaming:

  • Constant memory usage
  • Handles files larger than RAM
  • Real-time processing
  • Better resource utilization

Streaming Implementation:

import csv
import io
from typing import Any, Dict, Iterator, List

class CSVStreamProcessor:
    """Streaming CSV processor for large files"""
    
    def __init__(self, file_path: str, chunk_size: int = 1000):
        self.file_path = file_path
        self.chunk_size = chunk_size
        self.processed_rows = 0
        self.memory_usage = 0
    
    def stream_processing(self) -> Iterator[Dict[str, Any]]:
        """Stream process CSV file"""
        with open(self.file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            
            chunk = []
            for row in reader:
                chunk.append(row)
                
                if len(chunk) >= self.chunk_size:
                    yield from self.process_chunk(chunk)
                    chunk = []
                    
                    # Memory monitoring
                    self.memory_usage = self.get_memory_usage()
                    print(f"Processed {self.processed_rows} rows, Memory: {self.memory_usage:.2f} MB")
            
            # Process remaining data
            if chunk:
                yield from self.process_chunk(chunk)
    
    def process_chunk(self, chunk: List[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
        """Process a chunk of data"""
        for row in chunk:
            # Apply transformations
            processed_row = self.transform_row(row)
            self.processed_rows += 1
            yield processed_row
    
    def transform_row(self, row: Dict[str, Any]) -> Dict[str, Any]:
        """Transform individual row"""
        # Example: Clean and validate data
        cleaned_row = {}
        for key, value in row.items():
            if isinstance(value, str):
                cleaned_row[key] = value.strip()
            else:
                cleaned_row[key] = value
        
        return cleaned_row
    
    def get_memory_usage(self) -> float:
        """Get current memory usage in MB"""
        import psutil
        import os
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024

# Usage
processor = CSVStreamProcessor('large_file.csv', chunk_size=5000)
for row in processor.stream_processing():
    # Process each row
    pass

Advanced Streaming with Generators:

import csv
import gzip
from typing import Any, Callable, Dict, Iterator, List, Optional

class AdvancedCSVStreamer:
    """Advanced CSV streaming with compression support"""
    
    def __init__(self, file_path: str, compression: Optional[str] = None):
        self.file_path = file_path
        self.compression = compression
        self.file_handle = None
        self.reader = None
    
    def __enter__(self):
        """Context manager entry"""
        if self.compression == 'gzip':
            self.file_handle = gzip.open(self.file_path, 'rt', encoding='utf-8')
        else:
            self.file_handle = open(self.file_path, 'r', encoding='utf-8')
        
        self.reader = csv.DictReader(self.file_handle)
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        if self.file_handle:
            self.file_handle.close()
    
    def stream_rows(self, filter_func: Optional[Callable] = None) -> Iterator[Dict[str, Any]]:
        """Stream rows with optional filtering"""
        for row in self.reader:
            if filter_func is None or filter_func(row):
                yield row
    
    def stream_chunks(self, chunk_size: int = 1000) -> Iterator[List[Dict[str, Any]]]:
        """Stream data in chunks"""
        chunk = []
        for row in self.stream_rows():
            chunk.append(row)
            
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        
        if chunk:
            yield chunk

# Usage with compression
with AdvancedCSVStreamer('data.csv.gz', compression='gzip') as streamer:
    for chunk in streamer.stream_chunks(chunk_size=1000):
        # Process chunk
        process_chunk(chunk)

Batch Loading

Benefits of Batch Loading:

  • Faster processing for small files
  • Better for complex operations
  • Easier to implement
  • Better for data analysis

Batch Loading Implementation:

import pandas as pd
import numpy as np
from typing import List, Dict, Any

class CSVBatchProcessor:
    """Batch CSV processor for smaller files"""
    
    def __init__(self, file_path: str, max_memory_mb: int = 500):
        self.file_path = file_path
        self.max_memory_mb = max_memory_mb
        self.df = None
    
    def load_data(self) -> pd.DataFrame:
        """Load CSV data into memory"""
        # Check file size
        file_size_mb = self.get_file_size_mb()
        
        if file_size_mb > self.max_memory_mb:
            raise MemoryError(f"File size ({file_size_mb:.2f} MB) exceeds memory limit ({self.max_memory_mb} MB)")
        
        # Load with optimized dtypes inferred from a sample of the file
        self.df = pd.read_csv(
            self.file_path,
            low_memory=False,
            dtype=self.optimize_dtypes()
        )
        
        return self.df
    
    def optimize_dtypes(self) -> Dict[str, str]:
        """Optimize data types for memory efficiency"""
        # Sample first few rows to infer types
        sample_df = pd.read_csv(self.file_path, nrows=1000)
        
        dtype_map = {}
        for column in sample_df.columns:
            if sample_df[column].dtype == 'object':
                # Check if the column is actually numeric
                try:
                    pd.to_numeric(sample_df[column], errors='raise')
                    dtype_map[column] = 'float32'
                except (ValueError, TypeError):
                    # Fall back to category for low-cardinality text, string otherwise
                    if sample_df[column].nunique() <= 2:
                        dtype_map[column] = 'category'
                    else:
                        dtype_map[column] = 'string'
            elif sample_df[column].dtype == 'int64':
                dtype_map[column] = 'int32'
            elif sample_df[column].dtype == 'float64':
                dtype_map[column] = 'float32'
        
        return dtype_map
    
    def get_file_size_mb(self) -> float:
        """Get file size in MB"""
        import os
        return os.path.getsize(self.file_path) / 1024 / 1024
    
    def process_data(self) -> pd.DataFrame:
        """Process loaded data"""
        if self.df is None:
            self.load_data()
        
        # Apply optimizations
        self.df = self.optimize_memory_usage(self.df)
        
        # Apply transformations
        self.df = self.apply_transformations(self.df)
        
        return self.df
    
    def optimize_memory_usage(self, df: pd.DataFrame) -> pd.DataFrame:
        """Optimize DataFrame memory usage"""
        # Convert object columns to category if appropriate
        for col in df.select_dtypes(include=['object']).columns:
            if df[col].nunique() / len(df) < 0.5:
                df[col] = df[col].astype('category')
        
        # Convert numeric columns to appropriate types
        for col in df.select_dtypes(include=['int64']).columns:
            if df[col].min() >= 0:
                if df[col].max() < 255:
                    df[col] = df[col].astype('uint8')
                elif df[col].max() < 65535:
                    df[col] = df[col].astype('uint16')
                elif df[col].max() < 4294967295:
                    df[col] = df[col].astype('uint32')
            else:
                if df[col].min() > -128 and df[col].max() < 127:
                    df[col] = df[col].astype('int8')
                elif df[col].min() > -32768 and df[col].max() < 32767:
                    df[col] = df[col].astype('int16')
                elif df[col].min() > -2147483648 and df[col].max() < 2147483647:
                    df[col] = df[col].astype('int32')
        
        return df
    
    def apply_transformations(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply data transformations"""
        # Example transformations
        for col in df.select_dtypes(include=['object']).columns:
            df[col] = df[col].str.strip()
        
        return df
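
For files that would blow past the batch processor's memory limit, pandas itself can iterate the file in chunks instead of loading it whole. A minimal sketch, assuming a per-chunk transformation is enough for your workload (the chunk size is illustrative):

import pandas as pd

def process_in_chunks(file_path: str, chunk_size: int = 100_000) -> int:
    """Stream a CSV through pandas in fixed-size chunks and count the rows."""
    total_rows = 0
    
    # chunksize makes read_csv return an iterator of DataFrames
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        # Apply per-chunk transformations here instead of holding the whole file
        total_rows += len(chunk)
    
    return total_rows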

Memory Management Techniques

Memory-Efficient Data Structures

Optimized Data Types:

import pandas as pd
import numpy as np
from typing import Dict, Any

class MemoryOptimizer:
    """Memory optimization utilities for CSV processing"""
    
    @staticmethod
    def optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
        """Optimize DataFrame memory usage"""
        original_memory = df.memory_usage(deep=True).sum() / 1024 / 1024
        
        # Optimize numeric columns
        for col in df.select_dtypes(include=[np.number]).columns:
            df[col] = MemoryOptimizer.optimize_numeric_column(df[col])
        
        # Optimize object columns
        for col in df.select_dtypes(include=['object']).columns:
            df[col] = MemoryOptimizer.optimize_object_column(df[col])
        
        # Optimize datetime columns
        for col in df.select_dtypes(include=['datetime64']).columns:
            df[col] = MemoryOptimizer.optimize_datetime_column(df[col])
        
        optimized_memory = df.memory_usage(deep=True).sum() / 1024 / 1024
        reduction = (original_memory - optimized_memory) / original_memory * 100
        
        print(f"Memory optimization: {original_memory:.2f} MB -> {optimized_memory:.2f} MB ({reduction:.1f}% reduction)")
        
        return df
    
    @staticmethod
    def optimize_numeric_column(series: pd.Series) -> pd.Series:
        """Optimize numeric column memory usage"""
        if series.dtype == 'int64':
            if series.min() >= 0:
                if series.max() < 255:
                    return series.astype('uint8')
                elif series.max() < 65535:
                    return series.astype('uint16')
                elif series.max() < 4294967295:
                    return series.astype('uint32')
            else:
                if series.min() > -128 and series.max() < 127:
                    return series.astype('int8')
                elif series.min() > -32768 and series.max() < 32767:
                    return series.astype('int16')
                elif series.min() > -2147483648 and series.max() < 2147483647:
                    return series.astype('int32')
        elif series.dtype == 'float64':
            return series.astype('float32')
        
        return series
    
    @staticmethod
    def optimize_object_column(series: pd.Series) -> pd.Series:
        """Optimize object column memory usage"""
        # Check if the column is actually numeric
        try:
            return pd.to_numeric(series, errors='raise').astype('float32')
        except (ValueError, TypeError):
            pass
        
        # Check if it's boolean-like
        if series.nunique() <= 2:
            return series.astype('category')
        
        # Check if it's categorical
        if series.nunique() / len(series) < 0.5:
            return series.astype('category')
        
        return series.astype('string')
    
    @staticmethod
    def optimize_datetime_column(series: pd.Series) -> pd.Series:
        """Optimize datetime column memory usage"""
        # Convert to datetime64[ns] if not already
        if series.dtype != 'datetime64[ns]':
            series = pd.to_datetime(series)
        
        return series
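
A typical usage pattern, assuming the data has already been loaded with pd.read_csv (the file name is a placeholder):

# Usage
df = pd.read_csv('large_file.csv')
df = MemoryOptimizer.optimize_dataframe(df)
print(df.dtypes)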

Garbage Collection Optimization

Memory Cleanup Strategies:

import gc
import psutil
import os
from typing import List, Any

class MemoryManager:
    """Memory management for CSV processing"""
    
    def __init__(self, max_memory_mb: int = 1000, cleanup_threshold: float = 0.8):
        self.max_memory_mb = max_memory_mb
        self.cleanup_threshold = cleanup_threshold
        self.process = psutil.Process(os.getpid())
        self.memory_history = []
    
    def check_memory_usage(self) -> float:
        """Check current memory usage in MB"""
        memory_info = self.process.memory_info()
        memory_mb = memory_info.rss / 1024 / 1024
        self.memory_history.append(memory_mb)
        return memory_mb
    
    def should_cleanup(self) -> bool:
        """Check if memory cleanup is needed"""
        current_memory = self.check_memory_usage()
        return current_memory > self.max_memory_mb * self.cleanup_threshold
    
    def cleanup_memory(self) -> float:
        """Force garbage collection and return memory freed"""
        before_memory = self.check_memory_usage()
        
        # Force garbage collection
        gc.collect()
        
        after_memory = self.check_memory_usage()
        memory_freed = before_memory - after_memory
        
        print(f"Memory cleanup: {before_memory:.2f} MB -> {after_memory:.2f} MB (freed {memory_freed:.2f} MB)")
        
        return memory_freed
    
    def monitor_memory(self, operation_name: str):
        """Return a callback that triggers cleanup when usage crosses the threshold"""
        print(f"Starting {operation_name} at {self.check_memory_usage():.2f} MB")
        
        def cleanup_if_needed():
            if self.should_cleanup():
                self.cleanup_memory()
        
        return cleanup_if_needed

# Usage
memory_manager = MemoryManager(max_memory_mb=500)

import csv

def process_large_csv(file_path: str):
    """Process large CSV with memory management"""
    cleanup = memory_manager.monitor_memory("csv_processing")
    
    with open(file_path, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        
        for i, row in enumerate(reader):
            # Process row (process_row is your application-specific handler)
            process_row(row)
            
            # Cleanup if needed
            if i % 1000 == 0:
                cleanup()

Indexing and Query Optimization

CSV Indexing Strategies

Row-Based Indexing:

import csv
import json
import os
from typing import Dict, List, Any, Optional

class CSVIndexer:
    """CSV indexing for fast row access"""
    
    def __init__(self, file_path: str):
        self.file_path = file_path
        self.index_file = f"{file_path}.index"
        self.index = {}
        self.header = []
    
    def build_index(self) -> Dict[int, Dict[str, int]]:
        """Build a byte-offset index for the CSV file"""
        print("Building CSV index...")
        
        # Open in binary mode so tell()/seek() return reliable byte offsets
        # (assumes rows do not contain embedded newlines inside quoted fields)
        with open(self.file_path, 'rb') as file:
            header_line = file.readline().decode('utf-8')
            self.header = next(csv.reader([header_line]))
            
            row_number = 0
            row_offset = file.tell()
            line = file.readline()
            
            while line:
                self.index[row_number] = {
                    'offset': row_offset,
                    'length': len(line)
                }
                row_number += 1
                row_offset = file.tell()
                line = file.readline()
        
        # Save index to file
        self.save_index()
        
        print(f"Index built for {row_number} rows")
        return self.index
    
    def load_index(self) -> Dict[int, Dict[str, int]]:
        """Load index from file"""
        if os.path.exists(self.index_file):
            with open(self.index_file, 'r') as file:
                # JSON keys are strings, so convert them back to int row numbers
                self.index = {int(k): v for k, v in json.load(file).items()}
            return self.index
        return {}
    
    def save_index(self):
        """Save index to file"""
        with open(self.index_file, 'w') as file:
            json.dump(self.index, file)
    
    def get_row(self, row_number: int) -> Optional[Dict[str, Any]]:
        """Get specific row by number"""
        if row_number not in self.index:
            return None
        
        row_info = self.index[row_number]
        
        with open(self.file_path, 'rb') as file:
            file.seek(row_info['offset'])
            row_data = file.read(row_info['length']).decode('utf-8')
            
            # Parse CSV row
            row = next(csv.reader([row_data]))
            return dict(zip(self.header, row))
    
    def get_rows(self, start: int, end: int) -> List[Dict[str, Any]]:
        """Get range of rows"""
        rows = []
        for i in range(start, min(end, len(self.index))):
            row = self.get_row(i)
            if row:
                rows.append(row)
        return rows

# Usage
indexer = CSVIndexer('large_file.csv')
indexer.build_index()

# Fast row access
row_1000 = indexer.get_row(1000)
rows_1000_1100 = indexer.get_rows(1000, 1100)

Column-Based Indexing:

class ColumnIndexer:
    """Column-based indexing for fast column access"""
    
    def __init__(self, file_path: str):
        self.file_path = file_path
        self.column_index = {}
        self.header = []
    
    def build_column_index(self) -> Dict[str, List[int]]:
        """Build column index for fast column access"""
        print("Building column index...")
        
        with open(self.file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            
            # Read header
            self.header = reader.fieldnames
            
            # Initialize column index
            for col in self.header:
                self.column_index[col] = []
            
            # Build index
            row_number = 0
            for row in reader:
                for col, value in row.items():
                    if value:  # Only index non-empty values
                        self.column_index[col].append(row_number)
                row_number += 1
        
        print(f"Column index built for {len(self.header)} columns")
        return self.column_index
    
    def get_column_values(self, column_name: str) -> List[Any]:
        """Get all values for a specific column"""
        if column_name not in self.column_index:
            return []
        
        # Use a set for O(1) membership checks while scanning the file
        row_numbers = set(self.column_index[column_name])
        values = []
        
        with open(self.file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            
            for i, row in enumerate(reader):
                if i in row_numbers:
                    values.append(row[column_name])
        
        return values
    
    def filter_by_column(self, column_name: str, value: Any) -> List[Dict[str, Any]]:
        """Filter rows by column value"""
        if column_name not in self.column_index:
            return []
        
        row_numbers = set(self.column_index[column_name])
        filtered_rows = []
        
        with open(self.file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            
            for i, row in enumerate(reader):
                if i in row_numbers and row[column_name] == value:
                    filtered_rows.append(row)
        
        return filtered_rows
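
Usage mirrors the row indexer; the column name and filter value below are placeholders:

# Usage
column_indexer = ColumnIndexer('large_file.csv')
column_indexer.build_column_index()

# All values of one column, and all rows matching a value
statuses = column_indexer.get_column_values('status')
active_rows = column_indexer.filter_by_column('status', 'active')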

Query Optimization

Efficient CSV Queries:

class CSVQueryEngine:
    """Query engine for CSV files"""
    
    def __init__(self, file_path: str):
        self.file_path = file_path
        self.indexer = CSVIndexer(file_path)
        self.column_indexer = ColumnIndexer(file_path)
    
    def build_indexes(self):
        """Build all indexes"""
        self.indexer.build_index()
        self.column_indexer.build_column_index()
    
    def query(self, filters: Dict[str, Any], limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """Query CSV with filters"""
        results = []
        
        with open(self.file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            
            for row in reader:
                # Apply filters
                if self.matches_filters(row, filters):
                    results.append(row)
                    
                    if limit and len(results) >= limit:
                        break
        
        return results
    
    def matches_filters(self, row: Dict[str, Any], filters: Dict[str, Any]) -> bool:
        """Check if row matches filters"""
        for column, value in filters.items():
            if column not in row or row[column] != value:
                return False
        return True
    
    def aggregate(self, group_by: str, aggregate_func: str, column: str) -> Dict[str, Any]:
        """Aggregate data by column"""
        groups = {}
        
        with open(self.file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            
            for row in reader:
                group_value = row[group_by]
                if group_value not in groups:
                    groups[group_value] = []
                
                try:
                    numeric_value = float(row[column])
                    groups[group_value].append(numeric_value)
                except (ValueError, TypeError):
                    continue
        
        # Apply aggregate function (skip groups that had no numeric values)
        result = {}
        for group, values in groups.items():
            if not values:
                continue
            if aggregate_func == 'sum':
                result[group] = sum(values)
            elif aggregate_func == 'avg':
                result[group] = sum(values) / len(values)
            elif aggregate_func == 'count':
                result[group] = len(values)
            elif aggregate_func == 'min':
                result[group] = min(values)
            elif aggregate_func == 'max':
                result[group] = max(values)
        
        return result
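
A usage sketch for the query engine; the file, column names, and values are placeholders:

# Usage
engine = CSVQueryEngine('orders.csv')
engine.build_indexes()

# Filter rows and cap the result size
eu_orders = engine.query({'region': 'EU'}, limit=100)

# Sum the 'amount' column per 'region'
totals = engine.aggregate(group_by='region', aggregate_func='sum', column='amount')
print(totals)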

Compression Techniques

File Compression

Compression Strategies:

import gzip
import bz2
import lzma
import os
import pandas as pd
from typing import Optional

class CSVCompressor:
    """CSV compression utilities"""
    
    @staticmethod
    def compress_file(input_path: str, output_path: str, compression: str = 'gzip') -> str:
        """Compress CSV file"""
        if compression == 'gzip':
            with open(input_path, 'rb') as f_in:
                with gzip.open(output_path, 'wb') as f_out:
                    f_out.writelines(f_in)
        elif compression == 'bz2':
            with open(input_path, 'rb') as f_in:
                with bz2.open(output_path, 'wb') as f_out:
                    f_out.writelines(f_in)
        elif compression == 'lzma':
            with open(input_path, 'rb') as f_in:
                with lzma.open(output_path, 'wb') as f_out:
                    f_out.writelines(f_in)
        else:
            raise ValueError(f"Unsupported compression: {compression}")
        
        return output_path
    
    @staticmethod
    def decompress_file(input_path: str, output_path: str, compression: str = 'gzip') -> str:
        """Decompress CSV file"""
        if compression == 'gzip':
            with gzip.open(input_path, 'rb') as f_in:
                with open(output_path, 'wb') as f_out:
                    f_out.writelines(f_in)
        elif compression == 'bz2':
            with bz2.open(input_path, 'rb') as f_in:
                with open(output_path, 'wb') as f_out:
                    f_out.writelines(f_in)
        elif compression == 'lzma':
            with lzma.open(input_path, 'rb') as f_in:
                with open(output_path, 'wb') as f_out:
                    f_out.writelines(f_in)
        else:
            raise ValueError(f"Unsupported compression: {compression}")
        
        return output_path
    
    @staticmethod
    def get_compression_ratio(original_path: str, compressed_path: str) -> float:
        """Get compression ratio"""
        original_size = os.path.getsize(original_path)
        compressed_size = os.path.getsize(compressed_path)
        return compressed_size / original_size

# Usage
compressor = CSVCompressor()

# Compress file
compressed_path = compressor.compress_file('data.csv', 'data.csv.gz', 'gzip')
ratio = compressor.get_compression_ratio('data.csv', compressed_path)
print(f"Compression ratio: {ratio:.2f}")

Pandas Compression:

class PandasCompressor:
    """Pandas-based CSV compression"""
    
    @staticmethod
    def compress_csv(input_path: str, output_path: str, compression: str = 'gzip') -> str:
        """Compress CSV using pandas"""
        df = pd.read_csv(input_path)
        
        if compression == 'gzip':
            df.to_csv(output_path, compression='gzip', index=False)
        elif compression == 'bz2':
            df.to_csv(output_path, compression='bz2', index=False)
        elif compression == 'xz':
            df.to_csv(output_path, compression='xz', index=False)
        else:
            raise ValueError(f"Unsupported compression: {compression}")
        
        return output_path
    
    @staticmethod
    def read_compressed_csv(file_path: str, compression: str = 'gzip') -> pd.DataFrame:
        """Read compressed CSV using pandas"""
        return pd.read_csv(file_path, compression=compression)

Data Compression

Column Compression:

class ColumnCompressor:
    """Column-level compression for CSV data"""
    
    @staticmethod
    def compress_columns(df: pd.DataFrame) -> pd.DataFrame:
        """Compress DataFrame columns"""
        compressed_df = df.copy()
        
        for col in compressed_df.columns:
            if compressed_df[col].dtype == 'object':
                # Try to compress object columns
                compressed_df[col] = ColumnCompressor.compress_object_column(compressed_df[col])
            elif compressed_df[col].dtype == 'int64':
                # Compress integer columns
                compressed_df[col] = ColumnCompressor.compress_int_column(compressed_df[col])
            elif compressed_df[col].dtype == 'float64':
                # Compress float columns
                compressed_df[col] = ColumnCompressor.compress_float_column(compressed_df[col])
        
        return compressed_df
    
    @staticmethod
    def compress_object_column(series: pd.Series) -> pd.Series:
        """Compress object column"""
        # Convert to category if appropriate
        if series.nunique() / len(series) < 0.5:
            return series.astype('category')
        
        # Convert to string if not already
        return series.astype('string')
    
    @staticmethod
    def compress_int_column(series: pd.Series) -> pd.Series:
        """Compress integer column"""
        if series.min() >= 0:
            if series.max() < 255:
                return series.astype('uint8')
            elif series.max() < 65535:
                return series.astype('uint16')
            elif series.max() < 4294967295:
                return series.astype('uint32')
        else:
            if series.min() > -128 and series.max() < 127:
                return series.astype('int8')
            elif series.min() > -32768 and series.max() < 32767:
                return series.astype('int16')
            elif series.min() > -2147483648 and series.max() < 2147483647:
                return series.astype('int32')
        
        return series
    
    @staticmethod
    def compress_float_column(series: pd.Series) -> pd.Series:
        """Compress float column"""
        return series.astype('float32')
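
To see what column-level compression buys you, compare the DataFrame's memory footprint before and after (the file name is a placeholder):

# Usage
df = pd.read_csv('data.csv')
before_mb = df.memory_usage(deep=True).sum() / 1024 / 1024

compressed_df = ColumnCompressor.compress_columns(df)
after_mb = compressed_df.memory_usage(deep=True).sum() / 1024 / 1024

print(f"Memory: {before_mb:.2f} MB -> {after_mb:.2f} MB")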

Parallel Processing

Multi-Threading

Threaded CSV Processing:

import csv
import queue
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Callable

class ThreadedCSVProcessor:
    """Multi-threaded CSV processor (threads help most when row processing is I/O-bound)"""
    
    def __init__(self, num_threads: int = 4):
        self.num_threads = num_threads
        self.results_queue = queue.Queue()
        self.error_queue = queue.Queue()
    
    def process_csv_parallel(self, file_path: str, process_func: Callable) -> List[Any]:
        """Process CSV file in parallel"""
        # Read file and split into chunks
        chunks = self.split_file_into_chunks(file_path)
        
        # Process chunks in parallel
        with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
            futures = []
            
            for i, chunk in enumerate(chunks):
                future = executor.submit(self.process_chunk, chunk, process_func, i)
                futures.append(future)
            
            # Collect results
            results = []
            for future in as_completed(futures):
                try:
                    result = future.result()
                    results.extend(result)
                except Exception as e:
                    print(f"Error processing chunk: {e}")
        
        return results
    
    def split_file_into_chunks(self, file_path: str, chunk_size: int = 1000) -> List[List[Dict[str, Any]]]:
        """Split CSV file into chunks"""
        chunks = []
        current_chunk = []
        
        with open(file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            
            for row in reader:
                current_chunk.append(row)
                
                if len(current_chunk) >= chunk_size:
                    chunks.append(current_chunk)
                    current_chunk = []
            
            if current_chunk:
                chunks.append(current_chunk)
        
        return chunks
    
    def process_chunk(self, chunk: List[Dict[str, Any]], process_func: Callable, chunk_id: int) -> List[Any]:
        """Process a single chunk"""
        results = []
        
        for row in chunk:
            try:
                result = process_func(row)
                results.append(result)
            except Exception as e:
                print(f"Error processing row in chunk {chunk_id}: {e}")
        
        return results
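
A usage sketch; because of Python's GIL, threads pay off mainly when the per-row work is I/O-bound (API calls, database writes), and enrich_row below is a placeholder for that kind of work:

# Usage
def enrich_row(row):
    # I/O-bound work per row, e.g. an API lookup or a database write
    return {**row, 'processed': True}

processor = ThreadedCSVProcessor(num_threads=8)
results = processor.process_csv_parallel('large_file.csv', enrich_row)
print(f"Processed {len(results)} rows")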

Multi-Processing

Process-Based CSV Processing:

import csv
import multiprocessing as mp
from multiprocessing import Pool
from typing import List, Dict, Any, Callable

class ProcessedCSVProcessor:
    """Multi-process CSV processor (the row function must be picklable, i.e. defined at module level)"""
    
    def __init__(self, num_processes: int = None):
        self.num_processes = num_processes or mp.cpu_count()
    
    def process_csv_parallel(self, file_path: str, process_func: Callable) -> List[Any]:
        """Process CSV file using multiple processes"""
        # Split file into chunks
        chunks = self.split_file_into_chunks(file_path)
        
        # Process chunks in parallel
        with Pool(processes=self.num_processes) as pool:
            results = pool.map(self.process_chunk_wrapper, 
                             [(chunk, process_func) for chunk in chunks])
        
        # Flatten results
        flattened_results = []
        for result in results:
            flattened_results.extend(result)
        
        return flattened_results
    
    def process_chunk_wrapper(self, args):
        """Wrapper for process chunk function"""
        chunk, process_func = args
        return self.process_chunk(chunk, process_func)
    
    def split_file_into_chunks(self, file_path: str, chunk_size: int = 1000) -> List[List[Dict[str, Any]]]:
        """Split CSV file into chunks"""
        chunks = []
        current_chunk = []
        
        with open(file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            
            for row in reader:
                current_chunk.append(row)
                
                if len(current_chunk) >= chunk_size:
                    chunks.append(current_chunk)
                    current_chunk = []
            
            if current_chunk:
                chunks.append(current_chunk)
        
        return chunks
    
    def process_chunk(self, chunk: List[Dict[str, Any]], process_func: Callable) -> List[Any]:
        """Process a single chunk"""
        results = []
        
        for row in chunk:
            try:
                result = process_func(row)
                results.append(result)
            except Exception as e:
                print(f"Error processing row: {e}")
        
        return results
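
Because worker processes import the module that defines the row function, define it at module level and guard the entry point. A minimal usage sketch (square_amount is a placeholder for CPU-bound work):

# Usage
def square_amount(row):
    # CPU-bound work per row; must be picklable, so define it at module level
    return float(row.get('amount', 0)) ** 2

if __name__ == '__main__':
    processor = ProcessedCSVProcessor(num_processes=4)
    results = processor.process_csv_parallel('large_file.csv', square_amount)
    print(f"Processed {len(results)} rows")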

Performance Monitoring

Benchmarking Tools

Performance Benchmarking:

import time
import psutil
import os
from typing import Dict, Any, Callable
import pandas as pd

class CSVPerformanceBenchmark:
    """CSV performance benchmarking tool"""
    
    def __init__(self):
        self.results = {}
        self.process = psutil.Process(os.getpid())
    
    def benchmark_function(self, func: Callable, *args, **kwargs) -> Dict[str, Any]:
        """Benchmark a function"""
        # Get initial memory usage
        initial_memory = self.process.memory_info().rss / 1024 / 1024
        
        # Start timing
        start_time = time.time()
        start_cpu = time.process_time()
        
        # Execute function
        result = func(*args, **kwargs)
        
        # End timing
        end_time = time.time()
        end_cpu = time.process_time()
        
        # Get final memory usage
        final_memory = self.process.memory_info().rss / 1024 / 1024
        
        # Calculate metrics
        wall_time = end_time - start_time
        cpu_time = end_cpu - start_cpu
        memory_used = final_memory - initial_memory
        
        benchmark_result = {
            'wall_time': wall_time,
            'cpu_time': cpu_time,
            'memory_used': memory_used,
            'result_size': len(result) if hasattr(result, '__len__') else 1
        }
        
        return benchmark_result
    
    def compare_methods(self, methods: Dict[str, Callable], *args, **kwargs) -> Dict[str, Any]:
        """Compare multiple methods"""
        results = {}
        
        for name, method in methods.items():
            print(f"Benchmarking {name}...")
            result = self.benchmark_function(method, *args, **kwargs)
            results[name] = result
        
        return results
    
    def generate_report(self, results: Dict[str, Any]) -> str:
        """Generate performance report"""
        report = "CSV Performance Benchmark Report\n"
        report += "=" * 40 + "\n\n"
        
        for method, metrics in results.items():
            report += f"Method: {method}\n"
            report += f"  Wall Time: {metrics['wall_time']:.4f} seconds\n"
            report += f"  CPU Time: {metrics['cpu_time']:.4f} seconds\n"
            report += f"  Memory Used: {metrics['memory_used']:.2f} MB\n"
            report += f"  Result Size: {metrics['result_size']}\n\n"
        
        return report

# Usage
benchmark = CSVPerformanceBenchmark()

def method1(file_path):
    return pd.read_csv(file_path)

def method2(file_path):
    # Iterate the chunks so the chunked read actually does the work
    return pd.concat(pd.read_csv(file_path, chunksize=1000), ignore_index=True)

methods = {
    'pandas_full': method1,
    'pandas_chunked': method2
}

results = benchmark.compare_methods(methods, 'large_file.csv')
report = benchmark.generate_report(results)
print(report)

Conclusion

CSV performance optimization is essential for handling large datasets efficiently. By implementing the techniques and strategies outlined in this guide, you can significantly improve your CSV processing speed, reduce memory usage, and build more scalable applications.

Key Takeaways:

  1. Choose the Right Strategy: Use streaming for large files, batch loading for smaller files
  2. Optimize Memory Usage: Use appropriate data types and implement memory management
  3. Implement Indexing: Use row-based and column-based indexing for fast access
  4. Apply Compression: Use file and data compression to reduce storage and I/O
  5. Use Parallel Processing: Leverage multi-threading and multi-processing for better performance

Next Steps:

  1. Profile Your Code: Use benchmarking tools to identify bottlenecks
  2. Implement Optimizations: Apply the techniques that best fit your use case
  3. Monitor Performance: Continuously monitor and optimize your CSV processing
  4. Test with Real Data: Validate optimizations with actual data and workloads
  5. Scale Gradually: Start with small optimizations and scale up as needed

For more CSV data processing tools and guides, explore our CSV Tools Hub or try our CSV Validator for instant data validation.
