Cài đặt phụ thuộc

Đảm bảo rằng bạn đã cài đặt Redis và redis-py thư viện để làm việc với Redis từ Python. Cài đặt các phụ thuộc cần thiết bằng pip:

pip install redis

Tạo Trình quản lý Redis

Hãy tạo một RedisManager class để xử lý các tương tác với Redis.

import redis
import json
import random
import logging

class RedisManager:
    def __init__(self, host='localhost', port=6379, db=0):
        self.db = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True)
        self.logger = logging.getLogger('proxy_manager')
        self.logger.setLevel(logging.INFO)
        if not self.logger.hasHandlers():
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)
        self.proxies = []
        self.load_proxy_list()

    def update_proxies(self, filepath):
        proxies = self.read_proxies_from_file(filepath)
        for proxy in proxies:
            if proxy.get('anonymitylevel') == 'elite' and proxy.get('protocol') in ['http', 'https']:
                if not self.proxy_exists(proxy):
                    self.logger.info(f"proxy: {proxy['ipaddress']} ({proxy['protocol']}, {proxy['anonymitylevel']})")
                    self.proxy_save(proxy)
        self.load_proxy_list()

    def get_proxy(self):
        return random.choice(self.proxies)

    def load_proxy_list(self):
        self.proxies = []
        ret = self.db.smembers("proxies")
        for pid in ret:
            proxy = self.load_proxy(pid)
            if proxy:
                self.proxies.append(proxy)

    def load_proxy(self, pid):
        data = self.db.hmget(f"proxy:{pid}", "ipaddress", "port", "protocol", "anonymitylevel", "source", "country")
        if not all(data):
            return None
        return {k: v for k, v in zip(["ipaddress", "port", "protocol", "anonymitylevel", "source", "country"], data)}

    def proxy_save(self, proxy):
        next_id = self.db.incr("proxies_next_id")
        self.db.hmset(f"proxy:{next_id}", proxy)
        self.db.sadd("proxies", next_id)
        self.db.hset("proxies_ids", proxy['ipaddress'], next_id)

    def proxy_exists(self, proxy):
        return self.db.hexists("proxies_ids", proxy['ipaddress'])

    def read_proxies_from_file(self, filepath):
        with open(filepath, 'r') as file:
            values = json.load(file)
        proxies = []
        for data in values:
            proxy = {
                "ipaddress": data.get("ipaddress", "").lower(),
                "port": str(data.get("port", "")),
                "protocol": data.get("protocols", [""])[0].lower(),
                "anonymitylevel": data.get("anonymitylevel", "").lower(),
                "source": data.get("source", "").lower(),
                "country": data.get("country", "").lower()
            }
            proxies.append(proxy)
        return proxies

Tập lệnh 1: Thêm proxy hàng loạt

Tập lệnh này đọc danh sách proxy từ một tệp và thêm chúng vào cơ sở dữ liệu Redis.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    filepath = 'path/to/proxy_list.json'
    manager.update_proxies(filepath)
    logging.info("Proxies updated successfully")

if __name__ == "__main__":
    main()

Kịch bản 2: Truy xuất proxy ngẫu nhiên

Tập lệnh này truy xuất một máy chủ proxy ngẫu nhiên từ cơ sở dữ liệu Redis.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    proxy = manager.get_proxy()
    logging.info(f"Random proxy: {proxy}")

if __name__ == "__main__":
    main()

Kịch bản 3: Kiểm tra sự tồn tại của proxy

Tập lệnh này kiểm tra xem máy chủ proxy cụ thể có tồn tại trong cơ sở dữ liệu Redis hay không.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    test_proxy = {
        "ipaddress": "192.168.1.1",
        "port": "8080",
        "protocol": "http",
        "anonymitylevel": "elite",
        "source": "test_source",
        "country": "us"
    }
    exists = manager.proxy_exists(test_proxy)
    if exists:
        logging.info("Proxy exists in the database")
    else:
        logging.info("Proxy does not exist in the database")

if __name__ == "__main__":
    main()

Kịch bản 4: Xóa Proxy

Tập lệnh này xóa proxy khỏi cơ sở dữ liệu Redis. Đầu tiên, thêm một delete_proxy phương pháp để RedisManager lớp học:

def delete_proxy(self, pid):
    self.db.delete(f"proxy:{pid}")
    self.db.srem("proxies", pid)

Bây giờ, tập lệnh để xóa proxy:

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    proxy_id = 1  # Replace with the ID of the proxy you want to delete
    manager.delete_proxy(proxy_id)
    logging.info("Proxy deleted successfully")

if __name__ == "__main__":
    main()

Phần kết luận

Trong hướng dẫn này, chúng tôi đã tạo một số tập lệnh để quản lý proxy bằng Python và Redis. Các tập lệnh này cho phép bạn thêm, truy xuất, kiểm tra và xóa proxy khỏi cơ sở dữ liệu Redis, cung cấp giải pháp mạnh mẽ để quản lý proxy. Bằng cách xây dựng dựa trên những ví dụ này, bạn có thể tùy chỉnh và mở rộng thêm hệ thống quản lý proxy để phù hợp với nhu cầu cụ thể của mình.

Bình luận (0)

Chưa có bình luận nào ở đây, bạn có thể là người đầu tiên!

Trả lời

Email của bạn sẽ không được hiển thị công khai. Các trường bắt buộc được đánh dấu *


Chọn và mua proxy

Proxy trung tâm dữ liệu

Proxy luân phiên

Proxy UDP

Được tin cậy bởi hơn 10000 khách hàng trên toàn thế giới

Khách hàng ủy quyền
Khách hàng ủy quyền
Khách hàng proxy flowch.ai
Khách hàng ủy quyền
Khách hàng ủy quyền
Khách hàng ủy quyền