ansible 的callback插件自定义


  由于ansible在2.0版本的时候,python api发生较大改变,所以一直在使用ansible的1.9版本。

在之前学习的过程中,在执行playbook的时候,在执行shell的命令的时候,由于当前屏幕不输出命令结果,一直认为是ansible不返回命令结果。最近在写自动化平台的时候,需要捕获ansible的playbook的shell结果。

所以查阅了官网和相关资料研究了一下。ansible其实提供了接口,让我们可以对执行返回的数据进行重定向。我们可以根据自己的需求,编写自己的callback插件。如下例子:

 1 import os
 2 import time
 3 import json
 4 
 5 TIME_FORMAT="%b %d %Y %H:%M:%S"##日期格式
 6 MSG_FORMAT="%(now)s - %(category)s - %(data)s\n\n"##消息的格式出入字典!
 7 
 8 if not os.path.exists("/var/log/ansible/hosts"):#ansible执行结果存放的目录。
 9     os.makedirs("/var/log/ansible/hosts")
10 
def log(host, category, data):
    """Append one result record to the per-host log file.

    Args:
        host: Host (IP) the result belongs to; also used as the file name
            under /var/log/ansible/hosts.
        category: Status label of the result, e.g. 'OK', 'FAILED' or
            'UNREACHABLE'.
        data: Result payload. A dict is serialised to JSON, with its
            'invocation' entry (the module that was called and its
            arguments) rendered in front of the rest. Any other value is
            written as-is.
    """
    if isinstance(data, dict):
        if 'verbose_override' in data:
            # avoid logging extraneous data from facts
            data = 'omitted'
        else:
            # Work on a copy so the caller's dict is left untouched.
            data = data.copy()
            invocation = data.pop('invocation', None)
            data = json.dumps(data)
            if invocation is not None:
                data = json.dumps(invocation) + " => %s " % data

    path = os.path.join("/var/log/ansible/hosts", host)
    now = time.strftime(TIME_FORMAT, time.localtime())
    # The context manager closes the file even when the write fails.
    with open(path, "a") as fd:
        fd.write(MSG_FORMAT % dict(now=now, category=category, data=data))
28 
class CallbackModule(object):
    """Log playbook results, per host, in /var/log/ansible/hosts.

    Each hook handles one kind of event produced by Ansible. Hooks that
    carry a per-host outcome write a record through log(); the others do
    nothing. Keep only the hooks you need.
    """

    def on_any(self, *args, **kwargs):
        """No-op."""

    def runner_on_failed(self, host, res, ignore_errors=False):
        """Record the result of a task that failed on *host*."""
        log(host, 'FAILED', res)

    def runner_on_ok(self, host, res):
        """Record the result of a task that succeeded on *host*."""
        log(host, 'OK', res)

    def runner_on_skipped(self, host, item=None):
        """Record that a task was skipped on *host*."""
        log(host, 'SKIPPED', '...')

    def runner_on_unreachable(self, host, res):
        """Record that *host* could not be reached."""
        log(host, 'UNREACHABLE', res)

    def runner_on_no_hosts(self):
        """No-op."""

    def runner_on_async_poll(self, host, res, jid, clock):
        """No-op."""

    def runner_on_async_ok(self, host, res, jid):
        """No-op."""

    def runner_on_async_failed(self, host, res, jid):
        """Record the result of an asynchronous task that failed on *host*."""
        log(host, 'ASYNC_FAILED', res)

    def playbook_on_start(self):
        """No-op."""

    def playbook_on_notify(self, host, handler):
        """No-op."""

    def playbook_on_no_hosts_matched(self):
        """No-op."""

    def playbook_on_no_hosts_remaining(self):
        """No-op."""

    def playbook_on_task_start(self, name, is_conditional):
        """No-op."""

    def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
        """No-op."""

    def playbook_on_setup(self):
        """No-op."""

    def playbook_on_import_for_host(self, host, imported_file):
        """Record the file that was imported for *host*."""
        log(host, 'IMPORTED', imported_file)

    def playbook_on_not_import_for_host(self, host, missing_file):
        """Record the file that could not be imported for *host*."""
        log(host, 'NOTIMPORTED', missing_file)

    def playbook_on_play_start(self, name):
        """No-op."""

    def playbook_on_stats(self, stats):
        """No-op."""

 该文件命名为:log_plays.py

进行执行:

yaml文件:

1 $ cat check.yml 
2 ---
3 - name: lmd 
4   hosts: test 
5   remote_user: bjliumeide 
6   tasks:
7      - name: ensure apache is at the latest version
8        shell: w 

 

host文件:

1 [test]
2 172.17.33.75
3 172.17.33.74
4 127.0.0.1

 

去对应的目录查看:

可以看到生成了以IP命名的对应文件。

其中一个文件内容:

类似如上内容。

如上的log_plays.py有个问题:

