Remove unused tacker agent code

Remove code related to servicevm configuration & monitoring using
proxy_api and rpc. Tacker no longer uses these mechanisms to
configure and monitor VNFs.

Change-Id: I9b7c8b87bb5042551459a5d176f094504be09a91
This commit is contained in:
Sridhar Ramaswamy 2015-08-20 23:32:55 +00:00
parent 0b422aa7ad
commit d88ab88a08
18 changed files with 0 additions and 2162 deletions

View File

@ -1,296 +0,0 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
import uuid
import mock
import oslo.messaging.rpc.client
from oslo.messaging import target
from oslo.messaging import transport
from oslo_config import cfg
import tacker.agent.linux.ip_lib
from tacker import context
from tacker.tests import base
from tacker.vm.agent import agent
class TestVMService(base.BaseTestCase):
ctxt = context.Context('user', 'tenant')
network_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4())
port_id = str(uuid.uuid4())
mac_address = '00:00:00:00:00:01'
netmask = '/24'
network_address = '192.168.1.0'
cidr = network_address + netmask
ip_address = '192.168.1.3'
gw_address = '192.168.1.1'
port = {
'id': port_id,
'network_id': network_id,
'mac_address': mac_address,
'fixed_ips': [{'subnet_id': subnet_id,
'ip_address': ip_address,
'subnet': {
'cidr': cidr,
'ip_version': 4,
'gateway_ip': gw_address}}]
}
def setUp(self):
super(TestVMService, self).setUp()
conf = cfg.CONF
# NOTE(yamahata): work around. rpc driver-dependent config variables
# remove this line once tacker are fully ported to oslo.messaging
from tacker.openstack.common import rpc
conf.unregister_opts(rpc.rpc_opts)
conf.register_opts(transport._transport_opts)
conf.set_override('rpc_backend',
'tacker.openstack.common.rpc.impl_fake')
self.addCleanup(mock.patch.stopall)
self.mock_get_transport_p = mock.patch('oslo.messaging.get_transport')
self.mock_get_transport = self.mock_get_transport_p.start()
self.mock_import_object_p = mock.patch(
'tacker.openstack.common.importutils.import_object')
self.mock_import_object = self.mock_import_object_p.start()
self.vif_driver = mock.create_autospec(
tacker.agent.linux.interface.NullDriver)
self.mock_import_object.return_value = self.vif_driver
self.agent = agent.ServiceVMAgent('host', conf=conf)
self.mock_process_manager_p = mock.patch.object(
tacker.agent.linux.external_process, 'ProcessManager')
self.mock_process_manager = self.mock_process_manager_p.start()
self.mock_process_manager_instance = (
self.mock_process_manager.return_value)
self.mock_device_exists_p = mock.patch(
'tacker.agent.linux.ip_lib.device_exists')
self.mock_device_exists = self.mock_device_exists_p.start()
self.mock_device_exists.return_value = False
self.mock_ipwrapper_p = mock.patch.object(tacker.agent.linux.ip_lib,
'IPWrapper')
self.mock_ipwrapper = self.mock_ipwrapper_p.start()
self.mock_ipwrapper_instance = self.mock_ipwrapper.return_value
self.mock_rpc_client_p = mock.patch.object(oslo.messaging.rpc,
'RPCClient')
self.mock_rpc_client = self.mock_rpc_client_p.start()
self.mock_rpc_client_instance = self.mock_rpc_client.return_value
def test_create_destroy_namespace_agent(self):
self.agent.create_namespace_agent(self.ctxt, self.port)
self.vif_driver.plug.assert_called_once_with(
self.network_id, self.port_id, mock.ANY, self.mac_address,
namespace=mock.ANY)
self.vif_driver.init_l3.assert_called_once_with(
mock.ANY, [self.ip_address + self.netmask], namespace=mock.ANY)
self.mock_ipwrapper.assert_called_once_with(
mock.ANY, namespace=mock.ANY)
self.mock_ipwrapper_instance.netns.execute.assert_called_once_with(
['route', 'add', 'default', 'gw', self.gw_address],
check_exit_code=False)
self.assertTrue(self.mock_process_manager_instance.enable.called)
self.agent.destroy_namespace_agent(self.ctxt, self.port_id)
self.vif_driver.unplug.assert_called_once_with(mock.ANY,
namespace=mock.ANY)
self.mock_process_manager_instance.disable.assert_called_once_with()
def test_create_rpc_proxy_wrong_port_id(self):
func = lambda: self.agent.create_rpc_proxy(
self.ctxt, self.port_id, 'topic=src_topic,server=src_server',
'topic=dst_topic,server=dst_server', 'wrong-direction')
self.assertRaises(RuntimeError, func)
def test_create_rpc_proxy_wrong_direction(self):
self.agent.create_namespace_agent(self.ctxt, self.port)
func = lambda: self.agent.create_rpc_proxy(
self.ctxt, self.port_id, 'topic=src_topic,server=src_server',
'topic=dst_topic,server=dst_server', 'wrong-direction')
self.assertRaises(RuntimeError, func)
self.agent.destroy_namespace_agent(self.ctxt, self.port_id)
def test_create_destroy_rpc_proxy_send(self):
self.agent.create_namespace_agent(self.ctxt, self.port)
with mock.patch('oslo.messaging.proxy.get_proxy_server'
) as mock_get_proxy_server:
mock_transport = self.mock_get_transport.return_value
mock_instance = mock_get_proxy_server.return_value
proxy_id = self.agent.create_rpc_proxy(
self.ctxt, self.port_id, 'topic=src_topic,server=src_server',
'topic=dst_topic,server=dst_server', 'send')
src_target = target.Target(topic='src_topic', server='src_server')
dst_target = target.Target(topic='dst_topic', server='dst_server')
mock_get_proxy_server.assert_called_once_with(
mock_transport, src_target, None,
mock_transport, dst_target, None, executor=mock.ANY)
mock_instance.start.assert_called_once_with()
self.agent.destroy_rpc_proxy(self.ctxt, self.port_id, proxy_id)
mock_instance.stop.assert_called_once_with()
mock_instance.wait.assert_called_once_with()
self.agent.destroy_namespace_agent(self.ctxt, self.port_id)
def test_create_destroy_rpc_proxy_receive(self):
self.agent.create_namespace_agent(self.ctxt, self.port)
with mock.patch('oslo.messaging.proxy.get_proxy_server'
) as mock_get_proxy_server:
mock_transport = self.mock_get_transport.return_value
mock_instance = mock_get_proxy_server.return_value
proxy_id = self.agent.create_rpc_proxy(
self.ctxt, self.port_id, 'topic=src_topic,server=src_server',
'topic=dst_topic,server=dst_server', 'receive')
src_target = target.Target(topic='src_topic', server='src_server')
dst_target = target.Target(topic='dst_topic', server='dst_server')
mock_get_proxy_server.assert_called_once_with(
mock_transport, dst_target, None,
mock_transport, src_target, None, executor=mock.ANY)
mock_instance.start.assert_called_once_with()
self.agent.destroy_rpc_proxy(self.ctxt, self.port_id, proxy_id)
mock_instance.stop.assert_called_once_with()
mock_instance.wait.assert_called_once_with()
self.agent.destroy_namespace_agent(self.ctxt, self.port_id)
def test_create_destroy_rpc_proxy(self):
self.mock_device_exists.return_value = False
self.agent.create_namespace_agent(self.ctxt, self.port)
with mock.patch('oslo.messaging.proxy.get_proxy_server'
) as mock_get_proxy_server:
mock_transport = self.mock_get_transport.return_value
proxy_id_send = self.agent.create_rpc_proxy(
self.ctxt, self.port_id,
'topic=src_topic_send,server=src_server_send',
'topic=dst_topic_send,server=dst_server_send', 'send')
src_target_send = target.Target(topic='src_topic_send',
server='src_server_send')
dst_target_send = target.Target(topic='dst_topic_send',
server='dst_server_send')
self.agent.create_rpc_proxy(
self.ctxt, self.port_id,
'topic=src_topic_receive,server=src_server_receive',
'topic=dst_topic_receive,server=dst_server_receive', 'receive')
src_target_recv = target.Target(topic='src_topic_receive',
server='src_server_receive')
dst_target_recv = target.Target(topic='dst_topic_receive',
server='dst_server_receive')
self.agent.destroy_rpc_proxy(self.ctxt,
self.port_id, proxy_id_send)
self.agent.destroy_namespace_agent(self.ctxt, self.port_id)
mock_get_proxy_server.assert_has_calls([
mock.call(mock_transport, src_target_send, None,
mock_transport, dst_target_send, None,
executor=mock.ANY),
mock.call().start(),
mock.call(mock_transport, dst_target_recv, None,
mock_transport, src_target_recv, None,
executor=mock.ANY),
mock.call().start(),
mock.call().stop(), mock.call().wait(),
mock.call().stop(), mock.call().wait()])
def _test_create_destroy_rpc_namespace_proxy_direction(self, direction):
self.agent.create_namespace_agent(self.ctxt, self.port)
src_target = 'topic=src_topic,server=src_server'
dst_transport_url = 'rabbit://guest:guest@host:5672'
dst_target = 'topic=dst_topic,server=dst_server'
ns_proxy_id = self.agent.create_rpc_namespace_proxy(
self.ctxt, self.port_id, src_target,
dst_transport_url, dst_target, direction)
kwargs = {
'src_target': src_target,
'dst_transport_url': dst_transport_url,
'dst_target': dst_target,
'direction': direction,
}
self.mock_rpc_client_instance.call.assert_called_once_with(
{}, 'create_rpc_namespace_proxy', **kwargs)
self.agent.destroy_rpc_namespace_proxy(self.ctxt,
self.port_id, ns_proxy_id)
self.mock_rpc_client_instance.call.assert_has_calls([
mock.call({}, 'create_rpc_namespace_proxy', **kwargs),
mock.call({}, 'destroy_rpc_namespace_proxy',
namespace_proxy_id=ns_proxy_id)])
self.agent.destroy_namespace_agent(self.ctxt, self.port_id)
def test_create_destroy_rpc_namespace_proxy_send(self):
self._test_create_destroy_rpc_namespace_proxy_direction('send')
def test_create_destroy_rpc_namespace_proxy_receive(self):
self._test_create_destroy_rpc_namespace_proxy_direction('receive')
def test_create_destroy_rpc_namespace_proxy(self):
self.agent.create_namespace_agent(self.ctxt, self.port)
src_target_send = 'topic=src_topic_send,server=src_server_send'
dst_transport_url_send = 'rabbit://guestsend:guestsend@sendhost:5672'
dst_target_send = 'topic=dst_topic_send,server=dst_server_send'
direction_send = 'send'
ns_proxy_id_send = self.agent.create_rpc_namespace_proxy(
self.ctxt, self.port_id, src_target_send,
dst_transport_url_send, dst_target_send, direction_send)
kwargs_send = {
'src_target': src_target_send,
'dst_transport_url': dst_transport_url_send,
'dst_target': dst_target_send,
'direction': direction_send,
}
src_target_recv = 'topic=src_topic_recv,server=src_server_recv'
dst_transport_url_recv = 'rabbit://guestrecv:guestrecv@recvhost:5672'
dst_target_recv = 'topic=dst_topic_recv,server=dst_server_recv'
direction_recv = 'receive'
self.agent.create_rpc_namespace_proxy(
self.ctxt, self.port_id, src_target_recv,
dst_transport_url_recv, dst_target_recv, direction_recv)
kwargs_recv = {
'src_target': src_target_recv,
'dst_transport_url': dst_transport_url_recv,
'dst_target': dst_target_recv,
'direction': direction_recv,
}
self.agent.destroy_rpc_namespace_proxy(self.ctxt,
self.port_id, ns_proxy_id_send)
self.agent.destroy_namespace_agent(self.ctxt, self.port_id)
self.mock_rpc_client_instance.call.assert_has_calls([
mock.call({}, 'create_rpc_namespace_proxy', **kwargs_send),
mock.call({}, 'create_rpc_namespace_proxy', **kwargs_recv),
mock.call({}, 'destroy_rpc_namespace_proxy',
namespace_proxy_id=ns_proxy_id_send),
mock.call({}, 'destroy_namespace_agent')])

