What is Kafka?

Kafka is a high-throughput, distributed publish-subscribe messaging system.

1. Add the Maven dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>

2. Write the producer

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "localhost:9092");
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("acks", "all");
        prop.put("retries", 0);
        prop.put("batch.size", 16384);
        prop.put("linger.ms", 1);
        prop.put("buffer.memory", 33554432);
        String topic = "hello"; // topic name
        // Fully qualified because this class is itself named KafkaProducer.
        org.apache.kafka.clients.producer.KafkaProducer<String, String> producer =
                new org.apache.kafka.clients.producer.KafkaProducer<>(prop);
        producer.send(new ProducerRecord<>(topic, Integer.toString(2), "hello kafka"));
        producer.close(); // waits for the buffered record to be sent
    }
}

3. Write the consumer

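import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class KafkaConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // Must point at the same broker the producer sends to.
        prop.put("bootstrap.servers", "192.168.8.166:9092");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("group.id", "con-1");
        prop.put("auto.offset.reset", "latest");
        // Commit offsets automatically (the key is enable.auto.commit).
        prop.put("enable.auto.commit", "true");
        // Interval between automatic commits, in milliseconds.
        prop.put("auto.commit.interval.ms", "1000");
        // Fully qualified because this class is itself named KafkaConsumer.
        org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
                new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop);
        List<String> topics = new ArrayList<>();
        // More than one topic can be subscribed to.
        topics.add("hello");
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(20));
            for (ConsumerRecord<String, String> consumerRecord : poll) {
                System.out.println(consumerRecord);
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
                System.out.println(consumerRecord.topic());
            }
        }
    }
}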
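The consumer settings are plain string keys, so a misspelled key does not switch on the feature it was meant for. As a minimal sketch, the settings can be collected in one helper and printed for inspection before they are handed to the client. The class name ConsumerConfigSketch and the method build are hypothetical names used for illustration; they are not part of kafka-clients, and the sketch uses only java.util.Properties.

```java
import java.util.Properties;

// Hypothetical helper (not part of kafka-clients): gathers the consumer
// settings in one place so the key names can be checked at a glance.
final class ConsumerConfigSketch {

    static Properties build(String bootstrapServers, String groupId) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", bootstrapServers);
        prop.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("group.id", groupId);
        prop.put("auto.offset.reset", "latest");
        // Switches automatic offset commits on.
        prop.put("enable.auto.commit", "true");
        // Interval between automatic commits, in milliseconds.
        prop.put("auto.commit.interval.ms", "1000");
        return prop;
    }

    public static void main(String[] args) {
        // Address and group id are the example values from this tutorial.
        Properties prop = build("localhost:9092", "con-1");
        prop.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The returned Properties object can be passed to the consumer constructor in place of the inline prop.put calls.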

4. Download Kafka

Download Kafka from the official Apache Kafka website.

After extracting the archive, go to the config directory.

Edit zookeeper.properties:

dataDir=D:/kafka_2.13-3.5.1/tmp/zookeeper

Change the log storage path in server.properties:

log.dirs=D:/kafka_2.13-3.5.1/tmp/kafka-logs
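The two paths above use forward slashes even though they are Windows paths. A likely reason, assuming the files are read with java.util.Properties semantics, is that a backslash in a .properties file is an escape character. The sketch below illustrates the difference using only the JDK; PropertiesPathSketch and loadLogDirs are hypothetical names chosen for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical demo class: shows how java.util.Properties parses a Windows
// path written with forward slashes versus backslashes.
final class PropertiesPathSketch {

    // Parses the given .properties text and returns the value of log.dirs.
    static String loadLogDirs(String fileContent) {
        Properties prop = new Properties();
        try {
            prop.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return prop.getProperty("log.dirs");
    }

    public static void main(String[] args) {
        // Forward slashes are kept exactly as written.
        System.out.println(loadLogDirs("log.dirs=D:/kafka_2.13-3.5.1/tmp/kafka-logs"));
        // Each "\\" here is a single backslash in the file text. The parser
        // treats it as an escape: "\t" becomes a tab, and a backslash before
        // any other letter is dropped.
        System.out.println(loadLogDirs("log.dirs=D:\\kafka_2.13-3.5.1\\tmp\\kafka-logs"));
    }
}
```

If backslashes are preferred, each one has to be doubled in the file (D:\\kafka_2.13-3.5.1\\tmp\\kafka-logs).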

Start the ZooKeeper service (the relative path below assumes the command is run from the bin\windows directory):

zookeeper-server-start.bat ../../config/zookeeper.properties

Start the Kafka service:

kafka-server-start.bat ../../config/server.properties

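5. Start the consumer first and then the producer, and check the published message. The order matters: the consumer sets auto.offset.reset to latest, so a group with no committed offset only receives messages sent after it has joined.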
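The effect of the start order can be illustrated with a toy model. This is a simplified sketch, not Kafka code: OffsetResetSketch is a hypothetical class that stands in for a single topic partition as an in-memory list, and startOffset mimics where a consumer group with no committed offset would begin reading under the "latest" and "earliest" settings.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Kafka): one topic partition as an append-only list.
final class OffsetResetSketch {
    private final List<String> log = new ArrayList<>();

    // Appends a message to the end of the log.
    void publish(String message) {
        log.add(message);
    }

    // Start position for a group that has no committed offset yet.
    int startOffset(String autoOffsetReset) {
        if ("earliest".equals(autoOffsetReset)) {
            return 0;          // begin at the oldest retained message
        }
        if ("latest".equals(autoOffsetReset)) {
            return log.size(); // begin after everything already written
        }
        throw new IllegalArgumentException("unsupported: " + autoOffsetReset);
    }

    // Returns a copy of every message from the given offset to the end.
    List<String> readFrom(int offset) {
        return new ArrayList<>(log.subList(offset, log.size()));
    }

    public static void main(String[] args) {
        OffsetResetSketch topic = new OffsetResetSketch();
        topic.publish("sent before the consumer started");
        int position = topic.startOffset("latest"); // consumer joins here
        topic.publish("hello kafka");
        System.out.println(topic.readFrom(position));
    }
}
```

In this model a consumer that joins with "latest" after the producer has already finished has nothing to read, which matches the advice to start the consumer first.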