scheduler should ignore earlier time service capabilities
if rabbitmq is too much pressure or blockage.scheduler will not received service capabilities, but once the message queue(rabbitmq) returns to normal, scheduler will received many service capabilities, these service capabilities are acquired by manila share at different times, so the timestamp of service capabilities shoud added at share manage layer(before rpc), but not scheduler layer(after rpc), once scheduler get an newer service capabilities, there is no need to update an earlier service capabilities. Closes-Bug: #1908963 Change-Id: I6ce99ed4451c5d02cb4446861fa59e55a94951a5
This commit is contained in:
parent
475eeafd8d
commit
70bb650e7f
@ -87,11 +87,13 @@ class Scheduler(object):
|
||||
"""Get the normalized set of capabilities for the services."""
|
||||
return self.host_manager.get_service_capabilities()
|
||||
|
||||
def update_service_capabilities(self, service_name, host, capabilities):
|
||||
def update_service_capabilities(self, service_name, host,
|
||||
capabilities, timestamp):
|
||||
"""Process a capability update from a service node."""
|
||||
self.host_manager.update_service_capabilities(service_name,
|
||||
host,
|
||||
capabilities)
|
||||
capabilities,
|
||||
timestamp)
|
||||
|
||||
def hosts_up(self, context, topic):
|
||||
"""Return the list of hosts that have a running service for topic."""
|
||||
|
@ -567,7 +567,8 @@ class HostManager(object):
|
||||
hosts,
|
||||
weight_properties)
|
||||
|
||||
def update_service_capabilities(self, service_name, host, capabilities):
|
||||
def update_service_capabilities(self, service_name, host,
|
||||
capabilities, timestamp):
|
||||
"""Update the per-service capabilities based on this notification."""
|
||||
if service_name not in ('share',):
|
||||
LOG.debug('Ignoring %(service_name)s service update '
|
||||
@ -577,7 +578,15 @@ class HostManager(object):
|
||||
|
||||
# Copy the capabilities, so we don't modify the original dict
|
||||
capability_copy = dict(capabilities)
|
||||
capability_copy["timestamp"] = timeutils.utcnow() # Reported time
|
||||
timestamp = timestamp or timeutils.utcnow()
|
||||
capability_copy["timestamp"] = timestamp # Reported time
|
||||
|
||||
capab_old = self.service_states.get(host, {"timestamp": 0})
|
||||
# Ignore older updates
|
||||
if capab_old['timestamp'] and timestamp < capab_old['timestamp']:
|
||||
LOG.info('Ignoring old capability report from %s.', host)
|
||||
return
|
||||
|
||||
self.service_states[host] = capability_copy
|
||||
|
||||
LOG.debug("Received %(service_name)s service update from "
|
||||
|
@ -19,11 +19,14 @@
|
||||
Scheduler Service
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_service import periodic_task
|
||||
from oslo_utils import excutils
|
||||
from oslo_utils import importutils
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from manila.common import constants
|
||||
from manila import context
|
||||
@ -96,13 +99,18 @@ class SchedulerManager(manager.Manager):
|
||||
return self.driver.get_service_capabilities()
|
||||
|
||||
def update_service_capabilities(self, context, service_name=None,
|
||||
host=None, capabilities=None, **kwargs):
|
||||
host=None, capabilities=None,
|
||||
timestamp=None, **kwargs):
|
||||
"""Process a capability update from a service node."""
|
||||
if capabilities is None:
|
||||
capabilities = {}
|
||||
elif timestamp:
|
||||
timestamp = datetime.strptime(timestamp,
|
||||
timeutils.PERFECT_TIME_FORMAT)
|
||||
self.driver.update_service_capabilities(service_name,
|
||||
host,
|
||||
capabilities)
|
||||
capabilities,
|
||||
timestamp)
|
||||
|
||||
def create_share_instance(self, context, request_spec=None,
|
||||
filter_properties=None):
|
||||
|
@ -19,6 +19,7 @@ Client side of the scheduler manager RPC API.
|
||||
from oslo_config import cfg
|
||||
import oslo_messaging as messaging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from manila import rpc
|
||||
|
||||
@ -64,11 +65,13 @@ class SchedulerAPI(object):
|
||||
service_name, host,
|
||||
capabilities):
|
||||
call_context = self.client.prepare(fanout=True, version='1.0')
|
||||
timestamp = jsonutils.to_primitive(timeutils.utcnow())
|
||||
call_context.cast(context,
|
||||
'update_service_capabilities',
|
||||
service_name=service_name,
|
||||
host=host,
|
||||
capabilities=capabilities)
|
||||
capabilities=capabilities,
|
||||
timestamp=timestamp)
|
||||
|
||||
def get_pools(self, context, filters=None, cached=False):
|
||||
call_context = self.client.prepare(version='1.9')
|
||||
|
@ -47,12 +47,14 @@ class SchedulerTestCase(test.TestCase):
|
||||
service_name = 'fake_service'
|
||||
host = 'fake_host'
|
||||
capabilities = {'fake_capability': 'fake_value'}
|
||||
timestamp = 1111
|
||||
with mock.patch.object(self.driver.host_manager,
|
||||
'update_service_capabilities', mock.Mock()):
|
||||
self.driver.update_service_capabilities(
|
||||
service_name, host, capabilities)
|
||||
service_name, host, capabilities, timestamp)
|
||||
(self.driver.host_manager.update_service_capabilities.
|
||||
assert_called_once_with(service_name, host, capabilities))
|
||||
assert_called_once_with(service_name, host,
|
||||
capabilities, timestamp))
|
||||
|
||||
def test_hosts_up(self):
|
||||
service1 = {'host': 'host1'}
|
||||
|
@ -112,21 +112,14 @@ class HostManagerTestCase(test.TestCase):
|
||||
host2_share_capabs = dict(free_capacity_gb=5432, timestamp=1)
|
||||
host3_share_capabs = dict(free_capacity_gb=6543, timestamp=1)
|
||||
service_name = 'share'
|
||||
with mock.patch.object(timeutils, 'utcnow',
|
||||
mock.Mock(return_value=31337)):
|
||||
self.host_manager.update_service_capabilities(
|
||||
service_name, 'host1', host1_share_capabs)
|
||||
timeutils.utcnow.assert_called_once_with()
|
||||
with mock.patch.object(timeutils, 'utcnow',
|
||||
mock.Mock(return_value=31338)):
|
||||
self.host_manager.update_service_capabilities(
|
||||
service_name, 'host2', host2_share_capabs)
|
||||
timeutils.utcnow.assert_called_once_with()
|
||||
with mock.patch.object(timeutils, 'utcnow',
|
||||
mock.Mock(return_value=31339)):
|
||||
self.host_manager.update_service_capabilities(
|
||||
service_name, 'host3', host3_share_capabs)
|
||||
timeutils.utcnow.assert_called_once_with()
|
||||
self.host_manager.update_service_capabilities(
|
||||
service_name, 'host1', host1_share_capabs, 31337)
|
||||
|
||||
self.host_manager.update_service_capabilities(
|
||||
service_name, 'host2', host2_share_capabs, 31338)
|
||||
|
||||
self.host_manager.update_service_capabilities(
|
||||
service_name, 'host3', host3_share_capabs, 31339)
|
||||
|
||||
# Make sure dictionary isn't re-assigned
|
||||
self.assertEqual(service_states, self.host_manager.service_states)
|
||||
|
@ -121,7 +121,7 @@ class SchedulerManagerTestCase(test.TestCase):
|
||||
self.manager.update_service_capabilities(
|
||||
self.context, service_name=service_name, host=host)
|
||||
(self.manager.driver.update_service_capabilities.
|
||||
assert_called_once_with(service_name, host, {}))
|
||||
assert_called_once_with(service_name, host, {}, None))
|
||||
with mock.patch.object(self.manager.driver,
|
||||
'update_service_capabilities', mock.Mock()):
|
||||
capabilities = {'fake_capability': 'fake_value'}
|
||||
@ -129,7 +129,8 @@ class SchedulerManagerTestCase(test.TestCase):
|
||||
self.context, service_name=service_name, host=host,
|
||||
capabilities=capabilities)
|
||||
(self.manager.driver.update_service_capabilities.
|
||||
assert_called_once_with(service_name, host, capabilities))
|
||||
assert_called_once_with(service_name, host,
|
||||
capabilities, None))
|
||||
|
||||
@mock.patch.object(db, 'share_update', mock.Mock())
|
||||
@mock.patch('manila.message.api.API.create')
|
||||
|
@ -0,0 +1,6 @@
|
||||
---
|
||||
fixes:
|
||||
- |
|
||||
Share scheduler will ignore earlier time service capabilities. See
|
||||
`bug 1908963 <https://bugs.launchpad.net/manila/+bug/1908963>`_ for
|
||||
more details.
|
Loading…
Reference in New Issue
Block a user