Streaming

Apache Kafka – Una Introducción rápida

El procesamiento de datos en streaming es ubicuo hoy en día. Para obtener resultados pertinentes, el procesamiento de grandes cantidades de datos en lotes puede tener una latencia inaceptable. Es por eso que el ser capaces de procesar la información como un flujo de datos infinito, combinando nuevos datos con datos históricos cuando sea necesario, se vuelve un escenario no solo atractivo sino indispensable. Con suficientes productores de datos, incluso el recibir los mensajes para almacenarlos se puede convertir en un problema interesante. Necesitamos herramientas que puedan escalar horizontalmente para recibir los mensajes, persistirlos de una forma durable, y ponerlos a disposición de otros componentes de una forma determinística.

Uno de los protagonistas del ecosistema de ingeniería de datos, en particular en estos casos (fast-data architectures) es Apache Kafka. Apache Kafka se define como una plataforma, de código abierto, distribuida de flujo de eventos.

El rol de Kafka en estas arquitecturas es crucial: recibir de una forma escalable grandes cantidades de mensajes de fuentes diversas, unificando la interfaz a utilizar por los consumidores, quienes pueden consumir los mensajes con muy baja latencia. Todo esto basado en un concepto simple: el patrón publicador/suscriptor.

Antes de entrar en más detalle, resumamos las características principales de Apache Kafka:

  • Baja latencia
  • Escalabilidad
  • Alta disponibilidad
  • Durabilidad

A muy alto nivel, Kafka es un sistema distribuido que puede tener un cluster de uno o más servidores (que pueden ser brokers o nodos de kafka connect) y clientes (productores/consumidores). Los productores envían mensajes sobre un tema (topic) a los brokers y los consumidores leen los mensajes de los brokers.

Los mensajes de un tema puede estar distribuidos en varias particiones, y cada partición es un conjunto de brokers. Los consumidores también pueden estar distribuidos, formando grupos de consumidores que en conjunto ven todos los mensajes, pero en los cuales cada nodo sólo ve un subconjunto de ellos.

En un momento dado cada consumidor puede estar leyendo mensajes de una sola partición. Las particiones están ordenadas (orden total). Eso hace que cada consumidor sólo necesite recordar el offset del mensaje que consumió para poder continuar desde esa posición más adelante.

Un ejemplo rápido

Para ejecutar este pequeño ejemplo necesitas docker/docker compose instalado.

Iniciemos con nuestra infraestructura. Tendremos un broker de Kafka, un nodo de Zookeeper (como coordinador), un cluster de Apache Spark con dos ejecutores y un master, un servidor de almacenamiento de objetos (MinIO), y un nodo con JupyterLab, para ejecutar nuestros notebooks.

version: '3'
services:
kafka:
image: confluentinc/cp-kafka
ports:
– "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
– zookeeper
zookeeper:
image: confluentinc/cp-zookeeper
environment:
– ZOOKEEPER_CLIENT_PORT=2181
ports:
– "2181:2181"
spark-executor:
image: spark:3.5.1-scala2.12-java11-ubuntu
environment:
SPARK_MASTER_HOST: spark-master
SPARK_MASTER_PORT: 7077
command: spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
volumes:
– ./shared_folder:/workspace/data
deploy:
replicas: 2
depends_on:
– spark-master
spark-master:
image: spark:3.5.1-scala2.12-java11-ubuntu
environment:
SPARK_MASTER_HOST: spark-master
SPARK_MASTER_PORT: 7077
SPARK_LOCAL_IP: spark-master
command: spark-class org.apache.spark.deploy.master.Master
volumes:
– ./shared_folder:/workspace/data
spark-notebook:
build:
context: .
dockerfile_inline: |
FROM quay.io/jupyter/pyspark-notebook:spark-3.5.1
#set password for jupyter
RUN echo '{"IdentityProvider": {"hashed_password": "argon2:$$argon2id$$v=19$$m=10240,t=10,p=8$$zUYc32oQmbROa0YxSdntdw$$5hOMyxMMdml9/pM1Jc8A1GNMhi1d3cEEZiBW3KjJhCY"}}' >> /home/jovyan/.jupyter/jupyter_server_config.json && \
chmod 600 /home/jovyan/.jupyter/jupyter_server_config.json
ports:
– "4040:4040"
– "8888:8888"
– "38889:38889"
– "7777:7777"
volumes:
– ./:/home/jovyan/work
minio-server:
image: quay.io/minio/minio
command: server /data –console-address ":9001"
ports:
– "9000:9000"
– "9001:9001"
environment:
MINIO_ROOT_USER: minio
MINIO_ROOT_PASSWORD: minio2024

