Notify resource_versions from agents only when needed

resource_versions were included into agent state reports recently to
support rolling upgrades (commit 97a272a892)
The downside is that it brought additional processing when handling state
reports on server side: update of local resources versions cache and
more seriously rpc casts to all other servers to do the same.
All this led to a visible performance degradation at scale with hundreds
of agents constantly sending reports. Under load (rally test) agents
may start "blinking" which makes cluster very unstable.

In fact there is no need to send and update resource_versions in each state
report. I see two cases when it should be done:
 1) agent was restarted (after it was upgraded);
 2) agent revived - which means that server was not receiving or being able
    to process state reports for some time (agent_down_time). During that
    time agent might be upgraded and restarted.

So this patch makes agents include resource_versions info only on startup.
After agent revival server itself will update version_manager with
resource_versions taken from agent DB record - this is to avoid
version_manager being outdated.

Closes-Bug: #1567497
Change-Id: I47a9869801f4e8f8af2a656749166b6fb49bcd3b
(cherry picked from commit e532ee3fcc)
This commit is contained in:
Oleg Bondarev 2016-04-07 16:45:52 +03:00
parent de3813296d
commit 05a4a34b7e
5 changed files with 70 additions and 8 deletions

View File

@ -336,8 +336,10 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
res['availability_zone'] = agent_state['availability_zone']
configurations_dict = agent_state.get('configurations', {})
res['configurations'] = jsonutils.dumps(configurations_dict)
resource_versions_dict = agent_state.get('resource_versions', {})
res['resource_versions'] = jsonutils.dumps(resource_versions_dict)
resource_versions_dict = agent_state.get('resource_versions')
if resource_versions_dict:
res['resource_versions'] = jsonutils.dumps(
resource_versions_dict)
res['load'] = self._get_agent_load(agent_state)
current_time = timeutils.utcnow()
try:
@ -345,6 +347,13 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
context, agent_state['agent_type'], agent_state['host'])
if not agent_db.is_active:
status = constants.AGENT_REVIVED
if 'resource_versions' not in agent_state:
# updating agent_state with resource_versions taken
# from db so that
# _update_local_agent_resource_versions() will call
# version_manager and bring it up to date
agent_state['resource_versions'] = self._get_dict(
agent_db, 'resource_versions')
res['heartbeat_timestamp'] = current_time
if agent_state.get('start_flag'):
res['started_at'] = current_time
@ -458,7 +467,10 @@ class AgentExtRpcCallback(object):
return agent_status
def _update_local_agent_resource_versions(self, context, agent_state):
resource_versions_dict = agent_state.get('resource_versions', {})
resource_versions_dict = agent_state.get('resource_versions')
if not resource_versions_dict:
return
version_manager.update_versions(
version_manager.AgentConsumer(agent_type=agent_state['agent_type'],
host=agent_state['host']),

View File

@ -123,6 +123,8 @@ class CommonAgentLoop(service.Service):
'Doing a full sync.'),
self.agent_type)
self.fullsync = True
# we only want to update resource versions on startup
self.agent_state.pop('resource_versions', None)
self.agent_state.pop('start_flag', None)
except Exception:
LOG.exception(_LE("Failed reporting state!"))

View File

@ -174,6 +174,9 @@ class SriovNicSwitchAgent(object):
self.agent_state.get('configurations')['devices'] = devices
self.state_rpc.report_state(self.context,
self.agent_state)
# we only want to update resource versions on startup
self.agent_state.pop('resource_versions', None)
self.agent_state.pop('start_flag', None)
except Exception:
LOG.exception(_LE("Failed reporting state!"))

View File

@ -316,6 +316,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
'Doing a full sync.'))
self.fullsync = True
# we only want to update resource versions on startup
self.agent_state.pop('resource_versions', None)
if self.agent_state.pop('start_flag', None):
# On initial start, we notify systemd after initialization
# is complete.

View File

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
import mock
@ -27,7 +28,6 @@ from neutron.common import exceptions as n_exc
from neutron import context
from neutron.db import agents_db
from neutron.db import db_base_plugin_v2 as base_plugin
from neutron.tests import base
from neutron.tests.unit import testlib_api
# the below code is required for the following reason
@ -271,14 +271,13 @@ class TestAgentsDbGetAgents(TestAgentsDbBase):
self.assertEqual(alive, agent['alive'])
class TestAgentExtRpcCallback(base.BaseTestCase):
class TestAgentExtRpcCallback(TestAgentsDbBase):
def setUp(self):
super(TestAgentExtRpcCallback, self).setUp()
self.plugin = mock.Mock()
self.context = mock.Mock()
self.callback = agents_db.AgentExtRpcCallback(self.plugin)
self.callback.server_versions_rpc = mock.Mock()
self.versions_rpc = self.callback.server_versions_rpc
self.callback.START_TIME = datetime.datetime(datetime.MINYEAR, 1, 1)
self.update_versions = mock.patch(
'neutron.api.rpc.callbacks.version_manager.'
@ -296,6 +295,50 @@ class TestAgentExtRpcCallback(base.BaseTestCase):
callback.report_state(self.context, agent_state=self.agent_state,
time=TEST_TIME)
report_agent_resource_versions = (
callback.server_versions_rpc.report_agent_resource_versions)
self.versions_rpc.report_agent_resource_versions)
report_agent_resource_versions.assert_called_once_with(
mock.ANY, mock.ANY, mock.ANY, TEST_RESOURCE_VERSIONS)
def test_no_version_updates_on_further_state_reports(self):
self.test_create_or_update_agent_updates_version_manager()
# agents include resource_versions only in the first report after
# start so versions should not be updated on the second report
second_agent_state = copy.deepcopy(self.agent_state)
second_agent_state['agent_state'].pop('resource_versions')
self.update_versions.reset_mock()
report_agent_resource_versions = (
self.versions_rpc.report_agent_resource_versions)
report_agent_resource_versions.reset_mock()
self.callback.report_state(self.context,
agent_state=second_agent_state,
time=TEST_TIME)
self.assertFalse(self.update_versions.called)
self.assertFalse(report_agent_resource_versions.called)
def test_version_updates_on_agent_revival(self):
self.test_create_or_update_agent_updates_version_manager()
second_agent_state = copy.deepcopy(self.agent_state)
second_agent_state['agent_state'].pop('resource_versions')
self._take_down_agent()
self.update_versions.reset_mock()
report_agent_resource_versions = (
self.versions_rpc.report_agent_resource_versions)
report_agent_resource_versions.reset_mock()
# agent didn't include resource_versions in report but server will
# take them from db for the revived agent
self.callback.report_state(self.context,
agent_state=second_agent_state,
time=TEST_TIME)
self.update_versions.assert_called_once_with(
mock.ANY, TEST_RESOURCE_VERSIONS)
report_agent_resource_versions.assert_called_once_with(
mock.ANY, mock.ANY, mock.ANY, TEST_RESOURCE_VERSIONS)
def _take_down_agent(self):
with self.context.session.begin(subtransactions=True):
query = self.context.session.query(agents_db.Agent)
agt = query.first()
agt.heartbeat_timestamp = (
agt.heartbeat_timestamp - datetime.timedelta(hours=1))