1 kafka生产者工作模式
1.1 生产者消息发送流程
1.1.1 发送原理
Producer首先调用send方法进行发送,首先会经过拦截器,可以对数据进行一些加工处理。随后会经过序列化,kafka并没有采用Java提供的序列化器,而是自己实现的序列化器,但是Java提供的序列化器,会在原有数据的基础上,增加很多的用于安全校验的数据,在大数据的场景下,每次传输的数据量很大,如果在此基础上还要加入大量用于安全校验的数据,严重的影响了效率,所以kafka等中间件,自己实现了序列化器,仅仅进行简单的校验,增加了效率。
随后经过分区器(分区器实际上是将数据发送到了缓冲队列中,缓冲队列是一个双端队列,其内部包含内存池,避免频繁的申请和释放内存),因为kafka可以对topic进行分区,所以发送时就需要确定向哪个分区发送信息,就由分区器定义的规则来发送,一个分区对应一个队列,这些队列都是在内存中创建的,总大小默认32M,每一批次默认大小32K。
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
发送时,以分区节点为key,即broker1,broker2为key,请求为value进行发送,形成一个请求。请求发送到某个broker中,如果第一个请求发送到broker1,broker1没有即使的应答,允许继续发送第二个请求,直到五个请求都没有得到应答,后续的请求不会再发送,直到得到了请求的应答才继续发送。
从图中的流程可以看出,生产者和kafka集群之间还有一个RecordAccumulator队列,默认大小是32M,topic分区的话,producer会对应有一个分区器,数据在进入中间队列前,已经被分区器进行了分区,sender()方法在发送数据时,就直接根据分区进行拉取了,拉取时有两个参数,也就是调优参数:
- batch.size :也就是批大小,只有数据累计到batch.size后,sender才会发送数据,默认16k ;
- linger.ms :也就是等待时间,如果数据未达到batch.size,sender等待linger.ms设置的时间就会发送数据,单位ms,默认值就是0ms,就是有了一条数据直接发(默认为0是因为kafka要接实时数仓,所以设置为0);
kafka集群收到请求之后会涉及到一个应答机制,应答级别分为0、1、-1:
- 0:生产者发送过来的数据,不需要等待数据落盘应答;
- 1:生产者发送过来的数据,Leader(数据落盘)收到后应答,副本有没有无所谓;
- -1(all) :生产者发送过来的数据,Leader和ISR里面的所有节点收齐数据后应答,-1和all等价。
Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follow + Leader集合(leader:0,ISR:0,1,2),如果Follower长时间未向Leader发送通信请求或同步数据,则该Follow将被踢出ISR。改时间阈值由replica.lag.time.max.ms参数设定,默认30s,例如如果2超时,(leader:0,ISR:0,1)。
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景:
kafka集群应答之后,如果成功,进行数据的清理,如果失败,进行重试,默认重试次数是int的最大值:
重复数据的评判标准:具有相同主键的信息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。如果想保证数据一定不重复,就需要开启事务。
使用幂等性:开启参数enable.idempotence默认为true,即默认为开启。
1.1.4 生产者重要参数列表
1.2 异步发送API
生产者代码中有3必须,IP即连接地址、key和value的序列化器。
1.2.1 普通异步发送流程
创建maven项目
导入依赖
org.apache.kafkakafka-clients3.0.0
代码编写
package com.atguigu.kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties; public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer kafkaProducer = new KafkaProducer(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord("first1", "atguigu"));}// 5. 关闭资源kafkaProducer.close();}}
1.2.2 带回调函数的异步发送
回调函数是实现应答机制的函数.
package com.atguigu.kafka.producer; import org.apache.kafka.clients.producer.*; import java.util.Properties; public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):// 序列化器的serialization是一个接口,找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer kafkaProducer = new KafkaProducer(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord("first1", "atguigu" + i),new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { //(1)消息发送成功exception == null接受到服务端ack消息 调用该方法 //(2)消息发送失败exception != null也会调用该方法 if (exception == null) { System.out.println(metadata);//使用打印演示 }else{ exception.printStackTrace();//打印异常信息 } }});}// 5. 关闭资源kafkaProducer.close();}}
1.3 同步发送API
package com.atguigu.kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;import java.util.concurrent.ExecutionException; public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer kafkaProducer = new KafkaProducer(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 默认为异步发送kafkaProducer.send(new ProducerRecord("first1", "atguigu" + i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord("first1", "atguigu" + i)).get();} // 5. 关闭资源kafkaProducer.close();}}
1.4 生产者分区
1.4.1 kafka分区的好处
因为不同的分区分布在不同的节点上,所以便于合理使用资源,实现负载均衡。并且在不同节点上可以提高并行度。
1.4.2 生产者发送消息的分区策略
- 指定发送到哪一个分区,直接使用对应的分区号,不会走分区器;
- 不写分区号,需要走分区器,有key,按照key进行hash之后取模分区个数;
- 不写分区号,需要走分区器,没有key,粘性分区缓存机制;
- 一批数据发送到随机的一个分区中,下一批数据发送到另外一个分区;
- 如果是异步发送,数据发送的比较快,10条数据被当作一批,每一次都是一个分区;
- 如果是同步发送,发一条数据歇一会,导致每一条数据都是不同批;
import org.apache.kafka.clients.producer.*;import java.util.Properties;import java.util.concurrent.ExecutionException; public class CustomProducerCallBackPartition {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer kafkaProducer = new KafkaProducer(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// (1)指定发送到哪一个分区直接使用对应的分区号 不会走分区器// (2) 不写分区号需要走分区器有key 按照key进行hash之后取模分区个数// (3) 不写分区号需要走分区器没有key 粘性分区缓存机制//一批数据发送到随机的一个分区中,下一批数据发送到另外一个分区// 如果是异步发送,数据发送的比较快 10条数据被当作一批每一次都是一个分区// 如果是同步发送,发一条数据歇一会,导致每一条数据都是不同批kafkaProducer.send(new ProducerRecord("first1", "atguigu" + i),new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { //(1)消息发送成功exception == null接受到服务端ack消息 调用该方法 //(2)消息发送失败exception != null也会调用该方法 if (exception == null) { System.out.println(metadata); }else{ exception.printStackTrace(); } }}).get();}//Thread.sleep(20);// 5. 关闭资源kafkaProducer.close();}}
1.4.3 自定义分区器
根据业务需求,可以自定义分区器。
假设需求:发送过来的数据中如果包含atguigu,就发往0号分区,不包含atguigu,就发往1号分区
import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map; // (1)实现分区器的接口public class CustomPartitioner implements Partitioner {/*传参topic:主题 key:key值 keyBytes:key序列化之后 value:value值 valueBytes:value序列化之后cluster:集群信息return的是分区号*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//String log = value.toString();if (log.contains("atguigu")) {return 0;}return 1;} @Overridepublic void close() { } @Overridepublic void configure(Mapimport org.apache.kafka.clients.producer.*;import java.util.Properties; public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 添加定义的分区器,需要自定义分区的全类名properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.partitioner.CustomPartitioner"); // 3. 创建kafka生产者对象KafkaProducer kafkaProducer = new KafkaProducer(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord("first1", "atguigu" + i),new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { //(1)消息发送成功exception == null接受到服务端ack消息 调用该方法 //(2)消息发送失败exception != null也会调用该方法 if (exception == null) { System.out.println(metadata); }else{ exception.printStackTrace(); } }});}// 5. 关闭资源kafkaProducer.close();}}
1.5 生产经验
1.5.1 生产者如何提高吞吐量
提高吞吐量,就是提高批次传输大小,还有就是效率问题.
import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class CustomProducerParameters {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties(); // 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //调优参数,还是需要根据业务需求来调整//batch.size 批次大小,默认是16k,将批次大小增大,进而提高吞吐量properties.put(ProducerConfig.BATCH_SIZE_CONFIG,32768);//linger.ms 等待时长,默认是0ms,增加等待时长properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);//双端队列大小,默认是32M,可以提高到64Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,67108864);//调整压缩格式,默认没有压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 3. 创建kafka生产者对象KafkaProducer kafkaProducer = new KafkaProducer(properties); // 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord("first1", "atguigu"));}// 5. 关闭资源kafkaProducer.close();}}
1.5.2 数据可靠性
数据可靠性基于ack应答机制。数据完全可靠的条件:Acks级别设置为-1,分区副本大于等于2,ISR应答的最小副本数大于等于2。
副本介绍
(1)Kafka副本作用:提高数据可靠性。
(2)Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
(3)Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。
(4)Kafka分区中的所有副本统称为AR(Assigned Repllicas)。
AR = ISR + OSR
- ISR,表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader。
- OSR,表示Follower与Leader副本同步时,延迟过多的副本。
可靠性总结:
- acks=0,生产者数据发来,kafka集群内存接受到数据就返回ack
- acks=1,生产者数据发来,kafka集群中的leader落盘数据后返回ack
- acks=-1,生产者数据发来,kafka集群中的所有副本落盘数据后返回ack
import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class CustomProducerAcks {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//设置应答机制acks,可以去3个值,0、1、all(相当与ask = -1)properties.put(ProducerConfig.ACKS_CONFIG, "all");//重试次数retries ,默认是int最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 3. 创建kafka生产者对象KafkaProducer kafkaProducer = new KafkaProducer(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord("first1", "atguigu"));}// 5. 关闭资源kafkaProducer.close();}}
副本故障处理
1.5.3 数据去重
1.5.3.1 数据传递语义
1.5.3.2 幂等性
开启参数enable.idempotence 默认为true,false关闭。
1.5.3.3 生产者事务
0.11版本的Kafka同时引入了事务的特性,为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。
为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
就是引入一个全局唯一且一致的id,然后将id和pid绑定,从而使producer重启后,kafka集群依然可以通过id获得原来的pid。
注意:开启事务,必须开启幂等性。
一定要手动指定事务id:
1.5.4 数据有序
分区内有序,分区之间无序:
1.5.5 数据乱序
生产端的InFilghtRequests,默认每个broker最多缓存五个请求,当第一个数据发送过去,第二个数据没有发送成功,这时第二波数据就要进行重试,但是此时第三波数据发送,发送成功了,然后第二波数据的重试才发送成功,本来的数据顺序是123,但是现在被改为了132,发生了数据乱序。
将max.in.flight.requests-per.connection设置为1,即不缓存request请求,自然不会发生数据乱序的情况。
开启幂等性以后,因为SeqNumber是单调递增的,所以当数据是顺序的时候,不需要排序就可以发送,但是当发生上面的情况之后,服务端发现数据的SeqNumber是132,不是单调递增了,会对数据进行缓存,攒到5个以后会进行重新排序,之后再进行发送。
参考链接
【精选】Kafka基本原理详解_昙花逐月的博客-CSDN博客
这是最详细的Kafka应用教程了 – 掘金
Kafka : Kafka入门教程和JAVA客户端使用-CSDN博客
简易教程 | Kafka从搭建到使用 – 知乎
【精选】kafka简介_唏噗的博客-CSDN博客
Kafka 架构及基本原理简析
kafka是什么
再过半小时,你就能明白kafka的工作原理了
Kafka 设计与原理详解
Kafka【入门】就这一篇! – 知乎
kafka简介_kafka_唏噗-华为云开发者联盟
kafka详解
Kafka 设计与原理详解-CSDN博客
kafka学习知识点总结(三)
kafka——生产者原理解析_小波同学的技术博客_51CTO博客