Sõltuvuste installimine
Veenduge, et Redis ja Pythoni teek redis-py on installitud, et saaksite Redist Pythonist kasutada.
Installige vajalikud sõltuvused pipi abil:
pip install redis
Redis Manageri loomine
Loome Redisega suhtlemiseks klassi RedisManager.
import redis
import json
import random
import logging
class RedisManager:
    """Store proxy records in Redis and keep an in-memory copy of them.

    Redis keys used by this class:

    * ``proxies_next_id`` -- counter from which new proxy IDs are allocated.
    * ``proxies``         -- set containing every allocated proxy ID.
    * ``proxy:<id>``      -- hash holding the fields of a single proxy.
    * ``proxies_ids``     -- hash mapping an IP address to its proxy ID.
    """

    # Fields kept in every ``proxy:<id>`` hash, in the order they are read back.
    _PROXY_FIELDS = ("ipaddress", "port", "protocol", "anonymitylevel", "source", "country")
    # Fields a record must have a non-empty value for to be usable as a proxy.
    _REQUIRED_FIELDS = ("ipaddress", "port", "protocol")

    def __init__(self, host='localhost', port=6379, db=0):
        """Connect to Redis, configure logging and load the stored proxies.

        Args:
            host: Redis server host name.
            port: Redis server port.
            db: Redis database number.
        """
        # decode_responses=True makes every reply a ``str`` instead of ``bytes``.
        self.db = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True)
        self.logger = logging.getLogger('proxy_manager')
        self.logger.setLevel(logging.INFO)
        # Attach a handler only once, and only if no ancestor logger already has
        # one, so repeated instantiation does not duplicate log lines.
        if not self.logger.hasHandlers():
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)
        self.proxies = []
        self.load_proxy_list()

    def update_proxies(self, filepath):
        """Import new elite HTTP/HTTPS proxies from a JSON file into Redis.

        Entries whose IP address is already indexed are skipped. The in-memory
        list is reloaded afterwards.

        Args:
            filepath: Path of the JSON file to read.
        """
        proxies = self.read_proxies_from_file(filepath)
        for proxy in proxies:
            if proxy.get('anonymitylevel') == 'elite' and proxy.get('protocol') in ['http', 'https']:
                if not self.proxy_exists(proxy):
                    self.logger.info(f"proxy: {proxy['ipaddress']} ({proxy['protocol']}, {proxy['anonymitylevel']})")
                    self.proxy_save(proxy)
        self.load_proxy_list()

    def get_proxy(self):
        """Return one randomly chosen proxy from the in-memory list.

        Raises:
            IndexError: If no proxies are loaded.
        """
        return random.choice(self.proxies)

    def load_proxy_list(self):
        """Rebuild ``self.proxies`` from the IDs stored in the ``proxies`` set."""
        loaded = []
        for pid in self.db.smembers("proxies"):
            proxy = self.load_proxy(pid)
            if proxy:
                loaded.append(proxy)
        self.proxies = loaded

    def load_proxy(self, pid):
        """Read one proxy hash from Redis.

        Args:
            pid: ID of the proxy, as stored in the ``proxies`` set.

        Returns:
            A dict with the proxy fields, or ``None`` when the hash is missing
            a field or has an empty IP address, port or protocol.
        """
        values = self.db.hmget(f"proxy:{pid}", *self._PROXY_FIELDS)
        # A missing field (or a missing hash) comes back as None. An empty
        # string is a legitimately stored value for optional metadata such as
        # "source" or "country" and must not cause the record to be dropped.
        if any(value is None for value in values):
            return None
        proxy = dict(zip(self._PROXY_FIELDS, values))
        if not all(proxy[name] for name in self._REQUIRED_FIELDS):
            return None
        return proxy

    def proxy_save(self, proxy):
        """Store *proxy* under a newly allocated ID and index it by IP address.

        Args:
            proxy: Dict of proxy fields; must contain ``"ipaddress"``.

        Returns:
            The ID allocated for the new record.
        """
        next_id = self.db.incr("proxies_next_id")
        # hset(..., mapping=...) replaces the deprecated hmset().
        self.db.hset(f"proxy:{next_id}", mapping=proxy)
        self.db.sadd("proxies", next_id)
        self.db.hset("proxies_ids", proxy['ipaddress'], next_id)
        return next_id

    def proxy_exists(self, proxy):
        """Return whether the proxy's IP address is already indexed in Redis."""
        return self.db.hexists("proxies_ids", proxy['ipaddress'])

    @staticmethod
    def _text(value):
        """Return *value* as lower-case text, mapping ``None`` to ``""``."""
        return "" if value is None else str(value).lower()

    def read_proxies_from_file(self, filepath):
        """Parse a JSON file containing a list of proxy objects.

        Each object is normalised to a dict of strings with the keys
        ``ipaddress``, ``port``, ``protocol``, ``anonymitylevel``, ``source``
        and ``country``. Missing or ``null`` values become ``""``; the protocol
        is the first item of the object's ``protocols`` list.

        Args:
            filepath: Path of the JSON file to read.

        Returns:
            List of normalised proxy dicts, in file order.
        """
        # JSON is UTF-8 by specification; do not depend on the platform default.
        with open(filepath, 'r', encoding='utf-8') as file:
            values = json.load(file)
        proxies = []
        for data in values:
            # A missing, null or empty "protocols" list yields an empty protocol
            # instead of raising IndexError/TypeError.
            protocols = data.get("protocols") or [""]
            port = data.get("port")
            proxy = {
                "ipaddress": self._text(data.get("ipaddress")),
                "port": "" if port is None else str(port),
                "protocol": self._text(protocols[0]),
                "anonymitylevel": self._text(data.get("anonymitylevel")),
                "source": self._text(data.get("source")),
                "country": self._text(data.get("country")),
            }
            proxies.append(proxy)
        return proxies
