From 8332a396b1b046eb370c0cb377d836d0c6b6d6ca Mon Sep 17 00:00:00 2001 From: Deepak Tiwari Date: Thu, 12 Sep 2019 18:14:40 -0500 Subject: [PATCH] Enhancements to TaaS agent driver failure handling As per current handling if there is any failure in the taas agent driver there is no way to convey that to the plugin. Due to which end user gets the impression that everything went well. This is confusing. Now, as part of this enhancement, when the tap resources are :- 1. Created: iniitally they shall be created with status 'DOWN' by TaaS plugin. Once the TaaS agent driver exectuion is finished, it will ask the plugin to set the status to either 'ACTIVE' or 'ERROR' depending on whether the driver was able to do its tasks successfully or not. 2. Deleted: as per curretn handling plugin used to first delete the resources from DB and then inform the agent. Now the sequence would be that plugin informs agent first. Agent shall ask the plugin to set the resource status t either INACTIVE (indicates successful deletion, whereby plugin shall clear the resources from the DB) or PENDING_DELETE (failure in deleting the resources from driver, whereby plugin would simply update the status for the resource in DB). Change-Id: If8b1aba3b3955fd705f2a13a79c7225a03369da6 --- neutron_taas/db/taas_db.py | 4 +- .../services/taas/agents/common/taas_agent.py | 103 ++++++++++++++++-- .../services/taas/service_drivers/taas_rpc.py | 64 ++++++++++- neutron_taas/services/taas/taas_plugin.py | 40 ++++--- .../unit/services/taas/test_taas_plugin.py | 87 ++++++++------- 5 files changed, 231 insertions(+), 67 deletions(-) diff --git a/neutron_taas/db/taas_db.py b/neutron_taas/db/taas_db.py index 965de28b..029e2941 100644 --- a/neutron_taas/db/taas_db.py +++ b/neutron_taas/db/taas_db.py @@ -146,7 +146,7 @@ class Taas_db_Mixin(taas.TaasPluginBase): name=t_s['name'], description=t_s['description'], port_id=t_s['port_id'], - status=constants.ACTIVE, + status=constants.DOWN, ) context.session.add(tap_service_db) @@ -211,7 +211,7 @@ class Taas_db_Mixin(taas.TaasPluginBase): tap_service_id=t_f['tap_service_id'], source_port=t_f['source_port'], direction=t_f['direction'], - status=constants.ACTIVE, + status=constants.DOWN, vlan_filter=t_f['vlan_filter'], ) context.session.add(tap_flow_db) diff --git a/neutron_taas/services/taas/agents/common/taas_agent.py b/neutron_taas/services/taas/agents/common/taas_agent.py index 40678270..a6583ac9 100644 --- a/neutron_taas/services/taas/agents/common/taas_agent.py +++ b/neutron_taas/services/taas/agents/common/taas_agent.py @@ -22,6 +22,8 @@ from neutron_taas.services.taas.drivers.linux \ from neutron_taas.common import topics from neutron_taas.services.taas.agents import taas_agent_api as api +from neutron_lib.api.definitions import portbindings +from neutron_lib import constants from neutron_lib import context as neutron_context from neutron_lib import rpc as n_rpc from oslo_config import cfg @@ -53,6 +55,32 @@ class TaasPluginApi(api.TaasPluginApiMixin): return + def set_tap_service_status(self, msg, status, host): + LOG.debug("In RPC Call for set tap service status: Host=%s, MSG=%s, " + "Status=%s" % + (host, msg, status)) + + context = neutron_context.get_admin_context() + + cctxt = self.client.prepare(fanout=False) + cctxt.cast(context, 'set_tap_service_status', msg=msg, status=status, + host=host) + + return + + def set_tap_flow_status(self, msg, status, host): + LOG.debug("In RPC Call for set tap flow status: Host=%s, MSG=%s, " + "Status=%s" % + (host, msg, status)) + + context = neutron_context.get_admin_context() + + cctxt = self.client.prepare(fanout=False) + cctxt.cast(context, 'set_tap_flow_status', msg=msg, status=status, + host=host) + + return + class TaasAgentRpcCallback(api.TaasAgentRpcCallbackMixin): @@ -70,7 +98,30 @@ class TaasAgentRpcCallback(api.TaasAgentRpcCallbackMixin): 'neutron_taas.taas.agent_drivers', self.driver_type)() self.taas_driver.consume_api(self.agent_api) self.taas_driver.initialize() - + self.func_dict = { + 'create_tap_service': { + 'msg_name': 'tap_service', + 'set_status_func_name': 'set_tap_service_status', + 'fail_status': constants.ERROR, + 'succ_status': constants.ACTIVE}, + 'create_tap_flow': { + 'msg_name': 'tap_flow', + 'set_status_func_name': 'set_tap_flow_status', + 'fail_status': constants.ERROR, + 'succ_status': constants.ACTIVE}, + 'delete_tap_service': { + 'msg_name': 'tap_service', + 'set_status_func_name': 'set_tap_service_status', + 'fail_status': constants.PENDING_DELETE, + 'succ_status': constants.INACTIVE}, + 'delete_tap_flow': { + 'msg_name': 'tap_flow', + 'set_status_func_name': 'set_tap_flow_status', + 'fail_status': constants.PENDING_DELETE, + 'succ_status': constants.INACTIVE} + } + self.portbind_drivers_map = {portbindings.VNIC_DIRECT: 'sriov', + portbindings.VNIC_NORMAL: 'ovs'} self._taas_rpc_setup() TaasAgentService(self).start(self.taas_plugin_rpc, self.conf.host) @@ -81,16 +132,33 @@ class TaasAgentRpcCallback(api.TaasAgentRpcCallbackMixin): LOG.debug("Invoking Driver for %(func_name)s from agent", {'func_name': func_name}) + status_msg = {'id': args[self.func_dict[func_name]['msg_name']]['id']} + try: self.taas_driver.__getattribute__(func_name)(args) except Exception: - LOG.debug("Failed to invoke the driver") + LOG.error("Failed to invoke the driver") - return + self.taas_plugin_rpc.__getattribute__( + self.func_dict[func_name]['set_status_func_name'])( + status_msg, + self.func_dict[func_name]['fail_status'], + self.conf.host) + return + + self.taas_plugin_rpc.__getattribute__( + self.func_dict[func_name]['set_status_func_name'])( + status_msg, + self.func_dict[func_name]['succ_status'], + self.conf.host) def create_tap_service(self, context, tap_service, host): """Handle Rpc from plugin to create a tap_service.""" - if host != self.conf.host: + if not self._driver_and_host_verification(host, tap_service['port']): + LOG.debug("RPC Call for Create Tap Serv. Either Host value [%s]" + "(received in RPC) doesn't match the host value " + "stored in agent [%s], or incompatible driver type. " + "Ignoring the message." % (host, self.conf.host)) return LOG.debug("In RPC Call for Create Tap Service: MSG=%s" % tap_service) @@ -100,7 +168,11 @@ class TaasAgentRpcCallback(api.TaasAgentRpcCallbackMixin): 'create_tap_service') def create_tap_flow(self, context, tap_flow_msg, host): - if host != self.conf.host: + if not self._driver_and_host_verification(host, tap_flow_msg['port']): + LOG.debug("RPC Call for Create Tap Flow. Either Host value [%s]" + "(received in RPC) doesn't match the host value " + "stored in agent [%s], or incompatible driver type. " + "Ignoring the message." % (host, self.conf.host)) return LOG.debug("In RPC Call for Create Tap Flow: MSG=%s" % tap_flow_msg) @@ -115,6 +187,10 @@ class TaasAgentRpcCallback(api.TaasAgentRpcCallbackMixin): # where the source and/or destination ports associated # with this tap service were residing. # + if not self._is_driver_port_type_compatible(tap_service['port']): + LOG.debug("RPC Call for Delete Tap Service. Incompatible driver " + "type. Ignoring the message. Host=[%s]" % (host)) + return LOG.debug("In RPC Call for Delete Tap Service: MSG=%s" % tap_service) return self._invoke_driver_for_plugin_api( @@ -123,10 +199,11 @@ class TaasAgentRpcCallback(api.TaasAgentRpcCallbackMixin): 'delete_tap_service') def delete_tap_flow(self, context, tap_flow_msg, host): - if host != self.conf.host: - LOG.debug("RPC Call for Delete Tap Flow. Host value [%s]" + if not self._driver_and_host_verification(host, tap_flow_msg['port']): + LOG.debug("RPC Call for Delete Tap Flow. Either Host value [%s]" "(received in RPC) doesn't match the host value " - "stored in agent [%s]" % (host, self.conf.host)) + "stored in agent [%s], or incompatible driver type. " + "Ignoring the message." % (host, self.conf.host)) return LOG.debug("In RPC Call for Delete Tap Flow: MSG=%s" % tap_flow_msg) @@ -154,6 +231,16 @@ class TaasAgentRpcCallback(api.TaasAgentRpcCallbackMixin): def get_driver_type(self): return self.driver_type + def _is_driver_port_type_compatible(self, port): + return ( + port.get(portbindings.VNIC_TYPE) in self.portbind_drivers_map and + self.portbind_drivers_map[port.get(portbindings.VNIC_TYPE)] == + self.driver_type) + + def _driver_and_host_verification(self, host, port): + return ((host == self.conf.host) and + self._is_driver_port_type_compatible(port)) + class TaasAgentService(service.Service): def __init__(self, driver): diff --git a/neutron_taas/services/taas/service_drivers/taas_rpc.py b/neutron_taas/services/taas/service_drivers/taas_rpc.py index f4634b84..1f223a55 100644 --- a/neutron_taas/services/taas/service_drivers/taas_rpc.py +++ b/neutron_taas/services/taas/service_drivers/taas_rpc.py @@ -90,6 +90,58 @@ class TaasCallbacks(object): super(TaasPlugin, self.plugin).delete_tap_flow( context, tf['id']) + def set_tap_service_status(self, context, msg, status, host=None): + """Handle Rpc from Agent to set the status of Tap resources.""" + LOG.info("In RPC Call to set tap service status: Host=%s, " + "MSG=%s, STATUS=%s" % (host, msg, status)) + + # Clear the resource from DB once agent indicates successful deletion + # by mech driver. + if status == constants.INACTIVE: + with context.session.begin(subtransactions=True): + ts = self.plugin.get_tap_service(context, msg['id']) + driver_context = sd_context.TapServiceContext(self.plugin, + context, + ts) + super(TaasPlugin, self.plugin).delete_tap_service(context, + msg['id']) + self.plugin.driver.delete_tap_service_postcommit( + driver_context) + return + + with context.session.begin(subtransactions=True): + ts = self.plugin.get_tap_service(context, msg['id']) + ts['status'] = status + super(TaasPlugin, self.plugin).update_tap_service( + context, + msg['id'], + {'tap_service': ts}) + + def set_tap_flow_status(self, context, msg, status, host=None): + """Handle Rpc from Agent to set the status of Tap resources.""" + LOG.info("In RPC Call to set tap flow status: Host=%s, " + "MSG=%s, STATUS=%s" % (host, msg, status)) + + # Clear the resource from DB once agent indicates successful deletion + # by mech driver. + if status == constants.INACTIVE: + with context.session.begin(subtransactions=True): + tf = self.plugin.get_tap_flow(context, msg['id']) + driver_context = sd_context.TapFlowContext(self.plugin, + context, + tf) + super(TaasPlugin, self.plugin).delete_tap_flow(context, + msg['id']) + self.plugin.driver.delete_tap_flow_postcommit(driver_context) + return + + with context.session.begin(subtransactions=True): + tf = self.plugin.get_tap_flow(context, msg['id']) + tf['status'] = status + super(TaasPlugin, self.plugin).update_tap_flow(context, + msg['id'], + {'tap_flow': tf}) + class TaasRpcDriver(service_drivers.TaasBaseDriver): """Taas Rpc Service Driver class""" @@ -148,9 +200,6 @@ class TaasRpcDriver(service_drivers.TaasBaseDriver): return def delete_tap_service_precommit(self, context): - pass - - def delete_tap_service_postcommit(self, context): """Send tap service deletion RPC message to agent. This RPC message includes taas_id that is added vlan_range_start to @@ -178,6 +227,9 @@ class TaasRpcDriver(service_drivers.TaasBaseDriver): rpc_msg, host) return + def delete_tap_service_postcommit(self, context): + pass + def create_tap_flow_precommit(self, context): pass @@ -208,9 +260,6 @@ class TaasRpcDriver(service_drivers.TaasBaseDriver): return def delete_tap_flow_precommit(self, context): - pass - - def delete_tap_flow_postcommit(self, context): """Send tap flow deletion RPC message to agent.""" tf = context.tap_flow taas_id = self._get_taas_id(context._plugin_context, tf) @@ -275,3 +324,6 @@ class TaasRpcDriver(service_drivers.TaasBaseDriver): self.agent_rpc.delete_tap_flow(context._plugin_context, rpc_msg, host) return + + def delete_tap_flow_postcommit(self, context): + pass diff --git a/neutron_taas/services/taas/taas_plugin.py b/neutron_taas/services/taas/taas_plugin.py index 2052b66c..887b7689 100644 --- a/neutron_taas/services/taas/taas_plugin.py +++ b/neutron_taas/services/taas/taas_plugin.py @@ -20,6 +20,7 @@ from neutron.services import service_base from neutron_lib.callbacks import events from neutron_lib.callbacks import registry from neutron_lib.callbacks import resources +from neutron_lib import constants from neutron_lib import exceptions as n_exc from neutron_taas.common import constants as taas_consts @@ -123,15 +124,21 @@ class TaasPlugin(taas_db.Taas_db_Mixin): with context.session.begin(subtransactions=True): ts = self.get_tap_service(context, id) driver_context = sd_context.TapServiceContext(self, context, ts) - super(TaasPlugin, self).delete_tap_service(context, id) - self.driver.delete_tap_service_precommit(driver_context) + if ts['status'] == constants.ACTIVE: + ts['status'] = constants.PENDING_DELETE + super(TaasPlugin, self).update_tap_service( + context, id, {'tap_service': ts}) + method = self.driver.delete_tap_service_precommit + else: + super(TaasPlugin, self).delete_tap_service(context, id) + method = self.driver.delete_tap_service_postcommit - try: - self.driver.delete_tap_service_postcommit(driver_context) - except Exception: - with excutils.save_and_reraise_exception(): - LOG.error("Failed to delete tap service on driver. " - "tap_sevice: %s", id) + try: + method(driver_context) + except Exception: + with excutils.save_and_reraise_exception(): + LOG.error("Failed to delete tap service on driver. " + "tap_sevice: %s", id) def create_tap_flow(self, context, tap_flow): LOG.debug("create_tap_flow() called") @@ -170,13 +177,18 @@ class TaasPlugin(taas_db.Taas_db_Mixin): with context.session.begin(subtransactions=True): tf = self.get_tap_flow(context, id) driver_context = sd_context.TapFlowContext(self, context, tf) - super(TaasPlugin, self).delete_tap_flow(context, id) - self.driver.delete_tap_flow_precommit(driver_context) + if tf['status'] == constants.ACTIVE: + tf['status'] = constants.PENDING_DELETE + super(TaasPlugin, self).update_tap_flow(context, id, + {'tap_flow': tf}) + method = self.driver.delete_tap_flow_precommit + else: + super(TaasPlugin, self).delete_tap_flow(context, id) + method = self.driver.delete_tap_flow_postcommit - try: - self.driver.delete_tap_flow_postcommit(driver_context) - except Exception: - with excutils.save_and_reraise_exception(): + try: + method(driver_context) + except Exception: with excutils.save_and_reraise_exception(): LOG.error("Failed to delete tap flow on driver. " "tap_flow: %s", id) diff --git a/neutron_taas/tests/unit/services/taas/test_taas_plugin.py b/neutron_taas/tests/unit/services/taas/test_taas_plugin.py index 45fbe2e8..1c7a0120 100644 --- a/neutron_taas/tests/unit/services/taas/test_taas_plugin.py +++ b/neutron_taas/tests/unit/services/taas/test_taas_plugin.py @@ -18,6 +18,7 @@ import contextlib import testtools from unittest import mock +from neutron_lib import constants from neutron_lib import context from neutron_lib import rpc as n_rpc from neutron_lib.utils import net as n_utils @@ -29,6 +30,7 @@ from neutron.tests.unit import testlib_api import neutron_taas.db.taas_db # noqa import neutron_taas.extensions.taas as taas_ext from neutron_taas.services.taas.service_drivers import taas_agent_api +from neutron_taas.services.taas.service_drivers import taas_rpc from neutron_taas.services.taas import taas_plugin @@ -50,6 +52,7 @@ class TestTaasPlugin(testlib_api.SqlTestCase): return_value=mock.MagicMock()).start() self._plugin = taas_plugin.TaasPlugin() self._context = context.get_admin_context() + self.taas_cbs = taas_rpc.TaasCallbacks(self.driver, self._plugin) self._project_id = self._tenant_id = 'tenant-X' self._network_id = uuidutils.generate_uuid() @@ -86,9 +89,9 @@ class TestTaasPlugin(testlib_api.SqlTestCase): } with mock.patch.object(self._plugin, '_get_port_details', return_value=self._port_details): - yield self._plugin.create_tap_service(self._context, req) + self._plugin.create_tap_service(self._context, req) self._tap_service['id'] = mock.ANY - self._tap_service['status'] = 'ACTIVE' + self._tap_service['status'] = constants.DOWN self.driver.assert_has_calls([ mock.call.create_tap_service_precommit(mock.ANY), @@ -100,6 +103,13 @@ class TestTaasPlugin(testlib_api.SqlTestCase): post_args = self.driver.create_tap_service_postcommit.call_args[0][0] self.assertEqual(self._context, post_args._plugin_context) self.assertEqual(self._tap_service, post_args.tap_service) + self.taas_cbs.set_tap_service_status( + self._context, + {'id': pre_args.tap_service['id']}, + constants.ACTIVE, "dummyHost") + self._tap_service['status'] = constants.ACTIVE + yield self._plugin.get_tap_service(self._context, + pre_args.tap_service['id']) @contextlib.contextmanager def tap_flow(self, tap_service, tenant_id=None): @@ -111,9 +121,9 @@ class TestTaasPlugin(testlib_api.SqlTestCase): } with mock.patch.object(self._plugin, '_get_port_details', return_value=self._port_details): - yield self._plugin.create_tap_flow(self._context, req) + self._plugin.create_tap_flow(self._context, req) self._tap_flow['id'] = mock.ANY - self._tap_flow['status'] = 'ACTIVE' + self._tap_flow['status'] = constants.DOWN self._tap_service['id'] = mock.ANY self._tap_flow['vlan_filter'] = mock.ANY @@ -127,6 +137,13 @@ class TestTaasPlugin(testlib_api.SqlTestCase): post_args = self.driver.create_tap_flow_postcommit.call_args[0][0] self.assertEqual(self._context, post_args._plugin_context) self.assertEqual(self._tap_flow, post_args.tap_flow) + self.taas_cbs.set_tap_flow_status( + self._context, + {'id': pre_args.tap_flow['id']}, + constants.ACTIVE, "dummyHost") + self._tap_flow['status'] = constants.ACTIVE + yield self._plugin.get_tap_flow(self._context, + pre_args.tap_flow['id']) def test_create_tap_service(self): with self.tap_service(): @@ -154,6 +171,10 @@ class TestTaasPlugin(testlib_api.SqlTestCase): ) # free an tap_id and verify could reallocate same taas id self._plugin.delete_tap_service(self._context, ts_id_1) + self.taas_cbs.set_tap_service_status(self._context, + {'id': ts_id_1}, + constants.INACTIVE, + "dummyHost") tap_id_assoc_3 = self._plugin.create_tap_id_association( self._context, ts_id_3) self.assertEqual(set([1, 2]), set([tap_id_assoc_3['taas_id'], @@ -184,51 +205,50 @@ class TestTaasPlugin(testlib_api.SqlTestCase): def test_delete_tap_service(self): with self.tap_service() as ts: self._plugin.delete_tap_service(self._context, ts['id']) + self._tap_service['id'] = ts['id'] self.driver.assert_has_calls([ mock.call.delete_tap_service_precommit(mock.ANY), - mock.call.delete_tap_service_postcommit(mock.ANY), ]) + self._tap_service['status'] = constants.PENDING_DELETE pre_args = self.driver.delete_tap_service_precommit.call_args[0][0] self.assertEqual(self._context, pre_args._plugin_context) self.assertEqual(self._tap_service, pre_args.tap_service) - post_args = self.driver.delete_tap_service_postcommit.call_args[0][0] - self.assertEqual(self._context, post_args._plugin_context) - self.assertEqual(self._tap_service, post_args.tap_service) + self.taas_cbs.set_tap_service_status(self._context, + {'id': self._tap_service['id']}, + constants.INACTIVE, + "dummyHost") def test_delete_tap_service_with_flow(self): with self.tap_service() as ts, \ - self.tap_flow(tap_service=ts['id']): + self.tap_flow(tap_service=ts['id']) as tf: self._plugin.delete_tap_service(self._context, ts['id']) + self._tap_service['id'] = ts['id'] + self._tap_flow['id'] = tf['id'] self.driver.assert_has_calls([ mock.call.delete_tap_flow_precommit(mock.ANY), - mock.call.delete_tap_flow_postcommit(mock.ANY), mock.call.delete_tap_service_precommit(mock.ANY), - mock.call.delete_tap_service_postcommit(mock.ANY), ]) + self._tap_service['status'] = constants.PENDING_DELETE + self._tap_flow['status'] = constants.PENDING_DELETE pre_args = self.driver.delete_tap_flow_precommit.call_args[0][0] self.assertEqual(self._context, pre_args._plugin_context) self.assertEqual(self._tap_flow, pre_args.tap_flow) - post_args = self.driver.delete_tap_flow_postcommit.call_args[0][0] - self.assertEqual(self._context, post_args._plugin_context) - self.assertEqual(self._tap_flow, post_args.tap_flow) pre_args = self.driver.delete_tap_service_precommit.call_args[0][0] self.assertEqual(self._context, pre_args._plugin_context) self.assertEqual(self._tap_service, pre_args.tap_service) - post_args = self.driver.delete_tap_service_postcommit.call_args[0][0] - self.assertEqual(self._context, post_args._plugin_context) - self.assertEqual(self._tap_service, post_args.tap_service) + self.taas_cbs.set_tap_flow_status(self._context, + {'id': self._tap_flow['id']}, + constants.INACTIVE, + "dummyHost") + self.taas_cbs.set_tap_service_status(self._context, + {'id': self._tap_service['id']}, + constants.INACTIVE, + "dummyHost") def test_delete_tap_service_non_existent(self): with testtools.ExpectedException(taas_ext.TapServiceNotFound): self._plugin.delete_tap_service(self._context, 'non-existent') - def test_delete_tap_service_failed_on_service_driver(self): - attr = {'delete_tap_service_postcommit.side_effect': DummyError} - self.driver.configure_mock(**attr) - with self.tap_service() as ts: - with testtools.ExpectedException(DummyError): - self._plugin.delete_tap_service(self._context, ts['id']) - def test_create_tap_flow(self): with self.tap_service() as ts, self.tap_flow(tap_service=ts['id']): pass @@ -256,22 +276,15 @@ class TestTaasPlugin(testlib_api.SqlTestCase): with self.tap_service() as ts, \ self.tap_flow(tap_service=ts['id']) as tf: self._plugin.delete_tap_flow(self._context, tf['id']) - self._tap_flow['id'] = tf['id'] + self._tap_flow['id'] = tf['id'] self.driver.assert_has_calls([ mock.call.delete_tap_flow_precommit(mock.ANY), - mock.call.delete_tap_flow_postcommit(mock.ANY), ]) + self._tap_flow['status'] = constants.PENDING_DELETE pre_args = self.driver.delete_tap_flow_precommit.call_args[0][0] self.assertEqual(self._context, pre_args._plugin_context) self.assertEqual(self._tap_flow, pre_args.tap_flow) - post_args = self.driver.delete_tap_flow_postcommit.call_args[0][0] - self.assertEqual(self._context, post_args._plugin_context) - self.assertEqual(self._tap_flow, post_args.tap_flow) - - def test_delete_tap_flow_failed_on_service_driver(self): - with self.tap_service() as ts, \ - self.tap_flow(tap_service=ts['id']) as tf: - attr = {'delete_tap_flow_postcommit.side_effect': DummyError} - self.driver.configure_mock(**attr) - with testtools.ExpectedException(DummyError): - self._plugin.delete_tap_flow(self._context, tf['id']) + self.taas_cbs.set_tap_flow_status(self._context, + {'id': self._tap_flow['id']}, + constants.INACTIVE, + "dummyHost")