Kafka學習筆記(四)—— API原理剖析

1、Producer API

1.1 消息發送流程

Kafka的Producer發送消息采用的是異步發送的方式。在消息發送的過程中,涉及到了兩個線程——main線程和Sender線程,以及一個線程共享變量——RecordAccumulator。main線程將消息發送給RecordAccumulator,Sender線程不斷從RecordAccumulator中拉取消息發送到Kafka broker。

來一個動圖品品:

注意圖中的三個組件:

  • interceptor:攔截器,后邊寫代碼會自定義攔截器
  • Serializer:序列化器
  • Partitioner:分區器

關于這三個小組件到后邊代碼中,都會有所體現~

1.2 異步發送消息

中國有句古話:talk is cheap,show me the code ~

1.2.1 簡單的代碼示例:

1)導入依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

2)Producer代碼:

public class MyProducer {
    public static void main(String[] args) {

        //1.創建Kafka生產者的配置信息
        Properties properties = new Properties();

        //2.指定Kafka連接的集群
        properties.put("bootstrap.servers", "hadoop102:9092");

        //3.指定ACK應答級別
        properties.put("acks", "all");

        //4.批次大小,16KB
        properties.put("batch.size", 16384);

        //5.等待時間(即使數據量沒有到達16KB,也會在這之后發送數據,防止等待時間過長)
        properties.put("linger.ms", 1);

        //6.重試次數
        properties.put("retries", 3);

        //7. RecordAccumulator 緩沖區大小 32MB
        properties.put("buffer.memory", 33554432);

        //8. key value序列化的類
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //9.創建生產者對象
        KafkaProducer producer = new KafkaProducer<String, String>(properties);

        //10.發送數據
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", "simon-1024"+Integer.toString(i)));
        }
        //11.注意要關閉資源,原因在于:整個程序運行下來不到1毫秒,數據不會被發送出去。
        producer.close();
    }
}

3)啟動消費者開始消費 (事先創建了topic為first,有2個分區,2副本)

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

4)查看消費結果:

注意:消費者并不是按照 01234...這樣的順序消費消息的,這是為什么呢?體會分區的意義??!

1.2.2 帶有回調函數的send方法

補充:其實send()方法是有重載的,注意看下面這種寫法:

public class CallbackProducer {
    public static void main(String[] args) {

        //1.創建kafka生產者配置信息
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2. 創建生產者對象
        KafkaProducer kafkaProducer = new KafkaProducer<String,String>(properties);

        //3. 發送數據
        for (int i = 0; i <10 ; i++) {
         kafkaProducer.send(new ProducerRecord("second", "simon-1024--" + i), new Callback() {// send的重載方法,可以有回調函數
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    //打印出本條數據發送到哪個分區了,偏移量是多少
    System.out.println(recordMetadata.partition()+"   "+recordMetadata.offset());
                }
            });
        }
        kafkaProducer.close();
    }
}

消費結果與上邊一致,下圖是我的執行結果:

直接證明了之前講過的offset并不是全局唯一的,只保證區內有序。

回調函數會在 producer 收到 ack 時調用,為異步調用,該方法有兩個參數,分別是RecordMetadata 和 Exception,如果 Exception 為 null,說明消息發送成功,如果Exception 不為 null,說明消息發送失敗。
注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。

ProducerRecord的構造方法還有好多個重載,不再一一舉例,如下:

1.2.3 自定義分區

如果我們在發送消息的時候沒有指定分區,那么Kafka會使用默認的分區器,看一下源碼,分區器都干了些什么(源碼分析在注釋中給出).

查閱了一下官方文檔,默認的分區器為:org.apache.kafka.clients.producer.internals.DefaultPartitioner,直接查看它計算分區的方法:

/**
     * Compute the partition for the given record.
     * 給指定的消息計算分區
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        
        //1. 獲得集群中的該topic信息
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        //2. 共有幾個分區
        int numPartitions = partitions.size();
        
        //3. 如果待發送的消息沒有指定key
        if (keyBytes == null) {
            //3.1 做累加操作
            //【為什么累加呢?比如第一次nextVlue = a,那么下一次為a+1,實現了輪詢策略】
            int nextValue = nextValue(topic);
            //3.2 獲取所有可用的分區(分區所在的機器沒掛掉)
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            //3.2.1 如果有可用的分區
            if (availablePartitions.size() > 0) {
                // 負數轉正后做摸運算
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                // 返回相應的分區數
                return availablePartitions.get(part).partition();
            } 
            //3.2.2 如果沒有可用的分區
            else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } 
        
        //4. 如果待發送的消息指定了key
        else {
            // hash the keyBytes to choose a partition
            //4.1 根據key的哈希值和分區數相與運算,得到分區號
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

ok,可以看到整個業務邏輯流程還是很清楚的。那么我們自己嘗試寫一個分區器:


public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 使得所有的消息發往0號分區
        return 0;
    }

    @Override
    public void close() {

    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}


運行程序,觀察回調函數的執行效果:

1.3 同步發送消息

上面說到的都是消息都是通過異步的方式發送的,使用到了main線程和sender線程。但是如果sender線程在工作的時候,我們阻塞住main線程,那兩個線程實現了串行工作的效果,也就相當于同步發送了。注意這里同步的意思是:一條消息發出去之后,會阻塞當前線程,直到返回ack。

由于 send 方法返回的是一個 Future 對象,根據 Futrue 對象的特點,我們也可以實現同步發送的效果,只需在調用 Future 對象的 get 方發即可。了解即可,不去深究。對上邊的代碼進行簡單的改造:

        //發送數據的代碼片段
        for (int i = 0; i < 10; i++) {
            
            //send方法返回一個Future對象
            Future future = producer.send(new ProducerRecord<String, String>("sencond", "simon-1024", "hello world " + Integer.toString(i)));
            try {
                //由future對象獲得返回值,并且阻塞住線程
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

之前說過Kafka保證的是分區有序,而不是全局有序。如果要保證全局有序,那么最直接的方案就是只用一個分區,并且使用同步發送的方式,保證數據不丟失。

2、Consumer API

Consumer 消費數據時的可靠性是很容易保證的,因為數據在 Kafka 中是持久化的,故不用擔心數據丟失問題。
由于 consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復后,需要從故障前的位置繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復后繼續消費。所以 offset 的維護是 Consumer 消費數據是必須考慮的問題。

下面是兩個例子,分別是自動提交offset和手動提交offset

2.1 自動提交offset

1)導入依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

2)代碼示例

先解釋一下用到的類:

KafkaConsumer:需要創建一個消費者對象,用來消費數據ConsumerConfig:獲取所需的一系列配置參數
ConsuemrRecord:每條數據都要封裝成一個 ConsumerRecord 對象

為了使我們能夠專注于自己的業務邏輯,Kafka 提供了自動提交offset 的功能。自動提交 offset 的相關參數:
enable.auto.commit:是否開啟自動提交 offset 功能
auto.commit.interval.ms:自動提交 offset 的時間間隔

public class MyConsumer {
    public static void main(String[] args) {

        //1. 創建消費者的配置對象
        Properties properties = new Properties();

        //2. 消費者連接的集群信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        //3. 反序列化消息的key和value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

        //4. 允許自動提交:拉取到消息就自動提交offset下標,可能造成數據丟失
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        //5. 自動提交的間隔為1毫秒
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1");

        //6. 設置消費者groupID
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"simon-0");

        //7. 創建消費者對象
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer(properties);

        //8. 訂閱的主題,參數是個集合,可以訂閱多個主題
        kafkaConsumer.subscribe(Arrays.asList("sencond"));

        //Tip:循環拉取消息
        while (true){
            //9. 拉取消息,并且10毫秒拉取一次
            ConsumerRecords<String,String> records = kafkaConsumer.poll(10);

            //10. 解析消息
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset	=	%d,	key	=	%s,	value = %s%n", record.offset(), record.key(), record.value());
            }
        }
   }
}

先啟動消費者,然后隨便起一個生產者,我就以我之前創建的生產者為例,消費結果如下:

關于ConsumerConfig的屬性AUTO_OFFSET_RESET_CONFIG的補充:

假如有一個消費者,消費到offset = 10消息,然后關機了。7天之后機器重啟,現在的消息的offset為1000。現在按道理來說應該從11開始消費,但是Kafka的消息默認保存消息7天,所以現在消費者持有的offset是無效的。

這時AUTO_OFFSET_RESET_CONFIG有兩個值可以選擇:earliestlatest,看一眼官方Doc:

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw exception to the consumer.

Talk is cheap ,show me the code:

//1. 增加一條配置
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//2. 修改消費者分組,手動使得offset失效
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"simon-1");

先執行一次producer生成一組數據,然后再啟動consumer,可以消費到之前發送的所有數據??!

2.2 手動提交offset

前面的代碼演示了自動提交offset,每次消費完成消費者都會提交offset,下次消費從offset+1開始。但是如果關閉自動提交,那么消費完成也不會提交offset,也就是說重新開啟消費者還會從頭開始消費。

如果消費者采用自動提交,拿到數據之后就提交offset。如果處理數據的時候出現了問題,那么這個數據就丟失了。

所以,Kafka提供了兩種手動提交 offset 的方法: commitSync(同步提交)commitAsync(異步提交)。兩者相同是,都會將本次 poll 的一批數據最高的偏移量提交;不同是:commitSync阻塞當前線程,一直到提交成功,并且會自動失敗重試(由不可控因素導致,也會出現提交失敗);而 commitAsync 則沒有失敗重試機制,故有可能提交失敗。

2.2.1 同步提交

同步提交的方法為:consumer.commitSync();
同步提交只要不發生不可恢復的錯誤,會一直嘗試至提交成功,因此,會將降低程序的讀取、處理速度。

//關閉自動提交 offset
properties.put("enable.auto.commit", "false");

//加在消費完成代碼之后,消費者同步提交,當前線程會阻塞直到 offset 提交成功
consumer.commitSync();

2.2.2 異步提交offset

雖然同步提交offset更加安全可靠一點,但是它會造成線程的阻塞,直到提交成功。因此吞吐量會受到很大的影響。在更多的情況下,選用異步提交方式。

異步提交的方法為:consumer.commitAsync();
異步提交不會等待broker 的響應,而是只管發送,不管是否成功。提高了應用程序吞吐量,但下次讀取消息的遺失或重復可能性大大提升。

//關閉自動提交 offset
properties.put("enable.auto.commit", "false");

//異步提交
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition,
                           OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            System.err.println("Commit failed for" +
                               offsets);
        }
    }
});

無論是同步提交還是異步提交 offset,都有可能會造成數據的漏消費或者重復消費。先提交 offset 后消費,有可能造成數據的漏消費;而先消費后提交 offset,有可能會造成數據的重復消費。

2.3自定義存儲offset

Kafka 0.9 版本之前,offset 存儲在 zookeeper,0.9 版本及之后,默認將 offset 存儲在 Kafka

的一個內置的 topic 中。除此之外,Kafka 還可以選擇自定義存儲 offset。

offset 的維護非常繁瑣,因為需要考慮到消費者的 Rebalance。

當有新的消費者加入消費者組、已有的消費者退出消費者組或者所訂閱的主題的分區發生變化,就會觸發到分區的重新分配,重新分配的過程叫做 Rebalance。

消費者發生 Rebalance 之后,每個消費者消費的分區就會發生變化。因此消費者要首先獲取到自己被重新分配到的分區,并且定位到每個分區最近提交的 offset 位置繼續消費。

要實現自定義存儲 offset,需要借助 ConsumerRebalanceListener,以下為示例代碼,其

中提交和獲取 offset 的方法,需要根據所選的 offset 存儲系統自行實現。

        //消費者訂閱主題
consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {

        //該方法會在 Rebalance 之前調用
        @Override
        public void
            onPartitionsRevoked(Collection<TopicPartition> partitions) {
            commitOffset(currentOffset);
        }
        //該方法會在 Rebalance 之后調用
        @Override
        public void
            onPartitionsAssigned(Collection<TopicPartition> partitions) {
            currentOffset.clear();
            for (TopicPartition partition : partitions) {
                //定位到最近提交的 offset 位置繼續消費
                consumer.seek(partition, getOffset(partition));
            }
        }
    });

    while (true) {
        //消費者拉取數據
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",        record.offset(), record.key(), record.value());
            currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        }
        //異步提交
        commitOffset(currentOffset);
    }
}
    //獲取某分區的最新 offset,比如可以mysql數據庫中獲取
    private static long getOffset(TopicPartition partition) {
        return 0;
    }
    //提交該消費者所有分區的 offset,可以將其存入到MySQL中一份
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
        
    }

3、自定義攔截器

3.1 攔截器原理

Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用于實現clients端的定制化控制邏輯。

對于producer而言,interceptor使得用戶在消息發送前以及producer回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。同時,producer允許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。Intercetpor的實現接口是org.apache.kafka.clients.producer.ProducerInterceptor

看一下結構,總共就三個方法,另外還有一個方法繼承自父類

詳細解讀一下各個方法:

  • configure(Map<String, ?> configs) :

    獲取配置信息和初始化數據時調用。

  • onSend(ProducerRecord<K, V> record)

    該方法封裝進KafkaProducer.send方法中,即它運行在用戶主線程中。Producer確保在消息被序列化以及計算分區前調用該方法。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區,否則會影響目標分區的計算。

  • onAcknowledgement(RecordMetadata, Exception)

    該方法會在消息從RecordAccumulator成功發送到Kafka Broker之后,或者在發送過程中失敗時調用。并且通常都是在producer回調邏輯觸發之前。onAcknowledgement運行在producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發送效率。

  • close

    關閉interceptor,主要用于執行一些資源清理工作

3.2 攔截器案例

需求如下:實現一個簡單的雙interceptor組成的攔截鏈。第一個interceptor會在消息發送前將時間戳信息加到消息value的最前部;第二個interceptor會在消息發送后更新成功發送消息數或失敗發送消息數。

代碼如下~~:

TimeInterceptor.java

public class TimeInterceptor implements ProducerInterceptor<String,String> {

    /**
     * 在待發送的消息之前加入時間戳
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(record.topic(), record.partition(),
                record.key(), System.currentTimeMillis() + record.value());
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

CounterInterceptor.java

public class CounterInterceptor implements ProducerInterceptor<String,String> {

    int success ;
    int error ;

    /**
     * 不改變消息的內容,直接返回
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

        if (metadata!=null){
            success++;
        }else {
            error++;
        }
    }

    /**
     * 打印發送成功和失敗的消息條數
     */
    @Override
    public void close() {
        System.out.println("success :"+success);
        System.out.println("error :"+error);
 }

    @Override
    public void configure(Map<String, ?> configs) {}
}

開啟消費者,消費數據:

看控制臺輸出 ,打印了發送成功和失敗消息的條數:



posted @ 2020-01-01 16:50  三秋葉  閱讀(...)  評論(...編輯  收藏