1         if 'verbose_override' in data:
2             # avoid logging extraneous data from facts
3             data = 'omitted'

 

 返回上面的data中如果有facts的结果的话,我们结果文件中还是有该条记录。对log_plays.py进行修改之后:

 1 import os
 2 import time
 3 import json
 4 
 5 
 6 TIME_FORMAT="%b %d %Y %H:%M:%S"
 7 MSG_FORMAT="%(now)s - %(category)s - %(data)s\n\n"
 8 
 9 if not os.path.exists("/export/bjliumeide/log/ansible/hosts"):
10     os.makedirs("/export/bjliumeide/log/ansible/hosts")
11 
def log(host, category, data):
    """Append one result record to the per-host log file.

    Dict results that carry 'verbose_override' (fact data) are not written
    at all. Other dicts are written without their 'invocation' entry, and
    non-dict payloads (usually error strings) are written unchanged.

    Args:
        host: Host (IP) the result belongs to; also used as the file name
            under /export/bjliumeide/log/ansible/hosts.
        category: Status label of the result, e.g. 'OK' or 'FAILED'.
        data: Result payload, a dict or any other printable value.
    """
    if isinstance(data, dict):
        if 'verbose_override' in data:
            # avoid logging extraneous data from facts
            return
        # Work on a copy so the caller's dict is left untouched.
        data = data.copy()
        data.pop('invocation', None)

    path = os.path.join("/export/bjliumeide/log/ansible/hosts", host)
    now = time.strftime(TIME_FORMAT, time.localtime())
    # One shared write path; the context manager closes the file even when
    # the write fails.
    with open(path, "a") as fd:
        fd.write(MSG_FORMAT % dict(now=now, category=category, data=data))
35 
class CallbackModule(object):
    """Record playbook results, per host, through log().

    Hooks that carry a per-host outcome hand it to log() together with a
    status label; every other hook does nothing.
    """

    def on_any(self, *args, **kwargs):
        """No-op."""

    def runner_on_failed(self, host, res, ignore_errors=False):
        """Record the result of a task that failed on *host*."""
        log(host, 'FAILED', res)

    def runner_on_ok(self, host, res):
        """Record the result of a task that succeeded on *host*."""
        log(host, 'OK', res)

    def runner_on_skipped(self, host, item=None):
        """Record that a task was skipped on *host*."""
        log(host, 'SKIPPED', '...')

    def runner_on_unreachable(self, host, res):
        """Record that *host* could not be reached."""
        log(host, 'UNREACHABLE', res)

    def runner_on_no_hosts(self):
        """No-op."""

    def runner_on_async_poll(self, host, res, jid, clock):
        """No-op."""

    def runner_on_async_ok(self, host, res, jid):
        """No-op."""

    def runner_on_async_failed(self, host, res, jid):
        """Record the result of an asynchronous task that failed on *host*."""
        log(host, 'ASYNC_FAILED', res)

    def playbook_on_start(self):
        """No-op."""

    def playbook_on_notify(self, host, handler):
        """No-op."""

    def playbook_on_no_hosts_matched(self):
        """No-op."""

    def playbook_on_no_hosts_remaining(self):
        """No-op."""

    def playbook_on_task_start(self, name, is_conditional):
        """No-op."""

    def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
        """No-op."""

    def playbook_on_setup(self):
        """No-op."""

    def playbook_on_import_for_host(self, host, imported_file):
        """Record the file that was imported for *host*."""
        log(host, 'IMPORTED', imported_file)

    def playbook_on_not_import_for_host(self, host, missing_file):
        """Record the file that could not be imported for *host*."""
        log(host, 'NOTIMPORTED', missing_file)

    def playbook_on_play_start(self, name):
        """No-op."""

    def playbook_on_stats(self, stats):
        """No-op."""

 修改项:

  • facts数据不再写入执行结果文件中。
  • 对于返回的结果不是字典的情况下(报错返回的是字符串),进行分别处理。

 对如上进行修改方便我们进行入库操作。方便前台查询。

接下来进行数据库的操作代替文件操作。

 难点:

即使我们现在改成插入数据库的操作,仍然有一个问题:我们无法确认某次执行的结果应该写入数据库中哪个任务id、哪个主机IP对应的记录。查看官网的插件例子后,发现如下的介绍内容:

context_demo.py

 1 import os
 2 import time
 3 import json
 4 
 5 class CallbackModule(object):
 6     """
 7     This is a very trivial example of how any callback function can get at play and task objects.
 8     play will be 'None' for runner invocations, and task will be None for 'setup' invocations.
 9     """
10 
11     def on_any(self, *args, **kwargs):
12         play = getattr(self, 'play', None)
13         task = getattr(self, 'task', None)
14 print "play = %s, task = %s, args = %s, kwargs = %s" % (play,task,args,kwargs)

xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

