Cómo usar Kafka Streams para el procesamiento de datos con estado y sin estado

Introducción

Kafka Streams es una biblioteca de Java para crear aplicaciones de procesamiento de flujo con Apache Kafka. Se puede usar o integrar en aplicaciones Java para procesar datos de transmisión en temas de Kafka. Es una biblioteca independiente que solo depende de Kafka y la usa como base para una alta disponibilidad y confiabilidad.

Kafka Streams tiene dos tipos de API:

  1. Streams DSL: una API de alto nivel

  2. Procesador: una API de bajo nivel

Esta guía cubre la API DSL de flujos de alto nivel, que proporciona un modelo de programación funcional para escribir topologías de procesamiento de flujo de manera concisa con unas pocas líneas de código. La API Streams DSL ofrece muchas abstracciones, como KStreams , KTable etc.

Una forma de desglosarlo es categorizar la funcionalidad que ofrecen estas API de la siguiente manera:

  1. operaciones sin estado

  2. operaciones con estado

Esta guía incluirá ejemplos de código para demostrar operaciones sin estado como map y filter y cálculos con estado como aggregate y count .

Funciones sin estado de Kafka Streams

El seguimiento KStream Los métodos se han cubierto en esta sección:

  • mapa

  • filtrar

  • agrupar por y agrupar por clave

  • a través y a

  • imprimir y mirar

  • unir

mapa

map puede transformar registros individuales en un KStream aplicando una función. Se puede utilizar para transformar tanto la clave como el valor. Si solo desea transformar el valor, use mapValues método. los flatMap El método puede devolver múltiples registros ( KeyValue ).

Veamos algunos ejemplos.

map se puede utilizar para convertir la clave y el valor de cada KStream grabar en minúsculas String :

                      
                        KStream<String, String> words = builder.stream("words");

words.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {

    @Override

    public KeyValue<String, String> apply(String key, String val) {

            return new KeyValue<>(key.toLowerCase(), val.toLowerCase());

        }

    });

                      
                    

O simplemente usa mapValues para trabajar solo con valores:

                      
                        words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    });

                      
                    

Usando flatMapValues puede desglosar aún más un valor en una colección de valores.

                      
                        stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {

    @Override

    public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String key, String val) {

        String[] values = val.split(",");

        return Arrays.asList(values)

                    .stream()

                    .map(value -> new KeyValue<>(key, value))

                    .collect(Collectors.toList());

            }

    })

                      
                    

filtrar

Generalmente, el filter método sólo incluye registros en un KStream que cumplen un criterio específico. Se complementa con el filterNot método que se utiliza para excluir registros. En ambos casos, el criterio de filtración se define mediante un Predicate objeto.

Para examplepara procesar únicamente transacciones de un tipo de tarjeta de crédito específico:

                      
                        KStream<String, String> transactions = builder.stream("user-transactions");

transactions.filter(new Predicate<String, Transaction>() {

    @Override

    public boolean test(String userID, Transaction tx) {

            return tx.cardType().equals("VISA");

        }

    })

                      
                    

Para ignorar/excluir a todos los usuarios que no hayan configurado su contraseña:

                      
                        KStream<String, String> users = builder.stream("users");

users.filterNot(new Predicate<String, User>() {

    @Override

    public boolean test(String userID, User user) {

            return user.isPwdSet();

        }

    })

                      
                    

grupo

Las operaciones de agrupación se utilizan a menudo para convertir el contenido de un KStream a un KGroupedStream para realizar cálculos con estado (cubiertos más adelante en esta guía). Esto se puede lograr usando groupByKey o una mas genérica group método.

Durante el uso groupByKey es sencillo, tenga en cuenta que un KeyValueMapper se puede usar con groupBy para usar una clave diferente. Para examplepuede usarlo para agrupar transacciones de usuarios según el tipo de tarjeta:

                      
                        KStream<String, User> transactions = builder.stream("transactions");



KGroupedStream<String, String> grouped = transactions.groupBy(new KeyValueMapper<String, User, String>() {

            @Override

            public String apply(String txID, User user) {

                return user.getCardType();

            }

    });

                      
                    

hacia y a través

los to El método es diferente de algunas de las otras operaciones que ha encontrado hasta ahora. Es bastante simple, pero muy poderoso ya que nos permite materializar (almacenar) KStream registros a otro tema en Kafka, ¡solo con una simple llamada de método!

devuelve un void resultado en lugar de un KStream (o KTable ) – estos tipos de operaciones también se conocen como métodos de terminal.

En esto exampletodas las palabras en minúsculas se envían a un tema llamado lowercase-words después de ser transformado usando mapValues operación (que se cubrió anteriormente).

                      
                        KStream<String, String> words = builder.stream("words");



words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    })

    .to("lowercase-words");

                      
                    

through es otra operación simple pero poderosa que a menudo se usa para complementar el to método mientras se construyen canalizaciones de transmisión. Para continuar con lo anterior example – Digamos, después de almacenar todas las palabras en minúsculas en un tema, debe eliminar todas las palabras que tienen un carácter específico (por ejemplo, un guión - ) y almacenar los resultados finales en otro tema de Kafka. En lugar de usar to método y creando un nuevo KStream del tema lowercase-words es posible simplificar el código de esta manera:

                      
                        KStream<String, String> words = builder.stream("words");



words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    })

    .through("lowercase-words")

    .filter(new Predicate<String, String>() {

        @Override

        public boolean test(String k, String v) {

                return v.contains("-");

            }

        })

    .to("processed-words");

                      
                    

imprimir y mirar

