Switch to ZooKeeper-backed trigger event queues

Trigger events will now be dispatched via ZooKeeper. The event queues
are namespaced by tenant since the event processing will later require a
tenant lock in a multi-scheduler deployment.

GitLab events hold their labels as a non-serializable set attribute; this
change adjusts them to be held in a list (but set operations are still
used for de-duplication).

Change-Id: Ie54fc16488ab8cbc15f97d003f36c12b8a648ed4
This commit is contained in:
Simon Westphahl 2020-11-03 15:24:51 +01:00
parent be8d216629
commit 2e6cfff818
12 changed files with 224 additions and 228 deletions

View File

@ -4032,7 +4032,6 @@ class SchedulerTestApp:
self.event_queues = [
self.sched.result_event_queue,
self.sched.trigger_event_queue,
self.sched.management_event_queue
]
@ -4041,7 +4040,6 @@ class SchedulerTestApp:
self.sched.executor.gearman.waitForServer()
self.sched.reconfigure(
self.config, validate_tenants=validate_tenants)
self.sched.wakeUp()
def fullReconfigure(self):
try:
@ -4916,6 +4914,18 @@ class ZuulTestCase(BaseTestCase):
for event_queue in self.additional_event_queues:
event_queue.join()
def __areZooKeeperEventQueuesEmpty(self, matcher=None) -> bool:
    """Report whether all ZooKeeper trigger event queues are drained.

    For the scheduler of every app selected by ``matcher`` this checks
    the global trigger event queue first and then the trigger event
    queue of each pipeline of each tenant.

    :param matcher: passed through to ``self.scheds.filter`` to select
        the scheduler apps to inspect.
    :returns: False as soon as one queue reports pending events,
        True when none does.
    """
    for app in self.scheds.filter(matcher):
        scheduler = app.sched
        if scheduler.trigger_events.hasEvents():
            return False
        for tenant in scheduler.abide.tenants.values():
            for name in tenant.layout.pipelines:
                pipeline_events = scheduler.pipeline_trigger_events[
                    tenant.name][name]
                if pipeline_events.hasEvents():
                    return False
    return True
