Android中使用MQTT协议通信

MQTT介绍

物联网 (IoT) 设备必须连接互联网。通过连接到互联网,设备就能相互协作,以及与后端服务协同工作。互联网的基础网络协议是 TCP/IP。MQTT(消息队列遥测传输) 是基于 TCP/IP 协议栈而构建的,已成为 IoT 通信的标准。

IoT通信

MQTT 最初由 IBM 于上世纪 90 年代晚期发明和开发。它最初的用途是将石油管道上的传感器与卫星相链接。顾名思义,它是一种支持在各方之间异步通信的消息协议。异步消息协议在空间和时间上将消息发送者与接收者分离,因此可以在不可靠的网络环境中进行扩展。虽然叫做消息队列遥测传输,但它与消息队列毫无关系,而是使用了一个发布和订阅的模型。

在使用物联网的过程中经常会遇到需要客户端(设备)被动接收服务器的消息的过程,这个过程使用HTTP协议要付出很大代价的,而 AMOP(高级消息队列协议) 使用异步消息总线来解决此类问题,除了AMOP 外 XMPP (可扩展通讯和表示协议)也是一种即时消息协议,但与MQTT相比XMPP在设备和网络上需要的资源要多得多。

由于物联网的环境是非常特别的,所以MQTT遵循以下设计原则:

  1. 精简,不添加可有可无的功能。
  2. 发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递。
  3. 允许用户动态创建主题,零运维成本。
  4. 把传输量降到最低以提高传输效率。
  5. 把低带宽、高延迟、不稳定的网络等因素考虑在内。
  6. 支持连续的会话控制。
  7. 理解客户端计算能力可能很低。
  8. 提供服务质量管理。
  9. 假设数据不可知,不强求传输数据的类型与格式,保持灵活性。

MQTT协议应用场景

发布订阅模型

说到发布订阅模式,很多人可能会认为自己很了解,因为我们在实际编码中经常会用到观察者模式,但是事实上观察者模式和发布订阅模式是不同的。

在发布订阅模式里,发布者,并不会直接通知订阅者,换句话说,发布者和订阅者,彼此互不相识。它们通过第三者,也就是在消息队列里面,我们常说的经纪人Broker。

发布订阅模式模型图

发布者只需告诉Broker,我要发的消息,topic是AAA. 订阅者只需告诉Broker,我要订阅topic是AAA的消息.

当Broker收到发布者发过来消息,并且topic是AAA时,就会把消息推送给订阅了topic是AAA的订阅者。当然也有可能是订阅者自己过来拉取,看具体实现。也就是说,发布订阅模式里,发布者和订阅者,不是松耦合,而是完全解耦的。

主题(Topic)

上面的模型图中我们已经提到了主题(topic),服务端上有很多主题,客户端只需要选择性的去订阅,一个客户端可以向服务器订阅多个主题。而所谓的发布就是客户端对不同的主题进行发布信息。在主题里面有一些通配符如下:

通配符含义说明
/主题层级分隔符例如: aaa/bbb/ccc 一个层层递进的关系
+单层通配符单层通配符只能匹配一层主题。例如: aaa/+ 可以匹配 aaa/bbb ,但是不能匹配 aaa/bbb/ccc
#多层通配符多层通配符可以匹配于多层主题。比如: aaa/# 不但可以匹配 aaa/bbb,还可以匹配 aaa/bbb/ccc/ddd

报文(SUBSCRIBE)

MQTT协议通过交换预定义的MQTT控制报文来通信,控制报文由三部分组成。

  • 固定报头(所有控制报文都包含)
  • 可变报头(部分控制报文包含)
  • 有效载荷(部分控制报文包含)

固定报头(Fixed Header)

每个MQTT消息都必须有一个固定报头,固定报头的格式如下:

控制报文类型(Control Packet Type)

位置:第1个字节,二进制位7-4。

名字报文流动方向描述
Reserved0禁止保留
CONNECT1客户端到服务端客户端请求连接服务端
CONNACK2服务端到客户端连接报文确认
PUBLISH3两个方向都允许发布消息
PUBACK4两个方向都允许QoS 1消息发布收到确认
PUBREC5两个方向都允许发布收到(保证交付第一步)
PUBREL6两个方向都允许发布释放(保证交付第二步)
PUBCOMP7两个方向都允许QoS 2消息发布完成(保证交互第三步)
SUBSCRIBE8客户端到服务端客户端订阅请求
SUBACK9服务端到客户端订阅请求报文确认
UNSUBSCRIBE10客户端到服务端客户端取消订阅请求
UNSUBACK11服务端到客户端取消订阅报文确认
PINGREQ12客户端到服务端心跳请求
PINGRESP13服务端到客户端心跳响应
DISCONNECT14客户端到服务端客户端断开连接
Reserved15禁止保留

