Airflow: implementing HTTP-call tasks with SimpleHttpOperator


When SimpleHttpOperator is used as the operator, you may find that by default requests are sent to www.google.com on port 443.

For example, take a task defined like this:

task = SimpleHttpOperator(
    task_id='get_op',
    http_conn_id='http_test',
    method='GET',
    endpoint='test1',
    data={},
    headers={},
    dag=dag)

At run time, the following exception is raised:

Subtask: During handling of the above exception, another exception occurred:
......
File "/data1/bigdata/anaconda3.5/lib/python3.6/site-packages/airflow/operators/http_operator.py", line 82, in execute
  self.extra_options)
File "/data1/bigdata/anaconda3.5/lib/python3.6/site-packages/airflow/hooks/http_hook.py", line 86, in run
  return self.run_and_check(session, prepped_request, extra_options)
File "/data1/bigdata/anaconda3.5/lib/python3.6/site-packages/airflow/hooks/http_hook.py", line 102, in run_and_check
  allow_redirects=extra_options.get("allow_redirects", True))
......
Subtask: requests.exceptions.ConnectionError: HTTPSConnectionPool(host='www.google.com', port=443): Max retries exceeded with url: /test1 (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x2ac347314940>: Failed to establish a new connection: [Errno 101] Network is unreachable',))

This shows that the host of the HTTP request has to be configured; otherwise the request goes to the Google domain.

Looking at the source code:

http_hook.py

    def get_conn(self, headers):
        # ...
        conn = self.get_connection(self.http_conn_id)
        session = requests.Session()
        # The connection's host becomes the base URL of every request
        self.base_url = conn.host
        if not self.base_url.startswith('http'):
            # No scheme in the host: fall back to plain HTTP
            self.base_url = 'http://' + self.base_url
        # ...
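The excerpt only shows the scheme normalization. Below is a minimal pure-Python sketch of how the final request URL is assembled from a connection host and the task's endpoint. `build_request_url` is a hypothetical helper, not Airflow API; the port handling and the slash-joining rule are assumptions modeled on Airflow 1.x `HttpHook` (they live in the elided part of the excerpt and vary between versions).

```python
from typing import Optional


def build_request_url(host: str, endpoint: str, port: Optional[int] = None) -> str:
    """Hypothetical helper approximating how HttpHook turns a connection into a URL."""
    base_url = host
    # Same rule as the excerpt: a host without a scheme gets "http://" prepended.
    if not base_url.startswith('http'):
        base_url = 'http://' + base_url
    # Assumption: a separately stored port is appended to the base URL.
    if port:
        base_url = base_url + ':' + str(port)
    # Assumption: exactly one "/" is placed between base URL and endpoint.
    if not base_url.endswith('/') and endpoint and not endpoint.startswith('/'):
        return base_url + '/' + endpoint
    return base_url + (endpoint or '')


# A connection whose host is Google's yields the URL seen in the traceback.
print(build_request_url('https://www.google.com/', 'test1'))
# -> https://www.google.com/test1
# A connection with host "localhost" and port 8080 targets the local service.
print(build_request_url('localhost', 'test1', port=8080))
# -> http://localhost:8080/test1
```

This is why the traceback shows host www.google.com, port 443 and url /test1: the endpoint is simply appended to whatever host the connection resolves to.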

base_hook.py

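    @classmethod
    def get_connection(cls, conn_id):
        # CONN_ENV_PREFIX is 'AIRFLOW_CONN_', so conn_id 'http_test' -> AIRFLOW_CONN_HTTP_TEST
        environment_uri = os.environ.get(CONN_ENV_PREFIX + conn_id.upper())
        conn = None
        if environment_uri:
            # A URI from the environment takes precedence
            conn = Connection(conn_id=conn_id, uri=environment_uri)
        else:
            # Otherwise pick one of the connections stored under this conn_id in the metadata DB
            conn = random.choice(cls.get_connections(conn_id))
        if conn.host:
            logging.info("Using connection to: " + conn.host)
        return conn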

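From the source we can see that Airflow first checks for an environment variable holding a custom connection URI. If it exists, that URI is used; otherwise Airflow falls back to the connections stored under that conn_id in its metadata database (for example, those created in the web UI under Admin > Connections) and picks one at random if several share the same id.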
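The lookup order just described can be sketched without Airflow. In this minimal sketch `resolve_connection_uri` is a hypothetical function, a plain mapping stands in for `os.environ`, and a dict of conn_id to URI list stands in for the metadata database; Airflow itself raises its own exception for an undefined conn_id, for which `KeyError` is only a stand-in.

```python
import random
from typing import Dict, List, Mapping

# Prefix Airflow uses for connection environment variables.
CONN_ENV_PREFIX = 'AIRFLOW_CONN_'


def resolve_connection_uri(conn_id: str,
                           environ: Mapping[str, str],
                           metadata_db: Dict[str, List[str]]) -> str:
    """Hypothetical mirror of the lookup order in BaseHook.get_connection.

    `environ` stands in for os.environ; `metadata_db` (conn_id -> list of
    connection URIs) stands in for Airflow's metadata database.
    """
    # 1. An environment variable AIRFLOW_CONN_<CONN_ID> takes precedence.
    environment_uri = environ.get(CONN_ENV_PREFIX + conn_id.upper())
    if environment_uri:
        return environment_uri
    # 2. Otherwise choose one of the connections stored under this conn_id.
    candidates = metadata_db.get(conn_id)
    if not candidates:
        # Airflow raises its own exception here; KeyError is a stand-in.
        raise KeyError('conn_id %r is not defined' % conn_id)
    return random.choice(candidates)


# Stand-in database in which http_test resolves to Google, as in the traceback.
db = {'http_test': ['https://www.google.com/']}
print(resolve_connection_uri('http_test', {}, db))
# -> https://www.google.com/
print(resolve_connection_uri('http_test', {'AIRFLOW_CONN_HTTP_TEST': 'http://localhost:8080'}, db))
# -> http://localhost:8080
```

Passing the environment in as a parameter keeps the sketch testable; the real hook reads `os.environ` directly.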

The environment variable name is the prefix AIRFLOW_CONN_ followed by the http_conn_id in upper case.

For the task above, for example, you can run export AIRFLOW_CONN_HTTP_TEST=http://localhost:8080 in the shell that starts the Airflow processes which execute the task.
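Note that the variable holds a URI rather than a bare host. As far as the Airflow 1.x source goes, `Connection` splits such a URI with `urlparse`: the scheme becomes the connection type and the port is stored separately from the host. A minimal standard-library sketch of that split; `split_connection_uri` is hypothetical and covers only the fields relevant here.

```python
from urllib.parse import urlparse


def split_connection_uri(uri: str) -> dict:
    """Hypothetical helper approximating how a connection URI is split into fields."""
    parsed = urlparse(uri)
    return {
        'conn_type': parsed.scheme,     # "http" becomes the connection type
        'host': parsed.hostname or '',  # host without scheme or port
        'port': parsed.port,            # int, or None when absent
        'schema': parsed.path[1:],      # path after the leading "/"
        'login': parsed.username,
        'password': parsed.password,
    }


print(split_connection_uri('http://localhost:8080'))
# -> {'conn_type': 'http', 'host': 'localhost', 'port': 8080, 'schema': '', 'login': None, 'password': None}
```

Because the host comes back without a scheme, the `startswith('http')` check in the hook excerpt is what turns it back into an `http://` URL.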

It can also be set dynamically in Python code, for example at the top of the DAG file:

import os
os.environ['AIRFLOW_CONN_HTTP_TEST'] = 'http://localhost:8080'

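Setting it dynamically in code is generally recommended. This works because the DAG file is parsed again in the process that runs each task, so the variable is already set by the time the hook looks up the connection.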

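Some common ways to use SimpleHttpOperator (from the official example). Since they omit http_conn_id, they use the default connection http_default: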

import json

from airflow.operators.http_operator import SimpleHttpOperator

# `dag` is assumed to be an existing DAG object.

# POST (the default method) with a JSON body; the task fails unless
# response_check returns True, i.e. unless the response JSON is empty.
t1 = SimpleHttpOperator(
    task_id='post_op',
    endpoint='api/v1.0/nodes',
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    response_check=lambda response: len(response.json()) == 0,
    dag=dag)

# POST with a form-encoded body.
t5 = SimpleHttpOperator(
    task_id='post_op_formenc',
    endpoint='nodes/url',
    data="name=Joe",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    dag=dag)

# GET: the dict is sent as query parameters.
t2 = SimpleHttpOperator(
    task_id='get_op',
    method='GET',
    endpoint='api/v1.0/nodes',
    data={"param1": "value1", "param2": "value2"},
    headers={},
    dag=dag)

# PUT with a JSON body.
t3 = SimpleHttpOperator(
    task_id='put_op',
    method='PUT',
    endpoint='api/v1.0/nodes',
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    dag=dag)

# DELETE with a form-encoded body.
t4 = SimpleHttpOperator(
    task_id='del_op',
    method='DELETE',
    endpoint='api/v1.0/nodes',
    data="some=data",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    dag=dag)
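In the examples above, `data` ends up in different places depending on `method`. Based on the Airflow 1.x `HttpHook.run` source (check your version), it is passed to requests as query parameters for GET, dropped for HEAD, and sent as the request body for every other method. A minimal standard-library sketch of the resulting URL and body; `describe_request` is hypothetical and only approximates what requests builds.

```python
import json
from typing import Optional, Tuple, Union
from urllib.parse import urlencode


def describe_request(method: str, url: str,
                     data: Union[dict, str, None]) -> Tuple[str, Optional[str]]:
    """Hypothetical approximation of where HttpHook.run puts `data`.

    Returns (final URL, request body).
    """
    method = method.upper()
    if method == 'GET':
        # GET: a dict is url-encoded into the query string; no body.
        query = urlencode(data) if isinstance(data, dict) else (data or '')
        return (url + '?' + query if query else url), None
    if method == 'HEAD':
        # HEAD: data is not sent at all.
        return url, None
    # POST / PUT / DELETE ...: dicts are form-encoded, strings go out verbatim.
    body = (urlencode(data) or None) if isinstance(data, dict) else data
    return url, body


base = 'http://localhost:8080/'
print(describe_request('GET', base + 'api/v1.0/nodes',
                       {"param1": "value1", "param2": "value2"}))
# -> ('http://localhost:8080/api/v1.0/nodes?param1=value1&param2=value2', None)
print(describe_request('PUT', base + 'api/v1.0/nodes', json.dumps({"priority": 5})))
# -> ('http://localhost:8080/api/v1.0/nodes', '{"priority": 5}')
print(describe_request('DELETE', base + 'api/v1.0/nodes', "some=data"))
# -> ('http://localhost:8080/api/v1.0/nodes', 'some=data')
```

This is why the GET example passes a dict while the PUT and POST examples pre-serialize their payload with `json.dumps` and set a matching Content-Type header.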

A complete example:

import os
from datetime import timedelta, datetime
import pytz
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.models import DAG

default_args = {
    'owner': 'cord',
    'depends_on_past': False,
    'wait_for_downstream': True,
    'execution_timeout': timedelta(minutes=3),
    'email': ['123456789@qq.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

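# Start at 2018-07-26 12:20 Beijing time, converted to naive UTC.
# pytz zones must be attached with localize(): passing tzinfo=tz to datetime()
# silently uses the zone's LMT offset (+08:06 for Asia/Shanghai).
tz = pytz.timezone('Asia/Shanghai')
dt = tz.localize(datetime(2018, 7, 26, 12, 20))
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
# Point conn_id 'http_test' at the local service (read from AIRFLOW_CONN_HTTP_TEST).
os.environ['AIRFLOW_CONN_HTTP_TEST'] = 'http://localhost:9090'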

dag = DAG(
    'bm01',
    default_args=default_args,
    description='my DAG',
    schedule_interval='*/2 * * * *',
    start_date=utc_dt
)

task1 = SimpleHttpOperator(
    task_id='get_op1',
    http_conn_id='http_test',
    method='GET',
    endpoint='test1',
    data={},
    headers={},
    dag=dag)

task2 = SimpleHttpOperator(
    task_id='get_op2',
    http_conn_id='http_test',
    method='GET',
    endpoint='test2',
    data={},
    headers={},
    dag=dag)

task1 >> task2
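The `start_date` in this DAG is meant to be 12:20 Beijing time expressed as naive UTC. A quick standard-library check of that conversion, assuming a fixed UTC+8 offset for Asia/Shanghai (it observed no daylight saving time in 2018), which avoids pytz altogether; `to_naive_utc` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

# Assumption: Asia/Shanghai is a fixed UTC+8 zone for the dates involved.
SHANGHAI = timezone(timedelta(hours=8), 'Asia/Shanghai')


def to_naive_utc(local_dt: datetime) -> datetime:
    """Convert an aware datetime to the naive-UTC form used for start_date."""
    return local_dt.astimezone(timezone.utc).replace(tzinfo=None)


start = to_naive_utc(datetime(2018, 7, 26, 12, 20, tzinfo=SHANGHAI))
print(start)
# -> 2018-07-26 04:20:00
```

If the value printed for `utc_dt` in the DAG file differs from this (for example 04:14), the timezone was attached incorrectly.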

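Also note that the HTTP request sent by SimpleHttpOperator is blocking: the task does not finish until the response has come back. In a dependency chain such as task1 >> task2, the downstream task therefore starts only after the upstream task's request has returned and the upstream task has completed.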
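The blocking behavior can be observed outside Airflow. This is not Airflow code: it is a minimal standard-library sketch in which a local HTTP server (a hypothetical stand-in for the target service) takes about 0.2 s per response, and two calls made in task order record when they start and end. Each blocking call returns only after the full response is read, so the second call cannot start earlier.

```python
import threading
import time
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class SlowHandler(BaseHTTPRequestHandler):
    """Stand-in service that takes about 0.2 s to answer each GET."""

    def do_GET(self):
        time.sleep(0.2)
        body = self.path.encode()
        self.send_response(200)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):  # keep the demo output quiet
        pass


def call(url: str, timeline: list, name: str) -> str:
    """Blocking GET: returns only after the whole response has been read."""
    timeline.append((name, 'start', time.monotonic()))
    with urllib.request.urlopen(url, timeout=5) as resp:
        text = resp.read().decode()
    timeline.append((name, 'end', time.monotonic()))
    return text


server = HTTPServer(('127.0.0.1', 0), SlowHandler)  # port 0: pick a free port
threading.Thread(target=server.serve_forever, daemon=True).start()
base = 'http://127.0.0.1:%d' % server.server_address[1]

timeline = []
# Like task1 >> task2: the second request is issued after the first returns.
call(base + '/test1', timeline, 'task1')
call(base + '/test2', timeline, 'task2')
server.shutdown()
server.server_close()

for name, event, _ in timeline:
    print(name, event)
```

A slow endpoint therefore keeps the task in the running state for as long as the request takes, which is what `execution_timeout` in `default_args` bounds.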

