物联网宠儿mqtt.js那些事儿

简介: 物联网宠儿mqtt.js那些事儿

image.png


常见的mq有Kafka,RocketMQ和RabbitMQ,大家也很常见。 前者很常见,属于微服务间的mq。那么MQTT是什么呢?MQTT属于IoT也就是物联网的概念。

快来和使用mqtt.js开发IM功能2年的作者一探究竟吧~

先来看下MQTT在物联网领域的应用场景:



mqtt.js是MQTT在nodejs端的实现。

通过npm package.json包管理,现代vue技术栈下的前端也可用,比如用vue-cli,create-react-app等等构建的项目。


mqtt.js官方为微信小程序和支付宝小程序也做了支持。微信小程序的MQTT协议名为wxs,支付宝小程序则是alis


如果还是一脸懵逼,那么就跟随我通过mqtt.js去认识一下这个物联网领域的宠儿吧。


  • 什么是微消息队列?
  • MQTT关键名词解释
  • P2P消息和Pub/Sub消息
  • 封装的mqtt.js通用class
  • 客户端发包函数sendPacket
  • 客户端连接 mqtt.connect()
  • 订阅topic mqtt.Client#subscribe()
  • 发送消息 mqtt.Client#publish()
  • 接收消息 mqtt.Client#“message”事件


什么是微消息队列?


消息队列一般分为两种:


  • 微服务消息队列(微服务间信息传递,典型代表有RabbitMQ,Kafka,RocketMQ)
  • 物联网消息队列(物联网端与云端消息传递,代表有MQTT)


目前我实践过的,也就是我们本篇博文深入分析的,是物联网消息队列的mqtt.js。


传统的消息队列(微服务间信息传递)


传统的微服务间(多个子系统服务端间)消息队列是一种非常常见的服务端间消息传递的方式。


典型代表有:RabbitMQ,Kafka,RocketMQ。

阿里云官网拥有AMQP(兼容RabbitMQ),Kafka,和RocketMQ这三种微服务消息队列,对于我们今后在实际项目中落地提供了很大的帮助。


使用场景多种多样:

  • 高并发:秒杀、抢票(FIFO)
  • 共享型:积分兑换(多子系统共用积分模块)
  • 通信型:服务端间消息传递(nodejs,java,python,go等等)


MQTT消息队列(物联网端与云间消息传递)


MQTT是一个物联网MQTT协议,主要解决的是物联网IoT网络情况复杂的问题。


阿里云有MQTT消息队列服务。通信协议支持MQTT,STOMP,GB-808等。数据传输层支持TCP长连接、SSL加密、Websocket等。


使用场景主要为数据传输:

  1. 车联网(远程控制,汽车数据上传)
  2. IM通讯(1对1单聊,1对多朋友圈)
  3. 视频直播(弹幕通知,聊天互动)
  4. 智能家居(电器数据上传,遥控指令)


目前我手上负责的运行了2年的聊天系统就是使用的这个服务,我们主要按照设备<->server<->PC的方式,MQTT协议,Websocket传输协议进行设备与PC间的数据通信。


MQTT关键名词解释


实例(Instance)


每个MQTT实例都对应一个全局唯一的服务接入点。

肉眼可见的区别就是在通过mqtt.connect(url)与server(broker)建立连接时,broker的url都是一致的。

假设有saleman1,salesman2···他们本地的前端与服务端间建立连接的url都是统一的,只是在clientId进行区分即可。


客户端Id(Client ID)


MQTT的Client ID是每个客户端的唯一标识,要求全局都是唯一的,使用同一个Client ID连接会被拒绝。

阿里云的ClientID由两部分组成 <GroupID>@@@<DeviceID>

通常情况下Group ID是多前端统一的,比如PC端,安卓移动端,ios移动端,DeviceID也是多前端统一的。

那么如何区分多端呢?可以对Client ID中间的@@@做修改。

比如:

let CID_PC = `<GroupID>@@@-PC<DeviceID>`
let CID_Android = `<GroupID>@@@-Android<DeviceID>`
let CID_IOS = `<GroupID>@@@-IOS<DeviceID>`


组Id(Group ID)


用于指定一组逻辑功能完全一致的节点公用的组名,代表的是一类相同功能的设备。


Device ID

每个设备独一无二的标识。这个需要保证全局唯一,可以是每个传感器设备的序列号,可以是登录PC的userId。


父主题(Parent Topic)

MQTT协议基于Pub/Sub模型,任何消息都属于一个Topic。

Topic可以存在多级,第一级为父级Topic。

需要控制台单独创建。


子主题(Subtopic)

MQTT可以有二级Topic,也可以有三级Topic。

无需创建,代码中直接写即可。


P2P消息和Pub/Sub消息


