Using Apache Airflow with Domino

Overview


Data science projects often require multiple steps to go from raw data to useful data products. These steps tend to be sequential, and involve things like:

  • sourcing data
  • cleaning data
  • processing data
  • training models

Once you understand the steps necessary to deliver results from your work, it's useful to automate them as a repeatable pipeline. Domino has the ability to schedule Jobs, but for more complex pipelines you can pair Domino with an external scheduling system like Apache Airflow.

This article describes how to integrate Airflow with Domino using the python-domino package.

 

 

 

Domino and Airflow webinar


This video is a recording of a webinar held on February 21st, 2019. This webinar walks through a detailed example of integrating Airflow with Domino.

 

 Click here to see the slides from this presentation.

 

 

 

Getting started with Airflow


Airflow is an open source platform to author, schedule, and monitor pipelines of programmatic tasks. As a user, you can define pipelines with code and configure the Airflow scheduler to execute the underlying tasks. You can use the Airflow UI to visualize, monitor, and troubleshoot pipelines.

If you are new to Airflow, read the Airflow Quick Start to set up your own Airflow server.

There are many options for configuring your Airflow server. For pipelines that run tasks in parallel, you will need to use Airflow's LocalExecutor mode, which can execute multiple independent tasks at the same time. Airflow uses a database to keep records of all the tasks it executes and schedules, so you will need to install and configure a SQL database for LocalExecutor mode.
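
As a rough sketch of what LocalExecutor configuration involves, the Python snippet below builds the two relevant settings as environment-variable overrides. It assumes a PostgreSQL metadata database reached through the psycopg2 driver. The user, password, host, and database names are placeholders, and `local_executor_env` is a hypothetical helper, not part of Airflow. Airflow reads overrides named `AIRFLOW__{SECTION}__{KEY}`. The connection string lives under `[core]` in Airflow 1.10 and moved to `[database]` in newer 2.x releases, so check the option name for your version.

```python
from urllib.parse import quote_plus


def local_executor_env(user, password, host, database, port=5432, section="CORE"):
    """Return environment-variable overrides that select LocalExecutor.

    Hypothetical helper for illustration; all connection details are placeholders.
    """
    # Percent-encode the password so characters such as "@" or "/" do not break the URL.
    conn = "postgresql+psycopg2://{}:{}@{}:{}/{}".format(
        user, quote_plus(password), host, port, database
    )
    return {
        "AIRFLOW__CORE__EXECUTOR": "LocalExecutor",
        "AIRFLOW__{}__SQL_ALCHEMY_CONN".format(section): conn,
    }


if __name__ == "__main__":
    settings = local_executor_env("airflow", "change-me", "localhost", "airflow")
    for name, value in settings.items():
        print("export {}='{}'".format(name, value))
```

The same two values can instead be written to the `executor` and `sql_alchemy_conn` options in `airflow.cfg`.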

Read the following guide to learn more about setting up LocalExecutor mode:

For more information about scheduling and triggers, notifications, and pipeline monitoring, read the Airflow documentation.

 

 

 

Installing python-domino on your Airflow workers


To create Airflow tasks that work with Domino, you need to install python-domino on your Airflow workers. This library will enable you to add tasks in your pipeline code that interact with the Domino API to start Jobs.

Connect to your Airflow workers, and follow these steps to install and configure python-domino:

  1. Install python-domino with pip

    pip install git+https://github.com/dominodatalab/python-domino.git

  2. Set up an Airflow variable to point to the Domino host. This is the URL where you load the Domino application in your browser.

    Key: DOMINO_API_HOST
    Value: <your-domino-url>

  3. Set up an Airflow variable to store the user API key you want to use with Airflow. This is the user Airflow will authenticate to Domino as for the purpose of starting Jobs.

    Key: DOMINO_API_KEY
    Value: <your-api-key>
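
Variables can be created in the Airflow UI, and they can also be set from Python. The sketch below is one possible way to do steps 2 and 3 in code. `store_domino_settings` is a hypothetical helper, and the check that the host includes a scheme is an assumption based on the host being the URL you load in your browser. By default the helper calls Airflow's `Variable.set`; the `set_variable` parameter exists only so the logic can be tried without an Airflow installation.

```python
def store_domino_settings(host, api_key, set_variable=None):
    """Store the two Airflow variables that the pipeline code reads.

    Hypothetical helper; by default it writes through Airflow's Variable.set.
    """
    if set_variable is None:
        # Imported lazily so the helper can be exercised without Airflow installed.
        from airflow.models import Variable
        set_variable = Variable.set

    # Assumption: the host is the full browser URL, including the scheme.
    if not host.startswith(("http://", "https://")):
        raise ValueError("DOMINO_API_HOST should be a full URL starting with http:// or https://")
    if not api_key:
        raise ValueError("DOMINO_API_KEY must not be empty")

    set_variable("DOMINO_API_HOST", host)
    set_variable("DOMINO_API_KEY", api_key)
```

On a machine where Airflow is installed and configured, calling `store_domino_settings("<your-domino-url>", "<your-api-key>")` would write both variables to the Airflow metadata database.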

 

 

 

How Airflow tasks map to Domino Jobs


Airflow pipelines are defined with Python code. This fits in well with Domino’s code-first philosophy. You can use python-domino in your pipeline definitions to create tasks that start Jobs in Domino.

Architecturally, Airflow has its own server and worker nodes, and it operates as an independent service that sits outside of your Domino deployment. Airflow needs network connectivity to Domino so that its workers can access the Domino API to start Jobs in your Domino project. All the code that performs the actual work in each step of the pipeline (code that fetches data, cleans data, and trains data science models) is maintained and versioned in your Domino project. This way you have Domino’s reproducibility engine working together with Airflow’s scheduler.
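
The mapping can be pictured as one Airflow task per Domino Job, where each task is a Python callable that asks Domino to run one script from the project. The sketch below illustrates that call pattern under stated assumptions: `make_domino_callable` and `FakeDomino` are hypothetical names used only for illustration, and the fake client's return value is a placeholder, not the real API response. The only python-domino call it relies on is `runs_start_blocking(command=[...])`.

```python
def make_domino_callable(client, script_path):
    """Return a zero-argument callable that runs one script as a Domino Job.

    `client` is assumed to be a python-domino `Domino` object, or anything
    else that exposes `runs_start_blocking(command=[...])`.
    """
    def start_job():
        # One Airflow task maps to one Domino Job running one script.
        return client.runs_start_blocking(command=[script_path])

    return start_job


class FakeDomino:
    """Stand-in client that shows the call pattern without a Domino deployment."""

    def __init__(self):
        self.commands = []

    def runs_start_blocking(self, command):
        self.commands.append(command)
        return {"command": command}  # placeholder, not the real response shape


fake = FakeDomino()
task = make_domino_callable(fake, "src/data/get_dataset_1.py")
task()
print(fake.commands)
```

In a DAG definition, a callable like this would be passed as the `python_callable` of a `PythonOperator`, with a real `Domino` client in place of the fake one.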

 

Screen_Shot_2019-02-21_at_11.35.09_AM.png

 

 

 

Example pipeline


The following example assumes you have an Airflow server where you want to set up a pipeline of tasks that fetches data, cleans and processes data, performs an analysis, then generates a report. It also assumes you have all the code required to complete those tasks stored as scripts in a Domino project.

 

Screen_Shot_2019-02-20_at_4.32.16_PM.png

 

The example graph shown above is defined with Airflow and python-domino. The Airflow scheduler resolves the dependencies between tasks and runs each task as a Job in Domino. The pipeline fetches multiple datasets, trains models, and generates a final report.

See the commented script below for an example of how to configure an Airflow DAG to execute such a pipeline with Domino Jobs.

 

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from domino import Domino
from airflow.models import Variable

# Initialize the Domino API client for the project "<owner>/<project-name>",
# using the API key and host stored as Airflow variables
api_key = Variable.get("DOMINO_API_KEY")
host = Variable.get("DOMINO_API_HOST")
domino = Domino("sujaym/airflow-pipeline", api_key, host)

# Parameters to DAG object
default_args = {
    'owner': 'domino',
    'depends_on_past': False,
    'start_date': datetime(2019, 2, 7),
    'retries': 1,
    'retry_delay': timedelta(minutes=.5),
    'end_date': datetime(2019, 2, 10),
}

# Instantiate a DAG
dag = DAG(
    'domino_pipeline',
    description='Execute Airflow DAG in Domino',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

# Define Airflow tasks; each one starts a Job in Domino and waits for it to finish
t1 = PythonOperator(task_id='get_dataset_1', python_callable=domino.runs_start_blocking, dag=dag, op_kwargs={"command":["src/data/get_dataset_1.py"]})

t2 = PythonOperator(task_id='get_dataset_2', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/data/get_dataset_2.py"]}, dag=dag)

t3 = PythonOperator(task_id='get_dataset_3', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/models/get_dataset_3.sh"]}, dag=dag)

t4 = PythonOperator(task_id='clean_data', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/data/cleaning_data.py"]}, dag=dag)

t5 = PythonOperator(task_id='generate_features_1', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/features/word2vec_features.py"]}, dag=dag)

t6 = PythonOperator(task_id='run_model_1', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/models/run_model_1.py"]}, dag=dag)

t7 = PythonOperator(task_id='do_feature_engg', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/features/feature_eng.py"]}, dag=dag)

t8 = PythonOperator(task_id='run_model_2', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/models/run_model_2.py"]}, dag=dag)

t9 = PythonOperator(task_id='run_model_3', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/models/run_model_3.py"]}, dag=dag)

t10 = PythonOperator(task_id='run_final_report', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/report/report.sh"]}, dag=dag)

# Define your dependencies
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream(t2)
t5.set_upstream(t3)
t6.set_upstream([t4, t5])
t7.set_upstream(t4)
t8.set_upstream(t7)
t9.set_upstream(t7)
t10.set_upstream([t6, t8, t9])
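
To see what the `set_upstream` calls above imply for execution order, the following sketch restates the same dependencies as a plain dictionary and groups the tasks into stages whose members have no dependencies on each other. This is only an illustration in standard-library Python: `execution_stages` is a hypothetical helper, and Airflow's scheduler does not work in fixed stages. It starts each task as soon as that task's own upstream tasks have succeeded.

```python
# Task ids and upstream lists copied from the DAG definition above.
UPSTREAM = {
    "get_dataset_1": [],
    "get_dataset_2": ["get_dataset_1"],
    "get_dataset_3": ["get_dataset_1"],
    "clean_data": ["get_dataset_2"],
    "generate_features_1": ["get_dataset_3"],
    "run_model_1": ["clean_data", "generate_features_1"],
    "do_feature_engg": ["clean_data"],
    "run_model_2": ["do_feature_engg"],
    "run_model_3": ["do_feature_engg"],
    "run_final_report": ["run_model_1", "run_model_2", "run_model_3"],
}


def execution_stages(upstream):
    """Group tasks into stages; every task's upstream tasks sit in earlier stages."""
    remaining = {task: set(deps) for task, deps in upstream.items()}
    stages = []
    while remaining:
        # Tasks with no unfinished upstream tasks are ready to run.
        ready = sorted(task for task, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("dependency cycle detected")
        stages.append(ready)
        for task in ready:
            del remaining[task]
        for deps in remaining.values():
            deps.difference_update(ready)
    return stages


for number, stage in enumerate(execution_stages(UPSTREAM), start=1):
    print(number, ", ".join(stage))
```

Stages with more than one task, such as `get_dataset_2` and `get_dataset_3`, are the places where LocalExecutor mode lets Airflow run Domino Jobs in parallel.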