Airflow + nós virtuais no Kubernets, tudo o que um DS precisa! (parte 3: Airflow)

When all the senses are synchronized, the soul emerges
Adolfo Bioy Casares, The Invention of Morel

Olá comunidade, neste post vamos comentar um pouco sobre o Airflow em si. Hoje Airflow é uma ferramenta muito utilizada para o gerenciamento de tarefas que devem ser executadas de forma sequencial que apresentam dependências. O termo técnico para esse tipo de estrutura é Grafos Direcionais Acíclicos (DAGs)… parece algo complicado, mas como a maior parte das coisas em matemática (quem sabe da vida) é possível decompor em pedaços menores para tornar mais fácil o entendimento.

Grafos são estruturas que apresentam nós e arestas, acredito que todos nós já nos deparamos com o tipo de estrutura alguma vez em nossas vidas, mas não demos um nome fancy para ela. Grafos são uma estrutura muito poderosas que normalmente vemos como redes, diversos elementos podem ser descritos por grafos: redes de computadores, conexões em redes sociais, malhas logísticas, entre outros.

Um grafo não dirigido e cíclico (fonte: Wikipedia).

Grafos dirigidos implicam que as relações entre os nós é ordenada e segue em uma direção. É possível pensar em uma rede social, quando uma pessoa é amiga de outra isso implica em uma relação bi-direcional, é uma aresta não dirigida. Isso contrasta quando você segue alguém, a informação passa a fluir em uma única direção… seria como se eu seguisse a Beyonce com os seus 279,2 milhões de seguidores, infelizmente eu seria só mais um na multidão. Eu veria seus posts, mas ela não receberia em sua timeline a minha postagem sobre o deploy de Airflow usando AKS e nós virtuais (o que é uma pena tenho certeza que se interessaria por Data Science após essa publicação da mais fina qualidade).

O mesmo grafo mais agora dirigido. No caso o 1 recebe informações do nó 5 e 2, mas não está nem ai para eles. Modificado a lá homo ecce do Wikipedia

A questão é que o grafo acima é cíclico, isso pode ser visto pelas conexões dos nós 4, 5, 3 e 2, nesse caso a informação pode ficar girando nesse caminho de forma infinita. Na orquestração de tarefas isso não faz sentido, pois ficariam rodando sem parar. Por isso falamos de grafos direcionais e acíclicos, eles funcionam como o poema de Carlos Drummond de Andrade.

João amava Teresa que amava Raimundo
que amava Maria que amava Joaquim que amava Lili
que não amava ninguém.

Trecho de Quadrilha, Carlos Drummond de Andrade

No trecho do poema temos um caso comum da vida amorosa de todos e um grafo acíclico. O amor não é correspondido por nenhum dos integrantes (direcional) e no fim Lili não ama ninguém (acíclico). O grafo acima em sua versão sofrência pode ser visto abaixo.

Grafo Direcional Acíclico, a informação é direcionada e não apresenta caminhos que voltam ao mesmo lugar. Mais uma modificação a lá homo ecce do Wikipedia

Com isso definimos o que é uma DAG, agora vamos seguir e conversar um pouco mais sobre o Airflow.

Informações gerais sobre Airflow

O Airflow apresenta diferente tipos de Executors, eles estabelecem a forma que as task são rodadas. O que vamos utilizar no deploy é o Cellery Executor, ele necessita que seja integrado ao deploy uma mensageria por isso vamos utilizar o RabbitMQ. Nessa forma de deploy o Airflow é separado em 3 tipos de containers distintos e necessita de um Postgres e mais um RabbitMQ para o correto deploy.

  • Webserver: Ele é responsável por servir o front-end web do Airflow, assim como as APIs que podem ser utilizadas para uma integração programática com outros processos.
  • Scheduler: Ele é responsável por monitorar o status das tarefas e chamar os demais passos de acordo com os resultados obtidos.
  • Worker: Reponsável por executar a tarefa, no caso do deploy proposto esse processo roda em um pod diferente do Webserver podendo ter mais de um pod worker para parelelizar a execução de tasks e DAGs independente.
Diagrama extraído do site do Airflow mostrando a organização geral. É possível verificar que o WebServer, Scheduler e Workers apresentam processo independentes que consomem as informações de um repositório contendo os códigos das DAGs.

