Memasang Ketergantungan
Pastikan anda telah memasang Redis dan redis-py
perpustakaan untuk bekerja dengan Redis dari Python. Pasang kebergantungan yang diperlukan menggunakan pip:
pip install redis
Mencipta Pengurus Redis
Mari buat a RedisManager
kelas untuk mengendalikan interaksi dengan Redis.
import redis
import json
import random
import logging
class RedisManager:
def __init__(self, host='localhost', port=6379, db=0):
self.db = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True)
self.logger = logging.getLogger('proxy_manager')
self.logger.setLevel(logging.INFO)
if not self.logger.hasHandlers():
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
self.logger.addHandler(ch)
self.proxies = []
self.load_proxy_list()
def update_proxies(self, filepath):
proxies = self.read_proxies_from_file(filepath)
for proxy in proxies:
if proxy.get('anonymitylevel') == 'elite' and proxy.get('protocol') in ['http', 'https']:
if not self.proxy_exists(proxy):
self.logger.info(f"proxy: {proxy['ipaddress']} ({proxy['protocol']}, {proxy['anonymitylevel']})")
self.proxy_save(proxy)
self.load_proxy_list()
def get_proxy(self):
return random.choice(self.proxies)
def load_proxy_list(self):
self.proxies = []
ret = self.db.smembers("proxies")
for pid in ret:
proxy = self.load_proxy(pid)
if proxy:
self.proxies.append(proxy)
def load_proxy(self, pid):
data = self.db.hmget(f"proxy:{pid}", "ipaddress", "port", "protocol", "anonymitylevel", "source", "country")
if not all(data):
return None
return {k: v for k, v in zip(["ipaddress", "port", "protocol", "anonymitylevel", "source", "country"], data)}
def proxy_save(self, proxy):
next_id = self.db.incr("proxies_next_id")
self.db.hmset(f"proxy:{next_id}", proxy)
self.db.sadd("proxies", next_id)
self.db.hset("proxies_ids", proxy['ipaddress'], next_id)
def proxy_exists(self, proxy):
return self.db.hexists("proxies_ids", proxy['ipaddress'])
def read_proxies_from_file(self, filepath):
with open(filepath, 'r') as file:
values = json.load(file)
proxies = []
for data in values:
proxy = {
"ipaddress": data.get("ipaddress", "").lower(),
"port": str(data.get("port", "")),
"protocol": data.get("protocols", [""])[0].lower(),
"anonymitylevel": data.get("anonymitylevel", "").lower(),
"source": data.get("source", "").lower(),
"country": data.get("country", "").lower()
}
proxies.append(proxy)
return proxies
Skrip 1: Menambah Proksi Secara Pukal
Skrip ini membaca senarai proksi daripada fail dan menambahkannya ke pangkalan data Redis.
import logging
logging.basicConfig(level=logging.INFO)
def main():
manager = RedisManager()
filepath = 'path/to/proxy_list.json'
manager.update_proxies(filepath)
logging.info("Proxies updated successfully")
if __name__ == "__main__":
main()
Skrip 2: Mendapatkan semula Proksi Rawak
Skrip ini mendapatkan semula pelayan proksi rawak daripada pangkalan data Redis.
import logging
logging.basicConfig(level=logging.INFO)
def main():
manager = RedisManager()
proxy = manager.get_proxy()
logging.info(f"Random proxy: {proxy}")
if __name__ == "__main__":
main()
Skrip 3: Menyemak Kewujudan Proksi
Skrip ini menyemak sama ada pelayan proksi tertentu wujud dalam pangkalan data Redis.
import logging
logging.basicConfig(level=logging.INFO)
def main():
manager = RedisManager()
test_proxy = {
"ipaddress": "192.168.1.1",
"port": "8080",
"protocol": "http",
"anonymitylevel": "elite",
"source": "test_source",
"country": "us"
}
exists = manager.proxy_exists(test_proxy)
if exists:
logging.info("Proxy exists in the database")
else:
logging.info("Proxy does not exist in the database")
if __name__ == "__main__":
main()
Skrip 4: Memadam Proksi
Skrip ini memadamkan proksi daripada pangkalan data Redis. Pertama, tambahkan a delete_proxy
kaedah kepada RedisManager
kelas:
def delete_proxy(self, pid):
self.db.delete(f"proxy:{pid}")
self.db.srem("proxies", pid)
Sekarang, skrip untuk memadam proksi:
import logging
logging.basicConfig(level=logging.INFO)
def main():
manager = RedisManager()
proxy_id = 1 # Replace with the ID of the proxy you want to delete
manager.delete_proxy(proxy_id)
logging.info("Proxy deleted successfully")
if __name__ == "__main__":
main()
Kesimpulan
Dalam panduan ini, kami telah mencipta beberapa skrip untuk mengurus proksi menggunakan Python dan Redis. Skrip ini membolehkan anda menambah, mendapatkan semula, menyemak dan memadam proksi daripada pangkalan data Redis, menyediakan penyelesaian yang mantap untuk pengurusan proksi. Dengan membina contoh ini, anda boleh memperibadikan dan melanjutkan sistem pengurusan proksi anda untuk memenuhi keperluan khusus anda.
Komen (0)
Tiada ulasan di sini lagi, anda boleh menjadi yang pertama!