Use rendezvous elections for scheduler event processing
This will encourage a multi-scheduler system to distribute the connection event processing threads among the different schedulers. Change-Id: Ifffc08b067873505ccb0bccc589229c1e744b3be
This commit is contained in:
@ -2481,7 +2481,7 @@ class ZuulTestCase(BaseTestCase):
|
||||
self.zk_client.client, f"/test/{uuid.uuid4().hex}")
|
||||
|
||||
self.connection_event_queues = DefaultKeyDict(
|
||||
lambda cn: ConnectionEventQueue(self.zk_client, cn)
|
||||
lambda cn: ConnectionEventQueue(self.zk_client, cn, None)
|
||||
)
|
||||
# requires zk client
|
||||
self.setupAllProjectKeys(self.config)
|
||||
|
@ -688,7 +688,8 @@ class TestConnectionEventQueue(EventQueueBaseTestCase):
|
||||
|
||||
def test_connection_events(self):
|
||||
# Test enqueue/dequeue of the connection event queue.
|
||||
queue = event_queues.ConnectionEventQueue(self.zk_client, "dummy")
|
||||
queue = event_queues.ConnectionEventQueue(
|
||||
self.zk_client, "dummy", None)
|
||||
|
||||
self.assertEqual(len(queue), 0)
|
||||
self.assertFalse(queue.hasEvents())
|
||||
@ -713,7 +714,8 @@ class TestConnectionEventQueue(EventQueueBaseTestCase):
|
||||
|
||||
def test_event_watch(self):
|
||||
# Test the registered function is called on new events.
|
||||
queue = event_queues.ConnectionEventQueue(self.zk_client, "dummy")
|
||||
queue = event_queues.ConnectionEventQueue(
|
||||
self.zk_client, "dummy", None)
|
||||
|
||||
event = threading.Event()
|
||||
queue.registerEventWatch(event.set)
|
||||
@ -725,7 +727,8 @@ class TestConnectionEventQueue(EventQueueBaseTestCase):
|
||||
break
|
||||
|
||||
def test_event_offset(self):
|
||||
queue = event_queues.ConnectionEventQueue(self.zk_client, "dummy")
|
||||
queue = event_queues.ConnectionEventQueue(
|
||||
self.zk_client, "dummy", None)
|
||||
self.assertEqual(len(queue), 0)
|
||||
self.assertFalse(queue.hasEvents())
|
||||
|
||||
|
@ -46,7 +46,7 @@ from zuul.zk.change_cache import (
|
||||
ConcurrentUpdateError,
|
||||
)
|
||||
from zuul.zk.config_cache import SystemConfigCache, UnparsedConfigCache
|
||||
from zuul.zk.election import RendezvousElection
|
||||
from zuul.zk.election import SessionAwareElection, RendezvousElection
|
||||
from zuul.zk.exceptions import LockException
|
||||
from zuul.zk.executor import ExecutorApi
|
||||
from zuul.zk.job_request_queue import JobRequestEvent
|
||||
@ -3152,3 +3152,81 @@ class TestRendezvousElection(ZooKeeperBaseTestCase):
|
||||
e2.running = False
|
||||
c2.state = c2.STOPPED
|
||||
t2.join()
|
||||
|
||||
def test_rendezvous_election_upgrade(self):
|
||||
# Test that we can upgrade from a "regular" election to a
|
||||
# rendezvous election.
|
||||
|
||||
# The numbers (e1, e2) mirror the test above, but "1" is the
|
||||
# regular election and "2" is the rendezvous.
|
||||
c2 = SchedulerComponent(self.zk_client, "bar")
|
||||
c2.register()
|
||||
|
||||
e1 = SessionAwareElection(
|
||||
self.zk_client.client,
|
||||
'/test/election/lock',
|
||||
)
|
||||
e2 = RendezvousElection(
|
||||
self.zk_client.client,
|
||||
'/test/election/lock',
|
||||
'scheduler',
|
||||
c2,
|
||||
)
|
||||
|
||||
self.assertEqual(0, len(e2._getScores()))
|
||||
self.assertEqual(None, e2._getWinner())
|
||||
self.assertFalse(e2.is_still_valid())
|
||||
|
||||
# c2 will be the winner since it's the only one running
|
||||
c2.state = c2.RUNNING
|
||||
time.sleep(1)
|
||||
self.assertEqual(1, len(e2._getScores()))
|
||||
self.assertEqual(c2.hostname, e2._getWinner().hostname)
|
||||
self.assertTrue(e2.is_still_valid())
|
||||
|
||||
event1 = threading.Event()
|
||||
event2 = threading.Event()
|
||||
e1._test_stop = False
|
||||
|
||||
def run1():
|
||||
event1.set()
|
||||
while not e1._test_stop:
|
||||
time.sleep(0.1)
|
||||
|
||||
def run2():
|
||||
event2.set()
|
||||
while e2.is_still_valid():
|
||||
time.sleep(0.1)
|
||||
|
||||
t1 = threading.Thread(
|
||||
target=e1.run,
|
||||
args=(run1,),
|
||||
)
|
||||
t1.start()
|
||||
# Wait for the thread to start
|
||||
event1.wait()
|
||||
|
||||
t2 = threading.Thread(
|
||||
target=e2.run,
|
||||
args=(run2,),
|
||||
)
|
||||
t2.start()
|
||||
|
||||
time.sleep(1)
|
||||
# Second component should still be waiting
|
||||
self.assertFalse(event2.is_set())
|
||||
|
||||
# Stop the thread
|
||||
self.log.debug("Stop c1")
|
||||
e1._test_stop = True
|
||||
# Wait for the thread to stop
|
||||
t1.join()
|
||||
|
||||
# Wait for the second election to win
|
||||
event2.wait()
|
||||
|
||||
self.log.debug("Stop c2")
|
||||
# Stop the second election by stopping the second component
|
||||
e2.running = False
|
||||
c2.state = c2.STOPPED
|
||||
t2.join()
|
||||
|
@ -604,6 +604,10 @@ class BaseThreadPoolEventConnector:
|
||||
def start(self):
|
||||
self._event_dispatcher.start()
|
||||
|
||||
def _shouldStop(self):
|
||||
return (self._stopped or
|
||||
not self.event_queue.election.is_still_valid())
|
||||
|
||||
def _onNewEvent(self):
|
||||
self._dispatcher_wake_event.set()
|
||||
# Stop the data watch in case the connector was stopped
|
||||
@ -639,7 +643,7 @@ class BaseThreadPoolEventConnector:
|
||||
# running; if we are stopping, then we need to continue
|
||||
# this loop until previously processed events are
|
||||
# completed but not start processing any new events.
|
||||
if not self._stopped:
|
||||
if not self._shouldStop():
|
||||
delay = self._dispatchEvents()
|
||||
|
||||
# Now process the futures from this or any previous
|
||||
@ -651,7 +655,7 @@ class BaseThreadPoolEventConnector:
|
||||
# new events (or stop altogether); otherwise we need to
|
||||
# continue processing futures.
|
||||
if not len(self._event_forward_queue):
|
||||
if self._stopped:
|
||||
if self._shouldStop():
|
||||
return
|
||||
self._dispatcher_wake_event.wait(delay or 10)
|
||||
else:
|
||||
@ -675,7 +679,7 @@ class BaseThreadPoolEventConnector:
|
||||
event_id_offset = None
|
||||
|
||||
for event in self.event_queue.iter(event_id_offset):
|
||||
if self._stopped:
|
||||
if self._shouldStop():
|
||||
break
|
||||
|
||||
processor = self._getEventProcessor(event)
|
||||
|
@ -540,7 +540,7 @@ class GerritEventConnector(BaseThreadPoolEventConnector):
|
||||
|
||||
delay = None
|
||||
for event in self.event_queue.iter(event_id_offset):
|
||||
if self._stopped:
|
||||
if self._shouldStop():
|
||||
break
|
||||
|
||||
self._peek_queue.append(event)
|
||||
@ -2084,8 +2084,12 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
component_registry)
|
||||
|
||||
self.log.info("Creating Zookeeper event queue")
|
||||
if self.sched:
|
||||
component_info = self.sched.component_info
|
||||
else:
|
||||
component_info = None
|
||||
self.event_queue = ConnectionEventQueue(
|
||||
zk_client, self.connection_name)
|
||||
zk_client, self.connection_name, component_info)
|
||||
|
||||
# If the connection was not loaded by a scheduler, but by e.g.
|
||||
# zuul-web, we want to stop here.
|
||||
|
@ -1304,9 +1304,12 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
self._branch_cache = BranchCache(zk_client, self, component_registry)
|
||||
|
||||
self.log.debug('Creating Zookeeper event queue')
|
||||
if self.sched:
|
||||
component_info = self.sched.component_info
|
||||
else:
|
||||
component_info = None
|
||||
self.event_queue = ConnectionEventQueue(
|
||||
zk_client, self.connection_name
|
||||
)
|
||||
zk_client, self.connection_name, component_info)
|
||||
|
||||
# If the connection was not loaded by a scheduler, but by e.g.
|
||||
# zuul-web, we want to stop here.
|
||||
@ -2554,7 +2557,8 @@ class GithubWebController(BaseWebController):
|
||||
self.zuul_web = zuul_web
|
||||
self.event_queue = ConnectionEventQueue(
|
||||
self.zuul_web.zk_client,
|
||||
self.connection.connection_name
|
||||
self.connection.connection_name,
|
||||
None
|
||||
)
|
||||
self.token = self.connection.connection_config.get('webhook_token')
|
||||
|
||||
|
@ -567,9 +567,12 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
self._branch_cache = BranchCache(zk_client, self, component_registry)
|
||||
|
||||
self.log.info('Creating Zookeeper event queue')
|
||||
if self.sched:
|
||||
component_info = self.sched.component_info
|
||||
else:
|
||||
component_info = None
|
||||
self.event_queue = ConnectionEventQueue(
|
||||
zk_client, self.connection_name
|
||||
)
|
||||
zk_client, self.connection_name, component_info)
|
||||
|
||||
# If the connection was not loaded by a scheduler, but by e.g.
|
||||
# zuul-web, we want to stop here.
|
||||
@ -884,7 +887,8 @@ class GitlabWebController(BaseWebController):
|
||||
self.zuul_web = zuul_web
|
||||
self.event_queue = ConnectionEventQueue(
|
||||
self.zuul_web.zk_client,
|
||||
self.connection.connection_name
|
||||
self.connection.connection_name,
|
||||
None
|
||||
)
|
||||
|
||||
def _validate_token(self, headers):
|
||||
|
@ -514,9 +514,12 @@ class PagureConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
self._branch_cache = BranchCache(zk_client, self, component_registry)
|
||||
|
||||
self.log.info('Creating Zookeeper event queue')
|
||||
if self.sched:
|
||||
component_info = self.sched.component_info
|
||||
else:
|
||||
component_info = None
|
||||
self.event_queue = ConnectionEventQueue(
|
||||
zk_client, self.connection_name
|
||||
)
|
||||
zk_client, self.connection_name, component_info)
|
||||
|
||||
# If the connection was not loaded by a scheduler, but by e.g.
|
||||
# zuul-web, we want to stop here.
|
||||
@ -849,7 +852,8 @@ class PagureWebController(BaseWebController):
|
||||
self.zuul_web = zuul_web
|
||||
self.event_queue = ConnectionEventQueue(
|
||||
self.zuul_web.zk_client,
|
||||
self.connection.connection_name
|
||||
self.connection.connection_name,
|
||||
None
|
||||
)
|
||||
|
||||
def _source_whitelisted(self, remote_ip, forwarded_ip):
|
||||
|
@ -60,16 +60,18 @@ class RendezvousElection:
|
||||
self.client = client
|
||||
self.path = path
|
||||
self.lock = SessionAwareLock(self.client, self.path)
|
||||
# Whether we are running at all
|
||||
self.running = True
|
||||
# Whether we are the current winner of the rendezvous hash
|
||||
self.is_winner = False
|
||||
self.is_winner = None
|
||||
self.component_change_event = threading.Event()
|
||||
self._checkWinner()
|
||||
COMPONENT_REGISTRY.registry.registerCallback(self._onComponentChange)
|
||||
|
||||
# Similar to the Election API
|
||||
def run(self, func, *args, **kw):
|
||||
# Allow the cancel method to stop this loop
|
||||
# Handle a restart
|
||||
self.running = True
|
||||
if self.is_winner is None:
|
||||
self._checkWinner()
|
||||
while self.running:
|
||||
if not self.is_winner:
|
||||
self.log.debug("Did not win election for %s", self.path)
|
||||
@ -89,7 +91,7 @@ class RendezvousElection:
|
||||
# Similar to the Election API
|
||||
def cancel(self):
|
||||
self.running = False
|
||||
self.is_winner = False
|
||||
self.is_winner = None
|
||||
self.lock.cancel()
|
||||
self.component_change_event.set()
|
||||
|
||||
|
@ -31,7 +31,7 @@ from zuul import model
|
||||
from zuul.lib.collections import DefaultKeyDict
|
||||
from zuul.lib.logutil import get_annotated_logger
|
||||
from zuul.zk import ZooKeeperSimpleBase, sharding
|
||||
from zuul.zk.election import SessionAwareElection
|
||||
from zuul.zk.election import SessionAwareElection, RendezvousElection
|
||||
|
||||
RESULT_EVENT_TYPE_MAP = {
|
||||
"BuildCompletedEvent": model.BuildCompletedEvent,
|
||||
@ -910,15 +910,17 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
|
||||
|
||||
log = logging.getLogger("zuul.ConnectionEventQueue")
|
||||
|
||||
def __init__(self, client, connection_name):
|
||||
def __init__(self, client, connection_name, component_info):
|
||||
queue_root = "/".join((CONNECTION_ROOT, connection_name, "events"))
|
||||
super().__init__(client, queue_root)
|
||||
self.election_root = "/".join(
|
||||
(CONNECTION_ROOT, connection_name, "election")
|
||||
)
|
||||
self.kazoo_client.ensure_path(self.election_root)
|
||||
self.election = SessionAwareElection(
|
||||
self.kazoo_client, self.election_root)
|
||||
if component_info:
|
||||
self.election = RendezvousElection(
|
||||
self.kazoo_client, self.election_root,
|
||||
"scheduler", component_info)
|
||||
|
||||
def _eventWatch(self, callback, event_list):
|
||||
if event_list:
|
||||
|
Reference in New Issue
Block a user