Run two statemachine driver threads

Currently a single thread runs all of the delete state machines,
followed by all of the create state machines.  Each of these has
some ZK overhead as they either update or delete znodes.  The
delete state machines for the openstack driver in particular don't
do much other than delete znodes.  By using one thread to drive
create state machines and a second thread to drive delete state
machines, we can parallelize more ZK operations, and in the case
where quota is available, more quickly respond to create requests.

Change-Id: I5c157c5c29c709ee52d2ddd8a54eb9062ee20ab9
This commit is contained in:
James E. Blair 2023-03-15 13:39:34 -07:00
parent c56cab313a
commit f9422d6342
1 changed files with 38 additions and 20 deletions

View File

@ -535,7 +535,8 @@ class StateMachineProvider(Provider, QuotaSupport):
self.launchers = []
self._zk = None
self.keyscan_worker = None
self.state_machine_thread = None
self.create_state_machine_thread = None
self.delete_state_machine_thread = None
self.running = False
num_labels = sum([len(pool.labels)
for pool in provider.pools.values()])
@ -556,10 +557,14 @@ class StateMachineProvider(Provider, QuotaSupport):
self.keyscan_worker = ThreadPoolExecutor(
thread_name_prefix=f'keyscan-{self.provider.name}',
max_workers=workers)
self.state_machine_thread = threading.Thread(
target=self._runStateMachines,
self.create_state_machine_thread = threading.Thread(
target=self._runCreateStateMachines,
daemon=True)
self.state_machine_thread.start()
self.create_state_machine_thread.start()
self.delete_state_machine_thread = threading.Thread(
target=self._runDeleteStateMachines,
daemon=True)
self.delete_state_machine_thread.start()
def stop(self):
self.log.debug("Stopping")
@ -569,7 +574,8 @@ class StateMachineProvider(Provider, QuotaSupport):
self.stop_thread.start()
def _stop(self):
if self.state_machine_thread:
if (self.create_state_machine_thread or
self.delete_state_machine_thread):
while self.launchers or self.deleters:
time.sleep(1)
self.running = False
@ -583,43 +589,55 @@ class StateMachineProvider(Provider, QuotaSupport):
def join(self):
self.log.debug("Joining")
if self.state_machine_thread:
self.state_machine_thread.join()
if self.create_state_machine_thread:
self.create_state_machine_thread.join()
if self.delete_state_machine_thread:
self.delete_state_machine_thread.join()
if self.stop_thread:
self.stop_thread.join()
self.log.debug("Joined")
def _runStateMachines(self):
def _runStateMachines(self, create_or_delete, state_machines):
while self.running:
to_remove = []
loop_start = time.monotonic()
state_machines = self.deleters + self.launchers
if state_machines:
self.log.debug("Running %s state machines",
len(state_machines))
self.log.debug("Running %s %s state machines",
len(state_machines), create_or_delete)
for sm in state_machines:
try:
node_id = None
if sm.node:
node_id = sm.node.id
sm.runStateMachine()
if sm.complete:
self.log.debug("Removing state machine from runner")
self.log.debug(
f"Removing {create_or_delete} state machine "
f"for {node_id} from runner")
to_remove.append(sm)
except Exception:
self.log.exception("Error running state machine:")
self.log.exception(
f"Error running {create_or_delete} state machine "
f"for {node_id}:")
for sm in to_remove:
if sm in self.deleters:
self.deleters.remove(sm)
if sm in self.launchers:
self.launchers.remove(sm)
state_machines.remove(sm)
loop_end = time.monotonic()
if state_machines or to_remove:
self.log.debug("Ran %s %s state machines in %s seconds",
len(state_machines), create_or_delete,
loop_end - loop_start)
if state_machines:
self.log.debug("Ran %s state machines in %s seconds",
len(state_machines), loop_end - loop_start)
if self.launchers or self.deleters:
time.sleep(max(0, self.MAXIMUM_SLEEP -
(loop_end - loop_start)))
else:
time.sleep(self.MINIMUM_SLEEP)
def _runCreateStateMachines(self):
self._runStateMachines("create", self.launchers)
def _runDeleteStateMachines(self):
self._runStateMachines("delete", self.deleters)
def getRequestHandler(self, poolworker, request):
return StateMachineHandler(poolworker, request)