Airflow es una plataforma para la definición, ejecución y monitoreo de flujos de trabajo (Workflows) creada por AirBnB, liberada bajo licencia Apache, y que desde hace unos años es un proyecto en incubación de Apache Software Foundation.

Como parte de la evolución de CDCol, evaluamos alternativas para la definición de workflows dinámicos. Después de evaluar alternativas entre las que se encontraban Luigi, Pinball y un desarrollo propio basado en Celery, la interfaz gráfica, la integración con Celery -que hace parte de la arquitectura actual CDCol-, y características como XCom, inclinaron la balanza a favor de Airflow.

Airflow tiene, conceptualmente, cuatro componentes: Un servidor web, un scheduler, ejecutores (con sus workers) y la base de datos.

Este artículo describirá la instalación de Airflow con Celery y las primeras pruebas, incluyendo la definición de workflows (DAGs) y plugins.

Instalación

Entorno

  • Máquina virtual con Ubuntu Server 18.04
  • Anaconda 3.4

Instalación Standalone

La instalación de airflow por defecto, usando el ejecutor local y la base de datos sqlite (que no es recomendable en entornos de producción), usando Anaconda, se reduce a:

export AIRFLOW_HOME="$HOME/airflow"
echo 'export AIRFLOW_HOME="$HOME/airflow"' >>$HOME/.bashrc
conda install -y -c conda-forge airflow
airflow initdb

La primera vez que se ejecuta, initdb crea el archivo de configuración $AIRFLOW_HOME/airflow.cfg con los valores por defecto y una base de datos sqllite con los valores por omisión. En este punto se puede ejecutar airflow webserver -p 8080 para verlo en acción en http://<dirección ip de la máquina>:8080.

Es una instalación funcional, en una sola máquina, sin embargo no se acerca a lo que queremos en un entorno de producción. Lo primero que vamos a hacer es cambiar el motor de base de datos que se utilizará para almacenar los metadatos. Para esto vamos a crear una nueva base de datos en postgresql y un usuario que pueda acceder de manera remota.

Después de matar el servidor web de airflow, instalamos las nuevas dependencias y creamos tanto la base de datos como el usuario (se puede ser mucho más creativo en los nombres 😛 ):

sudo apt install -y  postgresql postgresql-client postgresql-contrib
conda install -y psycopg2

sudo -u postgres createdb airflow
sudo -u postgres createuser airflow
sudo -u postgres psql airflow -c "alter user airflow with encrypted password 'la contraseña'"
sudo -u postgres psql airflow -c "grant all privileges on database airflow to airflow;"

#Configurar postgresql para que admita conexiones remotas
_HBA=`sudo -u postgres psql -t -P format=unaligned -c 'show hba_file'`
_CONFIG=`sudo -u postgres psql -t -P format=unaligned -c 'show config_file'`
mkdir -p $HOME/pg_backup
sudo cp "$_HBA" $HOME/pg_backup
cp "$_CONFIG" $HOME/pg_backup

sudo su -c "echo 'host all all 0.0.0.0/0 md5' >>$_HBA"
sudo sed -i "s/#listen_addresses = 'localhost'/listen_addresses = '*'/" $_CONFIG

#Disclaimer: Escuchar desde todas las interfaces
#y permitir conexiones desde todas las direcciones
#no es una buena política

sudo systemctl restart postgresql.service

Una vez tenemos la base de datos creada, con los permisos adecuados, modificamos el archivo de configuración de airflow ($AIRFLOW_HOME/airflow.cfg) para que la utilice, modificando la propiedad sql_alchemy_conn. La cadena de conexión en este caso quedaría:

sql_alchemy_conn = postgresql+psycopg2://airflow:la contraseña@10.0.2.15:5432/airflow

En este mismo archivo, también es necesario cambiar el ejecutor, de SequentialExecutor a LocalExecutor


executor = LocalExecutor

Luego volvemos a ejecutar airflow initdb, para inicializar la base de datos. Si todo ha salido bien, la salida será similar a la imagen.

Debemos asegurarnos de que exista la carpeta dags dentro del home de airflow.

mkdir -p "$AIRFLOW_HOME/dags"

Por alguna razón el scheduler falla (al menos en la versión 1.9) si en la carpeta dags no hay por lo menos un DAG válido, así que, mientras creamos nuestros workflows, podemos crear un DAG dummy (dummy.py):

import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

from datetime import timedelta

args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
dag_id='example_dummy', default_args=args,
schedule_interval=None,
dagrun_timeout=timedelta(minutes=1))

run_this_last = DummyOperator(task_id='DOES_NOTHING', dag=dag)

Luego podemos iniciar el scheduler y el servidor web (la opción -D hace que se ejecuten como demonios):

airflow scheduler -D
airflow webserver -p 8080

Podemos, por ejemplo, ejecutar uno de los DAGs. Es importante fijarnos que no esté pausado (debe estar en On), de otra forma no se ejecutará.

Podemos ver el estado (si está corriendo, terminó correctamente o falló):

y el detalle de cada ejecución:

Esta instalación standalone es útil para desarrollo y un poco más cercana a lo que tendremos en producción que la instalación por defecto.
NOTA: Si esto es lo que necesitas, publiqué en GIST un script que genera este ambiente:

 

Instalación con Celery

