Site create RPC

Implement site create RPC. Now tricircle api can utilize RPC
to notify cascade service to start rpc server for new site.

Partially implements: blueprint implement-api
Ref: https://blueprints.launchpad.net/tricircle/+spec/implement-api

Change-Id: I73879a84d31b5ac9004cfe3f18cb9a984d53099c
This commit is contained in:
zhiyuan_cai 2015-11-20 15:31:41 +08:00
parent 0caaa2b979
commit fd5e79451e
22 changed files with 250 additions and 186 deletions

View File

@ -34,7 +34,7 @@ import nova.objects as nova_objects
from nova.objects import base as objects_base
import nova.rpc as nova_rpc
import tricircle.cascade_service.service as service
import tricircle.dispatcher.service as service
def block_db_access():
@ -62,7 +62,7 @@ def process_command_line_arguments():
logging.register_options(cfg.CONF)
logging.set_defaults()
cfg.CONF(sys.argv[1:])
logging.setup(cfg.CONF, "cascade_service", version='0.1')
logging.setup(cfg.CONF, "dispatcher", version='0.1')
def _set_up_nova_objects():

View File

@ -18,9 +18,9 @@ function create_tricircle_accounts {
create_service_user "tricircle"
if [[ "$KEYSTONE_CATALOG_BACKEND" = 'sql' ]]; then
local tricircle_cascade_service=$(get_or_create_service "tricircle" \
local tricircle_dispatcher=$(get_or_create_service "tricircle" \
"Cascading" "OpenStack Cascading Service")
get_or_create_endpoint $tricircle_cascade_service \
get_or_create_endpoint $tricircle_dispatcher \
"$REGION_NAME" \
"$SERVICE_PROTOCOL://$TRICIRCLE_CASCADE_API_HOST:$TRICIRCLE_CASCADE_API_PORT/v1.0" \
"$SERVICE_PROTOCOL://$TRICIRCLE_CASCADE_API_HOST:$TRICIRCLE_CASCADE_API_PORT/v1.0" \
@ -54,18 +54,18 @@ function configure_tricircle_plugin {
if is_service_enabled t-svc ; then
echo "Configuring Neutron for Tricircle Cascade Service"
sudo install -d -o $STACK_USER -m 755 $TRICIRCLE_CONF_DIR
cp -p $TRICIRCLE_DIR/etc/cascade_service.conf $TRICIRCLE_CASCADE_CONF
cp -p $TRICIRCLE_DIR/etc/dispatcher.conf $TRICIRCLE_DISPATCHER_CONF
TRICIRCLE_POLICY_FILE=$TRICIRCLE_CONF_DIR/policy.json
cp $TRICIRCLE_DIR/etc/policy.json $TRICIRCLE_POLICY_FILE
iniset $TRICIRCLE_CASCADE_CONF DEFAULT debug $ENABLE_DEBUG_LOG_LEVEL
iniset $TRICIRCLE_CASCADE_CONF DEFAULT verbose True
setup_colorized_logging $TRICIRCLE_CASCADE_CONF DEFAULT
iniset $TRICIRCLE_CASCADE_CONF DEFAULT bind_host $TRICIRCLE_CASCADE_LISTEN_ADDRESS
iniset $TRICIRCLE_CASCADE_CONF DEFAULT use_syslog $SYSLOG
iniset_rpc_backend tricircle $TRICIRCLE_CASCADE_CONF
iniset $TRICIRCLE_CASCADE_CONF database connection `database_connection_url tricircle`
iniset $TRICIRCLE_DISPATCHER_CONF DEFAULT debug $ENABLE_DEBUG_LOG_LEVEL
iniset $TRICIRCLE_DISPATCHER_CONF DEFAULT verbose True
setup_colorized_logging $TRICIRCLE_DISPATCHER_CONF DEFAULT tenant_name
iniset $TRICIRCLE_DISPATCHER_CONF DEFAULT bind_host $TRICIRCLE_DISPATCHER_LISTEN_ADDRESS
iniset $TRICIRCLE_DISPATCHER_CONF DEFAULT use_syslog $SYSLOG
iniset_rpc_backend tricircle $TRICIRCLE_DISPATCHER_CONF
iniset $TRICIRCLE_DISPATCHER_CONF database connection `database_connection_url tricircle`
fi
}
@ -121,13 +121,13 @@ if [[ "$Q_ENABLE_TRICIRCLE" == "True" ]]; then
echo export PYTHONPATH=\$PYTHONPATH:$TRICIRCLE_DIR >> $RC_DIR/.localrc.auto
recreate_database tricircle
python "$TRICIRCLE_DIR/cmd/manage.py" "$TRICIRCLE_CASCADE_CONF"
python "$TRICIRCLE_DIR/cmd/manage.py" "$TRICIRCLE_DISPATCHER_CONF"
elif [[ "$1" == "stack" && "$2" == "extra" ]]; then
echo_summary "Initializing Cascading Service"
if is_service_enabled t-svc; then
run_process t-svc "python $TRICIRCLE_CASCADE_SERVICE --config-file $TRICIRCLE_CASCADE_CONF --config-dir $TRICIRCLE_CONF_DIR"
run_process t-svc "python $TRICIRCLE_DISPATCHER --config-file $TRICIRCLE_DISPATCHER_CONF --config-dir $TRICIRCLE_CONF_DIR"
fi
if is_service_enabled t-svc-api; then

View File

@ -7,9 +7,9 @@ TRICIRCLE_BRANCH=${TRICIRCLE_BRANCH:-master}
TRICIRCLE_CONF_DIR=${TRICIRCLE_CONF_DIR:-/etc/tricircle}
# cascade service
TRICIRCLE_CASCADE_SERVICE=$TRICIRCLE_DIR/cmd/cascade_service.py
TRICIRCLE_CASCADE_CONF=$TRICIRCLE_CONF_DIR/cascade_service.conf
TRICIRCLE_CASCADE_LISTEN_ADDRESS=${TRICIRCLE_CASCADE_LISTEN_ADDRESS:-0.0.0.0}
TRICIRCLE_DISPATCHER=$TRICIRCLE_DIR/cmd/dispatcher.py
TRICIRCLE_DISPATCHER_CONF=$TRICIRCLE_CONF_DIR/dispatcher.conf
TRICIRCLE_DISPATCHER_LISTEN_ADDRESS=${TRICIRCLE_DISPATCHER_LISTEN_ADDRESS:-0.0.0.0}
# cascade rest api
TRICIRCLE_CASCADE_API=$TRICIRCLE_DIR/cmd/api.py

0
etc/api.conf Executable file → Normal file
View File

View File

@ -20,6 +20,8 @@ import pecan
from pecan import request
from pecan import rest
from tricircle.common import cascading_site_api
from tricircle.common import utils
import tricircle.context as t_context
from tricircle.db import client
from tricircle.db import exception
@ -156,9 +158,9 @@ class SitesController(rest.RestController):
pecan.abort(409, 'Site with name %s exists' % site_name)
return
ag_name = 'ag_%s' % site_name
ag_name = utils.get_ag_name(site_name)
# top site doesn't need az
az_name = 'az_%s' % site_name if not is_top_site else ''
az_name = utils.get_az_name(site_name) if not is_top_site else ''
try:
site_dict = {'site_id': str(uuid.uuid4()),
@ -178,6 +180,8 @@ class SitesController(rest.RestController):
try:
top_client = client.Client()
top_client.create_aggregates(context, ag_name, az_name)
site_api = cascading_site_api.CascadingSiteNotifyAPI()
site_api.create_site(context, site_name)
except Exception as e:
LOG.debug(e.message)
# delete previously created site

View File

@ -1,132 +0,0 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from socket import gethostname
from oslo_config import cfg
from nova import exception
from nova import objects
from nova.scheduler import driver
from nova.scheduler.manager import SchedulerManager
from tricircle.common.utils import get_import_path
from tricircle.cascade_service import site_manager
from tricircle.cascade_service.compute import NovaService
cfg.CONF.import_opt('scheduler_topic', 'nova.scheduler.rpcapi')
_REPORT_INTERVAL = 30
_REPORT_INTERVAL_MAX = 60
def _get_import_path(klass):
return "%s.%s" % (klass.__module__, klass.__name__)
def create_server():
return NovaService(
host=gethostname(),
binary="nova-scheduler",
topic="scheduler", # TODO(saggi): get from conf
db_allowed=False,
periodic_enable=True,
report_interval=_REPORT_INTERVAL,
periodic_interval_max=_REPORT_INTERVAL_MAX,
manager=get_import_path(SchedulerManager),
scheduler_driver=get_import_path(TricircleSchedulerDriver),
)
class _AvailabilityZone(object):
def __init__(self, name, host_manager):
self.name = name
self._host_manager = host_manager
self._site_manager = site_manager.get_instance()
@property
def host_aggregates(self):
for aggregate in self._host_manager.aggregates:
if aggregate.metadata[u'availability_zone'] == self.name:
yield aggregate
@property
def member_hosts(self):
for aggregate in self.host_aggregates:
for host in aggregate.hosts:
yield host
@property
def valid_sites(self):
for host in self.member_hosts:
yield self._site_manager.get_site(host)
class _HostManager(object):
def __init__(self):
self.aggregates = []
# Required methods from OpenStack interface
def update_aggregates(self, aggregates):
# This is not called reliably enough to trust
# we just reload the aggregates on every call
pass
def delete_aggregate(self, aggregate):
# This is not called reliably enough to trust
# we just reload the aggregates on every call
pass
def update_instance_info(self, context, host_name, instance_info):
pass
def delete_instance_info(self, context, host_name, instance_uuid):
pass
def sync_instance_info(self, context, host_name, instance_uuids):
pass
# Tricircle only methods
def get_availability_zone(self, az_name):
return _AvailabilityZone(az_name, self)
def reload_aggregates(self, context):
self.aggregates = objects.AggregateList.get_all(context)
class TricircleSchedulerDriver(driver.Scheduler):
def __init__(self):
super(TricircleSchedulerDriver, self).__init__()
self.host_manager = _HostManager()
self._site_manager = site_manager.get_instance()
def select_destinations(self, ctxt, request_spec, filter_properties):
self.host_manager.reload_aggregates(ctxt)
availability_zone = self.host_manager.get_availability_zone(
request_spec[u'instance_properties'][u'availability_zone'])
for site in availability_zone.valid_sites:
site.prepare_for_instance(request_spec, filter_properties)
return [{
'host': site.name,
'nodename': site.get_nodes()[0].hypervisor_hostname,
'limits': None,
}]
else:
raise exception.NoValidHost(
"No sites match requested availability zone")

View File

@ -0,0 +1,50 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
import oslo_messaging
from tricircle.common import rpc
from tricircle.common import topics
LOG = logging.getLogger(__name__)
class CascadingSiteNotifyAPI(object):
"""API for to notify Cascading service for the site API."""
def __init__(self, topic=topics.CASCADING_SERVICE):
target = oslo_messaging.Target(topic=topic,
exchange="tricircle",
namespace="site",
version='1.0',
fanout=True)
self.client = rpc.create_client(target)
def _cast_message(self, context, method, payload):
"""Cast the payload to the running cascading service instances."""
cctx = self.client.prepare()
LOG.debug('Fanout notify at %(topic)s.%(namespace)s the message '
'%(method)s for CascadingSite. payload: %(payload)s',
{'topic': cctx.target.topic,
'namespace': cctx.target.namespace,
'payload': payload,
'method': method})
cctx.cast(context, method, payload=payload)
def create_site(self, context, site_name):
self._cast_message(context, "create_site", site_name)

View File

@ -22,7 +22,6 @@ from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from tricircle.common import topics
from tricircle.common.serializer import CascadeSerializer as Serializer
TRANSPORT = oslo_messaging.get_transport(cfg.CONF)
@ -58,15 +57,7 @@ class NetworkingRpcApi(object):
'update_port_down', port_id=port_id)
def create_client(component_name):
topic = topics.CASCADING_SERVICE
target = oslo_messaging.Target(
exchange="tricircle",
topic=topic,
namespace=component_name,
version='1.0',
)
def create_client(target):
return oslo_messaging.RPCClient(
TRANSPORT,
target,

View File

@ -17,6 +17,8 @@ import six
from oslo_messaging import Serializer
from neutron.api.v2.attributes import ATTR_NOT_SPECIFIED
import tricircle.context as t_context
class Mapping(object):
def __init__(self, mapping):
@ -70,13 +72,7 @@ class CascadeSerializer(Serializer):
return entity
def serialize_context(self, context):
if self._base is not None:
context = self._base.serialize_context(context)
return context
return context.to_dict()
def deserialize_context(self, context):
if self._base is not None:
context = self._base.deserialize_context(context)
return context
return t_context.Context.from_dict(context)

View File

@ -16,3 +16,11 @@
def get_import_path(cls):
return cls.__module__ + "." + cls.__name__
def get_ag_name(site_name):
return 'ag_%s' % site_name
def get_az_name(site_name):
return 'az_%s' % site_name

View File

@ -18,6 +18,16 @@ from oslo_context import context as oslo_ctx
from tricircle.db import core
def get_db_context():
return Context()
def get_admin_context():
ctx = Context()
ctx.is_admin = True
return ctx
class ContextBase(oslo_ctx.RequestContext):
def __init__(self, auth_token=None, user_id=None, tenant_id=None,
is_admin=False, request_id=None, overwrite=True,

View File

@ -119,7 +119,7 @@ class Client(object):
getattr(self, '%s_resources' % operation),
resource))
def _get_admin_token(self):
def _get_keystone_session(self):
auth = auth_identity.Password(
auth_url=cfg.CONF.client.identity_url,
username=cfg.CONF.client.admin_username,
@ -127,8 +127,13 @@ class Client(object):
project_name=cfg.CONF.client.admin_tenant,
user_domain_name=cfg.CONF.client.admin_user_domain_name,
project_domain_name=cfg.CONF.client.admin_tenant_domain_name)
sess = session.Session(auth=auth)
return sess.get_token()
return session.Session(auth=auth)
def _get_admin_token(self):
return self._get_keystone_session().get_token()
def _get_admin_project_id(self):
return self._get_keystone_session().get_project_id()
def _get_endpoint_from_keystone(self, cxt):
auth = token_endpoint.Token(cfg.CONF.client.identity_url,
@ -285,6 +290,10 @@ class Client(object):
:return: list of dict containing resources information
:raises: EndpointNotAvailable
"""
if cxt.is_admin and not cxt.auth_token:
cxt.auth_token = self._get_admin_token()
cxt.tenant = self._get_admin_project_id()
service = self.resource_service_map[resource]
handle = self.service_handle_map[service]
filters = filters or []
@ -310,6 +319,10 @@ class Client(object):
:return: a dict containing resource information
:raises: EndpointNotAvailable
"""
if cxt.is_admin and not cxt.auth_token:
cxt.auth_token = self._get_admin_token()
cxt.tenant = self._get_admin_project_id()
service = self.resource_service_map[resource]
handle = self.service_handle_map[service]
return handle.handle_create(cxt, resource, *args, **kwargs)
@ -328,6 +341,39 @@ class Client(object):
:return: None
:raises: EndpointNotAvailable
"""
if cxt.is_admin and not cxt.auth_token:
cxt.auth_token = self._get_admin_token()
cxt.tenant = self._get_admin_project_id()
service = self.resource_service_map[resource]
handle = self.service_handle_map[service]
handle.handle_delete(cxt, resource, resource_id)
@_safe_operation('action')
def action_resources(self, resource, cxt, action, *args, **kwargs):
"""Apply action on resource in site of top layer
Directly invoke this method to apply action, or use
action_(resource)s (self, cxt, action, *args, **kwargs). These methods
are automatically generated according to the supported resources of
each ResourceHandle class.
:param resource: resource type
:param cxt: context object
:param action: action applied on resource
:param args, kwargs: passed according to resource type
--------------------------
resource -> action -> args -> kwargs
--------------------------
aggregate -> add_host -> aggregate, host -> none
--------------------------
:return: None
:raises: EndpointNotAvailable
"""
if cxt.is_admin and not cxt.auth_token:
cxt.auth_token = self._get_admin_token()
cxt.tenant = self._get_admin_project_id()
service = self.resource_service_map[resource]
handle = self.service_handle_map[service]
return handle.handle_action(cxt, resource, action, *args, **kwargs)

View File

@ -40,8 +40,9 @@ client_opts = [
cfg.CONF.register_opts(client_opts, group='client')
LIST, CREATE, DELETE = 1, 2, 4
operation_index_map = {'list': LIST, 'create': CREATE, 'delete': DELETE}
LIST, CREATE, DELETE, ACTION = 1, 2, 4, 8
operation_index_map = {'list': LIST, 'create': CREATE,
'delete': DELETE, 'action': ACTION}
LOG = logging.getLogger(__name__)
@ -126,7 +127,7 @@ class NovaResourceHandle(ResourceHandle):
service_type = 'nova'
support_resource = {'flavor': LIST,
'server': LIST,
'aggregate': LIST | CREATE | DELETE}
'aggregate': LIST | CREATE | DELETE | ACTION}
def _get_client(self, cxt):
cli = n_client.Client('2',
@ -177,3 +178,14 @@ class NovaResourceHandle(ResourceHandle):
except n_exceptions.NotFound:
LOG.debug("Delete %(resource)s %(resource_id)s which not found",
{'resource': resource, 'resource_id': resource_id})
def handle_action(self, cxt, resource, action, *args, **kwargs):
try:
client = self._get_client(cxt)
collection = '%ss' % resource
resource_manager = getattr(client, collection)
getattr(resource_manager, action)(*args, **kwargs)
except r_exceptions.ConnectTimeout:
self.endpoint_url = None
raise exception.EndpointNotAvailable('nova',
client.client.management_url)

View File

@ -0,0 +1,32 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import oslo_messaging
from oslo_log import log as logging
from tricircle.dispatcher import site_manager
LOG = logging.getLogger(__name__)
class CascadeSiteServiceEndpoint(object):
target = oslo_messaging.Target(namespace="site",
version='1.0')
def create_site(self, ctx, payload):
site_manager.get_instance().create_site(ctx, payload)

View File

@ -22,11 +22,13 @@ import oslo_messaging
from tricircle.common import topics
from tricircle.common.serializer import CascadeSerializer as Serializer
from tricircle.cascade_service import scheduler
from tricircle.dispatcher import site_manager
# import endpoints here
from tricircle.cascade_service.endpoints.networking import (
from tricircle.dispatcher.endpoints.networking import (
CascadeNetworkingServiceEndpoint)
from tricircle.dispatcher.endpoints.site import (
CascadeSiteServiceEndpoint)
LOG = logging.getLogger(__name__)
@ -54,6 +56,7 @@ def _create_main_cascade_server():
endpoints = [
server_control_endpoint,
CascadeNetworkingServiceEndpoint(),
CascadeSiteServiceEndpoint()
]
server = oslo_messaging.get_rpc_server(
transport,
@ -64,10 +67,11 @@ def _create_main_cascade_server():
)
server_control_endpoint.server = server
# init _SiteManager to start fake nodes
site_manager.get_instance()
return server
def setup_server():
scheduler_server = scheduler.create_server()
scheduler_server.start()
return _create_main_cascade_server()

View File

@ -16,7 +16,11 @@
from oslo_serialization import jsonutils as json
from tricircle.common.singleton import Singleton
from tricircle.cascade_service.compute import ComputeHostManager
from tricircle.common import utils
import tricircle.context as t_context
from tricircle.dispatcher.compute import ComputeHostManager
from tricircle.db import client
from tricircle.db import models
class Node(object):
@ -101,22 +105,38 @@ class _SiteManager(object):
self._sites = {}
self.compute_host_manager = ComputeHostManager(self)
# create fake data
# NOTE(saggi) replace with DAL access when available
self.create_site("Fake01")
self.create_site("Fake02")
sites = models.list_sites(t_context.get_db_context(), [])
for site in sites:
# skip top site
if not site['az_id']:
continue
self.create_site(t_context.get_admin_context(), site['site_name'])
def create_site(self, site_name):
def create_site(self, context, site_name):
"""creates a fake site, in reality the information about available
sites should be pulled from the DAL and not created at will.
"""
# TODO(saggi): thread safty
# TODO(saggi): thread safety
if site_name in self._sites:
raise RuntimeError("Site already exists in site map")
# TODO(zhiyuan): use DHT to judge whether host this site or not
self._sites[site_name] = Site(site_name)
self.compute_host_manager.create_host_adapter(site_name)
ag_name = utils.get_ag_name(site_name)
top_client = client.Client()
aggregates = top_client.list_resources('aggregate', context)
for aggregate in aggregates:
if aggregate['name'] == ag_name:
if site_name in aggregate['hosts']:
return
else:
top_client.action_resources('aggregate', context,
'add_host', aggregate['id'],
site_name)
return
def get_site(self, site_name):
return self._sites[site_name]

View File

@ -70,6 +70,14 @@ class FakeClient(object):
except ValueError:
pass
def action_fake_res(self, name, rename):
if self.endpoint != FAKE_URL:
raise FakeException()
for res in FAKE_RESOURCES:
if res['name'] == name:
res['name'] = rename
break
class FakeResHandle(resource_handle.ResourceHandle):
def _get_client(self, cxt):
@ -100,6 +108,14 @@ class FakeResHandle(resource_handle.ResourceHandle):
self.endpoint_url = None
raise exception.EndpointNotAvailable(FAKE_TYPE, cli.endpoint)
def handle_action(self, cxt, resource, action, name, rename):
try:
cli = self._get_client(cxt)
cli.action_fake_res(name, rename)
except FakeException:
self.endpoint_url = None
raise exception.EndpointNotAvailable(FAKE_TYPE, cli.endpoint)
class ClientTest(unittest.TestCase):
def setUp(self):
@ -133,6 +149,7 @@ class ClientTest(unittest.TestCase):
self.client.operation_resources_map['list'].add(FAKE_RESOURCE)
self.client.operation_resources_map['create'].add(FAKE_RESOURCE)
self.client.operation_resources_map['delete'].add(FAKE_RESOURCE)
self.client.operation_resources_map['action'].add(FAKE_RESOURCE)
self.client.service_handle_map[FAKE_TYPE] = FakeResHandle(None)
def test_list(self):
@ -160,6 +177,12 @@ class ClientTest(unittest.TestCase):
resources = self.client.list_resources(FAKE_RESOURCE, self.context)
self.assertEqual(resources, [{'name': 'res2'}])
def test_action(self):
self.client.action_resources(FAKE_RESOURCE, self.context,
'rename', 'res1', 'res3')
resources = self.client.list_resources(FAKE_RESOURCE, self.context)
self.assertEqual(resources, [{'name': 'res3'}, {'name': 'res2'}])
def test_list_endpoint_not_found(self):
cfg.CONF.set_override(name='auto_refresh_endpoint', override=False,
group='client')