Ansible这个工具在现在运维中用的越来越多,它可以方便的批量执行命令,上传文件,配置playbook初始化一些新机器非常快速方便,当然除了我们命令行使用外,我们还可以通过ansible提供的api接口编写一些自动化脚本工作,在2.0版本之前,调用ansible的api是一件非常简单容易的事,几行代码就可以搞定,而且还非常直观,但到了2.0版本后会发现调用没那么容易了,如果没有python基础,一上来肯定是懵逼的状态,不过别急,今天我们就把跟2.0版本的api调用逐一看一遍,当然很多例子我也是从网上找到,但我都做了小的改动并测试通过没问题了,先来一个简单的调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
#!/usr/bin/env python import os import sys from collections import namedtuple from ansible.parsing.dataloader import DataLoader from ansible.vars import VariableManager from ansible.inventory import Inventory from ansible.executor.playbook_executor import PlaybookExecutor variable_manager = VariableManager() loader = DataLoader() inventory = Inventory(loader=loader, variable_manager=variable_manager, host_list='/etc/ansible/hosts') playbook_path = '/tmp/onedir/main.yml' if not os.path.exists(playbook_path): print '[INFO] The playbook does not exist' sys.exit() Options = namedtuple('Options', ['listtags', 'listtasks', 'listhosts', 'syntax', 'connection','module_path', 'forks', 'remote_user', 'private_key_file', 'ssh_common_arg s', 'ssh_extra_args', 'sftp_extra_args', 'scp_extra_args', 'become', 'become_method', 'become_user', 'verbosity', 'check']) options = Options(listtags=False, listtasks=False, listhosts=False, syntax=False, connection='ssh', module_path=None, forks=100, remote_user='slotlocker', private_key_f ile=None, ssh_common_args=None, ssh_extra_args=None, sftp_extra_args=None, scp_extra_args=None, become=True, become_method=None, become_user='root', verbosity=None, che ck=False) #print options variable_manager.extra_vars = {'hosts': 'port'} # This can accomodate various other command line arguments.` passwords = {} pbex = PlaybookExecutor(playbooks=[playbook_path], inventory=inventory, variable_manager=variable_manager, loader=loader, options=options, passwords=passwords) results = pbex.run() |
这是最简单的方法,它运行的结果跟用ansible-playbook 命令执行的结果显示是一样的。
我们再看一个复杂点的,运行完直接返回json结果的,这个我们都比较喜欢,可以直接解析json,这个是来源网络,我保留原创的注释和姓名,这是一个非常好的例子,从这个代码里能看到Ansible2.0 api的一些变化,首先你要定义自己的callback函数,定义完callback,定义个月任务类,这个任务类里写明然后获取数据,就是你自定义的的callback,自己要写方法把它获取出来,如果用自带的defautl回调函数,就不用,像我们第一个例子,我们没定义回调,也可以运行,但显示的格式就是默认的,这是playbook执行方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# -*- coding:utf8 -*- ''' Created on 2017年1月13日 @author: qiancheng ''' import os import json from collections import namedtuple from ansible.inventory import Inventory from ansible.vars import VariableManager from ansible.parsing.dataloader import DataLoader from ansible.executor.playbook_executor import PlaybookExecutor from ansible.plugins.callback import CallbackBase from ansible.errors import AnsibleParserError class mycallback(CallbackBase): #这里是状态回调,各种成功失败的状态,里面的各种方法其实都是从写于CallbackBase父类里面的,其实还有很多,可以根据需要拿出来用 def __init__(self,*args): super(mycallback,self).__init__(display=None) self.status_ok=json.dumps({}) self.status_fail=json.dumps({}) self.status_unreachable=json.dumps({}) self.status_playbook='' self.status_no_hosts=False self.host_ok = {} self.host_failed={} self.host_unreachable={} def v2_runner_on_ok(self,result): host=result._host.get_name() self.runner_on_ok(host, result._result) #self.status_ok=json.dumps({host:result._result},indent=4) self.host_ok[host] = result def v2_runner_on_failed(self, result, ignore_errors=False): host = result._host.get_name() self.runner_on_failed(host, result._result, ignore_errors) #self.status_fail=json.dumps({host:result._result},indent=4) self.host_failed[host] = result def v2_runner_on_unreachable(self, result): host = result._host.get_name() self.runner_on_unreachable(host, result._result) #self.status_unreachable=json.dumps({host:result._result},indent=4) self.host_unreachable[host] = result def v2_playbook_on_no_hosts_matched(self): self.playbook_on_no_hosts_matched() self.status_no_hosts=True def v2_playbook_on_play_start(self, play): self.playbook_on_play_start(play.name) self.playbook_path=play.name class my_ansible_play(): #这里是ansible运行 #初始化各项参数,大部分都定义好,只有几个参数是必须要传入的 def __init__(self, playbook, extra_vars={}, host_list='/etc/ansible/hosts', connection='ssh', become=False, become_user=None, module_path=None, fork=50, ansible_cfg=None, #os.environ["ANSIBLE_CONFIG"] = None passwords={}, check=False): self.playbook_path=playbook self.passwords=passwords self.extra_vars=extra_vars Options = namedtuple('Options', ['listtags', 'listtasks', 'listhosts', 'syntax', 'connection','module_path', 'forks', 'private_key_file', 'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args', 'scp_extra_args', 'become', 'become_method', 'become_user', 'verbosity', 'check']) self.options = Options(listtags=False, listtasks=False, listhosts=False, syntax=False, connection=connection, module_path=module_path, forks=fork, private_key_file=None, ssh_common_args=None, ssh_extra_args=None, sftp_extra_args=None, scp_extra_args=None, become=become, become_method=None, become_user=become_user, verbosity=None, check=check) if ansible_cfg != None: os.environ["ANSIBLE_CONFIG"] = ansible_cfg self.variable_manager=VariableManager() self.loader=DataLoader() self.inventory=Inventory(loader=self.loader,variable_manager=self.variable_manager,host_list=host_list) #定义运行的方法和返回值 def run(self): complex_msg={} if not os.path.exists(self.playbook_path): code=1000 results={'playbook':self.playbook_path,'msg':self.playbook_path+' playbook is not exist','flag':False} #results=self.playbook_path+'playbook is not existed' #return code,complex_msg,results pbex= PlaybookExecutor(playbooks=[self.playbook_path], inventory=self.inventory, variable_manager=self.variable_manager, loader=self.loader, options=self.options, passwords=self.passwords) self.results_callback=mycallback() #print dir(self.results_callback) print dir(pbex._tqm) pbex._tqm._stdout_callback=self.results_callback try: code=pbex.run() except AnsibleParserError: code=1001 results={'playbook':self.playbook_path,'msg':self.playbook_path+' playbook have syntax error','flag':False} #results='syntax error in '+self.playbook_path #语法错误 return code,results if self.results_callback.status_no_hosts: code=1002 results={'playbook':self.playbook_path,'msg':self.results_callback.status_no_hosts,'flag':False,'executed':False} #results='no host match in '+self.playbook_path return code,results def get_result(self): self.result_all={'success':{},'fail':{},'unreachable':{}} #print result_all #print dir(self.results_callback) for host, result in self.results_callback.host_ok.items(): self.result_all['success'][host] = result._result for host, result in self.results_callback.host_failed.items(): self.result_all['failed'][host] = result._result['msg'] for host, result in self.results_callback.host_unreachable.items(): self.result_all['unreachable'][host]= result._result['msg'] for i in self.result_all['success'].keys(): #print i,self.result_all['success'][i] return self.result_all #print self.result_all['fail'] #print self.result_all['unreachable'] #print self.result_all if __name__ =='__main__': play_book=my_ansible_play('/tmp/onedir/main.yml') play_book.run() data = play_book.get_result() print data |
还有一个例子,是作者进行了一些命令封装,使用起来也非常方便,先创建一个ansible_api.py,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# -*- coding:utf-8 -*- import os import sys import logging from collections import namedtuple from ansible.parsing.dataloader import DataLoader from ansible.vars import VariableManager from ansible.inventory import Inventory from ansible.inventory.group import Group from ansible.inventory.host import Host from ansible.playbook.play import Play from ansible.executor.task_queue_manager import TaskQueueManager from ansible.executor.playbook_executor import PlaybookExecutor from ansible.plugins.callback import CallbackBase logger = logging.basicConfig() class ResultsCollector(CallbackBase): def __init__(self, *args, **kwargs): super(ResultsCollector, self).__init__(*args, **kwargs) self.host_ok = {} self.host_unreachable = {} self.host_failed = {} def v2_runner_on_unreachable(self, result): self.host_unreachable[result._host.get_name()] = result def v2_runner_on_ok(self, result, *args, **kwargs): self.host_ok[result._host.get_name()] = result def v2_runner_on_failed(self, result, *args, **kwargs): self.host_failed[result._host.get_name()] = result class MyInventory(Inventory): """ this is my ansible inventory object. """ def __init__(self, resource, loader, variable_manager): """ resource的数据格式是一个列表字典,比如 { "group1": { "hosts": [{"hostname": "10.0.0.0", "port": "22", "username": "test", "password": "pass"}, ...], "vars": {"var1": value1, "var2": value2, ...} } } 如果你只传入1个列表,这默认该列表内的所有主机属于my_group组,比如 [{"hostname": "10.0.0.0", "port": "22", "username": "test", "password": "pass"}, ...] """ self.resource = resource self.inventory = Inventory(loader=loader, variable_manager=variable_manager, host_list=[]) self.gen_inventory() def my_add_group(self, hosts, groupname, groupvars=None): """ add hosts to a group """ my_group = Group(name=groupname) # if group variables exists, add them to group if groupvars: for key, value in groupvars.iteritems(): my_group.set_variable(key, value) # add hosts to group for host in hosts: # set connection variables hostname = host.get("hostname") hostip = host.get('ip', hostname) hostport = host.get("port") username = host.get("username") password = host.get("password") ssh_key = host.get("ssh_key") my_host = Host(name=hostname, port=hostport) my_host.set_variable('ansible_ssh_host', hostip) my_host.set_variable('ansible_ssh_port', hostport) my_host.set_variable('ansible_ssh_user', username) my_host.set_variable('ansible_ssh_pass', password) my_host.set_variable('ansible_ssh_private_key_file', ssh_key) # set other variables for key, value in host.iteritems(): if key not in ["hostname", "port", "username", "password"]: my_host.set_variable(key, value) # add to group my_group.add_host(my_host) self.inventory.add_group(my_group) def gen_inventory(self): """ add hosts to inventory. """ if isinstance(self.resource, list): self.my_add_group(self.resource, 'default_group') elif isinstance(self.resource, dict): for groupname, hosts_and_vars in self.resource.iteritems(): self.my_add_group(hosts_and_vars.get("hosts"), groupname, hosts_and_vars.get("vars")) class AnsibleAPI(object): """ This is a General object for parallel execute modules. """ def __init__(self, resource, *args, **kwargs): self.resource = resource self.inventory = None self.variable_manager = None self.loader = None self.options = None self.passwords = None self.callback = None self.__initializeData() self.results_raw = {} def __initializeData(self): """ 初始化ansible """ Options = namedtuple('Options', ['connection', 'module_path', 'forks', 'timeout', 'remote_user', 'ask_pass', 'private_key_file', 'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args', 'scp_extra_args', 'become', 'become_method', 'become_user', 'ask_value_pass', 'verbosity', 'check', 'listhosts', 'listtasks', 'listtags', 'syntax']) # initialize needed objects self.variable_manager = VariableManager() self.loader = DataLoader() self.options = Options(connection='smart', module_path='/usr/share/ansible', forks=100, timeout=10, remote_user='root', ask_pass=False, private_key_file=None, ssh_common_args=None, ssh_extra_args=None, sftp_extra_args=None, scp_extra_args=None, become=None, become_method=None, become_user='root', ask_value_pass=False, verbosity=None, check=False, listhosts=False, listtasks=False, listtags=False, syntax=False) self.passwords = dict(sshpass=None, becomepass=None) self.inventory = MyInventory(self.resource, self.loader, self.variable_manager).inventory self.variable_manager.set_inventory(self.inventory) def run(self, host_list, module_name, module_args): """ run module from andible ad-hoc. module_name: ansible module_name module_args: ansible module args """ # create play with tasks play_source = dict( name="Ansible Play", hosts=host_list, gather_facts='no', tasks=[dict(action=dict(module=module_name, args=module_args))] ) play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader) # actually run it tqm = None self.callback = ResultsCollector() try: tqm = TaskQueueManager( inventory=self.inventory, variable_manager=self.variable_manager, loader=self.loader, options=self.options, passwords=self.passwords, ) tqm._stdout_callback = self.callback tqm.run(play) finally: if tqm is not None: tqm.cleanup() def run_playbook(self, host_list, role_name, role_uuid, temp_param): """ run ansible palybook """ try: self.callback = ResultsCollector() filenames = ['' + '/handlers/ansible/v1_0/sudoers.yml'] # playbook的路径 template_file = '' # 模板文件的路径 if not os.path.exists(template_file): sys.exit() extra_vars = {} # 额外的参数 sudoers.yml以及模板中的参数,它对应ansible-playbook test.yml --extra-vars "host='aa' name='cc' " host_list_str = ','.join([item for item in host_list]) extra_vars['host_list'] = host_list_str extra_vars['username'] = role_name extra_vars['template_dir'] = template_file extra_vars['command_list'] = temp_param.get('cmdList') extra_vars['role_uuid'] = 'role-%s' % role_uuid self.variable_manager.extra_vars = extra_vars # actually run it executor = PlaybookExecutor( playbooks=filenames, inventory=self.inventory, variable_manager=self.variable_manager, loader=self.loader, options=self.options, passwords=self.passwords, ) executor._tqm._stdout_callback = self.callback executor.run() except Exception as e: print "error:",e.message def get_result(self): self.results_raw = {'success': {}, 'failed': {}, 'unreachable': {}} for host, result in self.callback.host_ok.items(): self.results_raw['success'][host] = result._result for host, result in self.callback.host_failed.items(): self.results_raw['failed'][host] = result._result.get('msg') or result._result for host, result in self.callback.host_unreachable.items(): self.results_raw['unreachable'][host] = result._result['msg'] return self.results_raw |
再创建一个脚本引用它,这个脚本好处在域动态创建hosts主机:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# -*- coding:utf-8 -*- import json from ansible_api import AnsibleAPI class AnsiInterface(AnsibleAPI): def __init__(self, resource, *args, **kwargs): super(AnsiInterface, self).__init__(resource, *args, **kwargs) @staticmethod def deal_result(info): host_ips = info.get('success').keys() info['success'] = host_ips error_ips = info.get('failed') error_msg = {} for key, value in error_ips.items(): temp = {} temp[key] = value.get('stderr') error_msg.update(temp) info['failed'] = error_msg return json.dumps(info) def copy_file(self, host_list, src=None, dest=None): """ copy file """ module_args = "src=%s dest=%s"%(src, dest) self.run(host_list, 'copy', module_args) result = self.get_result() return self.deal_result(result) def exec_command(self, host_list, cmds): """ commands """ self.run(host_list, 'command', cmds) result = self.get_result() return self.deal_result(result) def exec_script(self, host_list, path): """ 在远程主机执行shell命令或者.sh脚本 """ self.run(host_list, 'shell', path) result = self.get_result() return self.deal_result(result) if __name__ == "__main__": resource = [{"hostname": "X.X.X.X", "port": "29157", "username": "root", "password": "password", "ip": 'X.X.X.X'}, {"hostname": "172.20.3.31", "port": "22", "username": "root", "password": "password", "ip": '172.20.3.31'}] interface = AnsiInterface(resource) #print "copy: ", interface.copy_file(['172.20.3.18', '172.20.3.31'], src='/Users/majing/test1.py', dest='/opt') print "commands: ", interface.exec_command(['x.x.x.x'], 'hostname') #print "shell: ", interface.exec_script(['172.20.3.18', '172.20.3.31'], 'chdir=/home ls') #print "shell: ", interface.exec_script(['172.20.3.18', '172.20.3.31'], 'sh /opt/test.sh') |
另一个封装方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
import os from tempfile import NamedTemporaryFile from ansible.inventory import Inventory from ansible.vars import VariableManager from ansible.parsing.dataloader import DataLoader from ansible.executor import playbook_executor from ansible.utils.display import Display class Options(object): """ Options class to replace Ansible OptParser """ def __init__(self, verbosity=None, inventory=None, listhosts=None, subset=None, module_paths=None, extra_vars=None, forks=None, ask_vault_pass=None, vault_password_files=None, new_vault_password_file=None, output_file=None, tags=None, skip_tags=None, one_line=None, tree=None, ask_sudo_pass=None, ask_su_pass=None, sudo=None, sudo_user=None, become=None, become_method=None, become_user=None, become_ask_pass=None, ask_pass=None, private_key_file=None, remote_user=None, connection=None, timeout=None, ssh_common_args=None, sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, poll_interval=None, seconds=None, check=None, syntax=None, diff=None, force_handlers=None, flush_cache=None, listtasks=None, listtags=None, module_path=None): self.verbosity = verbosity self.inventory = inventory self.listhosts = listhosts self.subset = subset self.module_paths = module_paths self.extra_vars = extra_vars self.forks = forks self.ask_vault_pass = ask_vault_pass self.vault_password_files = vault_password_files self.new_vault_password_file = new_vault_password_file self.output_file = output_file self.tags = tags self.skip_tags = skip_tags self.one_line = one_line self.tree = tree self.ask_sudo_pass = ask_sudo_pass self.ask_su_pass = ask_su_pass self.sudo = sudo self.sudo_user = sudo_user self.become = become self.become_method = become_method self.become_user = become_user self.become_ask_pass = become_ask_pass self.ask_pass = ask_pass self.private_key_file = private_key_file self.remote_user = remote_user self.connection = connection self.timeout = timeout self.ssh_common_args = ssh_common_args self.sftp_extra_args = sftp_extra_args self.scp_extra_args = scp_extra_args self.ssh_extra_args = ssh_extra_args self.poll_interval = poll_interval self.seconds = seconds self.check = check self.syntax = syntax self.diff = diff self.force_handlers = force_handlers self.flush_cache = flush_cache self.listtasks = listtasks self.listtags = listtags self.module_path = module_path class Runner(object): def __init__(self, hostnames, playbook, private_key_file, run_data, become_pass, verbosity=0): self.run_data = run_data self.options = Options() self.options.private_key_file = private_key_file self.options.verbosity = verbosity self.options.connection = 'ssh' # Need a connection type "smart" or "ssh" self.options.become = True self.options.become_method = 'sudo' self.options.become_user = 'root' # Set global verbosity self.display = Display() self.display.verbosity = self.options.verbosity # Executor appears to have it's own # verbosity object/setting as well playbook_executor.verbosity = self.options.verbosity # Become Pass Needed if not logging in as user root passwords = {'become_pass': become_pass} # Gets data from YAML/JSON files self.loader = DataLoader() self.loader.set_vault_password(os.environ['VAULT_PASS']) # All the variables from all the various places self.variable_manager = VariableManager() self.variable_manager.extra_vars = self.run_data # Parse hosts, I haven't found a good way to # pass hosts in without using a parsed template :( # (Maybe you know how?) self.hosts = NamedTemporaryFile(delete=False) self.hosts.write("""[run_hosts] %s """ % hostnames) self.hosts.close() # This was my attempt to pass in hosts directly. # # Also Note: In py2.7, "isinstance(foo, str)" is valid for # latin chars only. Luckily, hostnames are # ascii-only, which overlaps latin charset ## if isinstance(hostnames, str): ## hostnames = {"customers": {"hosts": [hostnames]}} # Set inventory, using most of above objects self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager, host_list=self.hosts.name) self.variable_manager.set_inventory(self.inventory) # Playbook to run. Assumes it is # local to this python file pb_dir = os.path.dirname(__file__) playbook = "%s/%s" % (pb_dir, playbook) # Setup playbook executor, but don't run until run() called self.pbex = playbook_executor.PlaybookExecutor( playbooks=[playbook], inventory=self.inventory, variable_manager=self.variable_manager, loader=self.loader, options=self.options, passwords=passwords) def run(self): # Results of PlaybookExecutor self.pbex.run() stats = self.pbex._tqm._stats # Test if success for record_logs run_success = True hosts = sorted(stats.processed.keys()) for h in hosts: t = stats.summarize(h) if t['unreachable'] > 0 or t['failures'] > 0: run_success = False # Dirty hack to send callback to save logs with data we want # Note that function "record_logs" is one I created and put into # the playbook callback file self.pbex._tqm.send_callback( 'record_logs', user_id=self.run_data['user_id'], success=run_success ) # Remove created temporary files os.remove(self.hosts.name) return stats |
调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
#!/usr/bin/env python from task import Runner # You may want this to run as user root instead # or make this an environmental variable, or # a CLI prompt. Whatever you want! #become_user_password = 'foo-whatever' #run_data: { # 'user_id': 12345, # 'foo': 'bar', # 'baz': 'cux-or-whatever-this-one-is' #} run_data = {'hosts': 'port'} runner = Runner( hostnames='x.x.x.x', playbook='main.yml', private_key_file='/root/.ssh/authorized_keys', run_data=run_data, become_pass=None, verbosity=0, ) stats = runner.run() # Maybe do something with stats here? If you want! #return stats print stats |
这个也来源网络,执行命方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# -*- coding:utf-8 -*- # !/usr/bin/env python # # Author: Shawn.T # Email: shawntai.ds@gmail.com # # this is the Interface package of Ansible2 API # from collections import namedtuple from ansible.parsing.dataloader import DataLoader from ansible.vars import VariableManager from ansible.inventory import Inventory from ansible.playbook.play import Play from ansible.executor.task_queue_manager import TaskQueueManager from tempfile import NamedTemporaryFile import os class AnsibleTask(object): def __init__(self, targetHost): Options = namedtuple( 'Options', [ 'listtags', 'listtasks', 'listhosts', 'syntax', 'connection','module_path', 'forks', 'remote_user', 'private_key_file', 'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args', 'scp_extra_args', 'become', 'become_method', 'become_user', 'verbosity', 'check' ] ) # initialize needed objects self.variable_manager = VariableManager() self.options = Options( listtags=False, listtasks=False, listhosts=False, syntax=False, connection='smart', module_path='/usr/lib/python2.7/site-packages/ansible/modules', forks=100, remote_user='root', private_key_file=None, ssh_common_args=None, ssh_extra_args=None, sftp_extra_args=None, scp_extra_args=None, become=False, become_method=None, become_user='root', verbosity=None, check=False ) self.passwords = dict(vault_pass='secret') self.loader = DataLoader() # create inventory and pass to var manager self.hostsFile = NamedTemporaryFile(delete=False) self.hostsFile.write(targetHost) self.hostsFile.close() self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager, host_list=self.hostsFile.name) self.variable_manager.set_inventory(self.inventory) def ansiblePlay(self): # create play with tasks args = "ls /" play_source = dict( name = "Ansible Play", hosts = 'port', gather_facts = 'no', tasks = [ dict(action=dict(module='shell', args=args), register='shell_out'), dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))) ] ) play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader) # run it tqm = None try: tqm = TaskQueueManager( inventory=self.inventory, variable_manager=self.variable_manager, loader=self.loader, options=self.options, passwords=self.passwords, stdout_callback='default', ) result = tqm.run(play) except e: print e finally: # print result if tqm is not None: tqm.cleanup() os.remove(self.hostsFile.name) self.inventory.clear_pattern_cache() print result, 'ok' return result if __name__ == '__main__': op = AnsibleTask('x.x.x.x') result = op.ansiblePlay() |
最后一个可以执行命令也可以执行playbook,例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# -*- coding: utf-8 -*- import json from ansible.parsing.dataloader import DataLoader from ansible.vars import VariableManager from ansible.inventory import Inventory from ansible.playbook.play import Play from ansible.executor.task_queue_manager import TaskQueueManager from ansible.executor.playbook_executor import PlaybookExecutor from collections import namedtuple from ansible.plugins import callback_loader from ansible.plugins.callback import CallbackBase import os import logging #loader = DataLoader() #variable_manager = VariableManager() #inventory = Inventory(loader=loader, variable_manager=variable_manager) #inventory = Inventory(loader=loader, variable_manager=variable_manager, host_list='/etc/ansible/hosts') #playbook_path = '/tmp/onedir/main.yml' #variable_manager.set_inventory(inventory) #get result output class mycallback(CallbackBase): #这里是状态回调,各种成功失败的状态,里面的各种方法其实都是从写于CallbackBase父类里面的,其实还有很多,可以根据需要拿出来用 def __init__(self,*args): super(mycallback,self).__init__(display=None) self.status_ok=json.dumps({}) self.status_fail=json.dumps({}) self.status_unreachable=json.dumps({}) self.status_playbook='' self.status_no_hosts=False self.host_ok = {} self.host_failed={} self.host_unreachable={} def v2_runner_on_ok(self,result): host=result._host.get_name() self.runner_on_ok(host, result._result) #self.status_ok=json.dumps({host:result._result},indent=4) self.host_ok[host] = result def v2_runner_on_failed(self, result, ignore_errors=False): host = result._host.get_name() self.runner_on_failed(host, result._result, ignore_errors) #self.status_fail=json.dumps({host:result._result},indent=4) self.host_failed[host] = result def v2_runner_on_unreachable(self, result): host = result._host.get_name() self.runner_on_unreachable(host, result._result) #self.status_unreachable=json.dumps({host:result._result},indent=4) self.host_unreachable[host] = result def v2_playbook_on_no_hosts_matched(self): self.playbook_on_no_hosts_matched() self.status_no_hosts=True def v2_playbook_on_play_start(self, play): self.playbook_on_play_start(play.name) self.playbook_path=play.name #class Options(object): # """ # Options class to replace Ansible OptParser # """ # def __init__(self, verbosity=None, inventory=None, listhosts='/etc/ansible/hosts', subset=None, module_paths=None, extra_vars=None, # forks=None, ask_vault_pass=None, vault_password_files=None, new_vault_password_file=None, # output_file=None, tags=None, skip_tags=None, one_line=None, tree=None, ask_sudo_pass=None, ask_su_pass=None, # sudo=None, sudo_user=None, become=None, become_method=None, become_user=None, become_ask_pass=None, # ask_pass=None, private_key_file=None, remote_user=None, connection=None, timeout=None, ssh_common_args=None, # sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, poll_interval=None, seconds=None, check=None, # syntax=None, diff=None, force_handlers=None, flush_cache=None, listtasks=None, listtags=None, module_path=None): # self.verbosity = verbosity # self.inventory = inventory # self.listhosts = listhosts # self.subset = subset # self.module_paths = module_paths # self.extra_vars = extra_vars # self.forks = forks # self.ask_vault_pass = ask_vault_pass # self.vault_password_files = vault_password_files # self.new_vault_password_file = new_vault_password_file # self.output_file = output_file # self.tags = tags # self.skip_tags = skip_tags # self.one_line = one_line # self.tree = tree # self.ask_sudo_pass = ask_sudo_pass # self.ask_su_pass = ask_su_pass # self.sudo = sudo # self.sudo_user = sudo_user # self.become = become # self.become_method = become_method # self.become_user = become_user # self.become_ask_pass = become_ask_pass # self.ask_pass = ask_pass # self.private_key_file = private_key_file # self.remote_user = remote_user # self.connection = connection # self.timeout = timeout # self.ssh_common_args = ssh_common_args # self.sftp_extra_args = sftp_extra_args # self.scp_extra_args = scp_extra_args # self.ssh_extra_args = ssh_extra_args # self.poll_interval = poll_interval # self.seconds = seconds # self.check = check # self.syntax = syntax # self.diff = diff # self.force_handlers = force_handlers # self.flush_cache = flush_cache # self.listtasks = listtasks # self.listtags = listtags # self.module_path = module_path # #options = Options() variable_manager = VariableManager() loader = DataLoader() inventory = Inventory(loader=loader, variable_manager=variable_manager, host_list='/etc/ansible/hosts') playbook_path = '/tmp/onedir/main.yml' if not os.path.exists(playbook_path): print '[INFO] The playbook does not exist' sys.exit() Options = namedtuple('Options', ['listtags', 'listtasks', 'listhosts', 'syntax', 'connection','module_path', 'forks', 'remote_user', 'private_key_file', 'ssh_common_arg s', 'ssh_extra_args', 'sftp_extra_args', 'scp_extra_args', 'become', 'become_method', 'become_user', 'verbosity', 'check']) options = Options(listtags=False, listtasks=False, listhosts=False, syntax=False, connection='ssh', module_path=None, forks=100, remote_user='slotlocker', private_key_f ile=None, ssh_common_args=None, ssh_extra_args=None, sftp_extra_args=None, scp_extra_args=None, become=True, become_method=None, become_user='root', verbosity=None, che ck=False) variable_manager.extra_vars = {'hosts': 'port'} passwords = {} #pbex = PlaybookExecutor(playbooks=[playbook_path], inventory=inventory, variable_manager=variable_manager, loader=loader, options=options, passwords=passwords) #results = pbex.run() def run_adhoc(ip,order): #variable_manager.extra_vars={"ansible_ssh_user":"root" , "ansible_ssh_pass":"passwd"} variable_manager.extra_vars = {'hosts': 'port'} play_source = {"name":"Ansible Ad-Hoc","hosts":"%s"%ip,"gather_facts":"no","tasks":[{"action":{"module":"command","args":"%s"%order}}]} play = Play().load(play_source, variable_manager=variable_manager, loader=loader) tqm = None callback = mycallback() try: tqm = TaskQueueManager( inventory=inventory, variable_manager=variable_manager, loader=loader, options=options, passwords=None, run_tree=False, #stdout_callback='default', ) tqm._stdout_callback = callback result = tqm.run(play) return callback finally: if tqm is not None: tqm.cleanup() def run_playbook(books): results_callback = callback_loader.get('json') playbooks = [books] #variable_manager.extra_vars={"ansible_ssh_user":"root" , "ansible_ssh_pass":"passwd"} variable_manager.extra_vars = {'hosts': 'port'} callback = mycallback() pd = PlaybookExecutor( playbooks=playbooks, inventory=inventory, variable_manager=variable_manager, loader=loader, options=options, passwords=None, ) #print dir(pd) #print type(pd) #print dir(pd._tqm) #pd._tqm._stdout_callback = callback try: result = pd.run() print result #return callback except Exception as e: print e def get_result(self): self.result_all={'success':{},'fail':{},'unreachable':{}} #print result_all #print dir(self.results_callback) for host, result in self.results_callback.host_ok.items(): self.result_all['success'][host] = result._result for host, result in self.results_callback.host_failed.items(): self.result_all['failed'][host] = result._result['msg'] for host, result in self.results_callback.host_unreachable.items(): self.result_all['unreachable'][host]= result._result['msg'] for i in self.result_all['success'].keys(): #print i,self.result_all['success'][i] return self.result_all if __name__ == '__main__': #run_playbook("/tmp/onedir/main.yml") run_adhoc("x.x.x.x", "ifconfig") |
重要点:
1 |
<span class="hljs-class"><span class="hljs-title">TaskQueueManager初始化参数:</span></span> |
- inventory –> 由ansible.inventory模块创建,用于导入inventory文件
- variable_manager –> 由ansible.vars模块创建,用于存储各类变量信息
- loader –> 由ansible.parsing.dataloader模块创建,用于数据解析
- options –> 存放各类配置信息的数据字典
- passwords –> 登录密码,可设置加密信息
- stdout_callback –> 回调函数
1 |
PlaybookExecutor初始化参数跟TaskQueueManager基本一样, |
生效就是初始化下面三个对象(VariableManager、DataLoader、Namedtuple)了,看源码能知道都需要哪些参数。
最后写一下如果要自定义ansible模块,最好用python,如果非要用bash,那就要自己返回Json格式的数据,例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
构造json返回值如下: cat <<EOF { "changed": true, "msg": "OK", "stdout": $(cat *.log | python -c 'import json,sys; print json.dumps(sys.stdin.read())' "stdout_lines": $(cat *.log.list) } |
放到判断里,根据条件返回json格式的数据。