نسخة تجريبية مجانية للوكيل

تثبيت التبعيات

تأكد من تثبيت Redis و redis-py مكتبة للعمل مع Redis من بايثون. قم بتثبيت التبعيات الضرورية باستخدام النقطة:

pip install redis

إنشاء مدير Redis

لنقم بإنشاء ملف RedisManager فئة للتعامل مع التفاعلات مع Redis.

import redis
import json
import random
import logging

class RedisManager:
    def __init__(self, host='localhost', port=6379, db=0):
        self.db = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True)
        self.logger = logging.getLogger('proxy_manager')
        self.logger.setLevel(logging.INFO)
        if not self.logger.hasHandlers():
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)
        self.proxies = []
        self.load_proxy_list()

    def update_proxies(self, filepath):
        proxies = self.read_proxies_from_file(filepath)
        for proxy in proxies:
            if proxy.get('anonymitylevel') == 'elite' and proxy.get('protocol') in ['http', 'https']:
                if not self.proxy_exists(proxy):
                    self.logger.info(f"proxy: {proxy['ipaddress']} ({proxy['protocol']}, {proxy['anonymitylevel']})")
                    self.proxy_save(proxy)
        self.load_proxy_list()

    def get_proxy(self):
        return random.choice(self.proxies)

    def load_proxy_list(self):
        self.proxies = []
        ret = self.db.smembers("proxies")
        for pid in ret:
            proxy = self.load_proxy(pid)
            if proxy:
                self.proxies.append(proxy)

    def load_proxy(self, pid):
        data = self.db.hmget(f"proxy:{pid}", "ipaddress", "port", "protocol", "anonymitylevel", "source", "country")
        if not all(data):
            return None
        return {k: v for k, v in zip(["ipaddress", "port", "protocol", "anonymitylevel", "source", "country"], data)}

    def proxy_save(self, proxy):
        next_id = self.db.incr("proxies_next_id")
        self.db.hmset(f"proxy:{next_id}", proxy)
        self.db.sadd("proxies", next_id)
        self.db.hset("proxies_ids", proxy['ipaddress'], next_id)

    def proxy_exists(self, proxy):
        return self.db.hexists("proxies_ids", proxy['ipaddress'])

    def read_proxies_from_file(self, filepath):
        with open(filepath, 'r') as file:
            values = json.load(file)
        proxies = []
        for data in values:
            proxy = {
                "ipaddress": data.get("ipaddress", "").lower(),
                "port": str(data.get("port", "")),
                "protocol": data.get("protocols", [""])[0].lower(),
                "anonymitylevel": data.get("anonymitylevel", "").lower(),
                "source": data.get("source", "").lower(),
                "country": data.get("country", "").lower()
            }
            proxies.append(proxy)
        return proxies

البرنامج النصي 1: إضافة الوكلاء بكميات كبيرة

يقرأ هذا البرنامج النصي قائمة بالوكلاء من ملف ويضيفهم إلى قاعدة بيانات Redis.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    filepath = 'path/to/proxy_list.json'
    manager.update_proxies(filepath)
    logging.info("Proxies updated successfully")

if __name__ == "__main__":
    main()

البرنامج النصي 2: استرداد وكيل عشوائي

يسترد هذا البرنامج النصي خادم وكيل عشوائي من قاعدة بيانات Redis.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    proxy = manager.get_proxy()
    logging.info(f"Random proxy: {proxy}")

if __name__ == "__main__":
    main()

البرنامج النصي 3: التحقق من وجود الوكيل

يتحقق هذا البرنامج النصي من وجود خادم وكيل محدد في قاعدة بيانات Redis.

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    test_proxy = {
        "ipaddress": "192.168.1.1",
        "port": "8080",
        "protocol": "http",
        "anonymitylevel": "elite",
        "source": "test_source",
        "country": "us"
    }
    exists = manager.proxy_exists(test_proxy)
    if exists:
        logging.info("Proxy exists in the database")
    else:
        logging.info("Proxy does not exist in the database")

if __name__ == "__main__":
    main()

البرنامج النصي 4: حذف الوكيل

يحذف هذا البرنامج النصي الوكيل من قاعدة بيانات Redis. أولا أضف أ delete_proxy طريقة إلى RedisManager فصل:

def delete_proxy(self, pid):
    self.db.delete(f"proxy:{pid}")
    self.db.srem("proxies", pid)

الآن، البرنامج النصي لحذف الوكيل:

import logging

logging.basicConfig(level=logging.INFO)

def main():
    manager = RedisManager()
    proxy_id = 1  # Replace with the ID of the proxy you want to delete
    manager.delete_proxy(proxy_id)
    logging.info("Proxy deleted successfully")

if __name__ == "__main__":
    main()

خاتمة

في هذا الدليل، قمنا بإنشاء العديد من البرامج النصية لإدارة الوكلاء باستخدام Python وRedis. تسمح لك هذه البرامج النصية بإضافة الوكلاء واستردادهم والتحقق منهم وحذفهم من قاعدة بيانات Redis، مما يوفر حلاً قويًا لإدارة الوكيل. من خلال البناء على هذه الأمثلة، يمكنك تخصيص نظام إدارة الوكيل وتوسيعه ليناسب احتياجاتك الخاصة.

التعليقات (0)

لا توجد تعليقات هنا حتى الآن، يمكنك أن تكون الأول!

اترك تعليقاً

لن يتم نشر عنوان بريدك الإلكتروني. الحقول الإلزامية مشار إليها بـ *

اختر وشراء الوكيل

وكلاء مركز البيانات

وكلاء الدورية

وكلاء UDP

موثوق به من قبل أكثر من 10000 عميل حول العالم

العميل الوكيل
العميل الوكيل
وكيل العميلflowch.ai
العميل الوكيل
العميل الوكيل
العميل الوكيل