Add pipeline timing metrics
This adds several metrics for different phases of processing an item in a
pipeline:

* How long we wait for a response from mergers
* How long it takes to get or compute a layout
* How long it takes to freeze jobs
* How long we wait for node requests to complete
* How long we wait for an executor to start running a job after the request

And finally, the total amount of time from the original event until the
first job starts.  We already report that at the tenant level; this adds a
pipeline-specific duplicate of that metric.

Several of these would also make sense as job metrics, but since they are
mainly intended to diagnose Zuul system performance rather than individual
jobs, the extremely high cardinality would make that a waste of storage
space.

Additionally, two other timing metrics are added: the cumulative time spent
reading and writing ZKObject data to ZK during pipeline processing.  These
can help determine whether more effort should be spent optimizing ZK data
transfer.

In preparing this change, I noticed that python statsd emits floating point
values for timers.  It's not clear whether this strictly matches the statsd
spec, but since it does emit values with that precision, I have removed
several int() casts in order to maintain the precision through to the
statsd client.

I also noticed a place where we were writing a monotonic timestamp value in
a JSON-serialized string to ZK.  I do not believe this value is currently
being used, so there is no further error to correct, but we should not use
time.monotonic() for values that are serialized, since the reference clock
will be different on different systems.

Several new attributes are added to the QueueItem and Build classes, but in
a backwards-compatible way, so no model API schema upgrade is needed.  The
code sites where they are used protect against the null values which will
occur in a mixed-version cluster (the components will simply not emit these
stats in those cases).

Change-Id: Iaacbef7fa2ed93bfc398a118c5e8cfbc0a67b846
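As an illustration of the reporting pattern described above (a minimal sketch, not Zuul's actual implementation; the tenant and pipeline names are placeholders), each phase timer is just the difference between two timestamps, reported to statsd as float milliseconds:

    # Minimal sketch of the pipeline-timing pattern (assumed names; the real
    # helper is reportPipelineTiming() in the manager code further below).
    import time
    import statsd

    client = statsd.StatsClient('127.0.0.1', 8125)

    def report_pipeline_timing(key, start, end=None):
        # Items written by an older scheduler may not carry the new timestamp
        # attributes; in a mixed-version cluster we simply skip the stat.
        if not start:
            return
        if end is None:
            end = time.time()
        elapsed_ms = (end - start) * 1000  # float; no int() truncation
        # 'example' and 'check' stand in for the real tenant and pipeline.
        client.timing(f'zuul.tenant.example.pipeline.check.{key}', elapsed_ms)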
@ -79,11 +79,66 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
   Holds metrics specific to jobs. This hierarchy includes:

-  .. stat:: <pipeline name>
+  .. stat:: <pipeline>

      A set of metrics for each pipeline named as defined in the Zuul
      config.

      .. stat:: event_enqueue_time
         :type: timer

         The time elapsed from when a trigger event was received from
         the remote system to when the corresponding item is enqueued
         in a pipeline.

      .. stat:: merge_request_time
         :type: timer

         The amount of time spent waiting for the initial merge
         operation(s).  This will always include a request to a Zuul
         merger to speculatively merge the change, but it may also
         include a second request submitted in parallel to identify
         the files altered by the change.

      .. stat:: layout_generation_time
         :type: timer

         The amount of time spent generating a dynamic configuration layout.

      .. stat:: job_freeze_time
         :type: timer

         The amount of time spent freezing the inheritance hierarchy
         and parameters of a job.

      .. stat:: repo_state_time
         :type: timer

         The amount of time waiting for a secondary Zuul merger
         operation to collect additional information about the repo
         state of required projects.

      .. stat:: node_request_time
         :type: timer

         The amount of time spent waiting for each node request to be
         fulfilled.

      .. stat:: job_wait_time
         :type: timer

         How long a job waited for an executor to start running it
         after the build was requested.

      .. stat:: event_job_time
         :type: timer

         The total amount of time elapsed from when a trigger event
         was received from the remote system until the item's first
         job is run.  This is only emitted once per queue item, even
         if its buildset is reset due to a speculative execution
         failure.

      .. stat:: all_jobs
         :type: counter
@ -153,8 +208,8 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
   .. stat:: wait_time
      :type: timer

-     How long each item spent in the pipeline before its first job
-     started.
+     How long the job waited for an executor to
+     start running it after the build was requested.

   .. stat:: current_changes
      :type: gauge
@ -174,6 +229,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
      The number of changes for this project processed by the
      pipeline since Zuul started.

   .. stat:: read_time
      :type: timer

      The time spent reading data from ZooKeeper during a single
      pipeline processing run.

   .. stat:: refresh
      :type: timer
@ -206,6 +267,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
      The size of the pipeline's management event queue.

   .. stat:: write_time
      :type: timer

      The time spent writing data to ZooKeeper during a single
      pipeline processing run.

.. stat:: zuul.executor.<executor>

   Holds metrics emitted by individual executors.  The ``<executor>``
releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml (new file)
@ -0,0 +1,16 @@
---
features:
  - |
    The following new statsd metrics are available in order to monitor
    Zuul system performance:

    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.event_enqueue_time`
    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.merge_request_time`
    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.layout_generation_time`
    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.job_freeze_time`
    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.repo_state_time`
    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.node_request_time`
    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.job_wait_time`
    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.event_job_time`
    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.read_time`
    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.write_time`
@ -5544,6 +5544,10 @@ class ZuulTestCase(BaseTestCase):
                        return s_value
            time.sleep(0.1)

        stats = list(itertools.chain.from_iterable(
            [s.decode('utf-8').split('\n') for s in self.statsd.stats]))
        for stat in stats:
            self.log.debug("Stat: %s", stat)
        raise StatException("Key %s not found in reported stats" % key)

    def assertUnReportedStat(self, key, value=None, kind=None):
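For context on the assertions in the tests below: the fake statsd server used by the test fixtures records raw packets whose entries are newline-separated metrics in the plain statsd text format, with timers of the form name:value|ms.  A rough, hypothetical illustration of one such entry and the kind of check applied to it:

    # Hypothetical statsd packet entry as the fixture might record it; timers
    # use the "name:value|ms" form and the value is a float, which is why the
    # int() casts were dropped elsewhere in this change.
    entry = 'zuul.tenant.tenant-one.pipeline.gate.read_time:12.7|ms'
    key, rest = entry.split(':', 1)
    value, kind = rest.split('|', 1)
    assert kind == 'ms'
    assert 0.0 < float(value) < 60000.0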
@ -1,3 +1,8 @@
[statsd]
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
# see: https://github.com/jsocol/pystatsd/issues/61
server=127.0.0.1

[scheduler]
tenant_config=config/multi-driver/main.yaml
tests/fixtures/zuul-sql-driver-mysql.conf (vendored)
@ -1,3 +1,8 @@
[statsd]
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
# see: https://github.com/jsocol/pystatsd/issues/61
server=127.0.0.1

[scheduler]
tenant_config=main.yaml
tests/fixtures/zuul-sql-driver-postgres.conf (vendored)
@ -1,3 +1,8 @@
[statsd]
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
# see: https://github.com/jsocol/pystatsd/issues/61
server=127.0.0.1

[scheduler]
tenant_config=main.yaml
@ -440,13 +440,21 @@ class TestScheduler(ZuulTestCase):
                                value='0', kind='g')

        # Catch time / monotonic errors
-       val = self.assertReportedStat(
-           'zuul.tenant.tenant-one.event_enqueue_processing_time',
-           kind='ms')
-       self.assertTrue(0.0 < float(val) < 60000.0)
-       val = self.assertReportedStat(
-           'zuul.tenant.tenant-one.event_enqueue_time', kind='ms')
-       self.assertTrue(0.0 < float(val) < 60000.0)
+       for key in [
+               'zuul.tenant.tenant-one.event_enqueue_processing_time',
+               'zuul.tenant.tenant-one.event_enqueue_time',
+               'zuul.tenant.tenant-one.pipeline.gate.event_enqueue_time',
+               'zuul.tenant.tenant-one.pipeline.gate.merge_request_time',
+               'zuul.tenant.tenant-one.pipeline.gate.job_freeze_time',
+               'zuul.tenant.tenant-one.pipeline.gate.node_request_time',
+               'zuul.tenant.tenant-one.pipeline.gate.job_wait_time',
+               'zuul.tenant.tenant-one.pipeline.gate.event_job_time',
+               'zuul.tenant.tenant-one.pipeline.gate.resident_time',
+               'zuul.tenant.tenant-one.pipeline.gate.read_time',
+               'zuul.tenant.tenant-one.pipeline.gate.write_time',
+       ]:
+           val = self.assertReportedStat(key, kind='ms')
+           self.assertTrue(0.0 < float(val) < 60000.0)

        self.assertReportedStat('zuul.tenant.tenant-one.pipeline.gate.'
                                'data_size_compressed',
@ -1240,6 +1240,12 @@ class TestInRepoConfig(ZuulTestCase):
            dict(name='project-test3', result='SUCCESS', changes='2,1'),
        ], ordered=False)

        # Catch time / monotonic errors
        val = self.assertReportedStat('zuul.tenant.tenant-one.pipeline.'
                                      'tenant-one-gate.layout_generation_time',
                                      kind='ms')
        self.assertTrue(0.0 < float(val) < 60000.0)

    def test_dynamic_template(self):
        # Tests that a project can't update a template in another
        # project.
@ -7196,6 +7202,12 @@ class TestProvidesRequiresMysql(ZuulTestCase):
            }
        }])

        # Catch time / monotonic errors
        val = self.assertReportedStat('zuul.tenant.tenant-one.pipeline.'
                                      'gate.repo_state_time',
                                      kind='ms')
        self.assertTrue(0.0 < float(val) < 60000.0)

    @simple_layout('layouts/provides-requires-unshared.yaml')
    def test_provides_requires_unshared_queue(self):
        self.executor_server.hold_jobs_in_build = True
@ -1120,7 +1120,10 @@ class PipelineManager(metaclass=ABCMeta):
            return self.getFallbackLayout(item)

        log.debug("Preparing dynamic layout for: %s" % item.change)
-       return self._loadDynamicLayout(item)
+       start = time.time()
+       layout = self._loadDynamicLayout(item)
+       self.reportPipelineTiming('layout_generation_time', start)
+       return layout

    def _branchesForRepoState(self, projects, tenant, items=None):
        items = items or []
@ -1253,6 +1256,7 @@ class PipelineManager(metaclass=ABCMeta):
            branches=branches)
        item.current_build_set.updateAttributes(
            self.current_context,
            repo_state_request_time=time.time(),
            repo_state_state=item.current_build_set.PENDING)
        return True
@ -1341,10 +1345,12 @@ class PipelineManager(metaclass=ABCMeta):
        if not item.current_build_set.job_graph:
            try:
                log.debug("Freezing job graph for %s" % (item,))
                start = time.time()
                item.freezeJobGraph(self.getLayout(item),
                                    self.current_context,
                                    skip_file_matcher=False,
                                    redact_secrets_and_keys=False)
                self.reportPipelineTiming('job_freeze_time', start)
            except Exception as e:
                # TODOv3(jeblair): nicify this exception as it will be reported
                log.exception("Error freezing job graph for %s" % (item,))
@ -1564,6 +1570,17 @@ class PipelineManager(metaclass=ABCMeta):
        log = get_annotated_logger(self.log, build.zuul_event_id)
        log.debug("Build %s started", build)
        self.sql.reportBuildStart(build)
        self.reportPipelineTiming('job_wait_time',
                                  build.execute_time, build.start_time)
        if not build.build_set.item.first_job_start_time:
            # Only report this for the first job in a queue item so
            # that we don't include gate resets.
            build.build_set.item.updateAttributes(
                self.current_context,
                first_job_start_time=build.start_time)
            self.reportPipelineTiming('event_job_time',
                                      build.build_set.item.event.timestamp,
                                      build.start_time)
        return True

    def onBuildPaused(self, build):
@ -1664,14 +1681,25 @@ class PipelineManager(metaclass=ABCMeta):
            source.setChangeAttributes(item.change, files=event.files)
        build_set.updateAttributes(self.current_context,
                                   files_state=build_set.COMPLETE)
        if build_set.merge_state == build_set.COMPLETE:
            # We're the second of the files/merger pair, report the stat
            self.reportPipelineTiming('merge_request_time',
                                      build_set.configured_time)

    def onMergeCompleted(self, event, build_set):
        if build_set.merge_state == build_set.COMPLETE:
            self._onGlobalRepoStateCompleted(event, build_set)
            self.reportPipelineTiming('repo_state_time',
                                      build_set.repo_state_request_time)
        else:
            self._onMergeCompleted(event, build_set)
            if build_set.files_state == build_set.COMPLETE:
                # We're the second of the files/merger pair, report the stat
                self.reportPipelineTiming('merge_request_time',
                                          build_set.configured_time)

    def _onMergeCompleted(self, event, build_set):
        item = build_set.item
        source = self.sched.connections.getSource(
            item.change.project.connection_name)
@ -1702,12 +1730,14 @@ class PipelineManager(metaclass=ABCMeta):
            item.setUnableToMerge()

    def _onGlobalRepoStateCompleted(self, event, build_set):
+       item = build_set.item
        if not event.updated:
-           item = build_set.item
            self.log.info("Unable to get global repo state for change %s"
                          % item.change)
            item.setUnableToMerge()
        else:
            self.log.info("Received global repo state for change %s"
                          % item.change)
            with build_set.activeContext(self.current_context):
                build_set.setExtraRepoState(event.repo_state)
                build_set.repo_state_state = build_set.COMPLETE
@ -1716,6 +1746,7 @@ class PipelineManager(metaclass=ABCMeta):
        # TODOv3(jeblair): handle provisioning failure here
        log = get_annotated_logger(self.log, request.event_id)

        self.reportPipelineTiming('node_request_time', request.created_time)
        if nodeset is not None:
            build_set.jobNodeRequestComplete(request.job_name, nodeset)
        if not request.fulfilled:
@ -1878,7 +1909,7 @@ class PipelineManager(metaclass=ABCMeta):
        # Update the gauge on enqueue and dequeue, but timers only
        # when dequeing.
        if item.dequeue_time:
-           dt = int((item.dequeue_time - item.enqueue_time) * 1000)
+           dt = (item.dequeue_time - item.enqueue_time) * 1000
        else:
            dt = None
        items = len(self.pipeline.getAllItems())
@ -1913,12 +1944,27 @@ class PipelineManager(metaclass=ABCMeta):
            if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'):
                now = time.time()
                arrived = item.event.arrived_at_scheduler_timestamp
-               processing = int((now - arrived) * 1000)
-               elapsed = int((now - item.event.timestamp) * 1000)
+               processing = (now - arrived) * 1000
+               elapsed = (now - item.event.timestamp) * 1000
                self.sched.statsd.timing(
                    basekey + '.event_enqueue_processing_time',
                    processing)
                self.sched.statsd.timing(
                    basekey + '.event_enqueue_time', elapsed)
                self.reportPipelineTiming('event_enqueue_time',
                                          item.event.timestamp)
        except Exception:
            self.log.exception("Exception reporting pipeline stats")

    def reportPipelineTiming(self, key, start, end=None):
        if not self.sched.statsd:
            return
        if not start:
            return
        if end is None:
            end = time.time()
        pipeline = self.pipeline
        tenant = pipeline.tenant
        stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
        dt = (end - start) * 1000
        self.sched.statsd.timing(f'{stats_key}.{key}', dt)
@ -1485,6 +1485,7 @@ class NodeRequest(object):
        if 'tenant_name' in data:
            self.tenant_name = data['tenant_name']
        self.nodes = data.get('nodes', [])
        self.created_time = data.get('created_time')

    @classmethod
    def fromDict(cls, data):
@ -3624,6 +3625,9 @@ class BuildSet(zkobject.ZKObject):
            files_state=self.NEW,
            repo_state_state=self.NEW,
            configured=False,
            configured_time=None,  # When setConfigured was called
            start_time=None,  # When the buildset reported start
            repo_state_request_time=None,  # When the refstate job was called
            fail_fast=False,
            job_graph=None,
            jobs={},
@ -3727,6 +3731,9 @@ class BuildSet(zkobject.ZKObject):
            "fail_fast": self.fail_fast,
            "job_graph": (self.job_graph.toDict()
                          if self.job_graph else None),
            "configured_time": self.configured_time,
            "start_time": self.start_time,
            "repo_state_request_time": self.repo_state_request_time,
            # jobs (serialize as separate objects)
        }
        return json.dumps(data, sort_keys=True).encode("utf8")
@ -3831,8 +3838,8 @@ class BuildSet(zkobject.ZKObject):
            "builds": builds,
            "retry_builds": retry_builds,
            # These are local cache objects only valid for one pipeline run
-           '_old_job_graph': None,
-           '_old_jobs': {},
+           "_old_job_graph": None,
+           "_old_jobs": {},
        })
        return data
@ -3868,6 +3875,7 @@ class BuildSet(zkobject.ZKObject):
        self.dependent_changes = [i.change.toDict() for i in items]
        self.merger_items = [i.makeMergerItem() for i in items]
        self.configured = True
        self.configured_time = time.time()

    def getStateName(self, state_num):
        return self.states_map.get(
@ -4016,6 +4024,7 @@ class QueueItem(zkobject.ZKObject):
            enqueue_time=None,
            report_time=None,
            dequeue_time=None,
            first_job_start_time=None,
            reported=False,
            reported_start=False,
            quiet=False,
@ -4088,6 +4097,7 @@ class QueueItem(zkobject.ZKObject):
            "dynamic_state": self.dynamic_state,
            "bundle": self.bundle and self.bundle.serialize(),
            "dequeued_bundle_failing": self.dequeued_bundle_failing,
            "first_job_start_time": self.first_job_start_time,
        }
        return json.dumps(data, sort_keys=True).encode("utf8")
@ -146,7 +146,7 @@ class Nodepool(object):
        if request.canceled:
            state = 'canceled'
        elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
-           dt = int((request.state_time - request.requested_time) * 1000)
+           dt = (request.state_time - request.requested_time) * 1000

        key = 'zuul.nodepool.requests.%s' % state
        pipe.incr(key + ".total")
@ -795,14 +795,14 @@ class Scheduler(threading.Thread):
                'RETRY' if build.result is None else build.result
            )
            if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
-               dt = int((build.end_time - build.start_time) * 1000)
+               dt = (build.end_time - build.start_time) * 1000
                self.statsd.timing(key, dt)
            self.statsd.incr(key)
            # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
            #   <host>.<project>.<branch>.job.<job>.wait_time
            if build.start_time:
                key = '%s.wait_time' % jobkey
-               dt = int((build.start_time - build.execute_time) * 1000)
+               dt = (build.start_time - build.execute_time) * 1000
                self.statsd.timing(key, dt)
        except Exception:
            self.log.exception("Exception reporting runtime stats")
@ -1891,6 +1891,11 @@ class Scheduler(threading.Thread):
                    self._process_pipeline(tenant, pipeline)
                    # Update pipeline summary for zuul-web
                    pipeline.summary.update(ctx, self.globals)
                    if self.statsd:
                        self.statsd.timing(f'{stats_key}.read_time',
                                           ctx.cumulative_read_time * 1000)
                        self.statsd.timing(f'{stats_key}.write_time',
                                           ctx.cumulative_write_time * 1000)
            except LockException:
                self.log.debug("Skipping locked pipeline %s in tenant %s",
                               pipeline.name, tenant.name)
@ -590,7 +590,7 @@ class ManagementEventQueue(ZooKeeperEventQueue):
            return

        result_data = {"traceback": event.traceback,
-                      "timestamp": time.monotonic()}
+                      "timestamp": time.time()}
        try:
            self.kazoo_client.set(
                event.result_ref,
@ -14,6 +14,7 @@
import io
from contextlib import suppress
import time
import zlib

from kazoo.exceptions import NoNodeError
@ -30,6 +31,8 @@ class RawShardIO(io.RawIOBase):
        self.shard_base = path
        self.compressed_bytes_read = 0
        self.compressed_bytes_written = 0
        self.cumulative_read_time = 0.0
        self.cumulative_write_time = 0.0

    def readable(self):
        return True
@ -46,12 +49,17 @@ class RawShardIO(io.RawIOBase):
    @property
    def _shards(self):
        try:
-           return self.client.get_children(self.shard_base)
+           start = time.perf_counter()
+           ret = self.client.get_children(self.shard_base)
+           self.cumulative_read_time += time.perf_counter() - start
+           return ret
        except NoNodeError:
            return []

    def _getData(self, path):
        start = time.perf_counter()
        data, _ = self.client.get(path)
        self.cumulative_read_time += time.perf_counter() - start
        self.compressed_bytes_read += len(data)
        return zlib.decompress(data)
@ -69,12 +77,14 @@ class RawShardIO(io.RawIOBase):
        shard_bytes = zlib.compress(shard_bytes)
        if not (len(shard_bytes) < NODE_BYTE_SIZE_LIMIT):
            raise RuntimeError("Shard too large")
        start = time.perf_counter()
        self.client.create(
            "{}/".format(self.shard_base),
            shard_bytes,
            sequence=True,
            makepath=True,
        )
        self.cumulative_write_time += time.perf_counter() - start
        self.compressed_bytes_written += len(shard_bytes)
        return min(byte_count, NODE_BYTE_SIZE_LIMIT)
@ -88,6 +98,10 @@ class BufferedShardWriter(io.BufferedWriter):
    def compressed_bytes_written(self):
        return self.__raw.compressed_bytes_written

    @property
    def cumulative_write_time(self):
        return self.__raw.cumulative_write_time


class BufferedShardReader(io.BufferedReader):
    def __init__(self, client, path):
@ -97,3 +111,7 @@ class BufferedShardReader(io.BufferedReader):
    @property
    def compressed_bytes_read(self):
        return self.__raw.compressed_bytes_read

    @property
    def cumulative_read_time(self):
        return self.__raw.cumulative_read_time
@ -31,6 +31,8 @@ class ZKContext:
        self.lock = lock
        self.stop_event = stop_event
        self.log = log
        self.cumulative_read_time = 0.0
        self.cumulative_write_time = 0.0

    def sessionIsValid(self):
        return ((not self.lock or self.lock.is_still_valid()) and
@ -237,7 +239,10 @@ class ZKObject:
        path = self.getPath()
        while context.sessionIsValid():
            try:
                start = time.perf_counter()
                compressed_data, zstat = context.client.get(path)
                context.cumulative_read_time += time.perf_counter() - start

                self._set(_zkobject_hash=None)
                try:
                    data = zlib.decompress(compressed_data)
@ -278,6 +283,7 @@ class ZKObject:
        while context.sessionIsValid():
            try:
                compressed_data = zlib.compress(data)
                start = time.perf_counter()
                if create:
                    real_path, zstat = context.client.create(
                        path, compressed_data, makepath=True,
@ -285,6 +291,7 @@ class ZKObject:
                else:
                    zstat = context.client.set(path, compressed_data,
                                               version=self._zstat.version)
                context.cumulative_write_time += time.perf_counter() - start
                self._set(_zstat=zstat,
                          _zkobject_hash=hash(data),
                          _zkobject_compressed_size=len(compressed_data),
@ -336,6 +343,8 @@ class ShardedZKObject(ZKObject):
                        context.client, path) as stream:
                    data = stream.read()
                    compressed_size = stream.compressed_bytes_read
                    context.cumulative_read_time += \
                        stream.cumulative_read_time
                if not data and context.client.exists(path) is None:
                    raise NoNodeError
                self._set(**self.deserialize(data, context))
@ -382,6 +391,9 @@ class ShardedZKObject(ZKObject):
                    stream.write(data)
                    stream.flush()
                    compressed_size = stream.compressed_bytes_written
                    context.cumulative_write_time += \
                        stream.cumulative_write_time

                self._set(_zkobject_hash=hash(data),
                          _zkobject_compressed_size=compressed_size,
                          _zkobject_uncompressed_size=len(data),
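Taken together, the ZooKeeper changes above follow a single accumulation pattern: each blocking ZK call is bracketed with time.perf_counter(), the delta is added to the active context, and the scheduler later converts the totals to milliseconds and emits them as the pipeline's read_time and write_time timers.  A condensed sketch of that pattern, with simplified stand-in names rather than Zuul's real classes:

    import time

    class TimingContext:
        # Simplified stand-in for ZKContext: totals are reset for each
        # pipeline processing run and reported once at the end.
        def __init__(self):
            self.cumulative_read_time = 0.0
            self.cumulative_write_time = 0.0

    def timed_get(context, client, path):
        # Bracket the blocking read and accumulate the elapsed time;
        # perf_counter() is monotonic and high resolution.
        start = time.perf_counter()
        data, zstat = client.get(path)
        context.cumulative_read_time += time.perf_counter() - start
        return data, zstat

    def report(statsd_client, stats_key, context):
        # Emitted once per pipeline processing run, in float milliseconds.
        statsd_client.timing(f'{stats_key}.read_time',
                             context.cumulative_read_time * 1000)
        statsd_client.timing(f'{stats_key}.write_time',
                             context.cumulative_write_time * 1000)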