Merge "Add Neutron Client Utilities"

This commit is contained in:
Jenkins 2015-03-25 10:06:11 +00:00 committed by Gerrit Code Review
commit 5e9db631bc
5 changed files with 344 additions and 0 deletions

View File

@ -9,3 +9,52 @@
# namespace.
# extension_drivers =
# Example: extension_drivers = anewextensiondriver
#[neutron]
# (StrOpt) neutron server URL
# neutron_server_url =
# Example: neutron_server_url = 'http://127.0.0.1:9696'
# (IntOpt) Timeout value for connecting to neutron in seconds
# url_timeout =
# Example url_timeout = 30
# (StrOpt) User ID for connecting to neutron in admin context
# admin_user_id =
# (StrOpt) Username for connecting to neutron in admin context
# admin_user_name =
# (StrOpt) Password for connecting to neutron in admin context
# admin_password =
# (StrOpt) Tenant id for connecting to neutron in admin context
# admin_tenant_id =
# (StrOpt) Tenant name for connecting to neutron in admin context.
# This option will be ignored if neutron_admin_tenant_id
# is set. Note that with Keystone V3 tenant names are
# only unique within a domain.
# admin_tenant_id =
# (StrOpt) Region name for connecting to neutron in admin context.
# region_name =
# (StrOpt) Authorization URL for connecting to neutron in admin context
# admin_auth_url =
# Example: admin_auth_url = 'http://localhost:5000/v2.0'
# (BoolOpt) If set, ignore any SSL validation issues
# api_insecure =
# Example: api_insecure = False
# (StrOpt) Authorization strategy for connecting to neutron in admin context.
# auth_strategy =
# Example: auth_strategy = 'keystone'
# (IntOpt) Number of seconds before querying neutron for extensions
# extension_sync_interval =
# Example: extension_sync_interval = 600
# (StrOpt) Location of CA certificates file to use for neutron client requests.
# ca_certificates_file =

View File

View File

View File

@ -0,0 +1,154 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutron.openstack.common import lockutils
from neutronclient.common import exceptions as neutron_client_exc
from neutronclient.v2_0 import client as clientv20
from oslo.config import cfg
neutron_opts = [
cfg.StrOpt('neutron_server_url',
default='http://127.0.0.1:9696',
help='URL for connecting to neutron'),
cfg.IntOpt('url_timeout',
default=30,
help='Timeout value for connecting to neutron in seconds'),
cfg.StrOpt('admin_user_id',
help='User id for connecting to neutron in admin context'),
cfg.StrOpt('admin_username',
help='Username for connecting to neutron in admin context'),
cfg.StrOpt('admin_password',
help='Password for connecting to neutron in admin context',
secret=True),
cfg.StrOpt('admin_tenant_id',
help='Tenant id for connecting to neutron in admin context'),
cfg.StrOpt('admin_tenant_name',
help='Tenant name for connecting to neutron in admin context. '
'This option will be ignored if neutron_admin_tenant_id '
'is set. Note that with Keystone V3 tenant names are '
'only unique within a domain.'),
# region_name required?
cfg.StrOpt('region_name',
help='Region name for connecting to neutron in admin context'),
cfg.StrOpt('admin_auth_url',
default='http://localhost:5000/v2.0',
help='Authorization URL for connecting to neutron in admin '
'context'),
cfg.BoolOpt('api_insecure',
default=False,
help='If set, ignore any SSL validation issues'),
cfg.StrOpt('auth_strategy',
default='keystone',
help='Authorization strategy for connecting to '
'neutron in admin context'),
# extension_sync_interval required?
cfg.IntOpt('extension_sync_interval',
default=600,
help='Number of seconds before querying neutron for '
'extensions'),
cfg.StrOpt('ca_certificates_file',
help='Location of CA certificates file to use for '
'neutron client requests.'),
]
CONF = cfg.CONF
CONF.register_opts(neutron_opts, 'neutron')
class AdminTokenStore(object):
_instance = None
def __init__(self):
self.admin_auth_token = None
@classmethod
def get(cls):
if cls._instance is None:
cls._instance = cls()
return cls._instance
def _get_client(token=None, admin=False):
params = {
'endpoint_url': CONF.neutron.neutron_server_url,
'timeout': CONF.neutron.url_timeout,
'insecure': CONF.neutron.api_insecure,
'ca_cert': CONF.neutron.ca_certificates_file,
'auth_strategy': CONF.neutron.auth_strategy,
'token': token,
}
if admin:
if CONF.neutron.admin_user_id:
params['user_id'] = CONF.neutron.admin_user_id
else:
params['username'] = CONF.neutron.admin_username
if CONF.neutron.admin_tenant_id:
params['tenant_id'] = CONF.neutron.admin_tenant_id
else:
params['tenant_name'] = CONF.neutron.admin_tenant_name
params['password'] = CONF.neutron.admin_password
params['auth_url'] = CONF.neutron.admin_auth_url
return clientv20.Client(**params)
class ClientWrapper(clientv20.Client):
'''A neutron client wrapper class.
Wraps the callable methods, executes it and updates the token,
as it might change when expires.
'''
def __init__(self, base_client):
# Expose all attributes from the base_client instance
self.__dict__ = base_client.__dict__
self.base_client = base_client
def __getattribute__(self, name):
obj = object.__getattribute__(self, name)
if callable(obj):
obj = object.__getattribute__(self, 'proxy')(obj)
return obj
def proxy(self, obj):
def wrapper(*args, **kwargs):
ret = obj(*args, **kwargs)
new_token = self.base_client.get_auth_info()['auth_token']
_update_token(new_token)
return ret
return wrapper
def _update_token(new_token):
with lockutils.lock('neutron_admin_auth_token_lock'):
token_store = AdminTokenStore.get()
token_store.admin_auth_token = new_token
def get_client(context, admin=False):
if admin or (context.is_admin and not context.auth_token):
with lockutils.lock('neutron_admin_auth_token_lock'):
orig_token = AdminTokenStore.get().admin_auth_token
client = _get_client(orig_token, admin=True)
return ClientWrapper(client)
# We got a user token that we can use as-is
if context.auth_token:
token = context.auth_token
return _get_client(token=token)
# We did not get a user token and we should not be using
# an admin token so log an error
raise neutron_client_exc.Unauthorized()

