# -*- coding: utf-8 -*-

# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright (C) 2013 AT&T Labs Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
#TODO(changbl)

Networks: (use /24 address for now (faster OpenStack deployment),
increase to /16 later)

  eth0, management: inherent interface on each rVM
  eth1, ops: 10.251.x.x/16
  eth2, private: 10.252.x.x/16
  eth3, public: 172.31.x.x/16

rVMs eth1 IPs
  [prefix]-gateway, 10.251.0.1
  [prefix]-chefserver, 10.251.0.2
  [prefix]-controller(s), 10.251.0.3 [ - 10.251.0.255]  # maximum 253
  [prefix]-worker-1, 10.251.1.1
  [prefix]-worker-2(s), 10.251.1.2 [ - 10.251.255.254]  # maximum ~65000
"""

from collections import OrderedDict
import functools
import logging
import os
import Queue
import subprocess
import time

from novaclient.v1_1.client import Client
from oslo.config import cfg

from inception.utils import cmd
from inception.utils import wrapper

LOGGER = logging.getLogger(__name__)

orchestrator_opts = [
    cfg.StrOpt('prefix',
               default=None,
               required=True,
               short='p',
               help='unique prefix for node names (no hyphens allowed)'),
    cfg.IntOpt('num_workers',
               default=2,
               short='n',
               help='number of worker nodes to create'),
    cfg.BoolOpt('atomic',
                default=False,
                help='on error, whether to roll back, i.e., automatically'
                     ' delete all created virtual resources'),
    cfg.BoolOpt('parallel',
                default=False,
                help='execute Chef-related setup tasks in parallel'),
    cfg.StrOpt('chef_repo',
               default='git://github.com/att/inception-chef-repo.git',
               help='URL of Chef repository'),
    cfg.StrOpt('chef_repo_branch',
               default='master',
               help='name of branch of Chef repo to use'),
    cfg.BoolOpt('sdn',
                default=False,
                help='whether to use SDN/OpenFlow and a full-mesh topology,'
                     ' or a star topology with a conventional L2/L3 network'),
    cfg.StrOpt('ssh_keyfile',
               default=None,
               help='path of extra public key(s) for node access via ssh'),
    cfg.StrOpt('pool',
               default='research',
               help='name of pool for floating IP addresses'),
    cfg.StrOpt('user',
               default='ubuntu',
               help='login id with sudo for all nodes'),
    cfg.StrOpt('image',
               default='f3d62d5b-a76b-4997-a579-ff946a606132',
               help='id of image used to construct nodes (=u1204-130621-gv)'),
    cfg.StrOpt('chefserver_image',
               default='8848d4cd-1bdf-4627-ae31-ce9bf61440a4',
               help='id of image to construct chefserver'
                    ' (=u1204-130716-gvc)'),
    cfg.IntOpt('flavor',
               default=3,
               help='id of machine flavor used for nodes (3=medium)'),
    cfg.IntOpt('gateway_flavor',
               default=1,
               help='id of machine flavor used to construct GW (1=tiny)'),
    cfg.StrOpt('key_name',
               default='shared',
               help='name of public key for node access via ssh'),
    cfg.ListOpt('security_groups',
                default=['default', 'ssh'],
                help='list of security groups (firewall rules) for nodes'),
    cfg.StrOpt('src_dir',
               default='../bin/',
               help='relative source location (to __file__) of various'
                    ' chef-related setup scripts on client'),
    cfg.StrOpt('dst_dir',
               default='/home/ubuntu/',
               help='absolute destination path for chef-related setup scripts'
                    ' on nodes'),
    cfg.StrOpt('userdata',
               default='userdata.sh.template',
               help='bash script run by cloud-init in late boot stage'
                    ' (rc.local-like)'),
    cfg.IntOpt('timeout',
               default=999999,
               help='maximum time (in seconds) to wait for all nodes to be'
                    ' ready [ssh-able + userdata]'),
    cfg.IntOpt('poll_interval',
               default=5,
               help='interval (in seconds) between readiness polls'),
]

cmd_opts = [
    cfg.BoolOpt('shell',
                default=False,
                help='initialize, then drop to embedded IPython shell'),
    cfg.BoolOpt('cleanup',
                default=False,
                help='take down the inception cloud'),
]

CONF = cfg.CONF
CONF.register_cli_opts(orchestrator_opts)
CONF.register_cli_opts(cmd_opts)

# concatenation character between prefix and name
CONCAT_CHAR = '-'


class Orchestrator(object):
    """
    orchestrate all inception cloud stuff
    """

    def __init__(self,
                 prefix,
                 num_workers,
                 atomic,
                 parallel,
                 chef_repo,
                 chef_repo_branch,
                 sdn,
                 ssh_keyfile,
                 pool,
                 user,
                 image,
                 chefserver_image,
                 flavor,
                 gateway_flavor,
                 key_name,
                 security_groups,
                 src_dir,
                 dst_dir,
                 userdata,
                 timeout,
                 poll_interval):
        """
        For documentation on each parameter, refer to orchestrator_opts.
        """
        ## check args
        if CONCAT_CHAR in prefix:
            raise ValueError('"%s" cannot exist in prefix=%r'
                             % (CONCAT_CHAR, prefix))
        ## args
        self.prefix = prefix
        self.num_workers = num_workers
        self.atomic = atomic
        self.parallel = parallel
        self.chef_repo = chef_repo
        self.chef_repo_branch = chef_repo_branch
        self.sdn = sdn
        self.ssh_keyfile = ssh_keyfile
        self.pool = pool
        self.user = user
        self.image = image
        self.chefserver_image = chefserver_image
        self.flavor = flavor
        self.gateway_flavor = gateway_flavor
        self.key_name = key_name
        self.security_groups = security_groups
        self.src_dir = os.path.join(os.path.abspath(
            os.path.dirname(__file__)), src_dir)
        self.dst_dir = os.path.abspath(dst_dir)
        with open(os.path.join(self.src_dir, userdata), 'r') as fin:
            self.userdata = fin.read()
        self.timeout = timeout
        self.poll_interval = poll_interval
        # inject the extra ssh public key, if any
        ssh_keycontent = ''
        if self.ssh_keyfile:
            with open(self.ssh_keyfile, 'r') as fin:
                ssh_keycontent = fin.read()
        self.userdata = self.userdata % (user, ssh_keycontent)
        # scripts to run on chefserver, executed one by one (sequence matters)
        self.chefserver_commands = []
        self.chefserver_files = OrderedDict()
        for filename in ('install_chefserver.sh',
                         'configure_knife.sh',
                         'setup_chef_repo.sh'):
            src_file = os.path.join(self.src_dir, filename)
            dst_file = os.path.join(self.dst_dir, filename)
            if filename == 'setup_chef_repo.sh':
                # add two args to this command
                command = ("/bin/bash" + " " + dst_file + " " +
                           self.chef_repo + " " + self.chef_repo_branch)
            else:
                command = "/bin/bash" + " " + dst_file
            self.chefserver_commands.append(command)
            with open(src_file, 'r') as fin:
                value = fin.read()
                key = dst_file
                self.chefserver_files[key] = value
        ## non-args
        self.client = Client(os.environ['OS_USERNAME'],
                             os.environ['OS_PASSWORD'],
                             os.environ['OS_TENANT_NAME'],
                             os.environ['OS_AUTH_URL'])
        self._gateway_id = None
        self._gateway_ip = None
        self._gateway_name = None
        self._chefserver_id = None
        self._chefserver_ip = None
        self._chefserver_name = None
        self._controller_id = None
        self._controller_ip = None
        self._controller_name = None
        self._worker_ids = []
        self._worker_ips = []
        self._worker_names = []
        self._gateway_floating_ip = None

    def start(self, re_raise=False):
        """
        run the whole process

        @param re_raise: whether to re-raise a caught exception, for the
            purpose of notifying the external caller.
            Default: False
        """
        try:
            self._check_existence()
            self._create_servers()
            self._setup_chefserver()
            self._checkin_chefserver()
            self._deploy_network_vxlan()
            self._deploy_dnsmasq()
            self._setup_controller()
            self._setup_workers()
            LOGGER.info("Your inception cloud '%s' is ready!!!", self.prefix)
            LOGGER.info("Gateway IP is %s", self._gateway_floating_ip.ip)
            LOGGER.info("Chef server WebUI is http://%s:4040",
                        self._chefserver_ip)
            LOGGER.info("OpenStack dashboard is https://%s",
                        self._controller_ip)
        except Exception:
            LOGGER.exception("Error in launching inception cloud")
            if self.atomic:
                self.cleanup()
                LOGGER.info("Although there was an error in creating your "
                            "inception cloud '%s', resources have been "
                            "successfully cleaned up", self.prefix)
            if re_raise:
                raise

    def _check_existence(self):
        """
        check whether an inception cloud already exists, based on the given
        self.prefix
        """
        full_prefix = self.prefix + CONCAT_CHAR
        for server in self.client.servers.list():
            if server.name.startswith(full_prefix):
                raise ValueError('prefix=%s is already used' % self.prefix)

    def _create_servers(self):
        """
        start all VM servers: gateway, chefserver, controller, and workers,
        by calling the Nova client API
        """
        # launch gateway
        gateway = self.client.servers.create(
            name=self.prefix + CONCAT_CHAR + 'gateway',
            image=self.image,
            flavor=self.gateway_flavor,
            key_name=self.key_name,
            security_groups=self.security_groups,
            userdata=self.userdata)
        self._gateway_id = gateway.id
        LOGGER.info("Creating %s", gateway)

        # launch chefserver
        chefserver = self.client.servers.create(
            name=self.prefix + CONCAT_CHAR + 'chefserver',
            image=self.chefserver_image,
            flavor=self.flavor,
            key_name=self.key_name,
            security_groups=self.security_groups,
            userdata=self.userdata,
            files=self.chefserver_files)
        self._chefserver_id = chefserver.id
        LOGGER.info("Creating %s", chefserver)

        # launch controller
        controller = self.client.servers.create(
            name=self.prefix + CONCAT_CHAR + 'controller',
            image=self.image,
            flavor=self.flavor,
            key_name=self.key_name,
            security_groups=self.security_groups,
            userdata=self.userdata)
        self._controller_id = controller.id
        LOGGER.info("Creating %s", controller)

        # launch workers
        for i in xrange(self.num_workers):
            worker = self.client.servers.create(
                name=self.prefix + CONCAT_CHAR + 'worker%s' % (i + 1),
                image=self.image,
                flavor=self.flavor,
                key_name=self.key_name,
                security_groups=self.security_groups,
                userdata=self.userdata)
            self._worker_ids.append(worker.id)
            LOGGER.info("Creating %s", worker)

        LOGGER.info('wait at most %s seconds for servers to be ready'
                    ' (ssh-able + userdata done)', self.timeout)
        servers_ready = False
        begin_time = time.time()
        while time.time() - begin_time <= self.timeout:
            try:
                # get IP addresses of servers
                (self._gateway_ip,
                 self._gateway_name) = self._get_server_info(self._gateway_id)
                (self._chefserver_ip, self._chefserver_name) = (
                    self._get_server_info(self._chefserver_id))
                (self._controller_ip, self._controller_name) = (
                    self._get_server_info(self._controller_id))
                # clear contents upon each retry
                self._worker_ips = []
                self._worker_names = []
                for _id in self._worker_ids:
                    (ipaddr, name) = self._get_server_info(_id)
                    self._worker_ips.append(ipaddr)
                    self._worker_names.append(name)
                # test whether servers are ssh-able
                command = '[ -d /etc/inception ]'
                cmd.ssh(self.user + "@" + self._gateway_ip, command)
                cmd.ssh(self.user + "@" + self._chefserver_ip, command)
                cmd.ssh(self.user + "@" + self._controller_ip, command)
                for worker_ip in self._worker_ips:
                    cmd.ssh(self.user + "@" + worker_ip, command)
                # indicate that servers are ready
                servers_ready = True
                break
            except (UnboundLocalError,
                    subprocess.CalledProcessError) as error:
                LOGGER.info('servers are not all ready, error=%s,'
                            ' sleep %s seconds', error, self.poll_interval)
                time.sleep(self.poll_interval)
                continue

        if not servers_ready:
            raise RuntimeError("Not all servers could be brought up")

        # create a public IP and associate it with the gateway
        floating_ip = self.client.floating_ips.create(pool=self.pool)
        self.client.servers.add_floating_ip(self._gateway_id, floating_ip)
        self._gateway_floating_ip = floating_ip
        LOGGER.info("Creating and associating %s", floating_ip)

    def _get_server_info(self, _id):
        """
        get server information (IP, hostname) from server ID

        @param _id: server ID
        """
        server = self.client.servers.get(_id)
        # get ipaddress (there is only 1 item in the dict)
        for key in server.networks:
            ipaddr = server.networks[key][0]
        return (ipaddr, server.name)

    def _setup_chefserver(self):
        """
        execute uploaded scripts to install chef, configure knife, and upload
        cookbooks, roles, and environments
        """
        for command in self.chefserver_commands:
            cmd.ssh(self.user + "@" + self._chefserver_ip,
                    command, screen_output=True)

    def _checkin_chefserver(self):
        """
        check in all VMs to the chefserver (knife bootstrap), and set their
        environment to be self.prefix
        """
        funcs = []
        ipaddrs = ([self._chefserver_ip,
                    self._gateway_ip,
                    self._controller_ip] + self._worker_ips)
        hostnames = ([self._chefserver_name,
                      self._gateway_name,
                      self._controller_name] + self._worker_names)
        for (ipaddr, hostname) in zip(ipaddrs, hostnames):
            uri = self.user + '@' + self._chefserver_ip
            command = ('/usr/bin/knife bootstrap %s -x %s -N %s -E %s --sudo'
                       % (ipaddr, self.user, hostname, self.prefix))
            func = functools.partial(cmd.ssh, uri, command,
                                     screen_output=True,
                                     agent_forwarding=True)
            funcs.append(func)
        self._execute_funcs(funcs)
        # run an empty run_list to make sure attributes are properly
        # propagated
        self._run_chef_client(ipaddrs)
        # sleep some time
        time.sleep(5)

    def _deploy_network_vxlan(self):
        """
        deploy network-vxlan (recipe) via cookbook openvswitch for all VMs,
        i.e., build VXLAN tunnels with the gateway as layer-2 hub and the
        other VMs as spokes, and assign IP addresses and netmasks
        """
        hostnames = ([self._chefserver_name,
                      self._gateway_name,
                      self._controller_name] + self._worker_names)
        ipaddrs = ([self._chefserver_ip,
                    self._gateway_ip,
                    self._controller_ip] + self._worker_ips)
        if self.sdn:
            self._add_run_list(hostnames,
                               'recipe[openvswitch::network-vxlan-mesh]')
            self._run_chef_client(ipaddrs)
            self._add_run_list(hostnames,
                               'recipe[openvswitch::openflow-apply]')
            self._run_chef_client(ipaddrs)
            self._add_run_list([self._controller_name],
                               'recipe[openvswitch::sdn-controller]')
            self._run_chef_client([self._controller_ip])
            # sleep some time to let nodes connect to the SDN controller
            time.sleep(5)
        else:
            self._add_run_list(hostnames,
                               'recipe[openvswitch::network-vxlan-star]')
            self._run_chef_client(ipaddrs)

    def _deploy_dnsmasq(self):
        """
        deploy dnsmasq (recipe) via cookbook openvswitch for all VMs,
        i.e., install and configure dnsmasq on the gateway node, and point
        all VMs to the gateway as their nameserver
        """
        hostnames = ([self._chefserver_name,
                      self._gateway_name,
                      self._controller_name] + self._worker_names)
        self._add_run_list(hostnames, 'recipe[openvswitch::dnsmasq]')
        ipaddrs = ([self._chefserver_ip,
                    self._gateway_ip,
                    self._controller_ip] + self._worker_ips)
        self._run_chef_client(ipaddrs)

    def _add_run_list(self, hostnames, item):
        """
        for each server, add an item to its run_list

        @param hostnames: hostnames of specified servers
        @param item: name of the item (e.g., recipe, role, etc)
        """
        funcs = []
        for hostname in hostnames:
            uri = self.user + '@' + self._chefserver_ip
            command = "/usr/bin/knife node run_list add %s %s" % (
                hostname, item)
            func = functools.partial(cmd.ssh, uri, command,
                                     screen_output=True,
                                     agent_forwarding=True)
            funcs.append(func)
        self._execute_funcs(funcs)

    def _run_chef_client(self, ipaddrs):
        """
        for each server in the address list, run chef-client for all
        cookbooks specified in its run_list

        @param ipaddrs: IP addresses of the servers
        """
        funcs = []
        for ipaddr in ipaddrs:
            uri = self.user + '@' + ipaddr
            command = "sudo chef-client"
            func = functools.partial(cmd.ssh, uri, command,
                                     screen_output=True,
                                     agent_forwarding=True)
            funcs.append(func)
        self._execute_funcs(funcs)

    def _execute_funcs(self, funcs):
        """
        execute functions, either in parallel (via threads) or sequentially.
        If parallel, exceptions from subthreads are collected in a shared
        queue, and an exception is raised in the main thread afterwards

        @param funcs: the functions to be executed
        """
        if not self.parallel:
            for func in funcs:
                func()
        else:
            exception_queue = Queue.Queue()
            threads = []
            # create and start all threads
            for func in funcs:
                thread = wrapper.FuncThread(func, exception_queue)
                threads.append(thread)
                thread.start()
            # wait for all threads to finish
            for thread in threads:
                thread.join()
            # check whether any thread raised an exception
            got_exception = not exception_queue.empty()
            while not exception_queue.empty():
                thread_name, func_info, exc = exception_queue.get()
                LOGGER.error('%s %s %s', thread_name, func_info, exc)
            if got_exception:
                raise RuntimeError("One or more subthreads got an exception")

    def _setup_controller(self):
        """
        deploy OpenStack controller(s) via misc cookbooks
        """
        self._add_run_list([self._controller_name], "role[os-dev-mode]")
        self._add_run_list([self._controller_name],
                           "role[os-controller-combined]")
        self._run_chef_client([self._controller_ip])

    def _setup_workers(self):
        """
        deploy workers via misc cookbooks
        """
        self._add_run_list(self._worker_names, "role[os-dev-mode]")
        self._add_run_list(self._worker_names, "role[os-worker-combined]")
        self._run_chef_client(self._worker_ips)

    def cleanup(self, re_raise=False):
        """
        clean up the whole inception cloud, based on self.prefix

        @param re_raise: whether to re-raise a caught exception, for the
            purpose of notifying the external caller. Default: False
        """
        LOGGER.info("Let's clean up inception cloud '%s'...", self.prefix)

        ## find out servers info
        servers = []
        gateway = None
        gateway_ip = None
        full_prefix = self.prefix + CONCAT_CHAR
        for server in self.client.servers.list():
            if server.name.startswith(full_prefix):
                servers.append(server)
            if server.name == self.prefix + CONCAT_CHAR + 'gateway':
                gateway = server
                # get ipaddress (there is only 1 item in the dict)
                for key in gateway.networks:
                    if len(gateway.networks[key]) >= 2:
                        gateway_ip = gateway.networks[key][1]

        ## try deleting the floating IP of the gateway
        try:
            for floating_ip in self.client.floating_ips.list():
                if floating_ip.ip == gateway_ip:
                    LOGGER.info("Disassociating and releasing %s",
                                floating_ip)
                    self.client.servers.remove_floating_ip(gateway,
                                                           floating_ip)
                    self.client.floating_ips.delete(floating_ip)
        except Exception:
            LOGGER.exception("Error in disassociating/releasing floating IP")
            if re_raise:
                raise

        ## try deleting each server
        for server in servers:
            try:
                LOGGER.info('Deleting %s', server)
                server.delete()
            except Exception:
                LOGGER.exception("Error in deleting server %s", server)
                if re_raise:
                    raise
                continue

        LOGGER.info("Inception cloud '%s' has been cleaned up.", self.prefix)
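

# ----------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module; the project's real
# entry point lives in a separate driver script). It assumes the OS_USERNAME,
# OS_PASSWORD, OS_TENANT_NAME, and OS_AUTH_URL environment variables are set
# and that command-line arguments match the options registered above. The
# main() helper below is hypothetical and only illustrates how Orchestrator
# is intended to be driven: construct it from CONF, then cleanup() or start().
# ----------------------------------------------------------------------------
def main():
    # parse CLI arguments into CONF via oslo.config
    CONF(project='inception')
    orchestrator = Orchestrator(
        CONF.prefix, CONF.num_workers, CONF.atomic, CONF.parallel,
        CONF.chef_repo, CONF.chef_repo_branch, CONF.sdn, CONF.ssh_keyfile,
        CONF.pool, CONF.user, CONF.image, CONF.chefserver_image,
        CONF.flavor, CONF.gateway_flavor, CONF.key_name,
        CONF.security_groups, CONF.src_dir, CONF.dst_dir, CONF.userdata,
        CONF.timeout, CONF.poll_interval)
    if CONF.cleanup:
        orchestrator.cleanup()
    else:
        orchestrator.start()


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main()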