Scrubber refactoring

* Add support for images with multiple locations.
* Add lock protection to prevent a race condition between the glance-api
and glance-scrubber services (see the sketch below).
* Refactor the scrub queue code.

Implement bp: glance-scrubber-refactoring
docImpact

Change-Id: I050ff212d73ace8e84dcd800245b608210d6b29a
Signed-off-by: Zhi Yan Liu <zhiyanl@cn.ibm.com>
Zhi Yan Liu 2013-07-30 23:05:10 +08:00
parent 0e77554b73
commit 85075f4b11
16 changed files with 862 additions and 152 deletions
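
A minimal sketch of the lock protection referenced above, written against plain fcntl rather than the vendored lockutils module added by this change; the lock directory and helper name are illustrative only:

    import errno
    import fcntl
    import os
    import time
    from contextlib import contextmanager

    LOCK_DIR = "/tmp/glance-locks"  # stands in for the new lock_path option

    @contextmanager
    def image_lock(image_id):
        """Exclusive inter-process lock keyed by image id (illustrative only)."""
        try:
            os.makedirs(LOCK_DIR)
        except OSError as exc:
            if exc.errno != errno.EEXIST:
                raise
        path = os.path.join(LOCK_DIR, "glance-scrubber-%s" % image_id)
        with open(path, "w") as lockfile:
            while True:
                try:
                    # Non-blocking attempt plus retry, like the vendored lockutils.
                    fcntl.lockf(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
                    break
                except IOError as e:
                    if e.errno not in (errno.EACCES, errno.EAGAIN):
                        raise
                    time.sleep(0.01)
            try:
                yield
            finally:
                fcntl.lockf(lockfile, fcntl.LOCK_UN)

    # Both services wrap access to the per-image scrub queue file in the same
    # lock, so glance-api cannot enqueue a location while glance-scrubber is
    # consuming (or removing) that image's queue file:
    #
    #     with image_lock(image_id):
    #         ...read or append the per-image queue file...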

View File

@ -103,6 +103,10 @@ workers = 1
# Supported values for the 'disk_format' image attribute
#disk_formats=ami,ari,aki,vhd,vmdk,raw,qcow2,vdi,iso
# Directory to use for lock files. Defaults to a temp directory
# (string value). This setting needs to be the same for both
# glance-scrubber and glance-api.
#lock_path=<None>
# Set a system wide quota for every user. This value is the total number
# of bytes that a user can use across all storage systems. A value of
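
For illustration only, a deployment honoring that requirement might set the same directory in both glance-api.conf and glance-scrubber.conf (the path below is an example, not a default):

    [DEFAULT]
    # Must be identical in glance-api.conf and glance-scrubber.conf
    lock_path = /var/lib/glance/locks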

View File

@ -40,6 +40,11 @@ registry_port = 9191
# admin_user = %SERVICE_USER%
# admin_password = %SERVICE_PASSWORD%
# Directory to use for lock files. Defaults to a temp directory
# (string value). This setting needs to be the same for both
# glance-scrubber and glance-api.
#lock_path=<None>
# ================= Security Options ==========================
# AES key for encrypting store 'location' metadata, including

View File

@ -48,8 +48,6 @@ from glance.openstack.common import strutils
import glance.registry.client.v1.api as registry
from glance.store import (get_from_backend,
get_size_from_backend,
safe_delete_from_backend,
schedule_delayed_delete_from_backend,
get_store_from_location,
get_store_from_scheme)

View File

@ -41,9 +41,10 @@ def initiate_deletion(req, location, id, delayed_delete=False):
:param delayed_delete: whether data deletion will be delayed
"""
if delayed_delete:
glance.store.schedule_delayed_delete_from_backend(location, id)
glance.store.schedule_delayed_delete_from_backend(req.context,
location, id)
else:
glance.store.safe_delete_from_backend(location, req.context, id)
glance.store.safe_delete_from_backend(req.context, location, id)
def _kill(req, image_id):

View File

@ -61,7 +61,7 @@ def main():
glance.store.create_stores()
glance.store.verify_default_store()
app = glance.store.scrubber.Scrubber()
app = glance.store.scrubber.Scrubber(glance.store)
if CONF.daemon:
server = glance.store.scrubber.Daemon(CONF.wakeup_time)

View File

@ -0,0 +1,110 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import errno
import os
from glance.openstack.common import excutils
from glance.openstack.common.gettextutils import _ # noqa
from glance.openstack.common import log as logging
LOG = logging.getLogger(__name__)
_FILE_CACHE = {}
def ensure_tree(path):
"""Create a directory (and any ancestor directories required)
:param path: Directory to create
"""
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST:
if not os.path.isdir(path):
raise
else:
raise
def read_cached_file(filename, force_reload=False):
"""Read from a file if it has been modified.
:param force_reload: Whether to reload the file.
:returns: A tuple with a boolean specifying if the data is fresh
or not.
"""
global _FILE_CACHE
if force_reload and filename in _FILE_CACHE:
del _FILE_CACHE[filename]
reloaded = False
mtime = os.path.getmtime(filename)
cache_info = _FILE_CACHE.setdefault(filename, {})
if not cache_info or mtime > cache_info.get('mtime', 0):
LOG.debug(_("Reloading cached file %s") % filename)
with open(filename) as fap:
cache_info['data'] = fap.read()
cache_info['mtime'] = mtime
reloaded = True
return (reloaded, cache_info['data'])
def delete_if_exists(path):
"""Delete a file, but ignore file not found error.
:param path: File to delete
"""
try:
os.unlink(path)
except OSError as e:
if e.errno == errno.ENOENT:
return
else:
raise
@contextlib.contextmanager
def remove_path_on_error(path):
"""Protect code that wants to operate on PATH atomically.
Any exception will cause PATH to be removed.
:param path: File to work with
"""
try:
yield
except Exception:
with excutils.save_and_reraise_exception():
delete_if_exists(path)
def file_open(*args, **kwargs):
"""Open file
see built-in file() documentation for more details
Note: The reason this is kept in a separate module is to easily
be able to provide a stub module that doesn't alter system
state at all (for unit tests)
"""
return file(*args, **kwargs)
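
A short usage sketch for the helpers above, assuming the module is importable as glance.openstack.common.fileutils (the import path used elsewhere in this change); the paths are illustrative:

    import os

    from glance.openstack.common import fileutils

    workdir = "/tmp/scrubber-demo"
    fileutils.ensure_tree(workdir)                  # mkdir -p semantics

    target = os.path.join(workdir, "queue")
    with fileutils.remove_path_on_error(target):
        # If anything in this block raises, 'target' is removed before re-raising.
        with open(target, "w") as f:
            f.write("file:///some/uri\n1375190000\n")

    # First call loads and caches; later calls reload only if the mtime changed.
    reloaded, data = fileutils.read_cached_file(target)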

View File

@ -0,0 +1,276 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import errno
import functools
import os
import time
import weakref
from eventlet import semaphore
from oslo.config import cfg
from glance.openstack.common import fileutils
from glance.openstack.common.gettextutils import _ # noqa
from glance.openstack.common import local
from glance.openstack.common import log as logging
LOG = logging.getLogger(__name__)
util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'),
cfg.StrOpt('lock_path',
help=('Directory to use for lock files.'))
]
CONF = cfg.CONF
CONF.register_opts(util_opts)
def set_defaults(lock_path):
cfg.set_defaults(util_opts, lock_path=lock_path)
class _InterProcessLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file
descriptor rather than outside of the process, the lock gets dropped
automatically if the process crashes, even if __exit__ is not executed.
There are no guarantees regarding usage by multiple green threads in a
single process here. This lock works only between processes. Exclusive
access between local threads should be achieved using the semaphores
in the @synchronized decorator.
Note these locks are released when the descriptor is closed, so it's not
safe to close the file descriptor while another green thread holds the
lock. Just opening and closing the lock file can break synchronisation,
so lock files must be accessed only using this abstraction.
"""
def __init__(self, name):
self.lockfile = None
self.fname = name
def __enter__(self):
self.lockfile = open(self.fname, 'w')
while True:
try:
# Using non-blocking locks since green threads are not
# patched to deal with blocking locking calls.
# Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
return self
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
time.sleep(0.01)
else:
raise
def __exit__(self, exc_type, exc_val, exc_tb):
try:
self.unlock()
self.lockfile.close()
except IOError:
LOG.exception(_("Could not release the acquired lock `%s`"),
self.fname)
def trylock(self):
raise NotImplementedError()
def unlock(self):
raise NotImplementedError()
class _WindowsLock(_InterProcessLock):
def trylock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
def unlock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
class _PosixLock(_InterProcessLock):
def trylock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
def unlock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
if os.name == 'nt':
import msvcrt
InterProcessLock = _WindowsLock
else:
import fcntl
InterProcessLock = _PosixLock
_semaphores = weakref.WeakValueDictionary()
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
This function yields a `semaphore.Semaphore` instance unless external is
True, in which case, it'll yield an InterProcessLock instance.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
lock files on disk with a meaningful prefix.
:param external: The external keyword argument denotes whether this lock
should work across multiple processes. This means that if two different
workers both run a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
:param lock_path: The lock_path keyword argument is used to specify a
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
"""
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
{'lock': name})
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path or CONF.lock_path
if not local_lock_path:
raise cfg.RequiredOptError('lock_path')
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
LOG.info(_('Created lock path: %s'), local_lock_path)
def add_prefix(name, prefix):
if not prefix:
return name
sep = '' if prefix.endswith('-') else '-'
return '%s%s%s' % (prefix, sep, name)
# NOTE(mikal): the lock name cannot contain directory
# separators
lock_file_name = add_prefix(name.replace(os.sep, '_'),
lock_file_prefix)
lock_file_path = os.path.join(local_lock_path, lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock as lock:
LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
yield lock
finally:
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
else:
yield sem
finally:
local.strong_store.locks_held.remove(name)
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
"""Synchronization decorator.
Decorating a method like so::
@synchronized('mylock')
def foo(self, *args):
...
ensures that only one thread will execute the foo method at a time.
Different methods can share the same lock::
@synchronized('mylock')
def foo(self, *args):
...
@synchronized('mylock')
def bar(self, *args):
...
This way only one of either foo or bar can be executing at a time.
"""
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
with lock(name, lock_file_prefix, external, lock_path):
LOG.debug(_('Got semaphore / lock "%(function)s"'),
{'function': f.__name__})
return f(*args, **kwargs)
LOG.debug(_('Semaphore / lock released "%(function)s"'),
{'function': f.__name__})
return inner
return wrap
def synchronized_with_prefix(lock_file_prefix):
"""Partial object generator for the synchronization decorator.
Redefine @synchronized in each project like so::
(in nova/utils.py)
from nova.openstack.common import lockutils
synchronized = lockutils.synchronized_with_prefix('nova-')
(in nova/foo.py)
from nova import utils
@utils.synchronized('mylock')
def bar(self, *args):
...
The lock_file_prefix argument is used to provide lock files on disk with a
meaningful prefix.
"""
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)
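
A usage sketch for the lock helpers above, assuming they are importable as glance.openstack.common.lockutils as added here; the lock_path value is an example, not a default:

    from glance.openstack.common import lockutils

    # Context-manager form: an external (inter-process) lock keyed by image id,
    # which is the pattern the scrubber queue code in this change relies on.
    def enqueue_location(image_id):
        with lockutils.lock("scrubber-%s" % image_id,
                            lock_file_prefix='glance-', external=True,
                            lock_path='/tmp/glance-locks'):
            pass  # touch the per-image queue file here

    # Decorator form: serialize every call across processes under one lock name.
    @lockutils.synchronized("scrub-cleanup", lock_file_prefix='glance-',
                            external=True, lock_path='/tmp/glance-locks')
    def cleanup_scrub_dir():
        pass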

View File

@ -30,6 +30,7 @@ import glance.domain.proxy
from glance.openstack.common import importutils
import glance.openstack.common.log as logging
from glance.store import location
from glance.store import scrubber
LOG = logging.getLogger(__name__)
@ -53,8 +54,9 @@ store_opts = [
cfg.StrOpt('scrubber_datadir',
default='/var/lib/glance/scrubber',
help=_('Directory that the scrubber will use to track '
'information about what to delete. Make sure this is '
'also set in glance-api.conf')),
'information about what to delete. '
'Make sure this is set in glance-api.conf and '
'glance-scrubber.conf')),
cfg.BoolOpt('delayed_delete', default=False,
help=_('Turn on/off delayed delete.')),
cfg.IntOpt('scrub_time', default=0,
@ -265,7 +267,7 @@ def get_store_from_location(uri):
return loc.store_name
def safe_delete_from_backend(uri, context, image_id, **kwargs):
def safe_delete_from_backend(context, uri, image_id, **kwargs):
"""Given a uri, delete an image from the store."""
try:
return delete_from_backend(context, uri, **kwargs)
@ -281,31 +283,21 @@ def safe_delete_from_backend(uri, context, image_id, **kwargs):
LOG.error(msg)
def schedule_delayed_delete_from_backend(uri, image_id, **kwargs):
"""Given a uri, schedule the deletion of an image."""
datadir = CONF.scrubber_datadir
delete_time = time.time() + CONF.scrub_time
file_path = os.path.join(datadir, str(image_id))
utils.safe_mkdirs(datadir)
if os.path.exists(file_path):
msg = _("Image id %(image_id)s already queued for delete") % {
'image_id': image_id}
raise exception.Duplicate(msg)
if CONF.metadata_encryption_key is not None:
uri = crypt.urlsafe_encrypt(CONF.metadata_encryption_key, uri, 64)
with open(file_path, 'w') as f:
f.write('\n'.join([uri, str(int(delete_time))]))
os.chmod(file_path, 0o600)
os.utime(file_path, (delete_time, delete_time))
def schedule_delayed_delete_from_backend(context, uri, image_id, **kwargs):
"""Given a uri, schedule the deletion of an image location."""
(file_queue, _db_queue) = scrubber.get_scrub_queues()
# NOTE(zhiyan): By default, the glance-api store uses the file-based queue.
# In the future we can switch to the DB-based queue instead, for example
# by using the image location's status to save the pending-delete flag
# once that property is added.
file_queue.add_location(image_id, uri)
def delete_image_from_backend(context, store_api, image_id, uri):
if CONF.delayed_delete:
store_api.schedule_delayed_delete_from_backend(uri, image_id)
store_api.schedule_delayed_delete_from_backend(context, uri, image_id)
else:
store_api.safe_delete_from_backend(uri, context, image_id)
store_api.safe_delete_from_backend(context, uri, image_id)
def check_location_metadata(val, key=''):

View File

@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import calendar
import eventlet
import os
@ -26,13 +27,23 @@ from glance.common import crypt
from glance.common import exception
from glance.common import utils
from glance import context
from glance.openstack.common import lockutils
import glance.openstack.common.log as logging
import glance.openstack.common.uuidutils as uuidutils
import glance.registry.client.v1.api as registry
from glance import store
LOG = logging.getLogger(__name__)
scrubber_opts = [
cfg.StrOpt('scrubber_datadir',
default='/var/lib/glance/scrubber',
help=_('Directory that the scrubber will use to track '
'information about what to delete. '
'Make sure this is set in glance-api.conf and '
'glance-scrubber.conf')),
cfg.IntOpt('scrub_time', default=0,
help=_('The amount of time in seconds to delay before '
'performing a delete.')),
cfg.BoolOpt('cleanup_scrubber', default=False,
help=_('A boolean that determines if the scrubber should '
'clean up the files it uses for taking data. Only '
@ -45,6 +56,300 @@ scrubber_opts = [
CONF = cfg.CONF
CONF.register_opts(scrubber_opts)
CONF.import_opt('metadata_encryption_key', 'glance.common.config')
class ScrubQueue(object):
"""Image scrub queue base class.
The queue contains the image locations which need to be deleted from the backend.
"""
def __init__(self):
registry.configure_registry_client()
registry.configure_registry_admin_creds()
self.registry = registry.get_registry_client(context.RequestContext())
@abc.abstractmethod
def add_location(self, image_id, uri):
"""Adding image location to scrub queue.
:param image_id: The opaque image identifier
:param uri: The opaque image location uri
"""
pass
@abc.abstractmethod
def get_all_locations(self):
"""Returns a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
pass
@abc.abstractmethod
def pop_all_locations(self):
"""Pop out a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
pass
@abc.abstractmethod
def has_image(self, image_id):
"""Returns whether the queue contains an image or not.
:param image_id: The opaque image identifier
:retval a boolean value indicating whether the image is in the queue
"""
pass
class ScrubFileQueue(ScrubQueue):
"""File-based image scrub queue class."""
def __init__(self):
super(ScrubFileQueue, self).__init__()
self.scrubber_datadir = CONF.scrubber_datadir
utils.safe_mkdirs(self.scrubber_datadir)
self.scrub_time = CONF.scrub_time
self.metadata_encryption_key = CONF.metadata_encryption_key
def _read_queue_file(self, file_path):
"""Reading queue file to loading deleted location and timestamp out.
:param file_path: Queue file full path
:retval a list of image location timestamp tuple from queue file
"""
uris = []
delete_times = []
try:
with open(file_path, 'r') as f:
while True:
uri = f.readline().strip()
if uri:
uris.append(uri)
delete_times.append(int(f.readline().strip()))
else:
break
except Exception:
LOG.error(_("%s file can not be read.") % file_path)
return uris, delete_times
def _update_queue_file(self, file_path, remove_record_idxs):
"""Updating queue file to remove such queue records.
:param file_path: Queue file full path
:param remove_record_idxs: A list of record index those want to remove
"""
try:
with open(file_path, 'r') as f:
lines = f.readlines()
# NOTE(zhiyan): we need to remove records bottom-up to
# keep the record indexes valid.
remove_record_idxs.sort(reverse=True)
for record_idx in remove_record_idxs:
# Each record has two lines
line_no = (record_idx + 1) * 2 - 1
del lines[line_no:line_no + 2]
with open(file_path, 'w') as f:
f.write(''.join(lines))
os.chmod(file_path, 0o600)
except Exception:
LOG.error(_("%s file can not be wrote.") % file_path)
def add_location(self, image_id, uri):
"""Adding image location to scrub queue.
:param image_id: The opaque image identifier
:param uri: The opaque image location uri
"""
with lockutils.lock("scrubber-%s" % image_id,
lock_file_prefix='glance-', external=True):
# NOTE(zhiyan): make sure the scrubber does not clean up
# 'pending_delete' images concurrently before this code
# acquires the lock and reaches here.
try:
image = self.registry.get_image(image_id)
if image['status'] == 'deleted':
return
except exception.NotFound as e:
LOG.error(_("Failed to find image to delete: "
"%(e)s") % locals())
return
delete_time = time.time() + self.scrub_time
file_path = os.path.join(self.scrubber_datadir, str(image_id))
if self.metadata_encryption_key is not None:
uri = crypt.urlsafe_encrypt(self.metadata_encryption_key,
uri, 64)
if os.path.exists(file_path):
# Append the uri of location to the queue file
with open(file_path, 'a') as f:
f.write('\n')
f.write('\n'.join([uri, str(int(delete_time))]))
else:
# NOTE(zhiyan): Protect the file before we write any data.
open(file_path, 'w').close()
os.chmod(file_path, 0o600)
with open(file_path, 'w') as f:
f.write('\n'.join([uri, str(int(delete_time))]))
os.utime(file_path, (delete_time, delete_time))
def _walk_all_locations(self, remove=False):
"""Returns a list of image id and location tuple from scrub queue.
:param remove: Whether remove location from queue or not after walk
:retval a list of image image_id and location tuple from scrub queue
"""
if not os.path.exists(self.scrubber_datadir):
LOG.info(_("%s directory does not exist.") % self.scrubber_datadir)
return []
ret = []
for root, dirs, files in os.walk(self.scrubber_datadir):
for image_id in files:
if not uuidutils.is_uuid_like(image_id):
continue
with lockutils.lock("scrubber-%s" % image_id,
lock_file_prefix='glance-', external=True):
file_path = os.path.join(self.scrubber_datadir, image_id)
uris, delete_times = self._read_queue_file(file_path)
remove_record_idxs = []
skipped = False
for (record_idx, delete_time) in enumerate(delete_times):
if delete_time > time.time():
skipped = True
continue
else:
ret.append((image_id, uris[record_idx]))
remove_record_idxs.append(record_idx)
if remove:
if skipped:
# NOTE(zhiyan): remove location records from
# the queue file.
self._update_queue_file(file_path,
remove_record_idxs)
else:
utils.safe_remove(file_path)
return ret
def get_all_locations(self):
"""Returns a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations()
def pop_all_locations(self):
"""Pop out a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations(remove=True)
def has_image(self, image_id):
"""Returns whether the queue contains an image or not.
:param image_id: The opaque image identifier
:retval a boolean value indicating whether the image is in the queue
"""
return os.path.exists(os.path.join(self.scrubber_datadir,
str(image_id)))
class ScrubDBQueue(ScrubQueue):
"""Database-based image scrub queue class."""
def __init__(self):
super(ScrubDBQueue, self).__init__()
self.cleanup_scrubber_time = CONF.cleanup_scrubber_time
def add_location(self, image_id, uri):
"""Adding image location to scrub queue.
:param image_id: The opaque image identifier
:param uri: The opaque image location uri
"""
raise NotImplementedError
def _walk_all_locations(self, remove=False):
"""Returns a list of image id and location tuple from scrub queue.
:param remove: Whether remove location from queue or not after walk
:retval a list of image id and location tuple from scrub queue
"""
filters = {'deleted': True,
'is_public': 'none',
'status': 'pending_delete'}
ret = []
for image in self.registry.get_images_detailed(filters=filters):
deleted_at = image.get('deleted_at')
if not deleted_at:
continue
# NOTE: Strip off microseconds which may occur after the last '.,'
# Example: 2012-07-07T19:14:34.974216
date_str = deleted_at.rsplit('.', 1)[0].rsplit(',', 1)[0]
delete_time = calendar.timegm(time.strptime(date_str,
"%Y-%m-%dT%H:%M:%S"))
if delete_time + self.cleanup_scrubber_time > time.time():
continue
ret.extend([(image['id'], location['uri'])
for location in image['location_data']])
if remove:
self.registry.update_image(image['id'], {'status': 'deleted'})
return ret
def get_all_locations(self):
"""Returns a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations()
def pop_all_locations(self):
"""Pop out a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations(remove=True)
def has_image(self, image_id):
"""Returns whether the queue contains an image or not.
:param image_id: The opaque image identifier
:retval a boolean value indicating whether the image is in the queue
"""
try:
image = self.registry.get_image(image_id)
return image['status'] == 'pending_delete'
except exception.NotFound as e:
return False
_file_queue = None
_db_queue = None
def get_scrub_queues():
global _file_queue, _db_queue
if not _file_queue:
_file_queue = ScrubFileQueue()
if not _db_queue:
_db_queue = ScrubDBQueue()
return (_file_queue, _db_queue)
class Daemon(object):
@ -73,140 +378,148 @@ class Daemon(object):
class Scrubber(object):
CLEANUP_FILE = ".cleanup"
def __init__(self, store_api):
LOG.info(_("Initializing scrubber with configuration: %s") %
unicode({'scrubber_datadir': CONF.scrubber_datadir,
'cleanup': CONF.cleanup_scrubber,
'cleanup_time': CONF.cleanup_scrubber_time,
'registry_host': CONF.registry_host,
'registry_port': CONF.registry_port}))
def __init__(self):
self.datadir = CONF.scrubber_datadir
self.cleanup = CONF.cleanup_scrubber
self.cleanup_time = CONF.cleanup_scrubber_time
# configs for registry API store auth
self.admin_user = CONF.admin_user
self.admin_tenant = CONF.admin_tenant_name
utils.safe_mkdirs(CONF.scrubber_datadir)
host, port = CONF.registry_host, CONF.registry_port
LOG.info(_("Initializing scrubber with conf: %s") %
{'datadir': self.datadir, 'cleanup': self.cleanup,
'cleanup_time': self.cleanup_time,
'registry_host': host, 'registry_port': port})
self.store_api = store_api
registry.configure_registry_client()
registry.configure_registry_admin_creds()
ctx = context.RequestContext()
self.registry = registry.get_registry_client(ctx)
self.registry = registry.get_registry_client(context.RequestContext())
utils.safe_mkdirs(self.datadir)
(self.file_queue, self.db_queue) = get_scrub_queues()
def _get_delete_jobs(self, queue, pop):
try:
if pop:
image_id_uri_list = queue.pop_all_locations()
else:
image_id_uri_list = queue.get_all_locations()
except:
LOG.error(_("Can not %s scrub jobs from queue.") %
'pop' if pop else 'get')
return None
delete_jobs = {}
for image_id, image_uri in image_id_uri_list:
if image_id not in delete_jobs:
delete_jobs[image_id] = []
delete_jobs[image_id].append((image_id, image_uri))
return delete_jobs
def run(self, pool, event=None):
now = time.time()
delete_jobs = self._get_delete_jobs(self.file_queue, True)
if delete_jobs:
for image_id, jobs in delete_jobs.iteritems():
self._scrub_image(pool, image_id, jobs)
if not os.path.exists(self.datadir):
LOG.info(_("%s does not exist") % self.datadir)
return
delete_work = []
for root, dirs, files in os.walk(self.datadir):
for id in files:
if id == self.CLEANUP_FILE:
continue
file_name = os.path.join(root, id)
delete_time = os.stat(file_name).st_mtime
if delete_time > now:
continue
uri, delete_time = read_queue_file(file_name)
if delete_time > now:
continue
delete_work.append((id, uri, now))
LOG.info(_("Deleting %s images") % len(delete_work))
# NOTE(bourke): The starmap must be iterated to do work
for job in pool.starmap(self._delete, delete_work):
pass
if self.cleanup:
if CONF.cleanup_scrubber:
self._cleanup(pool)
def _delete(self, id, uri, now):
file_path = os.path.join(self.datadir, str(id))
def _scrub_image(self, pool, image_id, delete_jobs):
if len(delete_jobs) == 0:
return
LOG.info(_("Scrubbing image %(id)s from %(count)d locations.") %
{'id': image_id, 'count': len(delete_jobs)})
# NOTE(bourke): The starmap must be iterated to do work
list(pool.starmap(self._delete_image_from_backend, delete_jobs))
image = self.registry.get_image(image_id)
if (image['status'] == 'pending_delete' and
not self.file_queue.has_image(image_id)):
self.registry.update_image(image_id, {'status': 'deleted'})
def _delete_image_from_backend(self, image_id, uri):
if CONF.metadata_encryption_key is not None:
uri = crypt.urlsafe_decrypt(CONF.metadata_encryption_key, uri)
try:
LOG.debug(_("Deleting %(id)s") % {'id': id})
LOG.debug(_("Deleting %(uri)s from image %(image_id)s.") %
{'image_id': image_id, 'uri': uri})
# Here we create a request context with credentials to support
# delayed delete when using multi-tenant backend storage
ctx = context.RequestContext(auth_tok=self.registry.auth_tok,
user=self.admin_user,
tenant=self.admin_tenant)
store.delete_from_backend(ctx, uri)
except store.UnsupportedBackend:
msg = _("Failed to delete image from store (%(id)s).")
LOG.error(msg % {'id': id})
except exception.NotFound:
msg = _("Image not found in store (%(id)s).")
LOG.error(msg % {'id': id})
admin_tenant = CONF.admin_tenant_name
auth_token = self.registry.auth_tok
admin_context = context.RequestContext(user=CONF.admin_user,
tenant=admin_tenant,
auth_tok=auth_token)
self.registry.update_image(id, {'status': 'deleted'})
utils.safe_remove(file_path)
self.store_api.delete_from_backend(admin_context, uri)
except Exception:
msg = _("Failed to delete image %(image_id)s from %(uri)s.")
LOG.error(msg % {'image_id': image_id, 'uri': uri})
def _read_cleanup_file(self, file_path):
"""Reading cleanup to get latest cleanup timestamp.
:param file_path: Cleanup status file full path
:retval latest cleanup timestamp
"""
try:
if not os.path.exists(file_path):
msg = _("%s file is not exists.") % unicode(file_path)
raise Exception(msg)
atime = int(os.path.getatime(file_path))
mtime = int(os.path.getmtime(file_path))
if atime != mtime:
msg = _("%s file contains conflicting cleanup "
"timestamp.") % unicode(file_path)
raise Exception(msg)
return atime
except Exception as e:
LOG.error(e)
return None
def _update_cleanup_file(self, file_path, cleanup_time):
"""Update latest cleanup timestamp to cleanup file.
:param file_path: Cleanup status file full path
:param cleanup_time: The Latest cleanup timestamp
"""
try:
open(file_path, 'w').close()
os.chmod(file_path, 0o600)
os.utime(file_path, (cleanup_time, cleanup_time))
except Exception:
LOG.error(_("%s file can not be created.") % unicode(file_path))
def _cleanup(self, pool):
now = time.time()
cleanup_file = os.path.join(self.datadir, self.CLEANUP_FILE)
cleanup_file = os.path.join(CONF.scrubber_datadir, ".cleanup")
if not os.path.exists(cleanup_file):
write_queue_file(cleanup_file, 'cleanup', now)
self._update_cleanup_file(cleanup_file, now)
return
_uri, last_run_time = read_queue_file(cleanup_file)
cleanup_time = last_run_time + self.cleanup_time
last_cleanup_time = self._read_cleanup_file(cleanup_file)
cleanup_time = last_cleanup_time + CONF.cleanup_scrubber_time
if cleanup_time > now:
return
LOG.info(_("Getting images deleted before %s") % self.cleanup_time)
write_queue_file(cleanup_file, 'cleanup', now)
LOG.info(_("Getting images deleted before "
"%s") % CONF.cleanup_scrubber_time)
self._update_cleanup_file(cleanup_file, now)
filters = {'deleted': True, 'is_public': 'none',
'status': 'pending_delete'}
pending_deletes = self.registry.get_images_detailed(filters=filters)
delete_jobs = self._get_delete_jobs(self.db_queue, False)
if not delete_jobs:
return
delete_work = []
for pending_delete in pending_deletes:
deleted_at = pending_delete.get('deleted_at')
if not deleted_at:
continue
time_fmt = "%Y-%m-%dT%H:%M:%S"
# NOTE: Strip off microseconds which may occur after the last '.,'
# Example: 2012-07-07T19:14:34.974216
date_str = deleted_at.rsplit('.', 1)[0].rsplit(',', 1)[0]
delete_time = calendar.timegm(time.strptime(date_str,
time_fmt))
if delete_time + self.cleanup_time > now:
continue
delete_work.append((pending_delete['id'],
pending_delete['location'],
now))
LOG.info(_("Deleting %s images") % len(delete_work))
# NOTE(bourke): The starmap must be iterated to do work
for job in pool.starmap(self._delete, delete_work):
pass
def read_queue_file(file_path):
with open(file_path) as f:
uri = f.readline().strip()
delete_time = int(f.readline().strip())
return uri, delete_time
def write_queue_file(file_path, uri, delete_time):
with open(file_path, 'w') as f:
f.write('\n'.join([uri, str(int(delete_time))]))
os.chmod(file_path, 0o600)
os.utime(file_path, (delete_time, delete_time))
for image_id, jobs in delete_jobs.iteritems():
with lockutils.lock("scrubber-%s" % image_id,
lock_file_prefix='glance-', external=True):
if not self.file_queue.has_image(image_id):
# NOTE(zhiyan): the scrubber should not clean up this image,
# since a queue file may have been created for this
# 'pending_delete' image concurrently before this code
# acquired the lock and reached here. This check is only
# worthwhile if glance-api and glance-scrubber are deployed
# on the same host.
self._scrub_image(pool, image_id, jobs)
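
For clarity, a standalone sketch of the per-image queue file format that ScrubFileQueue reads and writes above: each record is two lines, the (optionally encrypted) location URI followed by an integer delete time, with one record appended per location. The helper below is illustrative only and not part of the change:

    def read_scrub_records(file_path):
        """Parse a per-image scrub queue file into (uri, delete_time) pairs."""
        records = []
        with open(file_path) as f:
            lines = [line.strip() for line in f if line.strip()]
        # Lines alternate: uri, delete_time, uri, delete_time, ...
        for i in range(0, len(lines) - 1, 2):
            records.append((lines[i], int(lines[i + 1])))
        return records

    # A file /var/lib/glance/scrubber/<image_id> with two queued locations
    # would contain four lines, e.g.:
    #
    #     swift+https://example/v1/AUTH_tenant/container/object
    #     1375190000
    #     file:///var/lib/glance/images/some-image
    #     1375190060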

View File

@ -315,6 +315,7 @@ class ApiServer(Server):
self.sql_connection = os.environ.get('GLANCE_TEST_SQL_CONNECTION',
default_sql_connection)
self.user_storage_quota = 0
self.lock_path = self.test_dir
self.conf_base = """[DEFAULT]
verbose = %(verbose)s
@ -363,7 +364,8 @@ show_image_direct_url = %(show_image_direct_url)s
show_multiple_locations = %(show_multiple_locations)s
user_storage_quota = %(user_storage_quota)s
enable_v1_api = %(enable_v1_api)s
enable_v2_api= %(enable_v2_api)s
enable_v2_api = %(enable_v2_api)s
lock_path = %(lock_path)s
[paste_deploy]
flavor = %(deployment_flavor)s
"""
@ -515,6 +517,7 @@ class ScrubberDaemon(Server):
self.swift_store_auth_version = kwargs.get("swift_store_auth_version",
"2")
self.metadata_encryption_key = "012345678901234567890123456789ab"
self.lock_path = self.test_dir
self.conf_base = """[DEFAULT]
verbose = %(verbose)s
debug = %(debug)s
@ -531,6 +534,7 @@ swift_store_user = %(swift_store_user)s
swift_store_key = %(swift_store_key)s
swift_store_container = %(swift_store_container)s
swift_store_auth_version = %(swift_store_auth_version)s
lock_path = %(lock_path)s
"""
def start(self, expect_exit=True, expected_exitcode=0, **kwargs):

View File

@ -69,7 +69,8 @@ class IsolatedUnitTest(StoreClearingUnitTest):
debug=False,
default_store='filesystem',
filesystem_store_datadir=os.path.join(self.test_dir),
policy_file=policy_file)
policy_file=policy_file,
lock_path=os.path.join(self.test_dir))
stubs.stub_out_registry_and_store_server(self.stubs,
self.test_dir,
registry=self.registry)

View File

@ -184,6 +184,6 @@ class TestHttpStore(base.StoreClearingUnitTest):
ctx = context.RequestContext()
stub_out_registry_image_update(self.stubs)
try:
safe_delete_from_backend(uri, ctx, 'image_id')
safe_delete_from_backend(ctx, uri, 'image_id')
except exception.StoreDeleteNotSupported:
self.fail('StoreDeleteNotSupported should be swallowed')

View File

@ -20,6 +20,7 @@ import shutil
import time
import tempfile
import eventlet
import mox
from glance.common import exception
@ -50,17 +51,17 @@ class TestScrubber(test_utils.BaseTestCase):
uri = 'file://some/path/%s' % (fname)
id = 'helloworldid'
now = time.time()
scrub = glance.store.scrubber.Scrubber()
scrub = glance.store.scrubber.Scrubber(glance.store)
scrub.registry = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(glance.store, "delete_from_backend")
scrub.registry.get_image(id).AndReturn({'status': 'pending_delete'})
scrub.registry.update_image(id, {'status': 'deleted'})
self.mox.StubOutWithMock(glance.store, "delete_from_backend")
glance.store.delete_from_backend(
mox.IgnoreArg(),
uri).AndRaise(ex)
self.mox.ReplayAll()
scrub._delete(id, uri, now)
scrub._scrub_image(eventlet.greenpool.GreenPool(1),
id, [(id, uri)])
self.mox.VerifyAll()
q_path = os.path.join(self.data_dir, id)

View File

@ -124,18 +124,20 @@ class FakeStoreAPI(object):
except KeyError:
raise exception.NotFound()
def safe_delete_from_backend(self, uri, context, id, **kwargs):
def safe_delete_from_backend(self, context, uri, id, **kwargs):
try:
del self.data[uri]
except KeyError:
pass
def schedule_delayed_delete_from_backend(self, uri, id, **kwargs):
def schedule_delayed_delete_from_backend(self, context, uri, id, **kwargs):
pass
def delete_image_from_backend(self, context, store_api, image_id, uri):
glance.store.delete_image_from_backend(context,
store_api, image_id, uri)
if CONF.delayed_delete:
self.schedule_delayed_delete_from_backend(context, uri, image_id)
else:
self.safe_delete_from_backend(context, uri, image_id)
def get_size_from_backend(self, context, location):
return self.get_from_backend(context, location)[1]

View File

@ -43,7 +43,7 @@ class TestUploadUtils(base.StoreClearingUnitTest):
id = unit_test_utils.UUID1
self.mox.StubOutWithMock(glance.store, "safe_delete_from_backend")
glance.store.safe_delete_from_backend(location, req.context, id)
glance.store.safe_delete_from_backend(req.context, location, id)
self.mox.ReplayAll()
upload_utils.initiate_deletion(req, location, id)
@ -57,7 +57,8 @@ class TestUploadUtils(base.StoreClearingUnitTest):
self.mox.StubOutWithMock(glance.store,
"schedule_delayed_delete_from_backend")
glance.store.schedule_delayed_delete_from_backend(location,
glance.store.schedule_delayed_delete_from_backend(req.context,
location,
id)
self.mox.ReplayAll()

View File

@ -1,11 +1,13 @@
[DEFAULT]
# The list of modules to copy from openstack-common
module=fileutils
module=gettextutils
module=importutils
module=install_venv_common
module=jsonutils
module=local
module=lockutils
module=log
module=notifier
module=policy