Apache Airflow-编写第一个DAG
Apache Airflow: Write your first DAG in Apache Airflow
在Apache Airflow中写入您的第一个DAG
Reading Time: 3 minutes
In this article, we’ll see how to write a basic “Hello World” DAG in Apache Airflow. We will go through all the files that we have to create in Apache Airflow to successfully write and execute our first DAG. 在本文中,我们将了解如何在Apache Airflow中编写基本的“Hello world” DAG。我们将遍历必须在Apache airflow中创建的所有文件,以成功写入和执行我们的第一个DAG。
Create a Python file
创建一个Python文件 Firstly, we will create a python file inside the “airflow/dags” directory. Since we are creating a basic Hello World script, we will keep the file name simple and name it “HelloWorld_dag.py“. Keep in mind if this is your first time writing a DAG in Airflow, then we will have to create the “dags” folder.
首先,我们将在“airflow/dags”目录中创建一个python文件。由于我们正在创建一个基本的Hello World脚本,因此我们将保持文件命名简单,并将其命名为“HelloWorld_dag.py”。请记住,如果这是您第一次在Airflow中编写DAG,那么我们将不得不创建“dags”文件夹。
Importing important modules
导入重要模块 To create a properly functional pipeline in airflow, we need to import the “DAG” python module and the “Operator” python module in our code. We can also import the “datetime” module.
要在Airflow中创建功能正常的管道,我们需要在代码中导入“DAG”python模块和“Operator”python模块。我们还可以导入“datetime”模块。
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
Create a DAG Object
创建DAG对象 In this step, we will create a DAG object that will nest the tasks in the pipeline. We send a “dag id”, which is the dag’s unique identifier. 在此步骤中,我们将创建一个 DAG 对象,该对象将在管道中嵌套任务。我们发送一个“dag id”,这是 dag 的唯一标识符。
As a best practice, it is advised to keep the “dag_id” and the name of the python file as the same. Therefore, we will keep the “dag_id” as “HelloWorld_dag“. 作为最佳实践,建议将“dag_id”和python文件的名称保持相同。因此,我们将“dag_id”保留为“HelloWorld_dag”。
Now we will define a “start_date” parameter, this is the point from where the scheduler will start filling in the dates. 现在我们将定义一个“start_date”参数,这是填写调度程序开始日期的地方。
For the Apache Airflow scheduler, we also have to specify the interval in which it will execute the DAG. We define the interval in “corn expression“. Apache Airflow has some pre-defined cron expressions such as “@yearly“, “@hourly“, and “@daily“. For this example, we will be going with “@hourly“. 对于 Apache Airflow 调度程序,我们还必须指定它将执行 DAG 的时间间隔。我们在“corn expression”中定义。Apache Airflow 有一些预定义的cron表达式,例如“@yearly”,“@hourly”和“@daily”。对于此示例,我们将使用“@hourly”。
Once the scheduler starts filling in the dates from the specified “start_date” parameter on an “hourly” basis and it will keep filling in the date till it reaches the current hour. This is called a “catchup“. We can turn off this “catchup” by keeping its parameter value as “False”. 一旦调度程序开始以“hourly”为单位填写指定“start_date”参数中的日期,它将直到达到当前填写的小时才会调度。这被称为“cathup”。我们可以通过将其参数值保留为“False”来关闭此它。
with DAG(
dag_id="HelloWorld_dag",
start_date=datetime(2021,1,1),
schedule_interval="@hourly",
catchup=False) as dag:
Create a Task
创建任务 Now we will define a PythonOperator. A PythonOperator is used to invoke a Python function from within your DAG. We will create a function that will return “Hello World” when it is invoked. 现在我们将定义一个 Python 操作器。Python操作器用于从 DAG 中调用Python函数。我们将创建一个函数,该函数在调用时将返回“Hello World”。
Like an object has “dag_id“, similarly a task has a “task_id“. 就像一个对象有“dag_id”,同样一个任务有一个“task_id”。
It also has a “python callable” parameter, which takes as input the name of the function to be called. 它还具有一个python 可调用参数,该参数将要调用的函数的名称作为输入。
task1 = PythonOperator(
task_id="hello_world",
python_callable=helloWorld)
Creating a callable function
创建可调用函数 Now we will create a callable function which will be called by the “PythonOperator”. 现在我们将创建一个可调用的函数,该函数将由“Python操作器”调用。
def helloWorld():
print("Hello world!")
Setting Dependecies in DAG
在 DAG 中设置依赖项 We don’t need to indicate the flow because we only have one task here; we can just write the task name. But if we had multiple tasks that we wanted to execute, we can set their dependencies by using the following operators “>>” or “<<“ respectively. 我们不需要指示DAG的流程,因为我们这里只有一个任务;我们可以只写任务名称。但是,如果我们有多个任务要执行,我们可以分别使用以下运算符“>>”或“<<”来设置它们的依赖关系。
Our complete DAG file should like this
我们完整的DAG文件应该像这样
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def helloWorld():
print("Hello world!")
with DAG(
dag_id="HelloWorld_dag",
start_date=datetime(2021,1,1),
schedule_interval="@hourly",
catchup=False) as dag:
task1 = PythonOperator(
task_id="hello_world",
python_callable=helloWorld)
task1
To run our DAG file
运行我们的 DAG 文件 To execute our DAG file, we need to start Apache Airflow and Airflow scheduler. We can do that using the following commands: 要执行我们的 DAG 文件,我们需要启动 Apache Airflow和Airflow调度程序。我们可以使用以下命令来执行此操作:
airflow webserver -p 8081
airflow scheduler
# access :http://localhost:8081/
We will be able to see our DAG running in Airflow Web UI once we log in to the terminal successfully. 成功登录到终端后,我们将能够看到我们的 DAG 。这时可以在Airflow Web UI 中运行它。
Conclusion 结论
In this blog, We saw how to write our first DAG and execute it. We saw how to instantiate a DAG object and Create a task and a callable function. 在这篇博客中,我们看到了如何编写第一个 DAG 并执行它。我们了解了如何实例化 DAG 对象和创建任务和可调用函数。
相关文章
- 采用开源Zabbix+500块硬件平替5万块动环检测系统,实现UPS、温湿度、烟雾等数据采集、存储、告警、大屏展示
- 驱动开发:WinDBG 常用调试命令总结
- 中小企业快速合规,快速部署开源堡垒机TELEPORT
- 驱动开发:监控进程与线程对象操作
- 客快物流大数据项目(九十六):ClickHouse的VersionedCollapsingMergeTree深入了解
- 软件测试|selenium屏幕操作事件TouchActions
- FPS游戏:视场角矩阵的特点
- Path Finder for Mac(强大的文件管理工具)v2149中文激活版
- 零售行业R公司对接亚马逊Amazon Device EDI项目案例
- FPS 游戏:快速寻找基址的方法
- 7min到40s:SpringBoot启动优化实践
- 客快物流大数据项目(九十九):Clickhouse中update/delete的使用
- 手把手教你使用CNN进行交通标志识别(已开源)
- 软件测试|selenium三种等待方式
- FPS游戏:实现人物定点瞬移
- 人力资源行业数据特点解析
- PE格式:实现PE文件特征码识别
- PE格式:实现VA与FOA之间的转换
- PE格式:导入表与IAT内存修正
- PE格式:手工实现IAT导入表注入劫持