Merge "Unify gearman worker handling"
This commit is contained in:
commit
eb1c5e28a6
|
@ -3488,7 +3488,7 @@ class ZuulTestCase(BaseTestCase):
|
|||
# It hasn't been reported yet.
|
||||
return False
|
||||
# Make sure that none of the worker connections are in GRAB_WAIT
|
||||
worker = self.executor_server.executor_worker
|
||||
worker = self.executor_server.executor_gearworker.gearman
|
||||
for connection in worker.active_connections:
|
||||
if connection.state == 'GRAB_WAIT':
|
||||
return False
|
||||
|
|
|
@ -224,10 +224,11 @@ class TestScheduler(ZuulTestCase):
|
|||
def test_branch_deletion(self):
|
||||
"Test the correct variant of a job runs on a branch"
|
||||
self._startMerger()
|
||||
for f in list(self.executor_server.merger_worker.functions.keys()):
|
||||
merger_gear = self.executor_server.merger_gearworker.gearman
|
||||
for f in list(merger_gear.functions.keys()):
|
||||
f = f.decode('utf8')
|
||||
if f.startswith('merger:'):
|
||||
self.executor_server.merger_worker.unRegisterFunction(f)
|
||||
merger_gear.unRegisterFunction(f)
|
||||
|
||||
self.create_branch('org/project', 'stable')
|
||||
self.fake_gerrit.addEvent(
|
||||
|
|
|
@ -23,7 +23,6 @@ import threading
|
|||
import time
|
||||
import re
|
||||
import json
|
||||
import traceback
|
||||
|
||||
import cherrypy
|
||||
import cachecontrol
|
||||
|
@ -36,11 +35,9 @@ import requests
|
|||
import github3
|
||||
import github3.exceptions
|
||||
|
||||
import gear
|
||||
|
||||
from zuul.connection import BaseConnection
|
||||
from zuul.lib.gearworker import ZuulGearWorker
|
||||
from zuul.web.handler import BaseWebController
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.logutil import get_annotated_logger
|
||||
from zuul.model import Ref, Branch, Tag, Project
|
||||
from zuul.exceptions import MergeFailure
|
||||
|
@ -268,36 +265,21 @@ class GithubGearmanWorker(object):
|
|||
def __init__(self, connection):
|
||||
self.config = connection.sched.config
|
||||
self.connection = connection
|
||||
self.thread = threading.Thread(target=self._run,
|
||||
name='github-gearman-worker')
|
||||
self._running = False
|
||||
|
||||
handler = "github:%s:payload" % self.connection.connection_name
|
||||
self.jobs = {
|
||||
handler: self.handle_payload,
|
||||
}
|
||||
|
||||
def _run(self):
|
||||
while self._running:
|
||||
try:
|
||||
job = self.gearman.getJob()
|
||||
try:
|
||||
if job.name not in self.jobs:
|
||||
self.log.exception("Exception while running job")
|
||||
job.sendWorkException(
|
||||
traceback.format_exc().encode('utf8'))
|
||||
continue
|
||||
output = self.jobs[job.name](json.loads(job.arguments))
|
||||
job.sendWorkComplete(json.dumps(output))
|
||||
except Exception:
|
||||
self.log.exception("Exception while running job")
|
||||
job.sendWorkException(
|
||||
traceback.format_exc().encode('utf8'))
|
||||
except gear.InterruptedError:
|
||||
pass
|
||||
except Exception:
|
||||
self.log.exception("Exception while getting job")
|
||||
self.gearworker = ZuulGearWorker(
|
||||
'Zuul Github Connector',
|
||||
'zuul.GithubGearmanWorker',
|
||||
'github-gearman-worker',
|
||||
self.config,
|
||||
self.jobs)
|
||||
|
||||
def handle_payload(self, args):
|
||||
def handle_payload(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
headers = args.get("headers")
|
||||
body = args.get("body")
|
||||
|
||||
|
@ -314,7 +296,7 @@ class GithubGearmanWorker(object):
|
|||
output = {'return_code': 503}
|
||||
log.exception("Exception handling Github event:")
|
||||
|
||||
return output
|
||||
job.sendWorkComplete(json.dumps(output))
|
||||
|
||||
def __dispatch_event(self, body, headers, log):
|
||||
try:
|
||||
|
@ -334,29 +316,10 @@ class GithubGearmanWorker(object):
|
|||
raise Exception(message)
|
||||
|
||||
def start(self):
|
||||
self._running = True
|
||||
server = self.config.get('gearman', 'server')
|
||||
port = get_default(self.config, 'gearman', 'port', 4730)
|
||||
ssl_key = get_default(self.config, 'gearman', 'ssl_key')
|
||||
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
self.gearman = gear.TextWorker('Zuul Github Connector')
|
||||
self.log.debug("Connect to gearman")
|
||||
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.log.debug("Waiting for server")
|
||||
self.gearman.waitForServer()
|
||||
self.log.debug("Registering")
|
||||
for job in self.jobs:
|
||||
self.gearman.registerFunction(job)
|
||||
self.thread.start()
|
||||
self.gearworker.start()
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
self.gearman.stopWaitingForJobs()
|
||||
self.thread.join()
|
||||
self.gearman.shutdown()
|
||||
self.gearworker.stop()
|
||||
|
||||
|
||||
class GithubEventProcessor(object):
|
||||
|
|
|
@ -32,6 +32,7 @@ import git
|
|||
from urllib.parse import urlsplit
|
||||
|
||||
from zuul.lib.ansible import AnsibleManager
|
||||
from zuul.lib.gearworker import ZuulGearWorker
|
||||
from zuul.lib.yamlutil import yaml
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.logutil import get_annotated_logger
|
||||
|
@ -2415,6 +2416,36 @@ class ExecutorServer(object):
|
|||
self.ansible_manager.install()
|
||||
self.ansible_manager.copyAnsibleFiles()
|
||||
|
||||
self.merger_jobs = {
|
||||
'merger:merge': self.merge,
|
||||
'merger:cat': self.cat,
|
||||
'merger:refstate': self.refstate,
|
||||
'merger:fileschanges': self.fileschanges,
|
||||
}
|
||||
self.merger_gearworker = ZuulGearWorker(
|
||||
'Zuul Executor Merger',
|
||||
'zuul.ExecutorServer',
|
||||
'merger',
|
||||
self.config,
|
||||
self.merger_jobs)
|
||||
|
||||
function_name = 'executor:execute'
|
||||
if self.zone:
|
||||
function_name += ':%s' % self.zone
|
||||
|
||||
self.executor_jobs = {
|
||||
"executor:resume:%s" % self.hostname: self.resumeJob,
|
||||
"executor:stop:%s" % self.hostname: self.stopJob,
|
||||
function_name: self.executeJob,
|
||||
}
|
||||
|
||||
self.executor_gearworker = ZuulGearWorker(
|
||||
'Zuul Executor Server',
|
||||
'zuul.ExecutorServer',
|
||||
'executor',
|
||||
self.config,
|
||||
self.executor_jobs)
|
||||
|
||||
def _getMerger(self, root, cache_root, logger=None):
|
||||
return zuul.merger.merger.Merger(
|
||||
root, self.connections, self.merge_email, self.merge_name,
|
||||
|
@ -2424,25 +2455,9 @@ class ExecutorServer(object):
|
|||
def start(self):
|
||||
self._running = True
|
||||
self._command_running = True
|
||||
server = self.config.get('gearman', 'server')
|
||||
port = get_default(self.config, 'gearman', 'port', 4730)
|
||||
ssl_key = get_default(self.config, 'gearman', 'ssl_key')
|
||||
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
|
||||
self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.executor_worker = ExecutorExecuteWorker(
|
||||
self, 'Zuul Executor Server')
|
||||
self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.log.debug("Waiting for server")
|
||||
self.merger_worker.waitForServer()
|
||||
self.executor_worker.waitForServer()
|
||||
self.log.debug("Registering")
|
||||
self.register()
|
||||
|
||||
self.merger_gearworker.start()
|
||||
self.executor_gearworker.start()
|
||||
|
||||
self.log.debug("Starting command processor")
|
||||
self.command_socket.start()
|
||||
|
@ -2458,14 +2473,7 @@ class ExecutorServer(object):
|
|||
update_thread.daemon = True
|
||||
update_thread.start()
|
||||
self.update_threads.append(update_thread)
|
||||
self.merger_thread = threading.Thread(target=self.run_merger,
|
||||
name='merger')
|
||||
self.merger_thread.daemon = True
|
||||
self.merger_thread.start()
|
||||
self.executor_thread = threading.Thread(target=self.run_executor,
|
||||
name='executor')
|
||||
self.executor_thread.daemon = True
|
||||
self.executor_thread.start()
|
||||
|
||||
self.governor_stop_event = threading.Event()
|
||||
self.governor_thread = threading.Thread(target=self.run_governor,
|
||||
name='governor')
|
||||
|
@ -2473,35 +2481,24 @@ class ExecutorServer(object):
|
|||
self.governor_thread.start()
|
||||
self.disk_accountant.start()
|
||||
|
||||
def register(self):
|
||||
self.register_work()
|
||||
self.executor_worker.registerFunction("executor:resume:%s" %
|
||||
self.hostname)
|
||||
self.executor_worker.registerFunction("executor:stop:%s" %
|
||||
self.hostname)
|
||||
self.merger_worker.registerFunction("merger:merge")
|
||||
self.merger_worker.registerFunction("merger:cat")
|
||||
self.merger_worker.registerFunction("merger:refstate")
|
||||
self.merger_worker.registerFunction("merger:fileschanges")
|
||||
|
||||
def register_work(self):
|
||||
if self._running:
|
||||
self.accepting_work = True
|
||||
function_name = 'executor:execute'
|
||||
if self.zone:
|
||||
function_name += ':%s' % self.zone
|
||||
self.executor_worker.registerFunction(function_name)
|
||||
self.executor_gearworker.gearman.registerFunction(function_name)
|
||||
# TODO(jeblair): Update geard to send a noop after
|
||||
# registering for a job which is in the queue, then remove
|
||||
# this API violation.
|
||||
self.executor_worker._sendGrabJobUniq()
|
||||
self.executor_gearworker.gearman._sendGrabJobUniq()
|
||||
|
||||
def unregister_work(self):
|
||||
self.accepting_work = False
|
||||
function_name = 'executor:execute'
|
||||
if self.zone:
|
||||
function_name += ':%s' % self.zone
|
||||
self.executor_worker.unRegisterFunction(function_name)
|
||||
self.executor_gearworker.gearman.unRegisterFunction(function_name)
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping")
|
||||
|
@ -2511,8 +2508,8 @@ class ExecutorServer(object):
|
|||
self.governor_stop_event.set()
|
||||
self.governor_thread.join()
|
||||
# Stop accepting new jobs
|
||||
self.merger_worker.setFunctions([])
|
||||
self.executor_worker.setFunctions([])
|
||||
self.merger_gearworker.gearman.setFunctions([])
|
||||
self.executor_gearworker.gearman.setFunctions([])
|
||||
# Tell the executor worker to abort any jobs it just accepted,
|
||||
# and grab the list of currently running job workers.
|
||||
with self.run_lock:
|
||||
|
@ -2541,8 +2538,8 @@ class ExecutorServer(object):
|
|||
|
||||
# All job results should have been sent by now, shutdown the
|
||||
# gearman workers.
|
||||
self.merger_worker.shutdown()
|
||||
self.executor_worker.shutdown()
|
||||
self.merger_gearworker.stop()
|
||||
self.executor_gearworker.stop()
|
||||
|
||||
if self.statsd:
|
||||
base_key = 'zuul.executor.{hostname}'
|
||||
|
@ -2558,8 +2555,8 @@ class ExecutorServer(object):
|
|||
self.governor_thread.join()
|
||||
for update_thread in self.update_threads:
|
||||
update_thread.join()
|
||||
self.merger_thread.join()
|
||||
self.executor_thread.join()
|
||||
self.merger_gearworker.join()
|
||||
self.executor_gearworker.join()
|
||||
|
||||
def pause(self):
|
||||
self.pause_sensor.pause = True
|
||||
|
@ -2661,82 +2658,11 @@ class ExecutorServer(object):
|
|||
task = self.update_queue.put(task)
|
||||
return task
|
||||
|
||||
def run_merger(self):
|
||||
self.log.debug("Starting merger listener")
|
||||
while self._running:
|
||||
try:
|
||||
job = self.merger_worker.getJob()
|
||||
try:
|
||||
self.mergerJobDispatch(job)
|
||||
except Exception:
|
||||
self.log.exception("Exception while running job")
|
||||
job.sendWorkException(
|
||||
traceback.format_exc().encode('utf8'))
|
||||
except gear.InterruptedError:
|
||||
pass
|
||||
except Exception:
|
||||
self.log.exception("Exception while getting job")
|
||||
|
||||
def mergerJobDispatch(self, job):
|
||||
if job.name == 'merger:cat':
|
||||
self.log.debug("Got cat job: %s" % job.unique)
|
||||
self.cat(job)
|
||||
elif job.name == 'merger:merge':
|
||||
self.log.debug("Got merge job: %s" % job.unique)
|
||||
self.merge(job)
|
||||
elif job.name == 'merger:refstate':
|
||||
self.log.debug("Got refstate job: %s" % job.unique)
|
||||
self.refstate(job)
|
||||
elif job.name == 'merger:fileschanges':
|
||||
self.log.debug("Got fileschanges job: %s" % job.unique)
|
||||
self.fileschanges(job)
|
||||
else:
|
||||
self.log.error("Unable to handle job %s" % job.name)
|
||||
job.sendWorkFail()
|
||||
|
||||
def run_executor(self):
|
||||
self.log.debug("Starting executor listener")
|
||||
while self._running:
|
||||
try:
|
||||
job = self.executor_worker.getJob()
|
||||
try:
|
||||
self.executorJobDispatch(job)
|
||||
except Exception:
|
||||
self.log.exception("Exception while running job")
|
||||
job.sendWorkException(
|
||||
traceback.format_exc().encode('utf8'))
|
||||
except gear.InterruptedError:
|
||||
pass
|
||||
except Exception:
|
||||
self.log.exception("Exception while getting job")
|
||||
|
||||
def executorJobDispatch(self, job):
|
||||
with self.run_lock:
|
||||
if not self._running:
|
||||
job.sendWorkFail()
|
||||
return
|
||||
|
||||
args = json.loads(job.arguments)
|
||||
zuul_event_id = args.get('zuul_event_id')
|
||||
log = get_annotated_logger(self.log, zuul_event_id)
|
||||
|
||||
function_name = 'executor:execute'
|
||||
if self.zone:
|
||||
function_name += ':%s' % self.zone
|
||||
if job.name == (function_name):
|
||||
log.debug("Got %s job: %s", function_name, job.unique)
|
||||
self.executeJob(job)
|
||||
elif job.name.startswith('executor:resume'):
|
||||
log.debug("Got resume job: %s", job.unique)
|
||||
self.resumeJob(job)
|
||||
elif job.name.startswith('executor:stop'):
|
||||
log.debug("Got stop job: %s", job.unique)
|
||||
self.stopJob(job)
|
||||
else:
|
||||
log.error("Unable to handle job %s", job.name)
|
||||
job.sendWorkFail()
|
||||
|
||||
def executeJob(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
zuul_event_id = args.get('zuul_event_id')
|
||||
log = get_annotated_logger(self.log, zuul_event_id)
|
||||
log.debug("Got %s job: %s", job.name, job.unique)
|
||||
if self.statsd:
|
||||
base_key = 'zuul.executor.{hostname}'
|
||||
self.statsd.incr(base_key + '.builds')
|
||||
|
@ -2841,6 +2767,7 @@ class ExecutorServer(object):
|
|||
log.exception("Exception sending stop command to worker:")
|
||||
|
||||
def cat(self, job):
|
||||
self.log.debug("Got cat job: %s" % job.unique)
|
||||
args = json.loads(job.arguments)
|
||||
task = self.update(args['connection'], args['project'])
|
||||
task.wait()
|
||||
|
@ -2855,6 +2782,7 @@ class ExecutorServer(object):
|
|||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def fileschanges(self, job):
|
||||
self.log.debug("Got fileschanges job: %s" % job.unique)
|
||||
args = json.loads(job.arguments)
|
||||
zuul_event_id = args.get('zuul_event_id')
|
||||
task = self.update(args['connection'], args['project'])
|
||||
|
@ -2872,6 +2800,7 @@ class ExecutorServer(object):
|
|||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def refstate(self, job):
|
||||
self.log.debug("Got refstate job: %s" % job.unique)
|
||||
args = json.loads(job.arguments)
|
||||
zuul_event_id = args.get('zuul_event_id')
|
||||
success, repo_state = self.merger.getRepoState(
|
||||
|
@ -2883,6 +2812,7 @@ class ExecutorServer(object):
|
|||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def merge(self, job):
|
||||
self.log.debug("Got merge job: %s" % job.unique)
|
||||
args = json.loads(job.arguments)
|
||||
zuul_event_id = args.get('zuul_event_id')
|
||||
ret = self.merger.mergeChanges(args['items'], args.get('files'),
|
||||
|
|
|
@ -0,0 +1,87 @@
|
|||
# Copyright 2019 BMW Group
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import logging
|
||||
import threading
|
||||
import traceback
|
||||
|
||||
import gear
|
||||
|
||||
from zuul.lib.config import get_default
|
||||
|
||||
|
||||
class ZuulGearWorker:
|
||||
|
||||
def __init__(self, name, logger_name, thread_name, config, jobs,
|
||||
worker_class=gear.TextWorker):
|
||||
self.log = logging.getLogger(logger_name)
|
||||
|
||||
self._running = True
|
||||
self.name = name
|
||||
self.worker_class = worker_class
|
||||
|
||||
self.server = config.get('gearman', 'server')
|
||||
self.port = get_default(config, 'gearman', 'port', 4730)
|
||||
self.ssl_key = get_default(config, 'gearman', 'ssl_key')
|
||||
self.ssl_cert = get_default(config, 'gearman', 'ssl_cert')
|
||||
self.ssl_ca = get_default(config, 'gearman', 'ssl_ca')
|
||||
|
||||
self.gearman = None
|
||||
self.jobs = jobs
|
||||
|
||||
self.thread = threading.Thread(target=self._run, name=thread_name)
|
||||
self.thread.daemon = True
|
||||
|
||||
def start(self):
|
||||
self.gearman = self.worker_class(self.name)
|
||||
self.log.debug('Connect to gearman')
|
||||
self.gearman.addServer(self.server, self.port, self.ssl_key,
|
||||
self.ssl_cert, self.ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.log.debug('Waiting for server')
|
||||
self.gearman.waitForServer()
|
||||
|
||||
self.log.debug('Registering')
|
||||
for job in self.jobs:
|
||||
self.gearman.registerFunction(job)
|
||||
self.thread.start()
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
self.gearman.stopWaitingForJobs()
|
||||
self.thread.join()
|
||||
self.gearman.shutdown()
|
||||
|
||||
def join(self):
|
||||
self.thread.join()
|
||||
|
||||
def _run(self):
|
||||
while self._running:
|
||||
try:
|
||||
job = self.gearman.getJob()
|
||||
try:
|
||||
if job.name not in self.jobs:
|
||||
self.log.exception("Exception while running job")
|
||||
job.sendWorkException(
|
||||
traceback.format_exc().encode('utf8'))
|
||||
continue
|
||||
self.jobs[job.name](job)
|
||||
except Exception:
|
||||
self.log.exception('Exception while running job')
|
||||
job.sendWorkException(
|
||||
traceback.format_exc().encode('utf-8'))
|
||||
except gear.InterruptedError:
|
||||
pass
|
||||
except Exception:
|
||||
self.log.exception('Exception while getting job')
|
|
@ -15,12 +15,10 @@
|
|||
import json
|
||||
import logging
|
||||
import threading
|
||||
import traceback
|
||||
|
||||
import gear
|
||||
|
||||
from zuul.lib import commandsocket
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.gearworker import ZuulGearWorker
|
||||
from zuul.merger import merger
|
||||
|
||||
|
||||
|
@ -52,22 +50,24 @@ class MergeServer(object):
|
|||
'/var/lib/zuul/merger.socket')
|
||||
self.command_socket = commandsocket.CommandSocket(command_socket)
|
||||
|
||||
self.jobs = {
|
||||
'merger:merge': self.merge,
|
||||
'merger:cat': self.cat,
|
||||
'merger:refstate': self.refstate,
|
||||
'merger:fileschanges': self.fileschanges,
|
||||
}
|
||||
self.gearworker = ZuulGearWorker(
|
||||
'Zuul Merger',
|
||||
'zuul.MergeServer',
|
||||
'merger-gearman-worker',
|
||||
self.config,
|
||||
self.jobs)
|
||||
|
||||
def start(self):
|
||||
self._running = True
|
||||
self.log.debug("Starting worker")
|
||||
self.gearworker.start()
|
||||
|
||||
self._command_running = True
|
||||
server = self.config.get('gearman', 'server')
|
||||
port = get_default(self.config, 'gearman', 'port', 4730)
|
||||
ssl_key = get_default(self.config, 'gearman', 'ssl_key')
|
||||
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
self.worker = gear.TextWorker('Zuul Merger')
|
||||
self.worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.log.debug("Waiting for server")
|
||||
self.worker.waitForServer()
|
||||
self.log.debug("Registering")
|
||||
self.register()
|
||||
self.log.debug("Starting command processor")
|
||||
self.command_socket.start()
|
||||
self.command_thread = threading.Thread(
|
||||
|
@ -75,27 +75,15 @@ class MergeServer(object):
|
|||
self.command_thread.daemon = True
|
||||
self.command_thread.start()
|
||||
|
||||
self.log.debug("Starting worker")
|
||||
self.thread = threading.Thread(target=self.run)
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
|
||||
def register(self):
|
||||
self.worker.registerFunction("merger:merge")
|
||||
self.worker.registerFunction("merger:cat")
|
||||
self.worker.registerFunction("merger:refstate")
|
||||
self.worker.registerFunction("merger:fileschanges")
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping")
|
||||
self._running = False
|
||||
self.gearworker.stop()
|
||||
self._command_running = False
|
||||
self.worker.shutdown()
|
||||
self.command_socket.stop()
|
||||
self.log.debug("Stopped")
|
||||
|
||||
def join(self):
|
||||
self.thread.join()
|
||||
self.gearworker.join()
|
||||
|
||||
def runCommand(self):
|
||||
while self._command_running:
|
||||
|
@ -106,36 +94,8 @@ class MergeServer(object):
|
|||
except Exception:
|
||||
self.log.exception("Exception while processing command")
|
||||
|
||||
def run(self):
|
||||
self.log.debug("Starting merge listener")
|
||||
while self._running:
|
||||
try:
|
||||
job = self.worker.getJob()
|
||||
try:
|
||||
if job.name == 'merger:merge':
|
||||
self.log.debug("Got merge job: %s" % job.unique)
|
||||
self.merge(job)
|
||||
elif job.name == 'merger:cat':
|
||||
self.log.debug("Got cat job: %s" % job.unique)
|
||||
self.cat(job)
|
||||
elif job.name == 'merger:refstate':
|
||||
self.log.debug("Got refstate job: %s" % job.unique)
|
||||
self.refstate(job)
|
||||
elif job.name == 'merger:fileschanges':
|
||||
self.log.debug("Got fileschanges job: %s" % job.unique)
|
||||
self.fileschanges(job)
|
||||
else:
|
||||
self.log.error("Unable to handle job %s" % job.name)
|
||||
job.sendWorkFail()
|
||||
except Exception:
|
||||
self.log.exception("Exception while running job")
|
||||
job.sendWorkException(traceback.format_exc())
|
||||
except gear.InterruptedError:
|
||||
return
|
||||
except Exception:
|
||||
self.log.exception("Exception while getting job")
|
||||
|
||||
def merge(self, job):
|
||||
self.log.debug("Got merge job: %s" % job.unique)
|
||||
args = json.loads(job.arguments)
|
||||
zuul_event_id = args.get('zuul_event_id')
|
||||
ret = self.merger.mergeChanges(
|
||||
|
@ -153,6 +113,7 @@ class MergeServer(object):
|
|||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def refstate(self, job):
|
||||
self.log.debug("Got refstate job: %s" % job.unique)
|
||||
args = json.loads(job.arguments)
|
||||
zuul_event_id = args.get('zuul_event_id')
|
||||
success, repo_state = self.merger.getRepoState(
|
||||
|
@ -163,6 +124,7 @@ class MergeServer(object):
|
|||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def cat(self, job):
|
||||
self.log.debug("Got cat job: %s" % job.unique)
|
||||
args = json.loads(job.arguments)
|
||||
self.merger.updateRepo(args['connection'], args['project'])
|
||||
files = self.merger.getFiles(args['connection'], args['project'],
|
||||
|
@ -173,6 +135,7 @@ class MergeServer(object):
|
|||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def fileschanges(self, job):
|
||||
self.log.debug("Got fileschanges job: %s" % job.unique)
|
||||
args = json.loads(job.arguments)
|
||||
zuul_event_id = args.get('zuul_event_id')
|
||||
self.merger.updateRepo(args['connection'], args['project'],
|
||||
|
|
|
@ -15,15 +15,11 @@
|
|||
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import traceback
|
||||
|
||||
import gear
|
||||
|
||||
from zuul import model
|
||||
from zuul.connection import BaseConnection
|
||||
from zuul.lib import encryption
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.gearworker import ZuulGearWorker
|
||||
from zuul.lib.jsonutil import ZuulJSONEncoder
|
||||
|
||||
|
||||
|
@ -34,81 +30,50 @@ class RPCListener(object):
|
|||
self.config = config
|
||||
self.sched = sched
|
||||
|
||||
def start(self):
|
||||
self._running = True
|
||||
server = self.config.get('gearman', 'server')
|
||||
port = get_default(self.config, 'gearman', 'port', 4730)
|
||||
ssl_key = get_default(self.config, 'gearman', 'ssl_key')
|
||||
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
self.worker = gear.TextWorker('Zuul RPC Listener')
|
||||
self.worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.log.debug("Waiting for server")
|
||||
self.worker.waitForServer()
|
||||
self.log.debug("Registering")
|
||||
self.register()
|
||||
self.thread = threading.Thread(target=self.run)
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
self.jobs = {}
|
||||
functions = [
|
||||
'autohold',
|
||||
'autohold_list',
|
||||
'allowed_labels_get',
|
||||
'dequeue',
|
||||
'enqueue',
|
||||
'enqueue_ref',
|
||||
'promote',
|
||||
'get_running_jobs',
|
||||
'get_job_log_stream_address',
|
||||
'tenant_list',
|
||||
'tenant_sql_connection',
|
||||
'status_get',
|
||||
'job_get',
|
||||
'job_list',
|
||||
'project_get',
|
||||
'project_list',
|
||||
'project_freeze_jobs',
|
||||
'pipeline_list',
|
||||
'key_get',
|
||||
'config_errors_list',
|
||||
'connection_list',
|
||||
]
|
||||
for func in functions:
|
||||
f = getattr(self, 'handle_%s' % func)
|
||||
self.jobs['zuul:%s' % func] = f
|
||||
self.gearworker = ZuulGearWorker(
|
||||
'Zuul RPC Listener',
|
||||
'zuul.RPCListener',
|
||||
'zuul-rpc-gearman-worker',
|
||||
self.config,
|
||||
self.jobs)
|
||||
|
||||
def register(self):
|
||||
self.worker.registerFunction("zuul:autohold")
|
||||
self.worker.registerFunction("zuul:autohold_list")
|
||||
self.worker.registerFunction("zuul:allowed_labels_get")
|
||||
self.worker.registerFunction("zuul:dequeue")
|
||||
self.worker.registerFunction("zuul:enqueue")
|
||||
self.worker.registerFunction("zuul:enqueue_ref")
|
||||
self.worker.registerFunction("zuul:promote")
|
||||
self.worker.registerFunction("zuul:get_running_jobs")
|
||||
self.worker.registerFunction("zuul:get_job_log_stream_address")
|
||||
self.worker.registerFunction("zuul:tenant_list")
|
||||
self.worker.registerFunction("zuul:tenant_sql_connection")
|
||||
self.worker.registerFunction("zuul:status_get")
|
||||
self.worker.registerFunction("zuul:job_get")
|
||||
self.worker.registerFunction("zuul:job_list")
|
||||
self.worker.registerFunction("zuul:project_get")
|
||||
self.worker.registerFunction("zuul:project_list")
|
||||
self.worker.registerFunction("zuul:project_freeze_jobs")
|
||||
self.worker.registerFunction("zuul:pipeline_list")
|
||||
self.worker.registerFunction("zuul:key_get")
|
||||
self.worker.registerFunction("zuul:config_errors_list")
|
||||
self.worker.registerFunction("zuul:connection_list")
|
||||
def start(self):
|
||||
self.gearworker.start()
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping")
|
||||
self._running = False
|
||||
self.worker.shutdown()
|
||||
self.gearworker.stop()
|
||||
self.log.debug("Stopped")
|
||||
|
||||
def join(self):
|
||||
self.thread.join()
|
||||
|
||||
def run(self):
|
||||
self.log.debug("Starting RPC listener")
|
||||
while self._running:
|
||||
try:
|
||||
job = self.worker.getJob()
|
||||
self.log.debug("Received job %s" % job.name)
|
||||
z, jobname = job.name.split(':')
|
||||
attrname = 'handle_' + jobname
|
||||
if hasattr(self, attrname):
|
||||
f = getattr(self, attrname)
|
||||
if callable(f):
|
||||
try:
|
||||
f(job)
|
||||
except Exception:
|
||||
self.log.exception("Exception while running job")
|
||||
job.sendWorkException(traceback.format_exc())
|
||||
else:
|
||||
job.sendWorkFail()
|
||||
else:
|
||||
job.sendWorkFail()
|
||||
except gear.InterruptedError:
|
||||
return
|
||||
except Exception:
|
||||
self.log.exception("Exception while getting job")
|
||||
self.gearworker.join()
|
||||
|
||||
def handle_dequeue(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
|
|
|
@ -405,7 +405,7 @@ class Scheduler(threading.Thread):
|
|||
def _runStats(self):
|
||||
if not self.statsd:
|
||||
return
|
||||
functions = getGearmanFunctions(self.rpc.worker)
|
||||
functions = getGearmanFunctions(self.rpc.gearworker.gearman)
|
||||
executors_accepting = 0
|
||||
executors_online = 0
|
||||
execute_queue = 0
|
||||
|
|
Loading…
Reference in New Issue