Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions adata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from adata.__version__ import __version__
from adata.bond import bond
from adata.common.utils import set_rate_limit
from adata.common.utils.sunrequests import SunProxy
from adata.fund import fund
from adata.sentiment import sentiment
Expand All @@ -33,6 +34,31 @@ def proxy(is_proxy=False, ip: str = None, proxy_url: str = None):
return


def rate_limit(domain=None, max_requests=30, window_seconds=60):
    """
    Configure request rate limiting for the package.

    This is a convenience entry point that forwards its arguments unchanged
    to ``set_rate_limit``.

    :param domain: host name such as 'data.eastmoney.com'; when None the
        values are applied as the global default instead of to one domain
    :param max_requests: maximum number of requests allowed inside one
        window, default 30
    :param window_seconds: length of the window in seconds, default 60
    :return: None

    Usage::

        import adata

        # Global default: 30 requests per minute
        adata.rate_limit(max_requests=30, window_seconds=60)

        # Limits for specific domains
        adata.rate_limit('data.eastmoney.com', max_requests=20, window_seconds=60)
        adata.rate_limit('push2.eastmoney.com', max_requests=40, window_seconds=60)

        # A value of 0 is intended to mean "no limit" for that domain
        # NOTE(review): whether 0 is honoured is decided by the limiter
        # implementation, not by this wrapper -- confirm there.
        adata.rate_limit('api.example.com', max_requests=0)
    """
    set_rate_limit(
        domain=domain,
        max_requests=max_requests,
        window_seconds=window_seconds,
    )


# set up logging
logger = logging.getLogger("adata")

Expand Down
1 change: 1 addition & 0 deletions adata/common/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
"""
from .snowflake import worker
from .sunrequests import sun_requests as requests
from .sunrequests import set_rate_limit


136 changes: 135 additions & 1 deletion adata/common/utils/sunrequests.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import threading
import time
from collections import defaultdict
from urllib.parse import urlparse

import requests

Expand Down Expand Up @@ -41,12 +43,137 @@ def delete(cls, key):
del cls._data[key]


class RateLimiter(object):
    """
    Per-domain request rate limiter using a sliding time window.

    The class is a process-wide singleton: every ``RateLimiter()`` call
    returns the same object, so limits and request history are shared by
    all callers.

    For each domain the limiter remembers the timestamps of recently
    granted requests. A new request is granted when fewer than
    ``max_requests`` timestamps fall inside the last ``window_seconds``;
    otherwise the caller is told how long to wait.

    A ``max_requests`` (or ``window_seconds``) that is ``None``, zero or
    negative disables limiting for that domain.
    """
    _instance = None
    _instance_lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Cheap unlocked check first; the lock is only taken on first creation.
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    # Fully initialise BEFORE publishing the instance, so a
                    # thread that passes the unlocked check above can never
                    # observe an object that is missing its attributes.
                    instance = super().__new__(cls)
                    instance._init()
                    cls._instance = instance
        return cls._instance

    def _init(self):
        """Create the singleton's state; called exactly once from ``__new__``."""
        # Granted-request timestamps per domain: {domain: [t1, t2, ...]}
        self._domain_requests = defaultdict(list)
        # Per-domain overrides: {domain: {'max_requests': 30, 'window_seconds': 60}}
        self._domain_limits = {}
        # Fallback used for domains without an override: 30 requests / 60 s
        self._default_max_requests = 30
        self._default_window_seconds = 60
        # Guards all of the state above.
        self._lock = threading.Lock()

    def set_limit(self, domain, max_requests=30, window_seconds=60):
        """
        Set the rate limit for one domain.

        :param domain: host name, e.g. 'data.eastmoney.com'
        :param max_requests: maximum requests per window, default 30;
            0 (or None / negative) means unlimited
        :param window_seconds: window length in seconds, default 60
        """
        with self._lock:
            self._domain_limits[domain] = {
                'max_requests': max_requests,
                'window_seconds': window_seconds,
            }

    def set_default_limit(self, max_requests=30, window_seconds=60):
        """
        Set the limit applied to domains that have no explicit override.

        :param max_requests: maximum requests per window, default 30;
            0 (or None / negative) means unlimited
        :param window_seconds: window length in seconds, default 60
        """
        # Update both values under the lock so get_limit() never sees a
        # new max_requests paired with an old window_seconds.
        with self._lock:
            self._default_max_requests = max_requests
            self._default_window_seconds = window_seconds

    def get_limit(self, domain):
        """
        Return the effective limit configuration for a domain.

        :param domain: host name to look up
        :return: a new dict with keys 'max_requests' and 'window_seconds';
            the domain's override if one is set, otherwise the defaults
        """
        with self._lock:
            override = self._domain_limits.get(domain)
            if override is not None:
                # Hand out a copy so callers cannot mutate internal state.
                return dict(override)
            return {
                'max_requests': self._default_max_requests,
                'window_seconds': self._default_window_seconds,
            }

    def acquire(self, domain):
        """
        Try to obtain permission for one request to ``domain``.

        This method never sleeps. When permission is granted the request is
        recorded and 0 is returned; otherwise nothing is recorded and the
        number of seconds until the oldest request leaves the window is
        returned.

        :param domain: host name of the request
        :return: 0 if the request may proceed now, else seconds to wait
        """
        limit = self.get_limit(domain)
        max_requests = limit['max_requests']
        window_seconds = limit['window_seconds']

        # Unlimited: skip bookkeeping entirely. Without this guard a limit of
        # 0 would reach min() on an empty list below and raise ValueError.
        if not max_requests or max_requests <= 0:
            return 0
        if not window_seconds or window_seconds <= 0:
            return 0

        with self._lock:
            # Monotonic clock: interval maths must not be affected by
            # wall-clock adjustments (NTP, manual changes, DST).
            now = time.monotonic()
            cutoff = now - window_seconds

            # Drop timestamps that have fallen out of the window.
            recent = [t for t in self._domain_requests[domain] if t > cutoff]
            self._domain_requests[domain] = recent

            if len(recent) >= max_requests:
                # max_requests >= 1 here, so ``recent`` is non-empty.
                wait_time = min(recent) + window_seconds - now
                if wait_time > 0:
                    return wait_time

            # Permission granted: record this request.
            recent.append(now)
            return 0

    def wait_if_needed(self, domain):
        """
        Block the calling thread until a request to ``domain`` is permitted.

        The sleep happens outside the internal lock, so other threads can
        keep acquiring permits; permission is re-checked after every sleep
        because another thread may have taken the freed slot.

        :param domain: host name of the request
        """
        wait_time = self.acquire(domain)
        while wait_time > 0:
            time.sleep(wait_time)
            wait_time = self.acquire(domain)


