El procesamiento de datos en streaming es ubicuo hoy en día. Para obtener resultados pertinentes, el procesamiento de grandes cantidades de datos en lotes puede tener una latencia inaceptable. Es por eso que el ser capaces de procesar la información como un flujo de datos infinito, combinando nuevos datos con datos históricos cuando sea necesario, se vuelve un escenario no solo atractivo sino indispensable. Con suficientes productores de datos, incluso el recibir los mensajes para almacenarlos se puede convertir en un problema interesante. Necesitamos herramientas que puedan escalar horizontalmente para recibir los mensajes, persistirlos de una forma durable, y ponerlos a disposición de otros componentes de una forma determinística.
Uno de los protagonistas del ecosistema de ingeniería de datos, en particular en estos casos (fast-data architectures) es Apache Kafka. Apache Kafka se define como una plataforma, de código abierto, distribuida de flujo de eventos.
El rol de Kafka en estas arquitecturas es crucial: recibir de una forma escalable grandes cantidades de mensajes de fuentes diversas, unificando la interfaz a utilizar por los consumidores, quienes pueden consumir los mensajes con muy baja latencia. Todo esto basado en un concepto simple: el patrón publicador/suscriptor.
Antes de entrar en más detalle, resumamos las características principales de Apache Kafka:
- Baja latencia
- Escalabilidad
- Alta disponibilidad
- Durabilidad
A muy alto nivel, Kafka es un sistema distribuido que puede tener un cluster de uno o más servidores (que pueden ser brokers o nodos de kafka connect) y clientes (productores/consumidores). Los productores envían mensajes sobre un tema (topic) a los brokers y los consumidores leen los mensajes de los brokers.
Los mensajes de un tema puede estar distribuidos en varias particiones, y cada partición es un conjunto de brokers. Los consumidores también pueden estar distribuidos, formando grupos de consumidores que en conjunto ven todos los mensajes, pero en los cuales cada nodo sólo ve un subconjunto de ellos.
En un momento dado cada consumidor puede estar leyendo mensajes de una sola partición. Las particiones están ordenadas (orden total). Eso hace que cada consumidor sólo necesite recordar el offset del mensaje que consumió para poder continuar desde esa posición más adelante.
Un ejemplo rápido
Para ejecutar este pequeño ejemplo necesitas docker/docker compose instalado.
Iniciemos con nuestra infraestructura. Tendremos un broker de Kafka, un nodo de Zookeeper (como coordinador), un cluster de Apache Spark con dos ejecutores y un master, un servidor de almacenamiento de objetos (MinIO), y un nodo con JupyterLab, para ejecutar nuestros notebooks.
Ahora iniciamos la infraestructura usando docker compose up
.
Ahora jupyter lab debe estar disponible en http://localhost:8888 y podemos ingresar usando la contraseña configurada `kafka-demo`.
Ahora crearemos un producer para kafka. Existen varias librerías para kafka en python, usualmente la que recomiendo para productores-consumidores simples es `kafka-python`, pero al momento de escribir esta entrada, la versión actual publicada no funciona con versiones recientes de python, así que podemos usar una distribución alternativa kafka-python-ng
.
Para crear el productor sólo necesitamos crear un cliente, especificando los brokers (en este caso sólo tenemos uno) y enviar mensajes al tema al que queremos publicar.
Crear un consumidor sigue el mismo patrón, debemos indicar los servidores, el tema y el offset (opcional).
Y por último, para tener un ejemplo un poco más complejo, consumamos los mensajes usando pyspark y almacenemos la salida en s3. Primero crea el bucket test-bucket usando la interfaz de minio que estará disponible en http://localhost:9001 puede usar el usuario minio y la contraseña minio2024 configurada en el docker compose.
Configuramos la sesión de spark, incluyendo las dependencias (spark-sql-kafka para kafka, y hadoop-aws para s3), el endpoint de minio (minio-server:9000) y las credenciales.
¡Importante! debe cambiar la IP del driver por la IP del host.
Creamos nuestro dataframe inicial de spark leyendo desde el topic de kafka, usando structured streaming:
Realizamos las transformaciones necesarias y escribimos a minio:
Este es un proceso de streaming, así que se estará ejecutando cada minuto hasta que lo detengamos (e.g. matando el driver, o usando stop()).
Ahora, queda como tarea al lector experimentar con los diversos componentes (creando nuevos mensajes y temas) y explorar Spark Structured Streaming. Los recursos utilizados en esta entrada están disponibles como un gist en github.