From 34c602d832e02be1f6da04114d72c2094eb8060f Mon Sep 17 00:00:00 2001 From: Kobi Samoray Date: Mon, 1 Aug 2016 12:16:14 +0300 Subject: [PATCH] NSXv: eliminate task use from edge creation Edge creation in backend should be done using synchronous call to the backend. Also, using the task mechanism may cause deadlocks, as NSX may lose the create event and then the task might poll forever for an edge which will never create. Change-Id: I2b3213d03d3c3ff8b5497061ecdfad0ee76d47a7 --- .../nsx_v/vshield/edge_appliance_driver.py | 140 +++++------------- .../plugins/nsx_v/vshield/edge_utils.py | 106 +++---------- vmware_nsx/plugins/nsx_v/vshield/vcns.py | 4 +- .../tests/unit/nsx_v/vshield/fake_vcns.py | 49 ++---- .../unit/nsx_v/vshield/test_edge_utils.py | 6 +- .../unit/nsx_v/vshield/test_vcns_driver.py | 69 ++++----- 6 files changed, 94 insertions(+), 280 deletions(-) diff --git a/vmware_nsx/plugins/nsx_v/vshield/edge_appliance_driver.py b/vmware_nsx/plugins/nsx_v/vshield/edge_appliance_driver.py index a66a30285f..eceed2527e 100644 --- a/vmware_nsx/plugins/nsx_v/vshield/edge_appliance_driver.py +++ b/vmware_nsx/plugins/nsx_v/vshield/edge_appliance_driver.py @@ -16,14 +16,17 @@ import time +from neutron.plugins.common import constants as plugin_const from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils from oslo_utils import excutils -from vmware_nsx._i18n import _LE, _LI, _LW +from vmware_nsx._i18n import _, _LE, _LI, _LW +from vmware_nsx.common import exceptions as nsxv_exc from vmware_nsx.common import nsxv_constants from vmware_nsx.common import utils +from vmware_nsx.db import nsxv_db from vmware_nsx.plugins.nsx_v.vshield.common import constants from vmware_nsx.plugins.nsx_v.vshield.common import exceptions from vmware_nsx.plugins.nsx_v.vshield import edge_utils @@ -373,66 +376,6 @@ class EdgeApplianceDriver(object): LOG.debug("Deletion complete vnic %(vnic_index)s: on edge %(edge_id)s", {'vnic_index': index, 'edge_id': edge_id}) - def _deploy_edge(self, task): - userdata = task.userdata - LOG.debug("NSXv: start deploying edge") - request = userdata['request'] - try: - header = self.vcns.deploy_edge(request)[0] - objuri = header['location'] - job_id = objuri[objuri.rfind("/") + 1:] - response = self.vcns.get_edge_id(job_id)[1] - edge_id = response['edgeId'] - LOG.debug("VCNS: deploying edge %s", edge_id) - userdata['edge_id'] = edge_id - status = task_constants.TaskStatus.PENDING - except exceptions.VcnsApiException: - with excutils.save_and_reraise_exception(): - LOG.exception(_LE("NSXv: deploy edge failed.")) - - return status - - def _status_edge(self, task): - edge_id = task.userdata['edge_id'] - try: - response = self.vcns.get_edge_deploy_status(edge_id)[1] - task.userdata['retries'] = 0 - system_status = response.get('systemStatus', None) - if system_status is None: - status = task_constants.TaskStatus.PENDING - elif system_status == 'good': - status = task_constants.TaskStatus.COMPLETED - else: - status = task_constants.TaskStatus.ERROR - except exceptions.VcnsApiException as e: - LOG.exception(_LE("VCNS: Edge %s status query failed."), edge_id) - raise e - except Exception as e: - retries = task.userdata.get('retries', 0) + 1 - if retries < 3: - task.userdata['retries'] = retries - LOG.exception(_LE("VCNS: Unable to retrieve edge %(edge_id)s " - "status. Retry %(retries)d."), - {'edge_id': edge_id, - 'retries': retries}) - status = task_constants.TaskStatus.PENDING - else: - LOG.exception(_LE("VCNS: Unable to retrieve edge %s status. " - "Abort."), edge_id) - status = task_constants.TaskStatus.ERROR - LOG.debug("VCNS: Edge %s status", edge_id) - return status - - def _result_edge(self, task): - edge_id = task.userdata.get('edge_id') - if task.status != task_constants.TaskStatus.COMPLETED: - LOG.error(_LE("NSXv: Failed to deploy edge %(edge_id)s " - "status %(status)d"), - {'edge_id': edge_id, - 'status': task.status}) - else: - LOG.debug("NSXv: Edge %s is deployed", edge_id) - def _update_edge(self, task): edge_id = task.userdata['edge_id'] LOG.debug("start update edge %s", edge_id) @@ -495,11 +438,11 @@ class EdgeApplianceDriver(object): LOG.exception(_LE("VCNS: Failed to get edges:\n%s"), e.response) raise e - def deploy_edge(self, resource_id, name, internal_network, jobdata=None, - dist=False, wait_for_exec=False, loadbalancer_enable=True, - appliance_size=nsxv_constants.LARGE, async=True, + def deploy_edge(self, context, router_id, name, internal_network, + dist=False, loadbalancer_enable=True, + appliance_size=nsxv_constants.LARGE, availability_zone=None): - task_name = 'deploying-%s' % name + edge_name = name edge = self._assemble_edge( edge_name, datacenter_moid=self.datacenter_moid, @@ -538,50 +481,35 @@ class EdgeApplianceDriver(object): if not dist and loadbalancer_enable: self._enable_loadbalancer(edge) - if async: - userdata = { - 'dist': dist, - 'request': edge, - 'router_name': name, - 'jobdata': jobdata - } - task = tasks.Task(task_name, resource_id, - self._deploy_edge, - status_callback=self._status_edge, - result_callback=self._result_edge, - userdata=userdata) - task.add_executed_monitor(self.callbacks.edge_deploy_started) - task.add_result_monitor(self.callbacks.edge_deploy_result) - self.task_manager.add(task) + edge_id = None + try: + header = self.vcns.deploy_edge(edge)[0] + edge_id = header.get('location', '/').split('/')[-1] - if wait_for_exec: - # wait until the deploy task is executed so edge_id is - # available - task.wait(task_constants.TaskState.EXECUTED) + if edge_id: + nsxv_db.update_nsxv_router_binding( + context.session, router_id, edge_id=edge_id) + if not dist: + # Init Edge vnic binding + nsxv_db.init_edge_vnic_binding( + context.session, edge_id) + else: + if router_id: + nsxv_db.update_nsxv_router_binding( + context.session, router_id, + status=plugin_const.ERROR) + error = _('Failed to deploy edge') + raise nsxv_exc.NsxPluginException(err_msg=error) - return task - else: - edge_id = None - try: - header = self.vcns.deploy_edge(edge, - async=False)[0] - edge_id = header['location'].split('/')[-1] - LOG.debug("VCNS: deploying edge %s", edge_id) + self.callbacks.complete_edge_creation( + context, edge_id, name, router_id, dist, True) - self.callbacks.edge_deploy_started_sync( - jobdata['context'], edge_id, name, - jobdata['router_id'], dist) - - self.callbacks.edge_deploy_result_sync( - jobdata['context'], edge_id, name, jobdata['router_id'], - dist, True) - - except exceptions.VcnsApiException: - self.callbacks.edge_deploy_result_sync( - jobdata['context'], edge_id, name, jobdata['router_id'], - dist, False) - with excutils.save_and_reraise_exception(): - LOG.exception(_LE("NSXv: deploy edge failed.")) + except exceptions.VcnsApiException: + self.callbacks.complete_edge_creation( + context, edge_id, name, router_id, dist, False) + with excutils.save_and_reraise_exception(): + LOG.exception(_LE("NSXv: deploy edge failed.")) + return edge_id def update_edge(self, router_id, edge_id, name, internal_network, jobdata=None, dist=False, loadbalancer_enable=True, diff --git a/vmware_nsx/plugins/nsx_v/vshield/edge_utils.py b/vmware_nsx/plugins/nsx_v/vshield/edge_utils.py index ba427f4c93..8176001600 100644 --- a/vmware_nsx/plugins/nsx_v/vshield/edge_utils.py +++ b/vmware_nsx/plugins/nsx_v/vshield/edge_utils.py @@ -48,7 +48,6 @@ from vmware_nsx.plugins.nsx_v.vshield.common import ( from vmware_nsx.plugins.nsx_v.vshield.common import exceptions as nsxapi_exc from vmware_nsx.plugins.nsx_v.vshield.tasks import ( constants as task_const) -from vmware_nsx.plugins.nsx_v.vshield.tasks import tasks from vmware_nsx.plugins.nsx_v.vshield import vcns WORKER_POOL_SIZE = 8 @@ -158,25 +157,15 @@ class EdgeManager(object): def _deploy_edge(self, context, lrouter, lswitch=None, appliance_size=nsxv_constants.COMPACT, - edge_type=nsxv_constants.SERVICE_EDGE, async=True, + edge_type=nsxv_constants.SERVICE_EDGE, availability_zone=None): """Create an edge for logical router support.""" - router_id = lrouter['id'] # deploy edge - jobdata = { - 'router_id': router_id, - 'lrouter': lrouter, - 'lswitch': lswitch, - 'context': context - } - - task = self.nsxv_manager.deploy_edge( - lrouter['id'], lrouter['name'], internal_network=None, - jobdata=jobdata, wait_for_exec=True, + self.nsxv_manager.deploy_edge(context, lrouter['id'], + lrouter['name'], internal_network=None, appliance_size=appliance_size, - dist=(edge_type == nsxv_constants.VDR_EDGE), async=async, + dist=(edge_type == nsxv_constants.VDR_EDGE), availability_zone=availability_zone) - return task def _deploy_backup_edges_on_db(self, context, num, appliance_size=nsxv_constants.COMPACT, @@ -211,7 +200,7 @@ class EdgeManager(object): 'name': router_id} pool.spawn_n(self._deploy_edge, context, fake_router, appliance_size=appliance_size, - edge_type=edge_type, async=False, + edge_type=edge_type, availability_zone=availability_zone) def _delete_edge(self, context, router_binding): @@ -574,7 +563,7 @@ class EdgeManager(object): availability_zone=availability_zone.name) self._deploy_edge(context, lrouter, appliance_size=appliance_size, - edge_type=edge_type, async=False, + edge_type=edge_type, availability_zone=availability_zone) return @@ -601,7 +590,7 @@ class EdgeManager(object): availability_zone=availability_zone.name) self._deploy_edge(context, lrouter, appliance_size=appliance_size, - edge_type=edge_type, async=False, + edge_type=edge_type, availability_zone=availability_zone) else: LOG.debug("Select edge: %(edge_id)s from pool for %(name)s", @@ -627,16 +616,10 @@ class EdgeManager(object): jobdata = { 'context': context, 'router_id': lrouter['id']} - fake_userdata = {'jobdata': jobdata, - 'router_name': lrouter['name'], - 'edge_id': edge_id, - 'dist': dist} - fake_task = tasks.Task(name='fake-deploy-edge-task', - resource_id='fake-resource_id', - execute_callback=None, - userdata=fake_userdata) - fake_task.status = task_const.TaskStatus.COMPLETED - self.nsxv_manager.callbacks.edge_deploy_result(fake_task) + self.nsxv_manager.callbacks.complete_edge_creation( + context, edge_id, lrouter['name'], lrouter['id'], dist, + True) + # change edge's name at backend task = self.nsxv_manager.update_edge( resource_id, available_router_binding['edge_id'], @@ -1782,21 +1765,9 @@ def create_lrouter(nsxv_manager, context, lrouter, lswitch=None, dist=False, availability_zone=availability_zone.name) # deploy edge - jobdata = { - 'router_id': router_id, - 'lrouter': lrouter, - 'lswitch': lswitch, - 'context': context - } - - # deploy and wait until the deploy request has been requested - # so we will have edge_id ready. The wait here should be fine - # as we're not in a database transaction now - task = nsxv_manager.deploy_edge( - router_id, router_name, internal_network=None, - dist=dist, jobdata=jobdata, appliance_size=appliance_size, - availability_zone=availability_zone) - task.wait(task_const.TaskState.RESULT) + nsxv_manager.deploy_edge( + context, router_id, router_name, internal_network=None, dist=dist, + appliance_size=appliance_size, availability_zone=availability_zone) def delete_lrouter(nsxv_manager, context, router_id, dist=False): @@ -2268,55 +2239,12 @@ class NsxVCallbacks(object): def __init__(self, plugin): self.plugin = plugin - def edge_deploy_started(self, task): - """callback when deployment task started.""" - jobdata = task.userdata['jobdata'] - context = jobdata['context'] - router_id = jobdata.get('router_id') - edge_id = task.userdata.get('edge_id') - name = task.userdata.get('router_name') - dist = task.userdata.get('dist') - self.edge_deploy_started_sync(context, edge_id, name, router_id, dist) - - def edge_deploy_started_sync(self, context, edge_id, name, router_id, - dist): - if edge_id: - LOG.debug("Start deploying %(edge_id)s for router %(name)s", - {'edge_id': edge_id, - 'name': name}) - nsxv_db.update_nsxv_router_binding( - context.session, router_id, edge_id=edge_id) - if not dist: - # Init Edge vnic binding - nsxv_db.init_edge_vnic_binding( - context.session, edge_id) - else: - LOG.debug("Failed to deploy Edge") - if router_id: - nsxv_db.update_nsxv_router_binding( - context.session, router_id, - status=plugin_const.ERROR) - - def edge_deploy_result(self, task): - """callback when deployment task finished.""" - jobdata = task.userdata['jobdata'] - context = jobdata['context'] - name = task.userdata.get('router_name') - dist = task.userdata.get('dist') - router_id = jobdata['router_id'] - edge_id = task.userdata.get('edge_id') - - self.edge_deploy_result_sync( - context, edge_id, name, router_id, dist, - task.status == task_const.TaskStatus.COMPLETED) - - def edge_deploy_result_sync(self, context, edge_id, name, router_id, dist, - deploy_successful): + def complete_edge_creation( + self, context, edge_id, name, router_id, dist, deploy_successful): router_db = None if uuidutils.is_uuid_like(router_id): try: - router_db = self.plugin._get_router( - context, router_id) + router_db = self.plugin._get_router(context, router_id) except l3.RouterNotFound: # Router might have been deleted before deploy finished LOG.warning(_LW("Router %s not found"), name) diff --git a/vmware_nsx/plugins/nsx_v/vshield/vcns.py b/vmware_nsx/plugins/nsx_v/vshield/vcns.py index a6c46b92d4..9c22062c62 100644 --- a/vmware_nsx/plugins/nsx_v/vshield/vcns.py +++ b/vmware_nsx/plugins/nsx_v/vshield/vcns.py @@ -142,10 +142,8 @@ class Vcns(object): @retry_upon_exception(exceptions.ResourceNotFound) @retry_upon_exception(exceptions.RequestBad) - def deploy_edge(self, request, async=True): + def deploy_edge(self, request): uri = URI_PREFIX - if async: - uri += "?async=true" return self.do_request(HTTP_POST, uri, request, decode=False) def update_edge(self, edge_id, request, async=False): diff --git a/vmware_nsx/tests/unit/nsx_v/vshield/fake_vcns.py b/vmware_nsx/tests/unit/nsx_v/vshield/fake_vcns.py index cfc433504a..0812c98348 100644 --- a/vmware_nsx/tests/unit/nsx_v/vshield/fake_vcns.py +++ b/vmware_nsx/tests/unit/nsx_v/vshield/fake_vcns.py @@ -87,7 +87,7 @@ class FakeVcns(object): response = {"edgeJob": []} return (header, response) - def deploy_edge(self, request, async=True): + def deploy_edge(self, request): if (self._unique_router_name and not self._validate_edge_name(request['name'])): header = { @@ -104,39 +104,20 @@ class FakeVcns(object): } return (header, jsonutils.dumps(response)) - if async: - self._job_idx = self._job_idx + 1 - job_id = "jobdata-%d" % self._job_idx - self._edge_idx = self._edge_idx + 1 - edge_id = "edge-%d" % self._edge_idx - self._jobs[job_id] = edge_id - self._edges[edge_id] = { - 'name': request['name'], - 'request': request, - 'nat_rules': None, - 'nat_rule_id': 0, - 'interface_index': 1 - } - header = { - 'status': 200, - 'location': 'https://host/api/4.0/jobs/%s' % job_id - } - response = '' - else: - self._edge_idx = self._edge_idx + 1 - edge_id = "edge-%d" % self._edge_idx - self._edges[edge_id] = { - 'name': request['name'], - 'request': request, - 'nat_rules': None, - 'nat_rule_id': 0, - 'interface_index': 1 - } - header = { - 'status': 200, - 'location': 'https://host/api/4.0/edges/%s' % edge_id - } - response = '' + self._edge_idx = self._edge_idx + 1 + edge_id = "edge-%d" % self._edge_idx + self._edges[edge_id] = { + 'name': request['name'], + 'request': request, + 'nat_rules': None, + 'nat_rule_id': 0, + 'interface_index': 1 + } + header = { + 'status': 200, + 'location': 'https://host/api/4.0/edges/%s' % edge_id + } + response = '' return (header, response) def update_edge(self, edge_id, request): diff --git a/vmware_nsx/tests/unit/nsx_v/vshield/test_edge_utils.py b/vmware_nsx/tests/unit/nsx_v/vshield/test_edge_utils.py index 2a255d8c4a..7a1ee21d20 100644 --- a/vmware_nsx/tests/unit/nsx_v/vshield/test_edge_utils.py +++ b/vmware_nsx/tests/unit/nsx_v/vshield/test_edge_utils.py @@ -203,13 +203,9 @@ class EdgeUtilsTestCase(EdgeUtilsTestCaseMixin): edge_utils.create_lrouter(self.nsxv_manager, self.ctx, lrouter, lswitch=None, dist=False, availability_zone=self.az) - self.nsxv_manager.deploy_edge.assert_called_once_with( + self.nsxv_manager.deploy_edge.assert_called_once_with(self.ctx, lrouter['id'], (lrouter['name'] + '-' + lrouter['id']), internal_network=None, dist=False, availability_zone=self.az, - jobdata={'router_id': lrouter['id'], - 'lrouter': lrouter, - 'lswitch': None, - 'context': self.ctx}, appliance_size=vcns_const.SERVICE_SIZE_MAPPING['router']) def _test_update_intereface_primary_addr(self, old_ip, new_ip, isUplink): diff --git a/vmware_nsx/tests/unit/nsx_v/vshield/test_vcns_driver.py b/vmware_nsx/tests/unit/nsx_v/vshield/test_vcns_driver.py index 445ea2f92c..9fb7cf6c0f 100644 --- a/vmware_nsx/tests/unit/nsx_v/vshield/test_vcns_driver.py +++ b/vmware_nsx/tests/unit/nsx_v/vshield/test_vcns_driver.py @@ -16,13 +16,16 @@ from eventlet import greenthread import mock +from neutron import context as neutron_context from neutron.tests import base from oslo_config import cfg import six +from vmware_nsx.common import exceptions as nsxv_exc from vmware_nsx.plugins.nsx_v import availability_zones as nsx_az from vmware_nsx.plugins.nsx_v.vshield.common import ( constants as vcns_const) +from vmware_nsx.plugins.nsx_v.vshield import edge_appliance_driver as e_drv from vmware_nsx.plugins.nsx_v.vshield.tasks import ( constants as ts_const) from vmware_nsx.plugins.nsx_v.vshield.tasks import tasks as ts @@ -317,6 +320,9 @@ class VcnsDriverTestCase(base.BaseTestCase): def setUp(self): super(VcnsDriverTestCase, self).setUp() + self.ctx = neutron_context.get_admin_context() + self.temp_e_drv_nsxv_db = e_drv.nsxv_db + e_drv.nsxv_db = mock.MagicMock() self.config_parse(args=['--config-file', VCNS_CONFIG_FILE]) self.fc = fake_vcns.FakeVcns() @@ -333,34 +339,22 @@ class VcnsDriverTestCase(base.BaseTestCase): self.result = None def tearDown(self): + e_drv.nsxv_db = self.temp_e_drv_nsxv_db self.vcns_driver.task_manager.stop() # Task manager should not leave running threads around # if _thread is None it means it was killed in stop() self.assertIsNone(self.vcns_driver.task_manager._thread) super(VcnsDriverTestCase, self).tearDown() + def complete_edge_creation( + self, context, edge_id, name, router_id, dist, deploy_successful): + pass + def _deploy_edge(self): - task = self.vcns_driver.deploy_edge( - 'router-id', 'myedge', 'internal-network', {}, wait_for_exec=True, + self.edge_id = self.vcns_driver.deploy_edge( + self.ctx, 'router-id', 'myedge', 'internal-network', availability_zone=self.az) self.assertEqual(self.edge_id, 'edge-1') - task.wait(ts_const.TaskState.RESULT) - return task - - def edge_deploy_started(self, task): - self.edge_id = task.userdata['edge_id'] - - def edge_deploy_started_sync(self, context, edge_id, name, router_id, - dist): - pass - - def edge_deploy_result(self, task): - if task.status == ts_const.TaskStatus.COMPLETED: - task.userdata['jobdata']['edge_deploy_result'] = True - - def edge_deploy_result_sync(self, context, edge_id, name, router_id, - dist, deploy_successful): - pass def edge_delete_result(self, task): if task.status == ts_const.TaskStatus.COMPLETED: @@ -378,35 +372,24 @@ class VcnsDriverTestCase(base.BaseTestCase): if task.status == ts_const.TaskStatus.COMPLETED: task.userdata['jobdata']['interface_update_result'] = True - def test_deploy_edge_with_async(self): - jobdata = {} - task = self.vcns_driver.deploy_edge( - 'router-id', 'myedge', 'internal-network', jobdata=jobdata, - wait_for_exec=True, availability_zone=self.az) - self.assertEqual(self.edge_id, 'edge-1') - task.wait(ts_const.TaskState.RESULT) - self.assertEqual(task.status, ts_const.TaskStatus.COMPLETED) - self.assertTrue(jobdata.get('edge_deploy_result')) - - def test_deploy_edge_with_sync(self): - jobdata = {"context": "fake_context", - "router_id": "fake_router_id"} + def test_deploy_edge_with(self): self.vcns_driver.deploy_edge( - 'router-id', 'myedge', 'internal-network', jobdata=jobdata, - wait_for_exec=True, async=False, availability_zone=self.az) + self.ctx, 'router-id', 'myedge', 'internal-network', + availability_zone=self.az) status = self.vcns_driver.get_edge_status('edge-1') self.assertEqual(status, vcns_const.RouterStatus.ROUTER_STATUS_ACTIVE) def test_deploy_edge_fail(self): - task1 = self.vcns_driver.deploy_edge( - 'router-1', 'myedge', 'internal-network', {}, wait_for_exec=True, + self.vcns_driver.deploy_edge( + self.ctx, 'router-1', 'myedge', 'internal-network', availability_zone=self.az) - task2 = self.vcns_driver.deploy_edge( - 'router-2', 'myedge', 'internal-network', {}, wait_for_exec=True, + # self.vcns_driver.deploy_edge( + # self.ctx, 'router-2', 'myedge', 'internal-network', + # availability_zone=self.az) + self.assertRaises( + nsxv_exc.NsxPluginException, self.vcns_driver.deploy_edge, + self.ctx, 'router-2', 'myedge', 'internal-network', availability_zone=self.az) - task1.wait(ts_const.TaskState.RESULT) - task2.wait(ts_const.TaskState.RESULT) - self.assertEqual(task2.status, ts_const.TaskStatus.ERROR) def test_get_edge_status(self): self._deploy_edge() @@ -540,11 +523,11 @@ class VcnsDriverHATestCase(VcnsDriverTestCase): self.vcns_driver.vcns.orig_deploy = self.vcns_driver.vcns.deploy_edge self.vcns_driver.vcns.deploy_edge = self._fake_deploy_edge - def _fake_deploy_edge(self, request, async=True): + def _fake_deploy_edge(self, request): # validate the appliance structure in the request, # and return the regular (fake) response found_app = request['appliances']['appliances'] self.assertEqual(len(found_app), 2) self.assertEqual(found_app[0]['datastoreId'], self._data_store) self.assertEqual(found_app[1]['datastoreId'], self._ha_data_store) - return self.vcns_driver.vcns.orig_deploy(request, async) + return self.vcns_driver.vcns.orig_deploy(request)