Airflow

Airflow – Instalación y primeras pruebas

Airflow es una plataforma para la definición, ejecución y monitoreo de flujos de trabajo (Workflows) creada por AirBnB, liberada bajo licencia Apache, y que desde hace unos años es un proyecto en incubación de Apache Software Foundation.

Como parte de la evolución de CDCol, evaluamos alternativas para la definición de workflows dinámicos. Después de evaluar alternativas entre las que se encontraban Luigi, Pinball y un desarrollo propio basado en Celery, la interfaz gráfica, la integración con Celery -que hace parte de la arquitectura actual CDCol-, y características como XCom, inclinaron la balanza a favor de Airflow.

Airflow tiene, conceptualmente, cuatro componentes: Un servidor web, un scheduler, ejecutores (con sus workers) y la base de datos.

Este artículo describirá la instalación de Airflow con Celery y las primeras pruebas, incluyendo la definición de workflows (DAGs) y plugins.

Instalación

Entorno

  • Máquina virtual con Ubuntu Server 18.04
  • Anaconda 3.4

Instalación Standalone

La instalación de airflow por defecto, usando el ejecutor local y la base de datos sqlite (que no es recomendable en entornos de producción), usando Anaconda, se reduce a:

export AIRFLOW_HOME="$HOME/airflow"
echo 'export AIRFLOW_HOME="$HOME/airflow"' >>$HOME/.bashrc
conda install -y -c conda-forge airflow
airflow initdb

La primera vez que se ejecuta, initdb crea el archivo de configuración $AIRFLOW_HOME/airflow.cfg con los valores por defecto y una base de datos sqllite con los valores por omisión. En este punto se puede ejecutar airflow webserver -p 8080 para verlo en acción en http://<dirección ip de la máquina>:8080.

Es una instalación funcional, en una sola máquina, sin embargo no se acerca a lo que queremos en un entorno de producción. Lo primero que vamos a hacer es cambiar el motor de base de datos que se utilizará para almacenar los metadatos. Para esto vamos a crear una nueva base de datos en postgresql y un usuario que pueda acceder de manera remota.

Después de matar el servidor web de airflow, instalamos las nuevas dependencias y creamos tanto la base de datos como el usuario (se puede ser mucho más creativo en los nombres 😛 ):

sudo apt install -y  postgresql postgresql-client postgresql-contrib
conda install -y psycopg2

sudo -u postgres createdb airflow
sudo -u postgres createuser airflow
sudo -u postgres psql airflow -c "alter user airflow with encrypted password 'la contraseña'"
sudo -u postgres psql airflow -c "grant all privileges on database airflow to airflow;"

#Configurar postgresql para que admita conexiones remotas
_HBA=`sudo -u postgres psql -t -P format=unaligned -c 'show hba_file'`
_CONFIG=`sudo -u postgres psql -t -P format=unaligned -c 'show config_file'`
mkdir -p $HOME/pg_backup
sudo cp "$_HBA" $HOME/pg_backup
cp "$_CONFIG" $HOME/pg_backup

sudo su -c "echo 'host all all 0.0.0.0/0 md5' >>$_HBA"
sudo sed -i "s/#listen_addresses = 'localhost'/listen_addresses = '*'/" $_CONFIG

#Disclaimer: Escuchar desde todas las interfaces
#y permitir conexiones desde todas las direcciones
#no es una buena política

sudo systemctl restart postgresql.service

Una vez tenemos la base de datos creada, con los permisos adecuados, modificamos el archivo de configuración de airflow ($AIRFLOW_HOME/airflow.cfg) para que la utilice, modificando la propiedad sql_alchemy_conn. La cadena de conexión en este caso quedaría:

sql_alchemy_conn = postgresql+psycopg2://airflow:la contraseña@10.0.2.15:5432/airflow

En este mismo archivo, también es necesario cambiar el ejecutor, de SequentialExecutor a LocalExecutor


executor = LocalExecutor

Luego volvemos a ejecutar airflow initdb, para inicializar la base de datos. Si todo ha salido bien, la salida será similar a la imagen.

Debemos asegurarnos de que exista la carpeta dags dentro del home de airflow.

mkdir -p "$AIRFLOW_HOME/dags"

Por alguna razón el scheduler falla (al menos en la versión 1.9) si en la carpeta dags no hay por lo menos un DAG válido, así que, mientras creamos nuestros workflows, podemos crear un DAG dummy (dummy.py):

import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

from datetime import timedelta