标志(Flags)

位置:第1个字节,二进制位3-0。

Control PacketFixed header flagsBit 3Bit 2Bit 1Bit 0
CONNECTReserved0000
CONNACKReserved0000
PUBLISHUsed in MQTT 3.1.1DUP1QoS2QoS2RETAIN3
PUBACKReserved0000
PUBRECReserved0000
PUBRELReserved0010
PUBCOMPReserved0000
SUBSCRIBEReserved0010
SUBACKReserved0000
UNSUBSCRIBEReserved0010
UNSUBACKReserved0000
PINGREQReserved0000
PINGRESPReserved0000
DISCONNECTReserved0000
  • DUP1:控制报文的重复分发标志
  • QoS2:PUBLISH报文的服务质量等级
  • RETAIN3:PUBLISH报文的保留标志
DUP

位置:第1个字节,第3位。

如果DUP标志被设置为0,表示这是客户端或服务端第一次请求发送这个PUBLISH报文。如果DUP标志被设置为1,表示这可能是一个早前报文请求的重发。客户端或服务端请求重发一个PUBLISH报文时,必须将DUP标志设置为1 [MQTT-3.3.1.-1].。对于QoS 0的消息,DUP标志必须设置为0 [MQTT-3.3.1-2]。

QoS

位置:第1个字节,第2-1位。

这个字段表示应用消息分发的服务质量等级保证。服务质量等级对应含义如下:

QoS valueBit 2bit 1Description
000最多分发一次
101至少分发一次
210只分发一次
-11保留位

剩余长度(Remanining Length)

位置:从第2个字节开始。

剩余长度(Remaining Length)表示当前报文剩余部分的字节数,包括可变报头和负载的数据。剩余长度不包括用于编码剩余长度字段本身的字节数。

剩余长度字段使用一个变长度编码方案,对小于128的值它使用单字节编码。更大的值按下面的方式处理。低7位有效位用于编码数据,最高有效位用于指示是否有更多的字节,且按照大端方式进行编码。因此每个字节可以编码128个数值和一个延续位(continuation bit)。剩余长度字段最大4个字节。

DigitsFromTo
10 (0x00)127 (0x7F)
2128 (0x80, 0x01)16 383 (0xFF, 0x7F)
316 384 (0x80, 0x80, 0x01)2 097 151 (0xFF, 0xFF, 0x7F)
42 097 152 (0x80, 0x80, 0x80, 0x01)268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

例如:剩余长度数据为0x40(十进制的64), 则只占一字节,二进制形式应该是 0100 0000,如果是0x7f(十进制的127)则也只占一个字节,二进制形式为 0111 1111

而如果是0x80(十进制的128)则需要占两个字节,二进制形式为 0000 0001(第3个字节) 1000 0000 (第2个字节), 第2个字节的最高位是延续位,表示此时允许发送的最大数据是128字节。

所以四个字节 127 x 127 x 127 x 127 / 1024 / 1024 = 248M 最大允许发送248M的数据。

可变报头(Variable Header)

某些MQTT控制报文包含一个可变报头部分。它在固定报头和负载之间。可变报头的内容根据报文类型的不同而不同。说白了就是对相同的控制报文指定一个唯一的标识来区分它们。

下面列出了那些控制报文需要报文标识符:

控制报文报文标识符
CONNECTNO
CONNACKNO
PUBLISHYES (If QoS > 0)
PUBACKYES
PUBRECYES
PUBRELYES
PUBCOMPYES
SUBSCRIBEYES
SUBACKYES
UNSUBSCRIBEYES
UNSUBACKYES
PINGREQNO
PINGRESPNO
DISCONNECTNO

需要注意的是客户端和服务端各自独立维护自己的报文标识符,如果是消息重发则要保证是相同的报文标识符。

有效载荷(Payload)

某些MQTT控制报文在报文的最后部分包含一个有效载荷。对于PUBLISH(发布消息)来说有效载荷就是应用的自定义消息,下面列出了需要有效载荷的控制报文。

控制报文有效载荷
CONNECTRequired
CONNACKNone
PUBLISHOptional
PUBACKNone
PUBRECNone
PUBRELNone
PUBCOMPNone
SUBSCRIBERequired
SUBACKRequired
UNSUBSCRIBERequired
UNSUBACKNone
PINGREQNone
PINGRESPNone
DISCONNECTNone

Android中使用

在android中可以使用Github上的一个开源库: https://github.com/eclipse/paho.mqtt.android

1
2
3
4
5
6
7
8
9
10
11
repositories {
maven {
url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
}
}


dependencies {
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}

设置连接参数配置:

