配置环境变量
vim /etc/profile
在文件最后面添加如下配置,JAVA_HOME是自己安装的目录,需要改为自己的目录
export JAVA_HOME=/usr/java/jdk1.8.0_152export CLASSPATH=$:CLASSPATH:$JAVA_HOME/lib/export PATH=$PATH:$JAVA_HOME/bin
重新加载系统环境变量
source /etc/profile
验证是否安装成功
java -version
出现如下内容说明安装成功
下载压缩包:
官网地址:https://zookeeper.apache.org/releases.html
上传到安装目录并解压
我的是/usr/zookeeper,解压压缩包
修改配置文件
进入解压后的文件夹里面的conf目录下,该目录下有一个名zoo_sample.cfg文件,将该文件重新名为zoo.cfg,或者重新复制一份命名为zoo.cfg。使用vim命令打开zoo.cfg文件编辑,主要修改两个地方,data,log需要自己创建,我是直接在zookeeper安装目录下创建的
启动zookeeper
进入zookeeper安装目录下的bin目录
进入bin目录如下
启动zookeeper服务端:./zkServer.sh start
查看启动状态:./zkServer.sh status
启动zookeeper客户端
[root@localhost bin]# ./zkCli.sh
tar -zxvf kafka压缩包名称
#kafka服务器地址listeners=PLAINTEXT://10.3.1.156:9092
#zk服务器地址,我这里zk跟kafka在一台虚拟机上zookeeper.connect=10.3.1.156:2181
log.dirs=/usr/kafka/kafka-logs
非集群方法配置就完成了,下面就是启动kafka服务,进入kafka文件下的bin目录下,使用命令的方式体验消息发送和接收
启动kafka
./kafka-server-start.sh -daemon ../config/server.properties
创建topic
./kafka-topics.sh --bootstrap-server 10.3.1.156:9092 --create --topic test --replication-factor 1 --partitions 1
查看topic
./kafka-topics.sh --list --bootstrap-server 10.3.1.156:9092
发消息
./kafka-console-producer.sh --bootstrap-server 10.3.1.156:9092 --topic test
接收消息
./kafka-console-consumer.sh --bootstrap-server 10.3.1.156:9092 --topic test
删除topic
./kafka-topics.sh --bootstrap-server 10.3.1.156:9092 --delete --topic test
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId></dependency>
server:port: 8080spring:kafka:bootstrap-servers: 10.3.1.156:9092 #kafka服务地址producer: #生产者key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: #消费者key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: earliest
@RestControllerpublic class KafkaProducerController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping(value = "/sentMessage")public String sentMessage(@RequestParam("msg") String msg){kafkaTemplate.send("test", msg);return "发送成功";}}
@Componentpublic class KafkaConsumerListener {@KafkaListener(topics = "test", groupId = "my-test-group")public void consumer(ConsumerRecord<String,String> record){System.out.println("接收到的消息:" + record.value());}}