Adding status field to image location -- scrubber changes
Adding a status field to image's each location property, each location status can be 'active', 'pending_delete' and 'deleted'. Under location's status information Scrubber service can make cleanup based on DB records also but not a dedicated queue-file for each image. This is third part of this change which covered scrubber DB queue enablement and double queue supporting. Partially-Implements BP: image-location-status Change-Id: I8ee9e9254c8371bbbce8b28d643e3d474b2361bc Signed-off-by: Zhi Yan Liu <zhiyanl@cn.ibm.com>
This commit is contained in:
parent
44e607df7c
commit
70e0a246f9
@ -48,9 +48,61 @@ registry_port = 9191
|
||||
# glance-scrubber and glance-api.
|
||||
#lock_path=<None>
|
||||
|
||||
# API to use for accessing data. Default value points to sqlalchemy
|
||||
# package, it is also possible to use: glance.db.registry.api
|
||||
#data_api = glance.db.sqlalchemy.api
|
||||
|
||||
# ================= Security Options ==========================
|
||||
|
||||
# AES key for encrypting store 'location' metadata, including
|
||||
# -- if used -- Swift or S3 credentials
|
||||
# Should be set to a random string of length 16, 24 or 32 bytes
|
||||
#metadata_encryption_key = <16, 24 or 32 char registry metadata key>
|
||||
|
||||
# ================= Database Options ===============+==========
|
||||
|
||||
[database]
|
||||
|
||||
# The SQLAlchemy connection string used to connect to the
|
||||
# database (string value)
|
||||
#connection=sqlite:////glance/openstack/common/db/$sqlite_db
|
||||
|
||||
# The SQLAlchemy connection string used to connect to the
|
||||
# slave database (string value)
|
||||
#slave_connection=
|
||||
|
||||
# timeout before idle sql connections are reaped (integer
|
||||
# value)
|
||||
#idle_timeout=3600
|
||||
|
||||
# Minimum number of SQL connections to keep open in a pool
|
||||
# (integer value)
|
||||
#min_pool_size=1
|
||||
|
||||
# Maximum number of SQL connections to keep open in a pool
|
||||
# (integer value)
|
||||
#max_pool_size=<None>
|
||||
|
||||
# maximum db connection retries during startup. (setting -1
|
||||
# implies an infinite retry count) (integer value)
|
||||
#max_retries=10
|
||||
|
||||
# interval between retries of opening a sql connection
|
||||
# (integer value)
|
||||
#retry_interval=10
|
||||
|
||||
# If set, use this value for max_overflow with sqlalchemy
|
||||
# (integer value)
|
||||
#max_overflow=<None>
|
||||
|
||||
# Verbosity of SQL debugging information. 0=None,
|
||||
# 100=Everything (integer value)
|
||||
#connection_debug=0
|
||||
|
||||
# Add python stack traces to SQL as comment strings (boolean
|
||||
# value)
|
||||
#connection_trace=false
|
||||
|
||||
# If set, use this value for pool_timeout with sqlalchemy
|
||||
# (integer value)
|
||||
#pool_timeout=<None>
|
||||
|
@ -50,7 +50,7 @@ class RegistryClient(BaseClient):
|
||||
**kwargs)
|
||||
|
||||
def decrypt_metadata(self, image_metadata):
|
||||
if self.metadata_encryption_key is not None:
|
||||
if self.metadata_encryption_key:
|
||||
if image_metadata.get('location'):
|
||||
location = crypt.urlsafe_decrypt(self.metadata_encryption_key,
|
||||
image_metadata['location'])
|
||||
@ -67,7 +67,7 @@ class RegistryClient(BaseClient):
|
||||
return image_metadata
|
||||
|
||||
def encrypt_metadata(self, image_metadata):
|
||||
if self.metadata_encryption_key is not None:
|
||||
if self.metadata_encryption_key:
|
||||
location_url = image_metadata.get('location')
|
||||
if location_url:
|
||||
location = crypt.urlsafe_encrypt(self.metadata_encryption_key,
|
||||
|
@ -26,15 +26,17 @@ from glance.common import crypt
|
||||
from glance.common import exception
|
||||
from glance.common import utils
|
||||
from glance import context
|
||||
from glance.openstack.common import gettextutils
|
||||
import glance.db as db_api
|
||||
from glance import i18n
|
||||
from glance.openstack.common import lockutils
|
||||
import glance.openstack.common.log as logging
|
||||
import glance.registry.client.v1.api as registry
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
_LE = gettextutils._LE
|
||||
_LI = gettextutils._LI
|
||||
_LW = gettextutils._LW
|
||||
|
||||
_LI = i18n._LI
|
||||
_LW = i18n._LW
|
||||
_LE = i18n._LE
|
||||
|
||||
scrubber_opts = [
|
||||
cfg.StrOpt('scrubber_datadir',
|
||||
@ -67,6 +69,8 @@ class ScrubQueue(object):
|
||||
The queue contains image's location which need to delete from backend.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.scrub_time = CONF.scrub_time
|
||||
self.metadata_encryption_key = CONF.metadata_encryption_key
|
||||
registry.configure_registry_client()
|
||||
registry.configure_registry_admin_creds()
|
||||
self.registry = registry.get_registry_client(context.RequestContext())
|
||||
@ -115,33 +119,33 @@ class ScrubFileQueue(ScrubQueue):
|
||||
super(ScrubFileQueue, self).__init__()
|
||||
self.scrubber_datadir = CONF.scrubber_datadir
|
||||
utils.safe_mkdirs(self.scrubber_datadir)
|
||||
self.scrub_time = CONF.scrub_time
|
||||
self.metadata_encryption_key = CONF.metadata_encryption_key
|
||||
|
||||
def _read_queue_file(self, file_path):
|
||||
"""Reading queue file to loading deleted location and timestamp out.
|
||||
|
||||
:param file_path: Queue file full path
|
||||
|
||||
:retval a list of image location timestamp tuple from queue file
|
||||
:retval a list of image location id, uri and timestamp tuple
|
||||
"""
|
||||
loc_ids = []
|
||||
uris = []
|
||||
delete_times = []
|
||||
|
||||
try:
|
||||
with open(file_path, 'r') as f:
|
||||
while True:
|
||||
uri = f.readline().strip()
|
||||
if uri:
|
||||
uris.append(uri)
|
||||
loc_id = f.readline().strip()
|
||||
if loc_id:
|
||||
lid = unicode(loc_id)
|
||||
loc_ids.append(int(lid) if lid.isdigit() else lid)
|
||||
uris.append(unicode(f.readline().strip()))
|
||||
delete_times.append(int(f.readline().strip()))
|
||||
else:
|
||||
break
|
||||
return loc_ids, uris, delete_times
|
||||
except Exception:
|
||||
LOG.error(_LE("%s file can not be read.") % file_path)
|
||||
|
||||
return uris, delete_times
|
||||
|
||||
def _update_queue_file(self, file_path, remove_record_idxs):
|
||||
"""Updating queue file to remove such queue records.
|
||||
|
||||
@ -155,9 +159,10 @@ class ScrubFileQueue(ScrubQueue):
|
||||
# keep record index be valid.
|
||||
remove_record_idxs.sort(reverse=True)
|
||||
for record_idx in remove_record_idxs:
|
||||
# Each record has two lines
|
||||
line_no = (record_idx + 1) * 2 - 1
|
||||
del lines[line_no:line_no + 2]
|
||||
# Each record has three lines:
|
||||
# location id, uri and delete time.
|
||||
line_no = (record_idx + 1) * 3 - 1
|
||||
del lines[line_no:line_no + 3]
|
||||
with open(file_path, 'w') as f:
|
||||
f.write(''.join(lines))
|
||||
os.chmod(file_path, 0o600)
|
||||
@ -193,27 +198,32 @@ class ScrubFileQueue(ScrubQueue):
|
||||
utils.exception_to_str(e))
|
||||
return False
|
||||
|
||||
delete_time = time.time() + self.scrub_time
|
||||
file_path = os.path.join(self.scrubber_datadir, str(image_id))
|
||||
|
||||
if self.metadata_encryption_key is not None:
|
||||
loc_id = location.get('id', '-')
|
||||
if self.metadata_encryption_key:
|
||||
uri = crypt.urlsafe_encrypt(self.metadata_encryption_key,
|
||||
location['url'], 64)
|
||||
else:
|
||||
uri = location['url']
|
||||
delete_time = time.time() + self.scrub_time
|
||||
file_path = os.path.join(self.scrubber_datadir, str(image_id))
|
||||
|
||||
if os.path.exists(file_path):
|
||||
# Append the uri of location to the queue file
|
||||
with open(file_path, 'a') as f:
|
||||
f.write('\n')
|
||||
f.write('\n'.join([uri, str(int(delete_time))]))
|
||||
f.write('\n'.join([str(loc_id),
|
||||
uri,
|
||||
str(int(delete_time))]))
|
||||
else:
|
||||
# NOTE(zhiyan): Protect the file before we write any data.
|
||||
open(file_path, 'w').close()
|
||||
os.chmod(file_path, 0o600)
|
||||
with open(file_path, 'w') as f:
|
||||
f.write('\n'.join([uri, str(int(delete_time))]))
|
||||
f.write('\n'.join([str(loc_id),
|
||||
uri,
|
||||
str(int(delete_time))]))
|
||||
os.utime(file_path, (delete_time, delete_time))
|
||||
|
||||
return True
|
||||
|
||||
def _walk_all_locations(self, remove=False):
|
||||
@ -221,7 +231,7 @@ class ScrubFileQueue(ScrubQueue):
|
||||
|
||||
:param remove: Whether remove location from queue or not after walk
|
||||
|
||||
:retval a list of image image_id and location tuple from scrub queue
|
||||
:retval a list of image id, location id and uri tuple from scrub queue
|
||||
"""
|
||||
if not os.path.exists(self.scrubber_datadir):
|
||||
LOG.info(_LI("%s directory does not exist.") %
|
||||
@ -236,7 +246,8 @@ class ScrubFileQueue(ScrubQueue):
|
||||
with lockutils.lock("scrubber-%s" % image_id,
|
||||
lock_file_prefix='glance-', external=True):
|
||||
file_path = os.path.join(self.scrubber_datadir, image_id)
|
||||
uris, delete_times = self._read_queue_file(file_path)
|
||||
records = self._read_queue_file(file_path)
|
||||
loc_ids, uris, delete_times = records
|
||||
|
||||
remove_record_idxs = []
|
||||
skipped = False
|
||||
@ -245,8 +256,11 @@ class ScrubFileQueue(ScrubQueue):
|
||||
skipped = True
|
||||
continue
|
||||
else:
|
||||
ret.append((image_id, uris[record_idx]))
|
||||
ret.append((image_id,
|
||||
loc_ids[record_idx],
|
||||
uris[record_idx]))
|
||||
remove_record_idxs.append(record_idx)
|
||||
|
||||
if remove:
|
||||
if skipped:
|
||||
# NOTE(zhiyan): remove location records from
|
||||
@ -286,7 +300,11 @@ class ScrubDBQueue(ScrubQueue):
|
||||
"""Database-based image scrub queue class."""
|
||||
def __init__(self):
|
||||
super(ScrubDBQueue, self).__init__()
|
||||
self.cleanup_scrubber_time = CONF.cleanup_scrubber_time
|
||||
admin_tenant_name = CONF.admin_tenant_name
|
||||
admin_token = self.registry.auth_tok
|
||||
self.admin_context = context.RequestContext(user=CONF.admin_user,
|
||||
tenant=admin_tenant_name,
|
||||
auth_tok=admin_token)
|
||||
|
||||
def add_location(self, image_id, location, user_context=None):
|
||||
"""Adding image location to scrub queue.
|
||||
@ -297,14 +315,21 @@ class ScrubDBQueue(ScrubQueue):
|
||||
|
||||
:retval A boolean value to indicate success or not
|
||||
"""
|
||||
raise NotImplementedError
|
||||
loc_id = location.get('id')
|
||||
if loc_id:
|
||||
db_api.get_api().image_location_delete(self.admin_context,
|
||||
image_id, loc_id,
|
||||
'pending_delete')
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def _walk_all_locations(self, remove=False):
|
||||
"""Returns a list of image id and location tuple from scrub queue.
|
||||
|
||||
:param remove: Whether remove location from queue or not after walk
|
||||
|
||||
:retval a list of image id and location tuple from scrub queue
|
||||
:retval a list of image id, location id and uri tuple from scrub queue
|
||||
"""
|
||||
filters = {'deleted': True,
|
||||
'is_public': 'none',
|
||||
@ -321,14 +346,28 @@ class ScrubDBQueue(ScrubQueue):
|
||||
delete_time = calendar.timegm(time.strptime(date_str,
|
||||
"%Y-%m-%dT%H:%M:%S"))
|
||||
|
||||
if delete_time + self.cleanup_scrubber_time > time.time():
|
||||
if delete_time + self.scrub_time > time.time():
|
||||
continue
|
||||
|
||||
ret.extend([(image['id'], location['uri'])
|
||||
for location in image['location_data']])
|
||||
for loc in image['location_data']:
|
||||
if loc['status'] != 'pending_delete':
|
||||
continue
|
||||
|
||||
if remove:
|
||||
self.registry.update_image(image['id'], {'status': 'deleted'})
|
||||
if self.metadata_encryption_key:
|
||||
uri = crypt.urlsafe_encrypt(self.metadata_encryption_key,
|
||||
loc['url'], 64)
|
||||
else:
|
||||
uri = loc['url']
|
||||
|
||||
ret.append((image['id'], loc['id'], uri))
|
||||
|
||||
if remove:
|
||||
db_api.get_api().image_location_delete(self.admin_context,
|
||||
image['id'],
|
||||
loc['id'],
|
||||
'deleted')
|
||||
self.registry.update_image(image['id'],
|
||||
{'status': 'deleted'})
|
||||
return ret
|
||||
|
||||
def get_all_locations(self):
|
||||
@ -415,28 +454,53 @@ class Scrubber(object):
|
||||
registry.configure_registry_admin_creds()
|
||||
self.registry = registry.get_registry_client(context.RequestContext())
|
||||
|
||||
# Here we create a request context with credentials to support
|
||||
# delayed delete when using multi-tenant backend storage
|
||||
admin_tenant = CONF.admin_tenant_name
|
||||
auth_token = self.registry.auth_tok
|
||||
self.admin_context = context.RequestContext(user=CONF.admin_user,
|
||||
tenant=admin_tenant,
|
||||
auth_tok=auth_token)
|
||||
|
||||
(self.file_queue, self.db_queue) = get_scrub_queues()
|
||||
|
||||
def _get_delete_jobs(self, queue, pop):
|
||||
try:
|
||||
if pop:
|
||||
image_id_uri_list = queue.pop_all_locations()
|
||||
records = queue.pop_all_locations()
|
||||
else:
|
||||
image_id_uri_list = queue.get_all_locations()
|
||||
except Exception:
|
||||
LOG.error(_LE("Can not %s scrub jobs from queue.") %
|
||||
'pop' if pop else 'get')
|
||||
return None
|
||||
records = queue.get_all_locations()
|
||||
except Exception as err:
|
||||
LOG.error(_LE("Can not %(op)s scrub jobs from queue: %(err)s") %
|
||||
{'op': 'pop' if pop else 'get',
|
||||
'err': utils.exception_to_str(err)})
|
||||
return {}
|
||||
|
||||
delete_jobs = {}
|
||||
for image_id, image_uri in image_id_uri_list:
|
||||
for image_id, loc_id, loc_uri in records:
|
||||
if image_id not in delete_jobs:
|
||||
delete_jobs[image_id] = []
|
||||
delete_jobs[image_id].append((image_id, image_uri))
|
||||
delete_jobs[image_id].append((image_id, loc_id, loc_uri))
|
||||
return delete_jobs
|
||||
|
||||
def _merge_delete_jobs(self, file_jobs, db_jobs):
|
||||
ret = {}
|
||||
for image_id, file_job_items in file_jobs.iteritems():
|
||||
ret[image_id] = file_job_items
|
||||
db_job_items = db_jobs.get(image_id, [])
|
||||
for db_item in db_job_items:
|
||||
if db_item not in file_job_items:
|
||||
ret[image_id].append(db_item)
|
||||
for image_id, db_job_items in db_jobs.iteritems():
|
||||
if image_id not in ret:
|
||||
ret[image_id] = db_job_items
|
||||
return ret
|
||||
|
||||
def run(self, pool, event=None):
|
||||
delete_jobs = self._get_delete_jobs(self.file_queue, True)
|
||||
file_jobs = self._get_delete_jobs(self.file_queue, True)
|
||||
db_jobs = self._get_delete_jobs(self.db_queue, False)
|
||||
delete_jobs = self._merge_delete_jobs(file_jobs, db_jobs)
|
||||
|
||||
if delete_jobs:
|
||||
for image_id, jobs in six.iteritems(delete_jobs):
|
||||
self._scrub_image(pool, image_id, jobs)
|
||||
@ -459,27 +523,21 @@ class Scrubber(object):
|
||||
not self.file_queue.has_image(image_id)):
|
||||
self.registry.update_image(image_id, {'status': 'deleted'})
|
||||
|
||||
def _delete_image_location_from_backend(self, image_id, uri):
|
||||
if CONF.metadata_encryption_key is not None:
|
||||
def _delete_image_location_from_backend(self, image_id, loc_id, uri):
|
||||
if CONF.metadata_encryption_key:
|
||||
uri = crypt.urlsafe_decrypt(CONF.metadata_encryption_key, uri)
|
||||
|
||||
try:
|
||||
LOG.debug("Deleting URI from image %(image_id)s." %
|
||||
{'image_id': image_id})
|
||||
|
||||
# Here we create a request context with credentials to support
|
||||
# delayed delete when using multi-tenant backend storage
|
||||
admin_tenant = CONF.admin_tenant_name
|
||||
auth_token = self.registry.auth_tok
|
||||
admin_context = context.RequestContext(user=CONF.admin_user,
|
||||
tenant=admin_tenant,
|
||||
auth_tok=auth_token)
|
||||
|
||||
self.store_api.delete_from_backend(admin_context, uri)
|
||||
LOG.debug("Deleting URI from image %s." % image_id)
|
||||
self.store_api.delete_from_backend(self.admin_context, uri)
|
||||
if loc_id != '-':
|
||||
db_api.get_api().image_location_delete(self.admin_context,
|
||||
image_id,
|
||||
int(loc_id),
|
||||
'deleted')
|
||||
LOG.info(_LI("Image %s has been deleted.") % image_id)
|
||||
except Exception:
|
||||
msg = (_LE("Failed to delete URI from image %(image_id)s") %
|
||||
{'image_id': image_id})
|
||||
LOG.error(msg)
|
||||
LOG.warn(_LW("Unable to delete URI from image %s.") % image_id)
|
||||
|
||||
def _read_cleanup_file(self, file_path):
|
||||
"""Reading cleanup to get latest cleanup timestamp.
|
||||
|
@ -527,6 +527,7 @@ class ScrubberDaemon(Server):
|
||||
self.daemon = daemon
|
||||
|
||||
self.image_dir = os.path.join(self.test_dir, "images")
|
||||
self.scrub_time = 5
|
||||
self.scrubber_datadir = os.path.join(self.test_dir,
|
||||
"scrubber")
|
||||
self.pid_file = os.path.join(self.test_dir, "scrubber.pid")
|
||||
@ -540,6 +541,11 @@ class ScrubberDaemon(Server):
|
||||
"2")
|
||||
self.metadata_encryption_key = "012345678901234567890123456789ab"
|
||||
self.lock_path = self.test_dir
|
||||
|
||||
default_sql_connection = 'sqlite:////%s/tests.sqlite' % self.test_dir
|
||||
self.sql_connection = os.environ.get('GLANCE_TEST_SQL_CONNECTION',
|
||||
default_sql_connection)
|
||||
|
||||
self.conf_base = """[DEFAULT]
|
||||
verbose = %(verbose)s
|
||||
debug = %(debug)s
|
||||
@ -547,6 +553,7 @@ filesystem_store_datadir=%(image_dir)s
|
||||
log_file = %(log_file)s
|
||||
daemon = %(daemon)s
|
||||
wakeup_time = 2
|
||||
scrub_time = %(scrub_time)s
|
||||
scrubber_datadir = %(scrubber_datadir)s
|
||||
registry_host = 127.0.0.1
|
||||
registry_port = %(registry_port)s
|
||||
@ -557,6 +564,8 @@ swift_store_key = %(swift_store_key)s
|
||||
swift_store_container = %(swift_store_container)s
|
||||
swift_store_auth_version = %(swift_store_auth_version)s
|
||||
lock_path = %(lock_path)s
|
||||
sql_connection = %(sql_connection)s
|
||||
sql_idle_timeout = 3600
|
||||
"""
|
||||
|
||||
def start(self, expect_exit=True, expected_exitcode=0, **kwargs):
|
||||
|
@ -51,7 +51,8 @@ class TestScrubber(functional.FunctionalTest):
|
||||
scrubs them
|
||||
"""
|
||||
self.cleanup()
|
||||
self.start_servers(delayed_delete=True, daemon=True)
|
||||
self.start_servers(delayed_delete=True, daemon=True,
|
||||
metadata_encryption_key='')
|
||||
|
||||
headers = {
|
||||
'x-image-meta-name': 'test_image',
|
||||
@ -89,7 +90,8 @@ class TestScrubber(functional.FunctionalTest):
|
||||
daemon mode
|
||||
"""
|
||||
self.cleanup()
|
||||
self.start_servers(delayed_delete=True, daemon=False)
|
||||
self.start_servers(delayed_delete=True, daemon=False,
|
||||
metadata_encryption_key='')
|
||||
|
||||
headers = {
|
||||
'x-image-meta-name': 'test_image',
|
||||
@ -146,6 +148,7 @@ class TestScrubber(functional.FunctionalTest):
|
||||
|
||||
self.cleanup()
|
||||
self.start_servers(delayed_delete=True, daemon=False,
|
||||
metadata_encryption_key='',
|
||||
default_store='swift', **swift_config)
|
||||
|
||||
# add an image
|
||||
|
@ -47,9 +47,7 @@ class TestScrubber(test_utils.BaseTestCase):
|
||||
super(TestScrubber, self).tearDown()
|
||||
|
||||
def _scrubber_cleanup_with_store_delete_exception(self, ex):
|
||||
fname = lambda: str(uuid.uuid4())
|
||||
|
||||
uri = 'file://some/path/%s' % (fname)
|
||||
uri = 'file://some/path/%s' % uuid.uuid4()
|
||||
id = 'helloworldid'
|
||||
scrub = scrubber.Scrubber(glance.store)
|
||||
scrub.registry = self.mox.CreateMockAnything()
|
||||
@ -61,7 +59,7 @@ class TestScrubber(test_utils.BaseTestCase):
|
||||
uri).AndRaise(ex)
|
||||
self.mox.ReplayAll()
|
||||
scrub._scrub_image(eventlet.greenpool.GreenPool(1),
|
||||
id, [(id, uri)])
|
||||
id, [(id, '-', uri)])
|
||||
self.mox.VerifyAll()
|
||||
|
||||
q_path = os.path.join(self.data_dir, id)
|
||||
|
Loading…
Reference in New Issue
Block a user