Python编写Rabbitmq的生产者和消费者(基础)

Python Worker and Producer


  • 安装 pikapip3 install pika
  • 官方文档

    Producer编写

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    import pika

    # 身份验证
    auth = pika.PlainCredentials("用户名", "密码")
    # 这里的身份验证之前, 需要确认该身份是否具有权限
    # Rabbitmq后台 -> Admin -> User->SetPermission
    connection = pika.BlockingConnection(
    pika.ConnectionParameters("远程服务器地址", 5672, "/", auth)
    )

    # 声明管道
    channel = connection.channel()

    # 在管道里面声明queue
    # durable=True, 队列持续化
    channel.queue_declare(
    queue="hello", durable=True
    )

    # 用管道发消息
    # 使用默认的交换机发送消息。exchange为空就使用默认的
    channel.basic_publish(
    exchange="",
    routing_key="hello", # 消息队列名
    body="HelloWorld",
    properties=pika.BasicProperties(
    delivery_mode=2, # make message persistent
    )
    )

    print("Send Success..")
    connection.close()

Worker 编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import pika

# 身份验证
auth = pika.PlainCredentials("用户名", "密码")
# 这里的身份验证之前, 需要确认该身份是否具有权限
# Rabbitmq后台 -> Admin -> User->SetPermission
connection = pika.BlockingConnection(
pika.ConnectionParameters("远程服务器地址", 5672, "/", auth)
)


channel = connection.channel()

# 这里再次声明从那个队列去收消息(可以不写,但必须要有这个队列,不然会报错)
# 从队列取消息
channel.queue_declare(queue="hello", durable=True)


# 回调函数
def callback(ch, method, properties, body):
print("[+] recived: {}".format(body))

# 给队列发一个确认执行完成的消息,
# 否则消息会保存起来, 会转给下一个消费者
# ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)

# 取消发送消息中断处理功能,不管有没有处理完,都不会给服务器端发确认
# 百度上很多文章, callback没有加参数名, 导致一个报错. 官方文档里面为正确代码
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)

print("[+] Waiting for queue message....")

channel.start_consuming()