View File

@ -1,173 +0,0 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
import mock
from oslo_config import cfg
from tacker import context
from tacker.tests import base
from tacker.vm.agent import namespace_proxy
from tacker.vm.agent import target
class TestNamespaceAgent(base.BaseTestCase):
def setUp(self):
super(TestNamespaceAgent, self).setUp()
self.addCleanup(mock.patch.stopall)
self.mock_get_transport_p = mock.patch('oslo.messaging.get_transport')
self.mock_get_transport = self.mock_get_transport_p.start()
self.mock_transport = self.mock_get_transport.return_value
def server_stop():
pass
self.agent = namespace_proxy.ServiceVMNamespaceAgent(
'host', conf=cfg.CONF, src_transport=self.mock_transport,
server_stop=server_stop)
def test_start_stop_wait(self):
self.agent.init_host()
self.agent.after_start()
self.agent.stop()
self.agent.wait()
def test_desstroy_namespace_agent(self):
self.agent.init_host()
self.agent.after_start()
ctxt = context.Context('user', 'tenant')
self.agent.destroy_namespace_agent(ctxt)
self.agent.wait()
def test_create_destroy_rpc_namespace_proxy_send(self):
self.agent.init_host()
self.agent.after_start()
ctxt = context.Context('user', 'tenant')
with mock.patch('oslo.messaging.proxy.get_proxy_server'
) as mock_get_proxy_server:
mock_proxy_server = mock_get_proxy_server.return_value
src_unix_target = 'topic=src_topic,server=src_server'
dst_transport_url = 'fake:///'
dst_target = 'topic=dst_topic,server=dst_server'
ns_proxy_id = self.agent.create_rpc_namespace_proxy(
ctxt, src_unix_target, dst_transport_url, dst_target, 'send')
src_unix_target = target.target_parse(src_unix_target)
dst_target = target.target_parse(dst_target)
mock_get_proxy_server.assert_called_once_with(
self.mock_transport, src_unix_target, None,
self.mock_transport, dst_target, None, executor=mock.ANY)
# mock_proxy_server.start.assert_called_once_with()
self.agent.destroy_rpc_namespace_proxy(ctxt, ns_proxy_id)
mock_proxy_server.stop.assert_called_once_with()
mock_proxy_server.wait.assert_called_once_with()
self.mock_transport.cleanup.assert_called_once_with()
self.agent.stop()
self.agent.wait()
def test_create_destroy_rpc_namespace_proxy_receive(self):
self.agent.init_host()
self.agent.after_start()
ctxt = context.Context('user', 'tenant')
with mock.patch('oslo.messaging.proxy.get_proxy_server'
) as mock_get_proxy_server:
mock_proxy_server = mock_get_proxy_server.return_value
src_unix_target = 'topic=src_topic,server=src_server'
dst_transport_url = 'fake:///'
dst_target = 'topic=dst_topic,server=dst_server'
ns_proxy_id = self.agent.create_rpc_namespace_proxy(
ctxt, src_unix_target, dst_transport_url, dst_target,
'receive')
src_unix_target = target.target_parse(src_unix_target)
dst_target = target.target_parse(dst_target)
mock_get_proxy_server.assert_called_once_with(
self.mock_transport, dst_target, None,
self.mock_transport, src_unix_target, None, executor=mock.ANY)
# mock_proxy_server.start.assert_called_once_with()
self.agent.destroy_rpc_namespace_proxy(ctxt, ns_proxy_id)
mock_proxy_server.stop.assert_called_once_with()
mock_proxy_server.wait.assert_called_once_with()
self.mock_transport.cleanup.assert_called_once_with()
self.agent.stop()
self.agent.wait()
def test_create_destroy_rpc_namespace_proxy(self):
self.agent.init_host()
self.agent.after_start()
ctxt = context.Context('user', 'tenant')
with mock.patch('oslo.messaging.proxy.get_proxy_server'
) as mock_get_proxy_server:
mock_proxy_server = mock_get_proxy_server.return_value
src_unix_target_send = ('topic=src_topic_send,'
'server=src_server_send')
dst_transport_url_send = 'fake:///'
dst_target_send = 'topic=dst_topic_send,server=dst_server_send'
ns_proxy_id_send = self.agent.create_rpc_namespace_proxy(
ctxt, src_unix_target_send, dst_transport_url_send,
dst_target_send, 'send')
src_unix_target_send = target.target_parse(src_unix_target_send)
dst_target_send = target.target_parse(dst_target_send)
mock_get_proxy_server.assert_called_once_with(
self.mock_transport, src_unix_target_send, None,
self.mock_transport, dst_target_send, None, executor=mock.ANY)
# mock_proxy_server.start.assert_called_once_with()
src_unix_target_recv = ('topic=src_topic_recv,'
'server=src_server_recv')
dst_transport_url_recv = 'fake:///'
dst_target_recv = 'topic=dst_topic_recv,server=dst_server_recv'
self.agent.create_rpc_namespace_proxy(
ctxt, src_unix_target_recv, dst_transport_url_recv,
dst_target_recv, 'receive')
src_unix_target_recv = target.target_parse(src_unix_target_recv)
dst_target_recv = target.target_parse(dst_target_recv)
# mock.call().__hash__()/mock.call.__hash__() doesn't work
# due to __getattr__. So create it manually
call_hash = mock._Call(name='().__hash__')
mock_get_proxy_server.assert_has_calls([
mock.call(self.mock_transport, src_unix_target_send, None,
self.mock_transport, dst_target_send, None,
executor=mock.ANY),
# mock.call().start(),
call_hash(),
mock.call(self.mock_transport, dst_target_recv, None,
self.mock_transport, src_unix_target_recv, None,
executor=mock.ANY),
# mock.call().start(),
call_hash()])
self.agent.destroy_rpc_namespace_proxy(ctxt, ns_proxy_id_send)
mock_proxy_server.stop.assert_called_once_with()
mock_proxy_server.wait.assert_called_once_with()
self.agent.destroy_namespace_agent(ctxt)
self.agent.wait()
call_hash = mock._Call(name='__hash__')
mock_proxy_server.assert_has_calls([
# mock.call.start(),
call_hash(),
# mock.call.start(),
call_hash(),
call_hash(),
mock.call.stop(),
mock.call.wait(),
mock.call.stop(),
mock.call.wait()])
self.mock_transport.cleanup.assert_called_once_with()

