Browse Source

Move all nsxlib code and tests to vmware_nsxlib

Change-Id: I75533e713a680674368d16f0a7aeb4fdbffe3608
changes/17/380817/8
Adit Sarfaty 5 years ago
parent
commit
e9ddc3dd33
  1. 7
      .testr.conf
  2. 12
      requirements.txt
  3. 260
      run_tests.sh
  4. 18
      test-requirements.txt
  5. 0
      tools/__init__.py
  6. 9
      tools/ostestr_compat_shim.sh
  7. 116
      tox.ini
  8. 43
      vmware_nsxlib/_i18n.py
  9. 28
      vmware_nsxlib/tests/test_vmware_nsxlib.py
  10. 0
      vmware_nsxlib/tests/unit/__init__.py
  11. 0
      vmware_nsxlib/tests/unit/v3/__init__.py
  12. 231
      vmware_nsxlib/tests/unit/v3/mocks.py
  13. 322
      vmware_nsxlib/tests/unit/v3/nsxlib_testcase.py
  14. 308
      vmware_nsxlib/tests/unit/v3/test_client.py
  15. 205
      vmware_nsxlib/tests/unit/v3/test_cluster.py
  16. 162
      vmware_nsxlib/tests/unit/v3/test_constants.py
  17. 178
      vmware_nsxlib/tests/unit/v3/test_qos_switching_profile.py
  18. 514
      vmware_nsxlib/tests/unit/v3/test_resources.py
  19. 83
      vmware_nsxlib/tests/unit/v3/test_switch.py
  20. 204
      vmware_nsxlib/tests/unit/v3/test_utils.py
  21. 437
      vmware_nsxlib/v3/__init__.py
  22. 208
      vmware_nsxlib/v3/client.py
  23. 493
      vmware_nsxlib/v3/cluster.py
  24. 123
      vmware_nsxlib/v3/config.py
  25. 97
      vmware_nsxlib/v3/exceptions.py
  26. 64
      vmware_nsxlib/v3/native_dhcp.py
  27. 150
      vmware_nsxlib/v3/ns_group_manager.py
  28. 96
      vmware_nsxlib/v3/nsx_constants.py
  29. 576
      vmware_nsxlib/v3/resources.py
  30. 194
      vmware_nsxlib/v3/router.py
  31. 529
      vmware_nsxlib/v3/security.py
  32. 213
      vmware_nsxlib/v3/utils.py
  33. 17
      vmware_nsxlib/version.py

7
.testr.conf

@ -1,7 +1,4 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \
${PYTHON:-python} -m subunit.run discover -t ./ . $LISTOPT $IDOPTION
test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_LOG_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ ${OS_TEST_PATH:-./vmware_nsxlib/tests/unit} $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list
test_list_option=--list

12
requirements.txt

@ -3,3 +3,15 @@
# process, which may cause wedges in the gate later.
pbr>=1.6 # Apache-2.0
enum34;python_version=='2.7' or python_version=='2.6' or python_version=='3.3' # BSD
eventlet!=0.18.3,>=0.18.2 # MIT
netaddr!=0.7.16,>=0.7.13 # BSD
retrying!=1.3.0,>=1.2.3 # Apache-2.0
six>=1.9.0 # MIT
neutron-lib>=0.4.0 # Apache-2.0
oslo.i18n>=2.1.0 # Apache-2.0
oslo.log>=3.11.0 # Apache-2.0
oslo.serialization>=1.10.0 # Apache-2.0
oslo.service>=1.10.0 # Apache-2.0
oslo.utils>=3.16.0 # Apache-2.0

260
run_tests.sh