args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
dag_id='example_dummy', default_args=args,
schedule_interval=None,
dagrun_timeout=timedelta(minutes=1))

run_this_last = DummyOperator(task_id='DOES_NOTHING', dag=dag)

Luego podemos iniciar el scheduler y el servidor web (la opción -D hace que se ejecuten como demonios):

airflow scheduler -D
airflow webserver -p 8080

Podemos, por ejemplo, ejecutar uno de los DAGs. Es importante fijarnos que no esté pausado (debe estar en On), de otra forma no se ejecutará.

Podemos ver el estado (si está corriendo, terminó correctamente o falló):

y el detalle de cada ejecución:

Esta instalación standalone es útil para desarrollo y un poco más cercana a lo que tendremos en producción que la instalación por defecto.
NOTA: Si esto es lo que necesitas, publiqué en GIST un script que genera este ambiente:


#!/bin/bash
USUARIO_SO="$(whoami)"
ANACONDA_URL="https://repo.anaconda.com/archive/Anaconda3-5.2.0-Linux-x86_64.sh"
_DB_PASSWORD="la contraseña"
_IP=$(hostname -I | cut -d' ' -f1)
while getopts "a:p:h" opt; do
case $opt in
a) ANACONDA_URL="$OPTARG";;
p) _DB_PASSWORD="$OPTARG";;
h) cat <<EOF
All arguments are optional
-a anaconda url
-p password for airflow postgres user
-h this help
EOF
exit 0;
;;
\?) echo "Invalid option -$OPTARG" >&2
;;
esac
done
echo "Installation will be performed as $USUARIO_SO"
if [[ $(id -u) -eq 0 ]] ; then echo "This script must not be excecuted as root or using sudo(althougth the user must be sudoer and password will be asked in some steps)" ; exit 1 ; fi
#Prerequisites installation:
while sudo fuser /var/{lib/{dpkg,apt/lists},cache/apt/archives}/lock >/dev/null 2>&1; do
echo "Waiting while other process ends installs (dpkg/lock is locked)"
sleep 1
done
sudo apt update && sudo apt upgrade -y
sudo apt install -y openssh-server git wget htop postgresql postgresql-client postgresql-contrib
if ! hash conda &> /dev/null; then
mkdir -p ~/instaladores && wget -c -P "$HOME/instaladores" "$ANACONDA_URL"
bash "$HOME/instaladores/${ANACONDA_URL##*/}" -b -p "$HOME/anaconda2"
export PATH="$HOME/anaconda2/bin:$PATH"
echo "export PATH='$HOME/anaconda2/bin:$PATH'">>"$HOME/.bashrc"
fi
conda install -y psycopg2
conda install -y -c conda-forge airflow "celery<4"
if [[ -z "${AIRFLOW_HOME}" ]]; then
export AIRFLOW_HOME="$HOME/airflow"
echo "export AIRFLOW_HOME='$HOME/airflow'" >>"$HOME/.bashrc"
fi
airflow initdb
sudo -u postgres createdb airflow
sudo -u postgres createuser airflow
sudo -u postgres psql airflow -c "alter user airflow with encrypted password '$_DB_PASSWORD';"
sudo -u postgres psql airflow -c "grant all privileges on database airflow to airflow;"
#Configurar postgresql para que admita conexiones remotas
_HBA=$(sudo -u postgres psql -t -P format=unaligned -c 'show hba_file')
_CONFIG=$(sudo -u postgres psql -t -P format=unaligned -c 'show config_file')
mkdir -p "$HOME/pg_backup"
sudo cp "$_HBA" "$HOME/pg_backup"
cp "$_CONFIG" "$HOME/pg_backup"
sudo su -c "echo 'host all all 0.0.0.0/0 md5' >>$_HBA"
sudo sed -i "s/#listen_addresses = 'localhost'/listen_addresses = '*'/" "$_CONFIG"
sudo systemctl restart postgresql.service
sed -i "s%sql_alchemy_conn.*%sql_alchemy_conn = postgresql+psycopg2://airflow:$_DB_PASSWORD@$_IP:5432/airflow%" "$AIRFLOW_HOME/airflow.cfg"
sed -i "s%executor =.*%executor = LocalExecutor%" "$AIRFLOW_HOME/airflow.cfg"
mkdir -p "$AIRFLOW_HOME/dags"
cat <<EOF >"$AIRFLOW_HOME/dags/dummy.py"
import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import timedelta
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_dummy', default_args=args,
schedule_interval=None,
dagrun_timeout=timedelta(minutes=1))
run_this_last = DummyOperator(task_id='DOES_NOTHING', dag=dag)
EOF
airflow initdb
airflow scheduler -D
airflow webserver -p 8080 -D

 

