CSV Performance Optimization: Speed & Memory (2025 Developer Guide)
CSV performance optimization is crucial for handling large datasets efficiently. Whether you're processing millions of rows, building real-time data pipelines, or trying to cut memory usage, understanding these techniques can significantly improve your application's speed and scalability.
This guide covers streaming vs loading strategies, memory management techniques, indexing approaches, compression methods, parallel processing, and performance monitoring. Aimed at data engineers, backend developers, and performance specialists, it will help you optimize your CSV processing workflows.
Why CSV Performance Matters
Common Performance Challenges
Memory Limitations:
- Large files exceeding available RAM
- Memory leaks during processing
- Inefficient data structures
- Garbage collection overhead
Processing Speed:
- Slow file I/O operations
- Inefficient parsing algorithms
- CPU-intensive operations
- Network latency for remote files
Scalability Issues:
- Single-threaded processing
- Blocking operations
- Resource contention
- Poor concurrency handling
Storage Optimization:
- Large file sizes
- Redundant data storage
- Inefficient compression
- Poor data organization
Performance Impact Factors
File Size:
- Small files (< 1MB): Minimal impact
- Medium files (1-100MB): Moderate impact
- Large files (100MB-1GB): Significant impact
- Very large files (> 1GB): Critical impact
Data Characteristics:
- Number of columns
- Data types and complexity
- Encoding and character sets
- Compression ratio
System Resources:
- Available RAM
- CPU cores and speed
- Storage type (SSD vs HDD)
- Network bandwidth
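As a rough rule of thumb, the file-size tiers above can drive the choice between batch loading and streaming. A minimal sketch of that decision, assuming the thresholds and the helper name are illustrative rather than a fixed rule:
import os

def choose_strategy(file_path: str, available_ram_mb: float) -> str:
    """Illustrative heuristic: pick a strategy from file size relative to available RAM."""
    file_size_mb = os.path.getsize(file_path) / 1024 / 1024
    if file_size_mb < 0.25 * available_ram_mb:
        return 'batch'      # load the whole file into memory
    return 'streaming'      # process row by row or in chunks

# Example: pick a strategy given roughly 2 GB of usable RAM
print(choose_strategy('large_file.csv', available_ram_mb=2048))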
Streaming vs Loading Strategies
Streaming Processing
Benefits of Streaming:
- Constant memory usage
- Handles files larger than RAM
- Real-time processing
- Better resource utilization
Streaming Implementation:
import csv
from typing import Any, Dict, Iterator, List

class CSVStreamProcessor:
    """Streaming CSV processor for large files"""

    def __init__(self, file_path: str, chunk_size: int = 1000):
        self.file_path = file_path
        self.chunk_size = chunk_size
        self.processed_rows = 0
        self.memory_usage = 0.0

    def stream_processing(self) -> Iterator[Dict[str, Any]]:
        """Stream-process the CSV file chunk by chunk"""
        with open(self.file_path, 'r', encoding='utf-8', newline='') as file:
            reader = csv.DictReader(file)
            chunk = []
            for row in reader:
                chunk.append(row)
                if len(chunk) >= self.chunk_size:
                    yield from self.process_chunk(chunk)
                    chunk = []
                    # Memory monitoring (per chunk)
                    self.memory_usage = self.get_memory_usage()
                    print(f"Processed {self.processed_rows} rows, Memory: {self.memory_usage:.2f} MB")
            # Process remaining rows
            if chunk:
                yield from self.process_chunk(chunk)

    def process_chunk(self, chunk: List[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
        """Process a chunk of rows"""
        for row in chunk:
            # Apply transformations
            processed_row = self.transform_row(row)
            self.processed_rows += 1
            yield processed_row

    def transform_row(self, row: Dict[str, Any]) -> Dict[str, Any]:
        """Transform an individual row"""
        # Example: clean and normalize string values
        cleaned_row = {}
        for key, value in row.items():
            if isinstance(value, str):
                cleaned_row[key] = value.strip()
            else:
                cleaned_row[key] = value
        return cleaned_row

    def get_memory_usage(self) -> float:
        """Get current memory usage in MB (requires psutil)"""
        import psutil
        import os
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024

# Usage
processor = CSVStreamProcessor('large_file.csv', chunk_size=5000)
for row in processor.stream_processing():
    # Process each row
    pass
Advanced Streaming with Generators:
import csv
import gzip
from typing import Any, Callable, Dict, Iterator, List, Optional

class AdvancedCSVStreamer:
    """Advanced CSV streaming with compression support"""

    def __init__(self, file_path: str, compression: Optional[str] = None):
        self.file_path = file_path
        self.compression = compression
        self.file_handle = None
        self.reader = None

    def __enter__(self):
        """Context manager entry"""
        if self.compression == 'gzip':
            self.file_handle = gzip.open(self.file_path, 'rt', encoding='utf-8', newline='')
        else:
            self.file_handle = open(self.file_path, 'r', encoding='utf-8', newline='')
        self.reader = csv.DictReader(self.file_handle)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        if self.file_handle:
            self.file_handle.close()

    def stream_rows(self, filter_func: Optional[Callable[[Dict[str, Any]], bool]] = None) -> Iterator[Dict[str, Any]]:
        """Stream rows with optional filtering"""
        for row in self.reader:
            if filter_func is None or filter_func(row):
                yield row

    def stream_chunks(self, chunk_size: int = 1000) -> Iterator[List[Dict[str, Any]]]:
        """Stream data in chunks"""
        chunk = []
        for row in self.stream_rows():
            chunk.append(row)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

# Usage with compression
with AdvancedCSVStreamer('data.csv.gz', compression='gzip') as streamer:
    for chunk in streamer.stream_chunks(chunk_size=1000):
        # Process chunk (process_chunk is your own handler)
        process_chunk(chunk)
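If you already rely on pandas, its built-in chunked reader gives a similar streaming pattern without a custom class. A minimal sketch, with the file name and chunk size chosen for illustration:
import pandas as pd

# Read the file in 100,000-row DataFrame chunks instead of all at once
total = 0
for chunk in pd.read_csv('large_file.csv', chunksize=100_000):
    # Each chunk is a regular DataFrame, so normal pandas operations apply
    total += len(chunk)
print(f"Processed {total} rows")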
Batch Loading
Benefits of Batch Loading:
- Faster end-to-end processing for small files
- Supports complex, whole-dataset operations
- Simpler to implement
- Well suited to interactive data analysis
Batch Loading Implementation:
import os
import pandas as pd
from typing import Dict

class CSVBatchProcessor:
    """Batch CSV processor for smaller files"""

    def __init__(self, file_path: str, max_memory_mb: int = 500):
        self.file_path = file_path
        self.max_memory_mb = max_memory_mb
        self.df = None

    def load_data(self) -> pd.DataFrame:
        """Load CSV data into memory"""
        # Check file size before loading
        file_size_mb = self.get_file_size_mb()
        if file_size_mb > self.max_memory_mb:
            raise MemoryError(
                f"File size ({file_size_mb:.2f} MB) exceeds memory limit ({self.max_memory_mb} MB)"
            )
        # Load with optimized dtypes
        self.df = pd.read_csv(
            self.file_path,
            low_memory=False,
            dtype=self.optimize_dtypes()
        )
        return self.df

    def optimize_dtypes(self) -> Dict[str, str]:
        """Infer memory-efficient data types from a sample of rows"""
        # Sample the first rows to infer types
        sample_df = pd.read_csv(self.file_path, nrows=1000)
        dtype_map = {}
        for column in sample_df.columns:
            if sample_df[column].dtype == 'object':
                # Check whether the column is actually numeric
                try:
                    pd.to_numeric(sample_df[column], errors='raise')
                    dtype_map[column] = 'float32'
                except (ValueError, TypeError):
                    # Low-cardinality columns become categories
                    if sample_df[column].nunique() <= 2:
                        dtype_map[column] = 'category'
                    else:
                        dtype_map[column] = 'string'
            elif sample_df[column].dtype == 'int64':
                dtype_map[column] = 'int32'
            elif sample_df[column].dtype == 'float64':
                dtype_map[column] = 'float32'
        return dtype_map

    def get_file_size_mb(self) -> float:
        """Get file size in MB"""
        return os.path.getsize(self.file_path) / 1024 / 1024

    def process_data(self) -> pd.DataFrame:
        """Process loaded data"""
        if self.df is None:
            self.load_data()
        # Apply memory optimizations
        self.df = self.optimize_memory_usage(self.df)
        # Apply transformations
        self.df = self.apply_transformations(self.df)
        return self.df

    def optimize_memory_usage(self, df: pd.DataFrame) -> pd.DataFrame:
        """Optimize DataFrame memory usage"""
        # Convert object columns to category when cardinality is low
        for col in df.select_dtypes(include=['object']).columns:
            if df[col].nunique() / len(df) < 0.5:
                df[col] = df[col].astype('category')
        # Downcast integer columns to the smallest safe type
        for col in df.select_dtypes(include=['int64']).columns:
            if df[col].min() >= 0:
                if df[col].max() < 255:
                    df[col] = df[col].astype('uint8')
                elif df[col].max() < 65535:
                    df[col] = df[col].astype('uint16')
                elif df[col].max() < 4294967295:
                    df[col] = df[col].astype('uint32')
            else:
                if df[col].min() > -128 and df[col].max() < 127:
                    df[col] = df[col].astype('int8')
                elif df[col].min() > -32768 and df[col].max() < 32767:
                    df[col] = df[col].astype('int16')
                elif df[col].min() > -2147483648 and df[col].max() < 2147483647:
                    df[col] = df[col].astype('int32')
        return df

    def apply_transformations(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply data transformations"""
        # Example: strip whitespace from string columns
        for col in df.select_dtypes(include=['object']).columns:
            df[col] = df[col].str.strip()
        return df
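A minimal usage sketch for the batch processor above (the file name is illustrative):
# Load and process a file that comfortably fits in memory
batch_processor = CSVBatchProcessor('medium_file.csv', max_memory_mb=500)
df = batch_processor.process_data()
print(df.dtypes)
print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")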
Memory Management Techniques
Memory-Efficient Data Structures
Optimized Data Types:
import pandas as pd
import numpy as np

class MemoryOptimizer:
    """Memory optimization utilities for CSV processing"""

    @staticmethod
    def optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
        """Optimize DataFrame memory usage"""
        original_memory = df.memory_usage(deep=True).sum() / 1024 / 1024
        # Optimize numeric columns
        for col in df.select_dtypes(include=[np.number]).columns:
            df[col] = MemoryOptimizer.optimize_numeric_column(df[col])
        # Optimize object columns
        for col in df.select_dtypes(include=['object']).columns:
            df[col] = MemoryOptimizer.optimize_object_column(df[col])
        # Optimize datetime columns
        for col in df.select_dtypes(include=['datetime64']).columns:
            df[col] = MemoryOptimizer.optimize_datetime_column(df[col])
        optimized_memory = df.memory_usage(deep=True).sum() / 1024 / 1024
        reduction = (original_memory - optimized_memory) / original_memory * 100
        print(f"Memory optimization: {original_memory:.2f} MB -> {optimized_memory:.2f} MB ({reduction:.1f}% reduction)")
        return df

    @staticmethod
    def optimize_numeric_column(series: pd.Series) -> pd.Series:
        """Optimize numeric column memory usage"""
        if series.dtype == 'int64':
            if series.min() >= 0:
                if series.max() < 255:
                    return series.astype('uint8')
                elif series.max() < 65535:
                    return series.astype('uint16')
                elif series.max() < 4294967295:
                    return series.astype('uint32')
            else:
                if series.min() > -128 and series.max() < 127:
                    return series.astype('int8')
                elif series.min() > -32768 and series.max() < 32767:
                    return series.astype('int16')
                elif series.min() > -2147483648 and series.max() < 2147483647:
                    return series.astype('int32')
        elif series.dtype == 'float64':
            return series.astype('float32')
        return series

    @staticmethod
    def optimize_object_column(series: pd.Series) -> pd.Series:
        """Optimize object column memory usage"""
        # Check whether the column is actually numeric
        try:
            pd.to_numeric(series, errors='raise')
            return pd.to_numeric(series, errors='coerce').astype('float32')
        except (ValueError, TypeError):
            pass
        # Check if it's boolean-like
        if series.nunique() <= 2:
            return series.astype('category')
        # Check if it's categorical (low cardinality)
        if series.nunique() / len(series) < 0.5:
            return series.astype('category')
        return series.astype('string')

    @staticmethod
    def optimize_datetime_column(series: pd.Series) -> pd.Series:
        """Optimize datetime column memory usage"""
        # Ensure the column is datetime64[ns]
        if series.dtype != 'datetime64[ns]':
            series = pd.to_datetime(series)
        return series
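A short usage sketch (the file name is illustrative):
df = pd.read_csv('large_file.csv')
df = MemoryOptimizer.optimize_dataframe(df)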
Garbage Collection Optimization
Memory Cleanup Strategies:
import csv
import gc
import os
import psutil

class MemoryManager:
    """Memory management for CSV processing"""

    def __init__(self, max_memory_mb: int = 1000, cleanup_threshold: float = 0.8):
        self.max_memory_mb = max_memory_mb
        self.cleanup_threshold = cleanup_threshold
        self.process = psutil.Process(os.getpid())
        self.memory_history = []

    def check_memory_usage(self) -> float:
        """Check current memory usage in MB"""
        memory_info = self.process.memory_info()
        memory_mb = memory_info.rss / 1024 / 1024
        self.memory_history.append(memory_mb)
        return memory_mb

    def should_cleanup(self) -> bool:
        """Check if memory cleanup is needed"""
        current_memory = self.check_memory_usage()
        return current_memory > self.max_memory_mb * self.cleanup_threshold

    def cleanup_memory(self) -> float:
        """Force garbage collection and return memory freed"""
        before_memory = self.check_memory_usage()
        # Force garbage collection
        gc.collect()
        after_memory = self.check_memory_usage()
        memory_freed = before_memory - after_memory
        print(f"Memory cleanup: {before_memory:.2f} MB -> {after_memory:.2f} MB (freed {memory_freed:.2f} MB)")
        return memory_freed

    def monitor_memory(self, operation_name: str):
        """Return a callback that cleans up memory when usage crosses the threshold"""
        start_memory = self.check_memory_usage()
        print(f"{operation_name}: starting at {start_memory:.2f} MB")
        def cleanup_if_needed():
            if self.should_cleanup():
                self.cleanup_memory()
        return cleanup_if_needed

# Usage
memory_manager = MemoryManager(max_memory_mb=500)

def process_large_csv(file_path: str):
    """Process a large CSV with memory management"""
    cleanup = memory_manager.monitor_memory("csv_processing")
    with open(file_path, 'r', encoding='utf-8', newline='') as file:
        reader = csv.DictReader(file)
        for i, row in enumerate(reader):
            # Process row (process_row is your own handler)
            process_row(row)
            # Clean up periodically if needed
            if i % 1000 == 0:
                cleanup()
Indexing and Query Optimization
CSV Indexing Strategies
Row-Based Indexing:
import csv
import json
import os
from typing import Any, Dict, List, Optional

class CSVIndexer:
    """CSV indexing for fast row access by byte offset"""

    def __init__(self, file_path: str):
        self.file_path = file_path
        self.index_file = f"{file_path}.index"
        self.index = {}
        self.header = []

    def build_index(self) -> Dict[int, Any]:
        """Build a byte-offset index for the CSV file.
        Assumes rows do not contain embedded newlines inside quoted fields."""
        print("Building CSV index...")
        with open(self.file_path, 'rb') as file:
            # Read and parse the header line
            header_line = file.readline().decode('utf-8')
            self.header = next(csv.reader([header_line]))
            # Record the byte offset and length of each data row
            row_number = 0
            offset = file.tell()
            for line in iter(file.readline, b''):
                self.index[row_number] = {'offset': offset, 'length': len(line)}
                offset = file.tell()
                row_number += 1
        # Save index to file
        self.save_index()
        print(f"Index built for {row_number} rows")
        return self.index

    def load_index(self) -> Dict[int, Any]:
        """Load index from file (JSON keys are converted back to int)"""
        if os.path.exists(self.index_file):
            with open(self.index_file, 'r') as file:
                self.index = {int(k): v for k, v in json.load(file).items()}
            return self.index
        return {}

    def save_index(self):
        """Save index to file"""
        with open(self.index_file, 'w') as file:
            json.dump(self.index, file)

    def get_row(self, row_number: int) -> Optional[Dict[str, Any]]:
        """Get a specific row by number"""
        if row_number not in self.index:
            return None
        row_info = self.index[row_number]
        with open(self.file_path, 'rb') as file:
            file.seek(row_info['offset'])
            row_data = file.read(row_info['length']).decode('utf-8')
        # Parse the CSV row
        row = next(csv.reader([row_data]))
        return dict(zip(self.header, row))

    def get_rows(self, start: int, end: int) -> List[Dict[str, Any]]:
        """Get a range of rows"""
        rows = []
        for i in range(start, min(end, len(self.index))):
            row = self.get_row(i)
            if row:
                rows.append(row)
        return rows

# Usage
indexer = CSVIndexer('large_file.csv')
indexer.build_index()
# Fast row access
row_1000 = indexer.get_row(1000)
rows_1000_1100 = indexer.get_rows(1000, 1100)
Column-Based Indexing:
class ColumnIndexer:
    """Column-based indexing for fast column access"""

    def __init__(self, file_path: str):
        self.file_path = file_path
        self.column_index = {}
        self.header = []

    def build_column_index(self) -> Dict[str, List[int]]:
        """Build a column index (row numbers of non-empty values per column)"""
        print("Building column index...")
        with open(self.file_path, 'r', encoding='utf-8', newline='') as file:
            reader = csv.DictReader(file)
            # Read header
            self.header = list(reader.fieldnames)
            # Initialize column index
            for col in self.header:
                self.column_index[col] = []
            # Build index
            row_number = 0
            for row in reader:
                for col, value in row.items():
                    if value:  # Only index non-empty values
                        self.column_index[col].append(row_number)
                row_number += 1
        print(f"Column index built for {len(self.header)} columns")
        return self.column_index

    def get_column_values(self, column_name: str) -> List[Any]:
        """Get all non-empty values for a specific column"""
        if column_name not in self.column_index:
            return []
        row_numbers = set(self.column_index[column_name])  # set for O(1) membership checks
        values = []
        with open(self.file_path, 'r', encoding='utf-8', newline='') as file:
            reader = csv.DictReader(file)
            for i, row in enumerate(reader):
                if i in row_numbers:
                    values.append(row[column_name])
        return values

    def filter_by_column(self, column_name: str, value: Any) -> List[Dict[str, Any]]:
        """Filter rows by column value"""
        if column_name not in self.column_index:
            return []
        row_numbers = set(self.column_index[column_name])
        filtered_rows = []
        with open(self.file_path, 'r', encoding='utf-8', newline='') as file:
            reader = csv.DictReader(file)
            for i, row in enumerate(reader):
                if i in row_numbers and row[column_name] == value:
                    filtered_rows.append(row)
        return filtered_rows
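A short usage sketch (the column name and value are illustrative):
column_indexer = ColumnIndexer('large_file.csv')
column_indexer.build_column_index()
# Column-oriented access
statuses = column_indexer.get_column_values('status')
active_rows = column_indexer.filter_by_column('status', 'active')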
Query Optimization
Efficient CSV Queries:
class CSVQueryEngine:
    """Query engine for CSV files"""

    def __init__(self, file_path: str):
        self.file_path = file_path
        self.indexer = CSVIndexer(file_path)
        self.column_indexer = ColumnIndexer(file_path)

    def build_indexes(self):
        """Build all indexes"""
        self.indexer.build_index()
        self.column_indexer.build_column_index()

    def query(self, filters: Dict[str, Any], limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """Query the CSV with equality filters (sequential scan)"""
        results = []
        with open(self.file_path, 'r', encoding='utf-8', newline='') as file:
            reader = csv.DictReader(file)
            for row in reader:
                # Apply filters
                if self.matches_filters(row, filters):
                    results.append(row)
                    if limit and len(results) >= limit:
                        break
        return results

    def matches_filters(self, row: Dict[str, Any], filters: Dict[str, Any]) -> bool:
        """Check if a row matches all filters"""
        for column, value in filters.items():
            if column not in row or row[column] != value:
                return False
        return True

    def aggregate(self, group_by: str, aggregate_func: str, column: str) -> Dict[str, Any]:
        """Aggregate a numeric column grouped by another column"""
        groups = {}
        with open(self.file_path, 'r', encoding='utf-8', newline='') as file:
            reader = csv.DictReader(file)
            for row in reader:
                group_value = row[group_by]
                # Skip non-numeric values so empty groups never occur
                try:
                    numeric_value = float(row[column])
                except (ValueError, TypeError):
                    continue
                groups.setdefault(group_value, []).append(numeric_value)
        # Apply the aggregate function
        result = {}
        for group, values in groups.items():
            if aggregate_func == 'sum':
                result[group] = sum(values)
            elif aggregate_func == 'avg':
                result[group] = sum(values) / len(values)
            elif aggregate_func == 'count':
                result[group] = len(values)
            elif aggregate_func == 'min':
                result[group] = min(values)
            elif aggregate_func == 'max':
                result[group] = max(values)
        return result
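A short usage sketch (the filter columns and values are illustrative):
engine = CSVQueryEngine('large_file.csv')
engine.build_indexes()
# Equality filter with a result limit
active_users = engine.query({'status': 'active'}, limit=100)
# Average amount per country
avg_by_country = engine.aggregate(group_by='country', aggregate_func='avg', column='amount')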
Compression Techniques
File Compression
Compression Strategies:
import gzip
import bz2
import lzma
import os
import pandas as pd

class CSVCompressor:
    """CSV compression utilities"""

    @staticmethod
    def compress_file(input_path: str, output_path: str, compression: str = 'gzip') -> str:
        """Compress a CSV file"""
        if compression == 'gzip':
            with open(input_path, 'rb') as f_in, gzip.open(output_path, 'wb') as f_out:
                f_out.writelines(f_in)
        elif compression == 'bz2':
            with open(input_path, 'rb') as f_in, bz2.open(output_path, 'wb') as f_out:
                f_out.writelines(f_in)
        elif compression == 'lzma':
            with open(input_path, 'rb') as f_in, lzma.open(output_path, 'wb') as f_out:
                f_out.writelines(f_in)
        else:
            raise ValueError(f"Unsupported compression: {compression}")
        return output_path

    @staticmethod
    def decompress_file(input_path: str, output_path: str, compression: str = 'gzip') -> str:
        """Decompress a CSV file"""
        if compression == 'gzip':
            with gzip.open(input_path, 'rb') as f_in, open(output_path, 'wb') as f_out:
                f_out.writelines(f_in)
        elif compression == 'bz2':
            with bz2.open(input_path, 'rb') as f_in, open(output_path, 'wb') as f_out:
                f_out.writelines(f_in)
        elif compression == 'lzma':
            with lzma.open(input_path, 'rb') as f_in, open(output_path, 'wb') as f_out:
                f_out.writelines(f_in)
        else:
            raise ValueError(f"Unsupported compression: {compression}")
        return output_path

    @staticmethod
    def get_compression_ratio(original_path: str, compressed_path: str) -> float:
        """Return compressed size as a fraction of the original size"""
        original_size = os.path.getsize(original_path)
        compressed_size = os.path.getsize(compressed_path)
        return compressed_size / original_size

# Usage
compressor = CSVCompressor()
# Compress file
compressed_path = compressor.compress_file('data.csv', 'data.csv.gz', 'gzip')
ratio = compressor.get_compression_ratio('data.csv', compressed_path)
print(f"Compression ratio: {ratio:.2f}")
Pandas Compression:
class PandasCompressor:
    """Pandas-based CSV compression"""

    @staticmethod
    def compress_csv(input_path: str, output_path: str, compression: str = 'gzip') -> str:
        """Compress a CSV using pandas"""
        df = pd.read_csv(input_path)
        if compression in ('gzip', 'bz2', 'xz'):
            df.to_csv(output_path, compression=compression, index=False)
        else:
            raise ValueError(f"Unsupported compression: {compression}")
        return output_path

    @staticmethod
    def read_compressed_csv(file_path: str, compression: str = 'gzip') -> pd.DataFrame:
        """Read a compressed CSV using pandas"""
        return pd.read_csv(file_path, compression=compression)
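A short usage sketch (file names are illustrative):
PandasCompressor.compress_csv('data.csv', 'data.csv.gz', compression='gzip')
df = PandasCompressor.read_compressed_csv('data.csv.gz', compression='gzip')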
Data Compression
Column Compression:
class ColumnCompressor:
    """Column-level compression for CSV data"""

    @staticmethod
    def compress_columns(df: pd.DataFrame) -> pd.DataFrame:
        """Compress DataFrame columns"""
        compressed_df = df.copy()
        for col in compressed_df.columns:
            if compressed_df[col].dtype == 'object':
                # Compress object columns
                compressed_df[col] = ColumnCompressor.compress_object_column(compressed_df[col])
            elif compressed_df[col].dtype == 'int64':
                # Compress integer columns
                compressed_df[col] = ColumnCompressor.compress_int_column(compressed_df[col])
            elif compressed_df[col].dtype == 'float64':
                # Compress float columns
                compressed_df[col] = ColumnCompressor.compress_float_column(compressed_df[col])
        return compressed_df

    @staticmethod
    def compress_object_column(series: pd.Series) -> pd.Series:
        """Compress an object column"""
        # Convert to category if cardinality is low
        if series.nunique() / len(series) < 0.5:
            return series.astype('category')
        # Otherwise use the pandas string dtype
        return series.astype('string')

    @staticmethod
    def compress_int_column(series: pd.Series) -> pd.Series:
        """Compress an integer column"""
        if series.min() >= 0:
            if series.max() < 255:
                return series.astype('uint8')
            elif series.max() < 65535:
                return series.astype('uint16')
            elif series.max() < 4294967295:
                return series.astype('uint32')
        else:
            if series.min() > -128 and series.max() < 127:
                return series.astype('int8')
            elif series.min() > -32768 and series.max() < 32767:
                return series.astype('int16')
            elif series.min() > -2147483648 and series.max() < 2147483647:
                return series.astype('int32')
        return series

    @staticmethod
    def compress_float_column(series: pd.Series) -> pd.Series:
        """Compress a float column"""
        return series.astype('float32')
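A short usage sketch:
df = pd.read_csv('data.csv')
compressed_df = ColumnCompressor.compress_columns(df)
print(compressed_df.dtypes)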
Parallel Processing
Multi-Threading
Threaded CSV Processing:
import csv
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List

class ThreadedCSVProcessor:
    """Multi-threaded CSV processor.
    Best suited for I/O-bound work per row; the GIL limits gains for CPU-bound processing."""

    def __init__(self, num_threads: int = 4):
        self.num_threads = num_threads

    def process_csv_parallel(self, file_path: str, process_func: Callable) -> List[Any]:
        """Process a CSV file in parallel"""
        # Read the file and split it into chunks
        chunks = self.split_file_into_chunks(file_path)
        # Process chunks in parallel
        with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
            futures = []
            for i, chunk in enumerate(chunks):
                future = executor.submit(self.process_chunk, chunk, process_func, i)
                futures.append(future)
            # Collect results
            results = []
            for future in as_completed(futures):
                try:
                    result = future.result()
                    results.extend(result)
                except Exception as e:
                    print(f"Error processing chunk: {e}")
        return results

    def split_file_into_chunks(self, file_path: str, chunk_size: int = 1000) -> List[List[Dict[str, Any]]]:
        """Split a CSV file into chunks of rows"""
        chunks = []
        current_chunk = []
        with open(file_path, 'r', encoding='utf-8', newline='') as file:
            reader = csv.DictReader(file)
            for row in reader:
                current_chunk.append(row)
                if len(current_chunk) >= chunk_size:
                    chunks.append(current_chunk)
                    current_chunk = []
        if current_chunk:
            chunks.append(current_chunk)
        return chunks

    def process_chunk(self, chunk: List[Dict[str, Any]], process_func: Callable, chunk_id: int) -> List[Any]:
        """Process a single chunk"""
        results = []
        for row in chunk:
            try:
                result = process_func(row)
                results.append(result)
            except Exception as e:
                print(f"Error processing row in chunk {chunk_id}: {e}")
        return results
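A short usage sketch (enrich_row is a hypothetical placeholder for your own per-row work):
def enrich_row(row):
    # I/O-bound work per row, e.g. an API lookup, goes here
    return row

threaded_processor = ThreadedCSVProcessor(num_threads=8)
results = threaded_processor.process_csv_parallel('large_file.csv', enrich_row)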
Multi-Processing
Process-Based CSV Processing:
import csv
import multiprocessing as mp
from multiprocessing import Pool
from typing import Any, Callable, Dict, List, Optional

class ProcessBasedCSVProcessor:
    """Multi-process CSV processor for CPU-bound row processing"""

    def __init__(self, num_processes: Optional[int] = None):
        self.num_processes = num_processes or mp.cpu_count()

    def process_csv_parallel(self, file_path: str, process_func: Callable) -> List[Any]:
        """Process a CSV file using multiple processes.
        Note: process_func must be picklable (a module-level function, not a lambda)."""
        # Split the file into chunks
        chunks = self.split_file_into_chunks(file_path)
        # Process chunks in parallel
        with Pool(processes=self.num_processes) as pool:
            results = pool.map(self.process_chunk_wrapper,
                               [(chunk, process_func) for chunk in chunks])
        # Flatten results
        flattened_results = []
        for result in results:
            flattened_results.extend(result)
        return flattened_results

    def process_chunk_wrapper(self, args):
        """Unpack arguments for the chunk processor"""
        chunk, process_func = args
        return self.process_chunk(chunk, process_func)

    def split_file_into_chunks(self, file_path: str, chunk_size: int = 1000) -> List[List[Dict[str, Any]]]:
        """Split a CSV file into chunks of rows"""
        chunks = []
        current_chunk = []
        with open(file_path, 'r', encoding='utf-8', newline='') as file:
            reader = csv.DictReader(file)
            for row in reader:
                current_chunk.append(row)
                if len(current_chunk) >= chunk_size:
                    chunks.append(current_chunk)
                    current_chunk = []
        if current_chunk:
            chunks.append(current_chunk)
        return chunks

    def process_chunk(self, chunk: List[Dict[str, Any]], process_func: Callable) -> List[Any]:
        """Process a single chunk"""
        results = []
        for row in chunk:
            try:
                result = process_func(row)
                results.append(result)
            except Exception as e:
                print(f"Error processing row: {e}")
        return results
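A short usage sketch (parse_amount and the 'amount' column are illustrative; the function must live at module level so worker processes can pickle it, and on platforms using the spawn start method the call belongs under an if __name__ == '__main__': guard):
def parse_amount(row):
    # CPU-bound transformation per row
    return float(row['amount']) * 1.2

if __name__ == '__main__':
    process_processor = ProcessBasedCSVProcessor(num_processes=4)
    results = process_processor.process_csv_parallel('large_file.csv', parse_amount)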
Performance Monitoring
Benchmarking Tools
Performance Benchmarking:
import time
import psutil
import os
from typing import Dict, Any, Callable
import pandas as pd

class CSVPerformanceBenchmark:
    """CSV performance benchmarking tool"""

    def __init__(self):
        self.results = {}
        self.process = psutil.Process(os.getpid())

    def benchmark_function(self, func: Callable, *args, **kwargs) -> Dict[str, Any]:
        """Benchmark a single function call"""
        # Get initial memory usage
        initial_memory = self.process.memory_info().rss / 1024 / 1024
        # Start timing
        start_time = time.time()
        start_cpu = time.process_time()
        # Execute function
        result = func(*args, **kwargs)
        # End timing
        end_time = time.time()
        end_cpu = time.process_time()
        # Get final memory usage
        final_memory = self.process.memory_info().rss / 1024 / 1024
        # Calculate metrics
        benchmark_result = {
            'wall_time': end_time - start_time,
            'cpu_time': end_cpu - start_cpu,
            'memory_used': final_memory - initial_memory,
            'result_size': len(result) if hasattr(result, '__len__') else 1
        }
        return benchmark_result

    def compare_methods(self, methods: Dict[str, Callable], *args, **kwargs) -> Dict[str, Any]:
        """Compare multiple methods on the same arguments"""
        results = {}
        for name, method in methods.items():
            print(f"Benchmarking {name}...")
            results[name] = self.benchmark_function(method, *args, **kwargs)
        return results

    def generate_report(self, results: Dict[str, Any]) -> str:
        """Generate a performance report"""
        report = "CSV Performance Benchmark Report\n"
        report += "=" * 40 + "\n\n"
        for method, metrics in results.items():
            report += f"Method: {method}\n"
            report += f"  Wall Time: {metrics['wall_time']:.4f} seconds\n"
            report += f"  CPU Time: {metrics['cpu_time']:.4f} seconds\n"
            report += f"  Memory Used: {metrics['memory_used']:.2f} MB\n"
            report += f"  Result Size: {metrics['result_size']}\n\n"
        return report

# Usage
benchmark = CSVPerformanceBenchmark()

def method1(file_path):
    return pd.read_csv(file_path)

def method2(file_path):
    # Consume the chunks so the comparison measures real work
    chunks = list(pd.read_csv(file_path, chunksize=1000))
    return pd.concat(chunks, ignore_index=True)

methods = {
    'pandas_full': method1,
    'pandas_chunked': method2
}

results = benchmark.compare_methods(methods, 'large_file.csv')
report = benchmark.generate_report(results)
print(report)
Conclusion
CSV performance optimization is essential for handling large datasets efficiently. By implementing the techniques and strategies outlined in this guide, you can significantly improve your CSV processing speed, reduce memory usage, and build more scalable applications.
Key Takeaways:
- Choose the Right Strategy: Use streaming for large files, batch loading for smaller files
- Optimize Memory Usage: Use appropriate data types and implement memory management
- Implement Indexing: Use row-based and column-based indexing for fast access
- Apply Compression: Use file and data compression to reduce storage and I/O
- Use Parallel Processing: Leverage multi-threading and multi-processing for better performance
Next Steps:
- Profile Your Code: Use benchmarking tools to identify bottlenecks
- Implement Optimizations: Apply the techniques that best fit your use case
- Monitor Performance: Continuously monitor and optimize your CSV processing
- Test with Real Data: Validate optimizations with actual data and workloads
- Scale Gradually: Start with small optimizations and scale up as needed
For more CSV data processing tools and guides, explore our CSV Tools Hub or try our CSV Validator for instant data validation.