Add Etcd backend for jobboard

Add a new backend for jobboard based on Etcd3, it uses the etcd3gw
bindings.

Change-Id: I12eaa04dfb11941802200736e56a4fb5761fb368
This commit is contained in:
Gregory Thiemonge 2024-04-11 08:04:50 -04:00
parent 3ca2d4fdc8
commit c43c7bc3f5
8 changed files with 1016 additions and 2 deletions

View File

@ -288,6 +288,19 @@ optionally expire after a given amount of time).
See :py:class:`~taskflow.jobs.backends.impl_redis.RedisJobBoard`
for implementation details.
Etcd
----
**Board type**: ``'etcd'``
Uses `etcd`_ to provide the jobboard capabilities by using Etcd key values data
structures and individual job ownership key (that can optionally expire after a
given amount of time).
.. note::
See :py:class:`~taskflow.jobs.backends.impl_etcd.EtcdJobBoard`
for implementation details.
Considerations
==============
@ -356,6 +369,11 @@ Redis
.. automodule:: taskflow.jobs.backends.impl_redis
Etcd
----
.. automodule:: taskflow.jobs.backends.impl_etcd
Hierarchy
=========
@ -363,6 +381,7 @@ Hierarchy
taskflow.jobs.base
taskflow.jobs.backends.impl_redis
taskflow.jobs.backends.impl_zookeeper
taskflow.jobs.backends.impl_etcd
:parts: 1
.. _paradigm shift: https://wiki.openstack.org/wiki/TaskFlow/Paradigm_shifts#Workflow_ownership_transfer
@ -370,3 +389,4 @@ Hierarchy
.. _kazoo: https://kazoo.readthedocs.io/en/latest/
.. _stevedore: https://docs.openstack.org/stevedore/latest
.. _redis: https://redis.io/
.. _etcd: https://etcd.io/

View File

@ -0,0 +1,6 @@
---
features:
- |
Added an Etcd-based backend for jobboard. This backend is similar to the
Redis backend, it requires that the consumer extends the expiry of the job
that is being running.

View File

@ -34,6 +34,7 @@ packages =
taskflow.jobboards =
zookeeper = taskflow.jobs.backends.impl_zookeeper:ZookeeperJobBoard
redis = taskflow.jobs.backends.impl_redis:RedisJobBoard
etcd = taskflow.jobs.backends.impl_etcd:EtcdJobBoard
taskflow.conductors =
blocking = taskflow.conductors.backends.impl_blocking:BlockingConductor
@ -63,6 +64,8 @@ zookeeper =
zake>=0.1.6 # Apache-2.0
redis =
redis>=4.0.0 # MIT
etcd =
etcd3gw>=2.0.0 # Apache-2.0
workers =
kombu>=4.3.0 # BSD
eventlet =

View File

