Merge "Ensure the fetching jobs does not fetch anything when in bad state"
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import contextlib
|
||||
import functools
|
||||
import sys
|
||||
@@ -23,6 +24,7 @@ import fasteners
|
||||
import futurist
|
||||
from kazoo import exceptions as k_exceptions
|
||||
from kazoo.protocol import paths as k_paths
|
||||
from kazoo.protocol import states as k_states
|
||||
from kazoo.recipe import watchers
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import excutils
|
||||
@@ -261,6 +263,19 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
||||
#: Default znode path used for jobs (data, locks...).
|
||||
DEFAULT_PATH = "/taskflow/jobs"
|
||||
|
||||
STATE_HISTORY_LENGTH = 2
|
||||
"""
|
||||
Number of prior state changes to keep a history of, mainly useful
|
||||
for history tracking and debugging connectivity issues.
|
||||
"""
|
||||
|
||||
NO_FETCH_STATES = (k_states.KazooState.LOST, k_states.KazooState.SUSPENDED)
|
||||
"""
|
||||
Client states under which we return empty lists from fetching routines,
|
||||
during these states the underlying connection either is being recovered
|
||||
or may be recovered (aka, it has not fully disconnected).
|
||||
"""
|
||||
|
||||
def __init__(self, name, conf,
|
||||
client=None, persistence=None, emit_notifications=True):
|
||||
super(ZookeeperJobBoard, self).__init__(name, conf)
|
||||
@@ -298,6 +313,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
||||
self._worker = None
|
||||
self._emit_notifications = bool(emit_notifications)
|
||||
self._connected = False
|
||||
self._suspended = False
|
||||
self._last_states = collections.deque(maxlen=self.STATE_HISTORY_LENGTH)
|
||||
|
||||
def _try_emit(self, state, details):
|
||||
# Submit the work to the executor to avoid blocking the kazoo threads
|
||||
@@ -334,10 +351,25 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
||||
return len(self._known_jobs)
|
||||
|
||||
def _fetch_jobs(self, ensure_fresh=False):
|
||||
if ensure_fresh:
|
||||
self._force_refresh()
|
||||
with self._job_cond:
|
||||
return sorted(six.itervalues(self._known_jobs), reverse=True)
|
||||
try:
|
||||
last_state = self._last_states[0]
|
||||
except IndexError:
|
||||
last_state = None
|
||||
if last_state in self.NO_FETCH_STATES:
|
||||
# NOTE(harlowja): on lost clear out all known jobs (from the
|
||||
# in-memory mapping) as we can not safely assume there are any
|
||||
# jobs to continue working on in this state.
|
||||
if last_state == k_states.KazooState.LOST and self._known_jobs:
|
||||
# This will force the jobboard to drop all (in-memory) jobs
|
||||
# that are not in this list (pretty much simulating what
|
||||
# would happen if a jobboard data directory was emptied).
|
||||
self._on_job_posting([], delayed=False)
|
||||
return []
|
||||
else:
|
||||
if ensure_fresh:
|
||||
self._force_refresh()
|
||||
with self._job_cond:
|
||||
return sorted(six.itervalues(self._known_jobs))
|
||||
|
||||
def _force_refresh(self):
|
||||
try:
|
||||
@@ -364,12 +396,15 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
||||
|
||||
def _remove_job(self, path):
|
||||
if path not in self._known_jobs:
|
||||
return
|
||||
return False
|
||||
with self._job_cond:
|
||||
job = self._known_jobs.pop(path, None)
|
||||
if job is not None:
|
||||
LOG.debug("Removed job that was at path '%s'", path)
|
||||
self._try_emit(base.REMOVAL, details={'job': job})
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def _process_child(self, path, request, quiet=True):
|
||||
"""Receives the result of a child data fetch request."""
|
||||
@@ -456,8 +491,13 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
||||
investigate_paths.append(path)
|
||||
if pending_removals:
|
||||
with self._job_cond:
|
||||
for path in pending_removals:
|
||||
self._remove_job(path)
|
||||
am_removed = 0
|
||||
try:
|
||||
for path in pending_removals:
|
||||
am_removed += int(self._remove_job(path))
|
||||
finally:
|
||||
if am_removed:
|
||||
self._job_cond.notify_all()
|
||||
for path in investigate_paths:
|
||||
# Fire off the request to populate this job.
|
||||
#
|
||||
@@ -694,7 +734,24 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
||||
kazoo_utils.checked_commit(txn)
|
||||
|
||||
def _state_change_listener(self, state):
|
||||
LOG.debug("Kazoo client has changed to state: %s", state)
|
||||
if self._last_states:
|
||||
LOG.debug("Kazoo client has changed to"
|
||||
" state '%s' from prior states '%s'", state,
|
||||
self._last_states)
|
||||
else:
|
||||
LOG.debug("Kazoo client has changed to state '%s' (from"
|
||||
" its initial/uninitialized state)", state)
|
||||
self._last_states.appendleft(state)
|
||||
if state == k_states.KazooState.LOST:
|
||||
self._connected = False
|
||||
LOG.warn("Connection to zookeeper has been lost")
|
||||
elif state == k_states.KazooState.SUSPENDED:
|
||||
LOG.warn("Connection to zookeeper has been suspended")
|
||||
self._suspended = True
|
||||
else:
|
||||
# Must be CONNECTED then (as there are only 3 enums)
|
||||
if self._suspended:
|
||||
self._suspended = False
|
||||
|
||||
def wait(self, timeout=None):
|
||||
# Wait until timeout expires (or forever) for jobs to appear.
|
||||
@@ -738,6 +795,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
||||
self._known_jobs.clear()
|
||||
LOG.debug("Stopped & cleared local state")
|
||||
self._connected = False
|
||||
self._last_states.clear()
|
||||
|
||||
@fasteners.locked(lock='_open_close_lock')
|
||||
def connect(self, timeout=10.0):
|
||||
|
Reference in New Issue
Block a user