Pub/Sub消息就是订阅和发布的模式,类似事件监听和广播。

如果对发布订阅不理解,可以去看Webhook到底是个啥?

MQTT除了支持Pub/Sub的模式,还支持P2P的模式。


什么是P2P消息?

  • P2P,全称为(Point to Point)。
  • 一对一的消息收发模式,只有一个消息发送者和一个消息接收者。
  • P2P模式下,消息发送者明确知道消息的预期接收者,并且这个消息只能被这个特定的客户端消费
  • 发送者发送消息时,通过Topic指定接收者,接收者无需订阅即可获得该消息。
  • P2P 模式不仅降低注册订阅的成本,而且因为对链路有优化,所以降低推送延迟。


P2P模式和Pub/Sub模式的区别

发送消息时

  • Pub/Sub模式下,发送者需要按照与接受者约定好的Topic发送消息
  • P2P模式下,发送者无需按照Tpic发送,可以直接按照规范进行发送

接收消息时

  • Pub/Sub模式下,接收者需要提前订阅topic才能接消息
  • P2P模式下无需订阅即可接收消息


nodejs发送P2P消息


const p2pTopic =topic+"/p2p/GID_xxxx@@@DEVICEID_001";
mqtt.client.publish(p2pTopic);


封装的mqtt.js通用class


  • 客户端连接 initClient(config)
  • 订阅topic subscribeTopic(topic, config)
  • 发送消息 publishMessage(message)
  • 接收消息 handleMessage(callback)


import mqtt from 'mqtt';
import config from '@/config';
export default class MQTT {
  constructor(options) {
    this.name = options.name;
    this.connecting = false;
  }
  /**
   * 客户端连接
   */
  initClient(config) {
    const { url, groupId, key, password, topic: { publish: publishTopic }} = config;
    return new Promise((resolve) => {
      this.client = mqtt.connect(
        {
          url,
          clientId: `${groupId}@@@${deviceId}`,
          username: key,
          password,
        }
      );
      this.client.on('connect', () => {
        this.connecting = true;
        resolve(this);
      });
    });
  }
  /**
   * 订阅topic
   */
  subscribeTopic(topic, config) {
    if (this.connecting) {
      this.client.subscribe(topic, config);
    }
    return this;
  }
  /**
   * 发送消息
   */
  publishMessage(message) {
    this.client.publish(publishTopic, message, { qos: 1 });
  }
  /**
   * 接收消息
   */
  handleMessage(callback) {
    if (!this.client._events.message) {
      this.client.on('message', callback);
    }
  }
}

客户端发包函数sendPacket


mqtt-packet生成一个可传输buffer

var mqtt = require('mqtt-packet')
var object = {
  cmd: 'publish',
  retain: false,
  qos: 0,
  dup: false,
  length: 10,
  topic: 'test',
  payload: 'test' // Can also be a Buffer
}
var opts = { protocolVersion: 4 } // default is 4. Usually, opts is a connect packet
console.log(mqtt.generate(object))
// Prints:
//
// <Buffer 30 0a 00 04 74 65 73 74 74 65 73 74>
//
// Which is the same as:
//
// new Buffer([
//   48, 10, // Header (publish)
//   0, 4, // Topic length
//   116, 101, 115, 116, // Topic (test)
//   116, 101, 115, 116 // Payload (test)
// ])

sendPacket函数


发出packetsend事件并且通过mqtt.writeToStream将packet写入client的stream中。

var mqttPacket = require('mqtt-packet')
function sendPacket (client, packet) {
  client.emit('packetsend', packet)
  mqttPacket.writeToStream(packet, client.stream, client.options)
}

_sendPack方法

MqttClient.prototype._sendPacket = function (packet) {
     sendPacket(this, packet);
}


客户端连接 mqtt.connect()


mqtt client建立与mqtt server(broker)的连接,通常是通过给定一个'mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs' , 'alis'为协议的url进行连接。
mqtt.connect([url], options)


