下面是使用Spring Boot和Kafka实现消息队列的简单例子:

  1. 引入依赖

在pom.xml中添加以下依赖:

org.springframework.kafkaspring-kafka2.7.5
  1. 配置Kafka

在application.properties中添加Kafka的相关配置:

spring.kafka.bootstrap-servers=localhost:9092spring.kafka.consumer.group-id=myGroupspring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  1. 发送消息

创建一个生产者类,使用KafkaTemplate发送消息:

@Servicepublic class KafkaProducerService {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}}
  1. 接收消息

创建一个消费者类,使用@KafkaListener注解监听指定的主题,处理消息:

@Servicepublic class KafkaConsumerService {@KafkaListener(topics = "myTopic", groupId = "myGroup")public void onMessage(String message) {System.out.println("Received message: " + message);}}
  1. 测试

在Controller中调用生产者发送消息,然后在控制台中可以看到消费者接收到的消息:

@RestControllerpublic class KafkaController {@Autowiredprivate KafkaProducerService kafkaProducerService;@GetMapping("/send")public String sendMessage() {kafkaProducerService.sendMessage("myTopic", "Hello, Kafka!");return "Message sent successfully";}}

以上就是一个简单的使用Spring Boot和Kafka实现消息队列的例子

分区

  1. 编写Kafka生产者代码,使用KafkaTemplate发送消息,并指定分区号。如下所示:
@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String message, int partition) {kafkaTemplate.send("my-topic", partition, null, message);

2.编写Kafka消费者代码,使用@KafkaListener注解监听指定的主题,并在方法参数中获取分区号。如下所示:

@KafkaListener(topics = "my-topic", groupId = "my-group")public void listen(ConsumerRecord record, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {System.out.println("Received message: " + record.value() + ", partition: " + partition);