cascade_service: client wrapper

Implement client wrapper in DAL to access resources in top
OpenStack layer.

Partially implements: blueprint implement-dal

Change-Id: I8b6f5efdb49c87b6aeb4b7aad38f54c842c21d40
This commit is contained in:
zhiyuan_cai 2015-08-29 18:24:52 +08:00
parent be30bb2fd8
commit 7a014921f9
6 changed files with 720 additions and 5 deletions

View File

@ -258,6 +258,45 @@
# If set, use this value for pool_timeout with sqlalchemy
# pool_timeout = 10
[client]
# Keystone authentication URL
# auth_url = http://127.0.0.1:5000/v3
# Keystone service URL
# identity_url = http://127.0.0.1:35357/v3
# If set to True, endpoint will be automatically refreshed if timeout
# accessing endpoint.
# auto_refresh_endpoint = False
# Name of top site which client needs to access
# top_site_name =
# Username of admin account for synchronizing endpoint with Keystone
# admin_username =
# Password of admin account for synchronizing endpoint with Keystone
# admin_password =
# Tenant name of admin account for synchronizing endpoint with Keystone
# admin_tenant =
# User domain name of admin account for synchronizing endpoint with Keystone
# admin_user_domain_name = default
# Tenant domain name of admin account for synchronizing endpoint with Keystone
# admin_tenant_domain_name = default
# Timeout for glance client in seconds
# glance_timeout = 60
# Timeout for neutron client in seconds
# neutron_timeout = 60
# Timeout for nova client in seconds
# nova_timeout = 60
[oslo_concurrency]
# Directory to use for lock files. For security, the specified directory should

268
tricircle/db/client.py Normal file
View File