Para hacer que airflow use como workers a los nodos de celery, basta con:

  1. Tener un cluster de celery funcional, usando como backend de resultados rabbitMQ o Redis. Airflow y celery son proyectos en desarrollo por lo que es importante verificar que las versiones sean compatibles entre los dos. En CDCol usamos celery version 3.x por lo que debemos usar una versión de airflow menor a 1.9 para poder tener más de un ejecutor sin inconvenientes.
  2. Modificar el archivo de configuración de airflow para que use el ejecutor `CeleryExecutor`.
  3. Instalar airflow en todos los nodos de celery. La configuración de airflow en todos los nodos debe ser homogénea.
  4. Asegurarse que las carpetas dags y plugins de airflow estén sincronizadas en todos los nodos.

Para este artículo, usaré Redis como backend de resultados y RabbitMQ como manejador de colas de mensajes.  Estarán instalados en la máquina en la que se ejecutará el scheduler y el webserver de airflow, sin embargo pueden estar en cualquier máquina que sea alcanzable por los nodos.

sudo apt install redis-server
sudo apt install rabbitmq-server

#Crear un usuario y un vhost para la cola de rabbit, habilitar el plugin para el monitoreo (requerido si se va a usar flower)

sudo rabbitmqctl add_user airflow airflow
sudo rabbitmqctl add_vhost airflow
sudo rabbitmqctl set_user_tags airflow airflow_tag administrator
sudo rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"
sudo rabbitmq-plugins enable rabbitmq_management

conda install redis-py
conda install -c conda-forge "celery<4" flower

Modificamos la configuración de celery para que use el ejecutor de celery, para que use redis server como backend de resultados y rabbitMQ como broker.

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
executor = CeleryExecutor


# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
broker_url = amqp://airflow:airflow@10.0.2.15/airflow

# Another key Celery setting
celery_result_backend = redis://10.0.2.15:6379/0

En todas las máquinas que tendrán workers de celery debe estar instalado airflow, celery y redis-py.

conda install redis-py
conda install -c conda-forge "airflow<1.9" "celery<4"

También debemos tener un mecanismo para sincronizar las carpetas dags y plugins de airflow, la opción más sencilla es usar un punto de montaje NFS (la ubicación de estos directorios se especifica en airflow.cfg). Es importante notar que el ambiente de python también debe mantenerse homogéneo.

Una vez tenemos todo configurado ejecutamos en la máquina maestra el webserver y el scheduler, y en los nodos que ejecutarán el trabajo airflow worker.


#En la máquina maestra
airflow webserver -p 8080 -D
airflow scheduler -D
#opcional:

airflow flower -D


#En cada nodo
airflow worker -D

Podemos resumir el proceso en dos scripts, uno para la máquina maestra (que en este caso también tiene un worker) y otro para los workers:

 

Para monitorear los workers de celery podemos usar flower, ejecutándolo con airflow flower -D y accediendo por el puerto 5555.

Definición de un Workflow

Una vez tenemos instalado airflow, lo siguiente que queremos hacer es definir un workflow. Airflow soporta únicamente grafos dirigidos acíclicos, lo que permite asegurar que, si todas las tareas terminan correctamente, el workflow termina (no hay ciclos infinitos). Los archivos de definición de DAGs en airflow son scripts de python.

El scheduler de airflow evalúa (ejecuta) periódicamente los archivos de la carpeta de DAGs, identificando cambios en la definición de los mismos. Es por esta razón que cada archivo de definición debe poder ejecutarse en segundos, de otro modo afectaría el rendimiento general de airflow.

En un archivo de definición de DAG se crea un objeto DAG y un conjunto de operadores que tienen dependencias entre ellos. El DAG debe tener un id, y puede definir una programación (cada cuánto debe ejecutarse), un timeout, una fecha de inicio, argumentos por defecto (que serán pasados a todas las tareas), y documentación. El DAG agrupa operadores.

