Backport sshutils and move redis init earlier

Backport sshutils from vmtp (use SSHAccess)
Init redis earlier before any testing VM is started

Change-Id: I7f9302988b1aeb6e46e0483f1f0c82ffac80f21c
This commit is contained in:
ahothan 2015-05-04 07:19:39 -07:00
parent bb8f74d48f
commit c1a288c831
4 changed files with 322 additions and 75 deletions

View File

@ -31,6 +31,9 @@ class KBHTTPServerUpException(Exception):
class KBHTTPBenchException(Exception):
    """Exception type for HTTP benchmark failures.

    NOTE(review): the raise sites are not visible in this view - confirm
    the exact conditions against the callers.
    """
class KBProxyConnectionException(Exception):
    """Raised by KBScheduler.setup_redis() when the Redis server cannot be
    reached within the configured number of retries.
    """
class KBScheduler(object):
"""
Control the testing VMs on the testing cloud
@ -52,19 +55,30 @@ class KBScheduler(object):
def setup_redis(self):
    """Connect to the Redis server and subscribe to the report channel.

    The server is probed with a GET of the "test" key, at most
    config.redis_retry_count times, sleeping one second after each
    failed attempt.

    Raises KBProxyConnectionException if no probe succeeds.
    """
    self.redis_obj = redis.StrictRedis(connection_pool=self.connection_pool)

    # Probe the connection until it answers or the retries are exhausted
    for attempt in xrange(1, self.config.redis_retry_count + 1):
        try:
            self.redis_obj.get("test")
        except (redis.exceptions.ConnectionError):
            LOG.warn("Connecting to redis server... Retry #%d", attempt)
            time.sleep(1)
        else:
            # The probe went through: stop retrying
            break
    else:
        # Loop ended without a successful probe (or with no attempt at all)
        LOG.error("Error: Cannot connect to the Redis server")
        raise KBProxyConnectionException()

    # Subscribe to message channel
    self.pubsub = self.redis_obj.pubsub(ignore_subscribe_messages=True)
    self.pubsub.subscribe(self.report_chan_name)
def dispose(self):
    """Unsubscribe from and close the pubsub object, if there is one."""
    if not self.pubsub:
        # Nothing was subscribed, nothing to release
        return
    self.pubsub.unsubscribe()
    self.pubsub.close()
def send_cmd(self, cmd, client_type, data):
message = {'cmd': cmd, 'sender-id': 'kb-master',
'client-type': client_type, 'data': data}

View File

@ -199,7 +199,7 @@ class KloudBuster(object):
Executes tests serially
Support concurrency in the future
"""
kbscheduler = None
try:
self.kloud.create_resources()
self.kloud.create_vms()
@ -244,6 +244,8 @@ class KloudBuster(object):
self.testing_kloud.delete_resources()
except Exception:
traceback.print_exc()
if kbscheduler:
kbscheduler.dispose()
if __name__ == '__main__':

View File