@ -0,0 +1,268 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import inspect
import uuid
from keystoneclient.auth.identity import v3 as auth_identity
from keystoneclient.auth import token_endpoint
from keystoneclient import session
from keystoneclient.v3 import client as keystone_client
from oslo_config import cfg
from oslo_log import log as logging
import tricircle.context as tricircle_context
from tricircle.db import exception
from tricircle.db import models
from tricircle.db import resource_handle
client_opts = [
cfg.StrOpt('auth_url',
default='http://127.0.0.1:5000/v3',
help='keystone authorization url'),
cfg.StrOpt('identity_url',
default='http://127.0.0.1:35357/v3',
help='keystone service url'),
cfg.BoolOpt('auto_refresh_endpoint',
default=False,
help='if set to True, endpoint will be automatically'
'refreshed if timeout accessing endpoint'),
cfg.StrOpt('top_site_name',
help='name of top site which client needs to access'),
cfg.StrOpt('admin_username',
help='username of admin account, needed when'
' auto_refresh_endpoint set to True'),
cfg.StrOpt('admin_password',
help='password of admin account, needed when'
' auto_refresh_endpoint set to True'),
cfg.StrOpt('admin_tenant',
help='tenant name of admin account, needed when'
' auto_refresh_endpoint set to True'),
cfg.StrOpt('admin_user_domain_name',
default='Default',
help='user domain name of admin account, needed when'
' auto_refresh_endpoint set to True'),
cfg.StrOpt('admin_tenant_domain_name',
default='Default',
help='tenant domain name of admin account, needed when'
' auto_refresh_endpoint set to True')
]
client_opt_group = cfg.OptGroup('client')
cfg.CONF.register_group(client_opt_group)
cfg.CONF.register_opts(client_opts, group=client_opt_group)
LOG = logging.getLogger(__name__)
class Client(object):
def __init__(self):
self.auth_url = cfg.CONF.client.auth_url
self.resource_service_map = {}
self.service_handle_map = {}
for _, handle_class in inspect.getmembers(resource_handle):
if not inspect.isclass(handle_class):
continue
if not hasattr(handle_class, 'service_type'):
continue
handle_obj = handle_class(self.auth_url)
self.service_handle_map[handle_obj.service_type] = handle_obj
for resource in handle_obj.support_resource:
self.resource_service_map[resource] = handle_obj.service_type
setattr(self, 'list_%ss' % resource,
functools.partial(self.list_resources, resource))
def _get_admin_token(self):
auth = auth_identity.Password(
auth_url=cfg.CONF.client.identity_url,
username=cfg.CONF.client.admin_username,
password=cfg.CONF.client.admin_password,
project_name=cfg.CONF.client.admin_tenant,
user_domain_name=cfg.CONF.client.admin_user_domain_name,
project_domain_name=cfg.CONF.client.admin_tenant_domain_name)
sess = session.Session(auth=auth)
return sess.get_token()
def _get_endpoint_from_keystone(self, cxt):
auth = token_endpoint.Token(cfg.CONF.client.identity_url,
cxt.auth_token)
sess = session.Session(auth=auth)
cli = keystone_client.Client(session=sess)
service_id_name_map = {}
for service in cli.services.list():
service_dict = service.to_dict()
service_id_name_map[service_dict['id']] = service_dict['name']
region_service_endpoint_map = {}
for endpoint in cli.endpoints.list():
endpoint_dict = endpoint.to_dict()
if endpoint_dict['interface'] != 'public':
continue
region_id = endpoint_dict['region']
service_id = endpoint_dict['service_id']
url = endpoint_dict['url']
service_name = service_id_name_map[service_id]
if region_id not in region_service_endpoint_map:
region_service_endpoint_map[region_id] = {}
region_service_endpoint_map[region_id][service_name] = url
return region_service_endpoint_map
def _get_config_with_retry(self, cxt, filters, site, service, retry):
conf_list = models.list_site_service_configuration(cxt, filters)
if len(conf_list) > 1:
raise exception.EndpointNotUnique(site, service)
if len(conf_list) == 0:
if not retry:
raise exception.EndpointNotFound(site, service)
self._update_endpoint_from_keystone(cxt, True)
return self._get_config_with_retry(cxt,
filters, site, service, False)
return conf_list
def _ensure_endpoint_set(self, cxt, service):
handle = self.service_handle_map[service]
if not handle.is_endpoint_url_set():
site_filters = [{'key': 'site_name',
'comparator': 'eq',
'value': cfg.CONF.client.top_site_name}]
site_list = models.list_sites(cxt, site_filters)
if len(site_list) == 0:
raise exception.ResourceNotFound(models.Site,
cfg.CONF.client.top_site_name)
# site_name is unique key, safe to get the first element
site_id = site_list[0]['site_id']
config_filters = [
{'key': 'site_id', 'comparator': 'eq', 'value': site_id},
{'key': 'service_type', 'comparator': 'eq', 'value': service}]
conf_list = self._get_config_with_retry(
cxt, config_filters, site_id, service,
cfg.CONF.client.auto_refresh_endpoint)
url = conf_list[0]['service_url']
handle.update_endpoint_url(url)
def _update_endpoint_from_keystone(self, cxt, is_internal):
"""Update the database by querying service endpoint url from Keystone
:param cxt: context object
:param is_internal: if True, this method utilizes pre-configured admin
username and password to apply an new admin token, this happens only
when auto_refresh_endpoint is set to True. if False, token in cxt is
directly used, users should prepare admin token themselves
:return: None
"""
if is_internal:
admin_context = tricircle_context.Context()
admin_context.auth_token = self._get_admin_token()
endpoint_map = self._get_endpoint_from_keystone(admin_context)
else:
endpoint_map = self._get_endpoint_from_keystone(cxt)
for region in endpoint_map:
# use region name to query site
site_filters = [{'key': 'site_name', 'comparator': 'eq',
'value': region}]
site_list = models.list_sites(cxt, site_filters)
# skip region/site not registered in cascade service
if len(site_list) != 1:
continue
for service in endpoint_map[region]:
site_id = site_list[0]['site_id']
config_filters = [{'key': 'site_id', 'comparator': 'eq',
'value': site_id},
{'key': 'service_type', 'comparator': 'eq',
'value': service}]
config_list = models.list_site_service_configuration(
cxt, config_filters)
if len(config_list) > 1:
raise exception.EndpointNotUnique(site_id, service)
if len(config_list) == 1:
config_id = config_list[0]['service_id']
update_dict = {
'service_url': endpoint_map[region][service]}
models.update_site_service_configuration(
cxt, config_id, update_dict)
else:
config_dict = {
'service_id': str(uuid.uuid4()),
'site_id': site_id,
'service_name': '%s_%s' % (region, service),
'service_type': service,
'service_url': endpoint_map[region][service]
}
models.create_site_service_configuration(
cxt, config_dict)
def get_endpoint(self, cxt, site_id, service):
"""Get endpoint url of given site and service
:param cxt: context object
:param site_id: site id
:param service: service type
:return: endpoint url for given site and service
:raises: EndpointNotUnique, EndpointNotFound
"""
config_filters = [
{'key': 'site_id', 'comparator': 'eq', 'value': site_id},
{'key': 'service_type', 'comparator': 'eq', 'value': service}]
conf_list = self._get_config_with_retry(
cxt, config_filters, site_id, service,
cfg.CONF.client.auto_refresh_endpoint)
return conf_list[0]['service_url']
def update_endpoint_from_keystone(self, cxt):
"""Update the database by querying service endpoint url from Keystone
Only admin should invoke this method since it requires admin token
:param cxt: context object containing admin token
:return: None
"""
self._update_endpoint_from_keystone(cxt, False)
def list_resources(self, resource, cxt, filters=None):
"""Query resource in site of top layer
Directly invoke this method to query resources, or use
list_(resource)s (self, cxt, filters=None), for example,
list_servers (self, cxt, filters=None). These methods are
automatically generated according to the supported resources
of each ResourceHandle class.
:param resource: resource type
:param cxt: context object
:param filters: list of dict with key 'key', 'comparator', 'value'
like {'key': 'name', 'comparator': 'eq', 'value': 'private'}, 'key'
is the field name of resources
:return: list of dict containing resources information
:raises: EndpointNotAvailable
"""
if resource not in self.resource_service_map:
raise exception.ResourceNotSupported(resource, 'list')
service = self.resource_service_map[resource]
self._ensure_endpoint_set(cxt, service)
handle = self.service_handle_map[service]
filters = filters or []
try:
return handle.handle_list(cxt, resource, filters)
except exception.EndpointNotAvailable as e:
if cfg.CONF.client.auto_refresh_endpoint:
LOG.warn(e.message + ', update endpoint and try again')
self._update_endpoint_from_keystone(cxt, True)
self._ensure_endpoint_set(cxt, service)
return handle.handle_list(cxt, resource, filters)
else:
raise e