View File

@ -1,39 +0,0 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
from oslo.messaging import target
from tacker.tests import base
from tacker.vm.agent import target as agent_target
class TestTarget(base.BaseTestCase):
target_str = ('exchange=default,topic=topic,namespace=namespace,'
'version=version,server=server,fanout=False')
target_instance = target.Target('default', 'topic', 'namespace', 'version',
'server', False)
def test_parse(self):
t = agent_target.target_parse(self.target_str)
self.assertEqual(t, self.target_instance)
def test_str(self):
t = agent_target.target_str(self.target_instance)
self.assertEqual(t, self.target_str)

View File

@ -1,201 +0,0 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
import uuid
import mock
from oslo_config import cfg
from tacker.common import topics
from tacker import context
from tacker.db.vm import proxy_db # noqa
from tacker.tests.unit.services.vm.mgmt_drivers import test_rpc
from tacker.vm.mgmt_drivers.rpc import proxy
from tacker.vm import proxy_api
_uuid = lambda: str(uuid.uuid4())
class TestMgmtProxyDriver(test_rpc.TestMgmtRpcDriver):
_TENANT_ID = _uuid()
_CONTEXT = context.Context('', _TENANT_ID)
_NETWORK_ID = _uuid(),
_SUBNET_ID = _uuid()
_DEVICE_PORT_ID = _uuid()
_DEVICE = {
'id': _uuid(),
'mgmt_address': 'device-address',
'service_context': [{
'network_id': _NETWORK_ID,
'port_id': _DEVICE_PORT_ID,
'role': 'mgmt',
}],
}
_PORT_ID = _uuid()
_IP_ADDRESS = '1.1.1.1'
_PORT = {
'id': _PORT_ID,
'fixed_ips': [{
'subnet_id': _SUBNET_ID,
'ip_address': _IP_ADDRESS,
}],
}
_PROXY_ID = [_uuid(), _uuid(), _uuid(), _uuid()]
_NS_PROXY_ID = [_uuid(), _uuid(), _uuid(), _uuid()]
def setUp(self):
super(TestMgmtProxyDriver, self).setUp()
self.mock_proxy_api_p = mock.patch(
'tacker.vm.proxy_api.ServiceVMPluginApi')
self.mock_proxy_api = self.mock_proxy_api_p.start()
mock_proxy_api_instance = mock.Mock()
self.mock_proxy_api.return_value = mock_proxy_api_instance
self.mock_proxy_api_instance = mock_proxy_api_instance
mock_proxy_api_instance.create_namespace_agent.return_value = (
self._PORT_ID)
mock_proxy_api_instance.create_rpc_proxy = mock.Mock(
side_effect=self._PROXY_ID)
mock_proxy_api_instance.create_rpc_namespace_proxy = mock.Mock(
side_effect=self._NS_PROXY_ID)
self.plugin.proxy_api = proxy_api.ServiceVMPluginApi('fake-topic')
self.plugin._core_plugin.get_port.return_value = self._PORT
self.mgmt_driver = proxy.AgentRpcProxyMGMTDriver()
def test_mgmt_create_post_delete_post(self):
mgmt_driver = self.mgmt_driver
mock_instance = self.mock_proxy_api_instance
mgmt_driver.mgmt_create_post(self.plugin, self._CONTEXT, self._DEVICE)
mock_instance.create_namespace_agent.assert_called_once_with(
self.plugin._core_plugin, self._CONTEXT, self._NETWORK_ID)
target = 'topic=%s,server=%s' % (topics.SERVICEVM_AGENT,
self._DEVICE['id'])
mock_instance.create_rpc_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, target, target, 'receive'),
mock.call(self._CONTEXT, self._PORT_ID, target, target, 'send')])
dst_transport_url = cfg.CONF.dst_transport_url % {'host':
self._IP_ADDRESS}
mock_instance.create_rpc_namespace_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, target, dst_transport_url,
target, 'receive'),
mock.call(self._CONTEXT, self._PORT_ID, target, dst_transport_url,
target, 'send')])
mgmt_address = mgmt_driver.mgmt_address(self.plugin, self._CONTEXT,
self._DEVICE)
self.assertEqual(mgmt_address, '%s.%s' % (topics.SERVICEVM_AGENT,
self._DEVICE['id']))
mgmt_driver.mgmt_call(self.plugin, self._CONTEXT, self._DEVICE,
self._KWARGS)
msg = {'args': self._KWARGS['kwargs'],
'namespace': None,
'method': 'action-name'}
self.mock_rpc_proxy_cast.assert_called_once_with(
self._CONTEXT, msg, topic='device-address')
mgmt_driver.mgmt_delete_post(self.plugin, self._CONTEXT, self._DEVICE)
mock_instance.destroy_rpc_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, self._PROXY_ID[1]),
mock.call(self._CONTEXT, self._PORT_ID, self._PROXY_ID[0])])
mock_instance.destroy_rpc_namespace_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, self._NS_PROXY_ID[1]),
mock.call(self._CONTEXT, self._PORT_ID, self._NS_PROXY_ID[0])])
mock_instance.destroy_namespace_agent.assert_called_once_with(
self.plugin._core_plugin, self._CONTEXT, self._PORT_ID)
def test_mgmt_service_create_pre_delete_post(self):
mgmt_driver = self.mgmt_driver
mock_instance = self.mock_proxy_api_instance
mgmt_driver.mgmt_create_post(self.plugin, self._CONTEXT, self._DEVICE)
mock_instance.create_namespace_agent.assert_called_once_with(
self.plugin._core_plugin, self._CONTEXT, self._NETWORK_ID)
target = 'topic=%s,server=%s' % (topics.SERVICEVM_AGENT,
self._DEVICE['id'])
mock_instance.create_rpc_proxy.assert_has_calls([
mock.call(self._CONTEXT, mock.ANY, target, target, 'receive'),
mock.call(self._CONTEXT, mock.ANY, target, target, 'send')])
dst_transport_url = cfg.CONF.dst_transport_url % {'host':
self._IP_ADDRESS}
mock_instance.create_rpc_namespace_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, target, dst_transport_url,
target, 'receive'),
mock.call(self._CONTEXT, self._PORT_ID, target, dst_transport_url,
target, 'send')])
mock_instance.create_rpc_proxy.reset_mock()
mock_instance.create_rpc_namespace_proxy.reset_mock()
mgmt_driver.mgmt_service_create_pre(
self.plugin, self._CONTEXT, self._DEVICE, self._SERVICE_INSTANCE)
target = 'topic=%s-%s,server=%s' % (
topics.SERVICEVM_AGENT, self._DEVICE['id'],
self._SERVICE_INSTANCE['id'])
mock_instance.create_rpc_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, target, target, 'receive'),
mock.call(self._CONTEXT, self._PORT_ID, target, target, 'send')])
dst_transport_url = cfg.CONF.dst_transport_url % {'host':
self._IP_ADDRESS}
mock_instance.create_rpc_namespace_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, target, dst_transport_url,
target, 'receive'),
mock.call(self._CONTEXT, self._PORT_ID, target, dst_transport_url,
target, 'send')])
mgmt_service_address = mgmt_driver.mgmt_service_address(
self.plugin, self._CONTEXT, self._DEVICE, self._SERVICE_INSTANCE)
self.assertEqual(mgmt_service_address, '%s-%s.%s' % (
topics.SERVICEVM_AGENT, self._DEVICE['id'],
self._SERVICE_INSTANCE['id']))
mgmt_driver.mgmt_service_call(
self.plugin, self._CONTEXT, self._DEVICE, self._SERVICE_INSTANCE,
self._KWARGS)
msg = {'args': self._KWARGS['kwargs'],
'namespace': None,
'method': 'action-name'}
self.mock_rpc_proxy_cast.assert_called_once_with(
self._CONTEXT, msg, topic='service-instance-address')
mgmt_driver.mgmt_service_delete_post(
self.plugin, self._CONTEXT, self._DEVICE, self._SERVICE_INSTANCE)
mock_instance.destroy_rpc_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, self._PROXY_ID[3]),
mock.call(self._CONTEXT, self._PORT_ID, self._PROXY_ID[2])])
mock_instance.destroy_rpc_namespace_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, self._NS_PROXY_ID[3]),
mock.call(self._CONTEXT, self._PORT_ID, self._NS_PROXY_ID[2])])
mock_instance.destroy_rpc_proxy.reset.mock()
mock_instance.destroy_rpc_namespace_proxy.reset_mock()
mgmt_driver.mgmt_delete_post(self.plugin, self._CONTEXT, self._DEVICE)
mock_instance.destroy_rpc_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, self._PROXY_ID[1]),
mock.call(self._CONTEXT, self._PORT_ID, self._PROXY_ID[0])])
mock_instance.destroy_rpc_namespace_proxy.assert_has_calls([
mock.call(self._CONTEXT, self._PORT_ID, self._NS_PROXY_ID[1]),
mock.call(self._CONTEXT, self._PORT_ID, self._NS_PROXY_ID[0])])
mock_instance.destroy_namespace_agent.assert_called_once_with(
self.plugin._core_plugin, self._CONTEXT, self._PORT_ID)