log_plays.py内容:

  1 import os
  2 import time
  3 import json
  4 
  5 TIME_FORMAT="%b %d %Y %H:%M:%S"
  6 MSG_FORMAT="%(now)s - %(category)s - %(data)s\n\n"
  7 
  8 if not os.path.exists("/export/bjliumeide/log/ansible/hosts"):
  9     os.makedirs("/export/bjliumeide/log/ansible/hosts")
 10 
 11 def log(host, category, data):
 12     if type(data) == dict:
 13         if 'verbose_override' in data:
 14             # avoid logging extraneous data from facts
 15             #data = 'omitted'
 16             pass
 17         else:
 18             data = data.copy()
 19             invocation = data.pop('invocation', None)
 20             #data = json.dumps(data)
 21             #if invocation is not None:
 22              #   data = json.dumps(invocation) + " => %s " % data
 23         path = os.path.join("/export/bjliumeide/log/ansible/hosts", host)
 24         now = time.strftime(TIME_FORMAT, time.localtime())
 25         fd = open(path, "a")
 26         fd.write(MSG_FORMAT % dict(now=now, category=category, data=data))
 27         fd.close()
 28     else:
 29         
 30         path = os.path.join("/export/bjliumeide/log/ansible/hosts", host)
 31         now = time.strftime(TIME_FORMAT, time.localtime())
 32         fd = open(path, "a")
 33         fd.write(MSG_FORMAT % dict(now=now, category=category, data=data))
 34         fd.close()
 35 
 36 class CallbackModule(object):
 37     """
 38     logs playbook results, per host, in /var/log/ansible/hosts
 39     """
 40 
 41     def on_any(self, *args, **kwargs):
 42         pass
 43 
 44     def runner_on_failed(self, host, res, ignore_errors=False):
 45         log(host, 'FAILED', res)
 46 
 47     def runner_on_ok(self, host, res):
 48         task=getattr(self,'task',None)
 49         log(host,task.name, res)
 50     def runner_on_skipped(self, host, item=None):
 51         log(host, 'SKIPPED', '...')
 52 
 53     def runner_on_unreachable(self, host, res):
 54         task=getattr(self,'task',None)
 55         if task:
 56             log(host,task.name,res)#返回task.name
 57         else:
 58             pass
 59 
 60     def runner_on_no_hosts(self):
 61         pass
 62 
 63     def runner_on_async_poll(self, host, res, jid, clock):
 64         pass
 65 
 66     def runner_on_async_ok(self, host, res, jid):
 67         pass
 68 
 69     def runner_on_async_failed(self, host, res, jid):
 70         log(host, 'ASYNC_FAILED', res)
 71 
 72     def playbook_on_start(self):
 73         pass
 74 
 75     def playbook_on_notify(self, host, handler):
 76         pass
 77 
 78     def playbook_on_no_hosts_matched(self):
 79         pass
 80 
 81     def playbook_on_no_hosts_remaining(self):
 82         pass
 83 
 84     def playbook_on_task_start(self, name, is_conditional):
 85         pass
 86 
 87     def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
 88         pass
 89 
 90     def playbook_on_setup(self):
 91         pass
 92 
 93     def playbook_on_import_for_host(self, host, imported_file):
 94         log(host, 'IMPORTED', imported_file)
 95 
 96     def playbook_on_not_import_for_host(self, host, missing_file):
 97         log(host, 'NOTIMPORTED', missing_file)
 98 
 99     def playbook_on_play_start(self, name):
100         pass
101 
102     def playbook_on_stats(self, stats):
103         pass

然后我们故意输错密码:

 1 $ ansible-playbook  check.yml  -k
 2 SSH password: 
 3 
 4 PLAY [lmd] ******************************************************************** 
 5 
 6 TASK: [121] ******************************************************************* 
 7 fatal: [172.17.3.11] => Authentication failure.
 8 
 9 FATAL: all hosts have already failed -- aborting
10 
11 PLAY RECAP ******************************************************************** 
12            to retry, use: --limit @/export/bjliumeide/check.retry
13 
14 172.17.3.11                : ok=0    changed=0    unreachable=1    failed=0   

看到任务执行结果:unreachable

在来看下结果文件的内容是否有我们想要的121

这样执行结果就和我们的任务task_id关联起来了。原打算在ansible服务器上直接写入数据库,最后还是选择由ansible端将数据post到db端,再进行入库。