def waitUntilSettled(self, msg="", matcher=None) -> None:
self.log.debug("Waiting until settled... (%s)", msg)
start = time.time()
@ -4928,6 +4938,10 @@ class ZuulTestCase(BaseTestCase):
for event_queue in self.__event_queues(matcher):
self.log.error(" %s: %s" %
(event_queue, event_queue.empty()))
self.log.error(
"All ZK event queues empty: %s",
self.__areZooKeeperEventQueuesEmpty(matcher),
)
self.log.error("All builds waiting: %s" %
(self.__areAllBuildsWaiting(matcher),))
self.log.error("All merge jobs waiting: %s" %
@ -4953,7 +4967,8 @@ class ZuulTestCase(BaseTestCase):
self.__eventQueuesJoin(matcher)
self.scheds.execute(
lambda app: app.sched.run_handler_lock.acquire())
if (self.__areAllMergeJobsWaiting(matcher) and
if (self.__areZooKeeperEventQueuesEmpty(matcher) and
self.__areAllMergeJobsWaiting(matcher) and
self.__haveAllBuildsReported(matcher) and
self.__areAllBuildsWaiting(matcher) and
self.__areAllNodeRequestsComplete(matcher) and

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from tests.base import ZuulTestCase, ZuulGithubAppTestCase
from zuul.driver.zuul.zuulmodel import ZuulTriggerEvent
@ -63,29 +65,24 @@ class TestZuulTriggerParentChangeEnqueued(ZuulTestCase):
# Now directly enqueue a change into the check. As no pipeline reacts
# on parent-change-enqueued from pipeline check no
# parent-change-enqueued event is expected.
zuultrigger_event_count = 0
_add_trigger_event = self.scheds.first.sched.addTriggerEvent
def counting_put(*args, **kwargs):
nonlocal zuultrigger_event_count
if isinstance(args[0], ZuulTriggerEvent):
zuultrigger_event_count += 1
self.scheds.first.sched.trigger_event_queue\
.put_orig(*args, **kwargs)
def addTriggerEvent(driver_name, event):
self.assertNotIsInstance(event, ZuulTriggerEvent)
_add_trigger_event(driver_name, event)
self.scheds.first.sched.trigger_event_queue.put_orig = \
self.scheds.first.sched.trigger_event_queue.put
self.scheds.first.sched.trigger_event_queue.put = counting_put
with mock.patch.object(
self.scheds.first.sched, "addTriggerEvent", addTriggerEvent
):
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
C.addApproval('Verified', -1)
D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
D.addApproval('Verified', -1)
D.setDependsOn(C, 1)
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
C.addApproval('Verified', -1)
D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
D.addApproval('Verified', -1)
D.setDependsOn(C, 1)
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(self.history), 4)
self.assertEqual(zuultrigger_event_count, 0)
self.waitUntilSettled()
self.assertEqual(len(self.history), 4)
class TestZuulTriggerParentChangeEnqueuedGithub(ZuulGithubAppTestCase):
@ -145,31 +142,31 @@ class TestZuulTriggerParentChangeEnqueuedGithub(ZuulGithubAppTestCase):
# on parent-change-enqueued from pipeline check no
# parent-change-enqueued event is expected.
self.waitUntilSettled()
zuultrigger_event_count = 0
def counting_put(*args, **kwargs):
nonlocal zuultrigger_event_count
if isinstance(args[0], ZuulTriggerEvent):
zuultrigger_event_count += 1
self.scheds.first.sched.trigger_event_queue\
.put_orig(*args, **kwargs)
_add_trigger_event = self.scheds.first.sched.addTriggerEvent
self.scheds.first.sched.trigger_event_queue.put_orig = \
self.scheds.first.sched.trigger_event_queue.put
self.scheds.first.sched.trigger_event_queue.put = counting_put
def addTriggerEvent(driver_name, event):
self.assertNotIsInstance(event, ZuulTriggerEvent)
_add_trigger_event(driver_name, event)
C = self.fake_github.openFakePullRequest('org/project', 'master', 'C')
C.addLabel('for-check') # should go to check
with mock.patch.object(
self.scheds.first.sched, "addTriggerEvent", addTriggerEvent
):
C = self.fake_github.openFakePullRequest(
'org/project', 'master', 'C'
)
C.addLabel('for-check') # should go to check
msg = "Depends-On: https://github.com/org/project1/pull/%s" % C.number
D = self.fake_github.openFakePullRequest(
'org/project', 'master', 'D', body=msg)
D.addLabel('for-check') # should go to check
self.fake_github.emitEvent(C.getPullRequestOpenedEvent())
msg = "Depends-On: https://github.com/org/project1/pull/{}".format(
C.number
)
D = self.fake_github.openFakePullRequest(
'org/project', 'master', 'D', body=msg)
D.addLabel('for-check') # should go to check
self.fake_github.emitEvent(C.getPullRequestOpenedEvent())
self.waitUntilSettled()
self.assertEqual(len(self.history), 4)
self.assertEqual(zuultrigger_event_count, 0)
self.waitUntilSettled()
self.assertEqual(len(self.history), 4)
# After starting recording installation containing org2/project
# should not be contacted

View File

@ -71,7 +71,6 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.log.exception("Reconfiguration failed:")
def exit_handler(self, signum, frame):
self.sched.exit()
self.sched.join()
self.stop_gear_server()
sys.exit(0)
@ -146,7 +145,6 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.sched.start()
self.sched.reconfigure(self.config,
validate_tenants=self.args.validate_tenants)
self.sched.wakeUp()
except Exception:
self.log.exception("Error starting Zuul:")
# TODO(jeblair): If we had all threads marked as daemon,

View File

@ -258,7 +258,9 @@ class GerritEventConnector(threading.Thread):
self._getChange(event)
self.connection.logEvent(event)
self.connection.sched.addEvent(event)
self.connection.sched.addTriggerEvent(
self.connection.driver_name, event
)
def _getChange(self, event):
# Grab the change if we are managing the project or if it exists in the

View File

@ -151,7 +151,7 @@ class GitConnection(BaseConnection):
self.getChange(event)
self.logEvent(event)
# Pass the event to the scheduler
self.sched.addEvent(event)
self.sched.addTriggerEvent(self.driver_name, event)
def onLoad(self):
self.log.debug("Starting Git Watcher")

View File

@ -694,13 +694,7 @@ class GithubEventProcessor(object):
return event
def _get_sender(self, body):
login = body.get('sender').get('login')
if login:
# TODO(tobiash): it might be better to plumb in the installation id
project = body.get('repository', {}).get('full_name')
user = self.connection.getUser(login, project)
self.log.debug("Got user %s", user)
return user
return body.get('sender').get('login')
class GithubEventConnector:
@ -756,9 +750,15 @@ class GithubEventConnector:
if future is None:
return
event = future.result()
if event:
self.connection.logEvent(event)
self.connection.sched.addEvent(event)
if not event:
return
self.connection.logEvent(event)
if isinstance(event, DequeueEvent):
self.connection.sched.addManagementEvent(event)
else:
self.connection.sched.addTriggerEvent(
self.connection.driver_name, event
)
except Exception:
self.log.exception("Exception moving GitHub event:")
finally:

View File

@ -146,7 +146,7 @@ class GitlabEventConnector(threading.Thread):
label["title"] for
label in body["changes"]["labels"]["current"]]
new_labels = set(current_labels) - set(previous_labels)
event.labels = new_labels
event.labels = list(new_labels)
elif attrs['action'] in ('approved', 'unapproved'):
event.action = attrs['action']
else:
@ -237,7 +237,9 @@ class GitlabEventConnector(threading.Thread):
self.connection.checkBranchCache(event.project_name, event)
self.connection.logEvent(event)
self.connection.sched.addEvent(event)
self.connection.sched.addTriggerEvent(
self.connection.driver_name, event
)
def run(self):
while True:

View File

@ -88,7 +88,7 @@ class GitlabTriggerEvent(TriggerEvent):
r = [super(GitlabTriggerEvent, self)._repr()]
if self.action:
r.append("action:%s" % self.action)
r.append("project:%s" % self.canonical_project_name)
r.append("project:%s" % self.project_name)
if self.change_number:
r.append("mr:%s" % self.change_number)
if self.labels:

View File

@ -255,7 +255,9 @@ class PagureEventConnector(threading.Thread):
event=event)
event.project_hostname = self.connection.canonical_hostname
self.connection.logEvent(event)
self.connection.sched.addEvent(event)
self.connection.sched.addTriggerEvent(
self.connection.driver_name, event
)
def _event_base(self, body, pull_data_field='pullrequest'):
event = PagureTriggerEvent()

