diff --git a/novaimagebuilder/CacheManager.py b/novaimagebuilder/CacheManager.py index ef915ff..6b0ce34 100644 --- a/novaimagebuilder/CacheManager.py +++ b/novaimagebuilder/CacheManager.py @@ -16,11 +16,15 @@ import logging import json +import os import os.path import pycurl import guestfs +import fcntl +import threading +import time +import StackEnvironment from Singleton import Singleton -from StackEnvironment import StackEnvironment class CacheManager(Singleton): @@ -40,11 +44,11 @@ class CacheManager(Singleton): # TODO: Sane handling of a pending cache item # TODO: Configurable CACHE_ROOT = "/var/lib/novaimagebuilder/" - #INDEX_LOCK = lock() + INDEX_THREAD_LOCK = threading.Lock() INDEX_FILE = "_cache_index" def _singleton_init(self): - self.env = StackEnvironment() + self.env = StackEnvironment.StackEnvironment() self.log = logging.getLogger('%s.%s' % (__name__, self.__class__.__name__)) self.index_filename = self.CACHE_ROOT + self.INDEX_FILE if not os.path.isfile(self.index_filename): @@ -55,6 +59,8 @@ class CacheManager(Singleton): index_file.close() # This should be None except when we are actively working on it and hold a lock self.index = None + self.index_file = None + self.locked = False def lock_and_get_index(self): """ @@ -64,30 +70,46 @@ class CacheManager(Singleton): write_index_and_unlock() or unlock_index() depending upon whether or not the index has been modified. """ - - #self.INDEX_LOCK.acquire() - index_file = open(self.index_filename) - self.index = json.load(index_file) - index_file.close() + # We acquire a thread lock under all circumstances + # This is the safest approach and should be relatively harmless if we are used + # as a module in a non-threaded Python program + self.INDEX_THREAD_LOCK.acquire() + # atomic create if not present + fd = os.open(self.index_filename, os.O_RDWR | os.O_CREAT) + # blocking + fcntl.flock(fd, fcntl.LOCK_EX) + self.index_file = os.fdopen(fd, "r+") + index = self.index_file.read() + if len(index) == 0: + # Empty - possibly because we created it earlier - create empty dict + self.index = { } + else: + self.index = json.loads(index) def write_index_and_unlock(self): """ Write contents of self.index back to the persistent file and then unlock it """ - - index_file = open(self.index_filename, 'w') - json.dump(self.index , index_file) - index_file.close() + self.index_file.seek(0) + self.index_file.truncate() + json.dump(self.index , self.index_file) + # TODO: Double-check that this is safe + self.index_file.flush() + fcntl.flock(self.index_file, fcntl.LOCK_UN) + self.index_file.close() self.index = None - #self.INDEX_LOCK.release() + self.INDEX_THREAD_LOCK.release() def unlock_index(self): """ Release the cache index lock without updating the persistent file """ - self.index = None - #self.INDEX_LOCK.release() + fcntl.flock(self.index_file, fcntl.LOCK_UN) + self.index_file.close() + self.index_file = None + self.INDEX_THREAD_LOCK.release() + # INDEX looks like # @@ -95,6 +117,11 @@ class CacheManager(Singleton): # "install_iso_kernel": { "local" def _get_index_value(self, os_ver_arch, name, location): + """ + Utility function to retrieve the location of the named object for the given OS version and architecture. + Only use this if your thread has obtained the thread-global lock by using the + lock_and_get_index() function above + """ if self.index is None: raise Exception("Attempt made to read index values while a locked index is not present") @@ -114,6 +141,11 @@ class CacheManager(Singleton): return self.index[os_ver_arch][name][location] def _set_index_value(self, os_ver_arch, name, location, value): + """ + Utility function to set the location of the named object for the given OS version and architecture. + Only use this if your thread has obtained the thread-global lock by using the + lock_and_get_index() function above + """ if self.index is None: raise Exception("Attempt made to read index values while a locked index is not present") @@ -124,9 +156,8 @@ class CacheManager(Singleton): self.index[os_ver_arch][name] = {} # If the specific location is not specified, assume value is the entire dict + # or a string indicating the object is pending if not location: - if type(value) is not dict: - raise Exception("When setting a value without a location, the value must be a dict") self.index[os_ver_arch][name] = value return @@ -150,19 +181,42 @@ class CacheManager(Singleton): glance: Glance object UUID cinder: Cinder object UUID """ + # TODO: Gracefully deal with the situation where, for example, we are asked to save_local + # and find that the object is already cached but only exists in glance and/or cinder + # TODO: Allow for local-only caching - self.lock_and_get_index() - existing_cache = self._get_index_value(os_plugin.os_ver_arch(), object_type, None) - if existing_cache: - self.log.debug("Found object in cache") - self.unlock_index() - return existing_cache - # TODO: special case when object is ISO and sub-artifacts are not cached + pending_countdown = 360 + while True: + self.lock_and_get_index() + existing_cache = self._get_index_value(os_plugin.os_ver_arch(), object_type, None) + if existing_cache == None: + # We are the first - mark as pending and then start to retreive + self._set_index_value(os_plugin.os_ver_arch(), object_type, None, "pending") + self.write_index_and_unlock() + break + if isinstance(existing_cache, dict): + self.log.debug("Found object in cache") + self.unlock_index() + return existing_cache + # TODO: special case when object is ISO and sub-artifacts are not cached + if existing_cache == "pending": + # Another thread or process is currently obtaining this object + # poll every 10 seconds until we get a dict, then return it + # TODO: A graceful event based solution + self.unlock_index() + if pending_countdown == 360: + self.log.debug("Object is being retrieved in another thread or process - Waiting") + pending_countdown -= 1 + if pending_countdown == 0: + raise Exception("Waited one hour on pending cache fill for version (%s) - object (%s)- giving up" % + ( os_plugin.os_ver_arch(), object_type ) ) + sleep(10) + continue - # The object is not yet in the cache - # TODO: Some mechanism to indicate that retrieval is in progress - # additional calls to get the same object should block until this is done - self.unlock_index() + # We should never get here + raise Exception("Got unexpected non-string, non-dict, non-None value when reading cache") + + # If we have gotten here the object is not yet in the cache self.log.debug("Object not in cache") # TODO: If not save_local and the plugin doesn't need the iso, direct download in glance @@ -245,4 +299,4 @@ class CacheManager(Singleton): c.perform() c.close() finally: - os.close(fd) \ No newline at end of file + os.close(fd) diff --git a/tests/MockStackEnvironment.py b/tests/MockStackEnvironment.py index 268261b..419fd53 100644 --- a/tests/MockStackEnvironment.py +++ b/tests/MockStackEnvironment.py @@ -41,7 +41,7 @@ class MockStackEnvironment(Singleton): IMAGE_STATUS_LIST = ('QUEUED', 'SAVING', 'ACTIVE', 'KILLED', 'DELETED', 'PENDING_DELETE') def _singleton_init(self): - super(MockStackEnvironment, self)._singleton_init() + super(StackEnvironment, self)._singleton_init() self.log = logging.getLogger('%s.%s' % (__name__, self.__class__.__name__)) # Attributes controlling Mock behavior self.cinder = False @@ -110,4 +110,4 @@ class MockStackEnvironment(Singleton): def launch_instance(self, root_disk=None, install_iso=None, secondary_iso=None, floppy=None, aki=None, ari=None, cmdline=None, userdata=None): - return MockNovaInstance(object(), self) \ No newline at end of file + return MockNovaInstance(object(), self) diff --git a/tests/testcache.py b/tests/testcache.py index 743e505..2e27b79 100755 --- a/tests/testcache.py +++ b/tests/testcache.py @@ -16,15 +16,24 @@ # limitations under the License. import sys -sys.path.append("../novaimagebuilder") -from MockStackEnvironment import MockStackEnvironment as StackEnvironment -from novaimagebuilder.CacheManager import CacheManager +sys.path.append("../") +import MockStackEnvironment +sys.modules['StackEnvironment'] = sys.modules.pop('MockStackEnvironment') +sys.modules['StackEnvironment'].StackEnvironment = sys.modules['StackEnvironment'].MockStackEnvironment +import StackEnvironment +import novaimagebuilder.CacheManager +novaimagebuilder.CacheManager.StackEnvironment = StackEnvironment import logging +import threading +import multiprocessing + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(name)s thread(%(threadName)s) Message: %(message)s') +se = StackEnvironment.StackEnvironment() + class MockOSPlugin(object): def __init__(self, os_ver_arch = "fedora19-x86_64", wants_iso = True ): @@ -40,8 +49,57 @@ class MockOSPlugin(object): print "---- the following should do a glance and cinder upload" mosp = MockOSPlugin(os_ver_arch = "fedora18-x86_64", wants_iso = False) -mse = StackEnvironment("username","password","tenant","auth_url") -cm = CacheManager(mse) +#mse = StackEnvironment("username","password","tenant","auth_url") +mse = StackEnvironment.StackEnvironment() +cm = novaimagebuilder.CacheManager.CacheManager() -cm.retrieve_and_cache_object("install-iso", mosp, "http://repos.fedorapeople.org/repos/aeolus/imagefactory/testing/repos/rhel/imagefactory.repo", - True) +# Create our bogus entry in the cache index and set it to 0 +cm.lock_and_get_index() +cm._set_index_value("testobjOS", "testobjname", "testloc", "0") +cm.write_index_and_unlock() + +class UpdateThread(): + def __call__(self): + #print "about to run 20 updates" + for i in range(0,20): + cm.lock_and_get_index() + #print "--------- three lines below" + #print "In the lock - 1 next line should always show value" + value = cm._get_index_value("testobjOS", "testobjname", "testloc") + #print "In the lock - 2 value %s" % (value) + newvalue = int(value) + 1 + cm._set_index_value("testobjOS", "testobjname", "testloc", str(newvalue)) + #print "In the lock - 3 did update - leaving" + #print "--------- three lines above" + cm.write_index_and_unlock() + +class MultiThreadProcess(): + def __call__(self): + #print "Here I run 20 threads" + threads = [ ] + for i in range (0,20): + thread = threading.Thread(group=None, target=UpdateThread()) + threads.append(thread) + thread.run() + +# Fork 20 copies of myself +processes = [ ] +for i in range(0,20): + proc = multiprocessing.Process(group=None, target=MultiThreadProcess()) + processes.append(proc) + proc.start() +for proc in processes: + proc.join() + +cm.lock_and_get_index() +value = cm._get_index_value("testobjOS", "testobjname", "testloc") +cm.unlock_index() +print "Final value should be 8000 and is %s" % (value) + +# Have each process create 20 threads + +# Have each + + +#cm.retrieve_and_cache_object("install-iso2", mosp, "http://repos.fedorapeople.org/repos/aeolus/imagefactory/testing/repos/rhel/imagefactory.repo", +# True)