diff --git a/tests/fixtures/config/two-tenant/git/common-config/playbooks/run.yaml b/tests/fixtures/config/two-tenant/git/common-config/playbooks/run.yaml new file mode 100644 index 0000000000..f679dceaef --- /dev/null +++ b/tests/fixtures/config/two-tenant/git/common-config/playbooks/run.yaml @@ -0,0 +1,2 @@ +- hosts: all + tasks: [] diff --git a/tests/fixtures/config/two-tenant/git/common-config/zuul.yaml b/tests/fixtures/config/two-tenant/git/common-config/zuul.yaml new file mode 100644 index 0000000000..10df157dfb --- /dev/null +++ b/tests/fixtures/config/two-tenant/git/common-config/zuul.yaml @@ -0,0 +1,24 @@ +- pipeline: + name: check + manager: independent + trigger: + gerrit: + - event: patchset-created + success: + gerrit: + Verified: 1 + failure: + gerrit: + Verified: -1 + +- job: + name: base + parent: null + +- job: + name: test + nodeset: + nodes: + - name: controller + label: ubuntu-trusty + run: playbooks/run.yaml diff --git a/tests/fixtures/config/two-tenant/git/org_project1/README b/tests/fixtures/config/two-tenant/git/org_project1/README new file mode 100644 index 0000000000..9daeafb986 --- /dev/null +++ b/tests/fixtures/config/two-tenant/git/org_project1/README @@ -0,0 +1 @@ +test diff --git a/tests/fixtures/config/two-tenant/git/org_project1/zuul.yaml b/tests/fixtures/config/two-tenant/git/org_project1/zuul.yaml new file mode 100644 index 0000000000..d817cedede --- /dev/null +++ b/tests/fixtures/config/two-tenant/git/org_project1/zuul.yaml @@ -0,0 +1,4 @@ +- project: + check: + jobs: + - test diff --git a/tests/fixtures/config/two-tenant/git/org_project2/README b/tests/fixtures/config/two-tenant/git/org_project2/README new file mode 100644 index 0000000000..9daeafb986 --- /dev/null +++ b/tests/fixtures/config/two-tenant/git/org_project2/README @@ -0,0 +1 @@ +test diff --git a/tests/fixtures/config/two-tenant/git/org_project2/zuul.yaml b/tests/fixtures/config/two-tenant/git/org_project2/zuul.yaml new file mode 100644 index 0000000000..d817cedede --- /dev/null +++ b/tests/fixtures/config/two-tenant/git/org_project2/zuul.yaml @@ -0,0 +1,4 @@ +- project: + check: + jobs: + - test diff --git a/tests/fixtures/config/two-tenant/main.yaml b/tests/fixtures/config/two-tenant/main.yaml new file mode 100644 index 0000000000..101e810ec5 --- /dev/null +++ b/tests/fixtures/config/two-tenant/main.yaml @@ -0,0 +1,17 @@ +- tenant: + name: tenant-one + source: + gerrit: + config-projects: + - common-config + untrusted-projects: + - org/project1 + +- tenant: + name: tenant-two + source: + gerrit: + config-projects: + - common-config + untrusted-projects: + - org/project2 diff --git a/tests/unit/test_sos.py b/tests/unit/test_sos.py index e6ec80c2dc..bc464458c4 100644 --- a/tests/unit/test_sos.py +++ b/tests/unit/test_sos.py @@ -16,6 +16,7 @@ import zuul.model from tests.base import iterate_timeout, ZuulTestCase, simple_layout +from zuul.zk.locks import SessionAwareWriteLock, TENANT_LOCK_ROOT class TestScaleOutScheduler(ZuulTestCase): @@ -322,3 +323,123 @@ class TestSOSCircularDependencies(ZuulTestCase): self.waitUntilSettled(matcher=[app]) self.assertEqual(A.reported, 2) self.assertEqual(B.reported, 2) + + +class TestScaleOutSchedulerMultiTenant(ZuulTestCase): + # Those tests are testing specific interactions between multiple + # schedulers. They create additional schedulers as necessary and + # start or stop them individually to test specific interactions. + # Using the scheduler_count in addition to create even more + # schedulers doesn't make sense for those tests. + scheduler_count = 1 + tenant_config_file = "config/two-tenant/main.yaml" + + def test_background_layout_update(self): + # This test performs a reconfiguration on one scheduler and + # verifies that a second scheduler begins processing changes + # for each tenant as it is updated. + + first = self.scheds.first + # Create a second scheduler instance + second = self.createScheduler() + second.start() + self.assertEqual(len(self.scheds), 2) + tenant_one_lock = SessionAwareWriteLock( + self.zk_client.client, + f"{TENANT_LOCK_ROOT}/tenant-one") + + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B') + + for _ in iterate_timeout(10, "until priming is complete"): + state_one = first.sched.local_layout_state.get("tenant-one") + state_two = first.sched.local_layout_state.get("tenant-two") + if all([state_one, state_two]): + break + + for _ in iterate_timeout( + 10, "all schedulers to have the same layout state"): + if (second.sched.local_layout_state.get( + "tenant-one") == state_one and + second.sched.local_layout_state.get( + "tenant-two") == state_two): + break + + self.log.debug("Freeze scheduler-1") + with second.sched.layout_update_lock: + state_one = first.sched.local_layout_state.get("tenant-one") + state_two = first.sched.local_layout_state.get("tenant-two") + self.log.debug("Reconfigure scheduler-0") + first.sched.reconfigure(first.config) + for _ in iterate_timeout( + 10, "tenants to be updated on scheduler-0"): + if ((first.sched.local_layout_state["tenant-one"] != + state_one) and + (first.sched.local_layout_state["tenant-two"] != + state_two)): + break + self.waitUntilSettled(matcher=[first]) + self.log.debug("Grab tenant-one write lock") + tenant_one_lock.acquire(blocking=True) + + self.log.debug("Thaw scheduler-1") + self.log.debug("Freeze scheduler-0") + with first.sched.run_handler_lock: + self.log.debug("Open change in tenant-one") + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + + for _ in iterate_timeout(10, "trigger event appears"): + if second.sched.trigger_events['tenant-one'].hasEvents(): + break + + for _ in iterate_timeout( + 10, "tenant-two to be updated on scheduler-1"): + if (first.sched.local_layout_state["tenant-two"] == + second.sched.local_layout_state.get("tenant-two")): + break + # Tenant two should be up to date, but tenant one should + # still be out of date on scheduler two. + self.assertEqual(first.sched.local_layout_state["tenant-two"], + second.sched.local_layout_state["tenant-two"]) + self.assertNotEqual(first.sched.local_layout_state["tenant-one"], + second.sched.local_layout_state["tenant-one"]) + self.log.debug("Verify tenant-one change is unprocessed") + # If we have updated tenant-two's configuration without + # processing the tenant-one change, then we know we've + # completed at least one run loop. + self.assertHistory([]) + + self.log.debug("Open change in tenant-two") + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.log.debug("Wait for scheduler-1 to process tenant-two change") + for _ in iterate_timeout(10, "tenant-two build finish"): + if len(self.history): + break + + self.assertHistory([ + dict(name='test', result='SUCCESS', changes='2,1'), + ], ordered=False) + + # Tenant two should be up to date, but tenant one should + # still be out of date on scheduler two. + self.assertEqual(first.sched.local_layout_state["tenant-two"], + second.sched.local_layout_state["tenant-two"]) + self.assertNotEqual(first.sched.local_layout_state["tenant-one"], + second.sched.local_layout_state["tenant-one"]) + + self.log.debug("Release tenant-one write lock") + tenant_one_lock.release() + + self.log.debug("Wait for both changes to be processed") + self.waitUntilSettled(matcher=[second]) + self.assertHistory([ + dict(name='test', result='SUCCESS', changes='2,1'), + dict(name='test', result='SUCCESS', changes='1,1'), + ], ordered=False) + + # Both tenants should be up to date + self.assertEqual(first.sched.local_layout_state["tenant-two"], + second.sched.local_layout_state["tenant-two"]) + self.assertEqual(first.sched.local_layout_state["tenant-one"], + second.sched.local_layout_state["tenant-one"]) + self.waitUntilSettled() diff --git a/zuul/scheduler.py b/zuul/scheduler.py index f6e2a9acad..811943af15 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -151,8 +151,15 @@ class Scheduler(threading.Thread): self.daemon = True self.hostname = socket.getfqdn() self.primed_event = threading.Event() + # Wake up the main run loop self.wake_event = threading.Event() + # Wake up the update loop + self.layout_update_event = threading.Event() + # Only used by tests in order to quiesce the layout update loop + self.layout_update_lock = threading.Lock() + # Don't change the abide without holding this lock self.layout_lock = threading.Lock() + # Only used by tests in order to quiesce the main run loop self.run_handler_lock = threading.Lock() self.command_map = { 'stop': self.stop, @@ -238,8 +245,9 @@ class Scheduler(threading.Thread): self.abide = Abide() self.unparsed_abide = UnparsedAbideConfig() - self.tenant_layout_state = LayoutStateStore(self.zk_client, - self.wake_event.set) + self.tenant_layout_state = LayoutStateStore( + self.zk_client, + self.layout_update_event.set) self.local_layout_state = {} command_socket = get_default( @@ -289,6 +297,12 @@ class Scheduler(threading.Thread): self.start_cleanup_thread.start() self.component_info.state = self.component_info.INITIALIZING + # Start a thread to perform background tenant layout updates + self.layout_update_thread = threading.Thread( + target=self.runTenantLayoutUpdates, name='layout updates') + self.layout_update_thread.daemon = True + self.layout_update_thread.start() + def stop(self): self.log.debug("Stopping scheduler") self._stopped = True @@ -312,6 +326,9 @@ class Scheduler(threading.Thread): self.log.debug("Stopping stats thread") self.stats_election.cancel() self.stats_thread.join() + self.log.debug("Waiting for layout update thread") + self.layout_update_event.set() + self.layout_update_thread.join() self.log.debug("Stopping RPC thread") self.rpc.stop() self.rpc.join() @@ -1010,8 +1027,45 @@ class Scheduler(threading.Thread): except KeyError: raise RuntimeError("No key store password configured!") - def updateTenantLayout(self, tenant_name): - self.log.debug("Updating layout of tenant %s", tenant_name) + def runTenantLayoutUpdates(self): + log = logging.getLogger("zuul.Scheduler.LayoutUpdate") + # Only run this after config priming is complete + self.primed_event.wait() + while not self._stopped: + self.layout_update_event.wait() + self.layout_update_event.clear() + if self._stopped: + break + with self.layout_update_lock: + for tenant_name in list(self.unparsed_abide.tenants): + if self._stopped: + break + try: + with tenant_read_lock(self.zk_client, tenant_name, + blocking=False): + if (self.tenant_layout_state[tenant_name] + > self.local_layout_state[tenant_name]): + log.debug( + "Local layout of tenant %s not up to date", + tenant_name) + self.updateTenantLayout(log, tenant_name) + # Wake up the main thread to process any + # events for this tenant. + self.wake_event.set() + except LockException: + log.debug( + "Skipping layout update of locked tenant %s", + tenant_name) + self.layout_update_event.set() + except Exception: + log.exception("Error updating layout of tenant %s", + tenant_name) + self.layout_update_event.set() + # In case something is locked, don't busy-loop. + time.sleep(0.1) + + def updateTenantLayout(self, log, tenant_name): + log.debug("Updating layout of tenant %s", tenant_name) if self.unparsed_abide.ltime < self.system_config_cache.ltime: self.updateSystemConfig() @@ -1021,7 +1075,7 @@ class Scheduler(threading.Thread): self.connections, self.zk_client, self.globals, self.statsd, self, self.merger, self.keystore) with self.layout_lock: - self.log.debug("Updating local layout of tenant %s ", tenant_name) + log.debug("Updating local layout of tenant %s ", tenant_name) layout_state = self.tenant_layout_state.get(tenant_name) layout_uuid = layout_state and layout_state.uuid if layout_state: @@ -1650,11 +1704,12 @@ class Scheduler(threading.Thread): self.zk_client, tenant_name, blocking=False ): if (self.tenant_layout_state[tenant_name] - > self.local_layout_state[tenant_name]): + > self.local_layout_state[tenant_name]): self.log.debug( "Local layout of tenant %s not up to date", tenant.name) - self.updateTenantLayout(tenant_name) + self.layout_update_event.set() + continue # Get tenant again, as it might have been updated # by a tenant reconfig or layout change.