initial locking implementation

This adds both thread level and process level locking to the cache index.
It also adds a "pending" state to cached object collections, allowing a
single instance of the CacheManager to do long-running downloads without
holding a lock on the index the entire time.

Change-Id: I6a0fcf0941f75fa5d78cc06eb78af6531d712162
This commit is contained in:
Ian McLeod 2013-10-30 09:25:28 -05:00
parent 17f95d3c7d
commit 65e06f4603
3 changed files with 150 additions and 38 deletions

View File

@ -16,11 +16,15 @@
import logging import logging
import json import json
import os
import os.path import os.path
import pycurl import pycurl
import guestfs import guestfs
import fcntl
import threading
import time
import StackEnvironment
from Singleton import Singleton from Singleton import Singleton
from StackEnvironment import StackEnvironment
class CacheManager(Singleton): class CacheManager(Singleton):
@ -40,11 +44,11 @@ class CacheManager(Singleton):
# TODO: Sane handling of a pending cache item # TODO: Sane handling of a pending cache item
# TODO: Configurable # TODO: Configurable
CACHE_ROOT = "/var/lib/novaimagebuilder/" CACHE_ROOT = "/var/lib/novaimagebuilder/"
#INDEX_LOCK = lock() INDEX_THREAD_LOCK = threading.Lock()
INDEX_FILE = "_cache_index" INDEX_FILE = "_cache_index"
def _singleton_init(self): def _singleton_init(self):
self.env = StackEnvironment() self.env = StackEnvironment.StackEnvironment()
self.log = logging.getLogger('%s.%s' % (__name__, self.__class__.__name__)) self.log = logging.getLogger('%s.%s' % (__name__, self.__class__.__name__))
self.index_filename = self.CACHE_ROOT + self.INDEX_FILE self.index_filename = self.CACHE_ROOT + self.INDEX_FILE
if not os.path.isfile(self.index_filename): if not os.path.isfile(self.index_filename):
@ -55,6 +59,8 @@ class CacheManager(Singleton):
index_file.close() index_file.close()
# This should be None except when we are actively working on it and hold a lock # This should be None except when we are actively working on it and hold a lock
self.index = None self.index = None
self.index_file = None
self.locked = False
def lock_and_get_index(self): def lock_and_get_index(self):
""" """
@ -64,30 +70,46 @@ class CacheManager(Singleton):
write_index_and_unlock() or unlock_index() depending upon whether or not the write_index_and_unlock() or unlock_index() depending upon whether or not the
index has been modified. index has been modified.
""" """
# We acquire a thread lock under all circumstances
#self.INDEX_LOCK.acquire() # This is the safest approach and should be relatively harmless if we are used
index_file = open(self.index_filename) # as a module in a non-threaded Python program
self.index = json.load(index_file) self.INDEX_THREAD_LOCK.acquire()
index_file.close() # atomic create if not present
fd = os.open(self.index_filename, os.O_RDWR | os.O_CREAT)
# blocking
fcntl.flock(fd, fcntl.LOCK_EX)
self.index_file = os.fdopen(fd, "r+")
index = self.index_file.read()
if len(index) == 0:
# Empty - possibly because we created it earlier - create empty dict
self.index = { }
else:
self.index = json.loads(index)
def write_index_and_unlock(self): def write_index_and_unlock(self):
""" """
Write contents of self.index back to the persistent file and then unlock it Write contents of self.index back to the persistent file and then unlock it
""" """
self.index_file.seek(0)
index_file = open(self.index_filename, 'w') self.index_file.truncate()
json.dump(self.index , index_file) json.dump(self.index , self.index_file)
index_file.close() # TODO: Double-check that this is safe
self.index_file.flush()
fcntl.flock(self.index_file, fcntl.LOCK_UN)
self.index_file.close()
self.index = None self.index = None
#self.INDEX_LOCK.release() self.INDEX_THREAD_LOCK.release()
def unlock_index(self): def unlock_index(self):
""" """
Release the cache index lock without updating the persistent file Release the cache index lock without updating the persistent file
""" """
self.index = None self.index = None
#self.INDEX_LOCK.release() fcntl.flock(self.index_file, fcntl.LOCK_UN)
self.index_file.close()
self.index_file = None
self.INDEX_THREAD_LOCK.release()
# INDEX looks like # INDEX looks like
# #
@ -95,6 +117,11 @@ class CacheManager(Singleton):
# "install_iso_kernel": { "local" # "install_iso_kernel": { "local"
def _get_index_value(self, os_ver_arch, name, location): def _get_index_value(self, os_ver_arch, name, location):
"""
Utility function to retrieve the location of the named object for the given OS version and architecture.
Only use this if your thread has obtained the thread-global lock by using the
lock_and_get_index() function above
"""
if self.index is None: if self.index is None:
raise Exception("Attempt made to read index values while a locked index is not present") raise Exception("Attempt made to read index values while a locked index is not present")
@ -114,6 +141,11 @@ class CacheManager(Singleton):
return self.index[os_ver_arch][name][location] return self.index[os_ver_arch][name][location]
def _set_index_value(self, os_ver_arch, name, location, value): def _set_index_value(self, os_ver_arch, name, location, value):
"""
Utility function to set the location of the named object for the given OS version and architecture.
Only use this if your thread has obtained the thread-global lock by using the
lock_and_get_index() function above
"""
if self.index is None: if self.index is None:
raise Exception("Attempt made to read index values while a locked index is not present") raise Exception("Attempt made to read index values while a locked index is not present")
@ -124,9 +156,8 @@ class CacheManager(Singleton):
self.index[os_ver_arch][name] = {} self.index[os_ver_arch][name] = {}
# If the specific location is not specified, assume value is the entire dict # If the specific location is not specified, assume value is the entire dict
# or a string indicating the object is pending
if not location: if not location:
if type(value) is not dict:
raise Exception("When setting a value without a location, the value must be a dict")
self.index[os_ver_arch][name] = value self.index[os_ver_arch][name] = value
return return
@ -150,19 +181,42 @@ class CacheManager(Singleton):
glance: Glance object UUID glance: Glance object UUID
cinder: Cinder object UUID cinder: Cinder object UUID
""" """
# TODO: Gracefully deal with the situation where, for example, we are asked to save_local
# and find that the object is already cached but only exists in glance and/or cinder
# TODO: Allow for local-only caching
self.lock_and_get_index() pending_countdown = 360
existing_cache = self._get_index_value(os_plugin.os_ver_arch(), object_type, None) while True:
if existing_cache: self.lock_and_get_index()
self.log.debug("Found object in cache") existing_cache = self._get_index_value(os_plugin.os_ver_arch(), object_type, None)
self.unlock_index() if existing_cache == None:
return existing_cache # We are the first - mark as pending and then start to retreive
# TODO: special case when object is ISO and sub-artifacts are not cached self._set_index_value(os_plugin.os_ver_arch(), object_type, None, "pending")
self.write_index_and_unlock()
break
if isinstance(existing_cache, dict):
self.log.debug("Found object in cache")
self.unlock_index()
return existing_cache
# TODO: special case when object is ISO and sub-artifacts are not cached
if existing_cache == "pending":
# Another thread or process is currently obtaining this object
# poll every 10 seconds until we get a dict, then return it
# TODO: A graceful event based solution
self.unlock_index()
if pending_countdown == 360:
self.log.debug("Object is being retrieved in another thread or process - Waiting")
pending_countdown -= 1
if pending_countdown == 0:
raise Exception("Waited one hour on pending cache fill for version (%s) - object (%s)- giving up" %
( os_plugin.os_ver_arch(), object_type ) )
sleep(10)
continue
# The object is not yet in the cache # We should never get here
# TODO: Some mechanism to indicate that retrieval is in progress raise Exception("Got unexpected non-string, non-dict, non-None value when reading cache")
# additional calls to get the same object should block until this is done
self.unlock_index() # If we have gotten here the object is not yet in the cache
self.log.debug("Object not in cache") self.log.debug("Object not in cache")
# TODO: If not save_local and the plugin doesn't need the iso, direct download in glance # TODO: If not save_local and the plugin doesn't need the iso, direct download in glance
@ -245,4 +299,4 @@ class CacheManager(Singleton):
c.perform() c.perform()
c.close() c.close()
finally: finally:
os.close(fd) os.close(fd)

