学习笔记

  • 一、环境
  • 二、maven引入
  • 三、application配置
  • 四、SpringBoot-生产者
  • 五、SpringBoot-消费者
  • 六、SpringBoot-主题分区

一、环境

使用Kafka3.0.0

masterslave1slave2
ip193.168.3.34193.168.3.35193.168.3.36

二、maven引入

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-web</artifactId></dependency><dependency>    <groupId>org.projectlombok</groupId>    <artifactId>lombok</artifactId></dependency><dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId></dependency>

三、application配置

spring:  kafka:    bootstrap-servers: 192.168.3.34:9092,192.168.3.35:9092,192.168.3.36:9092 # 指定 kafka 的地址    producer: #生产者      retries: 0  #重复次数 ,失败不重发      batch-size: 16384 #每次批量发送消息的数量      buffer-memory: 33554432 #缓存大小达到buffer.memory就发送数据      acks: 1  # 0=生产者将不会等待来自服务器的任何确认  1=leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应  -1 =leader将等待完整的同步副本集以确认记录            key-serializer: org.apache.kafka.common.serialization.StringSerializer #指定 key 的序列化器      value-serializer: org.apache.kafka.common.serialization.StringSerializer #指定 value 的序列化器    consumer:      group-id: nacl #指定消费者组的 group_id      auto-offset-reset: earliest   #latest 最新的位置 , earliest最早的位置      auto-commit-interval: 100  #自动提交offset频率 100毫秒            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #指定 key 的反序列化器      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #指定 value 的反序列化器    listener:      concurrency: 3  #3个并行监听

四、SpringBoot-生产者

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@CrossOrigin@RestControllerpublic class ProducerController {    // Kafka 模板用来向 kafka 发送数据    @Resource    private KafkaTemplate<String, Object> kafkaTemplate;    @RequestMapping("/kf")    public String data() {        kafkaTemplate.send("first", "hello");        return "ok";    }}

五、SpringBoot-消费者

import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.KafkaListener;@Configurationpublic class KafkaConsumer {    // 指定要监听的 topic    @KafkaListener(topics = "first")    public void consumeTopic(String msg) { // 参数: 收到的 value        System.out.println("收到的信息: " + msg);    }}

六、SpringBoot-主题分区

import org.apache.kafka.clients.admin.NewTopic;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class KafkaTopic {    @Bean    public NewTopic batchTopic() {        //项目启动时,自动创建topic,指定分区和副本数量        return new NewTopic("first", 3, (short) 1);    }}