Merge "RPC Callback rolling upgrades reporting, and integration"

This commit is contained in:
Jenkins 2016-03-01 18:54:43 +00:00 committed by Gerrit Code Review
commit b380b15d4c
14 changed files with 441 additions and 79 deletions

View File

@ -164,10 +164,9 @@ Cached subset will be re-evaluated (to cut down the version sets as agents
upgrade) after configured TTL.
As a fast path to update this cache on all neutron-servers when upgraded agents
come up (or old agents revive after a long timeout or even a downgrade), we could
introduce a fanout queue consumed by servers, to additionally notify from one
agent to all neutron-servers about the "versions of interest" in the agent just
comming up.
come up (or old agents revive after a long timeout or even a downgrade) the
server registering the new status update will notify the other servers about
the new consumer resource versions via cast.
All notifications for all calculated version sets must be sent, as non-upgraded
agents would otherwise not receive them.

View File

@ -12,16 +12,41 @@
import collections
import copy
import pprint
import time
from neutron_lib import constants
from oslo_log import log as logging
from oslo_utils import importutils
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.callbacks import exceptions
LOG = logging.getLogger(__name__)
VERSIONS_TTL = 60
# This is the list of agents that started using this rpc push/pull mechanism
# for versioned objects, but at that time stable/liberty, they were not
# reporting versions, so we need to assume they need QosPolicy 1.0
#TODO(mangelajo): Remove this logic in Newton, since those agents will be
# already reporting From N to O
NON_REPORTING_AGENT_TYPES = [constants.AGENT_TYPE_OVS,
constants.AGENT_TYPE_NIC_SWITCH]
# NOTE(mangelajo): if we import this globally we end up with a (very
# long) circular dependency, this can be fixed if we
# stop importing all exposed classes in
# neutron.api.rpc.callbacks.resources and provide
# a decorator to expose classes
def _import_resources():
return importutils.import_module('neutron.api.rpc.callbacks.resources')
AgentConsumer = collections.namedtuple('AgentConsumer', ['agent_type',
'host'])
AgentConsumer.__repr__ = lambda self: '%s@%s' % self
class ResourceConsumerTracker(object):
"""Class to be provided back by consumer_versions_callback.
@ -59,6 +84,7 @@ class ResourceConsumerTracker(object):
self._needs_recalculation = False
def _get_local_resource_versions(self):
resources = _import_resources()
local_resource_versions = collections.defaultdict(set)
for resource_type, version in (
resources.LOCAL_RESOURCE_VERSIONS.items()):
@ -67,13 +93,17 @@ class ResourceConsumerTracker(object):
# TODO(mangelajo): add locking with _recalculate_versions if we ever
# move out of green threads.
def _set_version(self, consumer_id, resource_type, version):
def _set_version(self, consumer, resource_type, version):
"""Set or update a consumer resource type version."""
self._versions[resource_type].add(version)
prev_version = (
self._versions_by_consumer[consumer_id].get(resource_type, None))
self._versions_by_consumer[consumer_id][resource_type] = version
if prev_version and (prev_version != version):
consumer_versions = self._versions_by_consumer[consumer]
prev_version = consumer_versions.get(resource_type, None)
if version:
consumer_versions[resource_type] = version
else:
consumer_versions.pop(resource_type, None)
if prev_version != version:
# If a version got updated/changed in a consumer, we need to
# recalculate the main dictionary of versions based on the
# new _versions_by_consumer.
@ -82,18 +112,46 @@ class ResourceConsumerTracker(object):
self._needs_recalculation = True
LOG.debug("Version for resource type %(resource_type)s changed "
"%(prev_version)s to %(version)s on "
"consumer %(consumer_id)s",
"consumer %(consumer)s",
{'resource_type': resource_type,
'version': version,
'prev_version': prev_version,
'consumer_id': consumer_id})
'consumer': consumer})
def set_versions(self, consumer_id, versions):
def set_versions(self, consumer, versions):
"""Set or update an specific consumer resource types."""
for resource_type, resource_version in versions.items():
self._set_version(consumer_id, resource_type,
self._set_version(consumer, resource_type,
resource_version)
if versions:
self._cleanup_removed_versions(consumer, versions)
else:
self._handle_no_set_versions(consumer)
def _cleanup_removed_versions(self, consumer, versions):
"""Check if any version report has been removed, and cleanup."""
prev_resource_types = set(
self._versions_by_consumer[consumer].keys())
cur_resource_types = set(versions.keys())
removed_resource_types = prev_resource_types - cur_resource_types
for resource_type in removed_resource_types:
self._set_version(consumer, resource_type, None)
def _handle_no_set_versions(self, consumer):
"""Handle consumers reporting no versions."""
if isinstance(consumer, AgentConsumer):
if consumer.agent_type in NON_REPORTING_AGENT_TYPES:
resources = _import_resources()
self._versions_by_consumer[consumer] = {
resources.QOS_POLICY: '1.0'}
self._versions[resources.QOS_POLICY].add('1.0')
return
if self._versions_by_consumer[consumer]:
self._needs_recalculation = True
self._versions_by_consumer[consumer] = {}
def get_resource_versions(self, resource_type):
"""Fetch the versions necessary to notify all consumers."""
if self._needs_recalculation:
@ -102,6 +160,18 @@ class ResourceConsumerTracker(object):
return copy.copy(self._versions[resource_type])
def report(self):
"""Output debug information about the consumer versions."""
#TODO(mangelajo): report only when pushed_versions differ from
# previous reports.
format = lambda versions: pprint.pformat(dict(versions), indent=4)
debug_dict = {'pushed_versions': format(self._versions),
'consumer_versions': format(self._versions_by_consumer)}
LOG.debug('Tracked resource versions report:\n'
'pushed versions:\n%(pushed_versions)s\n\n'
'consumer versions:\n%(consumer_versions)s\n', debug_dict)
# TODO(mangelajo): Add locking if we ever move out of greenthreads.
def _recalculate_versions(self):
"""Recalculate the _versions set.
@ -130,8 +200,9 @@ class CachedResourceConsumerTracker(object):
new_tracker = ResourceConsumerTracker()
self._consumer_versions_callback(new_tracker)
self._versions = new_tracker
self._versions.report()
else:
pass # TODO(mangelajo): throw exception if callback not provided
raise exceptions.VersionsCallbackNotFound()
def _check_expiration(self):
if time.time() > self._expires_at:
@ -145,11 +216,22 @@ class CachedResourceConsumerTracker(object):
self._check_expiration()
return self._versions.get_resource_versions(resource_type)
def update_versions(self, consumer_id, resource_versions):
self._versions.set_versions(consumer_id, resource_versions)
def update_versions(self, consumer, resource_versions):
self._versions.set_versions(consumer, resource_versions)
def report(self):
self._check_expiration()
self._versions.report()
_cached_version_tracker = None
cached_version_tracker = CachedResourceConsumerTracker()
#NOTE(ajo): add locking if we ever stop using greenthreads
def _get_cached_tracker():
global _cached_version_tracker
if not _cached_version_tracker:
_cached_version_tracker = CachedResourceConsumerTracker()
return _cached_version_tracker
def set_consumer_versions_callback(callback):
@ -160,30 +242,23 @@ def set_consumer_versions_callback(callback):
The callback will receive a ResourceConsumerTracker object,
and the ResourceConsumerTracker methods must be used to provide
each consumer_id versions. Consumer ids can be obtained from this
each consumer versions. Consumer ids can be obtained from this
module via the next functions:
* get_agent_consumer_id
"""
cached_version_tracker.set_consumer_versions_callback(callback)
_get_cached_tracker().set_consumer_versions_callback(callback)
def get_resource_versions(resource_type):
"""Return the set of versions expected by the consumers of a resource."""
return cached_version_tracker.get_resource_versions(resource_type)
return _get_cached_tracker().get_resource_versions(resource_type)
def update_versions(consumer_id, resource_versions):
def update_versions(consumer, resource_versions):
"""Update the resources' versions for a consumer id."""
cached_version_tracker.set_versions(consumer_id, resource_versions)
_get_cached_tracker().update_versions(consumer, resource_versions)
def get_agent_consumer_id(agent_type, agent_host):
"""Return a consumer id string for an agent type + host tuple.
The logic behind this function, is that, eventually we could have
consumers of RPC callbacks which are not agents, thus we want
to totally collate all the different consumer types and provide
unique consumer ids.
"""
return "%(agent_type)s@%(agent_host)s" % {'agent_type': agent_type,
'agent_host': agent_host}
def report():
"""Report resource versions in debug logs."""
_get_cached_tracker().report()

View File

@ -120,6 +120,53 @@ class ResourcesPullRpcCallback(object):
return obj.obj_to_primitive(target_version=version)
class ResourcesPushToServersRpcApi(object):
"""Publisher-side RPC (stub) for plugin-to-plugin fanout interaction.
This class implements the client side of an rpc interface. The receiver
side can be found below: ResourcesPushToServerRpcCallback. For more
information on this RPC interface, see doc/source/devref/rpc_callbacks.rst.
"""
def __init__(self):
target = oslo_messaging.Target(
topic=topics.SERVER_RESOURCE_VERSIONS, version='1.0',
namespace=constants.RPC_NAMESPACE_RESOURCES)
self.client = n_rpc.get_client(target)
@log_helpers.log_method_call
def report_agent_resource_versions(self, context, agent_type, agent_host,
version_map):
"""Fan out all the agent resource versions to other servers."""
cctxt = self.client.prepare(fanout=True)
cctxt.cast(context, 'report_agent_resource_versions',
agent_type=agent_type,
agent_host=agent_host,
version_map=version_map)
class ResourcesPushToServerRpcCallback(object):
"""Receiver-side RPC (implementation) for plugin-to-plugin interaction.
This class implements the receiver side of an rpc interface.
The client side can be found above: ResourcePushToServerRpcApi. For more
information on this RPC interface, see doc/source/devref/rpc_callbacks.rst.
"""
# History
# 1.0 Initial version
target = oslo_messaging.Target(
version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES)
@log_helpers.log_method_call
def report_agent_resource_versions(self, context, agent_type, agent_host,
version_map):
consumer_id = version_manager.AgentConsumer(agent_type=agent_type,
host=agent_host)
version_manager.update_versions(consumer_id, version_map)
class ResourcesPushRpcApi(object):
"""Plugin-side RPC for plugin-to-agents interaction.

View File

@ -27,6 +27,7 @@ UPDATE = 'update'
AGENT = 'q-agent-notifier'
PLUGIN = 'q-plugin'
SERVER_RESOURCE_VERSIONS = 'q-server-resource-versions'
L3PLUGIN = 'q-l3-plugin'
REPORTS = 'q-reports-plugin'
DHCP = 'q-dhcp-notifer'

View File

@ -21,6 +21,7 @@ from oslo_db import exception as db_exc
from oslo_log import log as logging
import oslo_messaging
from oslo_serialization import jsonutils
from oslo_utils import importutils
from oslo_utils import timeutils
import six
import sqlalchemy as sa
@ -28,6 +29,7 @@ from sqlalchemy.orm import exc
from sqlalchemy import sql
from neutron._i18n import _, _LE, _LI, _LW
from neutron.api.rpc.callbacks import version_manager
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron import context
@ -68,6 +70,11 @@ AGENT_OPTS = [
]
cfg.CONF.register_opts(AGENT_OPTS)
# this is the ratio from agent_down_time to the time we use to consider
# the agents down for considering their resource versions in the
# version_manager callback
DOWNTIME_VERSIONS_RATIO = 2
class Agent(model_base.BASEV2, model_base.HasId):
"""Represents agents running in neutron deployments."""
@ -98,6 +105,10 @@ class Agent(model_base.BASEV2, model_base.HasId):
description = sa.Column(sa.String(attributes.DESCRIPTION_MAX_LEN))
# configurations: a json dict string, I think 4095 is enough
configurations = sa.Column(sa.String(4095), nullable=False)
# resource_versions: json dict, 8191 allows for ~256 resource versions
# assuming ~32byte length "'name': 'ver',"
# the whole row limit is 65535 bytes in mysql
resource_versions = sa.Column(sa.String(8191))
# load - number of resources hosted by the agent
load = sa.Column(sa.Integer, server_default='0', nullable=False)
@ -162,6 +173,11 @@ class AgentAvailabilityZoneMixin(az_ext.AvailabilityZonePluginBase):
class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
"""Mixin class to add agent extension to db_base_plugin_v2."""
def __init__(self, *args, **kwargs):
version_manager.set_consumer_versions_callback(
self._get_agents_resource_versions)
super(AgentDbMixin, self).__init__(*args, **kwargs)
def _get_agent(self, context, id):
try:
agent = self._get_by_id(context, Agent, id)
@ -186,18 +202,28 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
{'agent_type': agent_type, 'agent_id': agent.id})
return agent
@classmethod
def is_agent_down(cls, heart_beat_time):
@staticmethod
def is_agent_down(heart_beat_time):
return timeutils.is_older_than(heart_beat_time,
cfg.CONF.agent_down_time)
@staticmethod
def is_agent_considered_for_versions(agent_dict):
return not timeutils.is_older_than(agent_dict['heartbeat_timestamp'],
cfg.CONF.agent_down_time *
DOWNTIME_VERSIONS_RATIO)
def get_configuration_dict(self, agent_db):
return self._get_dict(agent_db, 'configurations')
def _get_dict(self, agent_db, dict_name):
try:
conf = jsonutils.loads(agent_db.configurations)
conf = jsonutils.loads(getattr(agent_db, dict_name))
except Exception:
msg = _LW('Configuration for agent %(agent_type)s on host %(host)s'
' is invalid.')
LOG.warn(msg, {'agent_type': agent_db.agent_type,
msg = _LW('Dictionary %(dict_name)s for agent %(agent_type)s on '
'host %(host)s is invalid.')
LOG.warn(msg, {'dict_name': dict_name,
'agent_type': agent_db.agent_type,
'host': agent_db.host})
conf = {}
return conf
@ -217,9 +243,9 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
ext_agent.RESOURCE_NAME + 's')
res = dict((k, agent[k]) for k in attr
if k not in ['alive', 'configurations'])
res['alive'] = not AgentDbMixin.is_agent_down(
res['heartbeat_timestamp'])
res['configurations'] = self.get_configuration_dict(agent)
res['alive'] = not self.is_agent_down(res['heartbeat_timestamp'])
res['configurations'] = self._get_dict(agent, 'configurations')
res['resource_versions'] = self._get_dict(agent, 'resource_versions')
res['availability_zone'] = agent['availability_zone']
return self._fields(res, fields)
@ -245,7 +271,6 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
filters=filters, fields=fields)
alive = filters and filters.get('alive', None)
if alive:
# alive filter will be a list
alive = attributes.convert_to_boolean(alive[0])
agents = [agent for agent in agents if agent['alive'] == alive]
return agents
@ -311,6 +336,8 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
res['availability_zone'] = agent_state['availability_zone']
configurations_dict = agent_state.get('configurations', {})
res['configurations'] = jsonutils.dumps(configurations_dict)
resource_versions_dict = agent_state.get('resource_versions', {})
res['resource_versions'] = jsonutils.dumps(resource_versions_dict)
res['load'] = self._get_agent_load(agent_state)
current_time = timeutils.utcnow()
try:
@ -340,7 +367,6 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
def create_or_update_agent(self, context, agent):
"""Create or update agent according to report."""
try:
return self._create_or_update_agent(context, agent)
except db_exc.DBDuplicateEntry:
@ -358,6 +384,29 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
# agent entry, which will be updated multiple times
return self._create_or_update_agent(context, agent)
def _get_agents_considered_for_versions(self):
up_agents = self.get_agents(context.get_admin_context(),
filters={'admin_state_up': [True]})
return filter(self.is_agent_considered_for_versions, up_agents)
def _get_agents_resource_versions(self, tracker):
"""Get the known agent resource versions and update the tracker.
Receives a version_manager.ResourceConsumerTracker instance and it's
expected to look up in to the database and update every agent resource
versions.
This method is called from version_manager when the cached information
has passed TTL.
"""
for agent in self._get_agents_considered_for_versions():
resource_versions = agent.get('resource_versions', {})
consumer = version_manager.AgentConsumer(
agent_type=agent['agent_type'], host=agent['host'])
LOG.debug("Update consumer %(consumer)s versions to: "
"%(versions)s", {'consumer': consumer,
'versions': resource_versions})
tracker.set_versions(consumer, resource_versions)
class AgentExtRpcCallback(object):
"""Processes the rpc report in plugin implementations.
@ -378,6 +427,12 @@ class AgentExtRpcCallback(object):
def __init__(self, plugin=None):
super(AgentExtRpcCallback, self).__init__()
self.plugin = plugin
#TODO(ajo): fix the resources circular dependency issue by dynamically
# registering object types in the RPC callbacks api
resources_rpc = importutils.import_module(
'neutron.api.rpc.handlers.resources_rpc')
# Initialize RPC api directed to other neutron-servers
self.server_versions_rpc = resources_rpc.ResourcesPushToServersRpcApi()
def report_state(self, context, **kwargs):
"""Report state from agent to server.
@ -398,7 +453,20 @@ class AgentExtRpcCallback(object):
return
if not self.plugin:
self.plugin = manager.NeutronManager.get_plugin()
return self.plugin.create_or_update_agent(context, agent_state)
agent_status = self.plugin.create_or_update_agent(context, agent_state)
self._update_local_agent_resource_versions(context, agent_state)
return agent_status
def _update_local_agent_resource_versions(self, context, agent_state):
resource_versions_dict = agent_state.get('resource_versions', {})
version_manager.update_versions(
version_manager.AgentConsumer(agent_type=agent_state['agent_type'],
host=agent_state['host']),
resource_versions_dict)
# report other neutron-servers about this quickly
self.server_versions_rpc.report_agent_resource_versions(
context, agent_state['agent_type'], agent_state['host'],
resource_versions_dict)
def _check_clock_sync_on_agent_start(self, agent_state, agent_time):
"""Checks if the server and the agent times are in sync.

View File

@ -1 +1 @@
15e43b934f81
31ed664953e6

View File

@ -0,0 +1,34 @@
# Copyright 2016 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Add resource_versions row to agent table
Revision ID: 31ed664953e6
Revises: c3a73f615e4
Create Date: 2016-01-15 13:41:30.016915
"""
# revision identifiers, used by Alembic.
revision = '31ed664953e6'
down_revision = '15e43b934f81'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column('agents',
sa.Column('resource_versions', sa.String(length=8191)))

View File

@ -27,6 +27,7 @@ from neutron._i18n import _LE, _LI
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.callbacks import resources
from neutron.common import config as common_config
from neutron.common import constants
from neutron.common import topics
@ -84,12 +85,14 @@ class CommonAgentLoop(service.Service):
configurations = {'extensions': self.ext_manager.names()}
configurations.update(self.mgr.get_agent_configurations())
#TODO(mangelajo): optimize resource_versions (see ovs agent)
self.agent_state = {
'binary': self.agent_binary,
'host': cfg.CONF.host,
'topic': constants.L2_AGENT_TOPIC,
'configurations': configurations,
'agent_type': self.agent_type,
'resource_versions': resources.LOCAL_RESOURCE_VERSIONS,
'start_flag': True}
report_interval = cfg.CONF.AGENT.report_interval

View File

@ -29,6 +29,7 @@ from neutron._i18n import _, _LE, _LI, _LW
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.callbacks import resources
from neutron.common import config as common_config
from neutron.common import constants as n_constants
from neutron.common import topics
@ -125,12 +126,15 @@ class SriovNicSwitchAgent(object):
configurations = {'device_mappings': physical_devices_mappings,
'extensions': self.ext_manager.names()}
#TODO(mangelajo): optimize resource_versions (see ovs agent)
self.agent_state = {
'binary': 'neutron-sriov-nic-agent',
'host': self.conf.host,
'topic': n_constants.L2_AGENT_TOPIC,
'configurations': configurations,
'agent_type': n_constants.AGENT_TYPE_NIC_SWITCH,
'resource_versions': resources.LOCAL_RESOURCE_VERSIONS,
'start_flag': True}
# The initialization is complete; we can start receiving messages

View File

@ -37,6 +37,7 @@ from neutron.agent.linux import ip_lib
from neutron.agent.linux import polling as linux_polling
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import dvr_rpc
from neutron.common import config
from neutron.common import constants as n_const
@ -231,6 +232,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.enable_tunneling,
self.enable_distributed_routing)
#TODO(mangelajo): optimize resource_versions to only report
# versions about resources which are common,
# or which are used by specific extensions.
self.agent_state = {
'binary': 'neutron-openvswitch-agent',
'host': host,
@ -250,6 +254,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
'ovs_capabilities': self.ovs.capabilities,
'vhostuser_socket_dir':
ovs_conf.vhostuser_socket_dir},
'resource_versions': resources.LOCAL_RESOURCE_VERSIONS,
'agent_type': agent_conf.agent_type,
'start_flag': True}

View File

@ -211,6 +211,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection()
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
self.conn.create_consumer(
topics.SERVER_RESOURCE_VERSIONS,
[resources_rpc.ResourcesPushToServerRpcCallback()],
fanout=True)
# process state reports despite dedicated rpc workers
self.conn.create_consumer(topics.REPORTS,
[agents_db.AgentExtRpcCallback()],

View File

@ -10,16 +10,18 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import mock
from neutron.api.rpc.callbacks import exceptions
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.callbacks import version_manager
from neutron.tests import base
TEST_RESOURCE_TYPE = 'TestResourceType'
TEST_RESOURCE_VERSION_A = '1.11'
TEST_RESOURCE_VERSION_B = '1.12'
TEST_VERSION_A = '1.11'
TEST_VERSION_B = '1.12'
TEST_RESOURCE_TYPE_2 = 'AnotherResource'
@ -27,44 +29,85 @@ AGENT_HOST_1 = 'host-1'
AGENT_HOST_2 = 'host-2'
AGENT_TYPE_1 = 'dhcp-agent'
AGENT_TYPE_2 = 'openvswitch-agent'
CONSUMER_1 = version_manager.get_agent_consumer_id(AGENT_HOST_1, AGENT_TYPE_1)
CONSUMER_2 = version_manager.get_agent_consumer_id(AGENT_HOST_2, AGENT_TYPE_2)
CONSUMER_1 = version_manager.AgentConsumer(AGENT_TYPE_1, AGENT_HOST_1)
CONSUMER_2 = version_manager.AgentConsumer(AGENT_TYPE_2, AGENT_HOST_2)
class ResourceConsumerTrackerTest(base.BaseTestCase):
def test_consumer_set_versions(self):
cv = version_manager.ResourceConsumerTracker()
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE:
TEST_RESOURCE_VERSION_A})
self.assertIn(TEST_RESOURCE_VERSION_A,
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_A})
self.assertIn(TEST_VERSION_A,
cv.get_resource_versions(TEST_RESOURCE_TYPE))
def test_consumer_updates_version(self):
cv = version_manager.ResourceConsumerTracker()
for version in [TEST_RESOURCE_VERSION_A, TEST_RESOURCE_VERSION_B]:
for version in [TEST_VERSION_A, TEST_VERSION_B]:
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: version})
self.assertEqual(set([TEST_RESOURCE_VERSION_B]),
self.assertEqual(set([TEST_VERSION_B]),
cv.get_resource_versions(TEST_RESOURCE_TYPE))
def test_multiple_consumer_version_update(self):
cv = version_manager.ResourceConsumerTracker()
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE:
TEST_RESOURCE_VERSION_A})
cv.set_versions(CONSUMER_2, {TEST_RESOURCE_TYPE:
TEST_RESOURCE_VERSION_A})
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE:
TEST_RESOURCE_VERSION_B})
self.assertEqual(set([TEST_RESOURCE_VERSION_A,
TEST_RESOURCE_VERSION_B]),
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_A})
cv.set_versions(CONSUMER_2, {TEST_RESOURCE_TYPE: TEST_VERSION_A})
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_B})
self.assertEqual(set([TEST_VERSION_A, TEST_VERSION_B]),
cv.get_resource_versions(TEST_RESOURCE_TYPE))
def test_consumer_downgrades_removing_resource(self):
cv = version_manager.ResourceConsumerTracker()
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_B,
TEST_RESOURCE_TYPE_2: TEST_VERSION_A})
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_A})
self.assertEqual(set(),
cv.get_resource_versions(TEST_RESOURCE_TYPE_2))
self.assertEqual(set([TEST_VERSION_A]),
cv.get_resource_versions(TEST_RESOURCE_TYPE))
def test_consumer_downgrades_stops_reporting(self):
cv = version_manager.ResourceConsumerTracker()
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_B,
TEST_RESOURCE_TYPE_2: TEST_VERSION_A})
cv.set_versions(CONSUMER_1, {})
for resource_type in [TEST_RESOURCE_TYPE, TEST_RESOURCE_TYPE_2]:
self.assertEqual(set(),
cv.get_resource_versions(resource_type))
def test_compatibility_liberty_sriov_and_ovs_agents(self):
def _fake_local_versions(self):
local_versions = collections.defaultdict(set)
local_versions[resources.QOS_POLICY].add('1.11')
return local_versions
for agent_type in version_manager.NON_REPORTING_AGENT_TYPES:
consumer_id = version_manager.AgentConsumer(agent_type,
AGENT_HOST_1)
cv = version_manager.ResourceConsumerTracker()
cv._get_local_resource_versions = _fake_local_versions
cv._versions = _fake_local_versions(mock.ANY)
cv.set_versions(consumer_id, {})
self.assertEqual(set(['1.0', '1.11']),
cv.get_resource_versions(resources.QOS_POLICY))
def test_different_adds_triggers_recalculation(self):
cv = version_manager.ResourceConsumerTracker()
for version in [TEST_RESOURCE_VERSION_A, TEST_RESOURCE_VERSION_B]:
for version in [TEST_VERSION_A, TEST_VERSION_B]:
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: version})
self.assertTrue(cv._needs_recalculation)
@ -75,8 +118,7 @@ class ResourceConsumerTrackerTest(base.BaseTestCase):
class CachedResourceConsumerTrackerTest(base.BaseTestCase):
# TODO(mangelajo): re-enable once we provide the callback
def _test_exception_with_no_callback(self):
def test_exception_with_no_callback(self):
cached_tracker = version_manager.CachedResourceConsumerTracker()
self.assertRaises(
exceptions.VersionsCallbackNotFound,
@ -85,7 +127,7 @@ class CachedResourceConsumerTrackerTest(base.BaseTestCase):
def _set_consumer_versions_callback(self, cached_tracker):
def consumer_versions(rct):
rct.set_versions(CONSUMER_1,
{TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_A})
{TEST_RESOURCE_TYPE: TEST_VERSION_A})
cached_tracker.set_consumer_versions_callback(consumer_versions)
@ -93,7 +135,7 @@ class CachedResourceConsumerTrackerTest(base.BaseTestCase):
cached_tracker = version_manager.CachedResourceConsumerTracker()
self._set_consumer_versions_callback(cached_tracker)
self.assertIn(TEST_RESOURCE_VERSION_A,
self.assertIn(TEST_VERSION_A,
cached_tracker.get_resource_versions(
TEST_RESOURCE_TYPE))
@ -108,8 +150,8 @@ class CachedResourceConsumerTrackerTest(base.BaseTestCase):
TEST_RESOURCE_TYPE_2)
cached_tracker.update_versions(
CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_B,
TEST_RESOURCE_TYPE_2: TEST_RESOURCE_VERSION_A})
CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_B,
TEST_RESOURCE_TYPE_2: TEST_VERSION_A})
final_versions = cached_tracker.get_resource_versions(
TEST_RESOURCE_TYPE)
@ -124,7 +166,7 @@ class CachedResourceConsumerTrackerTest(base.BaseTestCase):
def consumer_versions_callback(consumer_tracker):
consumer_tracker.set_versions(
CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_A})
CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_A})
self.refreshed = True
cached_tracker = version_manager.CachedResourceConsumerTracker()

View File

@ -143,6 +143,21 @@ class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase):
resource_id)
class ResourcesPushToServerRpcCallbackTestCase(ResourcesRpcBaseTestCase):
def test_report_versions(self):
callbacks = resources_rpc.ResourcesPushToServerRpcCallback()
with mock.patch('neutron.api.rpc.callbacks.version_manager'
'.update_versions') as update_versions:
version_map = {'A': '1.0'}
callbacks.report_agent_resource_versions(context=mock.ANY,
agent_type='DHCP Agent',
agent_host='fake-host',
version_map=version_map)
update_versions.assert_called_once_with(mock.ANY,
version_map)
class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase):
def setUp(self):

View File

@ -1,3 +1,4 @@
# pylint: disable=pointless-string-statement
# Copyright (c) 2013 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -26,6 +27,7 @@ from neutron.common import exceptions as n_exc
from neutron import context
from neutron.db import agents_db
from neutron.db import db_base_plugin_v2 as base_plugin
from neutron.tests import base
from neutron.tests.unit import testlib_api
# the below code is required for the following reason
@ -38,6 +40,15 @@ from neutron.tests.unit import testlib_api
load_tests = testscenarios.load_tests_apply_scenarios
TEST_RESOURCE_VERSIONS = {"A": "1.0"}
AGENT_STATUS = {'agent_type': 'Open vSwitch agent',
'binary': 'neutron-openvswitch-agent',
'host': 'overcloud-notcompute',
'topic': 'N/A',
'resource_versions': TEST_RESOURCE_VERSIONS}
TEST_TIME = '2016-02-26T17:08:06.116'
class FakePlugin(base_plugin.NeutronDbPluginV2, agents_db.AgentDbMixin):
"""A fake plugin class containing all DB methods."""
@ -55,7 +66,8 @@ class TestAgentsDbBase(testlib_api.SqlTestCase):
host=host,
agent_type=agent_type,
topic='foo_topic',
configurations="",
configurations="{}",
resource_versions="{}",
created_at=timeutils.utcnow(),
started_at=timeutils.utcnow(),
heartbeat_timestamp=timeutils.utcnow())
@ -67,12 +79,19 @@ class TestAgentsDbBase(testlib_api.SqlTestCase):
with self.context.session.begin(subtransactions=True):
self.context.session.add(agent)
def _create_and_save_agents(self, hosts, agent_type, down_agents_count=0):
def _create_and_save_agents(self, hosts, agent_type, down_agents_count=0,
down_but_version_considered=0):
agents = self._get_agents(hosts, agent_type)
# bring down the specified agents
for agent in agents[:down_agents_count]:
agent['heartbeat_timestamp'] -= datetime.timedelta(minutes=60)
# bring down just enough so their version is still considered
for agent in agents[down_agents_count:(
down_but_version_considered + down_agents_count)]:
agent['heartbeat_timestamp'] -= datetime.timedelta(
seconds=(cfg.CONF.agent_down_time + 1))
self._save_agents(agents)
return agents
@ -81,12 +100,7 @@ class TestAgentsDbMixin(TestAgentsDbBase):
def setUp(self):
super(TestAgentsDbMixin, self).setUp()
self.agent_status = {
'agent_type': 'Open vSwitch agent',
'binary': 'neutron-openvswitch-agent',
'host': 'overcloud-notcompute',
'topic': 'N/A'
}
self.agent_status = dict(AGENT_STATUS)
def test_get_enabled_agent_on_host_found(self):
agents = self._create_and_save_agents(['foo_host'],
@ -182,6 +196,27 @@ class TestAgentsDbMixin(TestAgentsDbBase):
" DHCP Agent 2015-05-06 22:40:40.432295 some.node"}
)
def test_get_dict(self):
db_obj = mock.Mock(conf1='{"test": "1234"}')
conf1 = self.plugin._get_dict(db_obj, 'conf1')
self.assertIn('test', conf1)
self.assertEqual("1234", conf1['test'])
def get_configurations_dict(self):
db_obj = mock.Mock(configurations='{"cfg1": "val1"}')
cfg = self.plugin.get_configuration_dict(db_obj)
self.assertIn('cfg', cfg)
def test__get_agents_resource_versions(self):
tracker = mock.Mock()
self._create_and_save_agents(
['host-%d' % i for i in range(5)],
constants.AGENT_TYPE_L3,
down_agents_count=3,
down_but_version_considered=2)
self.plugin._get_agents_resource_versions(tracker)
self.assertEqual(tracker.set_versions.call_count, 2)
class TestAgentsDbGetAgents(TestAgentsDbBase):
scenarios = [
@ -234,3 +269,33 @@ class TestAgentsDbGetAgents(TestAgentsDbBase):
self.agents_alive == 'true')
for agent in returned_agents:
self.assertEqual(alive, agent['alive'])
class TestAgentExtRpcCallback(base.BaseTestCase):
def setUp(self):
super(TestAgentExtRpcCallback, self).setUp()
self.plugin = mock.Mock()
self.context = mock.Mock()
self.callback = agents_db.AgentExtRpcCallback(self.plugin)
self.callback.server_versions_rpc = mock.Mock()
self.callback.START_TIME = datetime.datetime(datetime.MINYEAR, 1, 1)
self.update_versions = mock.patch(
'neutron.api.rpc.callbacks.version_manager.'
'update_versions').start()
self.agent_state = {'agent_state': dict(AGENT_STATUS)}
def test_create_or_update_agent_updates_version_manager(self):
self.callback.report_state(self.context, agent_state=self.agent_state,
time=TEST_TIME)
self.update_versions.assert_called_once_with(
mock.ANY, TEST_RESOURCE_VERSIONS)
def test_create_or_update_agent_updates_other_servers(self):
callback = self.callback
callback.report_state(self.context, agent_state=self.agent_state,
time=TEST_TIME)
report_agent_resource_versions = (
callback.server_versions_rpc.report_agent_resource_versions)
report_agent_resource_versions.assert_called_once_with(
mock.ANY, mock.ANY, mock.ANY, TEST_RESOURCE_VERSIONS)