mgmt driver for rpc and rpc proxy

Change-Id: I6c53514335301ad848fcd98cb1122a85ec688d14
This commit is contained in:
Isaku Yamahata 2014-06-26 18:12:35 +09:00
parent 4de5186d51
commit 7b3b013782
10 changed files with 1027 additions and 0 deletions

View File

@ -0,0 +1,59 @@
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""rpc_proxy
Revision ID: 81ffa86020d
Revises: 1c6b0d82afcd
Create Date: 2014-03-19 15:50:11.712686
"""
# revision identifiers, used by Alembic.
revision = '81ffa86020d'
down_revision = '1c6b0d82afcd'
# Change to ['*'] if this migration applies to all plugins
from alembic import op
import sqlalchemy as sa
def upgrade(active_plugins=None, options=None):
op.create_table(
'proxymgmtports',
sa.Column('device_id', sa.String(255)),
sa.Column('port_id', sa.String(36), nullable=False),
sa.Column('dst_transport_url', sa.String(255)),
sa.Column('svr_proxy_id', sa.String(36)),
sa.Column('svr_ns_proxy_id', sa.String(36)),
sa.Column('clt_proxy_id', sa.String(36)),
sa.Column('clt_ns_proxy_id', sa.String(36)),
sa.PrimaryKeyConstraint('device_id'),
)
op.create_table(
'proxyserviceports',
sa.Column('service_instance_id', sa.String(255)),
sa.Column('svr_proxy_id', sa.String(36)),
sa.Column('svr_ns_proxy_id', sa.String(36)),
sa.Column('clt_proxy_id', sa.String(36)),
sa.Column('clt_ns_proxy_id', sa.String(36)),
sa.PrimaryKeyConstraint('service_instance_id'),
)
def downgrade(active_plugins=None, options=None):
op.drop_table('proxymgmtport')
op.drop_table('proxyserviceport')

101
tacker/db/vm/proxy_db.py Normal file
View File

@ -0,0 +1,101 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
import sqlalchemy as sa
from tacker.db import db_base
from tacker.db import model_base
class ProxyMgmtPort(model_base.BASE):
device_id = sa.Column(sa.String(255), primary_key=True)
port_id = sa.Column(sa.String(36), nullable=False)
dst_transport_url = sa.Column(sa.String(255))
svr_proxy_id = sa.Column(sa.String(36))
svr_ns_proxy_id = sa.Column(sa.String(36))
clt_proxy_id = sa.Column(sa.String(36))
clt_ns_proxy_id = sa.Column(sa.String(36))
class ProxyServicePort(model_base.BASE):
service_instance_id = sa.Column(sa.String(255), primary_key=True)
svr_proxy_id = sa.Column(sa.String(36))
svr_ns_proxy_id = sa.Column(sa.String(36))
clt_proxy_id = sa.Column(sa.String(36))
clt_ns_proxy_id = sa.Column(sa.String(36))
class RpcProxyDb(db_base.CommonDbMixin):
def _make_proxy_mgmt_port(self, proxy_mgmt_port):
key_list = ('device_id', 'port_id', 'dst_transport_url',
'svr_proxy_id', 'svr_ns_proxy_id',
'clt_proxy_id', 'clt_ns_proxy_id')
return dict((key, getattr(proxy_mgmt_port, key)) for key in key_list)
def _make_proxy_service_port(self, proxy_service_port):
key_list = ('service_instance_id', 'svr_proxy_id', 'svr_ns_proxy_id',
'clt_proxy_id', 'clt_ns_proxy_id')
return dict((key, getattr(proxy_service_port, key))
for key in key_list)
def create_proxy_mgmt_port(self, context, device_id, port_id,
dst_transport_url,
svr_proxy_id, svr_ns_proxy_id,
clt_proxy_id, clt_ns_proxy_id):
with context.session.begin(subtransactions=True):
proxy_mgmt_port = ProxyMgmtPort(
device_id=device_id, port_id=port_id,
dst_transport_url=dst_transport_url,
svr_proxy_id=svr_proxy_id, svr_ns_proxy_id=svr_ns_proxy_id,
clt_proxy_id=clt_proxy_id, clt_ns_proxy_id=clt_ns_proxy_id)
context.session.add(proxy_mgmt_port)
def delete_proxy_mgmt_port(self, context, port_id):
with context.session.begin(subtransactions=True):
context.session.query(ProxyMgmtPort).filter_by(
port_id=port_id).delete()
def get_proxy_mgmt_port(self, context, device_id):
with context.session.begin(subtransactions=True):
proxy_mgmt_port = context.session.query(ProxyMgmtPort).filter_by(
device_id=device_id).one()
return self._make_proxy_mgmt_port(proxy_mgmt_port)
def create_proxy_service_port(self, context, service_instance_id,
svr_proxy_id, svr_ns_proxy_id,
clt_proxy_id, clt_ns_proxy_id):
with context.session.begin(subtransactions=True):
proxy_service_port = ProxyServicePort(
service_instance_id=service_instance_id,
svr_proxy_id=svr_proxy_id, svr_ns_proxy_id=svr_ns_proxy_id,
clt_proxy_id=clt_proxy_id, clt_ns_proxy_id=clt_ns_proxy_id)
context.session.add(proxy_service_port)
def delete_proxy_service_port(self, context, service_instance_id):
with context.session.begin(subtransactions=True):
context.session.query(ProxyServicePort).filter_by(
service_instance_id=service_instance_id).delete()
def get_proxy_service_port(self, context, service_instance_id):
with context.session.begin(subtransactions=True):
proxy_service_port = context.session.query(
ProxyServicePort).filter_by(
service_instance_id=service_instance_id).one()
return self._make_proxy_service_port(proxy_service_port)

View File

@ -0,0 +1,201 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
import uuid
import mock
from oslo.config import cfg
from tacker.common import topics
from tacker import context
from tacker.db.vm import proxy_db # noqa
from tacker.tests.unit.services.vm.mgmt_drivers import test_rpc
from tacker.vm.mgmt_drivers.rpc import proxy
from tacker.vm import proxy_api
_uuid = lambda: str(uuid.uuid4())
class TestMgmtProxyDriver(test_rpc.TestMgmtRpcDriver):
_TENANT_ID = _uuid()
_CONTEXT = context.Context('', _TENANT_ID)
_NETWORK_ID = _uuid(),
_SUBNET_ID = _uuid()
_DEVICE_PORT_ID = _uuid()
_DEVICE = {
'id': _uuid(),
'mgmt_address': 'device-address',
'service_context': [{
'network_id': _NETWORK_ID,
'port_id': _DEVICE_PORT_ID,
'role': 'mgmt',
}],
}
_PORT_ID = _uuid()
_IP_ADDRESS = '1.1.1.1'
_PORT = {
'id': _PORT_ID,
'fixed_ips': [{
'subnet_id': _SUBNET_ID,
'ip_address': _IP_ADDRESS,
}],
}
_PROXY_ID = [_uuid(), _uuid(), _uuid(), _uuid()]
_NS_PROXY_ID = [_uuid(), _uuid(), _uuid(), _uuid()]
def setUp(self):
super(TestMgmtProxyDriver, self).setUp()
self.mock_proxy_api_p = mock.patch(
'tacker.vm.proxy_api.ServiceVMPluginApi')
self.mock_proxy_api = self.mock_proxy_api_p.start()
mock_proxy_api_instance = mock.Mock()
self.mock_proxy_api.return_value = mock_proxy_api_instance
self.mock_proxy_api_instance = mock_proxy_api_instance
mock_proxy_api_instance.create_namespace_agent.return_value = (
self._PORT_ID)
mock_proxy_api_instance.create_rpc_proxy = mock.Mock(
side_effect=self._PROXY_ID)
mock_proxy_api_instance.create_rpc_namespace_proxy = mock.Mock(
side_effect=self._NS_PROXY_ID)
self.plugin.proxy_api = proxy_api.ServiceVMPluginApi('fake-topic')
self.plugin._core_plugin.get_port.return_value = self._PORT
self.mgmt_driver = proxy.AgentRpcProxyMGMTDriver()
def test_mgmt_create_post_delete_post(self):
mgmt_driver = self.mgmt_driver
mock_instance = self.mock_proxy_api_instance
mgmt_driver.mgmt_create_post(self.plugin, self._CONTEXT, self._DEVICE)
mock_instance.create_namespace_agent.assert_called_once_with(
self.plugin._core_plugin, self._CONTEXT, self._NETWORK_ID)
target = 'topic=%s,server=%s' % (topics.SERVICEVM_AGENT,
self._DEVICE['id'])
mock_instance.create_rpc_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, target, target, 'receive'),
mock.call(self._CONTEXT, self._PORT_ID, target, target, 'send')])
dst_transport_url = cfg.CONF.dst_transport_url % {'host':
self._IP_ADDRESS}
mock_instance.create_rpc_namespace_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, target, dst_transport_url,
target, 'receive'),
mock.call(self._CONTEXT, self._PORT_ID, target, dst_transport_url,
target, 'send')])
mgmt_address = mgmt_driver.mgmt_address(self.plugin, self._CONTEXT,
self._DEVICE)
self.assertEqual(mgmt_address, '%s.%s' % (topics.SERVICEVM_AGENT,
self._DEVICE['id']))
mgmt_driver.mgmt_call(self.plugin, self._CONTEXT, self._DEVICE,
self._KWARGS)
msg = {'args': self._KWARGS['kwargs'],
'namespace': None,
'method': 'action-name'}
self.mock_rpc_proxy_cast.assert_called_once_with(
self._CONTEXT, msg, topic='device-address')
mgmt_driver.mgmt_delete_post(self.plugin, self._CONTEXT, self._DEVICE)
mock_instance.destroy_rpc_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, self._PROXY_ID[1]),
mock.call(self._CONTEXT, self._PORT_ID, self._PROXY_ID[0])])
mock_instance.destroy_rpc_namespace_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, self._NS_PROXY_ID[1]),
mock.call(self._CONTEXT, self._PORT_ID, self._NS_PROXY_ID[0])])
mock_instance.destroy_namespace_agent.assert_called_once_with(
self.plugin._core_plugin, self._CONTEXT, self._PORT_ID)
def test_mgmt_service_create_pre_delete_post(self):
mgmt_driver = self.mgmt_driver
mock_instance = self.mock_proxy_api_instance
mgmt_driver.mgmt_create_post(self.plugin, self._CONTEXT, self._DEVICE)
mock_instance.create_namespace_agent.assert_called_once_with(
self.plugin._core_plugin, self._CONTEXT, self._NETWORK_ID)
target = 'topic=%s,server=%s' % (topics.SERVICEVM_AGENT,
self._DEVICE['id'])
mock_instance.create_rpc_proxy.assert_has_calls([
mock.call(self._CONTEXT, mock.ANY, target, target, 'receive'),
mock.call(self._CONTEXT, mock.ANY, target, target, 'send')])
dst_transport_url = cfg.CONF.dst_transport_url % {'host':
self._IP_ADDRESS}
mock_instance.create_rpc_namespace_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, target, dst_transport_url,
target, 'receive'),
mock.call(self._CONTEXT, self._PORT_ID, target, dst_transport_url,
target, 'send')])
mock_instance.create_rpc_proxy.reset_mock()
mock_instance.create_rpc_namespace_proxy.reset_mock()
mgmt_driver.mgmt_service_create_pre(
self.plugin, self._CONTEXT, self._DEVICE, self._SERVICE_INSTANCE)
target = 'topic=%s-%s,server=%s' % (
topics.SERVICEVM_AGENT, self._DEVICE['id'],
self._SERVICE_INSTANCE['id'])
mock_instance.create_rpc_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, target, target, 'receive'),
mock.call(self._CONTEXT, self._PORT_ID, target, target, 'send')])
dst_transport_url = cfg.CONF.dst_transport_url % {'host':
self._IP_ADDRESS}
mock_instance.create_rpc_namespace_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, target, dst_transport_url,
target, 'receive'),
mock.call(self._CONTEXT, self._PORT_ID, target, dst_transport_url,
target, 'send')])
mgmt_service_address = mgmt_driver.mgmt_service_address(
self.plugin, self._CONTEXT, self._DEVICE, self._SERVICE_INSTANCE)
self.assertEqual(mgmt_service_address, '%s-%s.%s' % (
topics.SERVICEVM_AGENT, self._DEVICE['id'],
self._SERVICE_INSTANCE['id']))
mgmt_driver.mgmt_service_call(
self.plugin, self._CONTEXT, self._DEVICE, self._SERVICE_INSTANCE,
self._KWARGS)
msg = {'args': self._KWARGS['kwargs'],
'namespace': None,
'method': 'action-name'}
self.mock_rpc_proxy_cast.assert_called_once_with(
self._CONTEXT, msg, topic='service-instance-address')
mgmt_driver.mgmt_service_delete_post(
self.plugin, self._CONTEXT, self._DEVICE, self._SERVICE_INSTANCE)
mock_instance.destroy_rpc_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, self._PROXY_ID[3]),
mock.call(self._CONTEXT, self._PORT_ID, self._PROXY_ID[2])])
mock_instance.destroy_rpc_namespace_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, self._NS_PROXY_ID[3]),
mock.call(self._CONTEXT, self._PORT_ID, self._NS_PROXY_ID[2])])
mock_instance.destroy_rpc_proxy.reset.mock()
mock_instance.destroy_rpc_namespace_proxy.reset_mock()
mgmt_driver.mgmt_delete_post(self.plugin, self._CONTEXT, self._DEVICE)
mock_instance.destroy_rpc_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, self._PROXY_ID[1]),
mock.call(self._CONTEXT, self._PORT_ID, self._PROXY_ID[0])])
mock_instance.destroy_rpc_namespace_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, self._NS_PROXY_ID[1]),
mock.call(self._CONTEXT, self._PORT_ID, self._NS_PROXY_ID[0])])
mock_instance.destroy_namespace_agent.assert_called_once_with(
self.plugin._core_plugin, self._CONTEXT, self._PORT_ID)

View File

@ -0,0 +1,84 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
import uuid
import mock
from oslo.config import cfg
from tacker.db import api as db
import tacker.openstack.common.rpc.proxy
from tacker.tests import base
from tacker.vm.mgmt_drivers import constants
from tacker.vm.mgmt_drivers.rpc import rpc
_uuid = lambda: str(uuid.uuid4())
class TestMgmtRpcDriver(base.BaseTestCase):
_CONTEXT = {}
_DEVICE = {
'id': _uuid(),
'mgmt_address': 'device-address'
}
_SERVICE_INSTANCE = {
'id': _uuid(),
'mgmt_address': 'service-instance-address',
}
_KWARGS = {
constants.KEY_ACTION: 'action-name',
constants.KEY_KWARGS: {
constants.KEY_ACTION: 'method-name',
}
}
def setUp(self):
super(TestMgmtRpcDriver, self).setUp()
db.configure_db()
self.addCleanup(db.clear_db)
cfg.CONF.set_override('rpc_backend',
'tacker.openstack.common.rpc.impl_fake')
self.plugin = mock.Mock()
self.mock_rpc_proxy_cast_p = mock.patch.object(
tacker.openstack.common.rpc.proxy.RpcProxy, 'cast')
self.mock_rpc_proxy_cast = self.mock_rpc_proxy_cast_p.start()
self.mgmt_driver = rpc.AgentRpcMGMTDriver()
def test_rpc_mgmt_call(self):
self.mgmt_driver.mgmt_call(self.plugin, self._CONTEXT,
self._DEVICE, self._KWARGS)
self.mock_rpc_proxy_cast.assert_called_once_with(
self._CONTEXT,
{'args': {'action': 'method-name'},
'namespace': None,
'method': 'action-name'},
topic='device-address')
def test_rpc_mgmt_service_call(self):
self.mgmt_driver.mgmt_service_call(
self.plugin, self._CONTEXT, self._DEVICE, self._SERVICE_INSTANCE,
self._KWARGS)
self.mock_rpc_proxy_cast.assert_called_once_with(
self._CONTEXT,
{'args': {'action': 'method-name'},
'namespace': None,
'method': 'action-name'},
topic='service-instance-address')

View File

@ -0,0 +1,144 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
import uuid
import mock
from oslo.config import cfg
from tacker.api.v1 import attributes
import tacker.openstack.common.rpc.proxy
from tacker.tests import base
from tacker.vm import proxy_api
class TestProxyApi(base.BaseTestCase):
network_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4())
port_id = str(uuid.uuid4())
direction = 'send'
src_target = 'topic=src_topic,server=src_server'
dst_target = 'topic=dst_topic,server=dst_server'
def setUp(self):
super(TestProxyApi, self).setUp()
cfg.CONF.set_override('rpc_backend',
'tacker.openstack.common.rpc.impl_fake')
self.context = object()
self.api = proxy_api.ServiceVMPluginApi('fake-topic')
self.core_plugin = mock.Mock()
self.mock_rpc_proxy_call_p = mock.patch.object(
tacker.openstack.common.rpc.proxy.RpcProxy, 'call')
self.mock_rpc_proxy_call = self.mock_rpc_proxy_call_p.start()
def test_create_namespace_agent(self):
core_plugin = self.core_plugin
subnet = {
'id': self.subnet_id,
}
core_plugin.get_subnet.return_value = subnet
port = {
'id': self.port_id,
'network_id': self.network_id,
'fixed_ips': [
{'subnet_id': self.subnet_id}
]
}
core_plugin.create_port.return_value = port
self.api.create_namespace_agent(core_plugin, self.context,
self.network_id)
self.core_plugin.create_port.assert_called_once_with(
self.context, {'port': {'name': mock.ANY,
'admin_state_up': True,
'network_id': self.network_id,
'device_owner': 'tacker:SERVICEVM',
'mac_address': mock.ANY,
'device_id': mock.ANY,
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
}})
self.mock_rpc_proxy_call.assert_called_once_with(
self.context,
{'args': {'port': {
'id': self.port_id,
'network_id': self.network_id,
'fixed_ips': [{
'subnet_id': self.subnet_id,
'subnet': {'id': self.subnet_id}}]}},
'namespace': None,
'method': 'create_namespace_agent'})
def test_destroy_namespace_agent(self):
self.api.destroy_namespace_agent(self.core_plugin, self.context,
self.port_id)
self.mock_rpc_proxy_call.assert_called_once_with(
self.context, {'args': {'port_id': self.port_id},
'namespace': None,
'method': 'destroy_namespace_agent'})
self.core_plugin.delete_port.assert_called_once_with(self.context,
self.port_id)
def test_creeat_rpc_proxy(self):
self.api.create_rpc_proxy(
self.context, self.port_id, self.src_target, self.dst_target,
self.direction)
self.mock_rpc_proxy_call.assert_called_once_with(
self.context, {'args': {'port_id': self.port_id,
'src_target': self.src_target,
'dst_unix_target': self.dst_target,
'direction': self.direction},
'namespace': None,
'method': 'create_rpc_proxy'})
def test_destroy_rpc_proxy(self):
proxy_id = str(uuid.uuid4())
rpc_proxy_id = str(uuid.uuid4())
self.api.destroy_rpc_proxy(self.context, proxy_id, rpc_proxy_id)
self.mock_rpc_proxy_call.assert_called_once_with(
self.context, {'args': {'proxy_id': proxy_id,
'rpc_proxy_id': rpc_proxy_id},
'namespace': None,
'method': 'destroy_rpc_proxy'})
def test_create_rpc_namespace_proxy(self):
dst_transport_url = 'fake:///'
direction = 'send'
self.api.create_rpc_namespace_proxy(
self.context, self.port_id, self.src_target,
dst_transport_url, self.dst_target, direction)
self.mock_rpc_proxy_call.assert_called_once_with(
self.context, {'args': {'dst_transport_url': dst_transport_url,
'direction': direction,
'port_id': self.port_id,
'src_target': self.src_target,
'dst_target': self.dst_target},
'namespace': None,
'method': 'create_rpc_namespace_proxy'})
def test_destroy_rpc_namespace_proxy(self):
ns_proxy_id = str(uuid.uuid4())
self.api.destroy_rpc_namespace_proxy(self.context, self.port_id,
ns_proxy_id)
self.mock_rpc_proxy_call.assert_called_once_with(
self.context, {'args': {'port_id': self.port_id,
'namespace_proxy_id': ns_proxy_id},
'namespace': None,
'method': 'destroy_rpc_namespace_proxy'})

View File

View File

@ -0,0 +1,34 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
from oslo.config import cfg
from tacker.common import topics
_RPC_AGENT_OPTS = [
cfg.StrOpt('device_id', default=None, help=_('The device id')),
cfg.StrOpt('topic', default=topics.SERVICEVM_AGENT,
help=_('rpc topic for agent to subscribe')),
]
def register_servicevm_agent_opts(conf):
conf.register_opts(_RPC_AGENT_OPTS, group='servicevm_agent')

View File

@ -0,0 +1,181 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
from oslo.config import cfg
from tacker.db.vm import proxy_db
from tacker.openstack.common import log as logging
from tacker.vm import constants
from tacker.vm.mgmt_drivers.rpc import rpc
LOG = logging.getLogger(__name__)
class AgentRpcProxyMGMTDriver(rpc.AgentRpcMGMTDriver):
_TRANSPORT_OPTS = [
cfg.StrOpt('dst_transport_url',
# TODO(yamahata): make user, pass, port configurable
# per servicevm
#'<scheme>://<user>:<pass>@<host>:<port>/'
default='rabbit://guest:guest@%(host)s:5672/',
help='A URL representing the messaging driver '
'to use and its full configuration.'),
]
def __init__(self, conf=None):
super(AgentRpcProxyMGMTDriver, self).__init__()
self.db = proxy_db.RpcProxyDb()
self.conf = conf or cfg.CONF
self.conf.register_opts(self._TRANSPORT_OPTS)
def get_type(self):
return 'agent-proxy'
def get_name(self):
return 'agent-proxy'
def get_description(self):
return 'agent-proxy'
def mgmt_create_post(self, plugin, context, device):
LOG.debug('mgmt_create_post')
mgmt_entries = [sc_entry for sc_entry in device['service_context']
if (sc_entry['role'] == constants.ROLE_MGMT and
sc_entry.get('port_id'))]
assert mgmt_entries
mgmt_entry = mgmt_entries[0]
vm_port_id = mgmt_entry['port_id']
vm_port = plugin._core_plugin.get_port(context, vm_port_id)
fixed_ip = vm_port['fixed_ips'][0]['ip_address']
# TODO(yamahata): make all parameters(scheme, user, pass, port)
# configurable
dst_transport_url = self.conf.dst_transport_url % {'host': fixed_ip}
network_id = mgmt_entry['network_id']
assert network_id
proxy_api = plugin.proxy_api
port_id = proxy_api.create_namespace_agent(plugin._core_plugin,
context, network_id)
device_id = device['id']
target = 'topic=%s,server=%s' % (self._mgmt_topic(device),
self._mgmt_server(device))
svr_proxy_id = proxy_api.create_rpc_proxy(
context, port_id, target, target, 'receive')
LOG.debug('mgmt_create_post: svr_proxy_id: %s', svr_proxy_id)
svr_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
context, port_id, target, dst_transport_url, target, 'receive')
LOG.debug('mgmt_create_post: svr_ns_proxy_id: %s', svr_ns_proxy_id)
clt_proxy_id = proxy_api.create_rpc_proxy(
context, port_id, target, target, 'send')
LOG.debug('mgmt_create_post: clt_proxy_id: %s', clt_proxy_id)
clt_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
context, port_id, target, dst_transport_url, target, 'send')
LOG.debug('mgmt_create_post: clt_ns_proxy_id: %s', clt_ns_proxy_id)
LOG.debug('mgmt_create_ppost: '
'svr: %s svr_ns: %s clt: %s clt_ns: %s ',
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
self.db.create_proxy_mgmt_port(
context, device_id, port_id, dst_transport_url,
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
def mgmt_delete_post(self, plugin, context, device):
LOG.debug('mgmt_delete_post')
device_id = device['id']
proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device_id)
port_id = proxy_mgmt_port['port_id']
svr_proxy_id = proxy_mgmt_port['svr_proxy_id']
svr_ns_proxy_id = proxy_mgmt_port['svr_ns_proxy_id']
clt_proxy_id = proxy_mgmt_port['clt_proxy_id']
clt_ns_proxy_id = proxy_mgmt_port['clt_ns_proxy_id']
proxy_api = plugin.proxy_api
proxy_api.destroy_rpc_namespace_proxy(context,
port_id, clt_ns_proxy_id)
proxy_api.destroy_rpc_proxy(context, port_id, clt_proxy_id)
proxy_api.destroy_rpc_namespace_proxy(context,
port_id, svr_ns_proxy_id)
proxy_api.destroy_rpc_proxy(context, port_id, svr_proxy_id)
proxy_api.destroy_namespace_agent(plugin._core_plugin,
context, port_id)
self.db.delete_proxy_mgmt_port(context, port_id)
def mgmt_service_create_pre(self, plugin, context, device,
service_instance):
LOG.debug('mgmt_service_create_pre')
proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device['id'])
port_id = proxy_mgmt_port['port_id']
dst_transport_url = proxy_mgmt_port['dst_transport_url']
proxy_api = plugin.proxy_api
target = 'topic=%s,server=%s' % (
self._mgmt_service_topic(device, service_instance),
self._mgmt_service_server(device, service_instance))
svr_proxy_id = proxy_api.create_rpc_proxy(
context, port_id, target, target, 'receive')
LOG.debug('mgmt_service_create_pre: svr_proxy_id: %s', svr_proxy_id)
svr_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
context, port_id, target, dst_transport_url, target, 'receive')
LOG.debug('mgmt_service_create_pre: svr_ns_proxy_id: %s',
svr_ns_proxy_id)
clt_proxy_id = proxy_api.create_rpc_proxy(
context, port_id, target, target, 'send')
LOG.debug('mgmt_service_create_pre: clt_proxy_id: %s', clt_proxy_id)
clt_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
context, port_id, target, dst_transport_url, target, 'send')
LOG.debug('mgmt_service_create_pre: clt_ns_proxy_id: %s',
clt_ns_proxy_id)
LOG.debug('mgmt_service_create_pre: '
'svr: %s svr_ns: %s clt: %s clt_ns: %s ',
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
self.db.create_proxy_service_port(
context, service_instance['id'],
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
def mgmt_service_delete_post(self, plugin, context, device,
service_instance):
LOG.debug('mgmt_service_delete_post')
proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device['id'])
port_id = proxy_mgmt_port['port_id']
service_instance_id = service_instance['id']
proxy_service_port = self.db.get_proxy_service_port(
context, service_instance_id)
svr_proxy_id = proxy_service_port['svr_proxy_id']
svr_ns_proxy_id = proxy_service_port['svr_ns_proxy_id']
clt_proxy_id = proxy_service_port['clt_proxy_id']
clt_ns_proxy_id = proxy_service_port['clt_ns_proxy_id']
proxy_api = plugin.proxy_api
proxy_api.destroy_rpc_namespace_proxy(context,
port_id, clt_ns_proxy_id)
proxy_api.destroy_rpc_proxy(context, port_id, clt_proxy_id)
proxy_api.destroy_rpc_namespace_proxy(context,
port_id, svr_ns_proxy_id)
proxy_api.destroy_rpc_proxy(context, port_id, svr_proxy_id)
self.db.delete_proxy_service_port(context, service_instance_id)

View File

@ -0,0 +1,107 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
from tacker.common import topics
from tacker.common import rpc_compat
from tacker.vm.mgmt_drivers import abstract_driver
from tacker.vm.mgmt_drivers import constants
class ServiceVMAgentRpcApi(rpc_compat.RpcProxy):
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic=topics.SERVICEVM_AGENT):
super(ServiceVMAgentRpcApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def rpc_cast(self, context, method, kwargs, topic):
self.cast(context, self.make_msg(method, **kwargs), topic=topic)
# TODO(yamahata): port this to oslo.messaging
# address format needs be changed to
# oslo.messaging.target.Target
class AgentRpcMGMTDriver(abstract_driver.DeviceMGMTAbstractDriver):
_TOPIC = topics.SERVICEVM_AGENT # can be overridden by subclass
_RPC_API = {} # topic -> ServiceVMAgentRpcApi
@property
def _rpc_api(self):
topic = self._TOPIC
api = self._RPC_API.get(topic)
if api is None:
api = ServiceVMAgentRpcApi(topic=topic)
api = self._RPC_API.setdefault(topic, api)
return api
def get_type(self):
return 'agent-rpc'
def get_name(self):
return 'agent-rpc'
def get_description(self):
return 'agent-rpc'
def mgmt_get_config(self, plugin, context, device):
return {'/etc/tacker/servicevm-agent.ini':
'[servicevm]\n'
'topic = %s\n'
'device_id = %s\n'
% (self._TOPIC, device['id'])}
@staticmethod
def _address(topic, server):
return '%s.%s' % (topic, server)
def _mgmt_server(self, device):
return device['id']
def _mgmt_topic(self, device):
return '%s-%s' % (self._TOPIC, self._mgmt_server(device))
def mgmt_address(self, plugin, context, device):
return self._address(self._mgmt_topic(device),
self._mgmt_server(device))
def mgmt_call(self, plugin, context, device, kwargs):
topic = device['mgmt_address']
method = kwargs[constants.KEY_ACTION]
kwargs_ = kwargs[constants.KEY_KWARGS]
self._rpc_api.rpc_cast(context, method, kwargs_, topic)
def _mgmt_service_server(self, device, service_instance):
return '%s-%s' % (device['id'], service_instance['id'])
def _mgmt_service_topic(self, device, service_instance):
return '%s-%s' % (self._TOPIC,
self._mgmt_service_server(device, service_instance))
def mgmt_service_address(self, plugin, context, device, service_instance):
return self._address(
self._mgmt_service_topic(device, service_instance),
self._mgmt_service_server(device, service_instance))
def mgmt_service_call(self, plugin, context, device,
service_instance, kwargs):
method = kwargs[constants.KEY_ACTION]
kwargs_ = kwargs[constants.KEY_KWARGS]
topic = service_instance['mgmt_address']
self._rpc_api.rpc_cast(context, method, kwargs_, topic)

116
tacker/vm/proxy_api.py Normal file
View File

@ -0,0 +1,116 @@
# Copyright 2013, 2014 Intel Corporation.
# Copyright 2013, 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
import inspect
from tacker.api.v1 import attributes
from tacker.common import rpc_compat
from tacker.openstack.common import excutils
from tacker.openstack.common import log as logging
from tacker.plugins.common import constants
LOG = logging.getLogger(__name__)
# TODO(yamahata): convert oslo.messaging
class ServiceVMPluginApi(rpc_compat.RpcProxy):
API_VERSION = '1.0'
def __init__(self, topic):
super(ServiceVMPluginApi, self).__init__(topic, self.API_VERSION)
def _call(self, context, **kwargs):
method = inspect.stack()[1][3]
LOG.debug('ServiceVMPluginApi method = %s kwargs = %s', method, kwargs)
return self.call(context, self.make_msg(method, **kwargs))
def create_namespace_agent(self, core_plugin, context, network_id):
"""
:param dst_transport_url: st
:type dst_transport_url: str that represents
oslo.messaging.transportURL
"""
port_data = {
'name': '_svcvm-rpc-namespace-agent-' + network_id,
'network_id': network_id,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'admin_state_up': True,
'device_id': '_svcvm-rpc-proxy-' + network_id,
'device_owner': 'tacker:' + constants.SERVICEVM,
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
}
port = core_plugin.create_port(context, {'port': port_data})
for i in xrange(len(port['fixed_ips'])):
ipallocation = port['fixed_ips'][i]
subnet_id = ipallocation['subnet_id']
subnet = core_plugin.get_subnet(context, subnet_id)
ipallocation['subnet'] = subnet
port_id = port['id']
try:
self._call(context, port=port)
except Exception:
with excutils.save_and_reraise_exception():
core_plugin.delete_port(context, port_id)
return port_id
def destroy_namespace_agent(self, core_plugin, context, port_id):
self._call(context, port_id=port_id)
core_plugin.delete_port(context, port_id)
def create_rpc_proxy(self, context, port_id, src_target, dst_unix_target,
direction):
"""
:param src_target: target to listen/send
:type src_target: oslo.messaging.Target
:param dst_unix_target: target to send/listen
:type dst_unix_target: oslo.messaging.Target
:param direction: RPC direction
:type direction: str 'send' or 'receive'
'send': tacker server -> agent
'receive': neturon server <- agent
"""
return self._call(context, port_id=port_id, src_target=src_target,
dst_unix_target=dst_unix_target, direction=direction)
def destroy_rpc_proxy(self, context, port_id, rpc_proxy_id):
return self._call(context, proxy_id=port_id, rpc_proxy_id=rpc_proxy_id)
def create_rpc_namespace_proxy(self, context, port_id, src_target,
dst_transport_url, dst_target, direction):
"""
:param src_target: target to listen/send
:type src_target: oslo.messaging.Target
:param dst_target: target to send/listen
:type dst_target: oslo.messaging.Target
:param direction: RPC direction
:type direction: str 'send' or 'receive'
'send': tacker server -> agent
'receive': neturon server <- agent
"""
return self._call(context, port_id=port_id,
src_target=src_target,
dst_transport_url=dst_transport_url,
dst_target=dst_target, direction=direction)
def destroy_rpc_namespace_proxy(self, context, port_id,
namespace_proxy_id):
return self._call(context, port_id=port_id,
namespace_proxy_id=namespace_proxy_id)