一、java后台调用MQTT
准备工作:需要导入的jar包
<!-- mqtt依赖包-->
<dependency>
    <groupId>org.fusesource.mqtt-client</groupId>
    <artifactId>mqtt-client</artifactId>
    <version>1.14</version>
</dependency>
1.发布消息
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.UTF8Buffer;import org.fusesource.mqtt.client.Future;import org.fusesource.mqtt.client.FutureConnection;import org.fusesource.mqtt.client.MQTT;import org.fusesource.mqtt.client.QoS;import java.util.LinkedList;/** * MQTT消息发送 * @param topics 主题 * @param data 发送内容 * @throws Exception */public static void checkMQTT(String topics,String data) throws Exception { String user = env("ACTIVEMQ_USER", "admin"); //mqtt账号 String password = env("ACTIVEMQ_PASSWORD", "admin"); //mqtt密码 String host = env("ACTIVEMQ_HOST", "localhost"); //地址 int port = Integer.parseInt(env("ACTIVEMQ_PORT", "1883")); //端口号 String DATA = data;//发送的信息 String body = ""; for( int i=0; i < DATA.length(); i ++) { body += DATA.charAt(i%DATA.length()); } UTF8Buffer bodys = new UTF8Buffer(body); //进行编码转换,防止无法发送中文 MQTT mqtt = new MQTT(); mqtt.setHost(host, port); mqtt.setUserName(user); mqtt.setPassword(password); FutureConnection connection = mqtt.futureConnection(); connection.connect().await(); final LinkedList<Future<Void>> queue = new LinkedList<Future<Void>>(); UTF8Buffer topic = new UTF8Buffer(topics); //对主题进行编码转换,防止中文异常 queue.add(connection.publish(topic, new AsciiBuffer(bodys), QoS.AT_LEAST_ONCE, false)); //发送mqtt while( !queue.isEmpty() ) { queue.removeFirst().await(); } connection.disconnect().await();}private static String env(String key, String defaultValue) { String rc = System.getenv(key); if( rc== null ) return defaultValue; return rc;}2.订阅消息public static void getTopic(String topic) throw Exception{
String user = env("ACTIVEMQ_USER", "admin"); String password = env("ACTIVEMQ_PASSWORD", "password"); String host = env("ACTIVEMQ_HOST", "localhost"); int port = Integer.parseInt(env("ACTIVEMQ_PORT", "1883")); MQTT mqtt = new MQTT(); mqtt.setHost(host, port); mqtt.setUserName(user); mqtt.setPassword(password); final CallbackConnection connection = mqtt.callbackConnection(); connection.listener(new org.fusesource.mqtt.client.Listener() { public void onConnected() { } public void onDisconnected() { } public void onFailure(Throwable value) { value.printStackTrace(); } public void onPublish(UTF8Buffer utfTopic, Buffer msg, Runnable ack) { String body = msg.utf8().toString(); if( "SHUTDOWN".equals(body)) { long diff = System.currentTimeMillis() - start; System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0))); connection.disconnect(new Callback<Void>() { @Override public void onSuccess(Void value) { } @Override public void onFailure(Throwable value) { value.printStackTrace(); } }); } else { if( count == 0 ) { start = System.currentTimeMillis(); } if( count % 1000 == 0 ) { System.out.println(String.format("Received %d messages.", count)); } count ++; } ack.run(); } }); connection.connect(new Callback<Void>() { @Override public void onSuccess(Void value) { Topic[] topics = {new Topic(topic, QoS.AT_LEAST_ONCE)}; connection.subscribe(topics, new Callback<byte[]>() { public void onSuccess(byte[] qoses) { } public void onFailure(Throwable value) { value.printStackTrace(); } }); } @Override public void onFailure(Throwable value) { value.printStackTrace(); } });}二、前台页面js调用MQTT需要引入的js
<script src="js/mqttws31.js"></script>
1.页面接收和发送mqtt
//连接mqtt
var client, destination;

/**
 * Creates the MQTT-over-WebSocket client, registers its callbacks and connects.
 *
 * @returns {boolean} always false (presumably so it can be used directly as a
 *     click/submit handler without triggering the default action — confirm against the page)
 */
function linkMQTT() {
    var host = "localhost";
    var port = "61614";
    var clientId = "example-68312";
    var user = "admin";
    var password = "admin";

    client = new Messaging.Client(host, Number(port), clientId);
    client.onMessageArrived = onMessageArrived;
    client.onConnectionLost = onConnectionLost;

    // The connect-success handler is supplied here via onSuccess.
    client.connect({
        userName: user,
        cleanSession: true,
        password: password,
        onSuccess: onConnect,
        onFailure: onFailure
    });
    return false;
}

/**
 * Called once the connection succeeds; subscribes to the topics of interest.
 */
function onConnect(frame) {
    client.subscribe('00000001010042'); // subscribe to topic
    client.subscribe('00000001010001');
}

/**
 * Called when the connection attempt fails; shows the error message.
 */
function onFailure(failure) {
    alert(failure.errorMessage);
}

/**
 * Called for every message received on a subscribed topic; shows its payload.
 */
function onMessageArrived(message) {
    alert(message.payloadString);
}

/**
 * Called when the connection is lost; reports the error code unless it is 0.
 */
function onConnectionLost(responseObject) {
    if (responseObject.errorCode !== 0) {
        alert(client.clientId + ": " + responseObject.errorCode + "\n");
    }
}

/**
 * Sends the given text to both topics.
 *
 * @param {string} text message payload to send
 */
function sendMessage(text) {
    // Declared locally so no implicit global `message` is created.
    var message = new Messaging.Message(text);
    message.destinationName = '00000001010001';
    client.send(message);

    message = new Messaging.Message(text);
    message.destinationName = '00000001010042';
    client.send(message);
}