drop agent related code which isn't used at this point
Change-Id: I1726802b837d0d84eac4b0f935ceca0df6412288
This commit is contained in:
parent
885387a4da
commit
313d8da79f
@ -1,101 +0,0 @@
|
|||||||
# Copyright 2014 Intel Corporation.
|
|
||||||
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
|
|
||||||
# <isaku.yamahata at gmail com>
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
#
|
|
||||||
# @author: Isaku Yamahata, Intel Corporation.
|
|
||||||
|
|
||||||
import sqlalchemy as sa
|
|
||||||
|
|
||||||
from tacker.db import db_base
|
|
||||||
from tacker.db import model_base
|
|
||||||
|
|
||||||
|
|
||||||
class ProxyMgmtPort(model_base.BASE):
|
|
||||||
device_id = sa.Column(sa.String(255), primary_key=True)
|
|
||||||
port_id = sa.Column(sa.String(36), nullable=False)
|
|
||||||
dst_transport_url = sa.Column(sa.String(255))
|
|
||||||
svr_proxy_id = sa.Column(sa.String(36))
|
|
||||||
svr_ns_proxy_id = sa.Column(sa.String(36))
|
|
||||||
clt_proxy_id = sa.Column(sa.String(36))
|
|
||||||
clt_ns_proxy_id = sa.Column(sa.String(36))
|
|
||||||
|
|
||||||
|
|
||||||
class ProxyServicePort(model_base.BASE):
|
|
||||||
service_instance_id = sa.Column(sa.String(255), primary_key=True)
|
|
||||||
svr_proxy_id = sa.Column(sa.String(36))
|
|
||||||
svr_ns_proxy_id = sa.Column(sa.String(36))
|
|
||||||
clt_proxy_id = sa.Column(sa.String(36))
|
|
||||||
clt_ns_proxy_id = sa.Column(sa.String(36))
|
|
||||||
|
|
||||||
|
|
||||||
class RpcProxyDb(db_base.CommonDbMixin):
|
|
||||||
def _make_proxy_mgmt_port(self, proxy_mgmt_port):
|
|
||||||
key_list = ('device_id', 'port_id', 'dst_transport_url',
|
|
||||||
'svr_proxy_id', 'svr_ns_proxy_id',
|
|
||||||
'clt_proxy_id', 'clt_ns_proxy_id')
|
|
||||||
return dict((key, getattr(proxy_mgmt_port, key)) for key in key_list)
|
|
||||||
|
|
||||||
def _make_proxy_service_port(self, proxy_service_port):
|
|
||||||
key_list = ('service_instance_id', 'svr_proxy_id', 'svr_ns_proxy_id',
|
|
||||||
'clt_proxy_id', 'clt_ns_proxy_id')
|
|
||||||
return dict((key, getattr(proxy_service_port, key))
|
|
||||||
for key in key_list)
|
|
||||||
|
|
||||||
def create_proxy_mgmt_port(self, context, device_id, port_id,
|
|
||||||
dst_transport_url,
|
|
||||||
svr_proxy_id, svr_ns_proxy_id,
|
|
||||||
clt_proxy_id, clt_ns_proxy_id):
|
|
||||||
with context.session.begin(subtransactions=True):
|
|
||||||
proxy_mgmt_port = ProxyMgmtPort(
|
|
||||||
device_id=device_id, port_id=port_id,
|
|
||||||
dst_transport_url=dst_transport_url,
|
|
||||||
svr_proxy_id=svr_proxy_id, svr_ns_proxy_id=svr_ns_proxy_id,
|
|
||||||
clt_proxy_id=clt_proxy_id, clt_ns_proxy_id=clt_ns_proxy_id)
|
|
||||||
context.session.add(proxy_mgmt_port)
|
|
||||||
|
|
||||||
def delete_proxy_mgmt_port(self, context, port_id):
|
|
||||||
with context.session.begin(subtransactions=True):
|
|
||||||
context.session.query(ProxyMgmtPort).filter_by(
|
|
||||||
port_id=port_id).delete()
|
|
||||||
|
|
||||||
def get_proxy_mgmt_port(self, context, device_id):
|
|
||||||
with context.session.begin(subtransactions=True):
|
|
||||||
proxy_mgmt_port = context.session.query(ProxyMgmtPort).filter_by(
|
|
||||||
device_id=device_id).one()
|
|
||||||
return self._make_proxy_mgmt_port(proxy_mgmt_port)
|
|
||||||
|
|
||||||
def create_proxy_service_port(self, context, service_instance_id,
|
|
||||||
svr_proxy_id, svr_ns_proxy_id,
|
|
||||||
clt_proxy_id, clt_ns_proxy_id):
|
|
||||||
with context.session.begin(subtransactions=True):
|
|
||||||
proxy_service_port = ProxyServicePort(
|
|
||||||
service_instance_id=service_instance_id,
|
|
||||||
svr_proxy_id=svr_proxy_id, svr_ns_proxy_id=svr_ns_proxy_id,
|
|
||||||
clt_proxy_id=clt_proxy_id, clt_ns_proxy_id=clt_ns_proxy_id)
|
|
||||||
context.session.add(proxy_service_port)
|
|
||||||
|
|
||||||
def delete_proxy_service_port(self, context, service_instance_id):
|
|
||||||
with context.session.begin(subtransactions=True):
|
|
||||||
context.session.query(ProxyServicePort).filter_by(
|
|
||||||
service_instance_id=service_instance_id).delete()
|
|
||||||
|
|
||||||
def get_proxy_service_port(self, context, service_instance_id):
|
|
||||||
with context.session.begin(subtransactions=True):
|
|
||||||
proxy_service_port = context.session.query(
|
|
||||||
ProxyServicePort).filter_by(
|
|
||||||
service_instance_id=service_instance_id).one()
|
|
||||||
return self._make_proxy_service_port(proxy_service_port)
|
|
@ -1,368 +0,0 @@
|
|||||||
# Copyright 2014 Intel Corporation.
|
|
||||||
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
|
|
||||||
# <isaku.yamahata at gmail com>
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
#
|
|
||||||
# @author: Isaku Yamahata, Intel Corporation.
|
|
||||||
|
|
||||||
import eventlet
|
|
||||||
eventlet.monkey_patch()
|
|
||||||
|
|
||||||
import atexit
|
|
||||||
import inspect
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
import netaddr
|
|
||||||
from oslo.config import cfg
|
|
||||||
from oslo import messaging
|
|
||||||
from oslo.messaging._drivers import impl_unix
|
|
||||||
from oslo.messaging import proxy
|
|
||||||
from oslo.messaging import rpc
|
|
||||||
from oslo.messaging import transport
|
|
||||||
|
|
||||||
from tacker.agent.common import config as agent_config
|
|
||||||
from tacker.agent.linux import external_process
|
|
||||||
from tacker.agent.linux import interface
|
|
||||||
from tacker.agent.linux import ip_lib
|
|
||||||
from tacker.common import config
|
|
||||||
from tacker.common import legacy
|
|
||||||
from tacker.common import topics
|
|
||||||
from tacker import manager
|
|
||||||
from tacker.openstack.common import excutils
|
|
||||||
from tacker.openstack.common import importutils
|
|
||||||
from tacker.openstack.common import lockutils
|
|
||||||
from tacker.openstack.common import log as logging
|
|
||||||
from tacker.openstack.common import service
|
|
||||||
from tacker import oslo_service
|
|
||||||
from tacker.services.loadbalancer.drivers.haproxy import namespace_driver
|
|
||||||
from tacker.vm.agent import config as vm_config
|
|
||||||
from tacker.vm.agent import target
|
|
||||||
|
|
||||||
# _DEBUG = False
|
|
||||||
_DEBUG = True
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class NamespaceProxyAgentApi(object):
|
|
||||||
"""
|
|
||||||
api servicevm agent -> namespace proxy agent
|
|
||||||
"""
|
|
||||||
def __init__(self, unix_transport):
|
|
||||||
super(NamespaceProxyAgentApi, self).__init__()
|
|
||||||
target_ = messaging.Target(topic=topics.SERVICEVM_AGENT_NAMEPSACE)
|
|
||||||
self._client = rpc.RPCClient(unix_transport, target_)
|
|
||||||
|
|
||||||
def _call(self, **kwargs):
|
|
||||||
method = inspect.stack()[1][3]
|
|
||||||
ctxt = {}
|
|
||||||
return self._client.call(ctxt, method, **kwargs)
|
|
||||||
|
|
||||||
def destroy_namespace_agent(self):
|
|
||||||
return self._call()
|
|
||||||
|
|
||||||
def create_rpc_namespace_proxy(self, src_target,
|
|
||||||
dst_transport_url, dst_target, direction):
|
|
||||||
return self._call(
|
|
||||||
src_target=src_target, dst_transport_url=dst_transport_url,
|
|
||||||
dst_target=dst_target, direction=direction)
|
|
||||||
|
|
||||||
def destroy_rpc_namespace_proxy(self, namespace_proxy_id):
|
|
||||||
return self._call(namespace_proxy_id=namespace_proxy_id)
|
|
||||||
|
|
||||||
|
|
||||||
class NamespaceAgent(object):
|
|
||||||
def __init__(self, port_id, unix_transport, pm):
|
|
||||||
super(NamespaceAgent, self).__init__()
|
|
||||||
self.port_id = port_id
|
|
||||||
self.unix_transport = unix_transport
|
|
||||||
self.pm = pm
|
|
||||||
self.local_proxies = {}
|
|
||||||
self.api = NamespaceProxyAgentApi(unix_transport)
|
|
||||||
|
|
||||||
|
|
||||||
class ServiceVMAgent(manager.Manager):
|
|
||||||
_NS_PREFIX = 'qsvcvm-'
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_ns_name(port_id):
|
|
||||||
return ServiceVMAgent._NS_PREFIX + port_id
|
|
||||||
|
|
||||||
def __init__(self, host=None, **kwargs):
|
|
||||||
conf = kwargs['conf']
|
|
||||||
super(ServiceVMAgent, self).__init__(host=host)
|
|
||||||
self.conf = conf
|
|
||||||
self.root_helper = agent_config.get_root_helper(self.conf)
|
|
||||||
self._proxies = {}
|
|
||||||
|
|
||||||
try:
|
|
||||||
vif_driver = importutils.import_object(conf.interface_driver, conf)
|
|
||||||
except ImportError:
|
|
||||||
with excutils.save_and_reraise_exception():
|
|
||||||
msg = (_('Error importing interface driver: %s')
|
|
||||||
% conf.interface_driver)
|
|
||||||
LOG.error(msg)
|
|
||||||
self._vif_driver = vif_driver
|
|
||||||
self._proxy_agents = {}
|
|
||||||
self._src_transport = None
|
|
||||||
self._get_src_transport()
|
|
||||||
atexit.register(self._atexit)
|
|
||||||
|
|
||||||
def _atexit(self):
|
|
||||||
for ns_agent in self._proxy_agents.values():
|
|
||||||
ns_agent.pm.disable()
|
|
||||||
for port_id in self._proxy_agents.keys():
|
|
||||||
self._unplug(port_id)
|
|
||||||
|
|
||||||
def _get_src_transport(self):
|
|
||||||
conf = self.conf
|
|
||||||
|
|
||||||
conf.register_opts(transport._transport_opts)
|
|
||||||
rpc_backend = conf.rpc_backend
|
|
||||||
if conf.transport_url is not None:
|
|
||||||
src_url = conf.transport_url
|
|
||||||
elif (rpc_backend.endswith('.impl_kombu') or
|
|
||||||
rpc_backend.endswith('.impl_rabbit')):
|
|
||||||
from oslo.messaging._drivers import impl_rabbit
|
|
||||||
conf.register_opts(impl_rabbit.rabbit_opts)
|
|
||||||
src_url = 'rabbit://%s:%s@%s:%s/%s' % (
|
|
||||||
conf.rabbit_userid, conf.rabbit_password,
|
|
||||||
conf.rabbit_host, conf.rabbit_port,
|
|
||||||
conf.rabbit_virtual_host)
|
|
||||||
elif rpc_backend.endswith('.impl_qpid'):
|
|
||||||
from oslo.messaging._drivers import impl_qpid
|
|
||||||
conf.register_opts(impl_qpid.qpid_opts)
|
|
||||||
src_url = 'qpid://%s:%s@%s:%s/' % (
|
|
||||||
conf.pid_username, conf.qpid_password,
|
|
||||||
conf.qpid_hostname, conf.qpid_port)
|
|
||||||
elif rpc_backend.endswith('.impl_zmq'):
|
|
||||||
from oslo.messaging._drivers import impl_zmq
|
|
||||||
conf.register_opts(impl_zmq.zmq_opts)
|
|
||||||
src_url = 'zmq://%s:%s/' % (conf.rpc_zmq_host, conf.rpc_zmq_port)
|
|
||||||
elif rpc_backend.endswith('.impl_fake'):
|
|
||||||
src_url = 'fake:///'
|
|
||||||
else:
|
|
||||||
raise NotImplementedError(
|
|
||||||
_('rpc_backend %s is not supported') % rpc_backend)
|
|
||||||
|
|
||||||
self._src_transport = messaging.get_transport(conf, src_url)
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
if self._src_transport is not None:
|
|
||||||
self._src_transport.cleanup()
|
|
||||||
|
|
||||||
# def create_device(self, context, device):
|
|
||||||
# LOG.debug(_('create_device %s'), device)
|
|
||||||
|
|
||||||
# def update_device(self, context, device):
|
|
||||||
# LOG.debug(_('update_device %s'), device)
|
|
||||||
|
|
||||||
# def delete_device(self, context, device):
|
|
||||||
# LOG.debug(_('delete_device %s'), device)
|
|
||||||
|
|
||||||
# def create_service(self, context, device, service_instance):
|
|
||||||
# LOG.debug(_('create_service %(device)s %(service_instance)s'),
|
|
||||||
# {'device': device, 'service_instance': service_instance})
|
|
||||||
|
|
||||||
# def update_service(self, context, device, service_instance):
|
|
||||||
# LOG.debug(_('update_service %(device)s %(service_instance)s'),
|
|
||||||
# {'device': device, 'service_instance': service_instance})
|
|
||||||
|
|
||||||
# def delete_service(self, context, device, service_instance):
|
|
||||||
# LOG.debug(_('delete_service %(device)s %(service_instance)s'),
|
|
||||||
# {'device': device, 'service_instance': service_instance})
|
|
||||||
|
|
||||||
# TODO(yamahata): copied from loadbalancer/drivers/haproxy/namespace_driver
|
|
||||||
# consolidate it.
|
|
||||||
def _plug(self, port_config):
|
|
||||||
vif_driver = self._vif_driver
|
|
||||||
namespace = self._get_ns_name(port_config['id'])
|
|
||||||
interface_name = vif_driver.get_device_name(
|
|
||||||
namespace_driver.Wrap(port_config))
|
|
||||||
|
|
||||||
if not ip_lib.device_exists(interface_name, self.root_helper,
|
|
||||||
namespace):
|
|
||||||
vif_driver.plug(
|
|
||||||
port_config['network_id'], port_config['id'], interface_name,
|
|
||||||
port_config['mac_address'], namespace=namespace)
|
|
||||||
cidrs = [
|
|
||||||
'%s/%s' % (ip['ip_address'],
|
|
||||||
netaddr.IPNetwork(ip['subnet']['cidr']).prefixlen)
|
|
||||||
for ip in port_config['fixed_ips']
|
|
||||||
]
|
|
||||||
vif_driver.init_l3(interface_name, cidrs, namespace=namespace)
|
|
||||||
|
|
||||||
gw_ip = port_config['fixed_ips'][0]['subnet'].get('gateway_ip')
|
|
||||||
if gw_ip:
|
|
||||||
cmd = ['route', 'add', 'default', 'gw', gw_ip]
|
|
||||||
ip_wrapper = ip_lib.IPWrapper(self.root_helper,
|
|
||||||
namespace=namespace)
|
|
||||||
ip_wrapper.netns.execute(cmd, check_exit_code=False)
|
|
||||||
|
|
||||||
def _unplug(self, port_id):
|
|
||||||
port_stub = {'id': port_id}
|
|
||||||
namespace = self._get_ns_name(port_id)
|
|
||||||
vif_driver = self._vif_driver
|
|
||||||
interface_name = vif_driver.get_device_name(
|
|
||||||
namespace_driver.Wrap(port_stub))
|
|
||||||
vif_driver.unplug(interface_name, namespace=namespace)
|
|
||||||
|
|
||||||
@lockutils.synchronized('servicevm-agent', 'tacker-')
|
|
||||||
def create_namespace_agent(self, context, port):
|
|
||||||
conf = self.conf
|
|
||||||
port_id = port['id']
|
|
||||||
path = 'rpc-proxy-%s' % port_id
|
|
||||||
unix_url = 'punix:///%s' % path
|
|
||||||
unix_transport = messaging.get_transport(conf, unix_url)
|
|
||||||
unix_transport._driver.punix_listening.wait()
|
|
||||||
|
|
||||||
self._plug(port)
|
|
||||||
pm = external_process.ProcessManager(
|
|
||||||
conf, port_id, root_helper=self.root_helper,
|
|
||||||
namespace=self._get_ns_name(port_id))
|
|
||||||
|
|
||||||
def cmd_callback(pid_file_name):
|
|
||||||
cmd = ['tacker-servicevm-ns-rpc-proxy',
|
|
||||||
'--pid-file=%s' % pid_file_name,
|
|
||||||
'--svcvm-proxy-dir=%s' % conf.svcvm_proxy_dir,
|
|
||||||
'--src-transport-url', 'unix:///%s' % path]
|
|
||||||
cmd.extend(agent_config.get_log_args(
|
|
||||||
conf, 'tacker-servicevm-ns-rpc-proxy-%s.log' % port_id))
|
|
||||||
if _DEBUG:
|
|
||||||
cmd += ['--log-file=/tmp/tacker-servicevm-ns-rpc-proxy-'
|
|
||||||
'%s.log' % port_id]
|
|
||||||
return cmd
|
|
||||||
pm.enable(cmd_callback)
|
|
||||||
|
|
||||||
ns_agent = NamespaceAgent(port_id, unix_transport, pm)
|
|
||||||
self._proxy_agents[port_id] = ns_agent
|
|
||||||
|
|
||||||
@lockutils.synchronized('servicevm-agent', 'tacker-')
|
|
||||||
def destroy_namespace_agent(self, context, port_id):
|
|
||||||
ns_agent = self._proxy_agents.pop(port_id)
|
|
||||||
ns_agent.api.destroy_namespace_agent()
|
|
||||||
for proxy_server in ns_agent.local_proxies.values():
|
|
||||||
proxy_server.stop()
|
|
||||||
for proxy_server in ns_agent.local_proxies.values():
|
|
||||||
proxy_server.wait()
|
|
||||||
ns_agent.pm.disable()
|
|
||||||
self._unplug(port_id)
|
|
||||||
|
|
||||||
def _create_rpc_proxy(self, ns_agent, src_transport, src_target,
|
|
||||||
dst_transport, dst_target):
|
|
||||||
rpc_proxy_id = str(uuid.uuid4())
|
|
||||||
src_target = target.target_parse(src_target)
|
|
||||||
assert src_target.server
|
|
||||||
dst_target = target.target_parse(dst_target)
|
|
||||||
assert dst_target.server
|
|
||||||
proxy_server = proxy.get_proxy_server(
|
|
||||||
src_transport, src_target, None,
|
|
||||||
dst_transport, dst_target, None, executor='eventlet')
|
|
||||||
ns_agent.local_proxies[rpc_proxy_id] = proxy_server
|
|
||||||
proxy_server.start()
|
|
||||||
return rpc_proxy_id
|
|
||||||
|
|
||||||
def _get_proxy_agent(self, port_id):
|
|
||||||
ns_agent = self._proxy_agents.get(port_id)
|
|
||||||
if ns_agent is None:
|
|
||||||
msg = _('unknown port_id %s') % port_id
|
|
||||||
LOG.error(msg)
|
|
||||||
raise RuntimeError(msg)
|
|
||||||
return ns_agent
|
|
||||||
|
|
||||||
@lockutils.synchronized('servicevm-agent', 'tacker-')
|
|
||||||
def create_rpc_proxy(self, context, port_id,
|
|
||||||
src_target, dst_unix_target, direction):
|
|
||||||
ns_agent = self._get_proxy_agent(port_id)
|
|
||||||
if direction == 'send':
|
|
||||||
return self._create_rpc_proxy(
|
|
||||||
ns_agent, self._src_transport, src_target,
|
|
||||||
ns_agent.unix_transport, dst_unix_target)
|
|
||||||
elif direction == 'receive':
|
|
||||||
return self._create_rpc_proxy(
|
|
||||||
ns_agent, ns_agent.unix_transport, dst_unix_target,
|
|
||||||
self._src_transport, src_target)
|
|
||||||
else:
|
|
||||||
msg = _('unknown direction %s') % direction
|
|
||||||
LOG.error(msg)
|
|
||||||
raise RuntimeError(msg)
|
|
||||||
|
|
||||||
@lockutils.synchronized('servicevm-agent', 'tacker-')
|
|
||||||
def destroy_rpc_proxy(self, context, port_id, rpc_proxy_id):
|
|
||||||
ns_agent = self._get_proxy_agent(port_id)
|
|
||||||
proxy_server = ns_agent.local_proxies.pop(rpc_proxy_id)
|
|
||||||
proxy_server.stop()
|
|
||||||
proxy_server.wait()
|
|
||||||
|
|
||||||
@lockutils.synchronized('servicevm-agent', 'tacker-')
|
|
||||||
def create_rpc_namespace_proxy(self, context, port_id, src_target,
|
|
||||||
dst_transport_url, dst_target, direction):
|
|
||||||
ns_agent = self._get_proxy_agent(port_id)
|
|
||||||
ns_proxy_id = ns_agent.api.create_rpc_namespace_proxy(
|
|
||||||
src_target, dst_transport_url, dst_target, direction)
|
|
||||||
LOG.debug("create_rpc_namespace_proxy %s", ns_proxy_id)
|
|
||||||
return ns_proxy_id
|
|
||||||
|
|
||||||
@lockutils.synchronized('servicevm-agent', 'tacker-')
|
|
||||||
def destroy_rpc_namespace_proxy(self, context,
|
|
||||||
port_id, namespace_proxy_id):
|
|
||||||
ns_agent = self._get_proxy_agent(port_id)
|
|
||||||
return ns_agent.api.destroy_rpc_namespace_proxy(namespace_proxy_id)
|
|
||||||
|
|
||||||
|
|
||||||
class ServiceVMAgentWithStateReport(ServiceVMAgent):
|
|
||||||
# TODO(yamahata)
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def _register_options(conf):
|
|
||||||
conf.register_opts(interface.OPTS)
|
|
||||||
agent_config.register_interface_driver_opts_helper(conf)
|
|
||||||
agent_config.register_agent_state_opts_helper(conf)
|
|
||||||
agent_config.register_root_helper(conf)
|
|
||||||
conf.register_opts(vm_config.OPTS)
|
|
||||||
conf.register_opts(impl_unix.unix_opts)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
conf = cfg.CONF
|
|
||||||
|
|
||||||
# NOTE(yamahata): work around. rpc driver-dependent config variables
|
|
||||||
# remove this line once tacker are fully ported to oslo.messaging
|
|
||||||
from tacker.openstack.common import rpc
|
|
||||||
conf.unregister_opts(rpc.rpc_opts)
|
|
||||||
|
|
||||||
# NOTE(yamahata): corresponds to
|
|
||||||
# tacker.common.config.rpc.set_default(control_exchange='tacker')
|
|
||||||
messaging.set_transport_defaults('tacker')
|
|
||||||
|
|
||||||
_register_options(conf)
|
|
||||||
conf(project='tacker')
|
|
||||||
config.setup_logging(conf)
|
|
||||||
legacy.modernize_quantum_config(conf)
|
|
||||||
# NOTE(yamahata): workaround for state_path
|
|
||||||
# oslo.messaging doesn't know state_path
|
|
||||||
conf.set_override('rpc_unix_ipc_dir', conf.svcvm_proxy_dir)
|
|
||||||
|
|
||||||
server = oslo_service.TackerService.create(
|
|
||||||
topic=topics.SERVICEVM_AGENT,
|
|
||||||
manager='tacker.vm.agent.agent.ServiceVMAgentWithStateReport',
|
|
||||||
report_interval=conf.AGENT.report_interval,
|
|
||||||
conf=conf)
|
|
||||||
service.launch(server).wait()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
@ -1,31 +0,0 @@
|
|||||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
||||||
#
|
|
||||||
# Copyright 2013, 2014 Intel Corporation.
|
|
||||||
# Copyright 2013, 2014 Isaku Yamahata <isaku.yamahata at intel com>
|
|
||||||
# <isaku.yamahata at gmail com>
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
#
|
|
||||||
# @author: Isaku Yamahata, Intel Corporation.
|
|
||||||
|
|
||||||
from oslo.config import cfg
|
|
||||||
|
|
||||||
|
|
||||||
OPTS = [
|
|
||||||
cfg.StrOpt('svcvm-proxy-dir',
|
|
||||||
default='$state_path/svcvm_proxy_dir',
|
|
||||||
help=_('Location for servicevm agent proxy '
|
|
||||||
'UNIX domain socket')),
|
|
||||||
]
|
|
@ -1,340 +0,0 @@
|
|||||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
||||||
#
|
|
||||||
# Copyright 2013, 2014 Intel Corporation.
|
|
||||||
# Copyright 2013, 2014 Isaku Yamahata <isaku.yamahata at intel com>
|
|
||||||
# <isaku.yamahata at gmail com>
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
#
|
|
||||||
# @author: Isaku Yamahata, Intel Corporation.
|
|
||||||
|
|
||||||
import eventlet
|
|
||||||
eventlet.monkey_patch()
|
|
||||||
|
|
||||||
|
|
||||||
import os.path
|
|
||||||
import random
|
|
||||||
import sys
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
from oslo.config import cfg
|
|
||||||
from oslo import messaging
|
|
||||||
from oslo.messaging._drivers import impl_unix
|
|
||||||
from oslo.messaging import proxy
|
|
||||||
from oslo.messaging import rpc
|
|
||||||
|
|
||||||
from tacker.agent.common import config as agent_config
|
|
||||||
from tacker.agent.linux import daemon
|
|
||||||
from tacker.common import config
|
|
||||||
from tacker.common import legacy
|
|
||||||
from tacker.common import topics
|
|
||||||
from tacker.common import utils
|
|
||||||
from tacker import context
|
|
||||||
from tacker import manager
|
|
||||||
from tacker.openstack.common import importutils
|
|
||||||
from tacker.openstack.common import lockutils
|
|
||||||
from tacker.openstack.common import log as logging
|
|
||||||
from tacker.openstack.common import service
|
|
||||||
from tacker import oslo_service
|
|
||||||
from tacker.vm.agent import config as vm_config
|
|
||||||
from tacker.vm.agent import target
|
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class NamespaceProxies(object):
|
|
||||||
def __init__(self):
|
|
||||||
super(NamespaceProxies, self).__init__()
|
|
||||||
self.urls = {} # dict: transport_url -> transport
|
|
||||||
self.transports = {} # dict: transport -> transport_url
|
|
||||||
self.proxies = {} # uuid -> (transport, proxy_server)
|
|
||||||
self.transport_to_proxies = {} # transport -> set of proxy_servers
|
|
||||||
|
|
||||||
def get_transport(self, transport_url):
|
|
||||||
return self.urls.get(transport_url, None)
|
|
||||||
|
|
||||||
def add_transport(self, transport_url, transport):
|
|
||||||
assert transport_url not in self.urls
|
|
||||||
assert transport not in self.transports
|
|
||||||
self.transports[transport_url] = transport
|
|
||||||
self.urls[transport] = transport_url
|
|
||||||
|
|
||||||
def del_proxy(self, namespace_proxy_id):
|
|
||||||
transport, proxy_server = self.proxies.pop(namespace_proxy_id)
|
|
||||||
proxies = self.transport_to_proxies[transport]
|
|
||||||
proxies.remove(proxy_server)
|
|
||||||
if proxies:
|
|
||||||
transport = None
|
|
||||||
else:
|
|
||||||
transport_url = self.urls.pop(transport)
|
|
||||||
del self.transports[transport_url]
|
|
||||||
del self.transport_to_proxies[transport]
|
|
||||||
return (transport, proxy_server)
|
|
||||||
|
|
||||||
def add_proxy(self, namespace_proxy_id, transport, proxy_server):
|
|
||||||
assert namespace_proxy_id not in self.proxies
|
|
||||||
self.proxies[namespace_proxy_id] = (transport, proxy_server)
|
|
||||||
proxies = self.transport_to_proxies.setdefault(transport, set())
|
|
||||||
proxies.add(proxy_server)
|
|
||||||
|
|
||||||
|
|
||||||
class ServiceVMNamespaceAgent(manager.Manager):
|
|
||||||
def __init__(self, host=None, **kwargs):
|
|
||||||
LOG.debug(_('host %(host)s, kwargs %(kwargs)s'),
|
|
||||||
{'host': host, 'kwargs': kwargs})
|
|
||||||
super(ServiceVMNamespaceAgent, self).__init__(host=host)
|
|
||||||
|
|
||||||
for key in ('conf', 'src_transport', 'server_stop', ):
|
|
||||||
setattr(self, key, kwargs[key])
|
|
||||||
assert self.src_transport is not None
|
|
||||||
assert self.server_stop is not None
|
|
||||||
|
|
||||||
self._proxies = NamespaceProxies()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
LOG.debug('stop')
|
|
||||||
self.server_stop()
|
|
||||||
ns_proxies = self._proxies
|
|
||||||
for _transport, proxy_server in ns_proxies.proxies.values():
|
|
||||||
proxy_server.stop()
|
|
||||||
|
|
||||||
def wait(self):
|
|
||||||
LOG.debug('wait')
|
|
||||||
ns_proxies = self._proxies
|
|
||||||
for _transport, proxy_server in ns_proxies.proxies.values():
|
|
||||||
proxy_server.wait()
|
|
||||||
for transport in ns_proxies.transports.values():
|
|
||||||
transport.cleanup()
|
|
||||||
|
|
||||||
@lockutils.synchronized('servicevm-namespace-agent', 'tacker-')
|
|
||||||
def destroy_namespace_agent(self, context):
|
|
||||||
self.stop()
|
|
||||||
|
|
||||||
def _create_rpc_namespace_proxy(self, src_transport, src_target,
|
|
||||||
dst_transport, dst_target):
|
|
||||||
src_target = target.target_parse(src_target)
|
|
||||||
assert src_target.server
|
|
||||||
dst_target = target.target_parse(dst_target)
|
|
||||||
assert dst_target.server
|
|
||||||
return proxy.get_proxy_server(
|
|
||||||
src_transport, src_target, None,
|
|
||||||
dst_transport, dst_target, None, executor='eventlet')
|
|
||||||
|
|
||||||
@lockutils.synchronized('servicevm-namespace-agent', 'tacker-')
|
|
||||||
def create_rpc_namespace_proxy(self, context, src_target,
|
|
||||||
dst_transport_url, dst_target, direction):
|
|
||||||
LOG.debug('create_rpc_namespace_proxy %s %s %s %s %s',
|
|
||||||
context, src_target, dst_transport_url, dst_target,
|
|
||||||
direction)
|
|
||||||
dst_transport = self._proxies.get_transport(dst_transport_url)
|
|
||||||
if dst_transport is None:
|
|
||||||
dst_transport = messaging.get_transport(self.conf,
|
|
||||||
dst_transport_url)
|
|
||||||
self._proxies.add_transport(dst_transport_url, dst_transport)
|
|
||||||
if direction == 'send':
|
|
||||||
proxy_server = self._create_rpc_namespace_proxy(
|
|
||||||
self.src_transport, src_target, dst_transport, dst_target)
|
|
||||||
elif direction == 'receive':
|
|
||||||
proxy_server = self._create_rpc_namespace_proxy(
|
|
||||||
dst_transport, dst_target, self.src_transport, src_target)
|
|
||||||
else:
|
|
||||||
msg = _('unknown direction %s') % direction
|
|
||||||
LOG.error(msg)
|
|
||||||
raise RuntimeError(msg)
|
|
||||||
|
|
||||||
# proxy_server.start()
|
|
||||||
eventlet.spawn(proxy_server.start)
|
|
||||||
namespace_proxy_id = str(uuid.uuid4())
|
|
||||||
self._proxies.add_proxy(namespace_proxy_id,
|
|
||||||
dst_transport, proxy_server)
|
|
||||||
LOG.debug('namespace_proxy_id %s', namespace_proxy_id)
|
|
||||||
return namespace_proxy_id
|
|
||||||
|
|
||||||
@lockutils.synchronized('servicevm-namespace-agent', 'tacker-')
|
|
||||||
def destroy_rpc_namespace_proxy(self, context, namespace_proxy_id):
|
|
||||||
LOG.debug('namespace_proxy_id %s', namespace_proxy_id)
|
|
||||||
try:
|
|
||||||
transport, proxy_server = self._proxies.del_proxy(
|
|
||||||
namespace_proxy_id)
|
|
||||||
except KeyError:
|
|
||||||
return
|
|
||||||
proxy_server.stop()
|
|
||||||
proxy_server.wait()
|
|
||||||
if transport is not None:
|
|
||||||
transport.cleanup()
|
|
||||||
|
|
||||||
|
|
||||||
# TODO(yamahata): class Service is stolen from nova.service and modified.
|
|
||||||
# port tacker to oslo.messaging and delete this class.
|
|
||||||
class Service(service.Service):
|
|
||||||
def __init__(self, conf, host, binary, topic, manager_,
|
|
||||||
report_interval=None, periodic_enable=None,
|
|
||||||
periodic_fuzzy_delay=None, periodic_interval_max=None,
|
|
||||||
*args, **kwargs):
|
|
||||||
super(Service, self).__init__()
|
|
||||||
self.conf = conf
|
|
||||||
self.host = host
|
|
||||||
self.binary = binary
|
|
||||||
self.topic = topic
|
|
||||||
self.manager_class_name = manager_
|
|
||||||
manager_class = importutils.import_class(self.manager_class_name)
|
|
||||||
kwargs_ = kwargs.copy()
|
|
||||||
kwargs_['conf'] = conf
|
|
||||||
self.manager = manager_class(host=self.host, *args, **kwargs_)
|
|
||||||
self.src_transport = kwargs['src_transport']
|
|
||||||
self.rpcserver = None
|
|
||||||
self.report_interval = report_interval
|
|
||||||
self.periodic_enable = periodic_enable
|
|
||||||
self.periodic_fuzzy_delay = periodic_fuzzy_delay
|
|
||||||
self.periodic_interval_max = periodic_interval_max
|
|
||||||
self.saved_args, self.saved_kwargs = args, kwargs
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
self.manager.init_host()
|
|
||||||
LOG.debug(_("Creating RPC server for service %(topic)s %(driver)s"),
|
|
||||||
{'topic': self.topic, 'driver': self.src_transport._driver})
|
|
||||||
|
|
||||||
target = messaging.Target(topic=self.topic, server=self.host)
|
|
||||||
endpoints = [self.manager]
|
|
||||||
self.rpcserver = rpc.get_rpc_server(self.src_transport, target,
|
|
||||||
endpoints, executor='eventlet')
|
|
||||||
self.rpcserver.start()
|
|
||||||
|
|
||||||
if self.periodic_enable:
|
|
||||||
if self.periodic_fuzzy_delay:
|
|
||||||
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
|
|
||||||
else:
|
|
||||||
initial_delay = None
|
|
||||||
|
|
||||||
self.tg.add_dynamic_timer(
|
|
||||||
self.periodic_tasks, initial_delay=initial_delay,
|
|
||||||
periodic_interval_max=self.periodic_interval_max)
|
|
||||||
self.manager.after_start()
|
|
||||||
LOG.debug('start done')
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def create(cls, conf, src_transport,
|
|
||||||
host=None, binary=None, topic=None, manager_=None, **kwargs):
|
|
||||||
if not host:
|
|
||||||
host = conf.host
|
|
||||||
if not binary:
|
|
||||||
binary = os.path.basename(sys.argv[0])
|
|
||||||
if not topic:
|
|
||||||
topic = binary.rpartition('tacker-')[2]
|
|
||||||
topic = topic.replace('-', '_')
|
|
||||||
if not manager_:
|
|
||||||
manager_ = conf.get('%s_manager' % topic, None)
|
|
||||||
service_obj = cls(conf, host, binary, topic, manager_,
|
|
||||||
src_transport=src_transport, **kwargs)
|
|
||||||
return service_obj
|
|
||||||
|
|
||||||
def kill(self):
|
|
||||||
self.stop()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
try:
|
|
||||||
self.rpcserver.stop()
|
|
||||||
self.manager.stop()
|
|
||||||
except Exception:
|
|
||||||
LOG.exception(_('failed to stop rpcserver'))
|
|
||||||
|
|
||||||
super(Service, self).stop()
|
|
||||||
|
|
||||||
def wait(self):
|
|
||||||
try:
|
|
||||||
self.rpcserver.wait()
|
|
||||||
self.manager.wait()
|
|
||||||
except Exception:
|
|
||||||
LOG.exception(_('failed to wait rpcserver'))
|
|
||||||
|
|
||||||
super(Service, self).wait()
|
|
||||||
|
|
||||||
def periodic_tasks(self, raise_on_error=False):
|
|
||||||
"""Tasks to be run at a periodic interval."""
|
|
||||||
ctxt = context.get_admin_context()
|
|
||||||
return self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
|
|
||||||
|
|
||||||
def report_state(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class ProxyDaemon(daemon.Daemon):
|
|
||||||
def __init__(self, conf):
|
|
||||||
self._conf = conf
|
|
||||||
super(ProxyDaemon, self).__init__(conf.pid_file, uuid=conf.port_id)
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
conf = self._conf
|
|
||||||
|
|
||||||
def server_stop():
|
|
||||||
server.stop()
|
|
||||||
LOG.debug(_('src transport url %s'), conf.src_transport_url)
|
|
||||||
src_transport = messaging.get_transport(
|
|
||||||
conf, conf.src_transport_url,
|
|
||||||
aliases=oslo_service.TRANSPORT_ALIASES)
|
|
||||||
server = Service.create(
|
|
||||||
conf=conf, topic=topics.SERVICEVM_AGENT_NAMEPSACE,
|
|
||||||
manager_=('tacker.vm.agent.namespace_proxy.'
|
|
||||||
'ServiceVMNamespaceAgent'),
|
|
||||||
src_transport=src_transport, server_stop=server_stop)
|
|
||||||
service.launch(server).wait()
|
|
||||||
src_transport.cleanup()
|
|
||||||
|
|
||||||
|
|
||||||
def _register_options(conf):
|
|
||||||
cli_opts = [
|
|
||||||
cfg.StrOpt('pid-file', help=_('pid file of this process.')),
|
|
||||||
cfg.StrOpt('port-id', help=_('uuid of port')),
|
|
||||||
cfg.StrOpt('src-transport-url', help='src transport url'),
|
|
||||||
cfg.BoolOpt('daemonize', default=True, help=_('Run as daemon'))
|
|
||||||
]
|
|
||||||
conf.register_cli_opts(cli_opts)
|
|
||||||
agent_config.register_agent_state_opts_helper(conf)
|
|
||||||
agent_config.register_root_helper(conf)
|
|
||||||
conf.register_cli_opts(vm_config.OPTS)
|
|
||||||
conf.register_opts(impl_unix.unix_opts)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
conf = cfg.CONF
|
|
||||||
|
|
||||||
# NOTE(yamahata): work around. rpc driver-dependent config variables
|
|
||||||
# remove this line once tacker are fully ported to oslo.messaging
|
|
||||||
from tacker.openstack.common import rpc
|
|
||||||
conf.unregister_opts(rpc.rpc_opts)
|
|
||||||
|
|
||||||
# NOTE(yamahata): corresponds to
|
|
||||||
# tacker.common.config.rpc.set_default(control_exchange='tacker')
|
|
||||||
messaging.set_transport_defaults('tacker')
|
|
||||||
|
|
||||||
_register_options(conf)
|
|
||||||
conf(project='tacker')
|
|
||||||
config.setup_logging(conf)
|
|
||||||
legacy.modernize_quantum_config(conf)
|
|
||||||
# NOTE(yamahata): workaround for state_path
|
|
||||||
# oslo.messaging doesn't know state_path
|
|
||||||
conf.set_override('rpc_unix_ipc_dir', conf.svcvm_proxy_dir)
|
|
||||||
utils.log_opt_values(LOG)
|
|
||||||
|
|
||||||
proxy = ProxyDaemon(conf)
|
|
||||||
if conf.daemonize:
|
|
||||||
proxy.start()
|
|
||||||
else:
|
|
||||||
proxy.run()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
@ -1,45 +0,0 @@
|
|||||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
||||||
#
|
|
||||||
# Copyright 2013, 2014 Intel Corporation.
|
|
||||||
# Copyright 2013, 2014 Isaku Yamahata <isaku.yamahata at intel com>
|
|
||||||
# <isaku.yamahata at gmail com>
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
#
|
|
||||||
# @author: Isaku Yamahata, Intel Corporation.
|
|
||||||
|
|
||||||
|
|
||||||
from oslo.messaging import target
|
|
||||||
|
|
||||||
|
|
||||||
_KEYS = ['exchange', 'topic', 'namespace', 'version', 'server', 'fanout']
|
|
||||||
_BOOLEAN_STATES = {'1': True, 'yes': True, 'true': True, 'on': True,
|
|
||||||
'0': False, 'no': False, 'false': False, 'off': False}
|
|
||||||
|
|
||||||
|
|
||||||
def target_parse(target_str):
|
|
||||||
attrs = target_str.split(',')
|
|
||||||
kwargs = dict(attr.split('=', 1) for attr in attrs)
|
|
||||||
if 'fanout' in kwargs:
|
|
||||||
# should use oslo.config.types.Bool.__call__ ?
|
|
||||||
value = kwargs['fanout']
|
|
||||||
kwargs['fanout'] = _BOOLEAN_STATES[value.lower()]
|
|
||||||
return target.Target(**kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
def target_str(target):
|
|
||||||
attrs = [(key, getattr(target, key))
|
|
||||||
for key in _KEYS if getattr(target, key) is not None]
|
|
||||||
return ','.join('%s=%s' % attr for attr in attrs)
|
|
@ -1,46 +0,0 @@
|
|||||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
||||||
#
|
|
||||||
# Copyright 2013, 2014 Intel Corporation.
|
|
||||||
# Copyright 2013, 2014 Isaku Yamahata <isaku.yamahata at intel com>
|
|
||||||
# <isaku.yamahata at gmail com>
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
#
|
|
||||||
# @author: Isaku Yamahata, Intel Corporation.
|
|
||||||
|
|
||||||
# TODO(yamahata): consolidate with l3-agent, lbaas-agent once
|
|
||||||
# agent consolidation is done.
|
|
||||||
# https://blueprints.launchpad.net/tacker/+spec/l3-agent-consolication
|
|
||||||
|
|
||||||
import abc
|
|
||||||
import six
|
|
||||||
|
|
||||||
|
|
||||||
# TODO(yamahata)
|
|
||||||
# communicate with service vm
|
|
||||||
@six.add_metaclass(abc.ABCMeta)
|
|
||||||
class ServiceVMAgentBase(object):
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def create(self, info):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def delete(self, info):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def update(self, info):
|
|
||||||
pass
|
|
@ -1,34 +0,0 @@
|
|||||||
# Copyright 2014 Intel Corporation.
|
|
||||||
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
|
|
||||||
# <isaku.yamahata at gmail com>
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
#
|
|
||||||
# @author: Isaku Yamahata, Intel Corporation.
|
|
||||||
|
|
||||||
from oslo.config import cfg
|
|
||||||
|
|
||||||
from tacker.common import topics
|
|
||||||
|
|
||||||
|
|
||||||
_RPC_AGENT_OPTS = [
|
|
||||||
cfg.StrOpt('device_id', default=None, help=_('The device id')),
|
|
||||||
cfg.StrOpt('topic', default=topics.SERVICEVM_AGENT,
|
|
||||||
help=_('rpc topic for agent to subscribe')),
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def register_servicevm_agent_opts(conf):
|
|
||||||
conf.register_opts(_RPC_AGENT_OPTS, group='servicevm_agent')
|
|
@ -1,181 +0,0 @@
|
|||||||
# Copyright 2014 Intel Corporation.
|
|
||||||
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
|
|
||||||
# <isaku.yamahata at gmail com>
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
#
|
|
||||||
# @author: Isaku Yamahata, Intel Corporation.
|
|
||||||
|
|
||||||
from oslo.config import cfg
|
|
||||||
|
|
||||||
from tacker.db.vm import proxy_db
|
|
||||||
from tacker.openstack.common import log as logging
|
|
||||||
from tacker.vm import constants
|
|
||||||
from tacker.vm.mgmt_drivers.rpc import rpc
|
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class AgentRpcProxyMGMTDriver(rpc.AgentRpcMGMTDriver):
|
|
||||||
_TRANSPORT_OPTS = [
|
|
||||||
cfg.StrOpt('dst_transport_url',
|
|
||||||
# TODO(yamahata): make user, pass, port configurable
|
|
||||||
# per servicevm
|
|
||||||
#'<scheme>://<user>:<pass>@<host>:<port>/'
|
|
||||||
default='rabbit://guest:guest@%(host)s:5672/',
|
|
||||||
help='A URL representing the messaging driver '
|
|
||||||
'to use and its full configuration.'),
|
|
||||||
]
|
|
||||||
|
|
||||||
def __init__(self, conf=None):
|
|
||||||
super(AgentRpcProxyMGMTDriver, self).__init__()
|
|
||||||
self.db = proxy_db.RpcProxyDb()
|
|
||||||
self.conf = conf or cfg.CONF
|
|
||||||
self.conf.register_opts(self._TRANSPORT_OPTS)
|
|
||||||
|
|
||||||
def get_type(self):
|
|
||||||
return 'agent-proxy'
|
|
||||||
|
|
||||||
def get_name(self):
|
|
||||||
return 'agent-proxy'
|
|
||||||
|
|
||||||
def get_description(self):
|
|
||||||
return 'agent-proxy'
|
|
||||||
|
|
||||||
def mgmt_create_post(self, plugin, context, device):
|
|
||||||
LOG.debug('mgmt_create_post')
|
|
||||||
mgmt_entries = [sc_entry for sc_entry in device['service_context']
|
|
||||||
if (sc_entry['role'] == constants.ROLE_MGMT and
|
|
||||||
sc_entry.get('port_id'))]
|
|
||||||
assert mgmt_entries
|
|
||||||
mgmt_entry = mgmt_entries[0]
|
|
||||||
|
|
||||||
vm_port_id = mgmt_entry['port_id']
|
|
||||||
vm_port = plugin._core_plugin.get_port(context, vm_port_id)
|
|
||||||
fixed_ip = vm_port['fixed_ips'][0]['ip_address']
|
|
||||||
# TODO(yamahata): make all parameters(scheme, user, pass, port)
|
|
||||||
# configurable
|
|
||||||
dst_transport_url = self.conf.dst_transport_url % {'host': fixed_ip}
|
|
||||||
|
|
||||||
network_id = mgmt_entry['network_id']
|
|
||||||
assert network_id
|
|
||||||
|
|
||||||
proxy_api = plugin.proxy_api
|
|
||||||
port_id = proxy_api.create_namespace_agent(plugin._core_plugin,
|
|
||||||
context, network_id)
|
|
||||||
|
|
||||||
device_id = device['id']
|
|
||||||
target = 'topic=%s,server=%s' % (self._mgmt_topic(device),
|
|
||||||
self._mgmt_server(device))
|
|
||||||
svr_proxy_id = proxy_api.create_rpc_proxy(
|
|
||||||
context, port_id, target, target, 'receive')
|
|
||||||
LOG.debug('mgmt_create_post: svr_proxy_id: %s', svr_proxy_id)
|
|
||||||
svr_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
|
|
||||||
context, port_id, target, dst_transport_url, target, 'receive')
|
|
||||||
LOG.debug('mgmt_create_post: svr_ns_proxy_id: %s', svr_ns_proxy_id)
|
|
||||||
clt_proxy_id = proxy_api.create_rpc_proxy(
|
|
||||||
context, port_id, target, target, 'send')
|
|
||||||
LOG.debug('mgmt_create_post: clt_proxy_id: %s', clt_proxy_id)
|
|
||||||
clt_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
|
|
||||||
context, port_id, target, dst_transport_url, target, 'send')
|
|
||||||
LOG.debug('mgmt_create_post: clt_ns_proxy_id: %s', clt_ns_proxy_id)
|
|
||||||
|
|
||||||
LOG.debug('mgmt_create_ppost: '
|
|
||||||
'svr: %s svr_ns: %s clt: %s clt_ns: %s ',
|
|
||||||
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
|
|
||||||
self.db.create_proxy_mgmt_port(
|
|
||||||
context, device_id, port_id, dst_transport_url,
|
|
||||||
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
|
|
||||||
|
|
||||||
def mgmt_delete_post(self, plugin, context, device):
|
|
||||||
LOG.debug('mgmt_delete_post')
|
|
||||||
device_id = device['id']
|
|
||||||
|
|
||||||
proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device_id)
|
|
||||||
port_id = proxy_mgmt_port['port_id']
|
|
||||||
svr_proxy_id = proxy_mgmt_port['svr_proxy_id']
|
|
||||||
svr_ns_proxy_id = proxy_mgmt_port['svr_ns_proxy_id']
|
|
||||||
clt_proxy_id = proxy_mgmt_port['clt_proxy_id']
|
|
||||||
clt_ns_proxy_id = proxy_mgmt_port['clt_ns_proxy_id']
|
|
||||||
|
|
||||||
proxy_api = plugin.proxy_api
|
|
||||||
proxy_api.destroy_rpc_namespace_proxy(context,
|
|
||||||
port_id, clt_ns_proxy_id)
|
|
||||||
proxy_api.destroy_rpc_proxy(context, port_id, clt_proxy_id)
|
|
||||||
proxy_api.destroy_rpc_namespace_proxy(context,
|
|
||||||
port_id, svr_ns_proxy_id)
|
|
||||||
proxy_api.destroy_rpc_proxy(context, port_id, svr_proxy_id)
|
|
||||||
proxy_api.destroy_namespace_agent(plugin._core_plugin,
|
|
||||||
context, port_id)
|
|
||||||
|
|
||||||
self.db.delete_proxy_mgmt_port(context, port_id)
|
|
||||||
|
|
||||||
def mgmt_service_create_pre(self, plugin, context, device,
|
|
||||||
service_instance):
|
|
||||||
LOG.debug('mgmt_service_create_pre')
|
|
||||||
proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device['id'])
|
|
||||||
port_id = proxy_mgmt_port['port_id']
|
|
||||||
dst_transport_url = proxy_mgmt_port['dst_transport_url']
|
|
||||||
|
|
||||||
proxy_api = plugin.proxy_api
|
|
||||||
target = 'topic=%s,server=%s' % (
|
|
||||||
self._mgmt_service_topic(device, service_instance),
|
|
||||||
self._mgmt_service_server(device, service_instance))
|
|
||||||
svr_proxy_id = proxy_api.create_rpc_proxy(
|
|
||||||
context, port_id, target, target, 'receive')
|
|
||||||
LOG.debug('mgmt_service_create_pre: svr_proxy_id: %s', svr_proxy_id)
|
|
||||||
svr_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
|
|
||||||
context, port_id, target, dst_transport_url, target, 'receive')
|
|
||||||
LOG.debug('mgmt_service_create_pre: svr_ns_proxy_id: %s',
|
|
||||||
svr_ns_proxy_id)
|
|
||||||
clt_proxy_id = proxy_api.create_rpc_proxy(
|
|
||||||
context, port_id, target, target, 'send')
|
|
||||||
LOG.debug('mgmt_service_create_pre: clt_proxy_id: %s', clt_proxy_id)
|
|
||||||
clt_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
|
|
||||||
context, port_id, target, dst_transport_url, target, 'send')
|
|
||||||
LOG.debug('mgmt_service_create_pre: clt_ns_proxy_id: %s',
|
|
||||||
clt_ns_proxy_id)
|
|
||||||
|
|
||||||
LOG.debug('mgmt_service_create_pre: '
|
|
||||||
'svr: %s svr_ns: %s clt: %s clt_ns: %s ',
|
|
||||||
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
|
|
||||||
self.db.create_proxy_service_port(
|
|
||||||
context, service_instance['id'],
|
|
||||||
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
|
|
||||||
|
|
||||||
def mgmt_service_delete_post(self, plugin, context, device,
|
|
||||||
service_instance):
|
|
||||||
LOG.debug('mgmt_service_delete_post')
|
|
||||||
proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device['id'])
|
|
||||||
port_id = proxy_mgmt_port['port_id']
|
|
||||||
service_instance_id = service_instance['id']
|
|
||||||
proxy_service_port = self.db.get_proxy_service_port(
|
|
||||||
context, service_instance_id)
|
|
||||||
|
|
||||||
svr_proxy_id = proxy_service_port['svr_proxy_id']
|
|
||||||
svr_ns_proxy_id = proxy_service_port['svr_ns_proxy_id']
|
|
||||||
clt_proxy_id = proxy_service_port['clt_proxy_id']
|
|
||||||
clt_ns_proxy_id = proxy_service_port['clt_ns_proxy_id']
|
|
||||||
|
|
||||||
proxy_api = plugin.proxy_api
|
|
||||||
proxy_api.destroy_rpc_namespace_proxy(context,
|
|
||||||
port_id, clt_ns_proxy_id)
|
|
||||||
proxy_api.destroy_rpc_proxy(context, port_id, clt_proxy_id)
|
|
||||||
proxy_api.destroy_rpc_namespace_proxy(context,
|
|
||||||
port_id, svr_ns_proxy_id)
|
|
||||||
proxy_api.destroy_rpc_proxy(context, port_id, svr_proxy_id)
|
|
||||||
|
|
||||||
self.db.delete_proxy_service_port(context, service_instance_id)
|
|
@ -1,107 +0,0 @@
|
|||||||
# Copyright 2014 Intel Corporation.
|
|
||||||
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
|
|
||||||
# <isaku.yamahata at gmail com>
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
#
|
|
||||||
# @author: Isaku Yamahata, Intel Corporation.
|
|
||||||
|
|
||||||
from tacker.common import rpc_compat
|
|
||||||
from tacker.common import topics
|
|
||||||
from tacker.vm.mgmt_drivers import abstract_driver
|
|
||||||
from tacker.vm.mgmt_drivers import constants
|
|
||||||
|
|
||||||
|
|
||||||
class ServiceVMAgentRpcApi(rpc_compat.RpcProxy):
|
|
||||||
BASE_RPC_API_VERSION = '1.0'
|
|
||||||
|
|
||||||
def __init__(self, topic=topics.SERVICEVM_AGENT):
|
|
||||||
super(ServiceVMAgentRpcApi, self).__init__(
|
|
||||||
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
|
||||||
|
|
||||||
def rpc_cast(self, context, method, kwargs, topic):
|
|
||||||
self.cast(context, self.make_msg(method, **kwargs), topic=topic)
|
|
||||||
|
|
||||||
|
|
||||||
# TODO(yamahata): port this to oslo.messaging
|
|
||||||
# address format needs be changed to
|
|
||||||
# oslo.messaging.target.Target
|
|
||||||
class AgentRpcMGMTDriver(abstract_driver.DeviceMGMTAbstractDriver):
|
|
||||||
_TOPIC = topics.SERVICEVM_AGENT # can be overridden by subclass
|
|
||||||
_RPC_API = {} # topic -> ServiceVMAgentRpcApi
|
|
||||||
|
|
||||||
@property
|
|
||||||
def _rpc_api(self):
|
|
||||||
topic = self._TOPIC
|
|
||||||
api = self._RPC_API.get(topic)
|
|
||||||
if api is None:
|
|
||||||
api = ServiceVMAgentRpcApi(topic=topic)
|
|
||||||
api = self._RPC_API.setdefault(topic, api)
|
|
||||||
return api
|
|
||||||
|
|
||||||
def get_type(self):
|
|
||||||
return 'agent-rpc'
|
|
||||||
|
|
||||||
def get_name(self):
|
|
||||||
return 'agent-rpc'
|
|
||||||
|
|
||||||
def get_description(self):
|
|
||||||
return 'agent-rpc'
|
|
||||||
|
|
||||||
def mgmt_get_config(self, plugin, context, device):
|
|
||||||
return {'/etc/tacker/servicevm-agent.ini':
|
|
||||||
'[servicevm]\n'
|
|
||||||
'topic = %s\n'
|
|
||||||
'device_id = %s\n'
|
|
||||||
% (self._TOPIC, device['id'])}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _address(topic, server):
|
|
||||||
return '%s.%s' % (topic, server)
|
|
||||||
|
|
||||||
def _mgmt_server(self, device):
|
|
||||||
return device['id']
|
|
||||||
|
|
||||||
def _mgmt_topic(self, device):
|
|
||||||
return '%s-%s' % (self._TOPIC, self._mgmt_server(device))
|
|
||||||
|
|
||||||
def mgmt_address(self, plugin, context, device):
|
|
||||||
return self._address(self._mgmt_topic(device),
|
|
||||||
self._mgmt_server(device))
|
|
||||||
|
|
||||||
def mgmt_call(self, plugin, context, device, kwargs):
|
|
||||||
topic = device['mgmt_address']
|
|
||||||
method = kwargs[constants.KEY_ACTION]
|
|
||||||
kwargs_ = kwargs[constants.KEY_KWARGS]
|
|
||||||
self._rpc_api.rpc_cast(context, method, kwargs_, topic)
|
|
||||||
|
|
||||||
def _mgmt_service_server(self, device, service_instance):
|
|
||||||
return '%s-%s' % (device['id'], service_instance['id'])
|
|
||||||
|
|
||||||
def _mgmt_service_topic(self, device, service_instance):
|
|
||||||
return '%s-%s' % (self._TOPIC,
|
|
||||||
self._mgmt_service_server(device, service_instance))
|
|
||||||
|
|
||||||
def mgmt_service_address(self, plugin, context, device, service_instance):
|
|
||||||
return self._address(
|
|
||||||
self._mgmt_service_topic(device, service_instance),
|
|
||||||
self._mgmt_service_server(device, service_instance))
|
|
||||||
|
|
||||||
def mgmt_service_call(self, plugin, context, device,
|
|
||||||
service_instance, kwargs):
|
|
||||||
method = kwargs[constants.KEY_ACTION]
|
|
||||||
kwargs_ = kwargs[constants.KEY_KWARGS]
|
|
||||||
topic = service_instance['mgmt_address']
|
|
||||||
self._rpc_api.rpc_cast(context, method, kwargs_, topic)
|
|
@ -27,14 +27,12 @@ from oslo.config import cfg
|
|||||||
|
|
||||||
from tacker.api.v1 import attributes
|
from tacker.api.v1 import attributes
|
||||||
from tacker.common import driver_manager
|
from tacker.common import driver_manager
|
||||||
from tacker.common import topics
|
|
||||||
from tacker.db.vm import vm_db
|
from tacker.db.vm import vm_db
|
||||||
from tacker.extensions import servicevm
|
from tacker.extensions import servicevm
|
||||||
from tacker.openstack.common import excutils
|
from tacker.openstack.common import excutils
|
||||||
from tacker.openstack.common import log as logging
|
from tacker.openstack.common import log as logging
|
||||||
from tacker.plugins.common import constants
|
from tacker.plugins.common import constants
|
||||||
from tacker.vm.mgmt_drivers import constants as mgmt_constants
|
from tacker.vm.mgmt_drivers import constants as mgmt_constants
|
||||||
from tacker.vm import proxy_api
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -114,7 +112,6 @@ class ServiceVMPlugin(vm_db.ServiceResourcePluginDb, ServiceVMMgmtMixin):
|
|||||||
self._device_manager = driver_manager.DriverManager(
|
self._device_manager = driver_manager.DriverManager(
|
||||||
'tacker.servicevm.device.drivers',
|
'tacker.servicevm.device.drivers',
|
||||||
cfg.CONF.servicevm.device_driver)
|
cfg.CONF.servicevm.device_driver)
|
||||||
self.proxy_api = proxy_api.ServiceVMPluginApi(topics.SERVICEVM_AGENT)
|
|
||||||
|
|
||||||
def spawn_n(self, function, *args, **kwargs):
|
def spawn_n(self, function, *args, **kwargs):
|
||||||
self._pool.spawn_n(function, *args, **kwargs)
|
self._pool.spawn_n(function, *args, **kwargs)
|
||||||
|
@ -1,116 +0,0 @@
|
|||||||
# Copyright 2013, 2014 Intel Corporation.
|
|
||||||
# Copyright 2013, 2014 Isaku Yamahata <isaku.yamahata at intel com>
|
|
||||||
# <isaku.yamahata at gmail com>
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
#
|
|
||||||
# @author: Isaku Yamahata, Intel Corporation.
|
|
||||||
|
|
||||||
import inspect
|
|
||||||
|
|
||||||
from tacker.api.v1 import attributes
|
|
||||||
from tacker.common import rpc_compat
|
|
||||||
from tacker.openstack.common import excutils
|
|
||||||
from tacker.openstack.common import log as logging
|
|
||||||
from tacker.plugins.common import constants
|
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
# TODO(yamahata): convert oslo.messaging
|
|
||||||
class ServiceVMPluginApi(rpc_compat.RpcProxy):
|
|
||||||
API_VERSION = '1.0'
|
|
||||||
|
|
||||||
def __init__(self, topic):
|
|
||||||
super(ServiceVMPluginApi, self).__init__(topic, self.API_VERSION)
|
|
||||||
|
|
||||||
def _call(self, context, **kwargs):
|
|
||||||
method = inspect.stack()[1][3]
|
|
||||||
LOG.debug('ServiceVMPluginApi method = %s kwargs = %s', method, kwargs)
|
|
||||||
return self.call(context, self.make_msg(method, **kwargs))
|
|
||||||
|
|
||||||
def create_namespace_agent(self, core_plugin, context, network_id):
|
|
||||||
"""
|
|
||||||
:param dst_transport_url: st
|
|
||||||
:type dst_transport_url: str that represents
|
|
||||||
oslo.messaging.transportURL
|
|
||||||
"""
|
|
||||||
port_data = {
|
|
||||||
'name': '_svcvm-rpc-namespace-agent-' + network_id,
|
|
||||||
'network_id': network_id,
|
|
||||||
'mac_address': attributes.ATTR_NOT_SPECIFIED,
|
|
||||||
'admin_state_up': True,
|
|
||||||
'device_id': '_svcvm-rpc-proxy-' + network_id,
|
|
||||||
'device_owner': 'tacker:' + constants.SERVICEVM,
|
|
||||||
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
|
|
||||||
}
|
|
||||||
port = core_plugin.create_port(context, {'port': port_data})
|
|
||||||
for i in xrange(len(port['fixed_ips'])):
|
|
||||||
ipallocation = port['fixed_ips'][i]
|
|
||||||
subnet_id = ipallocation['subnet_id']
|
|
||||||
subnet = core_plugin.get_subnet(context, subnet_id)
|
|
||||||
ipallocation['subnet'] = subnet
|
|
||||||
port_id = port['id']
|
|
||||||
try:
|
|
||||||
self._call(context, port=port)
|
|
||||||
except Exception:
|
|
||||||
with excutils.save_and_reraise_exception():
|
|
||||||
core_plugin.delete_port(context, port_id)
|
|
||||||
return port_id
|
|
||||||
|
|
||||||
def destroy_namespace_agent(self, core_plugin, context, port_id):
|
|
||||||
self._call(context, port_id=port_id)
|
|
||||||
core_plugin.delete_port(context, port_id)
|
|
||||||
|
|
||||||
def create_rpc_proxy(self, context, port_id, src_target, dst_unix_target,
|
|
||||||
direction):
|
|
||||||
"""
|
|
||||||
:param src_target: target to listen/send
|
|
||||||
:type src_target: oslo.messaging.Target
|
|
||||||
:param dst_unix_target: target to send/listen
|
|
||||||
:type dst_unix_target: oslo.messaging.Target
|
|
||||||
:param direction: RPC direction
|
|
||||||
:type direction: str 'send' or 'receive'
|
|
||||||
'send': tacker server -> agent
|
|
||||||
'receive': neturon server <- agent
|
|
||||||
"""
|
|
||||||
return self._call(context, port_id=port_id, src_target=src_target,
|
|
||||||
dst_unix_target=dst_unix_target, direction=direction)
|
|
||||||
|
|
||||||
def destroy_rpc_proxy(self, context, port_id, rpc_proxy_id):
|
|
||||||
return self._call(context, proxy_id=port_id, rpc_proxy_id=rpc_proxy_id)
|
|
||||||
|
|
||||||
def create_rpc_namespace_proxy(self, context, port_id, src_target,
|
|
||||||
dst_transport_url, dst_target, direction):
|
|
||||||
"""
|
|
||||||
:param src_target: target to listen/send
|
|
||||||
:type src_target: oslo.messaging.Target
|
|
||||||
:param dst_target: target to send/listen
|
|
||||||
:type dst_target: oslo.messaging.Target
|
|
||||||
:param direction: RPC direction
|
|
||||||
:type direction: str 'send' or 'receive'
|
|
||||||
'send': tacker server -> agent
|
|
||||||
'receive': neturon server <- agent
|
|
||||||
"""
|
|
||||||
return self._call(context, port_id=port_id,
|
|
||||||
src_target=src_target,
|
|
||||||
dst_transport_url=dst_transport_url,
|
|
||||||
dst_target=dst_target, direction=direction)
|
|
||||||
|
|
||||||
def destroy_rpc_namespace_proxy(self, context, port_id,
|
|
||||||
namespace_proxy_id):
|
|
||||||
return self._call(context, port_id=port_id,
|
|
||||||
namespace_proxy_id=namespace_proxy_id)
|
|
Loading…
Reference in New Issue
Block a user