diff --git a/entropy/engine.py b/entropy/engine.py index 36470e3..3d34e81 100644 --- a/entropy/engine.py +++ b/entropy/engine.py @@ -128,7 +128,7 @@ class Engine(object): self._watchdog_thread.join() def schedule(self): - while True: + while self._state == self.ENABLED: (next_time, next_jobs) = self.wait_next(self.engine_timeout) # NOTE(praneshp): here, call a function that will wait till next # time and call next_jobs, @@ -146,8 +146,8 @@ class Engine(object): if not self.run_queue: if watch and watch.expired(): raise exceptions.TimeoutException( - "Died after waiting for audits to arrive for %s" % - watch.elapsed()) + "Died at %s after waiting for audits to arrive " + "for %s" % (utils.wallclock(), watch.elapsed())) else: # Grab all the jobs for the next time. next_jobs.append(self.run_queue.popleft()) @@ -169,12 +169,13 @@ class Engine(object): now = datetime.datetime.now() cron = croniter.croniter(schedule, now) next_iteration = cron.get_next(datetime.datetime) - while True: + while self._state == self.ENABLED: LOG.info('It is %s, next serializer at %s', now, next_iteration) pause.until(next_iteration) now = datetime.datetime.now() next_iteration = cron.get_next(datetime.datetime) - self.run_serializer(next_iteration, now) + if self._state == self.ENABLED: + self.run_serializer(next_iteration, now) def run_serializer(self, next_iteration, current_time): LOG.info("Running serializer for %s at %s", self.name, current_time) @@ -201,7 +202,10 @@ class Engine(object): new_additions.sort(key=operator.itemgetter('time')) - self.run_queue.extend(new_additions) + # NOTE(praneshp): Protect this operation with a state check, so in + # case of race conditions no extra audit scripts are added. + if self._state == self.ENABLED: + self.run_queue.extend(new_additions) LOG.info("Run queue till %s is %s", next_iteration, self.run_queue) LOG.info("Repair scripts at %s: %s", next_iteration, self._repairs) except Exception: @@ -216,8 +220,10 @@ class Engine(object): def stop_engine(self): LOG.info("Stopping engine %s", self.name) # Set state to stop, which will stop serializers - LOG.info("Setting %s to state: %s", self.name, states.DISABLED) - self._state = states.DISABLED + # Clear run queue + LOG.info("Clearing audit run queue for %s", self.name) + self.run_queue.clear() + # Stop all repairs - not yet implemented # Stop watchdog monitoring LOG.info("Stopping watchdog for %s", self.name) self._watchdog_thread.stop() @@ -236,6 +242,11 @@ class Engine(object): def setup_audit(self, execution_time, audit_list): try: pause.until(execution_time) + # Only proceed if engine is running, i.e in enabled state. + if self._state != self.ENABLED: + LOG.info("%s is disabled, so not running audits at %s", + self.name, execution_time) + return LOG.info("Time: %s, Starting %s", execution_time, audit_list) audit_futures = [] for audit in audit_list: