我们会预先确定好:channel的数量,factor 数量,切分主键,每个taskGroup的最大channel数量

engine是一个主线程,会根据reader设置切分的规则对读取任务进行切分成多个task,writertask的数量会和reader task数量保持一致,我们姑且理解这个为 task对。

然后他会根据我们确定好的channel数量去确定taskGroup的数量,例如 taskGroup的实际数量 = channel数/单个task的最大channel数。

再确定每个task的channel数量,每个task的channel数量 = channel数/ taskGroup的实际数量。

再把task对平均分配到这些taskGroup的队列里面去,启动taskGroup线程,从task队列中取出task对,启动reader线程,启动writer线程,两个线程通过channel管道进行数据传输速度限制。因为channel的数量可能小于task对的数量,channel不够时,其他task对就会等待,等在执行的task对执行完了,再运行。

例如:我们设定 6 channel,factor = 2,那么总task对的数量就是 6*2+1 = 13,有13对reader和writer,我们设置taskGroup的channel最大数量为5,那么taskGroup的数量就是 6/5=2,每个taskGroup有 3个channel,1个taskGroup有6个task对,一个taskGroup有7个task对。

二、全局core配置和每个job的配置

core.json

{"entry": {"jvm": "-Xms1G -Xmx1G","environment": {}},"common": {"column": {"datetimeFormat": "yyyy-MM-dd HH:mm:ss",## 这些都是时间格式化参数,用来做时间格式化的,这些不用管"timeFormat": "HH:mm:ss","dateFormat": "yyyy-MM-dd","extraFormats":["yyyyMMdd"],"timeZone": "GMT+8","encoding": "utf-8"}},"core": {"dataXServer": {"address": "http://localhost:7001/api","timeout": 10000,"reportDataxLog": false,"reportPerfLog": false},"transport": {"channel": {"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",#指定用哪个channel,目前只有MemmoryChannel可以用"speed": {"byte": -1, # 当设置成 -1的时候表示,不支持byte限速模式,如果要使用则必须让此参数>0"record": -1# 当设置成 -1的时候表示,不支持record限速模式,如果要使用则必须让此参数>0},"flowControlInterval": 20, # 20ms,表示当现在的速度比预期速度快了20ms就要调整速度"capacity": 512,# channel的阻塞队列最大record数"byteCapacity": 67108864 # channel的阻塞队列最大byte数},"exchanger": {"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",# buffer的Class"bufferSize": 32 # buffer的大小,每次都是一个buffer一个buffer的往阻塞队列里面推数据的}},"container": {"job": {"reportInterval": 10000},"taskGroup": {"channel": 5 # 每个taskGroup的最大channel数},"trace": {"enable": "false"}},"statistics": {# 统计类,用于输出统计结果的,也一般不用管"collector": {"plugin": {"taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector","maxDirtyNumber": 10}}}}}
job.json
{"job": {"content": [{"reader": {"name": "mysqlreader", #设置reader名称"parameter": {"column": ["id","machine_id", "devname", "shop_id", "shop_name", "province", "city", "city_level", "region", "agent_id", "agent_name", "second_agent_id", "second_agent_name", "shop_type", "sell_date"], # 插入那些字段"connection": [{"jdbcUrl": ["jdbc:mysql://47.112.238.105:8003/data_recommend" />

三、rdbms通过设置主键来切分

mysql的reader和clickhouse的reader都是基于rdbmsUtils来执行的,主键只支持两种类型,一种是整型,一种是String类型(目前只支持asii码,别的不支持)

1、整型,会去获得其最大最小值,然后等分成 channel * splitFactor的个数,再补一个 就是 主键为空的情况,也就是 channel * splitFactor+1个

2、字符串类型,先去获取其字符串的最大值和最小值,然后将最大值最小值,转换为128进制的bigInt,再将bigInt切分成多个bigInt数字,再把128进制的bigInt转换为字符串,相当于将这个字符串等分,最后得到的也是 channel * splitFactor+1个task对

四、task,task对个数,taskGroup,taskGroup的channel分配和计算

1、我们手动设置needChannel和factor,factor默认是5

2、task对的个数 = needChannel * splitFactor+ 1

3、一个reader对应一个writer

4、core.container.taskGroup.channel = 5 意味着,一个taskGroup应该有5个channel

5、程序会根据task的个数算出需要多少个taskGroup,再根据taskGroup算出每个taskGroup里面需要有多少个channel。

例如:

我设置了6个channel,factor 为 2,那么我就会生成 14个task,其中7对reader和writer,一个reader和writer共用一个channel。

1个taskGroup是5个channel,那么6个channel就需要2个taskGroup,每个taskGroup平均分配3个channel。

五、线程的对应关系

一个jobContainer线程,多个taskGroup线程,每个taskGroup线程里面又会启动channel数量这么多的writer和reader线程。

六、数据的传输

reader参数:fetchSize,这个参数会决定让jdbc从数据库取数据的数据每次只取fetchSize个数据,当result.next消费完了,再继续fetch下一个批次

writer参数:batchSize和batchByteSize,batchSize决定了一个批次只batchInsert这么多条数据,或者batchInsert的数据量>batchByteSize的时候就insert,决定了一次insert的数据量

一个task对,reader里面有一个BufferedRecordExchanger,writer里面有一个BufferedRecordExchanger,这两个BufferedRecordExchanger共用一个MemmeryChannel,MemeryChannel里面是阻塞队列,buffer是普通的ArrayList。

buffer的参数:

bufferSize = core.transport.exchanger.bufferSize 最大记录数,默认是32个

channel的参数:

channelByteCapacity = core.transport.channel.byteCapacity 最大字节数,默认是67108864,64k

channelSize = core.transport.channel.capacity 最大记录数,默认是512个

交互:

reader先读数据,往buffer里面放,当buffer满了,或者buffer的字节数>channelByteCapacity ,就把buffer的数据刷到channel里面去,如果channel的有数据了,那么就会唤醒writertask去消费,当channel满了,则reader休息。

writer线程每次都会把他自己的buffer填满,然后去插入数据,满batchSize或者batchByteSize就插入一次,满batchSize或者batchByteSize就插入一次,当阻塞队列的数据被消费了,就又会通知reader channel不为空,可以生产数据,唤醒reader线程,如果channel为空,则writer休息。如此循环往复

七、数据速度控制和channel数量的指定

datax里面有3种限速模式:

byte限制模式:

globalLimitedByteSpeed = job.setting.speed.byte 总的byte限制

channelLimitedByteSpeed = core.transport.channel.speed.byte 每个channel的byte限制

needChannelNumberByByte = globalLimitedByteSpeed / channelLimitedByteSpeed

在这个模式下,我们手动指定的channel不生效,通过needChannelNumberByByte 来计算出需要的channel数,如果设置了这个globalLimitedByteSpeed ,则必须设置 channelLimitedByteSpeed ,不然会异常

record限制模式:

globalLimitedRecordSpeed = job.setting.speed.record 总的record限制

channelLimitedRecordSpeed = job.setting.speed.record 每个channel的record的限制

needChannelNumberByRecord = globalLimitedRecordSpeed / channelLimitedRecordSpeed

在这个模式下,我们手动指定的channel不生效,通过needChannelNumberByByte 来计算出需要的channel数,如果设置了job.setting.speed.record则channelLimitedRecordSpeed则必须设置,不然会异常

如果 byte模式和record都设置了,那么channel就会取里面的最小值,

channel模式:

如果前面两个都没有设置,并且手动指定了 job.setting.speed.channel,那么channel模式生效,channel数 = job.setting.speed.channel,channel模式不限速

byte限速实现原理:

如果速度太快则需要限速,如果速度慢则不管

push某次buffer的数据前,获取当前已经push的总数据量和等待时间,speed = pushByte / watiTime ,得到当前的速度,如果速度 > 一个阈值core.transport.channel.flowControlInterval,那么就要去调整速度,通过reader线程休眠几秒来降低,推送速度。

休眠的秒数 = (上一次push时间 - 这次push的时间) * speed / 应该的speed - (上一次push时间 - 这次push的时间)

而record限速同理,如果两个模式都配置了,则取休眠时间的最大值。

休眠一下再继续推数据,直到速度和预期速度一样