Ahora iniciamos la infraestructura usando docker compose up.

Ahora jupyter lab debe estar disponible en http://localhost:8888 y podemos ingresar usando la contraseña configurada `kafka-demo`.

Ahora crearemos un producer para kafka. Existen varias librerías para kafka en python, usualmente la que recomiendo para productores-consumidores simples es `kafka-python`, pero al momento de escribir esta entrada, la versión actual publicada no funciona con versiones recientes de python, así que podemos usar una distribución alternativa kafka-python-ng.

Para crear el productor sólo necesitamos crear un cliente, especificando los brokers (en este caso sólo tenemos uno) y enviar mensajes al tema al que queremos publicar.

Crear un consumidor sigue el mismo patrón, debemos indicar los servidores, el tema y el offset (opcional).

Y por último, para tener un ejemplo un poco más complejo, consumamos los mensajes usando pyspark y almacenemos la salida en s3. Primero crea el bucket test-bucket usando la interfaz de minio que estará disponible en http://localhost:9001 puede usar el usuario minio y la contraseña minio2024 configurada en el docker compose.

Configuramos la sesión de spark, incluyendo las dependencias (spark-sql-kafka para kafka, y hadoop-aws para s3), el endpoint de minio (minio-server:9000) y las credenciales.

¡Importante! debe cambiar la IP del driver por la IP del host.

Creamos nuestro dataframe inicial de spark leyendo desde el topic de kafka, usando structured streaming:

Realizamos las transformaciones necesarias y escribimos a minio:

Este es un proceso de streaming, así que se estará ejecutando cada minuto hasta que lo detengamos (e.g. matando el driver, o usando stop()).

Ahora, queda como tarea al lector experimentar con los diversos componentes (creando nuevos mensajes y temas) y explorar Spark Structured Streaming. Los recursos utilizados en esta entrada están disponibles como un gist en github.

Apache Kafka log

Introducción a Apache Spark: Desarrollo de una aplicación

Spark

Apache Spark es un framework de computación en paralelo, que promete velocidades hasta 100 veces mayores a las de Hadoop Map Reduce. Puede correr de manera local (en uno o varios hilos) o en cluster sobre Apache Mesos, Hadoop YARN, o en modo Standalone.

En este pequeño ejemplo crearemos un proyecto en eclipse con las dependencias necesarias para Spark y desarrollaremos un contador de palabras. Spark tiene API nativas para Scala, Python y Java. En este ejemplo usaremos Java 8.

Lo primero que haremos será crear un proyecto Maven en Eclipse:

Nuevo proyecto Maven en Eclipse Luna

En este caso seleccionamos el arquetipo por defecto, e introducimos el id del grupo y el id del artefacto que deseemos para nuestra aplicación.

El siguiente paso es añadir las dependencias al archivo pom.xml, así:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>uniandes.bigdata</groupId>
 <artifactId>sparkDemo</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <packaging>jar</packaging>

 <name>sparkDemo</name>
 <url>http://maven.apache.org</url>

 <properties>
 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 </properties>

 <dependencies>
 <dependency>
 <groupId>junit</groupId>
 <artifactId>junit</artifactId>
 <version>3.8.1</version>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-core_2.10</artifactId>
 <version>1.0.1</version>
 <scope>provided</scope>
 </dependency>
 </dependencies>
</project>

La dependencia de spark tiene groupId org.apache.spark, artifactId spark-core_2.10 y versión 1.0.1. Utilizaremos el scope provided para que no se empaqueten las librerías en el jar que generaremos.

Nuestra clase WordCount tendrá la siguiente estructura,

