ansible 的callback插件自定義


  由於ansible在2.0版本的時候,python api發生較大改變,所以一直在使用ansible的1.9版本。

在之前學習的過程中,執行playbook里的shell命令時,由於當前屏幕不輸出命令結果,一直以為ansible不返回命令結果。最近在寫自動化平台的時候,需要捕獲ansible的playbook中shell命令的執行結果。

所以找官網和資料研究了一下。ansible其實提供了接口,讓我們可以對執行返回的數據進行重定向。我們可以根據自己的需求,編寫自己的callback插件。如下例子:

 1 import os
 2 import time
 3 import json
 4 
 5 TIME_FORMAT="%b %d %Y %H:%M:%S"##日期格式
 6 MSG_FORMAT="%(now)s - %(category)s - %(data)s\n\n"##消息的格式出入字典!
 7 
 8 if not os.path.exists("/var/log/ansible/hosts"):#ansible執行結果存放的目錄。
 9     os.makedirs("/var/log/ansible/hosts")
10 
11 def log(host, category, data):##該函數主要是作用,將返回的數據以日志的形式寫入文件中。host:表示當前執行的主機IP,category:執行狀態比如OK  unreachable、faile等。
12     if type(data) == dict:#判斷返回的數據類型。
13         if 'verbose_override' in data:#如果字典的key值中含有關鍵字:verbose_override表示該信息是ansible返回的facts的元素數據。不需要進行額外記錄。
14             # avoid logging extraneous data from facts
15             data = 'omitted'
16         else:
17             data = data.copy()#拷貝正常數據,如果一個字典是不可變的,我們可以進行copy然后進行修改。
18             invocation = data.pop('invocation', None)#invocation的對應的key是當前用戶調用的模塊以及模塊參數。
19             data = json.dumps(data)#進行序列化轉化成字符串。
20             if invocation is not None:#如果模塊不為空的話,拼湊數據結構。
21                 data = json.dumps(invocation) + " => %s " % data
22 
23     path = os.path.join("/var/log/ansible/hosts", host)#拼接結果文件,以IP為名字的。
24     now = time.strftime(TIME_FORMAT, time.localtime())#時間串。
25     fd = open(path, "a")#寫文件。
26     fd.write(MSG_FORMAT % dict(now=now, category=category, data=data))
27     fd.close()
28 
29 class CallbackModule(object):
30     """
31     logs playbook results, per host, in /var/log/ansible/hosts 該類是處理不同的情況下的ansible的產生的結果的進行不同的處理。可以根據自己的需求進行篩選自己的需要的函數。
32     """
33 
34     def on_any(self, *args, **kwargs):
35         pass
36 
37     def runner_on_failed(self, host, res, ignore_errors=False):#執行失敗的時候。
38         log(host, 'FAILED', res)
39 
40     def runner_on_ok(self, host, res):#正常的結果。
41         log(host, 'OK', res)
42 
43     def runner_on_skipped(self, host, item=None):#忽略!
44         log(host, 'SKIPPED', '...')
45 
46     def runner_on_unreachable(self, host, res):#主機不可達!
47         log(host, 'UNREACHABLE', res)
48 
49     def runner_on_no_hosts(self):
50         pass
51 
52     def runner_on_async_poll(self, host, res, jid, clock):
53         pass
54 
55     def runner_on_async_ok(self, host, res, jid):
56         pass
57 
58     def runner_on_async_failed(self, host, res, jid):
59         log(host, 'ASYNC_FAILED', res)
60 
61     def playbook_on_start(self):
62         pass
63 
64     def playbook_on_notify(self, host, handler):
65         pass
66 
67     def playbook_on_no_hosts_matched(self):
68         pass
69 
70     def playbook_on_no_hosts_remaining(self):
71         pass
72 
73     def playbook_on_task_start(self, name, is_conditional):
74         pass
75 
76     def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
77         pass
78 
79     def playbook_on_setup(self):
80         pass
81 
82     def playbook_on_import_for_host(self, host, imported_file):
83         log(host, 'IMPORTED', imported_file)
84 
85     def playbook_on_not_import_for_host(self, host, missing_file):
86         log(host, 'NOTIMPORTED', missing_file)
87 
88     def playbook_on_play_start(self, name):
89         pass
90 
91     def playbook_on_stats(self, stats):
92         pass

 該文件命名為:log_plays.py

進行執行:

yaml文件:

1 $ cat check.yml 
2 ---
3 - name: lmd 
4   hosts: test 
5   remote_user: bjliumeide 
6   tasks:
7      - name: ensure apache is at the latest version
8        shell: w 

 

host文件:

1 [test]
2 172.17.33.75
3 172.17.33.74
4 127.0.0.1

 

去對應的目錄查看:

可以看到生成了對應的以IP命名的文件。

