Merge tag '0.3.0' into debian/kilo

oslo.concurrency 0.3.0 release
This commit is contained in:
Thomas Goirand
2014-12-23 10:44:48 +08:00
30 changed files with 1472 additions and 92 deletions

1
.gitignore vendored
View File

@@ -41,6 +41,7 @@ output/*/index.html
# Sphinx
doc/build
doc/source/api
# pbr generates these
AUTHORS

View File

@@ -2,6 +2,6 @@
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \
${PYTHON:-python} -m subunit.run discover -t ./ ./tests $LISTOPT $IDOPTION
${PYTHON:-python} -m subunit.run discover -t ./ . $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

View File

@@ -2,7 +2,9 @@
oslo.concurrency
==================
oslo.concurrency library
Oslo concurrency library has utilities for safely running multi-thread,
multi-process applications using locking mechanisms and for running
external processes.
* Free software: Apache license
* Documentation: http://docs.openstack.org/developer/oslo.concurrency

View File

@@ -36,12 +36,15 @@ extensions = [
# files.
exclude_patterns = [
'api/tests.*', # avoid of docs generation from tests
'api/oslo.concurrency.openstack.common.*', # skip common modules
'api/oslo.concurrency._*', # skip private modules
'api/oslo.concurrency.*', # skip deprecated import from namespace package
'api/oslo_concurrency.openstack.common.*', # skip common modules
'api/oslo_concurrency._*', # skip private modules
]
# Prune the excluded patterns from the autoindex
for line in fileinput.input('api/autoindex.rst', inplace=True):
PATH = 'api/autoindex.rst'
if os.path.isfile(PATH) and os.access(PATH, os.R_OK):
for line in fileinput.input(PATH, inplace=True):
found = False
for pattern in exclude_patterns:
if fnmatch.fnmatch(line, '*' + pattern[4:]):

View File

@@ -9,8 +9,8 @@ msgid ""
msgstr ""
"Project-Id-Version: oslo.concurrency\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2014-10-25 06:11+0000\n"
"PO-Revision-Date: 2014-10-25 02:29+0000\n"
"POT-Creation-Date: 2014-10-29 06:10+0000\n"
"PO-Revision-Date: 2014-10-28 20:48+0000\n"
"Last-Translator: openstackjenkins <jenkins@openstack.org>\n"
"Language-Team: English (United Kingdom) "
"(http://www.transifex.com/projects/p/osloconcurrency/language/en_GB/)\n"
@@ -20,11 +20,21 @@ msgstr ""
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 1.3\n"
#: oslo/concurrency/lockutils.py:115
#: oslo/concurrency/lockutils.py:120
#, python-format
msgid "Unable to acquire lock on `%(filename)s` due to %(exception)s"
msgstr "Unable to acquire lock on `%(filename)s` due to %(exception)s"
#: oslo/concurrency/lockutils.py:134
msgid "Unable to release an unacquired lock"
msgstr ""
#: oslo/concurrency/lockutils.py:385
msgid ""
"Calling lockutils directly is no longer supported. Please use the "
"lockutils-wrapper console script instead."
msgstr ""
#: oslo/concurrency/processutils.py:69
msgid "Unexpected error while running command."
msgstr "Unexpected error while running command."

View File

@@ -7,9 +7,9 @@
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: oslo.concurrency 0.1.0.4.g6c5f5bf\n"
"Project-Id-Version: oslo.concurrency 0.2.0.2.gd5ea62c\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2014-10-25 06:11+0000\n"
"POT-Creation-Date: 2014-10-29 06:10+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@@ -18,11 +18,21 @@ msgstr ""
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 1.3\n"
#: oslo/concurrency/lockutils.py:115
#: oslo/concurrency/lockutils.py:120
#, python-format
msgid "Unable to acquire lock on `%(filename)s` due to %(exception)s"
msgstr ""
#: oslo/concurrency/lockutils.py:134
msgid "Unable to release an unacquired lock"
msgstr ""
#: oslo/concurrency/lockutils.py:385
msgid ""
"Calling lockutils directly is no longer supported. Please use the "
"lockutils-wrapper console script instead."
msgstr ""
#: oslo/concurrency/processutils.py:69
msgid "Unexpected error while running command."
msgstr ""

View File

@@ -0,0 +1,29 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import warnings
from oslo_concurrency import lockutils # noqa
from oslo_concurrency import processutils # noqa
def deprecated():
new_name = __name__.replace('.', '_')
warnings.warn(
('The oslo namespace package is deprecated. Please use %s instead.' %
new_name),
DeprecationWarning,
stacklevel=1,
)
deprecated()

View File

@@ -0,0 +1,13 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_concurrency.fixture import lockutils # noqa

View File

@@ -16,7 +16,7 @@
from oslo import i18n
_translators = i18n.TranslatorFactory(domain='oslo.concurrency')
_translators = i18n.TranslatorFactory(domain='oslo_concurrency')
# The primary translation function using the well-known name "_"
_ = _translators.primary

View File

@@ -14,8 +14,9 @@
# under the License.
import fixtures
from oslo.config import fixture as config
from oslo.concurrency import lockutils
from oslo_concurrency import lockutils
class LockFixture(fixtures.Fixture):
@@ -49,3 +50,27 @@ class LockFixture(fixtures.Fixture):
super(LockFixture, self).setUp()
self.addCleanup(self.mgr.__exit__, None, None, None)
self.lock = self.mgr.__enter__()
class ExternalLockFixture(fixtures.Fixture):
"""Configure lock_path so external locks can be used in unit tests.
Creates a temporary directory to hold file locks and sets the oslo.config
lock_path opt to use it. This can be used to enable external locking
on a per-test basis, rather than globally with the OSLO_LOCK_PATH
environment variable.
Example::
def test_method(self):
self.useFixture(ExternalLockFixture())
something_that_needs_external_locks()
Alternatively, the useFixture call could be placed in a test class's
setUp method to provide this functionality to all tests in the class.
"""
def setUp(self):
super(ExternalLockFixture, self).setUp()
temp_dir = self.useFixture(fixtures.TempDir())
conf = self.useFixture(config.Config(lockutils.CONF)).config
conf(lock_path=temp_dir.path, group='oslo_concurrency')

View File

@@ -28,10 +28,11 @@ import weakref
from oslo.config import cfg
from oslo.config import cfgfilter
import retrying
import six
from oslo.concurrency._i18n import _, _LE, _LI
from oslo.concurrency.openstack.common import fileutils
from oslo_concurrency._i18n import _, _LE, _LI
from oslo_concurrency.openstack.common import fileutils
LOG = logging.getLogger(__name__)
@@ -64,6 +65,89 @@ def set_defaults(lock_path):
cfg.set_defaults(_opts, lock_path=lock_path)
class _Hourglass(object):
"""A hourglass like periodic timer."""
def __init__(self, period):
self._period = period
self._last_flipped = None
def flip(self):
"""Flips the hourglass.
The drain() method will now only return true until the period
is reached again.
"""
self._last_flipped = time.time()
def drain(self):
"""Drains the hourglass, returns True if period reached."""
if self._last_flipped is None:
return True
else:
elapsed = max(0, time.time() - self._last_flipped)
return elapsed >= self._period
def _lock_retry(delay, filename,
# These parameters trigger logging to begin after a certain
# amount of time has elapsed where the lock couldn't be
# acquired (log statements will be emitted after that duration
# at the provided periodicity).
log_begins_after=1.0, log_periodicity=0.5):
"""Retry logic that acquiring a lock will go through."""
# If this returns True, a retry attempt will occur (using the defined
# retry policy we have requested the retrying library to apply), if it
# returns False then the original exception will be re-raised (if it
# raises a new or different exception the original exception will be
# replaced with that one and raised).
def retry_on_exception(e):
# TODO(harlowja): once/if https://github.com/rholder/retrying/pull/20
# gets merged we should just switch to using that to avoid having to
# catch and inspect all execeptions (and there types...)
if isinstance(e, IOError) and e.errno in (errno.EACCES, errno.EAGAIN):
return True
raise threading.ThreadError(_("Unable to acquire lock on"
" `%(filename)s` due to"
" %(exception)s") %
{
'filename': filename,
'exception': e,
})
# Logs all attempts (with information about how long we have been trying
# to acquire the underlying lock...); after a threshold has been passed,
# and only at a fixed rate...
def never_stop(hg, attempt_number, delay_since_first_attempt_ms):
delay_since_first_attempt = delay_since_first_attempt_ms / 1000.0
if delay_since_first_attempt >= log_begins_after:
if hg.drain():
LOG.debug("Attempting to acquire %s (delayed %0.2f seconds)",
filename, delay_since_first_attempt)
hg.flip()
return False
# The retrying library seems to prefer milliseconds for some reason; this
# might be changed in (see: https://github.com/rholder/retrying/issues/6)
# someday in the future...
delay_ms = delay * 1000.0
def decorator(func):
@six.wraps(func)
def wrapper(*args, **kwargs):
hg = _Hourglass(log_periodicity)
r = retrying.Retrying(wait_fixed=delay_ms,
retry_on_exception=retry_on_exception,
stop_func=functools.partial(never_stop, hg))
return r.call(func, *args, **kwargs)
return wrapper
return decorator
class _FileLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
@@ -85,10 +169,13 @@ class _FileLock(object):
def __init__(self, name):
self.lockfile = None
self.fname = name
self.acquire_time = None
def acquire(self, delay=0.01):
if delay < 0:
raise ValueError("Delay must be greater than or equal to zero")
def acquire(self):
basedir = os.path.dirname(self.fname)
if not os.path.exists(basedir):
fileutils.ensure_tree(basedir)
LOG.info(_LI('Created lock path: %s'), basedir)
@@ -97,43 +184,35 @@ class _FileLock(object):
# the target file. This eliminates the possibility of an attacker
# creating a symlink to an important file in our lock_path.
self.lockfile = open(self.fname, 'a')
start_time = time.time()
while True:
try:
# Using non-blocking locks since green threads are not
# patched to deal with blocking locking calls.
# Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
# Using non-blocking locks (with retries) since green threads are not
# patched to deal with blocking locking calls. Also upon reading the
# MSDN docs for locking(), it seems to have a 'laughable' 10
# attempts "blocking" mechanism.
do_acquire = _lock_retry(delay=delay,
filename=self.fname)(self.trylock)
do_acquire()
self.acquire_time = time.time()
LOG.debug('Acquired file lock "%s" after waiting %0.3fs',
self.fname, (self.acquire_time - start_time))
return True
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
time.sleep(0.01)
else:
raise threading.ThreadError(_("Unable to acquire lock on"
" `%(filename)s` due to"
" %(exception)s") %
{
'filename': self.fname,
'exception': e,
})
def __enter__(self):
self.acquire()
return self
def release(self):
if self.acquire_time is None:
raise threading.ThreadError(_("Unable to release an unacquired"
" lock"))
try:
release_time = time.time()
LOG.debug('Releasing file lock "%s" after holding it for %0.3fs',
self.fname, (release_time - self.acquire_time))
self.unlock()
self.acquire_time = None
except IOError:
LOG.exception(_LE("Could not unlock the acquired lock `%s`"),
self.fname)
@@ -180,8 +259,42 @@ else:
import fcntl
InterProcessLock = _FcntlLock
_semaphores = weakref.WeakValueDictionary()
_semaphores_lock = threading.Lock()
class Semaphores(object):
"""A garbage collected container of semaphores.
This collection internally uses a weak value dictionary so that when a
semaphore is no longer in use (by any threads) it will automatically be
removed from this container by the garbage collector.
"""
def __init__(self):
self._semaphores = weakref.WeakValueDictionary()
self._lock = threading.Lock()
def get(self, name):
"""Gets (or creates) a semaphore with a given name.
:param name: The semaphore name to get/create (used to associate
previously created names with the same semaphore).
Returns an newly constructed semaphore (or an existing one if it was
already created for the given name).
"""
with self._lock:
try:
return self._semaphores[name]
except KeyError:
sem = threading.Semaphore()
self._semaphores[name] = sem
return sem
def __len__(self):
"""Returns how many semaphores exist at the current time."""
return len(self._semaphores)
_semaphores = Semaphores()
def _get_lock_path(name, lock_file_prefix, lock_path=None):
@@ -206,11 +319,12 @@ def external_lock(name, lock_file_prefix=None, lock_path=None):
return InterProcessLock(lock_file_path)
def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None):
def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None,
semaphores=None):
"""Remove an external lock file when it's not used anymore
This will be helpful when we have a lot of lock files
"""
with internal_lock(name):
with internal_lock(name, semaphores=semaphores):
lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
try:
os.remove(lock_file_path)
@@ -219,20 +333,15 @@ def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None):
{'file': lock_file_path})
def internal_lock(name):
with _semaphores_lock:
try:
sem = _semaphores[name]
except KeyError:
sem = threading.Semaphore()
_semaphores[name] = sem
return sem
def internal_lock(name, semaphores=None):
if semaphores is None:
semaphores = _semaphores
return semaphores.get(name)
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None,
do_log=True):
do_log=True, semaphores=None, delay=0.01):
"""Context based lock
This function yields a `threading.Semaphore` instance (if we don't use
@@ -254,16 +363,26 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
:param do_log: Whether to log acquire/release messages. This is primarily
intended to reduce log message duplication when `lock` is used from the
`synchronized` decorator.
:param semaphores: Container that provides semaphores to use when locking.
This ensures that threads inside the same application can not collide,
due to the fact that external process locks are unaware of a processes
active threads.
:param delay: Delay between acquisition attempts (in seconds).
"""
int_lock = internal_lock(name)
int_lock = internal_lock(name, semaphores=semaphores)
with int_lock:
if do_log:
LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name})
try:
if external and not CONF.oslo_concurrency.disable_process_locking:
ext_lock = external_lock(name, lock_file_prefix, lock_path)
with ext_lock:
ext_lock.acquire(delay=delay)
try:
yield ext_lock
finally:
ext_lock.release()
else:
yield int_lock
finally:
@@ -271,7 +390,8 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name})
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
semaphores=None, delay=0.01):
"""Synchronization decorator.
Decorating a method like so::
@@ -302,7 +422,7 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
t2 = None
try:
with lock(name, lock_file_prefix, external, lock_path,
do_log=False):
do_log=False, semaphores=semaphores, delay=delay):
t2 = time.time()
LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
'waited %(wait_secs)0.3fs',
@@ -353,10 +473,10 @@ def _lock_wrapper(argv):
"""Create a dir for locks and pass it to command from arguments
This is exposed as a console script entry point named
oslo-concurrency-lock-wrapper
lockutils-wrapper
If you run this:
oslo-concurrency-lock-wrapper python setup.py testr <etc>
lockutils-wrapper python setup.py testr <etc>
a temporary directory will be created for all your locks and passed to all
your tests in an environment variable. The temporary dir will be deleted
@@ -374,3 +494,9 @@ def _lock_wrapper(argv):
def main():
sys.exit(_lock_wrapper(sys.argv))
if __name__ == '__main__':
raise NotImplementedError(_('Calling lockutils directly is no longer '
'supported. Please use the '
'lockutils-wrapper console script instead.'))

View File

@@ -20,7 +20,7 @@ __all__ = [
import copy
from oslo.concurrency import lockutils
from oslo_concurrency import lockutils
def list_opts():
@@ -34,7 +34,7 @@ def list_opts():
registered. A group name of None corresponds to the [DEFAULT] group in
config files.
This function is also discoverable via the 'oslo.concurrency' entry point
This function is also discoverable via the 'oslo_concurrency' entry point
under the 'oslo.config.opts' namespace.
The purpose of this is to allow tools like the Oslo sample config file

View File

@@ -29,7 +29,7 @@ from oslo.utils import importutils
from oslo.utils import strutils
import six
from oslo.concurrency._i18n import _
from oslo_concurrency._i18n import _
# NOTE(bnemec): eventlet doesn't monkey patch subprocess, so we need to
@@ -181,7 +181,7 @@ def execute(*cmd, **kwargs):
'specify a root helper.'))
cmd = shlex.split(root_helper) + list(cmd)
cmd = map(str, cmd)
cmd = [str(c) for c in cmd]
sanitized_cmd = strutils.mask_password(' '.join(cmd))
while attempts > 0:

View File

View File

View File

@@ -12,12 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import errno
import fcntl
import multiprocessing
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import threading
@@ -27,9 +29,9 @@ from oslo.config import cfg
from oslotest import base as test_base
import six
from oslo.concurrency.fixture import lockutils as fixtures
from oslo.concurrency import lockutils
from oslo.config import fixture as config
from oslo_concurrency.fixture import lockutils as fixtures
from oslo_concurrency import lockutils
class LockTestCase(test_base.BaseTestCase):
@@ -92,6 +94,30 @@ class LockTestCase(test_base.BaseTestCase):
except IOError:
pass
def test_lock_internally_different_collections(self):
s1 = lockutils.Semaphores()
s2 = lockutils.Semaphores()
trigger = threading.Event()
who_ran = collections.deque()
def f(name, semaphores, pull_trigger):
with lockutils.internal_lock('testing', semaphores=semaphores):
if pull_trigger:
trigger.set()
else:
trigger.wait()
who_ran.append(name)
threads = [
threading.Thread(target=f, args=(1, s1, True)),
threading.Thread(target=f, args=(2, s2, False)),
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
self.assertEqual([1, 2], sorted(who_ran))
def test_lock_internally(self):
"""We can lock across multiple threads."""
saved_sem_num = len(lockutils._semaphores)
@@ -520,6 +546,13 @@ class LockutilsModuleTestCase(test_base.BaseTestCase):
retval = lockutils._lock_wrapper(argv)
self.assertEqual(retval, 1)
def test_direct_call_explodes(self):
cmd = [sys.executable, '-m', 'oslo_concurrency.lockutils']
with open(os.devnull, 'w') as devnull:
retval = subprocess.call(cmd, stderr=devnull)
# 1 for Python 2.7 and 3.x, 255 for 2.6
self.assertIn(retval, [1, 255])
class TestLockFixture(test_base.BaseTestCase):

View File

@@ -20,7 +20,7 @@ import eventlet
from eventlet import greenpool
from oslotest import base as test_base
from oslo.concurrency import lockutils
from oslo_concurrency import lockutils
class TestFileLocks(test_base.BaseTestCase):

View File

@@ -27,7 +27,7 @@ import mock
from oslotest import base as test_base
import six
from oslo.concurrency import processutils
from oslo_concurrency import processutils
from oslotest import mockpatch
PROCESS_EXECUTION_ERROR_LOGGING_TEST = """#!/bin/bash
exit 41"""
@@ -142,7 +142,7 @@ exit 1
self.assertRaises(processutils.ProcessExecutionError,
processutils.execute,
tmpfilename, tmpfilename2, attempts=10,
process_input='foo',
process_input=b'foo',
delay_on_retry=False)
fp = open(tmpfilename2, 'r')
runs = fp.read()
@@ -199,7 +199,7 @@ grep foo
os.chmod(tmpfilename, 0o755)
processutils.execute(tmpfilename,
tmpfilename2,
process_input='foo',
process_input=b'foo',
attempts=2)
finally:
os.unlink(tmpfilename)
@@ -295,7 +295,7 @@ grep foo
out, err = processutils.execute('/usr/bin/env', env_variables=env_vars)
self.assertIn('SUPER_UNIQUE_VAR=The answer is 42', out)
self.assertIn(b'SUPER_UNIQUE_VAR=The answer is 42', out)
def test_exception_and_masking(self):
tmpfilename = self.create_tempfiles(
@@ -314,8 +314,8 @@ grep foo
'something')
self.assertEqual(38, err.exit_code)
self.assertEqual(err.stdout, 'onstdout --password="***"\n')
self.assertEqual(err.stderr, 'onstderr --password="***"\n')
self.assertIn('onstdout --password="***"', err.stdout)
self.assertIn('onstderr --password="***"', err.stderr)
self.assertEqual(err.cmd, ' '.join([tmpfilename,
'password="***"',
'something']))
@@ -396,14 +396,14 @@ def fake_execute_raises(*cmd, **kwargs):
class TryCmdTestCase(test_base.BaseTestCase):
def test_keep_warnings(self):
self.useFixture(fixtures.MonkeyPatch(
'oslo.concurrency.processutils.execute', fake_execute))
'oslo_concurrency.processutils.execute', fake_execute))
o, e = processutils.trycmd('this is a command'.split(' '))
self.assertNotEqual('', o)
self.assertNotEqual('', e)
def test_keep_warnings_from_raise(self):
self.useFixture(fixtures.MonkeyPatch(
'oslo.concurrency.processutils.execute', fake_execute_raises))
'oslo_concurrency.processutils.execute', fake_execute_raises))
o, e = processutils.trycmd('this is a command'.split(' '),
discard_warnings=True)
self.assertIsNotNone(o)
@@ -411,7 +411,7 @@ class TryCmdTestCase(test_base.BaseTestCase):
def test_discard_warnings(self):
self.useFixture(fixtures.MonkeyPatch(
'oslo.concurrency.processutils.execute', fake_execute))
'oslo_concurrency.processutils.execute', fake_execute))
o, e = processutils.trycmd('this is a command'.split(' '),
discard_warnings=True)
self.assertIsNotNone(o)

View File

@@ -11,3 +11,4 @@ oslo.i18n>=1.0.0 # Apache-2.0
oslo.utils>=1.0.0 # Apache-2.0
posix_ipc
six>=1.7.0
retrying>=1.2.2,!=1.3.0 # Apache-2.0

View File

@@ -11,3 +11,4 @@ oslo.i18n>=1.0.0 # Apache-2.0
oslo.utils>=1.0.0 # Apache-2.0
posix_ipc
six>=1.7.0
retrying>=1.2.2,!=1.3.0 # Apache-2.0

View File

@@ -23,14 +23,17 @@ classifier =
packages =
oslo
oslo.concurrency
oslo.concurrency.fixture
oslo_concurrency
oslo_concurrency.fixture
namespace_packages =
oslo
[entry_points]
oslo.config.opts =
oslo.concurrency = oslo.concurrency.opts:list_opts
oslo.concurrency = oslo_concurrency.opts:list_opts
console_scripts =
lockutils-wrapper = oslo.concurrency.lockutils:main
lockutils-wrapper = oslo_concurrency.lockutils:main
[build_sphinx]
source-dir = doc/source

575
tests/test_lockutils.py Normal file
View File

@@ -0,0 +1,575 @@
# Copyright 2011 Justin Santa Barbara
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import fcntl
import multiprocessing
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import threading
import time
from oslo.config import cfg
from oslotest import base as test_base
import six
from oslo.concurrency.fixture import lockutils as fixtures # noqa
from oslo.concurrency import lockutils # noqa
from oslo.config import fixture as config
class LockTestCase(test_base.BaseTestCase):
def setUp(self):
super(LockTestCase, self).setUp()
self.config = self.useFixture(config.Config(lockutils.CONF)).config
def test_synchronized_wrapped_function_metadata(self):
@lockutils.synchronized('whatever', 'test-')
def foo():
"""Bar."""
pass
self.assertEqual(foo.__doc__, 'Bar.', "Wrapped function's docstring "
"got lost")
self.assertEqual(foo.__name__, 'foo', "Wrapped function's name "
"got mangled")
def test_lock_acquire_release_file_lock(self):
lock_dir = tempfile.mkdtemp()
lock_file = os.path.join(lock_dir, 'lock')
lock = lockutils._FcntlLock(lock_file)
def try_lock():
try:
my_lock = lockutils._FcntlLock(lock_file)
my_lock.lockfile = open(lock_file, 'w')
my_lock.trylock()
my_lock.unlock()
os._exit(1)
except IOError:
os._exit(0)
def attempt_acquire(count):
children = []
for i in range(count):
child = multiprocessing.Process(target=try_lock)
child.start()
children.append(child)
exit_codes = []
for child in children:
child.join()
exit_codes.append(child.exitcode)
return sum(exit_codes)
self.assertTrue(lock.acquire())
try:
acquired_children = attempt_acquire(10)
self.assertEqual(0, acquired_children)
finally:
lock.release()
try:
acquired_children = attempt_acquire(5)
self.assertNotEqual(0, acquired_children)
finally:
try:
shutil.rmtree(lock_dir)
except IOError:
pass
def test_lock_internally(self):
"""We can lock across multiple threads."""
saved_sem_num = len(lockutils._semaphores)
seen_threads = list()
def f(_id):
with lockutils.lock('testlock2', 'test-', external=False):
for x in range(10):
seen_threads.append(_id)
threads = []
for i in range(10):
thread = threading.Thread(target=f, args=(i,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
self.assertEqual(len(seen_threads), 100)
# Looking at the seen threads, split it into chunks of 10, and verify
# that the last 9 match the first in each chunk.
for i in range(10):
for j in range(9):
self.assertEqual(seen_threads[i * 10],
seen_threads[i * 10 + 1 + j])
self.assertEqual(saved_sem_num, len(lockutils._semaphores),
"Semaphore leak detected")
def test_nested_synchronized_external_works(self):
"""We can nest external syncs."""
tempdir = tempfile.mkdtemp()
try:
self.config(lock_path=tempdir, group='oslo_concurrency')
sentinel = object()
@lockutils.synchronized('testlock1', 'test-', external=True)
def outer_lock():
@lockutils.synchronized('testlock2', 'test-', external=True)
def inner_lock():
return sentinel
return inner_lock()
self.assertEqual(sentinel, outer_lock())
finally:
if os.path.exists(tempdir):
shutil.rmtree(tempdir)
def _do_test_lock_externally(self):
"""We can lock across multiple processes."""
def lock_files(handles_dir):
with lockutils.lock('external', 'test-', external=True):
# Open some files we can use for locking
handles = []
for n in range(50):
path = os.path.join(handles_dir, ('file-%s' % n))
handles.append(open(path, 'w'))
# Loop over all the handles and try locking the file
# without blocking, keep a count of how many files we
# were able to lock and then unlock. If the lock fails
# we get an IOError and bail out with bad exit code
count = 0
for handle in handles:
try:
fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
count += 1
fcntl.flock(handle, fcntl.LOCK_UN)
except IOError:
os._exit(2)
finally:
handle.close()
# Check if we were able to open all files
self.assertEqual(50, count)
handles_dir = tempfile.mkdtemp()
try:
children = []
for n in range(50):
pid = os.fork()
if pid:
children.append(pid)
else:
try:
lock_files(handles_dir)
finally:
os._exit(0)
for child in children:
(pid, status) = os.waitpid(child, 0)
if pid:
self.assertEqual(0, status)
finally:
if os.path.exists(handles_dir):
shutil.rmtree(handles_dir, ignore_errors=True)
def test_lock_externally(self):
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir, group='oslo_concurrency')
try:
self._do_test_lock_externally()
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_lock_externally_lock_dir_not_exist(self):
lock_dir = tempfile.mkdtemp()
os.rmdir(lock_dir)
self.config(lock_path=lock_dir, group='oslo_concurrency')
try:
self._do_test_lock_externally()
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_synchronized_with_prefix(self):
lock_name = 'mylock'
lock_pfix = 'mypfix-'
foo = lockutils.synchronized_with_prefix(lock_pfix)
@foo(lock_name, external=True)
def bar(dirpath, pfix, name):
return True
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir, group='oslo_concurrency')
self.assertTrue(bar(lock_dir, lock_pfix, lock_name))
def test_synchronized_without_prefix(self):
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir, group='oslo_concurrency')
@lockutils.synchronized('lock', external=True)
def test_without_prefix():
# We can't check much
pass
try:
test_without_prefix()
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_synchronized_prefix_without_hypen(self):
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir, group='oslo_concurrency')
@lockutils.synchronized('lock', 'hypen', True)
def test_without_hypen():
# We can't check much
pass
try:
test_without_hypen()
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_contextlock(self):
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir, group='oslo_concurrency')
try:
# Note(flaper87): Lock is not external, which means
# a semaphore will be yielded
with lockutils.lock("test") as sem:
if six.PY2:
self.assertTrue(isinstance(sem, threading._Semaphore))
else:
self.assertTrue(isinstance(sem, threading.Semaphore))
# NOTE(flaper87): Lock is external so an InterProcessLock
# will be yielded.
with lockutils.lock("test2", external=True) as lock:
self.assertTrue(lock.exists())
with lockutils.lock("test1",
external=True) as lock1:
self.assertTrue(isinstance(lock1,
lockutils.InterProcessLock))
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_contextlock_unlocks(self):
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir, group='oslo_concurrency')
sem = None
try:
with lockutils.lock("test") as sem:
if six.PY2:
self.assertTrue(isinstance(sem, threading._Semaphore))
else:
self.assertTrue(isinstance(sem, threading.Semaphore))
with lockutils.lock("test2", external=True) as lock:
self.assertTrue(lock.exists())
# NOTE(flaper87): Lock should be free
with lockutils.lock("test2", external=True) as lock:
self.assertTrue(lock.exists())
# NOTE(flaper87): Lock should be free
# but semaphore should already exist.
with lockutils.lock("test") as sem2:
self.assertEqual(sem, sem2)
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def _test_remove_lock_external_file(self, lock_dir, use_external=False):
lock_name = 'mylock'
lock_pfix = 'mypfix-remove-lock-test-'
if use_external:
lock_path = lock_dir
else:
lock_path = None
lockutils.remove_external_lock_file(lock_name, lock_pfix, lock_path)
for ent in os.listdir(lock_dir):
self.assertRaises(OSError, ent.startswith, lock_pfix)
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_remove_lock_external_file(self):
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir, group='oslo_concurrency')
self._test_remove_lock_external_file(lock_dir)
def test_remove_lock_external_file_lock_path(self):
lock_dir = tempfile.mkdtemp()
self._test_remove_lock_external_file(lock_dir,
use_external=True)
def test_no_slash_in_b64(self):
# base64(sha1(foobar)) has a slash in it
with lockutils.lock("foobar"):
pass
def test_deprecated_names(self):
paths = self.create_tempfiles([['fake.conf', '\n'.join([
'[DEFAULT]',
'lock_path=foo',
'disable_process_locking=True'])
]])
conf = cfg.ConfigOpts()
conf(['--config-file', paths[0]])
conf.register_opts(lockutils._opts, 'oslo_concurrency')
self.assertEqual(conf.oslo_concurrency.lock_path, 'foo')
self.assertTrue(conf.oslo_concurrency.disable_process_locking)
class BrokenLock(lockutils._FileLock):
def __init__(self, name, errno_code):
super(BrokenLock, self).__init__(name)
self.errno_code = errno_code
def unlock(self):
pass
def trylock(self):
err = IOError()
err.errno = self.errno_code
raise err
class FileBasedLockingTestCase(test_base.BaseTestCase):
def setUp(self):
super(FileBasedLockingTestCase, self).setUp()
self.lock_dir = tempfile.mkdtemp()
def test_lock_file_exists(self):
lock_file = os.path.join(self.lock_dir, 'lock-file')
@lockutils.synchronized('lock-file', external=True,
lock_path=self.lock_dir)
def foo():
self.assertTrue(os.path.exists(lock_file))
foo()
def test_bad_acquire(self):
lock_file = os.path.join(self.lock_dir, 'lock')
lock = BrokenLock(lock_file, errno.EBUSY)
self.assertRaises(threading.ThreadError, lock.acquire)
def test_interprocess_lock(self):
lock_file = os.path.join(self.lock_dir, 'processlock')
pid = os.fork()
if pid:
# Make sure the child grabs the lock first
start = time.time()
while not os.path.exists(lock_file):
if time.time() - start > 5:
self.fail('Timed out waiting for child to grab lock')
time.sleep(0)
lock1 = lockutils.InterProcessLock('foo')
lock1.lockfile = open(lock_file, 'w')
# NOTE(bnemec): There is a brief window between when the lock file
# is created and when it actually becomes locked. If we happen to
# context switch in that window we may succeed in locking the
# file. Keep retrying until we either get the expected exception
# or timeout waiting.
while time.time() - start < 5:
try:
lock1.trylock()
lock1.unlock()
time.sleep(0)
except IOError:
# This is what we expect to happen
break
else:
self.fail('Never caught expected lock exception')
# We don't need to wait for the full sleep in the child here
os.kill(pid, signal.SIGKILL)
else:
try:
lock2 = lockutils.InterProcessLock('foo')
lock2.lockfile = open(lock_file, 'w')
have_lock = False
while not have_lock:
try:
lock2.trylock()
have_lock = True
except IOError:
pass
finally:
# NOTE(bnemec): This is racy, but I don't want to add any
# synchronization primitives that might mask a problem
# with the one we're trying to test here.
time.sleep(.5)
os._exit(0)
def test_interthread_external_lock(self):
call_list = []
@lockutils.synchronized('foo', external=True, lock_path=self.lock_dir)
def foo(param):
"""Simulate a long-running threaded operation."""
call_list.append(param)
# NOTE(bnemec): This is racy, but I don't want to add any
# synchronization primitives that might mask a problem
# with the one we're trying to test here.
time.sleep(.5)
call_list.append(param)
def other(param):
foo(param)
thread = threading.Thread(target=other, args=('other',))
thread.start()
# Make sure the other thread grabs the lock
# NOTE(bnemec): File locks do not actually work between threads, so
# this test is verifying that the local semaphore is still enforcing
# external locks in that case. This means this test does not have
# the same race problem as the process test above because when the
# file is created the semaphore has already been grabbed.
start = time.time()
while not os.path.exists(os.path.join(self.lock_dir, 'foo')):
if time.time() - start > 5:
self.fail('Timed out waiting for thread to grab lock')
time.sleep(0)
thread1 = threading.Thread(target=other, args=('main',))
thread1.start()
thread1.join()
thread.join()
self.assertEqual(call_list, ['other', 'other', 'main', 'main'])
def test_non_destructive(self):
lock_file = os.path.join(self.lock_dir, 'not-destroyed')
with open(lock_file, 'w') as f:
f.write('test')
with lockutils.lock('not-destroyed', external=True,
lock_path=self.lock_dir):
with open(lock_file) as f:
self.assertEqual(f.read(), 'test')
class LockutilsModuleTestCase(test_base.BaseTestCase):
def setUp(self):
super(LockutilsModuleTestCase, self).setUp()
self.old_env = os.environ.get('OSLO_LOCK_PATH')
if self.old_env is not None:
del os.environ['OSLO_LOCK_PATH']
def tearDown(self):
if self.old_env is not None:
os.environ['OSLO_LOCK_PATH'] = self.old_env
super(LockutilsModuleTestCase, self).tearDown()
def test_main(self):
script = '\n'.join([
'import os',
'lock_path = os.environ.get("OSLO_LOCK_PATH")',
'assert lock_path is not None',
'assert os.path.isdir(lock_path)',
])
argv = ['', sys.executable, '-c', script]
retval = lockutils._lock_wrapper(argv)
self.assertEqual(retval, 0, "Bad OSLO_LOCK_PATH has been set")
def test_return_value_maintained(self):
script = '\n'.join([
'import sys',
'sys.exit(1)',
])
argv = ['', sys.executable, '-c', script]
retval = lockutils._lock_wrapper(argv)
self.assertEqual(retval, 1)
def test_direct_call_explodes(self):
cmd = [sys.executable, '-m', 'oslo_concurrency.lockutils']
with open(os.devnull, 'w') as devnull:
retval = subprocess.call(cmd, stderr=devnull)
# 1 for Python 2.7 and 3.x, 255 for 2.6
self.assertIn(retval, [1, 255])
class TestLockFixture(test_base.BaseTestCase):
def setUp(self):
super(TestLockFixture, self).setUp()
self.config = self.useFixture(config.Config(lockutils.CONF)).config
self.tempdir = tempfile.mkdtemp()
def _check_in_lock(self):
self.assertTrue(self.lock.exists())
def tearDown(self):
self._check_in_lock()
super(TestLockFixture, self).tearDown()
def test_lock_fixture(self):
# Setup lock fixture to test that teardown is inside the lock
self.config(lock_path=self.tempdir, group='oslo_concurrency')
fixture = fixtures.LockFixture('test-lock')
self.useFixture(fixture)
self.lock = fixture.lock
class TestExternalLockFixture(test_base.BaseTestCase):
def test_fixture(self):
# NOTE(bnemec): This test case is only valid if lockutils-wrapper is
# _not_ in use. Otherwise lock_path will be set on lockutils import
# and this test will pass regardless of whether the fixture is used.
self.useFixture(fixtures.ExternalLockFixture())
# This will raise an exception if lock_path is not set
with lockutils.external_lock('foo'):
pass
def test_with_existing_config_fixture(self):
# Make sure the config fixture in the ExternalLockFixture doesn't
# cause any issues for tests using their own config fixture.
conf = self.useFixture(config.Config())
self.useFixture(fixtures.ExternalLockFixture())
with lockutils.external_lock('bar'):
conf.register_opt(cfg.StrOpt('foo'))
conf.config(foo='bar')
self.assertEqual(cfg.CONF.foo, 'bar')
# Due to config filter, lock_path should still not be present in
# the global config opt.
self.assertFalse(hasattr(cfg.CONF, 'lock_path'))

519
tests/test_processutils.py Normal file
View File

@@ -0,0 +1,519 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import errno
import logging
import multiprocessing
import os
import stat
import tempfile
import fixtures
import mock
from oslotest import base as test_base
from oslotest import mockpatch
import six
from oslo.concurrency import processutils # noqa
PROCESS_EXECUTION_ERROR_LOGGING_TEST = """#!/bin/bash
exit 41"""
TEST_EXCEPTION_AND_MASKING_SCRIPT = """#!/bin/bash
# This is to test stdout and stderr
# and the command returned in an exception
# when a non-zero exit code is returned
echo onstdout --password='"secret"'
echo onstderr --password='"secret"' 1>&2
exit 38"""
class UtilsTest(test_base.BaseTestCase):
# NOTE(jkoelker) Moar tests from nova need to be ported. But they
# need to be mock'd out. Currently they require actually
# running code.
def test_execute_unknown_kwargs(self):
self.assertRaises(processutils.UnknownArgumentError,
processutils.execute,
hozer=True)
@mock.patch.object(multiprocessing, 'cpu_count', return_value=8)
def test_get_worker_count(self, mock_cpu_count):
self.assertEqual(8, processutils.get_worker_count())
@mock.patch.object(multiprocessing, 'cpu_count',
side_effect=NotImplementedError())
def test_get_worker_count_cpu_count_not_implemented(self,
mock_cpu_count):
self.assertEqual(1, processutils.get_worker_count())
class ProcessExecutionErrorTest(test_base.BaseTestCase):
def test_defaults(self):
err = processutils.ProcessExecutionError()
self.assertTrue('None\n' in six.text_type(err))
self.assertTrue('code: -\n' in six.text_type(err))
def test_with_description(self):
description = 'The Narwhal Bacons at Midnight'
err = processutils.ProcessExecutionError(description=description)
self.assertTrue(description in six.text_type(err))
def test_with_exit_code(self):
exit_code = 0
err = processutils.ProcessExecutionError(exit_code=exit_code)
self.assertTrue(str(exit_code) in six.text_type(err))
def test_with_cmd(self):
cmd = 'telinit'
err = processutils.ProcessExecutionError(cmd=cmd)
self.assertTrue(cmd in six.text_type(err))
def test_with_stdout(self):
stdout = """
Lo, praise of the prowess of people-kings
of spear-armed Danes, in days long sped,
we have heard, and what honot the athelings won!
Oft Scyld the Scefing from squadroned foes,
from many a tribe, the mead-bench tore,
awing the earls. Since erse he lay
friendless, a foundling, fate repaid him:
for he waxed under welkin, in wealth he trove,
till before him the folk, both far and near,
who house by the whale-path, heard his mandate,
gabe him gits: a good king he!
To him an heir was afterward born,
a son in his halls, whom heaven sent
to favor the fol, feeling their woe
that erst they had lacked an earl for leader
so long a while; the Lord endowed him,
the Wielder of Wonder, with world's renown.
""".strip()
err = processutils.ProcessExecutionError(stdout=stdout)
print(six.text_type(err))
self.assertTrue('people-kings' in six.text_type(err))
def test_with_stderr(self):
stderr = 'Cottonian library'
err = processutils.ProcessExecutionError(stderr=stderr)
self.assertTrue(stderr in six.text_type(err))
def test_retry_on_failure(self):
fd, tmpfilename = tempfile.mkstemp()
_, tmpfilename2 = tempfile.mkstemp()
try:
fp = os.fdopen(fd, 'w+')
fp.write('''#!/bin/sh
# If stdin fails to get passed during one of the runs, make a note.
if ! grep -q foo
then
echo 'failure' > "$1"
fi
# If stdin has failed to get passed during this or a previous run, exit early.
if grep failure "$1"
then
exit 1
fi
runs="$(cat $1)"
if [ -z "$runs" ]
then
runs=0
fi
runs=$(($runs + 1))
echo $runs > "$1"
exit 1
''')
fp.close()
os.chmod(tmpfilename, 0o755)
self.assertRaises(processutils.ProcessExecutionError,
processutils.execute,
tmpfilename, tmpfilename2, attempts=10,
process_input=b'foo',
delay_on_retry=False)
fp = open(tmpfilename2, 'r')
runs = fp.read()
fp.close()
self.assertNotEqual(runs.strip(), 'failure', 'stdin did not '
'always get passed '
'correctly')
runs = int(runs.strip())
self.assertEqual(runs, 10, 'Ran %d times instead of 10.' % (runs,))
finally:
os.unlink(tmpfilename)
os.unlink(tmpfilename2)
def test_unknown_kwargs_raises_error(self):
self.assertRaises(processutils.UnknownArgumentError,
processutils.execute,
'/usr/bin/env', 'true',
this_is_not_a_valid_kwarg=True)
def test_check_exit_code_boolean(self):
processutils.execute('/usr/bin/env', 'false', check_exit_code=False)
self.assertRaises(processutils.ProcessExecutionError,
processutils.execute,
'/usr/bin/env', 'false', check_exit_code=True)
def test_check_exit_code_list(self):
processutils.execute('/usr/bin/env', 'sh', '-c', 'exit 101',
check_exit_code=(101, 102))
processutils.execute('/usr/bin/env', 'sh', '-c', 'exit 102',
check_exit_code=(101, 102))
self.assertRaises(processutils.ProcessExecutionError,
processutils.execute,
'/usr/bin/env', 'sh', '-c', 'exit 103',
check_exit_code=(101, 102))
self.assertRaises(processutils.ProcessExecutionError,
processutils.execute,
'/usr/bin/env', 'sh', '-c', 'exit 0',
check_exit_code=(101, 102))
def test_no_retry_on_success(self):
fd, tmpfilename = tempfile.mkstemp()
_, tmpfilename2 = tempfile.mkstemp()
try:
fp = os.fdopen(fd, 'w+')
fp.write("""#!/bin/sh
# If we've already run, bail out.
grep -q foo "$1" && exit 1
# Mark that we've run before.
echo foo > "$1"
# Check that stdin gets passed correctly.
grep foo
""")
fp.close()
os.chmod(tmpfilename, 0o755)
processutils.execute(tmpfilename,
tmpfilename2,
process_input=b'foo',
attempts=2)
finally:
os.unlink(tmpfilename)
os.unlink(tmpfilename2)
# This test and the one below ensures that when communicate raises
# an OSError, we do the right thing(s)
def test_exception_on_communicate_error(self):
mock = self.useFixture(mockpatch.Patch(
'subprocess.Popen.communicate',
side_effect=OSError(errno.EAGAIN, 'fake-test')))
self.assertRaises(OSError,
processutils.execute,
'/usr/bin/env',
'false',
check_exit_code=False)
self.assertEqual(1, mock.mock.call_count)
def test_retry_on_communicate_error(self):
mock = self.useFixture(mockpatch.Patch(
'subprocess.Popen.communicate',
side_effect=OSError(errno.EAGAIN, 'fake-test')))
self.assertRaises(OSError,
processutils.execute,
'/usr/bin/env',
'false',
check_exit_code=False,
attempts=5)
self.assertEqual(5, mock.mock.call_count)
def _test_and_check_logging_communicate_errors(self, log_errors=None,
attempts=None):
mock = self.useFixture(mockpatch.Patch(
'subprocess.Popen.communicate',
side_effect=OSError(errno.EAGAIN, 'fake-test')))
fixture = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
kwargs = {}
if log_errors:
kwargs.update({"log_errors": log_errors})
if attempts:
kwargs.update({"attempts": attempts})
self.assertRaises(OSError,
processutils.execute,
'/usr/bin/env',
'false',
**kwargs)
self.assertEqual(attempts if attempts else 1, mock.mock.call_count)
self.assertIn('Got an OSError', fixture.output)
self.assertIn('errno: 11', fixture.output)
self.assertIn("'/usr/bin/env false'", fixture.output)
def test_logging_on_communicate_error_1(self):
self._test_and_check_logging_communicate_errors(
log_errors=processutils.LOG_FINAL_ERROR,
attempts=None)
def test_logging_on_communicate_error_2(self):
self._test_and_check_logging_communicate_errors(
log_errors=processutils.LOG_FINAL_ERROR,
attempts=1)
def test_logging_on_communicate_error_3(self):
self._test_and_check_logging_communicate_errors(
log_errors=processutils.LOG_FINAL_ERROR,
attempts=5)
def test_logging_on_communicate_error_4(self):
self._test_and_check_logging_communicate_errors(
log_errors=processutils.LOG_ALL_ERRORS,
attempts=None)
def test_logging_on_communicate_error_5(self):
self._test_and_check_logging_communicate_errors(
log_errors=processutils.LOG_ALL_ERRORS,
attempts=1)
def test_logging_on_communicate_error_6(self):
self._test_and_check_logging_communicate_errors(
log_errors=processutils.LOG_ALL_ERRORS,
attempts=5)
def test_with_env_variables(self):
env_vars = {'SUPER_UNIQUE_VAR': 'The answer is 42'}
out, err = processutils.execute('/usr/bin/env', env_variables=env_vars)
self.assertIn(b'SUPER_UNIQUE_VAR=The answer is 42', out)
def test_exception_and_masking(self):
tmpfilename = self.create_tempfiles(
[["test_exceptions_and_masking",
TEST_EXCEPTION_AND_MASKING_SCRIPT]], ext='bash')[0]
os.chmod(tmpfilename, (stat.S_IRWXU |
stat.S_IRGRP |
stat.S_IXGRP |
stat.S_IROTH |
stat.S_IXOTH))
err = self.assertRaises(processutils.ProcessExecutionError,
processutils.execute,
tmpfilename, 'password="secret"',
'something')
self.assertEqual(38, err.exit_code)
self.assertIn('onstdout --password="***"', err.stdout)
self.assertIn('onstderr --password="***"', err.stderr)
self.assertEqual(err.cmd, ' '.join([tmpfilename,
'password="***"',
'something']))
self.assertNotIn('secret', str(err))
class ProcessExecutionErrorLoggingTest(test_base.BaseTestCase):
def setUp(self):
super(ProcessExecutionErrorLoggingTest, self).setUp()
self.tmpfilename = self.create_tempfiles(
[["process_execution_error_logging_test",
PROCESS_EXECUTION_ERROR_LOGGING_TEST]],
ext='bash')[0]
os.chmod(self.tmpfilename, (stat.S_IRWXU | stat.S_IRGRP |
stat.S_IXGRP | stat.S_IROTH |
stat.S_IXOTH))
def _test_and_check(self, log_errors=None, attempts=None):
fixture = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
kwargs = {}
if log_errors:
kwargs.update({"log_errors": log_errors})
if attempts:
kwargs.update({"attempts": attempts})
err = self.assertRaises(processutils.ProcessExecutionError,
processutils.execute,
self.tmpfilename,
**kwargs)
self.assertEqual(41, err.exit_code)
self.assertIn(self.tmpfilename, fixture.output)
def test_with_invalid_log_errors(self):
self.assertRaises(processutils.InvalidArgumentError,
processutils.execute,
self.tmpfilename,
log_errors='invalid')
def test_with_log_errors_NONE(self):
self._test_and_check(log_errors=None, attempts=None)
def test_with_log_errors_final(self):
self._test_and_check(log_errors=processutils.LOG_FINAL_ERROR,
attempts=None)
def test_with_log_errors_all(self):
self._test_and_check(log_errors=processutils.LOG_ALL_ERRORS,
attempts=None)
def test_multiattempt_with_log_errors_NONE(self):
self._test_and_check(log_errors=None, attempts=3)
def test_multiattempt_with_log_errors_final(self):
self._test_and_check(log_errors=processutils.LOG_FINAL_ERROR,
attempts=3)
def test_multiattempt_with_log_errors_all(self):
self._test_and_check(log_errors=processutils.LOG_ALL_ERRORS,
attempts=3)
def fake_execute(*cmd, **kwargs):
return 'stdout', 'stderr'
def fake_execute_raises(*cmd, **kwargs):
raise processutils.ProcessExecutionError(exit_code=42,
stdout='stdout',
stderr='stderr',
cmd=['this', 'is', 'a',
'command'])
class TryCmdTestCase(test_base.BaseTestCase):
def test_keep_warnings(self):
self.useFixture(fixtures.MonkeyPatch(
'oslo_concurrency.processutils.execute', fake_execute))
o, e = processutils.trycmd('this is a command'.split(' '))
self.assertNotEqual('', o)
self.assertNotEqual('', e)
def test_keep_warnings_from_raise(self):
self.useFixture(fixtures.MonkeyPatch(
'oslo_concurrency.processutils.execute', fake_execute_raises))
o, e = processutils.trycmd('this is a command'.split(' '),
discard_warnings=True)
self.assertIsNotNone(o)
self.assertNotEqual('', e)
def test_discard_warnings(self):
self.useFixture(fixtures.MonkeyPatch(
'oslo_concurrency.processutils.execute', fake_execute))
o, e = processutils.trycmd('this is a command'.split(' '),
discard_warnings=True)
self.assertIsNotNone(o)
self.assertEqual('', e)
class FakeSshChannel(object):
def __init__(self, rc):
self.rc = rc
def recv_exit_status(self):
return self.rc
class FakeSshStream(six.StringIO):
def setup_channel(self, rc):
self.channel = FakeSshChannel(rc)
class FakeSshConnection(object):
def __init__(self, rc):
self.rc = rc
def exec_command(self, cmd):
stdout = FakeSshStream('stdout')
stdout.setup_channel(self.rc)
return (six.StringIO(),
stdout,
six.StringIO('stderr'))
class SshExecuteTestCase(test_base.BaseTestCase):
def test_invalid_addl_env(self):
self.assertRaises(processutils.InvalidArgumentError,
processutils.ssh_execute,
None, 'ls', addl_env='important')
def test_invalid_process_input(self):
self.assertRaises(processutils.InvalidArgumentError,
processutils.ssh_execute,
None, 'ls', process_input='important')
def test_works(self):
o, e = processutils.ssh_execute(FakeSshConnection(0), 'ls')
self.assertEqual('stdout', o)
self.assertEqual('stderr', e)
def test_fails(self):
self.assertRaises(processutils.ProcessExecutionError,
processutils.ssh_execute, FakeSshConnection(1), 'ls')
def _test_compromising_ssh(self, rc, check):
fixture = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
fake_stdin = six.StringIO()
fake_stdout = mock.Mock()
fake_stdout.channel.recv_exit_status.return_value = rc
fake_stdout.read.return_value = 'password="secret"'
fake_stderr = six.StringIO('password="foobar"')
command = 'ls --password="bar"'
connection = mock.Mock()
connection.exec_command.return_value = (fake_stdin, fake_stdout,
fake_stderr)
if check and rc != -1 and rc != 0:
err = self.assertRaises(processutils.ProcessExecutionError,
processutils.ssh_execute,
connection, command,
check_exit_code=check)
self.assertEqual(rc, err.exit_code)
self.assertEqual(err.stdout, 'password="***"')
self.assertEqual(err.stderr, 'password="***"')
self.assertEqual(err.cmd, 'ls --password="***"')
self.assertNotIn('secret', str(err))
self.assertNotIn('foobar', str(err))
else:
o, e = processutils.ssh_execute(connection, command,
check_exit_code=check)
self.assertEqual('password="***"', o)
self.assertEqual('password="***"', e)
self.assertIn('password="***"', fixture.output)
self.assertNotIn('bar', fixture.output)
def test_compromising_ssh1(self):
self._test_compromising_ssh(rc=-1, check=True)
def test_compromising_ssh2(self):
self._test_compromising_ssh(rc=0, check=True)
def test_compromising_ssh3(self):
self._test_compromising_ssh(rc=1, check=True)
def test_compromising_ssh4(self):
self._test_compromising_ssh(rc=1, check=False)
def test_compromising_ssh5(self):
self._test_compromising_ssh(rc=0, check=False)
def test_compromising_ssh6(self):
self._test_compromising_ssh(rc=-1, check=False)

29
tests/test_warning.py Normal file
View File

@@ -0,0 +1,29 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import imp
import mock
from oslotest import base as test_base
class DeprecationWarningTest(test_base.BaseTestCase):
@mock.patch('warnings.warn')
def test_warning(self, mock_warn):
import oslo.concurrency
imp.reload(oslo.concurrency)
self.assertTrue(mock_warn.called)
args = mock_warn.call_args
self.assertIn('oslo_concurrency', args[0][0])
self.assertIn('deprecated', args[0][0])
self.assertTrue(issubclass(args[0][1], DeprecationWarning))

View File

@@ -1,6 +1,6 @@
[tox]
minversion = 1.6
envlist = py26,py27,py33,py34,pypy,pep8
envlist = py33,py34,py26,py27,pep8
# NOTE(dhellmann): We cannot set skipdist=True
# for oslo libraries because of the namespace package.
#skipsdist = True
@@ -23,13 +23,13 @@ commands =
deps = -r{toxinidir}/requirements-py3.txt
-r{toxinidir}/test-requirements.txt
commands =
lockutils-wrapper python -m testtools.run tests.unit.test_lockutils
lockutils-wrapper python setup.py testr --slowest --testr-args='{posargs}'
[testenv:py34]
deps = -r{toxinidir}/requirements-py3.txt
-r{toxinidir}/test-requirements.txt
commands =
lockutils-wrapper python -m testtools.run tests.unit.test_lockutils
lockutils-wrapper python setup.py testr --slowest --testr-args='{posargs}'
[testenv:pep8]
commands = flake8
@@ -54,4 +54,4 @@ exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build
[hacking]
import_exceptions =
oslo.concurrency._i18n
oslo_concurrency._i18n