neutron/neutron/tests/unit/extensions/test_data_plane_status.py
Brian Haley b79842f289 Start enforcing E125 flake8 directive
Removed E125 (continuation line does not distinguish itself
from next logical line) from the ignore list and fixed all
the indentation issues.  Didn't think it was going to be
close to 100 files when I started.

Change-Id: I0a6f5efec4b7d8d3632dd9dbb43e0ab58af9dff3
2019-07-19 23:39:41 -04:00

132 lines
5.7 KiB
Python

# Copyright (c) 2017 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from webob import exc as web_exc
from neutron_lib.api.definitions import data_plane_status as dps_lib
from neutron_lib.api.definitions import port as port_def
from neutron_lib import constants
from neutron_lib.db import resource_extend
from neutron_lib.tests.unit import fake_notifier
from neutron.db import data_plane_status_db as dps_db
from neutron.db import db_base_plugin_v2
from neutron.extensions import data_plane_status as dps_ext
from neutron.tests.unit.db import test_db_base_plugin_v2
class DataPlaneStatusTestExtensionManager(object):
    """Extension manager stub for the data-plane-status tests.

    It declares no resources, actions or request extensions of its own;
    the only non-empty hook is ``get_extended_resources``, which defers to
    the data-plane-status extension descriptor.
    """

    def get_resources(self):
        """Return a fresh empty list: no new resources are declared."""
        return list()

    def get_actions(self):
        """Return a fresh empty list: no actions are declared."""
        return list()

    def get_request_extensions(self):
        """Return a fresh empty list: no request extensions are declared."""
        return list()

    def get_extended_resources(self, version):
        """Return what the data-plane-status extension reports for *version*.

        :param version: passed through unchanged to the extension's
            ``get_extended_resources``.
        """
        extension = dps_ext.Data_plane_status
        return extension.get_extended_resources(version)
@resource_extend.has_resource_extenders
class DataPlaneStatusExtensionTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                                         dps_db.DataPlaneStatusMixin):
    """Test plugin: base DB plugin combined with the data-plane-status mixin.

    The only extension alias it advertises is the data-plane-status one.
    """

    supported_extension_aliases = [dps_lib.ALIAS]

    @staticmethod
    @resource_extend.extends([port_def.COLLECTION_NAME])
    def _extend_port_data_plane_status(port_res, port_db):
        """Forward the port extension hook to the mixin's implementation.

        :param port_res: passed through to the mixin unchanged.
        :param port_db: passed through to the mixin unchanged.
        :returns: whatever the mixin's hook returns.
        """
        mixin = dps_db.DataPlaneStatusMixin
        return mixin._extend_port_data_plane_status(port_res, port_db)

    def update_port(self, context, id, port):
        """Run the parent update, then handle data_plane_status if present.

        Both steps happen inside a single ``context.session.begin`` block.
        The data-plane-status handler is invoked only when the request body
        contains that key.

        :param context: request context; its ``session`` is used here.
        :param id: identifier of the port being updated.
        :param port: request body shaped like ``{'port': {...}}``.
        :returns: the value produced by the parent ``update_port`` (the
            handler receives it too and may amend it -- confirm in the
            mixin).
        """
        with context.session.begin(subtransactions=True):
            updated_port = super(DataPlaneStatusExtensionTestPlugin,
                                 self).update_port(context, id, port)
            # Looked up after the parent call, matching the original order.
            port_data = port['port']
            if dps_lib.DATA_PLANE_STATUS in port_data:
                self._process_update_port_data_plane_status(
                    context, port_data, updated_port)
        return updated_port
