JiangShan-web/MQTT接口.md
2025-05-22 16:20:13 +08:00

16 KiB
Raw Permalink Blame History

在线离线接口

主题:

/productid/deviceid/status/post

原始http推送接口

http://121.41.45.13:8025/receive

abcdefghijkmlnopqrstuvwxyz

新的http推送接口

http://121.36.111.27:8080/bridge/get

消息格式为 JSON,具体字段如下。其中 status 取值:4 表示离线,3 表示在线。

{"status":3,"isShadow":1,"rssi":-51}

onenet平台上下线信息

{msg={"dev_name":"1","at":1742873510747,"pid":"JYr2f72uSJ","type":2,"status":1}, signature=25BcZvEQ+kk10rfB9gLnnQ==, time=1742873510772, id=5614fc489df3433f9891d6d4c7c3e5e7, nonce=b1R7hjgp} "status":1上线 0下线

物模型推送接口: 将数据推送至FASTBEE端

/productid/deviceid/property/post

消息格式json格式具体字段如下

[{ "id": "co2", "value": "1", "remark": "" }]

规则引擎脚本(仅用于设备状态更新)

import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import cn.hutool.core.util.NumberUtil;

// Rule-engine script: converts a OneNET online/offline push into the
// "/{productId}/{serialNumber}/status/post" status message.
// Reads the payload from msgContext, then sets the new topic, the new payload
// and the "mqttBridgeID" data entry on msgContext.

Long productId = 136      // fixed product id used to build the target topic
String sysTopic = ""      // target topic; stays empty when parsing fails
String sysPayload = ""    // target payload

// 1. Read and log the raw payload.
String payload = msgContext.getPayload()
msgContext.logger.info("原始数据:" + payload)

// 2. Extract the content of the "msg={...}" section and convert it.
try {
    // The braces must be escaped: java.util.regex rejects a bare '{' that is not
    // a valid quantifier ("Illegal repetition"). The lazy group stops at the
    // first '}', so this only supports a flat (non-nested) msg object.
    def msgMatcher = (payload =~ /msg=\{(.+?)\}/)
    if (!msgMatcher.find()) {
        throw new IllegalArgumentException("未找到 msg 字段")
    }
    String msgData = msgMatcher.group(1)
    msgContext.logger.info("提取后的 msg 数据:" + msgData)

    // Re-wrap the captured content in braces so it parses as a JSON object.
    JSONObject msgObj = JSONUtil.parseObj("{" + msgData + "}")

    // dev_name is used as the serial number in the target topic.
    String serialNumber = msgObj.getStr("dev_name", "unknown")

    // Map the incoming status to the target codes: 0 -> 4, 1 -> 3;
    // any other value (including the -1 default) is passed through unchanged.
    Integer status = msgObj.getInt("status", -1)
    Integer convertedStatus = status
    if (status == 0) {
        convertedStatus = 4
    } else if (status == 1) {
        convertedStatus = 3
    }

    // Build the converted payload.
    JSONObject newObj = new JSONObject()
    newObj.put("status", convertedStatus)
    newObj.put("isShadow", 1)
    newObj.put("rssi", msgObj.getInt("rssi", -51))  // rssi defaults to -51 when absent

    sysPayload = newObj.toString()
    sysTopic = "/" + productId + "/" + serialNumber + "/status/post"
} catch (Exception e) {
    msgContext.logger.error("数据解析失败:", e)
    // NOTE(review): sysTopic is left empty on this path, so an empty topic is
    // set below together with the "{}" payload — confirm this is intended.
    sysPayload = "{}"
}

// 3. Log the result for debugging.
msgContext.logger.info("新主题:" + sysTopic)
msgContext.logger.info("新内容:" + sysPayload)
msgContext.logger.info("NewTopic 长度: " + sysTopic.length())

// 4. Publish the converted topic and payload back into the context.
msgContext.setTopic(sysTopic)
msgContext.setPayload(sysPayload)
msgContext.logger.info("输出:" + msgContext.getTopic())

// Action parameter (this line is generated by the system).
msgContext.setData("mqttBridgeID", 8)

定制设备状态

import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import cn.hutool.core.util.NumberUtil;

// Rule-engine script: forwards a OneNET online/offline push to the fixed
// "/special/device" topic as {"serialNumber": ..., "status": ...}.
// Reads the payload from msgContext, then sets the new topic, the new payload
// and the "mqttBridgeID" data entry on msgContext.

String sysTopic = "/special/device"   // fixed target topic
String sysPayload = ""                // target payload