修改后的log_plays.py:

  1 #coding=utf-8
  2 #author:evil
  3 
  4 import os
  5 import time
  6 import json
  7 import requests
  8 
  9 Data_Fail_List=['UNREACHABLE','SKIPPED']
 10 
 11 
 12 def log(host, category,task_id, data):
 13     if type(data) == dict:
 14         if 'verbose_override' in data:
 15             # avoid logging extraneous data from facts
 16             data = 'omitted'
 17         else:
 18             data = data.copy()
 19             invocation = data.pop('invocation', None)
 20             task_status='success'
 21             task_res=data['stdout']
 22             task_id=task_id
 23             task_host=host
 24             payload={'task_status':'success','task_res':task_res,'task_id':task_id,'task_host':task_host}
 25             requests.post("http://10.13.168.153:8000/task_callback/", data=payload)
 26     else:
 27         task_status='fail'
 28         task_res=data
 29         task_id=task_id
 30 "log_plays.py" 118L, 3419C written
 31 [root@localhost callback_plugins]# cat log_plays.py
 32 #coding=utf-8
 33 #author:evil
 34 
 35 import os
 36 import time
 37 import json
 38 import requests 
 39 
 40 Data_Fail_List=['UNREACHABLE','SKIPPED']
 41 
 42 
 43 def log(host, category,task_id, data):
 44     if type(data) == dict:
 45         if 'verbose_override' in data:
 46             # avoid logging extraneous data from facts
 47             data = 'omitted'
 48         else:
 49             data = data.copy()
 50             invocation = data.pop('invocation', None)
 51             task_status='success'
 52             task_res=data['stdout']
 53             task_id=task_id 
 54             task_host=host
 55             payload={'task_status':'success','task_res':task_res,'task_id':task_id,'task_host':task_host}
 56             requests.post("http://10.13.168.153:8000/task_callback/", data=payload)
 57     else:
 58         task_status='fail'
 59         task_res=data
 60         task_id=task_id
 61         task_host=host
 62         payload={'task_status':'fail','task_res':data,'task_id':task_id,'task_host':task_host}
 63         requests.post("http://10.13.168.153:8000/task_callback/", data=payload)
 64         
 65         #path = os.path.join("/export/bjliumeide/log/ansible/hosts", host)
 66         #now = time.strftime(TIME_FORMAT, time.localtime())
 67         #fd = open(path, "a")
 68         #fd.write(MSG_FORMAT % dict(now=now, category=category,task_id=task_id, data=data))
 69         #fd.close()
 70 
 71 class CallbackModule(object):
 72     """
 73     logs playbook results, per host, in /var/log/ansible/hosts
 74     """
 75 
 76     def on_any(self, *args, **kwargs):
 77         pass
 78 
 79     def runner_on_failed(self, host, res, ignore_errors=False):
 80         task_obj=getattr(self,'task',None)
 81         task_name=task_obj.name
 82         if task_obj:
 83             log(host, 'FAILED',task_name,res)
 84 
 85     def runner_on_ok(self, host, res):
 86         task_obj=getattr(self,'task',None)
 87         task_name=task_obj.name
 88         if task_obj:
 89             log(host, 'OK',task_name, res)
 90         else:
 91             pass
 92     def runner_on_skipped(self, host, item=None):
 93         task_obj=getattr(self,'task',None)
 94         task_name=task_obj.name
 95         if task_obj:
 96            log(host, 'SKIPPED',task_name,'...')
 97         else:
 98            pass
 99     def runner_on_unreachable(self, host, res):
100         task_obj=getattr(self,'task',None)
101         if task_obj:
102             log(host,'UNREACHABLE',task_obj.name,res)
103         else:
104             pass
105     def runner_on_no_hosts(self):
106         pass
107 
108     def runner_on_async_poll(self, host, res, jid, clock):
109         pass
110 
111     def runner_on_async_ok(self, host, res, jid):
112         pass
113 
114     def runner_on_async_failed(self, host, res, jid):
115         log(host, 'ASYNC_FAILED', res)
116 
117     def playbook_on_start(self):
118         pass
119 
120     def playbook_on_notify(self, host, handler):
121         pass
122 
123     def playbook_on_no_hosts_matched(self):
124         pass
125 
126     def playbook_on_no_hosts_remaining(self):
127         pass
128 
129     def playbook_on_task_start(self, name, is_conditional):
130         pass
131 
132     def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
133         pass
134 
135     def playbook_on_setup(self):
136         pass
137 
138     def playbook_on_import_for_host(self, host, imported_file):
139         log(host, 'IMPORTED', imported_file)
140 
141     def playbook_on_not_import_for_host(self, host, missing_file):
142         log(host, 'NOTIMPORTED', missing_file)
143 
144     def playbook_on_play_start(self, name):
145         pass
146 
147     def playbook_on_stats(self, stats):
148         pass

db端接收代码:

def task_callback(request):
    """Receive a task result posted by the Ansible callback plugin.

    Reads 'task_status', 'task_host', 'task_res' and 'task_id' from the
    POST form, prints them and answers 'ok'. Any other HTTP method gets a
    405 response instead of the view returning None.
    """
    if request.method != 'POST':
        return HttpResponse('method not allowed', status=405)

    task_status = request.POST.get('task_status')
    task_host = request.POST.get('task_host')
    task_res = request.POST.get('task_res')
    task_id = request.POST.get('task_id')
    print(task_status, task_host, task_res, task_id)
    return HttpResponse('ok')

