Notify resource_versions from agents only when needed
resource_versions were recently added to agent state reports to
support rolling upgrades (commit 97a272a892).
The downside is that this brought additional processing on the server side
when handling state reports: an update of the local resource versions cache
and, more seriously, RPC casts to all other servers so that they do the same.
All this led to a visible performance degradation at scale, with hundreds
of agents constantly sending reports. Under load (rally test) agents
may start "blinking", which makes the cluster very unstable.
In fact there is no need to send and update resource_versions in each state
report. I see two cases when it should be done:
1) the agent was restarted (after it was upgraded);
2) the agent revived - which means that the server was not receiving, or was
not able to process, its state reports for some time (agent_down_time).
During that time the agent might have been upgraded and restarted.
So this patch makes agents include resource_versions info only on startup.
After an agent revives, the server itself will update version_manager with
the resource_versions taken from the agent's DB record - this keeps
version_manager from becoming outdated.
Closes-Bug: #1567497
Change-Id: I47a9869801f4e8f8af2a656749166b6fb49bcd3b
This commit is contained in:
parent
444b3a5d9a
commit
e532ee3fcc
|
@ -349,8 +349,10 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
|||
res['availability_zone'] = agent_state['availability_zone']
|
||||
configurations_dict = agent_state.get('configurations', {})
|
||||
res['configurations'] = jsonutils.dumps(configurations_dict)
|
||||
resource_versions_dict = agent_state.get('resource_versions', {})
|
||||
res['resource_versions'] = jsonutils.dumps(resource_versions_dict)
|
||||
resource_versions_dict = agent_state.get('resource_versions')
|
||||
if resource_versions_dict:
|
||||
res['resource_versions'] = jsonutils.dumps(
|
||||
resource_versions_dict)
|
||||
res['load'] = self._get_agent_load(agent_state)
|
||||
current_time = timeutils.utcnow()
|
||||
try:
|
||||
|
@ -358,6 +360,13 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
|||
context, agent_state['agent_type'], agent_state['host'])
|
||||
if not agent_db.is_active:
|
||||
status = constants.AGENT_REVIVED
|
||||
if 'resource_versions' not in agent_state:
|
||||
# updating agent_state with resource_versions taken
|
||||
# from db so that
|
||||
# _update_local_agent_resource_versions() will call
|
||||
# version_manager and bring it up to date
|
||||
agent_state['resource_versions'] = self._get_dict(
|
||||
agent_db, 'resource_versions')
|
||||
res['heartbeat_timestamp'] = current_time
|
||||
if agent_state.get('start_flag'):
|
||||
res['started_at'] = current_time
|
||||
|
@ -472,7 +481,10 @@ class AgentExtRpcCallback(object):
|
|||
return agent_status
|
||||
|
||||
def _update_local_agent_resource_versions(self, context, agent_state):
|
||||
resource_versions_dict = agent_state.get('resource_versions', {})
|
||||
resource_versions_dict = agent_state.get('resource_versions')
|
||||
if not resource_versions_dict:
|
||||
return
|
||||
|
||||
version_manager.update_versions(
|
||||
version_manager.AgentConsumer(agent_type=agent_state['agent_type'],
|
||||
host=agent_state['host']),
|
||||
|
|
|
@ -123,6 +123,8 @@ class CommonAgentLoop(service.Service):
|
|||
'Doing a full sync.'),
|
||||
self.agent_type)
|
||||
self.fullsync = True
|
||||
# we only want to update resource versions on startup
|
||||
self.agent_state.pop('resource_versions', None)
|
||||
self.agent_state.pop('start_flag', None)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Failed reporting state!"))
|
||||
|
|
|
@ -174,6 +174,9 @@ class SriovNicSwitchAgent(object):
|
|||
self.agent_state.get('configurations')['devices'] = devices
|
||||
self.state_rpc.report_state(self.context,
|
||||
self.agent_state)
|
||||
|
||||
# we only want to update resource versions on startup
|
||||
self.agent_state.pop('resource_versions', None)
|
||||
self.agent_state.pop('start_flag', None)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Failed reporting state!"))
|
||||
|
|
|
@ -316,6 +316,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
'Doing a full sync.'))
|
||||
self.fullsync = True
|
||||
|
||||
# we only want to update resource versions on startup
|
||||
self.agent_state.pop('resource_versions', None)
|
||||
if self.agent_state.pop('start_flag', None):
|
||||
# On initial start, we notify systemd after initialization
|
||||
# is complete.
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
import datetime
|
||||
import mock
|
||||
|
||||
|
@ -27,7 +28,6 @@ from neutron.common import exceptions as n_exc
|
|||
from neutron import context
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import db_base_plugin_v2 as base_plugin
|
||||
from neutron.tests import base
|
||||
from neutron.tests.unit import testlib_api
|
||||
|
||||
# the below code is required for the following reason
|
||||
|
@ -271,14 +271,13 @@ class TestAgentsDbGetAgents(TestAgentsDbBase):
|
|||
self.assertEqual(alive, agent['alive'])
|
||||
|
||||
|
||||
class TestAgentExtRpcCallback(base.BaseTestCase):
|
||||
class TestAgentExtRpcCallback(TestAgentsDbBase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestAgentExtRpcCallback, self).setUp()
|
||||
self.plugin = mock.Mock()
|
||||
self.context = mock.Mock()
|
||||
self.callback = agents_db.AgentExtRpcCallback(self.plugin)
|
||||
self.callback.server_versions_rpc = mock.Mock()
|
||||
self.versions_rpc = self.callback.server_versions_rpc
|
||||
self.callback.START_TIME = datetime.datetime(datetime.MINYEAR, 1, 1)
|
||||
self.update_versions = mock.patch(
|
||||
'neutron.api.rpc.callbacks.version_manager.'
|
||||
|
@ -296,6 +295,50 @@ class TestAgentExtRpcCallback(base.BaseTestCase):
|
|||
callback.report_state(self.context, agent_state=self.agent_state,
|
||||
time=TEST_TIME)
|
||||
report_agent_resource_versions = (
|
||||
callback.server_versions_rpc.report_agent_resource_versions)
|
||||
self.versions_rpc.report_agent_resource_versions)
|
||||
report_agent_resource_versions.assert_called_once_with(
|
||||
mock.ANY, mock.ANY, mock.ANY, TEST_RESOURCE_VERSIONS)
|
||||
|
||||
def test_no_version_updates_on_further_state_reports(self):
|
||||
self.test_create_or_update_agent_updates_version_manager()
|
||||
# agents include resource_versions only in the first report after
|
||||
# start so versions should not be updated on the second report
|
||||
second_agent_state = copy.deepcopy(self.agent_state)
|
||||
second_agent_state['agent_state'].pop('resource_versions')
|
||||
self.update_versions.reset_mock()
|
||||
report_agent_resource_versions = (
|
||||
self.versions_rpc.report_agent_resource_versions)
|
||||
report_agent_resource_versions.reset_mock()
|
||||
|
||||
self.callback.report_state(self.context,
|
||||
agent_state=second_agent_state,
|
||||
time=TEST_TIME)
|
||||
self.assertFalse(self.update_versions.called)
|
||||
self.assertFalse(report_agent_resource_versions.called)
|
||||
|
||||
def test_version_updates_on_agent_revival(self):
|
||||
self.test_create_or_update_agent_updates_version_manager()
|
||||
second_agent_state = copy.deepcopy(self.agent_state)
|
||||
second_agent_state['agent_state'].pop('resource_versions')
|
||||
self._take_down_agent()
|
||||
self.update_versions.reset_mock()
|
||||
report_agent_resource_versions = (
|
||||
self.versions_rpc.report_agent_resource_versions)
|
||||
report_agent_resource_versions.reset_mock()
|
||||
|
||||
# agent didn't include resource_versions in report but server will
|
||||
# take them from db for the revived agent
|
||||
self.callback.report_state(self.context,
|
||||
agent_state=second_agent_state,
|
||||
time=TEST_TIME)
|
||||
self.update_versions.assert_called_once_with(
|
||||
mock.ANY, TEST_RESOURCE_VERSIONS)
|
||||
report_agent_resource_versions.assert_called_once_with(
|
||||
mock.ANY, mock.ANY, mock.ANY, TEST_RESOURCE_VERSIONS)
|
||||
|
||||
def _take_down_agent(self):
|
||||
with self.context.session.begin(subtransactions=True):
|
||||
query = self.context.session.query(agents_db.Agent)
|
||||
agt = query.first()
|
||||
agt.heartbeat_timestamp = (
|
||||
agt.heartbeat_timestamp - datetime.timedelta(hours=1))
|
||||
|
|
Loading…
Reference in New Issue