// 1. Read and log the raw payload.
String payload = msgContext.getPayload()
msgContext.logger.info(" 原始数据:" + payload)

// 2. Extract the content of the "msg={...}" section and convert it.
try {
    // The braces must be escaped: java.util.regex rejects a bare '{' that is not
    // a valid quantifier ("Illegal repetition"). The lazy group stops at the
    // first '}', so this only supports a flat (non-nested) msg object.
    def msgMatcher = (payload =~ /msg=\{(.+?)\}/)
    if (!msgMatcher.find()) {
        throw new IllegalArgumentException("未找到 msg 字段")
    }
    String msgData = msgMatcher.group(1)
    msgContext.logger.info(" 提取后的 msg 数据:" + msgData)

    // Re-wrap the captured content in braces so it parses as a JSON object.
    JSONObject msgObj = JSONUtil.parseObj("{" + msgData + "}")

    // dev_name is forwarded as the serial number.
    String serialNumber = msgObj.getStr("dev_name", "unknown")

    // The status is forwarded unchanged (-1 when the field is missing).
    // Per the OneNET sample in this document, 1 means online and 0 offline.
    Integer status = msgObj.getInt("status", -1)

    // Build the converted JSON payload.
    JSONObject newObj = new JSONObject()
    newObj.put("serialNumber", serialNumber)
    newObj.put("status", status)

    sysPayload = newObj.toString()
} catch (Exception e) {
    msgContext.logger.error(" 数据解析失败:", e)
    // On failure, emit a default JSON payload that marks the error.
    JSONObject errorObj = new JSONObject()
    errorObj.put("serialNumber", "error")
    errorObj.put("status", -1)
    sysPayload = errorObj.toString()
}

// 3. Log the result for debugging.
msgContext.logger.info(" 新主题:" + sysTopic)
msgContext.logger.info(" 新内容:" + sysPayload)

// 4. Publish the converted topic and payload back into the context.
msgContext.setTopic(sysTopic)
msgContext.setPayload(sysPayload)
msgContext.logger.info(" 输出:" + msgContext.getTopic())

// Action parameter (this line is generated by the system).
msgContext.setData("mqttBridgeID", 8)

ONENET 物模型

物模型

{ msg = { "notifyType": "property", "productId": "ebm1xlK9QJ", "messageType": "notify", "data": { "id": "1", "params": { "time": { "time": 1747031766274, "value": 3 }, "press": { "time": 1747031766273, "value": 4 }, "remove": { "time": 1747031766273, "value": 2 } } }, "deviceName": "D1231XS9KI9X" }, signature = YoYfkfdyNiYfPmJPLzz5wg == , time = 1747031766259, id = e20abecf848c408fb381bc03a23ebfd7, nonce = xQLnbAzQ }

[ { "id": "params", "remark": "", "value": "3" }, { "id": "press", "remark": "", "value": "3" }, { "id": "remove", "remark": "", "value": "3" } ]

[ { "id": "press", "remark": "", "value": "3" } ]

{ "id": "123", "version": "1.0", "params": { "time": { "value": 2, "time": 1706673129818 }, "press": { "value": 3, "time": 1706673129818 }, "remove": { "value": 2, "time": 1747031766273 } } }

{ "id": "1234567890123", "version": "1.0", "params": { "press": { "value": 4 }, "remove": { "value": 2 }, "time": { "value": 3 } } }

mqttfx

物模型规则脚本

import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import cn.hutool.core.util.NumberUtil;

// Rule-engine script: converts a OneNET thing-model (property) push into the
// "/{productId}/{deviceName}/property/post" message, whose payload is a JSON
// array of {"id", "remark", "value"} entries (one per key under data.params).
// Always sets topic, payload and "mqttBridgeID" on msgContext, even on failure.

Long productId = 136
String sysTopic = ""
String sysPayload = "[]"

// Returns the substring of `text` that starts at the '{' at `openPos` and ends
// at its matching '}'. Braces inside double-quoted strings are ignored.
def extractBalancedObject = { String text, int openPos ->
    int depth = 0
    boolean inString = false
    for (int i = openPos; i < text.length(); i++) {
        String ch = text[i]
        if (inString) {
            if (ch == '\\') {
                i++                      // skip the escaped character
            } else if (ch == '"') {
                inString = false
            }
        } else if (ch == '"') {
            inString = true
        } else if (ch == '{') {
            depth++
        } else if (ch == '}') {
            depth--
            if (depth == 0) {
                return text.substring(openPos, i + 1)
            }
        }
    }
    throw new IllegalArgumentException("msg 对象的大括号不匹配")
}

