Para testar a comunicação de clientes ao broker, utilizaremos uma aplicação em Python denominada Pika. Segundo a definição em seu site, o Pika “is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library.”. Ou seja, é uma implementação em Python do protocolo AMQP 0-9-1 que se propõe a ser bastante independente da estrutura de rede do cliente.
Para instalar o Pika, utilizaremos o gerenciador de pacotes do Python, o pip. Após instalar o pip pelo repositório do Ubuntu, instalamos o Pika:
$ sudo apt-get install python-pip $ pip install --upgrade --user pip $ pip3 install pika --user
As Figuras 1 e 2 exibem as telas com as operações mencionadas acima.
Executando o Pika (cliente AMQP)
Finalmente, podemos executar os scripts do Pika para implementar nosso exemplo de log. São dois scripts diferentes, o script dos consumers – que recebe as mensagens de log – e o script do publisher – que publica a mensagem de log.
Para abrir, manipular e testar os scripts em Python, utilizamos o Visual Studio Code.
O Consumer
O código em Python, utilizando o Pika, para o consumidor da mensagem de log é exibido a seguir:
#!/usr/bin/env python
import pika
import sys
#abre a conexão com o broker RabbitMQ instalado na máquina local
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
#declara um exchange do tipo topic
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic')
#cria a fila de onde as mensagens serão lidas (consumidas)
result = channel.queue_declare(exclusive=True)
#requisita o nome da fila criada
queue_name = result.method.queue
#passando-se as binding_keys como argumentos ao executar o script
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
#varre as binding_keys passadas como argumento.
for binding_key in binding_keys:
#cria o binding (associação) entre o binding key e a fila
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
#definição da função a ser chamada sempre que uma mensagem com binding_key correspondente for recebida na fila
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
#consome da fila criada na aplicação
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
O código do consumidor inicia uma conexão com o broker na máquina local (localhost), utilizando as credenciais padrões. A seguir, declara um exchange do tipo Topic chamado topic_logs. A seguir, uma fila é declarada para este consumidor. Observe que, neste caso, cada cliente irá consumir de uma fila única. A partir de uma lista de binding_keys passadas como argumentos durante a execução do script, serão criados os bindings para a fila daquele consumer para, então, a aplicação começar a receber as mensagens publicadas na fila. A Figura 3 a seguir exibe a execução do script – dentro de um terminal aberto no VStudio Code – com binding key igual a “#”, ou seja, para consumir todas as mensagens de log.
$ python receive_logs_topic.py “#”
Através da interface gráfica do RabbitMQ é possível verificar a conexão deste cliente (consumer) ao broker. A Figura 4 exibe a conexão do cliente (consumer) ao broker.
Ainda na interface de gerenciamento gráfica do RabbitMQ, na aba Exchanges, podemos selecionar o exchange topic_logs. Na tela de exibição que se abre é possível visualizar informações sobre as mensagens que são destinadas àquele exchange, bem como visualizar os bindings associados a ele. A Figura 5 a seguir exibe a fila genérica criada pelo cliente (consumer) com binding key igual a “#”.
A seguir, outra sessão da mesma aplicação do cliente (consumer) é criada, agora com binding key igual a “*.critical”, ou seja, todas as mensagens de log críticas serão destinadas a este consumer, independente de qual módulo do sistema gerou a mensagem.
$ python receive_logs_topic.py “*.critical”
Da mesma forma, é possível verificar na tela de informações do exchange topic_logs o binding para a fila com binding key igual a “*.critical”.
A interface gráfica do RabbitMQ permite visualizar as filas no broker. Como afirmamos anteriormente, estes filas foram criadas pelos próprios clientes consumidores. Esta abordagem é possível uma vez que cada tipo de mensagem de log é endereçada a uma única aplicação cliente (consumer) e, assim, não é necessário que mais de um cliente compartilhe a mesma fila. As Figuras 7 e 8 exibem a execução dos scripts dos consumers e as filas associadas a estes consumers na interface gráfica do broker.
O Publisher
Ao falarmos do publisher, nos referimos aos clientes que publicam mensagens ao broker. Enfatizamos que as mensagens que chegam ao broker jamais são endereçadas diretamente às filas – as quais os consumidores monitoram – mas sim ao exchange. O código do publisher é exibido a seguir:
#!/usr/bin/env python
import pika
import sys
#abre a conexão com o broker RabbitMQ instalado na máquina local
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
#declara um exchange do tipo topic (que pode já estar declarado no broker)
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic')
#a routing key da mensagem é passada diretamente como argumento durante a execução do script
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
#ainda como argumento, é possível escrever a mensagem como argumento
message = ' '.join(sys.argv[2:]) or 'Hello World!'
#publica a mensagem com o routing key no exchange topic_logs
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
Em suma, a aplicação cliente publica uma mensagem a um exchange com routing key correspondente à origem e a severidade da mensagem de log. Para nosso exemplo, a aplicação publisher é executada com routing key igual a “kern.critical”, indicando uma mensagem de log crítica gerada pelo módulo do kernel do nosso sistema fictício.
$ python emit_log_topic.py “kern.critical” “A Critical Kernel Error”
É possível observar, ainda na Figura 9, que as aplicações dos consumers receberam a mensagem publicada pelo nosso cliente publisher. Ou seja, a mensagem foi enviada ao exchange topic_logs do broker e encaminhada às filas com binding key compatível. Em nosso exemplo, ambas as aplicações consumers recebem a mensagem pois uma das aplicações recebe mensagens com todos os routing keys endereçados ao exchange (“#”) e a outra recebe mensagens críticas geradas por qualquer módulo do sistema (“*.critical”). A Figura 10 exibe a tela de detalhes da fila com binding key igual a “#”, onde é possível observar gráficos da entrega da mensagem ao consumidor, bem como outras informações sobre a fila.
Conclusão (e o que esperar daqui pra frente)
Este artigo propõe apresentar uma visão geral sobre o protocolo AMQP, bem como apresentar um broker que implementa o AMQP 0-9-1 nativamente. Embora focamos em uma aplicação específica, o AMQP 0-9-1 oferece uma suíte que permite implementar funcionalidades como a distribuição de tarefas entre diversos clientes, envio de mensagens a um grande número de consumidores, manipulação de parâmetros do broker pelos clientes, entre outros. Ainda, o RabbitMQ oferece a possibilidade de integrar plugins que acrescentem funcionalidades ao broker como, por exemplo, a interface gráfica que utilizamos neste artigo ou, ainda, um plugin para MQTT. A integração de outros protocolos de comunicação ao RabbitMQ permite que a mesma interface seja utilizada em um ambiente que agrega clientes que se comunicam com AMQP e clientes que se comunicam, por exemplo, com MQTT, criando uma ponte entre estas mensagens.
Fornecemos a seguir uma VM (máquina virtual, preferencialmente importada com o VirtualBox) com Ubuntu 16.04, a qual utilizamos para nossos experimentos e que o leitor pode (e é incentivado a) usar e experimentar com o RabbitMQ/AMQP:
O próximo artigo desta série (nos cobrem, comentem, perguntem!) irá propor uma aplicação do RabbitMQ/AMQP na nuvem, aprofundando-se nos detalhes do broker e do protocolo e apresentando um caso de uso com uma Raspberry Pi e OpenWRT (deem uma olhada no artigo que já escrevemos sobre) atuando como cliente do broker.
Referências
Saiba mais
MQTT-SN: MQTT para rede de sensores





















Muito bom esta série de artigos, parabéns!!!
Ainda tem disponibilidade desta VM?