def set_rate_limit(domain=None, max_requests=30, window_seconds=60):
    """
    Configure the shared rate limiter.

    :param domain: host name such as 'data.eastmoney.com'; when None the
        values become the global default rather than a per-domain override
    :param max_requests: maximum number of requests allowed per window
    :param window_seconds: length of the window in seconds

    Usage::

        # Global default: 30 requests per minute
        set_rate_limit(max_requests=30, window_seconds=60)

        # Limit for a specific domain
        set_rate_limit('data.eastmoney.com', max_requests=20, window_seconds=60)

        # 0 is intended to mean "no limit" for that domain
        # NOTE(review): this relies on RateLimiter treating 0 as unlimited.
        set_rate_limit('api.example.com', max_requests=0, window_seconds=60)
    """
    shared_limiter = RateLimiter()

    # A concrete domain gets its own override ...
    if domain is not None:
        shared_limiter.set_limit(domain, max_requests, window_seconds)
        return

    # ... otherwise the values replace the global default.
    shared_limiter.set_default_limit(max_requests, window_seconds)


class SunRequests(object):
def __init__(self, sun_proxy: SunProxy = None) -> None:
super().__init__()
self.sun_proxy = sun_proxy
self._rate_limiter = RateLimiter()

def request(self, method='get', url=None, times=3, retry_wait_time=1588, proxies=None, wait_time=None, **kwargs):
def request(self, method='get', url=None, times=3, retry_wait_time=1588, proxies=None, wait_time=None,
rate_limit=True, **kwargs):
"""
简单封装的请求,参考requests,增加循环次数和次数之间的等待时间
:param proxies: 代理配置
Expand All @@ -55,9 +182,16 @@ def request(self, method='get', url=None, times=3, retry_wait_time=1588, proxies
:param times: 次数,int
:param retry_wait_time: 重试等待时间,毫秒
:param wait_time: 等待时间:毫秒;表示每个请求的间隔时间,在请求之前等待sleep,主要用于防止请求太频繁的限制。
:param rate_limit: 是否启用频率限制,默认True;可设置为False禁用
:param kwargs: 其它 requests 参数,用法相同
:return: res
"""
# 0. 频率限制检查
if rate_limit and url:
domain = urlparse(url).netloc
if domain:
self._rate_limiter.wait_if_needed(domain)

# 1. 获取设置代理
proxies = self.__get_proxies(proxies)
# 2. 请求数据结果
Expand Down