Para que o Airflow funcione corretamente é necessário que os códigos das DAGs estejam disponíveis em todos os pods (Webserver, Scheduler e Workers). Existem diferentes formas de fazer isso: disco compartilhado, sync com alguma pasta no S3/Blob Storage ou utilização de um repositório git. Nós optamos pelo último (git) por facilitar o CICD dos códigos, sendo possível travar o pull request para a main no repositório (não se fala mais master) necessitando de algum code review ou coisa do tipo para dar o merge.

Para facilitar a integração com o git, foi criada uma imagem Docker personalizada contendo o Airflow. Nesta imagem existe um processo cron que é responsável por baixar os códigos do git e instalar os pacotes colocados em requirements/requirements.txt. Mais informações sobre a imagem, é possível verificar o repositório no github e um post anterior no blog da Murabei como este e este.

Criação de DAGs

As DAGs no Airflow são definidas por código Python que é “carregado” pelo Airflow mostrando através do Webserver. Abaixo colocamos um código mostrando com seria para rodar um PythonOperator, esse operador roda uma função Python como tarefa.

import time
import os
import random
from builtins import range
from airflow.operators.python import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint


seven_days_ago = datetime.combine(
datetime.today() - timedelta(7), datetime.min.time())

args = {
  'owner': 'airflow',
  'start_date': seven_days_ago,
}


# Função que vai ser rodada, o airflow passa como argumento uma séries de
# informações com o ID da task run, parâmetros que foram utilizadas para
# rodar a DAG, entre outras informações.
def my_sleeping_function_1(**kwargs):
  print("Kwargs que foram passados para função:", kwargs)
  sleep_time = random.random()*5
  print("Sleeping for", sleep_time, "seconds")
  time.sleep(sleep_time)


# Definição das informações básicas da DAG com ID único (dag_id), intervalo em que
# a DAG deve ser rodado (schedule_interval) e alguma tags que podem ser
# utilizadas para organizar as DAGs no frontend (tags)
with DAG(
  dag_id='example_python_operator__no_error', default_args=args,
  schedule_interval=None, tags=['python', 'tests']) as dag:
  
  # Define uma task, o task_id deve ser único dentro da DAG. A baixo são definidas
  # 4 tasks do tipo PythonOperator usando a mesma função. Poderiam ser criadas
  # tasks usando diferentes Operators ou mesmo chamando diferentes funções
  # Python.
  run_this_1 = PythonOperator(
    task_id='print_the_context_1__no_error',
    python_callable=my_sleeping_function_1,
    dag=dag)

  run_this_2 = PythonOperator(
    task_id='print_the_context_2__no_error',
    python_callable=my_sleeping_function_1,
    dag=dag)

  run_this_3 = PythonOperator(
    task_id='print_the_context_3__no_error',
    python_callable=my_sleeping_function_1,
    dag=dag)

  run_this_4 = PythonOperator(
    task_id='print_the_context_4__no_error',
    python_callable=my_sleeping_function_1,
    dag=dag)
  
  ######################################################
  # Define as dependências entre as diferentes tarefas #
  # A tarefa 1 vai rodar antes das 2 e 3
  run_this_1 >> run_this_2
  run_this_1 >> run_this_3
  # Sendo que a 2 e 3 devem ser rodadas antes da 4
  run_this_3 >> run_this_4
  run_this_2 >> run_this_4

Abaixo podemos ver como a DAG definida acima pode ser verificada no front-end do Airflow.

Captura de tela do frontend do Airflow. Em destaque a DAG definida através do código Python acima.
Captura de tela da descrição da DAG definida através do código Python acima.
Captura de tela do front-end do Airflow. Na aba codes é possível verificar o código no qual foi definido a DAG.
Captura de tela do frontend do Airflow. É possível verificar os logs da tarefa que foi rodada.

Utilização do KubernetsPodOperator com afinidades para nó virtual no AKS

No exemplo acima estamos utilizando o PythonOperator. Este é o operador mais simples para rodar códigos Python, ele apenas roda uma função que tenha sido definida no código que está sendo utilizado no Airflow. Esse tipo de operador roda no pod worker quando o deploy é feito como CeleryExecutor, dessa forma não apresenta escalonamento automático e não aproveita os nós virtuais do AKS.

