ensure resources are hashable
when we partition workload, we need resources to be hashable. Change-Id: I7549038618b668f08b7fc4113ed579b5c9a3c6ee Closes-Bug: #1696199
This commit is contained in:
parent
d89b6c3b4c
commit
03b3e14341
@ -29,6 +29,7 @@ from oslo_log import log
|
||||
import oslo_messaging
|
||||
from oslo_utils import fnmatch
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
from six import moves
|
||||
from six.moves.urllib import parse as urlparse
|
||||
from stevedore import extension
|
||||
@ -106,10 +107,10 @@ class Resources(object):
|
||||
if self._resources:
|
||||
static_resources_group = self.agent_manager.construct_group_id(
|
||||
utils.hash_of_set(self._resources))
|
||||
return list(filter(
|
||||
return [v for v in self._resources if
|
||||
self.agent_manager.hashrings[
|
||||
static_resources_group].belongs_to_self, self._resources
|
||||
)) + source_discovery
|
||||
static_resources_group].belongs_to_self(
|
||||
six.text_type(v))] + source_discovery
|
||||
|
||||
return source_discovery
|
||||
|
||||
@ -497,11 +498,10 @@ class AgentManager(cotyledon.Service):
|
||||
discovered = discoverer.discover(self, param)
|
||||
|
||||
if self.partition_coordinator:
|
||||
discovered = list(filter(
|
||||
self.hashrings[
|
||||
discovered = [
|
||||
v for v in discovered if self.hashrings[
|
||||
self.construct_group_id(discoverer.group_id)
|
||||
].belongs_to_self, discovered
|
||||
))
|
||||
].belongs_to_self(six.text_type(v))]
|
||||
|
||||
resources.extend(discovered)
|
||||
if discovery_cache is not None:
|
||||
|
@ -563,7 +563,10 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.PollsterAnother.resources)
|
||||
|
||||
def test_discovery_partitioning(self):
|
||||
discovered_resources = ['discovered_1', 'discovered_2']
|
||||
self.Pollster.discovery = 'testdiscovery'
|
||||
self.mgr.discoveries = self.create_discoveries()
|
||||
self.Discovery.resources = discovered_resources
|
||||
self.polling_cfg['sources'][0]['discovery'] = [
|
||||
'testdiscovery', 'testdiscoveryanother',
|
||||
'testdiscoverynonexistent', 'testdiscoveryexception']
|
||||
@ -571,9 +574,23 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
self.mgr.hashrings.__getitem__.assert_called_with(
|
||||
'central-compute-another_group')
|
||||
self.hashring.belongs_to_self.assert_not_called()
|
||||
self.hashring.belongs_to_self.assert_has_calls(
|
||||
[mock.call('discovered_1'), mock.call('discovered_2')])
|
||||
|
||||
def test_discovery_partitioning_unhashable(self):
|
||||
discovered_resources = [{'unhashable': True}]
|
||||
self.Pollster.discovery = 'testdiscovery'
|
||||
self.mgr.discoveries = self.create_discoveries()
|
||||
self.Discovery.resources = discovered_resources
|
||||
self.polling_cfg['sources'][0]['discovery'] = [
|
||||
'testdiscovery', 'testdiscoveryanother',
|
||||
'testdiscoverynonexistent', 'testdiscoveryexception']
|
||||
self.polling_cfg['sources'][0]['resources'] = []
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
self.hashring.belongs_to_self.assert_has_calls(
|
||||
[mock.call('{\'unhashable\': True}')])
|
||||
|
||||
def test_static_resources_partitioning(self):
|
||||
static_resources = ['static_1', 'static_2']
|
||||
|
Loading…
x
Reference in New Issue
Block a user