官方说明:

  • 通过给定的url和配置连接到一个broker,并且返回一个Client。
  • url可以遵循以下协议:'mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs' , 'alis'。(mqtt.js支持微信小程序和支付宝小程序,协议分别为wxs和alis。
  • url也可以是通过URL.parse()返回的对象。
  • 可以传入一个单对象,既包含url又包含选项。

再来看一下我手上项目的连接配置,连接结果。

敏感信息已通过foo,bar,baz或者xxxx的组合进行数据脱敏处理。


连接配置

 {
    key: 'xxxxxxxx',
    secret: 'xxxxxxxx',
    url: 'wss://foo-bar.mqtt.baz.com/mqtt',
    groupId: 'FOO_BAR_BAZ_GID',
    topic: {
      publish: 'PUBLISH_TOPIC',
      subscribe: ['SUBSCRIBE_TOPIC/noticePC/', 'SUBSCRIBE_TOPIC/p2p'],
      unsubscribe: 'SUBSCRIBE_TOPIC/noticeMobile/',
    },
}


  • key 账号
  • secret 密码
  • url 用于建立client与server(broker)mqtt连接的链接
  • groupId 组id
  • topic 发送消息的topic,订阅的topic,取消订阅的topic


连接结果


包括总览,响应头和请求头。
General

Request URL: wss://foo-bar.mqtt.baz.com
Request Method: GET
Status Code: 101 Switching Protocols

Response Header

HTTP/1.1 101 Switching Protocols
upgrade: websocket
connection: upgrade
sec-websocket-accept: xxxxxxx
sec-websocket-protocol: mqtt

Request Header

GET wss://foo-bar.mqtt.baz.com/ HTTP/1.1
Host: foo-bar.mqtt.baz.com
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36
Upgrade: websocket
Origin: https://xxx.xxx.com
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6
Sec-WebSocket-Key: xxxxxxxxx
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Protocol: mqtt
源码分析


下面来看这段mqtt连接的代码。

this.client = mqtt.connect(
{
  url,
  clientId: `${groupId}@@@${deviceId}`,
  username: key,
  password,
}
);
function parseAuthOptions (opts) {
var matches
if (opts.auth) {
  matches = opts.auth.match(/^(.+):(.+)$/)
  if (matches) {
    opts.username = matches[1]
    opts.password = matches[2]
  } else {
    opts.username = opts.auth
  }
}
}
/**
 * connect - connect to an MQTT broker.
 *
 * @param {String} [brokerUrl] - url of the broker, optional
 * @param {Object} opts - see MqttClient#constructor
 */
function connect (brokerUrl, opts) {
if ((typeof brokerUrl === 'object') && !opts) {
  //  可以传入一个单对象,既包含url又包含选项
  opts = brokerUrl
  brokerUrl = null
}
opts = opts || {}
// 设置username和password
parseAuthOptions(opts)
if (opts.query && typeof opts.query.clientId === 'string') {
  // 设置Client Id
  opts.clientId = opts.query.clientId
}
function wrapper (client) {
 ...
  return protocols[opts.protocol](client, opts)
}
// 最终返回一个mqtt client实例
return new MqttClient(wrapper, opts)
}


订阅topic mqtt.Client#subscribe()


实际代码

const topic =  {
      subscribe: ['SUBSCRIBE_TOPIC/noticePC/', 'SUBSCRIBE_TOPIC/p2p'],
      unsubscribe: 'SUBSCRIBE_TOPIC/noticeMobile/',
};
const config = { qos:1 };
this.client.subscribe(topic.subscribe, config)


源码分析

MqttClient.prototype.subscribe = function () {
  var packet
  var args = new Array(arguments.length)
  for (var i = 0; i < arguments.length; i++) {
    args[i] = arguments[i]
  }
  var subs = []
   // obj为订阅的topic列表
  var obj = args.shift()
  // qos等配置
  var opts = args.pop()
  var defaultOpts = {
    qos: 0
  }
  opts = xtend(defaultOpts, opts)
  // 数组类型的订阅的topic列表  
  if (Array.isArray(obj)) {
    obj.forEach(function (topic) {
      if (!that._resubscribeTopics.hasOwnProperty(topic) ||
        that._resubscribeTopics[topic].qos < opts.qos ||
          resubscribe) {
        var currentOpts = {
          topic: topic,
          qos: opts.qos
        }
        // subs是最终的订阅的topic列表
        subs.push(currentOpts)
      }
    })
  }
  // 这个packet很重要
  packet = {
    // 发出订阅命令
    cmd: 'subscribe',
    subscriptions: subs,
    qos: 1,
    retain: false,
    dup: false,
    messageId: this._nextId()
  }
  // 发出订阅包
  this._sendPacket(packet)
  return this
}


发送消息 mqtt.Client#publish()


实际代码


const topic = {
      publish: 'PUBLISH_TOPIC',
};
const messge = {
   foo: '',
   bar: '',
   baz: '',
   ...
}
const msgStr = JSON.stringify(message);
this.client.publish(topic.publish, msgStr);


注意publish的消息需要使用JSON.stringify进行序列化,然后再发到指定的topic。


源码分析

MqttClient.prototype.publish = function (topic, message, opts, callback) {
  var packet
  var options = this.options
  var defaultOpts = {qos: 0, retain: false, dup: false}
  opts = xtend(defaultOpts, opts)
  // 将消息传入packet的payload
  packet = {
    cmd: 'publish',
    topic: topic,
    payload: message,
    qos: opts.qos,
    retain: opts.retain,
    messageId: this._nextId(),
    dup: opts.dup
  }
  // 处理不同qos
  switch (opts.qos) {
    case 1:
    case 2:
       // 发出publish packet
       this._sendPacketI(packet);
        ...
    default:
       this._sendPacket(packet);
        ...
  }
  return this
}

接收消息 mqtt.Client “message”事件


实际代码

this.client.on('message', callback);

数据以callback的方式接收。

function (topic, message, packet) {}

topic代表接收到的topic,buffer则是具体的数据。

message是接收到的数据,谨记通过JSON.parse()对buffer做解析。

handleMessage(callback) {
    this.client.on('message', callback);
}
this.client.handleMessage((topic, buffer) => {
  let receiveMsg = null;
  try {
   receiveMsg = JSON.parse(buffer.toString());
  } catch (e) {
   receiveMsg = null;
  }
  if (!receiveMsg) {
    return;
  }
  ...do something with receiveMsg...
});


源码分析

MqttClient继承了EventEmitter。

从而进行可以使用on监听“message”事件。

inherits(MqttClient, EventEmitter)

那么到底是在哪里间发出message事件的呢?>emit the message event

  1. 基于websocket-stream建立websocket连接
  2. 使用pipe连接基于readable-stream.Writable创建的可写流
  3. nextTick调用_handlePacket
  4. 在handlePacket中调用handlePublish发出message事件
1.基于websocket-stream建立websocket连接


this.stream = this.streamBuilder(this)
function streamBuilder (client, opts) {
  return createWebSocket(client, opts)
}
var websocket = require('websocket-stream')
function createWebSocket (client, opts) {
  var websocketSubProtocol =
    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
      ? 'mqttv3.1'
      : 'mqtt'
  setDefaultOpts(opts)
  var url = buildUrl(opts, client)
  return websocket(url, [websocketSubProtocol], opts.wsOptions)
}
2. 使用pipe连接基于readable-stream.Writable创建的可写流


var Writable = require('readable-stream').Writable
var writable = new Writable();
this.stream.pipe(writable);
3.nextTick调用_handlePacket
writable._write = function (buf, enc, done) {
    completeParse = done
    parser.parse(buf)
    work()
}
function work () {
    var packet = packets.shift()
    if (packet) {
      that._handlePacket(packet, nextTickWork)
    }
}
function nextTickWork () {
    if (packets.length) {
      process.nextTick(work)
    } else {
      var done = completeParse
      completeParse = null
      done()
    }
}
4. 在handlePacket中调用handlePublish发出message事件


MqttClient.prototype._handlePacket = function (packet, done) {
  switch (packet.cmd) {
    case 'publish':
      this._handlePublish(packet, done)
      break
   ...
}
// emit the message event
MqttClient.prototype._handlePublish = function (packet, done) {
  switch (qos) {
    case 1: {
      // emit the message event
        if (!code) { that.emit('message', topic, message, packet) }
    }
}


参考资料:

相关文章
|
1天前
|
消息中间件 网络协议 物联网
MQTT常见问题之物联网设备端申请动态注册时MQTT服务不可用如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
1天前
|
消息中间件 DataWorks 物联网
MQTT问题之接入阿里云物联网平台如何解决
MQTT接入是指将设备或应用通过MQTT协议接入到消息服务器,以实现数据的发布和订阅;本合集着眼于MQTT接入的流程、配置指导以及常见接入问题的解决方法,帮助用户实现稳定可靠的消息交换。
211 1
|
1天前
|
网络协议 物联网 网络性能优化
物联网网络协议-MQTT协议的使用
物联网网络协议-MQTT协议的使用
156 2
|
1天前
|
Java Maven
【开源视频联动物联网平台】vertx写一个mqtt客户端
【开源视频联动物联网平台】vertx写一个mqtt客户端
50 1
|
1天前
【开源视频联动物联网平台】vertx写一个mqtt服务端
【开源视频联动物联网平台】vertx写一个mqtt服务端
26 1
|
1天前
|
消息中间件 存储 物联网
|
1天前
|
JSON 物联网 开发工具
MQTT协议问题之如何搭建物联网空调的服务器
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
85 1
|
1天前
|
消息中间件 网络协议 物联网
MQTT协议问题之阿里云物联网服务器断开如何解决
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
169 1
|
1天前
|
传感器 负载均衡 网络协议
物联网协议之MQTT
物联网协议之MQTT
152 0
|
8月前
|
消息中间件 存储 网络协议
MQTT-轻量级的物联网消息传输协议
随着 5G 时代的来临,万物互联的伟大构想正在成为现实。联网的 物联网设备 在 2018 年已经达到了 70 亿,在未来两年,仅智能水电气表就将超过10亿。
243 0

相关产品

  • 物联网平台
  • http://www.vxiaotou.com