Para fazer o deploy de tarefas utilizando containers, é possível usar o KubernetsPodOperator. Esse operador irá criar um pod que será responsável por rodar o código necessário, sendo que ao final dos procedimentos ele deve terminar liberando dessa forma o poder computacional solicitado. Abaixo podemos verificar a definição de uma imagem “Hello Word” para rodar uma tarefa em Python através de um pod.

Dockerfile:

# Define a imagem que será utilizada e
# a versão do sistema operacional o caso um
# Python 3.8 sobre um Debian Buster
FROM python:3.8-buster

# Define que todos os prints de um script Python
# serão direcionados para o stdout. Dessa forma
# é possível capturar eles nos logs
ENV PYTHONUNBUFFERED=0

# Atualiza o sistema e instala os drivers do Postgres
# e o curl para facilitar alguns debugs se necessário
RUN apt-get update
RUN apt-get install -y --force-yes postgresql-client
RUN apt-get install -y --force-yes curl

# Copia os requirements de Python que vão ser necessários
# para rodar os códigos da tarefa e instala os pacotes na imagem.
COPY requirements/requirements.txt ./
RUN pip3 install --upgrade pip
RUN pip3 install -r /requirements.txt

# Copia os códigos da tarefa
COPY code /code
WORKDIR /code/

# Definie que ao iniciar o container, será rodado um código
# bash de início
CMD ["bash", "/code/start_server.bash"]

code/start_server.bash

#!/bin/bash
# Acho que não precisa de nenhum comentário especial.
# Eventualmente é possível testar a disponibilidade de algum serviço
# antes de rodar o código python ou mesmo ajustar algumas variáveis
# de ambiente
echo "************************"
echo "Starting Test Task Image"
echo "************************"
python -u /code/app.py

Foram criadas duas imagens com o mesmo formato: test-task-image e test-task-image-error. A segunda é similar a primeira, mas dispara um erro após à 15 interações do loop, os códigos do app.py para ambas pode ser verificado abaixo:

test-task-image: code/app.py

import time

# Roda um loop simples printanto o index do
# loop para verificar se é possível capturar
# as informações nos logs.
print("## Start task")
for i in range(1, 30):
    print("Running loop:", i)
    time.sleep(1)

print("## End task")

test-task-image-error: code/app.py

import time

print("## Start task")
for i in range(1, 30):
    print("Running loop:", i)
    time.sleep(1)

    # Cria um raise arbitrário para verificar se o
    # Airflow é capaz de capturar o erro para fazer
    # o tratamento adequado.
    if 15 < i:
        raise Exception("Deu ruim na sua task")

print("## End task")

É possível verificar que a definição da imagem é extremamente simples. Uma das grandes vantagens é que cada tarefa (mesmo dentro da mesma DAG) pode ter requerimentos de pacotes diferentes entre sí, isso permite que novos processos possam utilizar as versões mais recentes dos pacotes sem que haja a necessidade de migrar todos os códigos legados. Essa colisão de dependencias ocorre na utilização do PythonOperator, visto que todos os códigos rodam utilizando os pods worker e compartilham os pacotes Python instalados, bem como mesma imagem do sistema operacional.

Segue abaixo uma DAG que utiliza o KubernetsPodOperator, mas não obriga que os pods sejam direcionados para o nó virtual.

import time
import random
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator)
from airflow.operators.python import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from kubernetes.client import models as k8s

seven_days_ago = datetime.combine(
    datetime.today() - timedelta(7), datetime.min.time())


args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}


def my_sleeping_function_1():
   """Sleep function."""
   sleep_time = random.random()*5
   print("Sleeping for", sleep_time, "seconds")
   time.sleep(sleep_time)


