Move statemachine node init into TPE
This moves the node initialization and lock from the assignHandlers thread to a new thread pool executor (TPE). Several ZooKeeper (ZK) calls happen in sequence as part of this, and moving them out of the assignHandlers thread increases overall throughput.

Change-Id: I67a32eed4102ab6ff56b1c21a65fe7dd071448e5
This commit is contained in:
parent
e248f96adc
commit
713ba8b7f4
|
@ -84,6 +84,7 @@ class StateMachineNodeLauncher(stats.StatsReporter):
|
|||
self.provider_config = provider_config
|
||||
# Local additions:
|
||||
self.state_machine = None
|
||||
self.start_future = None
|
||||
self.keyscan_future = None
|
||||
self.manager = handler.manager
|
||||
self.start_time = None
|
||||
|
@ -94,6 +95,10 @@ class StateMachineNodeLauncher(stats.StatsReporter):
|
|||
return True
|
||||
|
||||
def launch(self):
|
||||
self.start_future = self.manager.state_machine_start_worker.submit(
|
||||
self.startStateMachine)
|
||||
|
||||
def startStateMachine(self):
|
||||
label = self.handler.pool.labels[self.node.type[0]]
|
||||
if label.diskimage:
|
||||
diskimage = self.provider_config.diskimages[
|
||||
|
@ -178,14 +183,18 @@ class StateMachineNodeLauncher(stats.StatsReporter):
|
|||
|
||||
def runStateMachine(self):
|
||||
instance = None
|
||||
state_machine = self.state_machine
|
||||
node = self.node
|
||||
statsd_key = 'ready'
|
||||
|
||||
if self.start_time is None:
|
||||
self.start_time = time.monotonic()
|
||||
|
||||
try:
|
||||
if self.state_machine is None:
|
||||
if self.start_future and self.start_future.done():
|
||||
self.start_future.result()
|
||||
else:
|
||||
return
|
||||
state_machine = self.state_machine
|
||||
if self.start_time is None:
|
||||
self.start_time = time.monotonic()
|
||||
if (self.state_machine.complete and self.keyscan_future
|
||||
and self.keyscan_future.done()):
|
||||
keys = self.keyscan_future.result()
|
||||
|
@ -233,12 +242,14 @@ class StateMachineNodeLauncher(stats.StatsReporter):
|
|||
"Lost ZooKeeper session trying to launch for node %s",
|
||||
node.id)
|
||||
node.state = zk.FAILED
|
||||
node.external_id = state_machine.external_id
|
||||
if state_machine:
|
||||
node.external_id = state_machine.external_id
|
||||
statsd_key = 'error.zksession'
|
||||
except exceptions.QuotaException:
|
||||
self.log.info("Aborting node %s due to quota failure" % node.id)
|
||||
node.state = zk.ABORTED
|
||||
node.external_id = state_machine.external_id
|
||||
if state_machine:
|
||||
node.external_id = state_machine.external_id
|
||||
self.zk.storeNode(node)
|
||||
statsd_key = 'error.quota'
|
||||
self.manager.invalidateQuotaCache()
|
||||
|
@ -246,7 +257,8 @@ class StateMachineNodeLauncher(stats.StatsReporter):
|
|||
self.log.exception(
|
||||
"Launch failed for node %s:", node.id)
|
||||
node.state = zk.FAILED
|
||||
node.external_id = state_machine.external_id
|
||||
if state_machine:
|
||||
node.external_id = state_machine.external_id
|
||||
self.zk.storeNode(node)
|
||||
|
||||
if hasattr(e, 'statsd_key'):
|
||||
|
@ -527,6 +539,7 @@ class StateMachineProvider(Provider, QuotaSupport):
|
|||
self.launchers = []
|
||||
self._zk = None
|
||||
self.keyscan_worker = None
|
||||
# Placeholder for the executor that runs startStateMachine jobs.  The
# name must match the attribute assigned when the ThreadPoolExecutor is
# created and the one checked before shutdown; the previous spelling
# ("start_machine_start_worker") left this attribute undefined until
# the executor was created.
self.state_machine_start_worker = None
|
||||
self.state_machine_thread = None
|
||||
self.running = False
|
||||
num_labels = sum([len(pool.labels)
|
||||
|
@ -547,6 +560,14 @@ class StateMachineProvider(Provider, QuotaSupport):
|
|||
self.keyscan_worker = ThreadPoolExecutor(
|
||||
thread_name_prefix=f'keyscan-{self.provider.name}',
|
||||
max_workers=workers)
|
||||
# This is mostly ZK operations so we don't expect to need as
|
||||
# much parallelism.
|
||||
workers = 8
|
||||
self.log.info("Create state machiner starter with max workers=%s",
|
||||
workers)
|
||||
self.state_machine_start_worker = ThreadPoolExecutor(
|
||||
thread_name_prefix=f'start-{self.provider.name}',
|
||||
max_workers=workers)
|
||||
self.state_machine_thread = threading.Thread(
|
||||
target=self._runStateMachines,
|
||||
daemon=True)
|
||||
|
@ -560,6 +581,8 @@ class StateMachineProvider(Provider, QuotaSupport):
|
|||
self.running = False
|
||||
if self.keyscan_worker:
|
||||
self.keyscan_worker.shutdown()
|
||||
if self.state_machine_start_worker:
|
||||
self.state_machine_start_worker.shutdown()
|
||||
self.adapter.stop()
|
||||
self.log.debug("Stopped")
|
||||
|
||||
|
|
Loading…
Reference in New Issue