Apache Kafka – Una Introducción rápida

El procesamiento de datos en streaming es ubicuo hoy en día. Para obtener resultados pertinentes, el procesamiento de grandes cantidades de datos en lotes puede tener una latencia inaceptable. Es por eso que el ser capaces de procesar la información como un flujo de datos infinito, combinando nuevos datos con datos históricos cuando sea necesario, se vuelve un escenario no solo atractivo sino indispensable. Con suficientes productores de datos, incluso el recibir los mensajes para almacenarlos se puede convertir en un problema interesante. Necesitamos herramientas que puedan escalar horizontalmente para recibir los mensajes, persistirlos de una forma durable, y ponerlos a disposición de otros componentes de una forma determinística.

Uno de los protagonistas del ecosistema de ingeniería de datos, en particular en estos casos (fast-data architectures) es Apache Kafka. Apache Kafka se define como una plataforma, de código abierto, distribuida de flujo de eventos.

El rol de Kafka en estas arquitecturas es crucial: recibir de una forma escalable grandes cantidades de mensajes de fuentes diversas, unificando la interfaz a utilizar por los consumidores, quienes pueden consumir los mensajes con muy baja latencia. Todo esto basado en un concepto simple: el patrón publicador/suscriptor.

Antes de entrar en más detalle, resumamos las características principales de Apache Kafka:

  • Baja latencia
  • Escalabilidad
  • Alta disponibilidad
  • Durabilidad

A muy alto nivel, Kafka es un sistema distribuido que puede tener un cluster de uno o más servidores (que pueden ser brokers o nodos de kafka connect) y clientes (productores/consumidores). Los productores envían mensajes sobre un tema (topic) a los brokers y los consumidores leen los mensajes de los brokers.

Los mensajes de un tema puede estar distribuidos en varias particiones, y cada partición es un conjunto de brokers. Los consumidores también pueden estar distribuidos, formando grupos de consumidores que en conjunto ven todos los mensajes, pero en los cuales cada nodo sólo ve un subconjunto de ellos.

En un momento dado cada consumidor puede estar leyendo mensajes de una sola partición. Las particiones están ordenadas (orden total). Eso hace que cada consumidor sólo necesite recordar el offset del mensaje que consumió para poder continuar desde esa posición más adelante.

Un ejemplo rápido

Para ejecutar este pequeño ejemplo necesitas docker/docker compose instalado.

Iniciemos con nuestra infraestructura. Tendremos un broker de Kafka, un nodo de Zookeeper (como coordinador), un cluster de Apache Spark con dos ejecutores y un master, un servidor de almacenamiento de objetos (MinIO), y un nodo con JupyterLab, para ejecutar nuestros notebooks.

version: '3'
services:
kafka:
image: confluentinc/cp-kafka
ports:
– "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
– zookeeper
zookeeper:
image: confluentinc/cp-zookeeper
environment:
– ZOOKEEPER_CLIENT_PORT=2181
ports:
– "2181:2181"
spark-executor:
image: spark:3.5.1-scala2.12-java11-ubuntu
environment:
SPARK_MASTER_HOST: spark-master
SPARK_MASTER_PORT: 7077
command: spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
volumes:
– ./shared_folder:/workspace/data
deploy:
replicas: 2
depends_on:
– spark-master
spark-master:
image: spark:3.5.1-scala2.12-java11-ubuntu
environment:
SPARK_MASTER_HOST: spark-master
SPARK_MASTER_PORT: 7077
SPARK_LOCAL_IP: spark-master
command: spark-class org.apache.spark.deploy.master.Master
volumes:
– ./shared_folder:/workspace/data
spark-notebook:
build:
context: .
dockerfile_inline: |
FROM quay.io/jupyter/pyspark-notebook:spark-3.5.1
#set password for jupyter
RUN echo '{"IdentityProvider": {"hashed_password": "argon2:$$argon2id$$v=19$$m=10240,t=10,p=8$$zUYc32oQmbROa0YxSdntdw$$5hOMyxMMdml9/pM1Jc8A1GNMhi1d3cEEZiBW3KjJhCY"}}' >> /home/jovyan/.jupyter/jupyter_server_config.json && \
chmod 600 /home/jovyan/.jupyter/jupyter_server_config.json
ports:
– "4040:4040"
– "8888:8888"
– "38889:38889"
– "7777:7777"
volumes:
– ./:/home/jovyan/work
minio-server:
image: quay.io/minio/minio
command: server /data –console-address ":9001"
ports:
– "9000:9000"
– "9001:9001"
environment:
MINIO_ROOT_USER: minio
MINIO_ROOT_PASSWORD: minio2024

Ahora iniciamos la infraestructura usando docker compose up.

Ahora jupyter lab debe estar disponible en http://localhost:8888 y podemos ingresar usando la contraseña configurada `kafka-demo`.

Ahora crearemos un producer para kafka. Existen varias librerías para kafka en python, usualmente la que recomiendo para productores-consumidores simples es `kafka-python`, pero al momento de escribir esta entrada, la versión actual publicada no funciona con versiones recientes de python, así que podemos usar una distribución alternativa kafka-python-ng.

Para crear el productor sólo necesitamos crear un cliente, especificando los brokers (en este caso sólo tenemos uno) y enviar mensajes al tema al que queremos publicar.

Crear un consumidor sigue el mismo patrón, debemos indicar los servidores, el tema y el offset (opcional).

Y por último, para tener un ejemplo un poco más complejo, consumamos los mensajes usando pyspark y almacenemos la salida en s3. Primero crea el bucket test-bucket usando la interfaz de minio que estará disponible en http://localhost:9001 puede usar el usuario minio y la contraseña minio2024 configurada en el docker compose.

Configuramos la sesión de spark, incluyendo las dependencias (spark-sql-kafka para kafka, y hadoop-aws para s3), el endpoint de minio (minio-server:9000) y las credenciales.

¡Importante! debe cambiar la IP del driver por la IP del host.

Creamos nuestro dataframe inicial de spark leyendo desde el topic de kafka, usando structured streaming:

Realizamos las transformaciones necesarias y escribimos a minio:

Este es un proceso de streaming, así que se estará ejecutando cada minuto hasta que lo detengamos (e.g. matando el driver, o usando stop()).

Ahora, queda como tarea al lector experimentar con los diversos componentes (creando nuevos mensajes y temas) y explorar Spark Structured Streaming. Los recursos utilizados en esta entrada están disponibles como un gist en github.

Apache Kafka log

Síndrome de abstinencia

Estar contigo o no estar contigo es la medida de mi tiempo.

Jorge Luis Borges, El amenazado