View File

@ -1,84 +0,0 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
import uuid
import mock
from oslo_config import cfg
from tacker.db import api as db
import tacker.openstack.common.rpc.proxy
from tacker.tests import base
from tacker.vm.mgmt_drivers import constants
from tacker.vm.mgmt_drivers.rpc import rpc
_uuid = lambda: str(uuid.uuid4())
class TestMgmtRpcDriver(base.BaseTestCase):
_CONTEXT = {}
_DEVICE = {
'id': _uuid(),
'mgmt_address': 'device-address'
}
_SERVICE_INSTANCE = {
'id': _uuid(),
'mgmt_address': 'service-instance-address',
}
_KWARGS = {
constants.KEY_ACTION: 'action-name',
constants.KEY_KWARGS: {
constants.KEY_ACTION: 'method-name',
}
}
def setUp(self):
super(TestMgmtRpcDriver, self).setUp()
db.configure_db()
self.addCleanup(db.clear_db)
cfg.CONF.set_override('rpc_backend',
'tacker.openstack.common.rpc.impl_fake')
self.plugin = mock.Mock()
self.mock_rpc_proxy_cast_p = mock.patch.object(
tacker.openstack.common.rpc.proxy.RpcProxy, 'cast')
self.mock_rpc_proxy_cast = self.mock_rpc_proxy_cast_p.start()
self.mgmt_driver = rpc.AgentRpcMGMTDriver()
def test_rpc_mgmt_call(self):
self.mgmt_driver.mgmt_call(self.plugin, self._CONTEXT,
self._DEVICE, self._KWARGS)
self.mock_rpc_proxy_cast.assert_called_once_with(
self._CONTEXT,
{'args': {'action': 'method-name'},
'namespace': None,
'method': 'action-name'},
topic='device-address')
def test_rpc_mgmt_service_call(self):
self.mgmt_driver.mgmt_service_call(
self.plugin, self._CONTEXT, self._DEVICE, self._SERVICE_INSTANCE,
self._KWARGS)
self.mock_rpc_proxy_cast.assert_called_once_with(
self._CONTEXT,
{'args': {'action': 'method-name'},
'namespace': None,
'method': 'action-name'},
topic='service-instance-address')

View File

@ -1,144 +0,0 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
import uuid
import mock
from oslo_config import cfg
from tacker.api.v1 import attributes
import tacker.openstack.common.rpc.proxy
from tacker.tests import base
from tacker.vm import proxy_api
class TestProxyApi(base.BaseTestCase):
network_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4())
port_id = str(uuid.uuid4())
direction = 'send'
src_target = 'topic=src_topic,server=src_server'
dst_target = 'topic=dst_topic,server=dst_server'
def setUp(self):
super(TestProxyApi, self).setUp()
cfg.CONF.set_override('rpc_backend',
'tacker.openstack.common.rpc.impl_fake')
self.context = object()
self.api = proxy_api.ServiceVMPluginApi('fake-topic')
self.core_plugin = mock.Mock()
self.mock_rpc_proxy_call_p = mock.patch.object(
tacker.openstack.common.rpc.proxy.RpcProxy, 'call')
self.mock_rpc_proxy_call = self.mock_rpc_proxy_call_p.start()
def test_create_namespace_agent(self):
core_plugin = self.core_plugin
subnet = {
'id': self.subnet_id,
}
core_plugin.get_subnet.return_value = subnet
port = {
'id': self.port_id,
'network_id': self.network_id,
'fixed_ips': [
{'subnet_id': self.subnet_id}
]
}
core_plugin.create_port.return_value = port
self.api.create_namespace_agent(core_plugin, self.context,
self.network_id)
self.core_plugin.create_port.assert_called_once_with(
self.context, {'port': {'name': mock.ANY,
'admin_state_up': True,
'network_id': self.network_id,
'device_owner': 'tacker:SERVICEVM',
'mac_address': mock.ANY,
'device_id': mock.ANY,
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
}})
self.mock_rpc_proxy_call.assert_called_once_with(
self.context,
{'args': {'port': {
'id': self.port_id,
'network_id': self.network_id,
'fixed_ips': [{
'subnet_id': self.subnet_id,
'subnet': {'id': self.subnet_id}}]}},
'namespace': None,
'method': 'create_namespace_agent'})
def test_destroy_namespace_agent(self):
self.api.destroy_namespace_agent(self.core_plugin, self.context,
self.port_id)
self.mock_rpc_proxy_call.assert_called_once_with(
self.context, {'args': {'port_id': self.port_id},
'namespace': None,
'method': 'destroy_namespace_agent'})
self.core_plugin.delete_port.assert_called_once_with(self.context,
self.port_id)
def test_creeat_rpc_proxy(self):
self.api.create_rpc_proxy(
self.context, self.port_id, self.src_target, self.dst_target,
self.direction)
self.mock_rpc_proxy_call.assert_called_once_with(
self.context, {'args': {'port_id': self.port_id,
'src_target': self.src_target,
'dst_unix_target': self.dst_target,
'direction': self.direction},
'namespace': None,
'method': 'create_rpc_proxy'})
def test_destroy_rpc_proxy(self):
proxy_id = str(uuid.uuid4())
rpc_proxy_id = str(uuid.uuid4())
self.api.destroy_rpc_proxy(self.context, proxy_id, rpc_proxy_id)
self.mock_rpc_proxy_call.assert_called_once_with(
self.context, {'args': {'proxy_id': proxy_id,
'rpc_proxy_id': rpc_proxy_id},
'namespace': None,
'method': 'destroy_rpc_proxy'})
def test_create_rpc_namespace_proxy(self):
dst_transport_url = 'fake:///'
direction = 'send'
self.api.create_rpc_namespace_proxy(
self.context, self.port_id, self.src_target,
dst_transport_url, self.dst_target, direction)
self.mock_rpc_proxy_call.assert_called_once_with(
self.context, {'args': {'dst_transport_url': dst_transport_url,
'direction': direction,
'port_id': self.port_id,
'src_target': self.src_target,
'dst_target': self.dst_target},
'namespace': None,
'method': 'create_rpc_namespace_proxy'})
def test_destroy_rpc_namespace_proxy(self):
ns_proxy_id = str(uuid.uuid4())
self.api.destroy_rpc_namespace_proxy(self.context, self.port_id,
ns_proxy_id)
self.mock_rpc_proxy_call.assert_called_once_with(
self.context, {'args': {'port_id': self.port_id,
'namespace_proxy_id': ns_proxy_id},
'namespace': None,
'method': 'destroy_rpc_namespace_proxy'})

View File