其中一個文件內容:

類似如上內容。

如上的log_plays.py有個問題:

1         if 'verbose_override' in data:
2             # avoid logging extraneous data from facts
3             data = 'omitted'

 

 上面的data中如果含有facts的結果,只是被替換成'omitted',我們的結果文件中還是會有該條記錄。對log_plays.py進行修改之后:

 1 import os
 2 import time
 3 import json
 4 
 5 
 6 TIME_FORMAT="%b %d %Y %H:%M:%S"
 7 MSG_FORMAT="%(now)s - %(category)s - %(data)s\n\n"
 8 
 9 if not os.path.exists("/export/bjliumeide/log/ansible/hosts"):
10     os.makedirs("/export/bjliumeide/log/ansible/hosts")
11 
12 def log(host, category, data):
13     if type(data) == dict:
14         if 'verbose_override' in data:
15             # avoid logging extraneous data from facts
16             data = 'omitted'
17         else:
18             data = data.copy()
19             invocation = data.pop('invocation', None)
20             #data = json.dumps(data)
21             #if invocation is not None:
22              #   data = json.dumps(invocation) + " => %s " % data
23             path = os.path.join("/export/bjliumeide/log/ansible/hosts", host)
24             now = time.strftime(TIME_FORMAT, time.localtime())
25             fd = open(path, "a")
26             fd.write(MSG_FORMAT % dict(now=now, category=category, data=data))
27             fd.close()
28     else:##對於返回的結果不是字典的情況進行處理。常常是報錯。
29         
30         path = os.path.join("/export/bjliumeide/log/ansible/hosts", host)
31         now = time.strftime(TIME_FORMAT, time.localtime())
32         fd = open(path, "a")
33         fd.write(MSG_FORMAT % dict(now=now, category=category, data=data))
34         fd.close()
35 
36 class CallbackModule(object):
37     """
38     logs playbook results, per host, in /var/log/ansible/hosts
39     """
40 
41     def on_any(self, *args, **kwargs):
42         pass
43 
44     def runner_on_failed(self, host, res, ignore_errors=False):
45         log(host, 'FAILED', res)
46 
47     def runner_on_ok(self, host, res):
48         log(host, 'OK', res)
49 
50     def runner_on_skipped(self, host, item=None):
51         log(host, 'SKIPPED', '...')
52 
53     def runner_on_unreachable(self, host, res):
54         log(host, 'UNREACHABLE', res)
55 
56     def runner_on_no_hosts(self):
57         pass
58 
59     def runner_on_async_poll(self, host, res, jid, clock):
60         pass
61 
62     def runner_on_async_ok(self, host, res, jid):
63         pass
64 
65     def runner_on_async_failed(self, host, res, jid):
66         log(host, 'ASYNC_FAILED', res)
67 
68     def playbook_on_start(self):
69         pass
70 
71     def playbook_on_notify(self, host, handler):
72         pass
73 
74     def playbook_on_no_hosts_matched(self):
75         pass
76 
77     def playbook_on_no_hosts_remaining(self):
78         pass
79 
80     def playbook_on_task_start(self, name, is_conditional):
81         pass
82 
83     def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
84         pass
85 
86     def playbook_on_setup(self):
87         pass
88 
89     def playbook_on_import_for_host(self, host, imported_file):
90         log(host, 'IMPORTED', imported_file)
91 
92     def playbook_on_not_import_for_host(self, host, missing_file):
93         log(host, 'NOTIMPORTED', missing_file)
94 
95     def playbook_on_play_start(self, name):
96         pass
97 
98     def playbook_on_stats(self, stats):
99         pass

 修改項:

  • facts數據不再寫入執行結果文件中。
  • 對於返回的結果不是字典的情況下(報錯返回的是字符串),進行分別處理。

 對如上進行修改方便我們進行入庫操作。方便前台查詢。

接下來進行數據庫的操作代替文件操作。

 難點:

我們現在即使改成插入數據庫的操作,仍然有一個問題:我們無法確認該執行任務的結果該插入到數據庫中哪個任務id、哪個任務IP對應的記錄里。經查看官網的插件例子,發現如下的介紹內容:

context_demo.py

 1 import os
 2 import time
 3 import json
 4 
 5 class CallbackModule(object):
 6     """
 7     This is a very trivial example of how any callback function can get at play and task objects.
 8     play will be 'None' for runner invocations, and task will be None for 'setup' invocations.
 9     """
10 
11     def on_any(self, *args, **kwargs):
12         play = getattr(self, 'play', None)
13         task = getattr(self, 'task', None)
14         print "play = %s, task = %s, args = %s, kwargs = %s" % (play,task,args,kwargs)

xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

