672 lines
26 KiB
Python
672 lines
26 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import collections
|
|
import contextlib
|
|
import functools
|
|
import logging
|
|
import threading
|
|
|
|
from concurrent import futures
|
|
from kazoo import exceptions as k_exceptions
|
|
from kazoo.protocol import paths as k_paths
|
|
from kazoo.recipe import watchers
|
|
import six
|
|
|
|
from taskflow import exceptions as excp
|
|
from taskflow.jobs import job as base_job
|
|
from taskflow.jobs import jobboard
|
|
from taskflow.openstack.common import excutils
|
|
from taskflow.openstack.common import jsonutils
|
|
from taskflow.openstack.common import uuidutils
|
|
from taskflow import states
|
|
from taskflow.types import timing as tt
|
|
from taskflow.utils import kazoo_utils
|
|
from taskflow.utils import lock_utils
|
|
from taskflow.utils import misc
|
|
|
|
# Module-level logger for this jobboard backend.
LOG = logging.getLogger(__name__)

# States in which a job may be claimed by an applicant.
UNCLAIMED_JOB_STATES = (
    states.UNCLAIMED,
)
# Every state a zookeeper-backed job can report.
ALL_JOB_STATES = (
    states.UNCLAIMED,
    states.COMPLETE,
    states.CLAIMED,
)

# Transaction support was added in 3.4.0
MIN_ZK_VERSION = (3, 4, 0)
# Appended to a job znode path to form its ephemeral lock/owner node path.
LOCK_POSTFIX = ".lock"
# Name prefix of the sequenced job znodes (job0000000001, job0000000002, ...).
JOB_PREFIX = 'job'
|
|
|
|
|
|
def _check_who(who):
    """Validate that a job applicant identifier is a non-empty string."""
    is_text = isinstance(who, six.string_types)
    if not is_text:
        raise TypeError("Job applicant must be a string type")
    if not len(who):
        raise ValueError("Job applicant must be non-empty")