@ -1,368 +0,0 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
import eventlet
eventlet.monkey_patch()
import atexit
import inspect
import uuid
import netaddr
from oslo import messaging
from oslo.messaging._drivers import impl_unix
from oslo.messaging import proxy
from oslo.messaging import rpc
from oslo.messaging import transport
from oslo_config import cfg
from tacker.agent.common import config as agent_config
from tacker.agent.linux import external_process
from tacker.agent.linux import interface
from tacker.agent.linux import ip_lib
from tacker.common import config
from tacker.common import legacy
from tacker.common import topics
from tacker import manager
from tacker.openstack.common import excutils
from tacker.openstack.common import importutils
from tacker.openstack.common import lockutils
from tacker.openstack.common import log as logging
from tacker.openstack.common import service
from tacker import oslo_service
from tacker.services.loadbalancer.drivers.haproxy import namespace_driver
from tacker.vm.agent import config as vm_config
from tacker.vm.agent import target
# _DEBUG = False
_DEBUG = True
LOG = logging.getLogger(__name__)
class NamespaceProxyAgentApi(object):
"""
api servicevm agent -> namespace proxy agent
"""
def __init__(self, unix_transport):
super(NamespaceProxyAgentApi, self).__init__()
target_ = messaging.Target(topic=topics.SERVICEVM_AGENT_NAMEPSACE)
self._client = rpc.RPCClient(unix_transport, target_)
def _call(self, **kwargs):
method = inspect.stack()[1][3]
ctxt = {}
return self._client.call(ctxt, method, **kwargs)
def destroy_namespace_agent(self):
return self._call()
def create_rpc_namespace_proxy(self, src_target,
dst_transport_url, dst_target, direction):
return self._call(
src_target=src_target, dst_transport_url=dst_transport_url,
dst_target=dst_target, direction=direction)
def destroy_rpc_namespace_proxy(self, namespace_proxy_id):
return self._call(namespace_proxy_id=namespace_proxy_id)
class NamespaceAgent(object):
def __init__(self, port_id, unix_transport, pm):
super(NamespaceAgent, self).__init__()
self.port_id = port_id
self.unix_transport = unix_transport
self.pm = pm
self.local_proxies = {}
self.api = NamespaceProxyAgentApi(unix_transport)
class ServiceVMAgent(manager.Manager):
_NS_PREFIX = 'qsvcvm-'
@staticmethod
def _get_ns_name(port_id):
return ServiceVMAgent._NS_PREFIX + port_id
def __init__(self, host=None, **kwargs):
conf = kwargs['conf']
super(ServiceVMAgent, self).__init__(host=host)
self.conf = conf
self.root_helper = agent_config.get_root_helper(self.conf)
self._proxies = {}
try:
vif_driver = importutils.import_object(conf.interface_driver, conf)
except ImportError:
with excutils.save_and_reraise_exception():
msg = (_('Error importing interface driver: %s')
% conf.interface_driver)
LOG.error(msg)
self._vif_driver = vif_driver
self._proxy_agents = {}
self._src_transport = None
self._get_src_transport()
atexit.register(self._atexit)
def _atexit(self):
for ns_agent in self._proxy_agents.values():
ns_agent.pm.disable()
for port_id in self._proxy_agents.keys():
self._unplug(port_id)
def _get_src_transport(self):
conf = self.conf
conf.register_opts(transport._transport_opts)
rpc_backend = conf.rpc_backend
if conf.transport_url is not None:
src_url = conf.transport_url
elif (rpc_backend.endswith('.impl_kombu') or
rpc_backend.endswith('.impl_rabbit')):
from oslo.messaging._drivers import impl_rabbit
conf.register_opts(impl_rabbit.rabbit_opts)
src_url = 'rabbit://%s:%s@%s:%s/%s' % (
conf.rabbit_userid, conf.rabbit_password,
conf.rabbit_host, conf.rabbit_port,
conf.rabbit_virtual_host)
elif rpc_backend.endswith('.impl_qpid'):
from oslo.messaging._drivers import impl_qpid
conf.register_opts(impl_qpid.qpid_opts)
src_url = 'qpid://%s:%s@%s:%s/' % (
conf.pid_username, conf.qpid_password,
conf.qpid_hostname, conf.qpid_port)
elif rpc_backend.endswith('.impl_zmq'):
from oslo.messaging._drivers import impl_zmq
conf.register_opts(impl_zmq.zmq_opts)
src_url = 'zmq://%s:%s/' % (conf.rpc_zmq_host, conf.rpc_zmq_port)
elif rpc_backend.endswith('.impl_fake'):
src_url = 'fake:///'
else:
raise NotImplementedError(
_('rpc_backend %s is not supported') % rpc_backend)
self._src_transport = messaging.get_transport(conf, src_url)
def __del__(self):
if self._src_transport is not None:
self._src_transport.cleanup()
# def create_device(self, context, device):
# LOG.debug(_('create_device %s'), device)
# def update_device(self, context, device):
# LOG.debug(_('update_device %s'), device)
# def delete_device(self, context, device):
# LOG.debug(_('delete_device %s'), device)
# def create_service(self, context, device, service_instance):
# LOG.debug(_('create_service %(device)s %(service_instance)s'),
# {'device': device, 'service_instance': service_instance})
# def update_service(self, context, device, service_instance):
# LOG.debug(_('update_service %(device)s %(service_instance)s'),
# {'device': device, 'service_instance': service_instance})
# def delete_service(self, context, device, service_instance):
# LOG.debug(_('delete_service %(device)s %(service_instance)s'),
# {'device': device, 'service_instance': service_instance})
# TODO(yamahata): copied from loadbalancer/drivers/haproxy/namespace_driver
# consolidate it.
def _plug(self, port_config):
vif_driver = self._vif_driver
namespace = self._get_ns_name(port_config['id'])
interface_name = vif_driver.get_device_name(
namespace_driver.Wrap(port_config))
if not ip_lib.device_exists(interface_name, self.root_helper,
namespace):
vif_driver.plug(
port_config['network_id'], port_config['id'], interface_name,
port_config['mac_address'], namespace=namespace)
cidrs = [
'%s/%s' % (ip['ip_address'],
netaddr.IPNetwork(ip['subnet']['cidr']).prefixlen)
for ip in port_config['fixed_ips']
]
vif_driver.init_l3(interface_name, cidrs, namespace=namespace)
gw_ip = port_config['fixed_ips'][0]['subnet'].get('gateway_ip')
if gw_ip:
cmd = ['route', 'add', 'default', 'gw', gw_ip]
ip_wrapper = ip_lib.IPWrapper(self.root_helper,
namespace=namespace)
ip_wrapper.netns.execute(cmd, check_exit_code=False)
def _unplug(self, port_id):
port_stub = {'id': port_id}
namespace = self._get_ns_name(port_id)
vif_driver = self._vif_driver
interface_name = vif_driver.get_device_name(
namespace_driver.Wrap(port_stub))
vif_driver.unplug(interface_name, namespace=namespace)
@lockutils.synchronized('servicevm-agent', 'tacker-')
def create_namespace_agent(self, context, port):
conf = self.conf
port_id = port['id']
path = 'rpc-proxy-%s' % port_id
unix_url = 'punix:///%s' % path
unix_transport = messaging.get_transport(conf, unix_url)
unix_transport._driver.punix_listening.wait()
self._plug(port)
pm = external_process.ProcessManager(
conf, port_id, root_helper=self.root_helper,
namespace=self._get_ns_name(port_id))
def cmd_callback(pid_file_name):
cmd = ['tacker-servicevm-ns-rpc-proxy',
'--pid-file=%s' % pid_file_name,
'--svcvm-proxy-dir=%s' % conf.svcvm_proxy_dir,
'--src-transport-url', 'unix:///%s' % path]
cmd.extend(agent_config.get_log_args(
conf, 'tacker-servicevm-ns-rpc-proxy-%s.log' % port_id))
if _DEBUG:
cmd += ['--log-file=/tmp/tacker-servicevm-ns-rpc-proxy-'
'%s.log' % port_id]
return cmd
pm.enable(cmd_callback)
ns_agent = NamespaceAgent(port_id, unix_transport, pm)
self._proxy_agents[port_id] = ns_agent
@lockutils.synchronized('servicevm-agent', 'tacker-')
def destroy_namespace_agent(self, context, port_id):
ns_agent = self._proxy_agents.pop(port_id)
ns_agent.api.destroy_namespace_agent()
for proxy_server in ns_agent.local_proxies.values():
proxy_server.stop()
for proxy_server in ns_agent.local_proxies.values():
proxy_server.wait()
ns_agent.pm.disable()
self._unplug(port_id)
def _create_rpc_proxy(self, ns_agent, src_transport, src_target,
dst_transport, dst_target):
rpc_proxy_id = str(uuid.uuid4())
src_target = target.target_parse(src_target)
assert src_target.server
dst_target = target.target_parse(dst_target)
assert dst_target.server
proxy_server = proxy.get_proxy_server(
src_transport, src_target, None,
dst_transport, dst_target, None, executor='eventlet')
ns_agent.local_proxies[rpc_proxy_id] = proxy_server
proxy_server.start()
return rpc_proxy_id
def _get_proxy_agent(self, port_id):
ns_agent = self._proxy_agents.get(port_id)
if ns_agent is None:
msg = _('unknown port_id %s') % port_id
LOG.error(msg)
raise RuntimeError(msg)
return ns_agent
@lockutils.synchronized('servicevm-agent', 'tacker-')
def create_rpc_proxy(self, context, port_id,
src_target, dst_unix_target, direction):
ns_agent = self._get_proxy_agent(port_id)
if direction == 'send':
return self._create_rpc_proxy(
ns_agent, self._src_transport, src_target,
ns_agent.unix_transport, dst_unix_target)
elif direction == 'receive':
return self._create_rpc_proxy(
ns_agent, ns_agent.unix_transport, dst_unix_target,
self._src_transport, src_target)
else:
msg = _('unknown direction %s') % direction
LOG.error(msg)
raise RuntimeError(msg)
@lockutils.synchronized('servicevm-agent', 'tacker-')
def destroy_rpc_proxy(self, context, port_id, rpc_proxy_id):
ns_agent = self._get_proxy_agent(port_id)
proxy_server = ns_agent.local_proxies.pop(rpc_proxy_id)
proxy_server.stop()
proxy_server.wait()
@lockutils.synchronized('servicevm-agent', 'tacker-')
def create_rpc_namespace_proxy(self, context, port_id, src_target,
dst_transport_url, dst_target, direction):
ns_agent = self._get_proxy_agent(port_id)
ns_proxy_id = ns_agent.api.create_rpc_namespace_proxy(
src_target, dst_transport_url, dst_target, direction)
LOG.debug("create_rpc_namespace_proxy %s", ns_proxy_id)
return ns_proxy_id
@lockutils.synchronized('servicevm-agent', 'tacker-')
def destroy_rpc_namespace_proxy(self, context,
port_id, namespace_proxy_id):
ns_agent = self._get_proxy_agent(port_id)
return ns_agent.api.destroy_rpc_namespace_proxy(namespace_proxy_id)
class ServiceVMAgentWithStateReport(ServiceVMAgent):
# TODO(yamahata)
pass
def _register_options(conf):
conf.register_opts(interface.OPTS)
agent_config.register_interface_driver_opts_helper(conf)
agent_config.register_agent_state_opts_helper(conf)
agent_config.register_root_helper(conf)
conf.register_opts(vm_config.OPTS)
conf.register_opts(impl_unix.unix_opts)
def main():
conf = cfg.CONF
# NOTE(yamahata): work around. rpc driver-dependent config variables
# remove this line once tacker are fully ported to oslo.messaging
from tacker.openstack.common import rpc
conf.unregister_opts(rpc.rpc_opts)
# NOTE(yamahata): corresponds to
# tacker.common.config.rpc.set_default(control_exchange='tacker')
messaging.set_transport_defaults('tacker')
_register_options(conf)
conf(project='tacker')
config.setup_logging(conf)
legacy.modernize_quantum_config(conf)
# NOTE(yamahata): workaround for state_path
# oslo.messaging doesn't know state_path
conf.set_override('rpc_unix_ipc_dir', conf.svcvm_proxy_dir)
server = oslo_service.TackerService.create(
topic=topics.SERVICEVM_AGENT,
manager='tacker.vm.agent.agent.ServiceVMAgentWithStateReport',
report_interval=conf.AGENT.report_interval,
conf=conf)
service.launch(server).wait()
if __name__ == '__main__':
main()