View File

@ -130,7 +130,7 @@ class TimerDriver(Driver, TriggerInterface):
event.timestamp = time.time()
log = get_annotated_logger(self.log, event)
log.debug("Adding event")
self.sched.addEvent(event)
self.sched.addTriggerEvent(self.name, event)
def stop(self):
if self.apsched:

View File

@ -96,7 +96,7 @@ class ZuulDriver(Driver, TriggerInterface):
event.ref = change.ref
event.zuul_event_id = str(uuid4().hex)
event.timestamp = time.time()
self.sched.addEvent(event)
self.sched.addTriggerEvent(self.name, event)
def _createParentChangeEnqueuedEvents(self, change, pipeline, tenant,
event):
@ -133,7 +133,7 @@ class ZuulDriver(Driver, TriggerInterface):
event.patch_number = change.patchset
event.ref = change.ref
event.zuul_event_id = str(uuid4().hex)
self.sched.addEvent(event)
self.sched.addTriggerEvent(self.name, event)
def getTrigger(self, connection_name, config=None):
return zuultrigger.ZuulTrigger(self, config)

View File

@ -18,7 +18,6 @@
import json
import logging
import os
import pickle
import re
import queue
import socket
@ -53,19 +52,22 @@ from zuul.model import (
EnqueueEvent,
FilesChangesCompletedEvent,
HoldRequest,
ManagementEvent,
MergeCompletedEvent,
NodesProvisionedEvent,
PromoteEvent,
ReconfigureEvent,
ResultEvent,
SmartReconfigureEvent,
Tenant,
TenantReconfigureEvent,
TriggerEvent,
)
from zuul.zk import ZooKeeperClient
from zuul.zk.components import ZooKeeperComponentRegistry
from zuul.zk.event_queues import (
GlobalEventWatcher,
GlobalTriggerEventQueue,
PipelineEventWatcher,
PipelineTriggerEventQueue,
)
from zuul.zk.nodepool import ZooKeeperNodepool
COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl']
@ -113,7 +115,6 @@ class Scheduler(threading.Thread):
'repl': self.start_repl,
'norepl': self.stop_repl,
}
self._hibernate = False
self._stopped = False
self._zuul_app = app
@ -143,9 +144,22 @@ class Scheduler(threading.Thread):
)
)
self.trigger_event_queue = queue.Queue()
self.result_event_queue = queue.Queue()
self.management_event_queue = zuul.lib.queue.MergedQueue()
self.global_watcher = GlobalEventWatcher(
self.zk_client, self.wake_event.set
)
self.pipeline_watcher = PipelineEventWatcher(
self.zk_client, self.wake_event.set
)
self.trigger_events = GlobalTriggerEventQueue(
self.zk_client, self.connections
)
self.pipeline_trigger_events = (
PipelineTriggerEventQueue.createRegistry(
self.zk_client, self.connections
)
)
self.abide = model.Abide()
self.unparsed_abide = model.UnparsedAbideConfig()
@ -298,38 +312,23 @@ class Scheduler(threading.Thread):
self.statsd.gauge('zuul.mergers.jobs_running', merge_running)
self.statsd.gauge('zuul.mergers.jobs_queued', merge_queue)
self.statsd.gauge('zuul.scheduler.eventqueues.trigger',
self.trigger_event_queue.qsize())
len(self.trigger_events))
self.statsd.gauge('zuul.scheduler.eventqueues.result',
self.result_event_queue.qsize())
self.statsd.gauge('zuul.scheduler.eventqueues.management',
self.management_event_queue.qsize())
def addEvent(self, event):
# Check the event type and put it in the corresponding queue
if isinstance(event, TriggerEvent):
return self._addTriggerEvent(event)
if isinstance(event, ManagementEvent):
return self._addManagementEvent(event)
if isinstance(event, ResultEvent):
return self._addResultEvent(event)
self.log.warning(
"Unable to found appropriate queue for event %s", event
)
def _addTriggerEvent(self, event):
def addTriggerEvent(self, driver_name, event):
event.arrived_at_scheduler_timestamp = time.time()
self.trigger_event_queue.put(event)
self.trigger_events.put(driver_name, event)
self.wake_event.set()
def _addManagementEvent(self, event):
def addManagementEvent(self, event):
self.management_event_queue.put(event)
self.wake_event.set()
event.wait()
def _addResultEvent(self, event):
def addResultEvent(self, event):
self.result_event_queue.put(event)
self.wake_event.set()
@ -570,17 +569,6 @@ class Scheduler(threading.Thread):
event.wait()
self.log.debug("Enqueue complete")
def exit(self):
self.log.debug("Prepare to exit")
self._hibernate = True
self.wake_event.set()
self.log.debug("Waiting for exit")
def _get_queue_pickle_file(self):
state_dir = get_default(self.config, 'scheduler', 'state_dir',
'/var/lib/zuul', expand_user=True)
return os.path.join(state_dir, 'queue.pickle')
def _get_time_database_dir(self):
state_dir = get_default(self.config, 'scheduler', 'state_dir',
'/var/lib/zuul', expand_user=True)
@ -602,60 +590,6 @@ class Scheduler(threading.Thread):
"current mode is %o" % (key_dir, mode))
return key_dir
def _save_queue(self) -> None:
# TODO JK: Remove when queues in ZK
pickle_file = self._get_queue_pickle_file()
events = []
while not self.trigger_event_queue.empty():
events.append(self.trigger_event_queue.get())
self.log.debug("Queue length is %s" % len(events))
if events:
self.log.debug("Saving queue")
pickle.dump(events, open(pickle_file, 'wb'))
def _load_queue(self) -> None:
# TODO JK: Remove when queues in ZK
pickle_file = self._get_queue_pickle_file()
if os.path.exists(pickle_file):
self.log.debug("Loading queue")
events = pickle.load(open(pickle_file, 'rb'))
self.log.debug("Queue length is %s" % len(events))
for event in events:
self.trigger_event_queue.put(event)
else:
self.log.debug("No queue file found")
def _delete_queue(self) -> None:
# TODO JK: Remove when queues in ZK
pickle_file = self._get_queue_pickle_file()
if os.path.exists(pickle_file):
self.log.debug("Deleting saved queue")
os.unlink(pickle_file)
def wakeUp(self) -> None:
"""
Wakes up scheduler by loading pickled queue.
TODO JK: Remove when queues in ZK
"""
try:
self._load_queue()
except Exception:
self.log.exception("Unable to load queue")
try:
self._delete_queue()
except Exception:
self.log.exception("Unable to delete saved queue")
self.log.debug("Resuming queue processing")
self.wake_event.set()
def _doHibernate(self) -> None:
# TODO JK: Remove when queues in ZK
if self._hibernate:
self.log.debug("Exiting")
self._save_queue()
os._exit(0)
def _checkTenantSourceConf(self, config):
tenant_config = None
script = False
@ -1109,13 +1043,11 @@ class Scheduler(threading.Thread):
not self._stopped):
self.process_result_queue()
if not self._hibernate:
while (not self.trigger_event_queue.empty() and
not self._stopped):
self.process_event_queue()
if not self._stopped:
self.process_global_trigger_queue()
if self._hibernate and self._areAllBuildsComplete():
self._doHibernate()
if not self._stopped:
self.process_trigger_queue()
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
@ -1154,72 +1086,126 @@ class Scheduler(threading.Thread):
"End maintain connection cache for: %s" % connection)
self.log.debug("Connection cache size: %s" % len(relevant))
def process_event_queue(self):
self.log.debug("Fetching trigger event")
event = self.trigger_event_queue.get()
log = get_annotated_logger(self.log, event.zuul_event_id)
log.debug("Processing trigger event %s" % event)
def process_global_trigger_queue(self):
    """Forward every pending global trigger event to all tenants.

    Each event read from ``self.trigger_events`` is offered to every
    tenant in ``self.abide`` via ``_forward_trigger_event`` and is then
    acknowledged, whether or not forwarding succeeded.
    """
    for event in self.trigger_events:
        log = get_annotated_logger(
            self.log, event.zuul_event_id
        )
        log.debug("Forwarding trigger event %s", event)
        try:
            for tenant in self.abide.tenants.values():
                # The event is acknowledged below no matter what, so a
                # failure while forwarding to one tenant must not keep
                # the remaining tenants from receiving it.
                try:
                    self._forward_trigger_event(event, tenant)
                except Exception:
                    log.exception(
                        "Unable to forward event %s to tenant %s",
                        event, tenant.name)
        finally:
            self.trigger_events.ack(event)