log_plays.py內容:

  1 import os
  2 import time
  3 import json
  4 
  5 TIME_FORMAT="%b %d %Y %H:%M:%S"
  6 MSG_FORMAT="%(now)s - %(category)s - %(data)s\n\n"
  7 
  8 if not os.path.exists("/export/bjliumeide/log/ansible/hosts"):
  9     os.makedirs("/export/bjliumeide/log/ansible/hosts")
 10 
 11 def log(host, category, data):
 12     if type(data) == dict:
 13         if 'verbose_override' in data:
 14             # avoid logging extraneous data from facts
 15             #data = 'omitted'
 16             pass
 17         else:
 18             data = data.copy()
 19             invocation = data.pop('invocation', None)
 20             #data = json.dumps(data)
 21             #if invocation is not None:
 22              #   data = json.dumps(invocation) + " => %s " % data
 23         path = os.path.join("/export/bjliumeide/log/ansible/hosts", host)
 24         now = time.strftime(TIME_FORMAT, time.localtime())
 25         fd = open(path, "a")
 26         fd.write(MSG_FORMAT % dict(now=now, category=category, data=data))
 27         fd.close()
 28     else:
 29         
 30         path = os.path.join("/export/bjliumeide/log/ansible/hosts", host)
 31         now = time.strftime(TIME_FORMAT, time.localtime())
 32         fd = open(path, "a")
 33         fd.write(MSG_FORMAT % dict(now=now, category=category, data=data))
 34         fd.close()
 35 
 36 class CallbackModule(object):
 37     """
 38     logs playbook results, per host, in /var/log/ansible/hosts
 39     """
 40 
 41     def on_any(self, *args, **kwargs):
 42         pass
 43 
 44     def runner_on_failed(self, host, res, ignore_errors=False):
 45         log(host, 'FAILED', res)
 46 
 47     def runner_on_ok(self, host, res):
 48         task=getattr(self,'task',None)
 49         log(host,task.name, res)
 50     def runner_on_skipped(self, host, item=None):
 51         log(host, 'SKIPPED', '...')
 52 
 53     def runner_on_unreachable(self, host, res):
 54         task=getattr(self,'task',None)
 55         if task:
 56             log(host,task.name,res)#返回task.name
 57         else:
 58             pass
 59 
 60     def runner_on_no_hosts(self):
 61         pass
 62 
 63     def runner_on_async_poll(self, host, res, jid, clock):
 64         pass
 65 
 66     def runner_on_async_ok(self, host, res, jid):
 67         pass
 68 
 69     def runner_on_async_failed(self, host, res, jid):
 70         log(host, 'ASYNC_FAILED', res)
 71 
 72     def playbook_on_start(self):
 73         pass
 74 
 75     def playbook_on_notify(self, host, handler):
 76         pass
 77 
 78     def playbook_on_no_hosts_matched(self):
 79         pass
 80 
 81     def playbook_on_no_hosts_remaining(self):
 82         pass
 83 
 84     def playbook_on_task_start(self, name, is_conditional):
 85         pass
 86 
 87     def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
 88         pass
 89 
 90     def playbook_on_setup(self):
 91         pass
 92 
 93     def playbook_on_import_for_host(self, host, imported_file):
 94         log(host, 'IMPORTED', imported_file)
 95 
 96     def playbook_on_not_import_for_host(self, host, missing_file):
 97         log(host, 'NOTIMPORTED', missing_file)
 98 
 99     def playbook_on_play_start(self, name):
100         pass
101 
102     def playbook_on_stats(self, stats):
103         pass

然后我們故意輸錯密碼:

 1 $ ansible-playbook  check.yml  -k
 2 SSH password: 
 3 
 4 PLAY [lmd] ******************************************************************** 
 5 
 6 TASK: [121] ******************************************************************* 
 7 fatal: [172.17.3.11] => Authentication failure.
 8 
 9 FATAL: all hosts have already failed -- aborting
10 
11 PLAY RECAP ******************************************************************** 
12            to retry, use: --limit @/export/bjliumeide/check.retry
13 
14 172.17.3.11                : ok=0    changed=0    unreachable=1    failed=0   

看到任務執行結果:unreachable

再來看下結果文件的內容是否有我們想要的121

這樣執行結果就和我們的任務task_id關聯起來了。原打算在ansible服務器上直接將結果寫入數據庫,最后還是選擇由ansible端將數據post到db端,由db端進行入庫。