View File

@ -1,31 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013, 2014 Intel Corporation.
# Copyright 2013, 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
from oslo_config import cfg
OPTS = [
cfg.StrOpt('svcvm-proxy-dir',
default='$state_path/svcvm_proxy_dir',
help=_('Location for servicevm agent proxy '
'UNIX domain socket')),
]

View File

@ -1,340 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013, 2014 Intel Corporation.
# Copyright 2013, 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
import eventlet
eventlet.monkey_patch()
import os.path
import random
import sys
import uuid
from oslo import messaging
from oslo.messaging._drivers import impl_unix
from oslo.messaging import proxy
from oslo.messaging import rpc
from oslo_config import cfg
from tacker.agent.common import config as agent_config
from tacker.agent.linux import daemon
from tacker.common import config
from tacker.common import legacy
from tacker.common import topics
from tacker.common import utils
from tacker import context
from tacker import manager
from tacker.openstack.common import importutils
from tacker.openstack.common import lockutils
from tacker.openstack.common import log as logging
from tacker.openstack.common import service
from tacker import oslo_service
from tacker.vm.agent import config as vm_config
from tacker.vm.agent import target
LOG = logging.getLogger(__name__)
class NamespaceProxies(object):
def __init__(self):
super(NamespaceProxies, self).__init__()
self.urls = {} # dict: transport_url -> transport
self.transports = {} # dict: transport -> transport_url
self.proxies = {} # uuid -> (transport, proxy_server)
self.transport_to_proxies = {} # transport -> set of proxy_servers
def get_transport(self, transport_url):
return self.urls.get(transport_url, None)
def add_transport(self, transport_url, transport):
assert transport_url not in self.urls
assert transport not in self.transports
self.transports[transport_url] = transport
self.urls[transport] = transport_url
def del_proxy(self, namespace_proxy_id):
transport, proxy_server = self.proxies.pop(namespace_proxy_id)
proxies = self.transport_to_proxies[transport]
proxies.remove(proxy_server)
if proxies:
transport = None
else:
transport_url = self.urls.pop(transport)
del self.transports[transport_url]
del self.transport_to_proxies[transport]
return (transport, proxy_server)
def add_proxy(self, namespace_proxy_id, transport, proxy_server):
assert namespace_proxy_id not in self.proxies
self.proxies[namespace_proxy_id] = (transport, proxy_server)
proxies = self.transport_to_proxies.setdefault(transport, set())
proxies.add(proxy_server)
class ServiceVMNamespaceAgent(manager.Manager):
def __init__(self, host=None, **kwargs):
LOG.debug(_('host %(host)s, kwargs %(kwargs)s'),
{'host': host, 'kwargs': kwargs})
super(ServiceVMNamespaceAgent, self).__init__(host=host)
for key in ('conf', 'src_transport', 'server_stop', ):
setattr(self, key, kwargs[key])
assert self.src_transport is not None
assert self.server_stop is not None
self._proxies = NamespaceProxies()
def stop(self):
LOG.debug('stop')
self.server_stop()
ns_proxies = self._proxies
for _transport, proxy_server in ns_proxies.proxies.values():
proxy_server.stop()
def wait(self):
LOG.debug('wait')
ns_proxies = self._proxies
for _transport, proxy_server in ns_proxies.proxies.values():
proxy_server.wait()
for transport in ns_proxies.transports.values():
transport.cleanup()
@lockutils.synchronized('servicevm-namespace-agent', 'tacker-')
def destroy_namespace_agent(self, context):
self.stop()
def _create_rpc_namespace_proxy(self, src_transport, src_target,
dst_transport, dst_target):
src_target = target.target_parse(src_target)
assert src_target.server
dst_target = target.target_parse(dst_target)
assert dst_target.server
return proxy.get_proxy_server(
src_transport, src_target, None,
dst_transport, dst_target, None, executor='eventlet')
@lockutils.synchronized('servicevm-namespace-agent', 'tacker-')
def create_rpc_namespace_proxy(self, context, src_target,
dst_transport_url, dst_target, direction):
LOG.debug('create_rpc_namespace_proxy %s %s %s %s %s',
context, src_target, dst_transport_url, dst_target,
direction)
dst_transport = self._proxies.get_transport(dst_transport_url)
if dst_transport is None:
dst_transport = messaging.get_transport(self.conf,
dst_transport_url)
self._proxies.add_transport(dst_transport_url, dst_transport)
if direction == 'send':
proxy_server = self._create_rpc_namespace_proxy(
self.src_transport, src_target, dst_transport, dst_target)
elif direction == 'receive':
proxy_server = self._create_rpc_namespace_proxy(
dst_transport, dst_target, self.src_transport, src_target)
else:
msg = _('unknown direction %s') % direction
LOG.error(msg)
raise RuntimeError(msg)
# proxy_server.start()
eventlet.spawn(proxy_server.start)
namespace_proxy_id = str(uuid.uuid4())
self._proxies.add_proxy(namespace_proxy_id,
dst_transport, proxy_server)
LOG.debug('namespace_proxy_id %s', namespace_proxy_id)
return namespace_proxy_id
@lockutils.synchronized('servicevm-namespace-agent', 'tacker-')
def destroy_rpc_namespace_proxy(self, context, namespace_proxy_id):
LOG.debug('namespace_proxy_id %s', namespace_proxy_id)
try:
transport, proxy_server = self._proxies.del_proxy(
namespace_proxy_id)
except KeyError:
return
proxy_server.stop()
proxy_server.wait()
if transport is not None:
transport.cleanup()
# TODO(yamahata): class Service is stolen from nova.service and modified.
# port tacker to oslo.messaging and delete this class.
class Service(service.Service):
def __init__(self, conf, host, binary, topic, manager_,
report_interval=None, periodic_enable=None,
periodic_fuzzy_delay=None, periodic_interval_max=None,
*args, **kwargs):
super(Service, self).__init__()
self.conf = conf
self.host = host
self.binary = binary
self.topic = topic
self.manager_class_name = manager_
manager_class = importutils.import_class(self.manager_class_name)
kwargs_ = kwargs.copy()
kwargs_['conf'] = conf
self.manager = manager_class(host=self.host, *args, **kwargs_)
self.src_transport = kwargs['src_transport']
self.rpcserver = None
self.report_interval = report_interval
self.periodic_enable = periodic_enable
self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.periodic_interval_max = periodic_interval_max
self.saved_args, self.saved_kwargs = args, kwargs
def start(self):
self.manager.init_host()
LOG.debug(_("Creating RPC server for service %(topic)s %(driver)s"),
{'topic': self.topic, 'driver': self.src_transport._driver})
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
self.rpcserver = rpc.get_rpc_server(self.src_transport, target,
endpoints, executor='eventlet')
self.rpcserver.start()
if self.periodic_enable:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None
self.tg.add_dynamic_timer(
self.periodic_tasks, initial_delay=initial_delay,
periodic_interval_max=self.periodic_interval_max)
self.manager.after_start()
LOG.debug('start done')
@classmethod
def create(cls, conf, src_transport,
host=None, binary=None, topic=None, manager_=None, **kwargs):
if not host:
host = conf.host
if not binary:
binary = os.path.basename(sys.argv[0])
if not topic:
topic = binary.rpartition('tacker-')[2]
topic = topic.replace('-', '_')
if not manager_:
manager_ = conf.get('%s_manager' % topic, None)
service_obj = cls(conf, host, binary, topic, manager_,
src_transport=src_transport, **kwargs)
return service_obj
def kill(self):
self.stop()
def stop(self):
try:
self.rpcserver.stop()
self.manager.stop()
except Exception:
LOG.exception(_('failed to stop rpcserver'))
super(Service, self).stop()
def wait(self):
try:
self.rpcserver.wait()
self.manager.wait()
except Exception:
LOG.exception(_('failed to wait rpcserver'))
super(Service, self).wait()
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
ctxt = context.get_admin_context()
return self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
def report_state(self):
pass
class ProxyDaemon(daemon.Daemon):
def __init__(self, conf):
self._conf = conf
super(ProxyDaemon, self).__init__(conf.pid_file, uuid=conf.port_id)
def run(self):
conf = self._conf
def server_stop():
server.stop()
LOG.debug(_('src transport url %s'), conf.src_transport_url)
src_transport = messaging.get_transport(
conf, conf.src_transport_url,
aliases=oslo_service.TRANSPORT_ALIASES)
server = Service.create(
conf=conf, topic=topics.SERVICEVM_AGENT_NAMEPSACE,
manager_=('tacker.vm.agent.namespace_proxy.'
'ServiceVMNamespaceAgent'),
src_transport=src_transport, server_stop=server_stop)
service.launch(server).wait()
src_transport.cleanup()
def _register_options(conf):
cli_opts = [
cfg.StrOpt('pid-file', help=_('pid file of this process.')),
cfg.StrOpt('port-id', help=_('uuid of port')),
cfg.StrOpt('src-transport-url', help='src transport url'),
cfg.BoolOpt('daemonize', default=True, help=_('Run as daemon'))
]
conf.register_cli_opts(cli_opts)
agent_config.register_agent_state_opts_helper(conf)
agent_config.register_root_helper(conf)
conf.register_cli_opts(vm_config.OPTS)
conf.register_opts(impl_unix.unix_opts)
def main():
conf = cfg.CONF
# NOTE(yamahata): work around. rpc driver-dependent config variables
# remove this line once tacker are fully ported to oslo.messaging
from tacker.openstack.common import rpc
conf.unregister_opts(rpc.rpc_opts)
# NOTE(yamahata): corresponds to
# tacker.common.config.rpc.set_default(control_exchange='tacker')
messaging.set_transport_defaults('tacker')
_register_options(conf)
conf(project='tacker')
config.setup_logging(conf)
legacy.modernize_quantum_config(conf)
# NOTE(yamahata): workaround for state_path
# oslo.messaging doesn't know state_path
conf.set_override('rpc_unix_ipc_dir', conf.svcvm_proxy_dir)
utils.log_opt_values(LOG)
proxy = ProxyDaemon(conf)
if conf.daemonize:
proxy.start()
else:
proxy.run()
if __name__ == '__main__':
main()

