From 78ba1438794ce19c9a67d4d775e3bd113b447582 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 27 Oct 2014 12:55:54 -0700 Subject: [PATCH 01/12] Don't use variables that aren't initialized The code should not try to use acquire_time if it has not been initialized to some reasonable value. This commit sets it to none and checks that it has been set before using it in the release method to calculate the holding time. Change-Id: Id55d549f75a348d744f753a68e1ec298bdd25b33 --- oslo/concurrency/lockutils.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/oslo/concurrency/lockutils.py b/oslo/concurrency/lockutils.py index a3db92c..95f9ffe 100644 --- a/oslo/concurrency/lockutils.py +++ b/oslo/concurrency/lockutils.py @@ -85,6 +85,7 @@ class _FileLock(object): def __init__(self, name): self.lockfile = None self.fname = name + self.acquire_time = None def acquire(self): basedir = os.path.dirname(self.fname) @@ -129,11 +130,15 @@ class _FileLock(object): return self def release(self): + if self.acquire_time is None: + raise threading.ThreadError(_("Unable to release an unacquired" + " lock")) try: release_time = time.time() LOG.debug('Releasing file lock "%s" after holding it for %0.3fs', self.fname, (release_time - self.acquire_time)) self.unlock() + self.acquire_time = None except IOError: LOG.exception(_LE("Could not unlock the acquired lock `%s`"), self.fname) From d5ea62c53a03ffa8dba1c4326054d4f2ab405649 Mon Sep 17 00:00:00 2001 From: Ben Nemec Date: Tue, 28 Oct 2014 15:15:44 +0000 Subject: [PATCH 02/12] lockutils-wrapper cleanup There were a couple of issues with the lockutils wrapper. Most importantly, lockutils silently did nothing if it was called in the same way as the old incubator lockutils module. This had the potential to cause problems when simply doing a find and replace to migrate to oslo.concurrency. The problem is fixed by making lockutils raise an exception if it is called directly. There was also a name mismatch in the docstring for the console entry point, which is also fixed in this change. Closes-Bug: 1386734 Change-Id: I8868820ca314eb8d6fee83cc66fea886c1e74e27 --- oslo/concurrency/lockutils.py | 10 ++++++++-- tests/unit/test_lockutils.py | 8 ++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/oslo/concurrency/lockutils.py b/oslo/concurrency/lockutils.py index 95f9ffe..15e3984 100644 --- a/oslo/concurrency/lockutils.py +++ b/oslo/concurrency/lockutils.py @@ -358,10 +358,10 @@ def _lock_wrapper(argv): """Create a dir for locks and pass it to command from arguments This is exposed as a console script entry point named - oslo-concurrency-lock-wrapper + lockutils-wrapper If you run this: - oslo-concurrency-lock-wrapper python setup.py testr + lockutils-wrapper python setup.py testr a temporary directory will be created for all your locks and passed to all your tests in an environment variable. The temporary dir will be deleted @@ -379,3 +379,9 @@ def _lock_wrapper(argv): def main(): sys.exit(_lock_wrapper(sys.argv)) + + +if __name__ == '__main__': + raise NotImplementedError(_('Calling lockutils directly is no longer ' + 'supported. Please use the ' + 'lockutils-wrapper console script instead.')) diff --git a/tests/unit/test_lockutils.py b/tests/unit/test_lockutils.py index 7d85029..57a2f9e 100644 --- a/tests/unit/test_lockutils.py +++ b/tests/unit/test_lockutils.py @@ -18,6 +18,7 @@ import multiprocessing import os import shutil import signal +import subprocess import sys import tempfile import threading @@ -520,6 +521,13 @@ class LockutilsModuleTestCase(test_base.BaseTestCase): retval = lockutils._lock_wrapper(argv) self.assertEqual(retval, 1) + def test_direct_call_explodes(self): + cmd = [sys.executable, '-m', 'oslo.concurrency.lockutils'] + with open(os.devnull, 'w') as devnull: + retval = subprocess.call(cmd, stderr=devnull) + # 1 for Python 2.7 and 3.x, 255 for 2.6 + self.assertIn(retval, [1, 255]) + class TestLockFixture(test_base.BaseTestCase): From 63e618beb137e2887eabceeb51831034d3384e40 Mon Sep 17 00:00:00 2001 From: OpenStack Proposal Bot Date: Wed, 29 Oct 2014 06:10:48 +0000 Subject: [PATCH 03/12] Imported Translations from Transifex For more information about this automatic import see: https://wiki.openstack.org/wiki/Translations/Infrastructure Change-Id: I8eb9dc27716e59ad507f1b1ec0450da1b4795b67 --- .../locale/en_GB/LC_MESSAGES/oslo.concurrency.po | 16 +++++++++++++--- oslo.concurrency/locale/oslo.concurrency.pot | 16 +++++++++++++--- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/oslo.concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency.po b/oslo.concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency.po index 88bee5d..be6b319 100644 --- a/oslo.concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency.po +++ b/oslo.concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency.po @@ -9,8 +9,8 @@ msgid "" msgstr "" "Project-Id-Version: oslo.concurrency\n" "Report-Msgid-Bugs-To: EMAIL@ADDRESS\n" -"POT-Creation-Date: 2014-10-25 06:11+0000\n" -"PO-Revision-Date: 2014-10-25 02:29+0000\n" +"POT-Creation-Date: 2014-10-29 06:10+0000\n" +"PO-Revision-Date: 2014-10-28 20:48+0000\n" "Last-Translator: openstackjenkins \n" "Language-Team: English (United Kingdom) " "(http://www.transifex.com/projects/p/osloconcurrency/language/en_GB/)\n" @@ -20,11 +20,21 @@ msgstr "" "Content-Transfer-Encoding: 8bit\n" "Generated-By: Babel 1.3\n" -#: oslo/concurrency/lockutils.py:115 +#: oslo/concurrency/lockutils.py:120 #, python-format msgid "Unable to acquire lock on `%(filename)s` due to %(exception)s" msgstr "Unable to acquire lock on `%(filename)s` due to %(exception)s" +#: oslo/concurrency/lockutils.py:134 +msgid "Unable to release an unacquired lock" +msgstr "" + +#: oslo/concurrency/lockutils.py:385 +msgid "" +"Calling lockutils directly is no longer supported. Please use the " +"lockutils-wrapper console script instead." +msgstr "" + #: oslo/concurrency/processutils.py:69 msgid "Unexpected error while running command." msgstr "Unexpected error while running command." diff --git a/oslo.concurrency/locale/oslo.concurrency.pot b/oslo.concurrency/locale/oslo.concurrency.pot index 8d38800..211a83c 100644 --- a/oslo.concurrency/locale/oslo.concurrency.pot +++ b/oslo.concurrency/locale/oslo.concurrency.pot @@ -7,9 +7,9 @@ #, fuzzy msgid "" msgstr "" -"Project-Id-Version: oslo.concurrency 0.1.0.4.g6c5f5bf\n" +"Project-Id-Version: oslo.concurrency 0.2.0.2.gd5ea62c\n" "Report-Msgid-Bugs-To: EMAIL@ADDRESS\n" -"POT-Creation-Date: 2014-10-25 06:11+0000\n" +"POT-Creation-Date: 2014-10-29 06:10+0000\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "Last-Translator: FULL NAME \n" "Language-Team: LANGUAGE \n" @@ -18,11 +18,21 @@ msgstr "" "Content-Transfer-Encoding: 8bit\n" "Generated-By: Babel 1.3\n" -#: oslo/concurrency/lockutils.py:115 +#: oslo/concurrency/lockutils.py:120 #, python-format msgid "Unable to acquire lock on `%(filename)s` due to %(exception)s" msgstr "" +#: oslo/concurrency/lockutils.py:134 +msgid "Unable to release an unacquired lock" +msgstr "" + +#: oslo/concurrency/lockutils.py:385 +msgid "" +"Calling lockutils directly is no longer supported. Please use the " +"lockutils-wrapper console script instead." +msgstr "" + #: oslo/concurrency/processutils.py:69 msgid "Unexpected error while running command." msgstr "" From fa52a63e4f972294fd41321b5ce0fe581a3cc03d Mon Sep 17 00:00:00 2001 From: Dan Prince Date: Wed, 29 Oct 2014 15:41:31 -0400 Subject: [PATCH 04/12] Only modify autoindex.rst if it exists This fixes doc build errors when using older Sphinx versions. Change-Id: I0ed9bed311d6309257c0a111671d0fea46f5da7c Closes-bug: #1387343 --- doc/source/conf.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/doc/source/conf.py b/doc/source/conf.py index 4f69dc1..9fad769 100755 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -41,13 +41,15 @@ exclude_patterns = [ ] # Prune the excluded patterns from the autoindex -for line in fileinput.input('api/autoindex.rst', inplace=True): - found = False - for pattern in exclude_patterns: - if fnmatch.fnmatch(line, '*' + pattern[4:]): - found = True - if not found: - print line, +PATH = 'api/autoindex.rst' +if os.path.isfile(PATH) and os.access(PATH, os.R_OK): + for line in fileinput.input(PATH, inplace=True): + found = False + for pattern in exclude_patterns: + if fnmatch.fnmatch(line, '*' + pattern[4:]): + found = True + if not found: + print line, # The suffix of source filenames. source_suffix = '.rst' @@ -91,4 +93,4 @@ latex_documents = [ ] # Example configuration for intersphinx: refer to the Python standard library. -#intersphinx_mapping = {'http://docs.python.org/': None} \ No newline at end of file +#intersphinx_mapping = {'http://docs.python.org/': None} From 58de317f8487a20768da6deafe2d2a7aa3ebcc1d Mon Sep 17 00:00:00 2001 From: Victor Sergeyev Date: Thu, 13 Nov 2014 14:41:29 +0200 Subject: [PATCH 05/12] Improve testing in py3 environment Change-Id: I1d169f0a87d4251778b32e80b3627b407340fec8 --- oslo/concurrency/processutils.py | 2 +- tests/unit/test_processutils.py | 10 +++++----- tox.ini | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/oslo/concurrency/processutils.py b/oslo/concurrency/processutils.py index 4a388b6..1f8718b 100644 --- a/oslo/concurrency/processutils.py +++ b/oslo/concurrency/processutils.py @@ -181,7 +181,7 @@ def execute(*cmd, **kwargs): 'specify a root helper.')) cmd = shlex.split(root_helper) + list(cmd) - cmd = map(str, cmd) + cmd = [str(c) for c in cmd] sanitized_cmd = strutils.mask_password(' '.join(cmd)) while attempts > 0: diff --git a/tests/unit/test_processutils.py b/tests/unit/test_processutils.py index 17bec9e..d77e9ba 100644 --- a/tests/unit/test_processutils.py +++ b/tests/unit/test_processutils.py @@ -142,7 +142,7 @@ exit 1 self.assertRaises(processutils.ProcessExecutionError, processutils.execute, tmpfilename, tmpfilename2, attempts=10, - process_input='foo', + process_input=b'foo', delay_on_retry=False) fp = open(tmpfilename2, 'r') runs = fp.read() @@ -199,7 +199,7 @@ grep foo os.chmod(tmpfilename, 0o755) processutils.execute(tmpfilename, tmpfilename2, - process_input='foo', + process_input=b'foo', attempts=2) finally: os.unlink(tmpfilename) @@ -295,7 +295,7 @@ grep foo out, err = processutils.execute('/usr/bin/env', env_variables=env_vars) - self.assertIn('SUPER_UNIQUE_VAR=The answer is 42', out) + self.assertIn(b'SUPER_UNIQUE_VAR=The answer is 42', out) def test_exception_and_masking(self): tmpfilename = self.create_tempfiles( @@ -314,8 +314,8 @@ grep foo 'something') self.assertEqual(38, err.exit_code) - self.assertEqual(err.stdout, 'onstdout --password="***"\n') - self.assertEqual(err.stderr, 'onstderr --password="***"\n') + self.assertIn('onstdout --password="***"', err.stdout) + self.assertIn('onstderr --password="***"', err.stderr) self.assertEqual(err.cmd, ' '.join([tmpfilename, 'password="***"', 'something'])) diff --git a/tox.ini b/tox.ini index 2f1a852..fca4498 100644 --- a/tox.ini +++ b/tox.ini @@ -23,13 +23,13 @@ commands = deps = -r{toxinidir}/requirements-py3.txt -r{toxinidir}/test-requirements.txt commands = - lockutils-wrapper python -m testtools.run tests.unit.test_lockutils + lockutils-wrapper python setup.py testr --slowest --testr-args='{posargs}' [testenv:py34] deps = -r{toxinidir}/requirements-py3.txt -r{toxinidir}/test-requirements.txt commands = - lockutils-wrapper python -m testtools.run tests.unit.test_lockutils + lockutils-wrapper python setup.py testr --slowest --testr-args='{posargs}' [testenv:pep8] commands = flake8 From bca4a0d827fbef53d2c5a969d895b908a4362485 Mon Sep 17 00:00:00 2001 From: Doug Hellmann Date: Fri, 14 Nov 2014 14:26:53 -0500 Subject: [PATCH 06/12] Move out of the oslo namespace package Move the public API out of oslo.concurrency to oslo_concurrency. Retain the ability to import from the old namespace package for backwards compatibility for this release cycle. bp/drop-namespace-packages Change-Id: I20d1647b1c3ef8cab3b69eccfe168eeb01703b72 --- .gitignore | 1 + .testr.conf | 2 +- doc/source/conf.py | 5 +- oslo/concurrency/__init__.py | 29 + oslo/concurrency/fixture/__init__.py | 13 + .../__init__.py | 0 .../concurrency => oslo_concurrency}/_i18n.py | 2 +- .../fixture}/__init__.py | 0 .../fixture/lockutils.py | 2 +- .../LC_MESSAGES/oslo.concurrency-log-info.po | 0 .../en_GB/LC_MESSAGES/oslo.concurrency.po | 0 .../locale/oslo.concurrency-log-critical.pot | 0 .../locale/oslo.concurrency-log-error.pot | 0 .../locale/oslo.concurrency-log-info.pot | 0 .../locale/oslo.concurrency-log-warning.pot | 0 .../locale/oslo.concurrency.pot | 0 .../lockutils.py | 4 +- .../openstack}/__init__.py | 0 oslo_concurrency/openstack/common/__init__.py | 0 .../openstack/common/fileutils.py | 0 .../concurrency => oslo_concurrency}/opts.py | 4 +- .../processutils.py | 2 +- oslo_concurrency/tests/__init__.py | 0 oslo_concurrency/tests/unit/__init__.py | 0 .../tests}/unit/test_lockutils.py | 6 +- .../tests}/unit/test_lockutils_eventlet.py | 2 +- .../tests}/unit/test_processutils.py | 8 +- setup.cfg | 9 +- tests/test_lockutils.py | 551 ++++++++++++++++++ tests/test_processutils.py | 519 +++++++++++++++++ tests/test_warning.py | 29 + tox.ini | 4 +- 32 files changed, 1169 insertions(+), 23 deletions(-) rename {oslo/concurrency/openstack => oslo_concurrency}/__init__.py (100%) rename {oslo/concurrency => oslo_concurrency}/_i18n.py (94%) rename {oslo/concurrency/openstack/common => oslo_concurrency/fixture}/__init__.py (100%) rename {oslo/concurrency => oslo_concurrency}/fixture/lockutils.py (97%) rename {oslo.concurrency => oslo_concurrency}/locale/en_GB/LC_MESSAGES/oslo.concurrency-log-info.po (100%) rename {oslo.concurrency => oslo_concurrency}/locale/en_GB/LC_MESSAGES/oslo.concurrency.po (100%) rename {oslo.concurrency => oslo_concurrency}/locale/oslo.concurrency-log-critical.pot (100%) rename {oslo.concurrency => oslo_concurrency}/locale/oslo.concurrency-log-error.pot (100%) rename {oslo.concurrency => oslo_concurrency}/locale/oslo.concurrency-log-info.pot (100%) rename {oslo.concurrency => oslo_concurrency}/locale/oslo.concurrency-log-warning.pot (100%) rename {oslo.concurrency => oslo_concurrency}/locale/oslo.concurrency.pot (100%) rename {oslo/concurrency => oslo_concurrency}/lockutils.py (99%) rename {tests/unit => oslo_concurrency/openstack}/__init__.py (100%) create mode 100644 oslo_concurrency/openstack/common/__init__.py rename {oslo/concurrency => oslo_concurrency}/openstack/common/fileutils.py (100%) rename {oslo/concurrency => oslo_concurrency}/opts.py (93%) rename {oslo/concurrency => oslo_concurrency}/processutils.py (99%) create mode 100644 oslo_concurrency/tests/__init__.py create mode 100644 oslo_concurrency/tests/unit/__init__.py rename {tests => oslo_concurrency/tests}/unit/test_lockutils.py (99%) rename {tests => oslo_concurrency/tests}/unit/test_lockutils_eventlet.py (97%) rename {tests => oslo_concurrency/tests}/unit/test_processutils.py (98%) create mode 100644 tests/test_lockutils.py create mode 100644 tests/test_processutils.py create mode 100644 tests/test_warning.py diff --git a/.gitignore b/.gitignore index 1399c98..7a8e0ab 100644 --- a/.gitignore +++ b/.gitignore @@ -41,6 +41,7 @@ output/*/index.html # Sphinx doc/build +doc/source/api # pbr generates these AUTHORS diff --git a/.testr.conf b/.testr.conf index 19721fc..fb62267 100644 --- a/.testr.conf +++ b/.testr.conf @@ -2,6 +2,6 @@ test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \ OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \ OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \ - ${PYTHON:-python} -m subunit.run discover -t ./ ./tests $LISTOPT $IDOPTION + ${PYTHON:-python} -m subunit.run discover -t ./ . $LISTOPT $IDOPTION test_id_option=--load-list $IDFILE test_list_option=--list \ No newline at end of file diff --git a/doc/source/conf.py b/doc/source/conf.py index 9fad769..2d42847 100755 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -36,8 +36,9 @@ extensions = [ # files. exclude_patterns = [ 'api/tests.*', # avoid of docs generation from tests - 'api/oslo.concurrency.openstack.common.*', # skip common modules - 'api/oslo.concurrency._*', # skip private modules + 'api/oslo.concurrency.*', # skip deprecated import from namespace package + 'api/oslo_concurrency.openstack.common.*', # skip common modules + 'api/oslo_concurrency._*', # skip private modules ] # Prune the excluded patterns from the autoindex diff --git a/oslo/concurrency/__init__.py b/oslo/concurrency/__init__.py index e69de29..afd4685 100644 --- a/oslo/concurrency/__init__.py +++ b/oslo/concurrency/__init__.py @@ -0,0 +1,29 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import warnings + +from oslo_concurrency import lockutils # noqa +from oslo_concurrency import processutils # noqa + + +def deprecated(): + new_name = __name__.replace('.', '_') + warnings.warn( + ('The oslo namespace package is deprecated. Please use %s instead.' % + new_name), + DeprecationWarning, + stacklevel=1, + ) + + +deprecated() diff --git a/oslo/concurrency/fixture/__init__.py b/oslo/concurrency/fixture/__init__.py index e69de29..07f8bbc 100644 --- a/oslo/concurrency/fixture/__init__.py +++ b/oslo/concurrency/fixture/__init__.py @@ -0,0 +1,13 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_concurrency.fixture import lockutils # noqa diff --git a/oslo/concurrency/openstack/__init__.py b/oslo_concurrency/__init__.py similarity index 100% rename from oslo/concurrency/openstack/__init__.py rename to oslo_concurrency/__init__.py diff --git a/oslo/concurrency/_i18n.py b/oslo_concurrency/_i18n.py similarity index 94% rename from oslo/concurrency/_i18n.py rename to oslo_concurrency/_i18n.py index eea7d44..9bbb11c 100644 --- a/oslo/concurrency/_i18n.py +++ b/oslo_concurrency/_i18n.py @@ -16,7 +16,7 @@ from oslo import i18n -_translators = i18n.TranslatorFactory(domain='oslo.concurrency') +_translators = i18n.TranslatorFactory(domain='oslo_concurrency') # The primary translation function using the well-known name "_" _ = _translators.primary diff --git a/oslo/concurrency/openstack/common/__init__.py b/oslo_concurrency/fixture/__init__.py similarity index 100% rename from oslo/concurrency/openstack/common/__init__.py rename to oslo_concurrency/fixture/__init__.py diff --git a/oslo/concurrency/fixture/lockutils.py b/oslo_concurrency/fixture/lockutils.py similarity index 97% rename from oslo/concurrency/fixture/lockutils.py rename to oslo_concurrency/fixture/lockutils.py index 5751932..01cec72 100644 --- a/oslo/concurrency/fixture/lockutils.py +++ b/oslo_concurrency/fixture/lockutils.py @@ -15,7 +15,7 @@ import fixtures -from oslo.concurrency import lockutils +from oslo_concurrency import lockutils class LockFixture(fixtures.Fixture): diff --git a/oslo.concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency-log-info.po b/oslo_concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency-log-info.po similarity index 100% rename from oslo.concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency-log-info.po rename to oslo_concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency-log-info.po diff --git a/oslo.concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency.po b/oslo_concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency.po similarity index 100% rename from oslo.concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency.po rename to oslo_concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency.po diff --git a/oslo.concurrency/locale/oslo.concurrency-log-critical.pot b/oslo_concurrency/locale/oslo.concurrency-log-critical.pot similarity index 100% rename from oslo.concurrency/locale/oslo.concurrency-log-critical.pot rename to oslo_concurrency/locale/oslo.concurrency-log-critical.pot diff --git a/oslo.concurrency/locale/oslo.concurrency-log-error.pot b/oslo_concurrency/locale/oslo.concurrency-log-error.pot similarity index 100% rename from oslo.concurrency/locale/oslo.concurrency-log-error.pot rename to oslo_concurrency/locale/oslo.concurrency-log-error.pot diff --git a/oslo.concurrency/locale/oslo.concurrency-log-info.pot b/oslo_concurrency/locale/oslo.concurrency-log-info.pot similarity index 100% rename from oslo.concurrency/locale/oslo.concurrency-log-info.pot rename to oslo_concurrency/locale/oslo.concurrency-log-info.pot diff --git a/oslo.concurrency/locale/oslo.concurrency-log-warning.pot b/oslo_concurrency/locale/oslo.concurrency-log-warning.pot similarity index 100% rename from oslo.concurrency/locale/oslo.concurrency-log-warning.pot rename to oslo_concurrency/locale/oslo.concurrency-log-warning.pot diff --git a/oslo.concurrency/locale/oslo.concurrency.pot b/oslo_concurrency/locale/oslo.concurrency.pot similarity index 100% rename from oslo.concurrency/locale/oslo.concurrency.pot rename to oslo_concurrency/locale/oslo.concurrency.pot diff --git a/oslo/concurrency/lockutils.py b/oslo_concurrency/lockutils.py similarity index 99% rename from oslo/concurrency/lockutils.py rename to oslo_concurrency/lockutils.py index 15e3984..b87c01e 100644 --- a/oslo/concurrency/lockutils.py +++ b/oslo_concurrency/lockutils.py @@ -30,8 +30,8 @@ from oslo.config import cfg from oslo.config import cfgfilter import six -from oslo.concurrency._i18n import _, _LE, _LI -from oslo.concurrency.openstack.common import fileutils +from oslo_concurrency._i18n import _, _LE, _LI +from oslo_concurrency.openstack.common import fileutils LOG = logging.getLogger(__name__) diff --git a/tests/unit/__init__.py b/oslo_concurrency/openstack/__init__.py similarity index 100% rename from tests/unit/__init__.py rename to oslo_concurrency/openstack/__init__.py diff --git a/oslo_concurrency/openstack/common/__init__.py b/oslo_concurrency/openstack/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/oslo/concurrency/openstack/common/fileutils.py b/oslo_concurrency/openstack/common/fileutils.py similarity index 100% rename from oslo/concurrency/openstack/common/fileutils.py rename to oslo_concurrency/openstack/common/fileutils.py diff --git a/oslo/concurrency/opts.py b/oslo_concurrency/opts.py similarity index 93% rename from oslo/concurrency/opts.py rename to oslo_concurrency/opts.py index 202dfc0..a6f1e45 100644 --- a/oslo/concurrency/opts.py +++ b/oslo_concurrency/opts.py @@ -20,7 +20,7 @@ __all__ = [ import copy -from oslo.concurrency import lockutils +from oslo_concurrency import lockutils def list_opts(): @@ -34,7 +34,7 @@ def list_opts(): registered. A group name of None corresponds to the [DEFAULT] group in config files. - This function is also discoverable via the 'oslo.concurrency' entry point + This function is also discoverable via the 'oslo_concurrency' entry point under the 'oslo.config.opts' namespace. The purpose of this is to allow tools like the Oslo sample config file diff --git a/oslo/concurrency/processutils.py b/oslo_concurrency/processutils.py similarity index 99% rename from oslo/concurrency/processutils.py rename to oslo_concurrency/processutils.py index 1f8718b..051acec 100644 --- a/oslo/concurrency/processutils.py +++ b/oslo_concurrency/processutils.py @@ -29,7 +29,7 @@ from oslo.utils import importutils from oslo.utils import strutils import six -from oslo.concurrency._i18n import _ +from oslo_concurrency._i18n import _ # NOTE(bnemec): eventlet doesn't monkey patch subprocess, so we need to diff --git a/oslo_concurrency/tests/__init__.py b/oslo_concurrency/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/oslo_concurrency/tests/unit/__init__.py b/oslo_concurrency/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/test_lockutils.py b/oslo_concurrency/tests/unit/test_lockutils.py similarity index 99% rename from tests/unit/test_lockutils.py rename to oslo_concurrency/tests/unit/test_lockutils.py index 57a2f9e..96b9cdb 100644 --- a/tests/unit/test_lockutils.py +++ b/oslo_concurrency/tests/unit/test_lockutils.py @@ -28,9 +28,9 @@ from oslo.config import cfg from oslotest import base as test_base import six -from oslo.concurrency.fixture import lockutils as fixtures -from oslo.concurrency import lockutils from oslo.config import fixture as config +from oslo_concurrency.fixture import lockutils as fixtures +from oslo_concurrency import lockutils class LockTestCase(test_base.BaseTestCase): @@ -522,7 +522,7 @@ class LockutilsModuleTestCase(test_base.BaseTestCase): self.assertEqual(retval, 1) def test_direct_call_explodes(self): - cmd = [sys.executable, '-m', 'oslo.concurrency.lockutils'] + cmd = [sys.executable, '-m', 'oslo_concurrency.lockutils'] with open(os.devnull, 'w') as devnull: retval = subprocess.call(cmd, stderr=devnull) # 1 for Python 2.7 and 3.x, 255 for 2.6 diff --git a/tests/unit/test_lockutils_eventlet.py b/oslo_concurrency/tests/unit/test_lockutils_eventlet.py similarity index 97% rename from tests/unit/test_lockutils_eventlet.py rename to oslo_concurrency/tests/unit/test_lockutils_eventlet.py index 41c19c6..bb333d7 100644 --- a/tests/unit/test_lockutils_eventlet.py +++ b/oslo_concurrency/tests/unit/test_lockutils_eventlet.py @@ -20,7 +20,7 @@ import eventlet from eventlet import greenpool from oslotest import base as test_base -from oslo.concurrency import lockutils +from oslo_concurrency import lockutils class TestFileLocks(test_base.BaseTestCase): diff --git a/tests/unit/test_processutils.py b/oslo_concurrency/tests/unit/test_processutils.py similarity index 98% rename from tests/unit/test_processutils.py rename to oslo_concurrency/tests/unit/test_processutils.py index d77e9ba..50ea611 100644 --- a/tests/unit/test_processutils.py +++ b/oslo_concurrency/tests/unit/test_processutils.py @@ -27,7 +27,7 @@ import mock from oslotest import base as test_base import six -from oslo.concurrency import processutils +from oslo_concurrency import processutils from oslotest import mockpatch PROCESS_EXECUTION_ERROR_LOGGING_TEST = """#!/bin/bash exit 41""" @@ -396,14 +396,14 @@ def fake_execute_raises(*cmd, **kwargs): class TryCmdTestCase(test_base.BaseTestCase): def test_keep_warnings(self): self.useFixture(fixtures.MonkeyPatch( - 'oslo.concurrency.processutils.execute', fake_execute)) + 'oslo_concurrency.processutils.execute', fake_execute)) o, e = processutils.trycmd('this is a command'.split(' ')) self.assertNotEqual('', o) self.assertNotEqual('', e) def test_keep_warnings_from_raise(self): self.useFixture(fixtures.MonkeyPatch( - 'oslo.concurrency.processutils.execute', fake_execute_raises)) + 'oslo_concurrency.processutils.execute', fake_execute_raises)) o, e = processutils.trycmd('this is a command'.split(' '), discard_warnings=True) self.assertIsNotNone(o) @@ -411,7 +411,7 @@ class TryCmdTestCase(test_base.BaseTestCase): def test_discard_warnings(self): self.useFixture(fixtures.MonkeyPatch( - 'oslo.concurrency.processutils.execute', fake_execute)) + 'oslo_concurrency.processutils.execute', fake_execute)) o, e = processutils.trycmd('this is a command'.split(' '), discard_warnings=True) self.assertIsNotNone(o) diff --git a/setup.cfg b/setup.cfg index cb2c835..9f99301 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,15 +22,18 @@ classifier = [files] packages = oslo - oslo.concurrency + oslo.concurrency + oslo.concurrency.fixture + oslo_concurrency + oslo_concurrency.fixture namespace_packages = oslo [entry_points] oslo.config.opts = - oslo.concurrency = oslo.concurrency.opts:list_opts + oslo.concurrency = oslo_concurrency.opts:list_opts console_scripts = - lockutils-wrapper = oslo.concurrency.lockutils:main + lockutils-wrapper = oslo_concurrency.lockutils:main [build_sphinx] source-dir = doc/source diff --git a/tests/test_lockutils.py b/tests/test_lockutils.py new file mode 100644 index 0000000..0bde4a8 --- /dev/null +++ b/tests/test_lockutils.py @@ -0,0 +1,551 @@ +# Copyright 2011 Justin Santa Barbara +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import errno +import fcntl +import multiprocessing +import os +import shutil +import signal +import subprocess +import sys +import tempfile +import threading +import time + +from oslo.config import cfg +from oslotest import base as test_base +import six + +from oslo.concurrency.fixture import lockutils as fixtures # noqa +from oslo.concurrency import lockutils # noqa +from oslo.config import fixture as config + + +class LockTestCase(test_base.BaseTestCase): + + def setUp(self): + super(LockTestCase, self).setUp() + self.config = self.useFixture(config.Config(lockutils.CONF)).config + + def test_synchronized_wrapped_function_metadata(self): + @lockutils.synchronized('whatever', 'test-') + def foo(): + """Bar.""" + pass + + self.assertEqual(foo.__doc__, 'Bar.', "Wrapped function's docstring " + "got lost") + self.assertEqual(foo.__name__, 'foo', "Wrapped function's name " + "got mangled") + + def test_lock_acquire_release_file_lock(self): + lock_dir = tempfile.mkdtemp() + lock_file = os.path.join(lock_dir, 'lock') + lock = lockutils._FcntlLock(lock_file) + + def try_lock(): + try: + my_lock = lockutils._FcntlLock(lock_file) + my_lock.lockfile = open(lock_file, 'w') + my_lock.trylock() + my_lock.unlock() + os._exit(1) + except IOError: + os._exit(0) + + def attempt_acquire(count): + children = [] + for i in range(count): + child = multiprocessing.Process(target=try_lock) + child.start() + children.append(child) + exit_codes = [] + for child in children: + child.join() + exit_codes.append(child.exitcode) + return sum(exit_codes) + + self.assertTrue(lock.acquire()) + try: + acquired_children = attempt_acquire(10) + self.assertEqual(0, acquired_children) + finally: + lock.release() + + try: + acquired_children = attempt_acquire(5) + self.assertNotEqual(0, acquired_children) + finally: + try: + shutil.rmtree(lock_dir) + except IOError: + pass + + def test_lock_internally(self): + """We can lock across multiple threads.""" + saved_sem_num = len(lockutils._semaphores) + seen_threads = list() + + def f(_id): + with lockutils.lock('testlock2', 'test-', external=False): + for x in range(10): + seen_threads.append(_id) + + threads = [] + for i in range(10): + thread = threading.Thread(target=f, args=(i,)) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + self.assertEqual(len(seen_threads), 100) + # Looking at the seen threads, split it into chunks of 10, and verify + # that the last 9 match the first in each chunk. + for i in range(10): + for j in range(9): + self.assertEqual(seen_threads[i * 10], + seen_threads[i * 10 + 1 + j]) + + self.assertEqual(saved_sem_num, len(lockutils._semaphores), + "Semaphore leak detected") + + def test_nested_synchronized_external_works(self): + """We can nest external syncs.""" + tempdir = tempfile.mkdtemp() + try: + self.config(lock_path=tempdir, group='oslo_concurrency') + sentinel = object() + + @lockutils.synchronized('testlock1', 'test-', external=True) + def outer_lock(): + + @lockutils.synchronized('testlock2', 'test-', external=True) + def inner_lock(): + return sentinel + return inner_lock() + + self.assertEqual(sentinel, outer_lock()) + + finally: + if os.path.exists(tempdir): + shutil.rmtree(tempdir) + + def _do_test_lock_externally(self): + """We can lock across multiple processes.""" + + def lock_files(handles_dir): + + with lockutils.lock('external', 'test-', external=True): + # Open some files we can use for locking + handles = [] + for n in range(50): + path = os.path.join(handles_dir, ('file-%s' % n)) + handles.append(open(path, 'w')) + + # Loop over all the handles and try locking the file + # without blocking, keep a count of how many files we + # were able to lock and then unlock. If the lock fails + # we get an IOError and bail out with bad exit code + count = 0 + for handle in handles: + try: + fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB) + count += 1 + fcntl.flock(handle, fcntl.LOCK_UN) + except IOError: + os._exit(2) + finally: + handle.close() + + # Check if we were able to open all files + self.assertEqual(50, count) + + handles_dir = tempfile.mkdtemp() + try: + children = [] + for n in range(50): + pid = os.fork() + if pid: + children.append(pid) + else: + try: + lock_files(handles_dir) + finally: + os._exit(0) + + for child in children: + (pid, status) = os.waitpid(child, 0) + if pid: + self.assertEqual(0, status) + finally: + if os.path.exists(handles_dir): + shutil.rmtree(handles_dir, ignore_errors=True) + + def test_lock_externally(self): + lock_dir = tempfile.mkdtemp() + self.config(lock_path=lock_dir, group='oslo_concurrency') + + try: + self._do_test_lock_externally() + finally: + if os.path.exists(lock_dir): + shutil.rmtree(lock_dir, ignore_errors=True) + + def test_lock_externally_lock_dir_not_exist(self): + lock_dir = tempfile.mkdtemp() + os.rmdir(lock_dir) + self.config(lock_path=lock_dir, group='oslo_concurrency') + + try: + self._do_test_lock_externally() + finally: + if os.path.exists(lock_dir): + shutil.rmtree(lock_dir, ignore_errors=True) + + def test_synchronized_with_prefix(self): + lock_name = 'mylock' + lock_pfix = 'mypfix-' + + foo = lockutils.synchronized_with_prefix(lock_pfix) + + @foo(lock_name, external=True) + def bar(dirpath, pfix, name): + return True + + lock_dir = tempfile.mkdtemp() + self.config(lock_path=lock_dir, group='oslo_concurrency') + + self.assertTrue(bar(lock_dir, lock_pfix, lock_name)) + + def test_synchronized_without_prefix(self): + lock_dir = tempfile.mkdtemp() + self.config(lock_path=lock_dir, group='oslo_concurrency') + + @lockutils.synchronized('lock', external=True) + def test_without_prefix(): + # We can't check much + pass + + try: + test_without_prefix() + finally: + if os.path.exists(lock_dir): + shutil.rmtree(lock_dir, ignore_errors=True) + + def test_synchronized_prefix_without_hypen(self): + lock_dir = tempfile.mkdtemp() + self.config(lock_path=lock_dir, group='oslo_concurrency') + + @lockutils.synchronized('lock', 'hypen', True) + def test_without_hypen(): + # We can't check much + pass + + try: + test_without_hypen() + finally: + if os.path.exists(lock_dir): + shutil.rmtree(lock_dir, ignore_errors=True) + + def test_contextlock(self): + lock_dir = tempfile.mkdtemp() + self.config(lock_path=lock_dir, group='oslo_concurrency') + + try: + # Note(flaper87): Lock is not external, which means + # a semaphore will be yielded + with lockutils.lock("test") as sem: + if six.PY2: + self.assertTrue(isinstance(sem, threading._Semaphore)) + else: + self.assertTrue(isinstance(sem, threading.Semaphore)) + + # NOTE(flaper87): Lock is external so an InterProcessLock + # will be yielded. + with lockutils.lock("test2", external=True) as lock: + self.assertTrue(lock.exists()) + + with lockutils.lock("test1", + external=True) as lock1: + self.assertTrue(isinstance(lock1, + lockutils.InterProcessLock)) + finally: + if os.path.exists(lock_dir): + shutil.rmtree(lock_dir, ignore_errors=True) + + def test_contextlock_unlocks(self): + lock_dir = tempfile.mkdtemp() + self.config(lock_path=lock_dir, group='oslo_concurrency') + + sem = None + + try: + with lockutils.lock("test") as sem: + if six.PY2: + self.assertTrue(isinstance(sem, threading._Semaphore)) + else: + self.assertTrue(isinstance(sem, threading.Semaphore)) + + with lockutils.lock("test2", external=True) as lock: + self.assertTrue(lock.exists()) + + # NOTE(flaper87): Lock should be free + with lockutils.lock("test2", external=True) as lock: + self.assertTrue(lock.exists()) + + # NOTE(flaper87): Lock should be free + # but semaphore should already exist. + with lockutils.lock("test") as sem2: + self.assertEqual(sem, sem2) + finally: + if os.path.exists(lock_dir): + shutil.rmtree(lock_dir, ignore_errors=True) + + def _test_remove_lock_external_file(self, lock_dir, use_external=False): + lock_name = 'mylock' + lock_pfix = 'mypfix-remove-lock-test-' + + if use_external: + lock_path = lock_dir + else: + lock_path = None + + lockutils.remove_external_lock_file(lock_name, lock_pfix, lock_path) + + for ent in os.listdir(lock_dir): + self.assertRaises(OSError, ent.startswith, lock_pfix) + + if os.path.exists(lock_dir): + shutil.rmtree(lock_dir, ignore_errors=True) + + def test_remove_lock_external_file(self): + lock_dir = tempfile.mkdtemp() + self.config(lock_path=lock_dir, group='oslo_concurrency') + self._test_remove_lock_external_file(lock_dir) + + def test_remove_lock_external_file_lock_path(self): + lock_dir = tempfile.mkdtemp() + self._test_remove_lock_external_file(lock_dir, + use_external=True) + + def test_no_slash_in_b64(self): + # base64(sha1(foobar)) has a slash in it + with lockutils.lock("foobar"): + pass + + def test_deprecated_names(self): + paths = self.create_tempfiles([['fake.conf', '\n'.join([ + '[DEFAULT]', + 'lock_path=foo', + 'disable_process_locking=True']) + ]]) + conf = cfg.ConfigOpts() + conf(['--config-file', paths[0]]) + conf.register_opts(lockutils._opts, 'oslo_concurrency') + self.assertEqual(conf.oslo_concurrency.lock_path, 'foo') + self.assertTrue(conf.oslo_concurrency.disable_process_locking) + + +class BrokenLock(lockutils._FileLock): + def __init__(self, name, errno_code): + super(BrokenLock, self).__init__(name) + self.errno_code = errno_code + + def unlock(self): + pass + + def trylock(self): + err = IOError() + err.errno = self.errno_code + raise err + + +class FileBasedLockingTestCase(test_base.BaseTestCase): + def setUp(self): + super(FileBasedLockingTestCase, self).setUp() + self.lock_dir = tempfile.mkdtemp() + + def test_lock_file_exists(self): + lock_file = os.path.join(self.lock_dir, 'lock-file') + + @lockutils.synchronized('lock-file', external=True, + lock_path=self.lock_dir) + def foo(): + self.assertTrue(os.path.exists(lock_file)) + + foo() + + def test_bad_acquire(self): + lock_file = os.path.join(self.lock_dir, 'lock') + lock = BrokenLock(lock_file, errno.EBUSY) + + self.assertRaises(threading.ThreadError, lock.acquire) + + def test_interprocess_lock(self): + lock_file = os.path.join(self.lock_dir, 'processlock') + + pid = os.fork() + if pid: + # Make sure the child grabs the lock first + start = time.time() + while not os.path.exists(lock_file): + if time.time() - start > 5: + self.fail('Timed out waiting for child to grab lock') + time.sleep(0) + lock1 = lockutils.InterProcessLock('foo') + lock1.lockfile = open(lock_file, 'w') + # NOTE(bnemec): There is a brief window between when the lock file + # is created and when it actually becomes locked. If we happen to + # context switch in that window we may succeed in locking the + # file. Keep retrying until we either get the expected exception + # or timeout waiting. + while time.time() - start < 5: + try: + lock1.trylock() + lock1.unlock() + time.sleep(0) + except IOError: + # This is what we expect to happen + break + else: + self.fail('Never caught expected lock exception') + # We don't need to wait for the full sleep in the child here + os.kill(pid, signal.SIGKILL) + else: + try: + lock2 = lockutils.InterProcessLock('foo') + lock2.lockfile = open(lock_file, 'w') + have_lock = False + while not have_lock: + try: + lock2.trylock() + have_lock = True + except IOError: + pass + finally: + # NOTE(bnemec): This is racy, but I don't want to add any + # synchronization primitives that might mask a problem + # with the one we're trying to test here. + time.sleep(.5) + os._exit(0) + + def test_interthread_external_lock(self): + call_list = [] + + @lockutils.synchronized('foo', external=True, lock_path=self.lock_dir) + def foo(param): + """Simulate a long-running threaded operation.""" + call_list.append(param) + # NOTE(bnemec): This is racy, but I don't want to add any + # synchronization primitives that might mask a problem + # with the one we're trying to test here. + time.sleep(.5) + call_list.append(param) + + def other(param): + foo(param) + + thread = threading.Thread(target=other, args=('other',)) + thread.start() + # Make sure the other thread grabs the lock + # NOTE(bnemec): File locks do not actually work between threads, so + # this test is verifying that the local semaphore is still enforcing + # external locks in that case. This means this test does not have + # the same race problem as the process test above because when the + # file is created the semaphore has already been grabbed. + start = time.time() + while not os.path.exists(os.path.join(self.lock_dir, 'foo')): + if time.time() - start > 5: + self.fail('Timed out waiting for thread to grab lock') + time.sleep(0) + thread1 = threading.Thread(target=other, args=('main',)) + thread1.start() + thread1.join() + thread.join() + self.assertEqual(call_list, ['other', 'other', 'main', 'main']) + + def test_non_destructive(self): + lock_file = os.path.join(self.lock_dir, 'not-destroyed') + with open(lock_file, 'w') as f: + f.write('test') + with lockutils.lock('not-destroyed', external=True, + lock_path=self.lock_dir): + with open(lock_file) as f: + self.assertEqual(f.read(), 'test') + + +class LockutilsModuleTestCase(test_base.BaseTestCase): + + def setUp(self): + super(LockutilsModuleTestCase, self).setUp() + self.old_env = os.environ.get('OSLO_LOCK_PATH') + if self.old_env is not None: + del os.environ['OSLO_LOCK_PATH'] + + def tearDown(self): + if self.old_env is not None: + os.environ['OSLO_LOCK_PATH'] = self.old_env + super(LockutilsModuleTestCase, self).tearDown() + + def test_main(self): + script = '\n'.join([ + 'import os', + 'lock_path = os.environ.get("OSLO_LOCK_PATH")', + 'assert lock_path is not None', + 'assert os.path.isdir(lock_path)', + ]) + argv = ['', sys.executable, '-c', script] + retval = lockutils._lock_wrapper(argv) + self.assertEqual(retval, 0, "Bad OSLO_LOCK_PATH has been set") + + def test_return_value_maintained(self): + script = '\n'.join([ + 'import sys', + 'sys.exit(1)', + ]) + argv = ['', sys.executable, '-c', script] + retval = lockutils._lock_wrapper(argv) + self.assertEqual(retval, 1) + + def test_direct_call_explodes(self): + cmd = [sys.executable, '-m', 'oslo_concurrency.lockutils'] + with open(os.devnull, 'w') as devnull: + retval = subprocess.call(cmd, stderr=devnull) + # 1 for Python 2.7 and 3.x, 255 for 2.6 + self.assertIn(retval, [1, 255]) + + +class TestLockFixture(test_base.BaseTestCase): + + def setUp(self): + super(TestLockFixture, self).setUp() + self.config = self.useFixture(config.Config(lockutils.CONF)).config + self.tempdir = tempfile.mkdtemp() + + def _check_in_lock(self): + self.assertTrue(self.lock.exists()) + + def tearDown(self): + self._check_in_lock() + super(TestLockFixture, self).tearDown() + + def test_lock_fixture(self): + # Setup lock fixture to test that teardown is inside the lock + self.config(lock_path=self.tempdir, group='oslo_concurrency') + fixture = fixtures.LockFixture('test-lock') + self.useFixture(fixture) + self.lock = fixture.lock diff --git a/tests/test_processutils.py b/tests/test_processutils.py new file mode 100644 index 0000000..ce07e72 --- /dev/null +++ b/tests/test_processutils.py @@ -0,0 +1,519 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import print_function + +import errno +import logging +import multiprocessing +import os +import stat +import tempfile + +import fixtures +import mock +from oslotest import base as test_base +from oslotest import mockpatch +import six + +from oslo.concurrency import processutils # noqa + +PROCESS_EXECUTION_ERROR_LOGGING_TEST = """#!/bin/bash +exit 41""" + +TEST_EXCEPTION_AND_MASKING_SCRIPT = """#!/bin/bash +# This is to test stdout and stderr +# and the command returned in an exception +# when a non-zero exit code is returned +echo onstdout --password='"secret"' +echo onstderr --password='"secret"' 1>&2 +exit 38""" + + +class UtilsTest(test_base.BaseTestCase): + # NOTE(jkoelker) Moar tests from nova need to be ported. But they + # need to be mock'd out. Currently they require actually + # running code. + def test_execute_unknown_kwargs(self): + self.assertRaises(processutils.UnknownArgumentError, + processutils.execute, + hozer=True) + + @mock.patch.object(multiprocessing, 'cpu_count', return_value=8) + def test_get_worker_count(self, mock_cpu_count): + self.assertEqual(8, processutils.get_worker_count()) + + @mock.patch.object(multiprocessing, 'cpu_count', + side_effect=NotImplementedError()) + def test_get_worker_count_cpu_count_not_implemented(self, + mock_cpu_count): + self.assertEqual(1, processutils.get_worker_count()) + + +class ProcessExecutionErrorTest(test_base.BaseTestCase): + + def test_defaults(self): + err = processutils.ProcessExecutionError() + self.assertTrue('None\n' in six.text_type(err)) + self.assertTrue('code: -\n' in six.text_type(err)) + + def test_with_description(self): + description = 'The Narwhal Bacons at Midnight' + err = processutils.ProcessExecutionError(description=description) + self.assertTrue(description in six.text_type(err)) + + def test_with_exit_code(self): + exit_code = 0 + err = processutils.ProcessExecutionError(exit_code=exit_code) + self.assertTrue(str(exit_code) in six.text_type(err)) + + def test_with_cmd(self): + cmd = 'telinit' + err = processutils.ProcessExecutionError(cmd=cmd) + self.assertTrue(cmd in six.text_type(err)) + + def test_with_stdout(self): + stdout = """ + Lo, praise of the prowess of people-kings + of spear-armed Danes, in days long sped, + we have heard, and what honot the athelings won! + Oft Scyld the Scefing from squadroned foes, + from many a tribe, the mead-bench tore, + awing the earls. Since erse he lay + friendless, a foundling, fate repaid him: + for he waxed under welkin, in wealth he trove, + till before him the folk, both far and near, + who house by the whale-path, heard his mandate, + gabe him gits: a good king he! + To him an heir was afterward born, + a son in his halls, whom heaven sent + to favor the fol, feeling their woe + that erst they had lacked an earl for leader + so long a while; the Lord endowed him, + the Wielder of Wonder, with world's renown. + """.strip() + err = processutils.ProcessExecutionError(stdout=stdout) + print(six.text_type(err)) + self.assertTrue('people-kings' in six.text_type(err)) + + def test_with_stderr(self): + stderr = 'Cottonian library' + err = processutils.ProcessExecutionError(stderr=stderr) + self.assertTrue(stderr in six.text_type(err)) + + def test_retry_on_failure(self): + fd, tmpfilename = tempfile.mkstemp() + _, tmpfilename2 = tempfile.mkstemp() + try: + fp = os.fdopen(fd, 'w+') + fp.write('''#!/bin/sh +# If stdin fails to get passed during one of the runs, make a note. +if ! grep -q foo +then + echo 'failure' > "$1" +fi +# If stdin has failed to get passed during this or a previous run, exit early. +if grep failure "$1" +then + exit 1 +fi +runs="$(cat $1)" +if [ -z "$runs" ] +then + runs=0 +fi +runs=$(($runs + 1)) +echo $runs > "$1" +exit 1 +''') + fp.close() + os.chmod(tmpfilename, 0o755) + self.assertRaises(processutils.ProcessExecutionError, + processutils.execute, + tmpfilename, tmpfilename2, attempts=10, + process_input=b'foo', + delay_on_retry=False) + fp = open(tmpfilename2, 'r') + runs = fp.read() + fp.close() + self.assertNotEqual(runs.strip(), 'failure', 'stdin did not ' + 'always get passed ' + 'correctly') + runs = int(runs.strip()) + self.assertEqual(runs, 10, 'Ran %d times instead of 10.' % (runs,)) + finally: + os.unlink(tmpfilename) + os.unlink(tmpfilename2) + + def test_unknown_kwargs_raises_error(self): + self.assertRaises(processutils.UnknownArgumentError, + processutils.execute, + '/usr/bin/env', 'true', + this_is_not_a_valid_kwarg=True) + + def test_check_exit_code_boolean(self): + processutils.execute('/usr/bin/env', 'false', check_exit_code=False) + self.assertRaises(processutils.ProcessExecutionError, + processutils.execute, + '/usr/bin/env', 'false', check_exit_code=True) + + def test_check_exit_code_list(self): + processutils.execute('/usr/bin/env', 'sh', '-c', 'exit 101', + check_exit_code=(101, 102)) + processutils.execute('/usr/bin/env', 'sh', '-c', 'exit 102', + check_exit_code=(101, 102)) + self.assertRaises(processutils.ProcessExecutionError, + processutils.execute, + '/usr/bin/env', 'sh', '-c', 'exit 103', + check_exit_code=(101, 102)) + self.assertRaises(processutils.ProcessExecutionError, + processutils.execute, + '/usr/bin/env', 'sh', '-c', 'exit 0', + check_exit_code=(101, 102)) + + def test_no_retry_on_success(self): + fd, tmpfilename = tempfile.mkstemp() + _, tmpfilename2 = tempfile.mkstemp() + try: + fp = os.fdopen(fd, 'w+') + fp.write("""#!/bin/sh +# If we've already run, bail out. +grep -q foo "$1" && exit 1 +# Mark that we've run before. +echo foo > "$1" +# Check that stdin gets passed correctly. +grep foo +""") + fp.close() + os.chmod(tmpfilename, 0o755) + processutils.execute(tmpfilename, + tmpfilename2, + process_input=b'foo', + attempts=2) + finally: + os.unlink(tmpfilename) + os.unlink(tmpfilename2) + + # This test and the one below ensures that when communicate raises + # an OSError, we do the right thing(s) + def test_exception_on_communicate_error(self): + mock = self.useFixture(mockpatch.Patch( + 'subprocess.Popen.communicate', + side_effect=OSError(errno.EAGAIN, 'fake-test'))) + + self.assertRaises(OSError, + processutils.execute, + '/usr/bin/env', + 'false', + check_exit_code=False) + + self.assertEqual(1, mock.mock.call_count) + + def test_retry_on_communicate_error(self): + mock = self.useFixture(mockpatch.Patch( + 'subprocess.Popen.communicate', + side_effect=OSError(errno.EAGAIN, 'fake-test'))) + + self.assertRaises(OSError, + processutils.execute, + '/usr/bin/env', + 'false', + check_exit_code=False, + attempts=5) + + self.assertEqual(5, mock.mock.call_count) + + def _test_and_check_logging_communicate_errors(self, log_errors=None, + attempts=None): + mock = self.useFixture(mockpatch.Patch( + 'subprocess.Popen.communicate', + side_effect=OSError(errno.EAGAIN, 'fake-test'))) + + fixture = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) + kwargs = {} + + if log_errors: + kwargs.update({"log_errors": log_errors}) + + if attempts: + kwargs.update({"attempts": attempts}) + + self.assertRaises(OSError, + processutils.execute, + '/usr/bin/env', + 'false', + **kwargs) + + self.assertEqual(attempts if attempts else 1, mock.mock.call_count) + self.assertIn('Got an OSError', fixture.output) + self.assertIn('errno: 11', fixture.output) + self.assertIn("'/usr/bin/env false'", fixture.output) + + def test_logging_on_communicate_error_1(self): + self._test_and_check_logging_communicate_errors( + log_errors=processutils.LOG_FINAL_ERROR, + attempts=None) + + def test_logging_on_communicate_error_2(self): + self._test_and_check_logging_communicate_errors( + log_errors=processutils.LOG_FINAL_ERROR, + attempts=1) + + def test_logging_on_communicate_error_3(self): + self._test_and_check_logging_communicate_errors( + log_errors=processutils.LOG_FINAL_ERROR, + attempts=5) + + def test_logging_on_communicate_error_4(self): + self._test_and_check_logging_communicate_errors( + log_errors=processutils.LOG_ALL_ERRORS, + attempts=None) + + def test_logging_on_communicate_error_5(self): + self._test_and_check_logging_communicate_errors( + log_errors=processutils.LOG_ALL_ERRORS, + attempts=1) + + def test_logging_on_communicate_error_6(self): + self._test_and_check_logging_communicate_errors( + log_errors=processutils.LOG_ALL_ERRORS, + attempts=5) + + def test_with_env_variables(self): + env_vars = {'SUPER_UNIQUE_VAR': 'The answer is 42'} + + out, err = processutils.execute('/usr/bin/env', env_variables=env_vars) + + self.assertIn(b'SUPER_UNIQUE_VAR=The answer is 42', out) + + def test_exception_and_masking(self): + tmpfilename = self.create_tempfiles( + [["test_exceptions_and_masking", + TEST_EXCEPTION_AND_MASKING_SCRIPT]], ext='bash')[0] + + os.chmod(tmpfilename, (stat.S_IRWXU | + stat.S_IRGRP | + stat.S_IXGRP | + stat.S_IROTH | + stat.S_IXOTH)) + + err = self.assertRaises(processutils.ProcessExecutionError, + processutils.execute, + tmpfilename, 'password="secret"', + 'something') + + self.assertEqual(38, err.exit_code) + self.assertIn('onstdout --password="***"', err.stdout) + self.assertIn('onstderr --password="***"', err.stderr) + self.assertEqual(err.cmd, ' '.join([tmpfilename, + 'password="***"', + 'something'])) + self.assertNotIn('secret', str(err)) + + +class ProcessExecutionErrorLoggingTest(test_base.BaseTestCase): + def setUp(self): + super(ProcessExecutionErrorLoggingTest, self).setUp() + self.tmpfilename = self.create_tempfiles( + [["process_execution_error_logging_test", + PROCESS_EXECUTION_ERROR_LOGGING_TEST]], + ext='bash')[0] + + os.chmod(self.tmpfilename, (stat.S_IRWXU | stat.S_IRGRP | + stat.S_IXGRP | stat.S_IROTH | + stat.S_IXOTH)) + + def _test_and_check(self, log_errors=None, attempts=None): + fixture = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) + kwargs = {} + + if log_errors: + kwargs.update({"log_errors": log_errors}) + + if attempts: + kwargs.update({"attempts": attempts}) + + err = self.assertRaises(processutils.ProcessExecutionError, + processutils.execute, + self.tmpfilename, + **kwargs) + + self.assertEqual(41, err.exit_code) + self.assertIn(self.tmpfilename, fixture.output) + + def test_with_invalid_log_errors(self): + self.assertRaises(processutils.InvalidArgumentError, + processutils.execute, + self.tmpfilename, + log_errors='invalid') + + def test_with_log_errors_NONE(self): + self._test_and_check(log_errors=None, attempts=None) + + def test_with_log_errors_final(self): + self._test_and_check(log_errors=processutils.LOG_FINAL_ERROR, + attempts=None) + + def test_with_log_errors_all(self): + self._test_and_check(log_errors=processutils.LOG_ALL_ERRORS, + attempts=None) + + def test_multiattempt_with_log_errors_NONE(self): + self._test_and_check(log_errors=None, attempts=3) + + def test_multiattempt_with_log_errors_final(self): + self._test_and_check(log_errors=processutils.LOG_FINAL_ERROR, + attempts=3) + + def test_multiattempt_with_log_errors_all(self): + self._test_and_check(log_errors=processutils.LOG_ALL_ERRORS, + attempts=3) + + +def fake_execute(*cmd, **kwargs): + return 'stdout', 'stderr' + + +def fake_execute_raises(*cmd, **kwargs): + raise processutils.ProcessExecutionError(exit_code=42, + stdout='stdout', + stderr='stderr', + cmd=['this', 'is', 'a', + 'command']) + + +class TryCmdTestCase(test_base.BaseTestCase): + def test_keep_warnings(self): + self.useFixture(fixtures.MonkeyPatch( + 'oslo_concurrency.processutils.execute', fake_execute)) + o, e = processutils.trycmd('this is a command'.split(' ')) + self.assertNotEqual('', o) + self.assertNotEqual('', e) + + def test_keep_warnings_from_raise(self): + self.useFixture(fixtures.MonkeyPatch( + 'oslo_concurrency.processutils.execute', fake_execute_raises)) + o, e = processutils.trycmd('this is a command'.split(' '), + discard_warnings=True) + self.assertIsNotNone(o) + self.assertNotEqual('', e) + + def test_discard_warnings(self): + self.useFixture(fixtures.MonkeyPatch( + 'oslo_concurrency.processutils.execute', fake_execute)) + o, e = processutils.trycmd('this is a command'.split(' '), + discard_warnings=True) + self.assertIsNotNone(o) + self.assertEqual('', e) + + +class FakeSshChannel(object): + def __init__(self, rc): + self.rc = rc + + def recv_exit_status(self): + return self.rc + + +class FakeSshStream(six.StringIO): + def setup_channel(self, rc): + self.channel = FakeSshChannel(rc) + + +class FakeSshConnection(object): + def __init__(self, rc): + self.rc = rc + + def exec_command(self, cmd): + stdout = FakeSshStream('stdout') + stdout.setup_channel(self.rc) + return (six.StringIO(), + stdout, + six.StringIO('stderr')) + + +class SshExecuteTestCase(test_base.BaseTestCase): + def test_invalid_addl_env(self): + self.assertRaises(processutils.InvalidArgumentError, + processutils.ssh_execute, + None, 'ls', addl_env='important') + + def test_invalid_process_input(self): + self.assertRaises(processutils.InvalidArgumentError, + processutils.ssh_execute, + None, 'ls', process_input='important') + + def test_works(self): + o, e = processutils.ssh_execute(FakeSshConnection(0), 'ls') + self.assertEqual('stdout', o) + self.assertEqual('stderr', e) + + def test_fails(self): + self.assertRaises(processutils.ProcessExecutionError, + processutils.ssh_execute, FakeSshConnection(1), 'ls') + + def _test_compromising_ssh(self, rc, check): + fixture = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) + fake_stdin = six.StringIO() + + fake_stdout = mock.Mock() + fake_stdout.channel.recv_exit_status.return_value = rc + fake_stdout.read.return_value = 'password="secret"' + + fake_stderr = six.StringIO('password="foobar"') + + command = 'ls --password="bar"' + + connection = mock.Mock() + connection.exec_command.return_value = (fake_stdin, fake_stdout, + fake_stderr) + + if check and rc != -1 and rc != 0: + err = self.assertRaises(processutils.ProcessExecutionError, + processutils.ssh_execute, + connection, command, + check_exit_code=check) + + self.assertEqual(rc, err.exit_code) + self.assertEqual(err.stdout, 'password="***"') + self.assertEqual(err.stderr, 'password="***"') + self.assertEqual(err.cmd, 'ls --password="***"') + self.assertNotIn('secret', str(err)) + self.assertNotIn('foobar', str(err)) + else: + o, e = processutils.ssh_execute(connection, command, + check_exit_code=check) + self.assertEqual('password="***"', o) + self.assertEqual('password="***"', e) + self.assertIn('password="***"', fixture.output) + self.assertNotIn('bar', fixture.output) + + def test_compromising_ssh1(self): + self._test_compromising_ssh(rc=-1, check=True) + + def test_compromising_ssh2(self): + self._test_compromising_ssh(rc=0, check=True) + + def test_compromising_ssh3(self): + self._test_compromising_ssh(rc=1, check=True) + + def test_compromising_ssh4(self): + self._test_compromising_ssh(rc=1, check=False) + + def test_compromising_ssh5(self): + self._test_compromising_ssh(rc=0, check=False) + + def test_compromising_ssh6(self): + self._test_compromising_ssh(rc=-1, check=False) diff --git a/tests/test_warning.py b/tests/test_warning.py new file mode 100644 index 0000000..ba1c4df --- /dev/null +++ b/tests/test_warning.py @@ -0,0 +1,29 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import imp + +import mock +from oslotest import base as test_base + + +class DeprecationWarningTest(test_base.BaseTestCase): + + @mock.patch('warnings.warn') + def test_warning(self, mock_warn): + import oslo.concurrency + imp.reload(oslo.concurrency) + self.assertTrue(mock_warn.called) + args = mock_warn.call_args + self.assertIn('oslo_concurrency', args[0][0]) + self.assertIn('deprecated', args[0][0]) + self.assertTrue(issubclass(args[0][1], DeprecationWarning)) diff --git a/tox.ini b/tox.ini index fca4498..d14cd1a 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] minversion = 1.6 -envlist = py26,py27,py33,py34,pypy,pep8 +envlist = py33,py34,py26,py27,pep8 # NOTE(dhellmann): We cannot set skipdist=True # for oslo libraries because of the namespace package. #skipsdist = True @@ -54,4 +54,4 @@ exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build [hacking] import_exceptions = - oslo.concurrency._i18n + oslo_concurrency._i18n From faa30f8f132cd61a19cbc101cdac284ad4909029 Mon Sep 17 00:00:00 2001 From: Davanum Srinivas Date: Mon, 17 Nov 2014 18:53:02 -0500 Subject: [PATCH 07/12] Flesh out the README Add details to the README on what can be done with the library Closes-Bug: #1391550 Change-Id: Ia5d34d4d23eaab663764aad0d877999a5414e8d0 --- README.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 6599157..5a0407a 100644 --- a/README.rst +++ b/README.rst @@ -2,7 +2,9 @@ oslo.concurrency ================== -oslo.concurrency library +Oslo concurrency library has utilities for safely running multi-thread, +multi-process applications using locking mechanisms and for running +external processes. * Free software: Apache license * Documentation: http://docs.openstack.org/developer/oslo.concurrency From 656f908f8b84341a5700fb2dc7d9d0250daf9f09 Mon Sep 17 00:00:00 2001 From: Andreas Jaeger Date: Wed, 19 Nov 2014 07:03:55 -0500 Subject: [PATCH 08/12] Move locale files to proper place The module is called oslo.concurrency and thus locale files must reside in oslo.concurrency/locale for the infra scripts to work - as configured in setup.cfg as well. Change-Id: Id39a495bb91dc0ad443a943e754aba62f4a55d6a --- .../locale/en_GB/LC_MESSAGES/oslo.concurrency-log-info.po | 0 .../locale/en_GB/LC_MESSAGES/oslo.concurrency.po | 0 .../locale/oslo.concurrency-log-critical.pot | 0 .../locale/oslo.concurrency-log-error.pot | 0 .../locale/oslo.concurrency-log-info.pot | 0 .../locale/oslo.concurrency-log-warning.pot | 0 .../locale/oslo.concurrency.pot | 0 7 files changed, 0 insertions(+), 0 deletions(-) rename {oslo_concurrency => oslo.concurrency}/locale/en_GB/LC_MESSAGES/oslo.concurrency-log-info.po (100%) rename {oslo_concurrency => oslo.concurrency}/locale/en_GB/LC_MESSAGES/oslo.concurrency.po (100%) rename {oslo_concurrency => oslo.concurrency}/locale/oslo.concurrency-log-critical.pot (100%) rename {oslo_concurrency => oslo.concurrency}/locale/oslo.concurrency-log-error.pot (100%) rename {oslo_concurrency => oslo.concurrency}/locale/oslo.concurrency-log-info.pot (100%) rename {oslo_concurrency => oslo.concurrency}/locale/oslo.concurrency-log-warning.pot (100%) rename {oslo_concurrency => oslo.concurrency}/locale/oslo.concurrency.pot (100%) diff --git a/oslo_concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency-log-info.po b/oslo.concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency-log-info.po similarity index 100% rename from oslo_concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency-log-info.po rename to oslo.concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency-log-info.po diff --git a/oslo_concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency.po b/oslo.concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency.po similarity index 100% rename from oslo_concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency.po rename to oslo.concurrency/locale/en_GB/LC_MESSAGES/oslo.concurrency.po diff --git a/oslo_concurrency/locale/oslo.concurrency-log-critical.pot b/oslo.concurrency/locale/oslo.concurrency-log-critical.pot similarity index 100% rename from oslo_concurrency/locale/oslo.concurrency-log-critical.pot rename to oslo.concurrency/locale/oslo.concurrency-log-critical.pot diff --git a/oslo_concurrency/locale/oslo.concurrency-log-error.pot b/oslo.concurrency/locale/oslo.concurrency-log-error.pot similarity index 100% rename from oslo_concurrency/locale/oslo.concurrency-log-error.pot rename to oslo.concurrency/locale/oslo.concurrency-log-error.pot diff --git a/oslo_concurrency/locale/oslo.concurrency-log-info.pot b/oslo.concurrency/locale/oslo.concurrency-log-info.pot similarity index 100% rename from oslo_concurrency/locale/oslo.concurrency-log-info.pot rename to oslo.concurrency/locale/oslo.concurrency-log-info.pot diff --git a/oslo_concurrency/locale/oslo.concurrency-log-warning.pot b/oslo.concurrency/locale/oslo.concurrency-log-warning.pot similarity index 100% rename from oslo_concurrency/locale/oslo.concurrency-log-warning.pot rename to oslo.concurrency/locale/oslo.concurrency-log-warning.pot diff --git a/oslo_concurrency/locale/oslo.concurrency.pot b/oslo.concurrency/locale/oslo.concurrency.pot similarity index 100% rename from oslo_concurrency/locale/oslo.concurrency.pot rename to oslo.concurrency/locale/oslo.concurrency.pot From 3bda65ccbb1a087dc4f5526c49799eae7d81c504 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 27 Oct 2014 14:06:37 -0700 Subject: [PATCH 09/12] Allow for providing a customized semaphore container It can be undesirable to have a globally shared sempahore container, especially since oslo.concurrency can now be shared among many disjoint applications and libraries. When a single container is used it is now possible to have those disjoint applications/libraries collide on the same sempahore names. This is not a good pattern to continue with, so in order to move away from it allow a custom container to be provided (which defaults to the existing global one) so that users of oslo.concurrency may provide there own container if they so desire. Change-Id: I9aab42e21ba0f52997de3e7c9b0fea51db5c7289 --- oslo_concurrency/lockutils.py | 70 ++++++++++++++----- oslo_concurrency/tests/unit/test_lockutils.py | 25 +++++++ 2 files changed, 78 insertions(+), 17 deletions(-) diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py index b87c01e..7812db8 100644 --- a/oslo_concurrency/lockutils.py +++ b/oslo_concurrency/lockutils.py @@ -185,8 +185,42 @@ else: import fcntl InterProcessLock = _FcntlLock -_semaphores = weakref.WeakValueDictionary() -_semaphores_lock = threading.Lock() + +class Semaphores(object): + """A garbage collected container of semaphores. + + This collection internally uses a weak value dictionary so that when a + semaphore is no longer in use (by any threads) it will automatically be + removed from this container by the garbage collector. + """ + + def __init__(self): + self._semaphores = weakref.WeakValueDictionary() + self._lock = threading.Lock() + + def get(self, name): + """Gets (or creates) a semaphore with a given name. + + :param name: The semaphore name to get/create (used to associate + previously created names with the same semaphore). + + Returns an newly constructed semaphore (or an existing one if it was + already created for the given name). + """ + with self._lock: + try: + return self._semaphores[name] + except KeyError: + sem = threading.Semaphore() + self._semaphores[name] = sem + return sem + + def __len__(self): + """Returns how many semaphores exist at the current time.""" + return len(self._semaphores) + + +_semaphores = Semaphores() def _get_lock_path(name, lock_file_prefix, lock_path=None): @@ -211,11 +245,12 @@ def external_lock(name, lock_file_prefix=None, lock_path=None): return InterProcessLock(lock_file_path) -def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None): +def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None, + semaphores=None): """Remove an external lock file when it's not used anymore This will be helpful when we have a lot of lock files """ - with internal_lock(name): + with internal_lock(name, semaphores=semaphores): lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path) try: os.remove(lock_file_path) @@ -224,20 +259,15 @@ def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None): {'file': lock_file_path}) -def internal_lock(name): - with _semaphores_lock: - try: - sem = _semaphores[name] - except KeyError: - sem = threading.Semaphore() - _semaphores[name] = sem - - return sem +def internal_lock(name, semaphores=None): + if semaphores is None: + semaphores = _semaphores + return semaphores.get(name) @contextlib.contextmanager def lock(name, lock_file_prefix=None, external=False, lock_path=None, - do_log=True): + do_log=True, semaphores=None): """Context based lock This function yields a `threading.Semaphore` instance (if we don't use @@ -259,8 +289,13 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, :param do_log: Whether to log acquire/release messages. This is primarily intended to reduce log message duplication when `lock` is used from the `synchronized` decorator. + + :param semaphores: Container that provides semaphores to use when locking. + This ensures that threads inside the same application can not collide, + due to the fact that external process locks are unaware of a processes + active threads. """ - int_lock = internal_lock(name) + int_lock = internal_lock(name, semaphores=semaphores) with int_lock: if do_log: LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name}) @@ -276,7 +311,8 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name}) -def synchronized(name, lock_file_prefix=None, external=False, lock_path=None): +def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, + semaphores=None): """Synchronization decorator. Decorating a method like so:: @@ -307,7 +343,7 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None): t2 = None try: with lock(name, lock_file_prefix, external, lock_path, - do_log=False): + do_log=False, semaphores=semaphores): t2 = time.time() LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: ' 'waited %(wait_secs)0.3fs', diff --git a/oslo_concurrency/tests/unit/test_lockutils.py b/oslo_concurrency/tests/unit/test_lockutils.py index 96b9cdb..f665ebd 100644 --- a/oslo_concurrency/tests/unit/test_lockutils.py +++ b/oslo_concurrency/tests/unit/test_lockutils.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import errno import fcntl import multiprocessing @@ -93,6 +94,30 @@ class LockTestCase(test_base.BaseTestCase): except IOError: pass + def test_lock_internally_different_collections(self): + s1 = lockutils.Semaphores() + s2 = lockutils.Semaphores() + trigger = threading.Event() + who_ran = collections.deque() + + def f(name, semaphores, pull_trigger): + with lockutils.internal_lock('testing', semaphores=semaphores): + if pull_trigger: + trigger.set() + else: + trigger.wait() + who_ran.append(name) + + threads = [ + threading.Thread(target=f, args=(1, s1, True)), + threading.Thread(target=f, args=(2, s2, False)), + ] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + self.assertEqual([1, 2], sorted(who_ran)) + def test_lock_internally(self): """We can lock across multiple threads.""" saved_sem_num = len(lockutils._semaphores) From 46c836ee28aff5bfa57598971cd175a0be0951d9 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 24 Oct 2014 11:29:44 -0700 Subject: [PATCH 10/12] Allow the lock delay to be provided When a lock can't be acquired there is currently a hard coded delay (0.01) that is used before trying again, instead of having a hard coded delay we should allow this delay to be configured since having it set at a hard coded value can limit concurrency (if the delay is actually way to high) or cause to much contention (if the delay is actually way to low). This review adds on that logic and also uses the retrying library to perform the acquisition attempts (and associated failures when/if they occur); as well as shows logs after a given amount of time has elapsed with the logs being output at a given periodicity. Change-Id: Ideeefba1439ddd677c608d01becb4f6a0d4bc83d --- oslo_concurrency/lockutils.py | 138 ++++++++++++++++++++++++++-------- requirements-py3.txt | 1 + requirements.txt | 1 + 3 files changed, 109 insertions(+), 31 deletions(-) diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py index 7812db8..ae1c498 100644 --- a/oslo_concurrency/lockutils.py +++ b/oslo_concurrency/lockutils.py @@ -28,6 +28,7 @@ import weakref from oslo.config import cfg from oslo.config import cfgfilter +import retrying import six from oslo_concurrency._i18n import _, _LE, _LI @@ -64,6 +65,86 @@ def set_defaults(lock_path): cfg.set_defaults(_opts, lock_path=lock_path) +class _Hourglass(object): + """A hourglass like periodic timer.""" + + def __init__(self, period): + self._period = period + self._last_flipped = None + + def flip(self): + """Flips the hourglass. + + The drain() method will now only return true until the period + is reached again. + """ + self._last_flipped = time.time() + + def drain(self): + """Drains the hourglass, returns True if period reached.""" + if self._last_flipped is None: + return True + else: + elapsed = max(0, time.time() - self._last_flipped) + return elapsed >= self._period + + +def _lock_retry(delay, filename, + # These parameters trigger logging to begin after a certain + # amount of time has elapsed where the lock couldn't be + # acquired (log statements will be emitted after that duration + # at the provided periodicity). + log_begins_after=1.0, log_periodicity=0.5): + """Retry logic that acquiring a lock will go through.""" + + # If this returns True, a retry attempt will occur (using the defined + # retry policy we have requested the retrying library to apply), if it + # returns False then the original exception will be re-raised (if it + # raises a new or different exception the original exception will be + # replaced with that one and raised). + def retry_on_exception(e): + if isinstance(e, IOError) and e.errno in (errno.EACCES, errno.EAGAIN): + return True + raise threading.ThreadError(_("Unable to acquire lock on" + " `%(filename)s` due to" + " %(exception)s") % + { + 'filename': filename, + 'exception': e, + }) + + # Logs all attempts (with information about how long we have been trying + # to acquire the underlying lock...); after a threshold has been passed, + # and only at a fixed rate... + def never_stop(hg, attempt_number, delay_since_first_attempt_ms): + delay_since_first_attempt = delay_since_first_attempt_ms / 1000.0 + if delay_since_first_attempt >= log_begins_after: + if hg.drain(): + LOG.debug("Attempting to acquire %s (delayed %0.2f seconds)", + filename, delay_since_first_attempt) + hg.flip() + return False + + # The retrying library seems to prefer milliseconds for some reason; this + # might be changed in (see: https://github.com/rholder/retrying/issues/6) + # someday in the future... + delay_ms = delay * 1000.0 + + def decorator(func): + + @six.wraps(func) + def wrapper(*args, **kwargs): + hg = _Hourglass(log_periodicity) + r = retrying.Retrying(wait_fixed=delay_ms, + retry_on_exception=retry_on_exception, + stop_func=functools.partial(never_stop, hg)) + return r.call(func, *args, **kwargs) + + return wrapper + + return decorator + + class _FileLock(object): """Lock implementation which allows multiple locks, working around issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does @@ -87,9 +168,11 @@ class _FileLock(object): self.fname = name self.acquire_time = None - def acquire(self): - basedir = os.path.dirname(self.fname) + def acquire(self, delay=0.01): + if delay < 0: + raise ValueError("Delay must be greater than or equal to zero") + basedir = os.path.dirname(self.fname) if not os.path.exists(basedir): fileutils.ensure_tree(basedir) LOG.info(_LI('Created lock path: %s'), basedir) @@ -98,32 +181,20 @@ class _FileLock(object): # the target file. This eliminates the possibility of an attacker # creating a symlink to an important file in our lock_path. self.lockfile = open(self.fname, 'a') - start_time = time.time() - while True: - try: - # Using non-blocking locks since green threads are not - # patched to deal with blocking locking calls. - # Also upon reading the MSDN docs for locking(), it seems - # to have a laughable 10 attempts "blocking" mechanism. - self.trylock() - self.acquire_time = time.time() - LOG.debug('Acquired file lock "%s" after waiting %0.3fs', - self.fname, (self.acquire_time - start_time)) - return True - except IOError as e: - if e.errno in (errno.EACCES, errno.EAGAIN): - # external locks synchronise things like iptables - # updates - give it some time to prevent busy spinning - time.sleep(0.01) - else: - raise threading.ThreadError(_("Unable to acquire lock on" - " `%(filename)s` due to" - " %(exception)s") % - { - 'filename': self.fname, - 'exception': e, - }) + + # Using non-blocking locks (with retries) since green threads are not + # patched to deal with blocking locking calls. Also upon reading the + # MSDN docs for locking(), it seems to have a 'laughable' 10 + # attempts "blocking" mechanism. + do_acquire = _lock_retry(delay=delay, + filename=self.fname)(self.trylock) + do_acquire() + self.acquire_time = time.time() + LOG.debug('Acquired file lock "%s" after waiting %0.3fs', + self.fname, (self.acquire_time - start_time)) + + return True def __enter__(self): self.acquire() @@ -267,7 +338,7 @@ def internal_lock(name, semaphores=None): @contextlib.contextmanager def lock(name, lock_file_prefix=None, external=False, lock_path=None, - do_log=True, semaphores=None): + do_log=True, semaphores=None, delay=0.01): """Context based lock This function yields a `threading.Semaphore` instance (if we don't use @@ -294,6 +365,8 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, This ensures that threads inside the same application can not collide, due to the fact that external process locks are unaware of a processes active threads. + + :param delay: Delay between acquisition attempts (in seconds). """ int_lock = internal_lock(name, semaphores=semaphores) with int_lock: @@ -302,8 +375,11 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, try: if external and not CONF.oslo_concurrency.disable_process_locking: ext_lock = external_lock(name, lock_file_prefix, lock_path) - with ext_lock: + ext_lock.acquire(delay=delay) + try: yield ext_lock + finally: + ext_lock.release() else: yield int_lock finally: @@ -312,7 +388,7 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, - semaphores=None): + semaphores=None, delay=0.01): """Synchronization decorator. Decorating a method like so:: @@ -343,7 +419,7 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, t2 = None try: with lock(name, lock_file_prefix, external, lock_path, - do_log=False, semaphores=semaphores): + do_log=False, semaphores=semaphores, delay=delay): t2 = time.time() LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: ' 'waited %(wait_secs)0.3fs', diff --git a/requirements-py3.txt b/requirements-py3.txt index b1a8722..a27b434 100644 --- a/requirements-py3.txt +++ b/requirements-py3.txt @@ -11,3 +11,4 @@ oslo.i18n>=1.0.0 # Apache-2.0 oslo.utils>=1.0.0 # Apache-2.0 posix_ipc six>=1.7.0 +retrying>=1.2.2,!=1.3.0 # Apache-2.0 diff --git a/requirements.txt b/requirements.txt index b1a8722..a27b434 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ oslo.i18n>=1.0.0 # Apache-2.0 oslo.utils>=1.0.0 # Apache-2.0 posix_ipc six>=1.7.0 +retrying>=1.2.2,!=1.3.0 # Apache-2.0 From 19f07c600c1c5f142c06b8261360e230b97b2c69 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 25 Nov 2014 14:09:06 -0800 Subject: [PATCH 11/12] Add a TODO for retrying pull request #20 Sometime in a future retrying version there will likely/hopefully exist a ability to more easily filter on specific exceptions; this commit adds a note to the upstream pull request that may add this feature. Change-Id: I44f1c9fe4fbbd5f77032ca7bfff4e2e6b7ff7622 --- oslo_concurrency/lockutils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py index ae1c498..e9fd4d7 100644 --- a/oslo_concurrency/lockutils.py +++ b/oslo_concurrency/lockutils.py @@ -103,6 +103,9 @@ def _lock_retry(delay, filename, # raises a new or different exception the original exception will be # replaced with that one and raised). def retry_on_exception(e): + # TODO(harlowja): once/if https://github.com/rholder/retrying/pull/20 + # gets merged we should just switch to using that to avoid having to + # catch and inspect all execeptions (and there types...) if isinstance(e, IOError) and e.errno in (errno.EACCES, errno.EAGAIN): return True raise threading.ThreadError(_("Unable to acquire lock on" From 54c84da50a7dcda327c01047846294c77516ca38 Mon Sep 17 00:00:00 2001 From: Ben Nemec Date: Tue, 28 Oct 2014 17:43:30 +0000 Subject: [PATCH 12/12] Add external lock fixture This was requested by consumers of the library so they don't have to enable external locks globally with the OSLO_LOCK_PATH env var, which can mask bugs in unit tests that have interdependencies because it makes every lock in any unit test shared. The new fixture allows a separate lock directory to be created for each test, and allows external locking to only be enabled for tests that need it. Change-Id: Iae7ce302e1a3a5ad90ca5310f5ac7a6164867637 --- oslo_concurrency/fixture/lockutils.py | 25 +++++++++++++++++++++++++ tests/test_lockutils.py | 24 ++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/oslo_concurrency/fixture/lockutils.py b/oslo_concurrency/fixture/lockutils.py index 01cec72..9639e40 100644 --- a/oslo_concurrency/fixture/lockutils.py +++ b/oslo_concurrency/fixture/lockutils.py @@ -14,6 +14,7 @@ # under the License. import fixtures +from oslo.config import fixture as config from oslo_concurrency import lockutils @@ -49,3 +50,27 @@ class LockFixture(fixtures.Fixture): super(LockFixture, self).setUp() self.addCleanup(self.mgr.__exit__, None, None, None) self.lock = self.mgr.__enter__() + + +class ExternalLockFixture(fixtures.Fixture): + """Configure lock_path so external locks can be used in unit tests. + + Creates a temporary directory to hold file locks and sets the oslo.config + lock_path opt to use it. This can be used to enable external locking + on a per-test basis, rather than globally with the OSLO_LOCK_PATH + environment variable. + + Example:: + + def test_method(self): + self.useFixture(ExternalLockFixture()) + something_that_needs_external_locks() + + Alternatively, the useFixture call could be placed in a test class's + setUp method to provide this functionality to all tests in the class. + """ + def setUp(self): + super(ExternalLockFixture, self).setUp() + temp_dir = self.useFixture(fixtures.TempDir()) + conf = self.useFixture(config.Config(lockutils.CONF)).config + conf(lock_path=temp_dir.path, group='oslo_concurrency') diff --git a/tests/test_lockutils.py b/tests/test_lockutils.py index 0bde4a8..1788be2 100644 --- a/tests/test_lockutils.py +++ b/tests/test_lockutils.py @@ -549,3 +549,27 @@ class TestLockFixture(test_base.BaseTestCase): fixture = fixtures.LockFixture('test-lock') self.useFixture(fixture) self.lock = fixture.lock + + +class TestExternalLockFixture(test_base.BaseTestCase): + def test_fixture(self): + # NOTE(bnemec): This test case is only valid if lockutils-wrapper is + # _not_ in use. Otherwise lock_path will be set on lockutils import + # and this test will pass regardless of whether the fixture is used. + self.useFixture(fixtures.ExternalLockFixture()) + # This will raise an exception if lock_path is not set + with lockutils.external_lock('foo'): + pass + + def test_with_existing_config_fixture(self): + # Make sure the config fixture in the ExternalLockFixture doesn't + # cause any issues for tests using their own config fixture. + conf = self.useFixture(config.Config()) + self.useFixture(fixtures.ExternalLockFixture()) + with lockutils.external_lock('bar'): + conf.register_opt(cfg.StrOpt('foo')) + conf.config(foo='bar') + self.assertEqual(cfg.CONF.foo, 'bar') + # Due to config filter, lock_path should still not be present in + # the global config opt. + self.assertFalse(hasattr(cfg.CONF, 'lock_path'))