Menginstal Dependensi

Pastikan Anda telah menginstal Redis dan redis-py perpustakaan untuk bekerja dengan Redis dari Python. Instal dependensi yang diperlukan menggunakan pip:

pip install redis

Membuat Manajer Redis

Mari kita membuat RedisManager kelas untuk menangani interaksi dengan Redis.

import redis
import json
import random
import logging

class RedisManager:
    def __init__(self, host='localhost', port=6379, db=0):
        self.db = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True)
        self.logger = logging.getLogger('proxy_manager')
        self.logger.setLevel(logging.INFO)
        if not self.logger.hasHandlers():
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)
        self.proxies = []
        self.load_proxy_list()

    def update_proxies(self, filepath):
        proxies = self.read_proxies_from_file(filepath)
        for proxy in proxies:
            if proxy.get('anonymitylevel') == 'elite' and proxy.get('protocol') in ['http', 'https']:
                if not self.proxy_exists(proxy):
                    self.logger.info(f"proxy: {proxy['ipaddress']} ({proxy['protocol']}, {proxy['anonymitylevel']})")
                    self.proxy_save(proxy)
        self.load_proxy_list()

    def get_proxy(self):
        return random.choice(self.proxies)

    def load_proxy_list(self):
        self.proxies = []
        ret = self.db.smembers("proxies")
        for pid in ret:
            proxy = self.load_proxy(pid)
            if proxy:
                self.proxies.append(proxy)

    def load_proxy(self, pid):
        data = self.db.hmget(f"proxy:{pid}", "ipaddress", "port", "protocol", "anonymitylevel", "source", "country")
        if not all(data):
            return None
        return {k: v for k, v in zip(["ipaddress", "port", "protocol", "anonymitylevel", "source", "country"], data)}

    def proxy_save(self, proxy):
        next_id = self.db.incr("proxies_next_id")
        self.db.hmset(f"proxy:{next_id}", proxy)
        self.db.sadd("proxies", next_id)
        self.db.hset("proxies_ids", proxy['ipaddress'], next_id)

    def proxy_exists(self, proxy):
        return self.db.hexists("proxies_ids", proxy['ipaddress'])

    def read_proxies_from_file(self, filepath):
        with open(filepath, 'r') as file:
            values = json.load(file)
        proxies = []
        for data in values:
            proxy = {
                "ipaddress": data.get("ipaddress", "").lower(),
                "port": str(data.get("port", "")),
                "protocol": data.get("protocols", [""])[0].lower(),
                "anonymitylevel": data.get("anonymitylevel", "").lower(),
                "source": data.get("source", "").lower(),
                "country": data.get("country", "").lower()
            }
            proxies.append(proxy)
        return proxies

Skrip 1: Menambahkan Proksi Secara Massal

Skrip ini membaca daftar proksi dari sebuah file dan menambahkannya ke database Redis.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    filepath = 'path/to/proxy_list.json'
    manager.update_proxies(filepath)
    logging.info("Proxies updated successfully")

if __name__ == "__main__":
    main()

Skrip 2: Mengambil Proxy Acak

Skrip ini mengambil server proksi acak dari database Redis.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    proxy = manager.get_proxy()
    logging.info(f"Random proxy: {proxy}")

if __name__ == "__main__":
    main()

Skrip 3: Memeriksa Keberadaan Proxy

Skrip ini memeriksa apakah ada server proksi tertentu di database Redis.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    test_proxy = {
        "ipaddress": "192.168.1.1",
        "port": "8080",
        "protocol": "http",
        "anonymitylevel": "elite",
        "source": "test_source",
        "country": "us"
    }
    exists = manager.proxy_exists(test_proxy)
    if exists:
        logging.info("Proxy exists in the database")
    else:
        logging.info("Proxy does not exist in the database")

if __name__ == "__main__":
    main()

Skrip 4: Menghapus Proxy

Skrip ini menghapus proksi dari database Redis. Pertama, tambahkan a delete_proxy metode ke RedisManager kelas:

def delete_proxy(self, pid):
    self.db.delete(f"proxy:{pid}")
    self.db.srem("proxies", pid)

Sekarang, skrip untuk menghapus proxy:

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    proxy_id = 1  # Replace with the ID of the proxy you want to delete
    manager.delete_proxy(proxy_id)
    logging.info("Proxy deleted successfully")

if __name__ == "__main__":
    main()

Kesimpulan

Dalam panduan ini, kami telah membuat beberapa skrip untuk mengelola proxy menggunakan Python dan Redis. Skrip ini memungkinkan Anda menambahkan, mengambil, memeriksa, dan menghapus proksi dari database Redis, sehingga memberikan solusi tangguh untuk manajemen proksi. Dengan memanfaatkan contoh-contoh ini, Anda dapat menyesuaikan dan memperluas sistem manajemen proxy lebih lanjut agar sesuai dengan kebutuhan spesifik Anda.

Komentar (0)

Belum ada komentar di sini, Anda bisa menjadi yang pertama!

Tinggalkan Balasan

Alamat email Anda tidak akan dipublikasikan. Ruas yang wajib ditandai *


Pilih dan Beli Proxy

Proksi Pusat Data

Memutar Proxy

Proksi UDP

Dipercaya Oleh 10.000+ Pelanggan di Seluruh Dunia

Pelanggan Proksi
Pelanggan Proksi
Pelanggan Proksi flowch.ai
Pelanggan Proksi
Pelanggan Proksi
Pelanggan Proksi