Merge "Add more steps to stop an engine"
This commit is contained in:
commit
e9cb9ab547
|
@ -128,7 +128,7 @@ class Engine(object):
|
||||||
self._watchdog_thread.join()
|
self._watchdog_thread.join()
|
||||||
|
|
||||||
def schedule(self):
|
def schedule(self):
|
||||||
while True:
|
while self._state == self.ENABLED:
|
||||||
(next_time, next_jobs) = self.wait_next(self.engine_timeout)
|
(next_time, next_jobs) = self.wait_next(self.engine_timeout)
|
||||||
# NOTE(praneshp): here, call a function that will wait till next
|
# NOTE(praneshp): here, call a function that will wait till next
|
||||||
# time and call next_jobs,
|
# time and call next_jobs,
|
||||||
|
@ -146,8 +146,8 @@ class Engine(object):
|
||||||
if not self.run_queue:
|
if not self.run_queue:
|
||||||
if watch and watch.expired():
|
if watch and watch.expired():
|
||||||
raise exceptions.TimeoutException(
|
raise exceptions.TimeoutException(
|
||||||
"Died after waiting for audits to arrive for %s" %
|
"Died at %s after waiting for audits to arrive "
|
||||||
watch.elapsed())
|
"for %s" % (utils.wallclock(), watch.elapsed()))
|
||||||
else:
|
else:
|
||||||
# Grab all the jobs for the next time.
|
# Grab all the jobs for the next time.
|
||||||
next_jobs.append(self.run_queue.popleft())
|
next_jobs.append(self.run_queue.popleft())
|
||||||
|
@ -169,12 +169,13 @@ class Engine(object):
|
||||||
now = datetime.datetime.now()
|
now = datetime.datetime.now()
|
||||||
cron = croniter.croniter(schedule, now)
|
cron = croniter.croniter(schedule, now)
|
||||||
next_iteration = cron.get_next(datetime.datetime)
|
next_iteration = cron.get_next(datetime.datetime)
|
||||||
while True:
|
while self._state == self.ENABLED:
|
||||||
LOG.info('It is %s, next serializer at %s', now, next_iteration)
|
LOG.info('It is %s, next serializer at %s', now, next_iteration)
|
||||||
pause.until(next_iteration)
|
pause.until(next_iteration)
|
||||||
now = datetime.datetime.now()
|
now = datetime.datetime.now()
|
||||||
next_iteration = cron.get_next(datetime.datetime)
|
next_iteration = cron.get_next(datetime.datetime)
|
||||||
self.run_serializer(next_iteration, now)
|
if self._state == self.ENABLED:
|
||||||
|
self.run_serializer(next_iteration, now)
|
||||||
|
|
||||||
def run_serializer(self, next_iteration, current_time):
|
def run_serializer(self, next_iteration, current_time):
|
||||||
LOG.info("Running serializer for %s at %s", self.name, current_time)
|
LOG.info("Running serializer for %s at %s", self.name, current_time)
|
||||||
|
@ -201,7 +202,10 @@ class Engine(object):
|
||||||
|
|
||||||
new_additions.sort(key=operator.itemgetter('time'))
|
new_additions.sort(key=operator.itemgetter('time'))
|
||||||
|
|
||||||
self.run_queue.extend(new_additions)
|
# NOTE(praneshp): Protect this operation with a state check, so in
|
||||||
|
# case of race conditions no extra audit scripts are added.
|
||||||
|
if self._state == self.ENABLED:
|
||||||
|
self.run_queue.extend(new_additions)
|
||||||
LOG.info("Run queue till %s is %s", next_iteration, self.run_queue)
|
LOG.info("Run queue till %s is %s", next_iteration, self.run_queue)
|
||||||
LOG.info("Repair scripts at %s: %s", next_iteration, self._repairs)
|
LOG.info("Repair scripts at %s: %s", next_iteration, self._repairs)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -216,8 +220,10 @@ class Engine(object):
|
||||||
def stop_engine(self):
|
def stop_engine(self):
|
||||||
LOG.info("Stopping engine %s", self.name)
|
LOG.info("Stopping engine %s", self.name)
|
||||||
# Set state to stop, which will stop serializers
|
# Set state to stop, which will stop serializers
|
||||||
LOG.info("Setting %s to state: %s", self.name, states.DISABLED)
|
# Clear run queue
|
||||||
self._state = states.DISABLED
|
LOG.info("Clearing audit run queue for %s", self.name)
|
||||||
|
self.run_queue.clear()
|
||||||
|
# Stop all repairs - not yet implemented
|
||||||
# Stop watchdog monitoring
|
# Stop watchdog monitoring
|
||||||
LOG.info("Stopping watchdog for %s", self.name)
|
LOG.info("Stopping watchdog for %s", self.name)
|
||||||
self._watchdog_thread.stop()
|
self._watchdog_thread.stop()
|
||||||
|
@ -236,6 +242,11 @@ class Engine(object):
|
||||||
def setup_audit(self, execution_time, audit_list):
|
def setup_audit(self, execution_time, audit_list):
|
||||||
try:
|
try:
|
||||||
pause.until(execution_time)
|
pause.until(execution_time)
|
||||||
|
# Only proceed if engine is running, i.e in enabled state.
|
||||||
|
if self._state != self.ENABLED:
|
||||||
|
LOG.info("%s is disabled, so not running audits at %s",
|
||||||
|
self.name, execution_time)
|
||||||
|
return
|
||||||
LOG.info("Time: %s, Starting %s", execution_time, audit_list)
|
LOG.info("Time: %s, Starting %s", execution_time, audit_list)
|
||||||
audit_futures = []
|
audit_futures = []
|
||||||
for audit in audit_list:
|
for audit in audit_list:
|
||||||
|
|
Loading…
Reference in New Issue