前言
通过了解架构及技术栈,初步认识ThingsBoard
架构
通过官方文档可知ThingsBoard
有两种架构模式
- monolithic architecture
单体架构,将所有的内容聚合在单个应用中
- Microservices architecture
微服务架构,包括Core(核心)、Transport(传输)、Rule Engine(规则引擎)、JavaScript Executor(JavaScript
执行器)、Web UI(界面)等多个服务,服务之间使用消息队列通信
技术栈
框架相关
Spring
作为平台的整体框架Spring Boot
Spring
的引导,提高了开发效率Maven
提供了包的依赖关系管理
通信相关
protobuf
(Protocol Buffers)
Google推出的一种数据描述语言,用于定义与语言无关的数据结构
可根据具体的语言动态生成对应的数据结构,后缀为 .protosyntax = "proto3";package msgqueue;option java_package = "org.thingsboard.server.common.msg.gen";option java_outer_classname = "MsgProtos";// Stores message metadata as map of stringsmessage TbMsgMetaDataProto {map data = 1;}// Stores stack of nested (caller) rule chainsmessage TbMsgProcessingStackItemProto {int64 ruleChainIdMSB = 1;int64 ruleChainIdLSB = 2;int64 ruleNodeIdMSB = 3;int64 ruleNodeIdLSB = 4;}message TbMsgProcessingCtxProto {int32 ruleNodeExecCounter = 1;repeated TbMsgProcessingStackItemProto stack = 2;}message TbMsgProto {string id = 1;string type = 2;string entityType = 3;int64 entityIdMSB = 4;int64 entityIdLSB = 5;int64 ruleChainIdMSB = 6;int64 ruleChainIdLSB = 7;int64 ruleNodeIdMSB = 8;int64 ruleNodeIdLSB = 9;int64 clusterPartition = 10;TbMsgMetaDataProto metaData = 11;// Transaction Data (12) was removed in 2.5int32 dataType = 13;string data = 14;int64 ts = 15;// Will be removed in 3.4. Moved to processing contextint32 ruleNodeExecCounter = 16;int64 customerIdMSB = 17;int64 customerIdLSB = 18;TbMsgProcessingCtxProto ctx = 19;}
如上使用
proto3
协议定义了org.thingsboard.server.common.msg.gen.MsgProtos
类
用于传输数据的结构定义HTTP
(Hyper Text Transfer Protocol)
超文本传输协议,最常见的数据传输协议
用于设备和ThingsBoard
间的数据交互,以及用户与ThingsBoard
间的操作交互等MQTT
(Message Queuing Telemetry Transport)
基于发布/订阅模式的协议,支持三种质量等级,广泛应用于物联网
用于设备和
ThingsBoard
间的数据交互CoAP
(Constrained Application Protocol)
基于UDP
的REST
风格协议,相较于HTTP
更加轻量级
虽然是UDP
,但通过消息类型支持消息的可靠传输
4种消息类型如下:- CON
需要被确认的请求,接收方必须响应,用以消息的可靠传输 - NON
无需被确认的请求,接收方不必响应 - ACK
确认消息,表示接收方收到了CON消息 - RST
复位消息,表示接收方收到的CON消息异常
用于设备和
ThingsBoard
间的数据交互- CON
LwM2M
(Lightweight Machine to Machine)
是CoAP
的上层协议,基于对象/资源模型进行交互,对象是资源的集合,需要实例化后使用
主要组成部分:- LwM2M Server
服务端,接收客户端的注册,与客户端进行数据交互 - LwM2M Client
客户端,注册至服务端后,才可通信,向服务端上报数据,响应服务端请求 - LwM2M Bootstrap Server
引导服务端,用于向客户端提供通信相关配置(服务端地址), 根据引导方式可省略 - SmartCard
智能卡,作用同LwM2M Bootstrap Server
用于设备和
ThingsBoard
间的数据交互- LwM2M Server
SNMP
(Simple Network Management Protocol)
基于UDP
的网络管理协议,采用特殊的客户机/服务器模式进行通信
用于设备和
ThingsBoard
间的数据交互Netty
一个基于JAVA NIO
的高性能的、异步事件驱动的通信框架,可由开发者自定义传输协议
主要用于实现
ThingsBoard
中MQTT
服务端与客户端gRPC
(google Remote Procedure Call)
google
推出的基于HTTP/2
远程调用框架
HTTP/2
相较于HTTP/1.x
主要有如下优势:- 二进制格式传输
相较于文本格式,体积更小,性能更高 - 多路复用
通过将数据包分为HEADERS帧和DATA帧,实现一个连接并发多个请求 - 服务端推送
在客户端请求前主动发送数据,对于一个客户端请求可响应多次
主要用于设备和
ThingsBoard
间的数据交互- 二进制格式传输
Azure Service Bus
微软在Azure上提供的一种云消息服务,和RabbitMQ
、KafKa
一样作为消息通信服务Pubsub
(Google Cloud Pub/Sub)
一种具有传递和接受消息的事件驱动以及流分析系统,跟KafKa
比较相似SQS
(Amazon Simple Queue Service)
一个分布式的消息队列服务
提供了两种队列:- 标准
近乎无限的吞吐量,至少传递一次消息,尽量保证顺序传递 - FIFO(先进先出)
高吞吐量,仅传递一次消息,严格保证消息的顺序
- 标准
Kafka
一种高吞吐量、持久性、分布式的发布订阅的消息队列系统RabbitMQ
一个由erlang开发的AMQP
(Advanced Message Queue高级消息队列协议)的开源实现,性能较好Memory
ThingsBoard
实现的基于内存的消息队列
数据存储相关
PostgreSQL
免费的开源关系型数据库
相较于MySQL
:- SQL的标准实现上更完善,功能实现更严谨
- 优化器功能更完整,支持更多类型的索引,复杂查询能力更强
- 数据使用堆表存放,存储量较大
- 主备为物理复制,数据一致性更可靠,性能更高
- 分区个数达至千万时,处理能力较差
MVCC
(Multi-Version Concurrency Control)基于新旧数据一同管理模式,需要定期VACUUM
清理旧数据,存在额外的消耗
用于存储非遥测数据,根据存储模式也可存储遥测数据
Cassandra
由Facebook
开发的、用于大数据的、开源分布式的NoSQL存储系统
具有以如下特性:- 高可扩展性
通过增加集群中的节点数量即可增加吞吐量 - 灵活的数据结构
可存储结构化、半结构化及非结构化等多种数据类型 - 便捷的数据分发
分布式架构支持多个数据中心间的数据复制及分发 - 高可靠性
不依赖外部组件(如ZooKeeper
)的对等分布式架构,数据分布在集群中的所有节点间,无中心节点,无单点故障
提供了类似SQL的COL语句
用于存储遥测数据
- 高可扩展性
其他
Actor
一种分布式并发编程模式,旨在将资源私有化在Actor
模型中,Actor
模型间通过消息队列通信,异步串行地处理消息,以避免多线程对于共享资源的竞争
Actor
模型由三部分组成:- state(状态)
内部私有地属性,可以理解为资源 - behavior(行为)
处理state逻辑,可以理解为方法 - MailBox(邮箱)
即接收消息的队列,用于存储接收到的消息并在空闲时处理
- state(状态)
caffeine
一款高性能的本地缓存组件,官方文档
主要提供了4种缓存添加策略:- 手动加载
Cache<Key, Graph> cache = Caffeine.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).maximumSize(10_000).build();// 查找一个缓存元素, 没有查找到的时候返回nullGraph graph = cache.getIfPresent(key);// 查找缓存,如果缓存不存在则生成缓存元素,如果无法生成则返回nullgraph = cache.get(key, k -> createExpensiveGraph(key));// 添加或者更新一个缓存元素cache.put(key, graph);// 移除一个缓存元素cache.invalidate(key);
- 自动加载
LoadingCache<Key, Graph> cache = Caffeine.newBuilder().maximumSize(10_000).expireAfterWrite(10, TimeUnit.MINUTES).build(key -> createExpensiveGraph(key));// 查找缓存,如果缓存不存在则生成缓存元素,如果无法生成则返回nullGraph graph = cache.get(key);// 批量查找缓存,如果缓存不存在则生成缓存元素Map<Key, Graph> graphs = cache.getAll(keys);
- 手动异步加载
AsyncCache<Key, Graph> cache = Caffeine.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).maximumSize(10_000).buildAsync();// 查找一个缓存元素, 没有查找到的时候返回nullCompletableFuture<Graph> graph = cache.getIfPresent(key);// 查找缓存元素,如果不存在,则异步生成graph = cache.get(key, k -> createExpensiveGraph(key));// 添加或者更新一个缓存元素cache.put(key, graph);// 移除一个缓存元素cache.synchronous().invalidate(key);
- 自动异步加载
AsyncLoadingCache<Key, Graph> cache = Caffeine.newBuilder().maximumSize(10_000).expireAfterWrite(10, TimeUnit.MINUTES)// 你可以选择: 去异步的封装一段同步操作来生成缓存元素.buildAsync(key -> createExpensiveGraph(key));// 你也可以选择: 构建一个异步缓存元素操作并返回一个future.buildAsync((key, executor) -> createExpensiveGraphAsync(key, executor));// 查找缓存元素,如果其不存在,将会异步进行生成CompletableFuture<Graph> graph = cache.get(key);// 批量查找缓存元素,如果其不存在,将会异步进行生成CompletableFuture<Map<Key, Graph>> graphs = cache.getAll(keys);
- 手动加载
Redis
(Remote Dictionary Server)
Key-Value存储系统,由于对数据的操作均在内存中执行,性能高效Node.js
一个跨平台的JavaScript
运行环境,实现了JavaScript
在服务端的应用AntiSamy
是OWASP的一个开源项目,分为Java
和.Net
版
通过对用户输入的内容进行检查,根据策略过滤非法字符,确保输入的安全性
被广泛应用于Web服务对存储型和反射型XSS的防御中Guava
Google对JavaAPI补充的开源库,为了方便编码,并减少编码错误
用于提供集合,缓存,支持原语句,并发性,常见注解,字符串处理,I/O和验证的实用方法ZooKeeper
分布式协调服务,常用于集群管理、分布式锁等