Memasang Ketergantungan

Pastikan anda telah memasang Redis dan redis-py perpustakaan untuk bekerja dengan Redis dari Python. Pasang kebergantungan yang diperlukan menggunakan pip:

pip install redis

Mencipta Pengurus Redis

Mari buat a RedisManager kelas untuk mengendalikan interaksi dengan Redis.

import redis
import json
import random
import logging

class RedisManager:
    def __init__(self, host='localhost', port=6379, db=0):
        self.db = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True)
        self.logger = logging.getLogger('proxy_manager')
        self.logger.setLevel(logging.INFO)
        if not self.logger.hasHandlers():
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)
        self.proxies = []
        self.load_proxy_list()

    def update_proxies(self, filepath):
        proxies = self.read_proxies_from_file(filepath)
        for proxy in proxies:
            if proxy.get('anonymitylevel') == 'elite' and proxy.get('protocol') in ['http', 'https']:
                if not self.proxy_exists(proxy):
                    self.logger.info(f"proxy: {proxy['ipaddress']} ({proxy['protocol']}, {proxy['anonymitylevel']})")
                    self.proxy_save(proxy)
        self.load_proxy_list()

    def get_proxy(self):
        return random.choice(self.proxies)

    def load_proxy_list(self):
        self.proxies = []
        ret = self.db.smembers("proxies")
        for pid in ret:
            proxy = self.load_proxy(pid)
            if proxy:
                self.proxies.append(proxy)

    def load_proxy(self, pid):
        data = self.db.hmget(f"proxy:{pid}", "ipaddress", "port", "protocol", "anonymitylevel", "source", "country")
        if not all(data):
            return None
        return {k: v for k, v in zip(["ipaddress", "port", "protocol", "anonymitylevel", "source", "country"], data)}

    def proxy_save(self, proxy):
        next_id = self.db.incr("proxies_next_id")
        self.db.hmset(f"proxy:{next_id}", proxy)
        self.db.sadd("proxies", next_id)
        self.db.hset("proxies_ids", proxy['ipaddress'], next_id)

    def proxy_exists(self, proxy):
        return self.db.hexists("proxies_ids", proxy['ipaddress'])

    def read_proxies_from_file(self, filepath):
        with open(filepath, 'r') as file:
            values = json.load(file)
        proxies = []
        for data in values:
            proxy = {
                "ipaddress": data.get("ipaddress", "").lower(),
                "port": str(data.get("port", "")),
                "protocol": data.get("protocols", [""])[0].lower(),
                "anonymitylevel": data.get("anonymitylevel", "").lower(),
                "source": data.get("source", "").lower(),
                "country": data.get("country", "").lower()
            }
            proxies.append(proxy)
        return proxies

Skrip 1: Menambah Proksi Secara Pukal

Skrip ini membaca senarai proksi daripada fail dan menambahkannya ke pangkalan data Redis.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    filepath = 'path/to/proxy_list.json'
    manager.update_proxies(filepath)
    logging.info("Proxies updated successfully")

if __name__ == "__main__":
    main()

Skrip 2: Mendapatkan semula Proksi Rawak

Skrip ini mendapatkan semula pelayan proksi rawak daripada pangkalan data Redis.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    proxy = manager.get_proxy()
    logging.info(f"Random proxy: {proxy}")

if __name__ == "__main__":
    main()

Skrip 3: Menyemak Kewujudan Proksi

Skrip ini menyemak sama ada pelayan proksi tertentu wujud dalam pangkalan data Redis.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    test_proxy = {
        "ipaddress": "192.168.1.1",
        "port": "8080",
        "protocol": "http",
        "anonymitylevel": "elite",
        "source": "test_source",
        "country": "us"
    }
    exists = manager.proxy_exists(test_proxy)
    if exists:
        logging.info("Proxy exists in the database")
    else:
        logging.info("Proxy does not exist in the database")

if __name__ == "__main__":
    main()

Skrip 4: Memadam Proksi

Skrip ini memadamkan proksi daripada pangkalan data Redis. Pertama, tambahkan a delete_proxy kaedah kepada RedisManager kelas:

def delete_proxy(self, pid):
    self.db.delete(f"proxy:{pid}")
    self.db.srem("proxies", pid)

Sekarang, skrip untuk memadam proksi:

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    proxy_id = 1  # Replace with the ID of the proxy you want to delete
    manager.delete_proxy(proxy_id)
    logging.info("Proxy deleted successfully")

if __name__ == "__main__":
    main()

Kesimpulan

Dalam panduan ini, kami telah mencipta beberapa skrip untuk mengurus proksi menggunakan Python dan Redis. Skrip ini membolehkan anda menambah, mendapatkan semula, menyemak dan memadam proksi daripada pangkalan data Redis, menyediakan penyelesaian yang mantap untuk pengurusan proksi. Dengan membina contoh ini, anda boleh memperibadikan dan melanjutkan sistem pengurusan proksi anda untuk memenuhi keperluan khusus anda.

Komen (0)

Tiada ulasan di sini lagi, anda boleh menjadi yang pertama!

Tinggalkan Balasan

Alamat e-mel anda tidak akan disiarkan. Medan diperlukan ditanda dengan *


Pilih dan Beli Proksi

Proksi Pusat Data

Proksi Berputar

Proksi UDP

Dipercayai Oleh 10000+ Pelanggan Seluruh Dunia

Pelanggan Proksi
Pelanggan Proksi
Aliran Pelanggan Proksi.ai
Pelanggan Proksi
Pelanggan Proksi
Pelanggan Proksi