在airflow中使用的時間是utc時間,而更多時候我們希望的是使用本地時間,於是在定義airflow定時任務的時候,涉及到了時間的轉換。
1.python中本地時間和utc時間的轉換
查看國內可用時區:
>>> import pytz
>>> pytz.country_timezones('cn')
['Asia/Shanghai', 'Asia/Urumqi']
方式一:
修改配置文件airflow.conf
使用操作系統時間
[core]
default_timezone=system
方式二:
tz = pytz.timezone('Asia/Shanghai')
naive = datetime.strptime("2018-06-13 17:40:00", "%Y-%m-%d %H:%M:%S")
local_dt = tz.localize(naive, is_dst=None)
utc_dt = local_dt.astimezone(pytz.utc)
參考鏈接: https://stackoverflow.com/questions/79797/how-do-i-convert-local-time-to-utc-in-python
方式三:
tz = pytz.timezone('Asia/Shanghai')
dt = datetime(2018, 6, 13, 17, 40, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc)
2.Airflow中通過時間轉換使用本地時間
這里涉及到一個問題,如果只是將本地時間轉換成了utc時間,那么在運行過程中airflow會拋出以下錯誤:
Can't subtract offset-naive and offset-aware datetimes
解決辦法是當將時間轉換為utc時間之后將其時區屬性設為None:
dt.replace(tzinfo=None)
參考鏈接: https://stackoverflow.com/questions/796008/cant-subtract-offset-naive-and-offset-aware-datetimes
完整示例DAG如下:
import time
import airflow
import pytz
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import timedelta, datetime
default_args = {
'owner': 'cord',
'depends_on_past': True,
'email': ['123456@qq.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
tz = pytz.timezone('Asia/Shanghai')
# naive = datetime.strptime("2018-06-13 17:40:00", "%Y-%m-%d %H:%M:%S")
# local_dt = tz.localize(naive, is_dst=None)
# utc_dt = local_dt.astimezone(pytz.utc).replace(tzinfo=None)
dt = datetime(2018, 6, 13, 17, 40, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
dag = DAG(
'demo',
default_args=default_args,
description='my DAG',
schedule_interval='* */3 * * *',
start_date=utc_dt
)
def test_func(str):
print(str)
task = PythonOperator(
task_id='hello',
python_callable=test_func,
op_kwargs={'str': 'hello world'},
dag=dag
)