
2023-02-18 16:48:00

Apache Airflow: Write your first DAG in Apache Airflow

In this article, we’ll see how to write a basic “Hello World” DAG in Apache Airflow. We will go through all the files that we have to create in Apache Airflow to successfully write and execute our first DAG.

Create a Python file

Firstly, we will create a Python file inside the “airflow/dags” directory. Since we are creating a basic Hello World script, we will keep the file name simple and name it “HelloWorld_dag.py”. Keep in mind that if this is your first time writing a DAG in Airflow, you will have to create the “dags” folder yourself.

Importing important modules

To create a properly functioning pipeline in Airflow, we need to import the “DAG” module and an “Operator” module in our code. We also import the “datetime” module.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

Create a DAG Object

In this step, we will create a DAG object that will nest the tasks in the pipeline. We pass a “dag_id”, which is the DAG’s unique identifier.

As a best practice, it is advised to keep the “dag_id” the same as the name of the Python file. Therefore, we will set the “dag_id” to “HelloWorld_dag”.

Now we will define a “start_date” parameter; this is the point from which the scheduler will start filling in the dates.

For the Apache Airflow scheduler, we also have to specify the interval at which it will execute the DAG. We define the interval as a “cron expression”. Apache Airflow also provides some pre-defined presets such as “@yearly”, “@hourly”, and “@daily”. For this example, we will go with “@hourly”.

Once the scheduler starts filling in the dates from the specified “start_date” on an hourly basis, it will keep filling in dates until it reaches the current hour. This is called “catchup”. We can turn off this behaviour by setting the “catchup” parameter to “False”.

with DAG(
    dag_id="HelloWorld_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
    catchup=False) as dag:

Create a Task

Now we will define a PythonOperator. A PythonOperator is used to invoke a Python function from within your DAG. We will create a function that prints “Hello world!” when it is invoked.

Like a DAG object has a “dag_id”, a task has a “task_id”.

It also has a “python_callable” parameter, which takes the function to be called as input.

task1 = PythonOperator(
    task_id="hello_world",
    python_callable=helloWorld)

Creating a callable function

Now we will create the callable function that will be called by the “PythonOperator”.

def helloWorld():
    print("Hello world!")

Setting Dependencies in the DAG

We don’t need to indicate the flow here because we only have one task; we can just write the task name. But if we had multiple tasks to execute, we could set their dependencies by using the “>>” and “<<” operators.

Our complete DAG file should look like this:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def helloWorld():
    print("Hello world!")

with DAG(
    dag_id="HelloWorld_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
    catchup=False) as dag:

    task1 = PythonOperator(
        task_id="hello_world",
        python_callable=helloWorld)

    task1

To run our DAG file

To execute our DAG file, we need to start the Airflow webserver and the Airflow scheduler. We can do that using the following commands:

airflow webserver -p 8081
airflow scheduler

# access: http://localhost:8081/

Once we log in successfully, we will be able to see our DAG running in the Airflow web UI.

Conclusion

In this blog, we saw how to write our first DAG and execute it. We saw how to instantiate a DAG object, create a task, and write a callable function.