1. skript: puhverserverite hulgi lisamine
See skript loeb failist puhverserverite loendi ja lisab need Redise andmebaasi.
import logging
logging.basicConfig(level=logging.INFO)


def main():
    """Import the proxies from the configured file into Redis and log completion."""
    proxy_file = 'path/to/proxy_list.json'
    RedisManager().update_proxies(proxy_file)
    logging.info("Proxies updated successfully")


if __name__ == "__main__":
    main()
2. skript: juhusliku puhverserveri hankimine
See skript hangib Redise andmebaasist juhusliku puhverserveri.
import logging
logging.basicConfig(level=logging.INFO)


def main():
    """Log one proxy picked at random by the manager."""
    chosen = RedisManager().get_proxy()
    logging.info(f"Random proxy: {chosen}")


if __name__ == "__main__":
    main()
3. skript: puhverserveri olemasolu kontrollimine
See skript kontrollib, kas Redise andmebaasis on konkreetne puhverserver.
import logging
logging.basicConfig(level=logging.INFO)


def main():
    """Log whether a sample proxy is already present in the database."""
    manager = RedisManager()
    candidate = dict(
        ipaddress="192.168.1.1",
        port="8080",
        protocol="http",
        anonymitylevel="elite",
        source="test_source",
        country="us",
    )
    # Pick the message first so there is a single logging call.
    if manager.proxy_exists(candidate):
        message = "Proxy exists in the database"
    else:
        message = "Proxy does not exist in the database"
    logging.info(message)


if __name__ == "__main__":
    main()
4. skript: puhverserveri kustutamine
See skript kustutab puhverserveri Redise andmebaasist. Esmalt lisage klassi RedisManager meetod delete_proxy:
def delete_proxy(self, pid):
    """Remove a proxy and every reference to it from Redis.

    Deletes the ``proxy:<pid>`` hash, removes the ID from the ``proxies``
    set and drops the IP-address entry from the ``proxies_ids`` index, so
    that ``proxy_exists`` stops reporting the proxy and it can be imported
    again later. The in-memory proxy list is reloaded afterwards.

    Args:
        pid: ID of the proxy to delete.
    """
    key = f"proxy:{pid}"
    # The IP address must be read before the hash is deleted; it is the
    # field under which the proxy is indexed in "proxies_ids".
    ipaddress = self.db.hget(key, "ipaddress")
    self.db.delete(key)
    self.db.srem("proxies", pid)
    # Only drop the index entry if it still points at this proxy. IDs are
    # compared as text because the manager's connection returns strings.
    if ipaddress is not None and str(self.db.hget("proxies_ids", ipaddress)) == str(pid):
        self.db.hdel("proxies_ids", ipaddress)
    # Keep the cached list consistent with Redis, as update_proxies does.
    self.load_proxy_list()
Nüüd puhverserveri kustutamise skript:
import logging
logging.basicConfig(level=logging.INFO)


def main():
    """Delete the proxy with the configured ID and log completion."""
    target_id = 1  # Replace with the ID of the proxy you want to delete
    RedisManager().delete_proxy(target_id)
    logging.info("Proxy deleted successfully")


if __name__ == "__main__":
    main()
Kokkuvõte
Selles juhendis lõime mitu skripti puhverserverite haldamiseks Pythoni ja Redise abil. Need skriptid võimaldavad lisada puhverservereid Redise andmebaasi ning neid sealt hankida, nende olemasolu kontrollida ja neid kustutada, pakkudes töökindlat lahendust puhverserverite haldamiseks. Nendele näidetele tuginedes saate oma puhverserverite haldussüsteemi vastavalt oma vajadustele veelgi kohandada ja laiendada.
Kommentaarid (0)
Siin pole veel kommentaare, võite olla esimene!