@ -0,0 +1,260 @@
#!/usr/bin/env bash
set -eu
function usage {
echo "Usage: $0 [OPTION]..."
echo "Run Neutron's test suite(s)"
echo ""
echo " -V, --virtual-env Always use virtualenv. Install automatically if not present"
echo " -N, --no-virtual-env Don't use virtualenv. Run tests in local environment"
echo " -s, --no-site-packages Isolate the virtualenv from the global Python environment"
echo " -r, --recreate-db Recreate the test database (deprecated, as this is now the default)."
echo " -n, --no-recreate-db Don't recreate the test database."
echo " -f, --force Force a clean re-build of the virtual environment. Useful when dependencies have been added."
echo " -u, --update Update the virtual environment with any newer package versions"
echo " -p, --pep8 Just run PEP8 and HACKING compliance check"
echo " -8, --pep8-only-changed [<basecommit>]"
echo " Just run PEP8 and HACKING compliance check on files changed since HEAD~1 (or <basecommit>)"
echo " -P, --no-pep8 Don't run static code checks"
echo " -c, --coverage Generate coverage report"
echo " -d, --debug Run tests with testtools instead of testr. This allows you to use the debugger."
echo " -h, --help Print this usage message"
echo " --virtual-env-path <path> Location of the virtualenv directory"
echo " Default: \$(pwd)"
echo " --virtual-env-name <name> Name of the virtualenv directory"
echo " Default: .venv"
echo " --tools-path <dir> Location of the tools directory"
echo " Default: \$(pwd)"
echo ""
echo "Note: with no options specified, the script will try to run the tests in a virtual environment,"
echo " If no virtualenv is found, the script will ask if you would like to create one. If you "
echo " prefer to run tests NOT in a virtual environment, simply pass the -N option."
exit
}
function process_options {
i=1
while [ $i -le $# ]; do
case "${!i}" in
-h|--help) usage;;
-V|--virtual-env) always_venv=1; never_venv=0;;
-N|--no-virtual-env) always_venv=0; never_venv=1;;
-s|--no-site-packages) no_site_packages=1;;
-r|--recreate-db) recreate_db=1;;
-n|--no-recreate-db) recreate_db=0;;
-f|--force) force=1;;
-u|--update) update=1;;
-p|--pep8) just_pep8=1;;
-8|--pep8-only-changed) just_pep8_changed=1;;
-P|--no-pep8) no_pep8=1;;
-c|--coverage) coverage=1;;
-d|--debug) debug=1;;
--virtual-env-path)
(( i++ ))
venv_path=${!i}
;;
--virtual-env-name)
(( i++ ))
venv_dir=${!i}
;;
--tools-path)
(( i++ ))
tools_path=${!i}
;;
-*) testopts="$testopts ${!i}";;
*) testargs="$testargs ${!i}"
esac
(( i++ ))
done
}
tool_path=${tools_path:-$(pwd)}
venv_path=${venv_path:-$(pwd)}
venv_dir=${venv_name:-.venv}
with_venv=tools/with_venv.sh
always_venv=0
never_venv=0
force=0
no_site_packages=0
installvenvopts=
testargs=
testopts=
wrapper=""
just_pep8=0
just_pep8_changed=0
no_pep8=0
coverage=0
debug=0
recreate_db=1
update=0
LANG=en_US.UTF-8
LANGUAGE=en_US:en
LC_ALL=C
process_options $@
# Make our paths available to other scripts we call
export venv_path
export venv_dir
export venv_name
export tools_dir
export venv=${venv_path}/${venv_dir}
if [ $no_site_packages -eq 1 ]; then
installvenvopts="--no-site-packages"
fi
function run_tests {
# Cleanup *pyc
${wrapper} find . -type f -name "*.pyc" -delete
if [ $debug -eq 1 ]; then
if [ "$testopts" = "" ] && [ "$testargs" = "" ]; then
# Default to running all tests if specific test is not
# provided.
testargs="discover ./vmware_nsxlib/tests"
fi
${wrapper} python -m testtools.run $testopts $testargs
# Short circuit because all of the testr and coverage stuff
# below does not make sense when running testtools.run for
# debugging purposes.
return $?
fi
if [ $coverage -eq 1 ]; then
TESTRTESTS="$TESTRTESTS --coverage"
else
TESTRTESTS="$TESTRTESTS --slowest"
fi
# Just run the test suites in current environment
set +e
testargs=`echo "$testargs" | sed -e's/^\s*\(.*\)\s*$/\1/'`
TESTRTESTS="$TESTRTESTS --testr-args='--subunit $testopts $testargs'"
OS_TEST_PATH=`echo $testargs|grep -o 'vmware_nsxlib\neutron\.tests[^[:space:]:]\+'|tr . /`
if [ -n "$OS_TEST_PATH" ]; then
os_test_dir=$(dirname "$OS_TEST_PATH")
else
os_test_dir=''
fi
if [ -d "$OS_TEST_PATH" ]; then
wrapper="OS_TEST_PATH=$OS_TEST_PATH $wrapper"
elif [ -d "$os_test_dir" ]; then
wrapper="OS_TEST_PATH=$os_test_dir $wrapper"
fi
echo "Running \`${wrapper} $TESTRTESTS\`"
bash -c "${wrapper} $TESTRTESTS | ${wrapper} subunit2pyunit"
RESULT=$?
set -e
copy_subunit_log
if [ $coverage -eq 1 ]; then
echo "Generating coverage report in covhtml/"
# Don't compute coverage for common code, which is tested elsewhere
${wrapper} coverage combine
${wrapper} coverage html --include='neutron/*' --omit='neutron/openstack/common/*' -d covhtml -i
fi
return $RESULT
}
function copy_subunit_log {
LOGNAME=`cat .testrepository/next-stream`
LOGNAME=$(($LOGNAME - 1))
LOGNAME=".testrepository/${LOGNAME}"
cp $LOGNAME subunit.log
}
function warn_on_flake8_without_venv {
if [ $never_venv -eq 1 ]; then
echo "**WARNING**:"
echo "Running flake8 without virtual env may miss OpenStack HACKING detection"
fi
}
function run_pep8 {
echo "Running flake8 ..."
warn_on_flake8_without_venv
${wrapper} flake8
}
function run_pep8_changed {
# NOTE(gilliard) We want use flake8 to check the entirety of every file that has
# a change in it. Unfortunately the --filenames argument to flake8 only accepts
# file *names* and there are no files named (eg) "nova/compute/manager.py". The
# --diff argument behaves surprisingly as well, because although you feed it a
# diff, it actually checks the file on disk anyway.
local target=${testargs:-HEAD~1}
local files=$(git diff --name-only $target | tr '\n' ' ')
echo "Running flake8 on ${files}"
warn_on_flake8_without_venv
diff -u --from-file /dev/null ${files} | ${wrapper} flake8 --diff
}
TESTRTESTS="python setup.py testr"
if [ $never_venv -eq 0 ]
then
# Remove the virtual environment if --force used
if [ $force -eq 1 ]; then
echo "Cleaning virtualenv..."
rm -rf ${venv}
fi
if [ $update -eq 1 ]; then
echo "Updating virtualenv..."
python tools/install_venv.py $installvenvopts
fi
if [ -e ${venv} ]; then
wrapper="${with_venv}"
else
if [ $always_venv -eq 1 ]; then
# Automatically install the virtualenv
python tools/install_venv.py $installvenvopts
wrapper="${with_venv}"
else
echo -e "No virtual environment found...create one? (Y/n) \c"
read use_ve
if [ "x$use_ve" = "xY" -o "x$use_ve" = "x" -o "x$use_ve" = "xy" ]; then
# Install the virtualenv and run the test suite in it
python tools/install_venv.py $installvenvopts
wrapper=${with_venv}
fi
fi
fi
fi
# Delete old coverage data from previous runs
if [ $coverage -eq 1 ]; then
${wrapper} coverage erase
fi
if [ $just_pep8 -eq 1 ]; then
run_pep8
exit
fi
if [ $just_pep8_changed -eq 1 ]; then
run_pep8_changed
exit
fi
if [ $recreate_db -eq 1 ]; then
rm -f tests.sqlite
fi
run_tests
# NOTE(sirp): we only want to run pep8 when we're running the full-test suite,
# not when we're running tests individually. To handle this, we need to
# distinguish between options (testopts), which begin with a '-', and
# arguments (testargs).
if [ -z "$testargs" ]; then
if [ $no_pep8 -eq 0 ]; then
run_pep8
fi
fi

18
test-requirements.txt

@ -5,13 +5,21 @@
hacking<0.12,>=0.11.0 # Apache-2.0
coverage>=3.6 # Apache-2.0
fixtures>=3.0.0 # Apache-2.0/BSD
mock>=2.0 # BSD
python-subunit>=0.0.18 # Apache-2.0/BSD
sphinx!=1.3b1,<1.3,>=1.2.1 # BSD
oslosphinx!=3.4.0,>=2.5.0 # Apache-2.0
sphinx!=1.3b1,<1.4,>=1.2.1 # BSD
oslosphinx>=4.7.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
testrepository>=0.0.18 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD
testresources>=0.2.4 # Apache-2.0/BSD
testtools>=1.4.0 # MIT
# releasenotes
testscenarios>=0.4 # Apache-2.0/BSD
WebTest>=2.0 # MIT
# This is needed for subunit-trace
tempest-lib>=0.14.0 # Apache-2.0
reno>=1.8.0 # Apache2
bandit>=1.1.0 # Apache-2.0
tempest>=12.1.0 # Apache-2.0
pylint==1.4.5 # GPLv2
requests-mock>=1.1 # Apache-2.0