修改后的log_plays.py

  1 #coding=utf-8
  2 #author:evil
  3 
  4 import os
  5 import time
  6 import json
  7 import requests
  8 
  9 Data_Fail_List=['UNREACHABLE','SKIPPED']
 10 
 11 
 12 def log(host, category,task_id, data):
 13     if type(data) == dict:
 14         if 'verbose_override' in data:
 15             # avoid logging extraneous data from facts
 16             data = 'omitted'
 17         else:
 18             data = data.copy()
 19             invocation = data.pop('invocation', None)
 20             task_status='success'
 21             task_res=data['stdout']
 22             task_id=task_id
 23             task_host=host
 24             payload={'task_status':'success','task_res':task_res,'task_id':task_id,'task_host':task_host}
 25             requests.post("http://10.13.168.153:8000/task_callback/", data=payload)
 26     else:
 27         task_status='fail'
 28         task_res=data
 29         task_id=task_id
 30 "log_plays.py" 118L, 3419C written
 31 [root@localhost callback_plugins]# cat log_plays.py
 32 #coding=utf-8
 33 #author:evil
 34 
 35 import os
 36 import time
 37 import json
 38 import requests 
 39 
 40 Data_Fail_List=['UNREACHABLE','SKIPPED']
 41 
 42 
 43 def log(host, category,task_id, data):
 44     if type(data) == dict:
 45         if 'verbose_override' in data:
 46             # avoid logging extraneous data from facts
 47             data = 'omitted'
 48         else:
 49             data = data.copy()
 50             invocation = data.pop('invocation', None)
 51             task_status='success'
 52             task_res=data['stdout']
 53             task_id=task_id 
 54             task_host=host
 55             payload={'task_status':'success','task_res':task_res,'task_id':task_id,'task_host':task_host}
 56             requests.post("http://10.13.168.153:8000/task_callback/", data=payload)
 57     else:
 58         task_status='fail'
 59         task_res=data
 60         task_id=task_id
 61         task_host=host
 62         payload={'task_status':'fail','task_res':data,'task_id':task_id,'task_host':task_host}
 63         requests.post("http://10.13.168.153:8000/task_callback/", data=payload)
 64         
 65         #path = os.path.join("/export/bjliumeide/log/ansible/hosts", host)
 66         #now = time.strftime(TIME_FORMAT, time.localtime())
 67         #fd = open(path, "a")
 68         #fd.write(MSG_FORMAT % dict(now=now, category=category,task_id=task_id, data=data))
 69         #fd.close()
 70 
 71 class CallbackModule(object):
 72     """
 73     logs playbook results, per host, in /var/log/ansible/hosts
 74     """
 75 
 76     def on_any(self, *args, **kwargs):
 77         pass
 78 
 79     def runner_on_failed(self, host, res, ignore_errors=False):
 80         task_obj=getattr(self,'task',None)
 81         task_name=task_obj.name
 82         if task_obj:
 83             log(host, 'FAILED',task_name,res)
 84 
 85     def runner_on_ok(self, host, res):
 86         task_obj=getattr(self,'task',None)
 87         task_name=task_obj.name
 88         if task_obj:
 89             log(host, 'OK',task_name, res)
 90         else:
 91             pass
 92     def runner_on_skipped(self, host, item=None):
 93         task_obj=getattr(self,'task',None)
 94         task_name=task_obj.name
 95         if task_obj:
 96            log(host, 'SKIPPED',task_name,'...')
 97         else:
 98            pass
 99     def runner_on_unreachable(self, host, res):
100         task_obj=getattr(self,'task',None)
101         if task_obj:
102             log(host,'UNREACHABLE',task_obj.name,res)
103         else:
104             pass
105     def runner_on_no_hosts(self):
106         pass
107 
108     def runner_on_async_poll(self, host, res, jid, clock):
109         pass
110 
111     def runner_on_async_ok(self, host, res, jid):
112         pass
113 
114     def runner_on_async_failed(self, host, res, jid):
115         log(host, 'ASYNC_FAILED', res)
116 
117     def playbook_on_start(self):
118         pass
119 
120     def playbook_on_notify(self, host, handler):
121         pass
122 
123     def playbook_on_no_hosts_matched(self):
124         pass
125 
126     def playbook_on_no_hosts_remaining(self):
127         pass
128 
129     def playbook_on_task_start(self, name, is_conditional):
130         pass
131 
132     def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
133         pass
134 
135     def playbook_on_setup(self):
136         pass
137 
138     def playbook_on_import_for_host(self, host, imported_file):
139         log(host, 'IMPORTED', imported_file)
140 
141     def playbook_on_not_import_for_host(self, host, missing_file):
142         log(host, 'NOTIMPORTED', missing_file)
143 
144     def playbook_on_play_start(self, name):
145         pass
146 
147     def playbook_on_stats(self, stats):
148         pass

db端接收代碼:

1 def task_callback(request):
2     if request.method=='POST':
3         task_status=request.POST.get('task_status')
4         task_host=request.POST.get('task_host')
5         task_res=request.POST.get('task_res')
6         task_id=request.POST.get('task_id')
7         print(task_status,task_host,task_res,task_id)
8         return  HttpResponse('ok')