|
|
|
|
|
|
class ZookeeperJob(base_job.Job):
    """A job backed by a zookeeper znode (and read via a zookeeper client)."""

    def __init__(self, name, board, client, backend, path,
                 uuid=None, details=None, book=None, book_data=None,
                 created_on=None):
        super(ZookeeperJob, self).__init__(name, uuid=uuid, details=details)
        self._board = board
        self._book = book
        if not book_data:
            book_data = {}
        self._book_data = book_data
        self._client = client
        self._backend = backend
        if all((self._book, self._book_data)):
            raise ValueError("Only one of 'book_data' or 'book'"
                             " can be provided")
        self._path = path
        self._lock_path = path + LOCK_POSTFIX
        self._created_on = created_on
        # Set once the job znode is found to be missing, so that we avoid
        # re-asking zookeeper for attributes of a node known to be gone.
        self._node_not_found = False

    @property
    def lock_path(self):
        """Path of the (ephemeral) lock/owner node for this job."""
        return self._lock_path

    @property
    def path(self):
        """Path of the znode that holds this job's posted data."""
        return self._path

    def _get_node_attr(self, path, attr_name, trans_func=None):
        """Fetch one stat attribute of the znode at ``path``.

        :param path: znode path to stat.
        :param attr_name: name of the znode stat attribute to return.
        :param trans_func: optional callable applied to the raw attribute
                           value before it is returned.
        :raises: :py:class:`~taskflow.exceptions.NotFound` when the node
                 does not exist and
                 :py:class:`~taskflow.exceptions.JobFailure` on timeout,
                 session expiry or other client errors.
        """
        try:
            _data, node_stat = self._client.get(path)
            attr = getattr(node_stat, attr_name)
            if trans_func is not None:
                return trans_func(attr)
            else:
                return attr
        except k_exceptions.NoNodeError as e:
            raise excp.NotFound("Can not fetch the %r attribute"
                                " of job %s (%s), path %s not found"
                                % (attr_name, self.uuid, self.path, path), e)
        except self._client.handler.timeout_exception as e:
            raise excp.JobFailure("Can not fetch the %r attribute"
                                  " of job %s (%s), connection timed out"
                                  % (attr_name, self.uuid, self.path), e)
        except k_exceptions.SessionExpiredError as e:
            raise excp.JobFailure("Can not fetch the %r attribute"
                                  " of job %s (%s), session expired"
                                  % (attr_name, self.uuid, self.path), e)
        except (AttributeError, k_exceptions.KazooException) as e:
            raise excp.JobFailure("Can not fetch the %r attribute"
                                  " of job %s (%s), internal error" %
                                  (attr_name, self.uuid, self.path), e)

    @property
    def last_modified(self):
        """Datetime the job node was last modified (``None`` if missing)."""
        modified_on = None
        try:
            if not self._node_not_found:
                modified_on = self._get_node_attr(
                    self.path, 'mtime',
                    trans_func=misc.millis_to_datetime)
        except excp.NotFound:
            self._node_not_found = True
        return modified_on

    @property
    def created_on(self):
        """Datetime the job node was created (``None`` if missing)."""
        # This one we can cache (since it won't change after creation).
        if self._node_not_found:
            return None
        if self._created_on is None:
            try:
                self._created_on = self._get_node_attr(
                    self.path, 'ctime',
                    trans_func=misc.millis_to_datetime)
            except excp.NotFound:
                self._node_not_found = True
        return self._created_on

    @property
    def board(self):
        """The jobboard this job was posted on/fetched from."""
        return self._board

    def _load_book(self):
        """Load the logbook this job references (or ``None`` if unable)."""
        book_uuid = self.book_uuid
        if self._backend is not None and book_uuid is not None:
            # TODO(harlowja): we are currently limited by assuming that the
            # job posted has the same backend as this loader (to start this
            # seems to be a ok assumption, and can be adjusted in the future
            # if we determine there is a use-case for multi-backend loaders,
            # aka a registry of loaders).
            with contextlib.closing(self._backend.get_connection()) as conn:
                return conn.get_logbook(book_uuid)
        # No backend to fetch from or no uuid specified
        return None

    @property
    def state(self):
        """Current state of this job (one of ``ALL_JOB_STATES``)."""
        owner = self.board.find_owner(self)
        job_data = {}
        try:
            raw_data, _data_stat = self._client.get(self.path)
            job_data = misc.decode_json(raw_data)
        except k_exceptions.NoNodeError:
            pass
        except k_exceptions.SessionExpiredError as e:
            raise excp.JobFailure("Can not fetch the state of %s,"
                                  " session expired" % (self.uuid), e)
        except self._client.handler.timeout_exception as e:
            raise excp.JobFailure("Can not fetch the state of %s,"
                                  " connection timed out" % (self.uuid), e)
        except k_exceptions.KazooException as e:
            raise excp.JobFailure("Can not fetch the state of %s, internal"
                                  " error" % (self.uuid), e)
        if not job_data:
            # No data this job has been completed (the owner that we might have
            # fetched will not be able to be fetched again, since the job node
            # is a parent node of the owner/lock node).
            return states.COMPLETE
        if not owner:
            # No owner, but data, still work to be done.
            return states.UNCLAIMED
        return states.CLAIMED

    def __lt__(self, other):
        # NOTE: previously this was a ``__cmp__`` method built on the
        # python 2.x only ``cmp`` builtin (removed in python 3.x); a rich
        # comparison ordering by path keeps ``sorted()`` over jobs working
        # on both python versions.
        return self.path < other.path

    def __hash__(self):
        return hash(self.path)

    @property
    def book(self):
        """The logbook associated with this job (lazily loaded & cached)."""
        if self._book is None:
            self._book = self._load_book()
        return self._book

    @property
    def book_uuid(self):
        """UUID of the associated logbook (or ``None`` when unknown)."""
        if self._book:
            return self._book.uuid
        if self._book_data:
            return self._book_data.get('uuid')
        else:
            return None

    @property
    def book_name(self):
        """Name of the associated logbook (or ``None`` when unknown)."""
        if self._book:
            return self._book.name
        if self._book_data:
            return self._book_data.get('name')
        else:
            return None
