Merge "Add: scripts_all_pairs - new functionality"
This commit is contained in:
commit
6ed9f2ba64
@ -331,6 +331,9 @@ def main(argv=None):
|
||||
if nm.has(Node.ckey, Node.skey):
|
||||
pretty_run(args.quiet, 'Executing commands and scripts',
|
||||
nm.run_commands, args=(args.maxthreads,))
|
||||
if nm.has('scripts_all_pairs'):
|
||||
pretty_run(args.quiet, 'Executing paired scripts',
|
||||
nm.run_scripts_all_pairs, args=(args.maxthreads,))
|
||||
if nm.has(Node.fkey, Node.flkey):
|
||||
pretty_run(args.quiet, 'Collecting files and filelists',
|
||||
nm.get_files, args=(args.maxthreads,))
|
||||
|
@ -16,7 +16,7 @@
|
||||
# under the License.
|
||||
|
||||
project_name = 'timmy'
|
||||
version = '1.20.6'
|
||||
version = '1.21.0'
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
|
115
timmy/nodes.py
115
timmy/nodes.py
@ -73,7 +73,7 @@ class Node(object):
|
||||
|
||||
def __init__(self, ip, conf, id=None, name=None, fqdn=None, mac=None,
|
||||
cluster=None, roles=None, os_platform=None,
|
||||
online=True, status="ready", logger=None):
|
||||
online=True, status="ready", logger=None, network_data=None):
|
||||
self.logger = logger or logging.getLogger(project_name)
|
||||
self.id = int(id) if id is not None else None
|
||||
self.mac = mac
|
||||
@ -89,6 +89,7 @@ class Node(object):
|
||||
self.logger.critical('Node: ip address must be defined')
|
||||
sys.exit(111)
|
||||
self.ip = ip
|
||||
self.network_data = network_data
|
||||
self.release = None
|
||||
self.files = []
|
||||
self.filelists = []
|
||||
@ -367,6 +368,80 @@ class Node(object):
|
||||
prefix=self.prefix)
|
||||
self.check_code(code, 'exec_simple_cmd', cmd, errs, ok_codes)
|
||||
|
||||
def exec_pair(self, phase, server_node=None, fake=False):
|
||||
sn = server_node
|
||||
cl = self.cluster_repr
|
||||
if sn:
|
||||
self.logger.debug('%s: phase %s: server %s' % (self.repr, phase,
|
||||
sn.repr))
|
||||
else:
|
||||
self.logger.debug('%s: phase %s' % (self.repr, phase))
|
||||
nond_msg = ('%s: network specified but network_data not set for %s')
|
||||
nonet_msg = ('%s: network %s not found in network_data of %s')
|
||||
nosrv_msg = ('%s: server_node not provided')
|
||||
noip_msg = ('%s: %s has no IP in network %s')
|
||||
for i in self.scripts_all_pairs:
|
||||
if phase not in i:
|
||||
self.logger.warning('phase %s not defined in config' % phase)
|
||||
return self.scripts_all_pairs
|
||||
if phase.startswith('client'):
|
||||
if not sn:
|
||||
self.logger.warning(nosrv_msg % self.repr)
|
||||
return self.scripts_all_pairs
|
||||
if 'network' in i:
|
||||
if not sn.network_data:
|
||||
self.logger.warning(nond_msg % (self.repr, sn.repr))
|
||||
return self.scripts_all_pairs
|
||||
nd = sn.network_data
|
||||
net_dict = dict((v['name'], v) for v in nd)
|
||||
if i['network'] not in net_dict:
|
||||
self.logger.warning(nonet_msg % (self.repr, sn.repr))
|
||||
return self.scripts_all_pairs
|
||||
if 'ip' not in net_dict[i['network']]:
|
||||
self.logger.warning(noip_msg % (self.repr, sn.repr,
|
||||
i['network']))
|
||||
return self.scripts_all_pairs
|
||||
ip = net_dict[i['network']]['ip']
|
||||
if '/' in ip:
|
||||
server_ip = ip.split('/')[0]
|
||||
else:
|
||||
server_ip = ip
|
||||
else:
|
||||
server_ip = sn.ip
|
||||
phase_val = i[phase]
|
||||
ddir = os.path.join(self.outdir, 'scripts_all_pairs', cl, phase,
|
||||
self.repr)
|
||||
tools.mdir(ddir)
|
||||
if type(phase_val) is dict:
|
||||
env_vars = [phase_val.values()[0]]
|
||||
phase_val = phase_val.keys()[0]
|
||||
else:
|
||||
env_vars = self.env_vars
|
||||
if os.path.sep in phase_val:
|
||||
f = phase_val
|
||||
else:
|
||||
f = os.path.join(self.rqdir, Node.skey, phase_val)
|
||||
dfile = os.path.join(ddir, os.path.basename(f))
|
||||
if phase.startswith('client'):
|
||||
env_vars.append('SERVER_IP=%s' % server_ip)
|
||||
dname = os.path.basename(f) + '-%s' % server_ip
|
||||
dfile = os.path.join(ddir, dname)
|
||||
elif phase == 'server_stop' and 'server_output' in i:
|
||||
env_vars.append('SERVER_OUTPUT=%s' % i['server_output'])
|
||||
if fake:
|
||||
return self.scripts_all_pairs
|
||||
outs, errs, code = tools.ssh_node(ip=self.ip,
|
||||
filename=f,
|
||||
ssh_opts=self.ssh_opts,
|
||||
env_vars=env_vars,
|
||||
timeout=self.timeout,
|
||||
prefix=self.prefix)
|
||||
self.check_code(code, 'exec_pair, phase:%s' % phase, f, errs)
|
||||
if phase == 'server_start' and code == 0:
|
||||
i['server_output'] = outs.strip()
|
||||
open(dfile, 'a+').write(outs)
|
||||
return self.scripts_all_pairs
|
||||
|
||||
def get_files(self, timeout=15):
|
||||
self.logger.info('%s: getting files' % self.repr)
|
||||
cl = self.cluster_repr
|
||||
@ -914,7 +989,7 @@ class NodeManager(object):
|
||||
for node_data in self.nodes_json:
|
||||
params = {'conf': self.conf}
|
||||
keys = ['id', 'cluster', 'roles', 'fqdn', 'name', 'mac',
|
||||
'os_platform', 'status', 'online', 'ip']
|
||||
'os_platform', 'status', 'online', 'ip', 'network_data']
|
||||
for key in keys:
|
||||
if key in node_data:
|
||||
params[key] = node_data[key]
|
||||
@ -1164,6 +1239,39 @@ class NodeManager(object):
|
||||
run_items.append(tools.RunItem(target=n.put_files))
|
||||
tools.run_batch(run_items, 10)
|
||||
|
||||
@run_with_lock
|
||||
def run_scripts_all_pairs(self, maxthreads, fake=False):
|
||||
if len(self.selected_nodes()) < 2:
|
||||
self.logger.warning('less than 2 nodes are available, '
|
||||
'skipping paired scripts')
|
||||
return
|
||||
run_server_start_items = []
|
||||
run_server_stop_items = []
|
||||
for n in self.selected_nodes():
|
||||
start_args = {'phase': 'server_start', 'fake': fake}
|
||||
run_server_start_items.append(tools.RunItem(target=n.exec_pair,
|
||||
args=start_args,
|
||||
key=n.ip))
|
||||
stop_args = {'phase': 'server_stop', 'fake': fake}
|
||||
run_server_stop_items.append(tools.RunItem(target=n.exec_pair,
|
||||
args=stop_args))
|
||||
result = tools.run_batch(run_server_start_items, maxthreads,
|
||||
dict_result=True)
|
||||
for key in result:
|
||||
self.nodes[key].scripts_all_pairs = result[key]
|
||||
for pairset in tools.all_pairs(self.selected_nodes()):
|
||||
run_client_items = []
|
||||
self.logger.info(['%s->%s' % (p[0].ip, p[1].ip) for p in pairset])
|
||||
for pair in pairset:
|
||||
client = pair[0]
|
||||
server = pair[1]
|
||||
client_args = {'phase': 'client', 'server_node': server,
|
||||
'fake': fake}
|
||||
run_client_items.append(tools.RunItem(target=client.exec_pair,
|
||||
args=client_args))
|
||||
tools.run_batch(run_client_items, len(run_client_items))
|
||||
tools.run_batch(run_server_stop_items, maxthreads)
|
||||
|
||||
def has(self, *keys):
|
||||
nodes = {}
|
||||
for k in keys:
|
||||
@ -1176,6 +1284,9 @@ class NodeManager(object):
|
||||
nodes[k].append(n)
|
||||
return nodes
|
||||
|
||||
def selected_nodes(self):
|
||||
return [n for n in self.nodes.values() if not n.filtered_out]
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
return 0
|
||||
|
@ -318,7 +318,7 @@ def ssh_node(ip, command='', ssh_opts=None, env_vars=None, timeout=15,
|
||||
bstr = "%s timeout '%s' bash -c " % (
|
||||
env_vars, timeout)
|
||||
else:
|
||||
bstr = "timeout '%s' ssh -t -T %s '%s' '%s' " % (
|
||||
bstr = "timeout '%s' ssh -T %s '%s' '%s' " % (
|
||||
timeout, ssh_opts, ip, env_vars)
|
||||
if filename is None:
|
||||
cmd = '%s %s' % (bstr, quote(prefix + ' ' + command))
|
||||
@ -390,5 +390,32 @@ def w_list(value):
|
||||
return value if type(value) == list else [value]
|
||||
|
||||
|
||||
def all_pairs(items):
|
||||
def incomplete(i_set, p_dict):
|
||||
for i, p_set in p_dict.items():
|
||||
not_paired = i_set.difference(p_set).difference([i])
|
||||
if not_paired:
|
||||
return not_paired
|
||||
|
||||
items_set = set(items)
|
||||
pairs = []
|
||||
paired = {}
|
||||
for i in items_set:
|
||||
paired[i] = set()
|
||||
while incomplete(items_set, paired):
|
||||
busy = set()
|
||||
current_pairs = []
|
||||
for i in [i for i in items if items_set.difference(paired[i])]:
|
||||
can_pair = incomplete(items_set.difference(busy), {i: paired[i]})
|
||||
if i not in busy and can_pair:
|
||||
pair_i = next(iter(can_pair))
|
||||
current_pairs.append([i, pair_i])
|
||||
busy.add(i)
|
||||
busy.add(pair_i)
|
||||
paired[i].add(pair_i)
|
||||
pairs.append(current_pairs)
|
||||
return pairs
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(0)
|
||||
|
Loading…
x
Reference in New Issue
Block a user