Desde ese primer momento lo supe, estoy enganchado a ti. Sólo con verte, tocarte o escuchar tu voz mi alma se enciende. ¿Cómo pretendes que nos distanciemos, si mi alma está más contigo que aquí?

Me pregunto por qué no subí a ese último tren contigo. Nuestros encuentros, siempre marcados por despedidas. Lo sé, la realidad es mucho más compleja.

Deberíamos estar juntos. No podemos hacerlo. No me gustan las certezas, en especial estas dos. ¿No hemos sido siempre buenos para hacer lo imposible?

Con cada desencuentro este síndrome de abstinencia se hace más insoportable. Mis manos tiemblan si no te escribo, mi voz te busca aunque no la escuches, mis ojos te ven aunque no estés aquí.

Nuestra historia parece una colaboración entre Murakami y Benedetti. Tal vez deberíamos escribir el próximo capítulo, este aún no es el final.

Mientras me quieras, estaremos juntos aunque tengamos un océano de por medio, aunque mis palabras no te alcancen.

Sin nombre

El calor del sol de primavera en el rostro, el olor del aire de las montañas, una vibración que recorre su cuerpo y lo llena de energía y entusiasmo, eso es lo que siente cada vez que piensa en ella.

Sol en primavera

La noche anterior no hizo nada raro. Después de un largo día de trabajo llegó a su apartamento, preparó algo rápido, cenó y se fue a la cama. Durmió mientras escuchaba la lluvia golpear en la ventana. Las noches lluviosas en la ciudad le inspiran tranquilidad.

La mañana tiene un cielo despejado, un lienzo azul inmaculado, un buen comienzo para el fin de semana de no ser por esto. Es una sensación extraña, sabe que debería estar asustado, pero no es así. No es miedo lo que siente, es desconcierto y curiosidad.

Se despertó pensando en ella. Nunca fue capaz de decirle nada, no por miedo al rechazo, sino todo lo contrario, porque dejaría de ser una ilusión y se volvería real, y lo real se acaba. Le sorprendió tener todas estas sensaciones al recordar su rostro, su sonrisa, pero también le sorprendió no ser capaz de recordar su nombre.

Nunca había sido muy bueno con los nombres, pero esto es algo nuevo. No es alguien que acabe de conocer, alguien a quien sólo ha visto algunas veces o que no es memorable. Es ella.

Recuerda perfectamente cada detalle, cada momento en el que han estado juntos. La conoció años atrás, trabajaron juntos por un tiempo y siempre lograba impresionarlo con su inteligencia, creatividad y energía. En ese entonces no se imaginaba que se convertiría en alguien tan importante para él. Con el tiempo empezó a enamorarse de su actitud frente a la vida, su vitalidad -que él atribuía a su juventud, era algunos años menor, pero él nunca la tuvo-, y su sonrisa, esa que ahora no se puede sacar de la cabeza.

Recuerda con especial nivel de detalle la vez que cenaron juntos en un acogedor restaurante, con un ambiente tranquilo que ayudó a la conversación. Ese fue el día en que llegó a conocerla realmente, era perfecta.

Restaurante acogedor (from UNSPLASH)

Hace meses no la ve -ella siempre había querido estudiar en el exterior, ahora lo estaba haciendo-, tal vez este episodio sea una buena excusa para escribirle.

Pensó que vivir en esta época en la que delegamos nuestra memoria a dispositivos, aunque puede haber causado el problema, podía jugar a su favor ahora. Buscó los mensajes que habían intercambiado, no los encontró; pensó usar las aplicaciones de redes sociales, buscando su foto en los perfiles de amigos en común, pero fue otra búsqueda infructuosa. Parecía que se había desvanecido, probablemente desactivó sus redes para concentrarse en sus estudios.

No se dio por vencido, han trabajado juntos así que su nombre debe estar en algún acta de esa época -será el nombre que no reconozca en la lista-. Es increíble la inmensa cantidad de documentos que almacenamos en unos cuantos años y, aún más, lo natural que se ha convertido encontrar uno en particular. Abrió una de las listas de asistencia, de una reunión en la que, recordaba, ella había dado una de sus habituales ideas sobresalientes.

¡No está su nombre!

¿Cómo puede ser posible? Es como si nunca hubiera existido, pero estaba seguro de que no era un producto de su imaginación.

Decidió escribirle a uno de los compañeros de esa época, preguntar algo relacionado al trabajo, algo que no cree dudas sobre su cordura, como “¿recuerdas quién propuso esa forma de atacar el problema? Quiero usarlo como ejemplo dando los créditos apropiados”. Era un plan simple e infalible. Ahora estaba decidido, apenas tuviera el nombre le escribiría y le diría todo lo que siente por ella.

Empezó a redactar el mensaje, pero en ese momento recibió uno. Era ella, decía que se había despertado pensando en él, aunque probablemente no completamente despierta, porque no lograba recordar su nombre, así que volvió a activar su cuenta sólo para buscarlo entre los amigos en común y escribirle. Había algo que quería decirle.

Airflow – Instalación y primeras pruebas

Airflow es una plataforma para la definición, ejecución y monitoreo de flujos de trabajo (Workflows) creada por AirBnB, liberada bajo licencia Apache, y que desde hace unos años es un proyecto en incubación de Apache Software Foundation.

Como parte de la evolución de CDCol, evaluamos alternativas para la definición de workflows dinámicos. Después de evaluar alternativas entre las que se encontraban Luigi, Pinball y un desarrollo propio basado en Celery, la interfaz gráfica, la integración con Celery -que hace parte de la arquitectura actual CDCol-, y características como XCom, inclinaron la balanza a favor de Airflow.

Airflow tiene, conceptualmente, cuatro componentes: Un servidor web, un scheduler, ejecutores (con sus workers) y la base de datos.

Este artículo describirá la instalación de Airflow con Celery y las primeras pruebas, incluyendo la definición de workflows (DAGs) y plugins.

Instalación

Entorno

  • Máquina virtual con Ubuntu Server 18.04
  • Anaconda 3.4

Instalación Standalone

La instalación de airflow por defecto, usando el ejecutor local y la base de datos sqlite (que no es recomendable en entornos de producción), usando Anaconda, se reduce a:

export AIRFLOW_HOME="$HOME/airflow"
echo 'export AIRFLOW_HOME="$HOME/airflow"' >>$HOME/.bashrc
conda install -y -c conda-forge airflow
airflow initdb

La primera vez que se ejecuta, initdb crea el archivo de configuración $AIRFLOW_HOME/airflow.cfg con los valores por defecto y una base de datos sqllite con los valores por omisión. En este punto se puede ejecutar airflow webserver -p 8080 para verlo en acción en http://<dirección ip de la máquina>:8080.