Un operador describe una tarea. Se pueden agrupar en 3 categorías, según su objetivo: Operadores que ejecutan una acción, que transfieren datos, o sensores. Airflow tiene operadores que pueden cubrir la mayoría de necesidades, como BashOperator, PythonOperator, PostgresOperatorHTTPOperator`, entre otros.

Un ejemplo de workflow, que muestra el manejo de las dependencias, es:


import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

from datetime import timedelta

def print_value(ds, value):
    print(value)
#Argumentos por defecto. 
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
#Definición del DAG
dag = DAG(
dag_id='test_print_values', default_args=args,
schedule_interval=None,
dagrun_timeout=timedelta(minutes=5))

#Tareas Dummy que pertenecen al DAG
start_with = DummyOperator(task_id='PRIMERA_TAREA', dag=dag)

end_with = DummyOperator(task_id='TAREA_FINAL', dag=dag)

#Crear 5 Tareas que se ejecutarán después de start_with y antes de end_with
#Se pueden usar los operadores >> y << para expresar las dependencias. for i in range(5): _po=PythonOperator(task_id="echo_{}".format(i), python_callable=print_value, op_kargs={'value':i}, dag=dag ) start_with>>_po>>end_with

Esta definición genera el workflow de la siguiente imagen:

Y al ejecutar el DAG se puede ver el progreso, donde se muestra que la tarea final sólo se ejecuta una vez todas las tareas anteriores han terminado:

 

Por defecto, los nuevos dags se crean pausados, no se ejecutarán hasta que se activen por la interfaz web o usando la interfaz de línea de comando. Un DAG que no tenga schedule_interval (o sea None), sólo se ejecutará al lanzarlo explícitamente desde la web o usando trigger_dag.


airflow trigger_dag test_print_values

Definición de un plugin

 

Aunque los operadores incluidos en Airflow son bastante genéricos, muchas veces necesitamos utilidades adicionales, nuevos tipos de operadores, o vistas web. Para esto, Airflow permite definir plugins. Un plugin se define con un módulo de python en la carpeta de plugins. Un plugin se define con una clase extiende airflow.plugins_manager.AirflowPlugin y define conjuntos de operadores, ejecutores, vistas, “hooks“, macros y links de menú.

Cada uno de esos elementos se define como una clase que extiende una base. Por ejemplo, un operador se define como una clase que extiende airflow.models.BaseOperator y un hook a airflow.hooks.base_hook.

Por ejemplo, podríamos definir un plugin con un operador que reciba una carpeta y genere un archivo tar.gz con la salida. Para eso definimos el operador, extendiendo de BaseOperator, con su constructor y el método execute, que es el que realiza el trabajo.

import tarfile
import os
from airflow.models import BaseOperator
from airflow import utils as airflow_utils

class TarGzOperator(BaseOperator):
    @airflow_utils.apply_defaults
    def __init__(self, to_compress, output_folder, *args,**kwargs):
        super().__init__(*args, **kwargs)
        if to_compress is None or output_folder is None: 
            raise Exception("You must specify the input and output folder")
        if to_compress == output_folder:
            raise Exception("to compress folder cannot be the output folder")
        if os.path.exists(output_folder) and not os.path.isdir(output_folder):
            raise Exception("output_folder must be a folder")
        self.to_compress = to_compress
        self.output_folder = output_folder
        
    
    def execute(self, context):
        print(context)
        _filename= os.path.basename(os.path.normpath(self.to_compress))
        _output_file=os.path.join(self.output_folder, _filename+str(".tar.gz") )
        if not os.path.exists(self.output_folder):
            os.makedirs(self.output_folder)
        with tarfile.open(_output_file,"w:gz" ) as _tar:
            _tar.add(self.to_compress)
            
        
        return _output_file

Luego es necesario definir el plugin :


from airflow.plugins_manager import AirflowPlugin

#si la definición del operador se hace en un archivo diferente,
# sería necesario hacer el import

class TarGzPlugin(AirflowPlugin):
    name = 'targz_plugin'
    operators=[TarGzOperator]

Al publicar el archivo en la carpeta plugins de airflow, y reiniciar tanto el webserver como el el scheduler, el nuevo plugin está disponible. Se pueden definir DAGs que lo usen:

import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import TarGzOperator

from datetime import timedelta

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='test_plugin', default_args=args,
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=5))

dummy = DummyOperator(task_id='DOES_NOTHING', dag=dag)

tarBackups = TarGzOperator(to_compress='/home/cronosnull/airflow',output_folder='/home/cronosnull',task_id='targz',dag=dag)

dummy >> tarBackups

Al habilitar y ejecutar el DAG desde la interfaz web se generará el archivo tar.gz esperado y se puede ver que el valor retornado por el operador queda registrado en XCom (lo que permitirá usarlo desde una siguiente tarea).

Airflow es una herramienta extensible, que se adapta a escenarios diversos. Espero que este artículo sea útil para quien está empezando a explorarla, le ahorre algo de tiempo y le despierte nuevas dudas -mientras lo escribía resolví algunas-.

Han pasado casi 6 años desde los artículos de Mordiendo Hadoop: Mordiendo Hadoop: Instalación y primeras pruebasMordiendo Hadoop: Instalación en Cluster y Mordiendo Hadoop: Desarrollo de aplicaciones MapReduce. En ese tiempo el ecosistema de Hadoop se ha enriquecido, su arquitectura ha cambiado y su uso se ha extendido, convirtiéndose en la referencia de tecnología para Big Data. En estos años hemos realizado varias instalaciones de Hadoop en el grupo de Ingeniería de Información de la universidad, tanto para los cursos de maestría como para proyectos de pregrado y postgrado.

Desde hace poco más de 3 años, usamos la distribución de HortonWorks, HDP, por su facilidad de instalación (a través de ambari), por ser de código abierto y por su estabilidad. Cada vez es más fácil realizar la instalación, pero todo depende de la configuración previa de los nodos. En este artículo describiré la instalación de HDP, los pasos preliminares y las lecciones aprendidas que pueden ser aplicadas a nuevas instalaciones ( una traducción-resumen de la documentación oficial, con algunos consejos).

Preparación

Antes de iniciar la instalación de la plataforma HDP, debemos asegurarnos de cumplir con unos prerrequisitos:

  1. Tener instalado en los nodos un sistema operativo soportado (Recomendados para HDP 2.3: Centos 6.x o 7.x). Consejo: Instalar el sistema operativo en inglés, ambari en ocasiones tiene problemas con la codificación de caracteres cuando el sistema operativo tiene un idioma diferente.
  2. Desde la que será la máquina host de ambari el usuario root debe poder autenticarse sin contraseña a las otras máquinas del cluster.
  3. Todas las máquinas deberían tener la hora sincronizada (usando ntp), el servicio de ntp debe estar configurado para iniciar al iniciar el sistema operativo.
  4. Todas las máquinas deben ser alcanzables por nombre de dominio. (DNS es la mejor opción, si no es posible se puede lograr agregándolas al /etc/hosts de todas las máquinas).
  5. Cada máquina debe tener correctamente configurado su FQDN (es una continuación del punto anterior). En especial, si la máquina tiene más de una interfaz de red debemos asegurarnos que siempre resuelva su nombre de dominio con la misma dirección IP (y que sea la que está en el segmento del cluster).
  6. El número máximo de descriptores de archivos abiertos (el número máximo de archivos abiertos de forma simultanea) debe ser de, como mínimo, 10000 (el recomendado es 65536 ). Se puede verificar usando ulimit -Snulimit -Hn (Límite suave y fuerte respectivamente) y se puede cambiar usando ulimit -n 10000.
  7. Desactivar SELinux  y configurar el firewall para que las máquinas del cluster puedan comunicarse entre ellas por los puertos apropiados, y permitir el acceso de los usuarios a los servicios.  Se recomienda desactivar el firewall durante el proceso de instalación, e iniciarlo y configurarlo apropiadamente una vez la instalación esté completa.
  8. Debemos tener una idea de cómo vamos a distribuir los servicios en el cluster (en qué máquinas tendremos los servicios maestros y en cuáles tendremos los esclavos y los clientes).

Instalación

Antes de iniciar la instalación debemos determinar cuál será la máquina dedicada como host de Ambari. En esta máquina debemos:

  1. Configurar el repositorio:
    wget -nv http://public-repo-1.hortonworks.com/ambari/centos7/2.x/updates/2.1.0/ambari.repo -O /etc/yum.repos.d/ambari.repo
    
  2. Instalar ambari-server:
    yum install ambari-server
  3. Configurar ambari-server:
    ambari-server setup
    1. Seleccione el JDK deseado (Oracle JDK 1.8 es una buena opción). Debe aceptar la licencia para continuar con la instalación.
    2. Si desea tener una manejador de bases de datos diferente a postgres para ambari, seleccione la configuración avanzada. En caso contrario acepte el valor por defecto (En ese caso se creará en Postgres una base de datos llamada ambari; El usuario por defecto será ambari y contraseña bigdata).
  4. Inicie el servidor ambari:
    ambari-server start
  5. Desde un navegador web ingresar al servidor de ambari, http://<fqdn del servidor de ambari>:8080
    • Ingresar con el nombre de usuario y contraseña por defecto (admin /admin).Bienvenida de ambari
    • Al ingresar podrá crear un nuevo cluster usando el asistente, manejar los usuarios y grupos que pueden acceder a ambari y desplegar vistas.
    • Al iniciar el asistente para la creación de un nuevo cluster le solicitará un nombre. Debe ser descriptivo, porque es la forma de identificar el cluster en ambari.
      ambari2
    • El segundo paso del asistente es seleccionar la versión de HDP a instalar.  Si las máquinas tendrán conexión a Internet durante la instalación sólo es necesario indicar la versión, en caso contrario es necesario abrir las opciones avanzadas y configurar la dirección del repositorio local que se utilizará.
      ambari3
    • En el siguiente paso se agregan los nombres de las máquinas que harán parte del cluster, una por línea o utilizando expresiones.  También debe indicar la llave privada ssh autorizada para acceder a los nodos del cluster.  (Si realizó correctamente el paso 2 de la preparación, la llave privada probablemente estará en /root/.ssh/id_rsa del host de ambari)
      ambari4Si utilizó expresiones aparecerá una ventana confirmando los nombres
      ambari4.1
    • En el siguiente paso se instalará el agente de ambari en los hosts seleccionados y se pueden eliminar nodos que no se quieran incluir en el cluster.
      ambari5
    • A continuación es necesario seleccionar los servicios que se desean en el cluster.
      ambari6
    • Luego debemos seleccionar dónde quedaran los componentes maestros de estos servicios.
      ambari8
    • También seleccionar en qué nodos quedarán los componentes esclavos y los clientes de los servicios.
    • Antes de la revisión final y la instalación, debemos personalizar la configuración de los servicios. Ambari hace recomendaciones de acuerdo a las características de los nodos, que pueden ser usados como valores por defecto, sin embargo conviene revisarlas y es necesario completar las configuraciones que aparecen marcadas en rojo (para las cuales no hay un valor pre-establecido).
      ambari9
    • Luego se realiza una revisión final y se procede a la instalación, configuración y prueba de los servicios.
      ambari10
      ambari11
      El proceso de instalación puede tomar varios minutos, pero es completamente automático. Si hay una falla en alguno de los procesos se puede hacer clic en el mensaje para obtener el log (para determinar la causa). Son comunes los problemas de conexión con los servidores espejo, por lo que la solución más común es reintentar la instalación. Al finalizar verá el resumen de la instalación y podrá ir al tablero de control, en el cual puede ver cuáles servicios están activos, cuáles inactivos, los recursos utilizados, ver las alertas y realizar tareas de administración (por ejemplo, agregar un nuevo servicio al cluster, iniciar o detener un servicio y modificar la configuración).
      ambari12
      ambari13
      ambari14

El proceso de instalación es sencillo, semiautomático y permite tener rápidamente un entorno funcional. Sin embargo, se debe tener en cuenta que esta instalación es sólo un punto de arranque y se deben definir políticas para el uso de los recursos (generalmente más de un usuario utilizará el cluster) e ir ajustando la configuración de acuerdo a las métricas obtenidas.

 

Spark

Apache Spark es un framework de computación en paralelo, que promete velocidades hasta 100 veces mayores a las de Hadoop Map Reduce. Puede correr de manera local (en uno o varios hilos) o en cluster sobre Apache Mesos, Hadoop YARN, o en modo Standalone.

En este pequeño ejemplo crearemos un proyecto en eclipse con las dependencias necesarias para Spark y desarrollaremos un contador de palabras. Spark tiene API nativas para Scala, Python y Java. En este ejemplo usaremos Java 8.

Lo primero que haremos será crear un proyecto Maven en Eclipse:

Nuevo proyecto Maven en Eclipse Luna

En este caso seleccionamos el arquetipo por defecto, e introducimos el id del grupo y el id del artefacto que deseemos para nuestra aplicación.

El siguiente paso es añadir las dependencias al archivo pom.xml, así:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>uniandes.bigdata</groupId>
 <artifactId>sparkDemo</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <packaging>jar</packaging>

 <name>sparkDemo</name>
 <url>http://maven.apache.org</url>

 <properties>
 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 </properties>

 <dependencies>
 <dependency>
 <groupId>junit</groupId>
 <artifactId>junit</artifactId>
 <version>3.8.1</version>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-core_2.10</artifactId>
 <version>1.0.1</version>
 <scope>provided</scope>
 </dependency>
 </dependencies>
</project>

La dependencia de spark tiene groupId org.apache.spark, artifactId spark-core_2.10 y versión 1.0.1. Utilizaremos el scope provided para que no se empaqueten las librerías en el jar que generaremos.

Nuestra clase WordCount tendrá la siguiente estructura,

package uniandes.bigdata.sparkDemo;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;

import scala.Tuple2;

/**
 * WordCount en Spark
 *
 */
public class WordCount
{
    /**
     * Recibe como parámetro la ruta del archivo de entrada.
     * Puede ser un archivo local o una ruta hdfs (hdfs://...)
     * */
    public static void main( String[] args )
    {
        String file=null;
        /**Recibir como parámetro el nombre del archivo
         **/
        if(args.length>0){
            file=args[0];
        }else{
            System.err.println("No se puede ejecutar sin un archivo");
            return;
        }
        //El nombre de la aplicación:
        SparkConf conf = new SparkConf().setAppName("PruebaSpark");
        //La aplicación se puede ejecutar en local para probar en desarrollo.
        //Cuando se ejecuta desde spark (usando spark-submit)
        //no se debe especificar el master:

        //conf.setMaster("local[*]");//Descomentar esta línea para probar localmente
        JavaSparkContext sc = new JavaSparkContext(conf);

        /*Lee un archivo de texto en el RDD. El archivo puede ser local, estar ubicado en el hdfs
          o en cualquier fuente soportada por Hadoop
        */
        JavaRDD<String> texto=sc.textFile(file);

        //Utilizamos las funciones de Spark, junto con las expresiones lambda de Java 8,
        //para crear una colección de palabras (usando flatMap),
        //crear un mapa con la palabra como llave y 1 como valor (mapToPair),
        //y reducirlo por llave de tal forma que obtenemos un mapa con la palabra como llave
        //y el número de apariciones como valor (reduceByKey)

        JavaPairRDD<String, Integer> byKey = texto.flatMap(l->Arrays.asList(l.split("\\s")))
                    .mapToPair(s->new Tuple2<String, Integer>(s.toLowerCase(),1))
                    .reduceByKey((a,b)->a + b);

        //Guardamos el resultado, como archivo de texto

        byKey.saveAsTextFile("salida");
    }
}

La aplicación se puede probar localmente descomentando la línea conf.setMaster(“local[*]”);. La cadena local[*] indica que se ejecute con tantos hilos como procesadores en la máquina local. Podemos especificar el número de hilos (e.g. local[2]), dar la ruta a un servidor Mesos o Spark, o especificar que use Hadoop YARN (usando “yarn-client” o “yarn-cluster”). Cuando se va a ejecutar la aplicación usando spark-submit, no se debe especificar en la aplicación la url del máster (se puede pasar como parámetro a spark-submit).

Instalar spark y ejecutar la aplicación

La instalación de spark se limita a descargar y descomprimir el paquete indicado. Si se desea usar Yarn, es necesario descargar los binarios compilados con compatibilidad para Hadoop 2, y asegurarse que las variables de entorno HADOOP_HOME, HADOOP_CONF y YARN_CONF están correctamente establecidas.

Después de generar el jar usando maven, podemos ejecutar la aplicación usando spark-submit:

spark-submit --verbose --master local --class uniandes.bigdata.sparkDemo.WordCount "rutaAljar\sparkDemo-0.0.1-SNAPSHOT.jar" "rutaAlArchivo\test.txt"

Un problema común en Windows, si no se tiene instalada la distribución de Hadoop de Hortonworks, es que la aplicación falle porque Spark no encuentra winutils.exe, que se debería encontrar en la carpeta %HADOOP_HOME%/bin. Este archivo se puede descargar del repositorio de HDP, como indican en uno de los foros de Azure.

 

Conclusiones

El no estar limitado al paradigma MapReduce, el uso de RDD (Resilient Distributed Dataset), junto con el uso de expresiones lambda (Java 8, Scala y python), hacen que el proceso de escribir una aplicación para Spark sea intuitivo. Algunas de las características de Spark se ven cuando se ejecutan varias tareas sobre un mismo dataset, dado que puede compartir datos en memoria entre diferentes trabajos.

Spark es una alternativa interesante, especialmente en aplicaciones que requieren iteraciones y reuso de los datos (como el análisis de grafos y aprendizaje de máquina). Proyectos como GraphX  y Spark Streaming, hacen aún más interesante este framework.

 

He visto cientos de startups buscando “Ninja Programmers”. Sólo conozco un candidato que cumple ese requisito y aunque casualmente es uno de los mejores desarrolladores que conozco, que sea un maestro del ninjutsu no dice mucho acerca de su experiencia en el desarrollo de software. Aunque la analogía del Ninja puede servir para sintetizar algunas de las cualidades de un buen desarrollador, términos como Ninja o Rockstar no aportan a la descripción del perfil que se busca para cubrir las necesidades de la empresa.

BigData at the peak of inflated expectations
Gartner’s 2013 Hype Cycle for Emerging Technologies

Con el término “Data Scientist” pasa algo similar: no sabemos qué esperar.  Según el ciclo de Gartner para tecnologías emergentes para el 2013, tanto Big Data como Content Analysis están en la cima del  pico de expectativas infladas lo que hace que muchas empresas busquen sacarle provecho, sin tener muy claro cómo, e integrar a un científico de datos a la nómina parece tener mucho sentido. Incluso, Data Scientist es considerado el trabajo más sexy del siglo 21.

El problema del término Data Scientist es que engloba un conjunto de roles, con diferentes habilidades, que interactúan entre sí.

Existen varios trabajos que intentan clasificar los roles de los científicos de datos. Kandel et al. (2012), por ejemplo, identifican tres arquetipos: Hacker, Scripter y Usuario de aplicación; en el mismo trabajo describen 5 tareas de alto nivel: Descubrir, Discutir, Perfilar, Modelar y Reportar. En su reporte “Four Functional Clusters of Analytics Professionals“, Talented Analytics identifica 4 categorías de profesionales de acuerdo a sus funciones: Preparación de datos, Programadores, Administradores y Generalistas. Así como estos, se pueden encotrar varias taxonomías de Data Scientists, pero la que más me ha gustado y la que utilizaré en el resto del artículo es la presentada en “Analyzing the Analyzers: An Introspective Survey of Data Scientists and Their Work“, en la que definen un conjunto de habilidades, y un conjunto de roles, a partir de una encuesta realizada a profesionales que se identifican con el rol de Data Scientist.

Analyzing the Analyzers An Introspective Survey of Data Scientists and Their Work
Grupos de habilidades

En el análisis identifican una serie de habilidades que se relacionan con el trabajo de los diferentes profesionales entrevistados y se generan cinco grupos de habilidades usando Non-negative Matrix Factorization: Negocio, Machine Learning/Big Data, Matemáticas/Investigación de operaciones, Programación y Estadística.

En la encuesta también se les preguntó a los profesionales cómo se ven a sí mismos y, de igual forma, se generaron cuatro clusters:  Data Developer (o Data Engineer), Data Researcher, Data Creative y Data Businessperson.

Aunque cada uno de estos roles tiene fortalezas de manera predominante en uno o dos de los grupos de habilidades, todos los profesionales tienen habilidades en los 5 grupos.  El análisis sugiere tener equipos de Data Scientists de los diferentes roles de tal forma que sus habilidades se superpongan de la mejor manera posible.

" Analyzing the Analyzers An Introspective Survey of Data Scientists and Their Work"
Diferentes Roles con habilidades predominantes, pero todos los roles tienen habilidades de los 5 grupos.

Este marco nos permite diferenciar entre profesionales del análisis de datos y nos permite describir de una manera más detallada los perfiles necesarios. También nos permite hablar acerca de las habilidades que se persiguen en un curso específico de Data Science. Por ejemplo, en el curso de “From Big Data to Content Analysis” dictado en el 2012 en la universidad de los Andes, el programa buscaba desarrollar habilidades de los grupos ML/Big Data y Programming, asociadas con los roles Data Creative y Data Engineer, pero gracias a la interdisciplinariedad de los participantes los proyectos se nutrieron con habilidades del grupo de estadística (Visualización, análisis espacial, marketing), más asociado al rol de Data Researcher.

Pensamientos finales

  • No todos los Data Scientists son iguales. Un científico de datos no trabaja solo, se necesita un equipo que sume sus fortalezas y su conocimiento del dominio de los datos para obtener los mejores resultados.
  • Aunque cada rol tiene fortalezas especialmente en un grupo de habilidades, un científico de datos debe conocer lo suficiente de los otros grupos de habilidades como para facilitar las discusiones.
  • La interdisciplinaridad de un equipo de análisis de datos es una de sus mayores fortalezas.
  • Prefiero el término Data Engineer frente a Data Developer, es más general y describe mejor el rol.
  • Los autores de  Analyzing the Analyzers han publicado un test para determinar tu rol como Data Scientist. A mi me pareció bastante acertado.
  • Conozco personas que conformarían un excelente grupo de data science, algunas de ellas creo que nunca han pensado en sí mismas como data scientists.
  • Data Science no implica Big Data, aunque sean términos que se relacionan.

Estaba buscando actividades, para asignarles como “Bono” a los estudiantes de algorítmica y programación, y recordé blockly, que permite programar usando elementos visuales (como armar un rompecabezas). Blockly permite generar código en los lenguajes Javascript, Dart, Python o XML. La filosofía del lenguaje de Blocky lo hace perfecto para nuevos programadores.

Mi solución al laberinto (Demo de blockly)
Una solución para el Laberinto

El editor de Blocky es libre (bajo licencia Apache) e implementado en Javascript, lo que permite incluirlo fácilmente en cualquier proyecto web, sin requerimientos especiales en el lado de servidor. Después de hacer checkout y copiarlo al servidor, podemos usar el playground que se encuentra en los test.

Por defecto tiene un conjunto de “bloques” bastante completo, que incluye: Estructuras de control, manipulación de texto, manejo de variables, listas, expresiones aritméticas, expresiones lógicas, definición y llamada a procedimientos. Si necesitamos algo que no está definido, el lenguaje se puede extender fácilmente. Por ejemplo, si quiero que puedan mostrar un mensaje al usuario (Lo que en JavaScript sería un alert…) puedo definir el bloque diciendo en qué categoría se mostrará, que valores tendrá como entrada, si retorna o no un valor, entre otras cosas. Una vez el bloque está definido es necesario implementar los generadores para los lenguajes que queremos soportar (para las funciones incluidas, Blockly incluye generadores para JavaScript, Dart y Python).

if (!Blockly.Language) {
  Blockly.Language = {};
}
Blockly.JavaScript = Blockly.Generator.get('JavaScript');
Blockly.Dart = Blockly.Generator.get('Dart');
Blockly.Python = Blockly.Generator.get('Python');

Blockly.Language.alert = {
  // Bloque para mostrar un mensaje al usuario (alert)
  category: 'Commands',
  helpUrl: 'http://www.christian-ariza.net/techstuff/Blockly#alert',
  init: function() {
    this.setColour(290);
    this.appendTitle('alert');
    this.setPreviousStatement(true);
    this.appendInput('Mensaje', Blockly.INPUT_VALUE, 'MSG');
    this.setNextStatement(true);
    this.setTooltip('Muestra un mensaje (modal) al usuario');
  }
};

Blockly.JavaScript.alert= function (){
  var mensaje=Blockly.JavaScript.valueToCode(this, 'MSG',
      Blockly.JavaScript.ORDER_NONE) || '\'\'';
  console.log(mensaje);
  var code="alert ("+mensaje+");\n";
  return code;
}
Blockly.Dart.alert= function (){
  var mensaje=Blockly.Dart.valueToCode(this, 'MSG',
      Blockly.Dart.ORDER_NONE) || '\'\'';
  console.log(mensaje);
  var code="window.alert ("+mensaje+");\n";
  return code;
}
Blockly.Python.alert= function (){
  var mensaje=Blockly.Python.valueToCode(this, 'MSG',
      Blockly.Python.ORDER_NONE) || '\'\'';
  console.log(mensaje);
  var code="print("+mensaje+")\n";
  return code;
}

Definiendo estos elementos ya podemos usar el nuevo bloque:

Ejemplo usando el nuevo bloque "alert"
Ejemplo usando el nuevo bloque “alert”

El código generado sería:

JavaScript

var x;
var item;
function procedure(x) {
  item = x;
  alert ('Hola');
  alert (item);
}

procedure('Parametro X');

Dart

Para que funcione el alert en Dart, haría falta importar una librería que maneje DOM, ej #import(“dart:dom”), ese import no fue generado (No encontré como especificarlo…)

var x;
var item;
void procedure(x) {
  item = x;
  window.alert ('Hola');
  window.alert (item);
}

main() {
  procedure('Parametro X');
}

Python

x = None
item = None
def procedure(x):
  global item
  item = x
  print('Hola')
  print(item)

procedure('Hola mundo')

Si quieren seguir experimentando con Blockly pueden usar el “ambiente” que usaré en el curso, es básicamente el demo de generador de código, con el bloque alert que acabo de crear. La idea es ir agregándole nuevos bloques según sea necesario para alguna actividad en particular.

Si el experimento de los bonos es un éxito, el siguiente paso será que construyan una aplicación para Android usando el App Inventor,  puede ser para el último nivel del curso :P.

Un pequeño post que se irá actualizando cada vez que sea necesario con pequeños tips de uso de vmware VIX y vmrun específicamente.

  • Para usar vmrun con la última versión de vmplayer (3.1.3 en este momento), necesitamos editar el archivo de configuración vixwrapper-config.txt para que use la implementación de VIX.
  • Al ejecutar un comando en la maquina virtual (usando runProgramInGuest) la salida de éste no se reflejará en el host, aunque es una funcionalidad muy solicitada. Una solución temporal es redirigir la salida a un archivo y obtener el archivo (usando copyFileFromGuestToHost).
  • Cuando ejecutamos vmware workstation sobre una máquina virtual (ESX) si queremos que las máquinas que corren sobre WS tengan conectividad de red tipo bridged el vSwitch debe estar en modo promiscuo. La misma situación se presenta con ESX anidados.
  • Ejecutar un comando en Debian 6, Ubuntu 10.04 o superior, usando vmrun y vmware Workstation 7: Vmrun runProgramInGuest failing in Debian 6 guest
UEC Architecture

Ubuntu Enterprise Cloud permite el despliegue de una Cloud privada, con el enfoque infraestructura como servicio(IaaS) compatible con Amazon EC2. Usa el hypervisor KVM y una versión de Eucalyptus modificada para usarlo. En este post revisaremos la arquitectura e instalación de Ubuntu Enterprise Cloud, siendo principalmente un resumen, y traducción, de la documentación pertinente.

Una visión general de los componentes se presenta en el siguiente diagrama, tomado del white paper Ubuntu Enterprise Cloud Architecture:

UEC Architecture

Cloud Controller

Provee la interface con la que el usuario de la cloud interactúa. Esta interface se compone de una API SOAP standard compatible con la API de Amazon EC2, una interfaz de consulta más simple que es usada por euca2ools y ElasticFox,  y una interfaz web tradicional para la interacción directa.

Walrus Storage Controller(WS3)

Implementa APIs REST y SOA que son compatibles con Amazon Simple Storace Protocol (S3). Es usado para almacenar las imágenes de máquinas que pueden ser instanciadas por UEC y para acceder y almacenar datos. Actualmente la máquina en la cual corre el Cloud Controller también corre WS3, pero se espera que ésta limitación sea removida en las siguientes versiones.

Elastic Block Storage Controller(EBS)

Corre en la misma máquina que el Cluster Controller y se configura cuando éste es instalado. Permite crear dispositivos de bloques persistentes que pueden ser montados en máquinas en ejecución.  Estos dispositivos pueden ser usados como cualquier dispositivo de bloques, por ejemplo creando en ello un sistema de archivos.

EBS permite crear instantaneas de volúmenes, que son almacenados en WS3. Las instantaneas pueden ser usadas como punto de inicio para nuevos EBS. La misma instantánea puede ser usada para instanciar tantos volúmenes como se desee.

A nivel de red los dispositivos de bloques se acceden usando ATA over Ethernet(AoE). Dado que los paquetes no pueden ser ruteados se requiere  que tanto el EBS como los nodos que tienen las imágenes de las máquinas que lo acceden estén en el mismo segmento Ethernet.

Cluster Controllers

Opera entre los Node Controllers y el Cloud Controller. Recibe las peticiones para asignar imágenes de máquinas del Cloud Controller y decide cual Node Controller correrá la instancia de máquina (MInst). Esta decición es tomada en base a los reportes de estado que el Cluster Controller recibe de cada Node Controller. También puede responder al Cloud Controller acerca de la capacidad del cluster de correr tipos específicos de instancia, ayudándolo a decidir sobre cual cluster correr nuevas instancias.

El Cluster Controller está a cargo de manejar las redes virtuales que las Minst corran y enrutar el tráfico entre ellas. Los Cluster Controllers también corren los EBS Controllers. El grupo formado por un Cluster Controller y EBS Controller y un número variable de Node Controllers conforman el equivalente a las “zonas de disponibilidad” de Amazon.

Node Controllers

Corre en las máquinas físicas en las cuales se instanciarán las imágenes de máquinas. Interactúa con el hypervisor y el sistema operativo corriendo en el nodo, según es instruido por el Cluster Controller. La tarea inicial del Node Controller es descubrir el entorno en el cual está corriendo, en términos de recursos disponibles(memoria, espacio en disco, tipo de procesador y número de núcleos) así como máquinas virtuales en ejecución que puedieron se iniciadas independientemente del Node Controler, Cluster Controller y Cloud Controller.

Los node controllers esperan y realizan las tareas solicitadas por el Cluster Controller (iniciar o parar instancias) o responde a consultas de disponibilidad.

Cuando le llega una petición para instanciar una imágen de máquina el NC debe:

  1. verificar la autenticidad de la petición.
  2. Descargar la imagen de WS3(Las imágenes son cacheadas, para iniciar varias instancias en una máquina sólo se descarga una vez).
  3. Se crea la intefaz de red virtual (VNI).
  4. Inicia la instancia de la máquina corriendo como una maquina virtual.

Para detenerla se realizan las operaciones opuestas en el orden 1,4,3.

Instalación y despliegue.

Continuar leyendo

JabRef es una de mis herramientas favoritas para gestión de referencias, llevo un par de años usándola y aún me sorprende con funcionalidades que me facilitan la vida. El motivo de este post es documentar el uso de algunas de éstas funcionalidades, algunas de uso muy común y otras para usos específicos.

  • Para empezar maneja el formato BibTex, que separa el formato del contenido y que no depende de la aplicación, es manejado por muchas herramientas en especial por las que manejan LATEX, como Lyx.
  • Tiene varias formas para ingresar una nueva referencia, las que más uso son:
    • BibTex->New entry (CTRL+N o el icono + en la barra de herramientas), despliega un menú donde seleccionamos el tipo de entrada (articulo, tesis, libro, etc.) y a continuación nos presenta los campos a ingresar.
    • BibTex->New entry from plain text. despliega el menú de tipo de entrada y a continuación nos presenta un área donde podemos pegar texto plano y etiquetar partes de éste como cada uno de los campos de la referencia. También tenemos una pestaña para pegar directamente la referencia en Bibtex.
  • Permite importar referencias desde otros formatos. File->import into current database o File-> Import into new database. Entre los formatos soportados se encuentran BibTeXML, JSTOR y MSBib(Formato de bibliografía de MS Office 2007).
  • Permite exportar las referencias, no solo a formatos de bibliografía de otras aplicaciones sino también a formatos que facilitan la visualización. File->Export y se selecciona el formato de salida, mi favorito para visualización es HTML Table (with Abstract & BibTex).
  • Permite encontrar duplicados. Tools->Scan Database->Find duplicates
  • Permite hacer búsquedas y filtros.
  • Permite el manejo de grupos. Lo descubrí hace poco, solo debemos habilitar la interfaz de grupos (CTRL->SHIFT->G)
  • Permite hacer búsquedas directamente a JSTOR, IEEEXplore, ACM Portal, entre otras Bases de datos. Menú Web Search.

Éstas son las opciones que más utilizo pero JabRef tiene muchas más herramientas, además de ser extensible usando plugins.

Por último, un dato útil, para importar la bibliografía a MS Word 2007 primero exportamos la bibliografía en formato Office 2007 y luego, en Word, en la pestaña Referencias hacemos clic en Administrar Fuentes, en la ventana que nos despliega vemos “Fuentes disponibles en”, vamos a Examinar y buscamos el xml que generamos al exportar, ahora podemos usar las referencias en nuestro documento.

Actualización 2012-03-11: Hoy tuve el problema complementario, exportar la bibliografía de un documento de Word 2007 a Bibtex, la solución fue mucho más simple de lo que esperaba gracias a el add-in Refmanager.