try {
    // 1. Read the raw payload.
    String payload = msgContext.getPayload().trim()

    // 2. Locate the msg object. Both the `"msg": {` and the `msg = {` spellings
    //    are accepted; a missing msg key is reported instead of silently
    //    picking an unrelated '{'.
    def msgKey = (payload =~ /"?msg"?\s*[:=]\s*\{/)
    if (!msgKey.find()) {
        throw new IllegalArgumentException("未找到 msg 字段")
    }
    int jsonStart = msgKey.end() - 1     // index of the opening '{' of msg

    // Cut exactly the msg object, so the trailing signature/time/nonce text of
    // the envelope is not handed to the JSON parser.
    String msgJsonStr = extractBalancedObject(payload, jsonStart)
    msgContext.logger.info(" 提取的msg JSON: " + msgJsonStr)

    // 3. Parse the JSON.
    JSONObject msgObj = JSONUtil.parseObj(msgJsonStr)

    // 4. Device name, used as the serial number in the target topic.
    String serialNumber = msgObj.getStr("deviceName", "unknown")

    // 5. Convert every entry under data.params into a property item.
    JSONObject data = msgObj.getJSONObject("data")
    JSONObject params = data.getJSONObject("params")

    JSONArray propertyArray = new JSONArray()
    for (String key : params.keySet()) {
        JSONObject valueObj = params.getJSONObject(key)
        if (valueObj != null) {
            JSONObject prop = new JSONObject()
            prop.set("id", key)
            prop.set("remark", "")
            prop.set("value", valueObj.getStr("value"))
            propertyArray.add(prop)
        }
    }

    // 6. Set the output.
    sysPayload = propertyArray.toString()
    sysTopic = "/${productId}/${serialNumber}/property/post"
} catch (Exception e) {
    // Pass the exception too, so the stack trace is not lost.
    msgContext.logger.error(" 处理异常: " + e.getMessage(), e)
} finally {
    // Guarantee a valid output: fall back to the "unknown" device topic and an
    // empty property array when processing failed.
    String finalTopic = sysTopic ?: "/${productId}/unknown/property/post"
    msgContext.setTopic(finalTopic)
    msgContext.setPayload(sysPayload ?: "[]")
    msgContext.setData("mqttBridgeID", 8)

    // Verification log. The device name is read from the topic that was
    // actually set, with a bounds check: indexing the split of an empty topic
    // would throw from this finally block.
    String[] topicParts = finalTopic.split("/")
    String deviceName = topicParts.length > 2 ? topicParts[2] : null
    msgContext.logger.info("===  最终结果验证 ===")
    msgContext.logger.info(" 设备名: " + (deviceName ?: "null"))
    msgContext.logger.info("Payload 内容: " + sysPayload)
}

2合1脚本

import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import cn.hutool.core.util.NumberUtil;

// Combined ("2-in-1") rule-engine script. It extracts the msg object from the
// incoming payload and then handles one of two message kinds:
//   * thing-model data (notifyType == "property")
//       -> "/{productId}/{deviceName}/property/post" with a property array
//   * device status (has dev_name or status)
//       -> "/special/device" with {"serialNumber", "status"}
// Anything else, or any failure, is routed to "/error/unknown" with an error
// object. Topic, payload and "mqttBridgeID" are always set on msgContext.

// Shared configuration.
Long productId = 136
String sysTopic = ""
String sysPayload = ""

// Returns the substring of `text` that starts at the '{' at `openPos` and ends
// at its matching '}'. Braces inside double-quoted strings are ignored.
def extractBalancedObject = { String text, int openPos ->
    int depth = 0
    boolean inString = false
    for (int i = openPos; i < text.length(); i++) {
        String ch = text[i]
        if (inString) {
            if (ch == '\\') {
                i++                      // skip the escaped character
            } else if (ch == '"') {
                inString = false
            }
        } else if (ch == '"') {
            inString = true
        } else if (ch == '{') {
            depth++
        } else if (ch == '}') {
            depth--
            if (depth == 0) {
                return text.substring(openPos, i + 1)
            }
        }
    }
    throw new IllegalArgumentException("msg 对象的大括号不匹配")
}

try {
    // 1. Read and log the raw payload.
    String payload = msgContext.getPayload().trim()
    msgContext.logger.info(" 原始数据:" + payload)

    // 2. Extract the msg content. Defaults to the whole payload.
    String msgContent = payload
    boolean parsedAsJson = false
    if (payload.startsWith("{")) {
        // First try to read the payload as a JSON envelope.
        try {
            JSONObject payloadObj = JSONUtil.parseObj(payload)
            if (payloadObj.containsKey("msg")) {
                msgContent = payloadObj.getStr("msg")
            }
            parsedAsJson = true
        } catch (Exception ignored) {
            // Not parseable as JSON; it may be the msg={...} form handled below.
        }
    }
    if (!parsedAsJson) {
        int markerPos = payload.indexOf("msg={")
        if (markerPos >= 0) {
            // msg={...} form: "msg=" is 4 characters, so markerPos + 4 is the
            // opening brace. Cut exactly the msg object instead of running to
            // the last '}' of the whole payload.
            msgContent = extractBalancedObject(payload, markerPos + 4)
        } else if (!payload.startsWith("{") && payload.contains("msg=")) {
            // msg=<json> form without a brace directly after "msg=".
            msgContent = payload.substring(payload.indexOf("msg=") + 4)
        }
    }

    // 3. Parse the msg content.
    JSONObject parsedObj = JSONUtil.parseObj(msgContent)
    msgContext.logger.info(" 解析后的JSON对象" + parsedObj)

    // 4. Dispatch on the message kind.
    if (parsedObj.containsKey("notifyType") && "property".equals(parsedObj.getStr("notifyType"))) {
        // ---- Thing-model data ----
        msgContext.logger.info(" 检测到物模型数据格式")

        // Device name, used as the serial number in the target topic.
        String serialNumber = parsedObj.getStr("deviceName", "unknown")

        // Convert every entry under data.params into a property item.
        JSONObject data = parsedObj.getJSONObject("data")
        JSONObject params = data.getJSONObject("params")

        JSONArray propertyArray = new JSONArray()
        for (String key : params.keySet()) {
            JSONObject valueObj = params.getJSONObject(key)
            if (valueObj != null) {
                JSONObject prop = new JSONObject()
                prop.set("id", key)
                prop.set("remark", "")
                prop.set("value", valueObj.getStr("value"))
                propertyArray.add(prop)
            }
        }

        sysPayload = propertyArray.toString()
        sysTopic = "/${productId}/${serialNumber}/property/post"

    } else if (parsedObj.containsKey("dev_name") || parsedObj.containsKey("status")) {
        // ---- Device status ----
        msgContext.logger.info(" 检测到设备状态数据格式")

        String serialNumber = parsedObj.getStr("dev_name", "unknown")
        // The status is forwarded unchanged (-1 when the field is missing).
        Integer status = parsedObj.getInt("status", -1)

        JSONObject statusObj = new JSONObject()
        statusObj.put("serialNumber", serialNumber)
        statusObj.put("status", status)

        sysPayload = statusObj.toString()
        sysTopic = "/special/device"

    } else {
        throw new IllegalArgumentException("无法识别的数据格式")
    }
} catch (Exception e) {
    msgContext.logger.error(" 处理异常: " + e.getMessage())

    // Build an error response carrying the message and the original payload.
    JSONObject errorObj = new JSONObject()
    errorObj.put("error", e.getMessage())
    errorObj.put("originalData", msgContext.getPayload())

    sysPayload = errorObj.toString()
    sysTopic = "/error/unknown"
} finally {
    // Always publish the result back into the context.
    msgContext.setTopic(sysTopic)
    msgContext.setPayload(sysPayload)
    msgContext.setData("mqttBridgeID", 8)

    // Log the final result.
    msgContext.logger.info("===  处理结果 ===")
    msgContext.logger.info(" 主题: " + sysTopic)
    msgContext.logger.info("Payload:  " + sysPayload)
}

压力传感器的id

{msg={at=1747123666940, imei=868256050123875, type=1, ds_id=3323_0_5700, value=1538.0, dev_id=2415928615}, msg_signature=mpYC8EvR7lB14AzqteyjhA==, nonce=nPyzGpMV}

{"msg":{"at":1747142877978,"imei":"868256050017168","type":1,"ds_id":"3323_0_5700","value":-1.0,"dev_id":2442383559},"msg_signature":"MHjUOUPj0GGfpKpyBZ6BzQ==","nonce":"TdTKIcVq"}

上下线消息

{msg={at=1747126859912, login_type=10, imei=868256050123875, type=2, dev_id=2442383559, status=1}, msg_signature=Z/cRScM2TLOUeaptE1hllg==, nonce=BSy0fFIY}

{"msg":{"at":1747126859912,"login_type":10,"imei":"868256050123875","type":2,"dev_id":2442383559,"status":1},"msg_signature":"Z/cRScM2TLOUeaptE1hllg==","nonce":"BSy0fFIY"}

移动消息

{"msg":{"at":1747275785411,"imei":"868256050099802","type":1,"ds_id":"30100_0_6500","value":"01","dev_id":2442561775},"msg_signature":"2dRxxL4t9HDW1EmRwIGhfQ==","nonce":"hM31WR88"}

脚本

import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import cn.hutool.core.util.NumberUtil;

// Rule-engine script for the pressure-sensor pushes documented above.
//   * msg with a ds_id: only "3323_0_5700" is handled; its value is published
//     as property "press" on "/138/{imei}/property/post". Any other ds_id is
//     skipped (the script returns without touching the context).
//   * msg without a ds_id: treated as an online/offline message and forwarded
//     to "/special/device".
// On failure an error payload is sent to "/special/device".

// Default topic and payload.
String sysTopic = "/special/device"
String sysPayload = ""

// Read and log the raw payload.
String payload = msgContext.getPayload()
msgContext.logger.info("原始 payload" + payload)

try {
    // Extract the content between the braces of msg. Both documented envelope
    // spellings are accepted: JSON (`"msg":{...}`) and key=value (`msg={...}`).
    // The braces are escaped because java.util.regex rejects a bare '{' that is
    // not a valid quantifier. The lazy group stops at the first '}', so only a
    // flat (non-nested) msg object is supported.
    def matcher = (payload =~ /"?msg"?\s*[:=]\s*\{(.+?)\}/)
    if (!matcher.find()) {
        throw new Exception("未找到 msg 字段")
    }

    String msgData = matcher.group(1)
    msgContext.logger.info("提取出的 msg 内容:" + msgData)

    // Rewrite key=value pairs as "key":"value". Content that is already JSON
    // contains no key=value pairs and is left as it is.
    String jsonLike = msgData.replaceAll(/(\w+)=([^,}]+)/) { all, k, v ->
        "\"${k}\":\"${v}\""
    }
    jsonLike = "{${jsonLike}}"
    msgContext.logger.info("转换为 JSON 格式字符串:" + jsonLike)

    JSONObject msgObj = JSONUtil.parseObj(jsonLike)

    // A ds_id field marks a data-point (thing-model) message.
    if (msgObj.containsKey("ds_id")) {
        String dsid = msgObj.getStr("ds_id")
        if (!"3323_0_5700".equals(dsid)) {
            msgContext.logger.info("不处理的 ds_id" + dsid)
            return  // skip: leave topic, payload and data untouched
        }

        // Thing-model handling: publish the numeric value as property "press".
        String imei = msgObj.getStr("imei", "unknown")
        String valueStr = msgObj.getStr("value", "0")
        Double valueNum = NumberUtil.parseNumber(valueStr).doubleValue()

        JSONArray array = new JSONArray()
        JSONObject data = new JSONObject()
        data.put("id", "press")
        data.put("remark", "")
        data.put("value", valueNum)
        array.add(data)

        sysPayload = array.toString()
        sysTopic = "/138/${imei}/property/post"

    } else {
        // Online/offline handling.
        String imei = msgObj.getStr("imei", "unknown")
        Integer status = NumberUtil.parseInt(msgObj.getStr("status", "-1"))
        Integer loginType = NumberUtil.parseInt(msgObj.getStr("login_type", "-1"))
        Long timestamp = msgObj.getLong("at", 0L)

        JSONObject newPayload = new JSONObject()
        newPayload.put("serialNumber", imei)
        newPayload.put("status", status)
        newPayload.put("loginType", loginType)
        newPayload.put("timestamp", timestamp)

        sysPayload = newPayload.toString()
        sysTopic = "/special/device"
    }
} catch (Exception e) {
    msgContext.logger.error("解析出错:", e)
    JSONObject errorPayload = new JSONObject()
    errorPayload.put("serialNumber", "error")
    errorPayload.put("status", -1)
    sysPayload = errorPayload.toString()
    sysTopic = "/special/device"
}

// Publish the topic and payload back into the context.
msgContext.setTopic(sysTopic)
msgContext.setPayload(sysPayload)

// Log the result for debugging.
msgContext.logger.info("新主题:" + sysTopic)
msgContext.logger.info("新内容:" + sysPayload)
msgContext.logger.info("输出:" + msgContext.getTopic())

// Action parameter (this line is generated by the system).
msgContext.setData("mqttBridgeID", 7)