تثبيت التبعيات
تأكد من تثبيت Redis و redis-py
مكتبة للعمل مع Redis من بايثون. قم بتثبيت التبعيات الضرورية باستخدام النقطة:
pip install redis
إنشاء مدير Redis
لنقم بإنشاء ملف RedisManager
فئة للتعامل مع التفاعلات مع Redis.
import redis
import json
import random
import logging
class RedisManager:
def __init__(self, host='localhost', port=6379, db=0):
self.db = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True)
self.logger = logging.getLogger('proxy_manager')
self.logger.setLevel(logging.INFO)
if not self.logger.hasHandlers():
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
self.logger.addHandler(ch)
self.proxies = []
self.load_proxy_list()
def update_proxies(self, filepath):
proxies = self.read_proxies_from_file(filepath)
for proxy in proxies:
if proxy.get('anonymitylevel') == 'elite' and proxy.get('protocol') in ['http', 'https']:
if not self.proxy_exists(proxy):
self.logger.info(f"proxy: {proxy['ipaddress']} ({proxy['protocol']}, {proxy['anonymitylevel']})")
self.proxy_save(proxy)
self.load_proxy_list()
def get_proxy(self):
return random.choice(self.proxies)
def load_proxy_list(self):
self.proxies = []
ret = self.db.smembers("proxies")
for pid in ret:
proxy = self.load_proxy(pid)
if proxy:
self.proxies.append(proxy)
def load_proxy(self, pid):
data = self.db.hmget(f"proxy:{pid}", "ipaddress", "port", "protocol", "anonymitylevel", "source", "country")
if not all(data):
return None
return {k: v for k, v in zip(["ipaddress", "port", "protocol", "anonymitylevel", "source", "country"], data)}
def proxy_save(self, proxy):
next_id = self.db.incr("proxies_next_id")
self.db.hmset(f"proxy:{next_id}", proxy)
self.db.sadd("proxies", next_id)
self.db.hset("proxies_ids", proxy['ipaddress'], next_id)
def proxy_exists(self, proxy):
return self.db.hexists("proxies_ids", proxy['ipaddress'])
def read_proxies_from_file(self, filepath):
with open(filepath, 'r') as file:
values = json.load(file)
proxies = []
for data in values:
proxy = {
"ipaddress": data.get("ipaddress", "").lower(),
"port": str(data.get("port", "")),
"protocol": data.get("protocols", [""])[0].lower(),
"anonymitylevel": data.get("anonymitylevel", "").lower(),
"source": data.get("source", "").lower(),
"country": data.get("country", "").lower()
}
proxies.append(proxy)
return proxies
البرنامج النصي 1: إضافة الوكلاء بكميات كبيرة
يقرأ هذا البرنامج النصي قائمة بالوكلاء من ملف ويضيفهم إلى قاعدة بيانات Redis.
import logging
logging.basicConfig(level=logging.INFO)
def main():
manager = RedisManager()
filepath = 'path/to/proxy_list.json'
manager.update_proxies(filepath)
logging.info("Proxies updated successfully")
if __name__ == "__main__":
main()
البرنامج النصي 2: استرداد وكيل عشوائي
يسترد هذا البرنامج النصي خادم وكيل عشوائي من قاعدة بيانات Redis.
import logging
logging.basicConfig(level=logging.INFO)
def main():
manager = RedisManager()
proxy = manager.get_proxy()
logging.info(f"Random proxy: {proxy}")
if __name__ == "__main__":
main()
البرنامج النصي 3: التحقق من وجود الوكيل
يتحقق هذا البرنامج النصي من وجود خادم وكيل محدد في قاعدة بيانات Redis.
import logging
logging.basicConfig(level=logging.INFO)
def main():
manager = RedisManager()
test_proxy = {
"ipaddress": "192.168.1.1",
"port": "8080",
"protocol": "http",
"anonymitylevel": "elite",
"source": "test_source",
"country": "us"
}
exists = manager.proxy_exists(test_proxy)
if exists:
logging.info("Proxy exists in the database")
else:
logging.info("Proxy does not exist in the database")
if __name__ == "__main__":
main()
البرنامج النصي 4: حذف الوكيل
يحذف هذا البرنامج النصي الوكيل من قاعدة بيانات Redis. أولا أضف أ delete_proxy
طريقة إلى RedisManager
فصل:
def delete_proxy(self, pid):
self.db.delete(f"proxy:{pid}")
self.db.srem("proxies", pid)
الآن، البرنامج النصي لحذف الوكيل:
import logging
logging.basicConfig(level=logging.INFO)
def main():
manager = RedisManager()
proxy_id = 1 # Replace with the ID of the proxy you want to delete
manager.delete_proxy(proxy_id)
logging.info("Proxy deleted successfully")
if __name__ == "__main__":
main()
خاتمة
في هذا الدليل، قمنا بإنشاء العديد من البرامج النصية لإدارة الوكلاء باستخدام Python وRedis. تسمح لك هذه البرامج النصية بإضافة الوكلاء واستردادهم والتحقق منهم وحذفهم من قاعدة بيانات Redis، مما يوفر حلاً قويًا لإدارة الوكيل. من خلال البناء على هذه الأمثلة، يمكنك تخصيص نظام إدارة الوكيل وتوسيعه ليناسب احتياجاتك الخاصة.
التعليقات (0)
لا توجد تعليقات هنا حتى الآن، يمكنك أن تكون الأول!