Ir para o conteúdo

Pub/Sub: MQTT e MosQuiTTo

MQTT é um protocolo de transporte para publish/subscribe do tipo cliente-servidor, definido pela OASIS, uma organização aberta responsável por padrões como SAML e DocBook. A especificação atual é a de número 5, lançada em março de 2019. O protocolo é leve, aberto e fácil de implementar, ideal para comunicação Machine to Machine (M2M) e uso no contexto de Internet das Coisas (Internet of Things - I0T).

MQTT is a very light weight and binary protocol, and due to its minimal packet overhead, MQTT excels when transferring data over the wire in comparison to protocols like HTTP. Another important aspect of the protocol is that MQTT is extremely easy to implement on the client side. Ease of use was a key concern in the development of MQTT and makes it a perfect fit for constrained devices with limited resources today.

O Eclipse Mosquitto é um broker de código livre que implementa o protocolo MQTT v5.0, v3.1.1 e v3.1. Por ser mínimo, o Mosquitto é ideal para uso em dispositivos pequenos e com pouca capacidade energética, como computadores low power single board, mas flexível o suficiente para ser usado em aplicações de larga escala.

Instalação
Inicializando o serviço

O arquivo mosquito.conf contém as configurações para o broker. As configurações funcionam bem para o nosso caso. O broker aceita requisições na porta 1883 e publishers e subscribers também utilizam essa porta por padrão. Basta iniciar o broker com a opção -v para ter mais detalhes sobre o que ocorre internamente.

  • Ubuntu: mosquitto -v
  • MacOS: /usr/local/sbin/mosquitto -c /usr/local/etc/mosquitto/mosquitto.conf
Publicando

Para publicar uma mensagem, o publisher deve indicar um host, porta, tópico e mensagem. Caso o host e porta sejam omitidos, assume-se localhost:1883. No MacOS, adicione /usr/local/opt/mosquitto/bin/mosquitto_sub ao path.

1
2
3
# publicando valor de 40 para tópicos 'sensor/temperature/1' e 'sensor/temperature/2'
mosquitto_pub -t sensor/temperature/1 -m 40
mosquitto_pub -t sensor/temperature/2 -m 32

Caso o subscriber não esteja em execução, adicione a opção -r para que o broker retenha a mensagem.

Consumindo

O consumidor funciona de maneira semelhante, informando o tópico de interesse:

1
2
# consumindo mensagens de tópico /sensor/temperature/*
mosquitto_sub -t sensor/temperature/+
Programando

Existem também APIs em diversas linguagem para desenvolvimento de aplicações que utilizem o Mosquitto. A biblioteca pode ser baixada aqui.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttPublishSample {

  public static void main(String[] args) {
    String topic        = "MQTT Examples";
    String content      = "Message from MqttPublishSample";
    int qos             = 2;
    String broker       = "tcp://mqtt.eclipse.org:1883";
    String clientId     = "JavaSample";
    MemoryPersistence persistence = new MemoryPersistence();

    try {
      MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
      MqttConnectOptions connOpts = new MqttConnectOptions();
      connOpts.setCleanSession(true);
      System.out.println("Connecting to broker: "+broker);
      sampleClient.connect(connOpts);
      System.out.println("Connected");
      System.out.println("Publishing message: "+content);
      MqttMessage message = new MqttMessage(content.getBytes());
      message.setQos(qos);
      sampleClient.publish(topic, message);
      System.out.println("Message published");
      sampleClient.disconnect();
      System.out.println("Disconnected");
      System.exit(0);
    } catch(MqttException me) {
      System.out.println("reason "+me.getReasonCode());
      System.out.println("msg "+me.getMessage());
      System.out.println("loc "+me.getLocalizedMessage());
      System.out.println("cause "+me.getCause());
      System.out.println("excep "+me);
      me.printStackTrace();
    }
  }
}
Hive
  • /usr/local/opt/mosquitto/bin/mosquitto_sub -h broker.hivemq.com -p 1883 -t esportes/+/flamengo
  • /usr/local/opt/mosquitto/bin/mosquitto_pub -t esportes/nadacao/flamengo -m "perdeu mais uma vez" -r -h broker.hivemq.com

Exercícios - RPC e Publish/Subscribe

  • Usando thrift e a linguagem Java, estenda o serviço ChaveValor para retornar o valor antigo de uma determinada chave na operação setKV() caso a chave já exista.
  • Usando o broker mosquitto instalado localmente, faça em Java um publisher que simula um sensor de temperatura e publica valores aleatórios entre 15 e 45 a cada segundo.
  • Faça o subscriber que irá consumir esses dados de temperatura.

Referências