在ansible端执行playbook看db端是否接收到:

上面虽然解决了任务id的问题,但是还有一个问题未解决:如果用户执行类似 tail -f 的命令,或者执行的主机出现卡死的情况,这个时候就需要为命令执行结果的返回设置超时时间。

解决思路:

1、不用playbook的API。使用ansible的API的时候,ansible本身提供了一个参数 -B:

可以解决这个问题。

 1 [root@localhost ansbile]# ansible -i 1.txt  test  -m command -a "top" -u root -k -B 4
 2 SSH password: 
 3 background launch...
 4 
 5 
 6 127.0.0.1 | success >> {
 7     "ansible_job_id": "273366723200.80789", 
 8     "results_file": "/root/.ansible_async/273366723200.80789", 
 9     "started": 1
10 }

 

 但是还有一个问题:上面的执行其实也是采取异步方式进行的,结果需要自己去查看,不如playbook的API方便。

2、用ansible的playbook API的时候,我们可以用异步方式执行我们的任务。首先了解在yaml的里的2个参数:

async:表示任务在后台最大执行时间。

poll:表示间隔几秒去后台查看执行结果。ansible在异步执行任务的时候,执行完毕的之后,会将结果放在执行用户的家目录:

1  ls /root/.ansible_async/ 

 该目录下以jid命名的文件,记录了执行结果。

注意:需要定期清理该目录下的文件!!!否则iNode容易打满!!!

 所以采取异步的方式执行我们的任务,不会出现任务后台hang死的情况!

还有一个问题需要解决:hang死的情况可以用异步方式执行来避免,但是新的问题出现了:我们自定义的callback插件出现了一个问题:

插件代码:

  1 #coding=utf-8
  2 #author:evil
  3 
  4 import os
  5 import time
  6 import json
  7 import requests 
  8 
  9 Data_Fail_List=['UNREACHABLE','SKIPPED','ASYNC_FAILED']
 10 
 11 
 12 def log(host, category,task_id, data,jid=None):
 13     if type(data) == dict:
 14         if 'verbose_override' in data:
 15             # avoid logging extraneous data from facts
 16             data = 'omitted'
 17         else:
 18             data = data.copy()
 19             invocation = data.pop('invocation', None)
 20             if 'stdout' in data.keys():
 21                 task_res=data['stdout']
 22             else:
 23                 task_res=data
 24             #task_res=json.dumps(data)
 25             task_status=category
 26             task_id=task_id 
 27             task_host=host
 28             jid=jid
 29             payload={'task_status':task_status,'task_res':task_res,'task_id':task_id,'task_host':task_host,'jid':jid}
 30             requests.post("http://10.13.166.117:18000/task_callback/", data=payload)
 31     else:
 32         if data=='ansible_job_id':
 33             exit(0)    
 34         task_status='fail'
 35         task_res=data
 36         task_id=task_id
 37         task_host=host
 38         payload={'task_status':'fail','task_res':data,'task_id':task_id,'task_host':task_host}
 39         requests.post("http://10.13.166.117:18000/task_callback/", data=payload)
 40         
 41 
 42 class CallbackModule(object):
 43     """
 44     logs playbook results, per host, in /var/log/ansible/hosts
 45     """
 46 
 47     def on_any(self, *args, **kwargs):
 48         pass
 49 
 50     def runner_on_failed(self, host, res, ignore_errors=False):
 51         task_obj=getattr(self,'task',None)
 52         task_name=task_obj.name
 53         if task_obj:
 54             log(host, 'FAILED',task_name,res)
 55 
 56     def runner_on_ok(self, host, res):
 57         task_obj=getattr(self,'task',None)
 58         task_name=task_obj.name
 59         if task_obj:
 60             log(host, 'OK',task_name, res)
 61         else:
 62             pass
 63     def runner_on_skipped(self, host, item=None):
 64         task_obj=getattr(self,'task',None)
 65         task_name=task_obj.name
 66         if task_obj:
 67            log(host, 'SKIPPED',task_name,'...')
 68         else:
 69            pass
 70     def runner_on_unreachable(self, host, res):
 71         task_obj=getattr(self,'task',None)
 72         if task_obj:
 73             log(host,'UNREACHABLE',task_obj.name,res)
 74         else:
 75             pass
 76     def runner_on_no_hosts(self):
 77         pass
 78 
 79     def runner_on_async_poll(self, host, res, jid, clock):
 80         pass
 81 
 82     def runner_on_async_ok(self, host, res, jid):
 83         pass
 84 
 85     def runner_on_async_failed(self, host, res, jid):
 86         task_obj=getattr(self,'task',None)
 87         task_name=task_obj.name
 88         if task_obj:
 89             log(host,'ASYNC_FAILED',task_obj.name,res)
 90     def playbook_on_start(self):
 91         pass
 92 
 93     def playbook_on_notify(self, host, handler):
 94         pass
 95 
 96     def playbook_on_no_hosts_matched(self):
 97         pass
 98 
 99     def playbook_on_no_hosts_remaining(self):
