From c1a288c83145d440f6c9608e0b3c955a42139cad Mon Sep 17 00:00:00 2001 From: ahothan Date: Mon, 4 May 2015 07:19:39 -0700 Subject: [PATCH] Backport sshutils and move redis init earlier Backport sshutils from vmtp (use SSHAccess) Init redis earlier before any testing VM is started Change-Id: I7f9302988b1aeb6e46e0483f1f0c82ffac80f21c --- scale/kb_scheduler.py | 14 ++ scale/kloudbuster.py | 4 +- scale/perf_instance.py | 54 +------ scale/sshutils.py | 325 +++++++++++++++++++++++++++++++++++++---- 4 files changed, 322 insertions(+), 75 deletions(-) diff --git a/scale/kb_scheduler.py b/scale/kb_scheduler.py index 0bdc80e..9dd8f0e 100644 --- a/scale/kb_scheduler.py +++ b/scale/kb_scheduler.py @@ -31,6 +31,9 @@ class KBHTTPServerUpException(Exception): class KBHTTPBenchException(Exception): pass +class KBProxyConnectionException(Exception): + pass + class KBScheduler(object): """ Control the testing VMs on the testing cloud @@ -52,19 +55,30 @@ class KBScheduler(object): def setup_redis(self): self.redis_obj = redis.StrictRedis(connection_pool=self.connection_pool) + success = False # Check for connections to redis server for retry in xrange(1, self.config.redis_retry_count + 1): try: self.redis_obj.get("test") + success = True except (redis.exceptions.ConnectionError): LOG.warn("Connecting to redis server... Retry #%d", retry) time.sleep(1) continue break + if not success: + LOG.error("Error: Cannot connect to the Redis server") + raise KBProxyConnectionException() + # Subscribe to message channel self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True) self.pubsub.subscribe(self.report_chan_name) + def dispose(self): + if self.pubsub: + self.pubsub.unsubscribe() + self.pubsub.close() + def send_cmd(self, cmd, client_type, data): message = {'cmd': cmd, 'sender-id': 'kb-master', 'client-type': client_type, 'data': data} diff --git a/scale/kloudbuster.py b/scale/kloudbuster.py index 3274678..e693367 100644 --- a/scale/kloudbuster.py +++ b/scale/kloudbuster.py @@ -199,7 +199,7 @@ class KloudBuster(object): Executes tests serially Support concurrency in fututure """ - + kbscheduler = None try: self.kloud.create_resources() self.kloud.create_vms() @@ -244,6 +244,8 @@ class KloudBuster(object): self.testing_kloud.delete_resources() except Exception: traceback.print_exc() + if kbscheduler: + kbscheduler.dispose() if __name__ == '__main__': diff --git a/scale/perf_instance.py b/scale/perf_instance.py index 18d8fbc..f91a390 100644 --- a/scale/perf_instance.py +++ b/scale/perf_instance.py @@ -13,10 +13,6 @@ # under the License. # -import os -import stat -import subprocess - import sshutils from base_compute import BaseCompute @@ -38,10 +34,10 @@ class PerfInstance(BaseCompute): self.up_flag = False # SSH Configuration - self.ssh_ip = None - self.ssh_user = config.ssh_vm_username + self.ssh_access = None self.ssh = None self.port = None + self.az = None if 'tp_tool' not in config: self.tp_tool = None @@ -79,7 +75,7 @@ class PerfInstance(BaseCompute): tp_tool_res = [] res = {'ip_to': dest_ip} - res['ip_from'] = self.ssh_ip + res['ip_from'] = self.ssh_access.host if label: res['desc'] = label if self.az: @@ -105,12 +101,10 @@ class PerfInstance(BaseCompute): # Setup the ssh connectivity # Returns True if success - def setup_ssh(self, ssh_ip, ssh_user): + def setup_ssh(self, host_access): # used for displaying the source IP in json results - self.ssh_ip = ssh_ip - self.ssh_user = ssh_user - self.ssh = sshutils.SSH(self.ssh_user, self.ssh_ip, - key_filename=self.config.private_key_file, + self.ssh_access = host_access + self.ssh = sshutils.SSH(self.ssh_access, connect_retry_count=self.config.ssh_retry_count) return True @@ -119,44 +113,8 @@ class PerfInstance(BaseCompute): (status, cmd_output, err) = self.ssh.execute(cmd, timeout=timeout) return (status, cmd_output, err) - # scp a file from the local host to the instance - # Returns True if dest file already exists or scp succeeded - # False in case of scp error - def scp(self, tool_name, source, dest): - - # check if the dest file is already present - if self.ssh.stat(dest): - LOG.kbdebug("[%s] Tool %s already present - skipping install" - % (self.vm_name, tool_name)) - return True - # scp over the tool binary - # first chmod the local copy since git does not keep the permission - os.chmod(source, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) - - # scp to the target - scp_opts = '-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no' - scp_cmd = 'scp -i %s %s %s %s@%s:%s' % (self.config.private_key_file, - scp_opts, - source, - self.ssh_user, - self.ssh_ip, - dest) - LOG.kbdebug("[%s] Copying %s to target..." % (self.vm_name, tool_name)) - LOG.kbdebug("[%s] %s" % (self.vm_name, scp_cmd)) - devnull = open(os.devnull, 'wb') - rc = subprocess.call(scp_cmd, shell=True, - stdout=devnull, stderr=devnull) - if rc: - LOG.error("[%s] Copy to target failed rc=%d" % (self.vm_name, rc)) - LOG.error("[%s] %s" % (self.vm_name, scp_cmd)) - return False - return True - # Dispose the ssh session def dispose(self): if self.ssh: self.ssh.close() self.ssh = None - if self.redis_obj: - self.pubsub.unsubscribe() - self.pubsub.close() diff --git a/scale/sshutils.py b/scale/sshutils.py index 7bb6ca3..97331ba 100644 --- a/scale/sshutils.py +++ b/scale/sshutils.py @@ -61,6 +61,7 @@ import re import select import socket import StringIO +import sys import time import log as logging @@ -69,10 +70,6 @@ import scp LOG = logging.getLogger(__name__) - -# from rally.openstack.common.gettextutils import _ - - class SSHError(Exception): pass @@ -80,12 +77,70 @@ class SSHError(Exception): class SSHTimeout(SSHError): pass +# Check IPv4 address syntax - not completely fool proof but will catch +# some invalid formats +def is_ipv4(address): + try: + socket.inet_aton(address) + except socket.error: + return False + return True + +class SSHAccess(object): + ''' + A class to contain all the information needed to access a host + (native or virtual) using SSH + ''' + def __init__(self, arg_value=None): + ''' + decode user@host[:pwd] + 'hugo@1.1.1.1:secret' -> ('hugo', '1.1.1.1', 'secret', None) + 'huggy@2.2.2.2' -> ('huggy', '2.2.2.2', None, None) + None ->(None, None, None, None) + Examples of fatal errors (will call exit): + 'hutch@q.1.1.1' (invalid IP) + '@3.3.3.3' (missing username) + 'hiro@' or 'buggy' (missing host IP) + The error field will be None in case of success or will + contain a string describing the error + ''' + self.username = None + self.host = None + self.password = None + # name of the file that contains the private key + self.private_key_file = None + # this is the private key itself (a long string starting with + # -----BEGIN RSA PRIVATE KEY----- + # used when the private key is not saved in any file + self.private_key = None + self.public_key_file = None + self.port = 22 + self.error = None + + if not arg_value: + return + match = re.search(r'^([^@]+)@([0-9\.]+):?(.*)$', arg_value) + if not match: + self.error = 'Invalid argument: ' + arg_value + return + if not is_ipv4(match.group(2)): + self.error = 'Invalid IPv4 address ' + match.group(2) + return + (self.username, self.host, self.password) = match.groups() + + def copy_from(self, ssh_access): + self.username = ssh_access.username + self.host = ssh_access.host + self.port = ssh_access.port + self.password = ssh_access.password + self.private_key = ssh_access.private_key + self.public_key_file = ssh_access.public_key_file + self.private_key_file = ssh_access.private_key_file class SSH(object): """Represent ssh connection.""" - def __init__(self, user, host, port=22, pkey=None, - key_filename=None, password=None, + def __init__(self, ssh_access, connect_timeout=60, connect_retry_count=30, connect_retry_wait_sec=2): @@ -102,12 +157,11 @@ class SSH(object): :param connect_retry_wait_sec: seconds to wait between retries """ - self.user = user - self.host = host - self.port = port - self.pkey = self._get_pkey(pkey) if pkey else None - self.password = password - self.key_filename = key_filename + self.ssh_access = ssh_access + if ssh_access.private_key: + self.pkey = self._get_pkey(ssh_access.private_key) + else: + self.pkey = None self._client = False self.connect_timeout = connect_timeout self.connect_retry_count = connect_retry_count @@ -118,6 +172,9 @@ class SSH(object): self.__get_distro() def _get_pkey(self, key): + '''Get the binary form of the private key + from the text form + ''' if isinstance(key, basestring): key = StringIO.StringIO(key) errors = [] @@ -135,10 +192,12 @@ class SSH(object): self._client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) for _ in range(self.connect_retry_count): try: - self._client.connect(self.host, username=self.user, - port=self.port, pkey=self.pkey, - key_filename=self.key_filename, - password=self.password, + self._client.connect(self.ssh_access.host, + username=self.ssh_access.username, + port=self.ssh_access.port, + pkey=self.pkey, + key_filename=self.ssh_access.private_key_file, + password=self.ssh_access.password, timeout=self.connect_timeout) return self._client except (paramiko.AuthenticationException, @@ -148,7 +207,7 @@ class SSH(object): time.sleep(self.connect_retry_wait_sec) self._client = None - msg = '[%s] SSH Connection failed after %s attempts' % (self.host, + msg = '[%s] SSH Connection failed after %s attempts' % (self.ssh_access.host, self.connect_retry_count) raise SSHError(msg) @@ -229,7 +288,7 @@ class SSH(object): break if timeout and (time.time() - timeout) > start_time: - args = {'cmd': cmd, 'host': self.host} + args = {'cmd': cmd, 'host': self.ssh_access.host} raise SSHTimeout(('Timeout executing command ' '"%(cmd)s" on host %(host)s') % args) # if e: @@ -273,7 +332,7 @@ class SSH(object): except (socket.error, SSHError): time.sleep(interval) if time.time() > (start_time + timeout): - raise SSHTimeout(('Timeout waiting for "%s"') % self.host) + raise SSHTimeout(('Timeout waiting for "%s"') % self.ssh_access.host) def __extract_property(self, name, input_str): expr = name + r'="?([\w\.]*)"?' @@ -347,7 +406,7 @@ class SSH(object): if int(pkt_loss) < int(pass_threshold): return 1 else: - LOG.error("Ping to %s failed: %s" % (target_ip, cmd_output)) + LOG.error('Ping to %s failed: %s' % (target_ip, cmd_output)) return 0 def get_file_from_host(self, from_path, to_path): @@ -360,7 +419,7 @@ class SSH(object): try: scpcon.get(from_path, to_path) except scp.SCPException as exp: - LOG.error("Send failed: [%s]" % exp) + LOG.error("Send failed: [%s]", exp) return 0 return 1 @@ -374,22 +433,236 @@ class SSH(object): return None return cmd_output + def get_host_os_version(self): + ''' + Identify the host distribution/relase. + ''' + os_release_file = "/etc/os-release" + sys_release_file = "/etc/system-release" + name = "" + version = "" + + if self.stat(os_release_file): + data = self.read_remote_file(os_release_file) + if data is None: + LOG.error("ERROR:Failed to read file %s" % os_release_file) + return None + + for line in data.splitlines(): + mobj = re.match(r'NAME=(.*)', line) + if mobj: + name = mobj.group(1).strip("\"") + + mobj = re.match(r'VERSION_ID=(.*)', line) + if mobj: + version = mobj.group(1).strip("\"") + + os_name = name + " " + version + return os_name + + if self.stat(sys_release_file): + data = self.read_remote_file(sys_release_file) + if data is None: + LOG.error("ERROR:Failed to read file %s" % sys_release_file) + return None + + for line in data.splitlines(): + mobj = re.match(r'Red Hat.*', line) + if mobj: + return mobj.group(0) + + return None + + def check_rpm_package_installed(self, rpm_pkg): + ''' + Given a host and a package name, check if it is installed on the + system. + ''' + check_pkg_cmd = "rpm -qa | grep " + rpm_pkg + + (status, cmd_output, _) = self.execute(check_pkg_cmd) + if status: + return None + + pkg_pattern = ".*" + rpm_pkg + ".*" + rpm_pattern = re.compile(pkg_pattern, re.IGNORECASE) + + for line in cmd_output.splitlines(): + mobj = rpm_pattern.match(line) + if mobj: + return mobj.group(0) + + print "%s pkg installed " % rpm_pkg + + return None + + def get_openstack_release(self, ver_str): + ''' + Get the release series name from the package version + Refer to here for release tables: + https://wiki.openstack.org/wiki/Releases + ''' + ver_table = {"2015.1": "Kilo", + "2014.2": "Juno", + "2014.1": "Icehouse", + "2013.2": "Havana", + "2013.1": "Grizzly", + "2012.2": "Folsom", + "2012.1": "Essex", + "2011.3": "Diablo", + "2011.2": "Cactus", + "2011.1": "Bexar", + "2010.1": "Austin"} + + ver_prefix = re.search(r"20\d\d\.\d", ver_str).group(0) + if ver_prefix in ver_table: + return ver_table[ver_prefix] + else: + return "Unknown" + + def check_openstack_version(self): + ''' + Identify the openstack version running on the controller. + ''' + nova_cmd = "nova-manage --version" + (status, _, err_output) = self.execute(nova_cmd) + + if status: + return "Unknown" + + ver_str = err_output.strip() + release_str = self.get_openstack_release(err_output) + return release_str + " (" + ver_str + ")" + + def get_cpu_info(self): + ''' + Get the CPU info of the controller. + + Note: Here we are assuming the controller node has the exact + hardware as the compute nodes. + ''' + + cmd = 'cat /proc/cpuinfo | grep -m1 "model name"' + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + model_name = re.search(r":\s(.*)", std_output).group(1) + + cmd = 'cat /proc/cpuinfo | grep "model name" | wc -l' + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + cores = std_output.strip() + + return (cores + " * " + model_name) + + def get_nic_name(self, agent_type, encap, internal_iface_dict): + ''' + Get the NIC info of the controller. + + Note: Here we are assuming the controller node has the exact + hardware as the compute nodes. + ''' + + # The internal_ifac_dict is a dictionary contains the mapping between + # hostname and the internal interface name like below: + # {u'hh23-4': u'eth1', u'hh23-5': u'eth1', u'hh23-6': u'eth1'} + + cmd = "hostname" + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + hostname = std_output.strip() + + if hostname in internal_iface_dict: + iface = internal_iface_dict[hostname] + else: + return "Unknown" + + # Figure out which interface is for internal traffic + if 'Linux bridge' in agent_type: + ifname = iface + elif 'Open vSwitch' in agent_type: + if encap == 'vlan': + # [root@hh23-10 ~]# ovs-vsctl list-ports br-inst + # eth1 + # phy-br-inst + cmd = 'ovs-vsctl list-ports ' + iface + ' | grep -E "^[^phy].*"' + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + ifname = std_output.strip() + elif encap == 'vxlan' or encap == 'gre': + # This is complicated. We need to first get the local IP address on + # br-tun, then do a reverse lookup to get the physical interface. + # + # [root@hh23-4 ~]# ip addr show to "23.23.2.14" + # 3: eth1: mtu 1500 qdisc mq state UP qlen 1000 + # inet 23.23.2.14/24 brd 23.23.2.255 scope global eth1 + # valid_lft forever preferred_lft forever + cmd = "ip addr show to " + iface + " | awk -F: '{print $2}'" + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + ifname = std_output.strip() + else: + return "Unknown" + + cmd = 'ethtool -i ' + ifname + ' | grep bus-info' + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + bus_info = re.search(r":\s(.*)", std_output).group(1) + + cmd = 'lspci -s ' + bus_info + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + nic_name = re.search(r"Ethernet controller:\s(.*)", std_output).group(1) + + return (nic_name) + + def get_l2agent_version(self, agent_type): + ''' + Get the L2 agent version of the controller. + + Note: Here we are assuming the controller node has the exact + hardware as the compute nodes. + ''' + if 'Linux bridge' in agent_type: + cmd = "brctl --version | awk -F',' '{print $2}'" + ver_string = "Linux Bridge " + elif 'Open vSwitch' in agent_type: + cmd = "ovs-vsctl --version | awk -F')' '{print $2}'" + ver_string = "OVS " + else: + return "Unknown" + + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + + return ver_string + std_output.strip() + ################################################## # Only invoke the module directly for test purposes. Should be # invoked from pns script. ################################################## def main(): - # ssh = SSH('localadmin', '172.29.87.29', key_filename='./ssh/id_rsa') - ssh = SSH('localadmin', '172.22.191.173', key_filename='./ssh/id_rsa') + # As argument pass the SSH access string, e.g. "localadmin@1.1.1.1:secret" + test_ssh = SSH(SSHAccess(sys.argv[1])) - print 'ID=' + ssh.distro_id - print 'ID_LIKE=' + ssh.distro_id_like - print 'VERSION_ID=' + ssh.distro_version + print 'ID=' + test_ssh.distro_id + print 'ID_LIKE=' + test_ssh.distro_id_like + print 'VERSION_ID=' + test_ssh.distro_version # ssh.wait() # print ssh.pidof('bash') # print ssh.stat('/tmp') + print test_ssh.check_openstack_version() + print test_ssh.get_cpu_info() + print test_ssh.get_l2agent_version("Open vSwitch agent") if __name__ == "__main__": main()