Es una instalación funcional, en una sola máquina, sin embargo no se acerca a lo que queremos en un entorno de producción. Lo primero que vamos a hacer es cambiar el motor de base de datos que se utilizará para almacenar los metadatos. Para esto vamos a crear una nueva base de datos en postgresql y un usuario que pueda acceder de manera remota.

Después de matar el servidor web de airflow, instalamos las nuevas dependencias y creamos tanto la base de datos como el usuario (se puede ser mucho más creativo en los nombres 😛 ):

sudo apt install -y  postgresql postgresql-client postgresql-contrib
conda install -y psycopg2

sudo -u postgres createdb airflow
sudo -u postgres createuser airflow
sudo -u postgres psql airflow -c "alter user airflow with encrypted password 'la contraseña'"
sudo -u postgres psql airflow -c "grant all privileges on database airflow to airflow;"

#Configurar postgresql para que admita conexiones remotas
_HBA=`sudo -u postgres psql -t -P format=unaligned -c 'show hba_file'`
_CONFIG=`sudo -u postgres psql -t -P format=unaligned -c 'show config_file'`
mkdir -p $HOME/pg_backup
sudo cp "$_HBA" $HOME/pg_backup
cp "$_CONFIG" $HOME/pg_backup

sudo su -c "echo 'host all all 0.0.0.0/0 md5' >>$_HBA"
sudo sed -i "s/#listen_addresses = 'localhost'/listen_addresses = '*'/" $_CONFIG

#Disclaimer: Escuchar desde todas las interfaces
#y permitir conexiones desde todas las direcciones
#no es una buena política

sudo systemctl restart postgresql.service

Una vez tenemos la base de datos creada, con los permisos adecuados, modificamos el archivo de configuración de airflow ($AIRFLOW_HOME/airflow.cfg) para que la utilice, modificando la propiedad sql_alchemy_conn. La cadena de conexión en este caso quedaría:

sql_alchemy_conn = postgresql+psycopg2://airflow:la contraseña@10.0.2.15:5432/airflow

En este mismo archivo, también es necesario cambiar el ejecutor, de SequentialExecutor a LocalExecutor


executor = LocalExecutor

Luego volvemos a ejecutar airflow initdb, para inicializar la base de datos. Si todo ha salido bien, la salida será similar a la imagen.

Debemos asegurarnos de que exista la carpeta dags dentro del home de airflow.

mkdir -p "$AIRFLOW_HOME/dags"

Por alguna razón el scheduler falla (al menos en la versión 1.9) si en la carpeta dags no hay por lo menos un DAG válido, así que, mientras creamos nuestros workflows, podemos crear un DAG dummy (dummy.py):

import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

from datetime import timedelta

args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
dag_id='example_dummy', default_args=args,
schedule_interval=None,
dagrun_timeout=timedelta(minutes=1))

run_this_last = DummyOperator(task_id='DOES_NOTHING', dag=dag)

Luego podemos iniciar el scheduler y el servidor web (la opción -D hace que se ejecuten como demonios):

airflow scheduler -D
airflow webserver -p 8080

Podemos, por ejemplo, ejecutar uno de los DAGs. Es importante fijarnos que no esté pausado (debe estar en On), de otra forma no se ejecutará.

Podemos ver el estado (si está corriendo, terminó correctamente o falló):

y el detalle de cada ejecución:

Esta instalación standalone es útil para desarrollo y un poco más cercana a lo que tendremos en producción que la instalación por defecto.
NOTA: Si esto es lo que necesitas, publiqué en GIST un script que genera este ambiente:


#!/bin/bash
USUARIO_SO="$(whoami)"
ANACONDA_URL="https://repo.anaconda.com/archive/Anaconda3-5.2.0-Linux-x86_64.sh"
_DB_PASSWORD="la contraseña"
_IP=$(hostname -I | cut -d' ' -f1)
while getopts "a:p:h" opt; do
case $opt in
a) ANACONDA_URL="$OPTARG";;
p) _DB_PASSWORD="$OPTARG";;
h) cat <<EOF
All arguments are optional
-a anaconda url
-p password for airflow postgres user
-h this help
EOF
exit 0;
;;
\?) echo "Invalid option -$OPTARG" >&2
;;
esac
done
echo "Installation will be performed as $USUARIO_SO"
if [[ $(id -u) -eq 0 ]] ; then echo "This script must not be excecuted as root or using sudo(althougth the user must be sudoer and password will be asked in some steps)" ; exit 1 ; fi
#Prerequisites installation:
while sudo fuser /var/{lib/{dpkg,apt/lists},cache/apt/archives}/lock >/dev/null 2>&1; do
echo "Waiting while other process ends installs (dpkg/lock is locked)"
sleep 1
done
sudo apt update && sudo apt upgrade -y
sudo apt install -y openssh-server git wget htop postgresql postgresql-client postgresql-contrib
if ! hash conda &> /dev/null; then
mkdir -p ~/instaladores && wget -c -P "$HOME/instaladores" "$ANACONDA_URL"
bash "$HOME/instaladores/${ANACONDA_URL##*/}" -b -p "$HOME/anaconda2"
export PATH="$HOME/anaconda2/bin:$PATH"
echo "export PATH='$HOME/anaconda2/bin:$PATH'">>"$HOME/.bashrc"
fi
conda install -y psycopg2
conda install -y -c conda-forge airflow "celery<4"
if [[ -z "${AIRFLOW_HOME}" ]]; then
export AIRFLOW_HOME="$HOME/airflow"
echo "export AIRFLOW_HOME='$HOME/airflow'" >>"$HOME/.bashrc"
fi
airflow initdb
sudo -u postgres createdb airflow
sudo -u postgres createuser airflow
sudo -u postgres psql airflow -c "alter user airflow with encrypted password '$_DB_PASSWORD';"
sudo -u postgres psql airflow -c "grant all privileges on database airflow to airflow;"
#Configurar postgresql para que admita conexiones remotas
_HBA=$(sudo -u postgres psql -t -P format=unaligned -c 'show hba_file')
_CONFIG=$(sudo -u postgres psql -t -P format=unaligned -c 'show config_file')
mkdir -p "$HOME/pg_backup"
sudo cp "$_HBA" "$HOME/pg_backup"
cp "$_CONFIG" "$HOME/pg_backup"
sudo su -c "echo 'host all all 0.0.0.0/0 md5' >>$_HBA"
sudo sed -i "s/#listen_addresses = 'localhost'/listen_addresses = '*'/" "$_CONFIG"
sudo systemctl restart postgresql.service
sed -i "s%sql_alchemy_conn.*%sql_alchemy_conn = postgresql+psycopg2://airflow:$_DB_PASSWORD@$_IP:5432/airflow%" "$AIRFLOW_HOME/airflow.cfg"
sed -i "s%executor =.*%executor = LocalExecutor%" "$AIRFLOW_HOME/airflow.cfg"
mkdir -p "$AIRFLOW_HOME/dags"
cat <<EOF >"$AIRFLOW_HOME/dags/dummy.py"
import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import timedelta
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_dummy', default_args=args,
schedule_interval=None,
dagrun_timeout=timedelta(minutes=1))
run_this_last = DummyOperator(task_id='DOES_NOTHING', dag=dag)
EOF
airflow initdb
airflow scheduler -D
airflow webserver -p 8080 -D

 

