Move out of the oslo namespace package

Move the public API out of oslo.concurrency to oslo_concurrency. Retain
the ability to import from the old namespace package for backwards
compatibility for this release cycle.

bp/drop-namespace-packages

Change-Id: I20d1647b1c3ef8cab3b69eccfe168eeb01703b72
This commit is contained in:
Doug Hellmann 2014-11-14 14:26:53 -05:00
parent 58de317f84
commit bca4a0d827
32 changed files with 1169 additions and 23 deletions

1
.gitignore vendored
View File

@ -41,6 +41,7 @@ output/*/index.html
# Sphinx # Sphinx
doc/build doc/build
doc/source/api
# pbr generates these # pbr generates these
AUTHORS AUTHORS

View File

@ -2,6 +2,6 @@
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \ test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \ OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \ OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \
${PYTHON:-python} -m subunit.run discover -t ./ ./tests $LISTOPT $IDOPTION ${PYTHON:-python} -m subunit.run discover -t ./ . $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE test_id_option=--load-list $IDFILE
test_list_option=--list test_list_option=--list

View File

@ -36,8 +36,9 @@ extensions = [
# files. # files.
exclude_patterns = [ exclude_patterns = [
'api/tests.*', # avoid of docs generation from tests 'api/tests.*', # avoid of docs generation from tests
'api/oslo.concurrency.openstack.common.*', # skip common modules 'api/oslo.concurrency.*', # skip deprecated import from namespace package
'api/oslo.concurrency._*', # skip private modules 'api/oslo_concurrency.openstack.common.*', # skip common modules
'api/oslo_concurrency._*', # skip private modules
] ]
# Prune the excluded patterns from the autoindex # Prune the excluded patterns from the autoindex

View File

@ -0,0 +1,29 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import warnings
from oslo_concurrency import lockutils # noqa
from oslo_concurrency import processutils # noqa
def deprecated():
new_name = __name__.replace('.', '_')
warnings.warn(
('The oslo namespace package is deprecated. Please use %s instead.' %
new_name),
DeprecationWarning,
stacklevel=1,
)
deprecated()

View File

@ -0,0 +1,13 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_concurrency.fixture import lockutils # noqa

View File

@ -16,7 +16,7 @@
from oslo import i18n from oslo import i18n
_translators = i18n.TranslatorFactory(domain='oslo.concurrency') _translators = i18n.TranslatorFactory(domain='oslo_concurrency')
# The primary translation function using the well-known name "_" # The primary translation function using the well-known name "_"
_ = _translators.primary _ = _translators.primary

View File

@ -15,7 +15,7 @@
import fixtures import fixtures
from oslo.concurrency import lockutils from oslo_concurrency import lockutils
class LockFixture(fixtures.Fixture): class LockFixture(fixtures.Fixture):

View File

@ -30,8 +30,8 @@ from oslo.config import cfg
from oslo.config import cfgfilter from oslo.config import cfgfilter
import six import six
from oslo.concurrency._i18n import _, _LE, _LI from oslo_concurrency._i18n import _, _LE, _LI
from oslo.concurrency.openstack.common import fileutils from oslo_concurrency.openstack.common import fileutils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)

View File

@ -20,7 +20,7 @@ __all__ = [
import copy import copy
from oslo.concurrency import lockutils from oslo_concurrency import lockutils
def list_opts(): def list_opts():
@ -34,7 +34,7 @@ def list_opts():
registered. A group name of None corresponds to the [DEFAULT] group in registered. A group name of None corresponds to the [DEFAULT] group in
config files. config files.
This function is also discoverable via the 'oslo.concurrency' entry point This function is also discoverable via the 'oslo_concurrency' entry point
under the 'oslo.config.opts' namespace. under the 'oslo.config.opts' namespace.
The purpose of this is to allow tools like the Oslo sample config file The purpose of this is to allow tools like the Oslo sample config file

View File

@ -29,7 +29,7 @@ from oslo.utils import importutils
from oslo.utils import strutils from oslo.utils import strutils
import six import six
from oslo.concurrency._i18n import _ from oslo_concurrency._i18n import _
# NOTE(bnemec): eventlet doesn't monkey patch subprocess, so we need to # NOTE(bnemec): eventlet doesn't monkey patch subprocess, so we need to

View File

View File

View File

@ -28,9 +28,9 @@ from oslo.config import cfg
from oslotest import base as test_base from oslotest import base as test_base
import six import six
from oslo.concurrency.fixture import lockutils as fixtures
from oslo.concurrency import lockutils
from oslo.config import fixture as config from oslo.config import fixture as config
from oslo_concurrency.fixture import lockutils as fixtures
from oslo_concurrency import lockutils
class LockTestCase(test_base.BaseTestCase): class LockTestCase(test_base.BaseTestCase):
@ -522,7 +522,7 @@ class LockutilsModuleTestCase(test_base.BaseTestCase):
self.assertEqual(retval, 1) self.assertEqual(retval, 1)
def test_direct_call_explodes(self): def test_direct_call_explodes(self):
cmd = [sys.executable, '-m', 'oslo.concurrency.lockutils'] cmd = [sys.executable, '-m', 'oslo_concurrency.lockutils']
with open(os.devnull, 'w') as devnull: with open(os.devnull, 'w') as devnull:
retval = subprocess.call(cmd, stderr=devnull) retval = subprocess.call(cmd, stderr=devnull)
# 1 for Python 2.7 and 3.x, 255 for 2.6 # 1 for Python 2.7 and 3.x, 255 for 2.6

View File

@ -20,7 +20,7 @@ import eventlet
from eventlet import greenpool from eventlet import greenpool
from oslotest import base as test_base from oslotest import base as test_base
from oslo.concurrency import lockutils from oslo_concurrency import lockutils
class TestFileLocks(test_base.BaseTestCase): class TestFileLocks(test_base.BaseTestCase):

View File

@ -27,7 +27,7 @@ import mock
from oslotest import base as test_base from oslotest import base as test_base
import six import six
from oslo.concurrency import processutils from oslo_concurrency import processutils
from oslotest import mockpatch from oslotest import mockpatch
PROCESS_EXECUTION_ERROR_LOGGING_TEST = """#!/bin/bash PROCESS_EXECUTION_ERROR_LOGGING_TEST = """#!/bin/bash
exit 41""" exit 41"""
@ -396,14 +396,14 @@ def fake_execute_raises(*cmd, **kwargs):
class TryCmdTestCase(test_base.BaseTestCase): class TryCmdTestCase(test_base.BaseTestCase):
def test_keep_warnings(self): def test_keep_warnings(self):
self.useFixture(fixtures.MonkeyPatch( self.useFixture(fixtures.MonkeyPatch(
'oslo.concurrency.processutils.execute', fake_execute)) 'oslo_concurrency.processutils.execute', fake_execute))
o, e = processutils.trycmd('this is a command'.split(' ')) o, e = processutils.trycmd('this is a command'.split(' '))
self.assertNotEqual('', o) self.assertNotEqual('', o)
self.assertNotEqual('', e) self.assertNotEqual('', e)
def test_keep_warnings_from_raise(self): def test_keep_warnings_from_raise(self):
self.useFixture(fixtures.MonkeyPatch( self.useFixture(fixtures.MonkeyPatch(
'oslo.concurrency.processutils.execute', fake_execute_raises)) 'oslo_concurrency.processutils.execute', fake_execute_raises))
o, e = processutils.trycmd('this is a command'.split(' '), o, e = processutils.trycmd('this is a command'.split(' '),
discard_warnings=True) discard_warnings=True)
self.assertIsNotNone(o) self.assertIsNotNone(o)
@ -411,7 +411,7 @@ class TryCmdTestCase(test_base.BaseTestCase):
def test_discard_warnings(self): def test_discard_warnings(self):
self.useFixture(fixtures.MonkeyPatch( self.useFixture(fixtures.MonkeyPatch(
'oslo.concurrency.processutils.execute', fake_execute)) 'oslo_concurrency.processutils.execute', fake_execute))
o, e = processutils.trycmd('this is a command'.split(' '), o, e = processutils.trycmd('this is a command'.split(' '),
discard_warnings=True) discard_warnings=True)
self.assertIsNotNone(o) self.assertIsNotNone(o)

View File

@ -22,15 +22,18 @@ classifier =
[files] [files]
packages = packages =
oslo oslo
oslo.concurrency oslo.concurrency
oslo.concurrency.fixture
oslo_concurrency
oslo_concurrency.fixture
namespace_packages = namespace_packages =
oslo oslo
[entry_points] [entry_points]
oslo.config.opts = oslo.config.opts =
oslo.concurrency = oslo.concurrency.opts:list_opts oslo.concurrency = oslo_concurrency.opts:list_opts
console_scripts = console_scripts =
lockutils-wrapper = oslo.concurrency.lockutils:main lockutils-wrapper = oslo_concurrency.lockutils:main
[build_sphinx] [build_sphinx]
source-dir = doc/source source-dir = doc/source

551
tests/test_lockutils.py Normal file
View File

@ -0,0 +1,551 @@
# Copyright 2011 Justin Santa Barbara
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import fcntl
import multiprocessing
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import threading
import time
from oslo.config import cfg
from oslotest import base as test_base
import six
from oslo.concurrency.fixture import lockutils as fixtures # noqa
from oslo.concurrency import lockutils # noqa
from oslo.config import fixture as config
class LockTestCase(test_base.BaseTestCase):
def setUp(self):
super(LockTestCase, self).setUp()
self.config = self.useFixture(config.Config(lockutils.CONF)).config
def test_synchronized_wrapped_function_metadata(self):
@lockutils.synchronized('whatever', 'test-')
def foo():
"""Bar."""
pass
self.assertEqual(foo.__doc__, 'Bar.', "Wrapped function's docstring "
"got lost")
self.assertEqual(foo.__name__, 'foo', "Wrapped function's name "
"got mangled")
def test_lock_acquire_release_file_lock(self):
lock_dir = tempfile.mkdtemp()
lock_file = os.path.join(lock_dir, 'lock')
lock = lockutils._FcntlLock(lock_file)
def try_lock():
try:
my_lock = lockutils._FcntlLock(lock_file)
my_lock.lockfile = open(lock_file, 'w')
my_lock.trylock()
my_lock.unlock()
os._exit(1)
except IOError:
os._exit(0)
def attempt_acquire(count):
children = []
for i in range(count):
child = multiprocessing.Process(target=try_lock)
child.start()
children.append(child)
exit_codes = []
for child in children:
child.join()
exit_codes.append(child.exitcode)
return sum(exit_codes)
self.assertTrue(lock.acquire())
try:
acquired_children = attempt_acquire(10)
self.assertEqual(0, acquired_children)
finally:
lock.release()
try:
acquired_children = attempt_acquire(5)
self.assertNotEqual(0, acquired_children)
finally:
try:
shutil.rmtree(lock_dir)
except IOError:
pass
def test_lock_internally(self):
"""We can lock across multiple threads."""
saved_sem_num = len(lockutils._semaphores)
seen_threads = list()
def f(_id):
with lockutils.lock('testlock2', 'test-', external=False):
for x in range(10):
seen_threads.append(_id)
threads = []
for i in range(10):
thread = threading.Thread(target=f, args=(i,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
self.assertEqual(len(seen_threads), 100)
# Looking at the seen threads, split it into chunks of 10, and verify
# that the last 9 match the first in each chunk.
for i in range(10):
for j in range(9):
self.assertEqual(seen_threads[i * 10],
seen_threads[i * 10 + 1 + j])
self.assertEqual(saved_sem_num, len(lockutils._semaphores),
"Semaphore leak detected")
def test_nested_synchronized_external_works(self):
"""We can nest external syncs."""
tempdir = tempfile.mkdtemp()
try:
self.config(lock_path=tempdir, group='oslo_concurrency')
sentinel = object()
@lockutils.synchronized('testlock1', 'test-', external=True)
def outer_lock():
@lockutils.synchronized('testlock2', 'test-', external=True)
def inner_lock():
return sentinel
return inner_lock()
self.assertEqual(sentinel, outer_lock())
finally:
if os.path.exists(tempdir):
shutil.rmtree(tempdir)
def _do_test_lock_externally(self):
"""We can lock across multiple processes."""
def lock_files(handles_dir):
with lockutils.lock('external', 'test-', external=True):
# Open some files we can use for locking
handles = []
for n in range(50):
path = os.path.join(handles_dir, ('file-%s' % n))
handles.append(open(path, 'w'))
# Loop over all the handles and try locking the file
# without blocking, keep a count of how many files we
# were able to lock and then unlock. If the lock fails
# we get an IOError and bail out with bad exit code
count = 0
for handle in handles:
try:
fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
count += 1
fcntl.flock(handle, fcntl.LOCK_UN)
except IOError:
os._exit(2)
finally:
handle.close()
# Check if we were able to open all files
self.assertEqual(50, count)
handles_dir = tempfile.mkdtemp()
try:
children = []
for n in range(50):
pid = os.fork()
if pid:
children.append(pid)
else:
try:
lock_files(handles_dir)
finally:
os._exit(0)
for child in children:
(pid, status) = os.waitpid(child, 0)
if pid:
self.assertEqual(0, status)
finally:
if os.path.exists(handles_dir):
shutil.rmtree(handles_dir, ignore_errors=True)
def test_lock_externally(self):
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir, group='oslo_concurrency')
try:
self._do_test_lock_externally()
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_lock_externally_lock_dir_not_exist(self):
lock_dir = tempfile.mkdtemp()
os.rmdir(lock_dir)
self.config(lock_path=lock_dir, group='oslo_concurrency')
try:
self._do_test_lock_externally()
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_synchronized_with_prefix(self):
lock_name = 'mylock'
lock_pfix = 'mypfix-'
foo = lockutils.synchronized_with_prefix(lock_pfix)
@foo(lock_name, external=True)
def bar(dirpath, pfix, name):
return True
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir, group='oslo_concurrency')
self.assertTrue(bar(lock_dir, lock_pfix, lock_name))
def test_synchronized_without_prefix(self):
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir, group='oslo_concurrency')
@lockutils.synchronized('lock', external=True)
def test_without_prefix():
# We can't check much
pass
try:
test_without_prefix()
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_synchronized_prefix_without_hypen(self):
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir, group='oslo_concurrency')
@lockutils.synchronized('lock', 'hypen', True)
def test_without_hypen():
# We can't check much
pass
try:
test_without_hypen()
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_contextlock(self):
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir, group='oslo_concurrency')
try:
# Note(flaper87): Lock is not external, which means
# a semaphore will be yielded
with lockutils.lock("test") as sem:
if six.PY2:
self.assertTrue(isinstance(sem, threading._Semaphore))
else:
self.assertTrue(isinstance(sem, threading.Semaphore))
# NOTE(flaper87): Lock is external so an InterProcessLock
# will be yielded.
with lockutils.lock("test2", external=True) as lock:
self.assertTrue(lock.exists())
with lockutils.lock("test1",
external=True) as lock1:
self.assertTrue(isinstance(lock1,
lockutils.InterProcessLock))
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_contextlock_unlocks(self):
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir, group='oslo_concurrency')
sem = None
try:
with lockutils.lock("test") as sem:
if six.PY2:
self.assertTrue(isinstance(sem, threading._Semaphore))
else:
self.assertTrue(isinstance(sem, threading.Semaphore))
with lockutils.lock("test2", external=True) as lock:
self.assertTrue(lock.exists())
# NOTE(flaper87): Lock should be free
with lockutils.lock("test2", external=True) as lock:
self.assertTrue(lock.exists())
# NOTE(flaper87): Lock should be free
# but semaphore should already exist.
with lockutils.lock("test") as sem2:
self.assertEqual(sem, sem2)
finally:
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def _test_remove_lock_external_file(self, lock_dir, use_external=False):
lock_name = 'mylock'
lock_pfix = 'mypfix-remove-lock-test-'
if use_external:
lock_path = lock_dir
else:
lock_path = None
lockutils.remove_external_lock_file(lock_name, lock_pfix, lock_path)
for ent in os.listdir(lock_dir):
self.assertRaises(OSError, ent.startswith, lock_pfix)
if os.path.exists(lock_dir):
shutil.rmtree(lock_dir, ignore_errors=True)
def test_remove_lock_external_file(self):
lock_dir = tempfile.mkdtemp()
self.config(lock_path=lock_dir, group='oslo_concurrency')
self._test_remove_lock_external_file(lock_dir)
def test_remove_lock_external_file_lock_path(self):
lock_dir = tempfile.mkdtemp()
self._test_remove_lock_external_file(lock_dir,
use_external=True)
def test_no_slash_in_b64(self):
# base64(sha1(foobar)) has a slash in it
with lockutils.lock("foobar"):
pass
def test_deprecated_names(self):
paths = self.create_tempfiles([['fake.conf', '\n'.join([
'[DEFAULT]',
'lock_path=foo',
'disable_process_locking=True'])
]])
conf = cfg.ConfigOpts()
conf(['--config-file', paths[0]])
conf.register_opts(lockutils._opts, 'oslo_concurrency')
self.assertEqual(conf.oslo_concurrency.lock_path, 'foo')
self.assertTrue(conf.oslo_concurrency.disable_process_locking)
class BrokenLock(lockutils._FileLock):
def __init__(self, name, errno_code):
super(BrokenLock, self).__init__(name)
self.errno_code = errno_code
def unlock(self):
pass
def trylock(self):
err = IOError()
err.errno = self.errno_code
raise err
class FileBasedLockingTestCase(test_base.BaseTestCase):
def setUp(self):
super(FileBasedLockingTestCase, self).setUp()
self.lock_dir = tempfile.mkdtemp()
def test_lock_file_exists(self):
lock_file = os.path.join(self.lock_dir, 'lock-file')
@lockutils.synchronized('lock-file', external=True,
lock_path=self.lock_dir)
def foo():
self.assertTrue(os.path.exists(lock_file))
foo()
def test_bad_acquire(self):
lock_file = os.path.join(self.lock_dir, 'lock')
lock = BrokenLock(lock_file, errno.EBUSY)
self.assertRaises(threading.ThreadError, lock.acquire)
def test_interprocess_lock(self):
lock_file = os.path.join(self.lock_dir, 'processlock')
pid = os.fork()
if pid:
# Make sure the child grabs the lock first
start = time.time()
while not os.path.exists(lock_file):
if time.time() - start > 5:
self.fail('Timed out waiting for child to grab lock')
time.sleep(0)
lock1 = lockutils.InterProcessLock('foo')
lock1.lockfile = open(lock_file, 'w')
# NOTE(bnemec): There is a brief window between when the lock file
# is created and when it actually becomes locked. If we happen to
# context switch in that window we may succeed in locking the
# file. Keep retrying until we either get the expected exception
# or timeout waiting.
while time.time() - start < 5:
try:
lock1.trylock()
lock1.unlock()
time.sleep(0)
except IOError:
# This is what we expect to happen
break
else:
self.fail('Never caught expected lock exception')
# We don't need to wait for the full sleep in the child here
os.kill(pid, signal.SIGKILL)
else:
try:
lock2 = lockutils.InterProcessLock('foo')
lock2.lockfile = open(lock_file, 'w')
have_lock = False
while not have_lock:
try:
lock2.trylock()
have_lock = True
except IOError:
pass
finally:
# NOTE(bnemec): This is racy, but I don't want to add any
# synchronization primitives that might mask a problem
# with the one we're trying to test here.
time.sleep(.5)
os._exit(0)
def test_interthread_external_lock(self):
call_list = []
@lockutils.synchronized('foo', external=True, lock_path=self.lock_dir)
def foo(param):
"""Simulate a long-running threaded operation."""
call_list.append(param)
# NOTE(bnemec): This is racy, but I don't want to add any
# synchronization primitives that might mask a problem
# with the one we're trying to test here.
time.sleep(.5)
call_list.append(param)
def other(param):
foo(param)
thread = threading.Thread(target=other, args=('other',))
thread.start()
# Make sure the other thread grabs the lock
# NOTE(bnemec): File locks do not actually work between threads, so
# this test is verifying that the local semaphore is still enforcing
# external locks in that case. This means this test does not have
# the same race problem as the process test above because when the
# file is created the semaphore has already been grabbed.
start = time.time()
while not os.path.exists(os.path.join(self.lock_dir, 'foo')):
if time.time() - start > 5:
self.fail('Timed out waiting for thread to grab lock')
time.sleep(0)
thread1 = threading.Thread(target=other, args=('main',))
thread1.start()
thread1.join()
thread.join()
self.assertEqual(call_list, ['other', 'other', 'main', 'main'])
def test_non_destructive(self):
lock_file = os.path.join(self.lock_dir, 'not-destroyed')
with open(lock_file, 'w') as f:
f.write('test')
with lockutils.lock('not-destroyed', external=True,
lock_path=self.lock_dir):
with open(lock_file) as f:
self.assertEqual(f.read(), 'test')
class LockutilsModuleTestCase(test_base.BaseTestCase):
def setUp(self):
super(LockutilsModuleTestCase, self).setUp()
self.old_env = os.environ.get('OSLO_LOCK_PATH')
if self.old_env is not None:
del os.environ['OSLO_LOCK_PATH']
def tearDown(self):
if self.old_env is not None:
os.environ['OSLO_LOCK_PATH'] = self.old_env
super(LockutilsModuleTestCase, self).tearDown()
def test_main(self):
script = '\n'.join([
'import os',
'lock_path = os.environ.get("OSLO_LOCK_PATH")',
'assert lock_path is not None',
'assert os.path.isdir(lock_path)',
])
argv = ['', sys.executable, '-c', script]
retval = lockutils._lock_wrapper(argv)
self.assertEqual(retval, 0, "Bad OSLO_LOCK_PATH has been set")
def test_return_value_maintained(self):
script = '\n'.join([
'import sys',
'sys.exit(1)',
])
argv = ['', sys.executable, '-c', script]
retval = lockutils._lock_wrapper(argv)
self.assertEqual(retval, 1)
def test_direct_call_explodes(self):
cmd = [sys.executable, '-m', 'oslo_concurrency.lockutils']
with open(os.devnull, 'w') as devnull:
retval = subprocess.call(cmd, stderr=devnull)
# 1 for Python 2.7 and 3.x, 255 for 2.6
self.assertIn(retval, [1, 255])
class TestLockFixture(test_base.BaseTestCase):
def setUp(self):
super(TestLockFixture, self).setUp()
self.config = self.useFixture(config.Config(lockutils.CONF)).config
self.tempdir = tempfile.mkdtemp()
def _check_in_lock(self):
self.assertTrue(self.lock.exists())
def tearDown(self):
self._check_in_lock()
super(TestLockFixture, self).tearDown()
def test_lock_fixture(self):
# Setup lock fixture to test that teardown is inside the lock
self.config(lock_path=self.tempdir, group='oslo_concurrency')
fixture = fixtures.LockFixture('test-lock')
self.useFixture(fixture)
self.lock = fixture.lock

519
tests/test_processutils.py Normal file
View File

@ -0,0 +1,519 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import errno
import logging
import multiprocessing
import os
import stat
import tempfile
import fixtures
import mock
from oslotest import base as test_base
from oslotest import mockpatch
import six
from oslo.concurrency import processutils # noqa
PROCESS_EXECUTION_ERROR_LOGGING_TEST = """#!/bin/bash
exit 41"""
TEST_EXCEPTION_AND_MASKING_SCRIPT = """#!/bin/bash
# This is to test stdout and stderr
# and the command returned in an exception
# when a non-zero exit code is returned
echo onstdout --password='"secret"'
echo onstderr --password='"secret"' 1>&2
exit 38"""
class UtilsTest(test_base.BaseTestCase):
# NOTE(jkoelker) Moar tests from nova need to be ported. But they
# need to be mock'd out. Currently they require actually
# running code.
def test_execute_unknown_kwargs(self):
self.assertRaises(processutils.UnknownArgumentError,
processutils.execute,
hozer=True)
@mock.patch.object(multiprocessing, 'cpu_count', return_value=8)
def test_get_worker_count(self, mock_cpu_count):
self.assertEqual(8, processutils.get_worker_count())
@mock.patch.object(multiprocessing, 'cpu_count',
side_effect=NotImplementedError())
def test_get_worker_count_cpu_count_not_implemented(self,
mock_cpu_count):
self.assertEqual(1, processutils.get_worker_count())
class ProcessExecutionErrorTest(test_base.BaseTestCase):
def test_defaults(self):
err = processutils.ProcessExecutionError()
self.assertTrue('None\n' in six.text_type(err))
self.assertTrue('code: -\n' in six.text_type(err))
def test_with_description(self):
description = 'The Narwhal Bacons at Midnight'
err = processutils.ProcessExecutionError(description=description)
self.assertTrue(description in six.text_type(err))
def test_with_exit_code(self):
exit_code = 0
err = processutils.ProcessExecutionError(exit_code=exit_code)
self.assertTrue(str(exit_code) in six.text_type(err))
def test_with_cmd(self):
cmd = 'telinit'
err = processutils.ProcessExecutionError(cmd=cmd)
self.assertTrue(cmd in six.text_type(err))
def test_with_stdout(self):
stdout = """
Lo, praise of the prowess of people-kings
of spear-armed Danes, in days long sped,
we have heard, and what honot the athelings won!
Oft Scyld the Scefing from squadroned foes,
from many a tribe, the mead-bench tore,
awing the earls. Since erse he lay
friendless, a foundling, fate repaid him:
for he waxed under welkin, in wealth he trove,
till before him the folk, both far and near,
who house by the whale-path, heard his mandate,
gabe him gits: a good king he!
To him an heir was afterward born,
a son in his halls, whom heaven sent
to favor the fol, feeling their woe
that erst they had lacked an earl for leader
so long a while; the Lord endowed him,
the Wielder of Wonder, with world's renown.
""".strip()
err = processutils.ProcessExecutionError(stdout=stdout)
print(six.text_type(err))
self.assertTrue('people-kings' in six.text_type(err))
def test_with_stderr(self):
stderr = 'Cottonian library'
err = processutils.ProcessExecutionError(stderr=stderr)
self.assertTrue(stderr in six.text_type(err))
def test_retry_on_failure(self):
fd, tmpfilename = tempfile.mkstemp()
_, tmpfilename2 = tempfile.mkstemp()
try:
fp = os.fdopen(fd, 'w+')
fp.write('''#!/bin/sh
# If stdin fails to get passed during one of the runs, make a note.
if ! grep -q foo
then
echo 'failure' > "$1"
fi
# If stdin has failed to get passed during this or a previous run, exit early.
if grep failure "$1"
then
exit 1
fi
runs="$(cat $1)"
if [ -z "$runs" ]
then
runs=0
fi
runs=$(($runs + 1))
echo $runs > "$1"
exit 1
''')
fp.close()
os.chmod(tmpfilename, 0o755)
self.assertRaises(processutils.ProcessExecutionError,
processutils.execute,
tmpfilename, tmpfilename2, attempts=10,
process_input=b'foo',
delay_on_retry=False)
fp = open(tmpfilename2, 'r')
runs = fp.read()
fp.close()
self.assertNotEqual(runs.strip(), 'failure', 'stdin did not '
'always get passed '
'correctly')
runs = int(runs.strip())
self.assertEqual(runs, 10, 'Ran %d times instead of 10.' % (runs,))
finally:
os.unlink(tmpfilename)
os.unlink(tmpfilename2)
def test_unknown_kwargs_raises_error(self):
self.assertRaises(processutils.UnknownArgumentError,
processutils.execute,
'/usr/bin/env', 'true',
this_is_not_a_valid_kwarg=True)
def test_check_exit_code_boolean(self):
processutils.execute('/usr/bin/env', 'false', check_exit_code=False)
self.assertRaises(processutils.ProcessExecutionError,
processutils.execute,
'/usr/bin/env', 'false', check_exit_code=True)
def test_check_exit_code_list(self):
processutils.execute('/usr/bin/env', 'sh', '-c', 'exit 101',
check_exit_code=(101, 102))
processutils.execute('/usr/bin/env', 'sh', '-c', 'exit 102',
check_exit_code=(101, 102))
self.assertRaises(processutils.ProcessExecutionError,
processutils.execute,
'/usr/bin/env', 'sh', '-c', 'exit 103',
check_exit_code=(101, 102))
self.assertRaises(processutils.ProcessExecutionError,
processutils.execute,
'/usr/bin/env', 'sh', '-c', 'exit 0',
check_exit_code=(101, 102))
def test_no_retry_on_success(self):
fd, tmpfilename = tempfile.mkstemp()
_, tmpfilename2 = tempfile.mkstemp()
try:
fp = os.fdopen(fd, 'w+')
fp.write("""#!/bin/sh
# If we've already run, bail out.
grep -q foo "$1" && exit 1
# Mark that we've run before.
echo foo > "$1"
# Check that stdin gets passed correctly.
grep foo
""")
fp.close()
os.chmod(tmpfilename, 0o755)
processutils.execute(tmpfilename,
tmpfilename2,
process_input=b'foo',
attempts=2)
finally:
os.unlink(tmpfilename)
os.unlink(tmpfilename2)
# This test and the one below ensures that when communicate raises
# an OSError, we do the right thing(s)
def test_exception_on_communicate_error(self):
mock = self.useFixture(mockpatch.Patch(
'subprocess.Popen.communicate',
side_effect=OSError(errno.EAGAIN, 'fake-test')))
self.assertRaises(OSError,
processutils.execute,
'/usr/bin/env',
'false',
check_exit_code=False)
self.assertEqual(1, mock.mock.call_count)
def test_retry_on_communicate_error(self):
mock = self.useFixture(mockpatch.Patch(
'subprocess.Popen.communicate',
side_effect=OSError(errno.EAGAIN, 'fake-test')))
self.assertRaises(OSError,
processutils.execute,
'/usr/bin/env',
'false',
check_exit_code=False,
attempts=5)
self.assertEqual(5, mock.mock.call_count)
def _test_and_check_logging_communicate_errors(self, log_errors=None,
attempts=None):
mock = self.useFixture(mockpatch.Patch(
'subprocess.Popen.communicate',
side_effect=OSError(errno.EAGAIN, 'fake-test')))
fixture = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
kwargs = {}
if log_errors:
kwargs.update({"log_errors": log_errors})
if attempts:
kwargs.update({"attempts": attempts})
self.assertRaises(OSError,
processutils.execute,
'/usr/bin/env',
'false',
**kwargs)
self.assertEqual(attempts if attempts else 1, mock.mock.call_count)
self.assertIn('Got an OSError', fixture.output)
self.assertIn('errno: 11', fixture.output)
self.assertIn("'/usr/bin/env false'", fixture.output)
def test_logging_on_communicate_error_1(self):
self._test_and_check_logging_communicate_errors(
log_errors=processutils.LOG_FINAL_ERROR,
attempts=None)
def test_logging_on_communicate_error_2(self):
self._test_and_check_logging_communicate_errors(
log_errors=processutils.LOG_FINAL_ERROR,
attempts=1)
def test_logging_on_communicate_error_3(self):
self._test_and_check_logging_communicate_errors(
log_errors=processutils.LOG_FINAL_ERROR,
attempts=5)
def test_logging_on_communicate_error_4(self):
self._test_and_check_logging_communicate_errors(
log_errors=processutils.LOG_ALL_ERRORS,
attempts=None)
def test_logging_on_communicate_error_5(self):
self._test_and_check_logging_communicate_errors(
log_errors=processutils.LOG_ALL_ERRORS,
attempts=1)
def test_logging_on_communicate_error_6(self):
self._test_and_check_logging_communicate_errors(
log_errors=processutils.LOG_ALL_ERRORS,
attempts=5)
def test_with_env_variables(self):
env_vars = {'SUPER_UNIQUE_VAR': 'The answer is 42'}
out, err = processutils.execute('/usr/bin/env', env_variables=env_vars)
self.assertIn(b'SUPER_UNIQUE_VAR=The answer is 42', out)
def test_exception_and_masking(self):
tmpfilename = self.create_tempfiles(
[["test_exceptions_and_masking",
TEST_EXCEPTION_AND_MASKING_SCRIPT]], ext='bash')[0]
os.chmod(tmpfilename, (stat.S_IRWXU |
stat.S_IRGRP |
stat.S_IXGRP |
stat.S_IROTH |
stat.S_IXOTH))
err = self.assertRaises(processutils.ProcessExecutionError,
processutils.execute,
tmpfilename, 'password="secret"',
'something')
self.assertEqual(38, err.exit_code)
self.assertIn('onstdout --password="***"', err.stdout)
self.assertIn('onstderr --password="***"', err.stderr)
self.assertEqual(err.cmd, ' '.join([tmpfilename,
'password="***"',
'something']))
self.assertNotIn('secret', str(err))
class ProcessExecutionErrorLoggingTest(test_base.BaseTestCase):
def setUp(self):
super(ProcessExecutionErrorLoggingTest, self).setUp()
self.tmpfilename = self.create_tempfiles(
[["process_execution_error_logging_test",
PROCESS_EXECUTION_ERROR_LOGGING_TEST]],
ext='bash')[0]
os.chmod(self.tmpfilename, (stat.S_IRWXU | stat.S_IRGRP |
stat.S_IXGRP | stat.S_IROTH |
stat.S_IXOTH))
def _test_and_check(self, log_errors=None, attempts=None):
fixture = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
kwargs = {}
if log_errors:
kwargs.update({"log_errors": log_errors})
if attempts:
kwargs.update({"attempts": attempts})
err = self.assertRaises(processutils.ProcessExecutionError,
processutils.execute,
self.tmpfilename,
**kwargs)
self.assertEqual(41, err.exit_code)
self.assertIn(self.tmpfilename, fixture.output)
def test_with_invalid_log_errors(self):
self.assertRaises(processutils.InvalidArgumentError,
processutils.execute,
self.tmpfilename,
log_errors='invalid')
def test_with_log_errors_NONE(self):
self._test_and_check(log_errors=None, attempts=None)
def test_with_log_errors_final(self):
self._test_and_check(log_errors=processutils.LOG_FINAL_ERROR,
attempts=None)
def test_with_log_errors_all(self):
self._test_and_check(log_errors=processutils.LOG_ALL_ERRORS,
attempts=None)
def test_multiattempt_with_log_errors_NONE(self):
self._test_and_check(log_errors=None, attempts=3)
def test_multiattempt_with_log_errors_final(self):
self._test_and_check(log_errors=processutils.LOG_FINAL_ERROR,
attempts=3)
def test_multiattempt_with_log_errors_all(self):
self._test_and_check(log_errors=processutils.LOG_ALL_ERRORS,
attempts=3)
def fake_execute(*cmd, **kwargs):
return 'stdout', 'stderr'
def fake_execute_raises(*cmd, **kwargs):
raise processutils.ProcessExecutionError(exit_code=42,
stdout='stdout',
stderr='stderr',
cmd=['this', 'is', 'a',
'command'])
class TryCmdTestCase(test_base.BaseTestCase):
def test_keep_warnings(self):
self.useFixture(fixtures.MonkeyPatch(
'oslo_concurrency.processutils.execute', fake_execute))
o, e = processutils.trycmd('this is a command'.split(' '))
self.assertNotEqual('', o)
self.assertNotEqual('', e)
def test_keep_warnings_from_raise(self):
self.useFixture(fixtures.MonkeyPatch(
'oslo_concurrency.processutils.execute', fake_execute_raises))
o, e = processutils.trycmd('this is a command'.split(' '),
discard_warnings=True)
self.assertIsNotNone(o)
self.assertNotEqual('', e)
def test_discard_warnings(self):
self.useFixture(fixtures.MonkeyPatch(
'oslo_concurrency.processutils.execute', fake_execute))
o, e = processutils.trycmd('this is a command'.split(' '),
discard_warnings=True)
self.assertIsNotNone(o)
self.assertEqual('', e)
class FakeSshChannel(object):
def __init__(self, rc):
self.rc = rc
def recv_exit_status(self):
return self.rc
class FakeSshStream(six.StringIO):
def setup_channel(self, rc):
self.channel = FakeSshChannel(rc)
class FakeSshConnection(object):
def __init__(self, rc):
self.rc = rc
def exec_command(self, cmd):
stdout = FakeSshStream('stdout')
stdout.setup_channel(self.rc)
return (six.StringIO(),
stdout,
six.StringIO('stderr'))
class SshExecuteTestCase(test_base.BaseTestCase):
def test_invalid_addl_env(self):
self.assertRaises(processutils.InvalidArgumentError,
processutils.ssh_execute,
None, 'ls', addl_env='important')
def test_invalid_process_input(self):
self.assertRaises(processutils.InvalidArgumentError,
processutils.ssh_execute,
None, 'ls', process_input='important')
def test_works(self):
o, e = processutils.ssh_execute(FakeSshConnection(0), 'ls')
self.assertEqual('stdout', o)
self.assertEqual('stderr', e)
def test_fails(self):
self.assertRaises(processutils.ProcessExecutionError,
processutils.ssh_execute, FakeSshConnection(1), 'ls')
def _test_compromising_ssh(self, rc, check):
fixture = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
fake_stdin = six.StringIO()
fake_stdout = mock.Mock()
fake_stdout.channel.recv_exit_status.return_value = rc
fake_stdout.read.return_value = 'password="secret"'
fake_stderr = six.StringIO('password="foobar"')
command = 'ls --password="bar"'
connection = mock.Mock()
connection.exec_command.return_value = (fake_stdin, fake_stdout,
fake_stderr)
if check and rc != -1 and rc != 0:
err = self.assertRaises(processutils.ProcessExecutionError,
processutils.ssh_execute,
connection, command,
check_exit_code=check)
self.assertEqual(rc, err.exit_code)
self.assertEqual(err.stdout, 'password="***"')
self.assertEqual(err.stderr, 'password="***"')
self.assertEqual(err.cmd, 'ls --password="***"')
self.assertNotIn('secret', str(err))
self.assertNotIn('foobar', str(err))
else:
o, e = processutils.ssh_execute(connection, command,
check_exit_code=check)
self.assertEqual('password="***"', o)
self.assertEqual('password="***"', e)
self.assertIn('password="***"', fixture.output)
self.assertNotIn('bar', fixture.output)
def test_compromising_ssh1(self):
self._test_compromising_ssh(rc=-1, check=True)
def test_compromising_ssh2(self):
self._test_compromising_ssh(rc=0, check=True)
def test_compromising_ssh3(self):
self._test_compromising_ssh(rc=1, check=True)
def test_compromising_ssh4(self):
self._test_compromising_ssh(rc=1, check=False)
def test_compromising_ssh5(self):
self._test_compromising_ssh(rc=0, check=False)
def test_compromising_ssh6(self):
self._test_compromising_ssh(rc=-1, check=False)

29
tests/test_warning.py Normal file
View File

@ -0,0 +1,29 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import imp
import mock
from oslotest import base as test_base
class DeprecationWarningTest(test_base.BaseTestCase):
@mock.patch('warnings.warn')
def test_warning(self, mock_warn):
import oslo.concurrency
imp.reload(oslo.concurrency)
self.assertTrue(mock_warn.called)
args = mock_warn.call_args
self.assertIn('oslo_concurrency', args[0][0])
self.assertIn('deprecated', args[0][0])
self.assertTrue(issubclass(args[0][1], DeprecationWarning))

View File

@ -1,6 +1,6 @@
[tox] [tox]
minversion = 1.6 minversion = 1.6
envlist = py26,py27,py33,py34,pypy,pep8 envlist = py33,py34,py26,py27,pep8
# NOTE(dhellmann): We cannot set skipdist=True # NOTE(dhellmann): We cannot set skipdist=True
# for oslo libraries because of the namespace package. # for oslo libraries because of the namespace package.
#skipsdist = True #skipsdist = True
@ -54,4 +54,4 @@ exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build
[hacking] [hacking]
import_exceptions = import_exceptions =
oslo.concurrency._i18n oslo_concurrency._i18n