|
|
|
|
|
|
class ZookeeperJobBoardIterator(six.Iterator):
    """Iterator over a zookeeper jobboard.

    It supports the following attributes/constructor arguments:

    * ensure_fresh: boolean that requests that during every fetch of a new
      set of jobs this will cause the iterator to force the backend to
      refresh (ensuring that the jobboard has the most recent job listings).
    * only_unclaimed: boolean that indicates whether to only iterate
      over unclaimed jobs.
    """

    def __init__(self, board, only_unclaimed=False, ensure_fresh=False):
        self._board = board
        self._jobs = collections.deque()
        self._fetched = False
        self.ensure_fresh = ensure_fresh
        self.only_unclaimed = only_unclaimed

    @property
    def board(self):
        """The jobboard this iterator was created from."""
        return self._board

    def __iter__(self):
        return self

    def _next_job(self):
        """Pop queued jobs until one in an allowed state is found."""
        if self.only_unclaimed:
            allowed_states = UNCLAIMED_JOB_STATES
        else:
            allowed_states = ALL_JOB_STATES
        job = None
        while self._jobs and job is None:
            maybe_job = self._jobs.popleft()
            try:
                if maybe_job.state in allowed_states:
                    job = maybe_job
            except excp.JobFailure:
                # State could not be determined; skip it for this iteration
                # (it is not removed from the board itself).
                #
                # NOTE: ``LOG.warning`` is used since ``LOG.warn`` is a
                # deprecated alias in the stdlib logging module.
                LOG.warning("Failed determining the state of job: %s (%s)",
                            maybe_job.uuid, maybe_job.path, exc_info=True)
            except excp.NotFound:
                self._board._remove_job(maybe_job.path)
        return job

    def __next__(self):
        if not self._jobs:
            if not self._fetched:
                jobs = self._board._fetch_jobs(ensure_fresh=self.ensure_fresh)
                self._jobs.extend(jobs)
                self._fetched = True
        job = self._next_job()
        if job is None:
            raise StopIteration
        else:
            return job