Instalación con Celery

Para hacer que airflow use como workers a los nodos de celery, basta con:

  1. Tener un cluster de celery funcional, usando como backend de resultados rabbitMQ o Redis. Airflow y celery son proyectos en desarrollo por lo que es importante verificar que las versiones sean compatibles entre los dos. En CDCol usamos celery version 3.x por lo que debemos usar una versión de airflow menor a 1.9 para poder tener más de un ejecutor sin inconvenientes.
  2. Modificar el archivo de configuración de airflow para que use el ejecutor `CeleryExecutor`.
  3. Instalar airflow en todos los nodos de celery. La configuración de airflow en todos los nodos debe ser homogénea.
  4. Asegurarse que las carpetas dags y plugins de airflow estén sincronizadas en todos los nodos.

Para este artículo, usaré Redis como backend de resultados y RabbitMQ como manejador de colas de mensajes.  Estarán instalados en la máquina en la que se ejecutará el scheduler y el webserver de airflow, sin embargo pueden estar en cualquier máquina que sea alcanzable por los nodos.

sudo apt install redis-server
sudo apt install rabbitmq-server

#Crear un usuario y un vhost para la cola de rabbit, habilitar el plugin para el monitoreo (requerido si se va a usar flower)

sudo rabbitmqctl add_user airflow airflow
sudo rabbitmqctl add_vhost airflow
sudo rabbitmqctl set_user_tags airflow airflow_tag administrator
sudo rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"
sudo rabbitmq-plugins enable rabbitmq_management

conda install redis-py
conda install -c conda-forge "celery<4" flower

Modificamos la configuración de celery para que use el ejecutor de celery, para que use redis server como backend de resultados y rabbitMQ como broker.

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
executor = CeleryExecutor


# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
broker_url = amqp://airflow:airflow@10.0.2.15/airflow

# Another key Celery setting
celery_result_backend = redis://10.0.2.15:6379/0

En todas las máquinas que tendrán workers de celery debe estar instalado airflow, celery y redis-py.

conda install redis-py
conda install -c conda-forge "airflow<1.9" "celery<4"

También debemos tener un mecanismo para sincronizar las carpetas dags y plugins de airflow, la opción más sencilla es usar un punto de montaje NFS (la ubicación de estos directorios se especifica en airflow.cfg). Es importante notar que el ambiente de python también debe mantenerse homogéneo.

Una vez tenemos todo configurado ejecutamos en la máquina maestra el webserver y el scheduler, y en los nodos que ejecutarán el trabajo airflow worker.


#En la máquina maestra
airflow webserver -p 8080 -D
airflow scheduler -D
#opcional:

airflow flower -D


#En cada nodo
airflow worker -D

Podemos resumir el proceso en dos scripts, uno para la máquina maestra (que en este caso también tiene un worker) y otro para los workers:


#!/bin/bash
USUARIO_SO="$(whoami)"
ANACONDA_URL="https://repo.anaconda.com/archive/Anaconda3-5.2.0-Linux-x86_64.sh"
_DB_PASSWORD="la contraseña"
_IP=$(hostname -I | cut -d' ' -f1)
while getopts "a:p:h" opt; do
case $opt in
a) ANACONDA_URL="$OPTARG";;
p) _DB_PASSWORD="$OPTARG";;
h) cat <<EOF
All arguments are optional
-a anaconda url
-p password for airflow postgres user
-h this help
EOF
exit 0;
;;
\?) echo "Invalid option -$OPTARG" >&2
;;
esac
done
echo "Installation will be performed as $USUARIO_SO"
if [[ $(id -u) -eq 0 ]] ; then echo "This script must not be excecuted as root or using sudo(althougth the user must be sudoer and password will be asked in some steps)" ; exit 1 ; fi
#Prerequisites installation:
while sudo fuser /var/{lib/{dpkg,apt/lists},cache/apt/archives}/lock >/dev/null 2>&1; do
echo "Waiting while other process ends installs (dpkg/lock is locked)"
sleep 1
done
sudo apt update && sudo apt upgrade -y
sudo apt install -y openssh-server git wget htop postgresql postgresql-client postgresql-contrib redis-server rabbitmq-server
if ! hash conda &> /dev/null; then
mkdir -p ~/instaladores && wget -c -P "$HOME/instaladores" "$ANACONDA_URL"
bash "$HOME/instaladores/${ANACONDA_URL##*/}" -b -p "$HOME/anaconda2"
export PATH="$HOME/anaconda2/bin:$PATH"
echo "export PATH='$HOME/anaconda2/bin:$PATH'">>"$HOME/.bashrc"
fi
conda install -y psycopg2 redis-py
conda install -y -c conda-forge "airflow<1.9" "celery<4" flower
if [[ -z "${AIRFLOW_HOME}" ]]; then
export AIRFLOW_HOME="$HOME/airflow"
echo "export AIRFLOW_HOME='$HOME/airflow'" >>"$HOME/.bashrc"
fi
airflow initdb
sudo -u postgres createdb airflow
sudo -u postgres createuser airflow
sudo -u postgres psql airflow -c "alter user airflow with encrypted password '$_DB_PASSWORD';"
sudo -u postgres psql airflow -c "grant all privileges on database airflow to airflow;"
#Configurar postgresql para que admita conexiones remotas
_HBA=$(sudo -u postgres psql -t -P format=unaligned -c 'show hba_file')
_CONFIG=$(sudo -u postgres psql -t -P format=unaligned -c 'show config_file')
mkdir -p "$HOME/pg_backup"
sudo cp "$_HBA" "$HOME/pg_backup"
cp "$_CONFIG" "$HOME/pg_backup"
sudo su -c "echo 'host all all 0.0.0.0/0 md5' >>$_HBA"
sudo sed -i "s/#listen_addresses = 'localhost'/listen_addresses = '*'/" "$_CONFIG"
sudo systemctl restart postgresql.service
sed -i "s%sql_alchemy_conn.*%sql_alchemy_conn = postgresql+psycopg2://airflow:$_DB_PASSWORD@$_IP:5432/airflow%" "$AIRFLOW_HOME/airflow.cfg"
sed -i "s%executor =.*%executor = CeleryExecutor%" "$AIRFLOW_HOME/airflow.cfg"
sed -i "s%broker_url =.*%broker_url = amqp://airflow:airflow@$_IP/airflow%" "$AIRFLOW_HOME/airflow.cfg"
sed -i "s%celery_result_backend =.*%celery_result_backend = redis://$_IP:6379/0%" "$AIRFLOW_HOME/airflow.cfg"
sudo sed -i "s%bind .*%bind $_IP%" "/etc/redis/redis.conf"
sudo service redis restart
sudo rabbitmqctl add_user airflow airflow
sudo rabbitmqctl add_vhost airflow
sudo rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"
sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmqctl set_user_tags airflow airflow_tag administrator
sudo service rabbitmq-server restart
sudo apt-get install -y nfs-kernel-server
sudo mkdir -p /nfs-share/{dags,plugins}
sudo chown -R "$USUARIO_SO" /nfs-share
sudo sh -c "echo '/nfs-share *(ro,sync,no_subtree_check)'>> /etc/exports"
sudo service nfs-kernel-server restart
ln -s /nfs-share/dags "$AIRFLOW_HOME/dags"
ln -s /nfs-share/plugins "$AIRFLOW_HOME/plugins"
cat <<EOF >"$AIRFLOW_HOME/dags/dummy.py"
import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import timedelta
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_dummy', default_args=args,
schedule_interval=None,
dagrun_timeout=timedelta(minutes=1))
run_this_last = DummyOperator(task_id='DOES_NOTHING', dag=dag)
EOF
echo "You must open the appropriated ports to allow workers to connect to postgres, redis, rabbit and nfs"
airflow initdb
airflow scheduler -D
airflow webserver -p 8080 -D
airflow worker -D
echo "you should do a source .bashrc to reload the environment variables"


