Menginstal Dependensi
Pastikan Anda telah menginstal Redis dan redis-py
perpustakaan untuk bekerja dengan Redis dari Python. Instal dependensi yang diperlukan menggunakan pip:
pip install redis
Membuat Manajer Redis
Mari kita membuat RedisManager
kelas untuk menangani interaksi dengan Redis.
import redis
import json
import random
import logging
class RedisManager:
def __init__(self, host='localhost', port=6379, db=0):
self.db = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True)
self.logger = logging.getLogger('proxy_manager')
self.logger.setLevel(logging.INFO)
if not self.logger.hasHandlers():
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
self.logger.addHandler(ch)
self.proxies = []
self.load_proxy_list()
def update_proxies(self, filepath):
proxies = self.read_proxies_from_file(filepath)
for proxy in proxies:
if proxy.get('anonymitylevel') == 'elite' and proxy.get('protocol') in ['http', 'https']:
if not self.proxy_exists(proxy):
self.logger.info(f"proxy: {proxy['ipaddress']} ({proxy['protocol']}, {proxy['anonymitylevel']})")
self.proxy_save(proxy)
self.load_proxy_list()
def get_proxy(self):
return random.choice(self.proxies)
def load_proxy_list(self):
self.proxies = []
ret = self.db.smembers("proxies")
for pid in ret:
proxy = self.load_proxy(pid)
if proxy:
self.proxies.append(proxy)
def load_proxy(self, pid):
data = self.db.hmget(f"proxy:{pid}", "ipaddress", "port", "protocol", "anonymitylevel", "source", "country")
if not all(data):
return None
return {k: v for k, v in zip(["ipaddress", "port", "protocol", "anonymitylevel", "source", "country"], data)}
def proxy_save(self, proxy):
next_id = self.db.incr("proxies_next_id")
self.db.hmset(f"proxy:{next_id}", proxy)
self.db.sadd("proxies", next_id)
self.db.hset("proxies_ids", proxy['ipaddress'], next_id)
def proxy_exists(self, proxy):
return self.db.hexists("proxies_ids", proxy['ipaddress'])
def read_proxies_from_file(self, filepath):
with open(filepath, 'r') as file:
values = json.load(file)
proxies = []
for data in values:
proxy = {
"ipaddress": data.get("ipaddress", "").lower(),
"port": str(data.get("port", "")),
"protocol": data.get("protocols", [""])[0].lower(),
"anonymitylevel": data.get("anonymitylevel", "").lower(),
"source": data.get("source", "").lower(),
"country": data.get("country", "").lower()
}
proxies.append(proxy)
return proxies
Skrip 1: Menambahkan Proksi Secara Massal
Skrip ini membaca daftar proksi dari sebuah file dan menambahkannya ke database Redis.
import logging
logging.basicConfig(level=logging.INFO)
def main():
manager = RedisManager()
filepath = 'path/to/proxy_list.json'
manager.update_proxies(filepath)
logging.info("Proxies updated successfully")
if __name__ == "__main__":
main()
Skrip 2: Mengambil Proxy Acak
Skrip ini mengambil server proksi acak dari database Redis.
import logging
logging.basicConfig(level=logging.INFO)
def main():
manager = RedisManager()
proxy = manager.get_proxy()
logging.info(f"Random proxy: {proxy}")
if __name__ == "__main__":
main()
Skrip 3: Memeriksa Keberadaan Proxy
Skrip ini memeriksa apakah ada server proksi tertentu di database Redis.
import logging
logging.basicConfig(level=logging.INFO)
def main():
manager = RedisManager()
test_proxy = {
"ipaddress": "192.168.1.1",
"port": "8080",
"protocol": "http",
"anonymitylevel": "elite",
"source": "test_source",
"country": "us"
}
exists = manager.proxy_exists(test_proxy)
if exists:
logging.info("Proxy exists in the database")
else:
logging.info("Proxy does not exist in the database")
if __name__ == "__main__":
main()
Skrip 4: Menghapus Proxy
Skrip ini menghapus proksi dari database Redis. Pertama, tambahkan a delete_proxy
metode ke RedisManager
kelas:
def delete_proxy(self, pid):
self.db.delete(f"proxy:{pid}")
self.db.srem("proxies", pid)
Sekarang, skrip untuk menghapus proxy:
import logging
logging.basicConfig(level=logging.INFO)
def main():
manager = RedisManager()
proxy_id = 1 # Replace with the ID of the proxy you want to delete
manager.delete_proxy(proxy_id)
logging.info("Proxy deleted successfully")
if __name__ == "__main__":
main()
Kesimpulan
Dalam panduan ini, kami telah membuat beberapa skrip untuk mengelola proxy menggunakan Python dan Redis. Skrip ini memungkinkan Anda menambahkan, mengambil, memeriksa, dan menghapus proksi dari database Redis, sehingga memberikan solusi tangguh untuk manajemen proksi. Dengan memanfaatkan contoh-contoh ini, Anda dapat menyesuaikan dan memperluas sistem manajemen proxy lebih lanjut agar sesuai dengan kebutuhan spesifik Anda.
Komentar (0)
Belum ada komentar di sini, Anda bisa menjadi yang pertama!