with DAG(dag_id='example_kubernets__private_repo', default_args=args,
         schedule_interval=None,
         start_date=datetime(2021, 1, 1),
         tags=['kubernetes', 'tests']) as dag:
    
    # Define a 1a tarefa utilizando a imagem test-task-image
    # que encontra-se em um repositório privado.
    kube_op_1 = KubernetesPodOperator(
        task_id="kube_op_1",
        name="kube_op_1",
        # Define o namespace onde será feito o deploy do pod.
        namespace='airflow-test',
        # Definie a image que será utilizada para gerar o pod.
        image="airflowk8stest.azurecr.io/airflow/test-task-image:0.0",
        # Os secrets necessários para rodar o pod.
        image_pull_secrets=[k8s.V1LocalObjectReference('dockercfg')],
        # labels que serão aplicados no Pod.
        labels={"foo": "bar"},
        # Define se após terminar o pod deverá ser removido.
        is_delete_operator_pod=True)
    
    # Define um Python Operator que será intercalado com os
    # KubernetsOperator na DAG. Os python operator rodam
    # sempre nos nós worker (sem escalonamento automático).
    run_this_1 = PythonOperator(
        task_id='print_the_context_1',
        python_callable=my_sleeping_function_1,
        dag=dag)
   
    # Segunda tarefa usando o KubernetsPodOperator, poderia ser
    # utilizada outras imagens para compor a DAG em cada uma
    # das tarefas.
    kube_op_2 = KubernetesPodOperator(
        task_id="kube_op_2",
        name="kube_op_2",
        namespace='airflow-test',
        image="airflowk8stest.azurecr.io/airflow/test-task-image:0.0",
        image_pull_secrets=[k8s.V1LocalObjectReference('dockercfg')],
        labels={"foo": "bar"},
        is_delete_operator_pod=True)
    
    # Python operator similar a 1a tarefa python
    run_this_2 = PythonOperator(
        task_id='print_the_context_2',
        python_callable=my_sleeping_function_1)
    
    # Última tarefa que causa um erro pemitindo verificar se o Airflow é
    # capaz de identificar que a tarefa deu erro mesmo dentro de um pod.
    kube_op_3 = KubernetesPodOperator(
        task_id="kube_op_3",
        name="kube_op_3",
        namespace='airflow-test',
        image="airflowk8stest.azurecr.io/airflow/test-task-image-error:0.0",
        image_pull_secrets=[k8s.V1LocalObjectReference('dockercfg')],
        labels={"foo": "bar"},
        is_delete_operator_pod=True)
     
     # Cria as dependências das tarefas na DAG
     kube_op_1 >> run_this_1
     kube_op_2 >> run_this_2
     run_this_1 >> kube_op_3
     run_this_2 >> kube_op_3

Aqui, da mesma forma que no PyhtonOperator, é possível verificar a DAG e os códigos que foram responsáveis por sua criação no frontend do Airflow. Contudo os códigos que compõe cada uma das imagens, que constituem as tarefas KubernetsPodOperator, não podem ser visualizados. De qualquer forma eles podem ser disponibilizados em repositórios git separados por exemplo.

Captura do frontend do Airflow. É possível verificar que o Airflow consegue identificar que o pod terminou em erro e trata corretamente o sinal.

Assim como no PythonOperator, é possível visualizar os logs através do frontend do Airflow.

Captura de tela do frontend do Airflow mostrando os logs da tarefa executada no Pod.

É possível verificar usando o kubectrl get pods que no momento que as tarefas 1 e 2 estão sendo executadas dois pods efêmeros são criados.

Captura dos resultado do kubectrl get pods mostrando a criação dos dois pods efêmeros.

De qualquer forma, ainda não estamos utilizando o nó virtual do AKS. Para que isso seja feito é necessário definir afinidades e tolerâncias no deploy dos containers através do Airflow. Para saber mais sobre afinidades e tolerâncias, dê uma olhada na documentação do Kubernets e do AKS .

import time
import random
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator)
from airflow.operators.python import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from kubernetes.client import models as k8s

seven_days_ago = datetime.combine(
    datetime.today() - timedelta(7), datetime.min.time())


args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}


##################################################
# Define afinidades para que o pod seja colocado #
# no nó virtual
match_expressions = [
    k8s.V1NodeSelectorRequirement(
        key="type", operator="In", values=['virtual-kubelet']),
    k8s.V1NodeSelectorRequirement(
        key="beta.kubernetes.io/os", operator="In", values=['linux']),
    k8s.V1NodeSelectorRequirement(
        key="kubernetes.io/role", operator="In", values=['agent']),
]

