add mgmt drivers(work in progress)

Change-Id: I4bc1b2d59c9082c860d2427089986393b7fbdc34
This commit is contained in:
Isaku Yamahata 2015-04-06 17:51:41 -07:00
parent 04ae125290
commit 59122b433c
4 changed files with 322 additions and 0 deletions

View File

View File

@ -0,0 +1,34 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
from oslo.config import cfg
from tacker.common import topics
_RPC_AGENT_OPTS = [
cfg.StrOpt('device_id', default=None, help=_('The device id')),
cfg.StrOpt('topic', default=topics.SERVICEVM_AGENT,
help=_('rpc topic for agent to subscribe')),
]
def register_servicevm_agent_opts(conf):
conf.register_opts(_RPC_AGENT_OPTS, group='servicevm_agent')

View File

@ -0,0 +1,181 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
from oslo.config import cfg
from tacker.db.vm import proxy_db
from tacker.openstack.common import log as logging
from tacker.vm import constants
from tacker.vm.mgmt_drivers.rpc import rpc
LOG = logging.getLogger(__name__)
class AgentRpcProxyMGMTDriver(rpc.AgentRpcMGMTDriver):
_TRANSPORT_OPTS = [
cfg.StrOpt('dst_transport_url',
# TODO(yamahata): make user, pass, port configurable
# per servicevm
#'<scheme>://<user>:<pass>@<host>:<port>/'
default='rabbit://guest:guest@%(host)s:5672/',
help='A URL representing the messaging driver '
'to use and its full configuration.'),
]
def __init__(self, conf=None):
super(AgentRpcProxyMGMTDriver, self).__init__()
self.db = proxy_db.RpcProxyDb()
self.conf = conf or cfg.CONF
self.conf.register_opts(self._TRANSPORT_OPTS)
def get_type(self):
return 'agent-proxy'
def get_name(self):
return 'agent-proxy'
def get_description(self):
return 'agent-proxy'
def mgmt_create_post(self, plugin, context, device):
LOG.debug('mgmt_create_post')
mgmt_entries = [sc_entry for sc_entry in device['service_context']
if (sc_entry['role'] == constants.ROLE_MGMT and
sc_entry.get('port_id'))]
assert mgmt_entries
mgmt_entry = mgmt_entries[0]
vm_port_id = mgmt_entry['port_id']
vm_port = plugin._core_plugin.get_port(context, vm_port_id)
fixed_ip = vm_port['fixed_ips'][0]['ip_address']
# TODO(yamahata): make all parameters(scheme, user, pass, port)
# configurable
dst_transport_url = self.conf.dst_transport_url % {'host': fixed_ip}
network_id = mgmt_entry['network_id']
assert network_id
proxy_api = plugin.proxy_api
port_id = proxy_api.create_namespace_agent(plugin._core_plugin,
context, network_id)
device_id = device['id']
target = 'topic=%s,server=%s' % (self._mgmt_topic(device),
self._mgmt_server(device))
svr_proxy_id = proxy_api.create_rpc_proxy(
context, port_id, target, target, 'receive')
LOG.debug('mgmt_create_post: svr_proxy_id: %s', svr_proxy_id)
svr_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
context, port_id, target, dst_transport_url, target, 'receive')
LOG.debug('mgmt_create_post: svr_ns_proxy_id: %s', svr_ns_proxy_id)
clt_proxy_id = proxy_api.create_rpc_proxy(
context, port_id, target, target, 'send')
LOG.debug('mgmt_create_post: clt_proxy_id: %s', clt_proxy_id)
clt_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
context, port_id, target, dst_transport_url, target, 'send')
LOG.debug('mgmt_create_post: clt_ns_proxy_id: %s', clt_ns_proxy_id)
LOG.debug('mgmt_create_ppost: '
'svr: %s svr_ns: %s clt: %s clt_ns: %s ',
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
self.db.create_proxy_mgmt_port(
context, device_id, port_id, dst_transport_url,
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
def mgmt_delete_post(self, plugin, context, device):
LOG.debug('mgmt_delete_post')
device_id = device['id']
proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device_id)
port_id = proxy_mgmt_port['port_id']
svr_proxy_id = proxy_mgmt_port['svr_proxy_id']
svr_ns_proxy_id = proxy_mgmt_port['svr_ns_proxy_id']
clt_proxy_id = proxy_mgmt_port['clt_proxy_id']
clt_ns_proxy_id = proxy_mgmt_port['clt_ns_proxy_id']
proxy_api = plugin.proxy_api
proxy_api.destroy_rpc_namespace_proxy(context,
port_id, clt_ns_proxy_id)
proxy_api.destroy_rpc_proxy(context, port_id, clt_proxy_id)
proxy_api.destroy_rpc_namespace_proxy(context,
port_id, svr_ns_proxy_id)
proxy_api.destroy_rpc_proxy(context, port_id, svr_proxy_id)
proxy_api.destroy_namespace_agent(plugin._core_plugin,
context, port_id)
self.db.delete_proxy_mgmt_port(context, port_id)
def mgmt_service_create_pre(self, plugin, context, device,
service_instance):
LOG.debug('mgmt_service_create_pre')
proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device['id'])
port_id = proxy_mgmt_port['port_id']
dst_transport_url = proxy_mgmt_port['dst_transport_url']
proxy_api = plugin.proxy_api
target = 'topic=%s,server=%s' % (
self._mgmt_service_topic(device, service_instance),
self._mgmt_service_server(device, service_instance))
svr_proxy_id = proxy_api.create_rpc_proxy(
context, port_id, target, target, 'receive')
LOG.debug('mgmt_service_create_pre: svr_proxy_id: %s', svr_proxy_id)
svr_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
context, port_id, target, dst_transport_url, target, 'receive')
LOG.debug('mgmt_service_create_pre: svr_ns_proxy_id: %s',
svr_ns_proxy_id)
clt_proxy_id = proxy_api.create_rpc_proxy(
context, port_id, target, target, 'send')
LOG.debug('mgmt_service_create_pre: clt_proxy_id: %s', clt_proxy_id)
clt_ns_proxy_id = proxy_api.create_rpc_namespace_proxy(
context, port_id, target, dst_transport_url, target, 'send')
LOG.debug('mgmt_service_create_pre: clt_ns_proxy_id: %s',
clt_ns_proxy_id)
LOG.debug('mgmt_service_create_pre: '
'svr: %s svr_ns: %s clt: %s clt_ns: %s ',
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
self.db.create_proxy_service_port(
context, service_instance['id'],
svr_proxy_id, svr_ns_proxy_id, clt_proxy_id, clt_ns_proxy_id)
def mgmt_service_delete_post(self, plugin, context, device,
service_instance):
LOG.debug('mgmt_service_delete_post')
proxy_mgmt_port = self.db.get_proxy_mgmt_port(context, device['id'])
port_id = proxy_mgmt_port['port_id']
service_instance_id = service_instance['id']
proxy_service_port = self.db.get_proxy_service_port(
context, service_instance_id)
svr_proxy_id = proxy_service_port['svr_proxy_id']
svr_ns_proxy_id = proxy_service_port['svr_ns_proxy_id']
clt_proxy_id = proxy_service_port['clt_proxy_id']
clt_ns_proxy_id = proxy_service_port['clt_ns_proxy_id']
proxy_api = plugin.proxy_api
proxy_api.destroy_rpc_namespace_proxy(context,
port_id, clt_ns_proxy_id)
proxy_api.destroy_rpc_proxy(context, port_id, clt_proxy_id)
proxy_api.destroy_rpc_namespace_proxy(context,
port_id, svr_ns_proxy_id)
proxy_api.destroy_rpc_proxy(context, port_id, svr_proxy_id)
self.db.delete_proxy_service_port(context, service_instance_id)

View File

@ -0,0 +1,107 @@
# Copyright 2014 Intel Corporation.
# Copyright 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Isaku Yamahata, Intel Corporation.
from tacker.common import rpc_compat
from tacker.common import topics
from tacker.vm.mgmt_drivers import abstract_driver
from tacker.vm.mgmt_drivers import constants
class ServiceVMAgentRpcApi(rpc_compat.RpcProxy):
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic=topics.SERVICEVM_AGENT):
super(ServiceVMAgentRpcApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def rpc_cast(self, context, method, kwargs, topic):
self.cast(context, self.make_msg(method, **kwargs), topic=topic)
# TODO(yamahata): port this to oslo.messaging
# address format needs be changed to
# oslo.messaging.target.Target
class AgentRpcMGMTDriver(abstract_driver.DeviceMGMTAbstractDriver):
_TOPIC = topics.SERVICEVM_AGENT # can be overridden by subclass
_RPC_API = {} # topic -> ServiceVMAgentRpcApi
@property
def _rpc_api(self):
topic = self._TOPIC
api = self._RPC_API.get(topic)
if api is None:
api = ServiceVMAgentRpcApi(topic=topic)
api = self._RPC_API.setdefault(topic, api)
return api
def get_type(self):
return 'agent-rpc'
def get_name(self):
return 'agent-rpc'
def get_description(self):
return 'agent-rpc'
def mgmt_get_config(self, plugin, context, device):
return {'/etc/tacker/servicevm-agent.ini':
'[servicevm]\n'
'topic = %s\n'
'device_id = %s\n'
% (self._TOPIC, device['id'])}
@staticmethod
def _address(topic, server):
return '%s.%s' % (topic, server)
def _mgmt_server(self, device):
return device['id']
def _mgmt_topic(self, device):
return '%s-%s' % (self._TOPIC, self._mgmt_server(device))
def mgmt_address(self, plugin, context, device):
return self._address(self._mgmt_topic(device),
self._mgmt_server(device))
def mgmt_call(self, plugin, context, device, kwargs):
topic = device['mgmt_address']
method = kwargs[constants.KEY_ACTION]
kwargs_ = kwargs[constants.KEY_KWARGS]
self._rpc_api.rpc_cast(context, method, kwargs_, topic)
def _mgmt_service_server(self, device, service_instance):
return '%s-%s' % (device['id'], service_instance['id'])
def _mgmt_service_topic(self, device, service_instance):
return '%s-%s' % (self._TOPIC,
self._mgmt_service_server(device, service_instance))
def mgmt_service_address(self, plugin, context, device, service_instance):
return self._address(
self._mgmt_service_topic(device, service_instance),
self._mgmt_service_server(device, service_instance))
def mgmt_service_call(self, plugin, context, device,
service_instance, kwargs):
method = kwargs[constants.KEY_ACTION]
kwargs_ = kwargs[constants.KEY_KWARGS]
topic = service_instance['mgmt_address']
self._rpc_api.rpc_cast(context, method, kwargs_, topic)