Instalación con Celery

Para hacer que airflow use como workers a los nodos de celery, basta con:

  1. Tener un cluster de celery funcional, usando como backend de resultados rabbitMQ o Redis. Airflow y celery son proyectos en desarrollo por lo que es importante verificar que las versiones sean compatibles entre los dos. En CDCol usamos celery version 3.x por lo que debemos usar una versión de airflow menor a 1.9 para poder tener más de un ejecutor sin inconvenientes.
  2. Modificar el archivo de configuración de airflow para que use el ejecutor `CeleryExecutor`.
  3. Instalar airflow en todos los nodos de celery. La configuración de airflow en todos los nodos debe ser homogénea.
  4. Asegurarse que las carpetas dags y plugins de airflow estén sincronizadas en todos los nodos.

Para este artículo, usaré Redis como backend de resultados y RabbitMQ como manejador de colas de mensajes.  Estarán instalados en la máquina en la que se ejecutará el scheduler y el webserver de airflow, sin embargo pueden estar en cualquier máquina que sea alcanzable por los nodos.

sudo apt install redis-server
sudo apt install rabbitmq-server

#Crear un usuario y un vhost para la cola de rabbit, habilitar el plugin para el monitoreo (requerido si se va a usar flower)

sudo rabbitmqctl add_user airflow airflow
sudo rabbitmqctl add_vhost airflow
sudo rabbitmqctl set_user_tags airflow airflow_tag administrator
sudo rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"
sudo rabbitmq-plugins enable rabbitmq_management

conda install redis-py
conda install -c conda-forge "celery<4" flower

Modificamos la configuración de celery para que use el ejecutor de celery, para que use redis server como backend de resultados y rabbitMQ como broker.

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
executor = CeleryExecutor


# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
broker_url = amqp://airflow:airflow@10.0.2.15/airflow

# Another key Celery setting
celery_result_backend = redis://10.0.2.15:6379/0

En todas las máquinas que tendrán workers de celery debe estar instalado airflow, celery y redis-py.

conda install redis-py
conda install -c conda-forge "airflow<1.9" "celery<4"

También debemos tener un mecanismo para sincronizar las carpetas dags y plugins de airflow, la opción más sencilla es usar un punto de montaje NFS (la ubicación de estos directorios se especifica en airflow.cfg). Es importante notar que el ambiente de python también debe mantenerse homogéneo.

Una vez tenemos todo configurado ejecutamos en la máquina maestra el webserver y el scheduler, y en los nodos que ejecutarán el trabajo airflow worker.


#En la máquina maestra
airflow webserver -p 8080 -D
airflow scheduler -D
#opcional:

airflow flower -D


#En cada nodo
airflow worker -D

Podemos resumir el proceso en dos scripts, uno para la máquina maestra (que en este caso también tiene un worker) y otro para los workers:


#!/bin/bash
USUARIO_SO="$(whoami)"
ANACONDA_URL="https://repo.anaconda.com/archive/Anaconda3-5.2.0-Linux-x86_64.sh"
_DB_PASSWORD="la contraseña"
_IP=$(hostname -I | cut -d' ' -f1)
while getopts "a:p:h" opt; do
case $opt in
a) ANACONDA_URL="$OPTARG";;
p) _DB_PASSWORD="$OPTARG";;
h) cat <<EOF
All arguments are optional
-a anaconda url
-p password for airflow postgres user
-h this help
EOF
exit 0;
;;
\?) echo "Invalid option -$OPTARG" >&2
;;
esac
done
echo "Installation will be performed as $USUARIO_SO"
if [[ $(id -u) -eq 0 ]] ; then echo "This script must not be excecuted as root or using sudo(althougth the user must be sudoer and password will be asked in some steps)" ; exit 1 ; fi
#Prerequisites installation:
while sudo fuser /var/{lib/{dpkg,apt/lists},cache/apt/archives}/lock >/dev/null 2>&1; do
echo "Waiting while other process ends installs (dpkg/lock is locked)"
sleep 1
done
sudo apt update && sudo apt upgrade -y
sudo apt install -y openssh-server git wget htop postgresql postgresql-client postgresql-contrib redis-server rabbitmq-server
if ! hash conda &> /dev/null; then
mkdir -p ~/instaladores && wget -c -P "$HOME/instaladores" "$ANACONDA_URL"
bash "$HOME/instaladores/${ANACONDA_URL##*/}" -b -p "$HOME/anaconda2"
export PATH="$HOME/anaconda2/bin:$PATH"
echo "export PATH='$HOME/anaconda2/bin:$PATH'">>"$HOME/.bashrc"
fi
conda install -y psycopg2 redis-py
conda install -y -c conda-forge "airflow<1.9" "celery<4" flower
if [[ -z "${AIRFLOW_HOME}" ]]; then
export AIRFLOW_HOME="$HOME/airflow"
echo "export AIRFLOW_HOME='$HOME/airflow'" >>"$HOME/.bashrc"
fi
airflow initdb
sudo -u postgres createdb airflow
sudo -u postgres createuser airflow
sudo -u postgres psql airflow -c "alter user airflow with encrypted password '$_DB_PASSWORD';"
sudo -u postgres psql airflow -c "grant all privileges on database airflow to airflow;"
#Configurar postgresql para que admita conexiones remotas
_HBA=$(sudo -u postgres psql -t -P format=unaligned -c 'show hba_file')
_CONFIG=$(sudo -u postgres psql -t -P format=unaligned -c 'show config_file')
mkdir -p "$HOME/pg_backup"
sudo cp "$_HBA" "$HOME/pg_backup"
cp "$_CONFIG" "$HOME/pg_backup"
sudo su -c "echo 'host all all 0.0.0.0/0 md5' >>$_HBA"
sudo sed -i "s/#listen_addresses = 'localhost'/listen_addresses = '*'/" "$_CONFIG"
sudo systemctl restart postgresql.service
sed -i "s%sql_alchemy_conn.*%sql_alchemy_conn = postgresql+psycopg2://airflow:$_DB_PASSWORD@$_IP:5432/airflow%" "$AIRFLOW_HOME/airflow.cfg"
sed -i "s%executor =.*%executor = CeleryExecutor%" "$AIRFLOW_HOME/airflow.cfg"
sed -i "s%broker_url =.*%broker_url = amqp://airflow:airflow@$_IP/airflow%" "$AIRFLOW_HOME/airflow.cfg"
sed -i "s%celery_result_backend =.*%celery_result_backend = redis://$_IP:6379/0%" "$AIRFLOW_HOME/airflow.cfg"
sudo sed -i "s%bind .*%bind $_IP%" "/etc/redis/redis.conf"
sudo service redis restart
sudo rabbitmqctl add_user airflow airflow
sudo rabbitmqctl add_vhost airflow
sudo rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"
sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmqctl set_user_tags airflow airflow_tag administrator
sudo service rabbitmq-server restart
sudo apt-get install -y nfs-kernel-server
sudo mkdir -p /nfs-share/{dags,plugins}
sudo chown -R "$USUARIO_SO" /nfs-share
sudo sh -c "echo '/nfs-share *(ro,sync,no_subtree_check)'>> /etc/exports"
sudo service nfs-kernel-server restart
ln -s /nfs-share/dags "$AIRFLOW_HOME/dags"
ln -s /nfs-share/plugins "$AIRFLOW_HOME/plugins"
cat <<EOF >"$AIRFLOW_HOME/dags/dummy.py"
import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import timedelta
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_dummy', default_args=args,
schedule_interval=None,
dagrun_timeout=timedelta(minutes=1))
run_this_last = DummyOperator(task_id='DOES_NOTHING', dag=dag)
EOF
echo "You must open the appropriated ports to allow workers to connect to postgres, redis, rabbit and nfs"
airflow initdb
airflow scheduler -D
airflow webserver -p 8080 -D
airflow worker -D
echo "you should do a source .bashrc to reload the environment variables"