100         pass
101 
102     def playbook_on_task_start(self, name, is_conditional):
103         pass
104 
105     def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
106         pass
107 
108     def playbook_on_setup(self):
109         pass
110 
111     def playbook_on_import_for_host(self, host, imported_file):
112         log(host, 'IMPORTED', imported_file)
113 
114     def playbook_on_not_import_for_host(self, host, missing_file):
115         log(host, 'NOTIMPORTED', missing_file)
116 
117     def playbook_on_play_start(self, name):
118         pass
119 
120     def playbook_on_stats(self, stats):
121         pass

 输出结果:

1 OK 127.0.0.1 ansible_job_id 58 None
2 fail 172.17.3.11 SSH Error: ssh: connect to host 172.17.3.11 port 22: Connection timed out
3     while connecting to 172.17.3.11:22
4 It is sometimes useful to re-run the command using -vvvv, which prints SSH debug output to help diagnose the issue. 58 None
5 FAILED 127.0.0.1  58 None
6 fail 172.17.3.11 SSH Error: ssh: connect to host 172.17.3.11 port 22: Connection timed out
7     while connecting to 172.17.3.11:22
8 It is sometimes useful to re-run the command using -vvvv, which prints SSH debug output to help diagnose the issue. 58 None
9 ASYNC_FAILED 127.0.0.1  58 None

 yaml文件:

 

接收端api代码:

def task_callback(request):
    """Receive a task result posted by the Ansible callback plugin.

    Reads 'task_status', 'task_host', 'task_res', 'task_id' and 'jid' from
    the POST form, prints them and answers 'ok'. Any other HTTP method gets
    a 405 response instead of the view returning None.
    """
    if request.method != 'POST':
        return HttpResponse('method not allowed', status=405)

    task_status = request.POST.get('task_status')
    task_host = request.POST.get('task_host')
    task_res = request.POST.get('task_res')
    task_id = request.POST.get('task_id')
    task_jid = request.POST.get('jid')
    print(task_status, task_host, task_res, task_id, task_jid)
    return HttpResponse('ok')

 

可以看到上面的结果一共拉取了3次,每次的结果都不一样,只有最后的结果:ASYNC_FAILED 127.0.0.1  58 None ,fail 172.17.3.11 SSH Error: ssh: connect to host 172.17.3.11 port 22: Connection timed out

是我们需要的。这样就需要我们对结果进行筛选。首先需要我们知道ansible的在异步的时候返回的数据是什么?

修改插件代码:

  1 #coding=utf-8
  2 #author:evil
  3 
  4 import os
  5 import time
  6 import json
  7 import requests 
  8 
  9 Data_Fail_List=['UNREACHABLE','SKIPPED','ASYNC_FAILED']
 10 
 11 
 12 def log(host, category,task_id, data,jid=None):
 13     if type(data) == dict:
 14         if 'verbose_override' in data:
 15             # avoid logging extraneous data from facts
 16             data = 'omitted'
 17         else:
 18             task_res=json.dumps(data)
 19             task_status=category
 20             task_id=task_id 
 21             task_host=host
 22             jid=jid
 23             payload={'task_status':task_status,'task_res':task_res,'task_id':task_id,'task_host':task_host,'jid':jid}
 24             requests.post("http://10.13.166.117:18000/task_callback/", data=payload)
 25     else:
 26         if data=='ansible_job_id':
 27             exit(0)    
 28         task_status='fail'
 29         task_res=data
 30         task_id=task_id
 31         task_host=host
 32         payload={'task_status':'fail','task_res':data,'task_id':task_id,'task_host':task_host}
 33         requests.post("http://10.13.166.117:18000/task_callback/", data=payload)
 34         
 35 
 36 class CallbackModule(object):
 37     """
 38     logs playbook results, per host, in /var/log/ansible/hosts
 39     """
 40 
 41     def on_any(self, *args, **kwargs):
 42         pass
 43 
 44     def runner_on_failed(self, host, res, ignore_errors=False):
 45         task_obj=getattr(self,'task',None)
 46         task_name=task_obj.name
 47         if task_obj:
 48             log(host, 'FAILED',task_name,res)
 49 
 50     def runner_on_ok(self, host, res):
 51         task_obj=getattr(self,'task',None)
 52         task_name=task_obj.name
 53         if task_obj:
 54             log(host, 'OK',task_name, res)
 55         else:
 56             pass
 57     def runner_on_skipped(self, host, item=None):
 58         task_obj=getattr(self,'task',None)
 59         task_name=task_obj.name
 60         if task_obj:
 61            log(host, 'SKIPPED',task_name,'...')
 62         else:
 63            pass
 64     def runner_on_unreachable(self, host, res):
 65         task_obj=getattr(self,'task',None)
 66         if task_obj:
 67             log(host,'UNREACHABLE',task_obj.name,res)
 68         else:
 69             pass
 70     def runner_on_no_hosts(self):
 71         pass
 72 
 73     def runner_on_async_poll(self, host, res, jid, clock):
 74         pass
 75 
 76     def runner_on_async_ok(self, host, res, jid):
 77         pass
 78 
 79     def runner_on_async_failed(self, host, res, jid):
 80         task_obj=getattr(self,'task',None)
 81         task_name=task_obj.name
 82         if task_obj:
 83             log(host,'ASYNC_FAILED',task_obj.name,res)
 84     def playbook_on_start(self):
 85         pass
 86 
 87     def playbook_on_notify(self, host, handler):
 88         pass
 89 
 90     def playbook_on_no_hosts_matched(self):
 91         pass
 92 
 93     def playbook_on_no_hosts_remaining(self):
 94         pass
 95 
 96     def playbook_on_task_start(self, name, is_conditional):
 97         pass
 98 
 99     def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