def _forward_trigger_event(self, event, tenant):
log = get_annotated_logger(
self.log, event.zuul_event_id
)
_, project = tenant.getProject(
event.canonical_project_name
)
if project is None:
return
try:
full_project_name = ('/'.join([event.project_hostname,
event.project_name]))
for tenant in self.abide.tenants.values():
(trusted, project) = tenant.getProject(full_project_name)
if project is None:
continue
try:
change = project.source.getChange(event)
except exceptions.ChangeNotFound as e:
log.debug("Unable to get change %s from source %s",
e.change, project.source)
continue
reconfigure_tenant = False
if ((event.branch_updated and
hasattr(change, 'files') and
change.updatesConfig(tenant)) or
(event.branch_deleted and
self.abide.hasUnparsedBranchCache(project.canonical_name,
event.branch))):
reconfigure_tenant = True
change = project.source.getChange(event)
except exceptions.ChangeNotFound as e:
log.debug("Unable to get change %s from source %s",
e.change, project.source)
return
# The branch_created attribute is also true when a tag is
# created. Since we load config only from branches only trigger
# a tenant reconfiguration if the branch is set as well.
if event.branch_created and event.branch:
reconfigure_tenant = True
reconfigure_tenant = False
if ((event.branch_updated and
hasattr(change, 'files') and
change.updatesConfig(tenant)) or
(event.branch_deleted and
self.abide.hasUnparsedBranchCache(project.canonical_name,
event.branch))):
reconfigure_tenant = True
# If the driver knows the branch but we don't have a config, we
# also need to reconfigure. This happens if a GitHub branch
# was just configured as protected without a push in between.
if (event.branch in project.source.getProjectBranches(
project, tenant)
and not self.abide.hasUnparsedBranchCache(
project.canonical_name, event.branch)):
reconfigure_tenant = True
# The branch_created attribute is also true when a tag is
# created. Since we load config only from branches only trigger
# a tenant reconfiguration if the branch is set as well.
if event.branch_created and event.branch:
reconfigure_tenant = True
# If the branch is unprotected and unprotected branches
# are excluded from the tenant for that project skip reconfig.
if (reconfigure_tenant and not
event.branch_protected and
tenant.getExcludeUnprotectedBranches(project)):
# If the driver knows the branch but we don't have a config, we
# also need to reconfigure. This happens if a GitHub branch
# was just configured as protected without a push in between.
if (event.branch in project.source.getProjectBranches(
project, tenant)
and not self.abide.hasUnparsedBranchCache(
project.canonical_name, event.branch)):
reconfigure_tenant = True
reconfigure_tenant = False
# If the branch is unprotected and unprotected branches
# are excluded from the tenant for that project skip reconfig.
if (reconfigure_tenant and not
event.branch_protected and
tenant.getExcludeUnprotectedBranches(project)):
if reconfigure_tenant:
# The change that just landed updates the config
# or a branch was just created or deleted. Clear
# out cached data for this project and perform a
# reconfiguration.
self.reconfigureTenant(tenant, change.project, event)
for pipeline in tenant.layout.pipelines.values():
if event.isPatchsetCreated():
pipeline.manager.removeOldVersionsOfChange(
change, event)
elif event.isChangeAbandoned():
pipeline.manager.removeAbandonedChange(change, event)
if pipeline.manager.eventMatches(event, change):
pipeline.manager.addChange(change, event)
finally:
self.trigger_event_queue.task_done()
reconfigure_tenant = False
if reconfigure_tenant:
# The change that just landed updates the config
# or a branch was just created or deleted. Clear
# out cached data for this project and perform a
# reconfiguration.
self.reconfigureTenant(tenant, change.project, event)
for pipeline in tenant.layout.pipelines.values():
if (
pipeline.manager.eventMatches(event, change)
or pipeline.manager.isChangeAlreadyInPipeline(change)
or pipeline.manager.findOldVersionOfChangeAlreadyInQueue(
change
)
):
self.pipeline_trigger_events[tenant.name][
pipeline.name
].put(event.driver_name, event)
def process_trigger_queue(self):
    """Process the queued trigger events of every pipeline.

    Iterates over all pipelines of all tenants and hands each event from
    the matching pipeline trigger queue to ``_process_trigger_event``.
    An event is acknowledged after it has been handled, even when the
    handler raised.  Returns immediately, without touching the current
    event, once ``self._stopped`` is set.
    """
    for tenant in self.abide.tenants.values():
        for pipeline in tenant.layout.pipelines.values():
            pending = self.pipeline_trigger_events[tenant.name][
                pipeline.name]
            for event in pending:
                if self._stopped:
                    return

                event_log = get_annotated_logger(
                    self.log, event.zuul_event_id)
                event_log.debug("Processing trigger event %s", event)
                try:
                    self._process_trigger_event(tenant, pipeline, event)
                finally:
                    # Look the queue up again for the ack, exactly as
                    # the event was looked up for iteration.
                    self.pipeline_trigger_events[tenant.name][
                        pipeline.name
                    ].ack(event)
