如何使用 Kafka Streams 進行有狀態和無狀態數據處理

介紹

Kafka Streams 是一個 Java 庫,用於使用 Apache Kafka 構建流處理應用程序。 它可以用於或集成到 Java 應用程序中以處理 Kafka 主題中的流數據。 它是一個獨立的庫,僅依賴於 Kafka 並將其作為高可用性和可靠性的基礎。

Kafka Streams 有兩種類型的 API:

  1. DSL 流:高級 API

  2. 處理器:低級 API

本指南涵蓋高級 Stream DSL API,它提供了一個函數式編程模型,只需幾行代碼即可簡潔地編寫流處理拓撲。 Streams DSL API 提供了許多抽象,例如 KStreams , KTable 等等

分解它的一種方法是將這些 API 提供的功能分類如下:

  1. 無狀態操作

  2. 狀態操作

本指南將包括演示無狀態操作的代碼示例,例如 map filter 和計算狀態為 aggregate count .

Kafka Streams 無狀態函數

監控 KStream 本節介紹了這些方法:

  • 地圖

  • 篩選

  • 分組依據和按鍵分組

  • 通過已經

  • 打印並觀看

  • 關聯

地圖

map 您可以將單個記錄轉換為 KStream 應用一個函數。 它可用於轉換鍵和值。 如果您只想轉換值,請使用 mapValues 方法。 這 flatMap 該方法可以返回多條記錄( KeyValue ).

讓我們看一些例子。

map 可用於轉換每個的鍵和值 KStream 雕刻小寫字母 String :

                      
                        KStream<String, String> words = builder.stream("words");

words.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {

    @Override

    public KeyValue<String, String> apply(String key, String val) {

            return new KeyValue<>(key.toLowerCase(), val.toLowerCase());

        }

    });

                      
                    

或者只是使用 mapValues 僅使用值:

                      
                        words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    });

                      
                    

穿著 flatMapValues 您可以進一步將值分解為值的集合。

                      
                        stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {

    @Override

    public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String key, String val) {

        String[] values = val.split(",");

        return Arrays.asList(values)

                    .stream()

                    .map(value -> new KeyValue<>(key, value))

                    .collect(Collectors.toList());

            }

    })

                      
                    

篩選

一般來說, filter 方法僅包括記錄在 KStream 滿足特定條件的。 它輔以 filterNot 用於排除記錄的方法。 在這兩種情況下,過濾條件都由 Predicate 目的。

例如,只處理特定信用卡類型的交易:

                      
                        KStream<String, String> transactions = builder.stream("user-transactions");

transactions.filter(new Predicate<String, Transaction>() {

    @Override

    public boolean test(String userID, Transaction tx) {

            return tx.cardType().equals("VISA");

        }

    })

                      
                    

忽略/排除所有未設置密碼的用戶:

                      
                        KStream<String, String> users = builder.stream("users");

users.filterNot(new Predicate<String, User>() {

    @Override

    public boolean test(String userID, User user) {

            return user.isPwdSet();

        }

    })

                      
                    

分組操作常用於轉換一個 KStream 仍然 KGroupedStream 執行有狀態計算(本指南稍後介紹)。 這可以通過使用來實現 groupByKey 或更通用的 group 方法。

使用過程中 groupByKey 很簡單,請記住 KeyValueMapper 可以與 groupBy 使用不同的密鑰。 例如,您可以使用它根據卡類型對用戶交易進行分組:

                      
                        KStream<String, User> transactions = builder.stream("transactions");



KGroupedStream<String, String> grouped = transactions.groupBy(new KeyValueMapper<String, User, String>() {

            @Override

            public String apply(String txID, User user) {

                return user.getCardType();

            }

    });

                      
                    

到和通過

to 該方法不同於您目前遇到的其他一些操作。 它非常簡單,但非常強大,因為它允許我們實現(存儲) KStream 記錄到 Kafka 中的另一個主題,只需一個簡單的方法調用!

返回一個 void 結果而不是 KStream (任何一個 KTable ) – 這些類型的操作也稱為終端方法。

在這個例子中,所有小寫單詞都被發送到一個名為 lowercase-words 改造後使用 mapValues 操作(上面已介紹)。

                      
                        KStream<String, String> words = builder.stream("words");



words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    })

    .to("lowercase-words");

                      
                    

through 是另一個簡單但功能強大的操作,通常用於補充 to 建設傳輸管道時的方法。 從上面的例子繼續——比如說,在一個主題中存儲了所有小寫單詞之後,你需要刪除所有具有特定字符的單詞(例如,一個連字符 - ) 並將最終結果存儲在另一個 Kafka 主題中。 而不是使用 to 方法並創建一個新的 KStream 關於話題 lowercase-words 可以通過這種方式簡化代碼:

                      
                        KStream<String, String> words = builder.stream("words");



words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    })

    .through("lowercase-words")

    .filter(new Predicate<String, String>() {

        @Override

        public boolean test(String k, String v) {

                return v.contains("-");

            }

        })

    .to("processed-words");

                      
                    