在ansible端執行playbook看db端是否接收到:

上面雖然解決了任務id的問題,但是還有一個問題未解決:如果用戶執行類似 tail -f 的命令,或者被執行的主機出現卡死的情況,這個時候就需要為命令執行的反饋設置超時。

解決思路:

1、不用playbook的API。使用ansible的API的時候,ansible本身提供一個參數 -B:

可以解決這個問題。

 1 [root@localhost ansbile]# ansible -i 1.txt  test  -m command -a "top" -u root -k -B 4
 2 SSH password: 
 3 background launch...
 4 
 5 
 6 127.0.0.1 | success >> {
 7     "ansible_job_id": "273366723200.80789", 
 8     "results_file": "/root/.ansible_async/273366723200.80789", 
 9     "started": 1
10 }

 

 但是還有一個問題:上面的執行其實也是采取異步方式進行的,結果需要自己去查看,不如playbook的API方便。

2、用ansible的playbook API的時候,我們可以用異步方式執行我們的任務。首先了解在yaml的里的2個參數:

async:表示任務在后台最大執行時間。

poll:表示間隔幾秒去后台查看執行結果。ansible在異步執行任務的時候,執行完畢的之后,會將結果放在執行用戶的家目錄:

1  ls /root/.ansible_async/ 

 該目錄下以jid命名的文件,記錄執行結果。

注意:需要定期清理該目錄下的文件!!!否則iNode容易打滿!!!

 所以采取異步的方式執行我們的任務,不會出現任務后台hang死的情況!

還有一個問題需要解決:hang死可以用異步方式執行來避免,但是新的問題出現了:我們自定義的callback插件出現一個問題:

插件代碼:

  1 #coding=utf-8
  2 #author:evil
  3 
  4 import os
  5 import time
  6 import json
  7 import requests 
  8 
  9 Data_Fail_List=['UNREACHABLE','SKIPPED','ASYNC_FAILED']
 10 
 11 
 12 def log(host, category,task_id, data,jid=None):
 13     if type(data) == dict:
 14         if 'verbose_override' in data:
 15             # avoid logging extraneous data from facts
 16             data = 'omitted'
 17         else:
 18             data = data.copy()
 19             invocation = data.pop('invocation', None)
 20             if 'stdout' in data.keys():
 21                 task_res=data['stdout']
 22             else:
 23                 task_res=data
 24             #task_res=json.dumps(data)
 25             task_status=category
 26             task_id=task_id 
 27             task_host=host
 28             jid=jid
 29             payload={'task_status':task_status,'task_res':task_res,'task_id':task_id,'task_host':task_host,'jid':jid}
 30             requests.post("http://10.13.166.117:18000/task_callback/", data=payload)
 31     else:
 32         if data=='ansible_job_id':
 33             exit(0)    
 34         task_status='fail'
 35         task_res=data
 36         task_id=task_id
 37         task_host=host
 38         payload={'task_status':'fail','task_res':data,'task_id':task_id,'task_host':task_host}
 39         requests.post("http://10.13.166.117:18000/task_callback/", data=payload)
 40         
 41 
 42 class CallbackModule(object):
 43     """
 44     logs playbook results, per host, in /var/log/ansible/hosts
 45     """
 46 
 47     def on_any(self, *args, **kwargs):
 48         pass
 49 
 50     def runner_on_failed(self, host, res, ignore_errors=False):
 51         task_obj=getattr(self,'task',None)
 52         task_name=task_obj.name
 53         if task_obj:
 54             log(host, 'FAILED',task_name,res)
 55 
 56     def runner_on_ok(self, host, res):
 57         task_obj=getattr(self,'task',None)
 58         task_name=task_obj.name
 59         if task_obj:
 60             log(host, 'OK',task_name, res)
 61         else:
 62             pass
 63     def runner_on_skipped(self, host, item=None):
 64         task_obj=getattr(self,'task',None)
 65         task_name=task_obj.name
 66         if task_obj:
 67            log(host, 'SKIPPED',task_name,'...')
 68         else:
 69            pass
 70     def runner_on_unreachable(self, host, res):
 71         task_obj=getattr(self,'task',None)
 72         if task_obj:
 73             log(host,'UNREACHABLE',task_obj.name,res)
 74         else:
 75             pass
 76     def runner_on_no_hosts(self):
 77         pass
 78 
 79     def runner_on_async_poll(self, host, res, jid, clock):
 80         pass
 81 
 82     def runner_on_async_ok(self, host, res, jid):
 83         pass
 84 
 85     def runner_on_async_failed(self, host, res, jid):
 86         task_obj=getattr(self,'task',None)
 87         task_name=task_obj.name
 88         if task_obj:
 89             log(host,'ASYNC_FAILED',task_obj.name,res)
 90     def playbook_on_start(self):
 91         pass
 92 
 93     def playbook_on_notify(self, host, handler):
 94         pass
 95 
 96     def playbook_on_no_hosts_matched(self):
 97         pass
 98 
 99     def playbook_on_no_hosts_remaining(self):
