Ansible launcher: support ftp publisher
Change-Id: I53af72a68659e886d3a4bc974b1b638644128aa5
This commit is contained in:
committed by
James E. Blair
parent
79be4baf48
commit
08d7d4b49b
@@ -17,6 +17,7 @@ import logging
|
||||
import multiprocessing
|
||||
import os
|
||||
import Queue
|
||||
import re
|
||||
import shutil
|
||||
import signal
|
||||
import socket
|
||||
@@ -56,6 +57,7 @@ class JobDir(object):
|
||||
|
||||
class LaunchServer(object):
|
||||
log = logging.getLogger("zuul.LaunchServer")
|
||||
section_re = re.compile('site "(.*?)"')
|
||||
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
@@ -65,6 +67,17 @@ class LaunchServer(object):
|
||||
self.jobs = self.mpmanager.dict()
|
||||
self.zmq_send_queue = multiprocessing.JoinableQueue()
|
||||
self.termination_queue = multiprocessing.JoinableQueue()
|
||||
self.sites = {}
|
||||
|
||||
for section in config.sections():
|
||||
m = self.section_re.match(section)
|
||||
if m:
|
||||
sitename = m.group(1)
|
||||
d = {}
|
||||
d['host'] = config.get(section, 'host')
|
||||
d['user'] = config.get(section, 'user')
|
||||
d['pass'] = config.get(section, 'pass')
|
||||
self.sites[sitename] = d
|
||||
|
||||
def start(self):
|
||||
self._gearman_running = True
|
||||
@@ -192,7 +205,7 @@ class LaunchServer(object):
|
||||
def assignNode(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
self.log.debug("Assigned node with arguments: %s" % (args,))
|
||||
worker = NodeWorker(self.config, self.jobs,
|
||||
worker = NodeWorker(self.config, self.jobs, self.sites,
|
||||
args['name'], args['host'],
|
||||
args['description'], args['labels'],
|
||||
self.hostname, self.zmq_send_queue,
|
||||
@@ -225,11 +238,12 @@ class LaunchServer(object):
|
||||
class NodeWorker(object):
|
||||
log = logging.getLogger("zuul.NodeWorker")
|
||||
|
||||
def __init__(self, config, jobs, name, host, description, labels,
|
||||
def __init__(self, config, jobs, sites, name, host, description, labels,
|
||||
manager_name, zmq_send_queue, termination_queue):
|
||||
self.log.debug("Creating node worker %s" % (name,))
|
||||
self.config = config
|
||||
self.jobs = jobs
|
||||
self.sites = sites
|
||||
self.name = name
|
||||
self.host = host
|
||||
self.description = description
|
||||
@@ -511,6 +525,59 @@ class NodeWorker(object):
|
||||
def getHostList(self):
|
||||
return [('node', dict(ansible_host=self.host))]
|
||||
|
||||
def _makeSCPTask(self, publisher):
|
||||
tasks = []
|
||||
for scpfile in publisher['scp']['files']:
|
||||
site = publisher['scp']['site']
|
||||
if site not in self.sites:
|
||||
raise Exception("Undefined SCP site: %s" % (site,))
|
||||
site = self.sites[site]
|
||||
if scpfile.get('copy-console'):
|
||||
src = '/tmp/console.log'
|
||||
else:
|
||||
src = scpfile['source']
|
||||
syncargs = dict(src=src,
|
||||
dest=scpfile['target'])
|
||||
task = dict(synchronize=syncargs,
|
||||
delegate_to=site['host'])
|
||||
if not scpfile.get('copy-after-failure'):
|
||||
task['when'] = 'success'
|
||||
tasks.append(task)
|
||||
return tasks
|
||||
|
||||
def _makeFTPTask(self, jobdir, publisher):
|
||||
tasks = []
|
||||
ftp = publisher['ftp']
|
||||
site = ftp['site']
|
||||
if site not in self.sites:
|
||||
raise Exception("Undefined FTP site: %s" % site)
|
||||
site = self.sites[site]
|
||||
ftproot = tempfile.mkdtemp(dir=jobdir.ansible_root)
|
||||
ftpcontent = os.path.join(ftproot, 'content')
|
||||
os.makedirs(ftpcontent)
|
||||
ftpscript = os.path.join(ftproot, 'script')
|
||||
syncargs = dict(src=ftp['source'],
|
||||
dest=ftpcontent)
|
||||
task = dict(synchronize=syncargs,
|
||||
when='success')
|
||||
tasks.append(task)
|
||||
task = dict(shell='lftp -f %s' % ftpscript,
|
||||
when='success')
|
||||
ftpsource = ftpcontent
|
||||
if ftp.get('remove-prefix'):
|
||||
ftpsource = os.path.join(ftpcontent, ftp['remove-prefix'])
|
||||
while ftpsource[-1] == '/':
|
||||
ftpsource = ftpsource[:-1]
|
||||
ftptarget = ftp['target']
|
||||
while ftptarget[-1] == '/':
|
||||
ftptarget = ftptarget[:-1]
|
||||
with open(ftpscript, 'w') as script:
|
||||
script.write('open %s\n' % site['host'])
|
||||
script.write('user %s %s\n' % (site['user'], site['pass']))
|
||||
script.write('mirror -R %s %s\n' % (ftpsource, ftptarget))
|
||||
tasks.append(task)
|
||||
return tasks
|
||||
|
||||
def prepareAnsibleFiles(self, jobdir, gearman_job):
|
||||
with open(jobdir.inventory, 'w') as inventory:
|
||||
for host_name, host_vars in self.getHostList():
|
||||
@@ -548,18 +615,9 @@ class NodeWorker(object):
|
||||
tasks = []
|
||||
for publisher in jjb_job.get('publishers', []):
|
||||
if 'scp' in publisher:
|
||||
for scpfile in publisher['scp']['files']:
|
||||
if scpfile.get('copy-console'):
|
||||
src = '/tmp/console.log'
|
||||
else:
|
||||
src = scpfile['source']
|
||||
syncargs = dict(src=src,
|
||||
dest=scpfile['target'])
|
||||
task = dict(synchronize=syncargs,
|
||||
delegate_to=publisher['scp']['site'])
|
||||
if not scpfile.get('copy-after-failure'):
|
||||
task['when'] = 'success'
|
||||
tasks.append(task)
|
||||
tasks.extend(self._makeSCPTask(publisher))
|
||||
if 'ftp' in publisher:
|
||||
tasks.extend(self._makeFTPTask(jobdir, publisher))
|
||||
play = dict(hosts='node', name='Publishers',
|
||||
tasks=tasks)
|
||||
playbook.write(yaml.dump([play]))
|
||||
|
||||
Reference in New Issue
Block a user