diff --git a/requirements.txt b/requirements.txt index bb482905d8..1c5587bdbe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,5 @@ gear>=0.5.4,<1.0.0 apscheduler>=2.1.1,<3.0 python-swiftclient>=1.6 python-keystoneclient>=0.4.2 +PrettyTable>=0.6,<0.8 +babel diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 7e1416f450..d191357ae2 100755 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -3870,3 +3870,62 @@ For CI problems and help debugging, contact ci@example.org""" self.worker.hold_jobs_in_build = False self.worker.release() self.waitUntilSettled() + + def test_client_get_running_jobs(self): + "Test that the RPC client can get a list of running jobs" + self.worker.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + A.addApproval('CRVW', 2) + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.waitUntilSettled() + + client = zuul.rpcclient.RPCClient('127.0.0.1', + self.gearman_server.port) + + # Wait for gearman server to send the initial workData back to zuul + start = time.time() + while True: + if time.time() - start > 10: + raise Exception("Timeout waiting for gearman server to report " + + "back to the client") + build = self.launcher.builds.values()[0] + if build.worker.name == "My Worker": + break + else: + time.sleep(0) + + running_items = client.get_running_jobs() + + self.assertEqual(1, len(running_items)) + running_item = running_items[0] + self.assertEqual([], running_item['failing_reasons']) + self.assertEqual([], running_item['items_behind']) + self.assertEqual('https://hostname/1', running_item['url']) + self.assertEqual(None, running_item['item_ahead']) + self.assertEqual('org/project', running_item['project']) + self.assertEqual(None, running_item['remaining_time']) + self.assertEqual(True, running_item['active']) + self.assertEqual('1,1', running_item['id']) + + self.assertEqual(3, len(running_item['jobs'])) + for job in running_item['jobs']: + if job['name'] == 'project-merge': + self.assertEqual('project-merge', job['name']) + self.assertEqual('gate', job['pipeline']) + self.assertEqual(False, job['retry']) + self.assertEqual(13, len(job['parameters'])) + self.assertEqual('https://server/job/project-merge/0/', + job['url']) + self.assertEqual(7, len(job['worker'])) + self.assertEqual(False, job['canceled']) + self.assertEqual(True, job['voting']) + self.assertEqual(None, job['result']) + self.assertEqual('gate', job['pipeline']) + break + + self.worker.hold_jobs_in_build = False + self.worker.release() + self.waitUntilSettled() + + running_items = client.get_running_jobs() + self.assertEqual(0, len(running_items)) diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py index a334bff19d..147fade9bd 100644 --- a/zuul/cmd/client.py +++ b/zuul/cmd/client.py @@ -15,11 +15,15 @@ # under the License. import argparse +import babel.dates import ConfigParser +import datetime import logging import logging.config import os +import prettytable import sys +import time import zuul.rpcclient @@ -66,6 +70,23 @@ class Client(object): required=True, nargs='+') cmd_promote.set_defaults(func=self.promote) + cmd_show = subparsers.add_parser('show', + help='valid show subcommands') + show_subparsers = cmd_show.add_subparsers(title='show') + show_running_jobs = show_subparsers.add_parser( + 'running-jobs', + help='show the running jobs' + ) + show_running_jobs.add_argument( + '--columns', + help="comma separated list of columns to display (or 'ALL')", + choices=self._show_running_jobs_columns().keys().append('ALL'), + default='name, worker.name, start_time, result' + ) + + # TODO: add filters such as queue, project, changeid etc + show_running_jobs.set_defaults(func=self.show_running_jobs) + self.args = parser.parse_args() def _get_version(self): @@ -119,6 +140,147 @@ class Client(object): change_ids=self.args.changes) return r + def show_running_jobs(self): + client = zuul.rpcclient.RPCClient(self.server, self.port) + running_items = client.get_running_jobs() + + if len(running_items) == 0: + print "No jobs currently running" + return True + + all_fields = self._show_running_jobs_columns() + if self.args.columns.upper() == 'ALL': + fields = all_fields.keys() + else: + fields = [f.strip().lower() for f in self.args.columns.split(',') + if f.strip().lower() in all_fields.keys()] + + table = prettytable.PrettyTable( + field_names=[all_fields[f]['title'] for f in fields]) + for item in running_items: + for job in item['jobs']: + values = [] + for f in fields: + v = job + for part in f.split('.'): + if hasattr(v, 'get'): + v = v.get(part, '') + if ('transform' in all_fields[f] + and callable(all_fields[f]['transform'])): + v = all_fields[f]['transform'](v) + if 'append' in all_fields[f]: + v += all_fields[f]['append'] + values.append(v) + table.add_row(values) + print table + return True + + def _epoch_to_relative_time(self, epoch): + if epoch: + delta = datetime.timedelta(seconds=(time.time() - int(epoch))) + return babel.dates.format_timedelta(delta, locale='en_US') + else: + return "Unknown" + + def _boolean_to_yes_no(self, value): + return 'Yes' if value else 'No' + + def _boolean_to_pass_fail(self, value): + return 'Pass' if value else 'Fail' + + def _format_list(self, l): + return ', '.join(l) if isinstance(l, list) else '' + + def _show_running_jobs_columns(self): + """A helper function to get the list of available columns for + `zuul show running-jobs`. Also describes how to convert particular + values (for example epoch to time string)""" + + return { + 'name': { + 'title': 'Job Name', + }, + 'elapsed_time': { + 'title': 'Elapsed Time', + 'transform': self._epoch_to_relative_time + }, + 'remaining_time': { + 'title': 'Remaining Time', + 'transform': self._epoch_to_relative_time + }, + 'url': { + 'title': 'URL' + }, + 'result': { + 'title': 'Result' + }, + 'voting': { + 'title': 'Voting', + 'transform': self._boolean_to_yes_no + }, + 'uuid': { + 'title': 'UUID' + }, + 'launch_time': { + 'title': 'Launch Time', + 'transform': self._epoch_to_relative_time, + 'append': ' ago' + }, + 'start_time': { + 'title': 'Start Time', + 'transform': self._epoch_to_relative_time, + 'append': ' ago' + }, + 'end_time': { + 'title': 'End Time', + 'transform': self._epoch_to_relative_time, + 'append': ' ago' + }, + 'estimated_time': { + 'title': 'Estimated Time', + 'transform': self._epoch_to_relative_time, + 'append': ' to go' + }, + 'pipeline': { + 'title': 'Pipeline' + }, + 'canceled': { + 'title': 'Canceled', + 'transform': self._boolean_to_yes_no + }, + 'retry': { + 'title': 'Retry' + }, + 'number': { + 'title': 'Number' + }, + 'parameters': { + 'title': 'Parameters' + }, + 'worker.name': { + 'title': 'Worker' + }, + 'worker.hostname': { + 'title': 'Worker Hostname' + }, + 'worker.ips': { + 'title': 'Worker IPs', + 'transform': self._format_list + }, + 'worker.fqdn': { + 'title': 'Worker Domain' + }, + 'worker.progam': { + 'title': 'Worker Program' + }, + 'worker.version': { + 'title': 'Worker Version' + }, + 'worker.extra': { + 'title': 'Worker Extra' + }, + } + def main(): client = Client() diff --git a/zuul/model.py b/zuul/model.py index 9028577282..82ce9d0378 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -269,7 +269,7 @@ class Pipeline(object): if j_changes: j_queue['heads'].append(j_changes) j_changes = [] - j_changes.append(self.formatItemJSON(e)) + j_changes.append(e.formatJSON()) if (len(j_changes) > 1 and (j_changes[-2]['remaining_time'] is not None) and (j_changes[-1]['remaining_time'] is not None)): @@ -280,101 +280,6 @@ class Pipeline(object): j_queue['heads'].append(j_changes) return j_pipeline - def formatStatus(self, item, indent=0, html=False): - changeish = item.change - indent_str = ' ' * indent - ret = '' - if html and hasattr(changeish, 'url') and changeish.url is not None: - ret += '%sProject %s change %s\n' % ( - indent_str, - changeish.project.name, - changeish.url, - changeish._id()) - else: - ret += '%sProject %s change %s based on %s\n' % ( - indent_str, - changeish.project.name, - changeish._id(), - item.item_ahead) - for job in self.getJobs(changeish): - build = item.current_build_set.getBuild(job.name) - if build: - result = build.result - else: - result = None - job_name = job.name - if not job.voting: - voting = ' (non-voting)' - else: - voting = '' - if html: - if build: - url = build.url - else: - url = None - if url is not None: - job_name = '%s' % (url, job_name) - ret += '%s %s: %s%s' % (indent_str, job_name, result, voting) - ret += '\n' - return ret - - def formatItemJSON(self, item): - changeish = item.change - ret = {} - ret['active'] = item.active - if hasattr(changeish, 'url') and changeish.url is not None: - ret['url'] = changeish.url - else: - ret['url'] = None - ret['id'] = changeish._id() - if item.item_ahead: - ret['item_ahead'] = item.item_ahead.change._id() - else: - ret['item_ahead'] = None - ret['items_behind'] = [i.change._id() for i in item.items_behind] - ret['failing_reasons'] = item.current_build_set.failing_reasons - ret['zuul_ref'] = item.current_build_set.ref - ret['project'] = changeish.project.name - ret['enqueue_time'] = int(item.enqueue_time * 1000) - ret['jobs'] = [] - max_remaining = 0 - for job in self.getJobs(changeish): - now = time.time() - build = item.current_build_set.getBuild(job.name) - elapsed = None - remaining = None - result = None - url = None - if build: - result = build.result - url = build.url - if build.start_time: - if build.end_time: - elapsed = int((build.end_time - - build.start_time) * 1000) - remaining = 0 - else: - elapsed = int((now - build.start_time) * 1000) - if build.estimated_time: - remaining = max( - int(build.estimated_time * 1000) - elapsed, - 0) - if remaining and remaining > max_remaining: - max_remaining = remaining - ret['jobs'].append( - dict( - name=job.name, - elapsed_time=elapsed, - remaining_time=remaining, - url=url, - result=result, - voting=job.voting)) - if self.haveAllJobsStarted(item): - ret['remaining_time'] = max_remaining - else: - ret['remaining_time'] = None - return ret - class ActionReporter(object): """An ActionReporter has a reporter and its configured paramaters""" @@ -760,6 +665,124 @@ class QueueItem(object): def setReportedResult(self, result): self.current_build_set.result = result + def formatJSON(self): + changeish = self.change + ret = {} + ret['active'] = self.active + if hasattr(changeish, 'url') and changeish.url is not None: + ret['url'] = changeish.url + else: + ret['url'] = None + ret['id'] = changeish._id() + if self.item_ahead: + ret['item_ahead'] = self.item_ahead.change._id() + else: + ret['item_ahead'] = None + ret['items_behind'] = [i.change._id() for i in self.items_behind] + ret['failing_reasons'] = self.current_build_set.failing_reasons + ret['zuul_ref'] = self.current_build_set.ref + ret['project'] = changeish.project.name + ret['enqueue_time'] = int(self.enqueue_time * 1000) + ret['jobs'] = [] + max_remaining = 0 + for job in self.pipeline.getJobs(changeish): + now = time.time() + build = self.current_build_set.getBuild(job.name) + elapsed = None + remaining = None + result = None + url = None + worker = None + if build: + result = build.result + url = build.url + if build.start_time: + if build.end_time: + elapsed = int((build.end_time - + build.start_time) * 1000) + remaining = 0 + else: + elapsed = int((now - build.start_time) * 1000) + if build.estimated_time: + remaining = max( + int(build.estimated_time * 1000) - elapsed, + 0) + worker = { + 'name': build.worker.name, + 'hostname': build.worker.hostname, + 'ips': build.worker.ips, + 'fqdn': build.worker.fqdn, + 'program': build.worker.program, + 'version': build.worker.version, + 'extra': build.worker.extra + } + if remaining and remaining > max_remaining: + max_remaining = remaining + + ret['jobs'].append({ + 'name': job.name, + 'elapsed_time': elapsed, + 'remaining_time': remaining, + 'url': url, + 'result': result, + 'voting': job.voting, + 'uuid': build.uuid if build else None, + 'launch_time': build.launch_time if build else None, + 'start_time': build.start_time if build else None, + 'end_time': build.end_time if build else None, + 'estimated_time': build.estimated_time if build else None, + 'pipeline': build.pipeline.name if build else None, + 'canceled': build.canceled if build else None, + 'retry': build.retry if build else None, + 'number': build.number if build else None, + 'parameters': build.parameters if build else None, + 'worker': worker + }) + + if self.pipeline.haveAllJobsStarted(self): + ret['remaining_time'] = max_remaining + else: + ret['remaining_time'] = None + return ret + + def formatStatus(self, indent=0, html=False): + changeish = self.change + indent_str = ' ' * indent + ret = '' + if html and hasattr(changeish, 'url') and changeish.url is not None: + ret += '%sProject %s change %s\n' % ( + indent_str, + changeish.project.name, + changeish.url, + changeish._id()) + else: + ret += '%sProject %s change %s based on %s\n' % ( + indent_str, + changeish.project.name, + changeish._id(), + self.item_ahead) + for job in self.pipeline.getJobs(changeish): + build = self.current_build_set.getBuild(job.name) + if build: + result = build.result + else: + result = None + job_name = job.name + if not job.voting: + voting = ' (non-voting)' + else: + voting = '' + if html: + if build: + url = build.url + else: + url = None + if url is not None: + job_name = '%s' % (url, job_name) + ret += '%s %s: %s%s' % (indent_str, job_name, result, voting) + ret += '\n' + return ret + class Changeish(object): """Something like a change; either a change or a ref""" diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py index 69390c04f6..7f572be760 100644 --- a/zuul/rpcclient.py +++ b/zuul/rpcclient.py @@ -46,7 +46,7 @@ class RPCClient(object): if job.exception: raise RPCFailure(job.exception) self.log.debug("Job complete, success: %s" % (not job.failure)) - return (not job.failure) + return job def enqueue(self, pipeline, project, trigger, change): data = {'pipeline': pipeline, @@ -54,13 +54,21 @@ class RPCClient(object): 'trigger': trigger, 'change': change, } - return self.submitJob('zuul:enqueue', data) + return not self.submitJob('zuul:enqueue', data).failure def promote(self, pipeline, change_ids): data = {'pipeline': pipeline, 'change_ids': change_ids, } - return self.submitJob('zuul:promote', data) + return not self.submitJob('zuul:promote', data).failure + + def get_running_jobs(self): + data = {} + job = self.submitJob('zuul:get_running_jobs', data) + if job.failure: + return False + else: + return json.loads(job.data[0]) def shutdown(self): self.gearman.shutdown() diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py index c1b92168c4..a5a5b5de28 100644 --- a/zuul/rpclistener.py +++ b/zuul/rpclistener.py @@ -48,6 +48,7 @@ class RPCListener(object): def register(self): self.worker.registerFunction("zuul:enqueue") self.worker.registerFunction("zuul:promote") + self.worker.registerFunction("zuul:get_running_jobs") def stop(self): self.log.debug("Stopping") @@ -123,3 +124,14 @@ class RPCListener(object): change_ids = args['change_ids'] self.sched.promote(pipeline_name, change_ids) job.sendWorkComplete() + + def handle_get_running_jobs(self, job): + # args = json.loads(job.arguments) + # TODO: use args to filter by pipeline etc + running_items = [] + for pipeline_name, pipeline in self.sched.layout.pipelines.iteritems(): + for queue in pipeline.queues: + for item in queue.queue: + running_items.append(item.formatJSON()) + + job.sendWorkComplete(json.dumps(running_items)) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 18f44db7d2..f5d6629c54 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -1305,7 +1305,7 @@ class BasePipelineManager(object): changed = True status = '' for item in queue.queue: - status += self.pipeline.formatStatus(item) + status += item.formatStatus() if status: self.log.debug("Queue %s status is now:\n %s" % (queue.name, status)) @@ -1334,7 +1334,7 @@ class BasePipelineManager(object): self.pipeline.setResult(item, build) self.log.debug("Item %s status is now:\n %s" % - (item, self.pipeline.formatStatus(item))) + (item, item.formatStatus())) self.updateBuildDescriptions(build.build_set) return True