From Airflow to Argo Workflows and dbt Python models

This post is part of the Merpay & Mercoin Advent Calendar 2024, brought to you by @Yani from the Merpay Data Management team.

This article describes Merpay’s journey of migrating from Airflow to Argo Workflows and dbt, and the considerations that went into this choice. We will start with an introduction to each tool and the migration criteria that were evaluated, followed by a note clarifying some important terminology. Finally, we will close with a blueprint for such a migration, rounding up the best practices and common pitfalls we gathered through our own experience.

Tool introduction

Apache Airflow

Apache Airflow is an open-source platform to programmatically author, schedule, and monitor workflows. Its main strength lies in its ability to define workflows as code, allowing for dynamic pipeline generation, testing, and versioning. It also supports a wide range of operators for tasks, further enhancing its flexibility.

Argo Workflows

Argo Workflows is an open-source, container-native workflow engine for orchestrating parallel jobs on Kubernetes. It supports workflow templates allowing users to define reusable workflow steps and to orchestrate complex jobs that require parallel execution and conditional branching.

dbt

dbt (Data Build Tool) is a data transformation tool that can be used to collaborate on data models. Users can modularize their SQL queries, test and document them before deploying them to production, with auto-generated data lineage which simplifies impact analysis and debugging. dbt compiles and runs queries against specific data warehouses such as BigQuery on Google Cloud Platform (GCP).

dbt SQL models are representations of tables or views. Models read in dbt sources or other models, apply a series of transformations, and return transformed datasets in the form of a SQL SELECT statement. dbt arranges models in a dependency graph and ensures that upstream models are executed before downstream models.

dbt Python models can help you solve use cases that can’t be solved with SQL. They have all the same capabilities around testing, documentation, and lineage. On GCP, Python models are executed via Dataproc, which uses PySpark as the processing framework. PySpark is an expressive and flexible API compatible with other popular libraries (e.g. pandas, numpy, scikit-learn).
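
For reference, a dbt Python model is a Python file that defines a single model() function and returns a DataFrame, which dbt then materializes. A minimal sketch on GCP (Dataproc/PySpark) might look like the following, where stg_orders is a hypothetical upstream model used purely for illustration:

# models/marts/customer_order_counts.py -- minimal dbt Python model sketch
def model(dbt, session):
    dbt.config(materialized="table")              # dbt manages the materialization
    orders = dbt.ref("stg_orders")                # upstream model, returned as a PySpark DataFrame
    return orders.groupBy("customer_id").count()  # dbt writes this result to the warehouse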

Migration Criteria

Airflow to Argo Workflows for workflow orchestration

  1. Architecture
    Apache Airflow operates as a standalone application, which means that managing resources and scaling can be more of a challenge.
    Argo Workflows is Kubernetes-native, meaning it’s designed to run on a Kubernetes cluster, which allows for easier scaling and resource management.

  2. Workflow Design
    Apache Airflow excels in its ability to define workflows as code, which allows for dynamic pipeline generation, versioning, and testing.
    Argo Workflows supports complex workflows with loops, recursion, and conditional logic. Workflows are configured in the native language of Kubernetes: YAML. There is also a Python software development kit (SDK) called Hera, which can streamline workflow code with less boilerplate and features like code completion (a minimal example follows this list).

  3. Scheduling
    Apache Airflow uses its own scheduler, which means that the performance and reliability of the scheduler are dependent on the resources of the machine where Airflow is installed.
    Argo Workflows schedules workflows with CronWorkflows, the workflow-level equivalent of Kubernetes CronJobs, leveraging the power of Kubernetes for resource management and reliability.

  4. User Interface
    Apache Airflow offers a robust and interactive user interface (UI) which allows users to monitor workflows in real-time, view logs, and even rerun tasks directly from the interface, thus supporting quick and easy debugging.
    Argo Workflows provides a straightforward and clean interface for viewing and managing workflows. It may not be as feature-rich as Airflow’s UI, but there is a lot of active development around it.

  5. Community and Support
    Apache Airflow has been around for longer, so it has a larger community and more extensive documentation.
    Argo Workflows has a rapidly growing user base with a very active community, and the documentation is improving and expanding rapidly.
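
As a taste of point 2, here is a minimal Hera sketch, adapted from its hello-world pattern; the workflow and step names are illustrative, and actually submitting the workflow additionally requires pointing Hera at your Argo Workflows server:

from hera.workflows import Steps, Workflow, script

@script()
def echo(message: str):
    # Runs as a containerized step on the Argo Workflows cluster.
    print(message)

with Workflow(generate_name="hello-", entrypoint="steps") as w:
    with Steps(name="steps"):
        echo(arguments={"message": "Hello from Hera!"})

# Render the equivalent Argo Workflows YAML instead of writing it by hand.
print(w.to_yaml())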

In the table below, the characteristics of each tool are evaluated as positive or negative in the context of Merpay’s requirements and overall environment.

                       Apache Airflow    Argo Workflows
Architecture                                   +
Workflow Design              +
Scheduling                                     +
User Interface               +                 +
Community and Support        +                 +

Airflow to dbt for task definition

  1. Purpose
    Apache Airflow can be used to define complete ETL workflows as well as workflows with arbitrary scripted tasks interacting with a variety of systems.
    dbt focuses only on data transformations and modeling, while interacting with a single data warehouse or database.

  2. Dependency Management
    Apache Airflow supports dependency management through explicit task and workflow dependencies.
    dbt offers built-in dependency management by automatically building dependency graphs based on the connectivity between models, and ensures that transformations are executed in the correct sequence.

  3. Language
    Apache Airflow was developed in Python, so it offers the full flexibility of the language.
    dbt is mainly SQL-based and has secondary support for defining models in Python.

  4. Learning Curve
    Apache Airflow can be more daunting for users without prior experience in Python or understanding of basic Airflow-specific concepts.
    dbt reduces the learning curve by allowing users to define transformations in a common language like SQL and to manage boilerplate logic (such as materializations) through simple configuration parameters.

dbt has been the tool of choice for SQL-based data transformations in Merpay for a while, so after the migration from Airflow to Argo Workflows, we wanted to explore the feasibility of using dbt Python models for some of our workflows.

Terminology note

  • Directed Acyclic Graph (DAG)
    In Airflow, DAGs represent a collection of tasks, where each node is a task and each edge is a dependency between two tasks.
    In Spark, a DAG represents a logical execution plan of computation, where each node is a transformation and the edges show the flow between computations.

  • Task
    In Airflow, a task is the basic unit of work and parallelism: it performs a specific action and can have upstream and downstream dependencies.
    In Spark, a task is also the unit of work, but it exists within the broader context of a Spark job. Jobs are represented by DAGs and are split into stages, which are ultimately collections of tasks.
    However, in Spark the unit of parallelism is a partition, which is a logical chunk of data in a Resilient Distributed Dataset (RDD). Each partition is processed independently by a single task, performing the same computation on that specific chunk of data.

The key distinction is that Spark’s parallelism is rooted in data partitioning, whereas Airflow’s parallelism revolves around task orchestration. On the surface this might seem like a slight difference, but in reality it can have big implications.
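
As a toy illustration of this difference, the number of parallel tasks in a Spark stage follows directly from how the data is partitioned rather than from any task definitions:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.range(1_000_000).repartition(8)  # one dataset split into 8 partitions
print(df.rdd.getNumPartitions())            # 8

# The computation below runs as 8 parallel tasks, one per partition,
# without any per-task definitions as there would be in an Airflow DAG.
parity_counts = df.selectExpr("id % 2 AS parity").groupBy("parity").count()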

Migration process

Initially, our migration mostly involved workflows that catalog company-wide data gathered from BigQuery’s INFORMATION_SCHEMA views, Data Catalog, and other GCP APIs.
The migration process was for the most part straightforward, but we gathered a few points that can serve as best practices, as well as a couple of common pitfalls.

Best practices

  • Express your starting data as DataFrames
  • Chain preparatory transformations
    • A good way to keep things clean is the transform() function
  • Repartition based on the target API’s quota and other limitations
    • Useful functions when managing partitions include agg(), groupBy(), keyBy() and partitionBy()
  • Interact with APIs at the partition level (see the sketch after this list)
    • Mostly use flatMap() or mapPartitions()
  • Manage the output schema explicitly
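
To make the last three practices concrete, here is a minimal sketch of the repartition-then-mapPartitions pattern; build_api_client() and REQUESTS_PER_PARTITION are hypothetical placeholders for your own API client and its quota:

from typing import Iterator

from pyspark.sql import DataFrame, Row

REQUESTS_PER_PARTITION = 100  # assumed quota: records one API session should handle

def enrich_partition(rows: Iterator[Row]) -> Iterator[Row]:
    # One API client per partition keeps connections and request rates under control.
    client = build_api_client()  # hypothetical helper
    for row in rows:
        yield Row(**row.asDict(), details=client.lookup(row["projectId"]))

def enrich_with_api(projects: DataFrame) -> DataFrame:
    partitions = max(1, projects.count() // REQUESTS_PER_PARTITION)
    return (
        projects
        .repartition(partitions)              # size partitions to the API quota
        .rdd.mapPartitions(enrich_partition)  # interact with the API per partition
        .toDF()                               # in practice, pass an explicit schema here
    )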

Common pitfalls

  • RDDs fail as a whole
    • In contrast to Airflow tasks, it’s harder to gather partial results (a mitigation is sketched after this list)
  • Incremental tables are not supported by Python models
    • Use a Python model for the latest table and a SQL model for the incremental
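
For the first pitfall, one mitigation is to catch per-record errors inside the partition function and emit them as rows, so that a single bad record does not fail the whole RDD; call_api() and the error column are illustrative:

from typing import Iterator

from pyspark.sql import Row

def enrich_partition_safely(rows: Iterator[Row]) -> Iterator[Row]:
    for row in rows:
        try:
            yield Row(**row.asDict(), details=call_api(row["projectId"]), error=None)
        except Exception as e:  # keep partial results instead of failing the whole RDD
            yield Row(**row.asDict(), details=None, error=str(e))

# Downstream, successes and failures can then be separated with a simple filter,
# e.g. df.where("error IS NULL"), instead of losing the entire run.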

Example code

The following example is the complete code for a Python model used to collect information about all GCP projects and augment it with a column indicating which projects have BigQuery enabled.

from time import sleep
from typing import Iterator

import google.auth
from google.auth.impersonated_credentials import Credentials
from googleapiclient import discovery
from googleapiclient.errors import HttpError
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import length, col, lit, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType

def model(dbt, session: SparkSession) -> DataFrame:
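    # dbt entry point: dbt calls model() and materializes the returned DataFrame as a table.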
    dbt.config(materialized="table")
    service_account = dbt.config.get("service_account")

    all_projects = get_projects_from_resource_manager(session, service_account)
    bigquery_projects = get_projects_from_bigquery(session, service_account)

    return (
        all_projects
        .transform(exclude_temp_projects)
        .transform(
            add_bigquery_enabled_column,
            bigquery_projects=bigquery_projects
        )
        .transform(finalize_dataframe)
    )

def get_projects_from_resource_manager(
        session: SparkSession,
        target_principal: str
) -> DataFrame:
    projects = list_projects_from_resource_manager(target_principal)

    schema = StructType([
        StructField("projectId", StringType()),
        StructField("projectNumber", StringType()),
        StructField("lifecycleState", StringType()),
        StructField("labels", StructType([
            StructField("data_bank_card_info", StringType()),
            StructField("data_credit_card_info", StringType()),
            StructField("data_personal_identifiable_info", StringType()),
            StructField("service_corporation", StringType()),
            StructField("service_country", StringType()),
        ])),
        StructField("parent", StructType([
            StructField("type", StringType()),
            StructField("id", StringType()),
        ])),
        StructField("createTime", StringType()),
    ])

    return session.createDataFrame(projects, schema)

def get_projects_from_bigquery(
        session: SparkSession,
        target_principal: str
) -> DataFrame:
    projects = list_projects_from_bigquery(target_principal)

    return session.createDataFrame(projects)

def exclude_temp_projects(projects: DataFrame) -> DataFrame:
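    # Drop auto-generated temporary projects: 30-character IDs that start with "sys-"
    # followed by a digit-based suffix.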
    project = col("projectId")

    return projects.where(~(
        project.startswith("sys-")
        & (length(project) == 30)
        & (project.substr(5, 26).rlike(r"(\d+)"))
    ))

def add_bigquery_enabled_column(
        all_projects: DataFrame,
        bigquery_projects: DataFrame
) -> DataFrame:
    return (
        all_projects
        .join(
            bigquery_projects.withColumn("bigqueryEnabled", lit(True)),
            "projectId",
            "left_outer"
        )
        .fillna(False, "bigqueryEnabled")
    )

def finalize_dataframe(df: DataFrame) -> DataFrame:
    return (
        df
        .withColumn("createTime", to_timestamp("createTime"))
    )

def list_projects_from_resource_manager(target_principal: str) -> Iterator[dict]:
    credentials = get_impersonated_credentials(target_principal)
    service = discovery.build(
        "cloudresourcemanager",
        "v1",
        credentials=credentials,
        cache_discovery=False
    )

    request = service.projects().list()

    while request is not None:
        response = request.execute()

        for project in response.get("projects", []):
            yield project

        request = service.projects().list_next(
            previous_request=request,
            previous_response=response
        )

def list_projects_from_bigquery(target_principal: str) -> Iterator[dict]:
    credentials = get_impersonated_credentials(target_principal)
    service = discovery.build(
        "bigquery",
        "v2",
        credentials=credentials,
        cache_discovery=False
    )

    request = service.projects().list()

    while request is not None:
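        # Retry the same page after a short pause when the API reports quota exhaustion.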
        try:
            response = request.execute()
        except HttpError as e:
            if 403 == e.status_code and "Quota exceeded" in e.reason:
                print(f"Error while listing projects: {e.reason}")
                sleep(1)
                continue
            else:
                raise e

        for project in response.get("projects", []):
            yield {"projectId": project["projectReference"]["projectId"]}

        request = service.projects().list_next(
            previous_request=request,
            previous_response=response
        )

def get_impersonated_credentials(target_principal: str) -> Credentials:
    scopes = ("https://www.googleapis.com/auth/cloud-platform",)
    source_credentials, _ = google.auth.default(scopes)
    return Credentials(source_credentials, target_principal, scopes)

Conclusion

In this article we discussed the criteria that led us to migrate from Apache Airflow to Argo Workflows and dbt Python models. More importantly, we pointed out some key differences regarding the units of work and parallelism between these tools, and laid out a blueprint for such a migration with our best practices and the common pitfalls we observed.

We hope this helps you on your own journey, and see you tomorrow for the next article!
