Only update ZKObject values in ZK if necessary
Several parts of the NNFI implementation set attributes without checking their existing values. Usually they will be the same from one pass to the next (like "item.active = active"). That is inefficient. Rather than updating all that code and changing our python programming habits, let's make ZKObject smarter so it doesn't write to ZK when it's not necessary. Change-Id: Ibbe0d52a0f6ce28ad26753f7c5133027f572b33c
This commit is contained in:
@@ -1496,18 +1496,39 @@ class TestZKObject(ZooKeeperBaseTestCase):
|
||||
'/zuul/pipeline/fake_tenant')
|
||||
self.assertEqual(pipeline2.foo, 'bar')
|
||||
|
||||
def get_ltime(obj):
|
||||
zstat = self.zk_client.client.exists(obj.getPath())
|
||||
return zstat.last_modified_transaction_id
|
||||
|
||||
# Update an object
|
||||
with tenant_write_lock(self.zk_client, tenant_name) as lock:
|
||||
context = ZKContext(self.zk_client, lock, stop_event, self.log)
|
||||
pipeline1.updateAttributes(context, foo='baz')
|
||||
self.assertEqual(pipeline1.foo, 'baz')
|
||||
ltime1 = get_ltime(pipeline1)
|
||||
pipeline1.updateAttributes(context, foo='qux')
|
||||
self.assertEqual(pipeline1.foo, 'qux')
|
||||
ltime2 = get_ltime(pipeline1)
|
||||
self.assertNotEqual(ltime1, ltime2)
|
||||
|
||||
# This should not produce an unnecessary write
|
||||
pipeline1.updateAttributes(context, foo='qux')
|
||||
ltime3 = get_ltime(pipeline1)
|
||||
self.assertEqual(ltime2, ltime3)
|
||||
|
||||
# Update an object using an active context
|
||||
with tenant_write_lock(self.zk_client, tenant_name) as lock:
|
||||
context = ZKContext(self.zk_client, lock, stop_event, self.log)
|
||||
ltime1 = get_ltime(pipeline1)
|
||||
with pipeline1.activeContext(context):
|
||||
pipeline1.foo = 'baz'
|
||||
self.assertEqual(pipeline1.foo, 'baz')
|
||||
self.assertEqual(pipeline1.foo, 'baz')
|
||||
ltime2 = get_ltime(pipeline1)
|
||||
self.assertNotEqual(ltime1, ltime2)
|
||||
|
||||
# This should not produce an unnecessary write
|
||||
with pipeline1.activeContext(context):
|
||||
pipeline1.foo = 'baz'
|
||||
ltime3 = get_ltime(pipeline1)
|
||||
self.assertEqual(ltime2, ltime3)
|
||||
|
||||
# Update of object w/o active context should not work
|
||||
with testtools.ExpectedException(Exception):
|
||||
|
||||
@@ -1873,7 +1873,8 @@ class JobData(zkobject.ShardedZKObject):
|
||||
obj = klass()
|
||||
kw['hash'] = JobData.getHash(kw['data'])
|
||||
obj._set(**kw)
|
||||
obj._save(context, create=True)
|
||||
data = obj._trySerialize(context)
|
||||
obj._save(context, data, create=True)
|
||||
return obj
|
||||
|
||||
@staticmethod
|
||||
@@ -1963,7 +1964,8 @@ class FrozenJob(zkobject.ZKObject):
|
||||
v = None
|
||||
kw['_' + k] = v
|
||||
obj._set(**kw)
|
||||
obj._save(context, create=True)
|
||||
data = obj._trySerialize(context)
|
||||
obj._save(context, data, create=True)
|
||||
|
||||
# If we need to make any JobData entries, do that now.
|
||||
update_kw = {}
|
||||
@@ -3832,12 +3834,14 @@ class BuildSet(zkobject.ZKObject):
|
||||
def removeJobNodeSetInfo(self, job_name):
|
||||
if job_name not in self.nodeset_info:
|
||||
raise Exception("No job nodeset for %s" % (job_name))
|
||||
del self.nodeset_info[job_name]
|
||||
with self.activeContext(self.item.pipeline.manager.current_context):
|
||||
del self.nodeset_info[job_name]
|
||||
|
||||
def setJobNodeRequestID(self, job_name, request_id):
|
||||
if job_name in self.node_requests:
|
||||
raise Exception("Prior node request for %s" % (job_name))
|
||||
self.node_requests[job_name] = request_id
|
||||
with self.activeContext(self.item.pipeline.manager.current_context):
|
||||
self.node_requests[job_name] = request_id
|
||||
|
||||
def getJobNodeRequestID(self, job_name):
|
||||
return self.node_requests.get(job_name)
|
||||
@@ -3858,7 +3862,8 @@ class BuildSet(zkobject.ZKObject):
|
||||
info['zone'] = None
|
||||
info['provider'] = node.provider
|
||||
info['nodes'] = [n.id for n in nodeset.getNodes()]
|
||||
self.nodeset_info[job_name] = info
|
||||
with self.activeContext(self.item.pipeline.manager.current_context):
|
||||
self.nodeset_info[job_name] = info
|
||||
|
||||
def getTries(self, job_name):
|
||||
return self.tries.get(job_name, 0)
|
||||
@@ -3946,7 +3951,8 @@ class QueueItem(zkobject.ZKObject):
|
||||
def new(klass, context, **kw):
|
||||
obj = klass()
|
||||
obj._set(**kw)
|
||||
obj._save(context, create=True)
|
||||
data = obj._trySerialize(context)
|
||||
obj._save(context, data, create=True)
|
||||
files_state = (BuildSet.COMPLETE if obj.change.files is not None
|
||||
else BuildSet.NEW)
|
||||
obj.updateAttributes(context, current_build_set=BuildSet.new(
|
||||
|
||||
@@ -2200,7 +2200,9 @@ class Scheduler(threading.Thread):
|
||||
|
||||
build.end_time = event_result["end_time"]
|
||||
build.setResultData(result_data, secret_result_data)
|
||||
build.build_set.warning_messages.extend(warnings)
|
||||
build.build_set.updateAttributes(
|
||||
pipeline.manager.current_context,
|
||||
warning_messages=build.build_set.warning_messages + warnings)
|
||||
build.held = event_result.get("held")
|
||||
|
||||
build.result = result
|
||||
|
||||
@@ -66,8 +66,8 @@ class BranchCacheZKObject(ShardedZKObject):
|
||||
}
|
||||
return json.dumps(data).encode("utf8")
|
||||
|
||||
def _save(self, context, create=False):
|
||||
super()._save(context, create)
|
||||
def _save(self, context, data, create=False):
|
||||
super()._save(context, data, create)
|
||||
zstat = context.client.exists(self.getPath())
|
||||
self._set(_zstat=zstat)
|
||||
|
||||
|
||||
@@ -98,13 +98,16 @@ class ZKObject:
|
||||
call as possible for efficient network use.
|
||||
"""
|
||||
old = self.__dict__.copy()
|
||||
oldserial = self._trySerialize(context)
|
||||
self._set(**kw)
|
||||
try:
|
||||
self._save(context)
|
||||
except Exception:
|
||||
# Roll back our old values if we aren't able to update ZK.
|
||||
self._set(**old)
|
||||
raise
|
||||
newserial = self._trySerialize(context)
|
||||
if oldserial != newserial:
|
||||
try:
|
||||
self._save(context, newserial)
|
||||
except Exception:
|
||||
# Roll back our old values if we aren't able to update ZK.
|
||||
self._set(**old)
|
||||
raise
|
||||
|
||||
@contextlib.contextmanager
|
||||
def activeContext(self, context):
|
||||
@@ -113,14 +116,17 @@ class ZKObject:
|
||||
f"Another context is already active {self._active_context}")
|
||||
try:
|
||||
old = self.__dict__.copy()
|
||||
oldserial = self._trySerialize(context)
|
||||
self._set(_active_context=context)
|
||||
yield
|
||||
try:
|
||||
self._save(context)
|
||||
except Exception:
|
||||
# Roll back our old values if we aren't able to update ZK.
|
||||
self._set(**old)
|
||||
raise
|
||||
newserial = self._trySerialize(context)
|
||||
if oldserial != newserial:
|
||||
try:
|
||||
self._save(context, newserial)
|
||||
except Exception:
|
||||
# Roll back our old values if we aren't able to update ZK.
|
||||
self._set(**old)
|
||||
raise
|
||||
finally:
|
||||
self._set(_active_context=None)
|
||||
|
||||
@@ -129,7 +135,8 @@ class ZKObject:
|
||||
"""Create a new instance and save it in ZooKeeper"""
|
||||
obj = klass()
|
||||
obj._set(**kw)
|
||||
obj._save(context, create=True)
|
||||
data = obj._trySerialize(context)
|
||||
obj._save(context, data, create=True)
|
||||
return obj
|
||||
|
||||
@classmethod
|
||||
@@ -144,6 +151,18 @@ class ZKObject:
|
||||
"""Update data from ZK"""
|
||||
self._load(context)
|
||||
|
||||
def _trySerialize(self, context):
|
||||
if isinstance(context, LocalZKContext):
|
||||
return b''
|
||||
try:
|
||||
return self.serialize()
|
||||
except Exception:
|
||||
# A higher level must handle this exception, but log
|
||||
# ourself here so we know what object triggered it.
|
||||
context.log.error(
|
||||
"Exception serializing ZKObject %s", self)
|
||||
raise
|
||||
|
||||
def delete(self, context):
|
||||
path = self.getPath()
|
||||
while context.sessionIsValid():
|
||||
@@ -195,17 +214,9 @@ class ZKObject:
|
||||
raise
|
||||
raise Exception("ZooKeeper session or lock not valid")
|
||||
|
||||
def _save(self, context, create=False):
|
||||
def _save(self, context, data, create=False):
|
||||
if isinstance(context, LocalZKContext):
|
||||
return
|
||||
try:
|
||||
data = self.serialize()
|
||||
except Exception:
|
||||
# A higher level must handle this exception, but log
|
||||
# ourself here so we know what object triggered it.
|
||||
context.log.error(
|
||||
"Exception serializing ZKObject %s", self)
|
||||
raise
|
||||
path = self.getPath()
|
||||
while context.sessionIsValid():
|
||||
try:
|
||||
@@ -275,17 +286,9 @@ class ShardedZKObject(ZKObject):
|
||||
raise InvalidObjectError from exc
|
||||
raise Exception("ZooKeeper session or lock not valid")
|
||||
|
||||
def _save(self, context, create=False):
|
||||
def _save(self, context, data, create=False):
|
||||
if isinstance(context, LocalZKContext):
|
||||
return
|
||||
try:
|
||||
data = self.serialize()
|
||||
except Exception:
|
||||
# A higher level must handle this exception, but log
|
||||
# ourself here so we know what object triggered it.
|
||||
context.log.error(
|
||||
"Exception serializing ZKObject %s", self)
|
||||
raise
|
||||
path = self.getPath()
|
||||
while context.sessionIsValid():
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user