#!/bin/bash
USUARIO_SO="$(whoami)"
ANACONDA_URL="https://repo.anaconda.com/archive/Anaconda3-5.2.0-Linux-x86_64.sh"
_DB_PASSWORD="la contraseña"
while getopts "a:p:h" opt; do
case $opt in
a) ANACONDA_URL="$OPTARG";;
p) _DB_PASSWORD="$OPTARG";;
h) cat <<EOF
Use AirflowWorker.sh [options] master_address
All arguments are optional
-a anaconda url
-p password for airflow postgres user
-h this help
EOF
exit 0;
;;
\?) echo "Invalid option –$OPTARG" >&2
;;
esac
done
_IP=${@:$OPTIND:1}
if [ -z "$_IP" ]
then
echo "You must provide the master address"
exit 1
fi
echo "Installation will be performed as $USUARIO_SO"
if [[ $(id -u) -eq 0 ]] ; then echo "This script must not be excecuted as root or using sudo(althougth the user must be sudoer and password will be asked in some steps)" ; exit 1 ; fi
#Prerequisites installation:
while sudo fuser /var/{lib/{dpkg,apt/lists},cache/apt/archives}/lock >/dev/null 2>&1; do
echo "Waiting while other process ends installs (dpkg/lock is locked)"
sleep 1
done
sudo apt update && sudo apt upgrade -y
sudo apt install -y openssh-server git wget htop
if ! hash conda &> /dev/null; then
mkdir -p ~/instaladores && wget -c -P "$HOME/instaladores" "$ANACONDA_URL"
bash "$HOME/instaladores/${ANACONDA_URL##*/}" -b -p "$HOME/anaconda2"
export PATH="$HOME/anaconda2/bin:$PATH"
echo "export PATH='$HOME/anaconda2/bin:$PATH'">>"$HOME/.bashrc"
fi
conda install -y psycopg2 redis-py
conda install -y -c conda-forge "airflow<1.9" "celery<4"
if [[ -z "${AIRFLOW_HOME}" ]]; then
export AIRFLOW_HOME="$HOME/airflow"
echo "export AIRFLOW_HOME='$HOME/airflow'" >>"$HOME/.bashrc"
fi
airflow initdb
#Configurar postgresql para que admita conexiones remotas
sed -i "s%sql_alchemy_conn.*%sql_alchemy_conn = postgresql+psycopg2://airflow:$_DB_PASSWORD@$_IP:5432/airflow%" "$AIRFLOW_HOME/airflow.cfg"
sed -i "s%executor =.*%executor = CeleryExecutor%" "$AIRFLOW_HOME/airflow.cfg"
sed -i "s%broker_url =.*%broker_url = amqp://airflow:airflow@$_IP/airflow%" "$AIRFLOW_HOME/airflow.cfg"
sed -i "s%celery_result_backend =.*%celery_result_backend = redis://$_IP:6379/0%" "$AIRFLOW_HOME/airflow.cfg"
sudo apt-get install -y nfs-common
sudo mkdir -p /nfs-share/
sudo chown -R "$USUARIO_SO" /nfs-share
sudo sh -c "echo '$_IP:/nfs-share /nfs-share nfs auto,user,exec,ro,async,atime 0 0'>> /etc/fstab"
sudo mount -a
ln -s /nfs-share/dags "$AIRFLOW_HOME/dags"
ln -s /nfs-share/plugins "$AIRFLOW_HOME/plugins"
airflow worker -D

 

Para monitorear los workers de celery podemos usar flower, ejecutándolo con airflow flower -D y accediendo por el puerto 5555.

Definición de un Workflow

Una vez tenemos instalado airflow, lo siguiente que queremos hacer es definir un workflow. Airflow soporta únicamente grafos dirigidos acíclicos, lo que permite asegurar que, si todas las tareas terminan correctamente, el workflow termina (no hay ciclos infinitos). Los archivos de definición de DAGs en airflow son scripts de python.

El scheduler de airflow evalúa (ejecuta) periódicamente los archivos de la carpeta de DAGs, identificando cambios en la definición de los mismos. Es por esta razón que cada archivo de definición debe poder ejecutarse en segundos, de otro modo afectaría el rendimiento general de airflow.

En un archivo de definición de DAG se crea un objeto DAG y un conjunto de operadores que tienen dependencias entre ellos. El DAG debe tener un id, y puede definir una programación (cada cuánto debe ejecutarse), un timeout, una fecha de inicio, argumentos por defecto (que serán pasados a todas las tareas), y documentación. El DAG agrupa operadores.