Si desea registrar el KStream registros (para fines de depuración), print es un método práctico (también es una operación de terminal, al igual que to ). También es posible configurar el comportamiento de este método usando un Printed objeto (que print acepta).

Para examplepara registrar los valores de un KStream al terminal de salida estándar:

                      
                        KStream<String, String> words = builder.stream("words");



words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    })

    .print(Printed.withLabel("demo").toSysOut());

                      
                    

los peek método es similar a print en términos de funcionalidad, pero no es una operación de terminal. En su lugar, permite que la persona que llama utilice un ForeachAction para definir la acción específica y devuelve la misma KStream instancia.

En esto examplesimplemente registramos la clave y el valor para estandarizar:

                      
                        KStream<String, String> words = builder.stream("words");



words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    })

    .peek(new ForeachAction<String, String>() {

        @Override

        public void apply(String k, String v) {

                System.out.println("key is "+k+", value is "+v);

            }

        })

    .to("lowercase-words");

                      
                    

unir

Si tiene dos flujos y necesita combinarlos, use merge .

                      
                        KStream<String, String> fte = builder.stream("fte");

KStream<String, String> contractor = builder.stream("contractors");



fte.merge(contractor).to("all-employees");

                      
                    

Funciones con estado de Kafka Streams

Esta sección cubrirá las operaciones de agregación ( aggregate , count y reduce ) junto con una descripción general de Windowing en Kafka Streams. Un efecto secundario de todas estas operaciones es el “estado” (de ahí el nombre de operaciones con estado), y es importante comprender dónde se almacena y cómo se administra.

El estado asociado con estas operaciones se almacena en “almacenes de estado” locales, ya sea en la memoria o en el disco. La “localidad de datos” hace que el procesamiento sea mucho más eficiente. También puede configurar su aplicación para que los datos de este almacén de estado también se envíen a temas de Kafka. Esto es importante para la alta disponibilidad y la tolerancia a fallas, ya que los datos se pueden restaurar desde Kafka en caso de problemas o bloqueos de la aplicación.

Repasemos algunas de estas operaciones con estado.

contar

KGroupedStream apoya esta operación. Hace que sea conveniente contar el número de registros de una clave específica usando esta operación usando un solo método.

Continuando con el groupBy example presentado anteriormente. Una vez que agrupamos las transacciones por tipo de tarjeta, simplemente podemos usar count para obtener el número de transacciones para cada tipo de tarjeta.

                      
                        KStream<String, User> transactions = builder.stream("transactions");



KGroupedStream<String, String> grouped = transactions.groupBy(new KeyValueMapper<String, User, String>() {

            @Override

            public String apply(String txID, User user) {

                return user.getCardType();

            }

    });



KTable<String, Long> txPerCardType = grouped.count();

                      
                    

Para almacenar este estado (recuento) localmente, el recuento acepta una instancia de Materialized que se puede utilizar de la siguiente manera:

                      
                        KTable<String, Long> txPerCardType = grouped.count(Materialized.as("tx-per-card-type"));

                      
                    

agregar

aggregate es útil cuando se ejecutan cálculos como promedios móviles sobre un conjunto de datos de transmisión. Esto requiere que se maneje el estado y debe tener en cuenta el valor actual y el valor actual del agregado calculado.

Una buena manera de entender aggregate es realmente usarlo para implementar count operación. Cuando se recibe el primer registro, el Initializer se utiliza para inicializar el estado (en este example, el recuento se establece en cero) y se invoca con el primer registro. Después de eso, el Aggregator toma el control – En este examplecada vez que se recibe un registro, el recuento actual se incrementa en uno.

                      
                            KStream<String, String> stream = builder.stream("transactions");



    KTable<String, Result> aggregate = stream.groupByKey()

            .aggregate(new Initializer<Result>() {

                @Override

                public Result apply() {

                    return new Result("", 0);

                }

            }, new Aggregator<String, String, Result>() {

                @Override

                public Result apply(String k, String v, Result count) {

                    Integer currentCount = count.getCount();

                    return new Result(k, currentCount + 1);

                }

            });

                      
                    

reducir

reduce La operación se puede utilizar para combinar flujos de valores e implementar sum , min , max etc. Puedes pensar en aggregate operación como una versión genérica de reduce .

Ventanas con Kafka Streams

Para exampleun requisito común para el análisis de sitios web es tener métricas sobre la cantidad de vistas de página únicas por hora, clics por minuto, etc. Windowing le permite limitar las operaciones de procesamiento de flujo para que se ejecuten dentro de un intervalo de tiempo.

Las ventanas de tiempo admitidas incluyen: ventanas de tiempo de deslizamiento, volteo, salto y basadas en sesiones.

Para contar vistas de página únicas por hora, puede usar una ventana de tiempo de rotación de 60 minutos. Por lo tanto, las visitas a la página de un producto desde la 1 p. m. hasta las 2 p. m. se agregarán y se iniciará un nuevo bloque de tiempo después de eso. Aquí hay un example de cómo podría lograr esto:

                      
                        KStream<Product, Long> views = builder.stream("product-views");



views.groupByKey()

    .windowedBy(SessionWindows.with(Duration.ofMinutes(60)))

    .toStream()

    .to("views-per-hour");

                      
                    

Conclusión

Esta guía proporcionó una introducción a Kafka Streams y el tipo de API. Esto fue seguido por la cobertura de operaciones Stateless y Stateful de uso común, junto con ejemplos. Puedes referirte a la Kafka transmite Javadocs y Documentación de Kafka para leer más.

Título del artículo Nombre (opcional) Correo electrónico (opcional) Descripción

Enviar sugerencia

Related Posts