JiangShan-web/MQTT接口.md

576 lines
16 KiB
Markdown
Raw Permalink Normal View History

2025-05-22 16:20:13 +08:00
# 在线离线接口
# 主题:
/productid/deviceid/status/post
# 原始http推送接口
http://121.41.45.13:8025/receive
abcdefghijkmlnopqrstuvwxyz
# 新的http推送接口
http://121.36.111.27:8080/bridge/get
# 消息格式json格式具体字段如下其中status取值4离线3在线
{"status":3,"isShadow":1,"rssi":-51}
# onenet平台上下线信息
{msg={"dev_name":"1","at":1742873510747,"pid":"JYr2f72uSJ","type":2,"status":1}, signature=25BcZvEQ+kk10rfB9gLnnQ==, time=1742873510772, id=5614fc489df3433f9891d6d4c7c3e5e7, nonce=b1R7hjgp}
"status":1上线 0下线
# 物模型推送接口: 将数据推送至FASTBEE端
/productid/deviceid/property/post
# 消息格式json格式具体字段如下
[{
"id": "co2",
"value": "1",
"remark": ""
}]
# 规则引擎仅作于设备状态更新
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.hutool.core.util.NumberUtil;
Long productId = 136 // 固定 productId
String sysTopic = "" // 系统主题
String sysPayload = "" // 系统数据格式
// 1. 获取原始内容
String payload = msgContext.getPayload()
msgContext.logger.info("原始数据:" + payload)
// 2. ✅ 只提取 msg 内容
try {
// ✅ 使用 Groovy 兼容的正则写法,提取 msg 内的数据
String msgData = (payload =~ /msg=\{(.+?)\}/)[0][1]
msgContext.logger.info("提取后的 msg 数据:" + msgData)
// ✅ 解析提取后的数据
JSONObject msgObj = JSONUtil.parseObj("{" + msgData + "}")
// ✅ 提取 dev_name 作为 serialNumber
String serialNumber = msgObj.getStr("dev_name", "unknown")
// ✅ 解析 status 并进行转换
Integer status = msgObj.getInt("status", -1)
Integer convertedStatus = (status == 0) ? 4 : (status == 1) ? 3 : status
// ✅ 构造转换后的 sysPayload
JSONObject newObj = new JSONObject()
newObj.put("status", convertedStatus)
newObj.put("isShadow", 1)
newObj.put("rssi", msgObj.getInt("rssi", -51)) // 默认 rssi = -51
sysPayload = newObj.toString()
sysTopic = "/" + productId + "/" + serialNumber + "/status/post"
} catch (Exception e) {
msgContext.logger.error("数据解析失败:", e)
sysPayload = "{}"
}
// 3. 打印调试信息
msgContext.logger.info("新主题:" + sysTopic)
msgContext.logger.info("新内容:" + sysPayload)
msgContext.logger.info("NewTopic 长度: " + sysTopic.length());
// 4. 设置新的数据
msgContext.setTopic(sysTopic)
msgContext.setPayload(sysPayload)
msgContext.logger.info("输出:" + msgContext.getTopic());
// 执行Action动作参数(脚本由系统自动生成)
msgContext.setData("mqttBridgeID", 8);
# 定制设备状态
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.hutool.core.util.NumberUtil;
String sysTopic = "/special/device" // 固定主题
String sysPayload = "" // 系统数据格式
// 1. 获取原始内容
String payload = msgContext.getPayload()
msgContext.logger.info(" 原始数据:" + payload)
// 2. ✅ 只提取 msg 内容
try {
// ✅ 使用 Groovy 兼容的正则写法,提取 msg 内的数据
String msgData = (payload =~ /msg=\{(.+?)\}/)[0][1]
msgContext.logger.info(" 提取后的 msg 数据:" + msgData)
// ✅ 解析提取后的数据
JSONObject msgObj = JSONUtil.parseObj("{" + msgData + "}")
// ✅ 提取 dev_name 作为 serialNumber
String serialNumber = msgObj.getStr("dev_name", "unknown")
// ✅ 解析 status 并进行转换 (0=上线,1=下线)
Integer status = msgObj.getInt("status", -1)
Integer convertedStatus = (status == 0) ? 0 : (status == 1) ? 1 : status
// ✅ 构造转换后的 JSON payload
JSONObject newObj = new JSONObject()
newObj.put("serialNumber", serialNumber)
newObj.put("status", convertedStatus)
sysPayload = newObj.toString()
} catch (Exception e) {
msgContext.logger.error(" 数据解析失败:", e)
// 错误时返回默认 JSON
JSONObject errorObj = new JSONObject()
errorObj.put("serialNumber", "error")
errorObj.put("status", -1)
sysPayload = errorObj.toString()
}
// 3. 打印调试信息
msgContext.logger.info(" 新主题:" + sysTopic)
msgContext.logger.info(" 新内容:" + sysPayload)
// 4. 设置新的数据
msgContext.setTopic(sysTopic)
msgContext.setPayload(sysPayload)
msgContext.logger.info(" 输出:" + msgContext.getTopic());
// 执行Action动作参数(脚本由系统自动生成)
msgContext.setData("mqttBridgeID", 8);
# ONENET 物模型
# 物模型
{
msg = {
"notifyType": "property",
"productId": "ebm1xlK9QJ",
"messageType": "notify",
"data": {
"id": "1",
"params": {
"time": {
"time": 1747031766274,
"value": 3
},
"press": {
"time": 1747031766273,
"value": 4
},
"remove": {
"time": 1747031766273,
"value": 2
}
}
},
"deviceName": "D1231XS9KI9X"
},
signature = YoYfkfdyNiYfPmJPLzz5wg == ,
time = 1747031766259,
id = e20abecf848c408fb381bc03a23ebfd7,
nonce = xQLnbAzQ
}
[
{
"id": "params",
"remark": "",
"value": "3"
},
{
"id": "press",
"remark": "",
"value": "3"
},
{
"id": "remove",
"remark": "",
"value": "3"
}
]
[
{
"id": "press",
"remark": "",
"value": "3"
}
]
{
"id": "123",
"version": "1.0",
"params": {
"time": {
"value": 2,
"time": 1706673129818
},
"press": {
"value": 3,
"time": 1706673129818
},
"remove": {
"value": 2
"time": 1747031766273
}
}
}
{
"id": "1234567890123",
"version": "1.0",
"params": {
"press": { "value": 4 },
"remove": { "value": 2 },
"time": { "value": 3 }
}
}
# mqttfx
# 物模型规则脚本
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.hutool.core.util.NumberUtil;
Long productId = 136
String sysTopic = ""
String sysPayload = "[]"
try {
// 1. 获取原始payload
String payload = msgContext.getPayload().trim()
// 2. 安全提取msg JSON兼容各种格式
int msgStart = payload.indexOf('"msg":') + 6
int jsonStart = payload.indexOf('{', msgStart)
int jsonEnd = payload.lastIndexOf('}')
String msgJsonStr = payload.substring(jsonStart, jsonEnd + 1)
msgContext.logger.info(" 提取的msg JSON: " + msgJsonStr)
// 3. 解析JSON
JSONObject msgObj = JSONUtil.parseObj(msgJsonStr)
// 4. 获取设备名(确保字段名正确)
String serialNumber = msgObj.getStr("deviceName", "unknown")
// 5. 处理物模型数据(完全兼容的遍历方式)
JSONObject data = msgObj.getJSONObject("data")
JSONObject params = data.getJSONObject("params")
JSONArray propertyArray = new JSONArray()
// 使用兼容的遍历方式替代each闭包
Set<String> keys = params.keySet()
keys.each { key ->
JSONObject valueObj = params.getJSONObject(key)
if (valueObj != null) {
JSONObject prop = new JSONObject()
prop.set("id", key)
prop.set("remark", "")
prop.set("value", valueObj.getStr("value"))
propertyArray.add(prop)
}
}
// 6. 设置输出
sysPayload = propertyArray.toString()
sysTopic = "/${productId}/${serialNumber}/property/post"
} catch (Exception e) {
msgContext.logger.error(" 处理异常: " + e.getMessage())
} finally {
// 确保有效输出
msgContext.setTopic(sysTopic ?: "/${productId}/unknown/property/post")
msgContext.setPayload(sysPayload ?: "[]")
msgContext.setData("mqttBridgeID", 8)
// 验证日志
msgContext.logger.info("=== 最终结果验证 ===")
msgContext.logger.info(" 设备名: " + (sysTopic.split("/")[2] ?: "null"))
msgContext.logger.info("Payload 内容: " + sysPayload)
}
# 2合1脚本
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.hutool.core.util.NumberUtil;
// 公共配置
Long productId = 136
String sysTopic = ""
String sysPayload = ""
try {
// 1. 获取原始payload
String payload = msgContext.getPayload().trim()
msgContext.logger.info(" 原始数据:" + payload)
// 2. 提取msg内容
String msgContent = payload
if (payload.startsWith("{")) {
// 尝试直接解析为JSON
try {
JSONObject payloadObj = JSONUtil.parseObj(payload)
if (payloadObj.containsKey("msg")) {
msgContent = payloadObj.getStr("msg")
}
} catch (Exception e) {
// 可能是msg={...}格式
if (payload.contains("msg={")) {
int start = payload.indexOf("msg={") + 4
int end = payload.lastIndexOf("}") + 1
msgContent = payload.substring(start, end)
}
}
} else if (payload.contains("msg={")) {
// 处理msg={"key":"value"}格式
int start = payload.indexOf("msg={") + 4
int end = payload.lastIndexOf("}") + 1
msgContent = payload.substring(start, end)
} else if (payload.contains("msg=")) {
// 处理msg=JSON格式不带大括号
int start = payload.indexOf("msg=") + 4
msgContent = payload.substring(start)
}
// 3. 解析msg内容
JSONObject parsedObj = JSONUtil.parseObj(msgContent)
msgContext.logger.info(" 解析后的JSON对象" + parsedObj)
// 4. 判断数据类型并处理
if (parsedObj.containsKey("notifyType") && "property".equals(parsedObj.getStr("notifyType"))) {
// 物模型数据处理 =========================================
msgContext.logger.info(" 检测到物模型数据格式")
// 获取设备名
String serialNumber = parsedObj.getStr("deviceName", "unknown")
// 处理物模型数据
JSONObject data = parsedObj.getJSONObject("data")
JSONObject params = data.getJSONObject("params")
JSONArray propertyArray = new JSONArray()
params.keySet().each { key ->
JSONObject valueObj = params.getJSONObject(key)
if (valueObj != null) {
JSONObject prop = new JSONObject()
prop.set("id", key)
prop.set("remark", "")
prop.set("value", valueObj.getStr("value"))
propertyArray.add(prop)
}
}
// 设置物模型输出
sysPayload = propertyArray.toString()
sysTopic = "/${productId}/${serialNumber}/property/post"
} else if (parsedObj.containsKey("dev_name") || parsedObj.containsKey("status")) {
// 设备状态处理 ===========================================
msgContext.logger.info(" 检测到设备状态数据格式")
// 提取设备信息
String serialNumber = parsedObj.getStr("dev_name", "unknown")
Integer status = parsedObj.getInt("status", -1)
Integer convertedStatus = (status == 0) ? 0 : (status == 1) ? 1 : status
// 构造状态JSON
JSONObject statusObj = new JSONObject()
statusObj.put("serialNumber", serialNumber)
statusObj.put("status", convertedStatus)
// 设置状态输出
sysPayload = statusObj.toString()
sysTopic = "/special/device"
} else {
throw new Exception("无法识别的数据格式")
}
} catch (Exception e) {
msgContext.logger.error(" 处理异常: " + e.getMessage())
// 创建错误响应
JSONObject errorObj = new JSONObject()
errorObj.put("error", e.getMessage())
errorObj.put("originalData", msgContext.getPayload())
sysPayload = errorObj.toString()
sysTopic = "/error/unknown"
} finally {
// 确保有效输出
msgContext.setTopic(sysTopic)
msgContext.setPayload(sysPayload)
msgContext.setData("mqttBridgeID", 8)
// 记录最终结果
msgContext.logger.info("=== 处理结果 ===")
msgContext.logger.info(" 主题: " + sysTopic)
msgContext.logger.info("Payload: " + sysPayload)
}
# 压力传感器的id
{msg={at=1747123666940, imei=868256050123875, type=1, ds_id=3323_0_5700, value=1538.0, dev_id=2415928615}, msg_signature=mpYC8EvR7lB14AzqteyjhA==, nonce=nPyzGpMV}
{"msg":{"at":1747142877978,"imei":"868256050017168","type":1,"ds_id":"3323_0_5700","value":-1.0,"dev_id":2442383559},"msg_signature":"MHjUOUPj0GGfpKpyBZ6BzQ==","nonce":"TdTKIcVq"}
# 上下线消息
{msg={at=1747126859912, login_type=10, imei=868256050123875, type=2, dev_id=2442383559, status=1}, msg_signature=Z/cRScM2TLOUeaptE1hllg==, nonce=BSy0fFIY}
{"msg":{"at":1747126859912,"login_type":10,"imei":"868256050123875","type":2,"dev_id":2442383559,"status":1},"msg_signature":"Z/cRScM2TLOUeaptE1hllg==","nonce":"BSy0fFIY"}
# 移动消息
{"msg":{"at":1747275785411,"imei":"868256050099802","type":1,"ds_id":"30100_0_6500","value":"01","dev_id":2442561775},"msg_signature":"2dRxxL4t9HDW1EmRwIGhfQ==","nonce":"hM31WR88"}
脚本
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.hutool.core.util.NumberUtil;
// 默认 topic
String sysTopic = "/special/device";
String sysPayload = "";
// 获取原始 payload
String payload = msgContext.getPayload();
msgContext.logger.info("原始 payload" + payload);
try {
// 提取 msg 内容
def matcher = (payload =~ /"msg":\{(.+?)\}/);
if (!matcher.find()) {
throw new Exception("未找到 msg 字段");
}
String msgData = matcher[0][1];
msgContext.logger.info("提取出的 msg 内容:" + msgData);
// 将 msgData 转为 JSON 字符串key=value → "key":"value"
String jsonLike = msgData.replaceAll(/(\w+)=([^,}]+)/) { all, k, v ->
"\"${k}\":\"${v}\""
};
jsonLike = "{${jsonLike}}";
msgContext.logger.info("转换为 JSON 格式字符串:" + jsonLike);
JSONObject msgObj = JSONUtil.parseObj(jsonLike);
// 判断是否是物模型数据(包含 ds_id 字段)
if (msgObj.containsKey("ds_id")) {
String dsid = msgObj.getStr("ds_id");
if ("3323_0_5700" != dsid) {
msgContext.logger.info("不处理的 ds_id" + dsid);
return; // 直接跳过不处理
}
// ✅ 物模型处理
String imei = msgObj.getStr("imei", "unknown");
String valueStr = msgObj.getStr("value", "0");
Double valueNum = NumberUtil.parseNumber(msgObj.getStr("value", "0")).doubleValue();
JSONArray array = new JSONArray();
JSONObject data = new JSONObject();
data.put("id", "press");
data.put("remark", "");
data.put("value", valueNum);
array.add(data);
sysPayload = array.toString();
sysTopic = "/138/${imei}/property/post";
} else {
// ✅ 在线离线数据处理
String imei = msgObj.getStr("imei", "unknown");
Integer status = NumberUtil.parseInt(msgObj.getStr("status", "-1"));
Integer loginType = NumberUtil.parseInt(msgObj.getStr("login_type", "-1"));
Long timestamp = msgObj.getLong("at", 0);
JSONObject newPayload = new JSONObject();
newPayload.put("serialNumber", imei);
newPayload.put("status", status);
newPayload.put("loginType", loginType);
newPayload.put("timestamp", timestamp);
sysPayload = newPayload.toString();
sysTopic = "/special/device";
}
} catch (Exception e) {
msgContext.logger.error("解析出错:", e);
JSONObject errorPayload = new JSONObject();
errorPayload.put("serialNumber", "error");
errorPayload.put("status", -1);
sysPayload = errorPayload.toString();
sysTopic = "/special/device";
}
// 设置 topic 和 payload
msgContext.setTopic(sysTopic);
msgContext.setPayload(sysPayload);
// 打印调试信息
msgContext.logger.info("新主题:" + sysTopic);
msgContext.logger.info("新内容:" + sysPayload);
msgContext.logger.info("输出:" + msgContext.getTopic());
// 可选设置
// 执行Action动作参数(脚本由系统自动生成)
msgContext.setData("mqttBridgeID", 7);
// 执行Action动作参数(脚本由系统自动生成)