View File

@ -0,0 +1,141 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import contextlib
import mock
import unittest
from gbpservice.network.neutronv2 import client as neutronclient
from neutronclient.common import exceptions
from neutronclient.v2_0 import client
from oslo.config import cfg
from neutron import context
CONF = cfg.CONF
# NOTE: Neutron client raises Exception which is discouraged by HACKING.
# We set this variable here and use it for assertions below to avoid
# the hacking checks until we can make neutron client throw a custom
# exception class instead.
NEUTRON_CLIENT_EXCEPTION = Exception
class TestNeutronClient(unittest.TestCase):
def setUp(self):
super(TestNeutronClient, self).setUp()
def test_withtoken(self):
CONF.set_override('neutron_server_url',
'http://anyhost',
group='neutron')
CONF.set_override('url_timeout',
30,
group='neutron')
my_context = context.ContextBase('userid',
'my_tenantid',
auth_token='token')
cl = neutronclient.get_client(my_context)
self.assertEqual(CONF.neutron.neutron_server_url,
cl.httpclient.endpoint_url)
self.assertEqual(my_context.auth_token,
cl.httpclient.auth_token)
self.assertEqual(CONF.neutron.url_timeout, cl.httpclient.timeout)
def test_withouttoken(self):
my_context = context.ContextBase('userid', 'my_tenantid')
self.assertRaises(exceptions.Unauthorized,
neutronclient.get_client,
my_context)
def test_withtoken_context_is_admin(self):
CONF.set_override('neutron_server_url',
'http://anyhost',
group='neutron')
CONF.set_override('url_timeout',
30,
group='neutron')
my_context = context.ContextBase('userid',
'my_tenantid',
auth_token='token',
is_admin=True)
cl = neutronclient.get_client(my_context)
self.assertEqual(CONF.neutron.neutron_server_url,
cl.httpclient.endpoint_url)
self.assertEqual(my_context.auth_token,
cl.httpclient.auth_token)
self.assertEqual(CONF.neutron.url_timeout, cl.httpclient.timeout)
def test_withouttoken_keystone_connection_error(self):
CONF.set_override('neutron_server_url',
'http://anyhost',
group='neutron')
CONF.set_override('auth_strategy',
'keystone',
group='neutron')
my_context = context.ContextBase('userid', 'my_tenantid')
self.assertRaises(NEUTRON_CLIENT_EXCEPTION,
neutronclient.get_client,
my_context)
def test_reuse_admin_token(self):
CONF.set_override('neutron_server_url',
'http://anyhost',
group='neutron')
CONF.set_override('url_timeout',
30,
group='neutron')
token_store = neutronclient.AdminTokenStore.get()
token_store.admin_auth_token = 'new_token'
my_context = context.ContextBase('userid', 'my_tenantid',
auth_token='token')
with contextlib.nested(
mock.patch.object(client.Client, "list_networks",
side_effect=mock.Mock),
mock.patch.object(client.Client, 'get_auth_info',
return_value={'auth_token': 'new_token1'}),
):
client1 = neutronclient.get_client(my_context, True)
client1.list_networks(retrieve_all=False)
self.assertEqual('new_token1', token_store.admin_auth_token)
client1 = neutronclient.get_client(my_context, True)
client1.list_networks(retrieve_all=False)
self.assertEqual('new_token1', token_store.admin_auth_token)
def test_admin_token_updated(self):
CONF.set_override('neutron_server_url',
'http://anyhost',
group='neutron')
CONF.set_override('url_timeout',
30,
group='neutron')
token_store = neutronclient.AdminTokenStore.get()
token_store.admin_auth_token = 'new_token'
tokens = [{'auth_token': 'new_token1'}, {'auth_token': 'new_token'}]
my_context = context.ContextBase('userid', 'my_tenantid',
auth_token='token')
with contextlib.nested(
mock.patch.object(client.Client, "list_networks",
side_effect=mock.Mock),
mock.patch.object(client.Client, 'get_auth_info',
side_effect=tokens.pop),
):
client1 = neutronclient.get_client(my_context, True)
client1.list_networks(retrieve_all=False)
self.assertEqual('new_token', token_store.admin_auth_token)
client1 = neutronclient.get_client(my_context, True)
client1.list_networks(retrieve_all=False)
self.assertEqual('new_token1', token_store.admin_auth_token)