Introduction
chunjun is an open-source data synchronization tool built on Flink (see the official documentation). It provides many connectors that Flink itself does not ship; in particular, its Dameng (DM) connector is very handy in environments standardized on domestic Chinese databases.
This post covers chunjun's HTTP connector, which lets you drive stream processing from HTTP requests. The connector as currently shipped has some problems when run in SQL mode, so I spent some time tracking the issues down and fixing them, and also added a new pagination feature. The details follow.
Problem
Running the HTTP connector as described in the official documentation fails with the following error:
```
java.lang.RuntimeException: request data error,msg is prevResponse value is exception java.lang.RuntimeException: key data.id on {msg=请求成功, total=0, code=0000, data=[{name=第0臭桑, id=0}, {name=第1臭桑, id=1}], timestamp=2023-02-12 16:39:12} is not a json
    at com.dtstack.chunjun.util.MapUtil.getValueByKey(MapUtil.java:161)
    at com.dtstack.chunjun.connector.http.client.ResponseParse.buildResponseByKey(ResponseParse.java:63)
    at com.dtstack.chunjun.connector.http.client.JsonResponseParse.next(JsonResponseParse.java:95)
    at com.dtstack.chunjun.connector.http.client.HttpClient.doExecute(HttpClient.java:272)
    at com.dtstack.chunjun.connector.http.client.HttpClient.execute(HttpClient.java:184)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
    at com.dtstack.chunjun.connector.http.inputformat.HttpInputFormat.nextRecordInternal(HttpInputFormat.java:118)
    at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:198)
    at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:68)
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:133)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
```
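The message "key data.id on {...} is not a json" comes from walking a dot-separated key path into the response map. The sketch below reproduces the failure mode with a simplified stand-in for MapUtil.getValueByKey (not chunjun's actual implementation): the lookup of `data.id` fails because the value under `data` is a list of records, not a nested JSON object.

```java
import java.util.*;

public class KeyPathDemo {
    // Simplified stand-in for MapUtil.getValueByKey: walks a dot-separated
    // key path through nested maps, throwing when a segment is not a map.
    static Object getValueByKey(Map<String, Object> map, String keyPath) {
        Object current = map;
        for (String segment : keyPath.split("\\.")) {
            if (!(current instanceof Map)) {
                throw new RuntimeException(
                        "key " + keyPath + " on " + map + " is not a json");
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> response = new LinkedHashMap<>();
        response.put("code", "0000");
        // data is a LIST of records, not a nested object ...
        response.put("data", List.of(Map.of("id", 0, "name", "row0")));

        try {
            getValueByKey(response, "data.id"); // ... so "data.id" cannot resolve
        } catch (RuntimeException e) {
            System.out.println(e.getMessage().startsWith("key data.id"));
        }
    }
}
```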
Solution (modify the source code)
Modify HttpOptions
Add two new config options:
```java
// The first option declares the data subject. Most HTTP APIs wrap their
// payload in a standard envelope (status code, status message, data body);
// the rows we actually want live under the data body.
public static final ConfigOption<String> DATA_SUBJECT =
        ConfigOptions.key("dataSubject")
                .stringType()
                .defaultValue("${data}")
                .withDescription("response data subject");

// This option sets the request cycle: a value of 2 sends the request twice,
// while -1 keeps repeating the request indefinitely.
public static final ConfigOption<Long> CYCLES =
        ConfigOptions.key("cycles")
                .longType()
                .defaultValue(1L)
                .withDescription("request cycle");
```
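The intended semantics of `cycles` can be sketched with a hypothetical polling loop (this is an illustration of the contract described above, not chunjun's actual HttpClient code):

```java
public class CycleDemo {
    // Hypothetical sketch of how a 'cycles' setting bounds the polling loop:
    // a positive value runs that many requests, -1 polls indefinitely.
    static int runCycles(long cycles) {
        int requestsSent = 0;
        for (long i = 0; cycles < 0 || i < cycles; i++) {
            requestsSent++;                  // stand-in for one HTTP request
            if (requestsSent >= 1000) break; // safety cap for the -1 case
        }
        return requestsSent;
    }

    public static void main(String[] args) {
        System.out.println(runCycles(2)); // two requests, then stop
        System.out.println(runCycles(1)); // the default: a single request
    }
}
```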
修改HttpDynamicTableFactory
```java
@Override
public Set<ConfigOption<?>> optionalOptions() {
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(HttpOptions.DECODE);
    options.add(HttpOptions.METHOD);
    options.add(HttpOptions.HEADER);
    options.add(HttpOptions.BODY);
    options.add(HttpOptions.PARAMS);
    options.add(HttpOptions.INTERVALTIME);
    options.add(HttpOptions.COLUMN);
    options.add(HttpOptions.DELAY);
    // The two options below correspond to the new WITH parameters
    options.add(HttpOptions.DATA_SUBJECT);
    options.add(HttpOptions.CYCLES);
    return options;
}
```
```java
private HttpRestConfig getRestapiConf(ReadableConfig config) {
    Gson gson = GsonUtil.setTypeAdapter(new Gson());
    HttpRestConfig httpRestConfig = new HttpRestConfig();
    httpRestConfig.setIntervalTime(config.get(HttpOptions.INTERVALTIME));
    httpRestConfig.setUrl(config.get(HttpOptions.URL));
    httpRestConfig.setDecode(config.get(HttpOptions.DECODE));
    httpRestConfig.setRequestMode(config.get(HttpOptions.METHOD));
    // Copy the two new options into the HTTP request config
    httpRestConfig.setDataSubject(config.get(HttpOptions.DATA_SUBJECT));
    httpRestConfig.setCycles(config.get(HttpOptions.CYCLES));
    httpRestConfig.setParam(
            gson.fromJson(
                    config.get(HttpOptions.PARAMS), new TypeToken<List>() {}.getType()));
    httpRestConfig.setHeader(
            gson.fromJson(
                    config.get(HttpOptions.HEADER), new TypeToken<List>() {}.getType()));
    httpRestConfig.setBody(
            gson.fromJson(
                    config.get(HttpOptions.BODY), new TypeToken<List>() {}.getType()));
    httpRestConfig.setColumn(
            gson.fromJson(
                    config.get(HttpOptions.COLUMN), new TypeToken<List>() {}.getType()));
    return httpRestConfig;
}
```
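The setters used above (setDataSubject, setCycles) imply that HttpRestConfig itself also gains two matching fields. A minimal sketch of that change, shown as a standalone class here (the field names mirror the setters, but the surrounding class is chunjun's, so treat this as an assumed fragment):

```java
public class HttpRestConfigPatch {
    // The two fields the new setters/getters are assumed to back,
    // with defaults matching the ConfigOption defaults above.
    private String dataSubject = "${data}";
    private Long cycles = 1L;

    public String getDataSubject() { return dataSubject; }

    public void setDataSubject(String dataSubject) { this.dataSubject = dataSubject; }

    public Long getCycles() { return cycles; }

    public void setCycles(Long cycles) { this.cycles = cycles; }

    public static void main(String[] args) {
        HttpRestConfigPatch conf = new HttpRestConfigPatch();
        conf.setCycles(5L);
        conf.setDataSubject("${data}");
        System.out.println(conf.getCycles() + " " + conf.getDataSubject());
    }
}
```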
修改HttpRowConverter
```java
// Change the class's first type parameter: it was String, it must become Map
public class HttpRowConverter
        extends AbstractRowConverter<Map, RowData, RowData, LogicalType>
```
```java
// With the type parameter changed above, the parameter of this overridden
// method becomes a Map as well; every caller of this method already passes a
// Map. The original code declared the parameter as String, so the call failed
// at runtime, and when single-stepping in the debugger you could never enter
// the method, only the class itself.
// Since the caller already hands us a Map, the first two lines of the original
// method body (which parsed the String into a Map) are no longer needed and
// can simply be deleted.
@Override
public RowData toInternal(Map result) throws Exception {
    GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
    List<String> columns = rowType.getFieldNames();
    for (int pos = 0; pos < columns.size(); pos++) {
        Object value =
                MapUtil.getValueByKey(
                        result, columns.get(pos), httpRestConfig.getFieldDelimiter());
        if (value instanceof LinkedTreeMap) {
            value = value.toString();
        }
        genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(value));
    }
    return genericRowData;
}
```
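The Map-based conversion path can be sketched in isolation (a simplified stand-in for the converter, without the Flink types and without chunjun's MapUtil): each declared column is pulled straight out of the already-parsed response map, with nested objects stringified the way the converter handles LinkedTreeMap.

```java
import java.util.*;

public class MapRowDemo {
    // Minimal sketch of the Map-based conversion: no string re-parsing,
    // just column lookups against the already-parsed record.
    static Object[] toRow(Map<String, Object> result, List<String> columns) {
        Object[] row = new Object[columns.size()];
        for (int pos = 0; pos < columns.size(); pos++) {
            Object value = result.get(columns.get(pos));
            if (value instanceof Map) {
                value = value.toString(); // nested objects become strings
            }
            row[pos] = value;
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", 1);
        record.put("name", "row1");
        Object[] row = toRow(record, List.of("id", "name"));
        System.out.println(row[0] + "," + row[1]);
    }
}
```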
With the changes above in place, you can specify the data subject and request cycle in the WITH parameters, and the job runs successfully straight from the LocalTest class. Here is an example SQL:
```sql
CREATE TABLE source (
    id int,
    name varchar
) WITH (
    'connector' = 'http-x'
    ,'url' = 'http://127.0.0.1:8090/test/test'
    ,'intervalTime' = '3000'
    ,'method' = 'get'
    ,'cycles' = '5'
    ,'dataSubject' = '${data}'
    ,'decode' = 'json'
    ,'paging' = 'true'
    ,'pagingParam' = 'pageNumber'
    ,'params' = '[{"key": "pageNumber","value":1,"type":"int"},{"key": "pageSize","value":100,"type":"int"}]'
    ,'column' = '[
        { "name": "id", "type": "int" },
        { "name": "name", "type": "String" }
    ]'
);

CREATE TABLE sink (
    id int,
    name varchar
) WITH (
    'connector' = 'stream-x'
);

insert into sink
select *
from source u;
```
What's next
On top of the changes above I have also added a paginated-query feature; when I have time I will update this post with the pagination source changes.
Finally
Please credit the source when reposting.