-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_manager.py
More file actions
230 lines (187 loc) · 7.44 KB
/
data_manager.py
File metadata and controls
230 lines (187 loc) · 7.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
"""
Data Manager Module
Enhanced market data management with caching, retry logic, and error handling.
"""
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
import time
import config
from logger import system_logger
class DataManager:
    """
    Manage market data with caching and error handling.

    Quotes and price history are fetched through ``yfinance`` and kept in an
    in-memory cache. Each lookup supplies its own time-to-live, and failed
    fetches are retried with a linearly increasing delay taken from ``config``.
    """

    def __init__(self):
        """Initialize data manager with stock universe and cache."""
        self.stock_universe = config.DEFAULT_STOCK_UNIVERSE.copy()
        # ``cache`` maps a key to its payload; ``cache_timestamps`` maps the
        # same key to the datetime at which that payload was stored.
        self.cache: Dict[str, Any] = {}
        self.cache_timestamps: Dict[str, datetime] = {}

    def _is_cache_valid(self, key: str, ttl: int) -> bool:
        """
        Check if cached data is still valid.

        Args:
            key: Cache key
            ttl: Time to live in seconds

        Returns:
            True if the key has a timestamp younger than ``ttl`` seconds
        """
        stored_at = self.cache_timestamps.get(key)
        if stored_at is None:
            return False

        age = (datetime.now() - stored_at).total_seconds()
        return age < ttl

    def _get_from_cache(self, key: str, ttl: int) -> Optional[Any]:
        """
        Get data from cache if valid.

        Args:
            key: Cache key
            ttl: Time to live in seconds

        Returns:
            Cached data, or None when the key is absent or expired
        """
        if key in self.cache and self._is_cache_valid(key, ttl):
            system_logger.debug(f"Cache hit: {key}")
            return self.cache[key]
        return None

    def _save_to_cache(self, key: str, data: Any):
        """
        Save data to cache, evicting the oldest entry when over capacity.

        Args:
            key: Cache key
            data: Data to cache
        """
        self.cache[key] = data
        self.cache_timestamps[key] = datetime.now()

        # Limit cache size: one insertion can push the cache at most one entry
        # over the limit, so dropping the single oldest entry is sufficient.
        if len(self.cache) > config.CACHE_MAX_SIZE:
            oldest_key = min(self.cache_timestamps, key=self.cache_timestamps.get)
            del self.cache[oldest_key]
            del self.cache_timestamps[oldest_key]

    @staticmethod
    def _build_quote(symbol: str, info: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert a raw ticker ``info`` mapping into the quote dictionary.

        Fields that are missing *or* present with a null/falsy value become 0,
        so every numeric entry of the result can be formatted and compared
        safely by callers.

        Args:
            symbol: Stock symbol
            info: Mapping of provider field names to values

        Returns:
            Quote dictionary
        """
        def number(field: str) -> Any:
            # ``dict.get(field, 0)`` would still return None when the key is
            # present with a null value; ``or 0`` covers that case as well.
            return info.get(field) or 0

        change_pct = info.get('regularMarketChangePercent')

        return {
            'symbol': symbol,
            'price': number('currentPrice'),
            'open': number('open'),
            'high': number('dayHigh'),
            'low': number('dayLow'),
            'volume': number('volume'),
            'change': number('regularMarketChange'),
            # The raw value is divided by 100 to store it as a fraction.
            'change_pct': change_pct / 100 if change_pct else 0,
            'bid': number('bid'),
            'ask': number('ask'),
            'market_cap': number('marketCap'),
        }

    def _handle_failed_attempt(
        self,
        symbol: str,
        attempt: int,
        error: Exception,
        final_message: str
    ) -> None:
        """
        Log a failed fetch attempt and back off, or report final failure.

        Args:
            symbol: Stock symbol being fetched
            attempt: Zero-based index of the attempt that just failed
            error: Exception raised by the attempt
            final_message: Error message to log when no attempts remain
        """
        total_attempts = config.API_RETRY_ATTEMPTS
        system_logger.warning(
            f"Attempt {attempt + 1}/{total_attempts} failed for {symbol}: {error}"
        )

        if attempt < total_attempts - 1:
            # Linear backoff: wait one more delay unit after each failure.
            time.sleep(config.API_RETRY_DELAY * (attempt + 1))
        else:
            system_logger.error(final_message)

    def get_quote(self, symbol: str) -> Optional[Dict[str, Any]]:
        """
        Get real-time quote with caching and error handling.

        Args:
            symbol: Stock symbol

        Returns:
            Quote dictionary or None if error
        """
        cache_key = f"quote_{symbol}"

        # Check cache
        cached_data = self._get_from_cache(cache_key, config.CACHE_TTL_QUOTE)
        if cached_data is not None:
            return cached_data

        # Fetch fresh data with retry logic
        for attempt in range(config.API_RETRY_ATTEMPTS):
            try:
                info = yf.Ticker(symbol).info
                quote = self._build_quote(symbol, info)

                # Build the log line before caching, so a payload that cannot
                # be formatted is never cached and then reported as a failure.
                message = f"Fetched quote for {symbol}: ${quote['price']:.2f}"

                # Cache the result
                self._save_to_cache(cache_key, quote)
                system_logger.debug(message)
                return quote
            except Exception as e:
                self._handle_failed_attempt(
                    symbol,
                    attempt,
                    e,
                    f"Failed to fetch quote for {symbol} after {config.API_RETRY_ATTEMPTS} attempts"
                )

        return None

    def get_historical_data(
        self,
        symbol: str,
        days: int = 180
    ) -> Optional[pd.DataFrame]:
        """
        Get historical price data with caching and error handling.

        Args:
            symbol: Stock symbol
            days: Number of days of history

        Returns:
            DataFrame with historical data or None if error
        """
        cache_key = f"hist_{symbol}_{days}"

        # Check cache
        cached_data = self._get_from_cache(cache_key, config.CACHE_TTL_HISTORICAL)
        if cached_data is not None:
            return cached_data

        # Fetch fresh data with retry logic
        for attempt in range(config.API_RETRY_ATTEMPTS):
            try:
                df = yf.Ticker(symbol).history(period=f"{days}d")

                # An empty result is treated as "no data", not as a transient
                # failure, so it is returned immediately without retrying.
                if df is None or len(df) == 0:
                    system_logger.warning(f"No historical data returned for {symbol}")
                    return None

                # Cache the result
                self._save_to_cache(cache_key, df)
                system_logger.debug(f"Fetched {len(df)} days of history for {symbol}")
                return df
            except Exception as e:
                self._handle_failed_attempt(
                    symbol,
                    attempt,
                    e,
                    f"Failed to fetch historical data for {symbol}"
                )

        return None

    def update_stock_universe(self, universe: Dict[str, str]):
        """
        Update the stock universe.

        Args:
            universe: Dictionary of symbol to name mappings
        """
        # Store a copy so later changes to the caller's dict do not leak in.
        self.stock_universe = universe.copy()
        system_logger.info(f"Stock universe updated with {len(universe)} symbols")

    def add_symbol(self, symbol: str, name: str):
        """
        Add a symbol to the stock universe.

        Args:
            symbol: Stock symbol
            name: Company name
        """
        self.stock_universe[symbol] = name
        system_logger.info(f"Added {symbol} ({name}) to stock universe")

    def remove_symbol(self, symbol: str):
        """
        Remove a symbol from the stock universe.

        Unknown symbols are ignored silently.

        Args:
            symbol: Stock symbol to remove
        """
        if symbol in self.stock_universe:
            del self.stock_universe[symbol]
            system_logger.info(f"Removed {symbol} from stock universe")

    def clear_cache(self):
        """Clear all cached data."""
        self.cache.clear()
        self.cache_timestamps.clear()
        system_logger.info("Cache cleared")

    def get_cache_stats(self) -> Dict[str, Any]:
        """
        Get cache statistics.

        Returns:
            Dictionary with the current size, the configured maximum size,
            utilization as a percentage, and the list of cached keys
        """
        size = len(self.cache)
        max_size = config.CACHE_MAX_SIZE

        return {
            'size': size,
            'max_size': max_size,
            # A zero limit would otherwise raise ZeroDivisionError.
            'utilization': size / max_size * 100 if max_size else 0.0,
            'keys': list(self.cache.keys())
        }