自定义代理源

源码: https://github.com/x-haose/hs-net/blob/main/examples/proxy/custom_provider.py

custom_provider.py
"""
自定义代理源

实现 ProxyProvider 对接任意代理源(API、Redis、文件等)。
所有引擎均可搭配使用。
"""

from hs_net import EngineEnum, ProxyProvider, ProxyService, SyncNet

# 替换为你自己的代理地址
PROXY_LIST = [
    "http://proxy1:8080",
    "socks5://user:pass@proxy2:1080",
]
TEST_URL = "http://ip-api.com/json/"

# 所有支持同步的引擎
SYNC_ENGINES = [
    EngineEnum.HTTPX,
    EngineEnum.REQUESTS,
    EngineEnum.CURL_CFFI,
    EngineEnum.REQUESTS_GO,
]


class MyProvider(ProxyProvider):
    """自定义代理提供器,轮询返回代理地址。"""

    def __init__(self):
        self._proxies = PROXY_LIST
        self._index = 0

    def get_proxy(self) -> str:
        proxy = self._proxies[self._index % len(self._proxies)]
        self._index += 1
        return proxy


def main():
    svc = ProxyService(provider=MyProvider())

    for engine in SYNC_ENGINES:
        with SyncNet(proxy=svc, engine=engine, retries=0, timeout=30) as net:
            resp = net.get(TEST_URL)
            ip = resp.json_data["query"]
            print(f"{engine.value:12s} -> IP: {ip}")


if __name__ == "__main__":
    main()