Overview
Writing a Flink stream to a database is a common requirement, usually implemented by extending RichSinkFunction. Without any optimization before the sink, every record is written as an individual INSERT (see the sketch after the list below). Single-record writes have serious drawbacks:
1. Frequent writes put heavy load on the database.
2. Writes are slow and inefficient, which causes backpressure in the job.
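For reference, this is roughly what an unoptimized single-record sink looks like (a minimal sketch; the JDBC URL, table, and columns are hypothetical):

public class NaiveDbSink extends RichSinkFunction<Row> {

    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Hypothetical connection settings, for illustration only
        connection = DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pass");
    }

    @Override
    public void invoke(Row row, Context context) throws Exception {
        // One round trip per record: this is the pattern the rest of the article replaces
        try (PreparedStatement ps = connection.prepareStatement("INSERT INTO t_target (id, name) VALUES (?, ?)")) {
            ps.setObject(1, row.getField(0));
            ps.setObject(2, row.getField(1));
            ps.execute();
        }
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}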
The fix is batch writing. This article uses a tumbling window to buffer each period's records into a batch, which is then handed down to the sink. The approach has been validated in a high-volume production environment; beyond batching itself, it also guards against data skew and supports parallel writes.
Implementing batch writes
Main function
KeyedStream<Row, String> keyedStream = sinkStream.keyBy(new HashModKeySelector(keyIndexList, paralleSize));
DataStream<Row[]> winStream = keyedStream
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize)))
        .process(new RowProcessWindowFunction(keyIndexList));
DataStreamSink<Row[]> sink = winStream.addSink(new DbSinkFunction<>(conf, writeSql));
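For context, here is a minimal sketch of how the surrounding variables might be populated. The names are the author's; the values and the target table are illustrative, and conf holds the connection settings consumed by the author's DbConnectionPool (which is not shown in the article):

// Illustrative values only
List<Integer> keyIndexList = Arrays.asList(0);  // Row fields that form the business key
int paralleSize = 4;                            // number of key buckets, typically the sink parallelism
long windowSize = 5000L;                        // 5 s micro-batch window
String writeSql = "INSERT INTO t_target (id, name) VALUES (?, ?)"; // hypothetical table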
1. Keying the business data with HashModKeySelector
public class HashModKeySelector implements KeySelector<Row, String> {

    private static final Logger logger = LoggerFactory.getLogger(HashModKeySelector.class);
    private static final long serialVersionUID = 1L;

    /** Indexes of the key fields within the Row. */
    private List<Integer> keyIndexList = null;
    private Integer paralleSize;
    private Map<String, String> md5Map = new ConcurrentHashMap<>();

    public HashModKeySelector(List<Integer> keyIndexList, Integer paralleSize) {
        this.keyIndexList = keyIndexList;
        this.paralleSize = paralleSize;
    }

    @Override
    public String getKey(Row value) {
        int size = keyIndexList.size();
        Row keyRow = new Row(size);
        for (int i = 0; i < size; i++) {
            int index = keyIndexList.get(i);
            keyRow.setField(i, value.getField(index));
        }
        // floorMod keeps the bucket non-negative even when hashCode() is negative
        int keyHash = Math.floorMod(keyRow.hashCode(), paralleSize);
        String strKey = String.valueOf(keyHash);
        String md5Value = md5Map.get(strKey);
        if (StringUtils.isBlank(md5Value)) {
            md5Value = md5(strKey);
            md5Map.put(strKey, md5Value);
        }
        return md5Value;
    }

    public static String md5(String key) {
        String result = "";
        try {
            // Create the MD5 message digest
            MessageDigest md = MessageDigest.getInstance("MD5");
            // Compute the digest of the key
            byte[] digest = md.digest(key.getBytes());
            // Convert the digest to a hex string
            result = bytesToHex(digest);
        } catch (Exception e) {
            logger.error("Failed to compute MD5 of {}:", key, e);
            return key;
        }
        return result;
    }

    public static String bytesToHex(byte[] bytes) {
        StringBuilder hexString = new StringBuilder();
        for (byte b : bytes) {
            String hex = Integer.toHexString(0xff & b);
            if (hex.length() == 1) {
                hexString.append('0');
            }
            hexString.append(hex);
        }
        return hexString.toString();
    }
}
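A note on the design: hashing the business key and taking it modulo paralleSize collapses the key space into at most paralleSize buckets, so each window fire produces a bounded number of batches and the load spreads across the parallel sinks. The MD5 step presumably exists because Flink re-hashes the key string to pick a key group, and the digests of the small integers distribute more evenly than the raw strings "0", "1", .... A quick illustrative check (not from the article):

// With paralleSize = 4 the selector emits at most 4 distinct keys,
// so each window fires at most 4 batches
HashModKeySelector selector = new HashModKeySelector(Arrays.asList(0), 4);
Set<String> keys = new HashSet<>();
for (int i = 0; i < 1000; i++) {
    keys.add(selector.getKey(Row.of("user-" + i, i)));
}
System.out.println(keys.size()); // <= 4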
2. Buffering data with a tumbling window, collecting the individual records into an array sent downstream
public class RowProcessWindowFunction extends ProcessWindowFunction<Row, Row[], String, TimeWindow> {

    private static final Logger LOG = LoggerFactory.getLogger(RowProcessWindowFunction.class);

    /** Indexes of the key fields within the Row. */
    private List<Integer> keyIndexList;

    public RowProcessWindowFunction(List<Integer> keyIndexList) {
        if (keyIndexList == null || keyIndexList.isEmpty()) {
            LOG.error("keyIndexList is empty");
            throw new RuntimeException("keyIndexList is empty");
        }
        this.keyIndexList = keyIndexList;
    }

    @Override
    public void process(String key, Context context, Iterable<Row> inRow, Collector<Row[]> out) throws Exception {
        List<Row> rowList = new ArrayList<>();
        for (Row row : inRow) {
            rowList.add(row);
        }
        // Emit the whole window's content for this key as a single batch
        out.collect(rowList.toArray(new Row[0]));
    }
}
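One caveat: a ProcessWindowFunction buffers every element of the window in keyed state, so a hot key combined with a long window can produce a very large array. If that is a concern, the emission can be chunked; a hedged sketch of a helper (emitInChunks is not part of the original code):

// Optional variant (illustrative, not in the article): emit fixed-size chunks so one
// hot key cannot hand the sink an oversized array in a single window fire
private static void emitInChunks(List<Row> rows, int chunkSize, Collector<Row[]> out) {
    for (int from = 0; from < rows.size(); from += chunkSize) {
        int to = Math.min(from + chunkSize, rows.size());
        out.collect(rows.subList(from, to).toArray(new Row[0]));
    }
}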
3. Batch writing
public class DbSinkFunction<I> extends RichSinkFunction<I> {

    private static final Logger LOG = LoggerFactory.getLogger(DbSinkFunction.class);

    /**
     * Connection settings consumed by the pool. The concrete type is assumed here:
     * the original mixes a (dbDriver, dmlSql) constructor with a conf-based call,
     * so this version follows the main-function call new DbSinkFunction<>(conf, writeSql).
     */
    private Properties conf;
    private String sql = null;
    private DbConnectionPool pool = null;

    public DbSinkFunction(Properties conf, String dmlSql) {
        this.conf = conf;
        this.sql = dmlSql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Create the connection pool once per parallel sink instance
        // (DbConnectionPool is the author's class and is not shown in the article)
        pool = new DbConnectionPool(conf);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Release the pooled connections
        pool.close();
    }

    /**
     * Write the record (or batch of records) to the database.
     */
    @Override
    public void invoke(I record, Context context) throws Exception {
        PreparedStatement ps = null;
        boolean isBatch = false;
        Connection connection = null;
        try {
            connection = pool.getConnection();
            ps = connection.prepareStatement(sql);
            if (record instanceof Row[]) {
                // Batched data from the window function
                isBatch = true;
                connection.setAutoCommit(false);
                Row[] rowArray = (Row[]) record;
                int length = rowArray.length;
                LOG.info("Row array: {}", length);
                int pending = 0;
                for (int i = 1; i <= length; i++) {
                    fillPreparedStatement(ps, rowArray[i - 1]);
                    ps.addBatch();
                    pending++;
                    // Flush every 3000 rows to keep each transaction bounded
                    if (i % 3000 == 0) {
                        ps.executeBatch();
                        connection.commit();
                        ps.clearBatch();
                        pending = 0;
                    }
                }
                if (pending > 0) {
                    ps.executeBatch();
                    connection.commit();
                    ps.clearBatch();
                }
            } else if (record instanceof Row) {
                // Single record
                isBatch = false;
                fillPreparedStatement(ps, (Row) record);
                ps.execute();
            } else {
                throw new RuntimeException("Unsupported record type " + record.getClass());
            }
        } catch (SQLException e) {
            LOG.error("Batch write failed, falling back to single-row writes:", e);
            if (connection != null) {
                connection.rollback();
            }
            if (isBatch) {
                // Retry row by row so one bad row does not discard the whole batch
                doOneInsert(sql, connection, (Row[]) record);
            }
        } catch (Exception e) {
            LOG.error("Write failed", e);
        } finally {
            closeDBResources(ps, connection);
        }
    }

    /**
     * Bind one Row's fields to the statement's positional parameters.
     * (The original omits this method; this is a minimal version assuming the
     * Row's fields match the INSERT placeholders one-to-one.)
     */
    private void fillPreparedStatement(PreparedStatement ps, Row row) throws SQLException {
        for (int i = 0; i < row.getArity(); i++) {
            ps.setObject(i + 1, row.getField(i));
        }
    }

    /**
     * Fallback when the batch fails: write the rows one at a time.
     *
     * @param sql        the parameterized INSERT statement
     * @param connection the connection the failed batch used
     * @param rowArray   the rows of the failed batch
     */
    protected void doOneInsert(String sql, Connection connection, Row[] rowArray) {
        PreparedStatement ps = null;
        try {
            int errCount = 0;
            connection.setAutoCommit(true);
            ps = connection.prepareStatement(sql);
            for (Row row : rowArray) {
                try {
                    fillPreparedStatement(ps, row);
                    ps.execute();
                } catch (SQLException e) {
                    errCount++;
                } finally {
                    ps.clearParameters();
                }
            }
            if (errCount > 0) {
                LOG.warn("{} of {} rows failed during the single-row fallback", errCount, rowArray.length);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            closeDBResources(ps, null);
        }
    }

    private void closeDBResources(Statement stmt, Connection conn) {
        try {
            if (!(null == stmt || stmt.isClosed())) {
                stmt.close();
            }
            if (!(null == conn || conn.isClosed())) {
                conn.close();
            }
        } catch (SQLException e) {
            // Best-effort cleanup; nothing useful to do here
        }
    }
}
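One semantic consequence of the fallback path: chunks committed before the failure are not rolled back, yet doOneInsert replays the entire array, so rows from already-committed chunks can be written twice. If the target table allows it, an idempotent statement sidesteps this; a hedged example for MySQL (the table and columns are hypothetical):

// Hypothetical idempotent INSERT for MySQL: re-executing it for an already-written
// row updates instead of duplicating, making the single-row fallback safe to replay
String writeSql = "INSERT INTO t_target (id, name) VALUES (?, ?) "
        + "ON DUPLICATE KEY UPDATE name = VALUES(name)";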