|
@@ -2,10 +2,12 @@ package vip.xiaonuo.biz.modular.mq;
|
|
|
|
|
|
import jakarta.annotation.Resource;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
import org.eclipse.paho.client.mqttv3.*;
|
|
|
import org.springframework.boot.CommandLineRunner;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import vip.xiaonuo.common.prop.CommonProperties;
|
|
|
+import vip.xiaonuo.dev.api.DevConfigApi;
|
|
|
|
|
|
import java.io.UnsupportedEncodingException;
|
|
|
|
|
@@ -20,44 +22,48 @@ public class MqttSubscribeClient implements CommandLineRunner {
|
|
|
@Resource
|
|
|
private CommonProperties commonProperties;
|
|
|
|
|
|
+ @Resource
|
|
|
+ private DevConfigApi devConfigApi;
|
|
|
+
|
|
|
|
|
|
@Override
|
|
|
public void run(String... args){
|
|
|
- log.info("[MqttMsgSubscribe]公共生产者启动开始.");
|
|
|
- try{
|
|
|
- client = new MqttClient(commonProperties.getMqttUrl(), "1234567890");
|
|
|
- connOpts = new MqttConnectOptions();
|
|
|
- connOpts.setCleanSession(true);
|
|
|
- connOpts.setUserName(commonProperties.getMqttName());
|
|
|
- connOpts.setAutomaticReconnect(true);
|
|
|
- connOpts.setPassword(commonProperties.getMqttPassword().toCharArray());
|
|
|
- log.info("Connecting to broker: " + commonProperties.getMqttUrl());
|
|
|
- client.connect(connOpts);
|
|
|
- log.info("Connected");
|
|
|
- // client.subscribe(ConstantContextHolder.getMqttTopic() + ConstantContextHolder.getSysItemCode(), 2);
|
|
|
- client.setCallback(new MqttCallback() {
|
|
|
- @Override
|
|
|
- public void connectionLost(Throwable cause) {
|
|
|
- System.out.println("连接丢失: " + cause.getMessage());
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void messageArrived(String topic, MqttMessage message) throws Exception {
|
|
|
- System.out.println("接收到消息: " + new String(message.getPayload()));
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void deliveryComplete(IMqttDeliveryToken token) {
|
|
|
- System.out.println("消息发送完成: " + token.isComplete());
|
|
|
- }
|
|
|
- });
|
|
|
- }catch (MqttSecurityException e) {
|
|
|
- e.printStackTrace();
|
|
|
- } catch (MqttException e) {
|
|
|
- e.printStackTrace();
|
|
|
+ String mqttSwitch = devConfigApi.getValueByKey("MQTT_SWITCH");
|
|
|
+ if(StringUtils.equals(mqttSwitch,"true")){
|
|
|
+ log.info("[MqttMsgSubscribe]公共生产者启动开始.");
|
|
|
+ try{
|
|
|
+ client = new MqttClient(commonProperties.getMqttUrl(), "1234567890");
|
|
|
+ connOpts = new MqttConnectOptions();
|
|
|
+ connOpts.setCleanSession(true);
|
|
|
+ connOpts.setUserName(commonProperties.getMqttName());
|
|
|
+ connOpts.setAutomaticReconnect(true);
|
|
|
+ connOpts.setPassword(commonProperties.getMqttPassword().toCharArray());
|
|
|
+ log.info("Connecting to broker: " + commonProperties.getMqttUrl());
|
|
|
+ client.connect(connOpts);
|
|
|
+ log.info("Connected");
|
|
|
+ // client.subscribe(ConstantContextHolder.getMqttTopic() + ConstantContextHolder.getSysItemCode(), 2);
|
|
|
+ client.setCallback(new MqttCallback() {
|
|
|
+ @Override
|
|
|
+ public void connectionLost(Throwable cause) {
|
|
|
+ System.out.println("连接丢失: " + cause.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
|
|
|
+ System.out.println("接收到消息: " + new String(message.getPayload()));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void deliveryComplete(IMqttDeliveryToken token) {
|
|
|
+ System.out.println("消息发送完成: " + token.isComplete());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }catch (MqttSecurityException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ } catch (MqttException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
|
|
|
|