100         pass
101 
102     def playbook_on_task_start(self, name, is_conditional):
103         pass
104 
105     def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
106         pass
107 
108     def playbook_on_setup(self):
109         pass
110 
111     def playbook_on_import_for_host(self, host, imported_file):
112         log(host, 'IMPORTED', imported_file)
113 
114     def playbook_on_not_import_for_host(self, host, missing_file):
115         log(host, 'NOTIMPORTED', missing_file)
116 
117     def playbook_on_play_start(self, name):
118         pass
119 
120     def playbook_on_stats(self, stats):
121         pass

 輸出結果:

1 OK 127.0.0.1 ansible_job_id 58 None
2 fail 172.17.3.11 SSH Error: ssh: connect to host 172.17.3.11 port 22: Connection timed out
3     while connecting to 172.17.3.11:22
4 It is sometimes useful to re-run the command using -vvvv, which prints SSH debug output to help diagnose the issue. 58 None
5 FAILED 127.0.0.1  58 None
6 fail 172.17.3.11 SSH Error: ssh: connect to host 172.17.3.11 port 22: Connection timed out
7     while connecting to 172.17.3.11:22
8 It is sometimes useful to re-run the command using -vvvv, which prints SSH debug output to help diagnose the issue. 58 None
9 ASYNC_FAILED 127.0.0.1  58 None

 yaml文件:

 

接收端api代碼:

1 def task_callback(request):
2     if request.method=='POST':
3         task_status=request.POST.get('task_status')
4         task_host=request.POST.get('task_host')
5         task_res=request.POST.get('task_res')
6         task_id=request.POST.get('task_id')
7         task_jid=request.POST.get('jid')
8         print(task_status,task_host,task_res,task_id,task_jid)
9         return  HttpResponse('ok')

 

看到上面結果一共拉取了3次,每次的結果都不一樣,只有最后的結果:ASYNC_FAILED 127.0.0.1  58 None ,fail 172.17.3.11 SSH Error: ssh: connect to host 172.17.3.11 port 22: Connection timed out

是我們需要的。這樣就需要我們對結果進行篩選。首先需要我們知道ansible的在異步的時候返回的數據是什么?

修改插件代碼:

  1 #coding=utf-8
  2 #author:evil
  3 
  4 import os
  5 import time
  6 import json
  7 import requests 
  8 
  9 Data_Fail_List=['UNREACHABLE','SKIPPED','ASYNC_FAILED']
 10 
 11 
 12 def log(host, category,task_id, data,jid=None):
 13     if type(data) == dict:
 14         if 'verbose_override' in data:
 15             # avoid logging extraneous data from facts
 16             data = 'omitted'
 17         else:
 18             task_res=json.dumps(data)
 19             task_status=category
 20             task_id=task_id 
 21             task_host=host
 22             jid=jid
 23             payload={'task_status':task_status,'task_res':task_res,'task_id':task_id,'task_host':task_host,'jid':jid}
 24             requests.post("http://10.13.166.117:18000/task_callback/", data=payload)
 25     else:
 26         if data=='ansible_job_id':
 27             exit(0)    
 28         task_status='fail'
 29         task_res=data
 30         task_id=task_id
 31         task_host=host
 32         payload={'task_status':'fail','task_res':data,'task_id':task_id,'task_host':task_host}
 33         requests.post("http://10.13.166.117:18000/task_callback/", data=payload)
 34         
 35 
 36 class CallbackModule(object):
 37     """
 38     logs playbook results, per host, in /var/log/ansible/hosts
 39     """
 40 
 41     def on_any(self, *args, **kwargs):
 42         pass
 43 
 44     def runner_on_failed(self, host, res, ignore_errors=False):
 45         task_obj=getattr(self,'task',None)
 46         task_name=task_obj.name
 47         if task_obj:
 48             log(host, 'FAILED',task_name,res)
 49 
 50     def runner_on_ok(self, host, res):
 51         task_obj=getattr(self,'task',None)
 52         task_name=task_obj.name
 53         if task_obj:
 54             log(host, 'OK',task_name, res)
 55         else:
 56             pass
 57     def runner_on_skipped(self, host, item=None):
 58         task_obj=getattr(self,'task',None)
 59         task_name=task_obj.name
 60         if task_obj:
 61            log(host, 'SKIPPED',task_name,'...')
 62         else:
 63            pass
 64     def runner_on_unreachable(self, host, res):
 65         task_obj=getattr(self,'task',None)
 66         if task_obj:
 67             log(host,'UNREACHABLE',task_obj.name,res)
 68         else:
 69             pass
 70     def runner_on_no_hosts(self):
 71         pass
 72 
 73     def runner_on_async_poll(self, host, res, jid, clock):
 74         pass
 75 
 76     def runner_on_async_ok(self, host, res, jid):
 77         pass
 78 
 79     def runner_on_async_failed(self, host, res, jid):
 80         task_obj=getattr(self,'task',None)
 81         task_name=task_obj.name
 82         if task_obj:
 83             log(host,'ASYNC_FAILED',task_obj.name,res)
 84     def playbook_on_start(self):
 85         pass
 86 
 87     def playbook_on_notify(self, host, handler):
 88         pass
 89 
 90     def playbook_on_no_hosts_matched(self):
 91         pass
 92 
 93     def playbook_on_no_hosts_remaining(self):
 94         pass
 95 
 96     def playbook_on_task_start(self, name, is_conditional):
 97         pass
 98 
 99     def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
