本文讲解4.2版jxTMS的数据源的API,整个系列的文章请查看:docker版jxTMS使用指南:4.2版升级内容
docker版本的使用,请参考docker版jxTMS使用指南
4.0版jxTMS的说明,请查看:4.0版升级内容
使用数据源前需导入:
from jx.dataSource import dataSource
其构造函数为:
def __init__(self, informDual=None, afterPullDual=None):#informDual:支流向分发者回送消息通知#afterPullDual:pull到数据后,对数据进行后处理,如数据类型转换、倍率缩放等
informDual其实就是一种通知机制,是接收端向数据源的分发点回送通知机制。
jxTMS接口机的数据流动是:
前端数据->站点->设备
即站点获取数据【拉或推】,然后分发给相应的设备。一般情况下,数据处理主要由设备来完成,即设备根据自己的特点与需要来处理、保存、加工、转换数据等。
但业务可能也需要在站点处来处理数据,如电力系统监控,监控方关心的是母线电压是否失电、蓄电池储能是否危险【容量过低会损害电池】、柴油发电机是否正常工作等高层次的系统正常与否的关键性指标上。而这些信息分别来自配电、储能、柴油发电机等多个设备。所以由站点统一提供在业务处理上较为便利。
按之前所介绍的设备获取方式【即站点通过mqtt接收前端采集器推送过来的数据】,这是非常简单的。但当站点使用了数据源之后就麻烦了:现在是由数据源直接拉取数据然后分发,站点被从数据流中旁路了!
针对这个问题,数据源提供了端点回送通知的机制,在需要时,端点就可以利用informDual向站点回送数据、发送通知、下达控制命令【如拉取的数据太多,端点处理不过来,就可以利用此机制来实现流量控制】等。
informDual的函数签名是:
def _siteDataDual(self, dn, data):#dn:设备名#data:设备接收到的数据,一般为dict类型,具体由使用数据源的站点和各分支端点自行约定
dataSource的类函数
register(cls, type, dual)
注册一个指定类型的数据源创建函数
参数:type:新增的数据源类型dual:新建一个该类型数据源的函数
New(cls, type, *args, **kw)
创建一个指定类型的数据源
参数:type:欲创建的数据源类型*args、**kw:创建该类型数据源的函数所要求的参数
dataSource的对象函数
Debug(self, debug=None)
指定数据源是否工作于调试状态。当数据源处于调试状态时,会额外输出大量的过程性信息以帮助开发者跟踪数据处理过程。由于信息量太大,所以调试完毕应予以关闭。
参数:debug:True【打开】或False【关闭】,如果不给出则返回当前的调试状态
Check(self, check=None)
指定数据源接收到数据后是否执行校验。如果指定执行校验,则数据校验在后处理后执行。
参数:check:True【打开】或False【关闭】,如果不给出则返回当前的校验状态
Test(self, test=None)
指定数据源是否工作于测试状态。当数据源处于测试状态时,数据源并不实际执行拉取数据的动作,而是调用各数据点在定义时所给出的产生式来自动产生数据。
参数:test:True【打开】或False【关闭】,如果不给出则返回当前的测试状态
setPullMode(self, delayExec_seconds=0, taskInterval_milliseconds=500, interval_seconds=60)
设置数据源的拉取模式
参数:delayExec_seconds:延时多少秒后再设置taskInterval_milliseconds:各数据点拉取时的等待间隔interval_seconds:拉取周期秒数,即每轮拉取的间隔秒数返回值:无说明:系统在启动时会对各站点、设备的拉取、轮询、数据保存等统一进行设置,但这会导致大量任务同一时间集中执行,所以一般都延时一个随机的时间后执行以分散处理压力
addWantReceive(self, t, productStatement=None, compareStatement=None)
添加一个待拉取的数据点
参数:t:一个拉取函数所需要的各种拉取参数的元组,由使用者根据拉取函数所需自行给出productStatement:数据产生式compareStatement:数据校验式返回值:无说明:参数元组的第一个参数必须是设备名,第二个参数必须是该数据点对应的变量名
addWantReceiveOver(self, dn, receiveFunc)
在一个设备所有的待拉取的数据点添加完毕后,增加一个拉取结束标记。jxTMS在遇到该标记后,将之前所拉取到的该设备的数据一起分发到receiveFunc中
参数:dn:设备名,应是之前addWantReceive函数添加参数元组时的第一个参数receiveFunc:该设备的接收函数,一般就是该函数的receive函数返回值:数据源初始化时送入的informDual,以便于分支反向馈送数据示例:#某设备的数据点定义self._informMain = ds.addWantReceiveOver(self.name(),self.receive)
clearWandQueue(self)
当拉取出现问题时,如modbus/TCP的网络中断,可以直接调用本函数将后继的拉取全部取消。
注:本函数清除的是本轮拉取的等待队列,但当interval_seconds秒后,还会执行新一轮的拉取,相当于自动尝试是否可以重新拉取
onCheckFail(self, dn, vn, v)
当打开了数据校验,某个数据点校验失败后会触发本事件
参数:dn:数据点定义时指定的设备名vn:数据点定义时指定的变量名v:数据点拉取并进行过后处理的数据
afterPull(self, t, v)
拉取到数据后,首先会执行本函数进行后处理。如果在初始化数据源时给出了afterPullDual,就会直接调用afterPullDual,否则直接返回送入的数据
参数:t:数据点定义时送入的数据点参数元组v:数据点拉取到的数据说明:afterPullDual的函数签名和afterPull相同
pull(self, t)
实际的拉取函数
参数:t:数据点定义时送入的数据点参数元组返回值:默认直接返回一个None说明:开发者自己实现的数据源必须重写本函数,否则当然拉取不到数据
onPullError(self, dn, vn)
拉取数据失败时触发本事件
参数:dn:数据点定义时指定的设备名vn:数据点定义时指定的变量名
receive(self, msg)
数据源接收到数据
参数:msg:数据源接收到的数据说明:拉取模式下,开发者不需关心本函数,数据源在拉取到数据后会自动调用该函数进行数据分发
注:数据源主要应工作在拉取模式下,但有可能业务需要统一业务数据处理框架,不管什么样的数据获取方式,都通过数据源进行数据分发。则推送模式时就可以通过本函数来灌入数据
extractSub(self, msg)
拉取模式时,receive函数收到的其实就已经是各设备的数据了,但什么情况都有可能发生,比如一条消息打包了多台设备的数据;又比如多条消息必须合并后才能解析出一个数据,如船用ais,在一条信息过长时,就会拆分成多条消息发送。
所以,为了兼容各种情况,数据源在接收到数据后,首先调用extractSub函数将其拆分为设备数据数组,然后再进行分发
参数:msg:数据源接收到的数据,拉取模式下,就是receive函数接收到的消息,如果用户自己重写了receive函数,则整个数据分发机制就由开发者自行定义了,也完全没必要再调用extractSub函数返回值:默认返回:[ msg ]说明:拉取模式下,receive函数收到的msg是一个二元的元组:(dn,data)。由于数据源自身的分发机制是对数组形式的元组进行分发,所以缺省的extractSub函数就是将该元组封装到list中即可正常分发
addSub(self, subName, subReceiveFunc)
手动添加一个分支端点。
参数:subName:分支名,接口机中一般是设备名subReceiveFunc:分支数据接收函数,接口机中一般是设备对象的receive函数返回值:数据源初始化时送入的informDual,以便于分支反向馈送数据说明:拉取模式下,addSub函数会被addWantReceiveOver函数自动调用,所以本函数主要用于其它情况下的统一数据处理框架
参考资料:
jxTMS设计思想
jxTMS编程手册
下面的系列文章讲述了如何用jxTMS开发一个实用的业务功能:
如何用jxTMS开发一个功能
下面的系列文章讲述了jxTMS的一些基本开发能力:
jxTMS的HelloWorld