@ -0,0 +1,556 @@
# Copyright (C) Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import typing
import etcd3gw
import fasteners
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
from taskflow import exceptions as exc
from taskflow.jobs import base
from taskflow import logging
from taskflow import states
from taskflow.utils import misc
if typing.TYPE_CHECKING:
from taskflow.types import entity
LOG = logging.getLogger(__name__)
class EtcdJob(base.Job):
board: 'EtcdJobBoard'
def __init__(self, board: 'EtcdJobBoard', name, client, key,
uuid=None, details=None, backend=None,
book=None, book_data=None,
priority=base.JobPriority.NORMAL,
sequence=None, created_on=None):
super().__init__(board, name, uuid=uuid, details=details,
backend=backend, book=book, book_data=book_data)
self._client = client
self._key = key
self._priority = priority
self._sequence = sequence
self._created_on = created_on
self._root = board._root_path
self._lease = None
@property
def key(self):
return self._key
@property
def last_modified(self):
try:
raw_data = self.board.get_last_modified(self)
data = jsonutils.loads(raw_data)
ret = timeutils.parse_strtime(data["last_modified"])
return ret
except Exception:
LOG.exception("Can not read load_modified key.")
return 0
@property
def created_on(self):
return self._created_on
@property
def state(self):
"""Access the current state of this job."""
owner = self.board.find_owner(self)
data = self.board.get_one(self.key)
if not data:
if owner is not None:
LOG.info(f"Owner key was found for job {self.uuid}, "
f"but the key {self.key} is missing")
return states.COMPLETE
if not owner:
return states.UNCLAIMED
return states.CLAIMED
@property
def sequence(self):
return self._sequence
@property
def priority(self):
return self._priority
@property
def lease(self):
if not self._lease:
owner_data = self.board.get_owner_data(self)
if 'lease_id' not in owner_data:
return None
lease_id = owner_data['lease_id']
self._lease = etcd3gw.Lease(id=lease_id,
client=self._client)
return self._lease
def expires_in(self):
"""How many seconds until the claim expires."""
if self.lease is None:
return -1
return self.lease.ttl()
def extend_expiry(self, expiry):
"""Extends the owner key (aka the claim) expiry for this job.
Returns ``True`` if the expiry request was performed
otherwise ``False``.
"""
if self.lease is None:
return False
ret = self.lease.refresh()
# we can also update the last_modified of the job here
return (ret > 0)
@property
def root(self):
return self._root
def __lt__(self, other):
if not isinstance(other, EtcdJob):
return NotImplemented
if self.root == other.root:
if self.priority == other.priority:
return self.sequence < other.sequence
else:
ordered = base.JobPriority.reorder(
(self.priority, self), (other.priority, other))
if ordered[0] is self:
return False
return True
else:
# Different jobboards with different roots...
return self.root < other.root
def __eq__(self, other):
if not isinstance(other, EtcdJob):
return NotImplemented
return ((self.root, self.sequence, self.priority) ==
(other.root, other.sequence, other.priority))
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash(self.key)
class EtcdJobBoard(base.JobBoard):
ROOT_PATH = "/taskflow/jobs"
TRASH_PATH = "/taskflow/.trash"
DEFAULT_PATH = "jobboard"
JOB_PREFIX = "job"
SEQUENCE_KEY = "sequence"
LOCK_POSTFIX = ".lock"
LAST_MODIFIED_POSTFIX = ".last_modified"
ETCD_CONFIG_OPTIONS = (
("host", str),
("port", int),
("protocol", str),
("ca_cert", str),
("cert_key", str),
("cert_cert", str),
("timeout", int),
("api_path", str),
)
INIT_STATE = 'init'
CONNECTED_STATE = 'connected'
FETCH_STATE = 'fetched'
def __init__(self, name, conf, persistence=None):
super().__init__(name, conf)
self._client = None
self._persistence = persistence
self._state = self.INIT_STATE
path_elems = [self.ROOT_PATH,
self._conf.get("path", self.DEFAULT_PATH)]
self._root_path = self.join(*path_elems)
self._job_cache = {}
self._job_cond = threading.Condition()
self._open_close_lock = threading.RLock()
self._watcher_thd = None
self._thread_cancel = None
self._watcher = None
self._watcher_cancel = None
def join(self, root, *args):
return "/".join([root] + [a.strip("/") for a in args])
def incr(self, key):
"""Atomically increment an integer, create it if it doesn't exist"""
while True:
value = self._client.get(key)
if not value:
res = self._client.create(key, 1)
if res:
return 1
# Another thread has just created the key after we failed to
# read it, retry to get the new current value
continue
value = int(value[0])
next_value = value + 1
res = self._client.replace(key, value, next_value)
if res:
return next_value
def get_one(self, key):
if self._client is None:
raise exc.JobFailure(f"Can not read key {key}, client is closed")
value = self._client.get(key)
if not value:
return None
return value[0]
def _fetch_jobs(self, only_unclaimed=False, ensure_fresh=False):
# TODO(gthiemonge) only_unclaimed is ignored
if ensure_fresh or self._state != self.FETCH_STATE:
self._ensure_fresh()
return sorted(self._job_cache.values())
def _ensure_fresh(self):
prefix = self.join(self._root_path, self.JOB_PREFIX)
jobs = self._client.get_prefix(prefix)
listed_jobs = {}
for job in jobs:
data, metadata = job
key = misc.binary_decode(metadata['key'])
if (key.endswith(self.LOCK_POSTFIX) or
key.endswith(self.LAST_MODIFIED_POSTFIX)):
continue
listed_jobs[key] = data
removed_jobs = []
with self._job_cond:
for key in self._job_cache.keys():
if key not in listed_jobs:
removed_jobs.append(key)
for key in removed_jobs:
self._remove_job_from_cache(key)
for key, data in listed_jobs.items():
self._process_incoming_job(key, data)
self._state = self.FETCH_STATE
def _process_incoming_job(self, key, data):
try:
job_data = jsonutils.loads(data)
except jsonutils.json.JSONDecodeError:
msg = ("Incorrectly formatted job data found at "
f"key: {key}")
LOG.warning(msg, exc_info=True)
LOG.info("Deleting invalid job data at key: %s", key)
self._client.delete(key)
raise exc.JobFailure(msg)
with self._job_cond:
if key not in self._job_cache:
job_priority = base.JobPriority.convert(job_data["priority"])
new_job = EtcdJob(self,
job_data["name"],
self._client,
key,
uuid=job_data["uuid"],
details=job_data.get("details", {}),
backend=self._persistence,
book_data=job_data.get("book"),
priority=job_priority,
sequence=job_data["sequence"])
self._job_cache[key] = new_job
self._job_cond.notify_all()
def _remove_job_from_cache(self, key):
"""Remove job from cache."""
with self._job_cond:
if key in self._job_cache:
self._job_cache.pop(key, None)
def _board_removal_func(self, job):
try:
self._remove_job_from_cache(job.key)
self._client.delete_prefix(job.key)
except Exception:
LOG.exception(f"Failed to delete prefix {job.key}")
def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
"""Returns an iterator of jobs that are currently on this board."""
return base.JobBoardIterator(
self, LOG, only_unclaimed=only_unclaimed,
ensure_fresh=ensure_fresh,
board_fetch_func=self._fetch_jobs,
board_removal_func=self._board_removal_func)
def wait(self, timeout=None):
"""Waits a given amount of time for **any** jobs to be posted."""
# Wait until timeout expires (or forever) for jobs to appear.
watch = timeutils.StopWatch(duration=timeout)
watch.start()
with self._job_cond:
while True:
if not self._job_cache:
if watch.expired():
raise exc.NotFound("Expired waiting for jobs to"
" arrive; waited %s seconds"
% watch.elapsed())
# This is done since the given timeout can not be provided
# to the condition variable, since we can not ensure that
# when we acquire the condition that there will actually
# be jobs (especially if we are spuriously awaken), so we
# must recalculate the amount of time we really have left.
self._job_cond.wait(watch.leftover(return_none=True))
else:
curr_jobs = self._fetch_jobs()
fetch_func = lambda ensure_fresh: curr_jobs
removal_func = lambda a_job: self._remove_job_from_cache(
a_job.key)
return base.JobBoardIterator(
self, LOG, board_fetch_func=fetch_func,
board_removal_func=removal_func)
@property
def job_count(self):
"""Returns how many jobs are on this jobboard."""
return len(self._job_cache)
def get_owner_data(self, job: EtcdJob):
owner_key = job.key + self.LOCK_POSTFIX
owner_data = self.get_one(owner_key)
if not owner_data:
return None
return jsonutils.loads(owner_data)
def find_owner(self, job: EtcdJob):
"""Gets the owner of the job if one exists."""
data = self.get_owner_data(job)
if data:
return data['owner']
return None
def get_owner_and_data(self, job: EtcdJob):
owner = self.find_owner(job)
data = self.get_one(job.key)
return owner, data
def set_last_modified(self, job: EtcdJob):
key = f"{job.key}{self.LAST_MODIFIED_POSTFIX}"
now = timeutils.utcnow()
self._client.put(key, jsonutils.dumps({"last_modified": now}))
def get_last_modified(self, job: EtcdJob):
key = f"{job.key}{self.LAST_MODIFIED_POSTFIX}"
return self.get_one(key)
def post(self, name, book=None, details=None,
priority=base.JobPriority.NORMAL) -> EtcdJob:
"""Atomically creates and posts a job to the jobboard."""
job_priority = base.JobPriority.convert(priority)
job_uuid = uuidutils.generate_uuid()
job_posting = base.format_posting(job_uuid, name,
created_on=timeutils.utcnow(),
book=book, details=details,
priority=job_priority)
seq = self.incr(self.join(self._root_path, self.SEQUENCE_KEY))
key = self.join(self._root_path, f"{self.JOB_PREFIX}{seq}")
job_posting["sequence"] = seq
raw_job_posting = jsonutils.dumps(job_posting)
self._client.create(key, raw_job_posting)
job = EtcdJob(self, name, self._client, key,
uuid=job_uuid,
details=details,
backend=self._persistence,
book=book,
book_data=job_posting.get('book'),
priority=job_priority,
sequence=seq)
with self._job_cond:
self._job_cache[key] = job
self._job_cond.notify_all()
return job
@base.check_who
def claim(self, job, who, expiry=None):
"""Atomically attempts to claim the provided job."""
owner_key = job.key + self.LOCK_POSTFIX
ttl = expiry or self._conf.get('ttl', None)
if ttl:
lease = self._client.lease(ttl=ttl)
else:
lease = None
owner_dict = {
"owner": who,
}
if lease:
owner_dict["lease_id"] = lease.id
owner_value = jsonutils.dumps(owner_dict)
# Create a lock for the job, if the lock already exists, the job
# is owned by another worker
created = self._client.create(owner_key, owner_value, lease=lease)
if not created:
# Creation is denied, revoke the lease, we cannot claim the job.
if lease:
lease.revoke()
owner = self.find_owner(job)
if owner:
message = f"Job {job.uuid} already claimed by '{owner}'"
else:
message = f"Job {job.uuid} already claimed"
raise exc.UnclaimableJob(message)
# Ensure that the job still exists, it may have been claimed and
# consumed by another thread before we enter this function
if not self.get_one(job.key):
# Revoke the lease
if lease:
lease.revoke()
else:
self._client.delete(owner_key)
raise exc.UnclaimableJob(f"Job {job.uuid} already deleted.")
self.set_last_modified(job)
@base.check_who
def consume(self, job, who):
"""Permanently (and atomically) removes a job from the jobboard."""
owner, data = self.get_owner_and_data(job)
if data is None or owner is None:
raise exc.NotFound(f"Can not find job {job.uuid}")
if owner != who:
raise exc.JobFailure(f"Can not consume a job {job.uuid}"
f" which is not owned by {who}")
self._client.delete_prefix(job.key)
self._remove_job_from_cache(job.key)
@base.check_who
def abandon(self, job, who):
"""Atomically attempts to abandon the provided job."""
owner, data = self.get_owner_and_data(job)
if data is None or owner is None:
raise exc.NotFound(f"Can not find job {job.uuid}")
if owner != who:
raise exc.JobFailure(f"Can not abandon a job {job.uuid}"
f" which is not owned by {who}")
owner_key = job.key + self.LOCK_POSTFIX
self._client.delete(owner_key)
@base.check_who
def trash(self, job, who):
"""Trash the provided job."""
owner, data = self.get_owner_and_data(job)
if data is None or owner is None:
raise exc.NotFound(f"Can not find job {job.uuid}")
if owner != who:
raise exc.JobFailure(f"Can not trash a job {job.uuid} "
f"which is not owned by {who}")
trash_key = job.key.replace(self.ROOT_PATH, self.TRASH_PATH)
self._client.create(trash_key, data)
self._client.delete_prefix(job.key)
self._remove_job_from_cache(job.key)
def register_entity(self, entity: 'entity.Entity'):
"""Register an entity to the jobboard('s backend), e.g: a conductor"""
# TODO(gthiemonge) Doesn't seem to be useful with Etcd
@property
def connected(self):
"""Returns if this jobboard is connected."""
return self._client is not None
@fasteners.locked(lock='_open_close_lock')
def connect(self):
"""Opens the connection to any backend system."""
if self._client is None:
etcd_conf = {}
for config_opts in self.ETCD_CONFIG_OPTIONS:
key, value_type = config_opts
if key in self._conf:
etcd_conf[key] = value_type(self._conf[key])
self._client = etcd3gw.Etcd3Client(**etcd_conf)
self._state = self.CONNECTED_STATE
watch_url = self.join(self._root_path, self.JOB_PREFIX)
self._thread_cancel = threading.Event()
(self._watcher,
self._watcher_cancel) = self._client.watch_prefix(watch_url)
self._watcher_thd = threading.Thread(target=self._watcher_thread)
self._watcher_thd.start()
def _watcher_thread(self):
while not self._thread_cancel.is_set():
for event in self._watcher:
if "kv" not in event:
continue
key_value = event["kv"]
key = misc.binary_decode(key_value["key"])
if (key.endswith(self.LOCK_POSTFIX) or
key.endswith(self.LAST_MODIFIED_POSTFIX)):
continue
if event.get("type") == "DELETE":
self._remove_job_from_cache(key)
else:
data = key_value["value"]
self._process_incoming_job(key, data)
@fasteners.locked(lock='_open_close_lock')
def close(self):
"""Close the connection to any backend system."""
if self._client is not None:
if self._watcher_cancel is not None:
self._watcher_cancel()
if self._thread_cancel is not None:
self._thread_cancel.set()
if self._watcher_thd is not None:
self._watcher_thd.join()
del self._client
self._client = None
self._state = self.INIT_STATE