100         pass
101 
102     def playbook_on_setup(self):
103         pass
104 
105     def playbook_on_import_for_host(self, host, imported_file):
106         log(host, 'IMPORTED', imported_file)
107 
108     def playbook_on_not_import_for_host(self, host, missing_file):
109         log(host, 'NOTIMPORTED', missing_file)
110 
111     def playbook_on_play_start(self, name):
112         pass
113 
114     def playbook_on_stats(self, stats):
 115         pass

 

結果:

1 OK 127.0.0.1 {"started": 1, "invocation": {"module_name": "shell", "module_args": "top"}, "results_file": "/root/.ansible_async/825224993532.82362", "ansible_job_id": "825224993532.82362"} 58 None
2 fail 172.17.3.11 SSH Error: ssh: connect to host 172.17.3.11 port 22: Connection timed out
3     while connecting to 172.17.3.11:22
4 It is sometimes useful to re-run the command using -vvvv, which prints SSH debug output to help diagnose the issue. 58 None
5 FAILED 127.0.0.1 {"cmd": "top", "end": "2017-01-21 23:14:48.666570", "ansible_job_id": "825224993532.82362", "stdout": "", "changed": true, "invocation": {"module_name": "async_status", "module_args": "jid=825224993532.82362"}, "start": "2017-01-21 23:14:48.654472", "finished": 1, "stderr": "\ttop: failed tty get", "rc": 1, "delta": "0:00:00.012098", "warnings": []} 58 None
6 fail 172.17.3.11 SSH Error: ssh: connect to host 172.17.3.11 port 22: Connection timed out
7     while connecting to 172.17.3.11:22
8 It is sometimes useful to re-run the command using -vvvv, which prints SSH debug output to help diagnose the issue. 58 None
9 ASYNC_FAILED 127.0.0.1 {"changed": true, "end": "2017-01-21 23:14:48.666570", "ansible_job_id": "825224993532.82362", "stdout": "", "cmd": "top", "rc": 1, "start": "2017-01-21 23:14:48.654472", "finished": 1, "stderr": "\ttop: failed tty get", "delta": "0:00:00.012098", "invocation": {"module_name": "async_status", "module_args": "jid=825224993532.82362"}, "warnings": []} 58 None

 

可以看得出,返回的結果是一個字典:

1  {"changed": true, "end": "2017-01-21 23:14:48.666570", "ansible_job_id": "825224993532.82362", "stdout": "", "cmd": "top", "rc": 1, "start": "2017-01-21 23:14:48.654472", "finished": 1, "stderr": "\ttop: failed tty get", "delta": "0:00:00.012098", "invocation": {"module_name": "async_status", "module_args": "jid=825224993532.82362"}, "warnings": []}

 

只有這一條是我想要的結果。這個是失敗的任務,成功的任務返回任務執行時間。

修改yaml文件:

 

然后抓取執行結果:

1 OK 127.0.0.1 {"cmd": "sleep 4", "end": "2017-01-21 23:20:20.276296", "ansible_job_id": "139543736001.82612", "stdout": "", "changed": true, "invocation": {"module_name": "async_status", "module_args": "jid=139543736001.82612"}, "start": "2017-01-21 23:20:16.260931", "finished": 1, "stderr": "", "rc": 0, "delta": "0:00:04.015365", "warnings": []}

里面有個key值:delta,這個值是任務耗時!觀察如上返回結果,我們發現無論是失敗還是成功,最后的結果都是 changed: true 的狀態。

