Apache Airflow (Python) DAG File

From GM-RKB
(Redirected from Airflow DAG File)
Jump to navigation Jump to search

An Apache Airflow (Python) DAG File is a workflow DAG file that specifies an Airflow DAG (for Apache Airflow).



References

2023

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# default_args are the default arguments for the DAG
default_args = {
   "owner": "me",
   "start_date": "2022-01-01",
   "depends_on_past": False,
   "retries": 1,
   "retry_delay": timedelta(minutes=5),
} 

# Create the DAG
dag = DAG(
   "my_dag_id",
   default_args=default_args,
   schedule_interval=timedelta(hours=1),
)

# Define a task using the PythonOperator
task1 = PythonOperator(
   task_id="task1",
   python_callable=some_function,
   dag=dag,
)

# Define another task
task2 = PythonOperator(
   task_id="task2",
   python_callable=some_other_function,
   dag=dag,
)

# Set task dependencies
task2.set_upstream(task1)
    • In this example, the DAG has two tasks: task1 and task2. The task2 depends on the task1, meaning that task1 will be executed before task2. The DAG has a schedule_interval of 1 hour, which means it will run every hour. The default_args dictionary contains default arguments for the DAG, such as the owner, the start date, and the retry behavior.

      This is a very basic example of an Airflow DAG file. You can define more complex DAGs with multiple tasks, branching and looping logic, and dynamic task generation. You can also use a wide range of operators provided by Airflow or create your own custom operators.

2023

  • chat
    • Q: write an Airflow DAG file with branching and looping logic
    • A: Here is an example of an Apache Airflow DAG file that includes branching and looping logic:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

# default_args are the default arguments for the DAG
default_args = {
    "owner": "me",
    "start_date": "2022-01-01",
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Create the DAG
dag = DAG(
    "my_dag_id",
    default_args=default_args,
    schedule_interval=timedelta(hours=1),
)

# Define a task that branches based on a condition
def branching_task(**kwargs):
    if some_condition(kwargs["execution_date"]):
        return "task_a"
    else:
        return "task_b"

branching_op = PythonOperator(
    task_id="branching_task",
    python_callable=branching_task,
    provide_context=True,
    dag=dag,
)

# Define the tasks that will be run depending on the result of the branching task
task_a = DummyOperator(task_id="task_a", dag=dag)
task_b = DummyOperator(task_id="task_b", dag=dag)

# Set task dependencies
branching_op >> task_a
branching_op >> task_b

# Define a task that will run in a loop
def loop_task(**kwargs):
    for i in range(5):
        task_id = f"task_{i}"
        task = DummyOperator(task_id=task_id, dag=dag)
        kwargs["ti"].xcom_push(key=task_id, value=i)
        yield task

loop_op = PythonOperator(
    task_id="loop_task",
    python_callable=loop_task,
    provide_context=True,
    dag=dag,
)

# Set task dependencies
loop_op >> task_b
    • In this example, the DAG has a task called branching_task that branches based on the result of the some_condition function. Depending on the result of this function, the DAG will run either task_a or task_b. The loop_task runs in a loop and generates a series of tasks, task_0, task_1, etc., that are each run one after the other. The loop task pushes the current loop iteration index to the XCom (short for cross-communication) store, which allows other tasks to retrieve the value and use it in their execution.