def _process_trigger_event(self, tenant, pipeline, event):
    """Apply one trigger event to a single pipeline.

    Resolves the event's project within ``tenant`` and the change the
    event refers to, lets the pipeline manager drop old versions of the
    change (patchset created) or the abandoned change, and finally
    enqueues the change if the event matches the pipeline.

    Returns without doing anything when the tenant yields no project for
    the event or when the source raises ``ChangeNotFound``.
    """
    log = get_annotated_logger(self.log, event.zuul_event_id)

    _, project = tenant.getProject(event.canonical_project_name)
    if project is None:
        return

    try:
        change = project.source.getChange(event)
    except exceptions.ChangeNotFound as e:
        log.debug("Unable to get change %s from source %s",
                  e.change, project.source)
        return

    if event.isPatchsetCreated():
        pipeline.manager.removeOldVersionsOfChange(change, event)
    elif event.isChangeAbandoned():
        pipeline.manager.removeAbandonedChange(change, event)

    if pipeline.manager.eventMatches(event, change):
        pipeline.manager.addChange(change, event)
def process_management_queue(self):
self.log.debug("Fetching management event")
@ -1545,14 +1531,8 @@ class Scheduler(threading.Thread):
data['zuul_version'] = self.zuul_version
websocket_url = get_default(self.config, 'web', 'websocket_url', None)
if self._hibernate:
data['message'] = 'Queue only mode: preparing to hibernate,' \
' queue length: %s'\
% self.trigger_event_queue.qsize()
data['trigger_event_queue'] = {}
data['trigger_event_queue']['length'] = \
self.trigger_event_queue.qsize()
data['trigger_event_queue']['length'] = len(self.trigger_events)
data['result_event_queue'] = {}
data['result_event_queue']['length'] = \
self.result_event_queue.qsize()