Don't use taskmanagers in builder
ProviderManager is a TaskManager, and TaskManagers are intended to serialize API requests to a single cloud from multiple threads. Currently each worker in the builder has its own set of ProviderManagers. That means that we are performing cloud API calls in parallel. That's probably okay since we perform very few of them, mostly image uploads and deletes. And in fact, we probably want to avoid blocking on image uploads. However, there is a thread associated with each of these ProviderManagers, and even though they are idle, in aggregate they add up to a significant CPU cost. This makes the use of a TaskManager by a ProviderManager optional and sets the builder not to use it in order to avoid spawning these useless threads. Change-Id: Iaf6498c34a38c384b85d3ab568c43dab0bcdd3d5
This commit is contained in:
parent
6dcb96310f
commit
fe153656df
@ -464,7 +464,8 @@ class CleanupWorker(BaseWorker):
|
|||||||
'''
|
'''
|
||||||
new_config = nodepool_config.loadConfig(self._config_path)
|
new_config = nodepool_config.loadConfig(self._config_path)
|
||||||
self._checkForZooKeeperChanges(new_config)
|
self._checkForZooKeeperChanges(new_config)
|
||||||
provider_manager.ProviderManager.reconfigure(self._config, new_config)
|
provider_manager.ProviderManager.reconfigure(self._config, new_config,
|
||||||
|
use_taskmanager=False)
|
||||||
self._config = new_config
|
self._config = new_config
|
||||||
|
|
||||||
self._cleanup()
|
self._cleanup()
|
||||||
@ -947,7 +948,8 @@ class UploadWorker(BaseWorker):
|
|||||||
'''
|
'''
|
||||||
new_config = nodepool_config.loadConfig(self._config_path)
|
new_config = nodepool_config.loadConfig(self._config_path)
|
||||||
self._checkForZooKeeperChanges(new_config)
|
self._checkForZooKeeperChanges(new_config)
|
||||||
provider_manager.ProviderManager.reconfigure(self._config, new_config)
|
provider_manager.ProviderManager.reconfigure(self._config, new_config,
|
||||||
|
use_taskmanager=False)
|
||||||
self._config = new_config
|
self._config = new_config
|
||||||
|
|
||||||
self._checkForProviderUploads()
|
self._checkForProviderUploads()
|
||||||
|
@ -45,18 +45,18 @@ class NotFound(Exception):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def get_provider_manager(provider):
|
def get_provider_manager(provider, use_taskmanager):
|
||||||
if (provider.cloud_config.get_auth_args().get('auth_url') == 'fake'):
|
if (provider.cloud_config.get_auth_args().get('auth_url') == 'fake'):
|
||||||
return FakeProviderManager(provider)
|
return FakeProviderManager(provider, use_taskmanager)
|
||||||
else:
|
else:
|
||||||
return ProviderManager(provider)
|
return ProviderManager(provider, use_taskmanager)
|
||||||
|
|
||||||
|
|
||||||
class ProviderManager(TaskManager):
|
class ProviderManager(object):
|
||||||
log = logging.getLogger("nodepool.ProviderManager")
|
log = logging.getLogger("nodepool.ProviderManager")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def reconfigure(old_config, new_config):
|
def reconfigure(old_config, new_config, use_taskmanager=True):
|
||||||
stop_managers = []
|
stop_managers = []
|
||||||
for p in new_config.providers.values():
|
for p in new_config.providers.values():
|
||||||
oldmanager = None
|
oldmanager = None
|
||||||
@ -71,7 +71,7 @@ class ProviderManager(TaskManager):
|
|||||||
ProviderManager.log.debug("Creating new ProviderManager object"
|
ProviderManager.log.debug("Creating new ProviderManager object"
|
||||||
" for %s" % p.name)
|
" for %s" % p.name)
|
||||||
new_config.provider_managers[p.name] = \
|
new_config.provider_managers[p.name] = \
|
||||||
get_provider_manager(p)
|
get_provider_manager(p, use_taskmanager)
|
||||||
new_config.provider_managers[p.name].start()
|
new_config.provider_managers[p.name].start()
|
||||||
|
|
||||||
for stop_manager in stop_managers:
|
for stop_manager in stop_managers:
|
||||||
@ -83,14 +83,28 @@ class ProviderManager(TaskManager):
|
|||||||
m.stop()
|
m.stop()
|
||||||
m.join()
|
m.join()
|
||||||
|
|
||||||
def __init__(self, provider):
|
def __init__(self, provider, use_taskmanager):
|
||||||
super(ProviderManager, self).__init__(None, provider.name,
|
|
||||||
provider.rate)
|
|
||||||
self.provider = provider
|
self.provider = provider
|
||||||
self.resetClient()
|
|
||||||
self._images = {}
|
self._images = {}
|
||||||
self._networks = {}
|
self._networks = {}
|
||||||
self.__flavors = {}
|
self.__flavors = {}
|
||||||
|
self._use_taskmanager = use_taskmanager
|
||||||
|
self._taskmanager = None
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
if self._use_taskmanager:
|
||||||
|
self._taskmanager = TaskManager(None, self.provider.name,
|
||||||
|
self.provider.rate)
|
||||||
|
self._taskmanager.start()
|
||||||
|
self.resetClient()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
if self._taskmanager:
|
||||||
|
self._taskmanager.stop()
|
||||||
|
|
||||||
|
def join(self):
|
||||||
|
if self._taskmanager:
|
||||||
|
self._taskmanager.join()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def _flavors(self):
|
def _flavors(self):
|
||||||
@ -99,19 +113,19 @@ class ProviderManager(TaskManager):
|
|||||||
return self.__flavors
|
return self.__flavors
|
||||||
|
|
||||||
def _getClient(self):
|
def _getClient(self):
|
||||||
|
if self._use_taskmanager:
|
||||||
|
manager = self._taskmanager
|
||||||
|
else:
|
||||||
|
manager = None
|
||||||
return shade.OpenStackCloud(
|
return shade.OpenStackCloud(
|
||||||
cloud_config=self.provider.cloud_config,
|
cloud_config=self.provider.cloud_config,
|
||||||
manager=self,
|
manager=manager,
|
||||||
**self.provider.cloud_config.config)
|
**self.provider.cloud_config.config)
|
||||||
|
|
||||||
def runTask(self, task):
|
|
||||||
# Run the given task in the TaskManager passed to shade. It turns
|
|
||||||
# out that this provider manager is the TaskManager we pass, so
|
|
||||||
# this is a way of running each cloud operation in its own thread
|
|
||||||
task.run(self._client)
|
|
||||||
|
|
||||||
def resetClient(self):
|
def resetClient(self):
|
||||||
self._client = self._getClient()
|
self._client = self._getClient()
|
||||||
|
if self._use_taskmanager:
|
||||||
|
self._taskmanager.setClient(self._client)
|
||||||
|
|
||||||
def _getFlavors(self):
|
def _getFlavors(self):
|
||||||
flavors = self.listFlavors()
|
flavors = self.listFlavors()
|
||||||
@ -347,9 +361,9 @@ class ProviderManager(TaskManager):
|
|||||||
|
|
||||||
|
|
||||||
class FakeProviderManager(ProviderManager):
|
class FakeProviderManager(ProviderManager):
|
||||||
def __init__(self, provider):
|
def __init__(self, provider, use_taskmanager):
|
||||||
self.__client = fakeprovider.FakeOpenStackCloud()
|
self.__client = fakeprovider.FakeOpenStackCloud()
|
||||||
super(FakeProviderManager, self).__init__(provider)
|
super(FakeProviderManager, self).__init__(provider, use_taskmanager)
|
||||||
|
|
||||||
def _getClient(self):
|
def _getClient(self):
|
||||||
return self.__client
|
return self.__client
|
||||||
|
@ -74,6 +74,9 @@ class TaskManager(threading.Thread):
|
|||||||
self._client = None
|
self._client = None
|
||||||
self.statsd = stats.get_client()
|
self.statsd = stats.get_client()
|
||||||
|
|
||||||
|
def setClient(self, client):
|
||||||
|
self._client = client
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._running = False
|
self._running = False
|
||||||
self.queue.put(None)
|
self.queue.put(None)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user