Using oslo.concurrency lib
Replace processutils and lockutils modules of oslo-incubator with oslo.concurrency lib. Change-Id: Ic1af8753a70f1aada22efe8132e48cbc16e14f3f Signed-off-by: Zhi Yan Liu <zhiyanl@cn.ibm.com>
This commit is contained in:
parent
3065df96cd
commit
882049a613
@ -85,11 +85,6 @@ backlog = 4096
|
|||||||
# Supported values for the 'disk_format' image attribute
|
# Supported values for the 'disk_format' image attribute
|
||||||
#disk_formats=ami,ari,aki,vhd,vmdk,raw,qcow2,vdi,iso
|
#disk_formats=ami,ari,aki,vhd,vmdk,raw,qcow2,vdi,iso
|
||||||
|
|
||||||
# Directory to use for lock files. Default to a temp directory
|
|
||||||
# (string value). This setting needs to be the same for both
|
|
||||||
# glance-scrubber and glance-api.
|
|
||||||
#lock_path=<None>
|
|
||||||
|
|
||||||
# Property Protections config file
|
# Property Protections config file
|
||||||
# This file contains the rules for property protections and the roles/policies
|
# This file contains the rules for property protections and the roles/policies
|
||||||
# associated with it.
|
# associated with it.
|
||||||
@ -402,6 +397,20 @@ image_cache_dir = /var/lib/glance/image-cache/
|
|||||||
# (setting -1 implies an infinite retry count) (integer value)
|
# (setting -1 implies an infinite retry count) (integer value)
|
||||||
#db_max_retries = 20
|
#db_max_retries = 20
|
||||||
|
|
||||||
|
[oslo_concurrency]
|
||||||
|
|
||||||
|
# Enables or disables inter-process locks. (boolean value)
|
||||||
|
# Deprecated group/name - [DEFAULT]/disable_process_locking
|
||||||
|
#disable_process_locking = false
|
||||||
|
|
||||||
|
# Directory to use for lock files. For security, the specified
|
||||||
|
# directory should only be writable by the user running the processes
|
||||||
|
# that need locking. It could be read from environment variable
|
||||||
|
# OSLO_LOCK_PATH. This setting needs to be the same for both
|
||||||
|
# glance-scrubber and glance-api service. Default to a temp directory.
|
||||||
|
# Deprecated group/name - [DEFAULT]/lock_path (string value)
|
||||||
|
#lock_path = /tmp
|
||||||
|
|
||||||
[keystone_authtoken]
|
[keystone_authtoken]
|
||||||
identity_uri = http://127.0.0.1:35357
|
identity_uri = http://127.0.0.1:35357
|
||||||
admin_tenant_name = %SERVICE_TENANT_NAME%
|
admin_tenant_name = %SERVICE_TENANT_NAME%
|
||||||
|
@ -43,11 +43,6 @@ registry_port = 9191
|
|||||||
# admin_user = %SERVICE_USER%
|
# admin_user = %SERVICE_USER%
|
||||||
# admin_password = %SERVICE_PASSWORD%
|
# admin_password = %SERVICE_PASSWORD%
|
||||||
|
|
||||||
# Directory to use for lock files. Default to a temp directory
|
|
||||||
# (string value). This setting needs to be the same for both
|
|
||||||
# glance-scrubber and glance-api.
|
|
||||||
#lock_path=<None>
|
|
||||||
|
|
||||||
# API to use for accessing data. Default value points to sqlalchemy
|
# API to use for accessing data. Default value points to sqlalchemy
|
||||||
# package, it is also possible to use: glance.db.registry.api
|
# package, it is also possible to use: glance.db.registry.api
|
||||||
#data_api = glance.db.sqlalchemy.api
|
#data_api = glance.db.sqlalchemy.api
|
||||||
@ -121,3 +116,17 @@ registry_port = 9191
|
|||||||
# If set, use this value for pool_timeout with sqlalchemy
|
# If set, use this value for pool_timeout with sqlalchemy
|
||||||
# (integer value)
|
# (integer value)
|
||||||
#pool_timeout=<None>
|
#pool_timeout=<None>
|
||||||
|
|
||||||
|
[oslo_concurrency]
|
||||||
|
|
||||||
|
# Enables or disables inter-process locks. (boolean value)
|
||||||
|
# Deprecated group/name - [DEFAULT]/disable_process_locking
|
||||||
|
#disable_process_locking = false
|
||||||
|
|
||||||
|
# Directory to use for lock files. For security, the specified
|
||||||
|
# directory should only be writable by the user running the processes
|
||||||
|
# that need locking. It could be read from environment variable
|
||||||
|
# OSLO_LOCK_PATH. This setting needs to be the same for both
|
||||||
|
# glance-scrubber and glance-api service. Default to a temp directory.
|
||||||
|
# Deprecated group/name - [DEFAULT]/lock_path (string value)
|
||||||
|
#lock_path = /tmp
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
output_file = etc/glance-api.conf.sample
|
output_file = etc/glance-api.conf.sample
|
||||||
namespace = glance.api
|
namespace = glance.api
|
||||||
namespace = glance.store
|
namespace = glance.store
|
||||||
|
namespace = oslo.concurrency
|
||||||
namespace = oslo.messaging
|
namespace = oslo.messaging
|
||||||
namespace = oslo.db
|
namespace = oslo.db
|
||||||
namespace = oslo.db.concurrency
|
namespace = oslo.db.concurrency
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
[DEFAULT]
|
[DEFAULT]
|
||||||
output_file = etc/glance-scrubber.conf.sample
|
output_file = etc/glance-scrubber.conf.sample
|
||||||
namespace = glance.scrubber
|
namespace = glance.scrubber
|
||||||
|
namespace = oslo.concurrency
|
||||||
namespace = oslo.db
|
namespace = oslo.db
|
||||||
namespace = oslo.db.concurrency
|
namespace = oslo.db.concurrency
|
||||||
|
@ -14,12 +14,12 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
|
from oslo.concurrency import lockutils
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
import glance.async
|
import glance.async
|
||||||
import glance.common.scripts as scripts
|
import glance.common.scripts as scripts
|
||||||
from glance import i18n
|
from glance import i18n
|
||||||
from glance.openstack.common import lockutils
|
|
||||||
import glance.openstack.common.log as logging
|
import glance.openstack.common.log as logging
|
||||||
|
|
||||||
|
|
||||||
|
@ -23,7 +23,9 @@ import logging
|
|||||||
import logging.config
|
import logging.config
|
||||||
import logging.handlers
|
import logging.handlers
|
||||||
import os
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
from oslo.concurrency import lockutils
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
from paste import deploy
|
from paste import deploy
|
||||||
|
|
||||||
@ -159,6 +161,9 @@ CONF.register_opts(common_opts)
|
|||||||
|
|
||||||
|
|
||||||
def parse_args(args=None, usage=None, default_config_files=None):
|
def parse_args(args=None, usage=None, default_config_files=None):
|
||||||
|
if "OSLO_LOCK_PATH" not in os.environ:
|
||||||
|
lockutils.set_defaults(tempfile.gettempdir())
|
||||||
|
|
||||||
CONF(args=args,
|
CONF(args=args,
|
||||||
project='glance',
|
project='glance',
|
||||||
version=version.cached_version_string(),
|
version=version.cached_version_string(),
|
||||||
|
@ -17,6 +17,7 @@ __all__ = [
|
|||||||
'run',
|
'run',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
from oslo.concurrency import lockutils
|
||||||
from oslo.utils import excutils
|
from oslo.utils import excutils
|
||||||
import six
|
import six
|
||||||
|
|
||||||
@ -26,7 +27,6 @@ from glance.common.scripts import utils as script_utils
|
|||||||
from glance.common import store_utils
|
from glance.common import store_utils
|
||||||
from glance.common import utils as common_utils
|
from glance.common import utils as common_utils
|
||||||
from glance import i18n
|
from glance import i18n
|
||||||
from glance.openstack.common import lockutils
|
|
||||||
import glance.openstack.common.log as logging
|
import glance.openstack.common.log as logging
|
||||||
|
|
||||||
|
|
||||||
|
@ -34,6 +34,7 @@ from eventlet.green import socket
|
|||||||
from eventlet.green import ssl
|
from eventlet.green import ssl
|
||||||
import eventlet.greenio
|
import eventlet.greenio
|
||||||
import eventlet.wsgi
|
import eventlet.wsgi
|
||||||
|
from oslo.concurrency import processutils
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
from oslo.serialization import jsonutils
|
from oslo.serialization import jsonutils
|
||||||
import routes
|
import routes
|
||||||
@ -47,7 +48,7 @@ from glance.common import exception
|
|||||||
from glance.common import utils
|
from glance.common import utils
|
||||||
from glance import i18n
|
from glance import i18n
|
||||||
import glance.openstack.common.log as logging
|
import glance.openstack.common.log as logging
|
||||||
from glance.openstack.common import processutils
|
|
||||||
|
|
||||||
_ = i18n._
|
_ = i18n._
|
||||||
|
|
||||||
|
@ -1,326 +0,0 @@
|
|||||||
# Copyright 2011 OpenStack Foundation.
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import contextlib
|
|
||||||
import errno
|
|
||||||
import functools
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import shutil
|
|
||||||
import subprocess
|
|
||||||
import sys
|
|
||||||
import tempfile
|
|
||||||
import threading
|
|
||||||
import time
|
|
||||||
import weakref
|
|
||||||
|
|
||||||
from oslo.config import cfg
|
|
||||||
|
|
||||||
from glance.openstack.common import fileutils
|
|
||||||
from glance.openstack.common._i18n import _, _LE, _LI
|
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
util_opts = [
|
|
||||||
cfg.BoolOpt('disable_process_locking', default=False,
|
|
||||||
help='Enables or disables inter-process locks.'),
|
|
||||||
cfg.StrOpt('lock_path',
|
|
||||||
default=os.environ.get("GLANCE_LOCK_PATH"),
|
|
||||||
help='Directory to use for lock files.')
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
|
||||||
CONF.register_opts(util_opts)
|
|
||||||
|
|
||||||
|
|
||||||
def set_defaults(lock_path):
|
|
||||||
cfg.set_defaults(util_opts, lock_path=lock_path)
|
|
||||||
|
|
||||||
|
|
||||||
class _FileLock(object):
|
|
||||||
"""Lock implementation which allows multiple locks, working around
|
|
||||||
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
|
|
||||||
not require any cleanup. Since the lock is always held on a file
|
|
||||||
descriptor rather than outside of the process, the lock gets dropped
|
|
||||||
automatically if the process crashes, even if __exit__ is not executed.
|
|
||||||
|
|
||||||
There are no guarantees regarding usage by multiple green threads in a
|
|
||||||
single process here. This lock works only between processes. Exclusive
|
|
||||||
access between local threads should be achieved using the semaphores
|
|
||||||
in the @synchronized decorator.
|
|
||||||
|
|
||||||
Note these locks are released when the descriptor is closed, so it's not
|
|
||||||
safe to close the file descriptor while another green thread holds the
|
|
||||||
lock. Just opening and closing the lock file can break synchronisation,
|
|
||||||
so lock files must be accessed only using this abstraction.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, name):
|
|
||||||
self.lockfile = None
|
|
||||||
self.fname = name
|
|
||||||
|
|
||||||
def acquire(self):
|
|
||||||
basedir = os.path.dirname(self.fname)
|
|
||||||
|
|
||||||
if not os.path.exists(basedir):
|
|
||||||
fileutils.ensure_tree(basedir)
|
|
||||||
LOG.info(_LI('Created lock path: %s'), basedir)
|
|
||||||
|
|
||||||
self.lockfile = open(self.fname, 'w')
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
# Using non-blocking locks since green threads are not
|
|
||||||
# patched to deal with blocking locking calls.
|
|
||||||
# Also upon reading the MSDN docs for locking(), it seems
|
|
||||||
# to have a laughable 10 attempts "blocking" mechanism.
|
|
||||||
self.trylock()
|
|
||||||
LOG.debug('Got file lock "%s"', self.fname)
|
|
||||||
return True
|
|
||||||
except IOError as e:
|
|
||||||
if e.errno in (errno.EACCES, errno.EAGAIN):
|
|
||||||
# external locks synchronise things like iptables
|
|
||||||
# updates - give it some time to prevent busy spinning
|
|
||||||
time.sleep(0.01)
|
|
||||||
else:
|
|
||||||
raise threading.ThreadError(_("Unable to acquire lock on"
|
|
||||||
" `%(filename)s` due to"
|
|
||||||
" %(exception)s") %
|
|
||||||
{'filename': self.fname,
|
|
||||||
'exception': e})
|
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
self.acquire()
|
|
||||||
return self
|
|
||||||
|
|
||||||
def release(self):
|
|
||||||
try:
|
|
||||||
self.unlock()
|
|
||||||
self.lockfile.close()
|
|
||||||
LOG.debug('Released file lock "%s"', self.fname)
|
|
||||||
except IOError:
|
|
||||||
LOG.exception(_LE("Could not release the acquired lock `%s`"),
|
|
||||||
self.fname)
|
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
||||||
self.release()
|
|
||||||
|
|
||||||
def exists(self):
|
|
||||||
return os.path.exists(self.fname)
|
|
||||||
|
|
||||||
def trylock(self):
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
def unlock(self):
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
|
|
||||||
class _WindowsLock(_FileLock):
|
|
||||||
def trylock(self):
|
|
||||||
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
|
|
||||||
|
|
||||||
def unlock(self):
|
|
||||||
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
|
|
||||||
|
|
||||||
|
|
||||||
class _FcntlLock(_FileLock):
|
|
||||||
def trylock(self):
|
|
||||||
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
||||||
|
|
||||||
def unlock(self):
|
|
||||||
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
|
|
||||||
|
|
||||||
|
|
||||||
if os.name == 'nt':
|
|
||||||
import msvcrt
|
|
||||||
InterProcessLock = _WindowsLock
|
|
||||||
else:
|
|
||||||
import fcntl
|
|
||||||
InterProcessLock = _FcntlLock
|
|
||||||
|
|
||||||
_semaphores = weakref.WeakValueDictionary()
|
|
||||||
_semaphores_lock = threading.Lock()
|
|
||||||
|
|
||||||
|
|
||||||
def _get_lock_path(name, lock_file_prefix, lock_path=None):
|
|
||||||
# NOTE(mikal): the lock name cannot contain directory
|
|
||||||
# separators
|
|
||||||
name = name.replace(os.sep, '_')
|
|
||||||
if lock_file_prefix:
|
|
||||||
sep = '' if lock_file_prefix.endswith('-') else '-'
|
|
||||||
name = '%s%s%s' % (lock_file_prefix, sep, name)
|
|
||||||
|
|
||||||
local_lock_path = lock_path or CONF.lock_path
|
|
||||||
|
|
||||||
if not local_lock_path:
|
|
||||||
raise cfg.RequiredOptError('lock_path')
|
|
||||||
|
|
||||||
return os.path.join(local_lock_path, name)
|
|
||||||
|
|
||||||
|
|
||||||
def external_lock(name, lock_file_prefix=None, lock_path=None):
|
|
||||||
LOG.debug('Attempting to grab external lock "%(lock)s"',
|
|
||||||
{'lock': name})
|
|
||||||
|
|
||||||
lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
|
|
||||||
|
|
||||||
return InterProcessLock(lock_file_path)
|
|
||||||
|
|
||||||
|
|
||||||
def remove_external_lock_file(name, lock_file_prefix=None):
|
|
||||||
"""Remove an external lock file when it's not used anymore
|
|
||||||
This will be helpful when we have a lot of lock files
|
|
||||||
"""
|
|
||||||
with internal_lock(name):
|
|
||||||
lock_file_path = _get_lock_path(name, lock_file_prefix)
|
|
||||||
try:
|
|
||||||
os.remove(lock_file_path)
|
|
||||||
except OSError:
|
|
||||||
LOG.info(_LI('Failed to remove file %(file)s'),
|
|
||||||
{'file': lock_file_path})
|
|
||||||
|
|
||||||
|
|
||||||
def internal_lock(name):
|
|
||||||
with _semaphores_lock:
|
|
||||||
try:
|
|
||||||
sem = _semaphores[name]
|
|
||||||
LOG.debug('Using existing semaphore "%s"', name)
|
|
||||||
except KeyError:
|
|
||||||
sem = threading.Semaphore()
|
|
||||||
_semaphores[name] = sem
|
|
||||||
LOG.debug('Created new semaphore "%s"', name)
|
|
||||||
|
|
||||||
return sem
|
|
||||||
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
|
||||||
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
|
|
||||||
"""Context based lock
|
|
||||||
|
|
||||||
This function yields a `threading.Semaphore` instance (if we don't use
|
|
||||||
eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is
|
|
||||||
True, in which case, it'll yield an InterProcessLock instance.
|
|
||||||
|
|
||||||
:param lock_file_prefix: The lock_file_prefix argument is used to provide
|
|
||||||
lock files on disk with a meaningful prefix.
|
|
||||||
|
|
||||||
:param external: The external keyword argument denotes whether this lock
|
|
||||||
should work across multiple processes. This means that if two different
|
|
||||||
workers both run a method decorated with @synchronized('mylock',
|
|
||||||
external=True), only one of them will execute at a time.
|
|
||||||
"""
|
|
||||||
int_lock = internal_lock(name)
|
|
||||||
with int_lock:
|
|
||||||
LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name})
|
|
||||||
try:
|
|
||||||
if external and not CONF.disable_process_locking:
|
|
||||||
ext_lock = external_lock(name, lock_file_prefix, lock_path)
|
|
||||||
with ext_lock:
|
|
||||||
yield ext_lock
|
|
||||||
else:
|
|
||||||
yield int_lock
|
|
||||||
finally:
|
|
||||||
LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name})
|
|
||||||
|
|
||||||
|
|
||||||
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
|
|
||||||
"""Synchronization decorator.
|
|
||||||
|
|
||||||
Decorating a method like so::
|
|
||||||
|
|
||||||
@synchronized('mylock')
|
|
||||||
def foo(self, *args):
|
|
||||||
...
|
|
||||||
|
|
||||||
ensures that only one thread will execute the foo method at a time.
|
|
||||||
|
|
||||||
Different methods can share the same lock::
|
|
||||||
|
|
||||||
@synchronized('mylock')
|
|
||||||
def foo(self, *args):
|
|
||||||
...
|
|
||||||
|
|
||||||
@synchronized('mylock')
|
|
||||||
def bar(self, *args):
|
|
||||||
...
|
|
||||||
|
|
||||||
This way only one of either foo or bar can be executing at a time.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def wrap(f):
|
|
||||||
@functools.wraps(f)
|
|
||||||
def inner(*args, **kwargs):
|
|
||||||
try:
|
|
||||||
with lock(name, lock_file_prefix, external, lock_path):
|
|
||||||
LOG.debug('Got semaphore / lock "%(function)s"',
|
|
||||||
{'function': f.__name__})
|
|
||||||
return f(*args, **kwargs)
|
|
||||||
finally:
|
|
||||||
LOG.debug('Semaphore / lock released "%(function)s"',
|
|
||||||
{'function': f.__name__})
|
|
||||||
return inner
|
|
||||||
return wrap
|
|
||||||
|
|
||||||
|
|
||||||
def synchronized_with_prefix(lock_file_prefix):
|
|
||||||
"""Partial object generator for the synchronization decorator.
|
|
||||||
|
|
||||||
Redefine @synchronized in each project like so::
|
|
||||||
|
|
||||||
(in nova/utils.py)
|
|
||||||
from nova.openstack.common import lockutils
|
|
||||||
|
|
||||||
synchronized = lockutils.synchronized_with_prefix('nova-')
|
|
||||||
|
|
||||||
|
|
||||||
(in nova/foo.py)
|
|
||||||
from nova import utils
|
|
||||||
|
|
||||||
@utils.synchronized('mylock')
|
|
||||||
def bar(self, *args):
|
|
||||||
...
|
|
||||||
|
|
||||||
The lock_file_prefix argument is used to provide lock files on disk with a
|
|
||||||
meaningful prefix.
|
|
||||||
"""
|
|
||||||
|
|
||||||
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)
|
|
||||||
|
|
||||||
|
|
||||||
def main(argv):
|
|
||||||
"""Create a dir for locks and pass it to command from arguments
|
|
||||||
|
|
||||||
If you run this:
|
|
||||||
python -m openstack.common.lockutils python setup.py testr <etc>
|
|
||||||
|
|
||||||
a temporary directory will be created for all your locks and passed to all
|
|
||||||
your tests in an environment variable. The temporary dir will be deleted
|
|
||||||
afterwards and the return value will be preserved.
|
|
||||||
"""
|
|
||||||
|
|
||||||
lock_dir = tempfile.mkdtemp()
|
|
||||||
os.environ["GLANCE_LOCK_PATH"] = lock_dir
|
|
||||||
try:
|
|
||||||
ret_val = subprocess.call(argv[1:])
|
|
||||||
finally:
|
|
||||||
shutil.rmtree(lock_dir, ignore_errors=True)
|
|
||||||
return ret_val
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
sys.exit(main(sys.argv))
|
|
@ -1,289 +0,0 @@
|
|||||||
# Copyright 2011 OpenStack Foundation.
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
"""
|
|
||||||
System-level utilities and helper functions.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import errno
|
|
||||||
import logging
|
|
||||||
import multiprocessing
|
|
||||||
import os
|
|
||||||
import random
|
|
||||||
import shlex
|
|
||||||
import signal
|
|
||||||
|
|
||||||
from eventlet.green import subprocess
|
|
||||||
from eventlet import greenthread
|
|
||||||
from oslo.utils import strutils
|
|
||||||
import six
|
|
||||||
|
|
||||||
from glance.openstack.common._i18n import _
|
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class InvalidArgumentError(Exception):
|
|
||||||
def __init__(self, message=None):
|
|
||||||
super(InvalidArgumentError, self).__init__(message)
|
|
||||||
|
|
||||||
|
|
||||||
class UnknownArgumentError(Exception):
|
|
||||||
def __init__(self, message=None):
|
|
||||||
super(UnknownArgumentError, self).__init__(message)
|
|
||||||
|
|
||||||
|
|
||||||
class ProcessExecutionError(Exception):
|
|
||||||
def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
|
|
||||||
description=None):
|
|
||||||
self.exit_code = exit_code
|
|
||||||
self.stderr = stderr
|
|
||||||
self.stdout = stdout
|
|
||||||
self.cmd = cmd
|
|
||||||
self.description = description
|
|
||||||
|
|
||||||
if description is None:
|
|
||||||
description = _("Unexpected error while running command.")
|
|
||||||
if exit_code is None:
|
|
||||||
exit_code = '-'
|
|
||||||
message = _('%(description)s\n'
|
|
||||||
'Command: %(cmd)s\n'
|
|
||||||
'Exit code: %(exit_code)s\n'
|
|
||||||
'Stdout: %(stdout)r\n'
|
|
||||||
'Stderr: %(stderr)r') % {'description': description,
|
|
||||||
'cmd': cmd,
|
|
||||||
'exit_code': exit_code,
|
|
||||||
'stdout': stdout,
|
|
||||||
'stderr': stderr}
|
|
||||||
super(ProcessExecutionError, self).__init__(message)
|
|
||||||
|
|
||||||
|
|
||||||
class NoRootWrapSpecified(Exception):
|
|
||||||
def __init__(self, message=None):
|
|
||||||
super(NoRootWrapSpecified, self).__init__(message)
|
|
||||||
|
|
||||||
|
|
||||||
def _subprocess_setup():
|
|
||||||
# Python installs a SIGPIPE handler by default. This is usually not what
|
|
||||||
# non-Python subprocesses expect.
|
|
||||||
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
|
|
||||||
|
|
||||||
|
|
||||||
def execute(*cmd, **kwargs):
|
|
||||||
"""Helper method to shell out and execute a command through subprocess.
|
|
||||||
|
|
||||||
Allows optional retry.
|
|
||||||
|
|
||||||
:param cmd: Passed to subprocess.Popen.
|
|
||||||
:type cmd: string
|
|
||||||
:param process_input: Send to opened process.
|
|
||||||
:type process_input: string
|
|
||||||
:param env_variables: Environment variables and their values that
|
|
||||||
will be set for the process.
|
|
||||||
:type env_variables: dict
|
|
||||||
:param check_exit_code: Single bool, int, or list of allowed exit
|
|
||||||
codes. Defaults to [0]. Raise
|
|
||||||
:class:`ProcessExecutionError` unless
|
|
||||||
program exits with one of these code.
|
|
||||||
:type check_exit_code: boolean, int, or [int]
|
|
||||||
:param delay_on_retry: True | False. Defaults to True. If set to True,
|
|
||||||
wait a short amount of time before retrying.
|
|
||||||
:type delay_on_retry: boolean
|
|
||||||
:param attempts: How many times to retry cmd.
|
|
||||||
:type attempts: int
|
|
||||||
:param run_as_root: True | False. Defaults to False. If set to True,
|
|
||||||
the command is prefixed by the command specified
|
|
||||||
in the root_helper kwarg.
|
|
||||||
:type run_as_root: boolean
|
|
||||||
:param root_helper: command to prefix to commands called with
|
|
||||||
run_as_root=True
|
|
||||||
:type root_helper: string
|
|
||||||
:param shell: whether or not there should be a shell used to
|
|
||||||
execute this command. Defaults to false.
|
|
||||||
:type shell: boolean
|
|
||||||
:param loglevel: log level for execute commands.
|
|
||||||
:type loglevel: int. (Should be logging.DEBUG or logging.INFO)
|
|
||||||
:returns: (stdout, stderr) from process execution
|
|
||||||
:raises: :class:`UnknownArgumentError` on
|
|
||||||
receiving unknown arguments
|
|
||||||
:raises: :class:`ProcessExecutionError`
|
|
||||||
"""
|
|
||||||
|
|
||||||
process_input = kwargs.pop('process_input', None)
|
|
||||||
env_variables = kwargs.pop('env_variables', None)
|
|
||||||
check_exit_code = kwargs.pop('check_exit_code', [0])
|
|
||||||
ignore_exit_code = False
|
|
||||||
delay_on_retry = kwargs.pop('delay_on_retry', True)
|
|
||||||
attempts = kwargs.pop('attempts', 1)
|
|
||||||
run_as_root = kwargs.pop('run_as_root', False)
|
|
||||||
root_helper = kwargs.pop('root_helper', '')
|
|
||||||
shell = kwargs.pop('shell', False)
|
|
||||||
loglevel = kwargs.pop('loglevel', logging.DEBUG)
|
|
||||||
|
|
||||||
if isinstance(check_exit_code, bool):
|
|
||||||
ignore_exit_code = not check_exit_code
|
|
||||||
check_exit_code = [0]
|
|
||||||
elif isinstance(check_exit_code, int):
|
|
||||||
check_exit_code = [check_exit_code]
|
|
||||||
|
|
||||||
if kwargs:
|
|
||||||
raise UnknownArgumentError(_('Got unknown keyword args: %r') % kwargs)
|
|
||||||
|
|
||||||
if run_as_root and hasattr(os, 'geteuid') and os.geteuid() != 0:
|
|
||||||
if not root_helper:
|
|
||||||
raise NoRootWrapSpecified(
|
|
||||||
message=_('Command requested root, but did not '
|
|
||||||
'specify a root helper.'))
|
|
||||||
cmd = shlex.split(root_helper) + list(cmd)
|
|
||||||
|
|
||||||
cmd = map(str, cmd)
|
|
||||||
sanitized_cmd = strutils.mask_password(' '.join(cmd))
|
|
||||||
|
|
||||||
while attempts > 0:
|
|
||||||
attempts -= 1
|
|
||||||
try:
|
|
||||||
LOG.log(loglevel, _('Running cmd (subprocess): %s'), sanitized_cmd)
|
|
||||||
_PIPE = subprocess.PIPE # pylint: disable=E1101
|
|
||||||
|
|
||||||
if os.name == 'nt':
|
|
||||||
preexec_fn = None
|
|
||||||
close_fds = False
|
|
||||||
else:
|
|
||||||
preexec_fn = _subprocess_setup
|
|
||||||
close_fds = True
|
|
||||||
|
|
||||||
obj = subprocess.Popen(cmd,
|
|
||||||
stdin=_PIPE,
|
|
||||||
stdout=_PIPE,
|
|
||||||
stderr=_PIPE,
|
|
||||||
close_fds=close_fds,
|
|
||||||
preexec_fn=preexec_fn,
|
|
||||||
shell=shell,
|
|
||||||
env=env_variables)
|
|
||||||
result = None
|
|
||||||
for _i in six.moves.range(20):
|
|
||||||
# NOTE(russellb) 20 is an arbitrary number of retries to
|
|
||||||
# prevent any chance of looping forever here.
|
|
||||||
try:
|
|
||||||
if process_input is not None:
|
|
||||||
result = obj.communicate(process_input)
|
|
||||||
else:
|
|
||||||
result = obj.communicate()
|
|
||||||
except OSError as e:
|
|
||||||
if e.errno in (errno.EAGAIN, errno.EINTR):
|
|
||||||
continue
|
|
||||||
raise
|
|
||||||
break
|
|
||||||
obj.stdin.close() # pylint: disable=E1101
|
|
||||||
_returncode = obj.returncode # pylint: disable=E1101
|
|
||||||
LOG.log(loglevel, 'Result was %s' % _returncode)
|
|
||||||
if not ignore_exit_code and _returncode not in check_exit_code:
|
|
||||||
(stdout, stderr) = result
|
|
||||||
sanitized_stdout = strutils.mask_password(stdout)
|
|
||||||
sanitized_stderr = strutils.mask_password(stderr)
|
|
||||||
raise ProcessExecutionError(exit_code=_returncode,
|
|
||||||
stdout=sanitized_stdout,
|
|
||||||
stderr=sanitized_stderr,
|
|
||||||
cmd=sanitized_cmd)
|
|
||||||
return result
|
|
||||||
except ProcessExecutionError:
|
|
||||||
if not attempts:
|
|
||||||
raise
|
|
||||||
else:
|
|
||||||
LOG.log(loglevel, _('%r failed. Retrying.'), sanitized_cmd)
|
|
||||||
if delay_on_retry:
|
|
||||||
greenthread.sleep(random.randint(20, 200) / 100.0)
|
|
||||||
finally:
|
|
||||||
# NOTE(termie): this appears to be necessary to let the subprocess
|
|
||||||
# call clean something up in between calls, without
|
|
||||||
# it two execute calls in a row hangs the second one
|
|
||||||
greenthread.sleep(0)
|
|
||||||
|
|
||||||
|
|
||||||
def trycmd(*args, **kwargs):
|
|
||||||
"""A wrapper around execute() to more easily handle warnings and errors.
|
|
||||||
|
|
||||||
Returns an (out, err) tuple of strings containing the output of
|
|
||||||
the command's stdout and stderr. If 'err' is not empty then the
|
|
||||||
command can be considered to have failed.
|
|
||||||
|
|
||||||
:discard_warnings True | False. Defaults to False. If set to True,
|
|
||||||
then for succeeding commands, stderr is cleared
|
|
||||||
|
|
||||||
"""
|
|
||||||
discard_warnings = kwargs.pop('discard_warnings', False)
|
|
||||||
|
|
||||||
try:
|
|
||||||
out, err = execute(*args, **kwargs)
|
|
||||||
failed = False
|
|
||||||
except ProcessExecutionError as exn:
|
|
||||||
out, err = '', six.text_type(exn)
|
|
||||||
failed = True
|
|
||||||
|
|
||||||
if not failed and discard_warnings and err:
|
|
||||||
# Handle commands that output to stderr but otherwise succeed
|
|
||||||
err = ''
|
|
||||||
|
|
||||||
return out, err
|
|
||||||
|
|
||||||
|
|
||||||
def ssh_execute(ssh, cmd, process_input=None,
|
|
||||||
addl_env=None, check_exit_code=True):
|
|
||||||
sanitized_cmd = strutils.mask_password(cmd)
|
|
||||||
LOG.debug('Running cmd (SSH): %s', sanitized_cmd)
|
|
||||||
if addl_env:
|
|
||||||
raise InvalidArgumentError(_('Environment not supported over SSH'))
|
|
||||||
|
|
||||||
if process_input:
|
|
||||||
# This is (probably) fixable if we need it...
|
|
||||||
raise InvalidArgumentError(_('process_input not supported over SSH'))
|
|
||||||
|
|
||||||
stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd)
|
|
||||||
channel = stdout_stream.channel
|
|
||||||
|
|
||||||
# NOTE(justinsb): This seems suspicious...
|
|
||||||
# ...other SSH clients have buffering issues with this approach
|
|
||||||
stdout = stdout_stream.read()
|
|
||||||
sanitized_stdout = strutils.mask_password(stdout)
|
|
||||||
stderr = stderr_stream.read()
|
|
||||||
sanitized_stderr = strutils.mask_password(stderr)
|
|
||||||
|
|
||||||
stdin_stream.close()
|
|
||||||
|
|
||||||
exit_status = channel.recv_exit_status()
|
|
||||||
|
|
||||||
# exit_status == -1 if no exit code was returned
|
|
||||||
if exit_status != -1:
|
|
||||||
LOG.debug('Result was %s' % exit_status)
|
|
||||||
if check_exit_code and exit_status != 0:
|
|
||||||
raise ProcessExecutionError(exit_code=exit_status,
|
|
||||||
stdout=sanitized_stdout,
|
|
||||||
stderr=sanitized_stderr,
|
|
||||||
cmd=sanitized_cmd)
|
|
||||||
|
|
||||||
return (sanitized_stdout, sanitized_stderr)
|
|
||||||
|
|
||||||
|
|
||||||
def get_worker_count():
|
|
||||||
"""Utility to get the default worker count.
|
|
||||||
|
|
||||||
@return: The number of CPUs if that can be determined, else a default
|
|
||||||
worker count of 1 is returned.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
return multiprocessing.cpu_count()
|
|
||||||
except NotImplementedError:
|
|
||||||
return 1
|
|
@ -34,7 +34,6 @@ import glance.common.wsgi
|
|||||||
import glance.image_cache
|
import glance.image_cache
|
||||||
import glance.image_cache.drivers.sqlite
|
import glance.image_cache.drivers.sqlite
|
||||||
import glance.notifier
|
import glance.notifier
|
||||||
import glance.openstack.common.lockutils
|
|
||||||
import glance.openstack.common.log
|
import glance.openstack.common.log
|
||||||
import glance.openstack.common.policy
|
import glance.openstack.common.policy
|
||||||
import glance.registry
|
import glance.registry
|
||||||
@ -65,7 +64,6 @@ _api_opts = [
|
|||||||
glance.registry.client.registry_client_ctx_opts,
|
glance.registry.client.registry_client_ctx_opts,
|
||||||
glance.registry.client.registry_client_opts,
|
glance.registry.client.registry_client_opts,
|
||||||
glance.registry.client.v1.api.registry_client_ctx_opts,
|
glance.registry.client.v1.api.registry_client_ctx_opts,
|
||||||
glance.openstack.common.lockutils.util_opts,
|
|
||||||
glance.openstack.common.policy.policy_opts,
|
glance.openstack.common.policy.policy_opts,
|
||||||
glance.scrubber.scrubber_opts])))),
|
glance.scrubber.scrubber_opts])))),
|
||||||
('image_format', glance.common.config.image_format_opts),
|
('image_format', glance.common.config.image_format_opts),
|
||||||
@ -87,7 +85,6 @@ _registry_opts = [
|
|||||||
_scrubber_opts = [
|
_scrubber_opts = [
|
||||||
(None, list(itertools.chain(*(_global_opt_lists + [
|
(None, list(itertools.chain(*(_global_opt_lists + [
|
||||||
glance.common.config.common_opts,
|
glance.common.config.common_opts,
|
||||||
glance.openstack.common.lockutils.util_opts,
|
|
||||||
glance.openstack.common.policy.policy_opts,
|
glance.openstack.common.policy.policy_opts,
|
||||||
glance.scrubber.scrubber_opts,
|
glance.scrubber.scrubber_opts,
|
||||||
glance.scrubber.scrubber_cmd_opts,
|
glance.scrubber.scrubber_cmd_opts,
|
||||||
|
@ -19,6 +19,7 @@ import os
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
|
from oslo.concurrency import lockutils
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
import six
|
import six
|
||||||
|
|
||||||
@ -28,7 +29,6 @@ from glance.common import utils
|
|||||||
from glance import context
|
from glance import context
|
||||||
import glance.db as db_api
|
import glance.db as db_api
|
||||||
from glance import i18n
|
from glance import i18n
|
||||||
from glance.openstack.common import lockutils
|
|
||||||
import glance.openstack.common.log as logging
|
import glance.openstack.common.log as logging
|
||||||
import glance.registry.client.v1.api as registry
|
import glance.registry.client.v1.api as registry
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ import os
|
|||||||
|
|
||||||
import glance_store as store
|
import glance_store as store
|
||||||
from glance_store import location
|
from glance_store import location
|
||||||
|
from oslo.concurrency import lockutils
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
from oslo.db import options
|
from oslo.db import options
|
||||||
from oslo.serialization import jsonutils
|
from oslo.serialization import jsonutils
|
||||||
@ -61,10 +62,10 @@ class IsolatedUnitTest(StoreClearingUnitTest):
|
|||||||
super(IsolatedUnitTest, self).setUp()
|
super(IsolatedUnitTest, self).setUp()
|
||||||
options.set_defaults(CONF, connection='sqlite://',
|
options.set_defaults(CONF, connection='sqlite://',
|
||||||
sqlite_db='glance.sqlite')
|
sqlite_db='glance.sqlite')
|
||||||
|
lockutils.set_defaults(os.path.join(self.test_dir))
|
||||||
|
|
||||||
self.config(verbose=False,
|
self.config(verbose=False,
|
||||||
debug=False,
|
debug=False)
|
||||||
lock_path=os.path.join(self.test_dir))
|
|
||||||
|
|
||||||
self.config(default_store='filesystem',
|
self.config(default_store='filesystem',
|
||||||
filesystem_store_datadir=os.path.join(self.test_dir),
|
filesystem_store_datadir=os.path.join(self.test_dir),
|
||||||
|
@ -131,8 +131,6 @@ class OptsTestCase(utils.BaseTestCase):
|
|||||||
'registry_client_insecure',
|
'registry_client_insecure',
|
||||||
'registry_client_timeout',
|
'registry_client_timeout',
|
||||||
'send_identity_headers',
|
'send_identity_headers',
|
||||||
'disable_process_locking',
|
|
||||||
'lock_path',
|
|
||||||
'scrubber_datadir',
|
'scrubber_datadir',
|
||||||
'scrub_time',
|
'scrub_time',
|
||||||
'cleanup_scrubber',
|
'cleanup_scrubber',
|
||||||
@ -252,8 +250,6 @@ class OptsTestCase(utils.BaseTestCase):
|
|||||||
'pydev_worker_debug_host',
|
'pydev_worker_debug_host',
|
||||||
'pydev_worker_debug_port',
|
'pydev_worker_debug_port',
|
||||||
'metadata_encryption_key',
|
'metadata_encryption_key',
|
||||||
'disable_process_locking',
|
|
||||||
'lock_path',
|
|
||||||
'scrubber_datadir',
|
'scrubber_datadir',
|
||||||
'scrub_time',
|
'scrub_time',
|
||||||
'cleanup_scrubber',
|
'cleanup_scrubber',
|
||||||
|
@ -4,10 +4,8 @@
|
|||||||
module=_i18n
|
module=_i18n
|
||||||
module=install_venv_common
|
module=install_venv_common
|
||||||
module=local
|
module=local
|
||||||
module=lockutils
|
|
||||||
module=log
|
module=log
|
||||||
module=policy
|
module=policy
|
||||||
module=processutils
|
|
||||||
|
|
||||||
# The base module to hold the copy of openstack.common
|
# The base module to hold the copy of openstack.common
|
||||||
base=glance
|
base=glance
|
||||||
|
@ -25,6 +25,7 @@ pycrypto>=2.6
|
|||||||
iso8601>=0.1.9
|
iso8601>=0.1.9
|
||||||
ordereddict
|
ordereddict
|
||||||
oslo.config>=1.4.0 # Apache-2.0
|
oslo.config>=1.4.0 # Apache-2.0
|
||||||
|
oslo.concurrency>=0.3.0 # Apache-2.0
|
||||||
oslo.utils>=1.1.0 # Apache-2.0
|
oslo.utils>=1.1.0 # Apache-2.0
|
||||||
stevedore>=1.1.0 # Apache-2.0
|
stevedore>=1.1.0 # Apache-2.0
|
||||||
netaddr>=0.7.12
|
netaddr>=0.7.12
|
||||||
|
@ -178,7 +178,7 @@ function run_pep8 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
TESTRTESTS="python -m glance.openstack.common.lockutils python setup.py testr"
|
TESTRTESTS="lockutils-wrapper python setup.py testr"
|
||||||
|
|
||||||
if [ $never_venv -eq 0 ]
|
if [ $never_venv -eq 0 ]
|
||||||
then
|
then
|
||||||
|
3
tox.ini
3
tox.ini
@ -9,8 +9,7 @@ usedevelop = True
|
|||||||
install_command = pip install --allow-all-external --allow-insecure netaddr -U {opts} {packages}
|
install_command = pip install --allow-all-external --allow-insecure netaddr -U {opts} {packages}
|
||||||
deps = -r{toxinidir}/requirements.txt
|
deps = -r{toxinidir}/requirements.txt
|
||||||
-r{toxinidir}/test-requirements.txt
|
-r{toxinidir}/test-requirements.txt
|
||||||
commands = python -m glance.openstack.common.lockutils python setup.py testr --slowest \
|
commands = lockutils-wrapper python setup.py testr --slowest --testr-args='--concurrency 0 {posargs}'
|
||||||
--testr-args='--concurrency 0 {posargs}'
|
|
||||||
whitelist_externals = bash
|
whitelist_externals = bash
|
||||||
|
|
||||||
[tox:jenkins]
|
[tox:jenkins]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user