Add filter to list

1.Cyborg should support filtering queries. This patch is the
implementation of client.
2.Add UT for cyborgclient.

Change-Id: Ia4a98be93f2a32e7e818d4831a24652c78b97788
This commit is contained in:
wangzh21 2018-07-30 09:19:50 +08:00
parent 16bdd7cccf
commit 0e01835b7f
20 changed files with 1298 additions and 11 deletions

View File

@ -29,13 +29,25 @@ def common_filters(marker=None, limit=None, sort_key=None, sort_dir=None):
"""
filters = []
if isinstance(limit, int):
filters.append('limit=%s' % limit)
filters.append('filters.field=limit')
filters.append('filters.value=%d' % limit)
if marker is not None:
filters.append('marker=%s' % marker)
filters.append('filters.field=marker')
filters.append('filters.value=%s' % marker)
if sort_key is not None:
filters.append('sort_key=%s' % sort_key)
filters.append('filters.field=sort_key')
filters.append('filters.value=%s' % sort_key)
if sort_dir is not None:
filters.append('sort_dir=%s' % sort_dir)
filters.append('filters.field=sort_dir')
filters.append('filters.value=%s' % sort_dir)
return filters
def add_filters(filters, **kwargs):
if kwargs:
for field, value in kwargs.iteritems():
filters.append('filters.field=%s' % field)
filters.append('filters.value=%s' % value)
return filters

View File

View File

@ -0,0 +1,52 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import fixtures
import testtools
_TRUE_VALUES = ('true', '1', 'yes')
class TestCase(testtools.TestCase):
"""Test case base class for all unit tests."""
def setUp(self):
"""Run before each test method to initialize test environment."""
super(TestCase, self).setUp()
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
try:
test_timeout = int(test_timeout)
except ValueError:
# If timeout value is invalid do not set a timeout.
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
self.useFixture(fixtures.NestedTempfile())
self.useFixture(fixtures.TempHomeDir())
if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
self.log_fixture = self.useFixture(fixtures.FakeLogger())

View File

@ -0,0 +1,313 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
import six
from cyborgclient.common import httpclient as http
from cyborgclient import exceptions as exc
from cyborgclient.tests.unit import utils
def _get_error_body(faultstring=None, debuginfo=None):
error_body = {
'faultstring': faultstring,
'debuginfo': debuginfo
}
raw_error_body = json.dumps(error_body)
body = {'error_message': raw_error_body}
raw_body = json.dumps(body)
return raw_body
HTTP_CLASS = six.moves.http_client.HTTPConnection
HTTPS_CLASS = http.VerifiedHTTPSConnection
DEFAULT_TIMEOUT = 600
class HttpClientTest(utils.BaseTestCase):
def test_url_generation_trailing_slash_in_base(self):
client = http.HTTPClient('http://localhost/')
url = client._make_connection_url('/v1/accelerators')
self.assertEqual('/v1/accelerators', url)
def test_url_generation_without_trailing_slash_in_base(self):
client = http.HTTPClient('http://localhost')
url = client._make_connection_url('/v1/accelerators')
self.assertEqual('/v1/accelerators', url)
def test_url_generation_prefix_slash_in_path(self):
client = http.HTTPClient('http://localhost/')
url = client._make_connection_url('/v1/accelerators')
self.assertEqual('/v1/accelerators', url)
def test_url_generation_without_prefix_slash_in_path(self):
client = http.HTTPClient('http://localhost')
url = client._make_connection_url('v1/accelerators')
self.assertEqual('/v1/accelerators', url)
def test_server_exception_empty_body(self):
error_body = _get_error_body()
fake_resp = utils.FakeResponse({'content-type': 'application/json'},
six.StringIO(error_body),
version=1,
status=500)
client = http.HTTPClient(
'http://localhost/',
api_version='/v1')
client.get_connection = (
lambda *a, **kw: utils.FakeConnection(fake_resp))
error = self.assertRaises(exc.InternalServerError,
client.json_request,
'GET', '/v1/accelerators')
self.assertEqual('Internal Server Error (HTTP 500)', str(error))
def test_server_exception_msg_only(self):
error_msg = 'test error msg'
error_body = _get_error_body(error_msg)
fake_resp = utils.FakeResponse({'content-type': 'application/json'},
six.StringIO(error_body),
version=1,
status=500)
client = http.HTTPClient(
'http://localhost/',
api_version='/v1')
client.get_connection = (
lambda *a, **kw: utils.FakeConnection(fake_resp))
error = self.assertRaises(exc.InternalServerError,
client.json_request,
'GET', '/v1/accelerators')
self.assertEqual(error_msg + ' (HTTP 500)', str(error))
def test_server_exception_msg_and_traceback(self):
error_msg = 'another test error'
error_trace = ("\"Traceback (most recent call last):\\n\\n "
"File \\\"/usr/local/lib/python2.7/...")
error_body = _get_error_body(error_msg, error_trace)
fake_resp = utils.FakeResponse({'content-type': 'application/json'},
six.StringIO(error_body),
version=1,
status=500)
client = http.HTTPClient(
'http://localhost/',
api_version='/v1')
client.get_connection = (
lambda *a, **kw: utils.FakeConnection(fake_resp))
error = self.assertRaises(exc.InternalServerError,
client.json_request,
'GET', '/v1/accelerators')
self.assertEqual(
'%(error)s (HTTP 500)\n%(trace)s' % {'error': error_msg,
'trace': error_trace},
"%(error)s\n%(details)s" % {'error': str(error),
'details': str(error.details)})
def test_get_connection_params(self):
endpoint = 'http://cyborg-host:6666'
expected = (HTTP_CLASS,
('cyborg-host', 6666, ''),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_trailing_slash(self):
endpoint = 'http://cyborg-host:6666/'
expected = (HTTP_CLASS,
('cyborg-host', 6666, ''),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_ssl(self):
endpoint = 'https://cyborg-host:6666'
expected = (HTTPS_CLASS,
('cyborg-host', 6666, ''),
{
'timeout': DEFAULT_TIMEOUT,
'ca_file': None,
'cert_file': None,
'key_file': None,
'insecure': False,
})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_ssl_params(self):
endpoint = 'https://cyborg-host:6666'
ssl_args = {
'ca_file': '/path/to/ca_file',
'cert_file': '/path/to/cert_file',
'key_file': '/path/to/key_file',
'insecure': True,
}
expected_kwargs = {'timeout': DEFAULT_TIMEOUT}
expected_kwargs.update(ssl_args)
expected = (HTTPS_CLASS,
('cyborg-host', 6666, ''),
expected_kwargs)
params = http.HTTPClient.get_connection_params(endpoint, **ssl_args)
self.assertEqual(expected, params)
def test_get_connection_params_with_timeout(self):
endpoint = 'http://cyborg-host:6666'
expected = (HTTP_CLASS,
('cyborg-host', 6666, ''),
{'timeout': 300.0})
params = http.HTTPClient.get_connection_params(endpoint, timeout=300)
self.assertEqual(expected, params)
def test_get_connection_params_with_version(self):
endpoint = 'http://cyborg-host:6666/v1'
expected = (HTTP_CLASS,
('cyborg-host', 6666, ''),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_version_trailing_slash(self):
endpoint = 'http://cyborg-host:6666/v1/'
expected = (HTTP_CLASS,
('cyborg-host', 6666, ''),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_subpath(self):
endpoint = 'http://cyborg-host:6666/cyborg'
expected = (HTTP_CLASS,
('cyborg-host', 6666, '/cyborg'),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_subpath_trailing_slash(self):
endpoint = 'http://cyborg-host:6666/cyborg/'
expected = (HTTP_CLASS,
('cyborg-host', 6666, '/cyborg'),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_subpath_version(self):
endpoint = 'http://cyborg-host:6666/cyborg/v1'
expected = (HTTP_CLASS,
('cyborg-host', 6666, '/cyborg'),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_subpath_version_trailing_slash(self):
endpoint = 'http://cyborg-host:6666/cyborg/v1/'
expected = (HTTP_CLASS,
('cyborg-host', 6666, '/cyborg'),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_401_unauthorized_exception(self):
error_body = _get_error_body()
fake_resp = utils.FakeResponse({'content-type': 'text/plain'},
six.StringIO(error_body),
version=1,
status=401)
client = http.HTTPClient(
'http://localhost/',
api_version='/v1')
client.get_connection = (lambda *a,
**kw: utils.FakeConnection(fake_resp))
self.assertRaises(exc.Unauthorized, client.json_request,
'GET', '/v1/accelerators')
class SessionClientTest(utils.BaseTestCase):
def test_server_exception_msg_and_traceback(self):
error_msg = 'another test error'
error_trace = ("\"Traceback (most recent call last):\\n\\n "
"File \\\"/usr/local/lib/python2.7/...")
error_body = _get_error_body(error_msg, error_trace)
fake_session = utils.FakeSession({'Content-Type': 'application/json'},
error_body,
500)
client = http.SessionClient(
api_version='/v1',
session=fake_session)
error = self.assertRaises(exc.InternalServerError,
client.json_request,
'GET', '/v1/accelerators')
self.assertEqual(
'%(error)s (HTTP 500)\n%(trace)s' % {'error': error_msg,
'trace': error_trace},
"%(error)s\n%(details)s" % {'error': str(error),
'details': str(error.details)})
def test_server_exception_empty_body(self):
error_body = _get_error_body()
fake_session = utils.FakeSession({'Content-Type': 'application/json'},
error_body,
500)
client = http.SessionClient(
api_version='/v1',
session=fake_session)
error = self.assertRaises(exc.InternalServerError,
client.json_request,
'GET', '/v1/accelerators')
self.assertEqual('Internal Server Error (HTTP 500)', str(error))
def test_bypass_url(self):
fake_response = utils.FakeSessionResponse(
{}, content="", status_code=201)
fake_session = mock.MagicMock()
fake_session.request.side_effect = [fake_response]
client = http.SessionClient(
api_version='/v1',
session=fake_session, endpoint_override='http://cyborg')
client.json_request('GET', '/v1/accelerators')
self.assertEqual(
fake_session.request.call_args[1]['endpoint_override'],
'http://cyborg'
)
def test_exception(self):
fake_response = utils.FakeSessionResponse(
{}, content="", status_code=504)
fake_session = mock.MagicMock()
fake_session.request.side_effect = [fake_response]
client = http.SessionClient(
api_version='/v1',
session=fake_session, endpoint_override='http://cyborg')
self.assertRaises(exc.GatewayTimeout,
client.json_request,
'GET', '/v1/accelerators')

View File

@ -0,0 +1,38 @@
#
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cyborgclient.common import utils
from cyborgclient.tests.unit import utils as test_utils
class CommonFiltersTest(test_utils.BaseTestCase):
def test_limit(self):
result = utils.common_filters(limit=42)
self.assertEqual(['filters.field=limit', 'filters.value=42'], result)
def test_limit_0(self):
result = utils.common_filters(limit=0)
self.assertEqual(['filters.field=limit', 'filters.value=0'], result)
def test_limit_negative_number(self):
result = utils.common_filters(limit=-2)
self.assertEqual(['filters.field=limit', 'filters.value=-2'], result)
def test_other(self):
for key in ('marker', 'sort_key', 'sort_dir'):
result = utils.common_filters(**{key: 'test'})
self.assertEqual(['filters.field=%s' % key, 'filters.value=test'],
result)

View File

View File

@ -0,0 +1,279 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import sys
import fixtures
from keystoneauth1 import fixture
import mock
import six
from testtools import matchers
from cyborgclient import exceptions
import cyborgclient.shell
from cyborgclient.tests.unit import utils
FAKE_ENV = {'OS_USERNAME': 'username',
'OS_PASSWORD': 'password',
'OS_PROJECT_NAME': 'project_name',
'OS_AUTH_URL': 'http://no.where/v2.0'}
FAKE_ENV2 = {'OS_USER_ID': 'user_id',
'OS_PASSWORD': 'password',
'OS_PROJECT_ID': 'project_id',
'OS_AUTH_URL': 'http://no.where/v2.0'}
FAKE_ENV3 = {'OS_USERNAME': 'username',
'OS_PASSWORD': 'password',
'OS_PROJECT_ID': 'project_id',
'OS_AUTH_URL': 'http://no.where/v2.0'}
FAKE_ENV4 = {'OS_USERNAME': 'username',
'OS_PASSWORD': 'password',
'OS_PROJECT_ID': 'project_id',
'OS_USER_DOMAIN_NAME': 'Default',
'OS_PROJECT_DOMAIN_NAME': 'Default',
'OS_AUTH_URL': 'http://no.where/v3'}
def _create_ver_list(versions):
return {'versions': {'values': versions}}
class ShellTest(utils.TestCase):
AUTH_URL = utils.FAKE_ENV['OS_AUTH_URL']
_msg_no_tenant_project = ("You must provide a project name or project id"
" via --os-project-name, --os-project-id,"
" env[OS_PROJECT_NAME] or env[OS_PROJECT_ID]")
def setUp(self):
super(ShellTest, self).setUp()
self.nc_util = mock.patch(
'cyborgclient.common.cliutils.isunauthenticated').start()
self.nc_util.return_value = False
def test_help_unknown_command(self):
self.assertRaises(exceptions.CommandError, self.shell, 'help foofoo')
def test_help(self):
required = [
'.*?^usage: ',
'.*?^See "cyborg help COMMAND" for help on a specific command',
]
stdout, stderr = self.shell('help')
for r in required:
self.assertThat((stdout + stderr),
matchers.MatchesRegex(r, re.DOTALL | re.MULTILINE))
def test_help_no_options(self):
required = [
'.*?^usage: ',
'.*?^See "cyborg help COMMAND" for help on a specific command.',
]
stdout, stderr = self.shell('')
for r in required:
self.assertThat((stdout + stderr),
matchers.MatchesRegex(r, re.DOTALL | re.MULTILINE))
def test_bash_completion(self):
stdout, stderr = self.shell('bash-completion')
# just check we have some output
required = [
'.*--fields',
'.*help',
'.*accelerator-list',
'.*--help']
for r in required:
self.assertThat((stdout + stderr),
matchers.MatchesRegex(r, re.DOTALL | re.MULTILINE))
def test_no_username(self):
required = ('You must provide a username via either'
' --os-username or via env[OS_USERNAME]')
self.make_env(exclude='OS_USERNAME')
try:
self.shell('accelerator-list')
except exceptions.CommandError as exc:
self.assertEqual(required, exc.message)
else:
self.fail('CommandError not raised')
def test_no_user_id(self):
required = ('You must provide a username via'
' either --os-username or via env[OS_USERNAME]')
self.make_env(exclude='OS_USER_ID', fake_env=FAKE_ENV2)
try:
self.shell('accelerator-list')
except exceptions.CommandError as exc:
self.assertEqual(required, exc.message)
else:
self.fail('CommandError not raised')
def test_no_project_name(self):
required = self._msg_no_tenant_project
self.make_env(exclude='OS_PROJECT_NAME')
try:
self.shell('accelerator-list')
except exceptions.CommandError as exc:
self.assertEqual(required, exc.message)
else:
self.fail('CommandError not raised')
def test_no_project_id(self):
required = self._msg_no_tenant_project
self.make_env(exclude='OS_PROJECT_ID', fake_env=FAKE_ENV3)
try:
self.shell('accelerator-list')
except exceptions.CommandError as exc:
self.assertEqual(required, exc.message)
else:
self.fail('CommandError not raised')
def test_no_auth_url(self):
required = ('You must provide an auth url'
' via either --os-auth-url or via env[OS_AUTH_URL]')
self.make_env(exclude='OS_AUTH_URL')
try:
self.shell('accelerator-list')
except exceptions.CommandError as exc:
self.assertEqual(required, exc.message)
else:
self.fail('CommandError not raised')
@mock.patch('cyborgclient.v1.client.Client')
def test_service_type(self, mock_client):
self.make_env()
self.shell('accelerator-list')
_, client_kwargs = mock_client.call_args_list[0]
self.assertEqual('accelerator', client_kwargs['service_type'])
@mock.patch('cyborgclient.v1.client.Client')
def test_insecure(self, mock_client):
self.make_env()
self.shell('--insecure accelerator-list')
_, session_kwargs = mock_client.call_args_list[0]
self.assertEqual(True, session_kwargs['insecure'])
@mock.patch('sys.stdin', side_effect=mock.MagicMock)
@mock.patch('getpass.getpass', side_effect=EOFError)
def test_no_password(self, mock_getpass, mock_stdin):
required = ('You must provide a password via either --os-password, '
'env[OS_PASSWORD], or prompted response')
self.make_env(exclude='OS_PASSWORD')
try:
self.shell('accelerator-list')
except exceptions.CommandError as exc:
self.assertEqual(required, exc.message)
else:
self.fail('CommandError not raised')
@mock.patch('sys.argv', ['cyborg'])
@mock.patch('sys.stdout', six.StringIO())
@mock.patch('sys.stderr', six.StringIO())
def test_main_noargs(self):
# Ensure that main works with no command-line arguments
try:
cyborgclient.shell.main()
except SystemExit:
self.fail('Unexpected SystemExit')
# We expect the normal usage as a result
self.assertIn('Command-line interface to the OpenStack Cyborg API',
sys.stdout.getvalue())
@mock.patch('cyborgclient.v1.client.Client')
def _test_main_region(self, command, expected_region_name, mock_client):
self.shell(command)
mock_client.assert_called_once_with(
api_version='latest', auth_token=None,
auth_url=self.AUTH_URL, cloud=None, cyborg_url=None,
insecure=False, interface='public', password='password',
project_domain_id=None, project_domain_name=None, project_id=None,
project_name='project_name', region_name=expected_region_name,
service_type='accelerator', user_domain_id=None,
user_domain_name=None, user_id=None, username='username')
def test_main_option_region(self):
self.make_env()
self._test_main_region('--os-region-name=myregion accelerator-list',
'myregion')
def test_main_env_region(self):
fake_env = dict(utils.FAKE_ENV, OS_REGION_NAME='myregion')
self.make_env(fake_env=fake_env)
self._test_main_region('accelerator-list', 'myregion')
def test_main_no_region(self):
self.make_env()
self._test_main_region('accelerator-list', None)
@mock.patch('cyborgclient.v1.client.Client')
def test_main_endpoint_public(self, mock_client):
self.make_env()
self.shell('--endpoint-type publicURL accelerator-list')
mock_client.assert_called_once_with(
username='username', password='password',
interface='public', project_id=None,
project_name='project_name', auth_url=self.AUTH_URL,
service_type='accelerator', region_name=None,
project_domain_id=None, project_domain_name=None,
user_domain_id=None, user_domain_name=None,
insecure=False, user_id=None, api_version='latest',
auth_token=None, cyborg_url=None, cloud=None)
@mock.patch('cyborgclient.v1.client.Client')
def test_main_endpoint_internal(self, mock_client):
self.make_env()
self.shell('--endpoint-type internalURL accelerator-list')
mock_client.assert_called_once_with(
api_version='latest', auth_token=None,
auth_url=self.AUTH_URL, cloud=None, cyborg_url=None,
insecure=False, interface='internal', password='password',
project_domain_id=None, project_domain_name=None, project_id=None,
project_name='project_name', region_name=None,
service_type='accelerator', user_domain_id=None,
user_domain_name=None, user_id=None, username='username')
class ShellTestKeystoneV3(ShellTest):
AUTH_URL = 'http://no.where/v3'
def make_env(self, exclude=None, fake_env=FAKE_ENV):
if 'OS_AUTH_URL' in fake_env:
fake_env.update({'OS_AUTH_URL': self.AUTH_URL})
env = dict((k, v) for k, v in fake_env.items() if k != exclude)
self.useFixture(fixtures.MonkeyPatch('os.environ', env))
def register_keystone_discovery_fixture(self, mreq):
v3_url = "http://no.where/v3"
v3_version = fixture.V3Discovery(v3_url)
mreq.register_uri(
'GET', v3_url, json=_create_ver_list([v3_version]),
status_code=200)
@mock.patch('cyborgclient.v1.client.Client')
def test_main_endpoint_public(self, mock_client):
self.make_env(fake_env=FAKE_ENV4)
self.shell('--endpoint-type publicURL accelerator-list')
mock_client.assert_called_once_with(
username='username', password='password',
interface='public', project_id='project_id',
project_name=None, auth_url=self.AUTH_URL,
service_type='accelerator', region_name=None,
project_domain_id=None, project_domain_name='Default',
user_domain_id=None, user_domain_name='Default',
insecure=False, user_id=None, api_version='latest',
auth_token=None, cyborg_url=None, cloud=None)

View File

@ -0,0 +1,179 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import datetime
import os
import sys
import fixtures
import six
import testtools
from cyborgclient.common import httpclient as http
from cyborgclient import shell
FAKE_ENV = {'OS_USERNAME': 'username',
'OS_PASSWORD': 'password',
'OS_PROJECT_NAME': 'project_name',
'OS_AUTH_URL': 'http://no.where/v2.0'}
class BaseTestCase(testtools.TestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
self.useFixture(fixtures.FakeLogger())
class FakeAPI(object):
def __init__(self, responses):
self.responses = responses
self.calls = []
def _request(self, method, url, headers=None, body=None):
call = (method, url, headers or {}, body)
self.calls.append(call)
return self.responses[url][method]
def raw_request(self, *args, **kwargs):
response = self._request(*args, **kwargs)
body_iter = http.ResponseBodyIterator(six.StringIO(response[1]))
return FakeResponse(response[0]), body_iter
def json_request(self, *args, **kwargs):
response = self._request(*args, **kwargs)
return FakeResponse(response[0]), response[1]
class FakeConnection(object):
def __init__(self, response=None):
self._response = response
self._last_request = None
def request(self, method, conn_url, **kwargs):
self._last_request = (method, conn_url, kwargs)
def setresponse(self, response):
self._response = response
def getresponse(self):
return self._response
class FakeResponse(object):
def __init__(self, headers, body=None, version=None, status=None,
reason=None):
"""Fake object to help testing.
:param headers: dict representing HTTP response headers
:param body: file-like object
"""
self.headers = headers
self.body = body
self.version = version
self.status = status
self.reason = reason
def getheaders(self):
return copy.deepcopy(self.headers).items()
def getheader(self, key, default):
return self.headers.get(key, default)
def read(self, amt):
return self.body.read(amt)
class FakeServiceCatalog(object):
def url_for(self, endpoint_type, service_type, attr=None,
filter_value=None):
if attr == 'region' and filter_value:
return 'http://regionhost:6666/v1/f14b41234'
else:
return 'http://localhost:6666/v1/f14b41234'
class FakeKeystone(object):
service_catalog = FakeServiceCatalog()
timestamp = datetime.datetime.utcnow() + datetime.timedelta(days=5)
def __init__(self, auth_token):
self.auth_token = auth_token
self.auth_ref = {
'token': {'expires': FakeKeystone.timestamp.strftime(
'%Y-%m-%dT%H:%M:%S.%f'),
'id': 'd1a541311782870742235'}
}
class TestCase(testtools.TestCase):
TEST_REQUEST_BASE = {
'verify': True,
}
def setUp(self):
super(TestCase, self).setUp()
if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
os.environ.get('OS_STDOUT_CAPTURE') == '1'):
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
os.environ.get('OS_STDERR_CAPTURE') == '1'):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
def make_env(self, exclude=None, fake_env=FAKE_ENV):
env = dict((k, v) for k, v in fake_env.items() if k != exclude)
self.useFixture(fixtures.MonkeyPatch('os.environ', env))
def shell(self, argstr, exitcodes=(0,)):
orig = sys.stdout
orig_stderr = sys.stderr
try:
sys.stdout = six.StringIO()
sys.stderr = six.StringIO()
_shell = shell.OpenStackCyborgShell()
_shell.main(argstr.split())
except SystemExit:
exc_type, exc_value, exc_traceback = sys.exc_info()
self.assertIn(exc_value.code, exitcodes)
finally:
stdout = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = orig
stderr = sys.stderr.getvalue()
sys.stderr.close()
sys.stderr = orig_stderr
return (stdout, stderr)
class FakeSessionResponse(object):
def __init__(self, headers, content=None, status_code=None):
self.headers = headers
self.content = content
self.status_code = status_code
class FakeSession(object):
def __init__(self, headers, content=None, status_code=None):
self.headers = headers
self.content = content
self.status_code = status_code
def request(self, url, method, **kwargs):
return FakeSessionResponse(self.headers, self.content,
self.status_code)

View File

View File

@ -0,0 +1,53 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import mock
from testtools import matchers
from cyborgclient.tests.unit import utils
FAKE_ENV = {'OS_USERNAME': 'username',
'OS_PASSWORD': 'password',
'OS_PROJECT_NAME': 'project_name',
'OS_AUTH_URL': 'http://no.where/v2.0',
'BYPASS_URL': 'http://cyborg'}
class TestCommandLineArgument(utils.TestCase):
def setUp(self):
super(TestCommandLineArgument, self).setUp()
self.make_env(fake_env=FAKE_ENV)
session_client = mock.patch(
'cyborgclient.common.httpclient.SessionClient')
session_client.start()
loader = mock.patch('keystoneauth1.loading.get_plugin_loader')
loader.start()
session = mock.patch('keystoneauth1.session.Session')
session.start()
self.addCleanup(session_client.stop)
self.addCleanup(loader.stop)
self.addCleanup(session.stop)
def _test_arg_success(self, command):
stdout, stderr = self.shell(command)
def _test_arg_failure(self, command, error_msg):
stdout, stderr = self.shell(command, (2,))
for line in error_msg:
self.assertThat((stdout + stderr),
matchers.MatchesRegex(line,
re.DOTALL | re.MULTILINE))

View File

@ -0,0 +1,70 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from testtools import matchers
from cyborgclient.tests.unit import utils
from cyborgclient.v1 import accelerators
ACCELERATOR1 = {"user_id": "3a4b753552964978af7f76ce9fecf7d0",
"description": "test", "links":
[{"href": "http://127.0.0.1:6666/v1/accelerators/fake_uuid",
"rel": "self"},
{"href": "http://127.0.0.1:6666/accelerators/fake_uuid",
"rel": "bookmark"}],
"acc_capability": "test",
"created_at": "2018-07-20T07:45:04+00:00",
"vendor_id": "test", "updated_at": None,
"acc_type": None, "name": "test-cyborg-create",
"product_id": "test", "device_type": "test", "remotable": 1,
"project_id": None,
"uuid": "4cc55aab-dac6-486f-ad14-284c8e554589"}
ACCELERATOR2 = {"user_id": "3a4b753552964978af7f76ce9fecf7d0",
"description": "test", "links":
[{"href": "http://127.0.0.1:6666/v1/accelerators/fake_uuid",
"rel": "self"},
{"href": "http://127.0.0.1:6666/accelerators/fake_uuid",
"rel": "bookmark"}],
"acc_capability": "test",
"created_at": "2018-07-20T08:11:14+00:00",
"vendor_id": "test", "updated_at": None,
"acc_type": None, "name": "test-cyborg-create",
"product_id": "test", "device_type": "test", "remotable": 1,
"project_id": None,
"uuid": "a444397a-deba-4c94-984f-ce35fbcdec42"}
fake_responses = {
'/v1/accelerators':
{
'GET': (
{},
{'accelerators': [ACCELERATOR1, ACCELERATOR2]},
)
}
}
class AcceleratorManagerTest(testtools.TestCase):
def setUp(self):
super(AcceleratorManagerTest, self).setUp()
self.api = utils.FakeAPI(fake_responses)
self.mgr = accelerators.AcceleratorManager(self.api)
def test_accelerators_list(self):
accelerators = self.mgr.list()
expect = [
('GET', '/v1/accelerators', {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertThat(accelerators, matchers.HasLength(2))

View File

@ -0,0 +1,22 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from cyborgclient.tests.unit.v1 import shell_test_base
class ShellTest(shell_test_base.TestCommandLineArgument):
@mock.patch('cyborgclient.common.cliutils.print_list')
def test_accelerator_list(self, mock_print_list):
pass

View File

@ -0,0 +1,68 @@
# Copyright (c) 2015 Thales Services SAS
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from cyborgclient.v1 import client
class ClientTest(testtools.TestCase):
@mock.patch('cyborgclient.common.httpclient.SessionClient')
@mock.patch('keystoneauth1.session.Session')
def test_init_with_session(self, mock_session, http_client):
session = mock.Mock()
client.Client(session=session)
mock_session.assert_not_called()
http_client.assert_called_once_with(
interface='public',
region_name=None,
service_name=None,
service_type='accelerator',
session=session,
endpoint_override=None,
api_version=None)
@mock.patch('cyborgclient.common.httpclient.SessionClient')
@mock.patch('keystoneauth1.session.Session')
def test_init_with_endpoint_override(self, mock_session, http_client):
session = mock.Mock()
client.Client(session=session, endpoint_override='cyborgurl')
mock_session.assert_not_called()
http_client.assert_called_once_with(
interface='public',
region_name=None,
service_name=None,
service_type='accelerator',
session=session,
endpoint_override='cyborgurl',
api_version=None)
@mock.patch('cyborgclient.common.httpclient.SessionClient')
@mock.patch('keystoneauth1.session.Session')
def test_init_with_cyborg_url_and_endpoint_override(self, mock_session,
http_client):
session = mock.Mock()
client.Client(session=session, cyborg_url='cyborgurl',
endpoint_override='cyborgurl')
mock_session.assert_not_called()
http_client.assert_called_once_with(
interface='public',
region_name=None,
service_name=None,
service_type='accelerator',
session=session,
endpoint_override='cyborgurl',
api_version=None)

View File

@ -0,0 +1,171 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import testtools
from testtools import matchers
from cyborgclient.tests.unit import utils
from cyborgclient.v1 import deployables
DEPLOYABLE1 = {
"instance_uuid": "", "assignable": True,
"vendor": "test_vendor1",
"parent_uuid": "1859ef92-90c9-462e-ba78-3fd8295aa390",
"links": [
{"href": "http://127.0.0.1:6666/v1/deployables/fake_uuid",
"rel": "self"
},
{"href": "http://127.0.0.1:6666/deployables/fake_uuid",
"rel": "bookmark"}],
"updated_at": "2018-07-27T13:07:14+00:00",
"interface_type": "pci",
"uuid": "1859ef92-90c9-462e-ba78-3fd8295aa390",
"name": "test_name1", "host": "host_test1",
"version": "1", "board": "test_board1",
"address": "test_addr1", "created_at": None,
"type": "pf", "availability": "1",
"root_uuid": "1859ef92-90c9-462e-ba78-3fd8295aa390"
}
DEPLOYABLE2 = {
"instance_uuid": None, "assignable": False,
"vendor": "test_vendor2",
"parent_uuid": "5a7dfaf9-7f4e-42d0-bfac-b2f464110d9f",
"links": [
{"href": "http://127.0.0.1:6666/v1/deployables/fake_uuid",
"rel": "self"},
{"href": "http://127.0.0.1:6666/deployables/fake_uuid",
"rel": "bookmark"}],
"updated_at": "2018-07-27T19:51:18+00:00",
"interface_type": "mdev",
"uuid": "5a7dfaf9-7f4e-42d0-bfac-b2f464110d9f",
"name": "test_name2", "host": "host_test2", "version": "2",
"board": "test_board2", "address": "test_addr2",
"created_at": None, "type": "vf", "availability": "1",
"root_uuid": "5a7dfaf9-7f4e-42d0-bfac-b2f464110d9f"
}
ALLOC_DEPLOYABLE1 = {
"instance_uuid": None, "assignable": False,
"vendor": "test_vendor2",
"parent_uuid": "5a7dfaf9-7f4e-42d0-bfac-b2f464110d9f",
"links": [
{"href": "http://127.0.0.1:6666/v1/deployables/fake_uuid",
"rel": "self"},
{"href": "http://127.0.0.1:6666/deployables/fake_uuid",
"rel": "bookmark"}],
"updated_at": "2018-07-27T19:51:18+00:00",
"interface_type": "mdev",
"uuid": "5a7dfaf9-7f4e-42d0-bfac-b2f464110d9f",
"name": "test_name2", "host": "host_test2", "version": "2",
"board": "test_board2", "address": "test_addr2",
"created_at": None, "type": "vf", "availability": "1",
"root_uuid": "5a7dfaf9-7f4e-42d0-bfac-b2f464110d9f"
}
fake_responses = {
'/v1/accelerators/deployables':
{
'GET': (
{},
{'deployables': [DEPLOYABLE1, DEPLOYABLE2]},
)
},
'/v1/accelerators/deployables/%s' % ALLOC_DEPLOYABLE1["uuid"]:
{
'PATCH': (
{"instance_uuid": "fake_instance_uuid"},
ALLOC_DEPLOYABLE1,
),
},
'/v1/accelerators/deployables?filters.field=limit&filters.value=2':
{
'GET': (
{},
{'deployables': [DEPLOYABLE1, DEPLOYABLE2]},
)
},
'/v1/accelerators/deployables?filters.field=marker&filters.value=%s' %
DEPLOYABLE1['uuid']:
{
'GET': (
{},
{'deployables': [DEPLOYABLE1, DEPLOYABLE2]},
)
}
}
class DeployableManagerTest(testtools.TestCase):
def setUp(self):
super(DeployableManagerTest, self).setUp()
self.api = utils.FakeAPI(fake_responses)
self.mgr = deployables.DeployableManager(self.api)
def test_deployables_list(self):
deployables = self.mgr.list()
expect = [
('GET', '/v1/accelerators/deployables', {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertThat(deployables, matchers.HasLength(2))
def _test_deployables_list_with_filters(self, limit=None, marker=None,
sort_key=None, sort_dir=None,
expect=[], **add_filters):
deployables_filter = self.mgr.list(limit=limit, marker=marker,
sort_key=sort_key,
sort_dir=sort_dir, **add_filters)
self.assertEqual(expect, self.api.calls)
self.assertThat(deployables_filter, matchers.HasLength(2))
def test_deployables_list_with_limit(self):
expect = [
('GET', '/v1/accelerators/deployables?filters.field='
'limit&filters.value=2', {}, None),
]
self._test_deployables_list_with_filters(
limit=2,
expect=expect)
def test_deployables_list_with_marker(self):
expect = [
('GET', '/v1/accelerators/deployables?filters.field=marker&'
'filters.value=%s' % DEPLOYABLE1['uuid'],
{}, None),
]
self._test_deployables_list_with_filters(
marker=DEPLOYABLE1['uuid'],
expect=expect)
def test_deployables_allocation(self):
dep_for_alloc = copy.deepcopy(ALLOC_DEPLOYABLE1)
self.mgr.allocation(dep_for_alloc["uuid"], "fake_instance_uuid")
expect = [
('PATCH', '/v1/accelerators/deployables/%s' %
dep_for_alloc["uuid"],
{}, [{'op': 'replace',
'path': '/instance_uuid',
'value': 'fake_instance_uuid'}])
]
self.assertEqual(expect, self.api.calls)
def test_deployables_deallocation(self):
dep_for_dealloc = copy.deepcopy(ALLOC_DEPLOYABLE1)
self.mgr.deallocation(dep_for_dealloc["uuid"])
expect = [
('PATCH', '/v1/accelerators/deployables/%s' %
dep_for_dealloc["uuid"],
{}, [{'op': 'replace',
'path': '/instance_uuid',
'value': None}])
]
self.assertEqual(expect, self.api.calls)

View File

@ -40,12 +40,14 @@ class BaseModelManager(base.Manager):
base_url = ''
@classmethod
def _path(cls, id=None):
def _path(cls, id=None, filters=None):
if filters:
return '/v1/' + cls.base_url + filters
return '/v1/' + cls.base_url + \
'/%s' % id if id else '/v1/' + cls.base_url
def list(self, limit=None, marker=None, sort_key=None,
sort_dir=None, detail=False):
sort_dir=None, detail=False, **add_filters):
"""Retrieve a list of accelerators.
:param marker: Optional, the UUID of a baymodel, eg the last
@ -75,17 +77,18 @@ class BaseModelManager(base.Manager):
limit = int(limit)
filters = utils.common_filters(marker, limit, sort_key, sort_dir)
utils.add_filters(filters, **add_filters)
path = ''
if detail:
path += 'detail'
if filters:
path += '?' + '&'.join(filters)
if limit is None:
return self._list(self._path(path), self.__class__.api_name)
return self._list(self._path(filters=path),
self.__class__.api_name)
else:
return self._list_pagination(self._path(path),
return self._list_pagination(self._path(filters=path),
self.__class__.api_name,
limit=limit)

View File

@ -42,3 +42,29 @@ class DeployableManager(basemodels.BaseModelManager):
"value": instance_uuid}]
resp = self.update(deployable_uuid, body)
return resp
def list(self, limit=None, marker=None, sort_key=None,
sort_dir=None, **add_filters):
"""List accelerators.
:param limit:The maximum number of results to return per
request, if:
1) limit > 0, the maximum number of accelerators to return.
2) limit == 0, return the entire list of accelerators.
3) limit param is NOT specified (None), the number of items
returned respect the maximum imposed by the Cyborg API
(see Cyborg's api.max_limit option).
:param sort_key: Optional, field used for sorting.
:param sort_dir: Optional, direction of sorting, either 'asc' (the
default) or 'desc'.
:param extra_filters: Optional, additional filter parameters for query
deployable, such as interface_type=pci, host=node-1.
:return:A list of accelerators.
"""
res = super(DeployableManager, self).list(limit=limit, marker=marker,
sort_key=sort_key,
sort_dir=sort_dir,
**add_filters)
return res

View File

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

View File

@ -5,6 +5,7 @@ skipsdist = True
[testenv]
usedevelop = True
whitelist_externals = rm
install_command = pip install -c{env:UPPER_CONSTRAINTS_FILE:https://git.openstack.org/cgit/openstack/requirements/plain/upper-constraints.txt} {opts} {packages}
setenv =
VIRTUAL_ENV={envdir}
@ -12,7 +13,8 @@ setenv =
OS_STDOUT_CAPTURE=1
OS_STDERR_CAPTURE=1
OS_TEST_TIMEOUT=60
deps = -r{toxinidir}/test-requirements.txt
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = stestr run {posargs}
[testenv:pep8]