class DataPlaneStatusExtensionTestCase(
        test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
    """API-level tests for the port ``data_plane_status`` attribute.

    The tests run against ``DataPlaneStatusExtensionTestPlugin`` with
    ``DataPlaneStatusTestExtensionManager`` as the extension manager.
    """

    def setUp(self):
        """Set up the base test case with this module's plugin and manager."""
        plugin = ('neutron.tests.unit.extensions.test_data_plane_status.'
                  'DataPlaneStatusExtensionTestPlugin')
        ext_mgr = DataPlaneStatusTestExtensionManager()
        super(DataPlaneStatusExtensionTestCase, self).setUp(
            plugin=plugin, ext_mgr=ext_mgr)

    def test_update_port_data_plane_status(self):
        """A port update carrying data_plane_status returns 200 and echoes it.

        The literal attribute name is used in the request on purpose, so the
        name sent in the request body is pinned by this test.
        """
        with self.port() as port:
            data = {'port': {'data_plane_status': constants.ACTIVE}}
            req = self.new_update_request(port_def.COLLECTION_NAME,
                                          data,
                                          port['port']['id'])
            res = req.get_response(self.api)
            # Check the status before touching the body.
            self.assertEqual(web_exc.HTTPOk.code, res.status_code)
            p = self.deserialize(self.fmt, res)['port']
            self.assertEqual(constants.ACTIVE, p[dps_lib.DATA_PLANE_STATUS])

    def test_port_create_data_plane_status_default_none(self):
        """A port created without the attribute shows it as None."""
        with self.port(name='port1') as port:
            req = self.new_show_request(
                port_def.COLLECTION_NAME, port['port']['id'])
            res = self.deserialize(self.fmt, req.get_response(self.api))
            self.assertIsNone(res['port'][dps_lib.DATA_PLANE_STATUS])

    def test_port_create_invalid_attr_data_plane_status(self):
        """Supplying data_plane_status on port create is rejected with 400."""
        kwargs = {dps_lib.DATA_PLANE_STATUS: constants.ACTIVE}
        with self.network() as network, self.subnet(network=network):
            res = self._create_port(self.fmt, network['network']['id'],
                                    arg_list=(dps_lib.DATA_PLANE_STATUS,),
                                    **kwargs)
            self.assertEqual(web_exc.HTTPBadRequest.code, res.status_code)

    def test_port_update_preserves_data_plane_status(self):
        """An unrelated later update leaves data_plane_status untouched."""
        with self.port(name='port1') as port:
            port_id = port['port']['id']
            # The first update only sets the status; its response is unused.
            self._update(port_def.COLLECTION_NAME, port_id,
                         {'port': {dps_lib.DATA_PLANE_STATUS:
                                   constants.ACTIVE}})
            res = self._update(port_def.COLLECTION_NAME, port_id,
                               {'port': {'name': 'port2'}})
            self.assertEqual('port2', res['port']['name'])
            self.assertEqual(constants.ACTIVE,
                             res['port'][dps_lib.DATA_PLANE_STATUS])

    def test_port_update_with_invalid_data_plane_status(self):
        """An invalid data_plane_status value yields HTTP 400 on update."""
        with self.port(name='port1') as port:
            self._update(port_def.COLLECTION_NAME, port['port']['id'],
                         {'port': {dps_lib.DATA_PLANE_STATUS: "abc"}},
                         web_exc.HTTPBadRequest.code)

    def test_port_update_event_on_data_plane_status(self):
        """Updating data_plane_status emits port.update start/end events."""
        # Registered up front so the module-level notification list is
        # cleared even when an assertion below fails.
        self.addCleanup(fake_notifier.reset)
        expect_notify = {'port.update.start', 'port.update.end'}
        with self.port(name='port1') as port:
            self._update(port_def.COLLECTION_NAME, port['port']['id'],
                         {'port': {dps_lib.DATA_PLANE_STATUS:
                                   constants.ACTIVE}})
            notify = {n['event_type'] for n in fake_notifier.NOTIFICATIONS}
            # Extra recorded events are tolerated; only the expected ones
            # must all be present. The difference names any that are absent.
            missing = expect_notify - notify
            self.assertEqual(set(), missing)