diff --git a/etc/sahara/sahara.conf.sample b/etc/sahara/sahara.conf.sample index 1f39673e..ffa8a931 100644 --- a/etc/sahara/sahara.conf.sample +++ b/etc/sahara/sahara.conf.sample @@ -205,9 +205,6 @@ # List of logger=LEVEL pairs. (list value) #default_log_levels = amqplib=WARN,qpid.messaging=INFO,stevedore=INFO,eventlet.wsgi.server=WARN,sqlalchemy=WARN,boto=WARN,suds=INFO,keystone=INFO,paramiko=WARN,requests=WARN,iso8601=WARN -# Enables or disables inter-process locks. (boolean value) -#disable_process_locking = false - # Enables data locality for hadoop cluster. Also # enables data locality for Swift used by hadoop. If # enabled, 'compute_topology' and 'swift_topology' @@ -250,9 +247,6 @@ # cancel job execution during this time. (integer value) #job_canceling_timeout = 300 -# Directory to use for lock files. (string value) -#lock_path = - # The name of a logging configuration file. This file is appended to # any existing logging configuration files. For details about logging # configuration files, see the Python logging module documentation. @@ -762,6 +756,24 @@ #ringfile = /etc/oslo/matchmaker_ring.json +[oslo_concurrency] + +# +# From oslo.concurrency +# + +# Enables or disables inter-process locks. (boolean value) +# Deprecated group/name - [DEFAULT]/disable_process_locking +#disable_process_locking = false + +# Directory to use for lock files. For security, the specified +# directory should only be writable by the user running the processes +# that need locking. Defaults to environment variable OSLO_LOCK_PATH. +# If external locks are used, a lock path must be set. (string value) +# Deprecated group/name - [DEFAULT]/lock_path +#lock_path = + + [oslo_messaging_amqp] # diff --git a/openstack-common.conf b/openstack-common.conf index 5d28a698..d0844d8c 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,11 +1,9 @@ [DEFAULT] # The list of modules to copy from oslo-incubator -module=lockutils module=log module=periodic_task module=policy -module=processutils module=threadgroup module=uuidutils diff --git a/requirements.txt b/requirements.txt index f9e27af4..83fb8e6f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,7 @@ jsonschema>=2.0.0,<3.0.0 keystonemiddleware>=1.0.0 lockfile>=0.8 oslo.config>=1.4.0 # Apache-2.0 +oslo.concurrency>=0.3.0 # Apache-2.0 oslo.context>=0.1.0 # Apache-2.0 oslo.db>=1.1.0 # Apache-2.0 oslo.i18n>=1.0.0 # Apache-2.0 diff --git a/sahara/config.py b/sahara/config.py index 3558f4ae..79b2226b 100644 --- a/sahara/config.py +++ b/sahara/config.py @@ -19,7 +19,6 @@ from oslo.config import cfg from sahara import exceptions as ex from sahara.i18n import _ -from sahara.openstack.common import lockutils from sahara.openstack.common import log from sahara.openstack.common import periodic_task from sahara.openstack.common import policy @@ -124,7 +123,6 @@ def list_opts(): edp_opts, networking_opts, db_opts, - lockutils.util_opts, policy.policy_opts, log.common_cli_opts, log.generic_log_opts, diff --git a/sahara/openstack/common/lockutils.py b/sahara/openstack/common/lockutils.py deleted file mode 100644 index 720fcd0a..00000000 --- a/sahara/openstack/common/lockutils.py +++ /dev/null @@ -1,326 +0,0 @@ -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import contextlib -import errno -import functools -import logging -import os -import shutil -import subprocess -import sys -import tempfile -import threading -import time -import weakref - -from oslo.config import cfg - -from sahara.openstack.common import fileutils -from sahara.openstack.common._i18n import _, _LE, _LI - - -LOG = logging.getLogger(__name__) - - -util_opts = [ - cfg.BoolOpt('disable_process_locking', default=False, - help='Enables or disables inter-process locks.'), - cfg.StrOpt('lock_path', - default=os.environ.get("SAHARA_LOCK_PATH"), - help='Directory to use for lock files.') -] - - -CONF = cfg.CONF -CONF.register_opts(util_opts) - - -def set_defaults(lock_path): - cfg.set_defaults(util_opts, lock_path=lock_path) - - -class _FileLock(object): - """Lock implementation which allows multiple locks, working around - issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does - not require any cleanup. Since the lock is always held on a file - descriptor rather than outside of the process, the lock gets dropped - automatically if the process crashes, even if __exit__ is not executed. - - There are no guarantees regarding usage by multiple green threads in a - single process here. This lock works only between processes. Exclusive - access between local threads should be achieved using the semaphores - in the @synchronized decorator. - - Note these locks are released when the descriptor is closed, so it's not - safe to close the file descriptor while another green thread holds the - lock. Just opening and closing the lock file can break synchronisation, - so lock files must be accessed only using this abstraction. - """ - - def __init__(self, name): - self.lockfile = None - self.fname = name - - def acquire(self): - basedir = os.path.dirname(self.fname) - - if not os.path.exists(basedir): - fileutils.ensure_tree(basedir) - LOG.info(_LI('Created lock path: %s'), basedir) - - self.lockfile = open(self.fname, 'w') - - while True: - try: - # Using non-blocking locks since green threads are not - # patched to deal with blocking locking calls. - # Also upon reading the MSDN docs for locking(), it seems - # to have a laughable 10 attempts "blocking" mechanism. - self.trylock() - LOG.debug('Got file lock "%s"', self.fname) - return True - except IOError as e: - if e.errno in (errno.EACCES, errno.EAGAIN): - # external locks synchronise things like iptables - # updates - give it some time to prevent busy spinning - time.sleep(0.01) - else: - raise threading.ThreadError(_("Unable to acquire lock on" - " `%(filename)s` due to" - " %(exception)s") % - {'filename': self.fname, - 'exception': e}) - - def __enter__(self): - self.acquire() - return self - - def release(self): - try: - self.unlock() - self.lockfile.close() - LOG.debug('Released file lock "%s"', self.fname) - except IOError: - LOG.exception(_LE("Could not release the acquired lock `%s`"), - self.fname) - - def __exit__(self, exc_type, exc_val, exc_tb): - self.release() - - def exists(self): - return os.path.exists(self.fname) - - def trylock(self): - raise NotImplementedError() - - def unlock(self): - raise NotImplementedError() - - -class _WindowsLock(_FileLock): - def trylock(self): - msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1) - - def unlock(self): - msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1) - - -class _FcntlLock(_FileLock): - def trylock(self): - fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) - - def unlock(self): - fcntl.lockf(self.lockfile, fcntl.LOCK_UN) - - -if os.name == 'nt': - import msvcrt - InterProcessLock = _WindowsLock -else: - import fcntl - InterProcessLock = _FcntlLock - -_semaphores = weakref.WeakValueDictionary() -_semaphores_lock = threading.Lock() - - -def _get_lock_path(name, lock_file_prefix, lock_path=None): - # NOTE(mikal): the lock name cannot contain directory - # separators - name = name.replace(os.sep, '_') - if lock_file_prefix: - sep = '' if lock_file_prefix.endswith('-') else '-' - name = '%s%s%s' % (lock_file_prefix, sep, name) - - local_lock_path = lock_path or CONF.lock_path - - if not local_lock_path: - raise cfg.RequiredOptError('lock_path') - - return os.path.join(local_lock_path, name) - - -def external_lock(name, lock_file_prefix=None, lock_path=None): - LOG.debug('Attempting to grab external lock "%(lock)s"', - {'lock': name}) - - lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path) - - return InterProcessLock(lock_file_path) - - -def remove_external_lock_file(name, lock_file_prefix=None): - """Remove an external lock file when it's not used anymore - This will be helpful when we have a lot of lock files - """ - with internal_lock(name): - lock_file_path = _get_lock_path(name, lock_file_prefix) - try: - os.remove(lock_file_path) - except OSError: - LOG.info(_LI('Failed to remove file %(file)s'), - {'file': lock_file_path}) - - -def internal_lock(name): - with _semaphores_lock: - try: - sem = _semaphores[name] - LOG.debug('Using existing semaphore "%s"', name) - except KeyError: - sem = threading.Semaphore() - _semaphores[name] = sem - LOG.debug('Created new semaphore "%s"', name) - - return sem - - -@contextlib.contextmanager -def lock(name, lock_file_prefix=None, external=False, lock_path=None): - """Context based lock - - This function yields a `threading.Semaphore` instance (if we don't use - eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is - True, in which case, it'll yield an InterProcessLock instance. - - :param lock_file_prefix: The lock_file_prefix argument is used to provide - lock files on disk with a meaningful prefix. - - :param external: The external keyword argument denotes whether this lock - should work across multiple processes. This means that if two different - workers both run a method decorated with @synchronized('mylock', - external=True), only one of them will execute at a time. - """ - int_lock = internal_lock(name) - with int_lock: - LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name}) - try: - if external and not CONF.disable_process_locking: - ext_lock = external_lock(name, lock_file_prefix, lock_path) - with ext_lock: - yield ext_lock - else: - yield int_lock - finally: - LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name}) - - -def synchronized(name, lock_file_prefix=None, external=False, lock_path=None): - """Synchronization decorator. - - Decorating a method like so:: - - @synchronized('mylock') - def foo(self, *args): - ... - - ensures that only one thread will execute the foo method at a time. - - Different methods can share the same lock:: - - @synchronized('mylock') - def foo(self, *args): - ... - - @synchronized('mylock') - def bar(self, *args): - ... - - This way only one of either foo or bar can be executing at a time. - """ - - def wrap(f): - @functools.wraps(f) - def inner(*args, **kwargs): - try: - with lock(name, lock_file_prefix, external, lock_path): - LOG.debug('Got semaphore / lock "%(function)s"', - {'function': f.__name__}) - return f(*args, **kwargs) - finally: - LOG.debug('Semaphore / lock released "%(function)s"', - {'function': f.__name__}) - return inner - return wrap - - -def synchronized_with_prefix(lock_file_prefix): - """Partial object generator for the synchronization decorator. - - Redefine @synchronized in each project like so:: - - (in nova/utils.py) - from nova.openstack.common import lockutils - - synchronized = lockutils.synchronized_with_prefix('nova-') - - - (in nova/foo.py) - from nova import utils - - @utils.synchronized('mylock') - def bar(self, *args): - ... - - The lock_file_prefix argument is used to provide lock files on disk with a - meaningful prefix. - """ - - return functools.partial(synchronized, lock_file_prefix=lock_file_prefix) - - -def main(argv): - """Create a dir for locks and pass it to command from arguments - - If you run this: - python -m openstack.common.lockutils python setup.py testr - - a temporary directory will be created for all your locks and passed to all - your tests in an environment variable. The temporary dir will be deleted - afterwards and the return value will be preserved. - """ - - lock_dir = tempfile.mkdtemp() - os.environ["SAHARA_LOCK_PATH"] = lock_dir - try: - ret_val = subprocess.call(argv[1:]) - finally: - shutil.rmtree(lock_dir, ignore_errors=True) - return ret_val - - -if __name__ == '__main__': - sys.exit(main(sys.argv)) diff --git a/sahara/openstack/common/processutils.py b/sahara/openstack/common/processutils.py deleted file mode 100644 index f6332fa0..00000000 --- a/sahara/openstack/common/processutils.py +++ /dev/null @@ -1,289 +0,0 @@ -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -System-level utilities and helper functions. -""" - -import errno -import logging -import multiprocessing -import os -import random -import shlex -import signal - -from eventlet.green import subprocess -from eventlet import greenthread -from oslo.utils import strutils -import six - -from sahara.openstack.common._i18n import _ - - -LOG = logging.getLogger(__name__) - - -class InvalidArgumentError(Exception): - def __init__(self, message=None): - super(InvalidArgumentError, self).__init__(message) - - -class UnknownArgumentError(Exception): - def __init__(self, message=None): - super(UnknownArgumentError, self).__init__(message) - - -class ProcessExecutionError(Exception): - def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None, - description=None): - self.exit_code = exit_code - self.stderr = stderr - self.stdout = stdout - self.cmd = cmd - self.description = description - - if description is None: - description = _("Unexpected error while running command.") - if exit_code is None: - exit_code = '-' - message = _('%(description)s\n' - 'Command: %(cmd)s\n' - 'Exit code: %(exit_code)s\n' - 'Stdout: %(stdout)r\n' - 'Stderr: %(stderr)r') % {'description': description, - 'cmd': cmd, - 'exit_code': exit_code, - 'stdout': stdout, - 'stderr': stderr} - super(ProcessExecutionError, self).__init__(message) - - -class NoRootWrapSpecified(Exception): - def __init__(self, message=None): - super(NoRootWrapSpecified, self).__init__(message) - - -def _subprocess_setup(): - # Python installs a SIGPIPE handler by default. This is usually not what - # non-Python subprocesses expect. - signal.signal(signal.SIGPIPE, signal.SIG_DFL) - - -def execute(*cmd, **kwargs): - """Helper method to shell out and execute a command through subprocess. - - Allows optional retry. - - :param cmd: Passed to subprocess.Popen. - :type cmd: string - :param process_input: Send to opened process. - :type process_input: string - :param env_variables: Environment variables and their values that - will be set for the process. - :type env_variables: dict - :param check_exit_code: Single bool, int, or list of allowed exit - codes. Defaults to [0]. Raise - :class:`ProcessExecutionError` unless - program exits with one of these code. - :type check_exit_code: boolean, int, or [int] - :param delay_on_retry: True | False. Defaults to True. If set to True, - wait a short amount of time before retrying. - :type delay_on_retry: boolean - :param attempts: How many times to retry cmd. - :type attempts: int - :param run_as_root: True | False. Defaults to False. If set to True, - the command is prefixed by the command specified - in the root_helper kwarg. - :type run_as_root: boolean - :param root_helper: command to prefix to commands called with - run_as_root=True - :type root_helper: string - :param shell: whether or not there should be a shell used to - execute this command. Defaults to false. - :type shell: boolean - :param loglevel: log level for execute commands. - :type loglevel: int. (Should be logging.DEBUG or logging.INFO) - :returns: (stdout, stderr) from process execution - :raises: :class:`UnknownArgumentError` on - receiving unknown arguments - :raises: :class:`ProcessExecutionError` - """ - - process_input = kwargs.pop('process_input', None) - env_variables = kwargs.pop('env_variables', None) - check_exit_code = kwargs.pop('check_exit_code', [0]) - ignore_exit_code = False - delay_on_retry = kwargs.pop('delay_on_retry', True) - attempts = kwargs.pop('attempts', 1) - run_as_root = kwargs.pop('run_as_root', False) - root_helper = kwargs.pop('root_helper', '') - shell = kwargs.pop('shell', False) - loglevel = kwargs.pop('loglevel', logging.DEBUG) - - if isinstance(check_exit_code, bool): - ignore_exit_code = not check_exit_code - check_exit_code = [0] - elif isinstance(check_exit_code, int): - check_exit_code = [check_exit_code] - - if kwargs: - raise UnknownArgumentError(_('Got unknown keyword args: %r') % kwargs) - - if run_as_root and hasattr(os, 'geteuid') and os.geteuid() != 0: - if not root_helper: - raise NoRootWrapSpecified( - message=_('Command requested root, but did not ' - 'specify a root helper.')) - cmd = shlex.split(root_helper) + list(cmd) - - cmd = map(str, cmd) - sanitized_cmd = strutils.mask_password(' '.join(cmd)) - - while attempts > 0: - attempts -= 1 - try: - LOG.log(loglevel, _('Running cmd (subprocess): %s'), sanitized_cmd) - _PIPE = subprocess.PIPE # pylint: disable=E1101 - - if os.name == 'nt': - preexec_fn = None - close_fds = False - else: - preexec_fn = _subprocess_setup - close_fds = True - - obj = subprocess.Popen(cmd, - stdin=_PIPE, - stdout=_PIPE, - stderr=_PIPE, - close_fds=close_fds, - preexec_fn=preexec_fn, - shell=shell, - env=env_variables) - result = None - for _i in six.moves.range(20): - # NOTE(russellb) 20 is an arbitrary number of retries to - # prevent any chance of looping forever here. - try: - if process_input is not None: - result = obj.communicate(process_input) - else: - result = obj.communicate() - except OSError as e: - if e.errno in (errno.EAGAIN, errno.EINTR): - continue - raise - break - obj.stdin.close() # pylint: disable=E1101 - _returncode = obj.returncode # pylint: disable=E1101 - LOG.log(loglevel, 'Result was %s' % _returncode) - if not ignore_exit_code and _returncode not in check_exit_code: - (stdout, stderr) = result - sanitized_stdout = strutils.mask_password(stdout) - sanitized_stderr = strutils.mask_password(stderr) - raise ProcessExecutionError(exit_code=_returncode, - stdout=sanitized_stdout, - stderr=sanitized_stderr, - cmd=sanitized_cmd) - return result - except ProcessExecutionError: - if not attempts: - raise - else: - LOG.log(loglevel, _('%r failed. Retrying.'), sanitized_cmd) - if delay_on_retry: - greenthread.sleep(random.randint(20, 200) / 100.0) - finally: - # NOTE(termie): this appears to be necessary to let the subprocess - # call clean something up in between calls, without - # it two execute calls in a row hangs the second one - greenthread.sleep(0) - - -def trycmd(*args, **kwargs): - """A wrapper around execute() to more easily handle warnings and errors. - - Returns an (out, err) tuple of strings containing the output of - the command's stdout and stderr. If 'err' is not empty then the - command can be considered to have failed. - - :discard_warnings True | False. Defaults to False. If set to True, - then for succeeding commands, stderr is cleared - - """ - discard_warnings = kwargs.pop('discard_warnings', False) - - try: - out, err = execute(*args, **kwargs) - failed = False - except ProcessExecutionError as exn: - out, err = '', six.text_type(exn) - failed = True - - if not failed and discard_warnings and err: - # Handle commands that output to stderr but otherwise succeed - err = '' - - return out, err - - -def ssh_execute(ssh, cmd, process_input=None, - addl_env=None, check_exit_code=True): - sanitized_cmd = strutils.mask_password(cmd) - LOG.debug('Running cmd (SSH): %s', sanitized_cmd) - if addl_env: - raise InvalidArgumentError(_('Environment not supported over SSH')) - - if process_input: - # This is (probably) fixable if we need it... - raise InvalidArgumentError(_('process_input not supported over SSH')) - - stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd) - channel = stdout_stream.channel - - # NOTE(justinsb): This seems suspicious... - # ...other SSH clients have buffering issues with this approach - stdout = stdout_stream.read() - sanitized_stdout = strutils.mask_password(stdout) - stderr = stderr_stream.read() - sanitized_stderr = strutils.mask_password(stderr) - - stdin_stream.close() - - exit_status = channel.recv_exit_status() - - # exit_status == -1 if no exit code was returned - if exit_status != -1: - LOG.debug('Result was %s' % exit_status) - if check_exit_code and exit_status != 0: - raise ProcessExecutionError(exit_code=exit_status, - stdout=sanitized_stdout, - stderr=sanitized_stderr, - cmd=sanitized_cmd) - - return (sanitized_stdout, sanitized_stderr) - - -def get_worker_count(): - """Utility to get the default worker count. - - @return: The number of CPUs if that can be determined, else a default - worker count of 1 is returned. - """ - try: - return multiprocessing.cpu_count() - except NotImplementedError: - return 1 diff --git a/sahara/tests/unit/db/migration/test_migrations_base.py b/sahara/tests/unit/db/migration/test_migrations_base.py index 8348ad9f..95b05296 100644 --- a/sahara/tests/unit/db/migration/test_migrations_base.py +++ b/sahara/tests/unit/db/migration/test_migrations_base.py @@ -33,6 +33,8 @@ from alembic import migration from alembic import script as alembic_script from oslo.config import cfg from oslo.db.sqlalchemy import test_migrations as t_m +from oslo_concurrency import lockutils +from oslo_concurrency import processutils import six.moves.urllib.parse as urlparse import sqlalchemy import sqlalchemy.exc @@ -41,9 +43,7 @@ import testtools import sahara.db.migration from sahara.db.sqlalchemy import api as sa from sahara.db.sqlalchemy import model_base -from sahara.openstack.common import lockutils from sahara.openstack.common import log as logging -from sahara.openstack.common import processutils LOG = logging.getLogger(__name__) diff --git a/sahara/utils/crypto.py b/sahara/utils/crypto.py index 24ef1c37..492ad357 100644 --- a/sahara/utils/crypto.py +++ b/sahara/utils/crypto.py @@ -15,12 +15,12 @@ import os +from oslo_concurrency import processutils import paramiko import six from sahara import exceptions as ex from sahara.i18n import _ -from sahara.openstack.common import processutils from sahara.utils import tempfiles diff --git a/tools/config/config-generator.sahara.conf b/tools/config/config-generator.sahara.conf index d8e1d749..bb92498e 100644 --- a/tools/config/config-generator.sahara.conf +++ b/tools/config/config-generator.sahara.conf @@ -2,5 +2,6 @@ namespace = sahara namespace = oslo.db namespace = oslo.messaging +namespace = oslo.concurrency namespace = keystonemiddleware.auth_token namespace = sahara_main