Merge "Add LocalZKContext for job freezing"
This commit is contained in:
commit
1d05b0a95f
@ -30,6 +30,7 @@ import zuul.lib.connections
|
||||
|
||||
from tests.base import BaseTestCase, FIXTURE_DIR
|
||||
from zuul.lib.ansible import AnsibleManager
|
||||
from zuul.zk.zkobject import LocalZKContext
|
||||
|
||||
|
||||
class Dummy(object):
|
||||
@ -71,6 +72,7 @@ class TestJob(BaseTestCase):
|
||||
self.pipeline = model.Pipeline('gate', self.tenant)
|
||||
self.pipeline.source_context = self.context
|
||||
self.pipeline.manager = mock.Mock()
|
||||
self.zk_context = LocalZKContext(self.log)
|
||||
self.layout.addPipeline(self.pipeline)
|
||||
self.queue = DummyChangeQueue.new(None, pipeline=self.pipeline)
|
||||
self.pcontext = configloader.ParseContext(
|
||||
@ -221,7 +223,7 @@ class TestJob(BaseTestCase):
|
||||
self.assertTrue(python27.changeMatchesBranch(change))
|
||||
self.assertFalse(python27diablo.changeMatchesBranch(change))
|
||||
|
||||
item.freezeJobGraph(self.layout)
|
||||
item.freezeJobGraph(self.layout, self.zk_context)
|
||||
self.assertEqual(len(item.getJobs()), 1)
|
||||
job = item.getJobs()[0]
|
||||
self.assertEqual(job.name, 'python27')
|
||||
@ -234,7 +236,7 @@ class TestJob(BaseTestCase):
|
||||
self.assertTrue(python27.changeMatchesBranch(change))
|
||||
self.assertTrue(python27diablo.changeMatchesBranch(change))
|
||||
|
||||
item.freezeJobGraph(self.layout)
|
||||
item.freezeJobGraph(self.layout, self.zk_context)
|
||||
self.assertEqual(len(item.getJobs()), 1)
|
||||
job = item.getJobs()[0]
|
||||
self.assertEqual(job.name, 'python27')
|
||||
@ -283,7 +285,7 @@ class TestJob(BaseTestCase):
|
||||
self.assertFalse(python27.changeMatchesFiles(change))
|
||||
|
||||
self.pipeline.manager.getFallbackLayout = mock.Mock(return_value=None)
|
||||
item.freezeJobGraph(self.layout)
|
||||
item.freezeJobGraph(self.layout, self.zk_context)
|
||||
self.assertEqual([], item.getJobs())
|
||||
|
||||
def test_job_source_project(self):
|
||||
@ -353,7 +355,7 @@ class TestJob(BaseTestCase):
|
||||
with testtools.ExpectedException(
|
||||
Exception,
|
||||
"Pre-review pipeline gate does not allow post-review job"):
|
||||
item.freezeJobGraph(self.layout)
|
||||
item.freezeJobGraph(self.layout, self.zk_context)
|
||||
|
||||
|
||||
class TestGraph(BaseTestCase):
|
||||
|
@ -1282,7 +1282,7 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
if not item.job_graph:
|
||||
try:
|
||||
log.debug("Freezing job graph for %s" % (item,))
|
||||
item.freezeJobGraph(self.getLayout(item))
|
||||
item.freezeJobGraph(self.getLayout(item), self.current_context)
|
||||
except Exception as e:
|
||||
# TODOv3(jeblair): nicify this exception as it will be reported
|
||||
log.exception("Error freezing job graph for %s" % (item,))
|
||||
|
@ -3072,16 +3072,15 @@ class QueueItem(zkobject.ZKObject):
|
||||
self.current_build_set.warning_messages.append(msg)
|
||||
self.log.info(msg)
|
||||
|
||||
def freezeJobGraph(self, layout, skip_file_matcher=False):
|
||||
def freezeJobGraph(self, layout, context, skip_file_matcher=False):
|
||||
"""Find or create actual matching jobs for this item's change and
|
||||
store the resulting job tree."""
|
||||
|
||||
ppc = layout.getProjectPipelineConfig(self)
|
||||
ctx = self.pipeline.manager.current_context
|
||||
try:
|
||||
# Conditionally set self.ppc so that the debug method can
|
||||
# consult it as we resolve the jobs.
|
||||
self.updateAttributes(ctx, project_pipeline_config=ppc)
|
||||
self.updateAttributes(context, project_pipeline_config=ppc)
|
||||
if ppc:
|
||||
for msg in ppc.debug_messages:
|
||||
self.debug(msg)
|
||||
@ -3099,9 +3098,9 @@ class QueueItem(zkobject.ZKObject):
|
||||
# created the layout.
|
||||
job_graph.project_metadata = layout.project_metadata
|
||||
|
||||
self.updateAttributes(ctx, job_graph=job_graph)
|
||||
self.updateAttributes(context, job_graph=job_graph)
|
||||
except Exception:
|
||||
self.updateAttributes(ctx, project_pipeline_config=None,
|
||||
self.updateAttributes(context, project_pipeline_config=None,
|
||||
job_graph=None, _old_job_graph=None)
|
||||
raise
|
||||
|
||||
|
@ -26,37 +26,7 @@ from zuul.connection import BaseConnection
|
||||
from zuul.lib import encryption
|
||||
from zuul.lib.gearworker import ZuulGearWorker
|
||||
from zuul.lib.jsonutil import ZuulJSONEncoder
|
||||
|
||||
|
||||
class LocalBuildSet(model.BuildSet):
|
||||
"""Local non-persistent build set."""
|
||||
|
||||
def _save(self, ctx, create=False):
|
||||
pass
|
||||
|
||||
|
||||
class LocalQueueItem(model.QueueItem):
|
||||
"""Local non-persistent queue item for job freezing."""
|
||||
|
||||
@classmethod
|
||||
def new(klass, context, **kw):
|
||||
obj = klass()
|
||||
obj._set(**kw)
|
||||
files_state = (model.BuildSet.COMPLETE if obj.change.files is not None
|
||||
else model.BuildSet.NEW)
|
||||
obj.updateAttributes(context, current_build_set=LocalBuildSet.new(
|
||||
context, item=obj, files_state=files_state, uuid=None))
|
||||
return obj
|
||||
|
||||
def _save(self, ctx, create=False):
|
||||
pass
|
||||
|
||||
|
||||
class LocalChangeQueue(model.ChangeQueue):
|
||||
"""Local non-persistent change queue."""
|
||||
|
||||
def _save(self, ctx, create=False):
|
||||
pass
|
||||
from zuul.zk.zkobject import LocalZKContext
|
||||
|
||||
|
||||
class RPCListenerBase(metaclass=ABCMeta):
|
||||
@ -471,10 +441,11 @@ class RPCListener(RPCListenerBase):
|
||||
|
||||
change = model.Branch(project)
|
||||
change.branch = args.get("branch", "master")
|
||||
queue = LocalChangeQueue.new(None, pipeline=pipeline)
|
||||
item = LocalQueueItem.new(None, queue=queue, change=change,
|
||||
pipeline=queue.pipeline)
|
||||
item.freezeJobGraph(tenant.layout, skip_file_matcher=True)
|
||||
context = LocalZKContext(self.log)
|
||||
queue = model.ChangeQueue.new(context, pipeline=pipeline)
|
||||
item = model.QueueItem.new(context, queue=queue, change=change,
|
||||
pipeline=queue.pipeline)
|
||||
item.freezeJobGraph(tenant.layout, context, skip_file_matcher=True)
|
||||
|
||||
output = []
|
||||
|
||||
@ -502,10 +473,11 @@ class RPCListener(RPCListenerBase):
|
||||
|
||||
change = model.Branch(project)
|
||||
change.branch = args.get("branch", "master")
|
||||
queue = LocalChangeQueue.new(None, pipeline=pipeline)
|
||||
item = LocalQueueItem.new(None, queue=queue, change=change,
|
||||
pipeline=queue.pipeline)
|
||||
item.freezeJobGraph(tenant.layout, skip_file_matcher=True)
|
||||
context = LocalZKContext(self.log)
|
||||
queue = model.ChangeQueue.new(context, pipeline=pipeline)
|
||||
item = model.QueueItem.new(context, queue=queue, change=change,
|
||||
pipeline=queue.pipeline)
|
||||
item.freezeJobGraph(tenant.layout, context, skip_file_matcher=True)
|
||||
|
||||
job = item.job_graph.jobs.get(args.get("job"))
|
||||
if not job:
|
||||
@ -515,6 +487,7 @@ class RPCListener(RPCListenerBase):
|
||||
uuid = '0' * 32
|
||||
params = zuul.executor.common.construct_build_params(
|
||||
uuid, self.sched, job, item, pipeline)
|
||||
params['zuul']['buildset'] = None
|
||||
gear_job.sendWorkComplete(json.dumps(params, cls=ZuulJSONEncoder))
|
||||
|
||||
def handle_allowed_labels_get(self, job):
|
||||
|
@ -35,6 +35,19 @@ class ZKContext:
|
||||
(not self.stop_event or not self.stop_event.is_set()))
|
||||
|
||||
|
||||
class LocalZKContext:
|
||||
"""A Local ZKContext that means don't actually write anything to ZK"""
|
||||
|
||||
def __init__(self, log):
|
||||
self.client = None
|
||||
self.lock = None
|
||||
self.stop_event = None
|
||||
self.log = log
|
||||
|
||||
def sessionIsValid(self):
|
||||
return True
|
||||
|
||||
|
||||
class ZKObject:
|
||||
_retry_interval = 5
|
||||
|
||||
@ -176,6 +189,8 @@ class ZKObject:
|
||||
raise Exception("ZooKeeper session or lock not valid")
|
||||
|
||||
def _save(self, context, create=False):
|
||||
if isinstance(context, LocalZKContext):
|
||||
return
|
||||
data = self.serialize()
|
||||
path = self.getPath()
|
||||
while context.sessionIsValid():
|
||||
@ -239,6 +254,8 @@ class ShardedZKObject(ZKObject):
|
||||
raise Exception("ZooKeeper session or lock not valid")
|
||||
|
||||
def _save(self, context, create=False):
|
||||
if isinstance(context, LocalZKContext):
|
||||
return
|
||||
data = self.serialize()
|
||||
path = self.getPath()
|
||||
while context.sessionIsValid():
|
||||
|
Loading…
Reference in New Issue
Block a user