#!/bin/bash
USUARIO_SO="$(whoami)"
ANACONDA_URL="https://repo.anaconda.com/archive/Anaconda3-5.2.0-Linux-x86_64.sh"
_DB_PASSWORD="la contraseña"
while getopts "a:p:h" opt; do
case $opt in
a) ANACONDA_URL="$OPTARG";;
p) _DB_PASSWORD="$OPTARG";;
h) cat <<EOF
Use AirflowWorker.sh [options] master_address
All arguments are optional
-a anaconda url
-p password for airflow postgres user
-h this help
EOF
exit 0;
;;
\?) echo "Invalid option -$OPTARG" >&2
;;
esac
done
_IP=${@:$OPTIND:1}
if [ -z "$_IP" ]
then
echo "You must provide the master address"
exit 1
fi
echo "Installation will be performed as $USUARIO_SO"
if [[ $(id -u) -eq 0 ]] ; then echo "This script must not be excecuted as root or using sudo(althougth the user must be sudoer and password will be asked in some steps)" ; exit 1 ; fi
#Prerequisites installation:
while sudo fuser /var/{lib/{dpkg,apt/lists},cache/apt/archives}/lock >/dev/null 2>&1; do
echo "Waiting while other process ends installs (dpkg/lock is locked)"
sleep 1
done
sudo apt update && sudo apt upgrade -y
sudo apt install -y openssh-server git wget htop
if ! hash conda &> /dev/null; then
mkdir -p ~/instaladores && wget -c -P "$HOME/instaladores" "$ANACONDA_URL"
bash "$HOME/instaladores/${ANACONDA_URL##*/}" -b -p "$HOME/anaconda2"
export PATH="$HOME/anaconda2/bin:$PATH"
echo "export PATH='$HOME/anaconda2/bin:$PATH'">>"$HOME/.bashrc"
fi
conda install -y psycopg2 redis-py
conda install -y -c conda-forge "airflow<1.9" "celery<4"
if [[ -z "${AIRFLOW_HOME}" ]]; then
export AIRFLOW_HOME="$HOME/airflow"
echo "export AIRFLOW_HOME='$HOME/airflow'" >>"$HOME/.bashrc"
fi
airflow initdb
#Configurar postgresql para que admita conexiones remotas
sed -i "s%sql_alchemy_conn.*%sql_alchemy_conn = postgresql+psycopg2://airflow:$_DB_PASSWORD@$_IP:5432/airflow%" "$AIRFLOW_HOME/airflow.cfg"
sed -i "s%executor =.*%executor = CeleryExecutor%" "$AIRFLOW_HOME/airflow.cfg"
sed -i "s%broker_url =.*%broker_url = amqp://airflow:airflow@$_IP/airflow%" "$AIRFLOW_HOME/airflow.cfg"
sed -i "s%celery_result_backend =.*%celery_result_backend = redis://$_IP:6379/0%" "$AIRFLOW_HOME/airflow.cfg"
sudo apt-get install -y nfs-common
sudo mkdir -p /nfs-share/
sudo chown -R "$USUARIO_SO" /nfs-share
sudo sh -c "echo '$_IP:/nfs-share /nfs-share nfs auto,user,exec,ro,async,atime 0 0'>> /etc/fstab"
sudo mount -a
ln -s /nfs-share/dags "$AIRFLOW_HOME/dags"
ln -s /nfs-share/plugins "$AIRFLOW_HOME/plugins"
airflow worker -D

 

Para monitorear los workers de celery podemos usar flower, ejecutándolo con airflow flower -D y accediendo por el puerto 5555.

Definición de un Workflow

Una vez tenemos instalado airflow, lo siguiente que queremos hacer es definir un workflow. Airflow soporta únicamente grafos dirigidos acíclicos, lo que permite asegurar que, si todas las tareas terminan correctamente, el workflow termina (no hay ciclos infinitos). Los archivos de definición de DAGs en airflow son scripts de python.

El scheduler de airflow evalúa (ejecuta) periódicamente los archivos de la carpeta de DAGs, identificando cambios en la definición de los mismos. Es por esta razón que cada archivo de definición debe poder ejecutarse en segundos, de otro modo afectaría el rendimiento general de airflow.

En un archivo de definición de DAG se crea un objeto DAG y un conjunto de operadores que tienen dependencias entre ellos. El DAG debe tener un id, y puede definir una programación (cada cuánto debe ejecutarse), un timeout, una fecha de inicio, argumentos por defecto (que serán pasados a todas las tareas), y documentación. El DAG agrupa operadores.