100         pass
101 
102     def playbook_on_setup(self):
103         pass
104 
105     def playbook_on_import_for_host(self, host, imported_file):
106         log(host, 'IMPORTED', imported_file)
107 
108     def playbook_on_not_import_for_host(self, host, missing_file):
109         log(host, 'NOTIMPORTED', missing_file)
110 
111     def playbook_on_play_start(self, name):
112         pass
113 
114     def playbook_on_stats(self, stats):
115         pas

 

结果:

1 OK 127.0.0.1 {"started": 1, "invocation": {"module_name": "shell", "module_args": "top"}, "results_file": "/root/.ansible_async/825224993532.82362", "ansible_job_id": "825224993532.82362"} 58 None
2 fail 172.17.3.11 SSH Error: ssh: connect to host 172.17.3.11 port 22: Connection timed out
3     while connecting to 172.17.3.11:22
4 It is sometimes useful to re-run the command using -vvvv, which prints SSH debug output to help diagnose the issue. 58 None
5 FAILED 127.0.0.1 {"cmd": "top", "end": "2017-01-21 23:14:48.666570", "ansible_job_id": "825224993532.82362", "stdout": "", "changed": true, "invocation": {"module_name": "async_status", "module_args": "jid=825224993532.82362"}, "start": "2017-01-21 23:14:48.654472", "finished": 1, "stderr": "\ttop: failed tty get", "rc": 1, "delta": "0:00:00.012098", "warnings": []} 58 None
6 fail 172.17.3.11 SSH Error: ssh: connect to host 172.17.3.11 port 22: Connection timed out
7     while connecting to 172.17.3.11:22
8 It is sometimes useful to re-run the command using -vvvv, which prints SSH debug output to help diagnose the issue. 58 None
9 ASYNC_FAILED 127.0.0.1 {"changed": true, "end": "2017-01-21 23:14:48.666570", "ansible_job_id": "825224993532.82362", "stdout": "", "cmd": "top", "rc": 1, "start": "2017-01-21 23:14:48.654472", "finished": 1, "stderr": "\ttop: failed tty get", "delta": "0:00:00.012098", "invocation": {"module_name": "async_status", "module_args": "jid=825224993532.82362"}, "warnings": []} 58 None

 

可以看得出,返回的结果是一个字典:

1  {"changed": true, "end": "2017-01-21 23:14:48.666570", "ansible_job_id": "825224993532.82362", "stdout": "", "cmd": "top", "rc": 1, "start": "2017-01-21 23:14:48.654472", "finished": 1, "stderr": "\ttop: failed tty get", "delta": "0:00:00.012098", "invocation": {"module_name": "async_status", "module_args": "jid=825224993532.82362"}, "warnings": []}

 

只有这一条是我想要的结果。这个是失败的任务,成功的任务返回任务执行时间。

修改yaml文件:

 

然后抓取执行结果:

1 OK 127.0.0.1 {"cmd": "sleep 4", "end": "2017-01-21 23:20:20.276296", "ansible_job_id": "139543736001.82612", "stdout": "", "changed": true, "invocation": {"module_name": "async_status", "module_args": "jid=139543736001.82612"}, "start": "2017-01-21 23:20:16.260931", "finished": 1, "stderr": "", "rc": 0, "delta": "0:00:04.015365", "warnings": []}

里面有个key值:delta 这个值 是任务耗时!我们观察如上返回结果,我们发现最后无论是失败还是成功,最后的结果:changed:True的状态。

