生产者创建消息。在其他基于发布与订阅的消息系统中,生产者可能被称为发布者 或 写入者。

一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。

生产者发送消息的方式

生产者发送消息主要有 2 种方式:同步发送消息、异步发送消息

同步发送消息

同步发送消息:我们调用 KafkaProducer 的 send() 方法发送消息,send() 方法会返回一个包含 RecordMetadata 的 Future 对象,然后调用 Future 的 get() 方法等待 Kafka 响应,通过 Kafka 的响应,我们就可以知道消息是否发送成功。

  • 如果服务器返回错误,Future 的 get() 方法会抛出异常。
  • 如果没有发生错误,我们会得到一个 RecordMetadata 对象,这个对象包含消息的目标主题、分区信息和消息的偏移量等信息。

我们调用 KafkaProducer 的 send() 方法发送 ProducerRecord 对象,消息先是被放进缓冲区,然后使用单独的线程将消息发送到服务器端。


异常处理

如果在发送数据之前或者在发送过程中发生了任何错误,比如 broker 返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常。在发送消息之前,生产者也是有可能发生异常的。这些异常有可能是 SerializationException(说明序列化消息失败)、BufferExhaustedException 或 TimeoutException(说明缓冲区已满),又或者是 InterruptException(说明发送线程被中断)。

KafkaProducer 一般会发生两类错误。

  • 其中一类是可重试错误,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主(no leader)”错误则可以通过重新为分区选举首领来解决。KafkaProducer 可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。
  • 另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。
public void send(String topic, String key, String val) {    ProducerRecord producerRecord = new ProducerRecord(topic, key, val);    try {        ListenableFuture<SendResult> future = kafkaTemplate.send(producerRecord);        SendResult sendResult = future.get();    } catch (Exception e) {        e.printStackTrace();    }}

异步发送消息

异步发送消息:我们调用 KafkaProducer 的 send() 方法,并指定一个回调方法,在服务器返回响应时调用该方法。

大多数时候,我们并不需要等待响应。不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志,或者把消息写入“错误消息”文件以便日后分析。为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。

为了使用回调,需要一个实现了 org.apache.kafka.clients.producer.Callback 接口的类,这个接口只有一个 onCompletion() 方法。如果 Kafka 返回一个错误,onCompletion() 方法会抛出一个非空异常。通过 onCompletion() 方法抛出的异常,我们可以对发送失败的消息进行处理。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。

private class DemoProducerCallback implements Callback {    @Override    public void onCompletion(RecordMetadata recordMetadata, Exception e) {        if (e != null) {            e.printStackTrace();        }    }}ProducerRecord record = new ProducerRecord("CustomerCountry", "Biomedical Materials", "USA");producer.send(record, new DemoProducerCallback());

分区器介绍分区

ProducerRecord 对象包含目标主题、消息键和值(消息)。

  • 如果消息键为 null,并且使用了默认的 DefaultPartitioner 分区器,那么分区器使用粘性分区策略(UniformSticky),会随机选择一个分区,并尽可能一直使用该分区,等到该分区的 batch 已满或者已完成,Kafka 再随机一个分区进行使用(保证和上一次的分区不同)。
  • 如果消息键不为 null,并且使用了默认的 DefaultPartitioner 分区器,那么分区器会对消息键进行散列(使用 Kafka 自己的散列算法,即使升级 Java 版本,散列值也不会发生变化),然后根据散列值把消息映射到特定的分区上(散列值 与 主题的分区数进行取余得到 partition 值)。

这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题的所有分区,而不仅仅是可用的分区。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。

只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。一旦主题增加了新的分区,那么键与分区之间的映射关系就改变了。如果要使用键来映射分区,那么最好在创建主题的时候就把分区规划好,而且永远不要增加新分区。

自定义分区策略

生产者可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。

通过分区器实现自定义分区策略的步骤:

  1. 定义一个类,该类实现 Partitioner 接口(分区器)
  2. 配置生产者(KafkaProducer),让生产者发送消息时使用自定义的分区器:properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
public class MyPartitioner implements Partitioner {    /**     * 返回信息对应的分区     *     * @param topic      主题     * @param key        消息的 key     * @param keyBytes   消息的 key 序列化后的字节数组     * @param value      消息的 value     * @param valueBytes 消息的 value 序列化后的字节数组     * @param cluster    集群元数据可以查看分区信息     * @return     */    @Override    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {        List partitions = cluster.partitionsForTopic(topic);        if ((keyBytes == null) || (!(key instanceof String))) {            throw new InvalidRecordException("We expect all messages to have String type as key");        }        // 实现自己的分区策略        // 返回数据写入的分区号        return 0;    }    // 关闭资源    @Override    public void close() {    }    // 配置方法    @Override    public void configure(Map configs) {    }}

参考资料

《Kafka 权威指南》第 3 章:Kafka 生产者——向 Kafka 写入数据

本文来自博客园,作者:真正的飞鱼,转载请注明原文链接:https://www.cnblogs.com/feiyu2/p/17250979.html