View File

@ -14,11 +14,57 @@
# under the License.
class EndpointNotAvailable(Exception):
def __init__(self, service, url):
self.service = service
self.url = url
message = "Endpoint %(url)s for %(service)s is not available" % {
'url': url,
'service': service
}
super(EndpointNotAvailable, self).__init__(message)
class EndpointNotUnique(Exception):
def __init__(self, site, service):
self.site = site
self.service = service
message = "Endpoint for %(service)s in %(site)s not unique" % {
'site': site,
'service': service
}
super(EndpointNotUnique, self).__init__(message)
class EndpointNotFound(Exception):
def __init__(self, site, service):
self.site = site
self.service = service
message = "Endpoint for %(service)s in %(site)s not found" % {
'site': site,
'service': service
}
super(EndpointNotFound, self).__init__(message)
class ResourceNotFound(Exception):
def __init__(self, model, pk_value):
res_type = model.__name__.lower()
message = "Could not find %(res_type)s: %(pk_value)s" % {
'res_type': res_type,
'pk_value': pk_value
def __init__(self, model, unique_key):
resource_type = model.__name__.lower()
self.resource_type = resource_type
self.unique_key = unique_key
message = "Could not find %(resource_type)s: %(unique_key)s" % {
'resource_type': resource_type,
'unique_key': unique_key
}
super(ResourceNotFound, self).__init__(message)
class ResourceNotSupported(Exception):
def __init__(self, resource, method):
self.resource = resource
self.method = method
message = "%(method)s method not supported for %(resource)s" % {
'resource': resource,
'method': method
}
super(ResourceNotSupported, self).__init__(message)

View File

@ -55,6 +55,23 @@ def create_site_service_configuration(context, config_dict):
config_dict)
def delete_site_service_configuration(context, config_id):
with context.session.begin():
return core.delete_resource(context,
SiteServiceConfiguration, config_id)
def list_site_service_configuration(context, filters):
with context.session.begin():
return core.query_resource(context, SiteServiceConfiguration, filters)
def update_site_service_configuration(context, config_id, update_dict):
with context.session.begin():
return core.update_resource(
context, SiteServiceConfiguration, config_id, update_dict)
class Site(core.ModelBase, core.DictBase):
__tablename__ = 'cascaded_sites'
attributes = ['site_id', 'site_name', 'az_id']

View File