Un operador describe una tarea. Se pueden agrupar en 3 categorías, según su objetivo: Operadores que ejecutan una acción, que transfieren datos, o sensores. Airflow tiene operadores que pueden cubrir la mayoría de necesidades, como BashOperator, PythonOperator, PostgresOperatorHTTPOperator`, entre otros.

Un ejemplo de workflow, que muestra el manejo de las dependencias, es:


import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

from datetime import timedelta

def print_value(ds, value):
    print(value)
#Argumentos por defecto. 
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
#Definición del DAG
dag = DAG(
dag_id='test_print_values', default_args=args,
schedule_interval=None,
dagrun_timeout=timedelta(minutes=5))

#Tareas Dummy que pertenecen al DAG
start_with = DummyOperator(task_id='PRIMERA_TAREA', dag=dag)

end_with = DummyOperator(task_id='TAREA_FINAL', dag=dag)

#Crear 5 Tareas que se ejecutarán después de start_with y antes de end_with
#Se pueden usar los operadores >> y << para expresar las dependencias. for i in range(5): _po=PythonOperator(task_id="echo_{}".format(i), python_callable=print_value, op_kargs={'value':i}, dag=dag ) start_with>>_po>>end_with

Esta definición genera el workflow de la siguiente imagen:

Y al ejecutar el DAG se puede ver el progreso, donde se muestra que la tarea final sólo se ejecuta una vez todas las tareas anteriores han terminado:

 

Por defecto, los nuevos dags se crean pausados, no se ejecutarán hasta que se activen por la interfaz web o usando la interfaz de línea de comando. Un DAG que no tenga schedule_interval (o sea None), sólo se ejecutará al lanzarlo explícitamente desde la web o usando trigger_dag.


airflow trigger_dag test_print_values

Definición de un plugin

 

Aunque los operadores incluidos en Airflow son bastante genéricos, muchas veces necesitamos utilidades adicionales, nuevos tipos de operadores, o vistas web. Para esto, Airflow permite definir plugins. Un plugin se define con un módulo de python en la carpeta de plugins. Un plugin se define con una clase extiende airflow.plugins_manager.AirflowPlugin y define conjuntos de operadores, ejecutores, vistas, “hooks“, macros y links de menú.

Cada uno de esos elementos se define como una clase que extiende una base. Por ejemplo, un operador se define como una clase que extiende airflow.models.BaseOperator y un hook a airflow.hooks.base_hook.

Por ejemplo, podríamos definir un plugin con un operador que reciba una carpeta y genere un archivo tar.gz con la salida. Para eso definimos el operador, extendiendo de BaseOperator, con su constructor y el método execute, que es el que realiza el trabajo.

import tarfile
import os
from airflow.models import BaseOperator
from airflow import utils as airflow_utils

class TarGzOperator(BaseOperator):
    @airflow_utils.apply_defaults
    def __init__(self, to_compress, output_folder, *args,**kwargs):
        super().__init__(*args, **kwargs)
        if to_compress is None or output_folder is None: 
            raise Exception("You must specify the input and output folder")
        if to_compress == output_folder:
            raise Exception("to compress folder cannot be the output folder")
        if os.path.exists(output_folder) and not os.path.isdir(output_folder):
            raise Exception("output_folder must be a folder")
        self.to_compress = to_compress
        self.output_folder = output_folder
        
    
    def execute(self, context):
        print(context)
        _filename= os.path.basename(os.path.normpath(self.to_compress))
        _output_file=os.path.join(self.output_folder, _filename+str(".tar.gz") )
        if not os.path.exists(self.output_folder):
            os.makedirs(self.output_folder)
        with tarfile.open(_output_file,"w:gz" ) as _tar:
            _tar.add(self.to_compress)
            
        
        return _output_file

Luego es necesario definir el plugin :


from airflow.plugins_manager import AirflowPlugin

#si la definición del operador se hace en un archivo diferente,
# sería necesario hacer el import

class TarGzPlugin(AirflowPlugin):
    name = 'targz_plugin'
    operators=[TarGzOperator]

Al publicar el archivo en la carpeta plugins de airflow, y reiniciar tanto el webserver como el el scheduler, el nuevo plugin está disponible. Se pueden definir DAGs que lo usen:

import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import TarGzOperator

from datetime import timedelta

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='test_plugin', default_args=args,
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=5))

dummy = DummyOperator(task_id='DOES_NOTHING', dag=dag)

tarBackups = TarGzOperator(to_compress='/home/cronosnull/airflow',output_folder='/home/cronosnull',task_id='targz',dag=dag)

dummy >> tarBackups

Al habilitar y ejecutar el DAG desde la interfaz web se generará el archivo tar.gz esperado y se puede ver que el valor retornado por el operador queda registrado en XCom (lo que permitirá usarlo desde una siguiente tarea).

Airflow es una herramienta extensible, que se adapta a escenarios diversos. Espero que este artículo sea útil para quien está empezando a explorarla, le ahorre algo de tiempo y le despierte nuevas dudas -mientras lo escribía resolví algunas-.