Kafka简介
Kafka 是分布式发布-订阅消息系统。Kafka 是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。
Kafka 的主要特点:
- 同时为发布和订阅提供高吞吐量。据了解,Kafka 每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
- 可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如 ETL,以及实时应用程序。通过将数据持久化到硬盘防止数据丢失。
- 分布式系统,易于向外扩展。所有的
producer
、broker
和consumer
都会有多个,均为分布式的。无需停机即可扩展机器。 - 消息被处理的状态是在
consumer
端维护,而不是由broker
端维护。当失败时能自动平衡。 - 支持online和offline的场景。
Kafka 的整体架构非常简单,是显式分布式架构,producer
、broker
和 consumer
都可以有多个。producer
,consumer
实现Kafka 注册的接口,数据从 producer
发送到 broker
,broker
承担一个中间缓存和分发的作用。broker
分发注册到系统中的 consumer
。broker
的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的 TCP 协议。
topic
:特指 Kafka 处理的消息源的不同分类。partition
:topic
物理上的分组,一个topic
可以分为多个partition
,每个partition
是一个有序的队列。partition
中的每条消息都会被分配一个有序的id
。message
:消息,是通信的基本单位,每个producer
可以向一个topic
发布一些消息。producers
:消息和数据生产者,向 Kafka 的一个topic
发布消息的过程叫做producers
。consumers
:消息和数据消费者,订阅topics
并处理其发布的消息的过程叫做consumers
。broker
:缓存代理,Kafka 集群中的一台或多台服务器统称为broker
。
Kafka 安装
在安装 Kafka 之前需要先安装 zookeepper,随后点击 Kafka 的 安装地址,进行一下步骤安装:
## 解压命令: tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/ ## 改名命令: mv kafka_2.12-2.1.0/ kafka_2.12 ## 进入解压后的目录,修改server.properties文件: vim /usr/local/kafka_2.12/config/server.properties ## 修改配置: broker.id=0port=9092host.name=192.168.11.51advertised.host.name=192.168.11.51log.dirs=/usr/local/kafka_2.12/kafka-logsnum.partitions=2zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181 ## 建立日志文件夹: mkdir /usr/local/kafka_2.12/kafka-logs ##启动kafka: /usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
常用命令:
## 简单操作: #(1)创建topic主题命令:(创建名为test的topic, 1个分区分别存放数据,数据备份总共1份) kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic topic1 --partitions 1 --replication-factor## --zookeeper 为zk服务列表 ## --create 命令后 --topic 为创建topic 并指定 topic name ## --partitions 为指定分区数量 ## --replication-factor 为指定副本集数量 #(2)查看topic列表命令: kafka-topics.sh --zookeeper 192.168.11.111:2181 --list #(3)kafka命令发送数据:(然后我们就可以编写数据发送出去了) kafka-console-producer.sh --broker-list 192.168.11.51:9092 --topic topic1 #(4)kafka命令接受数据:(然后我们就可以看到消费的信息了) kafka-console-consumer.sh --bootstrap-server 192.168.11.51:9092 --topic topic1 --from-beginning #(5)删除topic命令: kafka-topics.sh --zookeeper 192.168.11.111:2181 --delete --topic topic1 #(6)kafka查看消费进度:(当我们需要查看一个消费者组的消费进度时,则使用下面的命令)kafka-consumer-groups.sh --bootstrap-server 192.168.11.111:9092 --describe --group消费组名称
Kafka 使用
SpringBoot 结合使用 Kafka 需要使用到以下依赖:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
1.1 生产端
生产端的 yml
文件配置:
spring:kafka:# kafka 集群bootstrap-servers: 192.168.212.128:9092producer:# producer 发送消息失败时的重试次数retries: 0# 批量发送数据的配置batch-size: 16384# 设置kafka 生产者内存缓存区的大小(32 M)buffer-memory: 33554432# kafka 消息的序列化配置key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer# kafka 发送消息的 ack 可靠性投递的配置项# acks=0 : 生产者投递消息后,不会等待任何来自服务器的响应# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应# acks=-1 : 表示分区 leader 必须等待消息被成功写入到所有的节点中,才认为 producer 请求成功,这种方案提供最高的消息持久性保证,但理论上吞吐量是最低的acks: 1
简易生产者Demo代码:
@Componentpublic class KafkaProducerService {final static Logger logger = LoggerFactory.getLogger(KafkaProducerService.class);// key 是 String 类型,value 是 Object 类型@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;/** * 发送消息 * @param topic topic 主题 * @param object 消息主体 */public void sendMessage(String topic, Object object){// future 模式ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);// 添加回调函数future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {// 发送成功@Overridepublic void onSuccess(SendResult<String, Object> result) {logger.info("发送消息成功:" + result.toString());}@Overridepublic void onFailure(Throwable ex) {logger.info("发送消息失败:" + ex.getMessage());}});}}
1.2 消费端
消费端的 yml
文件配置:
spring:kafka:# kafka 集群bootstrap-servers: 192.168.212.128:9092consumer:# 消息的签收机制:手动签收enable-auto-commit: false# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理# latest(默认值),在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest,在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# kafka 消息的序列化配置key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 手动签收ack-mode: manual# 并行数concurrency: 5
简易生产者Demo代码:
@Componentpublic class KafkaConsumer {final static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);/** * 消费消息 * groupId 定义消费者组,下面脚本查看该消费组的消费进度 * kafka-consumer-groups.sh --bootstrap-server 192.168.11.111:9092 --describe --group消费组名称 * topics 定义监听哪个 topic * @param record 发送过来的记录 * @param acknowledgment 手动 ack * @param consumer 本消费者对应的一系列信息 */@KafkaListener(groupId = "group01", topics = "topic1")public void onMessage(ConsumerRecord<String, Object> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer){logger.info("消费端接收消息:{}", record.value().toString());// 手动签收acknowledgment.acknowledge();}}
1.3 运行测试
运行测试的时候,在服务器可以使用以下命令查看对应消费组的消费进度,输出的结果如下图:
kafka-consumer-groups.sh --bootstrap-server 192.168.11.111:9092 --describe --group消费组名称
其中各个字段的意思分别是:
topic
:主题partition
:当前分区的索引current-offset
:当前的进度,消费,ack
后的消息数log-end-offset
:日志的进度,比如10条消息发送后,无需消费,ack
,该数值就是 10LAG
:延迟,可以理解为还没消费的条数,比如消息发送了10条,这10条都没有ack
,则延迟数为10条consumer-id
:注册消费者 Id,如果消费者不是运行状态,则为-
host
:消费者地址client-id
:消费者名