Getting Started with Apache Airflow: Orchestrating Workflows
Airflow is an open-source platform written in Python for programmatically authoring, scheduling, and monitoring workflows, where a workflow is the sequence of steps that make up a process. Within the tool, a workflow is defined as a Directed Acyclic Graph (DAG) of atomic tasks. Predefined operators are available to execute these tasks, and it is also quite simple to develop your own. You can find more information at https://airflow.apache.org/
🤓 Basic concepts
Components
- Webserver: a Flask application served through Gunicorn that provides the graphical interface.
- Scheduler: responsible for scheduling tasks, deciding what runs and when.
- Metadata database: the database where Airflow metadata is stored (information about DAGs, connections, variables, users, etc.).
- Executor: defines how and where tasks are executed.
Key concepts
- DAG: a graph of several tasks organized according to their dependencies.
- Operator: a template that represents a task in Airflow.
- Task: an instance of an Operator.
- Task Instance: a specific execution of a task at a point in time.
- Workflow: the combination of all of the above.
What’s a DAG?
It’s a group of tasks you want to execute, organized in a way that reflects their relationships and dependencies. It defines how a workflow will be executed.
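To make the graph structure concrete, here is a minimal sketch (with hypothetical task names, using placeholder DummyOperator tasks, not part of the example built later in this article) of a DAG in which one task fans out to two parallel tasks that both feed a final one:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

# Minimal sketch: four placeholder tasks arranged as a small graph.
with DAG(
    dag_id='example_graph',
    start_date=datetime(2021, 3, 1),
    schedule_interval=None,
) as dag:

    extract = DummyOperator(task_id='extract')
    transform_a = DummyOperator(task_id='transform_a')
    transform_b = DummyOperator(task_id='transform_b')
    load = DummyOperator(task_id='load')

    # extract runs first, the two transforms run in parallel, load runs last
    extract >> [transform_a, transform_b] >> load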
What’s an Operator?
It’s the template for a task in a workflow.
- Each operator represents a single task.
- When an operator is instantiated, a task is created.
Types of Operator
- Action: perform an action, e.g. PostgresOperator, MySqlOperator.
- Transfer: move data from one system to another, e.g. S3ToRedshift, BigQueryToGCS, MySqlToS3.
- Sensors: wait for a condition to be met, e.g. S3KeySensor, SqlSensor (a short sketch combining a sensor and an action operator follows this list).
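As a rough sketch of how these types combine: a sensor waits for a file to appear and an action operator then processes it. The file path here is a hypothetical example, both operators ship with Airflow's core (the sensor uses the default fs_default filesystem connection), and transfer operators such as MySqlToS3 follow the same pattern but live in provider packages.

from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id='operator_types_example',
    start_date=datetime(2021, 3, 1),
    schedule_interval=None,
) as dag:

    # Sensor: waits until the (hypothetical) file exists, checking every 30 seconds
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/tmp/input.csv',
        poke_interval=30,
    )

    # Action: runs a shell command once the sensor succeeds
    count_lines = BashOperator(
        task_id='count_lines',
        bash_command='wc -l /tmp/input.csv',
    )

    wait_for_file >> count_lines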
What are Hooks?
Hooks are interfaces for connecting to external services. They are typically used inside a PythonOperator callable to talk to those services, and they are also the building blocks from which Operators are made.
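As a minimal sketch (it assumes the apache-airflow-providers-postgres package is installed and that a connection named my_postgres has been created in the Airflow UI), a callable like the following uses PostgresHook and could then be passed to a PythonOperator as its python_callable:

from airflow.providers.postgres.hooks.postgres import PostgresHook

def count_users():
    # The hook reads host, credentials, etc. from the 'my_postgres' connection
    # (a hypothetical connection id) stored in Airflow's metadata database.
    hook = PostgresHook(postgres_conn_id='my_postgres')
    records = hook.get_records('SELECT COUNT(*) FROM users;')  # hypothetical table
    print(f'User count: {records[0][0]}')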
What’s XCom?
XCom (short for cross-communication) is Airflow’s mechanism for letting tasks share small pieces of information with each other.
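A minimal sketch of how this works with the PythonOperator (the DAG and task names here are illustrative): the first task’s return value is pushed to XCom automatically, and the second task pulls it by task_id.

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

def push_value():
    return 42  # return values are pushed to XCom under the key 'return_value'

def pull_value(ti):
    value = ti.xcom_pull(task_ids='push_value')
    print(f'Received from XCom: {value}')

with DAG(
    dag_id='xcom_example',
    start_date=datetime(2021, 3, 1),
    schedule_interval=None,
) as dag:

    push = PythonOperator(task_id='push_value', python_callable=push_value)
    pull = PythonOperator(task_id='pull_value', python_callable=pull_value)

    push >> pull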
⛴️ Running Apache Airflow with Docker Compose
The following is a project that spins up the Apache Airflow services with Docker Compose. The environment includes:
- Database: Postgres 12.3
- Airflow 2.0.1 (init, webserver, scheduler)
- Database manager: pgAdmin 4
It also includes an example of a simple DAG called hello_world, which you can find in the dags folder.
⌨️ Creating our first DAG (Hello World DAG)
A DAG is defined in a Python script in which you configure and declare the tasks that make up the workflow. We are now going to create a file in the dags folder; I titled the example hello_world.py. The file will contain three simple tasks that run sequentially, each one using a different operator.
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'davidcasr',
    'start_date': datetime(2021, 3, 1, 12, 0, 0)
}

def hello_world_loop():
    for palabra in ['hello', 'world']:
        print(palabra)

with DAG(
    dag_id='hello_world',
    default_args=default_args,
    schedule_interval='@once'
) as dag:

    test_start = DummyOperator(task_id='test_start')

    test_python = PythonOperator(
        task_id='test_python',
        python_callable=hello_world_loop
    )

    test_bash = BashOperator(
        task_id='test_bash',
        bash_command='echo Hello World!'
    )

    test_start >> test_python >> test_bash
The operators used were the following:
- DummyOperator: an operator that literally does nothing. It can be used to group tasks in a DAG.
- BashOperator: an operator to execute commands in a Bash shell.
- PythonOperator: an operator for executing Python function calls.
⚡ Running the DAG
With the DAG defined and placed in the dags folder, we can open a browser and go to localhost:8080. There we look for the DAG we created and enable it with the toggle switch located to the left of the hello_world DAG. Since schedule_interval is set to '@once', the scheduler will run it a single time as soon as it is enabled.
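As an alternative to the UI, the DAG can also be triggered programmatically through Airflow’s stable REST API. This is only a sketch: it assumes the basic-auth API backend is enabled in your environment and that an airflow/airflow user exists, which may not match your setup.

import requests

# Trigger a run of the hello_world DAG via the REST API (assumed credentials).
response = requests.post(
    'http://localhost:8080/api/v1/dags/hello_world/dagRuns',
    auth=('airflow', 'airflow'),
    json={'conf': {}},  # empty run configuration
)
print(response.status_code, response.json())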
References
- Official Documentation: https://airflow.apache.org/docs/apache-airflow/stable/index.html