Build out + test a redis-backed jobboard

Part of blueprint taskflow-redis-jobs

Change-Id: I7c94e2201c5d933c8a1ec73fc0cf705962e5eef6
@@ -209,6 +209,20 @@ Additional *configuration* parameters:
 See :py:class:`~taskflow.jobs.backends.impl_zookeeper.ZookeeperJobBoard`
 for implementation details.
 
+Redis
+-----
+
+**Board type**: ``'redis'``
+
+Uses `redis`_ to provide the jobboard capabilities and semantics by using
+a redis hash data structure and individual job ownership keys (that can
+optionally expire after a given amount of time).
+
+.. note::
+
+    See :py:class:`~taskflow.jobs.backends.impl_redis.RedisJobBoard`
+    for implementation details.
+
 Considerations
 ==============
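A minimal usage sketch (not part of this patch) may help ground the docs added above; it assumes the standard ``taskflow.jobs.backends.fetch`` helper and a redis server on localhost, with every configuration key other than ``board`` proxied through to the redis client:

import contextlib

from taskflow.jobs import backends as job_backends

# Hypothetical configuration: 'board' selects the backend type this change
# registers; the remaining keys are proxied into the redis client (see
# CLIENT_CONF_TRANSFERS in impl_redis.py below).
conf = {
    'board': 'redis',
    'host': '127.0.0.1',
    'port': 6379,
}

with contextlib.closing(job_backends.fetch('my-board', conf)) as board:
    board.connect()
    print("%s jobs on board" % board.job_count)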
@@ -272,11 +286,17 @@ Zookeeper
 
 .. automodule:: taskflow.jobs.backends.impl_zookeeper
 
+Redis
+-----
+
+.. automodule:: taskflow.jobs.backends.impl_redis
+
 Hierarchy
 =========
 
 .. inheritance-diagram::
     taskflow.jobs.base
+    taskflow.jobs.backends.impl_redis
     taskflow.jobs.backends.impl_zookeeper
     :parts: 1
@@ -284,3 +304,4 @@ Hierarchy
 .. _zookeeper: http://zookeeper.apache.org/
 .. _kazoo: http://kazoo.readthedocs.org/
 .. _stevedore: http://stevedore.readthedocs.org/
+.. _redis: http://redis.io/
@@ -43,6 +43,11 @@ Persistence
 
 .. automodule:: taskflow.utils.persistence_utils
 
+Redis
+~~~~~
+
+.. automodule:: taskflow.utils.redis_utils
+
 Schema
 ~~~~~~
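The new ``taskflow.utils.redis_utils`` module documented above is what ``impl_redis`` leans on. From its call sites in this patch, a hedged sketch of the helpers it must expose (signatures inferred from usage in impl_redis.py, not from the module itself):

from taskflow.utils import redis_utils as ru

client = ru.RedisClient(host='127.0.0.1', port=6379)   # wraps redis.Redis
ok, version = ru.is_server_new_enough(client, (2, 6))  # lua needs >= 2.6
ru.apply_expiry(client, b'some.key.owned', 30, prior_version=version)
remaining = ru.get_expiry(client, b'some.key.owned', prior_version=version)
# get_expiry may also return ru.UnknownExpire.DOES_NOT_EXPIRE or
# ru.UnknownExpire.KEY_NOT_FOUND (per the RedisJob.expires_in docs below).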
@@ -34,6 +34,7 @@ packages =
 [entry_points]
 taskflow.jobboards =
     zookeeper = taskflow.jobs.backends.impl_zookeeper:ZookeeperJobBoard
+    redis = taskflow.jobs.backends.impl_redis:RedisJobBoard
 taskflow.conductors =
     blocking = taskflow.conductors.backends.impl_blocking:BlockingConductor
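The new ``redis =`` entry-point line is what makes the board type resolvable by name. For illustration only, a rough sketch of the stevedore lookup this enables (the group and driver names come from the section above; how the helpers invoke it is an assumption):

from stevedore import driver

# 'taskflow.jobboards' is the entry-point group declared above and 'redis'
# is the driver name this change registers; with invoke_on_load left off
# we get back the RedisJobBoard class itself.
mgr = driver.DriverManager('taskflow.jobboards', 'redis')
board_cls = mgr.driver  # taskflow.jobs.backends.impl_redis.RedisJobBoard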
taskflow/jobs/backends/impl_redis.py (new file, 951 lines)
@@ -0,0 +1,951 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import contextlib
import datetime
import string
import threading
import time

import fasteners
import msgpack
from oslo_serialization import msgpackutils
from oslo_utils import strutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
from redis import exceptions as redis_exceptions
import six
from six.moves import range as compat_range

from taskflow import exceptions as exc
from taskflow.jobs import base
from taskflow import logging
from taskflow import states
from taskflow.types import timing
from taskflow.utils import misc
from taskflow.utils import redis_utils as ru


LOG = logging.getLogger(__name__)

@contextlib.contextmanager
def _translate_failures():
    """Translates common redis exceptions into taskflow exceptions."""
    try:
        yield
    except redis_exceptions.ConnectionError:
        exc.raise_with_cause(exc.JobFailure, "Failed to connect to redis")
    except redis_exceptions.TimeoutError:
        exc.raise_with_cause(exc.JobFailure,
                             "Failed to communicate with redis, connection"
                             " timed out")
    except redis_exceptions.RedisError:
        exc.raise_with_cause(exc.JobFailure,
                             "Failed to communicate with redis,"
                             " internal error")


class RedisJob(base.Job):
    """A redis job."""

    def __init__(self, board, name, sequence, key,
                 uuid=None, details=None,
                 created_on=None, backend=None,
                 book=None, book_data=None):
        super(RedisJob, self).__init__(board, name,
                                       uuid=uuid, details=details,
                                       backend=backend,
                                       book=book, book_data=book_data)
        self._created_on = created_on
        self._client = board._client
        self._redis_version = board._redis_version
        self._sequence = sequence
        self._key = key
        self._last_modified_key = board.join(key + board.LAST_MODIFIED_POSTFIX)
        self._owner_key = board.join(key + board.OWNED_POSTFIX)

    @property
    def key(self):
        """Key (in board listings/trash hash) the job data is stored under."""
        return self._key

    @property
    def last_modified_key(self):
        """Key the job last modified data is stored under."""
        return self._last_modified_key

    @property
    def owner_key(self):
        """Key the job claim + data of the owner is stored under."""
        return self._owner_key

    @property
    def sequence(self):
        """Sequence number of the current job."""
        return self._sequence

    def expires_in(self):
        """How many seconds until the claim expires.

        Returns the number of seconds until the ownership entry expires or
        :attr:`~taskflow.utils.redis_utils.UnknownExpire.DOES_NOT_EXPIRE` or
        :attr:`~taskflow.utils.redis_utils.UnknownExpire.KEY_NOT_FOUND` if it
        does not expire or if the expiry can not be determined (perhaps the
        :attr:`.owner_key` expired at/before the time of inquiry?).
        """
        with _translate_failures():
            return ru.get_expiry(self._client, self._owner_key,
                                 prior_version=self._redis_version)

    def extend_expiry(self, expiry):
        """Extends the owner key (aka the claim) expiry for this job.

        NOTE(harlowja): if the claim for this job did **not** previously
        have an expiry associated with it, calling this method will create
        one (and after that time elapses the claim on this job will cease
        to exist).

        Returns ``True`` if the expiry request was performed,
        otherwise ``False``.
        """
        with _translate_failures():
            return ru.apply_expiry(self._client, self._owner_key, expiry,
                                   prior_version=self._redis_version)

    def __lt__(self, other):
        if self.created_on == other.created_on:
            return self.sequence < other.sequence
        else:
            return self.created_on < other.created_on

    @property
    def created_on(self):
        return self._created_on

    @property
    def last_modified(self):
        with _translate_failures():
            raw_last_modified = self._client.get(self._last_modified_key)
        last_modified = None
        if raw_last_modified:
            last_modified = self._board._loads(
                raw_last_modified, root_types=(datetime.datetime,))
            # NOTE(harlowja): just in case this is somehow busted (due to
            # time sync issues/other), give back the most recent one (since
            # redis does not maintain clock information; this can happen
            # because the clients that mutate jobs also send the time in).
            last_modified = max(last_modified, self._created_on)
        return last_modified

    @property
    def state(self):
        listings_key = self._board.listings_key
        owner_key = self._owner_key
        listings_sub_key = self._key

        def _do_fetch(p):
            # NOTE(harlowja): the state of a job in redis is not set in any
            # explicit 'state' field, but is maintained by what keys exist
            # in redis instead (i.e. if an owner key exists, then we know an
            # owner is active; if no job data exists and no owner, then we
            # know that the job is unclaimed, and so on)...
            p.multi()
            p.hexists(listings_key, listings_sub_key)
            p.exists(owner_key)
            job_exists, owner_exists = p.execute()
            if not job_exists:
                if owner_exists:
                    # This should **not** be possible due to lua code ordering
                    # but let's log an INFO statement if it does happen (so
                    # that it can be investigated)...
                    LOG.info("Unexpected owner key found at '%s' when job"
                             " key '%s[%s]' was not found", owner_key,
                             listings_key, listings_sub_key)
                return states.COMPLETE
            else:
                if owner_exists:
                    return states.CLAIMED
                else:
                    return states.UNCLAIMED

        with _translate_failures():
            return self._client.transaction(_do_fetch,
                                            listings_key, owner_key,
                                            value_from_callable=True)


class RedisJobBoard(base.JobBoard):
    """A jobboard backed by `redis`_.

    Powered by the `redis-py <http://redis-py.readthedocs.org/>`_ library.

    This jobboard creates job entries by listing jobs in a redis `hash`_. This
    hash contains jobs that can be actively worked on by (and examined/claimed
    by) some set of eligible consumers. Job posting is typically performed
    using the :meth:`.post` method (this creates a hash entry with job
    contents/details encoded in `msgpack`_). The users of these
    jobboard(s) (potentially on disjoint sets of machines) can then
    iterate over the available jobs and decide if they want to attempt to
    claim one of the jobs they have iterated over. If so they will then
    attempt to contact redis and they will attempt to create a key in
    redis (using an embedded lua script to perform this atomically) to claim
    a desired job. If the entity trying to use the jobboard to :meth:`.claim`
    the job is able to create that lock/owner key then it will be
    allowed (and expected) to perform whatever *work* the contents of that
    job described. Once the claiming entity is finished the lock/owner key
    and the `hash`_ entry will be deleted (if successfully completed) in a
    single request (also using an embedded lua script to perform this
    atomically). If the claiming entity is not successful (or the entity
    that claimed the job dies) the lock/owner key can be released
    automatically (by **optional** usage of a claim expiry) or by
    using :meth:`.abandon` to manually abandon the job so that it can be
    consumed/worked on by others.

    NOTE(harlowja): by default the :meth:`.claim` has no expiry (which
    means claims will be persistent, even under claiming entity failure). To
    ensure an expiry occurs pass a numeric value for the ``expiry`` keyword
    argument to the :meth:`.claim` method that defines how many seconds the
    claim should be retained for. When an expiry is used ensure that the
    claim is kept alive while it is being worked on by using
    the :py:meth:`~.RedisJob.extend_expiry` method periodically.

    .. _msgpack: http://msgpack.org/
    .. _redis: http://redis.io/
    .. _hash: http://redis.io/topics/data-types#hashes
    """

    CLIENT_CONF_TRANSFERS = tuple([
        # Host config...
        ('host', str),
        ('port', int),

        # See: http://redis.io/commands/auth
        ('password', str),

        # Data encoding/decoding + error handling.
        ('encoding', str),
        ('encoding_errors', str),

        # Connection settings.
        ('socket_timeout', float),
        ('socket_connect_timeout', float),

        # This one negates the usage of host, port, socket connection
        # settings as it doesn't use the same kind of underlying socket...
        ('unix_socket_path', str),

        # SSL settings (only used when ssl is enabled).
        ('ssl', strutils.bool_from_string),
        ('ssl_keyfile', str),
        ('ssl_certfile', str),
        ('ssl_cert_reqs', str),
        ('ssl_ca_certs', str),

        # See: http://www.rediscookbook.org/multiple_databases.html
        ('db', int),
    ])
    """
    Keys (and value type converters) that we allow to proxy from the jobboard
    configuration into the redis client (used to configure the redis client
    internals if no explicit client is provided via the ``client`` keyword
    argument).

    See: http://redis-py.readthedocs.org/en/latest/#redis.Redis

    See: https://github.com/andymccurdy/redis-py/blob/2.10.3/redis/client.py
    """

    #: Postfix (combined with job key) used to make a job's owner key.
    OWNED_POSTFIX = b".owned"

    #: Postfix (combined with job key) used to make a job's last modified key.
    LAST_MODIFIED_POSTFIX = b".last_modified"

    #: Default namespace for keys when none is provided.
    DEFAULT_NAMESPACE = b'taskflow'

    MIN_REDIS_VERSION = (2, 6)
    """
    Minimum redis version this backend requires.

    This version is required since we need the built-in server-side lua
    scripting support that is included in 2.6 and newer.
    """

    NAMESPACE_SEP = b':'
    """
    Separator that is used to combine a key with the namespace (to get
    the **actual** key that will be used).
    """

    KEY_PIECE_SEP = b'.'
    """
    Separator that is used to combine a bunch of key pieces together (to get
    the **actual** key that will be used).
    """

    #: Expected lua response status field when call is ok.
    SCRIPT_STATUS_OK = "ok"

    #: Expected lua response status field when call is **not** ok.
    SCRIPT_STATUS_ERROR = "error"

    #: Expected lua script error response when the owner is not as expected.
    SCRIPT_NOT_EXPECTED_OWNER = "Not expected owner!"

    #: Expected lua script error response when the owner is not findable.
    SCRIPT_UNKNOWN_OWNER = "Unknown owner!"

    #: Expected lua script error response when the job is not findable.
    SCRIPT_UNKNOWN_JOB = "Unknown job!"

    #: Expected lua script error response when the job is already claimed.
    SCRIPT_ALREADY_CLAIMED = "Job already claimed!"

    SCRIPT_TEMPLATES = {
        'consume': """
-- Extract *all* the variables (so we can easily know what they are)...
local owner_key = KEYS[1]
local listings_key = KEYS[2]
local last_modified_key = KEYS[3]

local expected_owner = ARGV[1]
local job_key = ARGV[2]
local result = {}
if redis.call("hexists", listings_key, job_key) == 1 then
    if redis.call("exists", owner_key) == 1 then
        local owner = redis.call("get", owner_key)
        if owner ~= expected_owner then
            result["status"] = "${error}"
            result["reason"] = "${not_expected_owner}"
            result["owner"] = owner
        else
            -- The order is important here, delete the owner first (and if
            -- that blows up, the job data will still exist so it can be
            -- worked on again, instead of the reverse)...
            redis.call("del", owner_key, last_modified_key)
            redis.call("hdel", listings_key, job_key)
            result["status"] = "${ok}"
        end
    else
        result["status"] = "${error}"
        result["reason"] = "${unknown_owner}"
    end
else
    result["status"] = "${error}"
    result["reason"] = "${unknown_job}"
end
return cmsgpack.pack(result)
""",
        'claim': """
local function apply_ttl(key, ms_expiry)
    if ms_expiry ~= nil then
        redis.call("pexpire", key, ms_expiry)
    end
end

-- Extract *all* the variables (so we can easily know what they are)...
local owner_key = KEYS[1]
local listings_key = KEYS[2]
local last_modified_key = KEYS[3]

local expected_owner = ARGV[1]
local job_key = ARGV[2]
local last_modified_blob = ARGV[3]

-- If this is non-numeric (which it may be) this becomes nil
local ms_expiry = nil
if ARGV[4] ~= "none" then
    ms_expiry = tonumber(ARGV[4])
end
local result = {}
if redis.call("hexists", listings_key, job_key) == 1 then
    if redis.call("exists", owner_key) == 1 then
        local owner = redis.call("get", owner_key)
        if owner == expected_owner then
            -- Owner is the same, leave it alone...
            redis.call("set", last_modified_key, last_modified_blob)
            apply_ttl(owner_key, ms_expiry)
            result["status"] = "${ok}"
        else
            result["status"] = "${error}"
            result["reason"] = "${already_claimed}"
            result["owner"] = owner
        end
    else
        redis.call("set", owner_key, expected_owner)
        redis.call("set", last_modified_key, last_modified_blob)
        apply_ttl(owner_key, ms_expiry)
        result["status"] = "${ok}"
    end
else
    result["status"] = "${error}"
    result["reason"] = "${unknown_job}"
end
return cmsgpack.pack(result)
""",
        'abandon': """
-- Extract *all* the variables (so we can easily know what they are)...
local owner_key = KEYS[1]
local listings_key = KEYS[2]
local last_modified_key = KEYS[3]

local expected_owner = ARGV[1]
local job_key = ARGV[2]
local last_modified_blob = ARGV[3]
local result = {}
if redis.call("hexists", listings_key, job_key) == 1 then
    if redis.call("exists", owner_key) == 1 then
        local owner = redis.call("get", owner_key)
        if owner ~= expected_owner then
            result["status"] = "${error}"
            result["reason"] = "${not_expected_owner}"
            result["owner"] = owner
        else
            redis.call("del", owner_key)
            redis.call("set", last_modified_key, last_modified_blob)
            result["status"] = "${ok}"
        end
    else
        result["status"] = "${error}"
        result["reason"] = "${unknown_owner}"
    end
else
    result["status"] = "${error}"
    result["reason"] = "${unknown_job}"
end
return cmsgpack.pack(result)
""",
        'trash': """
-- Extract *all* the variables (so we can easily know what they are)...
local owner_key = KEYS[1]
local listings_key = KEYS[2]
local last_modified_key = KEYS[3]
local trash_listings_key = KEYS[4]

local expected_owner = ARGV[1]
local job_key = ARGV[2]
local last_modified_blob = ARGV[3]
local result = {}
if redis.call("hexists", listings_key, job_key) == 1 then
    local raw_posting = redis.call("hget", listings_key, job_key)
    if redis.call("exists", owner_key) == 1 then
        local owner = redis.call("get", owner_key)
        if owner ~= expected_owner then
            result["status"] = "${error}"
            result["reason"] = "${not_expected_owner}"
            result["owner"] = owner
        else
            -- This ordering is important (try to first move the value
            -- and only if that works do we try to do any deletions)...
            redis.call("hset", trash_listings_key, job_key, raw_posting)
            redis.call("set", last_modified_key, last_modified_blob)
            redis.call("del", owner_key)
            redis.call("hdel", listings_key, job_key)
            result["status"] = "${ok}"
        end
    else
        result["status"] = "${error}"
        result["reason"] = "${unknown_owner}"
    end
else
    result["status"] = "${error}"
    result["reason"] = "${unknown_job}"
end
return cmsgpack.pack(result)
""",
    }
    """`Lua`_ **template** scripts that will be used by various methods (they
    are turned into real scripts and loaded during the :func:`.connect`
    method call).

    Some things to note:

    - The lua script is run serially, so when this runs no other command will
      be mutating the backend (and redis also ensures that no other script
      will be running) so atomicity of these scripts is guaranteed by redis.

    - Transactions were considered (and even mostly implemented) but
      ultimately rejected since redis does not support rollbacks and
      transactions can **not** be interdependent (later operations can
      **not** depend on the results of earlier operations). Both of these
      issues limit our ability to correctly report errors (with useful
      messages) and to maintain consistency under failure/contention (due to
      the inability to rollback). A third and final blow to using
      transactions was that to correctly use them we would have to set a
      watch on a *very* contentious key (the listings key) which would under
      load cause clients to retry more often than would be desired (this
      also increases network load, CPU cycles used, transaction failures
      triggered and so on).

    - Partial transaction execution is possible due to pre/post ``EXEC``
      failures (and the lack of rollback makes this worse).

    So overall after thinking, it seemed like having little lua scripts
    was not that bad (even if it is somewhat convoluted) due to the above and
    publicly mentioned issues with transactions. In general using lua scripts
    for this purpose seems to be somewhat common practice and it solves the
    issues that came up when transactions were considered & implemented.

    Some links about redis (and redis + lua) that may be useful to look over:

    - `Atomicity of scripts`_
    - `Scripting and transactions`_
    - `Why redis does not support rollbacks`_
    - `Intro to lua for redis programmers`_
    - `Five key takeaways for developing with redis`_
    - `Everything you always wanted to know about redis`_ (slides)

    .. _Lua: http://www.lua.org/
    .. _Atomicity of scripts: http://redis.io/commands/eval#atomicity-of-scripts
    .. _Scripting and transactions: http://redis.io/topics/transactions#redis-scripting-and-transactions
    .. _Why redis does not support rollbacks: http://redis.io/topics/transactions#why-redis-does-not-support-roll-backs
    .. _Intro to lua for redis programmers: http://www.redisgreen.net/blog/intro-to-lua-for-redis-programmers
    .. _Five key takeaways for developing with redis: https://redislabs.com/blog/5-key-takeaways-for-developing-with-redis
    .. _Everything you always wanted to know about redis: http://www.slideshare.net/carlosabalde/everything-you-always-wanted-to-know-about-redis-but-were-afraid-to-ask
    """

    @classmethod
    def _make_client(cls, conf):
        client_conf = {}
        for key, value_type_converter in cls.CLIENT_CONF_TRANSFERS:
            if key in conf:
                if value_type_converter is not None:
                    client_conf[key] = value_type_converter(conf[key])
                else:
                    client_conf[key] = conf[key]
        return ru.RedisClient(**client_conf)

    def __init__(self, name, conf,
                 client=None, persistence=None):
        super(RedisJobBoard, self).__init__(name, conf)
        self._closed = True
        if client is not None:
            self._client = client
            self._owns_client = False
        else:
            self._client = self._make_client(self._conf)
            # NOTE(harlowja): This client should not work until connected...
            self._client.close()
            self._owns_client = True
        self._namespace = self._conf.get('namespace', self.DEFAULT_NAMESPACE)
        self._open_close_lock = threading.RLock()
        # Redis server version connected to + scripts (populated on connect).
        self._redis_version = None
        self._scripts = {}
        # The backend to load the full logbooks from, since what is sent over
        # the data connection is only the logbook uuid and name, and not the
        # full logbook.
        self._persistence = persistence

    def join(self, key_piece, *more_key_pieces):
        """Create and return a namespaced key from many segments.

        NOTE(harlowja): all pieces that are text/unicode are converted into
        their binary equivalent (if they are already binary no conversion
        takes place) before being joined (as redis expects binary keys and
        not unicode/text ones).
        """
        namespace_pieces = []
        if self._namespace is not None:
            namespace_pieces = [self._namespace, self.NAMESPACE_SEP]
        else:
            namespace_pieces = []
        key_pieces = [key_piece]
        if more_key_pieces:
            key_pieces.extend(more_key_pieces)
        for i in compat_range(0, len(namespace_pieces)):
            namespace_pieces[i] = misc.binary_encode(namespace_pieces[i])
        for i in compat_range(0, len(key_pieces)):
            key_pieces[i] = misc.binary_encode(key_pieces[i])
        namespace = b"".join(namespace_pieces)
        key = self.KEY_PIECE_SEP.join(key_pieces)
        return namespace + key

    @property
    def namespace(self):
        """The namespace all keys will be prefixed with (or none)."""
        return self._namespace

    @misc.cachedproperty
    def trash_key(self):
        """Key where a hash will be stored with trashed jobs in it."""
        return self.join(b"trash")

    @misc.cachedproperty
    def sequence_key(self):
        """Key where an integer will be stored (used to sequence jobs)."""
        return self.join(b"sequence")

    @misc.cachedproperty
    def listings_key(self):
        """Key where a hash will be stored with active jobs in it."""
        return self.join(b"listings")

    @property
    def job_count(self):
        with _translate_failures():
            return self._client.hlen(self.listings_key)

    @property
    def connected(self):
        return not self._closed

    @fasteners.locked(lock='_open_close_lock')
    def connect(self):
        self.close()
        if self._owns_client:
            self._client = self._make_client(self._conf)
        with _translate_failures():
            # The client maintains a connection pool, so do a ping and
            # if that works then assume the connection works, which may or
            # may not be continuously maintained (if the server dies
            # at a later time, we will become aware of that when the next
            # op occurs).
            self._client.ping()
            is_new_enough, redis_version = ru.is_server_new_enough(
                self._client, self.MIN_REDIS_VERSION)
            if not is_new_enough:
                wanted_version = ".".join([str(p)
                                           for p in self.MIN_REDIS_VERSION])
                if redis_version:
                    raise exc.JobFailure("Redis version %s or greater is"
                                         " required (version %s is too"
                                         " old)" % (wanted_version,
                                                    redis_version))
                else:
                    raise exc.JobFailure("Redis version %s or greater is"
                                         " required" % (wanted_version))
            else:
                self._redis_version = redis_version
                script_params = {
                    # Status field values.
                    'ok': self.SCRIPT_STATUS_OK,
                    'error': self.SCRIPT_STATUS_ERROR,

                    # Known error reasons (when status field is error).
                    'not_expected_owner': self.SCRIPT_NOT_EXPECTED_OWNER,
                    'unknown_owner': self.SCRIPT_UNKNOWN_OWNER,
                    'unknown_job': self.SCRIPT_UNKNOWN_JOB,
                    'already_claimed': self.SCRIPT_ALREADY_CLAIMED,
                }
                prepared_scripts = {}
                for n, raw_script_tpl in six.iteritems(self.SCRIPT_TEMPLATES):
                    script_tpl = string.Template(raw_script_tpl)
                    script_blob = script_tpl.substitute(**script_params)
                    script = self._client.register_script(script_blob)
                    prepared_scripts[n] = script
                self._scripts.update(prepared_scripts)
                self._closed = False

    @fasteners.locked(lock='_open_close_lock')
    def close(self):
        if self._owns_client:
            self._client.close()
        self._scripts.clear()
        self._redis_version = None
        self._closed = True

    @staticmethod
    def _dumps(obj):
        try:
            return msgpackutils.dumps(obj)
        except (msgpack.PackException, ValueError):
            # TODO(harlowja): remove direct msgpack exception access when
            # oslo.utils provides easy access to the underlying msgpack
            # pack/unpack exceptions..
            exc.raise_with_cause(exc.JobFailure,
                                 "Failed to serialize object to"
                                 " msgpack blob")

    @staticmethod
    def _loads(blob, root_types=(dict,)):
        try:
            return misc.decode_msgpack(blob, root_types=root_types)
        except (msgpack.UnpackException, ValueError):
            # TODO(harlowja): remove direct msgpack exception access when
            # oslo.utils provides easy access to the underlying msgpack
            # pack/unpack exceptions..
            exc.raise_with_cause(exc.JobFailure,
                                 "Failed to deserialize object from"
                                 " msgpack blob (of length %s)" % len(blob))

    _decode_owner = staticmethod(misc.binary_decode)

    _encode_owner = staticmethod(misc.binary_encode)

    def find_owner(self, job):
        owner_key = self.join(job.key + self.OWNED_POSTFIX)
        with _translate_failures():
            raw_owner = self._client.get(owner_key)
            return self._decode_owner(raw_owner)

    def post(self, name, book=None, details=None):
        job_uuid = uuidutils.generate_uuid()
        posting = base.format_posting(job_uuid, name,
                                      created_on=timeutils.utcnow(),
                                      book=book, details=details)
        with _translate_failures():
            sequence = self._client.incr(self.sequence_key)
            posting.update({
                'sequence': sequence,
            })
        with _translate_failures():
            raw_posting = self._dumps(posting)
            raw_job_uuid = six.b(job_uuid)
            was_posted = bool(self._client.hsetnx(self.listings_key,
                                                  raw_job_uuid, raw_posting))
            if not was_posted:
                raise exc.JobFailure("New job located at '%s[%s]' could not"
                                     " be posted" % (self.listings_key,
                                                     raw_job_uuid))
            else:
                return RedisJob(self, name, sequence, raw_job_uuid,
                                uuid=job_uuid, details=details,
                                created_on=posting['created_on'],
                                book=book, book_data=posting.get('book'),
                                backend=self._persistence)

    def wait(self, timeout=None, initial_delay=0.005,
             max_delay=1.0, sleep_func=time.sleep):
        if initial_delay > max_delay:
            raise ValueError("Initial delay %s must be less than or equal"
                             " to the provided max delay %s"
                             % (initial_delay, max_delay))
        # This does a spin-loop that backs off by doubling the delay
        # up to the provided max-delay. In the future we could try having
        # a secondary client connected into redis pubsub and use that
        # instead, but for now this is simpler.
        w = timing.StopWatch(duration=timeout)
        w.start()
        delay = initial_delay
        while True:
            jc = self.job_count
            if jc > 0:
                it = self.iterjobs()
                return it
            else:
                if w.expired():
                    raise exc.NotFound("Expired waiting for jobs to"
                                       " arrive; waited %s seconds"
                                       % w.elapsed())
                else:
                    remaining = w.leftover(return_none=True)
                    if remaining is not None:
                        delay = min(delay * 2, remaining, max_delay)
                    else:
                        delay = min(delay * 2, max_delay)
                    sleep_func(delay)

    def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
        with _translate_failures():
            raw_postings = self._client.hgetall(self.listings_key)
        postings = []
        for raw_job_key, raw_posting in six.iteritems(raw_postings):
            posting = self._loads(raw_posting)
            details = posting.get('details', {})
            job_uuid = posting['uuid']
            job = RedisJob(self, posting['name'], posting['sequence'],
                           raw_job_key, uuid=job_uuid, details=details,
                           created_on=posting['created_on'],
                           book_data=posting.get('book'),
                           backend=self._persistence)
            postings.append(job)
        postings = sorted(postings)
        for job in postings:
            if only_unclaimed:
                if job.state == states.UNCLAIMED:
                    yield job
            else:
                yield job

    @base.check_who
    def consume(self, job, who):
        script = self._get_script('consume')
        with _translate_failures():
            raw_who = self._encode_owner(who)
            raw_result = script(keys=[job.owner_key, self.listings_key,
                                      job.last_modified_key],
                                args=[raw_who, job.key])
            result = self._loads(raw_result)
        status = result['status']
        if status != self.SCRIPT_STATUS_OK:
            reason = result.get('reason')
            if reason == self.SCRIPT_UNKNOWN_JOB:
                raise exc.NotFound("Job %s not found to be"
                                   " consumed" % (job.uuid))
            elif reason == self.SCRIPT_UNKNOWN_OWNER:
                raise exc.NotFound("Can not consume job %s"
                                   " which we can not determine"
                                   " the owner of" % (job.uuid))
            elif reason == self.SCRIPT_NOT_EXPECTED_OWNER:
                raw_owner = result.get('owner')
                if raw_owner:
                    owner = self._decode_owner(raw_owner)
                    raise exc.JobFailure("Can not consume job %s"
                                         " which is not owned by %s (it is"
                                         " actively owned by %s)"
                                         % (job.uuid, who, owner))
                else:
                    raise exc.JobFailure("Can not consume job %s"
                                         " which is not owned by %s"
                                         % (job.uuid, who))
            else:
                raise exc.JobFailure("Failure to consume job %s,"
                                     " unknown internal error (reason=%s)"
                                     % (job.uuid, reason))

    @base.check_who
    def claim(self, job, who, expiry=None):
        if expiry is None:
            # On the lua side none doesn't translate to nil so we have
            # to do this string conversion to make sure that we can tell
            # the difference.
            ms_expiry = "none"
        else:
            ms_expiry = int(expiry * 1000.0)
            if ms_expiry <= 0:
                raise ValueError("Provided expiry (when converted to"
                                 " milliseconds) must be greater"
                                 " than zero instead of %s" % (expiry))
        script = self._get_script('claim')
        with _translate_failures():
            raw_who = self._encode_owner(who)
            raw_result = script(keys=[job.owner_key, self.listings_key,
                                      job.last_modified_key],
                                args=[raw_who, job.key,
                                      # NOTE(harlowja): we need to send this
                                      # in as a blob (even if it's not
                                      # set/used), since the format can not
                                      # currently be created in lua...
                                      self._dumps(timeutils.utcnow()),
                                      ms_expiry])
            result = self._loads(raw_result)
        status = result['status']
        if status != self.SCRIPT_STATUS_OK:
            reason = result.get('reason')
            if reason == self.SCRIPT_UNKNOWN_JOB:
                raise exc.NotFound("Job %s not found to be"
                                   " claimed" % (job.uuid))
            elif reason == self.SCRIPT_ALREADY_CLAIMED:
                raw_owner = result.get('owner')
                if raw_owner:
                    owner = self._decode_owner(raw_owner)
                    raise exc.UnclaimableJob("Job %s already"
                                             " claimed by %s"
                                             % (job.uuid, owner))
                else:
                    raise exc.UnclaimableJob("Job %s already"
                                             " claimed" % (job.uuid))
            else:
                raise exc.JobFailure("Failure to claim job %s,"
                                     " unknown internal error (reason=%s)"
                                     % (job.uuid, reason))

    @base.check_who
    def abandon(self, job, who):
        script = self._get_script('abandon')
        with _translate_failures():
            raw_who = self._encode_owner(who)
            raw_result = script(keys=[job.owner_key, self.listings_key,
                                      job.last_modified_key],
                                args=[raw_who, job.key,
                                      self._dumps(timeutils.utcnow())])
            result = self._loads(raw_result)
        status = result.get('status')
        if status != self.SCRIPT_STATUS_OK:
            reason = result.get('reason')
            if reason == self.SCRIPT_UNKNOWN_JOB:
                raise exc.NotFound("Job %s not found to be"
                                   " abandoned" % (job.uuid))
            elif reason == self.SCRIPT_UNKNOWN_OWNER:
                raise exc.NotFound("Can not abandon job %s"
                                   " which we can not determine"
                                   " the owner of" % (job.uuid))
            elif reason == self.SCRIPT_NOT_EXPECTED_OWNER:
                raw_owner = result.get('owner')
                if raw_owner:
                    owner = self._decode_owner(raw_owner)
                    raise exc.JobFailure("Can not abandon job %s"
                                         " which is not owned by %s (it is"
                                         " actively owned by %s)"
                                         % (job.uuid, who, owner))
                else:
                    raise exc.JobFailure("Can not abandon job %s"
                                         " which is not owned by %s"
                                         % (job.uuid, who))
            else:
                raise exc.JobFailure("Failure to abandon job %s,"
                                     " unknown internal"
                                     " error (status=%s, reason=%s)"
                                     % (job.uuid, status, reason))

    def _get_script(self, name):
        try:
            return self._scripts[name]
        except KeyError:
            exc.raise_with_cause(exc.NotFound,
                                 "Can not access %s script (has this"
                                 " board been connected?)" % name)

    @base.check_who
    def trash(self, job, who):
        script = self._get_script('trash')
        with _translate_failures():
            raw_who = self._encode_owner(who)
            raw_result = script(keys=[job.owner_key, self.listings_key,
                                      job.last_modified_key, self.trash_key],
                                args=[raw_who, job.key,
                                      self._dumps(timeutils.utcnow())])
            result = self._loads(raw_result)
        status = result['status']
        if status != self.SCRIPT_STATUS_OK:
            reason = result.get('reason')
            if reason == self.SCRIPT_UNKNOWN_JOB:
                raise exc.NotFound("Job %s not found to be"
                                   " trashed" % (job.uuid))
            elif reason == self.SCRIPT_UNKNOWN_OWNER:
                raise exc.NotFound("Can not trash job %s"
                                   " which we can not determine"
                                   " the owner of" % (job.uuid))
            elif reason == self.SCRIPT_NOT_EXPECTED_OWNER:
                raw_owner = result.get('owner')
                if raw_owner:
                    owner = self._decode_owner(raw_owner)
                    raise exc.JobFailure("Can not trash job %s"
                                         " which is not owned by %s (it is"
                                         " actively owned by %s)"
                                         % (job.uuid, who, owner))
                else:
                    raise exc.JobFailure("Can not trash job %s"
                                         " which is not owned by %s"
                                         % (job.uuid, who))
            else:
                raise exc.JobFailure("Failure to trash job %s,"
                                     " unknown internal error (reason=%s)"
                                     % (job.uuid, reason))
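To make the post/claim/consume semantics described in the ``RedisJobBoard`` docstring concrete, here is a hedged end-to-end sketch (not part of the commit); the board name, owner string, and details are made up, but the calls match the signatures in the file above:

from taskflow.jobs.backends import impl_redis

board = impl_redis.RedisJobBoard('my-board', {'host': '127.0.0.1'})
board.connect()
try:
    board.post('convert-12', details={'frames': 100})
    for job in board.iterjobs(only_unclaimed=True):
        # Claim with a 30 second expiry so a crashed worker releases it.
        board.claim(job, 'worker-1', expiry=30)
        job.extend_expiry(30)  # periodically keep the claim alive
        # ... perform whatever *work* job.details describes ...
        board.consume(job, 'worker-1')  # removes job data + owner key
finally:
    board.close()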
taskflow/jobs/backends/impl_zookeeper.py
@@ -41,37 +41,17 @@ from taskflow.utils import misc
 LOG = logging.getLogger(__name__)
 
 
-def check_who(meth):
-    """Decorator that checks the expected owner type & value restrictions."""
-
-    @six.wraps(meth)
-    def wrapper(self, job, who, *args, **kwargs):
-        if not isinstance(who, six.string_types):
-            raise TypeError("Job applicant must be a string type")
-        if len(who) == 0:
-            raise ValueError("Job applicant must be non-empty")
-        return meth(self, job, who, *args, **kwargs)
-
-    return wrapper
-
-
 class ZookeeperJob(base.Job):
     """A zookeeper job."""
 
-    def __init__(self, name, board, client, backend, path,
-                 uuid=None, details=None, book=None, book_data=None,
-                 created_on=None):
-        super(ZookeeperJob, self).__init__(name, uuid=uuid, details=details)
-        self._board = board
-        self._book = book
-        if not book_data:
-            book_data = {}
-        self._book_data = book_data
+    def __init__(self, board, name, client, path,
+                 uuid=None, details=None, book=None, book_data=None,
+                 created_on=None, backend=None):
+        super(ZookeeperJob, self).__init__(board, name,
+                                           uuid=uuid, details=details,
+                                           backend=backend,
+                                           book=book, book_data=book_data)
         self._client = client
-        self._backend = backend
-        if all((self._book, self._book_data)):
-            raise ValueError("Only one of 'book_data' or 'book'"
-                             " can be provided")
         self._path = k_paths.normpath(path)
         self._lock_path = self._path + board.LOCK_POSTFIX
         self._created_on = created_on
@@ -82,10 +62,12 @@ class ZookeeperJob(base.Job):
 
     @property
     def lock_path(self):
+        """Path the job lock/claim and owner znode is stored."""
         return self._lock_path
 
     @property
     def path(self):
+        """Path the job data znode is stored."""
         return self._path
 
     @property
@@ -154,28 +136,8 @@ class ZookeeperJob(base.Job):
             self._node_not_found = True
         return self._created_on
 
-    @property
-    def board(self):
-        return self._board
-
-    def _load_book(self):
-        book_uuid = self.book_uuid
-        if self._backend is not None and book_uuid is not None:
-            # TODO(harlowja): we are currently limited by assuming that the
-            # job posted has the same backend as this loader (to start this
-            # seems to be a ok assumption, and can be adjusted in the future
-            # if we determine there is a use-case for multi-backend loaders,
-            # aka a registry of loaders).
-            with contextlib.closing(self._backend.get_connection()) as conn:
-                return conn.get_logbook(book_uuid)
-        # No backend to fetch from or no uuid specified
-        return None
-
     @property
     def state(self):
-        return self._fetch_state()
-
-    def _fetch_state(self):
         owner = self.board.find_owner(self)
         job_data = {}
         try:
@@ -217,30 +179,6 @@ class ZookeeperJob(base.Job):
     def __hash__(self):
         return hash(self.path)
 
-    @property
-    def book(self):
-        if self._book is None:
-            self._book = self._load_book()
-        return self._book
-
-    @property
-    def book_uuid(self):
-        if self._book:
-            return self._book.uuid
-        if self._book_data:
-            return self._book_data.get('uuid')
-        else:
-            return None
-
-    @property
-    def book_name(self):
-        if self._book:
-            return self._book.name
-        if self._book_data:
-            return self._book_data.get('name')
-        else:
-            return None
-
 
 class ZookeeperJobBoardIterator(six.Iterator):
     """Iterator over a zookeeper jobboard that iterates over potential jobs.
@@ -273,6 +211,7 @@ class ZookeeperJobBoardIterator(six.Iterator):
 
     @property
     def board(self):
+        """The board this iterator was created from."""
        return self._board
 
     def __iter__(self):
@@ -310,28 +249,32 @@ class ZookeeperJobBoardIterator(six.Iterator):
 
 
 class ZookeeperJobBoard(base.NotifyingJobBoard):
-    """A jobboard backend by zookeeper.
+    """A jobboard backed by `zookeeper`_.
 
     Powered by the `kazoo <http://kazoo.readthedocs.org/>`_ library.
 
     This jobboard creates *sequenced* persistent znodes in a directory in
-    zookeeper (that directory defaults to ``/taskflow/jobs``) and uses
-    zookeeper watches to notify other jobboards that the job which was posted
-    using the :meth:`.post` method (this creates a znode with contents/details
-    in json). The users of those jobboard(s) (potentially on disjoint sets of
-    machines) can then iterate over the available jobs and decide if they want
+    zookeeper and uses zookeeper watches to notify other jobboards of
+    jobs which were posted using the :meth:`.post` method (this creates a
+    znode with job contents/details encoded in `json`_). The users of these
+    jobboard(s) (potentially on disjoint sets of machines) can then iterate
+    over the available jobs and decide if they want
     to attempt to claim one of the jobs they have iterated over. If so they
     will then attempt to contact zookeeper and they will attempt to create a
     ephemeral znode using the name of the persistent znode + ".lock" as a
     postfix. If the entity trying to use the jobboard to :meth:`.claim` the
     job is able to create a ephemeral znode with that name then it will be
     allowed (and expected) to perform whatever *work* the contents of that
-    job described. Once finished the ephemeral znode and persistent znode may
-    be deleted (if successfully completed) in a single transaction or if not
-    successful (or the entity that claimed the znode dies) the ephemeral
-    znode will be released (either manually by using :meth:`.abandon` or
-    automatically by zookeeper when the ephemeral node and associated session
-    is deemed to have been lost).
+    job described. Once the claiming entity is finished the ephemeral znode
+    and persistent znode will be deleted (if successfully completed) in a
+    single transaction. If the claiming entity is not successful (or the
+    entity that claimed the znode dies) the ephemeral znode will be
+    released (either manually by using :meth:`.abandon` or automatically by
+    zookeeper when the ephemeral node and associated session is deemed to
+    have been lost).
+
+    .. _zookeeper: http://zookeeper.apache.org/
+    .. _json: http://json.org/
     """
 
     #: Transaction support was added in 3.4.0 so we need at least that version.
@@ -366,11 +309,9 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
         self._path = path
         self._trash_path = self._path.replace(k_paths.basename(self._path),
                                               self.TRASH_FOLDER)
-        # The backend to load the full logbooks from, since whats sent over
-        # the zookeeper data connection is only the logbook uuid and name, and
-        # not currently the full logbook (later when a zookeeper backend
-        # appears we can likely optimize for that backend usage by directly
-        # reading from the path where the data is stored, if we want).
+        # The backend to load the full logbooks from, since what is sent over
+        # the data connection is only the logbook uuid and name, and not the
+        # full logbook.
         self._persistence = persistence
         # Misc. internal details
         self._known_jobs = {}
@@ -401,10 +342,12 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
 
     @property
     def path(self):
+        """Path where all job znodes will be stored."""
         return self._path
 
     @property
     def trash_path(self):
+        """Path where all trashed job znodes will be stored."""
         return self._trash_path
 
     @property
@@ -476,8 +419,9 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
             # jobs information into the known job set (if it's already
             # existing then just leave it alone).
             if path not in self._known_jobs:
-                job = ZookeeperJob(job_data['name'], self,
-                                   self._client, self._persistence, path,
+                job = ZookeeperJob(self, job_data['name'],
+                                   self._client, path,
+                                   backend=self._persistence,
                                    uuid=job_data['uuid'],
                                    book_data=job_data.get("book"),
                                    details=job_data.get("details", {}),
@@ -536,46 +480,31 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
                 self._process_child(path, request)

     def post(self, name, book=None, details=None):

-        def format_posting(job_uuid):
-            posting = {
-                'uuid': job_uuid,
-                'name': name,
-            }
-            if details:
-                posting['details'] = details
-            else:
-                posting['details'] = {}
-            if book is not None:
-                posting['book'] = {
-                    'name': book.name,
-                    'uuid': book.uuid,
-                }
-            return posting
-
         # NOTE(harlowja): Jobs are not ephemeral, they will persist until they
         # are consumed (this may change later, but seems safer to do this until
         # further notice).
         job_uuid = uuidutils.generate_uuid()
+        job_posting = base.format_posting(job_uuid, name,
+                                          book=book, details=details)
+        raw_job_posting = misc.binary_encode(jsonutils.dumps(job_posting))
         with self._wrap(job_uuid, None,
-                        "Posting failure: %s", ensure_known=False):
-            job_posting = format_posting(job_uuid)
-            job_posting = misc.binary_encode(jsonutils.dumps(job_posting))
+                        fail_msg_tpl="Posting failure: %s",
+                        ensure_known=False):
             job_path = self._client.create(self._job_base,
-                                           value=job_posting,
+                                           value=raw_job_posting,
                                            sequence=True,
                                            ephemeral=False)
-            job = ZookeeperJob(name, self, self._client,
-                               self._persistence, job_path,
-                               book=book, details=details,
-                               uuid=job_uuid)
+            job = ZookeeperJob(self, name, self._client, job_path,
+                               backend=self._persistence,
+                               book=book, details=details, uuid=job_uuid,
+                               book_data=job_posting.get('book'))
             with self._job_cond:
                 self._known_jobs[job_path] = job
                 self._job_cond.notify_all()
             self._emit(base.POSTED, details={'job': job})
             return job

-    @check_who
+    @base.check_who
     def claim(self, job, who):
         def _unclaimable_try_find_owner(cause):
             try:
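Reviewer note: the inline format_posting closure is replaced here by the shared base.format_posting helper added later in this change, so the zookeeper and redis boards serialize postings identically. A minimal sketch of the resulting blob (keys per that helper; 'book' only appears when a logbook is supplied):

    from oslo_serialization import jsonutils
    from oslo_utils import uuidutils

    posting = {
        'uuid': uuidutils.generate_uuid(),
        'name': 'my-job',
        'details': {},
    }
    # misc.binary_encode(jsonutils.dumps(...)) above amounts to:
    raw_posting = jsonutils.dumps(posting).encode('utf-8')
    assert jsonutils.loads(raw_posting)['name'] == 'my-job'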
@@ -589,7 +518,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
                 excp.raise_with_cause(excp.UnclaimableJob,
                                       message, cause=cause)

-        with self._wrap(job.uuid, job.path, "Claiming failure: %s"):
+        with self._wrap(job.uuid, job.path,
+                        fail_msg_tpl="Claiming failure: %s"):
             # NOTE(harlowja): post as json which will allow for future changes
             # more easily than a raw string/text.
             value = jsonutils.dumps({
@@ -653,7 +583,9 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
         excp.raise_with_cause(excp.JobFailure, fail_msg_tpl % (job_uuid))

     def find_owner(self, job):
-        with self._wrap(job.uuid, job.path, "Owner query failure: %s"):
+        with self._wrap(job.uuid, job.path,
+                        fail_msg_tpl="Owner query failure: %s",
+                        ensure_known=False):
             try:
                 self._client.sync(job.lock_path)
                 raw_data, _lock_stat = self._client.get(job.lock_path)
@@ -669,14 +601,15 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
         return (misc.decode_json(lock_data), lock_stat,
                 misc.decode_json(job_data), job_stat)

-    @check_who
+    @base.check_who
     def consume(self, job, who):
-        with self._wrap(job.uuid, job.path, "Consumption failure: %s"):
+        with self._wrap(job.uuid, job.path,
+                        fail_msg_tpl="Consumption failure: %s"):
             try:
                 owner_data = self._get_owner_and_data(job)
                 lock_data, lock_stat, data, data_stat = owner_data
             except k_exceptions.NoNodeError:
-                excp.raise_with_cause(excp.JobFailure,
+                excp.raise_with_cause(excp.NotFound,
                                       "Can not consume a job %s"
                                       " which we can not determine"
                                       " the owner of" % (job.uuid))
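Reviewer note: consume (and abandon/trash below) now raise the narrower excp.NotFound instead of excp.JobFailure when the ownership node has vanished, so callers can react to a lost claim specifically. A sketch of the distinction this enables (try_consume is illustrative only):

    from taskflow import exceptions as excp

    def try_consume(board, job, who):
        try:
            board.consume(job, who)
        except excp.NotFound:
            # Owner could not be determined (claim lost/expired); the job
            # may become claimable again, so do not treat this as fatal.
            return False
        return True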
@@ -690,14 +623,15 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
             kazoo_utils.checked_commit(txn)
         self._remove_job(job.path)

-    @check_who
+    @base.check_who
     def abandon(self, job, who):
-        with self._wrap(job.uuid, job.path, "Abandonment failure: %s"):
+        with self._wrap(job.uuid, job.path,
+                        fail_msg_tpl="Abandonment failure: %s"):
             try:
                 owner_data = self._get_owner_and_data(job)
                 lock_data, lock_stat, data, data_stat = owner_data
             except k_exceptions.NoNodeError:
-                excp.raise_with_cause(excp.JobFailure,
+                excp.raise_with_cause(excp.NotFound,
                                       "Can not abandon a job %s"
                                       " which we can not determine"
                                       " the owner of" % (job.uuid))
@@ -709,14 +643,15 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
             txn.delete(job.lock_path, version=lock_stat.version)
         kazoo_utils.checked_commit(txn)

-    @check_who
+    @base.check_who
     def trash(self, job, who):
-        with self._wrap(job.uuid, job.path, "Trash failure: %s"):
+        with self._wrap(job.uuid, job.path,
+                        fail_msg_tpl="Trash failure: %s"):
             try:
                 owner_data = self._get_owner_and_data(job)
                 lock_data, lock_stat, data, data_stat = owner_data
             except k_exceptions.NoNodeError:
-                excp.raise_with_cause(excp.JobFailure,
+                excp.raise_with_cause(excp.NotFound,
                                       "Can not trash a job %s"
                                       " which we can not determine"
                                       " the owner of" % (job.uuid))

@@ -16,6 +16,7 @@
 # under the License.

 import abc
+import contextlib

 from oslo_utils import uuidutils
 import six
@@ -43,7 +44,9 @@ class Job(object):
     reverting...
     """

-    def __init__(self, name, uuid=None, details=None):
+    def __init__(self, board, name,
+                 uuid=None, details=None, backend=None,
+                 book=None, book_data=None):
         if uuid:
             self._uuid = uuid
         else:
@@ -52,6 +55,12 @@ class Job(object):
         if not details:
             details = {}
         self._details = details
+        self._backend = backend
+        self._board = board
+        self._book = book
+        if not book_data:
+            book_data = {}
+        self._book_data = book_data

     @abc.abstractproperty
     def last_modified(self):
@@ -61,34 +70,47 @@ class Job(object):
     def created_on(self):
         """The datetime the job was created on."""

-    @abc.abstractproperty
+    @property
     def board(self):
         """The board this job was posted on or was created from."""
+        return self._board

     @abc.abstractproperty
     def state(self):
-        """The current state of this job."""
+        """Access the current state of this job."""
+        pass

-    @abc.abstractproperty
+    @property
     def book(self):
         """Logbook associated with this job.

         If no logbook is associated with this job, this property is None.
         """
+        if self._book is None:
+            self._book = self._load_book()
+        return self._book

-    @abc.abstractproperty
+    @property
     def book_uuid(self):
         """UUID of logbook associated with this job.

         If no logbook is associated with this job, this property is None.
         """
+        if self._book is not None:
+            return self._book.uuid
+        else:
+            return self._book_data.get('uuid')

-    @abc.abstractproperty
+    @property
     def book_name(self):
         """Name of logbook associated with this job.

         If no logbook is associated with this job, this property is None.
         """
+        if self._book is not None:
+            return self._book.name
+        else:
+            return self._book_data.get('name')

     @property
     def uuid(self):
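Reviewer note: with board/book/book_uuid/book_name now concrete, a job built from raw posting data can answer name/uuid questions about its logbook without touching persistence; the full logbook is only fetched (via _load_book below) on first access of .book. A sketch using a hypothetical minimal subclass (DemoJob exists only for illustration):

    from taskflow.jobs import base

    class DemoJob(base.Job):
        # Illustration-only overrides of the remaining abstract properties.
        @property
        def last_modified(self):
            return None

        @property
        def created_on(self):
            return None

        @property
        def state(self):
            return 'UNCLAIMED'

    job = DemoJob(None, 'demo',
                  book_data={'uuid': 'book-uuid', 'name': 'book-name'})
    assert job.book_uuid == 'book-uuid'  # answered from book_data alone
    assert job.book_name == 'book-name'  # no backend round-trip needed
    assert job.book is None              # nothing to load without a backend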
@@ -105,10 +127,24 @@ class Job(object):
         """The non-uniquely identifying name of this job."""
         return self._name

+    def _load_book(self):
+        book_uuid = self.book_uuid
+        if self._backend is not None and book_uuid is not None:
+            # TODO(harlowja): we are currently limited by assuming that the
+            # job posted has the same backend as this loader (to start this
+            # seems to be a ok assumption, and can be adjusted in the future
+            # if we determine there is a use-case for multi-backend loaders,
+            # aka a registry of loaders).
+            with contextlib.closing(self._backend.get_connection()) as conn:
+                return conn.get_logbook(book_uuid)
+        # No backend to fetch from or no uuid specified
+        return None
+
     def __str__(self):
         """Pretty formats the job into something *more* meaningful."""
-        return "%s %s (%s): %s" % (type(self).__name__,
-                                   self.name, self.uuid, self.details)
+        return "%s: %s (uuid=%s, details=%s)" % (type(self).__name__,
+                                                 self.name, self.uuid,
+                                                 self.details)


 @six.add_metaclass(abc.ABCMeta)
@@ -312,3 +348,40 @@ class NotifyingJobBoard(JobBoard):
     def __init__(self, name, conf):
         super(NotifyingJobBoard, self).__init__(name, conf)
         self.notifier = notifier.Notifier()
+
+
+# Internal helpers for usage by board implementations...
+
+def check_who(meth):
+
+    @six.wraps(meth)
+    def wrapper(self, job, who, *args, **kwargs):
+        if not isinstance(who, six.string_types):
+            raise TypeError("Job applicant must be a string type")
+        if len(who) == 0:
+            raise ValueError("Job applicant must be non-empty")
+        return meth(self, job, who, *args, **kwargs)
+
+    return wrapper
+
+
+def format_posting(uuid, name, created_on=None, last_modified=None,
+                   details=None, book=None):
+    posting = {
+        'uuid': uuid,
+        'name': name,
+    }
+    if created_on is not None:
+        posting['created_on'] = created_on
+    if last_modified is not None:
+        posting['last_modified'] = last_modified
+    if details:
+        posting['details'] = details
+    else:
+        posting['details'] = {}
+    if book is not None:
+        posting['book'] = {
+            'name': book.name,
+            'uuid': book.uuid,
+        }
+    return posting

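Reviewer note: both helpers are deliberately module level so any board implementation (zookeeper here, redis in this same change) can share the applicant validation and the posting schema. A usage sketch:

    from taskflow.jobs import base

    posting = base.format_posting('some-uuid', 'my-job', details={'x': 1})
    assert posting == {'uuid': 'some-uuid', 'name': 'my-job',
                       'details': {'x': 1}}

    class DemoBoard(object):  # illustration only
        @base.check_who
        def claim(self, job, who):
            return who

    DemoBoard().claim(None, 'worker-1')  # ok
    try:
        DemoBoard().claim(None, '')
    except ValueError:
        pass  # empty applicants are rejected before the method body runs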
@@ -18,20 +18,13 @@ import contextlib
 import threading
 import time

-from kazoo.recipe import watchers
-from oslo_utils import uuidutils
-
 from taskflow import exceptions as excp
 from taskflow.persistence.backends import impl_dir
 from taskflow import states
-from taskflow.test import mock
 from taskflow.tests import utils as test_utils
-from taskflow.utils import misc
 from taskflow.utils import persistence_utils as p_utils
 from taskflow.utils import threading_utils

-FLUSH_PATH_TPL = '/taskflow/flush-test/%s'
-

 @contextlib.contextmanager
 def connect_close(*args):
@@ -44,72 +37,20 @@ def connect_close(*args):
         a.close()


-@contextlib.contextmanager
-def flush(client, path=None):
-    # This uses the linearity guarantee of zookeeper (and associated libraries)
-    # to create a temporary node, wait until a watcher notifies it's created,
-    # then yield back for more work, and then at the end of that work delete
-    # the created node. This ensures that the operations done in the yield
-    # of this context manager will be applied and all watchers will have fired
-    # before this context manager exits.
-    if not path:
-        path = FLUSH_PATH_TPL % uuidutils.generate_uuid()
-    created = threading.Event()
-    deleted = threading.Event()
-
-    def on_created(data, stat):
-        if stat is not None:
-            created.set()
-        return False  # cause this watcher to cease to exist
-
-    def on_deleted(data, stat):
-        if stat is None:
-            deleted.set()
-        return False  # cause this watcher to cease to exist
-
-    watchers.DataWatch(client, path, func=on_created)
-    client.create(path, makepath=True)
-    if not created.wait(test_utils.WAIT_TIMEOUT):
-        raise RuntimeError("Could not receive creation of %s in"
-                           " the alloted timeout of %s seconds"
-                           % (path, test_utils.WAIT_TIMEOUT))
-    try:
-        yield
-    finally:
-        watchers.DataWatch(client, path, func=on_deleted)
-        client.delete(path, recursive=True)
-        if not deleted.wait(test_utils.WAIT_TIMEOUT):
-            raise RuntimeError("Could not receive deletion of %s in"
-                               " the alloted timeout of %s seconds"
-                               % (path, test_utils.WAIT_TIMEOUT))
-
-
 class BoardTestMixin(object):
+
+    @contextlib.contextmanager
+    def flush(self, client):
+        yield
+
+    def close_client(self, client):
+        pass
+
     def test_connect(self):
         self.assertFalse(self.board.connected)
         with connect_close(self.board):
             self.assertTrue(self.board.connected)

-    @mock.patch("taskflow.jobs.backends.impl_zookeeper.misc."
-                "millis_to_datetime")
-    def test_posting_dates(self, mock_dt):
-        epoch = misc.millis_to_datetime(0)
-        mock_dt.return_value = epoch
-
-        with connect_close(self.board):
-            j = self.board.post('test', p_utils.temporary_log_book())
-            self.assertEqual(epoch, j.created_on)
-            self.assertEqual(epoch, j.last_modified)
-
-        self.assertTrue(mock_dt.called)
-
-    def test_board_iter(self):
-        with connect_close(self.board):
-            it = self.board.iterjobs()
-            self.assertEqual(it.board, self.board)
-            self.assertFalse(it.only_unclaimed)
-            self.assertFalse(it.ensure_fresh)
-
     def test_board_iter_empty(self):
         with connect_close(self.board):
             jobs_found = list(self.board.iterjobs())
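Reviewer note: the mixin now exposes backend-neutral flush() and close_client() hooks (no-ops by default) so the same test bodies run against both zookeeper and redis boards; the zookeeper-specific barrier is reintroduced as an override in test_zookeeper_job.py later in this change. The shape of the pattern, sketched:

    import contextlib

    class BoardTestMixin(object):
        @contextlib.contextmanager
        def flush(self, client):
            yield  # default: the backend needs no explicit synchronization

    class SomeBackendMixin(BoardTestMixin):
        @contextlib.contextmanager
        def flush(self, client):
            # a backend-specific write barrier would go here
            yield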
@@ -155,7 +96,7 @@ class BoardTestMixin(object):
     def test_posting_claim(self):

         with connect_close(self.board):
-            with flush(self.client):
+            with self.flush(self.client):
                 self.board.post('test', p_utils.temporary_log_book())

             self.assertEqual(1, self.board.job_count)
@@ -164,7 +105,7 @@ class BoardTestMixin(object):
             j = possible_jobs[0]
             self.assertEqual(states.UNCLAIMED, j.state)

-            with flush(self.client):
+            with self.flush(self.client):
                 self.board.claim(j, self.board.name)

             self.assertEqual(self.board.name, self.board.find_owner(j))
@@ -173,25 +114,24 @@ class BoardTestMixin(object):
             possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
             self.assertEqual(0, len(possible_jobs))

-            self.assertRaisesAttrAccess(excp.NotFound, j, 'state')
-            self.assertRaises(excp.NotFound,
-                              self.board.consume, j, self.board.name)
+            self.close_client(self.client)
+            self.assertRaisesAttrAccess(excp.JobFailure, j, 'state')

     def test_posting_claim_consume(self):

         with connect_close(self.board):
-            with flush(self.client):
+            with self.flush(self.client):
                 self.board.post('test', p_utils.temporary_log_book())

             possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
             self.assertEqual(1, len(possible_jobs))
             j = possible_jobs[0]
-            with flush(self.client):
+            with self.flush(self.client):
                 self.board.claim(j, self.board.name)

             possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
             self.assertEqual(0, len(possible_jobs))
-            with flush(self.client):
+            with self.flush(self.client):
                 self.board.consume(j, self.board.name)

             self.assertEqual(0, len(list(self.board.iterjobs())))
@@ -201,18 +141,18 @@ class BoardTestMixin(object):
     def test_posting_claim_abandon(self):

         with connect_close(self.board):
-            with flush(self.client):
+            with self.flush(self.client):
                 self.board.post('test', p_utils.temporary_log_book())

             possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
             self.assertEqual(1, len(possible_jobs))
             j = possible_jobs[0]
-            with flush(self.client):
+            with self.flush(self.client):
                 self.board.claim(j, self.board.name)

             possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
             self.assertEqual(0, len(possible_jobs))
-            with flush(self.client):
+            with self.flush(self.client):
                 self.board.abandon(j, self.board.name)

             possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
@@ -221,12 +161,12 @@ class BoardTestMixin(object):
     def test_posting_claim_diff_owner(self):

         with connect_close(self.board):
-            with flush(self.client):
+            with self.flush(self.client):
                 self.board.post('test', p_utils.temporary_log_book())

             possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
             self.assertEqual(1, len(possible_jobs))
-            with flush(self.client):
+            with self.flush(self.client):
                 self.board.claim(possible_jobs[0], self.board.name)

             possible_jobs = list(self.board.iterjobs())
@@ -236,14 +176,6 @@ class BoardTestMixin(object):
             possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
             self.assertEqual(0, len(possible_jobs))

-    def test_posting_no_post(self):
-        with connect_close(self.board):
-            with mock.patch.object(self.client, 'create') as create_func:
-                create_func.side_effect = IOError("Unable to post")
-                self.assertRaises(IOError, self.board.post,
-                                  'test', p_utils.temporary_log_book())
-        self.assertEqual(0, self.board.job_count)
-
     def test_posting_with_book(self):
         backend = impl_dir.DirBackend(conf={
             'path': self.makeTmpDir(),
@@ -252,9 +184,9 @@ class BoardTestMixin(object):
         book, flow_detail = p_utils.temporary_flow_detail(backend)
         self.assertEqual(1, len(book))

-        client, board = self._create_board(persistence=backend)
+        client, board = self.create_board(persistence=backend)
         with connect_close(board):
-            with flush(client):
+            with self.flush(client):
                 board.post('test', book)

             possible_jobs = list(board.iterjobs(only_unclaimed=True))
@@ -273,11 +205,12 @@ class BoardTestMixin(object):
     def test_posting_abandon_no_owner(self):

         with connect_close(self.board):
-            with flush(self.client):
+            with self.flush(self.client):
                 self.board.post('test', p_utils.temporary_log_book())

             self.assertEqual(1, self.board.job_count)
             possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
             self.assertEqual(1, len(possible_jobs))
             j = possible_jobs[0]
-            self.assertRaises(excp.JobFailure, self.board.abandon, j, j.name)
+            self.assertRaises(excp.NotFound, self.board.abandon,
+                              j, j.name)

@@ -19,6 +19,7 @@ import contextlib
 from zake import fake_client

 from taskflow.jobs import backends
+from taskflow.jobs.backends import impl_redis
 from taskflow.jobs.backends import impl_zookeeper
 from taskflow import test
@@ -47,3 +48,15 @@ class BackendFetchingTest(test.TestCase):
         with contextlib.closing(backends.fetch('test', conf, **kwargs)) as be:
             self.assertIsInstance(be, impl_zookeeper.ZookeeperJobBoard)
             self.assertIs(existing_client, be._client)
+
+    def test_redis_entry_point_text(self):
+        conf = 'redis'
+        with contextlib.closing(backends.fetch('test', conf)) as be:
+            self.assertIsInstance(be, impl_redis.RedisJobBoard)
+
+    def test_redis_entry_point(self):
+        conf = {
+            'board': 'redis',
+        }
+        with contextlib.closing(backends.fetch('test', conf)) as be:
+            self.assertIsInstance(be, impl_redis.RedisJobBoard)
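Reviewer note: these tests pin down both configuration spellings the stevedore-backed fetch accepts (a plain string, or a dict with a 'board' key), matching the entry point added in setup.cfg. Usage sketch (actually connecting requires a reachable redis server):

    import contextlib

    from taskflow.jobs import backends

    with contextlib.closing(backends.fetch('my-board', 'redis')) as jb:
        jb.connect()
        jb.post('example-job')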
taskflow/tests/unit/jobs/test_redis_job.py (new file, 81 lines)
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+
+#    Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import time
+
+from oslo_utils import uuidutils
+import six
+import testtools
+
+from taskflow.jobs.backends import impl_redis
+from taskflow import states
+from taskflow import test
+from taskflow.tests.unit.jobs import base
+from taskflow.tests import utils as test_utils
+from taskflow.utils import persistence_utils as p_utils
+from taskflow.utils import redis_utils as ru
+
+
+REDIS_AVAILABLE = test_utils.redis_available(
+    impl_redis.RedisJobBoard.MIN_REDIS_VERSION)
+
+
+@testtools.skipIf(not REDIS_AVAILABLE, 'redis is not available')
+class RedisJobboardTest(test.TestCase, base.BoardTestMixin):
+    def close_client(self, client):
+        client.close()
+
+    def create_board(self, persistence=None):
+        namespace = uuidutils.generate_uuid()
+        client = ru.RedisClient()
+        config = {
+            'namespace': six.b("taskflow-%s" % namespace),
+        }
+        kwargs = {
+            'client': client,
+            'persistence': persistence,
+        }
+        board = impl_redis.RedisJobBoard('test-board', config, **kwargs)
+        self.addCleanup(board.close)
+        self.addCleanup(self.close_client, client)
+        return (client, board)
+
+    def test_posting_claim_expiry(self):
+
+        with base.connect_close(self.board):
+            with self.flush(self.client):
+                self.board.post('test', p_utils.temporary_log_book())
+
+            self.assertEqual(1, self.board.job_count)
+            possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
+            self.assertEqual(1, len(possible_jobs))
+            j = possible_jobs[0]
+            self.assertEqual(states.UNCLAIMED, j.state)
+
+            with self.flush(self.client):
+                self.board.claim(j, self.board.name, expiry=0.5)
+
+            self.assertEqual(self.board.name, self.board.find_owner(j))
+            self.assertEqual(states.CLAIMED, j.state)
+
+            time.sleep(0.6)
+            self.assertEqual(states.UNCLAIMED, j.state)
+            possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
+            self.assertEqual(1, len(possible_jobs))
+
+    def setUp(self):
+        super(RedisJobboardTest, self).setUp()
+        self.client, self.board = self.create_board()
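Reviewer note: test_posting_claim_expiry exercises the expiring-ownership behavior described in the docs (ownership keys that can optionally expire). A standalone sketch of the underlying mechanism with plain redis-py (assumes a local redis server; the key name is illustrative):

    import time

    import redis

    client = redis.StrictRedis()
    client.set('taskflow-demo.owner', 'worker-1')
    client.pexpire('taskflow-demo.owner', 500)  # claim key lives for 0.5s
    assert client.get('taskflow-demo.owner') is not None
    time.sleep(0.6)
    assert client.get('taskflow-demo.owner') is None  # claim has expired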
@@ -14,6 +14,10 @@
 # License for the specific language governing permissions and limitations
 # under the License.

+import contextlib
+import threading
+
+from kazoo.recipe import watchers
 from oslo_serialization import jsonutils
 from oslo_utils import uuidutils
 import six
@@ -24,13 +28,14 @@ from zake import utils as zake_utils
 from taskflow.jobs.backends import impl_zookeeper
 from taskflow import states
 from taskflow import test
+from taskflow.test import mock
 from taskflow.tests.unit.jobs import base
 from taskflow.tests import utils as test_utils
 from taskflow.utils import kazoo_utils
 from taskflow.utils import misc
 from taskflow.utils import persistence_utils as p_utils

+FLUSH_PATH_TPL = '/taskflow/flush-test/%s'
 TEST_PATH_TPL = '/taskflow/board-test/%s'
 ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available(
     impl_zookeeper.ZookeeperJobBoard.MIN_ZK_VERSION)
@@ -38,9 +43,81 @@ TRASH_FOLDER = impl_zookeeper.ZookeeperJobBoard.TRASH_FOLDER
 LOCK_POSTFIX = impl_zookeeper.ZookeeperJobBoard.LOCK_POSTFIX


+class ZookeeperBoardTestMixin(base.BoardTestMixin):
+    def close_client(self, client):
+        kazoo_utils.finalize_client(client)
+
+    @contextlib.contextmanager
+    def flush(self, client, path=None):
+        # This uses the linearity guarantee of zookeeper (and associated
+        # libraries) to create a temporary node, wait until a watcher notifies
+        # it's created, then yield back for more work, and then at the end of
+        # that work delete the created node. This ensures that the operations
+        # done in the yield of this context manager will be applied and all
+        # watchers will have fired before this context manager exits.
+        if not path:
+            path = FLUSH_PATH_TPL % uuidutils.generate_uuid()
+        created = threading.Event()
+        deleted = threading.Event()
+
+        def on_created(data, stat):
+            if stat is not None:
+                created.set()
+            return False  # cause this watcher to cease to exist
+
+        def on_deleted(data, stat):
+            if stat is None:
+                deleted.set()
+            return False  # cause this watcher to cease to exist
+
+        watchers.DataWatch(client, path, func=on_created)
+        client.create(path, makepath=True)
+        if not created.wait(test_utils.WAIT_TIMEOUT):
+            raise RuntimeError("Could not receive creation of %s in"
+                               " the alloted timeout of %s seconds"
+                               % (path, test_utils.WAIT_TIMEOUT))
+        try:
+            yield
+        finally:
+            watchers.DataWatch(client, path, func=on_deleted)
+            client.delete(path, recursive=True)
+            if not deleted.wait(test_utils.WAIT_TIMEOUT):
+                raise RuntimeError("Could not receive deletion of %s in"
+                                   " the alloted timeout of %s seconds"
+                                   % (path, test_utils.WAIT_TIMEOUT))
+
+    def test_posting_no_post(self):
+        with base.connect_close(self.board):
+            with mock.patch.object(self.client, 'create') as create_func:
+                create_func.side_effect = IOError("Unable to post")
+                self.assertRaises(IOError, self.board.post,
+                                  'test', p_utils.temporary_log_book())
+        self.assertEqual(0, self.board.job_count)
+
+    def test_board_iter(self):
+        with base.connect_close(self.board):
+            it = self.board.iterjobs()
+            self.assertEqual(it.board, self.board)
+            self.assertFalse(it.only_unclaimed)
+            self.assertFalse(it.ensure_fresh)
+
+    @mock.patch("taskflow.jobs.backends.impl_zookeeper.misc."
+                "millis_to_datetime")
+    def test_posting_dates(self, mock_dt):
+        epoch = misc.millis_to_datetime(0)
+        mock_dt.return_value = epoch
+
+        with base.connect_close(self.board):
+            j = self.board.post('test', p_utils.temporary_log_book())
+            self.assertEqual(epoch, j.created_on)
+            self.assertEqual(epoch, j.last_modified)
+
+        self.assertTrue(mock_dt.called)
+
+
 @testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available')
-class ZookeeperJobboardTest(test.TestCase, base.BoardTestMixin):
-    def _create_board(self, persistence=None):
+class ZookeeperJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
+    def create_board(self, persistence=None):

         def cleanup_path(client, path):
             if not client.connected:
@@ -52,39 +129,39 @@ class ZookeeperJobboardTest(test.TestCase, base.BoardTestMixin):
         board = impl_zookeeper.ZookeeperJobBoard('test-board', {'path': path},
                                                  client=client,
                                                  persistence=persistence)
-        self.addCleanup(kazoo_utils.finalize_client, client)
+        self.addCleanup(self.close_client, client)
         self.addCleanup(cleanup_path, client, path)
         self.addCleanup(board.close)
         return (client, board)

     def setUp(self):
         super(ZookeeperJobboardTest, self).setUp()
-        self.client, self.board = self._create_board()
+        self.client, self.board = self.create_board()


-class ZakeJobboardTest(test.TestCase, base.BoardTestMixin):
-    def _create_board(self, persistence=None):
+class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
+    def create_board(self, persistence=None):
         client = fake_client.FakeClient()
         board = impl_zookeeper.ZookeeperJobBoard('test-board', {},
                                                  client=client,
                                                  persistence=persistence)
         self.addCleanup(board.close)
-        self.addCleanup(kazoo_utils.finalize_client, client)
+        self.addCleanup(self.close_client, client)
         return (client, board)

     def setUp(self):
         super(ZakeJobboardTest, self).setUp()
-        self.client, self.board = self._create_board()
+        self.client, self.board = self.create_board()
         self.bad_paths = [self.board.path, self.board.trash_path]
         self.bad_paths.extend(zake_utils.partition_path(self.board.path))

     def test_posting_owner_lost(self):

         with base.connect_close(self.board):
-            with base.flush(self.client):
+            with self.flush(self.client):
                 j = self.board.post('test', p_utils.temporary_log_book())
             self.assertEqual(states.UNCLAIMED, j.state)
-            with base.flush(self.client):
+            with self.flush(self.client):
                 self.board.claim(j, self.board.name)
             self.assertEqual(states.CLAIMED, j.state)
@@ -102,10 +179,10 @@ class ZakeJobboardTest(test.TestCase, base.BoardTestMixin):
     def test_posting_state_lock_lost(self):

         with base.connect_close(self.board):
-            with base.flush(self.client):
+            with self.flush(self.client):
                 j = self.board.post('test', p_utils.temporary_log_book())
             self.assertEqual(states.UNCLAIMED, j.state)
-            with base.flush(self.client):
+            with self.flush(self.client):
                 self.board.claim(j, self.board.name)
             self.assertEqual(states.CLAIMED, j.state)
@@ -123,14 +200,14 @@ class ZakeJobboardTest(test.TestCase, base.BoardTestMixin):
     def test_trashing_claimed_job(self):

         with base.connect_close(self.board):
-            with base.flush(self.client):
+            with self.flush(self.client):
                 j = self.board.post('test', p_utils.temporary_log_book())
             self.assertEqual(states.UNCLAIMED, j.state)
-            with base.flush(self.client):
+            with self.flush(self.client):
                 self.board.claim(j, self.board.name)
             self.assertEqual(states.CLAIMED, j.state)

-            with base.flush(self.client):
+            with self.flush(self.client):
                 self.board.trash(j, self.board.name)

             trashed = []

@@ -18,6 +18,7 @@ import contextlib
 import string
 import threading

+import redis
 import six

 from taskflow import exceptions
@@ -27,6 +28,7 @@ from taskflow import retry
 from taskflow import task
 from taskflow.types import failure
 from taskflow.utils import kazoo_utils
+from taskflow.utils import redis_utils

 ARGS_KEY = '__args__'
 KWARGS_KEY = '__kwargs__'
@@ -73,6 +75,18 @@ def zookeeper_available(min_version, timeout=3):
         kazoo_utils.finalize_client(client)


+def redis_available(min_version):
+    client = redis.StrictRedis()
+    try:
+        client.ping()
+    except Exception:
+        return False
+    else:
+        ok, redis_version = redis_utils.is_server_new_enough(client,
+                                                             min_version)
+        return ok
+
+
 class NoopRetry(retry.AlwaysRevert):
     pass

@@ -28,6 +28,7 @@ import types

 import enum
 from oslo_serialization import jsonutils
+from oslo_serialization import msgpackutils
 from oslo_utils import encodeutils
 from oslo_utils import importutils
 from oslo_utils import netutils
@@ -285,19 +286,7 @@ def binary_decode(data, encoding='utf-8', errors='strict'):
                              errors=errors)


-def decode_json(raw_data, root_types=(dict,)):
-    """Parse raw data to get JSON object.
-
-    Decodes a JSON from a given raw data binary and checks that the root
-    type of that decoded object is in the allowed set of types (by
-    default a JSON object/dict should be the root type).
-    """
-    try:
-        data = jsonutils.loads(binary_decode(raw_data))
-    except UnicodeDecodeError as e:
-        raise ValueError("Expected UTF-8 decodable data: %s" % e)
-    except ValueError as e:
-        raise ValueError("Expected JSON decodable data: %s" % e)
+def _check_decoded_type(data, root_types=(dict,)):
     if root_types:
         if not isinstance(root_types, tuple):
             root_types = tuple(root_types)
@@ -312,6 +301,40 @@ def decode_json(raw_data, root_types=(dict,)):
     return data


+def decode_msgpack(raw_data, root_types=(dict,)):
+    """Parse raw data to get decoded object.
+
+    Decodes a msgpack encoded 'blob' from a given raw data binary string and
+    checks that the root type of that decoded object is in the allowed set of
+    types (by default a dict should be the root type).
+    """
+    try:
+        data = msgpackutils.loads(raw_data)
+    except Exception as e:
+        # TODO(harlowja): fix this when msgpackutils exposes the msgpack
+        # exceptions so that we can avoid catching just exception...
+        raise ValueError("Expected msgpack decodable data: %s" % e)
+    else:
+        return _check_decoded_type(data, root_types=root_types)
+
+
+def decode_json(raw_data, root_types=(dict,)):
+    """Parse raw data to get decoded object.
+
+    Decodes a JSON encoded 'blob' from a given raw data binary string and
+    checks that the root type of that decoded object is in the allowed set of
+    types (by default a dict should be the root type).
+    """
+    try:
+        data = jsonutils.loads(binary_decode(raw_data))
+    except UnicodeDecodeError as e:
+        raise ValueError("Expected UTF-8 decodable data: %s" % e)
+    except ValueError as e:
+        raise ValueError("Expected JSON decodable data: %s" % e)
+    else:
+        return _check_decoded_type(data, root_types=root_types)
+
+
 class cachedproperty(object):
     """A *thread-safe* descriptor property that is only evaluated once.

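Reviewer note: extracting _check_decoded_type lets the json and msgpack decoders share root-type validation instead of duplicating it. A round-trip sketch of the two public helpers:

    from oslo_serialization import jsonutils
    from oslo_serialization import msgpackutils

    from taskflow.utils import misc

    assert misc.decode_msgpack(msgpackutils.dumps({'a': 1})) == {'a': 1}

    raw = jsonutils.dumps({'b': 2}).encode('utf-8')
    assert misc.decode_json(raw) == {'b': 2}

    # Non-dict roots must be allowed explicitly or a ValueError is raised:
    misc.decode_json(jsonutils.dumps([1, 2]).encode('utf-8'),
                     root_types=(list,))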
|||||||
133
taskflow/utils/redis_utils.py
Normal file
133
taskflow/utils/redis_utils.py
Normal file
@@ -0,0 +1,133 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import enum
|
||||||
|
|
||||||
|
import redis
|
||||||
|
from redis import exceptions as redis_exceptions
|
||||||
|
import six
|
||||||
|
|
||||||
|
|
||||||
|
def _raise_on_closed(meth):
|
||||||
|
|
||||||
|
@six.wraps(meth)
|
||||||
|
def wrapper(self, *args, **kwargs):
|
||||||
|
if self.closed:
|
||||||
|
raise redis_exceptions.ConnectionError("Connection has been"
|
||||||
|
" closed")
|
||||||
|
return meth(self, *args, **kwargs)
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
class RedisClient(redis.StrictRedis):
|
||||||
|
"""A redis client that can be closed (and raises on-usage after closed).
|
||||||
|
|
||||||
|
TODO(harlowja): if https://github.com/andymccurdy/redis-py/issues/613 ever
|
||||||
|
gets resolved or merged or other then we can likely remove this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(RedisClient, self).__init__(*args, **kwargs)
|
||||||
|
self.closed = False
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.closed = True
|
||||||
|
self.connection_pool.disconnect()
|
||||||
|
|
||||||
|
execute_command = _raise_on_closed(redis.StrictRedis.execute_command)
|
||||||
|
transaction = _raise_on_closed(redis.StrictRedis.transaction)
|
||||||
|
pubsub = _raise_on_closed(redis.StrictRedis.pubsub)
|
||||||
|
|
||||||
|
|
||||||
|
class UnknownExpire(enum.IntEnum):
|
||||||
|
"""Non-expiry (not ttls) results return from :func:`.get_expiry`.
|
||||||
|
|
||||||
|
See: http://redis.io/commands/ttl or http://redis.io/commands/pttl
|
||||||
|
"""
|
||||||
|
|
||||||
|
DOES_NOT_EXPIRE = -1
|
||||||
|
"""
|
||||||
|
The command returns ``-1`` if the key exists but has no associated expire.
|
||||||
|
"""
|
||||||
|
|
||||||
|
#: The command returns ``-2`` if the key does not exist.
|
||||||
|
KEY_NOT_FOUND = -2
|
||||||
|
|
||||||
|
|
||||||
|
DOES_NOT_EXPIRE = UnknownExpire.DOES_NOT_EXPIRE
|
||||||
|
KEY_NOT_FOUND = UnknownExpire.KEY_NOT_FOUND
|
||||||
|
|
||||||
|
_UNKNOWN_EXPIRE_MAPPING = dict((e.value, e) for e in list(UnknownExpire))
|
||||||
|
|
||||||
|
|
||||||
|
def get_expiry(client, key, prior_version=None):
|
||||||
|
"""Gets an expiry for a key (using **best** determined ttl method)."""
|
||||||
|
is_new_enough, _prior_version = is_server_new_enough(
|
||||||
|
client, (2, 6), prior_version=prior_version)
|
||||||
|
if is_new_enough:
|
||||||
|
result = client.pttl(key)
|
||||||
|
try:
|
||||||
|
return _UNKNOWN_EXPIRE_MAPPING[result]
|
||||||
|
except KeyError:
|
||||||
|
return result / 1000.0
|
||||||
|
else:
|
||||||
|
result = client.ttl(key)
|
||||||
|
try:
|
||||||
|
return _UNKNOWN_EXPIRE_MAPPING[result]
|
||||||
|
except KeyError:
|
||||||
|
return float(result)
|
||||||
|
|
||||||
|
|
||||||
|
def apply_expiry(client, key, expiry, prior_version=None):
|
||||||
|
"""Applies an expiry to a key (using **best** determined expiry method)."""
|
||||||
|
is_new_enough, _prior_version = is_server_new_enough(
|
||||||
|
client, (2, 6), prior_version=prior_version)
|
||||||
|
if is_new_enough:
|
||||||
|
# Use milliseconds (as that is what pexpire uses/expects...)
|
||||||
|
ms_expiry = expiry * 1000.0
|
||||||
|
ms_expiry = max(0, int(ms_expiry))
|
||||||
|
result = client.pexpire(key, ms_expiry)
|
||||||
|
else:
|
||||||
|
# Only supports seconds (not subseconds...)
|
||||||
|
sec_expiry = int(expiry)
|
||||||
|
sec_expiry = max(0, sec_expiry)
|
||||||
|
result = client.expire(key, sec_expiry)
|
||||||
|
return bool(result)
|
||||||
|
|
||||||
|
|
||||||
|
def is_server_new_enough(client, min_version,
|
||||||
|
default=False, prior_version=None):
|
||||||
|
"""Checks if a client is attached to a new enough redis server."""
|
||||||
|
if not prior_version:
|
||||||
|
try:
|
||||||
|
server_info = client.info()
|
||||||
|
except redis_exceptions.ResponseError:
|
||||||
|
server_info = {}
|
||||||
|
version_text = server_info.get('redis_version', '')
|
||||||
|
else:
|
||||||
|
version_text = prior_version
|
||||||
|
version_pieces = []
|
||||||
|
for p in version_text.split("."):
|
||||||
|
try:
|
||||||
|
version_pieces.append(int(p))
|
||||||
|
except ValueError:
|
||||||
|
break
|
||||||
|
if not version_pieces:
|
||||||
|
return (default, version_text)
|
||||||
|
else:
|
||||||
|
version_pieces = tuple(version_pieces)
|
||||||
|
return (version_pieces >= min_version, version_text)
|
||||||
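Reviewer note: a quick usage sketch of the new module (assumes a local redis server; the printed ttl is approximate):

    from taskflow.utils import redis_utils

    client = redis_utils.RedisClient()
    ok, version = redis_utils.is_server_new_enough(client, (2, 6))
    client.set('demo-key', 'demo')
    if redis_utils.apply_expiry(client, 'demo-key', 2.5):
        print(redis_utils.get_expiry(client, 'demo-key'))  # ~2.5 (seconds)
    client.close()
    # Further use now fails fast rather than silently reconnecting, e.g.
    # client.ping() raises redis.exceptions.ConnectionError.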
@@ -15,6 +15,9 @@ kombu>=3.0.7
 zake>=0.1.6 # Apache-2.0
 kazoo>=2.2

+# Used for testing redis jobboards
+redis>=2.10.0
+
 # Used for testing database persistence backends.
 SQLAlchemy<1.1.0,>=0.9.7
 alembic>=0.7.2