Make notify_service_capabilities cluster aware

We recently introduced a mechanism to notify ceilometer of updated
service capabilities, but that mechanism was meant to work with the old
Active/Active implementation, where we overused the host field to carry
the cluster.

This patch changes how that mechanism works and makes it aware of the
new cluster mechanism.

To do this we change the data we store in the service_states dictionary
of the host_manager.  Prior to this patch we stored an entry for every
individual host; now we store the data per backend, so the capabilities
are keyed by either the host or the cluster.
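
A minimal sketch of the new keying (illustrative only; backend_key
mirrors the cluster_name-or-host logic added to the host_manager, and
the cluster/host names below are made up):

    # Illustrative sketch, not an excerpt from host_manager.py.
    def backend_key(cluster_name, host):
        return cluster_name or host

    service_states = {}
    # Two hosts of the same cluster share a single capabilities entry.
    service_states[backend_key('mycluster', 'host1')] = {'free_gb': 10}
    service_states[backend_key('mycluster', 'host2')] = {'free_gb': 12}
    # A standalone service keeps using its host as the key.
    service_states[backend_key(None, 'host3')] = {'free_gb': 7}
    assert sorted(service_states) == ['host3', 'mycluster']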

Implements: blueprint cinder-volume-active-active-support
Change-Id: I8732ce321f4d0c41b914afa7acf48d6cf839d9be
Gorka Eguileor 2016-12-20 18:28:40 +01:00
parent 349bda1774
commit e4b468aae6
8 changed files with 170 additions and 71 deletions


@@ -191,7 +191,7 @@ class SchedulerDependentManager(ThreadPoolManager):
self.scheduler_rpcapi.notify_service_capabilities(
context,
self.service_name,
self.host,
self.service_topic_queue,
self.last_capabilities)
except exception.ServiceTooOld as e:
# This means we have Newton's c-sch in the deployment, so


@@ -109,11 +109,13 @@ class Scheduler(object):
cluster_name,
timestamp)
def notify_service_capabilities(self, service_name, host, capabilities):
def notify_service_capabilities(self, service_name, backend,
capabilities, timestamp):
"""Notify capability update from a service node."""
self.host_manager.notify_service_capabilities(service_name,
host,
capabilities)
backend,
capabilities,
timestamp)
def host_passes_filters(self, context, backend, request_spec,
filter_properties):


@@ -374,7 +374,7 @@ class HostManager(object):
'reserved_percentage'])
def __init__(self):
self.service_states = {} # { <host>: {<service>: {cap k : v}}}
self.service_states = {} # { <host|cluster>: {<service>: {cap k : v}}}
self.backend_state_map = {}
self.filter_handler = filters.BackendFilterHandler('cinder.scheduler.'
'filters')
@@ -384,7 +384,7 @@ class HostManager(object):
'cinder.scheduler.weights')
self.weight_classes = self.weight_handler.get_all_classes()
self._no_capabilities_hosts = set() # Services without capabilities
self._no_capabilities_backends = set() # Services without capabilities
self._update_backend_state_map(cinder_context.get_admin_context())
self.service_states_last_update = {}
@@ -476,9 +476,16 @@ class HostManager(object):
capab_copy["timestamp"] = timestamp
# Set the default capabilities in case None is set.
capab_old = self.service_states.get(host, {"timestamp": 0})
backend = cluster_name or host
capab_old = self.service_states.get(backend, {"timestamp": 0})
capab_last_update = self.service_states_last_update.get(
host, {"timestamp": 0})
backend, {"timestamp": 0})
# Ignore older updates
if capab_old['timestamp'] and timestamp < capab_old['timestamp']:
LOG.info(_LI('Ignoring old capability report from %s.'),
backend)
return
# If the capabilities are not changed and the timestamp is older,
# record the capabilities.
@@ -490,9 +497,9 @@ class HostManager(object):
(not capab_old.get("timestamp")) or
(not capab_last_update.get("timestamp")) or
(capab_last_update["timestamp"] < capab_old["timestamp"])):
self.service_states_last_update[host] = capab_old
self.service_states_last_update[backend] = capab_old
self.service_states[host] = capab_copy
self.service_states[backend] = capab_copy
cluster_msg = (('Cluster: %s - Host: ' % cluster_name) if cluster_name
else '')
@@ -502,39 +509,40 @@ class HostManager(object):
'cap': capabilities,
'cluster': cluster_msg})
self._no_capabilities_hosts.discard(host)
self._no_capabilities_backends.discard(backend)
def notify_service_capabilities(self, service_name, host, capabilities):
def notify_service_capabilities(self, service_name, backend, capabilities,
timestamp):
"""Notify the ceilometer with updated volume stats"""
# TODO(geguileo): Make this work with Active/Active
if service_name != 'volume':
return
updated = []
capa_new = self.service_states.get(host, {})
timestamp = timeutils.utcnow()
capa_new = self.service_states.get(backend, {})
timestamp = timestamp or timeutils.utcnow()
# Compare the capabilities and timestamps to decide notifying
if not capa_new:
updated = self._get_updated_pools(capa_new, capabilities)
else:
if timestamp > self.service_states[host]["timestamp"]:
updated = self._get_updated_pools(self.service_states[host],
capabilities)
if timestamp > self.service_states[backend]["timestamp"]:
updated = self._get_updated_pools(
self.service_states[backend], capabilities)
if not updated:
updated = self._get_updated_pools(
self.service_states_last_update.get(host, {}),
self.service_states.get(host, {}))
self.service_states_last_update.get(backend, {}),
self.service_states.get(backend, {}))
if updated:
capab_copy = dict(capabilities)
capab_copy["timestamp"] = timestamp
# If the capabilities change, notify and record the capabilities.
self.service_states_last_update[host] = capab_copy
self.get_usage_and_notify(capabilities, updated, host, timestamp)
self.service_states_last_update[backend] = capab_copy
self.get_usage_and_notify(capabilities, updated, backend,
timestamp)
def has_all_capabilities(self):
return len(self._no_capabilities_hosts) == 0
return len(self._no_capabilities_backends) == 0
def _update_backend_state_map(self, context):
@@ -546,20 +554,29 @@ class HostManager(object):
'frozen': False})
active_backends = set()
active_hosts = set()
no_capabilities_hosts = set()
no_capabilities_backends = set()
for service in volume_services.objects:
host = service.host
if not service.is_up:
LOG.warning(_LW("volume service is down. (host: %s)"), host)
continue
capabilities = self.service_states.get(host, None)
backend_key = service.service_topic_queue
# We only pay attention to the first up service of a cluster since
# they all refer to the same capabilities entry in service_states
if backend_key in active_backends:
active_hosts.add(host)
continue
# Capabilities may come from the cluster or the host if the service
# has just been converted to a cluster service.
capabilities = (self.service_states.get(service.cluster_name, None)
or self.service_states.get(service.host, None))
if capabilities is None:
no_capabilities_hosts.add(host)
no_capabilities_backends.add(backend_key)
continue
# Since the service could have been added or removed from a cluster
backend_key = service.service_topic_queue
backend_state = self.backend_state_map.get(backend_key, None)
if not backend_state:
backend_state = self.backend_state_cls(
@@ -569,18 +586,12 @@ class HostManager(object):
service=dict(service))
self.backend_state_map[backend_key] = backend_state
# We may be receiving capability reports out of order from
# different volume services in a cluster, so we drop older updates
# and only update for newer capability reports.
if (backend_state.capabilities['timestamp'] <=
capabilities['timestamp']):
# update capabilities and attributes in backend_state
backend_state.update_from_volume_capability(
capabilities, service=dict(service))
# update capabilities and attributes in backend_state
backend_state.update_from_volume_capability(capabilities,
service=dict(service))
active_backends.add(backend_key)
active_hosts.add(host)
self._no_capabilities_hosts = no_capabilities_hosts
self._no_capabilities_backends = no_capabilities_backends
# remove non-active keys from backend_state_map
inactive_backend_keys = set(self.backend_state_map) - active_backends


@@ -110,13 +110,21 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
timestamp)
def notify_service_capabilities(self, context, service_name,
host, capabilities):
capabilities, host=None, backend=None,
timestamp=None):
"""Process a capability update from a service node."""
# TODO(geguileo): On v4 remove host field.
if capabilities is None:
capabilities = {}
# If we received the timestamp we have to deserialize it
elif timestamp:
timestamp = datetime.strptime(timestamp,
timeutils.PERFECT_TIME_FORMAT)
backend = backend or host
self.driver.notify_service_capabilities(service_name,
host,
capabilities)
backend,
capabilities,
timestamp)
def _wait_for_scheduler(self):
# NOTE(dulek): We're waiting for scheduler to announce that it's ready
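
The timestamp travels over RPC as a string: the scheduler manager hunk
above parses it back with timeutils.PERFECT_TIME_FORMAT, and the
prepare_timestamp helper added to the rpcapi further down serializes it
with jsonutils.to_primitive. An illustrative round trip, using only the
helpers the patch relies on:

    from datetime import datetime

    from oslo_serialization import jsonutils
    from oslo_utils import timeutils

    # Client side (prepare_timestamp): serialize the datetime for the cast.
    ts = datetime(1970, 1, 1)
    wire = jsonutils.to_primitive(ts)   # '1970-01-01T00:00:00.000000'

    # Server side (SchedulerManager.notify_service_capabilities): parse it.
    parsed = datetime.strptime(wire, timeutils.PERFECT_TIME_FORMAT)
    assert parsed == ts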


@@ -67,9 +67,10 @@ class SchedulerAPI(rpc.RPCAPI):
update_service_capabilities and send the timestamp from the
capabilities.
3.4 - Adds work_cleanup and do_cleanup methods.
3.5 - Make notify_service_capabilities support A/A
"""
RPC_API_VERSION = '3.4'
RPC_API_VERSION = '3.5'
RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.SCHEDULER_TOPIC
BINARY = 'cinder-scheduler'
@@ -170,6 +171,11 @@ class SchedulerAPI(rpc.RPCAPI):
cctxt = self._get_cctxt()
return cctxt.call(ctxt, 'get_pools', filters=filters)
@staticmethod
def prepare_timestamp(timestamp):
timestamp = timestamp or timeutils.utcnow()
return jsonutils.to_primitive(timestamp)
def update_service_capabilities(self, ctxt, service_name, host,
capabilities, cluster_name,
timestamp=None):
@@ -180,9 +186,8 @@ class SchedulerAPI(rpc.RPCAPI):
# If server accepts timestamping the capabilities and the cluster name
if self.client.can_send_version(version):
# Serialize the timestamp
timestamp = timestamp or timeutils.utcnow()
msg_args.update(cluster_name=cluster_name,
timestamp=jsonutils.to_primitive(timestamp))
timestamp=self.prepare_timestamp(timestamp))
else:
version = '3.0'
@@ -190,16 +195,24 @@ class SchedulerAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'update_service_capabilities', **msg_args)
def notify_service_capabilities(self, ctxt, service_name,
host, capabilities):
# TODO(geguileo): Make this work with Active/Active
cctxt = self._get_cctxt(version='3.1')
if not cctxt.can_send_version('3.1'):
backend, capabilities, timestamp=None):
version = '3.1'
if not self.client.can_send_version(version):
msg = _('notify_service_capabilities requires cinder-scheduler '
'RPC API version >= 3.1.')
raise exception.ServiceTooOld(msg)
cctxt.cast(ctxt, 'notify_service_capabilities',
service_name=service_name, host=host,
capabilities=capabilities)
parameters = {'service_name': service_name,
'capabilities': capabilities}
if self.client.can_send_version('3.5'):
version = '3.5'
parameters.update(backend=backend,
timestamp=self.prepare_timestamp(timestamp))
else:
parameters['host'] = backend
cctxt = self._get_cctxt(version=version)
cctxt.cast(ctxt, 'notify_service_capabilities', **parameters)
def work_cleanup(self, ctxt, cleanup_request):
"""Generate individual service cleanup requests from user request."""


@@ -17,6 +17,7 @@ Tests For HostManager
"""
from datetime import datetime
from datetime import timedelta
import mock
from oslo_serialization import jsonutils
@@ -102,13 +103,21 @@ class HostManagerTestCase(test.TestCase):
_mock_get_updated_pools.return_value = []
timestamp = jsonutils.to_primitive(datetime.utcnow())
host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=timestamp)
host1_old_volume_capabs = dict(free_capacity_gb=1, timestamp=timestamp)
host2_volume_capabs = dict(free_capacity_gb=5432)
host3_volume_capabs = dict(free_capacity_gb=6543)
service_name = 'volume'
# The host manager receives a deserialized timestamp
timestamp = datetime.strptime(timestamp, timeutils.PERFECT_TIME_FORMAT)
self.host_manager.update_service_capabilities(service_name, 'host1',
host1_volume_capabs,
None, timestamp)
# It'll ignore older updates
old_timestamp = timestamp - timedelta(hours=1)
self.host_manager.update_service_capabilities(service_name, 'host1',
host1_old_volume_capabs,
None, old_timestamp)
self.host_manager.update_service_capabilities(service_name, 'host2',
host2_volume_capabs,
None, None)
@@ -162,7 +171,7 @@ class HostManagerTestCase(test.TestCase):
# S0: notify_service_capabilities()
self.host_manager.notify_service_capabilities(service_name, 'host1',
capab1)
capab1, None)
self.assertDictEqual(dict(dict(timestamp=31337), **capab1),
self.host_manager.service_states['host1'])
self.assertDictEqual(
@@ -236,7 +245,7 @@ class HostManagerTestCase(test.TestCase):
# S1: notify_service_capabilities()
self.host_manager_1.notify_service_capabilities(service_name, 'host1',
capab1)
capab1, None)
self.assertDictEqual(dict(dict(timestamp=31341), **capab1),
self.host_manager_1.service_states['host1'])
@@ -285,7 +294,7 @@ class HostManagerTestCase(test.TestCase):
# S0: notify_service_capabilities()
self.host_manager.notify_service_capabilities(service_name, 'host1',
capab1)
capab1, None)
self.assertDictEqual(
dict(dict(timestamp=31338), **capab1),
self.host_manager.service_states_last_update['host1'])
@@ -371,7 +380,7 @@ class HostManagerTestCase(test.TestCase):
# S1: notify_service_capabilities()
self.host_manager_1.notify_service_capabilities(service_name, 'host1',
capab2)
capab2, None)
self.assertDictEqual(dict(dict(timestamp=31345), **capab1),
self.host_manager_1.service_states['host1'])
@@ -445,7 +454,7 @@ class HostManagerTestCase(test.TestCase):
#
# S0: notify_service_capabilities()
self.host_manager.notify_service_capabilities(service_name, 'host1',
capab2)
capab2, None)
self.assertDictEqual(
dict(dict(timestamp=31349), **capab2),
self.host_manager.service_states_last_update['host1'])
@@ -563,18 +572,6 @@ class HostManagerTestCase(test.TestCase):
self.assertEqual(1, len(res))
self.assertEqual(dates[1], res[0]['capabilities']['timestamp'])
# Now we simulate old service that doesn't send timestamp
del mocked_service_states['host1']['timestamp']
with mock.patch.dict(self.host_manager.service_states,
mocked_service_states):
self.host_manager.update_service_capabilities(service_name,
'host1',
host_volume_capabs,
None, None)
res = self.host_manager.get_pools(context)
self.assertEqual(1, len(res))
self.assertEqual(dates[2], res[0]['capabilities']['timestamp'])
@mock.patch('cinder.objects.Service.is_up', True)
def test_get_all_backend_states_cluster(self):
"""Test get_all_backend_states when we have clustered services.


@@ -17,6 +17,8 @@
Unit Tests for cinder.scheduler.rpcapi
"""
from datetime import datetime
import ddt
import mock
@@ -38,17 +40,24 @@ class SchedulerRpcAPITestCase(test.TestCase):
self.volume_id = fake_constants.VOLUME_ID
def _test_scheduler_api(self, method, rpc_method,
fanout=False, **kwargs):
fanout=False, ignore_for_method=None,
ignore_for_rpc=None, **kwargs):
ctxt = self.context
rpcapi = scheduler_rpcapi.SchedulerAPI()
expected_retval = 'foo' if rpc_method == 'call' else None
ignore_for_method = ignore_for_method or []
ignore_for_rpc = ignore_for_rpc or []
target = {
"fanout": fanout,
"version": kwargs.pop('version', rpcapi.RPC_API_VERSION)
}
expected_msg = kwargs.copy()
expected_msg = {k: v for k, v in kwargs.items()
if k not in ignore_for_rpc}
method_args = {k: v for k, v in kwargs.items()
if k not in ignore_for_method}
self.fake_args = None
self.fake_kwargs = None
@@ -69,7 +78,7 @@ class SchedulerRpcAPITestCase(test.TestCase):
with mock.patch.object(rpcapi.client, rpc_method) as mock_method:
mock_method.side_effect = _fake_rpc_method
retval = getattr(rpcapi, method)(ctxt, **kwargs)
retval = getattr(rpcapi, method)(ctxt, **method_args)
self.assertEqual(expected_retval, retval)
expected_args = [ctxt, method, expected_msg]
for arg, expected_arg in zip(self.fake_args, expected_args):
@@ -107,14 +116,35 @@ class SchedulerRpcAPITestCase(test.TestCase):
create_worker_mock.assert_called_once()
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
def test_notify_service_capabilities(self, can_send_version_mock):
def test_notify_service_capabilities_backend(self, can_send_version_mock):
"""Test sending new backend by RPC instead of old host parameter."""
capabilities = {'host': 'fake_host',
'total': '10.01', }
with mock.patch('oslo_utils.timeutils.utcnow',
return_value=datetime(1970, 1, 1)):
self._test_scheduler_api('notify_service_capabilities',
rpc_method='cast',
service_name='fake_name',
backend='fake_host',
capabilities=capabilities,
timestamp='1970-01-01T00:00:00.000000',
ignore_for_method=['timestamp'],
version='3.5')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
side_effect=(True, False))
def test_notify_service_capabilities_host(self, can_send_version_mock):
"""Test sending old host RPC parameter instead of backend."""
capabilities = {'host': 'fake_host',
'total': '10.01', }
self._test_scheduler_api('notify_service_capabilities',
rpc_method='cast',
service_name='fake_name',
host='fake_host',
backend='fake_host',
capabilities=capabilities,
ignore_for_method=['host'],
ignore_for_rpc=['backend'],
version='3.1')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
@@ -127,7 +157,10 @@ class SchedulerRpcAPITestCase(test.TestCase):
'notify_service_capabilities',
rpc_method='cast',
service_name='fake_name',
backend='fake_host',
host='fake_host',
ignore_for_method=['host'],
ignore_for_rpc=['backend'],
capabilities=capabilities,
version='3.1')


@@ -18,6 +18,7 @@ Tests For Scheduler
"""
import collections
from datetime import datetime
import mock
from oslo_config import cfg
@@ -123,6 +124,40 @@ class SchedulerManagerTestCase(test.TestCase):
_mock_update_cap.assert_called_once_with(service, host, capabilities,
None, None)
@mock.patch('cinder.scheduler.driver.Scheduler.'
'notify_service_capabilities')
def test_notify_service_capabilities_no_timestamp(self, _mock_notify_cap):
"""Test old interface that receives host."""
service = 'volume'
host = 'fake_host'
capabilities = {'fake_capability': 'fake_value'}
self.manager.notify_service_capabilities(self.context,
service_name=service,
host=host,
capabilities=capabilities)
_mock_notify_cap.assert_called_once_with(service, host, capabilities,
None)
@mock.patch('cinder.scheduler.driver.Scheduler.'
'notify_service_capabilities')
def test_notify_service_capabilities_timestamp(self, _mock_notify_cap):
"""Test new interface that receives backend and timestamp."""
service = 'volume'
backend = 'fake_cluster'
capabilities = {'fake_capability': 'fake_value'}
timestamp = '1970-01-01T00:00:00.000000'
self.manager.notify_service_capabilities(self.context,
service_name=service,
backend=backend,
capabilities=capabilities,
timestamp=timestamp)
_mock_notify_cap.assert_called_once_with(service, backend,
capabilities,
datetime(1970, 1, 1))
@mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
@mock.patch('cinder.message.api.API.create')
@mock.patch('cinder.db.volume_update')