Merge "Increase the level of usefulness of the dispatching logging"
This commit is contained in:
@@ -106,9 +106,10 @@ class Conductor(object):
|
||||
@abc.abstractmethod
|
||||
def _dispatch_job(self, job):
|
||||
"""Accepts a single (already claimed) job and causes it to be run in
|
||||
an engine. The job is consumed upon completion (unless False is
|
||||
returned which will signify the job should be abandoned instead)
|
||||
an engine. Returns a boolean that signifies whether the job should
|
||||
be consumed. The job is consumed upon completion (unless False is
|
||||
returned which will signify the job should be abandoned instead).
|
||||
|
||||
:param job: A Job instance that has already been claimed by the
|
||||
jobboard.
|
||||
:param job: A job instance that has already been claimed by the
|
||||
jobboard.
|
||||
"""
|
||||
|
||||
@@ -83,13 +83,10 @@ class SingleThreadedConductor(base.Conductor):
|
||||
return True
|
||||
|
||||
def _dispatch_job(self, job):
|
||||
LOG.info("Dispatching job: %s", job)
|
||||
try:
|
||||
engine = self._engine_from_job(job)
|
||||
except Exception as e:
|
||||
raise excp.ConductorFailure("Failed creating an engine", cause=e)
|
||||
engine = self._engine_from_job(job)
|
||||
consume = True
|
||||
with logging_listener.LoggingListener(engine, log=LOG):
|
||||
consume = True
|
||||
LOG.debug("Dispatching engine %s for job: %s", engine, job)
|
||||
try:
|
||||
engine.run()
|
||||
except excp.WrappedFailure as e:
|
||||
@@ -107,7 +104,7 @@ class SingleThreadedConductor(base.Conductor):
|
||||
LOG.warn("Job execution failed: %s", job, exc_info=True)
|
||||
else:
|
||||
LOG.info("Job completed successfully: %s", job)
|
||||
return consume
|
||||
return consume
|
||||
|
||||
def run(self):
|
||||
self._dead.clear()
|
||||
@@ -125,25 +122,26 @@ class SingleThreadedConductor(base.Conductor):
|
||||
except (excp.UnclaimableJob, excp.NotFound):
|
||||
LOG.debug("Job already claimed or consumed: %s", job)
|
||||
continue
|
||||
dispatched += 1
|
||||
consume = False
|
||||
try:
|
||||
consume = self._dispatch_job(job)
|
||||
except excp.ConductorFailure:
|
||||
except Exception:
|
||||
LOG.warn("Job dispatching failed: %s", job,
|
||||
exc_info=True)
|
||||
else:
|
||||
try:
|
||||
if consume:
|
||||
self._jobboard.consume(job, self._name)
|
||||
else:
|
||||
self._jobboard.abandon(job, self._name)
|
||||
except excp.JobFailure:
|
||||
if consume:
|
||||
LOG.warn("Failed job consumption: %s", job,
|
||||
exc_info=True)
|
||||
else:
|
||||
LOG.warn("Failed job abandonment: %s", job,
|
||||
exc_info=True)
|
||||
dispatched += 1
|
||||
try:
|
||||
if consume:
|
||||
self._jobboard.consume(job, self._name)
|
||||
else:
|
||||
self._jobboard.abandon(job, self._name)
|
||||
except excp.JobFailure:
|
||||
if consume:
|
||||
LOG.warn("Failed job consumption: %s", job,
|
||||
exc_info=True)
|
||||
else:
|
||||
LOG.warn("Failed job abandonment: %s", job,
|
||||
exc_info=True)
|
||||
if dispatched == 0 and not self._wait_timeout.is_stopped():
|
||||
self._wait_timeout.wait()
|
||||
finally:
|
||||
|
||||
Reference in New Issue
Block a user