打印並觀看

如果你想註冊 KStream 日誌(用於調試目的), print 是一個方便的方法(它也是一個終端操作,就像 to ). 也可以使用 Printed 對象(那個 print 你接受)。

比如記錄a的值 KStream 到標準輸出終端:

                      
                        KStream<String, String> words = builder.stream("words");



words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    })

    .print(Printed.withLabel("demo").toSysOut());

                      
                    

peek 方法類似於 print 在功能方面,但它不是終端操作。 相反,它允許調用者使用 ForeachAction 定義特定的動作並返回相同的 KStream 實例。

本例中我們簡單記錄key和value進行標準化:

                      
                        KStream<String, String> words = builder.stream("words");



words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    })

    .peek(new ForeachAction<String, String>() {

        @Override

        public void apply(String k, String v) {

                System.out.println("key is "+k+", value is "+v);

            }

        })

    .to("lowercase-words");

                      
                    

關聯

如果您有兩個流並需要合併它們,請使用 merge .

                      
                        KStream<String, String> fte = builder.stream("fte");

KStream<String, String> contractor = builder.stream("contractors");



fte.merge(contractor).to("all-employees");

                      
                    

Kafka Streams 有狀態函數

本節將介紹聚合操作( aggregate , count reduce ) 以及 Kafka Streams 上的窗口概述。 所有這些操作的副作用是“狀態”(因此稱為有狀態操作),了解它的存儲位置和管理方式很重要。

與這些操作相關的狀態存儲在本地“狀態存儲”中,在內存或磁盤中。 “數據局部性”使處理更加高效。 您還可以配置您的應用程序,以便將此狀態存儲中的數據也發送到 Kafka 主題。 這對於高可用性和容錯性很重要,因為在應用程序崩潰或出現問題時可以從 Kafka 恢復數據。

讓我們回顧一下其中的一些有狀態操作。

數數

KGroupedStream 支持這個操作。 使用單一方法使用此操作可以方便地計算特定鍵的記錄數。

繼續 groupBy 上面給出的例子。 一旦我們按卡類型對交易進行分組,我們就可以簡單地使用 count 獲取每種卡的交易次數。

                      
                        KStream<String, User> transactions = builder.stream("transactions");



KGroupedStream<String, String> grouped = transactions.groupBy(new KeyValueMapper<String, User, String>() {

            @Override

            public String apply(String txID, User user) {

                return user.getCardType();

            }

    });



KTable<String, Long> txPerCardType = grouped.count();

                      
                    

為了在本地存儲這個狀態(計數),計數接受一個實例 Materialized 可以按如下方式使用:

                      
                        KTable<String, Long> txPerCardType = grouped.count(Materialized.as("tx-per-card-type"));

                      
                    

添加

aggregate 在流式數據集上運行計算(例如移動平均線)時很有用。 這需要處理狀態並且必須考慮當前值和計算聚合的當前值。

一個很好的理解方式 aggregate 實際上是用它來實現 count 手術。 當收到第一條記錄時, Initializer 用於初始化狀態(在此示例中,計數設置為零)並使用第一條記錄調用。 在那之後, Aggregator take control – 在此示例中,每次收到記錄時,當前計數都會遞增 1。

                      
                            KStream<String, String> stream = builder.stream("transactions");



    KTable<String, Result> aggregate = stream.groupByKey()

            .aggregate(new Initializer<Result>() {

                @Override

                public Result apply() {

                    return new Result("", 0);

                }

            }, new Aggregator<String, String, Result>() {

                @Override

                public Result apply(String k, String v, Result count) {

                    Integer currentCount = count.getCount();

                    return new Result(k, currentCount + 1);

                }

            });

                      
                    

減少

reduce 該操作可用於組合價值流並實施 sum , min , max 等等 你可以想到 aggregate 作為通用版本操作 reduce .

帶有 Kafka 流的 Windows

例如,網站分析的一個常見要求是對每小時唯一頁面瀏覽量、每分鐘點擊次數等進行度量。 Windowing 允許您限制流處理操作在一個時間間隔內運行。

支持的時間窗包括:滑動、翻轉、跳躍和基於會話的時間窗。

要計算每小時的唯一頁面瀏覽量,您可以使用 60 分鐘的輪換時間窗口。 因此,將添加從下午 1 點到下午 2 點的產品頁面瀏覽量,然後將開始一個新的時間段。 以下是您如何實現這一目標的示例:

                      
                        KStream<Product, Long> views = builder.stream("product-views");



views.groupByKey()

    .windowedBy(SessionWindows.with(Duration.ofMinutes(60)))

    .toStream()

    .to("views-per-hour");

                      
                    

結論

本指南介紹了 Kafka Streams 和 API 的類型。 隨後介紹了常用的無狀態和有狀態操作以及示例。 你可以參考Kafka 流式傳輸 Javadocs卡夫卡文檔閱讀更多。

文章標題 名稱(可選) 電子郵件(可選) 描述

發送建議

相關文章