def rate_limit(domain=None, max_requests=30, window_seconds=60):
    """
    Configure the request rate limit used by adata's HTTP layer.

    This is a thin convenience wrapper: every argument is forwarded unchanged
    to ``set_rate_limit`` from ``adata.common.utils``.

    :param domain: host name such as 'data.eastmoney.com'; ``None`` changes
        the global default instead of a single domain
    :param max_requests: maximum number of requests allowed inside one
        window, 30 by default
    :param window_seconds: length of the window in seconds, 60 by default
    :return: None

    Example:
        import adata

        # global default: 30 requests per minute
        adata.rate_limit(max_requests=30, window_seconds=60)

        # per-domain limits
        adata.rate_limit('data.eastmoney.com', max_requests=20, window_seconds=60)
        adata.rate_limit('push2.eastmoney.com', max_requests=40, window_seconds=60)

        # 0 is intended to lift the limit for one domain; the actual handling
        # of that value lives in set_rate_limit, not in this wrapper
        adata.rate_limit('api.example.com', max_requests=0)
    """
    set_rate_limit(
        domain=domain,
        max_requests=max_requests,
        window_seconds=window_seconds,
    )
class RateLimiter(object):
    """
    Per-domain request rate limiter based on a sliding time window.

    The class is a process-wide singleton: every ``RateLimiter()`` call
    returns the same object, so limits configured anywhere apply to every
    user of the limiter.

    A limit is a pair ``(max_requests, window_seconds)``. A domain without
    an explicit limit uses the default pair. A non-positive (or ``None``)
    ``max_requests`` or ``window_seconds`` means "no limit".
    """
    _instance = None
    _instance_lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Double-checked creation. The instance is fully initialised BEFORE
        # it is published in cls._instance; otherwise another thread could
        # pass the unlocked check and use an object that has no _lock yet.
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance._init()
                    cls._instance = instance
        return cls._instance

    def _init(self):
        """Create the limiter state; runs once, for the singleton instance."""
        # {domain: [t1, t2, ...]} monotonic timestamps of granted requests,
        # oldest first
        self._domain_requests = defaultdict(list)
        # {domain: {'max_requests': int, 'window_seconds': number}}
        self._domain_limits = {}
        # default limit: 30 requests per 60 seconds
        self._default_max_requests = 30
        self._default_window_seconds = 60
        # guards every attribute above
        self._lock = threading.Lock()

    def set_limit(self, domain, max_requests=30, window_seconds=60):
        """
        Set the limit of one domain.

        :param domain: host name, e.g. 'data.eastmoney.com'
        :param max_requests: maximum requests per window, default 30;
            0 or a negative value disables limiting for this domain
        :param window_seconds: window length in seconds, default 60
        """
        with self._lock:
            self._domain_limits[domain] = {
                'max_requests': max_requests,
                'window_seconds': window_seconds
            }

    def set_default_limit(self, max_requests=30, window_seconds=60):
        """
        Set the limit used by domains that have no explicit limit.

        :param max_requests: maximum requests per window, default 30;
            0 or a negative value disables the default limiting
        :param window_seconds: window length in seconds, default 60
        """
        # Both fields change under the lock so a reader never sees a mixed pair.
        with self._lock:
            self._default_max_requests = max_requests
            self._default_window_seconds = window_seconds

    def _limit_for(self, domain):
        """Return (max_requests, window_seconds) for domain; caller holds self._lock."""
        configured = self._domain_limits.get(domain)
        if configured is not None:
            return configured['max_requests'], configured['window_seconds']
        return self._default_max_requests, self._default_window_seconds

    def get_limit(self, domain):
        """
        Return the effective limit of a domain.

        :param domain: host name
        :return: a new dict with keys 'max_requests' and 'window_seconds'
            (the domain's own limit, or the default when none is set)
        """
        with self._lock:
            max_requests, window_seconds = self._limit_for(domain)
        return {
            'max_requests': max_requests,
            'window_seconds': window_seconds
        }

    def acquire(self, domain):
        """
        Try to take one request slot for a domain without blocking.

        :param domain: host name of the request
        :return: 0 when the slot was granted (the request is recorded), or
            the number of seconds to wait before trying again; in that case
            nothing is recorded and the caller must call acquire() again
        """
        # The limit lookup and the history update share one critical section,
        # so a concurrent set_limit() cannot interleave between them.
        with self._lock:
            max_requests, window_seconds = self._limit_for(domain)

            # Non-positive values mean "unlimited". Without this guard
            # max_requests=0 reached the oldest-timestamp lookup on an empty
            # history and raised ValueError.
            if max_requests is None or max_requests <= 0:
                self._domain_requests.pop(domain, None)
                return 0
            if window_seconds is None or window_seconds <= 0:
                self._domain_requests.pop(domain, None)
                return 0

            # Monotonic clock: interval arithmetic must not be affected by
            # wall-clock adjustments.
            now = time.monotonic()

            # drop the records that have left the window
            cutoff = now - window_seconds
            history = [t for t in self._domain_requests[domain] if t > cutoff]
            self._domain_requests[domain] = history

            if len(history) >= max_requests:
                # Timestamps are appended in increasing order, so the first
                # element is the oldest; its expiry frees the next slot.
                wait_time = history[0] + window_seconds - now
                if wait_time > 0:
                    return wait_time

            # record the granted request
            history.append(now)
            return 0

    def wait_if_needed(self, domain):
        """
        Block until a request slot for the domain has been granted.

        :param domain: host name of the request
        """
        wait_time = self.acquire(domain)
        while wait_time > 0:
            time.sleep(wait_time)
            # another thread may have taken the freed slot, so ask again
            wait_time = self.acquire(domain)


def set_rate_limit(domain=None, max_requests=30, window_seconds=60):
    """
    Configure the shared RateLimiter.

    :param domain: host name such as 'data.eastmoney.com'; ``None`` changes
        the global default instead of a single domain
    :param max_requests: maximum requests per window; 0 or a negative value
        means no limit
    :param window_seconds: window length in seconds

    Example:
        # global default: 30 requests per minute
        set_rate_limit(max_requests=30, window_seconds=60)

        # limit for one domain
        set_rate_limit('data.eastmoney.com', max_requests=20, window_seconds=60)

        # no limit for one domain
        set_rate_limit('api.example.com', max_requests=0, window_seconds=60)
    """
    limiter = RateLimiter()
    if domain is None:
        limiter.set_default_limit(max_requests, window_seconds)
    else:
        limiter.set_limit(domain, max_requests, window_seconds)
method='get', url=None, times=3, retry_wait_time=1588, proxies :param times: 次数,int :param retry_wait_time: 重试等待时间,毫秒 :param wait_time: 等待时间:毫秒;表示每个请求的间隔时间,在请求之前等待sleep,主要用于防止请求太频繁的限制。 + :param rate_limit: 是否启用频率限制,默认True;可设置为False禁用 :param kwargs: 其它 requests 参数,用法相同 :return: res """ + # 0. 频率限制检查 + if rate_limit and url: + domain = urlparse(url).netloc + if domain: + self._rate_limiter.wait_if_needed(domain) + # 1. 获取设置代理 proxies = self.__get_proxies(proxies) # 2. 请求数据结果