Airflow Concepts


DAGs (Directed Acyclic Graphs)

  • A DAG is a collection of the tasks you want to run, reflecting the relationships and dependencies between those tasks.


Operators and Tasks

  • A DAG does not perform any actual computation itself; instead, Operators determine what actually gets done.

  • Task: once an operator is instantiated, it is referred to as a task. Each operator describes a single task in the workflow.

    • Instantiating a task requires a unique task id and the DAG it belongs to (see the minimal sketch after this list).
  • A DAG is simply a container used to organize a set of tasks and set their execution context.

  • Operator categories:

    • Sensors

      • Operators that keep running until a specific criterion is met. The criterion can be waiting for a certain amount of time, for an external file, or for an upstream data source.

      • HdfsSensor: waits for a file or folder to land in HDFS

      • NamedHivePartitionSensor: checks whether the most recent partition of a Hive table is available for downstream consumption

      • In fact, if you clone the project and look around, you will find that many ready-made sensors have already been written (under the airflow\airflow\contrib\sensors directory).


    • Operators

      • Trigger a specific action (for example running a bash command, calling a Python function, or executing a Hive query...)

      • BashOperator: executes a bash command

      • PythonOperator: calls an arbitrary Python function

      • HiveOperator: executes HQL code or a Hive script in a specific Hive database

      • BigQueryOperator: executes a Google BigQuery SQL query against a specified Google BigQuery database

      • There are also many ready-made operators under the airflow\airflow\contrib\operators directory.


        • There are operators for submitting Spark jobs, but none for Flink, so we may have to implement one ourselves later (a rough skeleton is sketched after this list).
    • Transfers: move data from one location to another

      • MySqlToHiveTransfer: moves data from MySQL to Hive
      • S3ToRedshiftTransfer: moves data from Amazon S3 to Redshift
      • Note that these transfer operators live mixed in with the action-triggering operators under the airflow\airflow\contrib\operators directory.
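
To make the relationship between DAGs, operators and tasks concrete, here is a minimal sketch (the DAG id, task ids and callable are made up for illustration): an operator only becomes a task once it is instantiated with a unique task_id and attached to a DAG.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# The DAG is only a container: it groups tasks under an id, a schedule and default args.
dag = DAG('minimal_example', default_args=default_args, schedule_interval='@daily')


def say_hello():
    print('hello from airflow')


# Instantiating an operator with a unique task_id and a dag turns it into a task.
print_date = BashOperator(task_id='print_date', bash_command='date', dag=dag)
say_hello_task = PythonOperator(task_id='say_hello', python_callable=say_hello, dag=dag)

# Dependencies only describe the edges of the graph; the operators do the actual work.
print_date >> say_hello_task   # same as say_hello_task.set_upstream(print_date)

And since there is no Flink operator out of the box, a custom one would follow the usual BaseOperator pattern. A rough skeleton only, assuming jobs are submitted through the flink CLI (the class name, parameters and command are made up):

from subprocess import check_call

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class FlinkSubmitOperator(BaseOperator):
    """Hypothetical operator: submits a Flink job jar through the flink CLI."""

    @apply_defaults
    def __init__(self, jar_path, flink_bin='flink', *args, **kwargs):
        super(FlinkSubmitOperator, self).__init__(*args, **kwargs)
        self.jar_path = jar_path
        self.flink_bin = flink_bin

    def execute(self, context):
        # execute() is the only hook Airflow calls when the task instance runs
        check_call([self.flink_bin, 'run', self.jar_path])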

Next, let's look at a DAG that schedules Google BigQuery jobs:

import json
from datetime import timedelta, datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,    
    'start_date': datetime(2018, 12, 1),
    'end_date': datetime(2018, 12, 5),
    'email': ['airflow@airflow.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

# Set Schedule: Run pipeline once a day.
# Use cron to define exact time. Eg. 8:15am would be "15 08 * * *"
# Here the pipeline is scheduled at 21:00 (9 PM) every day.
schedule_interval = "00 21 * * *"

# Define DAG: Set ID and assign default args and schedule interval
dag = DAG(
    'bigquery_github_trends', 
    default_args=default_args, 
    schedule_interval=schedule_interval
    )
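
# Note: with classic Airflow scheduling semantics, a DAG run for execution_date X is
# triggered only after X + schedule_interval has elapsed, so the run for
# 2018-12-01 21:00 actually starts after 2018-12-02 21:00, and end_date caps the
# backfill at 2018-12-05. Because depends_on_past=True, each task instance also waits
# for the same task in the previous run to have succeeded before it starts.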

# Config variables
BQ_CONN_ID = "my_gcp_conn"
BQ_PROJECT = "my-bq-project"
BQ_DATASET = "my-bq-dataset"

## Task 1: check that the github archive data has a dated table created for that date
# To test this task, run this command: 
# docker-compose -f docker-compose-gcloud.yml run --rm webserver airflow test bigquery_github_trends bq_check_githubarchive_day 2018-12-01
t1 = BigQueryCheckOperator(
        task_id='bq_check_githubarchive_day',
        sql='''
        #standardSQL
        SELECT
          table_id
        FROM
          `githubarchive.day.__TABLES_SUMMARY__`
        WHERE
          table_id = "{{ yesterday_ds_nodash }}"
        ''',
        use_legacy_sql=False,
        bigquery_conn_id=BQ_CONN_ID,
        dag=dag
    )
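
# {{ yesterday_ds_nodash }} is an Airflow Jinja macro rendered at runtime as the day
# before the execution date in YYYYMMDD form (e.g. 20181201), so this query looks for
# yesterday's daily table in the githubarchive dataset.
# BigQueryCheckOperator fails the task when the query returns no rows or when the first
# row contains a value that evaluates to false, which blocks the downstream tasks.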

## Task 2: check that the hacker news table contains data for that date.
t2 = BigQueryCheckOperator(
        task_id='bq_check_hackernews_full',
        sql='''
        #standardSQL
        SELECT
          FORMAT_TIMESTAMP("%Y%m%d", timestamp ) AS date
        FROM
          `bigquery-public-data.hacker_news.full`
        WHERE
          type = 'story'
          AND FORMAT_TIMESTAMP("%Y%m%d", timestamp ) = "{{ yesterday_ds_nodash }}"
        LIMIT
          1
        ''',
        use_legacy_sql=False,
        bigquery_conn_id=BQ_CONN_ID,
        dag=dag
    )

## Task 3: create a github daily metrics partition table
t3 = BigQueryOperator(
        task_id='bq_write_to_github_daily_metrics',    
        sql='''
        #standardSQL
        SELECT
          date,
          repo,
          SUM(IF(type='WatchEvent', 1, NULL)) AS stars,
          SUM(IF(type='ForkEvent',  1, NULL)) AS forks
        FROM (
          SELECT
            FORMAT_TIMESTAMP("%Y%m%d", created_at) AS date,
            actor.id as actor_id,
            repo.name as repo,
            type
          FROM
            `githubarchive.day.{{ yesterday_ds_nodash }}`
          WHERE type IN ('WatchEvent','ForkEvent')
        )
        GROUP BY
          date,
          repo
        ''',
        destination_dataset_table='{0}.{1}.github_daily_metrics${2}'.format(
            BQ_PROJECT, BQ_DATASET, '{{ yesterday_ds_nodash }}'
        ),    
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        use_legacy_sql=False,
        bigquery_conn_id=BQ_CONN_ID,
        dag=dag
    )
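
# The "$" suffix in destination_dataset_table targets a single ingestion-time partition
# of the github_daily_metrics table (e.g. github_daily_metrics$20181201), so
# WRITE_TRUNCATE only replaces that day's partition rather than the whole table.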

## Task 4: aggregate past github events to daily partition table
t4 = BigQueryOperator(
        task_id='bq_write_to_github_agg',    
        sql='''
        #standardSQL
        SELECT
          "{2}" as date,
          repo,
          SUM(stars) as stars_last_28_days,
          SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{4}") 
            AND TIMESTAMP("{3}") , 
            stars, null)) as stars_last_7_days,
          SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{3}") 
            AND TIMESTAMP("{3}") , 
            stars, null)) as stars_last_1_day,
          SUM(forks) as forks_last_28_days,
          SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{4}") 
            AND TIMESTAMP("{3}") , 
            forks, null)) as forks_last_7_days,
          SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{3}") 
            AND TIMESTAMP("{3}") , 
            forks, null)) as forks_last_1_day
        FROM
          `{0}.{1}.github_daily_metrics`
        WHERE _PARTITIONTIME BETWEEN TIMESTAMP("{5}") 
        AND TIMESTAMP("{3}") 
        GROUP BY
          date,
          repo
        '''.format(BQ_PROJECT, BQ_DATASET,
            "{{ yesterday_ds_nodash }}", "{{ yesterday_ds }}",
            "{{ macros.ds_add(ds, -6) }}",
            "{{ macros.ds_add(ds, -27) }}"
            )
        ,
        destination_dataset_table='{0}.{1}.github_agg${2}'.format(
            BQ_PROJECT, BQ_DATASET, '{{ yesterday_ds_nodash }}'
        ),
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        use_legacy_sql=False,
        bigquery_conn_id=BQ_CONN_ID,
        dag=dag
    )
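
# The format() placeholders map to Jinja macros: {2} = yesterday_ds_nodash,
# {3} = yesterday_ds, {4} = macros.ds_add(ds, -6) and {5} = macros.ds_add(ds, -27).
# The BETWEEN filters on _PARTITIONTIME then carve the daily partitions of
# github_daily_metrics into the 1-day, 7-day and 28-day windows named in the columns.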

# Task 5: aggregate hacker news data to a daily partition table
t5 = BigQueryOperator(
    task_id='bq_write_to_hackernews_agg',    
    sql='''
    #standardSQL
    SELECT
      FORMAT_TIMESTAMP("%Y%m%d", timestamp) AS date,
      `by` AS submitter,
      id as story_id,
      REGEXP_EXTRACT(url, "(https?://github.com/[^/]*/[^/#?]*)") as url,
      SUM(score) as score
    FROM
      `bigquery-public-data.hacker_news.full`
    WHERE
      type = 'story'
      AND timestamp>'{{ yesterday_ds }}'
      AND timestamp<'{{ ds }}'
      AND url LIKE '%https://github.com%'
      AND url NOT LIKE '%github.com/blog/%'
    GROUP BY
      date,
      submitter,
      story_id,
      url
    ''',
    destination_dataset_table='{0}.{1}.hackernews_agg${2}'.format(
        BQ_PROJECT, BQ_DATASET, '{{ yesterday_ds_nodash }}'
    ),
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    use_legacy_sql=False,
    bigquery_conn_id=BQ_CONN_ID,
    dag=dag
    )

# Task 6: join the aggregate tables
t6 = BigQueryOperator(
    task_id='bq_write_to_hackernews_github_agg',    
    sql='''
    #standardSQL
    SELECT 
    a.date as date,
    a.url as github_url,
    b.repo as github_repo,
    a.score as hn_score,
    a.story_id as hn_story_id,
    b.stars_last_28_days as stars_last_28_days,
    b.stars_last_7_days as stars_last_7_days,
    b.stars_last_1_day as stars_last_1_day,
    b.forks_last_28_days as forks_last_28_days,
    b.forks_last_7_days as forks_last_7_days,
    b.forks_last_1_day as forks_last_1_day
    FROM
    (SELECT
      *
    FROM
      `{0}.{1}.hackernews_agg`
      WHERE _PARTITIONTIME BETWEEN TIMESTAMP("{2}") AND TIMESTAMP("{2}")
      )as a
    LEFT JOIN 
      (
      SELECT 
      repo,
      CONCAT('https://github.com/', repo) as url,
      stars_last_28_days,
      stars_last_7_days,
      stars_last_1_day,
      forks_last_28_days,
      forks_last_7_days,
      forks_last_1_day
      FROM
      `{0}.{1}.github_agg`
      WHERE _PARTITIONTIME BETWEEN TIMESTAMP("{2}") AND TIMESTAMP("{2}")
      ) as b
    ON a.url = b.url
    '''.format(
            BQ_PROJECT, BQ_DATASET, "{{ yesterday_ds }}"
        ),
    destination_dataset_table='{0}.{1}.hackernews_github_agg${2}'.format(
        BQ_PROJECT, BQ_DATASET, '{{ yesterday_ds_nodash }}'
    ),
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    use_legacy_sql=False,
    bigquery_conn_id=BQ_CONN_ID,
    dag=dag
    )

# Task 7: Check if partition data is written successfully
t7 = BigQueryCheckOperator(
    task_id='bq_check_hackernews_github_agg',
    sql='''
    #standardSQL
    SELECT
        COUNT(*) AS rows_in_partition
    FROM `{0}.{1}.hackernews_github_agg`    
    WHERE _PARTITIONDATE = "{2}"
    '''.format(BQ_PROJECT, BQ_DATASET, '{{ yesterday_ds }}'
        ),
    use_legacy_sql=False,
    bigquery_conn_id=BQ_CONN_ID,
    dag=dag)

# Setting up Dependencies
t3.set_upstream(t1)
t4.set_upstream(t3)
t5.set_upstream(t2)
t6.set_upstream(t4)
t6.set_upstream(t5)
t7.set_upstream(t6)
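
The same wiring can also be expressed with Airflow's bitshift operators, which read a little closer to the graph itself; the following is equivalent to the set_upstream calls above:

t1 >> t3 >> t4 >> t6
t2 >> t5 >> t6
t6 >> t7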

And here is code that loads data into HDFS, then into Hive, and finally pushes a summary from Hive into MySQL:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.hive_operator import HiveOperator
from datetime import date, timedelta

# --------------------------------------------------------------------------------
# Create a few placeholder scripts.
# In practice these would be separate python script files, which are imported in this
# section with absolute or relative imports.
# --------------------------------------------------------------------------------


def fetchtweets():
    return None


def cleantweets():
    return None


def analyzetweets():
    return None


def transfertodb():
    return None


# --------------------------------------------------------------------------------
# set default arguments
# --------------------------------------------------------------------------------

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'example_twitter_dag', default_args=default_args,
    schedule_interval="@daily")

# --------------------------------------------------------------------------------
# This task should call the Twitter API and retrieve yesterday's tweets sent from and to
# the four twitter users (Twitter_A, ..., Twitter_D).
# There should be eight csv output files generated by this task, following the naming
# convention direction(from or to)_twitterHandle_date.csv
# --------------------------------------------------------------------------------

fetch_tweets = PythonOperator(
    task_id='fetch_tweets',
    python_callable=fetchtweets,
    dag=dag)

# --------------------------------------------------------------------------------
# Clean the eight files.
# In this step you can get rid of, or cherry-pick, columns
# and different parts of the text.
# --------------------------------------------------------------------------------

clean_tweets = PythonOperator(
    task_id='clean_tweets',
    python_callable=cleantweets,
    dag=dag)

clean_tweets.set_upstream(fetch_tweets)

# --------------------------------------------------------------------------------
# In this section you can use a script to analyze the twitter data.
# It could simply be a sentiment analysis through algorithms like bag of words,
# or something more complicated.
# You can also take a look at web services to do such tasks.
# --------------------------------------------------------------------------------

analyze_tweets = PythonOperator(
    task_id='analyze_tweets',
    python_callable=analyzetweets,
    dag=dag)

analyze_tweets.set_upstream(clean_tweets)

# --------------------------------------------------------------------------------
# Although this is the last task, we need to declare it before the next tasks because we
# will use set_downstream. This task will extract a summary from the Hive data and store
# it in MySQL.
# --------------------------------------------------------------------------------

hive_to_mysql = PythonOperator(
    task_id='hive_to_mysql',
    python_callable=transfertodb,
    dag=dag)

# --------------------------------------------------------------------------------
# The following tasks are generated using a for loop. The first task puts the eight
# csv files into HDFS.
# The second task loads these files from HDFS into the respective Hive tables.
# These two for loops could be combined into one loop. However, in most cases you will
# be running different analyses on your incoming and outgoing tweets, and hence they
# are kept separate in this example.
# --------------------------------------------------------------------------------

from_channels = ['fromTwitter_A', 'fromTwitter_B', 'fromTwitter_C', 'fromTwitter_D']
to_channels = ['toTwitter_A', 'toTwitter_B', 'toTwitter_C', 'toTwitter_D']
yesterday = date.today() - timedelta(days=1)
dt = yesterday.strftime("%Y-%m-%d")
# define where the tweet csv files are stored in your local directory
local_dir = "/tmp/"
# define the location where you want to store the files in HDFS
# (the leading space below is deliberate: it separates the local source path from the
#  HDFS destination path when bash_command is concatenated inside the loops)
hdfs_dir = " /tmp/"

for channel in to_channels:

    file_name = "to_" + channel + "_" + yesterday.strftime("%Y-%m-%d") + ".csv"

    load_to_hdfs = BashOperator(
        task_id="put_" + channel + "_to_hdfs",
        bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f " +
                     local_dir + file_name +
                     hdfs_dir + channel + "/",
        dag=dag)

    load_to_hdfs.set_upstream(analyze_tweets)

    load_to_hive = HiveOperator(
        task_id="load_" + channel + "_to_hive",
        hql="LOAD DATA INPATH '" +
            hdfs_dir + channel + "/" + file_name + "' "
            "INTO TABLE " + channel + " "
            "PARTITION(dt='" + dt + "')",
        dag=dag)
    load_to_hive.set_upstream(load_to_hdfs)
    load_to_hive.set_downstream(hive_to_mysql)

for channel in from_channels:
    file_name = "from_" + channel + "_" + yesterday.strftime("%Y-%m-%d") + ".csv"
    load_to_hdfs = BashOperator(
        task_id="put_" + channel + "_to_hdfs",
        bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f " +
                     local_dir + file_name +
                     hdfs_dir + channel + "/",
        dag=dag)

    load_to_hdfs.set_upstream(analyze_tweets)

    load_to_hive = HiveOperator(
        task_id="load_" + channel + "_to_hive",
        hql="LOAD DATA INPATH '" +
            hdfs_dir + channel + "/" + file_name + "' "
            "INTO TABLE " + channel + " "
            "PARTITION(dt='" + dt + "')",
        dag=dag)

    load_to_hive.set_upstream(load_to_hdfs)
    load_to_hive.set_downstream(hive_to_mysql)
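
For completeness, the transfertodb placeholder is where the Hive summary would actually be pushed into MySQL. One possible sketch using Airflow's HiveServer2 and MySQL hooks follows; the connection ids, HQL and table/column names are made up for illustration and are not part of the original example:

from airflow.hooks.hive_hooks import HiveServer2Hook
from airflow.hooks.mysql_hook import MySqlHook


def transfertodb():
    # Pull an aggregated summary out of Hive (hypothetical summary query).
    hive = HiveServer2Hook(hiveserver2_conn_id='hiveserver2_default')
    rows = hive.get_records(
        "SELECT dt, channel, COUNT(*) FROM tweet_summary GROUP BY dt, channel")

    # Append the summary rows to a MySQL reporting table (hypothetical table/columns).
    mysql = MySqlHook(mysql_conn_id='mysql_default')
    mysql.insert_rows(table='tweet_summary',
                      rows=rows,
                      target_fields=['dt', 'channel', 'tweet_count'])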

