This guide covers performance optimization techniques for ForexSmartBot in production environments.
- Performance Overview
- Code Optimization
- Data Processing
- Memory Management
- Database Optimization
- UI Performance
- Backtesting Performance
- Monitoring & Profiling
- Best Practices
- Startup Time: < 3 seconds
- Memory Usage: < 200MB typical
- Response Time: < 100ms for most operations
- Backtesting Speed: 1000+ bars per second
- Chart Rendering: < 500ms for typical charts
- Data Loading: < 1 second for 1 year of daily data
- Data Processing: Large datasets and complex calculations
- Memory Usage: Inefficient data structures and memory leaks
- UI Responsiveness: Blocking operations on main thread
- Database Operations: Slow queries and inefficient storage
- Network I/O: Slow data provider responses
Use vectorized operations:

```python
# Slow - loop-based
def calculate_sma_slow(prices, period):
    sma = []
    for i in range(period - 1, len(prices)):
        sma.append(sum(prices[i - period + 1:i + 1]) / period)
    return sma

# Fast - vectorized
def calculate_sma_fast(prices, period):
    return prices.rolling(window=period).mean()
```
Avoid unnecessary calculations:

```python
from functools import lru_cache

# Slow - recalculates on every call
def get_volatility(df):
    return df['Close'].pct_change().std()

# Fast - caches the result; lru_cache requires hashable arguments,
# so key the cache on symbol/period and fetch the data inside
@lru_cache(maxsize=128)
def get_volatility_cached(symbol, period):
    df = load_data(symbol, period)
    return df['Close'].pct_change().std()
```
Use appropriate data structures:

```python
# Slow - list requires a linear scan for lookups
positions = []
for pos in positions:
    if pos.symbol == symbol:
        return pos

# Fast - dictionary gives O(1) lookups
positions = {}
if symbol in positions:
    return positions[symbol]
```
Minimize function calls:

```python
# Slow - multiple intermediate assignments
def process_data(df):
    df = df.dropna()
    df = df.reset_index()
    df = df.sort_values('Date')
    return df

# Fast - chained operations
def process_data(df):
    return (df.dropna()
              .reset_index()
              .sort_values('Date'))
```
Use generators for large datasets:

```python
# Slow - loads all results into memory
def process_large_dataset(data):
    results = []
    for item in data:
        results.append(process_item(item))
    return results

# Fast - generator yields items lazily
def process_large_dataset(data):
    for item in data:
        yield process_item(item)
```
Optimize hot paths:

```python
import cProfile

def profile_function():
    # Your code here
    pass

# Profile to identify hot paths
cProfile.run('profile_function()')
```
Use appropriate data types:

```python
# Default float64 - 8 bytes per value
df['Close'] = df['Close'].astype('float64')

# float32 - 4 bytes per value; halves memory if the reduced
# precision is acceptable for your price data
df['Close'] = df['Close'].astype('float32')
```
Optimize DataFrame operations:

```python
# rolling() takes a single window, not a list, so compute
# each moving average in one pass over the windows
for window in (10, 20, 50):
    df[f'SMA_{window}'] = df['Close'].rolling(window).mean()
```
Use categorical data for strings:

```python
# Slow - string objects
df['Symbol'] = df['Symbol'].astype('object')

# Fast - categorical data saves memory for repeated strings
df['Symbol'] = df['Symbol'].astype('category')
```
Optimize groupby operations:

```python
# Slow - two passes over the data
result1 = df.groupby('Symbol')['Close'].mean()
result2 = df.groupby('Symbol')['Volume'].sum()

# Fast - single groupby with agg
result = df.groupby('Symbol').agg({
    'Close': 'mean',
    'Volume': 'sum',
})
```
Use chunking for large files:

```python
import pandas as pd

def load_large_csv(filename, chunk_size=10000):
    chunks = []
    for chunk in pd.read_csv(filename, chunksize=chunk_size):
        # Process each chunk before accumulating it
        processed_chunk = process_chunk(chunk)
        chunks.append(processed_chunk)
    return pd.concat(chunks, ignore_index=True)
```
Use parallel processing:

```python
from multiprocessing import Pool

def process_data_parallel(data_chunks):
    with Pool() as pool:
        results = pool.map(process_chunk, data_chunks)
    return results
```
Cache frequently used data:

```python
from functools import lru_cache

@lru_cache(maxsize=128)
def get_cached_data(symbol, start_date, end_date):
    return data_provider.get_data(symbol, start_date, end_date)
```
Use memory-efficient data types:

```python
import pandas as pd

# float64 - 8 bytes per value
df_float64 = pd.DataFrame({'Close': [1.0] * 1_000_000})
print(f"float64: {df_float64.memory_usage(deep=True).sum()} bytes")

# float32 - 4 bytes per value
df_float32 = df_float64.astype({'Close': 'float32'})
print(f"float32: {df_float32.memory_usage(deep=True).sum()} bytes")
```

Note that `sys.getsizeof` only measures the DataFrame wrapper object; use `memory_usage(deep=True)` to see the actual column storage.
Clear unused variables:

```python
import gc

def process_data(df):
    result = df.groupby('Symbol').mean()
    # Drop the reference to the large frame and collect
    del df
    gc.collect()
    return result
```
Use generators for large datasets:

```python
def process_large_dataset(data):
    for chunk in data:
        yield process_chunk(chunk)
```
Monitor memory usage:

```python
import os
import psutil

def monitor_memory():
    process = psutil.Process(os.getpid())
    memory_info = process.memory_info()
    print(f"Memory usage: {memory_info.rss / 1024 / 1024:.1f} MB")
```
Set memory limits:

```python
import resource  # Unix only

def set_memory_limit(mb):
    # Set the soft address-space limit; keep the hard limit unlimited
    resource.setrlimit(resource.RLIMIT_AS,
                       (mb * 1024 * 1024, resource.RLIM_INFINITY))
```
Use memory profiling:

```python
from memory_profiler import profile

@profile
def memory_intensive_function():
    # Your code here
    pass
```
Use appropriate data types:

```sql
CREATE TABLE trades (
    id INTEGER PRIMARY KEY,
    symbol TEXT,
    side INTEGER,
    quantity REAL,
    entry_price REAL,
    exit_price REAL,
    pnl REAL,
    entry_time TIMESTAMP,
    exit_time TIMESTAMP
);
```
Create indexes:

```sql
CREATE INDEX idx_trades_symbol ON trades(symbol);
CREATE INDEX idx_trades_entry_time ON trades(entry_time);
CREATE INDEX idx_trades_symbol_time ON trades(symbol, entry_time);
```
Use prepared statements:

```python
def insert_trade(trade):
    cursor.execute("""
        INSERT INTO trades (symbol, side, quantity, entry_price,
                            exit_price, pnl, entry_time, exit_time)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, (trade.symbol, trade.side, trade.quantity, trade.entry_price,
          trade.exit_price, trade.pnl, trade.entry_time, trade.exit_time))
```
Batch operations:

```python
def insert_trades_batch(trades):
    cursor.executemany("""
        INSERT INTO trades (symbol, side, quantity, entry_price,
                            exit_price, pnl, entry_time, exit_time)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, trades)
```
Use connection pooling:

```python
from sqlite3 import connect
from threading import Lock

class ConnectionPool:
    def __init__(self, db_path, max_connections=10):
        self.db_path = db_path
        self.max_connections = max_connections
        self.connections = []
        self.lock = Lock()

    def get_connection(self):
        with self.lock:
            if self.connections:
                return self.connections.pop()
        # check_same_thread=False lets pooled connections move between threads
        return connect(self.db_path, check_same_thread=False)

    def return_connection(self, conn):
        with self.lock:
            if len(self.connections) < self.max_connections:
                self.connections.append(conn)
            else:
                conn.close()
```
Use threading for long operations:

```python
import pandas as pd
from PyQt6.QtCore import QThread, pyqtSignal

class DataThread(QThread):
    data_ready = pyqtSignal(pd.DataFrame)

    def run(self):
        # Long-running operation off the main thread
        data = self.load_data()
        self.data_ready.emit(data)
```
Optimize chart rendering:

```python
def update_chart(self, df):
    # Limit data points for performance
    if len(df) > 1000:
        df = df.tail(1000)
    # Use efficient plotting
    self.axes.clear()
    self.axes.plot(df.index, df['Close'], linewidth=1)
    self.canvas.draw()
```
Use lazy loading:

```python
def load_data_lazy(self, symbol):
    if symbol not in self._data_cache:
        self._data_cache[symbol] = self.load_data(symbol)
    return self._data_cache[symbol]
```
Use QTimer for periodic updates:

```python
from PyQt6.QtCore import QTimer
from PyQt6.QtWidgets import QMainWindow

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.timer = QTimer()
        self.timer.timeout.connect(self.update_data)
        self.timer.start(1000)  # Update every second
```
Implement progress bars:

```python
from PyQt6.QtWidgets import QApplication, QProgressBar

def long_operation(self):
    progress = QProgressBar()
    progress.setRange(0, 100)
    for i in range(100):
        # Do a unit of work, then update the bar
        progress.setValue(i)
        QApplication.processEvents()
```
Vectorize calculations:

```python
def vectorized_backtest(self, df, strategy):
    # Calculate all indicators at once
    df = strategy.calculate_indicators(df)
    # Vectorized signal generation
    signals = strategy.signal_vectorized(df)
    # Vectorized position sizing
    sizes = self.calculate_position_sizes_vectorized(df, signals)
    return self.simulate_trades_vectorized(df, signals, sizes)
```
Use NumPy for calculations:

```python
import numpy as np

def calculate_returns(prices):
    return np.diff(prices) / prices[:-1]

def calculate_sharpe_ratio(returns):
    return np.mean(returns) / np.std(returns) * np.sqrt(252)
```
Optimize data structures:

```python
import numpy as np

def optimized_backtest(self, df):
    # Use NumPy arrays for calculations
    prices = df['Close'].values
    signals = np.zeros(len(prices))
    # Per-bar signal generation over the array
    for i in range(1, len(prices)):
        signals[i] = self.generate_signal(prices[:i + 1])
    return self.simulate_trades(prices, signals)
```
Process data in chunks:

```python
def chunked_backtest(self, df, chunk_size=1000):
    results = []
    for i in range(0, len(df), chunk_size):
        chunk = df.iloc[i:i + chunk_size]
        results.append(self.backtest_chunk(chunk))
    return self.combine_results(results)
```
Use generators for large datasets:

```python
def generator_backtest(self, data_generator):
    for chunk in data_generator:
        yield self.backtest_chunk(chunk)
```
Use cProfile for profiling:

```python
import cProfile
import pstats

def profile_function():
    # Your code here
    pass

# Profile the function and save the stats
cProfile.run('profile_function()', 'profile_output.prof')

# Analyze the results
stats = pstats.Stats('profile_output.prof')
stats.sort_stats('cumulative')
stats.print_stats(10)
```
Use line_profiler for line-by-line profiling:

```python
from line_profiler import LineProfiler

def profile_lines():
    profiler = LineProfiler()
    profiler.add_function(your_function)
    profiler.run('your_function()')
    profiler.print_stats()
```
Monitor system resources:

```python
import time
import psutil

def monitor_resources():
    while True:
        cpu_percent = psutil.cpu_percent()
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        print(f"CPU: {cpu_percent}%, Memory: {memory.percent}%, "
              f"Disk: {disk.percent}%")
        time.sleep(1)
```
Track key metrics:

```python
import time

class PerformanceMetrics:
    def __init__(self):
        self.start_time = time.time()
        self.operation_times = {}

    def start_operation(self, name):
        self.operation_times[name] = time.time()

    def end_operation(self, name):
        if name in self.operation_times:
            duration = time.time() - self.operation_times[name]
            print(f"{name}: {duration:.3f}s")
```
Log performance data:

```python
import logging

def log_performance(operation, duration):
    logger = logging.getLogger('performance')
    logger.info(f"{operation}: {duration:.3f}s")
```
Profile before optimizing:
- Identify actual bottlenecks
- Measure before and after
- Focus on hot paths
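The measure-before-and-after discipline can be sketched as a small timing harness; `sma_naive` and `sma_running` are illustrative stand-ins for a slow and an optimized implementation, not ForexSmartBot functions:

```python
import timeit

def sma_naive(prices, period):
    # O(n * period): re-sums the whole window at every step
    return [sum(prices[i - period + 1:i + 1]) / period
            for i in range(period - 1, len(prices))]

def sma_running(prices, period):
    # O(n): maintain a running window sum instead
    out, window_sum = [], sum(prices[:period])
    out.append(window_sum / period)
    for i in range(period, len(prices)):
        window_sum += prices[i] - prices[i - period]
        out.append(window_sum / period)
    return out

prices = [float(i % 97) for i in range(20000)]
t_naive = timeit.timeit(lambda: sma_naive(prices, 50), number=3)
t_fast = timeit.timeit(lambda: sma_running(prices, 50), number=3)
print(f"naive: {t_naive:.4f}s  running-sum: {t_fast:.4f}s")
```

Always confirm the optimized version produces the same results before trusting the speedup.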
Use appropriate data structures:
- Lists for ordered data
- Sets for membership testing
- Dictionaries for key-value lookups
- NumPy arrays for numerical data
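The structure choice above is easy to demonstrate with membership testing; the symbol names here are made up for illustration:

```python
import timeit

symbols_list = [f"SYM{i}" for i in range(10000)]
symbols_set = set(symbols_list)

# List membership is an O(n) scan; set membership is an O(1) hash lookup
t_list = timeit.timeit(lambda: "SYM9999" in symbols_list, number=1000)
t_set = timeit.timeit(lambda: "SYM9999" in symbols_set, number=1000)
print(f"list: {t_list:.4f}s  set: {t_set:.6f}s")
```

The gap widens with collection size, which is why hot-path lookups belong in sets or dictionaries.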
Avoid premature optimization:
- Write clear, readable code first
- Profile to identify bottlenecks
- Optimize only what needs optimization
Separate concerns:
- Data processing
- Business logic
- UI updates
- I/O operations
Use caching strategically:
- Cache expensive calculations
- Cache frequently accessed data
- Use appropriate cache sizes
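A bounded LRU cache covers all three points; `expensive_stat` is a hypothetical stand-in for a costly calculation, and `cache_info()` shows whether the cache size is actually paying off:

```python
from functools import lru_cache

@lru_cache(maxsize=32)  # choose a size that fits your working set
def expensive_stat(symbol, period):
    # Placeholder for an expensive computation keyed by hashable args
    return sum(i * i for i in range(period))

expensive_stat("EURUSD", 1000)
expensive_stat("EURUSD", 1000)   # served from the cache
info = expensive_stat.cache_info()
print(f"hits={info.hits} misses={info.misses}")
```

A high miss rate suggests the cache is too small or the keys are too varied to be worth caching.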
Optimize imports:

```python
# Slower - function-level imports repeat the sys.modules lookup on every call
def process_data_local_imports():
    import pandas as pd
    import numpy as np
    ...

# Better - import once at module level
import pandas as pd
import numpy as np

def process_data():
    ...
```
Use context managers:

```python
import pandas as pd

with open('data.csv') as f:
    data = pd.read_csv(f)
# File is closed automatically when the block exits
```
Clear large variables:

```python
def process_large_data():
    large_data = load_large_dataset()
    result = process_data(large_data)
    del large_data  # Drop the reference so memory can be reclaimed
    return result
```
Use generators for large datasets:

```python
def process_large_file(filename):
    with open(filename) as f:
        for line in f:
            yield process_line(line)
```
Use transactions:

```python
import sqlite3

def batch_insert_trades(trades):
    conn = sqlite3.connect('trades.db')
    try:
        conn.execute('BEGIN TRANSACTION')
        for trade in trades:
            insert_trade(conn, trade)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
```
Use prepared statements:

```python
def insert_trade_prepared(conn, trade):
    cursor = conn.cursor()
    cursor.execute("""
        INSERT INTO trades (symbol, side, quantity, entry_price,
                            exit_price, pnl, entry_time, exit_time)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, (trade.symbol, trade.side, trade.quantity, trade.entry_price,
          trade.exit_price, trade.pnl, trade.entry_time, trade.exit_time))
```
Use QTimer for updates:

```python
from PyQt6.QtCore import QTimer
from PyQt6.QtWidgets import QMainWindow

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.timer = QTimer()
        self.timer.timeout.connect(self.update_ui)
        self.timer.start(100)  # Update every 100ms
```
Implement lazy loading:

```python
def load_data_lazy(self, symbol):
    if symbol not in self._data_cache:
        self._data_cache[symbol] = self.load_data(symbol)
    return self._data_cache[symbol]
```
Use threading for long operations:

```python
import pandas as pd
from PyQt6.QtCore import QThread, pyqtSignal

class DataThread(QThread):
    data_ready = pyqtSignal(pd.DataFrame)

    def run(self):
        data = self.load_data()
        self.data_ready.emit(data)
```
Create benchmarks:

```python
import time

def benchmark_function(func, *args, **kwargs):
    start_time = time.time()
    result = func(*args, **kwargs)
    end_time = time.time()
    print(f"{func.__name__}: {end_time - start_time:.3f}s")
    return result
```
Compare implementations:

```python
import numpy as np

def compare_implementations():
    data = generate_test_data()
    result1 = benchmark_function(implementation1, data)
    result2 = benchmark_function(implementation2, data)
    # Make sure both implementations agree before comparing speed
    assert np.allclose(result1, result2)
```
Test with large datasets:

```python
import time

def test_large_dataset():
    data = generate_large_dataset(1_000_000)
    start_time = time.time()
    result = process_data(data)
    end_time = time.time()
    print(f"Processed {len(data)} records in {end_time - start_time:.3f}s")
```
Test memory usage:

```python
import os
import psutil

def test_memory_usage():
    process = psutil.Process(os.getpid())
    initial_memory = process.memory_info().rss

    result = process_large_dataset()

    final_memory = process.memory_info().rss
    memory_used = (final_memory - initial_memory) / 1024 / 1024
    print(f"Memory used: {memory_used:.1f} MB")
```
Performance optimization is an ongoing process that requires:
- Measurement: Always measure before optimizing
- Profiling: Identify actual bottlenecks
- Iteration: Optimize, measure, repeat
- Monitoring: Continuously monitor performance
- Testing: Test with realistic data and loads
Remember that premature optimization can lead to complex, hard-to-maintain code. Focus on clear, readable code first, then optimize only what needs optimization based on actual performance measurements.
Note: This guide provides general optimization techniques. Always profile your specific use case to identify the most effective optimizations for your particular application.