@ -0,0 +1,147 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import glanceclient as g_client
import glanceclient.exc as g_exceptions
from neutronclient.common import exceptions as q_exceptions
from neutronclient.neutron import client as q_client
from novaclient import client as n_client
from oslo_config import cfg
from requests import exceptions as r_exceptions
from tricircle.db import exception as exception
client_opts = [
cfg.IntOpt('glance_timeout',
default=60,
help='timeout for glance client in seconds'),
cfg.IntOpt('neutron_timeout',
default=60,
help='timeout for neutron client in seconds'),
cfg.IntOpt('nova_timeout',
default=60,
help='timeout for nova client in seconds'),
]
cfg.CONF.register_opts(client_opts, group='client')
def _transform_filters(filters):
filter_dict = {}
for query_filter in filters:
# only eq filter supported at first
if query_filter['comparator'] != 'eq':
continue
key = query_filter['key']
value = query_filter['value']
filter_dict[key] = value
return filter_dict
class ResourceHandle(object):
def __init__(self, auth_url):
self.auth_url = auth_url
self.endpoint_url = None
def is_endpoint_url_set(self):
return self.endpoint_url is not None
def update_endpoint_url(self, url):
self.endpoint_url = url
class GlanceResourceHandle(ResourceHandle):
service_type = 'glance'
support_resource = ('image', )
def _get_client(self, cxt):
return g_client.Client('1',
token=cxt.auth_token,
auth_url=self.auth_url,
endpoint=self.endpoint_url,
timeout=cfg.CONF.client.glance_timeout)
def handle_list(self, cxt, resource, filters):
if resource not in self.support_resource:
return []
try:
client = self._get_client(cxt)
collection = '%ss' % resource
return [res.to_dict() for res in getattr(
client, collection).list(filters=_transform_filters(filters))]
except g_exceptions.InvalidEndpoint:
self.endpoint_url = None
raise exception.EndpointNotAvailable('glance',
client.http_client.endpoint)
class NeutronResourceHandle(ResourceHandle):
service_type = 'neutron'
support_resource = ('network', 'subnet', 'port', 'router',
'security_group', 'security_group_rule')
def _get_client(self, cxt):
return q_client.Client('2.0',
token=cxt.auth_token,
auth_url=self.auth_url,
endpoint_url=self.endpoint_url,
timeout=cfg.CONF.client.neutron_timeout)
def handle_list(self, cxt, resource, filters):
if resource not in self.support_resource:
return []
try:
client = self._get_client(cxt)
collection = '%ss' % resource
search_opts = _transform_filters(filters)
return [res for res in getattr(
client, 'list_%s' % collection)(**search_opts)[collection]]
except q_exceptions.ConnectionFailed:
self.endpoint_url = None
raise exception.EndpointNotAvailable(
'neutron', client.httpclient.endpoint_url)
class NovaResourceHandle(ResourceHandle):
service_type = 'nova'
support_resource = ('flavor', 'server')
def _get_client(self, cxt):
cli = n_client.Client('2',
auth_token=cxt.auth_token,
auth_url=self.auth_url,
timeout=cfg.CONF.client.nova_timeout)
cli.set_management_url(
self.endpoint_url.replace('$(tenant_id)s', cxt.tenant))
return cli
def handle_list(self, cxt, resource, filters):
if resource not in self.support_resource:
return []
try:
client = self._get_client(cxt)
collection = '%ss' % resource
# only server list supports filter
if resource == 'server':
search_opts = _transform_filters(filters)
return [res.to_dict() for res in getattr(
client, collection).list(search_opts=search_opts)]
else:
return [res.to_dict() for res in getattr(client,
collection).list()]
except r_exceptions.ConnectTimeout:
self.endpoint_url = None
raise exception.EndpointNotAvailable('nova',
client.client.management_url)

View File

