What is Kafka?
Kafka is a high-throughput, distributed publish-subscribe messaging system.
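To make "publish-subscribe" concrete, here is a toy in-memory sketch. It is not how Kafka is implemented; the class ToyPubSub and its methods are made up for illustration. It only shows the idea that publishers append to a topic and each consumer group keeps its own read position. Unlike the consumer later in this post, a new group here always starts from the beginning of the topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of publish/subscribe (hypothetical class, not part of Kafka). */
final class ToyPubSub {
    // topic name -> append-only list of messages (a stand-in for a topic's log)
    private final Map<String, List<String>> logs = new HashMap<>();
    // "group|topic" -> index of the next message that group will read
    private final Map<String, Integer> offsets = new HashMap<>();

    /** Publisher side: append a message to the end of the topic. */
    void publish(String topic, String message) {
        logs.computeIfAbsent(topic, t -> new ArrayList<>()).add(message);
    }

    /** Subscriber side: return everything this group has not read yet. */
    List<String> poll(String group, String topic) {
        List<String> log = logs.getOrDefault(topic, new ArrayList<>());
        String key = group + "|" + topic;
        int from = offsets.getOrDefault(key, 0);
        List<String> unread = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(key, log.size()); // remember the position for the next poll
        return unread;
    }

    public static void main(String[] args) {
        ToyPubSub broker = new ToyPubSub();
        broker.publish("hello", "hello kafka");
        System.out.println(broker.poll("con-1", "hello")); // first poll sees the message
        System.out.println(broker.poll("con-1", "hello")); // nothing new for the same group
        System.out.println(broker.poll("con-2", "hello")); // another group reads independently
    }
}
```

Each group tracking its own position is what lets several independent applications read the same topic without interfering with one another.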
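1. Add the Maven dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>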
2. Write the producer
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;

// This class shares its name with the library's KafkaProducer,
// so the library class is referenced by its fully qualified name.
public class KafkaProducer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "localhost:9092");
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("acks", "all");
        prop.put("retries", 0);
        prop.put("batch.size", 16384);
        prop.put("linger.ms", 1);
        prop.put("buffer.memory", 33554432);

        String topic = "hello"; // topic
        org.apache.kafka.clients.producer.KafkaProducer<String, String> producer =
                new org.apache.kafka.clients.producer.KafkaProducer<>(prop);
        producer.send(new ProducerRecord<>(topic, Integer.toString(2), "hello kafka"));
        producer.close(); // waits for pending sends to complete before closing
    }
}
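3. Write the consumer
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// This class shares its name with the library's KafkaConsumer,
// so the library class is referenced by its fully qualified name.
public class KafkaConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // Must point at the same broker the producer writes to
        prop.put("bootstrap.servers", "192.168.8.166:9092");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("group.id", "con-1");
        prop.put("auto.offset.reset", "latest");
        // Commit offsets automatically
        prop.put("enable.auto.commit", "true");
        // Auto-commit interval in milliseconds
        prop.put("auto.commit.interval.ms", "1000");

        org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
                new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop);
        List<String> topics = new ArrayList<>();
        // More than one topic can be subscribed to
        topics.add("hello");
        consumer.subscribe(topics);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(20));
            for (ConsumerRecord<String, String> consumerRecord : records) {
                System.out.println(consumerRecord);
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
                System.out.println(consumerRecord.topic());
            }
        }
    }
}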
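A mistyped configuration key is easy to miss, because the client does not fail on a key it does not recognize. One way to reduce that risk is to build the consumer settings in a single helper. The sketch below uses only java.util.Properties; the class ConsumerProps and its build method are hypothetical names chosen for this example, while the keys themselves are the standard consumer setting names.

```java
import java.util.Properties;

/** Hypothetical helper that gathers the consumer settings in one place. */
final class ConsumerProps {
    static Properties build(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("group.id", groupId);
        p.put("auto.offset.reset", "latest");
        p.put("enable.auto.commit", "true");      // switch auto-commit on
        p.put("auto.commit.interval.ms", "1000"); // commit roughly once per second
        return p;
    }

    public static void main(String[] args) {
        Properties p = build("localhost:9092", "con-1");
        System.out.println(p.getProperty("enable.auto.commit"));
        System.out.println(p.getProperty("auto.commit.interval.ms"));
    }
}
```

The returned Properties object can be passed to the consumer constructor in place of the inline prop.put calls.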
4. Download Kafka
Download Kafka from the official Apache Kafka website.
After extracting the archive, go to the config directory.
Edit zookeeper.properties:
dataDir=D:/kafka_2.13-3.5.1/tmp/zookeeper
Edit the log storage path in server.properties:
log.dirs=D:/kafka_2.13-3.5.1/tmp/kafka-logs
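Both paths above use forward slashes, even though this is Windows. Assuming these files are parsed with java.util.Properties rules (an assumption here), a backslash would be treated as an escape character and the path would be silently mangled. The small check below illustrates this with the JDK alone; PathEscapeCheck is a hypothetical class name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical check showing how java.util.Properties treats backslashes. */
final class PathEscapeCheck {
    /** Parses one "key=value" line with java.util.Properties and returns the value. */
    static String parse(String line, String key) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(line));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return p.getProperty(key);
    }

    public static void main(String[] args) {
        String forward = parse("log.dirs=D:/kafka_2.13-3.5.1/tmp/kafka-logs", "log.dirs");
        String backward = parse("log.dirs=D:\\kafka_2.13-3.5.1\\tmp\\kafka-logs", "log.dirs");
        System.out.println(forward);                  // path survives unchanged
        System.out.println(backward.contains("\\")); // backslashes were consumed as escapes
    }
}
```

If backslashes are preferred, each one has to be doubled in the .properties file.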
Start the ZooKeeper service (from the bin\windows directory):
zookeeper-server-start.bat ../../config/zookeeper.properties
Start the Kafka service (in a second terminal, from the same directory):
kafka-server-start.bat ../../config/server.properties
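5. Start the consumer first, then the producer, and check the published message. The order matters on a first run: with auto.offset.reset set to latest, a consumer group that has no committed offset only receives messages sent after it has joined.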
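The effect of auto.offset.reset on the starting position can be sketched as a small function. This is a simplified model, not the client's real code: OffsetResetSketch and startOffset are hypothetical names, and the model assumes a single partition whose log starts at offset 0.

```java
/** Simplified, hypothetical model of where a consumer group starts reading. */
final class OffsetResetSketch {
    /**
     * @param autoOffsetReset value of auto.offset.reset ("earliest" or "latest")
     * @param committed       the group's committed offset, or null if it has none
     * @param logEndOffset    offset that the next produced message would get
     */
    static long startOffset(String autoOffsetReset, Long committed, long logEndOffset) {
        if (committed != null) {
            return committed; // a committed offset always wins: resume from there
        }
        if ("earliest".equals(autoOffsetReset)) {
            return 0L; // assumption: the log still starts at offset 0
        }
        if ("latest".equals(autoOffsetReset)) {
            return logEndOffset; // skip everything already in the topic
        }
        throw new IllegalStateException(
                "no committed offset and auto.offset.reset=" + autoOffsetReset);
    }

    public static void main(String[] args) {
        // One message is already in the topic, so the log end offset is 1.
        System.out.println(startOffset("latest", null, 1L));   // starts after the old message
        System.out.println(startOffset("earliest", null, 1L)); // starts at the old message
    }
}
```

In this model, a producer that runs before the consumer leaves its message below the starting offset when the policy is latest, which is why the message would not show up.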