View File

@ -41,7 +41,7 @@ class MockStackEnvironment(Singleton):
IMAGE_STATUS_LIST = ('QUEUED', 'SAVING', 'ACTIVE', 'KILLED', 'DELETED', 'PENDING_DELETE') IMAGE_STATUS_LIST = ('QUEUED', 'SAVING', 'ACTIVE', 'KILLED', 'DELETED', 'PENDING_DELETE')
def _singleton_init(self): def _singleton_init(self):
super(MockStackEnvironment, self)._singleton_init() super(StackEnvironment, self)._singleton_init()
self.log = logging.getLogger('%s.%s' % (__name__, self.__class__.__name__)) self.log = logging.getLogger('%s.%s' % (__name__, self.__class__.__name__))
# Attributes controlling Mock behavior # Attributes controlling Mock behavior
self.cinder = False self.cinder = False
@ -110,4 +110,4 @@ class MockStackEnvironment(Singleton):
def launch_instance(self, root_disk=None, install_iso=None, secondary_iso=None, floppy=None, aki=None, ari=None, def launch_instance(self, root_disk=None, install_iso=None, secondary_iso=None, floppy=None, aki=None, ari=None,
cmdline=None, userdata=None): cmdline=None, userdata=None):
return MockNovaInstance(object(), self) return MockNovaInstance(object(), self)