Un operador describe una tarea. Se pueden agrupar en 3 categorías, según su objetivo: Operadores que ejecutan una acción, que transfieren datos, o sensores. Airflow tiene operadores que pueden cubrir la mayoría de necesidades, como BashOperator, PythonOperator, PostgresOperatorHTTPOperator`, entre otros.

Un ejemplo de workflow, que muestra el manejo de las dependencias, es:


import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

from datetime import timedelta

def print_value(ds, value):
    print(value)
#Argumentos por defecto. 
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
#Definición del DAG
dag = DAG(
dag_id='test_print_values', default_args=args,
schedule_interval=None,
dagrun_timeout=timedelta(minutes=5))

#Tareas Dummy que pertenecen al DAG
start_with = DummyOperator(task_id='PRIMERA_TAREA', dag=dag)

end_with = DummyOperator(task_id='TAREA_FINAL', dag=dag)

#Crear 5 Tareas que se ejecutarán después de start_with y antes de end_with
#Se pueden usar los operadores >> y << para expresar las dependencias. for i in range(5): _po=PythonOperator(task_id="echo_{}".format(i), python_callable=print_value, op_kargs={'value':i}, dag=dag ) start_with>>_po>>end_with

Esta definición genera el workflow de la siguiente imagen:

Y al ejecutar el DAG se puede ver el progreso, donde se muestra que la tarea final sólo se ejecuta una vez todas las tareas anteriores han terminado:

 

Por defecto, los nuevos dags se crean pausados, no se ejecutarán hasta que se activen por la interfaz web o usando la interfaz de línea de comando. Un DAG que no tenga schedule_interval (o sea None), sólo se ejecutará al lanzarlo explícitamente desde la web o usando trigger_dag.


airflow trigger_dag test_print_values

Definición de un plugin

 

Aunque los operadores incluidos en Airflow son bastante genéricos, muchas veces necesitamos utilidades adicionales, nuevos tipos de operadores, o vistas web. Para esto, Airflow permite definir plugins. Un plugin se define con un módulo de python en la carpeta de plugins. Un plugin se define con una clase extiende airflow.plugins_manager.AirflowPlugin y define conjuntos de operadores, ejecutores, vistas, “hooks“, macros y links de menú.

Cada uno de esos elementos se define como una clase que extiende una base. Por ejemplo, un operador se define como una clase que extiende airflow.models.BaseOperator y un hook a airflow.hooks.base_hook.

Por ejemplo, podríamos definir un plugin con un operador que reciba una carpeta y genere un archivo tar.gz con la salida. Para eso definimos el operador, extendiendo de BaseOperator, con su constructor y el método execute, que es el que realiza el trabajo.

import tarfile
import os
from airflow.models import BaseOperator
from airflow import utils as airflow_utils

class TarGzOperator(BaseOperator):
    @airflow_utils.apply_defaults
    def __init__(self, to_compress, output_folder, *args,**kwargs):
        super().__init__(*args, **kwargs)
        if to_compress is None or output_folder is None: 
            raise Exception("You must specify the input and output folder")
        if to_compress == output_folder:
            raise Exception("to compress folder cannot be the output folder")
        if os.path.exists(output_folder) and not os.path.isdir(output_folder):
            raise Exception("output_folder must be a folder")
        self.to_compress = to_compress
        self.output_folder = output_folder
        
    
    def execute(self, context):
        print(context)
        _filename= os.path.basename(os.path.normpath(self.to_compress))
        _output_file=os.path.join(self.output_folder, _filename+str(".tar.gz") )
        if not os.path.exists(self.output_folder):
            os.makedirs(self.output_folder)
        with tarfile.open(_output_file,"w:gz" ) as _tar:
            _tar.add(self.to_compress)
            
        
        return _output_file

Luego es necesario definir el plugin :


from airflow.plugins_manager import AirflowPlugin

#si la definición del operador se hace en un archivo diferente,
# sería necesario hacer el import

class TarGzPlugin(AirflowPlugin):
    name = 'targz_plugin'
    operators=[TarGzOperator]

Al publicar el archivo en la carpeta plugins de airflow, y reiniciar tanto el webserver como el el scheduler, el nuevo plugin está disponible. Se pueden definir DAGs que lo usen:

import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import TarGzOperator

from datetime import timedelta

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='test_plugin', default_args=args,
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=5))

dummy = DummyOperator(task_id='DOES_NOTHING', dag=dag)

tarBackups = TarGzOperator(to_compress='/home/cronosnull/airflow',output_folder='/home/cronosnull',task_id='targz',dag=dag)

dummy >> tarBackups

Al habilitar y ejecutar el DAG desde la interfaz web se generará el archivo tar.gz esperado y se puede ver que el valor retornado por el operador queda registrado en XCom (lo que permitirá usarlo desde una siguiente tarea).

Airflow es una herramienta extensible, que se adapta a escenarios diversos. Espero que este artículo sea útil para quien está empezando a explorarla, le ahorre algo de tiempo y le despierte nuevas dudas -mientras lo escribía resolví algunas-.

Consonancia

El jazz amenizaba esa noche lluviosa y sin luna. La estrecha calle empedrada, a la salida del café, se sentía acogedora y extrañamente vacía.

Estaba fascinado por las dos imponentes columnas al final del camino, pero aún más por María, que había empezado a caminar bajo la lluvia y ahora lo miraba, con su hipnótica sonrisa, y lo invitaba a acompañarla. La luz amarilla y tenue de las lamparas, la melodía y armonía de la música y María, todo parecía producto de un sueño y sin embargo ahí estaba, pensando en que la acompañaría bajo la lluvia toda la vida.

Foto de Pedro Szekely (CC BY-SA 2.0)

Sentencia

life

Durante una noche de julio de 2015 tuve un sueño. Soñé que estaba sentenciado a muerte, y estaba en un cuarto esperando a que se llevara a cabo la sentencia. Durante la espera podía recibir visitas, siempre había a mi lado dos personas que iban cambiando. Vi personas que me han acompañado en diferentes etapas de la vida, que he querido y quiero, aunque a algunas hace años no las vea ni sepa de ellas.

Fue un sueño alegre, aunque el tema no lo indique.

El día siguiente quise escribir algo sobre él, pero las ocupaciones del curso intersemestral que estaba dictando, que tenía clase en la mañana y en la tarde, me distrajeron. Almorcé con una de las personas que vi en el sueño, pensé en contarle de él, pero sentí que no era un buen momento. Hice una pequeña anotación en mi agenda para recordarlo, para escribir cuando llegara a casa, “La vida es una sentencia a muerte”. Aunque llegué a casa con intención de hacerlo tuve que cambiar de planes, una parálisis facial, que probablemente tuve desde medio día y explicaría la lágrima que quería salir durante la sesión de la tarde en la clase de una forma menos romántica de lo que quisieran mis estudiantes, me obligó a pasar la noche en una sala de urgencias.

life

Con las terapias y citas médicas que siguieron, olvidé escribir sobre el sueño. Ahora, 2 años después, escribo esto porque llevo algunos meses recordándolo con frecuencia, probablemente por Gabriela.

Hay personas que sólo necesitan una palabra para volverse memorables. Personas que sorprenden por su sabiduría, su motivación o su energía.

La conocí durante las pocas semanas que fue estudiante de uno de los cursos que dicté el año pasado. A pesar de haber tenido que faltar a algunas de las clases, por la misma razón que al final tuvo que cancelar el semestre, mostraba una motivación e iniciativa desmedida. A pesar del corto tiempo compartido, su muerte, en enero de este año, me llenó de tristeza. Es un recordatorio de lo importante de la actitud frente a la vida, del impacto que puede tener en quienes la comparten con nosotros, pero también de que es efímera.

Mordiendo Hadoop – Instalación con Ambari

Han pasado casi 6 años desde los artículos de Mordiendo Hadoop: Mordiendo Hadoop: Instalación y primeras pruebasMordiendo Hadoop: Instalación en Cluster y Mordiendo Hadoop: Desarrollo de aplicaciones MapReduce. En ese tiempo el ecosistema de Hadoop se ha enriquecido, su arquitectura ha cambiado y su uso se ha extendido, convirtiéndose en la referencia de tecnología para Big Data. En estos años hemos realizado varias instalaciones de Hadoop en el grupo de Ingeniería de Información de la universidad, tanto para los cursos de maestría como para proyectos de pregrado y postgrado.

Desde hace poco más de 3 años, usamos la distribución de HortonWorks, HDP, por su facilidad de instalación (a través de ambari), por ser de código abierto y por su estabilidad. Cada vez es más fácil realizar la instalación, pero todo depende de la configuración previa de los nodos. En este artículo describiré la instalación de HDP, los pasos preliminares y las lecciones aprendidas que pueden ser aplicadas a nuevas instalaciones ( una traducción-resumen de la documentación oficial, con algunos consejos).

Preparación

Antes de iniciar la instalación de la plataforma HDP, debemos asegurarnos de cumplir con unos prerrequisitos:

  1. Tener instalado en los nodos un sistema operativo soportado (Recomendados para HDP 2.3: Centos 6.x o 7.x). Consejo: Instalar el sistema operativo en inglés, ambari en ocasiones tiene problemas con la codificación de caracteres cuando el sistema operativo tiene un idioma diferente.
  2. Desde la que será la máquina host de ambari el usuario root debe poder autenticarse sin contraseña a las otras máquinas del cluster.
  3. Todas las máquinas deberían tener la hora sincronizada (usando ntp), el servicio de ntp debe estar configurado para iniciar al iniciar el sistema operativo.
  4. Todas las máquinas deben ser alcanzables por nombre de dominio. (DNS es la mejor opción, si no es posible se puede lograr agregándolas al /etc/hosts de todas las máquinas).
  5. Cada máquina debe tener correctamente configurado su FQDN (es una continuación del punto anterior). En especial, si la máquina tiene más de una interfaz de red debemos asegurarnos que siempre resuelva su nombre de dominio con la misma dirección IP (y que sea la que está en el segmento del cluster).
  6. El número máximo de descriptores de archivos abiertos (el número máximo de archivos abiertos de forma simultanea) debe ser de, como mínimo, 10000 (el recomendado es 65536 ). Se puede verificar usando ulimit -Snulimit -Hn (Límite suave y fuerte respectivamente) y se puede cambiar usando ulimit -n 10000.
  7. Desactivar SELinux  y configurar el firewall para que las máquinas del cluster puedan comunicarse entre ellas por los puertos apropiados, y permitir el acceso de los usuarios a los servicios.  Se recomienda desactivar el firewall durante el proceso de instalación, e iniciarlo y configurarlo apropiadamente una vez la instalación esté completa.
  8. Debemos tener una idea de cómo vamos a distribuir los servicios en el cluster (en qué máquinas tendremos los servicios maestros y en cuáles tendremos los esclavos y los clientes).

Instalación

Antes de iniciar la instalación debemos determinar cuál será la máquina dedicada como host de Ambari. En esta máquina debemos:

  1. Configurar el repositorio:
    wget -nv http://public-repo-1.hortonworks.com/ambari/centos7/2.x/updates/2.1.0/ambari.repo -O /etc/yum.repos.d/ambari.repo
    
  2. Instalar ambari-server:
    yum install ambari-server
  3. Configurar ambari-server:
    ambari-server setup
    1. Seleccione el JDK deseado (Oracle JDK 1.8 es una buena opción). Debe aceptar la licencia para continuar con la instalación.
    2. Si desea tener una manejador de bases de datos diferente a postgres para ambari, seleccione la configuración avanzada. En caso contrario acepte el valor por defecto (En ese caso se creará en Postgres una base de datos llamada ambari; El usuario por defecto será ambari y contraseña bigdata).
  4. Inicie el servidor ambari:
    ambari-server start
  5. Desde un navegador web ingresar al servidor de ambari, http://<fqdn del servidor de ambari>:8080
    • Ingresar con el nombre de usuario y contraseña por defecto (admin /admin).Bienvenida de ambari
    • Al ingresar podrá crear un nuevo cluster usando el asistente, manejar los usuarios y grupos que pueden acceder a ambari y desplegar vistas.
    • Al iniciar el asistente para la creación de un nuevo cluster le solicitará un nombre. Debe ser descriptivo, porque es la forma de identificar el cluster en ambari.
      ambari2
    • El segundo paso del asistente es seleccionar la versión de HDP a instalar.  Si las máquinas tendrán conexión a Internet durante la instalación sólo es necesario indicar la versión, en caso contrario es necesario abrir las opciones avanzadas y configurar la dirección del repositorio local que se utilizará.
      ambari3
    • En el siguiente paso se agregan los nombres de las máquinas que harán parte del cluster, una por línea o utilizando expresiones.  También debe indicar la llave privada ssh autorizada para acceder a los nodos del cluster.  (Si realizó correctamente el paso 2 de la preparación, la llave privada probablemente estará en /root/.ssh/id_rsa del host de ambari)
      ambari4Si utilizó expresiones aparecerá una ventana confirmando los nombres
      ambari4.1
    • En el siguiente paso se instalará el agente de ambari en los hosts seleccionados y se pueden eliminar nodos que no se quieran incluir en el cluster.
      ambari5
    • A continuación es necesario seleccionar los servicios que se desean en el cluster.
      ambari6
    • Luego debemos seleccionar dónde quedaran los componentes maestros de estos servicios.
      ambari8
    • También seleccionar en qué nodos quedarán los componentes esclavos y los clientes de los servicios.
    • Antes de la revisión final y la instalación, debemos personalizar la configuración de los servicios. Ambari hace recomendaciones de acuerdo a las características de los nodos, que pueden ser usados como valores por defecto, sin embargo conviene revisarlas y es necesario completar las configuraciones que aparecen marcadas en rojo (para las cuales no hay un valor pre-establecido).
      ambari9
    • Luego se realiza una revisión final y se procede a la instalación, configuración y prueba de los servicios.
      ambari10
      ambari11
      El proceso de instalación puede tomar varios minutos, pero es completamente automático. Si hay una falla en alguno de los procesos se puede hacer clic en el mensaje para obtener el log (para determinar la causa). Son comunes los problemas de conexión con los servidores espejo, por lo que la solución más común es reintentar la instalación. Al finalizar verá el resumen de la instalación y podrá ir al tablero de control, en el cual puede ver cuáles servicios están activos, cuáles inactivos, los recursos utilizados, ver las alertas y realizar tareas de administración (por ejemplo, agregar un nuevo servicio al cluster, iniciar o detener un servicio y modificar la configuración).
      ambari12
      ambari13
      ambari14

El proceso de instalación es sencillo, semiautomático y permite tener rápidamente un entorno funcional. Sin embargo, se debe tener en cuenta que esta instalación es sólo un punto de arranque y se deben definir políticas para el uso de los recursos (generalmente más de un usuario utilizará el cluster) e ir ajustando la configuración de acuerdo a las métricas obtenidas.

 

Repitentes

Camino

Cuando era pequeño, en la casa de mi abuela, en un libro que nunca volví a encontrar y que no sé cómo llegó a mis manos, leí sobre una interpretación del universo (probablemente influenciada por doctrinas filosóficas de India) que afirmaba que la verdadera forma de los seres humanos es el alma inmortal y su objetivo es alcanzar la sabiduría y, a través de ella, la iluminación.

Camino

Según esa interpretación la existencia en este mundo es una forma de aprender. El alma encarna tantas veces como sea necesario para aprender de sus vivencias y llegar a la sabiduría.  Almas jóvenes conviven con almas que han pasado por varias vidas y que han acumulado conocimiento, pero también corrupción. En todas las vidas se aprende, pero este conocimiento puede llevar a la iluminación o a la perversión. En ocasiones, algunos seres que han alcanzado la iluminación deciden volver para enseñarle el camino a aquellos que quedan atrás.

Hace poco estaba pensando que, según este modelo, el mundo va en decadencia: es un curso lleno de estudiantes problemáticos, en la que la voz de los guías se pierde entre el ruido.

15 años después del Cluetrain Manifesto algunas empresas aún no entienden la red.

Hace unos días tuve algunas conversaciones con un tema en común: el uso de las redes sociales por parte de las empresas. Me parece interesante que muchos de los errores que se cometen en las redes sociales en realidad son los mismos que se vienen cometiendo por no entender los cambios que se produjeron gracias a Internet, en especial con la llegada de la Web 2.0, y que en 1999 llevaron a la publicación de las 95 tesis (esp) del Cluetrain Manifesto (que fueron extendidas en el libro homónimo publicado en el 2000).

Los problemas siguen siendo los mismos: tratar de usar la red como un medio masivo, sin escuchar, sin comprender que los mercados son conversaciones y que se necesitan una voz humana; tratar de poner barreras a la información sin considerar que los contenidos no se generan únicamente en el departamento de marketing, ahora todos los empleados y todos los usuarios tienen voz; olvidar que un comunicado de prensa no es una conversación.

 La solución no está en contratar a aquel usuario de redes sociales con muchos seguidores, que no sabe nada sobre el funcionamiento de la empresa y no puede generar conversaciones valiosas sobre los productos o servicios. Podemos encontrar las voces adecuadas dentro de la organización, que conozcan el dominio y puedan actuar de manera rápida, atendiendo las peticiones de los usuarios. Los directivos de la organización también deberían comunicarse usando los medios sociales, hay algunos ejemplos notables, la gente quiere escuchar lo que tienen que decir.

En la red los usuarios tienen el poder, no hay público cautivo. Los contenidos publicados deben aportar valor y generar conversaciones para conseguir y mantener un lugar en los timelines.

La información debe fluir rápidamente, no se puede tapar el sol con un dedo. Esto es especialmente cierto cuando se presentan problemas, el tratar de ocultarlos en un entorno en el que no hay una única fuente de información sólo hace que se pierda la confianza.

El impacto en las redes sociales no se debe medir con el número de seguidores o likes sino, entre otras cosas, con la satisfacción de los clientes que se comunican usando estos canales.

 Para las empresas es mucho más cómodo pensar en medios masivos, públicos cautivos e información controlada, pero esto es sólo una ilusión y cuanto más se mantenga más daño hará y será más difícil recuperar la confianza y la comunicación con los clientes. 15 años debería ser tiempo suficiente para aprender esta lección.

Ladrones del silencio

Foto por Blake Bronstad, https://stocksnap.io/photo/KDW6ROR4WF, licencia CC0

Foto por Blake Bronstad, https://stocksnap.io/photo/KDW6ROR4WF, licencia CC0

Estamos rodeados por pantallas, parlantes y multitudes, que nos roban el silencio. Es poco el tiempo en el que en realidad estamos solos, y la mayor parte de ese tiempo estamos buscando la forma de evitarlo. Conversar con un autor a través de un libro, perdernos en una historia o en la música, muchas veces es sólo una excusa para no sentirnos encerrados en nuestros pensamientos. Nos roban el silencio… y lo agradecemos.

Es un miedo aprendido. Hemos perdido la habilidad de hablar con nosotros mismos, tenemos un temor inmenso de lo que podríamos decirnos. “Conócete a ti mismo”, pero no mucho, corres el riesgo de descubrir que no te soportas.

Este temor nos lleva a depender de las relaciones con otros, relaciones que están basadas en una proyección de nuestra identidad a través del prisma de las expectativas comunes.

Escucharnos nos permitiría tener una idea de quienes somos, dejando de lado las valoraciones hechas por una sociedad que busca uniformarnos y de tribus a las que tendemos a adaptarnos. Tendríamos una idea clara de qué nos hace felices y cómo lo que hacemos va a favor o en contra de esa felicidad.

No todo nos gustará, veremos las sombras, los demonios, los miedos, todo aquello que hace parte de nuestra identidad. Es más fácil luchar contra un enemigo conocido.

Los niños hablan solos, viven en su propio universo, pero poco a poco empiezan a asociar ese comportamiento con algo inadecuado. El mundo nos roba el silencio y con él la identidad.