From cc37eea591152cbbb9b55c63fb488c2d0ee0565c Mon Sep 17 00:00:00 2001 From: Isaku Yamahata Date: Thu, 26 Jun 2014 18:13:20 +0900 Subject: [PATCH] agent for rpc proxy Change-Id: I99ed807f785540568905ca74991c565101b0ec16 --- tacker/oslo_service.py | 205 ++++++++++ tacker/tests/unit/services/vm/__init__.py | 0 .../tests/unit/services/vm/agent/__init__.py | 0 .../unit/services/vm/agent/test_agent.py | 296 ++++++++++++++ .../services/vm/agent/test_namespace_proxy.py | 173 ++++++++ .../unit/services/vm/agent/test_target.py | 39 ++ tacker/vm/agent/__init__.py | 0 tacker/vm/agent/agent.py | 368 ++++++++++++++++++ tacker/vm/agent/config.py | 31 ++ tacker/vm/agent/namespace_proxy.py | 340 ++++++++++++++++ tacker/vm/agent/target.py | 45 +++ tacker/vm/agent_base.py | 46 +++ 12 files changed, 1543 insertions(+) create mode 100644 tacker/oslo_service.py create mode 100644 tacker/tests/unit/services/vm/__init__.py create mode 100644 tacker/tests/unit/services/vm/agent/__init__.py create mode 100644 tacker/tests/unit/services/vm/agent/test_agent.py create mode 100644 tacker/tests/unit/services/vm/agent/test_namespace_proxy.py create mode 100644 tacker/tests/unit/services/vm/agent/test_target.py create mode 100644 tacker/vm/agent/__init__.py create mode 100644 tacker/vm/agent/agent.py create mode 100644 tacker/vm/agent/config.py create mode 100644 tacker/vm/agent/namespace_proxy.py create mode 100644 tacker/vm/agent/target.py create mode 100644 tacker/vm/agent_base.py diff --git a/tacker/oslo_service.py b/tacker/oslo_service.py new file mode 100644 index 000000000..69043f1b9 --- /dev/null +++ b/tacker/oslo_service.py @@ -0,0 +1,205 @@ +# Copyright 2011 VMware, Inc +# All Rights Reserved. +# +# based on tacker.service and nova.service +# Copyright 2013, 2014 Intel Corporation. +# Copyright 2013, 2014 Isaku Yamahata +# +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# @author: Isaku Yamahata, Intel Corporation. + +import inspect +import os.path +import random + +from oslo import messaging + +from tacker import context +from tacker.openstack.common.gettextutils import _ +from tacker.openstack.common import importutils +from tacker.openstack.common import log as logging +from tacker.openstack.common import loopingcall +from tacker.openstack.common import service +from tacker import service as tacker_service # noqa # for service_opts + + +LOG = logging.getLogger(__name__) +TRANSPORT_ALIASES = { + 'tacker.openstack.common.rpc.impl_kombu': 'rabbit', + 'tacker.openstack.common.rpc.impl_qpid': 'qpid', + 'tacker.openstack.common.rpc.impl_zmq': 'zmq', + 'tacker.openstack.common.rpc.impl_fake': 'fake', +} + + +# replacement for tacker.openstack.common.rpc.service.Service +class RpcService(service.Service): + """Service object for binaries running on hosts. + + A service enables rpc by listening to queues based on topic and host. + """ + def __init__(self, conf, host, topic, manager=None, serializer=None): + super(RpcService, self).__init__() + self.conf = conf + self.host = host + self.topic = topic + self.serializer = serializer + if manager is None: + self.manager = self + else: + self.manager = manager + + def start(self): + super(RpcService, self).start() + + target = messaging.Target(topic=self.topic, server=self.host) + endpoints = [self.manager] + transport = messaging.get_transport(self.conf, + aliases=TRANSPORT_ALIASES) + self.rpcserver = messaging.get_rpc_server( + transport, target, endpoints, executor='eventlet', + serializer=self.serializer) + + # Hook to allow the manager to do other initializations after + # the rpc connection is created. + if callable(getattr(self.manager, 'initialize_service_hook', None)): + self.manager.initialize_service_hook(self) + + self.rpcserver.start() + + def stop(self): + # Try to shut the connection down, but if we get any sort of + # errors, go ahead and ignore them.. as we're shutting down anyway + try: + self.rpcserver.stop() + self.rpcserver.wait() + except Exception: + pass + super(RpcService, self).stop() + + +# replacement for tacker.service.Service +class TackerService(RpcService): + def __init__(self, conf, host, binary, topic, manager, + report_interval=None, + periodic_interval=None, periodic_fuzzy_delay=None, + *args, **kwargs): + self.binary = binary + self.manager_class_name = manager + manager_class = importutils.import_class(self.manager_class_name) + self.manager = manager_class(conf=conf, host=host, *args, **kwargs) + self.report_interval = report_interval + self.periodic_interval = periodic_interval + self.periodic_fuzzy_delay = periodic_fuzzy_delay + self.saved_args, self.saved_kwargs = args, kwargs + self.timers = [] + super(TackerService, self).__init__(conf, host, topic, + manager=self.manager) + + def start(self): + self.manager.init_host() + super(TackerService, self).start() + if self.report_interval: + pulse = loopingcall.FixedIntervalLoopingCall(self.report_state) + pulse.start(interval=self.report_interval, + initial_delay=self.report_interval) + self.timers.append(pulse) + + if self.periodic_interval: + if self.periodic_fuzzy_delay: + initial_delay = random.randint(0, self.periodic_fuzzy_delay) + else: + initial_delay = None + + periodic = loopingcall.FixedIntervalLoopingCall( + self.periodic_tasks) + periodic.start(interval=self.periodic_interval, + initial_delay=initial_delay) + self.timers.append(periodic) + self.manager.after_start() + + def kill(self): + """Destroy the service object.""" + self.stop() + + def stop(self): + super(TackerService, self).stop() + for x in self.timers: + try: + x.stop() + except Exception: + LOG.exception(_("Exception occurs when timer stops")) + pass + self.timers = [] + + def wait(self): + super(TackerService, self).wait() + for x in self.timers: + try: + x.wait() + except Exception: + LOG.exception(_("Exception occurs when waiting for timer")) + pass + + def periodic_tasks(self, raise_on_error=False): + """Tasks to be run at a periodic interval.""" + ctxt = context.get_admin_context() + self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error) + + def report_state(self): + """Update the state of this service.""" + # Todo(gongysh) report state to tacker server + pass + + def __getattr__(self, key): + manager = self.__dict__.get('manager', None) + return getattr(manager, key) + + @classmethod + def create(cls, conf, host=None, binary=None, topic=None, manager=None, + report_interval=None, periodic_interval=None, + periodic_fuzzy_delay=None): + """Instantiates class and passes back application object. + + :param host: defaults to conf.host + :param binary: defaults to basename of executable + :param topic: defaults to bin_name - 'nova-' part + :param manager: defaults to conf._manager + :param report_interval: defaults to conf.report_interval + :param periodic_interval: defaults to conf.periodic_interval + :param periodic_fuzzy_delay: defaults to conf.periodic_fuzzy_delay + + """ + if not host: + host = conf.host + if not binary: + binary = os.path.basename(inspect.stack()[-1][1]) + if not topic: + topic = binary.rpartition('tacker-')[2] + topic = topic.replace("-", "_") + if not manager: + manager = conf.get('%s_manager' % topic, None) + if report_interval is None: + report_interval = conf.AGENT.report_interval + if periodic_interval is None: + periodic_interval = conf.periodic_interval + if periodic_fuzzy_delay is None: + periodic_fuzzy_delay = conf.periodic_fuzzy_delay + service_obj = cls(conf, host, binary, topic, manager, + report_interval=report_interval, + periodic_interval=periodic_interval, + periodic_fuzzy_delay=periodic_fuzzy_delay) + + return service_obj diff --git a/tacker/tests/unit/services/vm/__init__.py b/tacker/tests/unit/services/vm/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tacker/tests/unit/services/vm/agent/__init__.py b/tacker/tests/unit/services/vm/agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tacker/tests/unit/services/vm/agent/test_agent.py b/tacker/tests/unit/services/vm/agent/test_agent.py new file mode 100644 index 000000000..b56cf07a4 --- /dev/null +++ b/tacker/tests/unit/services/vm/agent/test_agent.py @@ -0,0 +1,296 @@ +# Copyright 2014 Intel Corporation. +# Copyright 2014 Isaku Yamahata +# +# All Rights Reserved. +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Isaku Yamahata, Intel Corporation. + +import uuid + +import mock +from oslo.config import cfg +import oslo.messaging.rpc.client +from oslo.messaging import target +from oslo.messaging import transport + +import tacker.agent.linux.ip_lib +from tacker import context +from tacker.tests import base +from tacker.vm.agent import agent + + +class TestVMService(base.BaseTestCase): + ctxt = context.Context('user', 'tenant') + network_id = str(uuid.uuid4()) + subnet_id = str(uuid.uuid4()) + port_id = str(uuid.uuid4()) + mac_address = '00:00:00:00:00:01' + netmask = '/24' + network_address = '192.168.1.0' + cidr = network_address + netmask + ip_address = '192.168.1.3' + gw_address = '192.168.1.1' + port = { + 'id': port_id, + 'network_id': network_id, + 'mac_address': mac_address, + 'fixed_ips': [{'subnet_id': subnet_id, + 'ip_address': ip_address, + 'subnet': { + 'cidr': cidr, + 'ip_version': 4, + 'gateway_ip': gw_address}}] + } + + def setUp(self): + super(TestVMService, self).setUp() + conf = cfg.CONF + + # NOTE(yamahata): work around. rpc driver-dependent config variables + # remove this line once tacker are fully ported to oslo.messaging + from tacker.openstack.common import rpc + conf.unregister_opts(rpc.rpc_opts) + + conf.register_opts(transport._transport_opts) + conf.set_override('rpc_backend', + 'tacker.openstack.common.rpc.impl_fake') + self.addCleanup(mock.patch.stopall) + self.mock_get_transport_p = mock.patch('oslo.messaging.get_transport') + self.mock_get_transport = self.mock_get_transport_p.start() + self.mock_import_object_p = mock.patch( + 'tacker.openstack.common.importutils.import_object') + self.mock_import_object = self.mock_import_object_p.start() + self.vif_driver = mock.create_autospec( + tacker.agent.linux.interface.NullDriver) + self.mock_import_object.return_value = self.vif_driver + + self.agent = agent.ServiceVMAgent('host', conf=conf) + + self.mock_process_manager_p = mock.patch.object( + tacker.agent.linux.external_process, 'ProcessManager') + self.mock_process_manager = self.mock_process_manager_p.start() + self.mock_process_manager_instance = ( + self.mock_process_manager.return_value) + self.mock_device_exists_p = mock.patch( + 'tacker.agent.linux.ip_lib.device_exists') + self.mock_device_exists = self.mock_device_exists_p.start() + self.mock_device_exists.return_value = False + self.mock_ipwrapper_p = mock.patch.object(tacker.agent.linux.ip_lib, + 'IPWrapper') + self.mock_ipwrapper = self.mock_ipwrapper_p.start() + self.mock_ipwrapper_instance = self.mock_ipwrapper.return_value + self.mock_rpc_client_p = mock.patch.object(oslo.messaging.rpc, + 'RPCClient') + self.mock_rpc_client = self.mock_rpc_client_p.start() + self.mock_rpc_client_instance = self.mock_rpc_client.return_value + + def test_create_destroy_namespace_agent(self): + self.agent.create_namespace_agent(self.ctxt, self.port) + self.vif_driver.plug.assert_called_once_with( + self.network_id, self.port_id, mock.ANY, self.mac_address, + namespace=mock.ANY) + self.vif_driver.init_l3.assert_called_once_with( + mock.ANY, [self.ip_address + self.netmask], namespace=mock.ANY) + self.mock_ipwrapper.assert_called_once_with( + mock.ANY, namespace=mock.ANY) + self.mock_ipwrapper_instance.netns.execute.assert_called_once_with( + ['route', 'add', 'default', 'gw', self.gw_address], + check_exit_code=False) + self.assertTrue(self.mock_process_manager_instance.enable.called) + + self.agent.destroy_namespace_agent(self.ctxt, self.port_id) + self.vif_driver.unplug.assert_called_once_with(mock.ANY, + namespace=mock.ANY) + self.mock_process_manager_instance.disable.assert_called_once_with() + + def test_create_rpc_proxy_wrong_port_id(self): + func = lambda: self.agent.create_rpc_proxy( + self.ctxt, self.port_id, 'topic=src_topic,server=src_server', + 'topic=dst_topic,server=dst_server', 'wrong-direction') + self.assertRaises(RuntimeError, func) + + def test_create_rpc_proxy_wrong_direction(self): + self.agent.create_namespace_agent(self.ctxt, self.port) + func = lambda: self.agent.create_rpc_proxy( + self.ctxt, self.port_id, 'topic=src_topic,server=src_server', + 'topic=dst_topic,server=dst_server', 'wrong-direction') + self.assertRaises(RuntimeError, func) + self.agent.destroy_namespace_agent(self.ctxt, self.port_id) + + def test_create_destroy_rpc_proxy_send(self): + self.agent.create_namespace_agent(self.ctxt, self.port) + + with mock.patch('oslo.messaging.proxy.get_proxy_server' + ) as mock_get_proxy_server: + mock_transport = self.mock_get_transport.return_value + mock_instance = mock_get_proxy_server.return_value + + proxy_id = self.agent.create_rpc_proxy( + self.ctxt, self.port_id, 'topic=src_topic,server=src_server', + 'topic=dst_topic,server=dst_server', 'send') + src_target = target.Target(topic='src_topic', server='src_server') + dst_target = target.Target(topic='dst_topic', server='dst_server') + mock_get_proxy_server.assert_called_once_with( + mock_transport, src_target, None, + mock_transport, dst_target, None, executor=mock.ANY) + mock_instance.start.assert_called_once_with() + + self.agent.destroy_rpc_proxy(self.ctxt, self.port_id, proxy_id) + mock_instance.stop.assert_called_once_with() + mock_instance.wait.assert_called_once_with() + + self.agent.destroy_namespace_agent(self.ctxt, self.port_id) + + def test_create_destroy_rpc_proxy_receive(self): + self.agent.create_namespace_agent(self.ctxt, self.port) + + with mock.patch('oslo.messaging.proxy.get_proxy_server' + ) as mock_get_proxy_server: + mock_transport = self.mock_get_transport.return_value + mock_instance = mock_get_proxy_server.return_value + + proxy_id = self.agent.create_rpc_proxy( + self.ctxt, self.port_id, 'topic=src_topic,server=src_server', + 'topic=dst_topic,server=dst_server', 'receive') + src_target = target.Target(topic='src_topic', server='src_server') + dst_target = target.Target(topic='dst_topic', server='dst_server') + mock_get_proxy_server.assert_called_once_with( + mock_transport, dst_target, None, + mock_transport, src_target, None, executor=mock.ANY) + mock_instance.start.assert_called_once_with() + + self.agent.destroy_rpc_proxy(self.ctxt, self.port_id, proxy_id) + mock_instance.stop.assert_called_once_with() + mock_instance.wait.assert_called_once_with() + + self.agent.destroy_namespace_agent(self.ctxt, self.port_id) + + def test_create_destroy_rpc_proxy(self): + self.mock_device_exists.return_value = False + self.agent.create_namespace_agent(self.ctxt, self.port) + + with mock.patch('oslo.messaging.proxy.get_proxy_server' + ) as mock_get_proxy_server: + mock_transport = self.mock_get_transport.return_value + + proxy_id_send = self.agent.create_rpc_proxy( + self.ctxt, self.port_id, + 'topic=src_topic_send,server=src_server_send', + 'topic=dst_topic_send,server=dst_server_send', 'send') + src_target_send = target.Target(topic='src_topic_send', + server='src_server_send') + dst_target_send = target.Target(topic='dst_topic_send', + server='dst_server_send') + self.agent.create_rpc_proxy( + self.ctxt, self.port_id, + 'topic=src_topic_receive,server=src_server_receive', + 'topic=dst_topic_receive,server=dst_server_receive', 'receive') + src_target_recv = target.Target(topic='src_topic_receive', + server='src_server_receive') + dst_target_recv = target.Target(topic='dst_topic_receive', + server='dst_server_receive') + + self.agent.destroy_rpc_proxy(self.ctxt, + self.port_id, proxy_id_send) + self.agent.destroy_namespace_agent(self.ctxt, self.port_id) + + mock_get_proxy_server.assert_has_calls([ + mock.call(mock_transport, src_target_send, None, + mock_transport, dst_target_send, None, + executor=mock.ANY), + mock.call().start(), + mock.call(mock_transport, dst_target_recv, None, + mock_transport, src_target_recv, None, + executor=mock.ANY), + mock.call().start(), + mock.call().stop(), mock.call().wait(), + mock.call().stop(), mock.call().wait()]) + + def _test_create_destroy_rpc_namespace_proxy_direction(self, direction): + self.agent.create_namespace_agent(self.ctxt, self.port) + + src_target = 'topic=src_topic,server=src_server' + dst_transport_url = 'rabbit://guest:guest@host:5672' + dst_target = 'topic=dst_topic,server=dst_server' + ns_proxy_id = self.agent.create_rpc_namespace_proxy( + self.ctxt, self.port_id, src_target, + dst_transport_url, dst_target, direction) + kwargs = { + 'src_target': src_target, + 'dst_transport_url': dst_transport_url, + 'dst_target': dst_target, + 'direction': direction, + } + self.mock_rpc_client_instance.call.assert_called_once_with( + {}, 'create_rpc_namespace_proxy', **kwargs) + + self.agent.destroy_rpc_namespace_proxy(self.ctxt, + self.port_id, ns_proxy_id) + + self.mock_rpc_client_instance.call.assert_has_calls([ + mock.call({}, 'create_rpc_namespace_proxy', **kwargs), + mock.call({}, 'destroy_rpc_namespace_proxy', + namespace_proxy_id=ns_proxy_id)]) + + self.agent.destroy_namespace_agent(self.ctxt, self.port_id) + + def test_create_destroy_rpc_namespace_proxy_send(self): + self._test_create_destroy_rpc_namespace_proxy_direction('send') + + def test_create_destroy_rpc_namespace_proxy_receive(self): + self._test_create_destroy_rpc_namespace_proxy_direction('receive') + + def test_create_destroy_rpc_namespace_proxy(self): + self.agent.create_namespace_agent(self.ctxt, self.port) + + src_target_send = 'topic=src_topic_send,server=src_server_send' + dst_transport_url_send = 'rabbit://guestsend:guestsend@sendhost:5672' + dst_target_send = 'topic=dst_topic_send,server=dst_server_send' + direction_send = 'send' + ns_proxy_id_send = self.agent.create_rpc_namespace_proxy( + self.ctxt, self.port_id, src_target_send, + dst_transport_url_send, dst_target_send, direction_send) + kwargs_send = { + 'src_target': src_target_send, + 'dst_transport_url': dst_transport_url_send, + 'dst_target': dst_target_send, + 'direction': direction_send, + } + + src_target_recv = 'topic=src_topic_recv,server=src_server_recv' + dst_transport_url_recv = 'rabbit://guestrecv:guestrecv@recvhost:5672' + dst_target_recv = 'topic=dst_topic_recv,server=dst_server_recv' + direction_recv = 'receive' + self.agent.create_rpc_namespace_proxy( + self.ctxt, self.port_id, src_target_recv, + dst_transport_url_recv, dst_target_recv, direction_recv) + kwargs_recv = { + 'src_target': src_target_recv, + 'dst_transport_url': dst_transport_url_recv, + 'dst_target': dst_target_recv, + 'direction': direction_recv, + } + + self.agent.destroy_rpc_namespace_proxy(self.ctxt, + self.port_id, ns_proxy_id_send) + self.agent.destroy_namespace_agent(self.ctxt, self.port_id) + + self.mock_rpc_client_instance.call.assert_has_calls([ + mock.call({}, 'create_rpc_namespace_proxy', **kwargs_send), + mock.call({}, 'create_rpc_namespace_proxy', **kwargs_recv), + mock.call({}, 'destroy_rpc_namespace_proxy', + namespace_proxy_id=ns_proxy_id_send), + mock.call({}, 'destroy_namespace_agent')]) diff --git a/tacker/tests/unit/services/vm/agent/test_namespace_proxy.py b/tacker/tests/unit/services/vm/agent/test_namespace_proxy.py new file mode 100644 index 000000000..3efb6f1e9 --- /dev/null +++ b/tacker/tests/unit/services/vm/agent/test_namespace_proxy.py @@ -0,0 +1,173 @@ +# Copyright 2014 Intel Corporation. +# Copyright 2014 Isaku Yamahata +# +# All Rights Reserved. +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Isaku Yamahata, Intel Corporation. + +import mock +from oslo.config import cfg + +from tacker import context +from tacker.tests import base +from tacker.vm.agent import namespace_proxy +from tacker.vm.agent import target + + +class TestNamespaceAgent(base.BaseTestCase): + def setUp(self): + super(TestNamespaceAgent, self).setUp() + self.addCleanup(mock.patch.stopall) + self.mock_get_transport_p = mock.patch('oslo.messaging.get_transport') + self.mock_get_transport = self.mock_get_transport_p.start() + self.mock_transport = self.mock_get_transport.return_value + + def server_stop(): + pass + self.agent = namespace_proxy.ServiceVMNamespaceAgent( + 'host', conf=cfg.CONF, src_transport=self.mock_transport, + server_stop=server_stop) + + def test_start_stop_wait(self): + self.agent.init_host() + self.agent.after_start() + self.agent.stop() + self.agent.wait() + + def test_desstroy_namespace_agent(self): + self.agent.init_host() + self.agent.after_start() + ctxt = context.Context('user', 'tenant') + self.agent.destroy_namespace_agent(ctxt) + self.agent.wait() + + def test_create_destroy_rpc_namespace_proxy_send(self): + self.agent.init_host() + self.agent.after_start() + ctxt = context.Context('user', 'tenant') + with mock.patch('oslo.messaging.proxy.get_proxy_server' + ) as mock_get_proxy_server: + mock_proxy_server = mock_get_proxy_server.return_value + src_unix_target = 'topic=src_topic,server=src_server' + dst_transport_url = 'fake:///' + dst_target = 'topic=dst_topic,server=dst_server' + ns_proxy_id = self.agent.create_rpc_namespace_proxy( + ctxt, src_unix_target, dst_transport_url, dst_target, 'send') + src_unix_target = target.target_parse(src_unix_target) + dst_target = target.target_parse(dst_target) + mock_get_proxy_server.assert_called_once_with( + self.mock_transport, src_unix_target, None, + self.mock_transport, dst_target, None, executor=mock.ANY) + # mock_proxy_server.start.assert_called_once_with() + + self.agent.destroy_rpc_namespace_proxy(ctxt, ns_proxy_id) + mock_proxy_server.stop.assert_called_once_with() + mock_proxy_server.wait.assert_called_once_with() + self.mock_transport.cleanup.assert_called_once_with() + self.agent.stop() + self.agent.wait() + + def test_create_destroy_rpc_namespace_proxy_receive(self): + self.agent.init_host() + self.agent.after_start() + ctxt = context.Context('user', 'tenant') + with mock.patch('oslo.messaging.proxy.get_proxy_server' + ) as mock_get_proxy_server: + mock_proxy_server = mock_get_proxy_server.return_value + src_unix_target = 'topic=src_topic,server=src_server' + dst_transport_url = 'fake:///' + dst_target = 'topic=dst_topic,server=dst_server' + ns_proxy_id = self.agent.create_rpc_namespace_proxy( + ctxt, src_unix_target, dst_transport_url, dst_target, + 'receive') + src_unix_target = target.target_parse(src_unix_target) + dst_target = target.target_parse(dst_target) + mock_get_proxy_server.assert_called_once_with( + self.mock_transport, dst_target, None, + self.mock_transport, src_unix_target, None, executor=mock.ANY) + # mock_proxy_server.start.assert_called_once_with() + + self.agent.destroy_rpc_namespace_proxy(ctxt, ns_proxy_id) + mock_proxy_server.stop.assert_called_once_with() + mock_proxy_server.wait.assert_called_once_with() + self.mock_transport.cleanup.assert_called_once_with() + self.agent.stop() + self.agent.wait() + + def test_create_destroy_rpc_namespace_proxy(self): + self.agent.init_host() + self.agent.after_start() + ctxt = context.Context('user', 'tenant') + with mock.patch('oslo.messaging.proxy.get_proxy_server' + ) as mock_get_proxy_server: + mock_proxy_server = mock_get_proxy_server.return_value + src_unix_target_send = ('topic=src_topic_send,' + 'server=src_server_send') + dst_transport_url_send = 'fake:///' + dst_target_send = 'topic=dst_topic_send,server=dst_server_send' + ns_proxy_id_send = self.agent.create_rpc_namespace_proxy( + ctxt, src_unix_target_send, dst_transport_url_send, + dst_target_send, 'send') + src_unix_target_send = target.target_parse(src_unix_target_send) + dst_target_send = target.target_parse(dst_target_send) + mock_get_proxy_server.assert_called_once_with( + self.mock_transport, src_unix_target_send, None, + self.mock_transport, dst_target_send, None, executor=mock.ANY) + # mock_proxy_server.start.assert_called_once_with() + + src_unix_target_recv = ('topic=src_topic_recv,' + 'server=src_server_recv') + dst_transport_url_recv = 'fake:///' + dst_target_recv = 'topic=dst_topic_recv,server=dst_server_recv' + self.agent.create_rpc_namespace_proxy( + ctxt, src_unix_target_recv, dst_transport_url_recv, + dst_target_recv, 'receive') + src_unix_target_recv = target.target_parse(src_unix_target_recv) + dst_target_recv = target.target_parse(dst_target_recv) + + # mock.call().__hash__()/mock.call.__hash__() doesn't work + # due to __getattr__. So create it manually + call_hash = mock._Call(name='().__hash__') + mock_get_proxy_server.assert_has_calls([ + mock.call(self.mock_transport, src_unix_target_send, None, + self.mock_transport, dst_target_send, None, + executor=mock.ANY), + # mock.call().start(), + call_hash(), + mock.call(self.mock_transport, dst_target_recv, None, + self.mock_transport, src_unix_target_recv, None, + executor=mock.ANY), + # mock.call().start(), + call_hash()]) + + self.agent.destroy_rpc_namespace_proxy(ctxt, ns_proxy_id_send) + mock_proxy_server.stop.assert_called_once_with() + mock_proxy_server.wait.assert_called_once_with() + + self.agent.destroy_namespace_agent(ctxt) + self.agent.wait() + call_hash = mock._Call(name='__hash__') + mock_proxy_server.assert_has_calls([ + # mock.call.start(), + call_hash(), + # mock.call.start(), + call_hash(), + call_hash(), + mock.call.stop(), + mock.call.wait(), + mock.call.stop(), + mock.call.wait()]) + self.mock_transport.cleanup.assert_called_once_with() diff --git a/tacker/tests/unit/services/vm/agent/test_target.py b/tacker/tests/unit/services/vm/agent/test_target.py new file mode 100644 index 000000000..c6d3dec1a --- /dev/null +++ b/tacker/tests/unit/services/vm/agent/test_target.py @@ -0,0 +1,39 @@ +# Copyright 2014 Intel Corporation. +# Copyright 2014 Isaku Yamahata +# +# All Rights Reserved. +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Isaku Yamahata, Intel Corporation. + +from oslo.messaging import target + +from tacker.tests import base +from tacker.vm.agent import target as agent_target + + +class TestTarget(base.BaseTestCase): + target_str = ('exchange=default,topic=topic,namespace=namespace,' + 'version=version,server=server,fanout=False') + target_instance = target.Target('default', 'topic', 'namespace', 'version', + 'server', False) + + def test_parse(self): + t = agent_target.target_parse(self.target_str) + self.assertEqual(t, self.target_instance) + + def test_str(self): + t = agent_target.target_str(self.target_instance) + self.assertEqual(t, self.target_str) diff --git a/tacker/vm/agent/__init__.py b/tacker/vm/agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tacker/vm/agent/agent.py b/tacker/vm/agent/agent.py new file mode 100644 index 000000000..5b0d3da67 --- /dev/null +++ b/tacker/vm/agent/agent.py @@ -0,0 +1,368 @@ +# Copyright 2014 Intel Corporation. +# Copyright 2014 Isaku Yamahata +# +# All Rights Reserved. +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Isaku Yamahata, Intel Corporation. + +import eventlet +eventlet.monkey_patch() + +import atexit +import inspect +import uuid + +import netaddr +from oslo.config import cfg +from oslo import messaging +from oslo.messaging._drivers import impl_unix +from oslo.messaging import proxy +from oslo.messaging import rpc +from oslo.messaging import transport + +from tacker.agent.common import config as agent_config +from tacker.agent.linux import external_process +from tacker.agent.linux import interface +from tacker.agent.linux import ip_lib +from tacker.common import config +from tacker.common import legacy +from tacker.common import topics +from tacker import manager +from tacker.openstack.common import excutils +from tacker.openstack.common import importutils +from tacker.openstack.common import lockutils +from tacker.openstack.common import log as logging +from tacker.openstack.common import service +from tacker import oslo_service +from tacker.services.loadbalancer.drivers.haproxy import namespace_driver +from tacker.vm.agent import config as vm_config +from tacker.vm.agent import target + +# _DEBUG = False +_DEBUG = True +LOG = logging.getLogger(__name__) + + +class NamespaceProxyAgentApi(object): + """ + api servicevm agent -> namespace proxy agent + """ + def __init__(self, unix_transport): + super(NamespaceProxyAgentApi, self).__init__() + target_ = messaging.Target(topic=topics.SERVICEVM_AGENT_NAMEPSACE) + self._client = rpc.RPCClient(unix_transport, target_) + + def _call(self, **kwargs): + method = inspect.stack()[1][3] + ctxt = {} + return self._client.call(ctxt, method, **kwargs) + + def destroy_namespace_agent(self): + return self._call() + + def create_rpc_namespace_proxy(self, src_target, + dst_transport_url, dst_target, direction): + return self._call( + src_target=src_target, dst_transport_url=dst_transport_url, + dst_target=dst_target, direction=direction) + + def destroy_rpc_namespace_proxy(self, namespace_proxy_id): + return self._call(namespace_proxy_id=namespace_proxy_id) + + +class NamespaceAgent(object): + def __init__(self, port_id, unix_transport, pm): + super(NamespaceAgent, self).__init__() + self.port_id = port_id + self.unix_transport = unix_transport + self.pm = pm + self.local_proxies = {} + self.api = NamespaceProxyAgentApi(unix_transport) + + +class ServiceVMAgent(manager.Manager): + _NS_PREFIX = 'qsvcvm-' + + @staticmethod + def _get_ns_name(port_id): + return ServiceVMAgent._NS_PREFIX + port_id + + def __init__(self, host=None, **kwargs): + conf = kwargs['conf'] + super(ServiceVMAgent, self).__init__(host=host) + self.conf = conf + self.root_helper = agent_config.get_root_helper(self.conf) + self._proxies = {} + + try: + vif_driver = importutils.import_object(conf.interface_driver, conf) + except ImportError: + with excutils.save_and_reraise_exception(): + msg = (_('Error importing interface driver: %s') + % conf.interface_driver) + LOG.error(msg) + self._vif_driver = vif_driver + self._proxy_agents = {} + self._src_transport = None + self._get_src_transport() + atexit.register(self._atexit) + + def _atexit(self): + for ns_agent in self._proxy_agents.values(): + ns_agent.pm.disable() + for port_id in self._proxy_agents.keys(): + self._unplug(port_id) + + def _get_src_transport(self): + conf = self.conf + + conf.register_opts(transport._transport_opts) + rpc_backend = conf.rpc_backend + if conf.transport_url is not None: + src_url = conf.transport_url + elif (rpc_backend.endswith('.impl_kombu') or + rpc_backend.endswith('.impl_rabbit')): + from oslo.messaging._drivers import impl_rabbit + conf.register_opts(impl_rabbit.rabbit_opts) + src_url = 'rabbit://%s:%s@%s:%s/%s' % ( + conf.rabbit_userid, conf.rabbit_password, + conf.rabbit_host, conf.rabbit_port, + conf.rabbit_virtual_host) + elif rpc_backend.endswith('.impl_qpid'): + from oslo.messaging._drivers import impl_qpid + conf.register_opts(impl_qpid.qpid_opts) + src_url = 'qpid://%s:%s@%s:%s/' % ( + conf.pid_username, conf.qpid_password, + conf.qpid_hostname, conf.qpid_port) + elif rpc_backend.endswith('.impl_zmq'): + from oslo.messaging._drivers import impl_zmq + conf.register_opts(impl_zmq.zmq_opts) + src_url = 'zmq://%s:%s/' % (conf.rpc_zmq_host, conf.rpc_zmq_port) + elif rpc_backend.endswith('.impl_fake'): + src_url = 'fake:///' + else: + raise NotImplementedError( + _('rpc_backend %s is not supported') % rpc_backend) + + self._src_transport = messaging.get_transport(conf, src_url) + + def __del__(self): + if self._src_transport is not None: + self._src_transport.cleanup() + + # def create_device(self, context, device): + # LOG.debug(_('create_device %s'), device) + + # def update_device(self, context, device): + # LOG.debug(_('update_device %s'), device) + + # def delete_device(self, context, device): + # LOG.debug(_('delete_device %s'), device) + + # def create_service(self, context, device, service_instance): + # LOG.debug(_('create_service %(device)s %(service_instance)s'), + # {'device': device, 'service_instance': service_instance}) + + # def update_service(self, context, device, service_instance): + # LOG.debug(_('update_service %(device)s %(service_instance)s'), + # {'device': device, 'service_instance': service_instance}) + + # def delete_service(self, context, device, service_instance): + # LOG.debug(_('delete_service %(device)s %(service_instance)s'), + # {'device': device, 'service_instance': service_instance}) + + # TODO(yamahata): copied from loadbalancer/drivers/haproxy/namespace_driver + # consolidate it. + def _plug(self, port_config): + vif_driver = self._vif_driver + namespace = self._get_ns_name(port_config['id']) + interface_name = vif_driver.get_device_name( + namespace_driver.Wrap(port_config)) + + if not ip_lib.device_exists(interface_name, self.root_helper, + namespace): + vif_driver.plug( + port_config['network_id'], port_config['id'], interface_name, + port_config['mac_address'], namespace=namespace) + cidrs = [ + '%s/%s' % (ip['ip_address'], + netaddr.IPNetwork(ip['subnet']['cidr']).prefixlen) + for ip in port_config['fixed_ips'] + ] + vif_driver.init_l3(interface_name, cidrs, namespace=namespace) + + gw_ip = port_config['fixed_ips'][0]['subnet'].get('gateway_ip') + if gw_ip: + cmd = ['route', 'add', 'default', 'gw', gw_ip] + ip_wrapper = ip_lib.IPWrapper(self.root_helper, + namespace=namespace) + ip_wrapper.netns.execute(cmd, check_exit_code=False) + + def _unplug(self, port_id): + port_stub = {'id': port_id} + namespace = self._get_ns_name(port_id) + vif_driver = self._vif_driver + interface_name = vif_driver.get_device_name( + namespace_driver.Wrap(port_stub)) + vif_driver.unplug(interface_name, namespace=namespace) + + @lockutils.synchronized('servicevm-agent', 'tacker-') + def create_namespace_agent(self, context, port): + conf = self.conf + port_id = port['id'] + path = 'rpc-proxy-%s' % port_id + unix_url = 'punix:///%s' % path + unix_transport = messaging.get_transport(conf, unix_url) + unix_transport._driver.punix_listening.wait() + + self._plug(port) + pm = external_process.ProcessManager( + conf, port_id, root_helper=self.root_helper, + namespace=self._get_ns_name(port_id)) + + def cmd_callback(pid_file_name): + cmd = ['tacker-servicevm-ns-rpc-proxy', + '--pid-file=%s' % pid_file_name, + '--svcvm-proxy-dir=%s' % conf.svcvm_proxy_dir, + '--src-transport-url', 'unix:///%s' % path] + cmd.extend(agent_config.get_log_args( + conf, 'tacker-servicevm-ns-rpc-proxy-%s.log' % port_id)) + if _DEBUG: + cmd += ['--log-file=/tmp/tacker-servicevm-ns-rpc-proxy-' + '%s.log' % port_id] + return cmd + pm.enable(cmd_callback) + + ns_agent = NamespaceAgent(port_id, unix_transport, pm) + self._proxy_agents[port_id] = ns_agent + + @lockutils.synchronized('servicevm-agent', 'tacker-') + def destroy_namespace_agent(self, context, port_id): + ns_agent = self._proxy_agents.pop(port_id) + ns_agent.api.destroy_namespace_agent() + for proxy_server in ns_agent.local_proxies.values(): + proxy_server.stop() + for proxy_server in ns_agent.local_proxies.values(): + proxy_server.wait() + ns_agent.pm.disable() + self._unplug(port_id) + + def _create_rpc_proxy(self, ns_agent, src_transport, src_target, + dst_transport, dst_target): + rpc_proxy_id = str(uuid.uuid4()) + src_target = target.target_parse(src_target) + assert src_target.server + dst_target = target.target_parse(dst_target) + assert dst_target.server + proxy_server = proxy.get_proxy_server( + src_transport, src_target, None, + dst_transport, dst_target, None, executor='eventlet') + ns_agent.local_proxies[rpc_proxy_id] = proxy_server + proxy_server.start() + return rpc_proxy_id + + def _get_proxy_agent(self, port_id): + ns_agent = self._proxy_agents.get(port_id) + if ns_agent is None: + msg = _('unknown port_id %s') % port_id + LOG.error(msg) + raise RuntimeError(msg) + return ns_agent + + @lockutils.synchronized('servicevm-agent', 'tacker-') + def create_rpc_proxy(self, context, port_id, + src_target, dst_unix_target, direction): + ns_agent = self._get_proxy_agent(port_id) + if direction == 'send': + return self._create_rpc_proxy( + ns_agent, self._src_transport, src_target, + ns_agent.unix_transport, dst_unix_target) + elif direction == 'receive': + return self._create_rpc_proxy( + ns_agent, ns_agent.unix_transport, dst_unix_target, + self._src_transport, src_target) + else: + msg = _('unknown direction %s') % direction + LOG.error(msg) + raise RuntimeError(msg) + + @lockutils.synchronized('servicevm-agent', 'tacker-') + def destroy_rpc_proxy(self, context, port_id, rpc_proxy_id): + ns_agent = self._get_proxy_agent(port_id) + proxy_server = ns_agent.local_proxies.pop(rpc_proxy_id) + proxy_server.stop() + proxy_server.wait() + + @lockutils.synchronized('servicevm-agent', 'tacker-') + def create_rpc_namespace_proxy(self, context, port_id, src_target, + dst_transport_url, dst_target, direction): + ns_agent = self._get_proxy_agent(port_id) + ns_proxy_id = ns_agent.api.create_rpc_namespace_proxy( + src_target, dst_transport_url, dst_target, direction) + LOG.debug("create_rpc_namespace_proxy %s", ns_proxy_id) + return ns_proxy_id + + @lockutils.synchronized('servicevm-agent', 'tacker-') + def destroy_rpc_namespace_proxy(self, context, + port_id, namespace_proxy_id): + ns_agent = self._get_proxy_agent(port_id) + return ns_agent.api.destroy_rpc_namespace_proxy(namespace_proxy_id) + + +class ServiceVMAgentWithStateReport(ServiceVMAgent): + # TODO(yamahata) + pass + + +def _register_options(conf): + conf.register_opts(interface.OPTS) + agent_config.register_interface_driver_opts_helper(conf) + agent_config.register_agent_state_opts_helper(conf) + agent_config.register_root_helper(conf) + conf.register_opts(vm_config.OPTS) + conf.register_opts(impl_unix.unix_opts) + + +def main(): + conf = cfg.CONF + + # NOTE(yamahata): work around. rpc driver-dependent config variables + # remove this line once tacker are fully ported to oslo.messaging + from tacker.openstack.common import rpc + conf.unregister_opts(rpc.rpc_opts) + + # NOTE(yamahata): corresponds to + # tacker.common.config.rpc.set_default(control_exchange='tacker') + messaging.set_transport_defaults('tacker') + + _register_options(conf) + conf(project='tacker') + config.setup_logging(conf) + legacy.modernize_quantum_config(conf) + # NOTE(yamahata): workaround for state_path + # oslo.messaging doesn't know state_path + conf.set_override('rpc_unix_ipc_dir', conf.svcvm_proxy_dir) + + server = oslo_service.TackerService.create( + topic=topics.SERVICEVM_AGENT, + manager='tacker.vm.agent.agent.ServiceVMAgentWithStateReport', + report_interval=conf.AGENT.report_interval, + conf=conf) + service.launch(server).wait() + + +if __name__ == '__main__': + main() diff --git a/tacker/vm/agent/config.py b/tacker/vm/agent/config.py new file mode 100644 index 000000000..4c698c9de --- /dev/null +++ b/tacker/vm/agent/config.py @@ -0,0 +1,31 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013, 2014 Intel Corporation. +# Copyright 2013, 2014 Isaku Yamahata +# +# All Rights Reserved. +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Isaku Yamahata, Intel Corporation. + +from oslo.config import cfg + + +OPTS = [ + cfg.StrOpt('svcvm-proxy-dir', + default='$state_path/svcvm_proxy_dir', + help=_('Location for servicevm agent proxy ' + 'UNIX domain socket')), +] diff --git a/tacker/vm/agent/namespace_proxy.py b/tacker/vm/agent/namespace_proxy.py new file mode 100644 index 000000000..222f2f0cf --- /dev/null +++ b/tacker/vm/agent/namespace_proxy.py @@ -0,0 +1,340 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013, 2014 Intel Corporation. +# Copyright 2013, 2014 Isaku Yamahata +# +# All Rights Reserved. +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Isaku Yamahata, Intel Corporation. + +import eventlet +eventlet.monkey_patch() + + +import os.path +import random +import sys +import uuid + +from oslo.config import cfg +from oslo import messaging +from oslo.messaging._drivers import impl_unix +from oslo.messaging import proxy +from oslo.messaging import rpc + +from tacker.agent.common import config as agent_config +from tacker.agent.linux import daemon +from tacker.common import config +from tacker.common import legacy +from tacker.common import topics +from tacker.common import utils +from tacker import context +from tacker import manager +from tacker.openstack.common import importutils +from tacker.openstack.common import lockutils +from tacker.openstack.common import log as logging +from tacker.openstack.common import service +from tacker import oslo_service +from tacker.vm.agent import config as vm_config +from tacker.vm.agent import target + + +LOG = logging.getLogger(__name__) + + +class NamespaceProxies(object): + def __init__(self): + super(NamespaceProxies, self).__init__() + self.urls = {} # dict: transport_url -> transport + self.transports = {} # dict: transport -> transport_url + self.proxies = {} # uuid -> (transport, proxy_server) + self.transport_to_proxies = {} # transport -> set of proxy_servers + + def get_transport(self, transport_url): + return self.urls.get(transport_url, None) + + def add_transport(self, transport_url, transport): + assert transport_url not in self.urls + assert transport not in self.transports + self.transports[transport_url] = transport + self.urls[transport] = transport_url + + def del_proxy(self, namespace_proxy_id): + transport, proxy_server = self.proxies.pop(namespace_proxy_id) + proxies = self.transport_to_proxies[transport] + proxies.remove(proxy_server) + if proxies: + transport = None + else: + transport_url = self.urls.pop(transport) + del self.transports[transport_url] + del self.transport_to_proxies[transport] + return (transport, proxy_server) + + def add_proxy(self, namespace_proxy_id, transport, proxy_server): + assert namespace_proxy_id not in self.proxies + self.proxies[namespace_proxy_id] = (transport, proxy_server) + proxies = self.transport_to_proxies.setdefault(transport, set()) + proxies.add(proxy_server) + + +class ServiceVMNamespaceAgent(manager.Manager): + def __init__(self, host=None, **kwargs): + LOG.debug(_('host %(host)s, kwargs %(kwargs)s'), + {'host': host, 'kwargs': kwargs}) + super(ServiceVMNamespaceAgent, self).__init__(host=host) + + for key in ('conf', 'src_transport', 'server_stop', ): + setattr(self, key, kwargs[key]) + assert self.src_transport is not None + assert self.server_stop is not None + + self._proxies = NamespaceProxies() + + def stop(self): + LOG.debug('stop') + self.server_stop() + ns_proxies = self._proxies + for _transport, proxy_server in ns_proxies.proxies.values(): + proxy_server.stop() + + def wait(self): + LOG.debug('wait') + ns_proxies = self._proxies + for _transport, proxy_server in ns_proxies.proxies.values(): + proxy_server.wait() + for transport in ns_proxies.transports.values(): + transport.cleanup() + + @lockutils.synchronized('servicevm-namespace-agent', 'tacker-') + def destroy_namespace_agent(self, context): + self.stop() + + def _create_rpc_namespace_proxy(self, src_transport, src_target, + dst_transport, dst_target): + src_target = target.target_parse(src_target) + assert src_target.server + dst_target = target.target_parse(dst_target) + assert dst_target.server + return proxy.get_proxy_server( + src_transport, src_target, None, + dst_transport, dst_target, None, executor='eventlet') + + @lockutils.synchronized('servicevm-namespace-agent', 'tacker-') + def create_rpc_namespace_proxy(self, context, src_target, + dst_transport_url, dst_target, direction): + LOG.debug('create_rpc_namespace_proxy %s %s %s %s %s', + context, src_target, dst_transport_url, dst_target, + direction) + dst_transport = self._proxies.get_transport(dst_transport_url) + if dst_transport is None: + dst_transport = messaging.get_transport(self.conf, + dst_transport_url) + self._proxies.add_transport(dst_transport_url, dst_transport) + if direction == 'send': + proxy_server = self._create_rpc_namespace_proxy( + self.src_transport, src_target, dst_transport, dst_target) + elif direction == 'receive': + proxy_server = self._create_rpc_namespace_proxy( + dst_transport, dst_target, self.src_transport, src_target) + else: + msg = _('unknown direction %s') % direction + LOG.error(msg) + raise RuntimeError(msg) + + # proxy_server.start() + eventlet.spawn(proxy_server.start) + namespace_proxy_id = str(uuid.uuid4()) + self._proxies.add_proxy(namespace_proxy_id, + dst_transport, proxy_server) + LOG.debug('namespace_proxy_id %s', namespace_proxy_id) + return namespace_proxy_id + + @lockutils.synchronized('servicevm-namespace-agent', 'tacker-') + def destroy_rpc_namespace_proxy(self, context, namespace_proxy_id): + LOG.debug('namespace_proxy_id %s', namespace_proxy_id) + try: + transport, proxy_server = self._proxies.del_proxy( + namespace_proxy_id) + except KeyError: + return + proxy_server.stop() + proxy_server.wait() + if transport is not None: + transport.cleanup() + + +# TODO(yamahata): class Service is stolen from nova.service and modified. +# port tacker to oslo.messaging and delete this class. +class Service(service.Service): + def __init__(self, conf, host, binary, topic, manager_, + report_interval=None, periodic_enable=None, + periodic_fuzzy_delay=None, periodic_interval_max=None, + *args, **kwargs): + super(Service, self).__init__() + self.conf = conf + self.host = host + self.binary = binary + self.topic = topic + self.manager_class_name = manager_ + manager_class = importutils.import_class(self.manager_class_name) + kwargs_ = kwargs.copy() + kwargs_['conf'] = conf + self.manager = manager_class(host=self.host, *args, **kwargs_) + self.src_transport = kwargs['src_transport'] + self.rpcserver = None + self.report_interval = report_interval + self.periodic_enable = periodic_enable + self.periodic_fuzzy_delay = periodic_fuzzy_delay + self.periodic_interval_max = periodic_interval_max + self.saved_args, self.saved_kwargs = args, kwargs + + def start(self): + self.manager.init_host() + LOG.debug(_("Creating RPC server for service %(topic)s %(driver)s"), + {'topic': self.topic, 'driver': self.src_transport._driver}) + + target = messaging.Target(topic=self.topic, server=self.host) + endpoints = [self.manager] + self.rpcserver = rpc.get_rpc_server(self.src_transport, target, + endpoints, executor='eventlet') + self.rpcserver.start() + + if self.periodic_enable: + if self.periodic_fuzzy_delay: + initial_delay = random.randint(0, self.periodic_fuzzy_delay) + else: + initial_delay = None + + self.tg.add_dynamic_timer( + self.periodic_tasks, initial_delay=initial_delay, + periodic_interval_max=self.periodic_interval_max) + self.manager.after_start() + LOG.debug('start done') + + @classmethod + def create(cls, conf, src_transport, + host=None, binary=None, topic=None, manager_=None, **kwargs): + if not host: + host = conf.host + if not binary: + binary = os.path.basename(sys.argv[0]) + if not topic: + topic = binary.rpartition('tacker-')[2] + topic = topic.replace('-', '_') + if not manager_: + manager_ = conf.get('%s_manager' % topic, None) + service_obj = cls(conf, host, binary, topic, manager_, + src_transport=src_transport, **kwargs) + return service_obj + + def kill(self): + self.stop() + + def stop(self): + try: + self.rpcserver.stop() + self.manager.stop() + except Exception: + LOG.exception(_('failed to stop rpcserver')) + + super(Service, self).stop() + + def wait(self): + try: + self.rpcserver.wait() + self.manager.wait() + except Exception: + LOG.exception(_('failed to wait rpcserver')) + + super(Service, self).wait() + + def periodic_tasks(self, raise_on_error=False): + """Tasks to be run at a periodic interval.""" + ctxt = context.get_admin_context() + return self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error) + + def report_state(self): + pass + + +class ProxyDaemon(daemon.Daemon): + def __init__(self, conf): + self._conf = conf + super(ProxyDaemon, self).__init__(conf.pid_file, uuid=conf.port_id) + + def run(self): + conf = self._conf + + def server_stop(): + server.stop() + LOG.debug(_('src transport url %s'), conf.src_transport_url) + src_transport = messaging.get_transport( + conf, conf.src_transport_url, + aliases=oslo_service.TRANSPORT_ALIASES) + server = Service.create( + conf=conf, topic=topics.SERVICEVM_AGENT_NAMEPSACE, + manager_=('tacker.vm.agent.namespace_proxy.' + 'ServiceVMNamespaceAgent'), + src_transport=src_transport, server_stop=server_stop) + service.launch(server).wait() + src_transport.cleanup() + + +def _register_options(conf): + cli_opts = [ + cfg.StrOpt('pid-file', help=_('pid file of this process.')), + cfg.StrOpt('port-id', help=_('uuid of port')), + cfg.StrOpt('src-transport-url', help='src transport url'), + cfg.BoolOpt('daemonize', default=True, help=_('Run as daemon')) + ] + conf.register_cli_opts(cli_opts) + agent_config.register_agent_state_opts_helper(conf) + agent_config.register_root_helper(conf) + conf.register_cli_opts(vm_config.OPTS) + conf.register_opts(impl_unix.unix_opts) + + +def main(): + conf = cfg.CONF + + # NOTE(yamahata): work around. rpc driver-dependent config variables + # remove this line once tacker are fully ported to oslo.messaging + from tacker.openstack.common import rpc + conf.unregister_opts(rpc.rpc_opts) + + # NOTE(yamahata): corresponds to + # tacker.common.config.rpc.set_default(control_exchange='tacker') + messaging.set_transport_defaults('tacker') + + _register_options(conf) + conf(project='tacker') + config.setup_logging(conf) + legacy.modernize_quantum_config(conf) + # NOTE(yamahata): workaround for state_path + # oslo.messaging doesn't know state_path + conf.set_override('rpc_unix_ipc_dir', conf.svcvm_proxy_dir) + utils.log_opt_values(LOG) + + proxy = ProxyDaemon(conf) + if conf.daemonize: + proxy.start() + else: + proxy.run() + + +if __name__ == '__main__': + main() diff --git a/tacker/vm/agent/target.py b/tacker/vm/agent/target.py new file mode 100644 index 000000000..2aa0c7c62 --- /dev/null +++ b/tacker/vm/agent/target.py @@ -0,0 +1,45 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013, 2014 Intel Corporation. +# Copyright 2013, 2014 Isaku Yamahata +# +# All Rights Reserved. +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Isaku Yamahata, Intel Corporation. + + +from oslo.messaging import target + + +_KEYS = ['exchange', 'topic', 'namespace', 'version', 'server', 'fanout'] +_BOOLEAN_STATES = {'1': True, 'yes': True, 'true': True, 'on': True, + '0': False, 'no': False, 'false': False, 'off': False} + + +def target_parse(target_str): + attrs = target_str.split(',') + kwargs = dict(attr.split('=', 1) for attr in attrs) + if 'fanout' in kwargs: + # should use oslo.config.types.Bool.__call__ ? + value = kwargs['fanout'] + kwargs['fanout'] = _BOOLEAN_STATES[value.lower()] + return target.Target(**kwargs) + + +def target_str(target): + attrs = [(key, getattr(target, key)) + for key in _KEYS if getattr(target, key) is not None] + return ','.join('%s=%s' % attr for attr in attrs) diff --git a/tacker/vm/agent_base.py b/tacker/vm/agent_base.py new file mode 100644 index 000000000..08c01709d --- /dev/null +++ b/tacker/vm/agent_base.py @@ -0,0 +1,46 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013, 2014 Intel Corporation. +# Copyright 2013, 2014 Isaku Yamahata +# +# All Rights Reserved. +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Isaku Yamahata, Intel Corporation. + +# TODO(yamahata): consolidate with l3-agent, lbaas-agent once +# agent consolidation is done. +# https://blueprints.launchpad.net/tacker/+spec/l3-agent-consolication + +import abc +import six + + +# TODO(yamahata) +# communicate with service vm +@six.add_metaclass(abc.ABCMeta) +class ServiceVMAgentBase(object): + + @abc.abstractmethod + def create(self, info): + pass + + @abc.abstractmethod + def delete(self, info): + pass + + @abc.abstractmethod + def update(self, info): + pass