Introducción a Apache Spark: Desarrollo de una aplicación

Spark

Apache Spark es un framework de computación en paralelo, que promete velocidades hasta 100 veces mayores a las de Hadoop Map Reduce. Puede correr de manera local (en uno o varios hilos) o en cluster sobre Apache Mesos, Hadoop YARN, o en modo Standalone.

En este pequeño ejemplo crearemos un proyecto en eclipse con las dependencias necesarias para Spark y desarrollaremos un contador de palabras. Spark tiene API nativas para Scala, Python y Java. En este ejemplo usaremos Java 8.

Lo primero que haremos será crear un proyecto Maven en Eclipse:

Nuevo proyecto Maven en Eclipse Luna

En este caso seleccionamos el arquetipo por defecto, e introducimos el id del grupo y el id del artefacto que deseemos para nuestra aplicación.

El siguiente paso es añadir las dependencias al archivo pom.xml, así:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>uniandes.bigdata</groupId>
 <artifactId>sparkDemo</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <packaging>jar</packaging>

 <name>sparkDemo</name>
 <url>http://maven.apache.org</url>

 <properties>
 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 </properties>

 <dependencies>
 <dependency>
 <groupId>junit</groupId>
 <artifactId>junit</artifactId>
 <version>3.8.1</version>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-core_2.10</artifactId>
 <version>1.0.1</version>
 <scope>provided</scope>
 </dependency>
 </dependencies>
</project>

La dependencia de spark tiene groupId org.apache.spark, artifactId spark-core_2.10 y versión 1.0.1. Utilizaremos el scope provided para que no se empaqueten las librerías en el jar que generaremos.

Nuestra clase WordCount tendrá la siguiente estructura,

package uniandes.bigdata.sparkDemo;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;

import scala.Tuple2;

/**
 * WordCount en Spark
 *
 */
public class WordCount
{
    /**
     * Recibe como parámetro la ruta del archivo de entrada.
     * Puede ser un archivo local o una ruta hdfs (hdfs://...)
     * */
    public static void main( String[] args )
    {
        String file=null;
        /**Recibir como parámetro el nombre del archivo
         **/
        if(args.length>0){
            file=args[0];
        }else{
            System.err.println("No se puede ejecutar sin un archivo");
            return;
        }
        //El nombre de la aplicación:
        SparkConf conf = new SparkConf().setAppName("PruebaSpark");
        //La aplicación se puede ejecutar en local para probar en desarrollo.
        //Cuando se ejecuta desde spark (usando spark-submit)
        //no se debe especificar el master:

        //conf.setMaster("local[*]");//Descomentar esta línea para probar localmente
        JavaSparkContext sc = new JavaSparkContext(conf);

        /*Lee un archivo de texto en el RDD. El archivo puede ser local, estar ubicado en el hdfs
          o en cualquier fuente soportada por Hadoop
        */
        JavaRDD<String> texto=sc.textFile(file);

        //Utilizamos las funciones de Spark, junto con las expresiones lambda de Java 8,
        //para crear una colección de palabras (usando flatMap),
        //crear un mapa con la palabra como llave y 1 como valor (mapToPair),
        //y reducirlo por llave de tal forma que obtenemos un mapa con la palabra como llave
        //y el número de apariciones como valor (reduceByKey)

        JavaPairRDD<String, Integer> byKey = texto.flatMap(l->Arrays.asList(l.split("\\s")))
                    .mapToPair(s->new Tuple2<String, Integer>(s.toLowerCase(),1))
                    .reduceByKey((a,b)->a + b);

        //Guardamos el resultado, como archivo de texto

        byKey.saveAsTextFile("salida");
    }
}

La aplicación se puede probar localmente descomentando la línea conf.setMaster(“local[*]”);. La cadena local[*] indica que se ejecute con tantos hilos como procesadores en la máquina local. Podemos especificar el número de hilos (e.g. local[2]), dar la ruta a un servidor Mesos o Spark, o especificar que use Hadoop YARN (usando “yarn-client” o “yarn-cluster”). Cuando se va a ejecutar la aplicación usando spark-submit, no se debe especificar en la aplicación la url del máster (se puede pasar como parámetro a spark-submit).

Instalar spark y ejecutar la aplicación

La instalación de spark se limita a descargar y descomprimir el paquete indicado. Si se desea usar Yarn, es necesario descargar los binarios compilados con compatibilidad para Hadoop 2, y asegurarse que las variables de entorno HADOOP_HOME, HADOOP_CONF y YARN_CONF están correctamente establecidas.

Después de generar el jar usando maven, podemos ejecutar la aplicación usando spark-submit:

spark-submit --verbose --master local --class uniandes.bigdata.sparkDemo.WordCount "rutaAljar\sparkDemo-0.0.1-SNAPSHOT.jar" "rutaAlArchivo\test.txt"

Un problema común en Windows, si no se tiene instalada la distribución de Hadoop de Hortonworks, es que la aplicación falle porque Spark no encuentra winutils.exe, que se debería encontrar en la carpeta %HADOOP_HOME%/bin. Este archivo se puede descargar del repositorio de HDP, como indican en uno de los foros de Azure.

 

Conclusiones

El no estar limitado al paradigma MapReduce, el uso de RDD (Resilient Distributed Dataset), junto con el uso de expresiones lambda (Java 8, Scala y python), hacen que el proceso de escribir una aplicación para Spark sea intuitivo. Algunas de las características de Spark se ven cuando se ejecutan varias tareas sobre un mismo dataset, dado que puede compartir datos en memoria entre diferentes trabajos.

Spark es una alternativa interesante, especialmente en aplicaciones que requieren iteraciones y reuso de los datos (como el análisis de grafos y aprendizaje de máquina). Proyectos como GraphX  y Spark Streaming, hacen aún más interesante este framework.