package uniandes.bigdata.sparkDemo;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;

import scala.Tuple2;

/**
 * WordCount en Spark
 *
 */
public class WordCount
{
    /**
     * Recibe como parámetro la ruta del archivo de entrada.
     * Puede ser un archivo local o una ruta hdfs (hdfs://...)
     * */
    public static void main( String[] args )
    {
        String file=null;
        /**Recibir como parámetro el nombre del archivo
         **/
        if(args.length>0){
            file=args[0];
        }else{
            System.err.println("No se puede ejecutar sin un archivo");
            return;
        }
        //El nombre de la aplicación:
        SparkConf conf = new SparkConf().setAppName("PruebaSpark");
        //La aplicación se puede ejecutar en local para probar en desarrollo.
        //Cuando se ejecuta desde spark (usando spark-submit)
        //no se debe especificar el master:

        //conf.setMaster("local[*]");//Descomentar esta línea para probar localmente
        JavaSparkContext sc = new JavaSparkContext(conf);

        /*Lee un archivo de texto en el RDD. El archivo puede ser local, estar ubicado en el hdfs
          o en cualquier fuente soportada por Hadoop
        */
        JavaRDD<String> texto=sc.textFile(file);

        //Utilizamos las funciones de Spark, junto con las expresiones lambda de Java 8,
        //para crear una colección de palabras (usando flatMap),
        //crear un mapa con la palabra como llave y 1 como valor (mapToPair),
        //y reducirlo por llave de tal forma que obtenemos un mapa con la palabra como llave
        //y el número de apariciones como valor (reduceByKey)

        JavaPairRDD<String, Integer> byKey = texto.flatMap(l->Arrays.asList(l.split("\\s")))
                    .mapToPair(s->new Tuple2<String, Integer>(s.toLowerCase(),1))
                    .reduceByKey((a,b)->a + b);

        //Guardamos el resultado, como archivo de texto

        byKey.saveAsTextFile("salida");
    }
}

La aplicación se puede probar localmente descomentando la línea conf.setMaster(“local[*]”);. La cadena local[*] indica que se ejecute con tantos hilos como procesadores en la máquina local. Podemos especificar el número de hilos (e.g. local[2]), dar la ruta a un servidor Mesos o Spark, o especificar que use Hadoop YARN (usando “yarn-client” o “yarn-cluster”). Cuando se va a ejecutar la aplicación usando spark-submit, no se debe especificar en la aplicación la url del máster (se puede pasar como parámetro a spark-submit).

Instalar spark y ejecutar la aplicación

La instalación de spark se limita a descargar y descomprimir el paquete indicado. Si se desea usar Yarn, es necesario descargar los binarios compilados con compatibilidad para Hadoop 2, y asegurarse que las variables de entorno HADOOP_HOME, HADOOP_CONF y YARN_CONF están correctamente establecidas.

Después de generar el jar usando maven, podemos ejecutar la aplicación usando spark-submit:

spark-submit --verbose --master local --class uniandes.bigdata.sparkDemo.WordCount "rutaAljar\sparkDemo-0.0.1-SNAPSHOT.jar" "rutaAlArchivo\test.txt"

Un problema común en Windows, si no se tiene instalada la distribución de Hadoop de Hortonworks, es que la aplicación falle porque Spark no encuentra winutils.exe, que se debería encontrar en la carpeta %HADOOP_HOME%/bin. Este archivo se puede descargar del repositorio de HDP, como indican en uno de los foros de Azure.

 

Conclusiones

El no estar limitado al paradigma MapReduce, el uso de RDD (Resilient Distributed Dataset), junto con el uso de expresiones lambda (Java 8, Scala y python), hacen que el proceso de escribir una aplicación para Spark sea intuitivo. Algunas de las características de Spark se ven cuando se ejecutan varias tareas sobre un mismo dataset, dado que puede compartir datos en memoria entre diferentes trabajos.

Spark es una alternativa interesante, especialmente en aplicaciones que requieren iteraciones y reuso de los datos (como el análisis de grafos y aprendizaje de máquina). Proyectos como GraphX  y Spark Streaming, hacen aún más interesante este framework.