View File

@ -16,15 +16,24 @@
# limitations under the License. # limitations under the License.
import sys import sys
sys.path.append("../novaimagebuilder") sys.path.append("../")
from MockStackEnvironment import MockStackEnvironment as StackEnvironment import MockStackEnvironment
from novaimagebuilder.CacheManager import CacheManager sys.modules['StackEnvironment'] = sys.modules.pop('MockStackEnvironment')
sys.modules['StackEnvironment'].StackEnvironment = sys.modules['StackEnvironment'].MockStackEnvironment
import StackEnvironment
import novaimagebuilder.CacheManager
novaimagebuilder.CacheManager.StackEnvironment = StackEnvironment
import logging import logging
import threading
import multiprocessing
logging.basicConfig(level=logging.DEBUG, logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(levelname)s %(name)s thread(%(threadName)s) Message: %(message)s') format='%(asctime)s %(levelname)s %(name)s thread(%(threadName)s) Message: %(message)s')
se = StackEnvironment.StackEnvironment()
class MockOSPlugin(object): class MockOSPlugin(object):
def __init__(self, os_ver_arch = "fedora19-x86_64", wants_iso = True ): def __init__(self, os_ver_arch = "fedora19-x86_64", wants_iso = True ):
@ -40,8 +49,57 @@ class MockOSPlugin(object):
print "---- the following should do a glance and cinder upload" print "---- the following should do a glance and cinder upload"
mosp = MockOSPlugin(os_ver_arch = "fedora18-x86_64", wants_iso = False) mosp = MockOSPlugin(os_ver_arch = "fedora18-x86_64", wants_iso = False)
mse = StackEnvironment("username","password","tenant","auth_url") #mse = StackEnvironment("username","password","tenant","auth_url")
cm = CacheManager(mse) mse = StackEnvironment.StackEnvironment()
cm = novaimagebuilder.CacheManager.CacheManager()
cm.retrieve_and_cache_object("install-iso", mosp, "http://repos.fedorapeople.org/repos/aeolus/imagefactory/testing/repos/rhel/imagefactory.repo", # Create our bogus entry in the cache index and set it to 0
True) cm.lock_and_get_index()
cm._set_index_value("testobjOS", "testobjname", "testloc", "0")
cm.write_index_and_unlock()
class UpdateThread():
def __call__(self):
#print "about to run 20 updates"
for i in range(0,20):
cm.lock_and_get_index()
#print "--------- three lines below"
#print "In the lock - 1 next line should always show value"
value = cm._get_index_value("testobjOS", "testobjname", "testloc")
#print "In the lock - 2 value %s" % (value)
newvalue = int(value) + 1
cm._set_index_value("testobjOS", "testobjname", "testloc", str(newvalue))
#print "In the lock - 3 did update - leaving"
#print "--------- three lines above"
cm.write_index_and_unlock()
class MultiThreadProcess():
def __call__(self):
#print "Here I run 20 threads"
threads = [ ]
for i in range (0,20):
thread = threading.Thread(group=None, target=UpdateThread())
threads.append(thread)
thread.run()
# Fork 20 copies of myself
processes = [ ]
for i in range(0,20):
proc = multiprocessing.Process(group=None, target=MultiThreadProcess())
processes.append(proc)
proc.start()
for proc in processes:
proc.join()
cm.lock_and_get_index()
value = cm._get_index_value("testobjOS", "testobjname", "testloc")
cm.unlock_index()
print "Final value should be 8000 and is %s" % (value)
# Have each process create 20 threads
# Have each
#cm.retrieve_and_cache_object("install-iso2", mosp, "http://repos.fedorapeople.org/repos/aeolus/imagefactory/testing/repos/rhel/imagefactory.repo",
# True)