Provide rpc for port status update

Tricircle Neutron plugin provides rpc for bottom service to update
port status in top layer.

Change-Id: I0ffff9cec472af9ac725815715f3373ee5949018
This commit is contained in:
zhiyuan_cai 2015-09-23 16:51:30 +08:00
parent 2b1f7aa559
commit 80d037fb4e
6 changed files with 251 additions and 0 deletions

48
tricircle/common/rpc.py Normal file
View File

@ -0,0 +1,48 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import neutron.common.rpc as neutron_rpc
import neutron.common.topics as neutron_topics
import neutron.context as neutron_context
from oslo_config import cfg
import oslo_messaging
class NetworkingRpcApi(object):
def __init__(self):
if not neutron_rpc.TRANSPORT:
neutron_rpc.init(cfg.CONF)
target = oslo_messaging.Target(topic=neutron_topics.PLUGIN,
version='1.0')
self.client = neutron_rpc.get_client(target)
# adapt tricircle context to neutron context
def _make_neutron_context(self, context):
return neutron_context.ContextBase(context.user, context.tenant,
auth_token=context.auth_token,
is_admin=context.is_admin,
request_id=context.request_id,
user_name=context.user_name,
tenant_name=context.tenant_name)
def update_port_up(self, context, port_id):
call_context = self.client.prepare()
return call_context.call(self._make_neutron_context(context),
'update_port_up', port_id=port_id)
def update_port_down(self, context, port_id):
call_context = self.client.prepare()
return call_context.call(self._make_neutron_context(context),
'update_port_down', port_id=port_id)

View File