View File

@ -122,8 +122,8 @@ class TestCase(base.BaseTestCase):
self.assertRaises(exc_class, access_func)
def assertRaisesRegex(self, exc_class, pattern, callable_obj,
*args, **kwargs):
def _assertRaisesRegex(self, exc_class, pattern, callable_obj,
*args, **kwargs):
# TODO(harlowja): submit a pull/review request to testtools to add
# this method to there codebase instead of having it exist in ours
# since it really doesn't belong here.

View File

@ -0,0 +1,416 @@
# Copyright (C) Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
import testtools
from taskflow import exceptions as exc
from taskflow.jobs.backends import impl_etcd
from taskflow.jobs import base as jobs_base
from taskflow import test
from taskflow.tests.unit.jobs import base
from taskflow.tests import utils as test_utils
ETCD_AVAILABLE = test_utils.etcd_available()
class EtcdJobBoardMixin:
def create_board(self, conf=None, persistence=None):
self.path = f"test-{uuidutils.generate_uuid()}"
board_conf = {
"path": self.path,
}
if conf:
board_conf.update(conf)
board = impl_etcd.EtcdJobBoard("etcd", board_conf, persistence)
return board._client, board
class MockedEtcdJobBoard(test.TestCase, EtcdJobBoardMixin):
def test_create_board(self):
_, jobboard = self.create_board()
self.assertEqual(f"/taskflow/jobs/{self.path}", jobboard._root_path)
_, jobboard = self.create_board({"path": "/testpath"})
self.assertEqual("/taskflow/jobs/testpath", jobboard._root_path)
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard.incr")
@mock.patch("threading.Condition")
@mock.patch("oslo_utils.uuidutils.generate_uuid")
@mock.patch("oslo_utils.timeutils.utcnow")
def test_post(self,
mock_utcnow: mock.Mock,
mock_generated_uuid: mock.Mock,
mock_cond: mock.Mock,
mock_incr: mock.Mock):
mock_incr.return_value = 12
mock_generated_uuid.return_value = "uuid1"
mock_utcnow.return_value = "utcnow1"
mock_book = mock.Mock()
mock_book.name = "book1_name"
mock_book.uuid = "book1_uuid"
mock_details = mock.Mock()
_, jobboard = self.create_board()
jobboard._client = mock.Mock()
job = jobboard.post("post1", book=mock_book,
details=mock_details,
priority=jobs_base.JobPriority.NORMAL)
expected_key = f"/taskflow/jobs/{self.path}/job12"
expected_book_data = {
"name": "book1_name",
"uuid": "book1_uuid"
}
expected_job_posting = {
"uuid": "uuid1",
"name": "post1",
"priority": "NORMAL",
"created_on": "utcnow1",
"details": mock_details,
"book": expected_book_data,
"sequence": 12,
}
mock_incr.assert_called_with(f"/taskflow/jobs/{self.path}/sequence")
jobboard._client.create.assert_called_with(
expected_key, jsonutils.dumps(expected_job_posting))
self.assertEqual("post1", job.name)
self.assertEqual(expected_key, job.key)
self.assertEqual(mock_details, job.details)
self.assertEqual(mock_book, job.book)
self.assertEqual(expected_book_data, job._book_data)
self.assertEqual(jobs_base.JobPriority.NORMAL, job.priority)
self.assertEqual(12, job.sequence)
self.assertEqual(1, len(jobboard._job_cache))
self.assertEqual(job, jobboard._job_cache[expected_key])
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"set_last_modified")
def test_claim(self, mock_set_last_modified):
who = "owner1"
lease_id = uuidutils.generate_uuid()
_, jobboard = self.create_board(conf={"ttl": 37})
jobboard._client = mock.Mock()
mock_lease = mock.Mock(id=lease_id)
jobboard._client.lease.return_value = mock_lease
jobboard._client.create.return_value = True
jobboard._client.get.return_value = [mock.Mock()]
job = impl_etcd.EtcdJob(jobboard,
"job7",
jobboard._client,
f"/taskflow/jobs/{self.path}/job7",
uuid=uuidutils.generate_uuid(),
details=mock.Mock(),
backend="etcd",
book=mock.Mock(),
book_data=mock.Mock(),
priority=jobs_base.JobPriority.NORMAL,
sequence=7,
created_on="date")
jobboard.claim(job, who)
jobboard._client.lease.assert_called_once_with(ttl=37)
jobboard._client.create.assert_called_once_with(
f"{job.key}{jobboard.LOCK_POSTFIX}",
jsonutils.dumps({"owner": who,
"lease_id": lease_id}),
lease=mock_lease)
jobboard._client.get.assert_called_once_with(job.key)
mock_lease.revoke.assert_not_called()
mock_set_last_modified.assert_called_once_with(job)
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"set_last_modified")
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"find_owner")
def test_claim_already_claimed(self, mock_find_owner,
mock_set_last_modified):
who = "owner1"
lease_id = uuidutils.generate_uuid()
mock_find_owner.return_value = who
_, jobboard = self.create_board({"ttl": 37})
jobboard._client = mock.Mock()
mock_lease = mock.Mock(id=lease_id)
jobboard._client.lease.return_value = mock_lease
jobboard._client.create.return_value = False
jobboard._client.get.return_value = []
job = impl_etcd.EtcdJob(jobboard,
"job7",
jobboard._client,
f"/taskflow/jobs/{self.path}/job7",
uuid=uuidutils.generate_uuid(),
details=mock.Mock(),
backend="etcd",
book=mock.Mock(),
book_data=mock.Mock(),
priority=jobs_base.JobPriority.NORMAL,
sequence=7,
created_on="date")
self.assertRaisesRegex(exc.UnclaimableJob, "already claimed by",
jobboard.claim, job, who)
jobboard._client.lease.assert_called_once_with(ttl=37)
jobboard._client.create.assert_called_once_with(
f"{job.key}{jobboard.LOCK_POSTFIX}",
jsonutils.dumps({"owner": who,
"lease_id": lease_id}),
lease=mock_lease)
mock_lease.revoke.assert_called_once()
mock_set_last_modified.assert_not_called()
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"set_last_modified")
def test_claim_deleted(self, mock_set_last_modified):
who = "owner1"
lease_id = uuidutils.generate_uuid()
_, jobboard = self.create_board({"ttl": 37})
jobboard._client = mock.Mock()
mock_lease = mock.Mock(id=lease_id)
jobboard._client.lease.return_value = mock_lease
jobboard._client.create.return_value = True
jobboard._client.get.return_value = []
job = impl_etcd.EtcdJob(jobboard,
"job7",
jobboard._client,
f"/taskflow/jobs/{self.path}/job7",
uuid=uuidutils.generate_uuid(),
details=mock.Mock(),
backend="etcd",
book=mock.Mock(),
book_data=mock.Mock(),
priority=jobs_base.JobPriority.NORMAL,
sequence=7,
created_on="date")
self.assertRaisesRegex(exc.UnclaimableJob, "already deleted",
jobboard.claim, job, who)
jobboard._client.lease.assert_called_once_with(ttl=37)
jobboard._client.create.assert_called_once_with(
f"{job.key}{jobboard.LOCK_POSTFIX}",
jsonutils.dumps({"owner": who,
"lease_id": lease_id}),
lease=mock_lease)
jobboard._client.get.assert_called_once_with(job.key)
mock_lease.revoke.assert_called_once()
mock_set_last_modified.assert_not_called()
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"get_owner_and_data")
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"_remove_job_from_cache")
def test_consume(self, mock__remove_job_from_cache,
mock_get_owner_and_data):
mock_get_owner_and_data.return_value = ["owner1", mock.Mock()]
_, jobboard = self.create_board()
jobboard._client = mock.Mock()
job = impl_etcd.EtcdJob(jobboard,
"job7",
jobboard._client,
f"/taskflow/jobs/{self.path}/job7")
jobboard.consume(job, "owner1")
jobboard._client.delete_prefix.assert_called_once_with(job.key)
mock__remove_job_from_cache.assert_called_once_with(job.key)
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"get_owner_and_data")
def test_consume_bad_owner(self, mock_get_owner_and_data):
mock_get_owner_and_data.return_value = ["owner2", mock.Mock()]
_, jobboard = self.create_board()
jobboard._client = mock.Mock()
job = impl_etcd.EtcdJob(jobboard,
"job7",
jobboard._client,
f"/taskflow/jobs/{self.path}/job7")
self.assertRaisesRegex(exc.JobFailure, "which is not owned",
jobboard.consume, job, "owner1")
jobboard._client.delete_prefix.assert_not_called()
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"get_owner_and_data")
def test_abandon(self, mock_get_owner_and_data):
mock_get_owner_and_data.return_value = ["owner1", mock.Mock()]
_, jobboard = self.create_board()
jobboard._client = mock.Mock()
job = impl_etcd.EtcdJob(jobboard,
"job7",
jobboard._client,
f"/taskflow/jobs/{self.path}/job7")
jobboard.abandon(job, "owner1")
jobboard._client.delete.assert_called_once_with(
f"{job.key}{jobboard.LOCK_POSTFIX}")
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"get_owner_and_data")
def test_abandon_bad_owner(self, mock_get_owner_and_data):
mock_get_owner_and_data.return_value = ["owner2", mock.Mock()]
_, jobboard = self.create_board()
jobboard._client = mock.Mock()
job = impl_etcd.EtcdJob(jobboard,
"job7",
jobboard._client,
f"/taskflow/jobs/{self.path}/job7")
self.assertRaisesRegex(exc.JobFailure, "which is not owned",
jobboard.abandon, job, "owner1")
jobboard._client.delete.assert_not_called()
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"get_owner_and_data")
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"_remove_job_from_cache")
def test_trash(self, mock__remove_job_from_cache,
mock_get_owner_and_data):
mock_get_owner_and_data.return_value = ["owner1", mock.Mock()]
_, jobboard = self.create_board()
jobboard._client = mock.Mock()
job = impl_etcd.EtcdJob(jobboard,
"job7",
jobboard._client,
f"/taskflow/jobs/{self.path}/job7")
jobboard.trash(job, "owner1")
jobboard._client.create.assert_called_once_with(
f"/taskflow/.trash/{self.path}/job7", mock.ANY)
jobboard._client.delete_prefix.assert_called_once_with(job.key)
mock__remove_job_from_cache.assert_called_once_with(job.key)
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"get_owner_and_data")
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"_remove_job_from_cache")
def test_trash_bad_owner(self, mock__remove_job_from_cache,
mock_get_owner_and_data):
mock_get_owner_and_data.return_value = ["owner2", mock.Mock()]
_, jobboard = self.create_board()
jobboard._client = mock.Mock()
job = impl_etcd.EtcdJob(jobboard,
"job7",
jobboard._client,
f"/taskflow/jobs/{self.path}/job7")
self.assertRaisesRegex(exc.JobFailure, "which is not owned",
jobboard.trash, job, "owner1")
jobboard._client.create.assert_not_called()
jobboard._client.delete_prefix.assert_not_called()
mock__remove_job_from_cache.assert_not_called()
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"get_owner_and_data")
@mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard."
"_remove_job_from_cache")
def test_trash_deleted_job(self, mock__remove_job_from_cache,
mock_get_owner_and_data):
mock_get_owner_and_data.return_value = ["owner1", None]
_, jobboard = self.create_board()
jobboard._client = mock.Mock()
job = impl_etcd.EtcdJob(jobboard,
"job7",
jobboard._client,
f"/taskflow/jobs/{self.path}/job7")
self.assertRaisesRegex(exc.NotFound, "Can not find job",
jobboard.trash, job, "owner1")
jobboard._client.create.assert_not_called()
jobboard._client.delete_prefix.assert_not_called()
mock__remove_job_from_cache.assert_not_called()
@testtools.skipIf(not ETCD_AVAILABLE, 'Etcd is not available')
class EtcdJobBoardTest(test.TestCase, base.BoardTestMixin, EtcdJobBoardMixin):
def setUp(self):
super().setUp()
self.client, self.board = self.create_board()
def test__incr(self):
key = uuidutils.generate_uuid()
self.board.connect()
self.addCleanup(self.board.close)
self.addCleanup(self.board._client.delete, key)
self.assertEqual(1, self.board.incr(key))
self.assertEqual(2, self.board.incr(key))
self.assertEqual(3, self.board.incr(key))
self.assertEqual(b'3', self.board.get_one(key))
self.board.close()
def test_get_one(self):
key1 = uuidutils.generate_uuid()
self.board.connect()
self.addCleanup(self.board._client.delete, key1)
# put data and get it
self.board._client.put(key1, "testset1")
self.assertEqual(b"testset1", self.board.get_one(key1))
# delete data and check that it's not found
self.board._client.delete(key1)
self.assertIsNone(self.board.get_one(key1))
# get a non-existant data
key2 = uuidutils.generate_uuid()
# (ensure it doesn't exist)
self.board._client.delete(key2)
self.assertIsNone(self.board.get_one(key2))
self.board.close()

View File

@ -19,6 +19,7 @@ import string
import threading
import time
import etcd3gw
from oslo_utils import timeutils
import redis
@ -88,6 +89,15 @@ def redis_available(min_version):
return ok
def etcd_available():
client = etcd3gw.Etcd3Client()
try:
client.get("/")
except Exception:
return False
return True
class NoopRetry(retry.AlwaysRevert):
pass

View File

@ -9,6 +9,9 @@ zake>=0.1.6 # Apache-2.0
# redis
redis>=4.0.0 # MIT
# etcd3gw
etcd3gw>=2.0.0 # Apache-2.0
# workers
kombu>=4.3.0 # BSD