编程日志 | nohup.net

实践是检验真理的唯一标准

mqtt协议EMQX消息队列的订阅模式演示

就想简单地搞个共同消费,不想群发;折腾了半天,估计还是topic写错了,于是乎,记下来,后面就可以直接抄。

pom依赖:

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

sender:

package com.awkj;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Hello world!
 */
public class App {
    public static void main(String[] args) {
        String topic = "testtopic/";
        String content = "3333";
        int qos = 2;
        String broker = "tcp://192.168.79.128:1883";
        String clientId = MqttClient.generateClientId();
        //  持久化
        MemoryPersistence persistence = new MemoryPersistence();
        // MQTT 连接选项
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // 设置认证信息
        connOpts.setUserName("swk");
        connOpts.setPassword("swk".toCharArray());

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);
            // 设置回调
            client.setCallback(new SampleCallback());
            // 建立连接
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);
            System.out.println("Connected to broker: " + broker);
            // 订阅 topic
//            client.subscribe(topic, qos);
//            System.out.println("Subscribed to topic: " + topic);
            // 发布消息
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(topic, message);
            System.out.println("Message published");
            client.disconnect();
            System.out.println("Disconnected");
            client.close();
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

回调函数:

package com.awkj;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class SampleCallback implements MqttCallback {
    // 连接丢失
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("connection lost:" + cause.getMessage());
    }

    @Override
    //  收到消息
    public void messageArrived(String topic, MqttMessage message) {
        System.out.println("Received message: \n  topic:" + topic + "\n  Qos:" + message.getQos() + "\n  payload:" + new String(message.getPayload()));
    }

    @Override
    // 消息传递成功
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete");
    }


}


receive1:

package com.awkj;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Hello world!
 */
public class App {
    public static void main(String[] args) {
        String topic = "$queue/testtopic/";
        String content = "Hello World";
        int qos = 2;
        String broker = "tcp://192.168.79.128:1883";
        String clientId = MqttClient.generateClientId();
        //  持久化
        MemoryPersistence persistence = new MemoryPersistence();
        // MQTT 连接选项
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // 设置认证信息
        connOpts.setUserName("swk");
        connOpts.setPassword("swk".toCharArray());

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);
            // 设置回调
            client.setCallback(new SampleCallback());
            // 建立连接
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);
            System.out.println("Connected to broker: " + broker);
            // 订阅 topic
            client.subscribe(topic, qos);
            System.out.println("Subscribed to topic: " + topic);
            // 发布消息
//            MqttMessage message = new MqttMessage(content.getBytes());
//            message.setQos(qos);
//            client.publish(topic, message);
//            System.out.println("Message published");
//            client.disconnect();
//            System.out.println("Disconnected");
//            client.close();
//            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

回调函数:

package com.awkj;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class SampleCallback implements MqttCallback {
    // 连接丢失
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("connection lost:" + cause.getMessage());
    }
    @Override
    //  收到消息
    public void messageArrived(String topic, MqttMessage message) {
        System.out.println("Received message: \n  topic:" + topic + "\n  Qos:" + message.getQos() + "\n  payload:" + new String(message.getPayload()));
    }
    @Override
    // 消息传递成功
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete");
    }

}


receive-2跟receive-1一样,就不再冗余代码了,直接启动2个工程即可。

效果:

sender第1次发送消息到消息队列,由receive-1收到;

sender第2次发送消息到消息队列,由receive-2收到。

这样,就可以实现多个消费者来共同消费消息,从而减小消息的积压——看你钱多,还是花的人多和快(这一次不讨论花的快的问题)

另外,推荐一个不错的windows MQTT客户端MQTTX:

https://packages.emqx.net/MQTTX/v1.8.2/MQTTX-Setup-1.8.2-x64.exe

颜值如下:

image.png





发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.2

© 2013-2022 nohup.net , All Rights Reserved. 豫ICP备20020372号-1