@ -14,10 +14,14 @@
# under the License.
import oslo_log.helpers as log_helpers
from oslo_log import log
from neutron.extensions import portbindings
from neutron.common import exceptions as n_exc
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import agentschedulers_db
from neutron.db import db_base_plugin_v2
from neutron.db import external_net_db
@ -27,6 +31,7 @@ from neutron.db import portbindings_db
from neutron.db import securitygroups_db
from neutron.i18n import _LI
from tricircle.common import cascading_networking_api as c_net_api
from tricircle.networking_tricircle import rpc as c_net_rpc
LOG = log.getLogger(__name__)
@ -60,6 +65,18 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
self._cascading_rpc_api = c_net_api.CascadingNetworkingNotifyAPI()
self._setup_rpc()
def _setup_rpc(self):
self.endpoints = [c_net_rpc.RpcCallbacks()]
@log_helpers.log_method_call
def start_rpc_listeners(self):
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
return self.conn.consume_in_threads()
def create_network(self, context, network):
with context.session.begin(subtransactions=True):
result = super(TricirclePlugin, self).create_network(
@ -118,6 +135,19 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
return ret_val
def update_port_status(self, context, port_id, port_status):
with context.session.begin(subtransactions=True):
try:
port = super(TricirclePlugin, self).get_port(context, port_id)
port['status'] = port_status
neutron_db = super(TricirclePlugin, self).update_port(
context, port_id, {'port': port})
except n_exc.PortNotFound:
LOG.debug("Port %(port)s update to %(status)s not found",
{'port': port_id, 'status': port_status})
return None
return neutron_db
def extend_port_dict_binding(self, port_res, port_db):
super(TricirclePlugin, self).extend_port_dict_binding(
port_res, port_db)

View File

@ -0,0 +1,38 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import neutron.common.constants as neutron_const
from neutron import manager
from oslo_log import log
import oslo_messaging
LOG = log.getLogger(__name__)
class RpcCallbacks(object):
target = oslo_messaging.Target(version='1.0')
def update_port_up(self, context, **kwargs):
port_id = kwargs.get('port_id')
plugin = manager.NeutronManager.get_plugin()
plugin.update_port_status(context, port_id,
neutron_const.PORT_STATUS_ACTIVE)
def update_port_down(self, context, **kwargs):
port_id = kwargs.get('port_id')
plugin = manager.NeutronManager.get_plugin()
plugin.update_port_status(context, port_id,
neutron_const.PORT_STATUS_DOWN)

View File

@ -0,0 +1,76 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from mock import patch
import unittest
from neutron.common import constants as neutron_const
from neutron.common import exceptions as neutron_exceptions
from neutron.common import rpc as neutron_rpc
from neutron.db import db_base_plugin_v2
from tricircle import context
from tricircle.networking_tricircle.plugin import TricirclePlugin
FAKE_PORT_ID = 'fake_port_uuid'
FAKE_PORT = {
'id': FAKE_PORT_ID,
'status': neutron_const.PORT_STATUS_DOWN
}
def fake_get_port(instance, context, port_id):
if port_id == FAKE_PORT_ID:
return FAKE_PORT
else:
raise neutron_exceptions.PortNotFound(port_id=port_id)
def fake_update_port(instance, context, port_id, port):
FAKE_PORT['status'] = port['port']['status']
return FAKE_PORT
class TricirclePluginTest(unittest.TestCase):
def setUp(self):
FAKE_PORT['status'] = neutron_const.PORT_STATUS_DOWN
@patch.object(neutron_rpc, 'get_client', new=mock.Mock())
@patch.object(db_base_plugin_v2.NeutronDbPluginV2,
'update_port', new=fake_update_port)
@patch.object(db_base_plugin_v2.NeutronDbPluginV2,
'get_port', new=fake_get_port)
def test_update_port_status(self):
plugin = TricirclePlugin()
# this method requires a neutron context, but for test we just pass
# a tricircle context
port = plugin.update_port_status(context.Context(), FAKE_PORT_ID,
neutron_const.PORT_STATUS_ACTIVE)
self.assertEqual(FAKE_PORT['status'], neutron_const.PORT_STATUS_ACTIVE)
self.assertIsNotNone(port)
@patch.object(neutron_rpc, 'get_client', new=mock.Mock())
@patch.object(db_base_plugin_v2.NeutronDbPluginV2,
'update_port', new=fake_update_port)
@patch.object(db_base_plugin_v2.NeutronDbPluginV2,
'get_port', new=fake_get_port)
def test_update_port_status_port_not_found(self):
plugin = TricirclePlugin()
port = plugin.update_port_status(context.Context(), 'no_such_port',
neutron_const.PORT_STATUS_ACTIVE)
self.assertEqual(FAKE_PORT['status'], neutron_const.PORT_STATUS_DOWN)
self.assertIsNone(port)

View File

@ -0,0 +1,59 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from mock import patch
import unittest
from neutron.common import constants as neutron_const
from neutron.common import rpc as neutron_rpc
from neutron import manager
from tricircle.networking_tricircle import plugin
from tricircle.networking_tricircle import rpc
FAKE_PORT_ID = 'fake_port_uuid'
FAKE_CONTEXT = object()
class RpcCallbacksTest(unittest.TestCase):
def setUp(self):
self.callbacks = rpc.RpcCallbacks()
@patch.object(neutron_rpc, 'get_client', new=mock.Mock())
def test_update_port_up(self):
with patch.object(manager.NeutronManager,
'get_plugin') as get_plugin_method:
with patch.object(plugin.TricirclePlugin,
'update_port_status') as update_method:
get_plugin_method.return_value = plugin.TricirclePlugin()
self.callbacks.update_port_up(FAKE_CONTEXT,
port_id=FAKE_PORT_ID)
update_method.assert_called_once_with(
FAKE_CONTEXT, FAKE_PORT_ID,
neutron_const.PORT_STATUS_ACTIVE)
@patch.object(neutron_rpc, 'get_client', new=mock.Mock())
def test_update_port_down(self):
with patch.object(manager.NeutronManager,
'get_plugin') as get_plugin_method:
with patch.object(plugin.TricirclePlugin,
'update_port_status') as update_method:
get_plugin_method.return_value = plugin.TricirclePlugin()
self.callbacks.update_port_down(FAKE_CONTEXT,
port_id=FAKE_PORT_ID)
update_method.assert_called_once_with(
FAKE_CONTEXT, FAKE_PORT_ID, neutron_const.PORT_STATUS_DOWN)