Instalace závislostí

Ujistěte se, že máte nainstalovaný Redis a redis-py knihovna pro práci s Redis z Pythonu. Nainstalujte potřebné závislosti pomocí pip:

pip install redis

Vytvoření Redis Manageru

Vytvořme a RedisManager třídy pro zpracování interakcí s Redis.

import redis
import json
import random
import logging

class RedisManager:
    def __init__(self, host='localhost', port=6379, db=0):
        self.db = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True)
        self.logger = logging.getLogger('proxy_manager')
        self.logger.setLevel(logging.INFO)
        if not self.logger.hasHandlers():
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)
        self.proxies = []
        self.load_proxy_list()

    def update_proxies(self, filepath):
        proxies = self.read_proxies_from_file(filepath)
        for proxy in proxies:
            if proxy.get('anonymitylevel') == 'elite' and proxy.get('protocol') in ['http', 'https']:
                if not self.proxy_exists(proxy):
                    self.logger.info(f"proxy: {proxy['ipaddress']} ({proxy['protocol']}, {proxy['anonymitylevel']})")
                    self.proxy_save(proxy)
        self.load_proxy_list()

    def get_proxy(self):
        return random.choice(self.proxies)

    def load_proxy_list(self):
        self.proxies = []
        ret = self.db.smembers("proxies")
        for pid in ret:
            proxy = self.load_proxy(pid)
            if proxy:
                self.proxies.append(proxy)

    def load_proxy(self, pid):
        data = self.db.hmget(f"proxy:{pid}", "ipaddress", "port", "protocol", "anonymitylevel", "source", "country")
        if not all(data):
            return None
        return {k: v for k, v in zip(["ipaddress", "port", "protocol", "anonymitylevel", "source", "country"], data)}

    def proxy_save(self, proxy):
        next_id = self.db.incr("proxies_next_id")
        self.db.hmset(f"proxy:{next_id}", proxy)
        self.db.sadd("proxies", next_id)
        self.db.hset("proxies_ids", proxy['ipaddress'], next_id)

    def proxy_exists(self, proxy):
        return self.db.hexists("proxies_ids", proxy['ipaddress'])

    def read_proxies_from_file(self, filepath):
        with open(filepath, 'r') as file:
            values = json.load(file)
        proxies = []
        for data in values:
            proxy = {
                "ipaddress": data.get("ipaddress", "").lower(),
                "port": str(data.get("port", "")),
                "protocol": data.get("protocols", [""])[0].lower(),
                "anonymitylevel": data.get("anonymitylevel", "").lower(),
                "source": data.get("source", "").lower(),
                "country": data.get("country", "").lower()
            }
            proxies.append(proxy)
        return proxies

Skript 1: Hromadné přidávání serverů proxy

Tento skript načte seznam proxy ze souboru a přidá je do databáze Redis.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    filepath = 'path/to/proxy_list.json'
    manager.update_proxies(filepath)
    logging.info("Proxies updated successfully")

if __name__ == "__main__":
    main()

Skript 2: Získání náhodného proxy

Tento skript načte náhodný proxy server z databáze Redis.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    proxy = manager.get_proxy()
    logging.info(f"Random proxy: {proxy}")

if __name__ == "__main__":
    main()

Skript 3: Kontrola existence proxy

Tento skript zkontroluje, zda v databázi Redis existuje konkrétní proxy server.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    test_proxy = {
        "ipaddress": "192.168.1.1",
        "port": "8080",
        "protocol": "http",
        "anonymitylevel": "elite",
        "source": "test_source",
        "country": "us"
    }
    exists = manager.proxy_exists(test_proxy)
    if exists:
        logging.info("Proxy exists in the database")
    else:
        logging.info("Proxy does not exist in the database")

if __name__ == "__main__":
    main()

Skript 4: Odstranění proxy

Tento skript odstraní proxy z databáze Redis. Nejprve přidejte a delete_proxy metoda k RedisManager třída:

def delete_proxy(self, pid):
    self.db.delete(f"proxy:{pid}")
    self.db.srem("proxies", pid)

Nyní skript pro odstranění proxy:

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    proxy_id = 1  # Replace with the ID of the proxy you want to delete
    manager.delete_proxy(proxy_id)
    logging.info("Proxy deleted successfully")

if __name__ == "__main__":
    main()

Závěr

V této příručce jsme vytvořili několik skriptů pro správu proxy pomocí Pythonu a Redis. Tyto skripty vám umožňují přidávat, načítat, kontrolovat a odstraňovat proxy z databáze Redis a poskytují robustní řešení pro správu proxy. Na základě těchto příkladů můžete dále přizpůsobit a rozšířit svůj systém správy proxy tak, aby vyhovoval vašim konkrétním potřebám.

Komentáře (0)

Zatím zde nejsou žádné komentáře, můžete být první!

Napsat komentář

Vaše e-mailová adresa nebude zveřejněna. Vyžadované informace jsou označeny *


Vyberte a kupte proxy

Proxy datových center

Rotující proxy

UDP proxy

Důvěřuje více než 10 000 zákazníkům po celém světě

Proxy zákazník
Proxy zákazník
Proxy zákazníka flowch.ai
Proxy zákazník
Proxy zákazník
Proxy zákazník