所以根據這個特點,我們對插件的代碼進行更改:

  1 # cat log_plays.py
  2 #coding=utf-8
  3 #author:evil
  4 
  5 import os
  6 import time
  7 import json
  8 import requests 
  9 
 10 Data_Fail_List=['UNREACHABLE','SKIPPED','ASYNC_FAILED']
 11 
 12 
 13 def log(host, category,task_id, data,jid=None):
 14     if type(data) == dict:
 15         if 'verbose_override' in data:
 16             # avoid logging extraneous data from facts
 17             data = 'omitted'
 18         else:
 19             data = data.copy()
 20             invocation = data.pop('invocation', None)
 21             if 'changed' in data:
 22                 if data['changed'] and  category=='OK':
 23                     task_res=data['stdout']
 24                 elif data['changed'] and  category=='ASYNC_FAILED':
 25                     task_res=data['stderr']
 26                 else:
 27                     return False 
 28             elif 'stdout' in data.keys():
 29                 task_res=data['stdout']
 30             elif 'started' in data.keys():
 31                 if data['started']==1:
 32                     return False 
 33             else:
 34                 task_res=data
 35             #task_res=json.dumps(data)
 36             task_status=category
 37             task_id=task_id 
 38             task_host=host
 39             jid=jid
 40             payload={'task_status':task_status,'task_res':task_res,'task_id':task_id,'task_host':task_host,'jid':jid}
 41             requests.post("http://10.13.166.117:18000/task_callback/", data=payload)
 42     else:
 43         task_status='fail'
 44         task_res=data
 45         task_id=task_id
 46         task_host=host
 47         payload={'task_status':'fail','task_res':data,'task_id':task_id,'task_host':task_host}
 48         requests.post("http://10.13.166.117:18000/task_callback/", data=payload)
 49         
 50 
 51 class CallbackModule(object):
 52     """
 53     logs playbook results, per host, in /var/log/ansible/hosts
 54     """
 55 
 56     def on_any(self, *args, **kwargs):
 57         pass
 58 
 59     def runner_on_failed(self, host, res, ignore_errors=False):
 60         task_obj=getattr(self,'task',None)
 61         task_name=task_obj.name
 62         if task_obj:
 63             log(host, 'FAILED',task_name,res)
 64 
 65     def runner_on_ok(self, host, res):
 66         task_obj=getattr(self,'task',None)
 67         task_name=task_obj.name
 68         if task_obj:
 69             log(host, 'OK',task_name, res)
 70         else:
 71             pass
 72     def runner_on_skipped(self, host, item=None):
 73         task_obj=getattr(self,'task',None)
 74         task_name=task_obj.name
 75         if task_obj:
 76            log(host, 'SKIPPED',task_name,'...')
 77         else:
 78            pass
 79     def runner_on_unreachable(self, host, res):
 80         task_obj=getattr(self,'task',None)
 81         if task_obj:
 82             log(host,'UNREACHABLE',task_obj.name,res)
 83         else:
 84             pass
 85     def runner_on_no_hosts(self):
 86         pass
 87 
 88     def runner_on_async_poll(self, host, res, jid, clock):
 89         pass
 90 
 91     def runner_on_async_ok(self, host, res, jid):
 92         pass
 93 
 94     def runner_on_async_failed(self, host, res, jid):
 95         task_obj=getattr(self,'task',None)
 96         task_name=task_obj.name
 97         if task_obj:
 98             log(host,'ASYNC_FAILED',task_obj.name,res)
 99     def playbook_on_start(self):
100         pass
101 
102     def playbook_on_notify(self, host, handler):
103         pass
104 
105     def playbook_on_no_hosts_matched(self):
106         pass
107 
108     def playbook_on_no_hosts_remaining(self):
109         pass
110 
111     def playbook_on_task_start(self, name, is_conditional):
112         pass
113 
114     def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
115         pass
116 
117     def playbook_on_setup(self):
118         pass
119 
120     def playbook_on_import_for_host(self, host, imported_file):
121         log(host, 'IMPORTED', imported_file)
122 
123     def playbook_on_not_import_for_host(self, host, missing_file):
124         log(host, 'NOTIMPORTED', missing_file)
125 
126     def playbook_on_play_start(self, name):
127         pass
128 
129     def playbook_on_stats(self, stats):
130         pass

查看輸出結果:

1 fail 172.17.3.11 SSH Error: ssh: connect to host 172.17.3.11 port 22: Connection timed out
2     while connecting to 172.17.3.11:22
3 It is sometimes useful to re-run the command using -vvvv, which prints SSH debug output to help diagnose the issue. 58 None
4 
5 fail 172.17.3.11 SSH Error: ssh: connect to host 172.17.3.11 port 22: Connection timed out
6     while connecting to 172.17.3.11:22
7 It is sometimes useful to re-run the command using -vvvv, which prints SSH debug output to help diagnose the issue. 58 None
8 
9 ASYNC_FAILED 127.0.0.1     top: failed tty get 58 None

 

可以看得出我們獲取我們想要的結果。接下來我們可以進行數據庫的操作!!!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM