Merge "Notify resource_versions from agents only when needed"
This commit is contained in:
commit
64e9796306
|
@ -349,8 +349,10 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
|||
res['availability_zone'] = agent_state['availability_zone']
|
||||
configurations_dict = agent_state.get('configurations', {})
|
||||
res['configurations'] = jsonutils.dumps(configurations_dict)
|
||||
resource_versions_dict = agent_state.get('resource_versions', {})
|
||||
res['resource_versions'] = jsonutils.dumps(resource_versions_dict)
|
||||
resource_versions_dict = agent_state.get('resource_versions')
|
||||
if resource_versions_dict:
|
||||
res['resource_versions'] = jsonutils.dumps(
|
||||
resource_versions_dict)
|
||||
res['load'] = self._get_agent_load(agent_state)
|
||||
current_time = timeutils.utcnow()
|
||||
try:
|
||||
|
@ -358,6 +360,13 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
|||
context, agent_state['agent_type'], agent_state['host'])
|
||||
if not agent_db.is_active:
|
||||
status = constants.AGENT_REVIVED
|
||||
if 'resource_versions' not in agent_state:
|
||||
# updating agent_state with resource_versions taken
|
||||
# from db so that
|
||||
# _update_local_agent_resource_versions() will call
|
||||
# version_manager and bring it up to date
|
||||
agent_state['resource_versions'] = self._get_dict(
|
||||
agent_db, 'resource_versions')
|
||||
res['heartbeat_timestamp'] = current_time
|
||||
if agent_state.get('start_flag'):
|
||||
res['started_at'] = current_time
|
||||
|
@ -472,7 +481,10 @@ class AgentExtRpcCallback(object):
|
|||
return agent_status
|
||||
|
||||
def _update_local_agent_resource_versions(self, context, agent_state):
|
||||
resource_versions_dict = agent_state.get('resource_versions', {})
|
||||
resource_versions_dict = agent_state.get('resource_versions')
|
||||
if not resource_versions_dict:
|
||||
return
|
||||
|
||||
version_manager.update_versions(
|
||||
version_manager.AgentConsumer(agent_type=agent_state['agent_type'],
|
||||
host=agent_state['host']),
|
||||
|
|
|
@ -123,6 +123,8 @@ class CommonAgentLoop(service.Service):
|
|||
'Doing a full sync.'),
|
||||
self.agent_type)
|
||||
self.fullsync = True
|
||||
# we only want to update resource versions on startup
|
||||
self.agent_state.pop('resource_versions', None)
|
||||
self.agent_state.pop('start_flag', None)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Failed reporting state!"))
|
||||
|
|
|
@ -174,6 +174,9 @@ class SriovNicSwitchAgent(object):
|
|||
self.agent_state.get('configurations')['devices'] = devices
|
||||
self.state_rpc.report_state(self.context,
|
||||
self.agent_state)
|
||||
|
||||
# we only want to update resource versions on startup
|
||||
self.agent_state.pop('resource_versions', None)
|
||||
self.agent_state.pop('start_flag', None)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Failed reporting state!"))
|
||||
|
|
|
@ -316,6 +316,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
'Doing a full sync.'))
|
||||
self.fullsync = True
|
||||
|
||||
# we only want to update resource versions on startup
|
||||
self.agent_state.pop('resource_versions', None)
|
||||
if self.agent_state.pop('start_flag', None):
|
||||
# On initial start, we notify systemd after initialization
|
||||
# is complete.
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
import datetime
|
||||
import mock
|
||||
|
||||
|
@ -27,7 +28,6 @@ from neutron.common import exceptions as n_exc
|
|||
from neutron import context
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import db_base_plugin_v2 as base_plugin
|
||||
from neutron.tests import base
|
||||
from neutron.tests.unit import testlib_api
|
||||
|
||||
# the below code is required for the following reason
|
||||
|
@ -271,14 +271,13 @@ class TestAgentsDbGetAgents(TestAgentsDbBase):
|
|||
self.assertEqual(alive, agent['alive'])
|
||||
|
||||
|
||||
class TestAgentExtRpcCallback(base.BaseTestCase):
|
||||
class TestAgentExtRpcCallback(TestAgentsDbBase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestAgentExtRpcCallback, self).setUp()
|
||||
self.plugin = mock.Mock()
|
||||
self.context = mock.Mock()
|
||||
self.callback = agents_db.AgentExtRpcCallback(self.plugin)
|
||||
self.callback.server_versions_rpc = mock.Mock()
|
||||
self.versions_rpc = self.callback.server_versions_rpc
|
||||
self.callback.START_TIME = datetime.datetime(datetime.MINYEAR, 1, 1)
|
||||
self.update_versions = mock.patch(
|
||||
'neutron.api.rpc.callbacks.version_manager.'
|
||||
|
@ -296,6 +295,50 @@ class TestAgentExtRpcCallback(base.BaseTestCase):
|
|||
callback.report_state(self.context, agent_state=self.agent_state,
|
||||
time=TEST_TIME)
|
||||
report_agent_resource_versions = (
|
||||
callback.server_versions_rpc.report_agent_resource_versions)
|
||||
self.versions_rpc.report_agent_resource_versions)
|
||||
report_agent_resource_versions.assert_called_once_with(
|
||||
mock.ANY, mock.ANY, mock.ANY, TEST_RESOURCE_VERSIONS)
|
||||
|
||||
def test_no_version_updates_on_further_state_reports(self):
|
||||
self.test_create_or_update_agent_updates_version_manager()
|
||||
# agents include resource_versions only in the first report after
|
||||
# start so versions should not be updated on the second report
|
||||
second_agent_state = copy.deepcopy(self.agent_state)
|
||||
second_agent_state['agent_state'].pop('resource_versions')
|
||||
self.update_versions.reset_mock()
|
||||
report_agent_resource_versions = (
|
||||
self.versions_rpc.report_agent_resource_versions)
|
||||
report_agent_resource_versions.reset_mock()
|
||||
|
||||
self.callback.report_state(self.context,
|
||||
agent_state=second_agent_state,
|
||||
time=TEST_TIME)
|
||||
self.assertFalse(self.update_versions.called)
|
||||
self.assertFalse(report_agent_resource_versions.called)
|
||||
|
||||
def test_version_updates_on_agent_revival(self):
|
||||
self.test_create_or_update_agent_updates_version_manager()
|
||||
second_agent_state = copy.deepcopy(self.agent_state)
|
||||
second_agent_state['agent_state'].pop('resource_versions')
|
||||
self._take_down_agent()
|
||||
self.update_versions.reset_mock()
|
||||
report_agent_resource_versions = (
|
||||
self.versions_rpc.report_agent_resource_versions)
|
||||
report_agent_resource_versions.reset_mock()
|
||||
|
||||
# agent didn't include resource_versions in report but server will
|
||||
# take them from db for the revived agent
|
||||
self.callback.report_state(self.context,
|
||||
agent_state=second_agent_state,
|
||||
time=TEST_TIME)
|
||||
self.update_versions.assert_called_once_with(
|
||||
mock.ANY, TEST_RESOURCE_VERSIONS)
|
||||
report_agent_resource_versions.assert_called_once_with(
|
||||
mock.ANY, mock.ANY, mock.ANY, TEST_RESOURCE_VERSIONS)
|
||||
|
||||
def _take_down_agent(self):
|
||||
with self.context.session.begin(subtransactions=True):
|
||||
query = self.context.session.query(agents_db.Agent)
|
||||
agt = query.first()
|
||||
agt.heartbeat_timestamp = (
|
||||
agt.heartbeat_timestamp - datetime.timedelta(hours=1))
|
||||
|
|
Loading…
Reference in New Issue