# Cria um objeto de afinidade que pode ser utilizado
# no deploy do Pod
virtual_affinity = k8s.V1Affinity(
    node_affinity=k8s.V1NodeAffinity(
        preferred_during_scheduling_ignored_during_execution=[
            k8s.V1PreferredSchedulingTerm(
                weight=1,
                preference=k8s.V1NodeSelectorTerm(
                    match_expressions=match_expressions
                )
            )
        ]
    )
)

# Cria uma tolerância para que consiga colocar o pod no nó virtual
# por definição o AKS não deixa colocar os pods por lá, por isso
# é necessário criar a tolerância mesmo setando a afinidade
virtual_tolerations = [
     k8s.V1Toleration(key="virtual-kubelet.io/provider", operator="Exists")
]
##################################################


def my_sleeping_function_1():
    """Sleep function."""
    sleep_time = random.random()*5
    print("Sleeping for", sleep_time, "seconds")
    time.sleep(sleep_time)


with DAG(dag_id='example_kubernets_error__private_repo', default_args=args,
         schedule_interval=None,
         start_date=datetime(2021, 1, 1),
         tags=['kubernetes', 'tests']) as dag:

    kube_op_1 = KubernetesPodOperator(
        task_id="kube_op_1",
        name="kube_op_1",
        namespace='airflow-test',
        image="airflowk8stest.azurecr.io/airflow/test-task-image:0.0",
        image_pull_secrets=[k8s.V1LocalObjectReference('dockercfg')],
        labels={"foo": "bar"},
        is_delete_operator_pod=True)

    run_this_1 = PythonOperator(
        task_id='print_the_context_1',
        python_callable=my_sleeping_function_1,
        dag=dag)

    kube_op_2 = KubernetesPodOperator(
         task_id="kube_op_2",
         name="kube_op_2",
         namespace='airflow-test',
         image="airflowk8stest.azurecr.io/airflow/test-task-image:0.0",
         image_pull_secrets=[k8s.V1LocalObjectReference('dockercfg')],
         labels={"foo": "bar"},
         is_delete_operator_pod=True)

    run_this_2 = PythonOperator(
         task_id='print_the_context_2',
         python_callable=my_sleeping_function_1,
         dag=dag)
    
    # Modificando a 3a task para que fosse direcionada para o
    # nó virtual do AKS
    kube_op_3 = KubernetesPodOperator(
        task_id="kube_op_3",
        name="kube_op_3",
        namespace='airflow-test',
        image="airflowk8stest.azurecr.io/airflow/test-task-image-error:0.0",
        image_pull_secrets=[k8s.V1LocalObjectReference('dockercfg')],
        labels={"foo": "bar"},

        # Essa opção foi setata para False para poder permitir "ver"
        # o container no nó virtual do AKS mesmo após o termino da task.
        # Em deploy "normais" é recomendável manter a opção como True
        # para não acumular containers "terminados".
        is_delete_operator_pod=False,
        
        # Aqui setando para o pods apresentar afinidade pelo nó
        # virtual do AKS.
        affinity=virtual_affinity,
        tolerations=virtual_tolerations)

kube_op_1 >> run_this_1
kube_op_2 >> run_this_2
run_this_1 >> kube_op_3
run_this_2 >> kube_op_3

A DAG com a tarefa com afinidade pelo nó virtual roda da mesma forma que as demais, só demora mais visto que o recurso é alocado no momento que a tarefa está sendo rodada. Ainda é possível recuperar os logs nos virtuais da mesma forma que nos nós “tradicionais”.

Dessa forma é possível rodar tarefas utilizando recursos que são alocados e desalocados de forma automática, isso centralizando o desenvolvimento utilizando containers docker e Python.

Próximos passos

Agora que já falamos do Airflow e temos a infraestrutura em pé só falta fazer o deploy. No próximo post vamos mostrar os arquivos YML que podem ser utilizados para criar os containers no AKS e expor os end-points para o consumo.

Nos vemos no próximo post!

Conheça um pouco mais do jeito Murabei acompanhando nosso perfil no Linkedin.  

 

Compartilhar