RabbitMQ的使用—实战
RabbitMQ是一个开源的消息代理中间件,在分布式系统开发中被广泛应用。它实现了高级消息队列协议(AMQP),提供可靠的消息传递、灵活的路由、消息确认等功能。下面是使用RabbitMQ的基本流程:
1.安装 RabbitMQ
首先需要在您的系统上安装 RabbitMQ。具体安装步骤根据您的操作系统不同而有所区别。您可以访问 RabbitMQ 官方网站获取安装指南。
2.连接到 RabbitMQ
使用任何支持 AMQP 协议的客户端库与 RabbitMQ 建立连接。常用的客户端库包括 pika(Python)、RabbitMQ.Client(C#)等。
实 战:
系统:mac OS
环境:Pycharm2021
构造一个请求对象,然后通过pickle工具进行序列化,最后送到RabbitMQ中。生产者代码如下:
import pika
import requests
import pickle
MAX_PRIORITY = 100
TOTAL = 100
QUEUE_NAME = 'scrape_queue'
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME,durable=True)
for i in range(1, TOTAL+1):
url = f'https://ssr1.scrape.center/detail/{i}'
request = requests.Request('GET',url)
channel.basic_publish(exchange='',
routing_key=QUEUE_NAME,
properties=pika.BasicProperties(delivery_mode=2,),body=pickle.dumps(request))
print(f'Put request of {url}')
对于消费者,可以编写一个循环,让它不断地从队列中取出请求对象,取出一个就执行一次爬虫任务,实现如下:
import pika
import pickle
import requests
MAX_PRIORITY = 100
QUEUE_NAME = 'scrape_queue'
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
session = requests.Session()
def scrape(request):
try:
response = session.send(request.prepare())
print(f'success scraped {response.url}')
except request.RequestException:
print(f'error occurred when scraping {request.url}')
while 1:
method_frame, header, body = channel.basic_get(
queue=QUEUE_NAME, auto_ack=True
)
if body:
request = pickle.loads(body)
print(f'Get {request}')
scrape(request)
这里消费者调用basic_get方法获取了消息,然后通过pickle工具把消息反序列化还原成一个请求对象,之后使用session的send方法执行该请求,爬取了数据,如果爬取成功就打印爬取成功的消息。
运行结果如下:
可以看到,消费者依次取出了请求对象,然后成功完成了一个个爬取任务。