1
2
3
4
5
6
7
8
9
10
11
12
// 连接参数
mConnectOpt = new MqttConnectOptions();
// 清除缓存
mConnectOpt.setCleanSession(true);
// 设置超时时间,单位:秒
mConnectOpt.setConnectionTimeout(10);
// 心跳包发送间隔,单位:秒
mConnectOpt.setKeepAliveInterval(20);
// 用户名
mConnectOpt.setUserName(mUsername);
// 密码
mConnectOpt.setPassword(mPassword.toCharArray());

连接到服务器:

1
2
3
4
5
String tmpDir = System.getProperty("java.io.tmpdir");
MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
mClient = new MqttClient("tcp://192.168.1.1:1111", "唯一标识", dataStore);
mClient.setCallback(mMqttCallback); // 设置回调
mClient.connect(mConnectOpt); //传入配置

断开连接:

1
mClient.disconnect();

订阅和取消订阅主题:

1
2
3
4
5
6
7
8
9
10
11
try {
//订阅
mClient.subscribe(topics, qos);

//......

//取消订阅
mClient.unsubscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}

发布消息:

1
2
3
4
5
6
7
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(qos);
try {
mClient.publish(topic, mqttMessage);
} catch (MqttException e) {
mMqttCallback.connectionLost(e);
}

评论

Ajax Android AndroidStudio Animation Anroid Studio AppBarLayout Babel Banner Buffer Bulma ByteBuffer C++ C11 C89 C99 CDN CMYK COM1 COM2 CSS Camera Raw, 直方图 Chrome ContentProvider CoordinatorLayout C语言 DML DOM Dagger Dagger2 Darktable Demo Document DownloadManage ES2015 ESLint Element Error Exception Extensions File FileProvider Flow Fresco GCC Git GitHub GitLab Gradle Groovy HTML5 Handler HandlerThread Hexo Hybrid I/O IDEA IO ImageMagick IntelliJ Intellij Interpolator JCenter JNI JS Java JavaScript JsBridge Kotlin Lab Lambda Lifecycle Lint Linux Looper MQTT MVC MVP Maven MessageQueue Modbus Momentum MySQL NDK NIO NexT Next Nodejs ObjectAnimator Oracle VM Permission PhotoShop Physics Python RGB RS-232 RTU Remote-SSH Retrofit Runnable RxAndroid RxJava SE0 SSH Spring SpringBoot Statubar Task Theme Thread Tkinter UI UIKit UML VM virtualBox VS Code VUE ValueAnimator ViewPropertyAnimator Vue Web Web前端 Workbench api apk bookmark by关键字 compileOnly css c语言 databases demo hexo hotfix html iOS icarus implementation init jQuery javascript launchModel logo merge mvp offset photos pug query rxjava2 scss servlet shell svg tkinter tomcat transition unicode utf-8 vector virtual box vscode 七牛 下载 中介者模式 串口 临潼石榴 主题 书签 事件 享元模式 仓库 代理模式 位运算 依赖注入 修改,tables 光和色 内存 内核 内部分享 函数 函数式编程 分支 分析 创建 删除 动画 单例模式 压缩图片 发布 可空性 合并 同向性 后期 启动模式 命令 命令模式 响应式 响应式编程 图层 图床 图片压缩 图片处理 图片轮播 地球 域名 基础 增加 备忘录模式 外观模式 多线程 大爆炸 天气APP 太白山 头文件 奇点 字符串 字符集 存储引擎 宇宙 宏定义 实践 属性 属性动画 岐山擀面皮 岐山肉臊子 岐山香醋 工具 工厂模式 年终总结 开发技巧 异常 弱引用 恒星 打包 技巧 指针 插件 摄影 操作系统 攻略 故事 数据库 数据类型 数组 文件 新功能 旅行 旋转木马 时序图 时空 时间简史 曲线 杂谈 权限 枚举 架构 查询 标准库 标签选择器 样式 核心 框架 案例 桥接模式 检测工具 模块化 模板引擎 模板方法模式 油泼辣子 泛型 洛川苹果 浅色状态栏 源码 源码分析 瀑布流 热修复 版本 版本控制 状态栏 状态模式 生活 留言板 相册 相对论 眉县猕猴桃 知识点 码云 磁盘 科学 笔记 策略模式 类图 系统,发行版, GNU 索引 组件 组合模式 结构 结构体 编码 网易云信 网格布局 网站广播 网站通知 网络 美化 联合 膨胀的宇宙 自定义 自定义View 自定义插件 蒙版 虚拟 虚拟机 补码 补齐 表单 表达式 装饰模式 西安 观察者模式 规范 视图 视频 解耦器模式 设计 设计原则 设计模式 访问者模式 语法 责任链模式 贪吃蛇 转换 软件工程 软引用 运算符 迭代子模式 适配器模式 选择器 通信 通道 配置 链表 锐化 错误 键盘 闭包 降噪 陕西地方特产 面向对象 项目优化 项目构建 黑洞
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×