@ -13,10 +13,6 @@
# under the License.
#
import os
import stat
import subprocess
import sshutils
from base_compute import BaseCompute
@ -38,10 +34,10 @@ class PerfInstance(BaseCompute):
self.up_flag = False
# SSH Configuration
self.ssh_ip = None
self.ssh_user = config.ssh_vm_username
self.ssh_access = None
self.ssh = None
self.port = None
self.az = None
if 'tp_tool' not in config:
self.tp_tool = None
@ -79,7 +75,7 @@ class PerfInstance(BaseCompute):
tp_tool_res = []
res = {'ip_to': dest_ip}
res['ip_from'] = self.ssh_ip
res['ip_from'] = self.ssh_access.host
if label:
res['desc'] = label
if self.az:
@ -105,12 +101,10 @@ class PerfInstance(BaseCompute):
# Setup the ssh connectivity
# Returns True if success
def setup_ssh(self, ssh_ip, ssh_user):
def setup_ssh(self, host_access):
# used for displaying the source IP in json results
self.ssh_ip = ssh_ip
self.ssh_user = ssh_user
self.ssh = sshutils.SSH(self.ssh_user, self.ssh_ip,
key_filename=self.config.private_key_file,
self.ssh_access = host_access
self.ssh = sshutils.SSH(self.ssh_access,
connect_retry_count=self.config.ssh_retry_count)
return True
@ -119,44 +113,8 @@ class PerfInstance(BaseCompute):
(status, cmd_output, err) = self.ssh.execute(cmd, timeout=timeout)
return (status, cmd_output, err)
# scp a file from the local host to the instance
# Returns True if dest file already exists or scp succeeded
# False in case of scp error
def scp(self, tool_name, source, dest):
    """Copy a tool binary from the local host to the instance using scp.

    :param tool_name: name of the tool, only used in log messages
    :param source: path of the local file to copy
    :param dest: destination path on the instance
    :return: True if dest already exists on the instance or if scp
             succeeded, False if scp exited with a non-zero status
    """
    # check if the dest file is already present
    if self.ssh.stat(dest):
        LOG.kbdebug("[%s] Tool %s already present - skipping install"
                    % (self.vm_name, tool_name))
        return True
    # scp over the tool binary
    # first chmod the local copy since git does not keep the permission
    os.chmod(source, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)

    # scp to the target
    scp_opts = '-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no'
    scp_cmd = 'scp -i %s %s %s %s@%s:%s' % (self.config.private_key_file,
                                            scp_opts,
                                            source,
                                            self.ssh_user,
                                            self.ssh_ip,
                                            dest)
    LOG.kbdebug("[%s] Copying %s to target..." % (self.vm_name, tool_name))
    LOG.kbdebug("[%s] %s" % (self.vm_name, scp_cmd))
    # NOTE(review): the command is a shell string built from configuration
    # values; paths containing spaces or shell metacharacters will not be
    # passed through verbatim. Kept as is because callers may rely on the
    # shell expanding '~' or variables in the paths.
    # The null device is opened in a with-block so the handle is always
    # released, even if subprocess.call() raises.
    with open(os.devnull, 'wb') as devnull:
        rc = subprocess.call(scp_cmd, shell=True,
                             stdout=devnull, stderr=devnull)
    if rc:
        LOG.error("[%s] Copy to target failed rc=%d" % (self.vm_name, rc))
        LOG.error("[%s] %s" % (self.vm_name, scp_cmd))
        return False
    return True
# Dispose the ssh session
def dispose(self):
    """Close the ssh session, then release the redis pubsub subscription."""
    session = self.ssh
    if session:
        session.close()
        self.ssh = None
    if not self.redis_obj:
        return
    self.pubsub.unsubscribe()
    self.pubsub.close()

View File

@ -61,6 +61,7 @@ import re
import select
import socket
import StringIO
import sys
import time
import log as logging
@ -69,10 +70,6 @@ import scp
LOG = logging.getLogger(__name__)
# from rally.openstack.common.gettextutils import _
class SSHError(Exception):
    """Base class of the SSH errors raised by this module (for example
    when the SSH connection cannot be established after all retries).
    """
@ -80,12 +77,70 @@ class SSHError(Exception):
class SSHTimeout(SSHError):
    """SSHError raised when a command or a wait exceeds its timeout."""
# Check IPv4 address syntax - not completely fool proof but will catch
# some invalid formats
def is_ipv4(address):
    """Return True if socket.inet_aton() accepts the address string.

    This is a lenient syntax check: it rejects obviously malformed
    values but does not guarantee a strict dotted-quad address.
    """
    try:
        socket.inet_aton(address)
    except socket.error:
        accepted = False
    else:
        accepted = True
    return accepted
class SSHAccess(object):
    '''
    A class to contain all the information needed to access a host
    (native or virtual) using SSH
    '''
    def __init__(self, arg_value=None):
        '''
        Decode an access string of the form user@host[:pwd]

        'hugo@1.1.1.1:secret' -> username 'hugo', host '1.1.1.1',
                                 password 'secret'
        'huggy@2.2.2.2' -> username 'huggy', host '2.2.2.2',
                           password None
        None -> all fields keep their default value

        Examples of invalid arguments:
        'hutch@q.1.1.1' (invalid IP)
        '@3.3.3.3' (missing username)
        'hiro@' or 'buggy' (missing host IP)
        'hugo@1.1.1.1secret' (password not introduced by ':')

        This constructor never raises nor exits on an invalid argument:
        the error field will be None in case of success or will
        contain a string describing the error (username, host and
        password are then left to None)
        '''
        self.username = None
        self.host = None
        self.password = None
        # name of the file that contains the private key
        self.private_key_file = None
        # this is the private key itself (a long string starting with
        # -----BEGIN RSA PRIVATE KEY-----
        # used when the private key is not saved in any file
        self.private_key = None
        self.public_key_file = None
        self.port = 22
        self.error = None

        if not arg_value:
            return
        # The password group is only captured when it follows a ':' so
        # that trailing garbage after the IP address is rejected instead
        # of being silently taken as a password
        match = re.search(r'^([^@]+)@([0-9\.]+)(?::(.*))?$', arg_value)
        if not match:
            self.error = 'Invalid argument: ' + arg_value
            return
        (username, host, password) = match.groups()
        if not is_ipv4(host):
            self.error = 'Invalid IPv4 address ' + host
            return
        self.username = username
        self.host = host
        # An absent or empty password is normalized to None so that it
        # is never mistaken for a real (empty) password downstream
        self.password = password or None

    def copy_from(self, ssh_access):
        '''
        Copy the connection settings (user, host, port, password and
        keys) of another SSHAccess instance into this one.
        The error field is not copied.
        '''
        self.username = ssh_access.username
        self.host = ssh_access.host
        self.port = ssh_access.port
        self.password = ssh_access.password
        self.private_key = ssh_access.private_key
        self.public_key_file = ssh_access.public_key_file
        self.private_key_file = ssh_access.private_key_file
class SSH(object):
"""Represent ssh connection."""
def __init__(self, user, host, port=22, pkey=None,
key_filename=None, password=None,
def __init__(self, ssh_access,
connect_timeout=60,
connect_retry_count=30,
connect_retry_wait_sec=2):
@ -102,12 +157,11 @@ class SSH(object):
:param connect_retry_wait_sec: seconds to wait between retries
"""
self.user = user
self.host = host
self.port = port
self.pkey = self._get_pkey(pkey) if pkey else None
self.password = password
self.key_filename = key_filename
self.ssh_access = ssh_access
if ssh_access.private_key:
self.pkey = self._get_pkey(ssh_access.private_key)
else:
self.pkey = None
self._client = False
self.connect_timeout = connect_timeout
self.connect_retry_count = connect_retry_count
@ -118,6 +172,9 @@ class SSH(object):
self.__get_distro()
def _get_pkey(self, key):
'''Get the binary form of the private key
from the text form
'''
if isinstance(key, basestring):
key = StringIO.StringIO(key)
errors = []
@ -135,10 +192,12 @@ class SSH(object):
self._client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
for _ in range(self.connect_retry_count):
try:
self._client.connect(self.host, username=self.user,
port=self.port, pkey=self.pkey,
key_filename=self.key_filename,
password=self.password,
self._client.connect(self.ssh_access.host,
username=self.ssh_access.username,
port=self.ssh_access.port,
pkey=self.pkey,
key_filename=self.ssh_access.private_key_file,
password=self.ssh_access.password,
timeout=self.connect_timeout)
return self._client
except (paramiko.AuthenticationException,
@ -148,7 +207,7 @@ class SSH(object):
time.sleep(self.connect_retry_wait_sec)
self._client = None
msg = '[%s] SSH Connection failed after %s attempts' % (self.host,
msg = '[%s] SSH Connection failed after %s attempts' % (self.ssh_access.host,
self.connect_retry_count)
raise SSHError(msg)
@ -229,7 +288,7 @@ class SSH(object):
break
if timeout and (time.time() - timeout) > start_time:
args = {'cmd': cmd, 'host': self.host}
args = {'cmd': cmd, 'host': self.ssh_access.host}
raise SSHTimeout(('Timeout executing command '
'"%(cmd)s" on host %(host)s') % args)
# if e:
@ -273,7 +332,7 @@ class SSH(object):
except (socket.error, SSHError):
time.sleep(interval)
if time.time() > (start_time + timeout):
raise SSHTimeout(('Timeout waiting for "%s"') % self.host)
raise SSHTimeout(('Timeout waiting for "%s"') % self.ssh_access.host)
def __extract_property(self, name, input_str):
expr = name + r'="?([\w\.]*)"?'
@ -347,7 +406,7 @@ class SSH(object):
if int(pkt_loss) < int(pass_threshold):
return 1
else:
LOG.error("Ping to %s failed: %s" % (target_ip, cmd_output))
LOG.error('Ping to %s failed: %s' % (target_ip, cmd_output))
return 0
def get_file_from_host(self, from_path, to_path):
@ -360,7 +419,7 @@ class SSH(object):
try:
scpcon.get(from_path, to_path)
except scp.SCPException as exp:
LOG.error("Send failed: [%s]" % exp)
LOG.error("Send failed: [%s]", exp)
return 0
return 1
@ -374,22 +433,236 @@ class SSH(object):
return None
return cmd_output
def get_host_os_version(self):
    '''
    Identify the host distribution/release.

    Returns "<NAME> <VERSION_ID>" read from /etc/os-release when that
    file exists, else the first line starting with "Red Hat" in
    /etc/system-release, else None (also None when a file that exists
    cannot be read).
    '''
    os_release_file = "/etc/os-release"
    if self.stat(os_release_file):
        content = self.read_remote_file(os_release_file)
        if content is None:
            LOG.error("ERROR:Failed to read file %s" % os_release_file)
            return None
        # Last occurrence of each key wins, missing keys stay empty
        fields = {'NAME': "", 'VERSION_ID': ""}
        for text in content.splitlines():
            for key in ('NAME', 'VERSION_ID'):
                # re.match anchors at the start of the line, so keys such
                # as PRETTY_NAME are not picked up
                found = re.match(key + r'=(.*)', text)
                if found:
                    fields[key] = found.group(1).strip("\"")
        return fields['NAME'] + " " + fields['VERSION_ID']

    sys_release_file = "/etc/system-release"
    if not self.stat(sys_release_file):
        return None
    content = self.read_remote_file(sys_release_file)
    if content is None:
        LOG.error("ERROR:Failed to read file %s" % sys_release_file)
        return None
    for text in content.splitlines():
        found = re.match(r'Red Hat.*', text)
        if found:
            return found.group(0)
    return None
def check_rpm_package_installed(self, rpm_pkg):
    '''
    Check if a package is installed on the host.

    Runs "rpm -qa | grep <rpm_pkg>" on the host and returns the first
    output line matching rpm_pkg (case insensitive), or None if the
    command fails or no line matches.
    '''
    # NOTE(review): rpm_pkg is inserted as is in a shell command and in
    # a regular expression - callers must pass a trusted package name
    check_pkg_cmd = "rpm -qa | grep " + rpm_pkg

    (status, cmd_output, _) = self.execute(check_pkg_cmd)
    if status:
        return None

    pkg_pattern = ".*" + rpm_pkg + ".*"
    rpm_pattern = re.compile(pkg_pattern, re.IGNORECASE)

    for line in cmd_output.splitlines():
        mobj = rpm_pattern.match(line)
        if mobj:
            return mobj.group(0)

    # Reaching this point means that no line matched: report the package
    # as missing (single-argument print() behaves the same on python 2
    # and python 3)
    print("%s pkg not installed " % rpm_pkg)
    return None
def get_openstack_release(self, ver_str):
    '''
    Get the release series name from the package version
    Refer to here for release tables:
    https://wiki.openstack.org/wiki/Releases

    Returns the release name (e.g. "Juno" for "2014.2.1") or "Unknown"
    when ver_str is empty, contains no 20xx.y version prefix or when
    the prefix is not in the release table.
    '''
    ver_table = {"2015.1": "Kilo",
                 "2014.2": "Juno",
                 "2014.1": "Icehouse",
                 "2013.2": "Havana",
                 "2013.1": "Grizzly",
                 "2012.2": "Folsom",
                 "2012.1": "Essex",
                 "2011.3": "Diablo",
                 "2011.2": "Cactus",
                 "2011.1": "Bexar",
                 "2010.1": "Austin"}

    # re.search returns None when the string holds no version prefix:
    # report "Unknown" instead of failing on the missing match object
    mobj = re.search(r"20\d\d\.\d", ver_str or "")
    if not mobj:
        return "Unknown"
    return ver_table.get(mobj.group(0), "Unknown")
def check_openstack_version(self):
    '''
    Identify the openstack version running on the controller.

    Runs "nova-manage --version" and reads the version from the error
    output of the command. Returns "<release> (<version>)" or
    "Unknown" if the command fails.
    '''
    (status, _, err_output) = self.execute("nova-manage --version")
    if status:
        return "Unknown"

    version = err_output.strip()
    release = self.get_openstack_release(err_output)
    return "".join([release, " (", version, ")"])
def get_cpu_info(self):
    '''
    Get the CPU info of the controller.

    Note: Here we are assuming the controller node has the exact
    hardware as the compute nodes.

    Returns "<number of cores> * <model name>" as read from
    /proc/cpuinfo, or "Unknown" if a command fails or if the model
    name cannot be extracted from the command output.
    '''
    cmd = 'cat /proc/cpuinfo | grep -m1 "model name"'
    (status, std_output, _) = self.execute(cmd)
    if status:
        return "Unknown"
    # The expected line looks like "model name : <model>"; anything
    # else yields no match and is reported as "Unknown"
    mobj = re.search(r":\s(.*)", std_output)
    if not mobj:
        return "Unknown"
    model_name = mobj.group(1)

    cmd = 'cat /proc/cpuinfo | grep "model name" | wc -l'
    (status, std_output, _) = self.execute(cmd)
    if status:
        return "Unknown"
    cores = std_output.strip()

    return (cores + " * " + model_name)
def get_nic_name(self, agent_type, encap, internal_iface_dict):
    '''
    Get the NIC info of the controller.

    Note: Here we are assuming the controller node has the exact
    hardware as the compute nodes.

    :param agent_type: L2 agent description, must contain
                       'Linux bridge' or 'Open vSwitch'
    :param encap: 'vlan', 'vxlan' or 'gre' (only read for Open vSwitch)
    :param internal_iface_dict: mapping between hostname and internal
                                interface name
    :return: the text following "Ethernet controller:" in the lspci
             output, or "Unknown" if any command fails, if the host,
             agent type or encapsulation is not handled or if a command
             output cannot be parsed
    '''
    # The internal_iface_dict is a dictionary containing the mapping between
    # hostname and the internal interface name like below:
    # {u'hh23-4': u'eth1', u'hh23-5': u'eth1', u'hh23-6': u'eth1'}

    cmd = "hostname"
    (status, std_output, _) = self.execute(cmd)
    if status:
        return "Unknown"
    hostname = std_output.strip()

    if hostname not in internal_iface_dict:
        return "Unknown"
    iface = internal_iface_dict[hostname]

    # Figure out which interface is for internal traffic
    if 'Linux bridge' in agent_type:
        ifname = iface
    elif 'Open vSwitch' in agent_type:
        if encap == 'vlan':
            # [root@hh23-10 ~]# ovs-vsctl list-ports br-inst
            # eth1
            # phy-br-inst
            # NOTE(review): "[^phy]" is a character class - it drops any
            # port whose name starts with 'p', 'h' or 'y', not only the
            # "phy-" ports; confirm before changing the filter
            cmd = 'ovs-vsctl list-ports ' + iface + ' | grep -E "^[^phy].*"'
        elif encap == 'vxlan' or encap == 'gre':
            # This is complicated. We need to first get the local IP address on
            # br-tun, then do a reverse lookup to get the physical interface.
            #
            # [root@hh23-4 ~]# ip addr show to "23.23.2.14"
            # 3: eth1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000
            #    inet 23.23.2.14/24 brd 23.23.2.255 scope global eth1
            #       valid_lft forever preferred_lft forever
            cmd = "ip addr show to " + iface + " | awk -F: '{print $2}'"
        else:
            # Unhandled encapsulation: there is no interface to look up
            return "Unknown"
        (status, std_output, _) = self.execute(cmd)
        if status:
            return "Unknown"
        ifname = std_output.strip()
    else:
        # Unhandled agent type
        return "Unknown"

    cmd = 'ethtool -i ' + ifname + ' | grep bus-info'
    (status, std_output, _) = self.execute(cmd)
    if status:
        return "Unknown"
    # Expected output: "bus-info: <pci address>"
    mobj = re.search(r":\s(.*)", std_output)
    if not mobj:
        return "Unknown"
    bus_info = mobj.group(1)

    cmd = 'lspci -s ' + bus_info
    (status, std_output, _) = self.execute(cmd)
    if status:
        return "Unknown"
    # Devices that are not reported as an Ethernet controller do not match
    mobj = re.search(r"Ethernet controller:\s(.*)", std_output)
    if not mobj:
        return "Unknown"
    return mobj.group(1)
def get_l2agent_version(self, agent_type):
    '''
    Get the L2 agent version of the controller.

    Note: Here we are assuming the controller node has the exact
    hardware as the compute nodes.

    Returns "Linux Bridge <version>" or "OVS <version>" depending on
    agent_type, or "Unknown" if the agent type is not handled or if
    the version command fails.
    '''
    # (marker looked up in agent_type, version command, result prefix)
    # checked in this order, the first marker found wins
    known_agents = (
        ('Linux bridge', "brctl --version | awk -F',' '{print $2}'", "Linux Bridge "),
        ('Open vSwitch', "ovs-vsctl --version | awk -F')' '{print $2}'", "OVS "),
    )
    for marker, version_cmd, prefix in known_agents:
        if marker in agent_type:
            break
    else:
        return "Unknown"

    (status, std_output, _) = self.execute(version_cmd)
    if status:
        return "Unknown"
    return prefix + std_output.strip()
##################################################
# Only invoke the module directly for test purposes. Should be
# invoked from pns script.
##################################################
def main():
# Manual smoke test: opens SSH sessions and prints what they report.
# ssh = SSH('localadmin', '172.29.87.29', key_filename='./ssh/id_rsa')
# NOTE(review): this call passes the user and host positionally, while the
# call below passes a single SSHAccess object - confirm which of the two
# signatures SSH.__init__ actually accepts.
ssh = SSH('localadmin', '172.22.191.173', key_filename='./ssh/id_rsa')
# As argument pass the SSH access string, e.g. "localadmin@1.1.1.1:secret"
# NOTE(review): sys.argv[1] is read without checking that it was provided.
test_ssh = SSH(SSHAccess(sys.argv[1]))
# Distro identification attributes of the first session
print 'ID=' + ssh.distro_id
print 'ID_LIKE=' + ssh.distro_id_like
print 'VERSION_ID=' + ssh.distro_version
# Same attributes for the session built from the command line argument
print 'ID=' + test_ssh.distro_id
print 'ID_LIKE=' + test_ssh.distro_id_like
print 'VERSION_ID=' + test_ssh.distro_version
# ssh.wait()
# print ssh.pidof('bash')
# print ssh.stat('/tmp')
# Host inventory helpers, each prints "Unknown" when its command fails
print test_ssh.check_openstack_version()
print test_ssh.get_cpu_info()
print test_ssh.get_l2agent_version("Open vSwitch agent")
if __name__ == "__main__":
main()