View File

@ -1,45 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013, 2014 Intel Corporation.
# Copyright 2013, 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
from oslo.messaging import target
_KEYS = ['exchange', 'topic', 'namespace', 'version', 'server', 'fanout']
_BOOLEAN_STATES = {'1': True, 'yes': True, 'true': True, 'on': True,
'0': False, 'no': False, 'false': False, 'off': False}
def target_parse(target_str):
attrs = target_str.split(',')
kwargs = dict(attr.split('=', 1) for attr in attrs)
if 'fanout' in kwargs:
# should use oslo_config.types.Bool.__call__ ?
value = kwargs['fanout']
kwargs['fanout'] = _BOOLEAN_STATES[value.lower()]
return target.Target(**kwargs)
def target_str(target):
attrs = [(key, getattr(target, key))
for key in _KEYS if getattr(target, key) is not None]
return ','.join('%s=%s' % attr for attr in attrs)

View File

@ -1,34 +0,0 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
from oslo_config import cfg
from tacker.common import topics
_RPC_AGENT_OPTS = [
cfg.StrOpt('device_id', default=None, help=_('The device id')),
cfg.StrOpt('topic', default=topics.SERVICEVM_AGENT,
help=_('rpc topic for agent to subscribe')),
]
def register_servicevm_agent_opts(conf):
conf.register_opts(_RPC_AGENT_OPTS, group='servicevm_agent')

View File