@ -0,0 +1,198 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import mock
from oslo_config import cfg
from tricircle import context
from tricircle.db import client
from tricircle.db import core
from tricircle.db import exception
from tricircle.db import models
from tricircle.db import resource_handle
FAKE_AZ = 'fake_az'
FAKE_RESOURCE = 'fake_res'
FAKE_SITE_ID = 'fake_site_id'
FAKE_SITE_NAME = 'fake_site_name'
FAKE_SERVICE_ID = 'fake_service_id'
FAKE_SERVICE_NAME = 'fake_service_name'
FAKE_TYPE = 'fake_type'
FAKE_URL = 'http://127.0.0.1:12345'
FAKE_URL_INVALID = 'http://127.0.0.1:23456'
class FakeException(Exception):
pass
class FakeClient(object):
def __init__(self, url):
self.endpoint = url
self.resources = [{'name': 'res1'}, {'name': 'res2'}]
def list_fake_res(self, search_opts):
# make sure endpoint is correctly set
if self.endpoint != FAKE_URL:
raise FakeException()
if not search_opts:
return [res for res in self.resources]
else:
return [res for res in self.resources if (
res['name'] == search_opts['name'])]
class FakeResHandle(resource_handle.ResourceHandle):
def _get_client(self, cxt):
return FakeClient(self.endpoint_url)
def handle_list(self, cxt, resource, filters):
try:
cli = self._get_client(cxt)
return cli.list_fake_res(
resource_handle._transform_filters(filters))
except FakeException:
self.endpoint_url = None
raise exception.EndpointNotAvailable(FAKE_TYPE, cli.endpoint)
class ClientTest(unittest.TestCase):
def setUp(self):
core.initialize()
core.ModelBase.metadata.create_all(core.get_engine())
self.context = context.Context()
site_dict = {
'site_id': FAKE_SITE_ID,
'site_name': FAKE_SITE_NAME,
'az_id': FAKE_AZ
}
type_dict = {
'id': 1,
'service_type': FAKE_TYPE
}
config_dict = {
'service_id': FAKE_SERVICE_ID,
'site_id': FAKE_SITE_ID,
'service_name': FAKE_SERVICE_NAME,
'service_type': FAKE_TYPE,
'service_url': FAKE_URL
}
models.create_site(self.context, site_dict)
models.create_service_type(self.context, type_dict)
models.create_site_service_configuration(self.context, config_dict)
cfg.CONF.set_override(name='top_site_name', override=FAKE_SITE_NAME,
group='client')
self.client = client.Client()
self.client.resource_service_map[FAKE_RESOURCE] = FAKE_TYPE
self.client.service_handle_map[FAKE_TYPE] = FakeResHandle(None)
def test_list(self):
resources = self.client.list_resources(
FAKE_RESOURCE, self.context, [])
self.assertEqual(resources, [{'name': 'res1'}, {'name': 'res2'}])
def test_list_with_filters(self):
resources = self.client.list_resources(
FAKE_RESOURCE, self.context, [{'key': 'name',
'comparator': 'eq',
'value': 'res2'}])
self.assertEqual(resources, [{'name': 'res2'}])
def test_list_endpoint_not_found(self):
cfg.CONF.set_override(name='auto_refresh_endpoint', override=False,
group='client')
# delete the configuration so endpoint cannot be found
models.delete_site_service_configuration(self.context, FAKE_SERVICE_ID)
# auto refresh set to False, directly raise exception
self.assertRaises(exception.EndpointNotFound,
self.client.list_resources,
FAKE_RESOURCE, self.context, [])
def test_list_endpoint_not_found_retry(self):
cfg.CONF.set_override(name='auto_refresh_endpoint', override=True,
group='client')
# delete the configuration so endpoint cannot be found
models.delete_site_service_configuration(self.context, FAKE_SERVICE_ID)
self.client._get_admin_token = mock.Mock()
self.client._get_endpoint_from_keystone = mock.Mock()
self.client._get_endpoint_from_keystone.return_value = {
FAKE_SITE_NAME: {FAKE_TYPE: FAKE_URL}
}
resources = self.client.list_resources(
FAKE_RESOURCE, self.context, [])
self.assertEqual(resources, [{'name': 'res1'}, {'name': 'res2'}])
def test_list_endpoint_not_unique(self):
# add a new configuration with same site and service type
config_dict = {
'service_id': FAKE_SERVICE_ID + '_new',
'site_id': FAKE_SITE_ID,
'service_name': FAKE_SERVICE_NAME + '_new',
'service_type': FAKE_TYPE,
'service_url': FAKE_URL
}
models.create_site_service_configuration(self.context, config_dict)
self.assertRaises(exception.EndpointNotUnique,
self.client.list_resources,
FAKE_RESOURCE, self.context, [])
def test_list_endpoint_not_valid(self):
cfg.CONF.set_override(name='auto_refresh_endpoint', override=False,
group='client')
update_dict = {'service_url': FAKE_URL_INVALID}
# update url to an invalid one
models.update_site_service_configuration(self.context,
FAKE_SERVICE_ID,
update_dict)
# auto refresh set to False, directly raise exception
self.assertRaises(exception.EndpointNotAvailable,
self.client.list_resources,
FAKE_RESOURCE, self.context, [])
def test_list_endpoint_not_valid_retry(self):
cfg.CONF.set_override(name='auto_refresh_endpoint', override=True,
group='client')
update_dict = {'service_url': FAKE_URL_INVALID}
# update url to an invalid one
models.update_site_service_configuration(self.context,
FAKE_SERVICE_ID,
update_dict)
self.client._get_admin_token = mock.Mock()
self.client._get_endpoint_from_keystone = mock.Mock()
self.client._get_endpoint_from_keystone.return_value = {
FAKE_SITE_NAME: {FAKE_TYPE: FAKE_URL}
}
resources = self.client.list_resources(
FAKE_RESOURCE, self.context, [])
self.assertEqual(resources, [{'name': 'res1'}, {'name': 'res2'}])
def test_get_endpoint(self):
cfg.CONF.set_override(name='auto_refresh_endpoint', override=False,
group='client')
url = self.client.get_endpoint(self.context, FAKE_SITE_ID, FAKE_TYPE)
self.assertEqual(url, FAKE_URL)
def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine())