Strictly sequence reconfiguration events
In the before times when we only had a single scheduler, it was naturally the case that reconfiguration events were processed as they were encountered and no trigger events which arrived after them would be processed until the reconfiguration was complete. As we added more event queues to support SOS, it became possible for trigger events which arrived at the scheduler to be processed before a tenant reconfiguration caused by a preceding event to be complete. This is now even possible with a single scheduler. As a concrete example, imagine a change merges which updates the jobs which should run on a tag, and then a tag is created. A scheduler will process both of those events in succession. The first will cause it to submit a tenant reconfiguration event, and then forward the trigger event to any matching pipelines. The second event will also be forwarded to pipeline event queues. The pipeline events will then be processed, and then only at that point will the scheduler return to the start of the run loop and process the reconfiguration event. To correct this, we can take one of two approaches: make the reconfiguration more synchronous, or make it safer to be asynchronous. To make reconfiguration more synchronous, we would need to be able to upgrade a tenant read lock into a tenant write lock without releasing it. The lock recipes we use from kazoo do not support this. While it would be possible to extend them to do so, it would lead us further from parity with the upstream kazoo recipes, so this aproach is not used. Instead, we will make it safer for reconfiguration to be asynchronous by annotating every trigger event we forward with the last reconfiguration event that was seen before it. This means that every trigger event now specifies the minimum reconfiguration time for that event. If our local scheduler has not reached that time, we should stop processing trigger events and wait for it to catch up. This means that schedulers may continue to process events up to the point of a reconfiguration, but will then stop. The already existing short-circuit to abort processing once a scheduler is ready to reconfigure a tenant (where we check the tenant write lock contenders for a waiting reconfiguration) helps us get out of the way of pending reconfigurations as well. In short, once a reconfiguration is ready to start, we won't start processing tenant events anymore because of the existing lock check. And up until that happens, we will process as many events as possible until any further events require the reconfiguration. We will use the ltime of the tenant trigger event as our timestamp. As we forward tenant trigger events to the pipeline trigger event queues, we decide whether an event should cause a reconfiguration. Whenever one does, we note the ltime of that event and store it as metadata on the tenant trigger event queue so that we always know what the most recent required minimum ltime is (ie, the ltime of the most recently seen event that should cause a reconfiguration). Every event that we forward to the pipeline trigger queue will be annotated to specify that its minimum required reconfiguration ltime is that most recently seen ltime. And each time we reconfigure a tenant, we store the ltime of the event that prompted the reconfiguration in the layout state. If we later process a pipeline trigger event with a minimum required reconfigure ltime greater than the current one, we know we need to stop and wait for a reconfiguration, so we abort early. Because this system involves several event queues and objects each of which may be serialized at any point during a rolling upgrade, every involved object needs to have appropriate default value handling, and a synchronized model api change is not helpful. The remainder of this commit message is a description of what happens with each object when handled by either an old or new scheduler component during a rolling upgrade. When forwarding a trigger event and submitting a tenant reconfiguration event: The tenant trigger event zuul_event_ltime is initialized from zk, so will always have a value. The pipeline management event trigger_event_ltime is initialzed to the tenant trigger event zuul_event_ltime, so a new scheduler will write out the value. If an old scheduler creates the tenant reconfiguration event, it will be missing the trigger_event_ltime. The _reconfigureTenant method is called with a last_reconfigure_event_ltime parameter, which is either the trigger_event_ltime above in the case of a tenant reconfiguration event forwarded by a new scheduler, or -1 in all other cases (including other types of reconfiguration, or a tenant reconfiguration event forwarded by an old scheduler). If it is -1, it will use the current ltime so that if we process an event from an old scheduler which is missing the event ltime, or we are bootstrapping a tenant or otherwise reconfiguring in a context where we don't have a triggering event ltime, we will use an ltime which is very new so that we don't defer processing trigger events. We also ensure we never go backward, so that if we process an event from an old scheduler (and thus use the current ltime) then process an event from a new scheduler with an older (than "now") ltime, we retain the newer ltime. Each time a tenant reconfiguration event is submitted, the ltime of that reconfiguration event is stored on the trigger event queue. This is then used as the min_reconfigure_ltime attribute on the forwarded trigger events. This is updated by new schedulers, and ignored by old ones, so if an old scheduler process a tenant trigger event queue it won't update the min ltime. That will just mean that any events processed by a new scheduler may continue to use an older ltime as their minimum, which should not cause a problem. Any events forwarded by an old scheduler will omit the min_reconfigure_ltime field; that field will be initialized to -1 when loaded on a new scheduler. When processing pipeline trigger events: In process_pipeline_trigger_queue we compare two values: the last_reconfigure_event_ltime on the layout state which is either set to a value as above (by a new scheduler), or will be -1 if it was last written by an old scheduler (including in the case it was overwritten by an old scheduler; it will re-initialize to -1 in that case). The event.min_reconfigure_ltime field will either be the most recent reconfiguration ltime seen by a new scheduler forwarding trigger events, or -1 otherwise. If the min_reconfigure_ltime of an event is -1, we retain the old behavior of processing the event regardless. Only if we have a min_reconfigure_ltime > -1 and it is greater than the layout state last_reconfigure_event_ltime (which itself may be -1, and thus less than the min_reconfigure_ltime) do we abort processing the event. (The test_config_update test for the Gerrit checks plugin is updated to include an extra waitUntilSettled since a potential test race was observed during development.) Change-Id: Icb6a7858591ab867e7006c7c80bfffeb582b28ee
This commit is contained in:
75
tests/fixtures/layouts/trigger-sequence.yaml
vendored
Normal file
75
tests/fixtures/layouts/trigger-sequence.yaml
vendored
Normal file
@@ -0,0 +1,75 @@
|
||||
- pipeline:
|
||||
name: check
|
||||
manager: independent
|
||||
trigger:
|
||||
gerrit:
|
||||
- event: patchset-created
|
||||
success:
|
||||
gerrit:
|
||||
Verified: 1
|
||||
failure:
|
||||
gerrit:
|
||||
Verified: -1
|
||||
|
||||
- pipeline:
|
||||
name: gate
|
||||
manager: dependent
|
||||
success-message: Build succeeded (gate).
|
||||
trigger:
|
||||
gerrit:
|
||||
- event: comment-added
|
||||
approval:
|
||||
- Approved: 1
|
||||
success:
|
||||
gerrit:
|
||||
Verified: 2
|
||||
submit: true
|
||||
failure:
|
||||
gerrit:
|
||||
Verified: -2
|
||||
start:
|
||||
gerrit:
|
||||
Verified: 0
|
||||
precedence: high
|
||||
|
||||
- pipeline:
|
||||
name: post
|
||||
manager: independent
|
||||
trigger:
|
||||
gerrit:
|
||||
- event: ref-updated
|
||||
ref: ^(?!refs/).*$
|
||||
|
||||
- pipeline:
|
||||
name: tag
|
||||
manager: independent
|
||||
trigger:
|
||||
gerrit:
|
||||
- event: ref-updated
|
||||
ref: ^refs/tags/.*$
|
||||
|
||||
- job:
|
||||
name: base
|
||||
parent: null
|
||||
run: playbooks/base.yaml
|
||||
nodeset:
|
||||
nodes:
|
||||
- label: ubuntu-xenial
|
||||
name: controller
|
||||
|
||||
- job:
|
||||
name: check-job
|
||||
run: playbooks/check.yaml
|
||||
|
||||
- job:
|
||||
name: post-job
|
||||
run: playbooks/post.yaml
|
||||
|
||||
- project:
|
||||
name: org/project
|
||||
check:
|
||||
jobs:
|
||||
- check-job
|
||||
gate:
|
||||
jobs:
|
||||
- check-job
|
||||
@@ -699,6 +699,7 @@ class TestPolling(ZuulTestCase):
|
||||
files=file_dict)
|
||||
A.setMerged()
|
||||
self.waitForPoll('gerrit')
|
||||
self.waitUntilSettled()
|
||||
|
||||
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
||||
B.setCheck('zuul:check', reset=True)
|
||||
|
||||
@@ -38,7 +38,7 @@ from tests.base import (AnsibleZuulTestCase, BaseTestCase,
|
||||
simple_layout, random_sha1)
|
||||
from tests.base import ZuulWebFixture
|
||||
|
||||
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {})
|
||||
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
|
||||
|
||||
|
||||
class TestGithubDriver(ZuulTestCase):
|
||||
|
||||
@@ -28,7 +28,7 @@ from tests.base import ZuulTestCase, ZuulWebFixture
|
||||
|
||||
from testtools.matchers import MatchesRegex
|
||||
|
||||
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {})
|
||||
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
|
||||
|
||||
|
||||
class TestGitlabWebhook(ZuulTestCase):
|
||||
|
||||
@@ -26,7 +26,7 @@ from zuul.zk.layout import LayoutState
|
||||
from tests.base import ZuulTestCase, simple_layout
|
||||
from tests.base import ZuulWebFixture
|
||||
|
||||
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {})
|
||||
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
|
||||
|
||||
|
||||
class TestPagureDriver(ZuulTestCase):
|
||||
|
||||
@@ -50,8 +50,9 @@ from tests.base import (
|
||||
)
|
||||
from zuul.zk.change_cache import ChangeKey
|
||||
from zuul.zk.layout import LayoutState
|
||||
from zuul.zk.locks import management_queue_lock
|
||||
|
||||
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {})
|
||||
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
|
||||
|
||||
|
||||
class TestSchedulerSSL(SSLZuulTestCase):
|
||||
@@ -4214,6 +4215,56 @@ class TestScheduler(ZuulTestCase):
|
||||
dict(name='check-job', result='SUCCESS', changes='1,1'),
|
||||
])
|
||||
|
||||
@simple_layout('layouts/trigger-sequence.yaml')
|
||||
def test_live_reconfiguration_trigger_sequence(self):
|
||||
# Test that events arriving after an event that triggers a
|
||||
# reconfiguration are handled after the reconfiguration
|
||||
# completes.
|
||||
|
||||
in_repo_conf = "[{project: {tag: {jobs: [post-job]}}}]"
|
||||
file_dict = {'zuul.yaml': in_repo_conf}
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
|
||||
files=file_dict)
|
||||
sched = self.scheds.first.sched
|
||||
# Hold the management queue so that we don't process any
|
||||
# reconfiguration events yet.
|
||||
with management_queue_lock(
|
||||
self.zk_client, 'tenant-one', blocking=False
|
||||
):
|
||||
with sched.run_handler_lock:
|
||||
A.setMerged()
|
||||
# Submit two events while no processing is happening:
|
||||
# A change merged event that will trigger a reconfiguration
|
||||
self.fake_gerrit.addEvent(A.getChangeMergedEvent())
|
||||
|
||||
# And a tag event which should only run a job after
|
||||
# the config change above is in effect.
|
||||
event = self.fake_gerrit.addFakeTag(
|
||||
'org/project', 'master', 'foo')
|
||||
self.fake_gerrit.addEvent(event)
|
||||
|
||||
# Wait for the tenant trigger queue to empty out, and for
|
||||
# us to have a tenant management as well as a pipeline
|
||||
# trigger event. At this point, we should be deferring
|
||||
# the trigger event until the management event is handled.
|
||||
for _ in iterate_timeout(60, 'queues'):
|
||||
with sched.run_handler_lock:
|
||||
if sched.trigger_events['tenant-one'].hasEvents():
|
||||
continue
|
||||
if not sched.pipeline_trigger_events[
|
||||
'tenant-one']['tag'].hasEvents():
|
||||
continue
|
||||
if not sched.management_events['tenant-one'].hasEvents():
|
||||
continue
|
||||
break
|
||||
|
||||
# Now we can resume and process the reconfiguration event
|
||||
sched.wake_event.set()
|
||||
self.waitUntilSettled()
|
||||
self.assertHistory([
|
||||
dict(name='post-job', result='SUCCESS'),
|
||||
])
|
||||
|
||||
@simple_layout('layouts/repo-deleted.yaml')
|
||||
def test_repo_deleted(self):
|
||||
self.init_repo("org/delete-project")
|
||||
|
||||
@@ -1290,7 +1290,7 @@ class TestLayoutStore(ZooKeeperBaseTestCase):
|
||||
"github": 456,
|
||||
}
|
||||
state = LayoutState("tenant", "hostname", 0, layout_uuid,
|
||||
branch_cache_min_ltimes)
|
||||
branch_cache_min_ltimes, -1)
|
||||
store["tenant"] = state
|
||||
self.assertEqual(state, store["tenant"])
|
||||
self.assertNotEqual(state.ltime, -1)
|
||||
@@ -1301,9 +1301,9 @@ class TestLayoutStore(ZooKeeperBaseTestCase):
|
||||
def test_ordering(self):
|
||||
layout_uuid = uuid.uuid4().hex
|
||||
state_one = LayoutState("tenant", "hostname", 1, layout_uuid,
|
||||
{}, ltime=1)
|
||||
{}, -1, ltime=1)
|
||||
state_two = LayoutState("tenant", "hostname", 2, layout_uuid,
|
||||
{}, ltime=2)
|
||||
{}, -1, ltime=2)
|
||||
|
||||
self.assertGreater(state_two, state_one)
|
||||
|
||||
@@ -1312,9 +1312,9 @@ class TestLayoutStore(ZooKeeperBaseTestCase):
|
||||
min_ltimes = defaultdict(lambda x: -1)
|
||||
min_ltimes['foo'] = 1
|
||||
state_one = LayoutState("tenant", "hostname", 1, uuid.uuid4().hex,
|
||||
{}, ltime=1)
|
||||
{}, -1, ltime=1)
|
||||
state_two = LayoutState("tenant", "hostname", 2, uuid.uuid4().hex,
|
||||
{}, ltime=2)
|
||||
{}, -1, ltime=2)
|
||||
store.setMinLtimes(state_one, min_ltimes)
|
||||
store.setMinLtimes(state_two, min_ltimes)
|
||||
store['tenant'] = state_one
|
||||
|
||||
@@ -1044,6 +1044,7 @@ class Client(zuul.cmd.ZuulApp):
|
||||
tenant_name=args.tenant,
|
||||
hostname='admin command',
|
||||
last_reconfigured=int(time.time()),
|
||||
last_reconfigure_event_ltime=-1,
|
||||
uuid=uuid4().hex,
|
||||
branch_cache_min_ltimes={},
|
||||
ltime=ps._zstat.last_modified_transaction_id,
|
||||
|
||||
@@ -5910,6 +5910,7 @@ class TenantReconfigureEvent(ManagementEvent):
|
||||
self.tenant_name = tenant_name
|
||||
self.project_branches = set([(project_name, branch_name)])
|
||||
self.branch_cache_ltimes = {}
|
||||
self.trigger_event_ltime = -1
|
||||
self.merged_events = []
|
||||
|
||||
def __ne__(self, other):
|
||||
@@ -5931,6 +5932,8 @@ class TenantReconfigureEvent(ManagementEvent):
|
||||
self.branch_cache_ltimes.get(connection_name, ltime), ltime)
|
||||
self.zuul_event_ltime = max(self.zuul_event_ltime,
|
||||
other.zuul_event_ltime)
|
||||
self.trigger_event_ltime = max(self.trigger_event_ltime,
|
||||
other.trigger_event_ltime)
|
||||
self.merged_events.append(other)
|
||||
|
||||
def toDict(self):
|
||||
@@ -5938,6 +5941,7 @@ class TenantReconfigureEvent(ManagementEvent):
|
||||
d["tenant_name"] = self.tenant_name
|
||||
d["project_branches"] = list(self.project_branches)
|
||||
d["branch_cache_ltimes"] = self.branch_cache_ltimes
|
||||
d["trigger_event_ltime"] = self.trigger_event_ltime
|
||||
return d
|
||||
|
||||
@classmethod
|
||||
@@ -5953,6 +5957,7 @@ class TenantReconfigureEvent(ManagementEvent):
|
||||
tuple(pb) for pb in data["project_branches"]
|
||||
)
|
||||
event.branch_cache_ltimes = data.get("branch_cache_ltimes", {})
|
||||
event.trigger_event_ltime = data.get("trigger_event_ltime", -1)
|
||||
return event
|
||||
|
||||
|
||||
@@ -6289,6 +6294,9 @@ class TriggerEvent(AbstractEvent):
|
||||
self.branch_deleted = False
|
||||
self.branch_protected = True
|
||||
self.ref = None
|
||||
# For reconfiguration sequencing
|
||||
self.min_reconfigure_ltime = -1
|
||||
self.zuul_event_ltime = None
|
||||
# For management events (eg: enqueue / promote)
|
||||
self.tenant_name = None
|
||||
self.project_hostname = None
|
||||
@@ -6326,6 +6334,8 @@ class TriggerEvent(AbstractEvent):
|
||||
"branch_deleted": self.branch_deleted,
|
||||
"branch_protected": self.branch_protected,
|
||||
"ref": self.ref,
|
||||
"min_reconfigure_ltime": self.min_reconfigure_ltime,
|
||||
"zuul_event_ltime": self.zuul_event_ltime,
|
||||
"tenant_name": self.tenant_name,
|
||||
"project_hostname": self.project_hostname,
|
||||
"project_name": self.project_name,
|
||||
@@ -6358,6 +6368,8 @@ class TriggerEvent(AbstractEvent):
|
||||
self.branch_deleted = d["branch_deleted"]
|
||||
self.branch_protected = d["branch_protected"]
|
||||
self.ref = d["ref"]
|
||||
self.min_reconfigure_ltime = d.get("min_reconfigure_ltime", -1)
|
||||
self.zuul_event_ltime = d.get("zuul_event_ltime", None)
|
||||
self.tenant_name = d["tenant_name"]
|
||||
self.project_hostname = d["project_hostname"]
|
||||
self.project_name = d["project_name"]
|
||||
|
||||
@@ -860,9 +860,14 @@ class Scheduler(threading.Thread):
|
||||
self.log.exception("Exception reporting runtime stats")
|
||||
|
||||
def reconfigureTenant(self, tenant, project, trigger_event):
|
||||
if trigger_event:
|
||||
trigger_event_ltime = trigger_event.zuul_event_ltime
|
||||
else:
|
||||
trigger_event_ltime = None
|
||||
self.log.debug("Submitting tenant reconfiguration event for "
|
||||
"%s due to event %s in project %s",
|
||||
tenant.name, trigger_event, project)
|
||||
"%s due to event %s in project %s, ltime %s",
|
||||
tenant.name, trigger_event, project,
|
||||
trigger_event_ltime)
|
||||
branch = trigger_event and trigger_event.branch
|
||||
event = TenantReconfigureEvent(
|
||||
tenant.name, project.canonical_name, branch,
|
||||
@@ -870,6 +875,7 @@ class Scheduler(threading.Thread):
|
||||
if trigger_event:
|
||||
event.branch_cache_ltimes[trigger_event.connection_name] = (
|
||||
trigger_event.branch_cache_ltime)
|
||||
event.trigger_event_ltime = trigger_event_ltime
|
||||
self.management_events[tenant.name].put(event, needs_result=False)
|
||||
|
||||
def fullReconfigureCommandHandler(self):
|
||||
@@ -970,7 +976,7 @@ class Scheduler(threading.Thread):
|
||||
if layout_state is None:
|
||||
# Reconfigure only tenants w/o an existing layout state
|
||||
ctx = self.createZKContext(tlock, self.log)
|
||||
self._reconfigureTenant(ctx, min_ltimes, tenant)
|
||||
self._reconfigureTenant(ctx, min_ltimes, -1, tenant)
|
||||
self._reportInitialStats(tenant)
|
||||
else:
|
||||
self.local_layout_state[tenant_name] = layout_state
|
||||
@@ -1422,6 +1428,7 @@ class Scheduler(threading.Thread):
|
||||
ctx = self.createZKContext(lock, self.log)
|
||||
if tenant is not None:
|
||||
self._reconfigureTenant(ctx, min_ltimes,
|
||||
-1,
|
||||
tenant, old_tenant)
|
||||
else:
|
||||
self._reconfigureDeleteTenant(ctx, old_tenant)
|
||||
@@ -1485,6 +1492,7 @@ class Scheduler(threading.Thread):
|
||||
tenant = self.abide.tenants[event.tenant_name]
|
||||
ctx = self.createZKContext(lock, self.log)
|
||||
self._reconfigureTenant(ctx, min_ltimes,
|
||||
event.trigger_event_ltime,
|
||||
tenant, old_tenant)
|
||||
duration = round(time.monotonic() - start, 3)
|
||||
self.log.info("Tenant reconfiguration complete for %s (duration: %s "
|
||||
@@ -1639,7 +1647,8 @@ class Scheduler(threading.Thread):
|
||||
request)
|
||||
self.cancelJob(build_set, request_job)
|
||||
|
||||
def _reconfigureTenant(self, context, min_ltimes, tenant,
|
||||
def _reconfigureTenant(self, context, min_ltimes,
|
||||
last_reconfigure_event_ltime, tenant,
|
||||
old_tenant=None):
|
||||
# This is called from _doReconfigureEvent while holding the
|
||||
# layout lock
|
||||
@@ -1666,10 +1675,29 @@ class Scheduler(threading.Thread):
|
||||
for s in self.connections.getSources()
|
||||
}
|
||||
|
||||
# Make sure last_reconfigure_event_ltime never goes backward
|
||||
old_layout_state = self.tenant_layout_state.get(tenant.name)
|
||||
if old_layout_state:
|
||||
if (old_layout_state.last_reconfigure_event_ltime >
|
||||
last_reconfigure_event_ltime):
|
||||
self.log.debug("Setting layout state last reconfigure ltime "
|
||||
"to previous ltime %s which is newer than %s",
|
||||
old_layout_state.last_reconfigure_event_ltime,
|
||||
last_reconfigure_event_ltime)
|
||||
last_reconfigure_event_ltime =\
|
||||
old_layout_state.last_reconfigure_event_ltime
|
||||
if last_reconfigure_event_ltime < 0:
|
||||
last_reconfigure_event_ltime = self.zk_client.getCurrentLtime()
|
||||
self.log.debug("Setting layout state last reconfigure ltime "
|
||||
"to current ltime %s", last_reconfigure_event_ltime)
|
||||
else:
|
||||
self.log.debug("Setting layout state last reconfigure ltime "
|
||||
"to %s", last_reconfigure_event_ltime)
|
||||
layout_state = LayoutState(
|
||||
tenant_name=tenant.name,
|
||||
hostname=self.hostname,
|
||||
last_reconfigured=int(time.time()),
|
||||
last_reconfigure_event_ltime=last_reconfigure_event_ltime,
|
||||
uuid=tenant.layout.uuid,
|
||||
branch_cache_min_ltimes=branch_cache_min_ltimes,
|
||||
)
|
||||
@@ -2178,6 +2206,8 @@ class Scheduler(threading.Thread):
|
||||
"Unable to refresh pipeline change list for %s",
|
||||
pipeline.name)
|
||||
|
||||
# Get the ltime of the last reconfiguration event
|
||||
self.trigger_events[tenant.name].refreshMetadata()
|
||||
for event in self.trigger_events[tenant.name]:
|
||||
log = get_annotated_logger(self.log, event.zuul_event_id)
|
||||
log.debug("Forwarding trigger event %s", event)
|
||||
@@ -2266,7 +2296,15 @@ class Scheduler(threading.Thread):
|
||||
# out cached data for this project and perform a
|
||||
# reconfiguration.
|
||||
self.reconfigureTenant(tenant, change.project, event)
|
||||
# This will become the new required minimum event ltime
|
||||
# for every trigger event processed after the
|
||||
# reconfiguration, so make sure we update it after having
|
||||
# submitted the reconfiguration event.
|
||||
self.trigger_events[tenant.name].last_reconfigure_event_ltime =\
|
||||
event.zuul_event_ltime
|
||||
|
||||
event.min_reconfigure_ltime = self.trigger_events[
|
||||
tenant.name].last_reconfigure_event_ltime
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
if (
|
||||
pipeline.manager.eventMatches(event, change)
|
||||
@@ -2281,6 +2319,21 @@ class Scheduler(threading.Thread):
|
||||
if self._stopped:
|
||||
return
|
||||
log = get_annotated_logger(self.log, event.zuul_event_id)
|
||||
if not isinstance(event, SupercedeEvent):
|
||||
local_state = self.local_layout_state[tenant.name]
|
||||
last_ltime = local_state.last_reconfigure_event_ltime
|
||||
# The event tells us the ltime of the most recent
|
||||
# reconfiguration event up to that point. If our local
|
||||
# layout state wasn't generated by an event after that
|
||||
# time, then we are too out of date to process this event.
|
||||
# Abort now and wait for an update.
|
||||
if (event.min_reconfigure_ltime > -1 and
|
||||
event.min_reconfigure_ltime > last_ltime):
|
||||
log.debug("Trigger event minimum reconfigure ltime of %s "
|
||||
"newer than current reconfigure ltime of %s, "
|
||||
"aborting early",
|
||||
event.min_reconfigure_ltime, last_ltime)
|
||||
return
|
||||
log.debug("Processing trigger event %s", event)
|
||||
try:
|
||||
if isinstance(event, SupercedeEvent):
|
||||
|
||||
@@ -776,7 +776,7 @@ class TriggerEventQueue(ZooKeeperEventQueue):
|
||||
self._put(data)
|
||||
|
||||
def __iter__(self):
|
||||
for data, ack_ref, _ in self._iterEvents():
|
||||
for data, ack_ref, zstat in self._iterEvents():
|
||||
try:
|
||||
if (data["driver_name"] is None and
|
||||
data["event_type"] == "SupercedeEvent"):
|
||||
@@ -793,6 +793,9 @@ class TriggerEventQueue(ZooKeeperEventQueue):
|
||||
event = event_class.fromDict(event_data)
|
||||
event.ack_ref = ack_ref
|
||||
event.driver_name = data["driver_name"]
|
||||
# Initialize the logical timestamp if not valid
|
||||
if event.zuul_event_ltime is None:
|
||||
event.zuul_event_ltime = zstat.creation_transaction_id
|
||||
yield event
|
||||
|
||||
|
||||
@@ -803,6 +806,28 @@ class TenantTriggerEventQueue(TriggerEventQueue):
|
||||
queue_root = TENANT_TRIGGER_ROOT.format(
|
||||
tenant=tenant_name)
|
||||
super().__init__(client, queue_root, connections)
|
||||
self.metadata = {}
|
||||
|
||||
def _setQueueMetadata(self):
|
||||
encoded_data = json.dumps(
|
||||
self.metadata, sort_keys=True).encode("utf-8")
|
||||
self.kazoo_client.set(self.queue_root, encoded_data)
|
||||
|
||||
def refreshMetadata(self):
|
||||
data, zstat = self.kazoo_client.get(self.queue_root)
|
||||
try:
|
||||
self.metadata = json.loads(data)
|
||||
except json.JSONDecodeError:
|
||||
self.metadata = {}
|
||||
|
||||
@property
|
||||
def last_reconfigure_event_ltime(self):
|
||||
return self.metadata.get('last_reconfigure_event_ltime', -1)
|
||||
|
||||
@last_reconfigure_event_ltime.setter
|
||||
def last_reconfigure_event_ltime(self, val):
|
||||
self.metadata['last_reconfigure_event_ltime'] = val
|
||||
self._setQueueMetadata()
|
||||
|
||||
@classmethod
|
||||
def createRegistry(cls, client, connections):
|
||||
|
||||
@@ -49,12 +49,15 @@ class LayoutState:
|
||||
"""
|
||||
|
||||
def __init__(self, tenant_name, hostname, last_reconfigured, uuid,
|
||||
branch_cache_min_ltimes, ltime=-1):
|
||||
branch_cache_min_ltimes, last_reconfigure_event_ltime,
|
||||
ltime=-1):
|
||||
self.uuid = uuid
|
||||
self.ltime = ltime
|
||||
self.tenant_name = tenant_name
|
||||
self.hostname = hostname
|
||||
self.last_reconfigured = last_reconfigured
|
||||
self.last_reconfigure_event_ltime =\
|
||||
last_reconfigure_event_ltime
|
||||
self.branch_cache_min_ltimes = branch_cache_min_ltimes
|
||||
|
||||
def toDict(self):
|
||||
@@ -62,6 +65,8 @@ class LayoutState:
|
||||
"tenant_name": self.tenant_name,
|
||||
"hostname": self.hostname,
|
||||
"last_reconfigured": self.last_reconfigured,
|
||||
"last_reconfigure_event_ltime":
|
||||
self.last_reconfigure_event_ltime,
|
||||
"uuid": self.uuid,
|
||||
"branch_cache_min_ltimes": self.branch_cache_min_ltimes,
|
||||
}
|
||||
@@ -74,6 +79,7 @@ class LayoutState:
|
||||
data["last_reconfigured"],
|
||||
data.get("uuid"),
|
||||
data.get("branch_cache_min_ltimes"),
|
||||
data.get("last_reconfigure_event_ltime", -1),
|
||||
data.get("ltime", -1),
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user