依存関係のインストール

Redisがインストールされており、 redis-py Python から Redis を操作するためのライブラリ。pip を使用して必要な依存関係をインストールします。

pip install redis

Redis マネージャーの作成

作ってみましょう RedisManager Redis とのやり取りを処理するクラス。

import redis
import json
import random
import logging

class RedisManager:
    def __init__(self, host='localhost', port=6379, db=0):
        self.db = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True)
        self.logger = logging.getLogger('proxy_manager')
        self.logger.setLevel(logging.INFO)
        if not self.logger.hasHandlers():
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)
        self.proxies = []
        self.load_proxy_list()

    def update_proxies(self, filepath):
        proxies = self.read_proxies_from_file(filepath)
        for proxy in proxies:
            if proxy.get('anonymitylevel') == 'elite' and proxy.get('protocol') in ['http', 'https']:
                if not self.proxy_exists(proxy):
                    self.logger.info(f"proxy: {proxy['ipaddress']} ({proxy['protocol']}, {proxy['anonymitylevel']})")
                    self.proxy_save(proxy)
        self.load_proxy_list()

    def get_proxy(self):
        return random.choice(self.proxies)

    def load_proxy_list(self):
        self.proxies = []
        ret = self.db.smembers("proxies")
        for pid in ret:
            proxy = self.load_proxy(pid)
            if proxy:
                self.proxies.append(proxy)

    def load_proxy(self, pid):
        data = self.db.hmget(f"proxy:{pid}", "ipaddress", "port", "protocol", "anonymitylevel", "source", "country")
        if not all(data):
            return None
        return {k: v for k, v in zip(["ipaddress", "port", "protocol", "anonymitylevel", "source", "country"], data)}

    def proxy_save(self, proxy):
        next_id = self.db.incr("proxies_next_id")
        self.db.hmset(f"proxy:{next_id}", proxy)
        self.db.sadd("proxies", next_id)
        self.db.hset("proxies_ids", proxy['ipaddress'], next_id)

    def proxy_exists(self, proxy):
        return self.db.hexists("proxies_ids", proxy['ipaddress'])

    def read_proxies_from_file(self, filepath):
        with open(filepath, 'r') as file:
            values = json.load(file)
        proxies = []
        for data in values:
            proxy = {
                "ipaddress": data.get("ipaddress", "").lower(),
                "port": str(data.get("port", "")),
                "protocol": data.get("protocols", [""])[0].lower(),
                "anonymitylevel": data.get("anonymitylevel", "").lower(),
                "source": data.get("source", "").lower(),
                "country": data.get("country", "").lower()
            }
            proxies.append(proxy)
        return proxies

スクリプト 1: プロキシを一括追加する

このスクリプトは、ファイルからプロキシのリストを読み取り、それを Redis データベースに追加します。

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    filepath = 'path/to/proxy_list.json'
    manager.update_proxies(filepath)
    logging.info("Proxies updated successfully")

if __name__ == "__main__":
    main()

スクリプト 2: ランダム プロキシの取得

このスクリプトは、Redis データベースからランダムなプロキシ サーバーを取得します。

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    proxy = manager.get_proxy()
    logging.info(f"Random proxy: {proxy}")

if __name__ == "__main__":
    main()

スクリプト 3: プロキシの存在の確認

このスクリプトは、特定のプロキシ サーバーが Redis データベースに存在するかどうかを確認します。

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    test_proxy = {
        "ipaddress": "192.168.1.1",
        "port": "8080",
        "protocol": "http",
        "anonymitylevel": "elite",
        "source": "test_source",
        "country": "us"
    }
    exists = manager.proxy_exists(test_proxy)
    if exists:
        logging.info("Proxy exists in the database")
    else:
        logging.info("Proxy does not exist in the database")

if __name__ == "__main__":
    main()

スクリプト 4: プロキシの削除

このスクリプトはRedisデータベースからプロキシを削除します。まず、 delete_proxy 方法 RedisManager クラス:

def delete_proxy(self, pid):
    self.db.delete(f"proxy:{pid}")
    self.db.srem("proxies", pid)

次に、プロキシを削除するためのスクリプトを示します。

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    proxy_id = 1  # Replace with the ID of the proxy you want to delete
    manager.delete_proxy(proxy_id)
    logging.info("Proxy deleted successfully")

if __name__ == "__main__":
    main()

結論

このガイドでは、Python と Redis を使用してプロキシを管理するためのスクリプトをいくつか作成しました。これらのスクリプトを使用すると、Redis データベースからプロキシを追加、取得、確認、削除することができ、プロキシ管理のための堅牢なソリューションが提供されます。これらの例を基にして、プロキシ管理システムを特定のニーズに合わせてさらにカスタマイズおよび拡張できます。

コメント (0)

まだコメントはありません。あなたが最初のコメントを投稿できます!

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です


プロキシの選択と購入

データセンター・プロキシ

プロキシのローテーション

UDPプロキシ

世界中の10,000以上の顧客から信頼されています

代理顧客
代理顧客
代理顧客 flowch.ai
代理顧客
代理顧客
代理顧客