@ -1,181 +0,0 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
from oslo_config import cfg
from tacker.db.vm import proxy_db
from tacker.openstack.common import log as logging
from tacker.vm import constants
from tacker.vm.mgmt_drivers.rpc import rpc
LOG = logging.getLogger(__name__)
class AgentRpcProxyMGMTDriver(rpc.AgentRpcMGMTDriver):
_TRANSPORT_OPTS = [
cfg.StrOpt('dst_transport_url',
# TODO(yamahata): make user, pass, port configurable
# per servicevm
#'<scheme>://<user>:<pass>@<host>:<port>/'
default='rabbit://guest:guest@%(host)s:5672/',
help='A URL representing the messaging driver '
'to use and its full configuration.'),
]
def __init__(self, conf=None):
super(AgentRpcProxyMGMTDriver, self).__init__()
self.db = proxy_db.RpcProxyDb()
self.conf = conf or cfg.CONF
self.conf.register_opts(self._TRANSPORT_OPTS)
def get_type(self):
return 'agent-proxy'
def get_name(self):
return 'agent-proxy'
def get_description(self):
return 'agent-proxy'
def mgmt_create_post(self, plugin, context, device):
LOG.debug('mgmt_create_post')
mgmt_entries = [sc_entry for sc_entry in device['service_context']
if (sc_entry['role'] == constants.ROLE_MGMT and
sc_entry.get('port_id'))]
assert mgmt_entries
mgmt_entry = mgmt_entries[0]
vm_port_id = mgmt_entry['port_id']
vm_port = plugin._core_plugin.get_port(context, vm_port_id)
fixed_ip = vm_port['fixed_ips'][0]['ip_address']
# TODO(yamahata): make all parameters(scheme, user, pass, port)
# configurable
dst_transport_url = self.conf.dst_transport_url % {'host': fixed_ip}
network_id = mgmt_entry['network_id']
assert network_id
proxy_api = plugin.proxy_api
port_id = proxy_api.create_namespace_agent(plugin._core_plugin,
context, network_id)
device_id = device['id']
target = 'topic=%s,server=%s' % (self._mgmt_topic(device),
self._mgmt_server(device))
svr_proxy_id = proxy_api.create_rpc_proxy(
context, port_id, target, target, 'receive')
LOG.debug('mgmt_create_post: svr_proxy_id: %s', svr_proxy_id)
svr_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
context, port_id, target, dst_transport_url, target, 'receive')
LOG.debug('mgmt_create_post: svr_ns_proxy_id: %s', svr_ns_proxy_id)
clt_proxy_id = proxy_api.create_rpc_proxy(
context, port_id, target, target, 'send')
LOG.debug('mgmt_create_post: clt_proxy_id: %s', clt_proxy_id)
clt_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
context, port_id, target, dst_transport_url, target, 'send')
LOG.debug('mgmt_create_post: clt_ns_proxy_id: %s', clt_ns_proxy_id)
LOG.debug('mgmt_create_ppost: '
'svr: %s svr_ns: %s clt: %s clt_ns: %s ',
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
self.db.create_proxy_mgmt_port(
context, device_id, port_id, dst_transport_url,
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
def mgmt_delete_post(self, plugin, context, device):
LOG.debug('mgmt_delete_post')
device_id = device['id']
proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device_id)
port_id = proxy_mgmt_port['port_id']
svr_proxy_id = proxy_mgmt_port['svr_proxy_id']
svr_ns_proxy_id = proxy_mgmt_port['svr_ns_proxy_id']
clt_proxy_id = proxy_mgmt_port['clt_proxy_id']
clt_ns_proxy_id = proxy_mgmt_port['clt_ns_proxy_id']
proxy_api = plugin.proxy_api
proxy_api.destroy_rpc_namespace_proxy(context,
port_id, clt_ns_proxy_id)
proxy_api.destroy_rpc_proxy(context, port_id, clt_proxy_id)
proxy_api.destroy_rpc_namespace_proxy(context,
port_id, svr_ns_proxy_id)
proxy_api.destroy_rpc_proxy(context, port_id, svr_proxy_id)
proxy_api.destroy_namespace_agent(plugin._core_plugin,
context, port_id)
self.db.delete_proxy_mgmt_port(context, port_id)
def mgmt_service_create_pre(self, plugin, context, device,
service_instance):
LOG.debug('mgmt_service_create_pre')
proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device['id'])
port_id = proxy_mgmt_port['port_id']
dst_transport_url = proxy_mgmt_port['dst_transport_url']
proxy_api = plugin.proxy_api
target = 'topic=%s,server=%s' % (
self._mgmt_service_topic(device, service_instance),
self._mgmt_service_server(device, service_instance))
svr_proxy_id = proxy_api.create_rpc_proxy(
context, port_id, target, target, 'receive')
LOG.debug('mgmt_service_create_pre: svr_proxy_id: %s', svr_proxy_id)
svr_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
context, port_id, target, dst_transport_url, target, 'receive')
LOG.debug('mgmt_service_create_pre: svr_ns_proxy_id: %s',
svr_ns_proxy_id)
clt_proxy_id = proxy_api.create_rpc_proxy(
context, port_id, target, target, 'send')
LOG.debug('mgmt_service_create_pre: clt_proxy_id: %s', clt_proxy_id)
clt_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
context, port_id, target, dst_transport_url, target, 'send')
LOG.debug('mgmt_service_create_pre: clt_ns_proxy_id: %s',
clt_ns_proxy_id)
LOG.debug('mgmt_service_create_pre: '
'svr: %s svr_ns: %s clt: %s clt_ns: %s ',
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
self.db.create_proxy_service_port(
context, service_instance['id'],
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
def mgmt_service_delete_post(self, plugin, context, device,
service_instance):
LOG.debug('mgmt_service_delete_post')
proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device['id'])
port_id = proxy_mgmt_port['port_id']
service_instance_id = service_instance['id']
proxy_service_port = self.db.get_proxy_service_port(
context, service_instance_id)
svr_proxy_id = proxy_service_port['svr_proxy_id']
svr_ns_proxy_id = proxy_service_port['svr_ns_proxy_id']
clt_proxy_id = proxy_service_port['clt_proxy_id']
clt_ns_proxy_id = proxy_service_port['clt_ns_proxy_id']
proxy_api = plugin.proxy_api
proxy_api.destroy_rpc_namespace_proxy(context,
port_id, clt_ns_proxy_id)
proxy_api.destroy_rpc_proxy(context, port_id, clt_proxy_id)
proxy_api.destroy_rpc_namespace_proxy(context,
port_id, svr_ns_proxy_id)
proxy_api.destroy_rpc_proxy(context, port_id, svr_proxy_id)
self.db.delete_proxy_service_port(context, service_instance_id)

View File

@ -1,107 +0,0 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
from tacker.common import rpc_compat
from tacker.common import topics
from tacker.vm.mgmt_drivers import abstract_driver
from tacker.vm.mgmt_drivers import constants
class ServiceVMAgentRpcApi(rpc_compat.RpcProxy):
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic=topics.SERVICEVM_AGENT):
super(ServiceVMAgentRpcApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def rpc_cast(self, context, method, kwargs, topic):
self.cast(context, self.make_msg(method, **kwargs), topic=topic)
# TODO(yamahata): port this to oslo.messaging
# address format needs be changed to
# oslo.messaging.target.Target
class AgentRpcMGMTDriver(abstract_driver.DeviceMGMTAbstractDriver):
_TOPIC = topics.SERVICEVM_AGENT # can be overridden by subclass
_RPC_API = {} # topic -> ServiceVMAgentRpcApi
@property
def _rpc_api(self):
topic = self._TOPIC
api = self._RPC_API.get(topic)
if api is None:
api = ServiceVMAgentRpcApi(topic=topic)
api = self._RPC_API.setdefault(topic, api)
return api
def get_type(self):
return 'agent-rpc'
def get_name(self):
return 'agent-rpc'
def get_description(self):
return 'agent-rpc'
def mgmt_get_config(self, plugin, context, device):
return {'/etc/tacker/servicevm-agent.ini':
'[servicevm]\n'
'topic = %s\n'
'device_id = %s\n'
% (self._TOPIC, device['id'])}
@staticmethod
def _address(topic, server):
return '%s.%s' % (topic, server)
def _mgmt_server(self, device):
return device['id']
def _mgmt_topic(self, device):
return '%s-%s' % (self._TOPIC, self._mgmt_server(device))
def mgmt_url(self, plugin, context, device):
return self._address(self._mgmt_topic(device),
self._mgmt_server(device))
def mgmt_call(self, plugin, context, device, kwargs):
topic = device['mgmt_url']
method = kwargs[constants.KEY_ACTION]
kwargs_ = kwargs[constants.KEY_KWARGS]
self._rpc_api.rpc_cast(context, method, kwargs_, topic)
def _mgmt_service_server(self, device, service_instance):
return '%s-%s' % (device['id'], service_instance['id'])
def _mgmt_service_topic(self, device, service_instance):
return '%s-%s' % (self._TOPIC,
self._mgmt_service_server(device, service_instance))
def mgmt_service_address(self, plugin, context, device, service_instance):
return self._address(
self._mgmt_service_topic(device, service_instance),
self._mgmt_service_server(device, service_instance))
def mgmt_service_call(self, plugin, context, device,
service_instance, kwargs):
method = kwargs[constants.KEY_ACTION]
kwargs_ = kwargs[constants.KEY_KWARGS]
topic = service_instance['mgmt_url']
self._rpc_api.rpc_cast(context, method, kwargs_, topic)

View File

@ -28,7 +28,6 @@ from sqlalchemy.orm import exc as orm_exc
from tacker.api.v1 import attributes
from tacker.common import driver_manager
from tacker.common import topics
from tacker import context as t_context
from tacker.db.vm import proxy_db # noqa
from tacker.db.vm import vm_db
@ -38,7 +37,6 @@ from tacker.openstack.common import log as logging
from tacker.plugins.common import constants
from tacker.vm.mgmt_drivers import constants as mgmt_constants
from tacker.vm import monitor
from tacker.vm import proxy_api
LOG = logging.getLogger(__name__)
@ -171,7 +169,6 @@ class ServiceVMPlugin(vm_db.ServiceResourcePluginDb, ServiceVMMgmtMixin):
self._device_manager = driver_manager.DriverManager(
'tacker.servicevm.device.drivers',
cfg.CONF.servicevm.infra_driver)
self.proxy_api = proxy_api.ServiceVMPluginApi(topics.SERVICEVM_AGENT)
self._device_status = monitor.DeviceStatus()
def spawn_n(self, function, *args, **kwargs):

View File

@ -1,116 +0,0 @@
# Copyright 2013, 2014 Intel Corporation.
# Copyright 2013, 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
import inspect
from tacker.api.v1 import attributes
from tacker.common import rpc_compat
from tacker.openstack.common import excutils
from tacker.openstack.common import log as logging
from tacker.plugins.common import constants
LOG = logging.getLogger(__name__)
# TODO(yamahata): convert oslo.messaging
class ServiceVMPluginApi(rpc_compat.RpcProxy):
API_VERSION = '1.0'
def __init__(self, topic):
super(ServiceVMPluginApi, self).__init__(topic, self.API_VERSION)
def _call(self, context, **kwargs):
method = inspect.stack()[1][3]
LOG.debug('ServiceVMPluginApi method = %s kwargs = %s', method, kwargs)
return self.call(context, self.make_msg(method, **kwargs))
def create_namespace_agent(self, core_plugin, context, network_id):
"""
:param dst_transport_url: st
:type dst_transport_url: str that represents
oslo.messaging.transportURL
"""
port_data = {
'name': '_svcvm-rpc-namespace-agent-' + network_id,
'network_id': network_id,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'admin_state_up': True,
'device_id': '_svcvm-rpc-proxy-' + network_id,
'device_owner': 'tacker:' + constants.SERVICEVM,
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
}
port = core_plugin.create_port(context, {'port': port_data})
for i in xrange(len(port['fixed_ips'])):
ipallocation = port['fixed_ips'][i]
subnet_id = ipallocation['subnet_id']
subnet = core_plugin.get_subnet(context, subnet_id)
ipallocation['subnet'] = subnet
port_id = port['id']
try:
self._call(context, port=port)
except Exception:
with excutils.save_and_reraise_exception():
core_plugin.delete_port(context, port_id)
return port_id
def destroy_namespace_agent(self, core_plugin, context, port_id):
self._call(context, port_id=port_id)
core_plugin.delete_port(context, port_id)
def create_rpc_proxy(self, context, port_id, src_target, dst_unix_target,
direction):
"""
:param src_target: target to listen/send
:type src_target: oslo.messaging.Target
:param dst_unix_target: target to send/listen
:type dst_unix_target: oslo.messaging.Target
:param direction: RPC direction
:type direction: str 'send' or 'receive'
'send': tacker server -> agent
'receive': neturon server <- agent
"""
return self._call(context, port_id=port_id, src_target=src_target,
dst_unix_target=dst_unix_target, direction=direction)
def destroy_rpc_proxy(self, context, port_id, rpc_proxy_id):
return self._call(context, proxy_id=port_id, rpc_proxy_id=rpc_proxy_id)
def create_rpc_namespace_proxy(self, context, port_id, src_target,
dst_transport_url, dst_target, direction):
"""
:param src_target: target to listen/send
:type src_target: oslo.messaging.Target
:param dst_target: target to send/listen
:type dst_target: oslo.messaging.Target
:param direction: RPC direction
:type direction: str 'send' or 'receive'
'send': tacker server -> agent
'receive': neturon server <- agent
"""
return self._call(context, port_id=port_id,
src_target=src_target,
dst_transport_url=dst_transport_url,
dst_target=dst_target, direction=direction)
def destroy_rpc_namespace_proxy(self, context, port_id,
namespace_proxy_id):
return self._call(context, port_id=port_id,
namespace_proxy_id=namespace_proxy_id)