博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
配置Mqtt
阅读量:6279 次
发布时间:2019-06-22

本文共 4657 字,大约阅读时间需要 15 分钟。

一、java后台调用MQTT

准备工作:需要导入的jar包

<!-- mqtt依赖包-->

<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.14</version>
</dependency>
1.发布消息

import org.fusesource.hawtbuf.AsciiBuffer;

import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.Future;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import java.util.LinkedList;
/**
* MQTT消息发送
* @param topics 主题
* @param data 发送内容
* @throws Exception
*/
public static void checkMQTT(String topics,String data) throws Exception {
String user = env("ACTIVEMQ_USER", "admin"); //mqtt账号
String password = env("ACTIVEMQ_PASSWORD", "admin"); //mqtt密码
String host = env("ACTIVEMQ_HOST", "localhost"); //地址
int port = Integer.parseInt(env("ACTIVEMQ_PORT", "1883")); //端口号
String DATA = data;//发送的信息
String body = "";
for( int i=0; i < DATA.length(); i ++) {
body += DATA.charAt(i%DATA.length());
}
UTF8Buffer bodys = new UTF8Buffer(body); //进行编码转换,防止无法发送中文
MQTT mqtt = new MQTT();
mqtt.setHost(host, port);
mqtt.setUserName(user);
mqtt.setPassword(password);
FutureConnection connection = mqtt.futureConnection();
connection.connect().await();
final LinkedList<Future<Void>> queue = new LinkedList<Future<Void>>();
UTF8Buffer topic = new UTF8Buffer(topics); //对主题进行编码转换,防止中文异常
queue.add(connection.publish(topic, new AsciiBuffer(bodys), QoS.AT_LEAST_ONCE, false)); //发送mqtt
while( !queue.isEmpty() ) {
queue.removeFirst().await();
}
connection.disconnect().await();
}
private static String env(String key, String defaultValue) {
String rc = System.getenv(key);
if( rc== null )
return defaultValue;
return rc;
}
2.订阅消息

public static void getTopic(String topic) throw Exception{

String user = env("ACTIVEMQ_USER", "admin");
String password = env("ACTIVEMQ_PASSWORD", "password");
String host = env("ACTIVEMQ_HOST", "localhost");
int port = Integer.parseInt(env("ACTIVEMQ_PORT", "1883"));
MQTT mqtt = new MQTT();
mqtt.setHost(host, port);
mqtt.setUserName(user);
mqtt.setPassword(password);
final CallbackConnection connection = mqtt.callbackConnection();
connection.listener(new org.fusesource.mqtt.client.Listener() {
public void onConnected() {
}
public void onDisconnected() {
}
public void onFailure(Throwable value) {
value.printStackTrace();
}
public void onPublish(UTF8Buffer utfTopic, Buffer msg, Runnable ack) {
String body = msg.utf8().toString();
if( "SHUTDOWN".equals(body)) {
long diff = System.currentTimeMillis() - start;
System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0)));
connection.disconnect(new Callback<Void>() {
@Override
public void onSuccess(Void value) {
}
@Override
public void onFailure(Throwable value) {
value.printStackTrace();
}
});
} else {
if( count == 0 ) {
start = System.currentTimeMillis();
}
if( count % 1000 == 0 ) {
System.out.println(String.format("Received %d messages.", count));
}
count ++;
}
ack.run();
}
});
connection.connect(new Callback<Void>() {
@Override
public void onSuccess(Void value) {
Topic[] topics = {new Topic(topic, QoS.AT_LEAST_ONCE)};
connection.subscribe(topics, new Callback<byte[]>() {
public void onSuccess(byte[] qoses) {
}
public void onFailure(Throwable value) {
value.printStackTrace();
}
});
}
@Override
public void onFailure(Throwable value) {
value.printStackTrace();
}
});
}
二、前台页面js调用MQTT

需要引入的js

<script src="js/mqttws31.js"></script>

1.页面接收 和 发送mqtt

//连接mqtt

var client,destination;
function linkMQTT() {
var host = "localhost";
var port = "61614";
var clientId ="example-68312";
var user ="admin";
var password ="admin";
client = new Messaging.Client(host, Number(port), clientId);
client.onConnect = onConnect;
client.onMessageArrived = onMessageArrived;
client.onConnectionLost = onConnectionLost;
client.connect({
userName:user,
cleanSession:true,
password:password,
onSuccess:onConnect,
onFailure:onFailure
});
return false;
}
//接收消息
var onConnect = function(frame) {
client.subscribe('00000001010042'); //订阅主题
client.subscribe('00000001010001');
}
//失败的操作
function onFailure(failure) {
alert(failure.errorMessage);
}
//订阅成功的操作
function onMessageArrived(message) {
alert(message.payloadString);
}
function onConnectionLost(responseObject) {
if (responseObject.errorCode !== 0) {
alert(client.clientId + ": " + responseObject.errorCode + "\n");
}
}
//发送消息
funcation sendMessage(text) {
message = new Messaging.Message(text);
message.destinationName = '00000001010001';
client.send(message);
message = new Messaging.Message(text);
message.destinationName = '00000001010042';
client.send(message);
}

转载于:https://www.cnblogs.com/yxj9536/p/10265347.html

你可能感兴趣的文章
App 卸载记录
查看>>
南京大学周志华教授当选欧洲科学院外籍院士
查看>>
计算机网络与Internet应用
查看>>
Django 文件下载功能
查看>>
走红日本 阿里云如何能够赢得海外荣耀
查看>>
磁盘空间满引起的mysql启动失败:ERROR! MySQL server PID file could not be found!
查看>>
点播转码相关常见问题及排查方式
查看>>
[arm驱动]linux设备地址映射到用户空间
查看>>
弗洛伊德算法
查看>>
【算法之美】求解两个有序数组的中位数 — leetcode 4. Median of Two Sorted Arrays
查看>>
精度 Precision
查看>>
Android——4.2 - 3G移植之路之 APN (五)
查看>>
Linux_DHCP服务搭建
查看>>
[SilverLight]DataGrid实现批量输入(like Excel)(补充)
查看>>
秋式广告杀手:广告拦截原理与杀手组织
查看>>
翻译 | 摆脱浏览器限制的JavaScript
查看>>
闲扯下午引爆乌云社区“盗窃”乌云币事件
查看>>
02@在类的头文件中尽量少引入其他头文件
查看>>
JAVA IO BIO NIO AIO
查看>>
input checkbox 复选框大小修改
查看>>