diff --git a/tacker/db/vm/proxy_db.py b/tacker/db/vm/proxy_db.py deleted file mode 100644 index 8256aa737..000000000 --- a/tacker/db/vm/proxy_db.py +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright 2014 Intel Corporation. -# Copyright 2014 Isaku Yamahata -# -# All Rights Reserved. -# -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# @author: Isaku Yamahata, Intel Corporation. - -import sqlalchemy as sa - -from tacker.db import db_base -from tacker.db import model_base - - -class ProxyMgmtPort(model_base.BASE): - device_id = sa.Column(sa.String(255), primary_key=True) - port_id = sa.Column(sa.String(36), nullable=False) - dst_transport_url = sa.Column(sa.String(255)) - svr_proxy_id = sa.Column(sa.String(36)) - svr_ns_proxy_id = sa.Column(sa.String(36)) - clt_proxy_id = sa.Column(sa.String(36)) - clt_ns_proxy_id = sa.Column(sa.String(36)) - - -class ProxyServicePort(model_base.BASE): - service_instance_id = sa.Column(sa.String(255), primary_key=True) - svr_proxy_id = sa.Column(sa.String(36)) - svr_ns_proxy_id = sa.Column(sa.String(36)) - clt_proxy_id = sa.Column(sa.String(36)) - clt_ns_proxy_id = sa.Column(sa.String(36)) - - -class RpcProxyDb(db_base.CommonDbMixin): - def _make_proxy_mgmt_port(self, proxy_mgmt_port): - key_list = ('device_id', 'port_id', 'dst_transport_url', - 'svr_proxy_id', 'svr_ns_proxy_id', - 'clt_proxy_id', 'clt_ns_proxy_id') - return dict((key, getattr(proxy_mgmt_port, key)) for key in key_list) - - def _make_proxy_service_port(self, proxy_service_port): - key_list = ('service_instance_id', 'svr_proxy_id', 'svr_ns_proxy_id', - 'clt_proxy_id', 'clt_ns_proxy_id') - return dict((key, getattr(proxy_service_port, key)) - for key in key_list) - - def create_proxy_mgmt_port(self, context, device_id, port_id, - dst_transport_url, - svr_proxy_id, svr_ns_proxy_id, - clt_proxy_id, clt_ns_proxy_id): - with context.session.begin(subtransactions=True): - proxy_mgmt_port = ProxyMgmtPort( - device_id=device_id, port_id=port_id, - dst_transport_url=dst_transport_url, - svr_proxy_id=svr_proxy_id, svr_ns_proxy_id=svr_ns_proxy_id, - clt_proxy_id=clt_proxy_id, clt_ns_proxy_id=clt_ns_proxy_id) - context.session.add(proxy_mgmt_port) - - def delete_proxy_mgmt_port(self, context, port_id): - with context.session.begin(subtransactions=True): - context.session.query(ProxyMgmtPort).filter_by( - port_id=port_id).delete() - - def get_proxy_mgmt_port(self, context, device_id): - with context.session.begin(subtransactions=True): - proxy_mgmt_port = context.session.query(ProxyMgmtPort).filter_by( - device_id=device_id).one() - return self._make_proxy_mgmt_port(proxy_mgmt_port) - - def create_proxy_service_port(self, context, service_instance_id, - svr_proxy_id, svr_ns_proxy_id, - clt_proxy_id, clt_ns_proxy_id): - with context.session.begin(subtransactions=True): - proxy_service_port = ProxyServicePort( - service_instance_id=service_instance_id, - svr_proxy_id=svr_proxy_id, svr_ns_proxy_id=svr_ns_proxy_id, - clt_proxy_id=clt_proxy_id, clt_ns_proxy_id=clt_ns_proxy_id) - context.session.add(proxy_service_port) - - def delete_proxy_service_port(self, context, service_instance_id): - with context.session.begin(subtransactions=True): - context.session.query(ProxyServicePort).filter_by( - service_instance_id=service_instance_id).delete() - - def get_proxy_service_port(self, context, service_instance_id): - with context.session.begin(subtransactions=True): - proxy_service_port = context.session.query( - ProxyServicePort).filter_by( - service_instance_id=service_instance_id).one() - return self._make_proxy_service_port(proxy_service_port) diff --git a/tacker/vm/agent/__init__.py b/tacker/vm/agent/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tacker/vm/agent/agent.py b/tacker/vm/agent/agent.py deleted file mode 100644 index 5b0d3da67..000000000 --- a/tacker/vm/agent/agent.py +++ /dev/null @@ -1,368 +0,0 @@ -# Copyright 2014 Intel Corporation. -# Copyright 2014 Isaku Yamahata -# -# All Rights Reserved. -# -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# @author: Isaku Yamahata, Intel Corporation. - -import eventlet -eventlet.monkey_patch() - -import atexit -import inspect -import uuid - -import netaddr -from oslo.config import cfg -from oslo import messaging -from oslo.messaging._drivers import impl_unix -from oslo.messaging import proxy -from oslo.messaging import rpc -from oslo.messaging import transport - -from tacker.agent.common import config as agent_config -from tacker.agent.linux import external_process -from tacker.agent.linux import interface -from tacker.agent.linux import ip_lib -from tacker.common import config -from tacker.common import legacy -from tacker.common import topics -from tacker import manager -from tacker.openstack.common import excutils -from tacker.openstack.common import importutils -from tacker.openstack.common import lockutils -from tacker.openstack.common import log as logging -from tacker.openstack.common import service -from tacker import oslo_service -from tacker.services.loadbalancer.drivers.haproxy import namespace_driver -from tacker.vm.agent import config as vm_config -from tacker.vm.agent import target - -# _DEBUG = False -_DEBUG = True -LOG = logging.getLogger(__name__) - - -class NamespaceProxyAgentApi(object): - """ - api servicevm agent -> namespace proxy agent - """ - def __init__(self, unix_transport): - super(NamespaceProxyAgentApi, self).__init__() - target_ = messaging.Target(topic=topics.SERVICEVM_AGENT_NAMEPSACE) - self._client = rpc.RPCClient(unix_transport, target_) - - def _call(self, **kwargs): - method = inspect.stack()[1][3] - ctxt = {} - return self._client.call(ctxt, method, **kwargs) - - def destroy_namespace_agent(self): - return self._call() - - def create_rpc_namespace_proxy(self, src_target, - dst_transport_url, dst_target, direction): - return self._call( - src_target=src_target, dst_transport_url=dst_transport_url, - dst_target=dst_target, direction=direction) - - def destroy_rpc_namespace_proxy(self, namespace_proxy_id): - return self._call(namespace_proxy_id=namespace_proxy_id) - - -class NamespaceAgent(object): - def __init__(self, port_id, unix_transport, pm): - super(NamespaceAgent, self).__init__() - self.port_id = port_id - self.unix_transport = unix_transport - self.pm = pm - self.local_proxies = {} - self.api = NamespaceProxyAgentApi(unix_transport) - - -class ServiceVMAgent(manager.Manager): - _NS_PREFIX = 'qsvcvm-' - - @staticmethod - def _get_ns_name(port_id): - return ServiceVMAgent._NS_PREFIX + port_id - - def __init__(self, host=None, **kwargs): - conf = kwargs['conf'] - super(ServiceVMAgent, self).__init__(host=host) - self.conf = conf - self.root_helper = agent_config.get_root_helper(self.conf) - self._proxies = {} - - try: - vif_driver = importutils.import_object(conf.interface_driver, conf) - except ImportError: - with excutils.save_and_reraise_exception(): - msg = (_('Error importing interface driver: %s') - % conf.interface_driver) - LOG.error(msg) - self._vif_driver = vif_driver - self._proxy_agents = {} - self._src_transport = None - self._get_src_transport() - atexit.register(self._atexit) - - def _atexit(self): - for ns_agent in self._proxy_agents.values(): - ns_agent.pm.disable() - for port_id in self._proxy_agents.keys(): - self._unplug(port_id) - - def _get_src_transport(self): - conf = self.conf - - conf.register_opts(transport._transport_opts) - rpc_backend = conf.rpc_backend - if conf.transport_url is not None: - src_url = conf.transport_url - elif (rpc_backend.endswith('.impl_kombu') or - rpc_backend.endswith('.impl_rabbit')): - from oslo.messaging._drivers import impl_rabbit - conf.register_opts(impl_rabbit.rabbit_opts) - src_url = 'rabbit://%s:%s@%s:%s/%s' % ( - conf.rabbit_userid, conf.rabbit_password, - conf.rabbit_host, conf.rabbit_port, - conf.rabbit_virtual_host) - elif rpc_backend.endswith('.impl_qpid'): - from oslo.messaging._drivers import impl_qpid - conf.register_opts(impl_qpid.qpid_opts) - src_url = 'qpid://%s:%s@%s:%s/' % ( - conf.pid_username, conf.qpid_password, - conf.qpid_hostname, conf.qpid_port) - elif rpc_backend.endswith('.impl_zmq'): - from oslo.messaging._drivers import impl_zmq - conf.register_opts(impl_zmq.zmq_opts) - src_url = 'zmq://%s:%s/' % (conf.rpc_zmq_host, conf.rpc_zmq_port) - elif rpc_backend.endswith('.impl_fake'): - src_url = 'fake:///' - else: - raise NotImplementedError( - _('rpc_backend %s is not supported') % rpc_backend) - - self._src_transport = messaging.get_transport(conf, src_url) - - def __del__(self): - if self._src_transport is not None: - self._src_transport.cleanup() - - # def create_device(self, context, device): - # LOG.debug(_('create_device %s'), device) - - # def update_device(self, context, device): - # LOG.debug(_('update_device %s'), device) - - # def delete_device(self, context, device): - # LOG.debug(_('delete_device %s'), device) - - # def create_service(self, context, device, service_instance): - # LOG.debug(_('create_service %(device)s %(service_instance)s'), - # {'device': device, 'service_instance': service_instance}) - - # def update_service(self, context, device, service_instance): - # LOG.debug(_('update_service %(device)s %(service_instance)s'), - # {'device': device, 'service_instance': service_instance}) - - # def delete_service(self, context, device, service_instance): - # LOG.debug(_('delete_service %(device)s %(service_instance)s'), - # {'device': device, 'service_instance': service_instance}) - - # TODO(yamahata): copied from loadbalancer/drivers/haproxy/namespace_driver - # consolidate it. - def _plug(self, port_config): - vif_driver = self._vif_driver - namespace = self._get_ns_name(port_config['id']) - interface_name = vif_driver.get_device_name( - namespace_driver.Wrap(port_config)) - - if not ip_lib.device_exists(interface_name, self.root_helper, - namespace): - vif_driver.plug( - port_config['network_id'], port_config['id'], interface_name, - port_config['mac_address'], namespace=namespace) - cidrs = [ - '%s/%s' % (ip['ip_address'], - netaddr.IPNetwork(ip['subnet']['cidr']).prefixlen) - for ip in port_config['fixed_ips'] - ] - vif_driver.init_l3(interface_name, cidrs, namespace=namespace) - - gw_ip = port_config['fixed_ips'][0]['subnet'].get('gateway_ip') - if gw_ip: - cmd = ['route', 'add', 'default', 'gw', gw_ip] - ip_wrapper = ip_lib.IPWrapper(self.root_helper, - namespace=namespace) - ip_wrapper.netns.execute(cmd, check_exit_code=False) - - def _unplug(self, port_id): - port_stub = {'id': port_id} - namespace = self._get_ns_name(port_id) - vif_driver = self._vif_driver - interface_name = vif_driver.get_device_name( - namespace_driver.Wrap(port_stub)) - vif_driver.unplug(interface_name, namespace=namespace) - - @lockutils.synchronized('servicevm-agent', 'tacker-') - def create_namespace_agent(self, context, port): - conf = self.conf - port_id = port['id'] - path = 'rpc-proxy-%s' % port_id - unix_url = 'punix:///%s' % path - unix_transport = messaging.get_transport(conf, unix_url) - unix_transport._driver.punix_listening.wait() - - self._plug(port) - pm = external_process.ProcessManager( - conf, port_id, root_helper=self.root_helper, - namespace=self._get_ns_name(port_id)) - - def cmd_callback(pid_file_name): - cmd = ['tacker-servicevm-ns-rpc-proxy', - '--pid-file=%s' % pid_file_name, - '--svcvm-proxy-dir=%s' % conf.svcvm_proxy_dir, - '--src-transport-url', 'unix:///%s' % path] - cmd.extend(agent_config.get_log_args( - conf, 'tacker-servicevm-ns-rpc-proxy-%s.log' % port_id)) - if _DEBUG: - cmd += ['--log-file=/tmp/tacker-servicevm-ns-rpc-proxy-' - '%s.log' % port_id] - return cmd - pm.enable(cmd_callback) - - ns_agent = NamespaceAgent(port_id, unix_transport, pm) - self._proxy_agents[port_id] = ns_agent - - @lockutils.synchronized('servicevm-agent', 'tacker-') - def destroy_namespace_agent(self, context, port_id): - ns_agent = self._proxy_agents.pop(port_id) - ns_agent.api.destroy_namespace_agent() - for proxy_server in ns_agent.local_proxies.values(): - proxy_server.stop() - for proxy_server in ns_agent.local_proxies.values(): - proxy_server.wait() - ns_agent.pm.disable() - self._unplug(port_id) - - def _create_rpc_proxy(self, ns_agent, src_transport, src_target, - dst_transport, dst_target): - rpc_proxy_id = str(uuid.uuid4()) - src_target = target.target_parse(src_target) - assert src_target.server - dst_target = target.target_parse(dst_target) - assert dst_target.server - proxy_server = proxy.get_proxy_server( - src_transport, src_target, None, - dst_transport, dst_target, None, executor='eventlet') - ns_agent.local_proxies[rpc_proxy_id] = proxy_server - proxy_server.start() - return rpc_proxy_id - - def _get_proxy_agent(self, port_id): - ns_agent = self._proxy_agents.get(port_id) - if ns_agent is None: - msg = _('unknown port_id %s') % port_id - LOG.error(msg) - raise RuntimeError(msg) - return ns_agent - - @lockutils.synchronized('servicevm-agent', 'tacker-') - def create_rpc_proxy(self, context, port_id, - src_target, dst_unix_target, direction): - ns_agent = self._get_proxy_agent(port_id) - if direction == 'send': - return self._create_rpc_proxy( - ns_agent, self._src_transport, src_target, - ns_agent.unix_transport, dst_unix_target) - elif direction == 'receive': - return self._create_rpc_proxy( - ns_agent, ns_agent.unix_transport, dst_unix_target, - self._src_transport, src_target) - else: - msg = _('unknown direction %s') % direction - LOG.error(msg) - raise RuntimeError(msg) - - @lockutils.synchronized('servicevm-agent', 'tacker-') - def destroy_rpc_proxy(self, context, port_id, rpc_proxy_id): - ns_agent = self._get_proxy_agent(port_id) - proxy_server = ns_agent.local_proxies.pop(rpc_proxy_id) - proxy_server.stop() - proxy_server.wait() - - @lockutils.synchronized('servicevm-agent', 'tacker-') - def create_rpc_namespace_proxy(self, context, port_id, src_target, - dst_transport_url, dst_target, direction): - ns_agent = self._get_proxy_agent(port_id) - ns_proxy_id = ns_agent.api.create_rpc_namespace_proxy( - src_target, dst_transport_url, dst_target, direction) - LOG.debug("create_rpc_namespace_proxy %s", ns_proxy_id) - return ns_proxy_id - - @lockutils.synchronized('servicevm-agent', 'tacker-') - def destroy_rpc_namespace_proxy(self, context, - port_id, namespace_proxy_id): - ns_agent = self._get_proxy_agent(port_id) - return ns_agent.api.destroy_rpc_namespace_proxy(namespace_proxy_id) - - -class ServiceVMAgentWithStateReport(ServiceVMAgent): - # TODO(yamahata) - pass - - -def _register_options(conf): - conf.register_opts(interface.OPTS) - agent_config.register_interface_driver_opts_helper(conf) - agent_config.register_agent_state_opts_helper(conf) - agent_config.register_root_helper(conf) - conf.register_opts(vm_config.OPTS) - conf.register_opts(impl_unix.unix_opts) - - -def main(): - conf = cfg.CONF - - # NOTE(yamahata): work around. rpc driver-dependent config variables - # remove this line once tacker are fully ported to oslo.messaging - from tacker.openstack.common import rpc - conf.unregister_opts(rpc.rpc_opts) - - # NOTE(yamahata): corresponds to - # tacker.common.config.rpc.set_default(control_exchange='tacker') - messaging.set_transport_defaults('tacker') - - _register_options(conf) - conf(project='tacker') - config.setup_logging(conf) - legacy.modernize_quantum_config(conf) - # NOTE(yamahata): workaround for state_path - # oslo.messaging doesn't know state_path - conf.set_override('rpc_unix_ipc_dir', conf.svcvm_proxy_dir) - - server = oslo_service.TackerService.create( - topic=topics.SERVICEVM_AGENT, - manager='tacker.vm.agent.agent.ServiceVMAgentWithStateReport', - report_interval=conf.AGENT.report_interval, - conf=conf) - service.launch(server).wait() - - -if __name__ == '__main__': - main() diff --git a/tacker/vm/agent/config.py b/tacker/vm/agent/config.py deleted file mode 100644 index 4c698c9de..000000000 --- a/tacker/vm/agent/config.py +++ /dev/null @@ -1,31 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright 2013, 2014 Intel Corporation. -# Copyright 2013, 2014 Isaku Yamahata -# -# All Rights Reserved. -# -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# @author: Isaku Yamahata, Intel Corporation. - -from oslo.config import cfg - - -OPTS = [ - cfg.StrOpt('svcvm-proxy-dir', - default='$state_path/svcvm_proxy_dir', - help=_('Location for servicevm agent proxy ' - 'UNIX domain socket')), -] diff --git a/tacker/vm/agent/namespace_proxy.py b/tacker/vm/agent/namespace_proxy.py deleted file mode 100644 index 222f2f0cf..000000000 --- a/tacker/vm/agent/namespace_proxy.py +++ /dev/null @@ -1,340 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright 2013, 2014 Intel Corporation. -# Copyright 2013, 2014 Isaku Yamahata -# -# All Rights Reserved. -# -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# @author: Isaku Yamahata, Intel Corporation. - -import eventlet -eventlet.monkey_patch() - - -import os.path -import random -import sys -import uuid - -from oslo.config import cfg -from oslo import messaging -from oslo.messaging._drivers import impl_unix -from oslo.messaging import proxy -from oslo.messaging import rpc - -from tacker.agent.common import config as agent_config -from tacker.agent.linux import daemon -from tacker.common import config -from tacker.common import legacy -from tacker.common import topics -from tacker.common import utils -from tacker import context -from tacker import manager -from tacker.openstack.common import importutils -from tacker.openstack.common import lockutils -from tacker.openstack.common import log as logging -from tacker.openstack.common import service -from tacker import oslo_service -from tacker.vm.agent import config as vm_config -from tacker.vm.agent import target - - -LOG = logging.getLogger(__name__) - - -class NamespaceProxies(object): - def __init__(self): - super(NamespaceProxies, self).__init__() - self.urls = {} # dict: transport_url -> transport - self.transports = {} # dict: transport -> transport_url - self.proxies = {} # uuid -> (transport, proxy_server) - self.transport_to_proxies = {} # transport -> set of proxy_servers - - def get_transport(self, transport_url): - return self.urls.get(transport_url, None) - - def add_transport(self, transport_url, transport): - assert transport_url not in self.urls - assert transport not in self.transports - self.transports[transport_url] = transport - self.urls[transport] = transport_url - - def del_proxy(self, namespace_proxy_id): - transport, proxy_server = self.proxies.pop(namespace_proxy_id) - proxies = self.transport_to_proxies[transport] - proxies.remove(proxy_server) - if proxies: - transport = None - else: - transport_url = self.urls.pop(transport) - del self.transports[transport_url] - del self.transport_to_proxies[transport] - return (transport, proxy_server) - - def add_proxy(self, namespace_proxy_id, transport, proxy_server): - assert namespace_proxy_id not in self.proxies - self.proxies[namespace_proxy_id] = (transport, proxy_server) - proxies = self.transport_to_proxies.setdefault(transport, set()) - proxies.add(proxy_server) - - -class ServiceVMNamespaceAgent(manager.Manager): - def __init__(self, host=None, **kwargs): - LOG.debug(_('host %(host)s, kwargs %(kwargs)s'), - {'host': host, 'kwargs': kwargs}) - super(ServiceVMNamespaceAgent, self).__init__(host=host) - - for key in ('conf', 'src_transport', 'server_stop', ): - setattr(self, key, kwargs[key]) - assert self.src_transport is not None - assert self.server_stop is not None - - self._proxies = NamespaceProxies() - - def stop(self): - LOG.debug('stop') - self.server_stop() - ns_proxies = self._proxies - for _transport, proxy_server in ns_proxies.proxies.values(): - proxy_server.stop() - - def wait(self): - LOG.debug('wait') - ns_proxies = self._proxies - for _transport, proxy_server in ns_proxies.proxies.values(): - proxy_server.wait() - for transport in ns_proxies.transports.values(): - transport.cleanup() - - @lockutils.synchronized('servicevm-namespace-agent', 'tacker-') - def destroy_namespace_agent(self, context): - self.stop() - - def _create_rpc_namespace_proxy(self, src_transport, src_target, - dst_transport, dst_target): - src_target = target.target_parse(src_target) - assert src_target.server - dst_target = target.target_parse(dst_target) - assert dst_target.server - return proxy.get_proxy_server( - src_transport, src_target, None, - dst_transport, dst_target, None, executor='eventlet') - - @lockutils.synchronized('servicevm-namespace-agent', 'tacker-') - def create_rpc_namespace_proxy(self, context, src_target, - dst_transport_url, dst_target, direction): - LOG.debug('create_rpc_namespace_proxy %s %s %s %s %s', - context, src_target, dst_transport_url, dst_target, - direction) - dst_transport = self._proxies.get_transport(dst_transport_url) - if dst_transport is None: - dst_transport = messaging.get_transport(self.conf, - dst_transport_url) - self._proxies.add_transport(dst_transport_url, dst_transport) - if direction == 'send': - proxy_server = self._create_rpc_namespace_proxy( - self.src_transport, src_target, dst_transport, dst_target) - elif direction == 'receive': - proxy_server = self._create_rpc_namespace_proxy( - dst_transport, dst_target, self.src_transport, src_target) - else: - msg = _('unknown direction %s') % direction - LOG.error(msg) - raise RuntimeError(msg) - - # proxy_server.start() - eventlet.spawn(proxy_server.start) - namespace_proxy_id = str(uuid.uuid4()) - self._proxies.add_proxy(namespace_proxy_id, - dst_transport, proxy_server) - LOG.debug('namespace_proxy_id %s', namespace_proxy_id) - return namespace_proxy_id - - @lockutils.synchronized('servicevm-namespace-agent', 'tacker-') - def destroy_rpc_namespace_proxy(self, context, namespace_proxy_id): - LOG.debug('namespace_proxy_id %s', namespace_proxy_id) - try: - transport, proxy_server = self._proxies.del_proxy( - namespace_proxy_id) - except KeyError: - return - proxy_server.stop() - proxy_server.wait() - if transport is not None: - transport.cleanup() - - -# TODO(yamahata): class Service is stolen from nova.service and modified. -# port tacker to oslo.messaging and delete this class. -class Service(service.Service): - def __init__(self, conf, host, binary, topic, manager_, - report_interval=None, periodic_enable=None, - periodic_fuzzy_delay=None, periodic_interval_max=None, - *args, **kwargs): - super(Service, self).__init__() - self.conf = conf - self.host = host - self.binary = binary - self.topic = topic - self.manager_class_name = manager_ - manager_class = importutils.import_class(self.manager_class_name) - kwargs_ = kwargs.copy() - kwargs_['conf'] = conf - self.manager = manager_class(host=self.host, *args, **kwargs_) - self.src_transport = kwargs['src_transport'] - self.rpcserver = None - self.report_interval = report_interval - self.periodic_enable = periodic_enable - self.periodic_fuzzy_delay = periodic_fuzzy_delay - self.periodic_interval_max = periodic_interval_max - self.saved_args, self.saved_kwargs = args, kwargs - - def start(self): - self.manager.init_host() - LOG.debug(_("Creating RPC server for service %(topic)s %(driver)s"), - {'topic': self.topic, 'driver': self.src_transport._driver}) - - target = messaging.Target(topic=self.topic, server=self.host) - endpoints = [self.manager] - self.rpcserver = rpc.get_rpc_server(self.src_transport, target, - endpoints, executor='eventlet') - self.rpcserver.start() - - if self.periodic_enable: - if self.periodic_fuzzy_delay: - initial_delay = random.randint(0, self.periodic_fuzzy_delay) - else: - initial_delay = None - - self.tg.add_dynamic_timer( - self.periodic_tasks, initial_delay=initial_delay, - periodic_interval_max=self.periodic_interval_max) - self.manager.after_start() - LOG.debug('start done') - - @classmethod - def create(cls, conf, src_transport, - host=None, binary=None, topic=None, manager_=None, **kwargs): - if not host: - host = conf.host - if not binary: - binary = os.path.basename(sys.argv[0]) - if not topic: - topic = binary.rpartition('tacker-')[2] - topic = topic.replace('-', '_') - if not manager_: - manager_ = conf.get('%s_manager' % topic, None) - service_obj = cls(conf, host, binary, topic, manager_, - src_transport=src_transport, **kwargs) - return service_obj - - def kill(self): - self.stop() - - def stop(self): - try: - self.rpcserver.stop() - self.manager.stop() - except Exception: - LOG.exception(_('failed to stop rpcserver')) - - super(Service, self).stop() - - def wait(self): - try: - self.rpcserver.wait() - self.manager.wait() - except Exception: - LOG.exception(_('failed to wait rpcserver')) - - super(Service, self).wait() - - def periodic_tasks(self, raise_on_error=False): - """Tasks to be run at a periodic interval.""" - ctxt = context.get_admin_context() - return self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error) - - def report_state(self): - pass - - -class ProxyDaemon(daemon.Daemon): - def __init__(self, conf): - self._conf = conf - super(ProxyDaemon, self).__init__(conf.pid_file, uuid=conf.port_id) - - def run(self): - conf = self._conf - - def server_stop(): - server.stop() - LOG.debug(_('src transport url %s'), conf.src_transport_url) - src_transport = messaging.get_transport( - conf, conf.src_transport_url, - aliases=oslo_service.TRANSPORT_ALIASES) - server = Service.create( - conf=conf, topic=topics.SERVICEVM_AGENT_NAMEPSACE, - manager_=('tacker.vm.agent.namespace_proxy.' - 'ServiceVMNamespaceAgent'), - src_transport=src_transport, server_stop=server_stop) - service.launch(server).wait() - src_transport.cleanup() - - -def _register_options(conf): - cli_opts = [ - cfg.StrOpt('pid-file', help=_('pid file of this process.')), - cfg.StrOpt('port-id', help=_('uuid of port')), - cfg.StrOpt('src-transport-url', help='src transport url'), - cfg.BoolOpt('daemonize', default=True, help=_('Run as daemon')) - ] - conf.register_cli_opts(cli_opts) - agent_config.register_agent_state_opts_helper(conf) - agent_config.register_root_helper(conf) - conf.register_cli_opts(vm_config.OPTS) - conf.register_opts(impl_unix.unix_opts) - - -def main(): - conf = cfg.CONF - - # NOTE(yamahata): work around. rpc driver-dependent config variables - # remove this line once tacker are fully ported to oslo.messaging - from tacker.openstack.common import rpc - conf.unregister_opts(rpc.rpc_opts) - - # NOTE(yamahata): corresponds to - # tacker.common.config.rpc.set_default(control_exchange='tacker') - messaging.set_transport_defaults('tacker') - - _register_options(conf) - conf(project='tacker') - config.setup_logging(conf) - legacy.modernize_quantum_config(conf) - # NOTE(yamahata): workaround for state_path - # oslo.messaging doesn't know state_path - conf.set_override('rpc_unix_ipc_dir', conf.svcvm_proxy_dir) - utils.log_opt_values(LOG) - - proxy = ProxyDaemon(conf) - if conf.daemonize: - proxy.start() - else: - proxy.run() - - -if __name__ == '__main__': - main() diff --git a/tacker/vm/agent/target.py b/tacker/vm/agent/target.py deleted file mode 100644 index 2aa0c7c62..000000000 --- a/tacker/vm/agent/target.py +++ /dev/null @@ -1,45 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright 2013, 2014 Intel Corporation. -# Copyright 2013, 2014 Isaku Yamahata -# -# All Rights Reserved. -# -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# @author: Isaku Yamahata, Intel Corporation. - - -from oslo.messaging import target - - -_KEYS = ['exchange', 'topic', 'namespace', 'version', 'server', 'fanout'] -_BOOLEAN_STATES = {'1': True, 'yes': True, 'true': True, 'on': True, - '0': False, 'no': False, 'false': False, 'off': False} - - -def target_parse(target_str): - attrs = target_str.split(',') - kwargs = dict(attr.split('=', 1) for attr in attrs) - if 'fanout' in kwargs: - # should use oslo.config.types.Bool.__call__ ? - value = kwargs['fanout'] - kwargs['fanout'] = _BOOLEAN_STATES[value.lower()] - return target.Target(**kwargs) - - -def target_str(target): - attrs = [(key, getattr(target, key)) - for key in _KEYS if getattr(target, key) is not None] - return ','.join('%s=%s' % attr for attr in attrs) diff --git a/tacker/vm/agent_base.py b/tacker/vm/agent_base.py deleted file mode 100644 index 08c01709d..000000000 --- a/tacker/vm/agent_base.py +++ /dev/null @@ -1,46 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright 2013, 2014 Intel Corporation. -# Copyright 2013, 2014 Isaku Yamahata -# -# All Rights Reserved. -# -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# @author: Isaku Yamahata, Intel Corporation. - -# TODO(yamahata): consolidate with l3-agent, lbaas-agent once -# agent consolidation is done. -# https://blueprints.launchpad.net/tacker/+spec/l3-agent-consolication - -import abc -import six - - -# TODO(yamahata) -# communicate with service vm -@six.add_metaclass(abc.ABCMeta) -class ServiceVMAgentBase(object): - - @abc.abstractmethod - def create(self, info): - pass - - @abc.abstractmethod - def delete(self, info): - pass - - @abc.abstractmethod - def update(self, info): - pass diff --git a/tacker/vm/mgmt_drivers/rpc/__init__.py b/tacker/vm/mgmt_drivers/rpc/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tacker/vm/mgmt_drivers/rpc/config.py b/tacker/vm/mgmt_drivers/rpc/config.py deleted file mode 100644 index 212a6e02c..000000000 --- a/tacker/vm/mgmt_drivers/rpc/config.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright 2014 Intel Corporation. -# Copyright 2014 Isaku Yamahata -# -# All Rights Reserved. -# -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# @author: Isaku Yamahata, Intel Corporation. - -from oslo.config import cfg - -from tacker.common import topics - - -_RPC_AGENT_OPTS = [ - cfg.StrOpt('device_id', default=None, help=_('The device id')), - cfg.StrOpt('topic', default=topics.SERVICEVM_AGENT, - help=_('rpc topic for agent to subscribe')), -] - - -def register_servicevm_agent_opts(conf): - conf.register_opts(_RPC_AGENT_OPTS, group='servicevm_agent') diff --git a/tacker/vm/mgmt_drivers/rpc/proxy.py b/tacker/vm/mgmt_drivers/rpc/proxy.py deleted file mode 100644 index 5f14bdfdc..000000000 --- a/tacker/vm/mgmt_drivers/rpc/proxy.py +++ /dev/null @@ -1,181 +0,0 @@ -# Copyright 2014 Intel Corporation. -# Copyright 2014 Isaku Yamahata -# -# All Rights Reserved. -# -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# @author: Isaku Yamahata, Intel Corporation. - -from oslo.config import cfg - -from tacker.db.vm import proxy_db -from tacker.openstack.common import log as logging -from tacker.vm import constants -from tacker.vm.mgmt_drivers.rpc import rpc - - -LOG = logging.getLogger(__name__) - - -class AgentRpcProxyMGMTDriver(rpc.AgentRpcMGMTDriver): - _TRANSPORT_OPTS = [ - cfg.StrOpt('dst_transport_url', - # TODO(yamahata): make user, pass, port configurable - # per servicevm - #'://:@:/' - default='rabbit://guest:guest@%(host)s:5672/', - help='A URL representing the messaging driver ' - 'to use and its full configuration.'), - ] - - def __init__(self, conf=None): - super(AgentRpcProxyMGMTDriver, self).__init__() - self.db = proxy_db.RpcProxyDb() - self.conf = conf or cfg.CONF - self.conf.register_opts(self._TRANSPORT_OPTS) - - def get_type(self): - return 'agent-proxy' - - def get_name(self): - return 'agent-proxy' - - def get_description(self): - return 'agent-proxy' - - def mgmt_create_post(self, plugin, context, device): - LOG.debug('mgmt_create_post') - mgmt_entries = [sc_entry for sc_entry in device['service_context'] - if (sc_entry['role'] == constants.ROLE_MGMT and - sc_entry.get('port_id'))] - assert mgmt_entries - mgmt_entry = mgmt_entries[0] - - vm_port_id = mgmt_entry['port_id'] - vm_port = plugin._core_plugin.get_port(context, vm_port_id) - fixed_ip = vm_port['fixed_ips'][0]['ip_address'] - # TODO(yamahata): make all parameters(scheme, user, pass, port) - # configurable - dst_transport_url = self.conf.dst_transport_url % {'host': fixed_ip} - - network_id = mgmt_entry['network_id'] - assert network_id - - proxy_api = plugin.proxy_api - port_id = proxy_api.create_namespace_agent(plugin._core_plugin, - context, network_id) - - device_id = device['id'] - target = 'topic=%s,server=%s' % (self._mgmt_topic(device), - self._mgmt_server(device)) - svr_proxy_id = proxy_api.create_rpc_proxy( - context, port_id, target, target, 'receive') - LOG.debug('mgmt_create_post: svr_proxy_id: %s', svr_proxy_id) - svr_ns_proxy_id = proxy_api.create_rpc_namespace_proxy( - context, port_id, target, dst_transport_url, target, 'receive') - LOG.debug('mgmt_create_post: svr_ns_proxy_id: %s', svr_ns_proxy_id) - clt_proxy_id = proxy_api.create_rpc_proxy( - context, port_id, target, target, 'send') - LOG.debug('mgmt_create_post: clt_proxy_id: %s', clt_proxy_id) - clt_ns_proxy_id = proxy_api.create_rpc_namespace_proxy( - context, port_id, target, dst_transport_url, target, 'send') - LOG.debug('mgmt_create_post: clt_ns_proxy_id: %s', clt_ns_proxy_id) - - LOG.debug('mgmt_create_ppost: ' - 'svr: %s svr_ns: %s clt: %s clt_ns: %s ', - svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id) - self.db.create_proxy_mgmt_port( - context, device_id, port_id, dst_transport_url, - svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id) - - def mgmt_delete_post(self, plugin, context, device): - LOG.debug('mgmt_delete_post') - device_id = device['id'] - - proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device_id) - port_id = proxy_mgmt_port['port_id'] - svr_proxy_id = proxy_mgmt_port['svr_proxy_id'] - svr_ns_proxy_id = proxy_mgmt_port['svr_ns_proxy_id'] - clt_proxy_id = proxy_mgmt_port['clt_proxy_id'] - clt_ns_proxy_id = proxy_mgmt_port['clt_ns_proxy_id'] - - proxy_api = plugin.proxy_api - proxy_api.destroy_rpc_namespace_proxy(context, - port_id, clt_ns_proxy_id) - proxy_api.destroy_rpc_proxy(context, port_id, clt_proxy_id) - proxy_api.destroy_rpc_namespace_proxy(context, - port_id, svr_ns_proxy_id) - proxy_api.destroy_rpc_proxy(context, port_id, svr_proxy_id) - proxy_api.destroy_namespace_agent(plugin._core_plugin, - context, port_id) - - self.db.delete_proxy_mgmt_port(context, port_id) - - def mgmt_service_create_pre(self, plugin, context, device, - service_instance): - LOG.debug('mgmt_service_create_pre') - proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device['id']) - port_id = proxy_mgmt_port['port_id'] - dst_transport_url = proxy_mgmt_port['dst_transport_url'] - - proxy_api = plugin.proxy_api - target = 'topic=%s,server=%s' % ( - self._mgmt_service_topic(device, service_instance), - self._mgmt_service_server(device, service_instance)) - svr_proxy_id = proxy_api.create_rpc_proxy( - context, port_id, target, target, 'receive') - LOG.debug('mgmt_service_create_pre: svr_proxy_id: %s', svr_proxy_id) - svr_ns_proxy_id = proxy_api.create_rpc_namespace_proxy( - context, port_id, target, dst_transport_url, target, 'receive') - LOG.debug('mgmt_service_create_pre: svr_ns_proxy_id: %s', - svr_ns_proxy_id) - clt_proxy_id = proxy_api.create_rpc_proxy( - context, port_id, target, target, 'send') - LOG.debug('mgmt_service_create_pre: clt_proxy_id: %s', clt_proxy_id) - clt_ns_proxy_id = proxy_api.create_rpc_namespace_proxy( - context, port_id, target, dst_transport_url, target, 'send') - LOG.debug('mgmt_service_create_pre: clt_ns_proxy_id: %s', - clt_ns_proxy_id) - - LOG.debug('mgmt_service_create_pre: ' - 'svr: %s svr_ns: %s clt: %s clt_ns: %s ', - svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id) - self.db.create_proxy_service_port( - context, service_instance['id'], - svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id) - - def mgmt_service_delete_post(self, plugin, context, device, - service_instance): - LOG.debug('mgmt_service_delete_post') - proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device['id']) - port_id = proxy_mgmt_port['port_id'] - service_instance_id = service_instance['id'] - proxy_service_port = self.db.get_proxy_service_port( - context, service_instance_id) - - svr_proxy_id = proxy_service_port['svr_proxy_id'] - svr_ns_proxy_id = proxy_service_port['svr_ns_proxy_id'] - clt_proxy_id = proxy_service_port['clt_proxy_id'] - clt_ns_proxy_id = proxy_service_port['clt_ns_proxy_id'] - - proxy_api = plugin.proxy_api - proxy_api.destroy_rpc_namespace_proxy(context, - port_id, clt_ns_proxy_id) - proxy_api.destroy_rpc_proxy(context, port_id, clt_proxy_id) - proxy_api.destroy_rpc_namespace_proxy(context, - port_id, svr_ns_proxy_id) - proxy_api.destroy_rpc_proxy(context, port_id, svr_proxy_id) - - self.db.delete_proxy_service_port(context, service_instance_id) diff --git a/tacker/vm/mgmt_drivers/rpc/rpc.py b/tacker/vm/mgmt_drivers/rpc/rpc.py deleted file mode 100644 index 1e79ae184..000000000 --- a/tacker/vm/mgmt_drivers/rpc/rpc.py +++ /dev/null @@ -1,107 +0,0 @@ -# Copyright 2014 Intel Corporation. -# Copyright 2014 Isaku Yamahata -# -# All Rights Reserved. -# -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# @author: Isaku Yamahata, Intel Corporation. - -from tacker.common import rpc_compat -from tacker.common import topics -from tacker.vm.mgmt_drivers import abstract_driver -from tacker.vm.mgmt_drivers import constants - - -class ServiceVMAgentRpcApi(rpc_compat.RpcProxy): - BASE_RPC_API_VERSION = '1.0' - - def __init__(self, topic=topics.SERVICEVM_AGENT): - super(ServiceVMAgentRpcApi, self).__init__( - topic=topic, default_version=self.BASE_RPC_API_VERSION) - - def rpc_cast(self, context, method, kwargs, topic): - self.cast(context, self.make_msg(method, **kwargs), topic=topic) - - -# TODO(yamahata): port this to oslo.messaging -# address format needs be changed to -# oslo.messaging.target.Target -class AgentRpcMGMTDriver(abstract_driver.DeviceMGMTAbstractDriver): - _TOPIC = topics.SERVICEVM_AGENT # can be overridden by subclass - _RPC_API = {} # topic -> ServiceVMAgentRpcApi - - @property - def _rpc_api(self): - topic = self._TOPIC - api = self._RPC_API.get(topic) - if api is None: - api = ServiceVMAgentRpcApi(topic=topic) - api = self._RPC_API.setdefault(topic, api) - return api - - def get_type(self): - return 'agent-rpc' - - def get_name(self): - return 'agent-rpc' - - def get_description(self): - return 'agent-rpc' - - def mgmt_get_config(self, plugin, context, device): - return {'/etc/tacker/servicevm-agent.ini': - '[servicevm]\n' - 'topic = %s\n' - 'device_id = %s\n' - % (self._TOPIC, device['id'])} - - @staticmethod - def _address(topic, server): - return '%s.%s' % (topic, server) - - def _mgmt_server(self, device): - return device['id'] - - def _mgmt_topic(self, device): - return '%s-%s' % (self._TOPIC, self._mgmt_server(device)) - - def mgmt_address(self, plugin, context, device): - return self._address(self._mgmt_topic(device), - self._mgmt_server(device)) - - def mgmt_call(self, plugin, context, device, kwargs): - topic = device['mgmt_address'] - method = kwargs[constants.KEY_ACTION] - kwargs_ = kwargs[constants.KEY_KWARGS] - self._rpc_api.rpc_cast(context, method, kwargs_, topic) - - def _mgmt_service_server(self, device, service_instance): - return '%s-%s' % (device['id'], service_instance['id']) - - def _mgmt_service_topic(self, device, service_instance): - return '%s-%s' % (self._TOPIC, - self._mgmt_service_server(device, service_instance)) - - def mgmt_service_address(self, plugin, context, device, service_instance): - return self._address( - self._mgmt_service_topic(device, service_instance), - self._mgmt_service_server(device, service_instance)) - - def mgmt_service_call(self, plugin, context, device, - service_instance, kwargs): - method = kwargs[constants.KEY_ACTION] - kwargs_ = kwargs[constants.KEY_KWARGS] - topic = service_instance['mgmt_address'] - self._rpc_api.rpc_cast(context, method, kwargs_, topic) diff --git a/tacker/vm/plugin.py b/tacker/vm/plugin.py index 5d1c6056a..34a4097b4 100644 --- a/tacker/vm/plugin.py +++ b/tacker/vm/plugin.py @@ -27,14 +27,12 @@ from oslo.config import cfg from tacker.api.v1 import attributes from tacker.common import driver_manager -from tacker.common import topics from tacker.db.vm import vm_db from tacker.extensions import servicevm from tacker.openstack.common import excutils from tacker.openstack.common import log as logging from tacker.plugins.common import constants from tacker.vm.mgmt_drivers import constants as mgmt_constants -from tacker.vm import proxy_api LOG = logging.getLogger(__name__) @@ -114,7 +112,6 @@ class ServiceVMPlugin(vm_db.ServiceResourcePluginDb, ServiceVMMgmtMixin): self._device_manager = driver_manager.DriverManager( 'tacker.servicevm.device.drivers', cfg.CONF.servicevm.device_driver) - self.proxy_api = proxy_api.ServiceVMPluginApi(topics.SERVICEVM_AGENT) def spawn_n(self, function, *args, **kwargs): self._pool.spawn_n(function, *args, **kwargs) diff --git a/tacker/vm/proxy_api.py b/tacker/vm/proxy_api.py deleted file mode 100644 index c3749e352..000000000 --- a/tacker/vm/proxy_api.py +++ /dev/null @@ -1,116 +0,0 @@ -# Copyright 2013, 2014 Intel Corporation. -# Copyright 2013, 2014 Isaku Yamahata -# -# All Rights Reserved. -# -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# @author: Isaku Yamahata, Intel Corporation. - -import inspect - -from tacker.api.v1 import attributes -from tacker.common import rpc_compat -from tacker.openstack.common import excutils -from tacker.openstack.common import log as logging -from tacker.plugins.common import constants - - -LOG = logging.getLogger(__name__) - - -# TODO(yamahata): convert oslo.messaging -class ServiceVMPluginApi(rpc_compat.RpcProxy): - API_VERSION = '1.0' - - def __init__(self, topic): - super(ServiceVMPluginApi, self).__init__(topic, self.API_VERSION) - - def _call(self, context, **kwargs): - method = inspect.stack()[1][3] - LOG.debug('ServiceVMPluginApi method = %s kwargs = %s', method, kwargs) - return self.call(context, self.make_msg(method, **kwargs)) - - def create_namespace_agent(self, core_plugin, context, network_id): - """ - :param dst_transport_url: st - :type dst_transport_url: str that represents - oslo.messaging.transportURL - """ - port_data = { - 'name': '_svcvm-rpc-namespace-agent-' + network_id, - 'network_id': network_id, - 'mac_address': attributes.ATTR_NOT_SPECIFIED, - 'admin_state_up': True, - 'device_id': '_svcvm-rpc-proxy-' + network_id, - 'device_owner': 'tacker:' + constants.SERVICEVM, - 'fixed_ips': attributes.ATTR_NOT_SPECIFIED, - } - port = core_plugin.create_port(context, {'port': port_data}) - for i in xrange(len(port['fixed_ips'])): - ipallocation = port['fixed_ips'][i] - subnet_id = ipallocation['subnet_id'] - subnet = core_plugin.get_subnet(context, subnet_id) - ipallocation['subnet'] = subnet - port_id = port['id'] - try: - self._call(context, port=port) - except Exception: - with excutils.save_and_reraise_exception(): - core_plugin.delete_port(context, port_id) - return port_id - - def destroy_namespace_agent(self, core_plugin, context, port_id): - self._call(context, port_id=port_id) - core_plugin.delete_port(context, port_id) - - def create_rpc_proxy(self, context, port_id, src_target, dst_unix_target, - direction): - """ - :param src_target: target to listen/send - :type src_target: oslo.messaging.Target - :param dst_unix_target: target to send/listen - :type dst_unix_target: oslo.messaging.Target - :param direction: RPC direction - :type direction: str 'send' or 'receive' - 'send': tacker server -> agent - 'receive': neturon server <- agent - """ - return self._call(context, port_id=port_id, src_target=src_target, - dst_unix_target=dst_unix_target, direction=direction) - - def destroy_rpc_proxy(self, context, port_id, rpc_proxy_id): - return self._call(context, proxy_id=port_id, rpc_proxy_id=rpc_proxy_id) - - def create_rpc_namespace_proxy(self, context, port_id, src_target, - dst_transport_url, dst_target, direction): - """ - :param src_target: target to listen/send - :type src_target: oslo.messaging.Target - :param dst_target: target to send/listen - :type dst_target: oslo.messaging.Target - :param direction: RPC direction - :type direction: str 'send' or 'receive' - 'send': tacker server -> agent - 'receive': neturon server <- agent - """ - return self._call(context, port_id=port_id, - src_target=src_target, - dst_transport_url=dst_transport_url, - dst_target=dst_target, direction=direction) - - def destroy_rpc_namespace_proxy(self, context, port_id, - namespace_proxy_id): - return self._call(context, port_id=port_id, - namespace_proxy_id=namespace_proxy_id)