Welcome to my GitHub

All of Xinchen's original works (with companion source code) are categorized and indexed here: https://github.com/zq2599/blog_demos

Overview

  • This is the second installment of the "Strimzi Kafka Bridge in Action" series. We go straight to the heart of the bridge: its common APIs, with hands-on operations showing how to use the bridge for everyday message producing and consuming

  • The official OpenAPI documentation: https://strimzi.io/docs/bridge/in-development/#_openapi

  • This article consists of the following parts:

  1. Preparation: creating the topic
  2. Producing messages
  3. Consuming messages (the bridge's consumption flow is slightly unusual: you must first create a Strimzi bridge consumer, then pull messages through that consumer's endpoints)
  • After finishing this walkthrough, you should be able to use Kafka's services over HTTP with confidence

Preparation: creating the topic

  • Unfortunately, the bridge does not provide an API for creating topics, so we fall back to the command line
  • SSH into a Kubernetes host
  • Create a topic named bridge-quickstart-topic with four partitions
kubectl -n aabbcc \
  run kafka-producer \
  -ti \
  --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 \
  --rm=true \
  --restart=Never \
  -- bin/kafka-topics.sh \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 \
  --create \
  --topic bridge-quickstart-topic \
  --partitions 4 \
  --replication-factor 1
  • Verify that the topic was created successfully
kubectl -n aabbcc \
  run kafka-producer \
  -ti \
  --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 \
  --rm=true \
  --restart=Never \
  -- bin/kafka-topics.sh \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 \
  --describe \
  --topic bridge-quickstart-topic
  • The describe output shows that the topic was created as expected
  • Everything that follows is done by sending HTTP requests to the bridge. In my environment, the host IP is 192.168.0.1 and the bridge's NodePort is 31331
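Since every operation from here on is an HTTP call against the same host and port, it can help to centralize the URL construction. Below is a minimal sketch; BridgeClient is a hypothetical name of my own, not part of any official SDK, and the host/port are the values from this article.

```python
# A hypothetical helper for building Strimzi Kafka Bridge endpoint URLs.
# Only the paths actually used in this article are covered.

class BridgeClient:
    def __init__(self, host: str, port: int):
        self.base = f"http://{host}:{port}"

    def topic_url(self, topic: str) -> str:
        # GET here for topic details, POST here to produce messages
        return f"{self.base}/topics/{topic}"

    def positions_url(self, group: str, instance: str) -> str:
        # POST here to seek a consumer instance to specific offsets
        return f"{self.base}/consumers/{group}/instances/{instance}/positions"

client = BridgeClient("192.168.0.1", 31331)
print(client.topic_url("bridge-quickstart-topic"))
```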

Inspecting a topic's details

  • The following request fetches the details of the topic bridge-quickstart-topic
curl -X GET \
  http://192.168.0.1:31331/topics/bridge-quickstart-topic
  • The response below contains the topic's detailed information
{
  "name": "bridge-quickstart-topic",
  "configs": {
    "compression.type": "producer",
    "leader.replication.throttled.replicas": "",
    "message.downconversion.enable": "true",
    "min.insync.replicas": "1",
    "segment.jitter.ms": "0",
    "cleanup.policy": "delete",
    "flush.ms": "9223372036854775807",
    "follower.replication.throttled.replicas": "",
    "segment.bytes": "1073741824",
    "retention.ms": "604800000",
    "flush.messages": "9223372036854775807",
    "message.format.version": "3.0-IV1",
    "max.compaction.lag.ms": "9223372036854775807",
    "file.delete.delay.ms": "60000",
    "max.message.bytes": "1048588",
    "min.compaction.lag.ms": "0",
    "message.timestamp.type": "CreateTime",
    "preallocate": "false",
    "min.cleanable.dirty.ratio": "0.5",
    "index.interval.bytes": "4096",
    "unclean.leader.election.enable": "false",
    "retention.bytes": "-1",
    "delete.retention.ms": "86400000",
    "segment.ms": "604800000",
    "message.timestamp.difference.max.ms": "9223372036854775807",
    "segment.index.bytes": "10485760"
  },
  "partitions": [
    {"partition": 0, "leader": 0, "replicas": [{"broker": 0, "leader": true, "in_sync": true}]},
    {"partition": 1, "leader": 0, "replicas": [{"broker": 0, "leader": true, "in_sync": true}]},
    {"partition": 2, "leader": 0, "replicas": [{"broker": 0, "leader": true, "in_sync": true}]},
    {"partition": 3, "leader": 0, "replicas": [{"broker": 0, "leader": true, "in_sync": true}]}
  ]
}
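A client would typically digest a response like this programmatically. The sketch below parses an abbreviated copy of the payload above (only the fields used here) and checks the partition layout.

```python
import json

# Abbreviated copy of the topic-detail response shown above;
# "configs" is omitted since only the partition layout is inspected.
response = json.loads("""
{
  "name": "bridge-quickstart-topic",
  "partitions": [
    {"partition": 0, "leader": 0, "replicas": [{"broker": 0, "leader": true, "in_sync": true}]},
    {"partition": 1, "leader": 0, "replicas": [{"broker": 0, "leader": true, "in_sync": true}]},
    {"partition": 2, "leader": 0, "replicas": [{"broker": 0, "leader": true, "in_sync": true}]},
    {"partition": 3, "leader": 0, "replicas": [{"broker": 0, "leader": true, "in_sync": true}]}
  ]
}
""")

# Four partitions, matching the --partitions 4 used at creation time
num_partitions = len(response["partitions"])

# With replication-factor 1, every partition has a single in-sync leader replica
all_in_sync = all(
    r["in_sync"] for p in response["partitions"] for r in p["replicas"]
)
print(num_partitions, all_in_sync)
```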

Producing messages in batch (synchronous)

  • Let's try the bridge's batch-produce API. The following command produces three messages: the first is routed to a partition by the hash of its key, the second explicitly targets partition 2 via the partition field, and the third is assigned a partition by round-robin
curl -X POST \
  http://192.168.0.1:31331/topics/bridge-quickstart-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "sales-lead-0001"
        },
        {
            "value": "sales-lead-0002",
            "partition": 2
        },
        {
            "value": "sales-lead-0003"
        }
    ]
}'
  • The bridge's response is shown below: it returns the partition and offset of every message. This is the hallmark of synchronous sending: the call does not return until the metadata has been updated
{
  "offsets": [
    {"partition": 0, "offset": 0},
    {"partition": 2, "offset": 0},
    {"partition": 3, "offset": 0}
  ]
}
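The request body and the response pair up record by record. A sketch of building the same three-record batch and reading the placements back out (no network call, just the payload logic):

```python
import json

# Same three records as the curl call above: keyed, explicit-partition, and
# round-robin. Send with content-type application/vnd.kafka.json.v2+json.
payload = {
    "records": [
        {"key": "my-key", "value": "sales-lead-0001"},  # partition chosen by key hash
        {"value": "sales-lead-0002", "partition": 2},   # explicit partition
        {"value": "sales-lead-0003"},                   # round-robin assignment
    ]
}
body = json.dumps(payload)

# Response from the article: one entry per record, in request order
response = {"offsets": [{"partition": 0, "offset": 0},
                        {"partition": 2, "offset": 0},
                        {"partition": 3, "offset": 0}]}
placements = [(o["partition"], o["offset"]) for o in response["offsets"]]
print(placements)
```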

Producing messages in batch (asynchronous)

  • In some scenarios, such as chasing high QPS when the returned metadata does not matter, consider sending messages asynchronously: the bridge returns 200 as soon as it receives the request. This asynchronous mode differs from the synchronous mode by a single parameter: append async=true to the request URL
curl -X POST \
  'http://192.168.0.1:31331/topics/bridge-quickstart-topic?async=true' \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "sales-lead-0001"
        },
        {
            "value": "sales-lead-0002",
            "partition": 2
        },
        {
            "value": "sales-lead-0003"
        }
    ]
}'
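Since the async switch is purely a query-string change, a sketch of deriving the async endpoint from the sync one (host/port from this article):

```python
from urllib.parse import urlencode

# Synchronous produce endpoint from earlier in the article
base = "http://192.168.0.1:31331/topics/bridge-quickstart-topic"

# The asynchronous variant only adds async=true to the query string;
# the request body and content-type stay exactly the same.
async_url = f"{base}?{urlencode({'async': 'true'})}"
print(async_url)
```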
Setting the consumer offset

  • First, poll until all existing messages have been consumed
  • Since no new messages have been produced, a further poll should return nothing
  • Now execute the following request to set the offset of partition 2 to 74
curl -X POST \
  http://192.168.0.1:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "offsets": [
        {
            "topic": "bridge-quickstart-topic",
            "partition": 2,
            "offset": 74
        }
    ]
}'
  • Poll again: message 74 and everything after it can now be fetched (note that 74 itself is included)
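The seek ("positions") request body above can be sketched as a small helper (payload building only; sending it is a plain POST with content-type application/vnd.kafka.v2+json against the live bridge):

```python
import json

# seek_payload is a hypothetical helper name; it builds the body of the
# bridge's positions request for a single topic-partition.
def seek_payload(topic: str, partition: int, offset: int) -> str:
    return json.dumps({
        "offsets": [
            {"topic": topic, "partition": partition, "offset": offset}
        ]
    })

# Same seek as the curl call above: partition 2 back to offset 74
body = seek_payload("bridge-quickstart-topic", 2, 74)
print(body)
```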
  • At this point we have walked through the common producing and consuming APIs, which should cover regular business scenarios. In the articles that follow, we will build on this foundation and try more advanced tricks

You are not alone: Xinchen's original works accompany you all the way

  1. Java series
  2. Spring series
  3. Docker series
  4. kubernetes series
  5. Database + middleware series
  6. DevOps series