0
tools/__init__.py

9
tools/ostestr_compat_shim.sh

@ -0,0 +1,9 @@
#!/bin/sh
# Copied from neutron/tools. Otherwise no units tests are found.
# preserve old behavior of using an arg as a regex when '--' is not present
case $@ in
(*--*) ostestr $@;;
('') ostestr;;
(*) ostestr --regex "$@"
esac

116
tox.ini

@ -1,39 +1,109 @@
[tox]
minversion = 2.0
envlist = py34,py27,pypy,pep8
envlist = py35,py34,py27,pep8,docs
minversion = 1.6
skipsdist = True
[testenv]
setenv = VIRTUAL_ENV={envdir}
PYTHONWARNINGS=default::DeprecationWarning
passenv = TRACE_FAILONLY GENERATE_HASHES http_proxy HTTP_PROXY https_proxy HTTPS_PROXY no_proxy NO_PROXY
usedevelop = True
install_command = pip install -U {opts} {packages}
setenv =
VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/test-requirements.txt
commands = python setup.py test --slowest --testr-args='{posargs}'
install_command =
pip install -U -c{env:UPPER_CONSTRAINTS_FILE:https://git.openstack.org/cgit/openstack/requirements/plain/upper-constraints.txt} {opts} {packages}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
whitelist_externals = sh
commands =
{toxinidir}/tools/ostestr_compat_shim.sh {posargs}
# there is also secret magic in ostestr which lets you run in a fail only
# mode. To do this define the TRACE_FAILONLY environmental variable.
[testenv:pep8]
commands = flake8 {posargs}
[testenv:common]
# Fake job to define environment variables shared between dsvm/non-dsvm jobs
setenv = OS_TEST_TIMEOUT=180
commands = false
[testenv:venv]
commands = {posargs}
[testenv:functional]
basepython = python2.7
setenv = {[testenv]setenv}
{[testenv:common]setenv}
OS_TEST_PATH=./vmware_nsxlib/tests/functional
OS_LOG_PATH={env:OS_LOG_PATH:/opt/stack/logs}
deps =
{[testenv]deps}
-r{toxinidir}/vmware_nsxlib/tests/functional/requirements.txt
[testenv:cover]
commands = python setup.py test --coverage --testr-args='{posargs}'
[testenv:dsvm-functional]
basepython = python2.7
setenv = OS_SUDO_TESTING=1
OS_FAIL_ON_MISSING_DEPS=1
OS_TEST_TIMEOUT=180
sitepackages=True
deps =
{[testenv:functional]deps}
commands =
[testenv:docs]
commands = python setup.py build_sphinx
[tox:jenkins]
sitepackages = True
[testenv:releasenotes]
commands = sphinx-build -a -E -W -d releasenotes/build/doctrees -b html releasenotes/source releasenotes/build/html
[testenv:pep8]
basepython = python2.7
deps =
{[testenv]deps}
commands =
sphinx-build -a -E -W -d releasenotes/build/doctrees -b html releasenotes/source releasenotes/build/html
# Checks for coding and style guidelines
flake8
{[testenv:genconfig]commands}
whitelist_externals =
sh
bash
[testenv:debug]
commands = oslo_debug_helper {posargs}
[testenv:bandit]
deps = -r{toxinidir}/test-requirements.txt
commands = bandit -r vmware_nsxlib -n 5 -ll
[flake8]
# E123, E125 skipped as they are invalid PEP-8.
[testenv:cover]
basepython = python2.7
commands =
python setup.py testr --coverage --testr-args='{posargs}'
coverage report
[testenv:venv]
commands = {posargs}
[testenv:docs]
commands = sphinx-build -W -b html doc/source doc/build/html
show-source = True
ignore = E123,E125
[flake8]
# E125 continuation line does not distinguish itself from next logical line
# E126 continuation line over-indented for hanging indent
# E128 continuation line under-indented for visual indent
# E129 visually indented line with same indent as next logical line
# E265 block comment should start with ‘# ‘
# H305 imports not grouped correctly
# H307 like imports should be grouped together
# H402 one line docstring needs punctuation
# H404 multi line docstring should start with a summary
# H405 multi line docstring summary not separated with an empty line
# H904 Wrap long lines in parentheses instead of a backslash
# TODO(dougwig) -- uncomment this to test for remaining linkages
# N530 direct neutron imports not allowed
ignore = E125,E126,E128,E129,E265,H305,H307,H402,H404,H405,H904,N530
show-source = true
builtins = _
exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build
exclude = .venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build,.ropeproject
[hacking]
import_exceptions = vmware_nsxlib._i18n,
vmware_nsxlib_tempest._i18n
local-check-factory = neutron_lib.hacking.checks.factory
[testenv:genconfig]
commands =
[testenv:uuidgen]
commands =
check-uuid --fix

43
vmware_nsxlib/_i18n.py

@ -0,0 +1,43 @@
# Copyright (c) 2015 VMware, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_i18n
DOMAIN = "vmware_nsxlib"
_translators = oslo_i18n.TranslatorFactory(domain=DOMAIN)
# The primary translation function using the well-known name "_"
_ = _translators.primary
# The contextual translation function using the name "_C"
_C = _translators.contextual_form
# The plural translation function using the name "_P"
_P = _translators.plural_form
# Translators for log levels.
#
# The abbreviated names are meant to reflect the usual use of a short
# name like '_'. The "L" is for "log" and the other letter comes from
# the level.
_LI = _translators.log_info
_LW = _translators.log_warning
_LE = _translators.log_error
_LC = _translators.log_critical
def get_available_languages():
return oslo_i18n.get_available_languages(DOMAIN)

28
vmware_nsxlib/tests/test_vmware_nsxlib.py

@ -1,28 +0,0 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
test_vmware_nsxlib
----------------------------------
Tests for `vmware_nsxlib` module.
"""
from vmware_nsxlib.tests import base
class TestVmware_nsxlib(base.TestCase):
def test_something(self):
pass

0
vmware_nsxlib/tests/unit/__init__.py

0
vmware_nsxlib/tests/unit/v3/__init__.py

231
vmware_nsxlib/tests/unit/v3/mocks.py

@ -0,0 +1,231 @@
# Copyright (c) 2015 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
import six.moves.urllib.parse as urlparse
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from vmware_nsxlib.v3 import nsx_constants
FAKE_NAME = "fake_name"
DEFAULT_TIER0_ROUTER_UUID = "efad0078-9204-4b46-a2d8-d4dd31ed448f"
NSX_BRIDGE_CLUSTER_NAME = 'default bridge cluster'
FAKE_MANAGER = "fake_manager_ip"
def make_fake_switch(switch_uuid=None, tz_uuid=None, name=FAKE_NAME):
if not switch_uuid:
switch_uuid = uuidutils.generate_uuid()
if not tz_uuid:
tz_uuid = uuidutils.generate_uuid()
fake_switch = {
"id": switch_uuid,
"display_name": name,
"resource_type": "LogicalSwitch",
"address_bindings": [],
"transport_zone_id": tz_uuid,
"replication_mode": nsx_constants.MTEP,
"admin_state": nsx_constants.ADMIN_STATE_UP,
"vni": 50056,
"switching_profile_ids": [
{
"value": "64814784-7896-3901-9741-badeff705639",
"key": "IpDiscoverySwitchingProfile"
},
{
"value": "fad98876-d7ff-11e4-b9d6-1681e6b88ec1",
"key": "SpoofGuardSwitchingProfile"
},
{
"value": "93b4b7e8-f116-415d-a50c-3364611b5d09",
"key": "PortMirroringSwitchingProfile"
},
{
"value": "fbc4fb17-83d9-4b53-a286-ccdf04301888",
"key": "SwitchSecuritySwitchingProfile"
},
{
"value": "f313290b-eba8-4262-bd93-fab5026e9495",
"key": "QosSwitchingProfile"
}
],
}
return fake_switch
def make_fake_dhcp_profile():
return {"id": uuidutils.generate_uuid(),
"edge_cluster_id": uuidutils.generate_uuid(),
"edge_cluster_member_indexes": [0, 1]}
def make_fake_metadata_proxy():
return {"id": uuidutils.generate_uuid(),
"metadata_server_url": "http://1.2.3.4",
"secret": "my secret",
"edge_cluster_id": uuidutils.generate_uuid(),
"edge_cluster_member_indexes": [0, 1]}
class MockRequestsResponse(object):
def __init__(self, status_code, content=None):
self.status_code = status_code
self.content = content
def json(self):
return jsonutils.loads(self.content)
class MockRequestSessionApi(object):
def __init__(self):
self._store = {}
def _format_uri(self, uri):
uri = urlparse.urlparse(uri).path
while uri.endswith('/'):
uri = uri[:-1]
while uri.startswith('/'):
uri = uri[1:]
if not self._is_uuid_uri(uri):
uri = "%s/" % uri
return uri
def _is_uuid_uri(self, uri):
return uuidutils.is_uuid_like(
urlparse.urlparse(uri).path.split('/')[-1])
def _query(self, search_key, copy=True):
items = []
for uri, obj in self._store.items():
if uri.startswith(search_key):
items.append(obj.copy() if copy else obj)
return items
def _build_response(self, url, content=None,
status=requests.codes.ok, **kwargs):
if isinstance(content, list):
content = {
'result_count': len(content),
'results': content
}
if (content is not None and kwargs.get('headers', {}).get(
'Content-Type') == 'application/json'):
content = jsonutils.dumps(content)
return MockRequestsResponse(status, content=content)
def _get_content(self, **kwargs):
content = kwargs.get('data', None)
if content and kwargs.get('headers', {}).get(
'Content-Type') == 'application/json':
content = jsonutils.loads(content)
return content
def get(self, url, **kwargs):
url = self._format_uri(url)
if self._is_uuid_uri(url):
item = self._store.get(url)
code = requests.codes.ok if item else requests.codes.not_found
return self._build_response(
url, content=item, status=code, **kwargs)
return self._build_response(
url, content=self._query(url), status=requests.codes.ok, **kwargs)
def _create(self, url, content, **kwargs):
resource_id = content.get('id')
if resource_id and self._store.get("%s%s" % (url, resource_id)):
return self._build_response(
url, content=None, status=requests.codes.bad, **kwargs)
resource_id = resource_id or uuidutils.generate_uuid()
content['id'] = resource_id
self._store["%s%s" % (url, resource_id)] = content.copy()
return content
def post(self, url, **kwargs):
parsed_url = urlparse.urlparse(url)
url = self._format_uri(url)
if self._is_uuid_uri(url):
if self._store.get(url) is None:
return self._build_response(
url, content=None, status=requests.codes.bad, **kwargs)
body = self._get_content(**kwargs)
if body is None:
return self._build_response(
url, content=None, status=requests.codes.bad, **kwargs)
response_content = None
url_queries = urlparse.parse_qs(parsed_url.query)
if 'create_multiple' in url_queries.get('action', []):
response_content = {}
for resource_name, resource_body in body.items():
for new_resource in resource_body:
created_resource = self._create(
url, new_resource, **kwargs)
if response_content.get(resource_name, None) is None:
response_content[resource_name] = []
response_content[resource_name].append(created_resource)
else:
response_content = self._create(url, body, **kwargs)
if isinstance(response_content, MockRequestsResponse):
return response_content
return self._build_response(
url, content=response_content,
status=requests.codes.created, **kwargs)
def put(self, url, **kwargs):
url = self._format_uri(url)
item = {}
if self._is_uuid_uri(url):
item = self._store.get(url, None)
if item is None:
return self._build_response(
url, content=None,
status=requests.codes.not_found, **kwargs)
body = self._get_content(**kwargs)
if body is None:
return self._build_response(
url, content=None, status=requests.codes.bad, **kwargs)
item.update(body)
self._store[url] = item
return self._build_response(
url, content=item, status=requests.codes.ok, **kwargs)
def delete(self, url, **kwargs):
url = self._format_uri(url)
if not self._store.get(url):
return self._build_response(
url, content=None, status=requests.codes.not_found, **kwargs)
del self._store[url]
return self._build_response(
url, content=None, status=requests.codes.ok, **kwargs)

322
vmware_nsxlib/tests/unit/v3/nsxlib_testcase.py

@ -0,0 +1,322 @@
# Copyright (c) 2015 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
import mock
import unittest
from oslo_utils import uuidutils
from requests import exceptions as requests_exceptions
from vmware_nsxlib import v3
from vmware_nsxlib.v3 import client as nsx_client
from vmware_nsxlib.v3 import cluster as nsx_cluster
from vmware_nsxlib.v3 import config
NSX_USER = 'admin'
NSX_PASSWORD = 'default'
NSX_MANAGER = '1.2.3.4'
NSX_INSECURE = False
NSX_CERT = '/opt/stack/certs/nsx.pem'
NSX_HTTP_RETRIES = 10
NSX_HTTP_TIMEOUT = 10
NSX_HTTP_READ_TIMEOUT = 180
NSX_CONCURENT_CONN = 10
NSX_CONN_IDLE_TIME = 10
NSX_MAX_ATTEMPTS = 10
PLUGIN_SCOPE = "plugin scope"
PLUGIN_TAG = "plugin tag"
PLUGIN_VER = "plugin ver"
def _mock_nsxlib():
def _return_id_key(*args, **kwargs):
return {'id': uuidutils.generate_uuid()}
def _mock_add_rules_in_section(*args):
# NOTE(arosen): the code in the neutron plugin expects the
# neutron rule id as the display_name.
rules = args[0]
return {
'rules': [
{'display_name': rule['display_name'],
'id': uuidutils.generate_uuid()}
for rule in rules
]}
mock.patch(
"vmware_nsxlib.v3.cluster.NSXRequestsHTTPProvider"
".validate_connection").start()
mock.patch(
"vmware_nsxlib.v3.security.NsxLibNsGroup.create",
side_effect=_return_id_key
).start()
mock.patch(
"vmware_nsxlib.v3.security.NsxLibFirewallSection.create_empty",
side_effect=_return_id_key).start()
mock.patch(
"vmware_nsxlib.v3.security.NsxLibFirewallSection.init_default",
side_effect=_return_id_key).start()
mock.patch(
"vmware_nsxlib.v3.security.NsxLibNsGroup.list").start()
mock.patch(
"vmware_nsxlib.v3.security.NsxLibFirewallSection.add_rules",
side_effect=_mock_add_rules_in_section).start()
mock.patch(
"vmware_nsxlib.v3.NsxLibTransportZone.get_id_by_name_or_id",
side_effect=_return_id_key).start()
mock.patch(
"vmware_nsxlib.v3.NsxLib.get_version",
return_value='1.1.0').start()
def get_default_nsxlib_config():
return config.NsxLibConfig(
username=NSX_USER,
password=NSX_PASSWORD,
retries=NSX_HTTP_RETRIES,
insecure=NSX_INSECURE,
ca_file=NSX_CERT,
concurrent_connections=NSX_CONCURENT_CONN,
http_timeout=NSX_HTTP_TIMEOUT,
http_read_timeout=NSX_HTTP_READ_TIMEOUT,
conn_idle_timeout=NSX_CONN_IDLE_TIME,
http_provider=None,
nsx_api_managers=[],
plugin_scope=PLUGIN_SCOPE,
plugin_tag=PLUGIN_TAG,
plugin_ver=PLUGIN_VER)
class NsxLibTestCase(unittest.TestCase):
def setUp(self, *args, **kwargs):
super(NsxLibTestCase, self).setUp()
_mock_nsxlib()
nsxlib_config = get_default_nsxlib_config()
self.nsxlib = v3.NsxLib(nsxlib_config)
# print diffs when assert comparisons fail
self.maxDiff = None
class MemoryMockAPIProvider(nsx_cluster.AbstractHTTPProvider):
"""Acts as a HTTP provider for mocking which is backed
by a MockRequestSessionApi.
"""
def __init__(self, mock_session_api):
self._store = mock_session_api
@property
def provider_id(self):
return "Memory mock API"
def validate_connection(self, cluster_api, endpoint, conn):
return
def new_connection(self, cluster_api, provider):
# all callers use the same backing
return self._store
def is_connection_exception(self, exception):
return isinstance(exception, requests_exceptions.ConnectionError)
class NsxClientTestCase(NsxLibTestCase):
class MockNSXClusteredAPI(nsx_cluster.NSXClusteredAPI):
def __init__(
self, session_response=None,
username=None,
password=None,
retries=None,
insecure=None,
ca_file=None,
concurrent_connections=None,
http_timeout=None,
http_read_timeout=None,
conn_idle_timeout=None,
nsx_api_managers=None):
nsxlib_config = config.NsxLibConfig(
username=username or NSX_USER,
password=password or NSX_PASSWORD,
retries=retries or NSX_HTTP_RETRIES,
insecure=insecure if insecure is not None else NSX_INSECURE,
ca_file=ca_file or NSX_CERT,
concurrent_connections=(concurrent_connections or
NSX_CONCURENT_CONN),
http_timeout=http_timeout or NSX_HTTP_TIMEOUT,
http_read_timeout=http_read_timeout or NSX_HTTP_READ_TIMEOUT,
conn_idle_timeout=conn_idle_timeout or NSX_CONN_IDLE_TIME,
http_provider=NsxClientTestCase.MockHTTPProvider(
session_response=session_response),
nsx_api_managers=nsx_api_managers or [NSX_MANAGER],
plugin_scope=PLUGIN_SCOPE,
plugin_tag=PLUGIN_TAG,
plugin_ver=PLUGIN_VER)
super(NsxClientTestCase.MockNSXClusteredAPI, self).__init__(
nsxlib_config)
self._record = mock.Mock()
def record_call(self, request, **kwargs):
verb = request.method.lower()
# filter out requests specific attributes
checked_kwargs = copy.copy(kwargs)
del checked_kwargs['proxies']
del checked_kwargs['stream']
if 'allow_redirects' in checked_kwargs:
del checked_kwargs['allow_redirects']
for attr in ['url', 'body']:
checked_kwargs[attr] = getattr(request, attr, None)
# remove headers we don't need to verify
checked_kwargs['headers'] = copy.copy(request.headers)
for header in ['Accept-Encoding', 'User-Agent',
'Connection', 'Authorization',
'Content-Length']:
if header in checked_kwargs['headers']:
del checked_kwargs['headers'][header]
checked_kwargs['headers'] = request.headers
# record the call in the mock object
method = getattr(self._record, verb)
method(**checked_kwargs)
def assert_called_once(self, verb, **kwargs):
mock_call = getattr(self._record, verb.lower())
mock_call.assert_called_once_with(**kwargs)
@property
def recorded_calls(self):
return self._record
class MockHTTPProvider(nsx_cluster.NSXRequestsHTTPProvider):
def __init__(self, session_response=None):
super(NsxClientTestCase.MockHTTPProvider, self).__init__()
self._session_response = session_response
def new_connection(self, cluster_api, provider):
# wrapper the session so we can intercept and record calls
session = super(NsxClientTestCase.MockHTTPProvider,
self).new_connection(cluster_api, provider)
mock_adapter = mock.Mock()
session_send = session.send
def _adapter_send(request, **kwargs):
# record calls at the requests HTTP adapter level
mock_response = mock.Mock()
mock_response.history = None
# needed to bypass requests internal checks for mock
mock_response.raw._original_response = {}
# record the request for later verification
cluster_api.record_call(request, **kwargs)
return mock_response
def _session_send(request, **kwargs):
# calls at the Session level
if self._session_response:
# consumer has setup a response for the session
cluster_api.record_call(request, **kwargs)
return (self._session_response()
if hasattr(self._session_response, '__call__')
else self._session_response)
# bypass requests redirect handling for mock
kwargs['allow_redirects'] = False
# session send will end up calling adapter send
return session_send(request, **kwargs)
mock_adapter.send = _adapter_send
session.send = _session_send
def _mock_adapter(*args, **kwargs):
# use our mock adapter rather than requests adapter
return mock_adapter
session.get_adapter = _mock_adapter
return session
def validate_connection(self, cluster_api, endpoint, conn):
assert conn is not None
def mock_nsx_clustered_api(self, session_response=None, **kwargs):
return NsxClientTestCase.MockNSXClusteredAPI(
session_response=session_response, **kwargs)
def mocked_resource(self, resource_class, mock_validate=True,
session_response=None):
mocked = resource_class(nsx_client.NSX3Client(
self.mock_nsx_clustered_api(session_response=session_response),
max_attempts=NSX_MAX_ATTEMPTS))
if mock_validate:
mock.patch.object(mocked._client, '_validate_result').start()
return mocked
def new_mocked_client(self, client_class, mock_validate=True,
session_response=None, mock_cluster=None,
**kwargs):
client = client_class(mock_cluster or self.mock_nsx_clustered_api(
session_response=session_response), **kwargs)
if mock_validate:
mock.patch.object(client, '_validate_result').start()
new_client_for = client.new_client_for
def _new_client_for(*args, **kwargs):
sub_client = new_client_for(*args, **kwargs)
if mock_validate:
mock.patch.object(sub_client, '_validate_result').start()
return sub_client
client.new_client_for = _new_client_for
return client
def new_mocked_cluster(self, conf_managers, validate_conn_func,
concurrent_connections=None):
mock_provider = mock.Mock()
mock_provider.default_scheme = 'https'
mock_provider.validate_connection = validate_conn_func
nsxlib_config = get_default_nsxlib_config()
if concurrent_connections:
nsxlib_config.concurrent_connections = concurrent_connections
nsxlib_config.http_provider = mock_provider
nsxlib_config.nsx_api_managers = conf_managers
return nsx_cluster.NSXClusteredAPI(nsxlib_config)

308
vmware_nsxlib/tests/unit/v3/test_client.py

@ -0,0 +1,308 @@
# Copyright 2015 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
from oslo_log import log
from oslo_serialization import jsonutils
from vmware_nsxlib.tests.unit.v3 import mocks
from vmware_nsxlib.tests.unit.v3 import nsxlib_testcase
from vmware_nsxlib.v3 import client
from vmware_nsxlib.v3 import exceptions as nsxlib_exc
LOG = log.getLogger(__name__)
CLIENT_PKG = 'vmware_nsxlib.v3.client'
DFT_ACCEPT_HEADERS = {
'Accept': '*/*'
}
def _headers(**kwargs):
headers = copy.copy(DFT_ACCEPT_HEADERS)
headers.update(kwargs)
return headers
def assert_call(verb, client_or_resource,
url, verify=nsxlib_testcase.NSX_CERT,
data=None, headers=DFT_ACCEPT_HEADERS,
timeout=(nsxlib_testcase.NSX_HTTP_TIMEOUT,
nsxlib_testcase.NSX_HTTP_READ_TIMEOUT)):
nsx_client = client_or_resource
if getattr(nsx_client, '_client', None) is not None:
nsx_client = nsx_client._client
cluster = nsx_client._conn
cluster.assert_called_once(
verb,
**{'url': url, 'verify': verify, 'body': data,
'headers': headers, 'cert': None, 'timeout': timeout})
def assert_json_call(verb, client_or_resource, url,
verify=nsxlib_testcase.NSX_CERT,
data=None,
headers=client.JSONRESTClient._DEFAULT_HEADERS):
return assert_call(verb, client_or_resource, url,
verify=verify, data=data,
headers=headers)
class NsxV3RESTClientTestCase(nsxlib_testcase.NsxClientTestCase):
def test_client_url_prefix(self):
api = self.new_mocked_client(client.RESTClient,
url_prefix='/cloud/api')
api.list()
assert_call(
'get', api,
'https://1.2.3.4/cloud/api')
api = self.new_mocked_client(client.RESTClient,
url_prefix='/cloud/api')
api.url_list('v1/ports')
assert_call(
'get', api,
'https://1.2.3.4/cloud/api/v1/ports')
def test_client_headers(self):
default_headers = {'Content-Type': 'application/golang'}
api = self.new_mocked_client(
client.RESTClient, default_headers=default_headers,
url_prefix='/v1/api')
api.list()
assert_call(
'get', api,
'https://1.2.3.4/v1/api',
headers=_headers(**default_headers))
api = self.new_mocked_client(
client.RESTClient,
default_headers=default_headers,
url_prefix='/v1/api')
method_headers = {'X-API-Key': 'strong-crypt'}
api.url_list('ports/33', headers=method_headers)
method_headers.update(default_headers)
assert_call(
'get', api,
'https://1.2.3.4/v1/api/ports/33',
headers=_headers(**method_headers))
def test_client_for(self):
api = self.new_mocked_client(client.RESTClient, url_prefix='api/v1/')
sub_api = api.new_client_for('switch/ports')
sub_api.get('11a2b')
assert_call(
'get', sub_api,
'https://1.2.3.4/api/v1/switch/ports/11a2b')
def test_client_list(self):
api = self.new_mocked_client(client.RESTClient,
url_prefix='api/v1/ports')
api.list()
assert_call(
'get', api,
'https://1.2.3.4/api/v1/ports')
def test_client_get(self):
api = self.new_mocked_client(client.RESTClient,
url_prefix='api/v1/ports')
api.get('unique-id')
assert_call(
'get', api,
'https://1.2.3.4/api/v1/ports/unique-id')
def test_client_delete(self):
api = self.new_mocked_client(client.RESTClient,
url_prefix='api/v1/ports')
api.delete('unique-id')
assert_call(
'delete', api,
'https://1.2.3.4/api/v1/ports/unique-id')
def test_client_update(self):
api = self.new_mocked_client(client.RESTClient,
url_prefix='api/v1/ports')
api.update('unique-id', jsonutils.dumps({'name': 'a-new-name'}))
assert_call(
'put', api,
'https://1.2.3.4/api/v1/ports/unique-id',
data=jsonutils.dumps({'name': 'a-new-name'}))
def test_client_create(self):
api = self.new_mocked_client(client.RESTClient,
url_prefix='api/v1/ports')
api.create(body=jsonutils.dumps({'resource-name': 'port1'}))
assert_call(
'post', api,
'https://1.2.3.4/api/v1/ports',
data=jsonutils.dumps({'resource-name': 'port1'}))
def test_client_url_list(self):
api = self.new_mocked_client(client.RESTClient,
url_prefix='api/v1/ports')
json_headers = {'Content-Type': 'application/json'}
api.url_list('/connections', json_headers)
assert_call(
'get', api,
'https://1.2.3.4/api/v1/ports/connections',
headers=_headers(**json_headers))
def test_client_url_get(self):
api = self.new_mocked_client(client.RESTClient,
url_prefix='api/v1/ports')
api.url_get('connections/1')
assert_call(
'get', api,
'https://1.2.3.4/api/v1/ports/connections/1')
def test_client_url_delete(self):
api = self.new_mocked_client(client.RESTClient,
url_prefix='api/v1/ports')
api.url_delete('1')
assert_call(
'delete', api,
'https://1.2.3.4/api/v1/ports/1')
def test_client_url_put(self):
api = self.new_mocked_client(client.RESTClient,
url_prefix='api/v1/ports')
api.url_put('connections/1', jsonutils.dumps({'name': 'conn1'}))
assert_call(
'put', api,
'https://1.2.3.4/api/v1/ports/connections/1',
data=jsonutils.dumps({'name': 'conn1'}))
def test_client_url_post(self):
api = self.new_mocked_client(client.RESTClient,
url_prefix='api/v1/ports')
api.url_post('1/connections', jsonutils.dumps({'name': 'conn1'}))
assert_call(
'post', api,
'https://1.2.3.4/api/v1/ports/1/connections',
data=jsonutils.dumps({'name': 'conn1'}))
def test_client_validate_result(self):
def _verb_response_code(http_verb, status_code):
response = mocks.MockRequestsResponse(
status_code, None)
client_api = self.new_mocked_client(
client.RESTClient, mock_validate=False,
session_response=response)
client_call = getattr(client_api, "url_%s" % http_verb)
client_call('', None)
for verb in ['get', 'post', 'put', 'delete']:
for code in client.RESTClient._VERB_RESP_CODES.get(verb):
_verb_response_code(verb, code)
self.assertRaises(
nsxlib_exc.ManagerError,
_verb_response_code, verb, 500)
class NsxV3JSONClientTestCase(nsxlib_testcase.NsxClientTestCase):
def test_json_request(self):
resp = mocks.MockRequestsResponse(
200, jsonutils.dumps({'result': {'ok': 200}}))
api = self.new_mocked_client(client.JSONRESTClient,
session_response=resp,
url_prefix='api/v2/nat')
resp = api.create(body={'name': 'mgmt-egress'})
assert_json_call(
'post', api,
'https://1.2.3.4/api/v2/nat',
data=jsonutils.dumps({'name': 'mgmt-egress'}))
self.assertEqual(resp, {'result': {'ok': 200}})
class NsxV3APIClientTestCase(nsxlib_testcase.NsxClientTestCase):
def test_api_call(self):
api = self.new_mocked_client(client.NSX3Client)
api.get('ports')
assert_json_call(
'get', api,
'https://1.2.3.4/api/v1/ports')
# NOTE(boden): remove this when tmp brigding removed
class NsxV3APIClientBridgeTestCase(nsxlib_testcase.NsxClientTestCase):
def test_get_resource(self):
api = self.new_mocked_client(client.NSX3Client)
api.get('ports')
assert_json_call(
'get', api,
'https://1.2.3.4/api/v1/ports')
def test_create_resource(self):
api = self.new_mocked_client(client.NSX3Client)
api.create('ports', {'resource-name': 'port1'})
assert_json_call(
'post', api,
'https://1.2.3.4/api/v1/ports',
data=jsonutils.dumps({'resource-name': 'port1'}))
def test_update_resource(self):
api = self.new_mocked_client(client.NSX3Client)
api.update('ports/1', {'name': 'a-new-name'})
assert_json_call(
'put', api,
'https://1.2.3.4/api/v1/ports/1',
data=jsonutils.dumps({'name': 'a-new-name'}))
def test_delete_resource(self):
api = self.new_mocked_client(client.NSX3Client)
api.delete('ports/11')
assert_json_call(
'delete', api,
'https://1.2.3.4/api/v1/ports/11')

205
vmware_nsxlib/tests/unit/v3/test_cluster.py

@ -0,0 +1,205 @@
# Copyright 2015 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import mock
import six.moves.urllib.parse as urlparse
import unittest
from oslo_serialization import jsonutils
from requests import exceptions as requests_exceptions
from vmware_nsxlib.tests.unit.v3 import mocks
from vmware_nsxlib.tests.unit.v3 import nsxlib_testcase
from vmware_nsxlib.v3 import client
from vmware_nsxlib.v3 import cluster
from vmware_nsxlib.v3 import exceptions as nsxlib_exc
def _validate_conn_up(*args, **kwargs):
return
def _validate_conn_down(*args, **kwargs):
raise requests_exceptions.ConnectionError()
class RequestsHTTPProviderTestCase(unittest.TestCase):
def test_new_connection(self):
mock_api = mock.Mock()
mock_api.nsxlib_config = mock.Mock()
mock_api.nsxlib_config.username = 'nsxuser'
mock_api.nsxlib_config.password = 'nsxpassword'
mock_api.nsxlib_config.retries = 100
mock_api.nsxlib_config.insecure = True
mock_api.nsxlib_config.ca_file = None
mock_api.nsxlib_config.http_timeout = 99
mock_api.nsxlib_config.conn_idle_timeout = 39
provider = cluster.NSXRequestsHTTPProvider()
session = provider.new_connection(
mock_api, cluster.Provider('9.8.7.6', 'https://9.8.7.6',
'nsxuser', 'nsxpassword', None))
self.assertEqual(session.auth, ('nsxuser', 'nsxpassword'))
self.assertEqual(session.verify, False)
self.assertEqual(session.cert, None)
self.assertEqual(session.adapters['https://'].max_retries.total, 100)
self.assertEqual(session.timeout, 99)
def test_validate_connection(self):
self.skipTest("Revist")
mock_conn = mocks.MockRequestSessionApi()
mock_ep = mock.Mock()
mock_ep.provider.url = 'https://1.2.3.4'
provider = cluster.NSXRequestsHTTPProvider()
self.assertRaises(nsxlib_exc.ResourceNotFound,
provider.validate_connection,
mock.Mock(), mock_ep, mock_conn)
mock_conn.post('api/v1/transport-zones',
data=jsonutils.dumps({'id': 'dummy-tz'}),
headers=client.JSONRESTClient._DEFAULT_HEADERS)
provider.validate_connection(mock.Mock(), mock_ep, mock_conn)
class NsxV3ClusteredAPITestCase(nsxlib_testcase.NsxClientTestCase):
def _assert_providers(self, cluster_api, provider_tuples):
self.assertEqual(len(cluster_api.providers), len(provider_tuples))
def _assert_provider(pid, purl):
for provider in cluster_api.providers:
if provider.id == pid and provider.url == purl:
return
self.fail("Provider: %s not found" % pid)
for provider_tuple in provider_tuples:
_assert_provider(provider_tuple[0], provider_tuple[1])
def test_conf_providers_no_scheme(self):
conf_managers = ['8.9.10.11', '9.10.11.12:4433']
api = self.new_mocked_cluster(conf_managers, _validate_conn_up)
self._assert_providers(
api, [(p, "https://%s" % p) for p in conf_managers])
def test_conf_providers_with_scheme(self):
conf_managers = ['http://8.9.10.11:8080', 'https://9.10.11.12:4433']
api = self.new_mocked_cluster(conf_managers, _validate_conn_up)
self._assert_providers(
api, [(urlparse.urlparse(p).netloc, p) for p in conf_managers])
def test_http_retries(self):
api = self.mock_nsx_clustered_api(retries=9)
with api.endpoints['1.2.3.4'].pool.item() as session:
self.assertEqual(
session.adapters['https://'].max_retries.total, 9)
def test_conns_per_pool(self):
conf_managers = ['8.9.10.11', '9.10.11.12:4433']
api = self.new_mocked_cluster(
conf_managers, _validate_conn_up,
concurrent_connections=11)
for ep_id, ep in api.endpoints.items():
self.assertEqual(ep.pool.max_size, 11)
def test_timeouts(self):
api = self.mock_nsx_clustered_api(http_read_timeout=37, http_timeout=7)
api.get('logical-ports')
mock_call = api.recorded_calls.method_calls[0]
name, args, kwargs = mock_call
self.assertEqual(kwargs['timeout'], (7, 37))
class ClusteredAPITestCase(nsxlib_testcase.NsxClientTestCase):
def _test_health(self, validate_fn, expected_health):
conf_managers = ['8.9.10.11', '9.10.11.12']
api = self.new_mocked_cluster(conf_managers, validate_fn)
self.assertEqual(api.health, expected_health)
def test_orange_health(self):
def _validate(cluster_api, endpoint, conn):
if endpoint.provider.id == '8.9.10.11':
raise Exception()
self._test_health(_validate, cluster.ClusterHealth.ORANGE)
def test_green_health(self):
self._test_health(_validate_conn_up, cluster.ClusterHealth.GREEN)
def test_red_health(self):
self._test_health(_validate_conn_down, cluster.ClusterHealth.RED)
def test_cluster_validate_with_exception(self):
conf_managers = ['8.9.10.11', '9.10.11.12', '10.11.12.13']
api = self.new_mocked_cluster(conf_managers, _validate_conn_down)
self.assertEqual(len(api.endpoints), 3)
self.assertRaises(nsxlib_exc.ServiceClusterUnavailable,
api.get, 'api/v1/transport-zones')
def test_cluster_proxy_stale_revision(self):
def stale_revision():
raise nsxlib_exc.StaleRevision(manager='1.1.1.1',
operation='whatever')
api = self.mock_nsx_clustered_api(session_response=stale_revision)
self.assertRaises(nsxlib_exc.StaleRevision,
api.get, 'api/v1/transport-zones')
def test_cluster_proxy_connection_error(self):
def connect_timeout():
raise requests_exceptions.ConnectTimeout()
api = self.mock_nsx_clustered_api(session_response=connect_timeout)
api._validate = mock.Mock()
self.assertRaises(nsxlib_exc.ServiceClusterUnavailable,
api.get, 'api/v1/transport-zones')
def test_cluster_round_robin_servicing(self):
conf_managers = ['8.9.10.11', '9.10.11.12', '10.11.12.13']
api = self.mock_nsx_clustered_api(nsx_api_managers=conf_managers)
api._validate = mock.Mock()
eps = list(api._endpoints.values())
def _get_schedule(num_eps):
return [api._select_endpoint() for i in range(num_eps)]
self.assertEqual(_get_schedule(3), eps)
self.assertEqual(_get_schedule(6), [eps[0], eps[1], eps[2],
eps[0], eps[1], eps[2]])
eps[0]._state = cluster.EndpointState.DOWN
self.assertEqual(_get_schedule(4), [eps[1], eps[2], eps[1], eps[2]])
eps[1]._state = cluster.EndpointState.DOWN
self.assertEqual(_get_schedule(2), [eps[2], eps[2]])
eps[0]._state = cluster.EndpointState.UP
self.assertEqual(_get_schedule(4), [eps[0], eps[2], eps[0], eps[2]])
def test_reinitialize_cluster(self):
api = self.mock_n