所以根据这一点,我们对插件的代码进行更改:

  1 # cat log_plays.py
  2 #coding=utf-8
  3 #author:evil
  4 
  5 import os
  6 import time
  7 import json
  8 import requests 
  9 
 10 Data_Fail_List=['UNREACHABLE','SKIPPED','ASYNC_FAILED']
 11 
 12 
 13 def log(host, category,task_id, data,jid=None):
 14     if type(data) == dict:
 15         if 'verbose_override' in data:
 16             # avoid logging extraneous data from facts
 17             data = 'omitted'
 18         else:
 19             data = data.copy()
 20             invocation = data.pop('invocation', None)
 21             if 'changed' in data:
 22                 if data['changed'] and  category=='OK':
 23                     task_res=data['stdout']
 24                 elif data['changed'] and  category=='ASYNC_FAILED':
 25                     task_res=data['stderr']
 26                 else:
 27                     return False 
 28             elif 'stdout' in data.keys():
 29                 task_res=data['stdout']
 30             elif 'started' in data.keys():
 31                 if data['started']==1:
 32                     return False 
 33             else:
 34                 task_res=data
 35             #task_res=json.dumps(data)
 36             task_status=category
 37             task_id=task_id 
 38             task_host=host
 39             jid=jid
 40             payload={'task_status':task_status,'task_res':task_res,'task_id':task_id,'task_host':task_host,'jid':jid}
 41             requests.post("http://10.13.166.117:18000/task_callback/", data=payload)
 42     else:
 43         task_status='fail'
 44         task_res=data
 45         task_id=task_id
 46         task_host=host
 47         payload={'task_status':'fail','task_res':data,'task_id':task_id,'task_host':task_host}
 48         requests.post("http://10.13.166.117:18000/task_callback/", data=payload)
 49         
 50 
 51 class CallbackModule(object):
 52     """
 53     logs playbook results, per host, in /var/log/ansible/hosts
 54     """
 55 
 56     def on_any(self, *args, **kwargs):
 57         pass
 58 
 59     def runner_on_failed(self, host, res, ignore_errors=False):
 60         task_obj=getattr(self,'task',None)
 61         task_name=task_obj.name
 62         if task_obj:
 63             log(host, 'FAILED',task_name,res)
 64 
 65     def runner_on_ok(self, host, res):
 66         task_obj=getattr(self,'task',None)
 67         task_name=task_obj.name
 68         if task_obj:
 69             log(host, 'OK',task_name, res)
 70         else:
 71             pass
 72     def runner_on_skipped(self, host, item=None):
 73         task_obj=getattr(self,'task',None)
 74         task_name=task_obj.name
 75         if task_obj:
 76            log(host, 'SKIPPED',task_name,'...')
 77         else:
 78            pass
 79     def runner_on_unreachable(self, host, res):
 80         task_obj=getattr(self,'task',None)
 81         if task_obj:
 82             log(host,'UNREACHABLE',task_obj.name,res)
 83         else:
 84             pass
 85     def runner_on_no_hosts(self):
 86         pass
 87 
 88     def runner_on_async_poll(self, host, res, jid, clock):
 89         pass
 90 
 91     def runner_on_async_ok(self, host, res, jid):
 92         pass
 93 
 94     def runner_on_async_failed(self, host, res, jid):
 95         task_obj=getattr(self,'task',None)
 96         task_name=task_obj.name
 97         if task_obj:
 98             log(host,'ASYNC_FAILED',task_obj.name,res)
 99     def playbook_on_start(self):
100         pass
101 
102     def playbook_on_notify(self, host, handler):
103         pass
104 
105     def playbook_on_no_hosts_matched(self):
106         pass
107 
108     def playbook_on_no_hosts_remaining(self):
109         pass
110 
111     def playbook_on_task_start(self, name, is_conditional):
112         pass
113 
114     def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
115         pass
116 
117     def playbook_on_setup(self):
118         pass
119 
120     def playbook_on_import_for_host(self, host, imported_file):
121         log(host, 'IMPORTED', imported_file)
122 
123     def playbook_on_not_import_for_host(self, host, missing_file):
124         log(host, 'NOTIMPORTED', missing_file)
125 
126     def playbook_on_play_start(self, name):
127         pass
128 
129     def playbook_on_stats(self, stats):
130         pass

查看输出结果:

1 fail 172.17.3.11 SSH Error: ssh: connect to host 172.17.3.11 port 22: Connection timed out
2     while connecting to 172.17.3.11:22
3 It is sometimes useful to re-run the command using -vvvv, which prints SSH debug output to help diagnose the issue. 58 None
4 
5 fail 172.17.3.11 SSH Error: ssh: connect to host 172.17.3.11 port 22: Connection timed out
6     while connecting to 172.17.3.11:22
7 It is sometimes useful to re-run the command using -vvvv, which prints SSH debug output to help diagnose the issue. 58 None
8 
9 ASYNC_FAILED 127.0.0.1     top: failed tty get 58 None

 

可以看得出我们获取我们想要的结果。接下来我们可以进行数据库的操作!!!

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM