receive_log_topic.py
#!/usr/bin/env python
"""
Rabbitmq.com pub/sub example (topic exchange receiver)

https://www.rabbitmq.com/tutorials/tutorial-five-python.html

Usage: receive_log_topic.py [binding_key]...
"""

import asyncio
import random
import sys

import aioamqp


# NOTE(review): generator-based coroutines (``@asyncio.coroutine`` with
# ``yield from``) are kept because the rest of the file targets that style;
# they are deprecated since Python 3.8 and removed in 3.11.
@asyncio.coroutine
def callback(channel, body, envelope, properties):
    """Print one delivered message.

    Registered with ``channel.basic_consume`` in ``receive_log``. Only
    ``body``, ``envelope.consumer_tag`` and ``envelope.delivery_tag`` are
    read; ``channel`` and ``properties`` are accepted but unused.
    """
    print("consumer {} received {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag))


@asyncio.coroutine
def receive_log():
    """Bind a queue to the ``topic_logs`` exchange and start consuming.

    The binding keys are taken from ``sys.argv[1:]``. If none are given, a
    usage message is printed and the process exits with status 1 before any
    connection is attempted.

    Returns:
        True once the consumer has been registered, False if connecting
        raised ``aioamqp.AmqpClosedConnection``.
    """
    # Validate the command line first so that a usage error never opens a
    # connection or declares anything on the broker.
    binding_keys = sys.argv[1:]
    if not binding_keys:
        print("Usage: %s [binding_key]..." % (sys.argv[0],))
        sys.exit(1)

    try:
        transport, protocol = yield from aioamqp.connect('localhost', 5672)
    except aioamqp.AmqpClosedConnection:
        print("closed connections")
        return False

    channel = yield from protocol.channel()
    exchange_name = 'topic_logs'

    yield from channel.exchange(exchange_name, 'topic')

    # Declare a queue with an empty name and use the name reported back in
    # the declare result for the bindings and the consumer.
    result = yield from channel.queue(queue_name='', durable=False, auto_delete=True)
    queue_name = result['queue']

    # One binding per key given on the command line.
    for binding_key in binding_keys:
        yield from channel.queue_bind(
            exchange_name=exchange_name,
            queue_name=queue_name,
            routing_key=binding_key
        )

    print(' [*] Waiting for logs. To exit press CTRL+C')

    yield from channel.basic_consume(callback, queue_name=queue_name)
    return True


event_loop = asyncio.get_event_loop()
# Keep the loop alive only when a consumer was actually registered; after a
# failed connection there is nothing left to wait for.
if event_loop.run_until_complete(receive_log()):
    event_loop.run_forever()