|
|
|
|
|
|
class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
    """A jobboard backed by zookeeper.

    Jobs are stored as sequenced znodes under a configurable absolute path;
    a claim is an ephemeral ``.lock`` child node, so a claim automatically
    disappears if the claiming entity's session is lost.
    """

    def __init__(self, name, conf,
                 client=None, persistence=None, emit_notifications=True):
        super(ZookeeperJobBoard, self).__init__(name, conf)
        if client is not None:
            # Externally provided client; we are not responsible for its
            # lifecycle (see close()).
            self._client = client
            self._owned = False
        else:
            self._client = kazoo_utils.make_client(self._conf)
            self._owned = True
        path = str(conf.get("path", "/taskflow/jobs"))
        if not path:
            raise ValueError("Empty zookeeper path is disallowed")
        if not k_paths.isabs(path):
            raise ValueError("Zookeeper path must be absolute")
        self._path = path
        # The backend to load the full logbooks from, since whats sent over
        # the zookeeper data connection is only the logbook uuid and name, and
        # not currently the full logbook (later when a zookeeper backend
        # appears we can likely optimize for that backend usage by directly
        # reading from the path where the data is stored, if we want).
        self._persistence = persistence
        # Misc. internal details
        self._known_jobs = {}
        self._job_lock = threading.RLock()
        self._job_cond = threading.Condition(self._job_lock)
        self._open_close_lock = threading.RLock()
        self._client.add_listener(self._state_change_listener)
        self._bad_paths = frozenset([path])
        self._job_watcher = None
        # Since we use sequenced ids this will be the path that the sequences
        # are prefixed with, for example, job0000000001, job0000000002, ...
        self._job_base = k_paths.join(path, JOB_PREFIX)
        self._worker = None
        self._emit_notifications = bool(emit_notifications)

    def _emit(self, state, details):
        # Submit the work to the executor to avoid blocking the kazoo queue.
        if self._worker is not None:
            self._worker.submit(self.notifier.notify, state, details)

    @property
    def path(self):
        """Root zookeeper path the job nodes are created under."""
        return self._path

    @property
    def job_count(self):
        """Number of jobs currently known (locally) by this board."""
        with self._job_lock:
            return len(self._known_jobs)

    def _fetch_jobs(self, ensure_fresh=False):
        """Return the known jobs (sorted), optionally forcing a refresh."""
        if ensure_fresh:
            self._force_refresh()
        with self._job_lock:
            return sorted(six.itervalues(self._known_jobs))

    def _force_refresh(self):
        """Synchronously re-list the job children and resync local state."""
        try:
            children = self._client.get_children(self.path)
        except self._client.handler.timeout_exception as e:
            raise excp.JobFailure("Refreshing failure, connection timed out",
                                  e)
        except k_exceptions.SessionExpiredError as e:
            raise excp.JobFailure("Refreshing failure, session expired", e)
        except k_exceptions.NoNodeError:
            # The root path does not exist (yet); nothing to refresh.
            pass
        except k_exceptions.KazooException as e:
            raise excp.JobFailure("Refreshing failure, internal error", e)
        else:
            self._on_job_posting(children, delayed=False)

    def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
        """Return an iterator over this board's jobs."""
        return ZookeeperJobBoardIterator(self,
                                         only_unclaimed=only_unclaimed,
                                         ensure_fresh=ensure_fresh)

    def _remove_job(self, path):
        """Forget the locally known job at ``path`` (emitting a removal)."""
        LOG.debug("Removing job that was at path: %s", path)
        with self._job_lock:
            job = self._known_jobs.pop(path, None)
        if job is not None:
            self._emit(jobboard.REMOVAL, details={'job': job})

    def _process_child(self, path, request):
        """Receives the result of a child data fetch request."""
        job = None
        try:
            raw_data, node_stat = request.get()
            job_data = misc.decode_json(raw_data)
            created_on = misc.millis_to_datetime(node_stat.ctime)
        except (ValueError, TypeError, KeyError):
            # NOTE: ``LOG.warning`` is used (instead of the deprecated
            # stdlib alias ``LOG.warn``) throughout.
            LOG.warning("Incorrectly formatted job data found at path: %s",
                        path, exc_info=True)
        except self._client.handler.timeout_exception:
            LOG.warning("Connection timed out fetching job data from"
                        " path: %s", path, exc_info=True)
        except k_exceptions.SessionExpiredError:
            LOG.warning("Session expired fetching job data from path: %s",
                        path, exc_info=True)
        except k_exceptions.NoNodeError:
            LOG.debug("No job node found at path: %s, it must have"
                      " disappeared or was removed", path)
        except k_exceptions.KazooException:
            LOG.warning("Internal error fetching job data from path: %s",
                        path, exc_info=True)
        else:
            # Condition variables support the context manager protocol, so
            # prefer ``with`` over manual acquire/try/finally/release.
            with self._job_cond:
                if path not in self._known_jobs:
                    job = ZookeeperJob(job_data['name'], self,
                                       self._client, self._persistence, path,
                                       uuid=job_data['uuid'],
                                       book_data=job_data.get("book"),
                                       details=job_data.get("details", {}),
                                       created_on=created_on)
                    self._known_jobs[path] = job
                    self._job_cond.notify_all()
        if job is not None:
            self._emit(jobboard.POSTED, details={'job': job})

    def _on_job_posting(self, children, delayed=True):
        """Handle a (new) listing of children under the root job path.

        Removes known jobs that are no longer listed and fetches data for
        newly appearing jobs (asynchronously unless ``delayed`` is false).
        """
        LOG.debug("Got children %s under path %s", children, self.path)
        child_paths = []
        for c in children:
            if c.endswith(LOCK_POSTFIX) or not c.startswith(JOB_PREFIX):
                # Skip lock paths or non-job-paths (these are not valid jobs)
                continue
            child_paths.append(k_paths.join(self.path, c))

        # Remove jobs that we know about but which are no longer children
        with self._job_lock:
            removals = set()
            for path, _job in six.iteritems(self._known_jobs):
                if path not in child_paths:
                    removals.add(path)
            for path in removals:
                self._remove_job(path)

        # Ensure that we have a job record for each new job that has appeared
        for path in child_paths:
            if path in self._bad_paths:
                continue
            with self._job_lock:
                if path not in self._known_jobs:
                    # Fire off the request to populate this job asynchronously.
                    #
                    # This method is *usually* called from a asynchronous
                    # handler so it's better to exit from this quickly to
                    # allow other asynchronous handlers to be executed.
                    request = self._client.get_async(path)
                    child_proc = functools.partial(self._process_child, path)
                    if delayed:
                        request.rawlink(child_proc)
                    else:
                        child_proc(request)

    def post(self, name, book=None, details=None):
        """Create & post a new job to zookeeper (returning the job)."""

        def format_posting(job_uuid):
            # Build the json-serializable payload placed into the job node.
            posting = {
                'uuid': job_uuid,
                'name': name,
            }
            if details:
                posting['details'] = details
            else:
                posting['details'] = {}
            if book is not None:
                posting['book'] = {
                    'name': book.name,
                    'uuid': book.uuid,
                }
            return posting

        # NOTE(harlowja): Jobs are not ephemeral, they will persist until they
        # are consumed (this may change later, but seems safer to do this until
        # further notice).
        job_uuid = uuidutils.generate_uuid()
        with self._wrap(job_uuid, None,
                        "Posting failure: %s", ensure_known=False):
            job_posting = format_posting(job_uuid)
            job_posting = misc.binary_encode(jsonutils.dumps(job_posting))
            job_path = self._client.create(self._job_base,
                                           value=job_posting,
                                           sequence=True,
                                           ephemeral=False)
            job = ZookeeperJob(name, self, self._client,
                               self._persistence, job_path,
                               book=book, details=details,
                               uuid=job_uuid)
            with self._job_cond:
                self._known_jobs[job_path] = job
                self._job_cond.notify_all()
            self._emit(jobboard.POSTED, details={'job': job})
            return job

    def claim(self, job, who):
        """Claim ``job`` for ``who`` by creating its ephemeral lock node."""
        _check_who(who)
        with self._wrap(job.uuid, job.path, "Claiming failure: %s"):
            # NOTE(harlowja): post as json which will allow for future changes
            # more easily than a raw string/text.
            value = jsonutils.dumps({
                'owner': who,
            })
            try:
                self._client.create(job.lock_path,
                                    value=misc.binary_encode(value),
                                    ephemeral=True)
            except k_exceptions.NodeExistsException:
                # Try to see if we can find who the owner really is...
                try:
                    owner = self.find_owner(job)
                except Exception:
                    owner = None
                if owner:
                    msg = "Job %s already claimed by '%s'" % (job.uuid, owner)
                else:
                    msg = "Job %s already claimed" % (job.uuid)
                raise excp.UnclaimableJob(msg)

    @contextlib.contextmanager
    def _wrap(self, job_uuid, job_path,
              fail_msg_tpl="Failure: %s", ensure_known=True):
        """Translate kazoo/connection errors into taskflow exceptions.

        :param job_uuid: uuid interpolated into ``fail_msg_tpl``.
        :param job_path: job path appended to failure messages and, when
                         ``ensure_known`` is true, checked against the
                         locally known jobs before yielding.
        """
        if job_path:
            fail_msg_tpl += " (%s)" % (job_path)
        if ensure_known:
            if not job_path:
                raise ValueError("Unable to check if %r is a known path"
                                 % (job_path))
            with self._job_lock:
                if job_path not in self._known_jobs:
                    fail_msg_tpl += ", unknown job"
                    raise excp.NotFound(fail_msg_tpl % (job_uuid))
        try:
            yield
        except self._client.handler.timeout_exception as e:
            fail_msg_tpl += ", connection timed out"
            raise excp.JobFailure(fail_msg_tpl % (job_uuid), e)
        except k_exceptions.SessionExpiredError as e:
            fail_msg_tpl += ", session expired"
            raise excp.JobFailure(fail_msg_tpl % (job_uuid), e)
        except k_exceptions.NoNodeError:
            fail_msg_tpl += ", unknown job"
            raise excp.NotFound(fail_msg_tpl % (job_uuid))
        except k_exceptions.KazooException as e:
            fail_msg_tpl += ", internal error"
            raise excp.JobFailure(fail_msg_tpl % (job_uuid), e)

    def find_owner(self, job):
        """Return the owner recorded in the job's lock node (or ``None``)."""
        with self._wrap(job.uuid, job.path, "Owner query failure: %s"):
            try:
                # Sync first so we read the most up to date lock contents.
                self._client.sync(job.lock_path)
                raw_data, _lock_stat = self._client.get(job.lock_path)
                data = misc.decode_json(raw_data)
                owner = data.get("owner")
            except k_exceptions.NoNodeError:
                owner = None
            return owner

    def _get_owner_and_data(self, job):
        # Fetch the lock node and job node contents (plus their stats) so
        # that the later deletes can be version-checked.
        lock_data, lock_stat = self._client.get(job.lock_path)
        job_data, job_stat = self._client.get(job.path)
        return (misc.decode_json(lock_data), lock_stat,
                misc.decode_json(job_data), job_stat)

    def consume(self, job, who):
        """Permanently consume ``job`` (which must be owned by ``who``)."""
        _check_who(who)
        with self._wrap(job.uuid, job.path, "Consumption failure: %s"):
            try:
                owner_data = self._get_owner_and_data(job)
                lock_data, lock_stat, data, data_stat = owner_data
            except k_exceptions.NoNodeError:
                raise excp.JobFailure("Can not consume a job %s"
                                      " which we can not determine"
                                      " the owner of" % (job.uuid))
            if lock_data.get("owner") != who:
                raise excp.JobFailure("Can not consume a job %s"
                                      " which is not owned by %s"
                                      % (job.uuid, who))
            # Delete the lock and job nodes atomically (and only if neither
            # has changed since we fetched it).
            with self._client.transaction() as txn:
                txn.delete(job.lock_path, version=lock_stat.version)
                txn.delete(job.path, version=data_stat.version)
            self._remove_job(job.path)

    def abandon(self, job, who):
        """Abandon ``job``, releasing the claim held by ``who``."""
        _check_who(who)
        with self._wrap(job.uuid, job.path, "Abandonment failure: %s"):
            try:
                owner_data = self._get_owner_and_data(job)
                lock_data, lock_stat, data, data_stat = owner_data
            except k_exceptions.NoNodeError:
                raise excp.JobFailure("Can not abandon a job %s"
                                      " which we can not determine"
                                      " the owner of" % (job.uuid))
            if lock_data.get("owner") != who:
                raise excp.JobFailure("Can not abandon a job %s"
                                      " which is not owned by %s"
                                      % (job.uuid, who))
            # Only the lock node is removed; the job itself remains (so that
            # it can be claimed again by someone else).
            with self._client.transaction() as txn:
                txn.delete(job.lock_path, version=lock_stat.version)

    def _state_change_listener(self, state):
        # Invoked by kazoo on client connection state transitions.
        LOG.debug("Kazoo client has changed to state: %s", state)

    def wait(self, timeout=None):
        """Wait until jobs appear (or ``timeout`` seconds have elapsed).

        :returns: an iterator over the jobs that arrived.
        :raises: :py:class:`~taskflow.exceptions.NotFound` when the timeout
                 expires before any job appears.
        """
        # Wait until timeout expires (or forever) for jobs to appear.
        watch = None
        if timeout is not None:
            watch = tt.StopWatch(duration=float(timeout)).start()
        with self._job_cond:
            while True:
                if not self._known_jobs:
                    if watch is not None and watch.expired():
                        raise excp.NotFound("Expired waiting for jobs to"
                                            " arrive; waited %s seconds"
                                            % watch.elapsed())
                    # This is done since the given timeout can not be provided
                    # to the condition variable, since we can not ensure that
                    # when we acquire the condition that there will actually
                    # be jobs (especially if we are spuriously awaken), so we
                    # must recalculate the amount of time we really have left.
                    timeout = None
                    if watch is not None:
                        timeout = watch.leftover()
                    self._job_cond.wait(timeout)
                else:
                    it = ZookeeperJobBoardIterator(self)
                    it._jobs.extend(self._fetch_jobs())
                    it._fetched = True
                    return it

    @property
    def connected(self):
        """Whether the underlying zookeeper client is currently connected."""
        return self._client.connected

    @lock_utils.locked(lock='_open_close_lock')
    def close(self):
        """Stop the client (if we own it) and clear all local state."""
        if self._owned:
            LOG.debug("Stopping client")
            kazoo_utils.finalize_client(self._client)
        if self._worker is not None:
            LOG.debug("Shutting down the notifier")
            self._worker.shutdown()
            self._worker = None
        with self._job_lock:
            self._known_jobs.clear()
        LOG.debug("Stopped & cleared local state")

    @lock_utils.locked(lock='_open_close_lock')
    def connect(self, timeout=10.0):
        """Connect to zookeeper and start watching for job postings."""

        def try_clean():
            # Attempt to do the needed cleanup if post-connection setup does
            # not succeed (maybe the connection is lost right after it is
            # obtained).
            try:
                self.close()
            except k_exceptions.KazooException:
                LOG.exception("Failed cleaning-up after post-connection"
                              " initialization failed")

        try:
            if timeout is not None:
                timeout = float(timeout)
            self._client.start(timeout=timeout)
        except (self._client.handler.timeout_exception,
                k_exceptions.KazooException) as e:
            raise excp.JobFailure("Failed to connect to zookeeper", e)
        try:
            kazoo_utils.check_compatible(self._client, MIN_ZK_VERSION)
            if self._worker is None and self._emit_notifications:
                self._worker = futures.ThreadPoolExecutor(max_workers=1)
            self._client.ensure_path(self.path)
            if self._job_watcher is None:
                self._job_watcher = watchers.ChildrenWatch(
                    self._client,
                    self.path,
                    func=self._on_job_posting,
                    allow_session_lost=True)
        except excp.IncompatibleVersion:
            with excutils.save_and_reraise_exception():
                try_clean()
        except (self._client.handler.timeout_exception,
                k_exceptions.KazooException) as e:
            try_clean()
            raise excp.JobFailure("Failed to do post-connection"
                                  " initialization", e)
|