JSON Path-like querying for variables
This patch implements the ability to query for values in nested variable documents. For instance, if we had hosts with a variables that look like: { "hardware_profiles": { "disks": [ { "manufacturer": "seagate", "capacity_quantity": 2, "capacity_unit": "TB" }, { "manufacturer": "western", "capacity_quantity": 3, "capacity_unit": "TB" } ] } } and we wanted to grab all of them with a disk that was manufactured by Seagate, we would like to be able to query like so: GET /v1/hosts?vars=hardware_profiles.disks[*].manufacturer:"seagate" This does modify the variables query to expect valid JSON values. So, strings need to be quoted, for instance. Change-Id: Id597d3e57d2e28766fecd1b314f53176543e1b9d Closes-Bug: 1671116
This commit is contained in:
parent
0e4280ede3
commit
606926b89d
|
@ -6,6 +6,8 @@ from operator import attrgetter
|
|||
import sys
|
||||
import uuid
|
||||
|
||||
import jsonpath_rw as jspath
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_db import options as db_options
|
||||
|
@ -13,7 +15,8 @@ from oslo_db.sqlalchemy import session
|
|||
from oslo_db.sqlalchemy import utils as db_utils
|
||||
from oslo_log import log
|
||||
|
||||
from sqlalchemy import or_, sql
|
||||
from sqlalchemy import and_, sql
|
||||
from sqlalchemy import func as sa_func
|
||||
import sqlalchemy.orm.exc as sa_exc
|
||||
from sqlalchemy.orm import with_polymorphic
|
||||
|
||||
|
@ -32,6 +35,14 @@ _DEFAULT_SQL_CONNECTION = 'sqlite://'
|
|||
db_options.set_defaults(cfg.CONF,
|
||||
connection=_DEFAULT_SQL_CONNECTION)
|
||||
|
||||
MYSQL_INVALID_JSONPATH_EXPRESSION = 3143
|
||||
MYSQL_INVALID_JSON_TEXT = 3141
|
||||
|
||||
JSON_EXCEPTIONS = {
|
||||
MYSQL_INVALID_JSONPATH_EXPRESSION: exceptions.InvalidJSONPath,
|
||||
MYSQL_INVALID_JSON_TEXT: exceptions.InvalidJSONValue,
|
||||
}
|
||||
|
||||
|
||||
def _create_facade_lazily():
|
||||
global _FACADE
|
||||
|
@ -156,58 +167,153 @@ def model_query(context, model, *args, **kwargs):
|
|||
model=model, session=session, args=args, **kwargs)
|
||||
|
||||
|
||||
def _generate_or_clauses(kv_pairs):
|
||||
or_clauses = []
|
||||
for k, v in kv_pairs:
|
||||
or_clauses.append(
|
||||
((models.Variable.key == k) & (models.Variable.value == v)))
|
||||
return or_clauses
|
||||
def _find_first_key_fragment(root):
|
||||
"""Finds element where first key Field exists."""
|
||||
desired = jspath.Fields
|
||||
if isinstance(root, desired):
|
||||
return root
|
||||
while not isinstance(root.left, desired):
|
||||
root = root.left
|
||||
return root
|
||||
|
||||
|
||||
def _split_key_path_prefix(root):
|
||||
"""Extract first key and initial path from parsed JSON Path.
|
||||
|
||||
This is necessitated by us not actually including the top-most Key
|
||||
component in the JSON column (Variables.value) in the database, so we only
|
||||
need a parser to extract the first Key from the expression. The rest can be
|
||||
handled in the database.
|
||||
|
||||
This essentially takes a parsed JSON Path, finds the first field and
|
||||
optional Slice or Index. It then uses the value of the first field's value
|
||||
(fields[0]) as the key and, if a Slice of Index exist on the right
|
||||
side of the expression, it builds an Array specifier as the initial part of
|
||||
the path and then returns them.
|
||||
"""
|
||||
# NOTE(thomasem): Because of how keys work and our data model, the first
|
||||
# element should not be anything other than a Fields or a Child fragment.
|
||||
if not isinstance(root, (jspath.Fields, jspath.Child)):
|
||||
raise exceptions.InvalidJSONPath()
|
||||
|
||||
path = ''
|
||||
fragment = _find_first_key_fragment(root)
|
||||
right = getattr(fragment, 'right', None)
|
||||
left = getattr(fragment, 'left', fragment)
|
||||
|
||||
if isinstance(right, jspath.Slice) and any([right.start, right.end]):
|
||||
# NOTE(thomasem): MySQL 5.7 does not support arbitrary slices, only
|
||||
# '[*]'.
|
||||
raise exceptions.InvalidJSONPath()
|
||||
elif isinstance(right, jspath.Slice):
|
||||
path = '[*]'
|
||||
elif isinstance(right, jspath.Index):
|
||||
path = '[{}]'.format(right.index)
|
||||
|
||||
key = left.fields[0]
|
||||
return key, path
|
||||
|
||||
|
||||
def _parse_path_expr(path_expression):
|
||||
"""Split into the first key and the path used for querying MySQL."""
|
||||
try:
|
||||
parsed = jspath.parse(path_expression)
|
||||
except Exception as e:
|
||||
# NOTE(thomasem): Unfortunately, this library raises an Exception
|
||||
# instead of something more specific.
|
||||
raise exceptions.InvalidJSONPath() from e
|
||||
|
||||
# NOTE(thomasem): Get the first key from the parsed JSON Path expression
|
||||
# and initial path for the Variables.value JSON column, if there is any.
|
||||
# There would only be an initial path if there's an Array specifier
|
||||
# immediately adjacent to the first key, for example: 'foo[5]' or 'foo[*]'.
|
||||
key, path = _split_key_path_prefix(parsed)
|
||||
|
||||
# NOTE(thomasem): Remove the key we found from the original expression
|
||||
# since that's not included in the Variables.value JSON column.
|
||||
key_removed = path_expression[len(key):]
|
||||
|
||||
# NOTE(thomasem): Because the first key could have been wrapped in quotes
|
||||
# to denote it as a JSON string for handling special characters in the
|
||||
# key, we will want to find the first delimiter ('.') if one exists to
|
||||
# know where to slice off the remainder of the path and combine prefix
|
||||
# and suffix.
|
||||
path_suffix_start = key_removed.find('.')
|
||||
if path_suffix_start > -1:
|
||||
path = '{}{}'.format(path, key_removed[path_suffix_start:])
|
||||
return key, '${}'.format(path)
|
||||
|
||||
|
||||
def _json_path_clause(kv_pair):
|
||||
path_expr, value = kv_pair
|
||||
key, path = _parse_path_expr(path_expr)
|
||||
|
||||
json_match = sa_func.json_contains(
|
||||
sa_func.json_extract(models.Variable.value, path), value)
|
||||
|
||||
# NOTE(thomasem): Order is important here. MySQL will short-circuit and
|
||||
# not properly validate the JSON Path expression when the key doesn't exist
|
||||
# if the key match is first int he and_(...) expression. So, let's put
|
||||
# the json_match first.
|
||||
return and_(json_match, models.Variable.key == key)
|
||||
|
||||
|
||||
def _get_variables(query):
|
||||
try:
|
||||
return set(query)
|
||||
except db_exc.DBError as e:
|
||||
orig = e.inner_exception.orig
|
||||
code = orig.args[0]
|
||||
if orig and code in JSON_EXCEPTIONS:
|
||||
raise JSON_EXCEPTIONS[code]() from e
|
||||
raise
|
||||
|
||||
|
||||
def _matching_resources(query, resource_cls, get_descendants, kv):
|
||||
# NOTE(jimbaker) The below algorithm works as follows:
|
||||
#
|
||||
# 1. Computes the generalized descendants for each k:v var in the query;
|
||||
#
|
||||
# 2. Computes their intersection, returning a set of matching
|
||||
# resources (empty set if conjunction of kv matches does not
|
||||
# match)
|
||||
|
||||
# NOTE(jimbaker) Build a query that determine all resources that
|
||||
# match this key/value, taking in account variable
|
||||
# resolution. Note that this value is generalized to be a JSON
|
||||
# path; and the resolution is inverted by finding any possible
|
||||
# descendants.
|
||||
|
||||
kv_pairs = list(kv.items())
|
||||
matches = dict((kv_pair, set()) for kv_pair in kv_pairs)
|
||||
matches = {}
|
||||
for kv_pair in kv_pairs:
|
||||
match = matches[kv_pair] = set()
|
||||
kv_clause = _json_path_clause(kv_pair)
|
||||
matching_variables = _get_variables(
|
||||
query.session.query(models.Variable).filter(kv_clause))
|
||||
|
||||
# NOTE(jimbaker) this query can be readily generalized. Some
|
||||
# options could include:
|
||||
#
|
||||
# * Key existence (good for treating vars as if they are labels)
|
||||
# * JSON path matches on the values
|
||||
# * Nested queries that use JSON paths for the underlying implementation
|
||||
#
|
||||
# But for now, simply find all variables that explicitly match one
|
||||
# or more key value pairs.
|
||||
#
|
||||
# Regardless of any generalization, this means at this point we
|
||||
# need to construct the disjunction ("or") of all the supplied kv
|
||||
# pairs. (The next step will then compute the conjunction, but
|
||||
# with respect to resolution.)
|
||||
q = query.session.query(models.Variable)
|
||||
q = q.filter(or_(*_generate_or_clauses(kv_pairs)))
|
||||
variables = set(q)
|
||||
for variable in variables:
|
||||
match = matches[(variable.key, variable.value)]
|
||||
for variable in matching_variables:
|
||||
if isinstance(variable.parent, resource_cls):
|
||||
match.add(variable.parent)
|
||||
|
||||
# NOTE(jimbaker) now check for descendent overrides; in
|
||||
# particular, it's possible that a chain of overrides
|
||||
# could occur such that the original top value was
|
||||
# restored.
|
||||
#
|
||||
# Because all of the matching variables were returned by
|
||||
# the query; and SQLAlchemy guarantees that within the
|
||||
# scope of a transaction ("unit of work") that a given
|
||||
# object will have the same identity, we can check with
|
||||
# respect to matching_variables. Do note that arbitrary
|
||||
# additional object graph queries may be done to check the
|
||||
# resolution ordering.
|
||||
|
||||
for descendant in get_descendants(variable.parent):
|
||||
for level in descendant.resolution_order:
|
||||
desc_variable = level._variables.get(variable.key)
|
||||
if desc_variable is not None:
|
||||
if desc_variable in variables:
|
||||
if desc_variable in matching_variables:
|
||||
match.add(descendant)
|
||||
break
|
||||
|
||||
# NOTE(jimbaker) For now, we simply match for the conjunction
|
||||
# ("and") of all the supplied kv pairs we are matching
|
||||
# against. Generalize as desired with other boolean logic.
|
||||
# against. Generalize in the future as desired with other boolean
|
||||
# logic.
|
||||
_, first_match = matches.popitem()
|
||||
if matches:
|
||||
resources = first_match.intersection(*matches.values())
|
||||
|
@ -230,6 +336,9 @@ _resource_mapping = {
|
|||
models.Cell: (
|
||||
[models.Project, models.Cloud, models.Region],
|
||||
attrgetter('cells')),
|
||||
models.Network: (
|
||||
[models.Project, models.Cloud, models.Region, models.Cell],
|
||||
attrgetter('networks')),
|
||||
models.Device: (
|
||||
[models.Project, models.Cloud, models.Region, models.Cell,
|
||||
models.Device],
|
||||
|
@ -246,6 +355,7 @@ def matching_resources(query, resource_cls, kv, resolved):
|
|||
return getter(parent)
|
||||
else:
|
||||
return []
|
||||
|
||||
return _matching_resources(query, resource_cls, get_desc, kv)
|
||||
|
||||
|
||||
|
@ -259,7 +369,7 @@ def _add_var_filters_to_query(query, model, var_filters, resolved=True):
|
|||
)
|
||||
if not resource_ids:
|
||||
# short circuit; this also avoids SQLAlchemy reporting that it is
|
||||
# working with an empty in clause
|
||||
# working with an empty "in" clause
|
||||
return query.filter(sql.false())
|
||||
return query.filter(model.id.in_(resource_ids))
|
||||
|
||||
|
|
|
@ -79,6 +79,14 @@ class BadRequest(Base):
|
|||
code = 400
|
||||
|
||||
|
||||
class InvalidJSONPath(BadRequest):
|
||||
msg = "The query contains an invalid JSON Path expression."
|
||||
|
||||
|
||||
class InvalidJSONValue(BadRequest):
|
||||
msg = "An invalid JSON value was specified."
|
||||
|
||||
|
||||
class NotFound(Base):
|
||||
code = 404
|
||||
msg = "Not Found"
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import contextlib
|
||||
import copy
|
||||
import docker
|
||||
import json
|
||||
import requests
|
||||
|
@ -27,6 +28,13 @@ HEADER_USERNAME = 'X-Auth-User'
|
|||
HEADER_PROJECT = 'X-Auth-Project'
|
||||
|
||||
|
||||
def get_root_headers():
|
||||
return {
|
||||
HEADER_USERNAME: FAKE_DATA_GEN_BOOTSTRAP_USERNAME,
|
||||
HEADER_TOKEN: FAKE_DATA_GEN_BOOTSTRAP_TOKEN
|
||||
}
|
||||
|
||||
|
||||
class DockerSetup(threading.Thread):
|
||||
|
||||
def __init__(self):
|
||||
|
@ -198,6 +206,7 @@ class TestCase(testtools.TestCase):
|
|||
super(TestCase, self).setUp()
|
||||
self.container_setup_error = _container.error
|
||||
self.session = requests.Session()
|
||||
|
||||
if not self.container_setup_error:
|
||||
data = _container.container_data
|
||||
self.service_ip = data['NetworkSettings']['IPAddress']
|
||||
|
@ -206,6 +215,9 @@ class TestCase(testtools.TestCase):
|
|||
self.session.headers[HEADER_USERNAME] = FAKE_DATA_GEN_USERNAME
|
||||
self.session.headers[HEADER_TOKEN] = FAKE_DATA_GEN_TOKEN
|
||||
|
||||
self.root_headers = copy.deepcopy(self.session.headers)
|
||||
self.root_headers.update(get_root_headers())
|
||||
|
||||
setup_database(self.service_ip)
|
||||
|
||||
def tearDown(self):
|
||||
|
@ -279,12 +291,12 @@ class TestCase(testtools.TestCase):
|
|||
self.assertFailureFormat(resp)
|
||||
return resp
|
||||
|
||||
def create_project(self, name, headers=None, variables=None):
|
||||
def create_project(self, name, variables=None):
|
||||
url = self.url + '/v1/projects'
|
||||
payload = {'name': name}
|
||||
if variables:
|
||||
payload['variables'] = variables
|
||||
response = self.post(url, headers=headers, data=payload)
|
||||
response = self.post(url, headers=self.root_headers, data=payload)
|
||||
self.assertEqual(201, response.status_code)
|
||||
self.assertIn('Location', response.headers)
|
||||
project = response.json()
|
||||
|
@ -356,6 +368,31 @@ class TestCase(testtools.TestCase):
|
|||
)
|
||||
return cell.json()
|
||||
|
||||
def create_network(
|
||||
self, name, cloud, region, cidr, gateway, netmask, variables=None
|
||||
):
|
||||
|
||||
url = self.url + '/v1/networks'
|
||||
payload = {
|
||||
'name': name,
|
||||
'cidr': cidr,
|
||||
'gateway': gateway,
|
||||
'netmask': netmask,
|
||||
'region_id': region['id'],
|
||||
'cloud_id': cloud['id'],
|
||||
}
|
||||
if variables:
|
||||
payload['variables'] = variables
|
||||
|
||||
network = self.post(url, data=payload)
|
||||
self.assertEqual(201, network.status_code)
|
||||
self.assertIn('Location', network.headers)
|
||||
self.assertEqual(
|
||||
network.headers['Location'],
|
||||
"{}/{}".format(url, network.json()['id'])
|
||||
)
|
||||
return network.json()
|
||||
|
||||
def create_host(self, name, cloud, region, hosttype, ip_address,
|
||||
parent_id=None, **variables):
|
||||
url = self.url + '/v1/hosts'
|
||||
|
|
|
@ -22,7 +22,7 @@ class APIV1HostTest(DeviceTestBase, APIV1ResourceWithVariablesTestCase):
|
|||
self.create_host('host2', 'server', '192.168.1.2', **vars2)
|
||||
|
||||
url = self.url + '/v1/hosts'
|
||||
resp = self.get(url, vars='a:b')
|
||||
resp = self.get(url, vars='a:"b"')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
hosts = resp.json()['hosts']
|
||||
self.assertEqual(2, len(hosts))
|
||||
|
@ -30,7 +30,7 @@ class APIV1HostTest(DeviceTestBase, APIV1ResourceWithVariablesTestCase):
|
|||
{host['ip_address'] for host in hosts})
|
||||
|
||||
url = self.url + '/v1/hosts'
|
||||
resp = self.get(url, vars='host:one')
|
||||
resp = self.get(url, vars='host:"one"')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
hosts = resp.json()['hosts']
|
||||
self.assertEqual(1, len(hosts))
|
||||
|
@ -352,7 +352,7 @@ class APIV1HostTest(DeviceTestBase, APIV1ResourceWithVariablesTestCase):
|
|||
region=region, **host_vars)
|
||||
url = self.url + '/v1/hosts'
|
||||
|
||||
resp = self.get(url, vars="foo:bar,baz:zoo")
|
||||
resp = self.get(url, vars='foo:"bar",baz:"zoo"')
|
||||
hosts = resp.json()['hosts']
|
||||
self.assertEqual(1, len(hosts))
|
||||
self.assertEqual(host2['id'], hosts[0]['id'])
|
||||
|
@ -367,7 +367,7 @@ class APIV1HostTest(DeviceTestBase, APIV1ResourceWithVariablesTestCase):
|
|||
region=region, **host_vars)
|
||||
url = self.url + '/v1/hosts'
|
||||
|
||||
resp = self.get(url, vars='foo:bar')
|
||||
resp = self.get(url, vars='foo:"bar"')
|
||||
hosts = resp.json()['hosts']
|
||||
self.assertEqual(2, len(hosts))
|
||||
self.assertListEqual(sorted([host1['id'], host2['id']]),
|
||||
|
@ -382,12 +382,12 @@ class APIV1HostTest(DeviceTestBase, APIV1ResourceWithVariablesTestCase):
|
|||
region=region)
|
||||
url = self.url + '/v1/hosts'
|
||||
|
||||
resp = self.get(url, vars='foo:baz')
|
||||
resp = self.get(url, vars='foo:"baz"')
|
||||
hosts = resp.json()['hosts']
|
||||
self.assertEqual(1, len(hosts))
|
||||
self.assertEqual(host1['id'], hosts[0]['id'])
|
||||
|
||||
resp = self.get(url, vars='foo:bar')
|
||||
resp = self.get(url, vars='foo:"bar"')
|
||||
hosts = resp.json()['hosts']
|
||||
self.assertEqual(1, len(hosts))
|
||||
self.assertEqual(host2['id'], hosts[0]['id'])
|
||||
|
@ -399,12 +399,12 @@ class APIV1HostTest(DeviceTestBase, APIV1ResourceWithVariablesTestCase):
|
|||
parent_id=host1['id'], baz='boo')
|
||||
url = self.url + '/v1/hosts'
|
||||
|
||||
resp = self.get(url, vars='baz:zoo')
|
||||
resp = self.get(url, vars='baz:"zoo"')
|
||||
hosts = resp.json()['hosts']
|
||||
self.assertEqual(1, len(hosts))
|
||||
self.assertEqual(host1['id'], hosts[0]['id'])
|
||||
|
||||
resp = self.get(url, vars='baz:boo')
|
||||
resp = self.get(url, vars='baz:"boo"')
|
||||
hosts = resp.json()['hosts']
|
||||
self.assertEqual(1, len(hosts))
|
||||
self.assertEqual(host2['id'], hosts[0]['id'])
|
||||
|
@ -417,7 +417,8 @@ class APIV1HostTest(DeviceTestBase, APIV1ResourceWithVariablesTestCase):
|
|||
# NOTE(thomasem): Unfortunately, we use resolved-values instead of
|
||||
# resolved_values, so we can't pass this in as kwargs to self.get(...),
|
||||
# see https://bugs.launchpad.net/craton/+bug/1672880.
|
||||
url = self.url + '/v1/hosts?resolved-values=false&vars=foo:bar,baz:zoo'
|
||||
url = self.url + \
|
||||
'/v1/hosts?resolved-values=false&vars=foo:"bar",baz:"zoo"'
|
||||
|
||||
resp = self.get(url)
|
||||
hosts = resp.json()['hosts']
|
||||
|
|
|
@ -0,0 +1,550 @@
|
|||
from craton import exceptions
|
||||
from craton.tests import functional
|
||||
|
||||
TEST_STRING = "I'm just a string"
|
||||
|
||||
TEST_ARRAY = [
|
||||
1,
|
||||
23.4,
|
||||
True,
|
||||
False,
|
||||
'false',
|
||||
TEST_STRING,
|
||||
{
|
||||
'bumbleywump': 'cucumberpatch',
|
||||
'literal_boolean': 'true'
|
||||
},
|
||||
['sub', 'array', True]
|
||||
]
|
||||
|
||||
TEST_DICT = {
|
||||
'foo': {
|
||||
'nested_string': 'Bumbleywump Cucumberpatch',
|
||||
'nested_bool': True,
|
||||
'nested_null': None,
|
||||
'nested_int': 1,
|
||||
'nested_float': 3.14,
|
||||
'nested_boolstr': 'false',
|
||||
'hyphenated-key': 'look-at-all-these-hyphens!',
|
||||
},
|
||||
'bar': TEST_ARRAY,
|
||||
'baz': 'zoo'
|
||||
}
|
||||
|
||||
|
||||
def _get_variables_for(name):
|
||||
return {
|
||||
'{}_dict'.format(name): TEST_DICT,
|
||||
'{}_array'.format(name): TEST_ARRAY,
|
||||
'{}_string'.format(name): TEST_STRING,
|
||||
}
|
||||
|
||||
|
||||
class JSONPathResolvedSearchTestCase(functional.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(JSONPathResolvedSearchTestCase, self).setUp()
|
||||
self.cloud = self.create_cloud(
|
||||
name='cloud1',
|
||||
variables=_get_variables_for('cloud1'),
|
||||
)
|
||||
self.region = self.create_region(
|
||||
name='region1',
|
||||
cloud=self.cloud,
|
||||
variables=_get_variables_for('region1'),
|
||||
)
|
||||
self.cell = self.create_cell(
|
||||
name='cell1',
|
||||
cloud=self.cloud,
|
||||
region=self.region,
|
||||
variables=_get_variables_for('cell1')
|
||||
)
|
||||
self.switches = []
|
||||
for i in range(2):
|
||||
name = 'netdev{}'.format(str(i))
|
||||
self.switches.append(self.create_network_device(
|
||||
name=name,
|
||||
cloud=self.cloud,
|
||||
region=self.region,
|
||||
cell=self.cell,
|
||||
device_type='switch',
|
||||
ip_address='192.168.{}.1'.format(i),
|
||||
**_get_variables_for(name)
|
||||
))
|
||||
|
||||
self.hosts = []
|
||||
for i in range(len(self.switches) * 3):
|
||||
name = 'host{}'.format(i)
|
||||
self.hosts.append(self.create_host(
|
||||
name=name,
|
||||
cloud=self.cloud,
|
||||
region=self.region,
|
||||
cell=self.cell,
|
||||
hosttype='server',
|
||||
ip_address='192.168.{}.2'.format(i),
|
||||
parent_id=self.switches[i % len(self.switches)]['id'],
|
||||
**_get_variables_for(name)
|
||||
))
|
||||
|
||||
def test_jsonpath_search_device_parent(self):
|
||||
url = self.url + '/v1/hosts'
|
||||
queries = [
|
||||
'netdev1_dict.foo."hyphenated-key":"look-at-all-these-hyphens!"',
|
||||
]
|
||||
expected_names = ['host1', 'host3', 'host5']
|
||||
|
||||
resp = self.get(url, vars=','.join(queries))
|
||||
hosts = resp.json()['hosts']
|
||||
parent_ids = set([h['parent_id'] for h in hosts])
|
||||
|
||||
self.assertEqual(3, len(hosts))
|
||||
self.assertEqual(1, len(parent_ids))
|
||||
self.assertEqual(self.switches[1]['id'], parent_ids.pop())
|
||||
self.assertListEqual(
|
||||
sorted(expected_names),
|
||||
sorted([h['name'] for h in hosts])
|
||||
)
|
||||
|
||||
def test_jsonpath_search_device_parent_override(self):
|
||||
url = self.url + '/v1/hosts'
|
||||
queries = [
|
||||
'netdev1_dict.foo."hyphenated-key":"look-at-all-these-hyphens!"',
|
||||
]
|
||||
variables_put = {
|
||||
'netdev1_dict': {
|
||||
'foo': {
|
||||
'hyphenated-key': 'look-at-all-these-hyphens'
|
||||
}
|
||||
}
|
||||
}
|
||||
self.put('{}/{}/variables'.format(url, self.hosts[3]['id']),
|
||||
data=variables_put)
|
||||
resp = self.get(url, vars=','.join(queries))
|
||||
hosts = resp.json()['hosts']
|
||||
parent_ids = set([h['parent_id'] for h in hosts])
|
||||
|
||||
self.assertEqual(2, len(hosts))
|
||||
self.assertEqual(1, len(parent_ids))
|
||||
self.assertEqual(self.switches[1]['id'], parent_ids.pop())
|
||||
|
||||
def test_jsonpath_search_device_child_vars_included(self):
|
||||
url = self.url + '/v1/hosts'
|
||||
queries = [
|
||||
'netdev1_dict.foo."hyphenated-key":"look-at-all-these-hyphens!"',
|
||||
]
|
||||
modified_id = self.hosts[0]['id']
|
||||
variables_put = {
|
||||
'netdev1_dict': {
|
||||
'foo': {
|
||||
'hyphenated-key': 'look-at-all-these-hyphens!'
|
||||
}
|
||||
}
|
||||
}
|
||||
self.put('{}/{}/variables'.format(url, modified_id),
|
||||
data=variables_put)
|
||||
expected_names = ['host0', 'host1', 'host3', 'host5']
|
||||
|
||||
resp = self.get(url, vars=','.join(queries))
|
||||
hosts = resp.json()['hosts']
|
||||
|
||||
self.assertEqual(4, len(hosts))
|
||||
self.assertListEqual(
|
||||
sorted(expected_names),
|
||||
sorted([h['name'] for h in hosts])
|
||||
)
|
||||
|
||||
def test_jsonpath_search_device_conjunctive_parent_vars(self):
|
||||
url = self.url + '/v1/hosts'
|
||||
queries = [
|
||||
'netdev1_dict.foo."hyphenated-key":"look-at-all-these-hyphens!"',
|
||||
'region1_array[2]:true',
|
||||
'cloud1_dict.bar[3]:false',
|
||||
]
|
||||
resp = self.get(url, vars=','.join(queries))
|
||||
hosts = resp.json()['hosts']
|
||||
parent_ids = set([h['parent_id'] for h in hosts])
|
||||
|
||||
self.assertEqual(3, len(hosts))
|
||||
self.assertEqual(1, len(parent_ids))
|
||||
self.assertEqual(self.switches[1]['id'], parent_ids.pop())
|
||||
|
||||
|
||||
class JSONPathSearchTestCaseMixin(object):
|
||||
|
||||
resource = '<resource>'
|
||||
|
||||
def get_resource_url(self):
|
||||
return '{}/v1/{}'.format(self.url, self.resource)
|
||||
|
||||
def setup_projects(self, projects):
|
||||
created = []
|
||||
for name, variables in projects:
|
||||
created.append(self.create_project(
|
||||
name=name,
|
||||
variables=variables
|
||||
))
|
||||
return created
|
||||
|
||||
def setup_clouds(self, clouds):
|
||||
created = []
|
||||
for name, variables in clouds:
|
||||
created.append(self.create_cloud(
|
||||
name=name,
|
||||
variables=variables
|
||||
))
|
||||
return created
|
||||
|
||||
def setup_regions(self, regions):
|
||||
created = []
|
||||
cloud = self.create_cloud(name='cloud1')
|
||||
for name, variables in regions:
|
||||
created.append(self.create_region(
|
||||
name=name,
|
||||
cloud=cloud,
|
||||
variables=variables
|
||||
))
|
||||
return created
|
||||
|
||||
def setup_cells(self, cells):
|
||||
created = []
|
||||
cloud = self.create_cloud(name='cloud1')
|
||||
region = self.create_region(
|
||||
name='region1',
|
||||
cloud=cloud
|
||||
)
|
||||
for name, variables in cells:
|
||||
created.append(self.create_cell(
|
||||
name=name,
|
||||
cloud=cloud,
|
||||
region=region,
|
||||
variables=variables
|
||||
))
|
||||
return created
|
||||
|
||||
def setup_networks(self, networks):
|
||||
created = []
|
||||
cloud = self.create_cloud(name='cloud1')
|
||||
region = self.create_region(
|
||||
name='region1',
|
||||
cloud=cloud
|
||||
)
|
||||
for name, variables in networks:
|
||||
created.append(self.create_network(
|
||||
name=name,
|
||||
cloud=cloud,
|
||||
region=region,
|
||||
cidr='192.168.0.0/24',
|
||||
gateway='192.168.0.1',
|
||||
netmask='255.255.255.0',
|
||||
variables=variables
|
||||
))
|
||||
return created
|
||||
|
||||
def setup_network_devices(self, network_devices):
|
||||
created = []
|
||||
cloud = self.create_cloud(name='cloud1')
|
||||
region = self.create_region(
|
||||
name='region1',
|
||||
cloud=cloud
|
||||
)
|
||||
for name, variables in network_devices:
|
||||
created.append(self.create_network_device(
|
||||
name=name,
|
||||
cloud=cloud,
|
||||
region=region,
|
||||
device_type='switch',
|
||||
ip_address='192.168.0.1',
|
||||
**variables
|
||||
))
|
||||
return created
|
||||
|
||||
def setup_hosts(self, hosts):
|
||||
created = []
|
||||
cloud = self.create_cloud(name='cloud1')
|
||||
region = self.create_region(
|
||||
name='region1',
|
||||
cloud=cloud
|
||||
)
|
||||
for name, variables in hosts:
|
||||
created.append(self.create_host(
|
||||
name=name,
|
||||
cloud=cloud,
|
||||
region=region,
|
||||
hosttype='server',
|
||||
ip_address='192.168.0.1',
|
||||
**variables
|
||||
))
|
||||
return created
|
||||
|
||||
def setup_resources(self, resources):
|
||||
setup_fn = {
|
||||
"projects": self.setup_projects,
|
||||
"clouds": self.setup_clouds,
|
||||
"regions": self.setup_regions,
|
||||
"cells": self.setup_cells,
|
||||
"networks": self.setup_networks,
|
||||
"network-devices": self.setup_network_devices,
|
||||
"hosts": self.setup_hosts,
|
||||
}
|
||||
return setup_fn[self.resource](resources)
|
||||
|
||||
def resources_from_response(self, resp):
|
||||
return resp.json()[self.resource.replace('-', '_')]
|
||||
|
||||
def get_resources(self, **params):
|
||||
headers = None
|
||||
if self.resource in ('projects',):
|
||||
headers = self.root_headers
|
||||
resp = self.get(self.get_resource_url(), headers=headers,
|
||||
details='all', **params)
|
||||
return resp
|
||||
|
||||
def test_jsonpath_search_nested_string(self):
|
||||
resources = (
|
||||
('resource1', {'foo': TEST_DICT}),
|
||||
('resource2', {'foo': {'baz': 'nope'}})
|
||||
)
|
||||
created = self.setup_resources(resources)
|
||||
|
||||
found = self.resources_from_response(self.get_resources(
|
||||
vars='foo.foo.nested_string:"Bumbleywump Cucumberpatch"'))
|
||||
|
||||
self.assertEqual(1, len(found))
|
||||
self.assertEqual(created[0]['id'], found[0]['id'])
|
||||
self.assertEqual(created[0]['variables'], found[0]['variables'])
|
||||
|
||||
def test_jsonpath_search_nested_string_wildcard(self):
|
||||
resources = (
|
||||
('resource1', {'foo': TEST_DICT}),
|
||||
('resource2', {'foo': {"baz": "zoom"}})
|
||||
)
|
||||
created = self.setup_resources(resources)
|
||||
|
||||
found = self.resources_from_response(
|
||||
self.get_resources(vars='foo.*:"zoo"'))
|
||||
|
||||
self.assertEqual(1, len(found))
|
||||
self.assertEqual(created[0]['id'], found[0]['id'])
|
||||
self.assertEqual(created[0]['variables'], found[0]['variables'])
|
||||
|
||||
def test_jsonpath_search_array_string(self):
|
||||
resources = (
|
||||
('resource1', {'foo': TEST_ARRAY}),
|
||||
('resource2', {'foo': TEST_ARRAY}),
|
||||
('resource3', {'foo': ["I'm just a string", 1, 2, 3, 4, 'foo']}),
|
||||
)
|
||||
created = self.setup_resources(resources)
|
||||
|
||||
found = self.resources_from_response(
|
||||
self.get_resources(vars='foo[5]:"I\'m just a string"'))
|
||||
|
||||
self.assertEqual(2, len(found))
|
||||
self.assertListEqual(sorted([c['id'] for c in created[:2]]),
|
||||
sorted([f['id'] for f in found]))
|
||||
|
||||
def test_jsonpath_search_array_string_wildcard(self):
|
||||
resources = (
|
||||
('resource1', {'foo': TEST_ARRAY}),
|
||||
('resource2', {'foo': TEST_ARRAY}),
|
||||
('resource3', {'foo': ["I'm just a string", True]}),
|
||||
('resource4', {'foo': ['Bumbleywump Cucumberpatch']}),
|
||||
)
|
||||
created = self.setup_resources(resources)
|
||||
|
||||
found = self.resources_from_response(
|
||||
self.get_resources(vars='foo[*]:"I\'m just a string"'))
|
||||
|
||||
self.assertEqual(3, len(found))
|
||||
self.assertListEqual(sorted([c['id'] for c in created[:3]]),
|
||||
sorted([f['id'] for f in found]))
|
||||
|
||||
def test_jsonpath_search_nested_array_string(self):
|
||||
resources = (
|
||||
('resource1', {'foo': TEST_DICT}),
|
||||
('resource2', {'foo': TEST_DICT}),
|
||||
('resource3', {'foo': {"bar": ["I'm just a string", True]}}),
|
||||
('resource4', {'foo': TEST_ARRAY}),
|
||||
)
|
||||
created = self.setup_resources(resources)
|
||||
|
||||
found = self.resources_from_response(
|
||||
self.get_resources(vars='foo.bar[*]:"I\'m just a string"'))
|
||||
|
||||
self.assertEqual(3, len(found))
|
||||
self.assertListEqual(sorted([c['id'] for c in created[:3]]),
|
||||
sorted([f['id'] for f in found]))
|
||||
|
||||
def test_jsonpath_search_nested_int(self):
|
||||
resources = (
|
||||
('resource1', {'foo': TEST_DICT}),
|
||||
('resource2', {'foo': {"foo": {"nested_int": "1"}}})
|
||||
)
|
||||
created = self.setup_resources(resources)
|
||||
|
||||
found = self.resources_from_response(
|
||||
self.get_resources(vars='foo.foo.nested_int:1'))
|
||||
|
||||
self.assertEqual(1, len(found))
|
||||
self.assertEqual(created[0]['id'], found[0]['id'])
|
||||
self.assertEqual(created[0]['variables'], found[0]['variables'])
|
||||
|
||||
def test_jsonpath_search_nested_float(self):
|
||||
resources = (
|
||||
('resource1', {'foo': TEST_DICT}),
|
||||
('resource2', {'foo': {"foo": {"nested_float": 3}}})
|
||||
)
|
||||
created = self.setup_resources(resources)
|
||||
|
||||
found = self.resources_from_response(
|
||||
self.get_resources(vars='foo.foo.nested_float:3.14'))
|
||||
|
||||
self.assertEqual(1, len(found))
|
||||
self.assertEqual(created[0]['id'], found[0]['id'])
|
||||
self.assertEqual(created[0]['variables'], found[0]['variables'])
|
||||
|
||||
def test_jsonpath_search_nested_bool(self):
|
||||
resources = (
|
||||
('resource1', {'foo': TEST_DICT}),
|
||||
('resource2', {'foo': {"foo": {"nested_bool": 'true'}}})
|
||||
)
|
||||
created = self.setup_resources(resources)
|
||||
|
||||
found = self.resources_from_response(
|
||||
self.get_resources(vars='foo.foo.nested_bool:true'))
|
||||
|
||||
self.assertEqual(1, len(found))
|
||||
self.assertEqual(created[0]['id'], found[0]['id'])
|
||||
self.assertEqual(created[0]['variables'], found[0]['variables'])
|
||||
|
||||
def test_jsonpath_search_nested_boolstr(self):
|
||||
resources = (
|
||||
('resource1', {'foo': TEST_DICT}),
|
||||
('resource2', {'foo': {"foo": {"nested_boolstr": False}}})
|
||||
)
|
||||
created = self.setup_resources(resources)
|
||||
|
||||
found = self.resources_from_response(
|
||||
self.get_resources(vars='foo.foo.nested_boolstr:"false"'))
|
||||
|
||||
self.assertEqual(1, len(found))
|
||||
self.assertEqual(created[0]['id'], found[0]['id'])
|
||||
self.assertEqual(created[0]['variables'], found[0]['variables'])
|
||||
|
||||
def test_jsonpath_search_nested_null(self):
|
||||
resources = (
|
||||
('resource1', {'foo': TEST_DICT}),
|
||||
('resource2', {'foo': {"foo": {"nested_null": 'test'}}})
|
||||
)
|
||||
created = self.setup_resources(resources)
|
||||
|
||||
found = self.resources_from_response(
|
||||
self.get_resources(vars='foo.foo.nested_null:null'))
|
||||
|
||||
self.assertEqual(1, len(found))
|
||||
self.assertEqual(created[0]['id'], found[0]['id'])
|
||||
self.assertEqual(created[0]['variables'], found[0]['variables'])
|
||||
|
||||
def test_jsonpath_search_hyphenated(self):
|
||||
resources = (
|
||||
('resource1', {'foo': TEST_DICT}),
|
||||
('resource2', {'foo': {"foo": {"hyphenated-key": 'test-test'}}})
|
||||
)
|
||||
created = self.setup_resources(resources)
|
||||
|
||||
found = self.resources_from_response(self.get_resources(
|
||||
vars='foo.foo."hyphenated-key":"look-at-all-these-hyphens!"'))
|
||||
|
||||
self.assertEqual(1, len(found))
|
||||
self.assertEqual(created[0]['id'], found[0]['id'])
|
||||
self.assertEqual(created[0]['variables'], found[0]['variables'])
|
||||
|
||||
def test_jsonpath_search_key_with_period(self):
|
||||
resources = (
|
||||
('resource1', {'v3.0': TEST_DICT}),
|
||||
('resource2', {'v3.0': {"foo": {"hyphenated-key": 'test-test'}}})
|
||||
)
|
||||
created = self.setup_resources(resources)
|
||||
|
||||
found = self.resources_from_response(self.get_resources(
|
||||
vars='"v3.0".foo."hyphenated-key":"look-at-all-these-hyphens!"'))
|
||||
|
||||
self.assertEqual(1, len(found))
|
||||
self.assertEqual(created[0]['id'], found[0]['id'])
|
||||
self.assertEqual(created[0]['variables'], found[0]['variables'])
|
||||
|
||||
def test_jsonpath_search_non_string_member(self):
|
||||
self.setup_resources((
|
||||
('resource1', {'v3.0': TEST_DICT}),
|
||||
))
|
||||
|
||||
resp = self.get_resources(
|
||||
vars='v3.0.foo."hyphenated-key":"look-at-all-these-hyphens!"')
|
||||
self.assertBadRequest(resp)
|
||||
self.assertEqual(exceptions.InvalidJSONPath.msg,
|
||||
resp.json()['message'])
|
||||
|
||||
def test_jsonpath_search_hyphenated_without_quotes(self):
|
||||
self.setup_resources((
|
||||
('resource1', {'v3.0': TEST_DICT}),
|
||||
))
|
||||
|
||||
resp = self.get_resources(
|
||||
vars='foo.hyphenated-key:"look-at-all-these-hyphens!"')
|
||||
self.assertBadRequest(resp)
|
||||
self.assertEqual(exceptions.InvalidJSONPath.msg,
|
||||
resp.json()['message'])
|
||||
|
||||
def test_jsonpath_search_invalid_first_key(self):
|
||||
self.setup_resources((
|
||||
('resource1', {'v3.0': TEST_DICT}),
|
||||
))
|
||||
|
||||
resp = self.get_resources(vars='[*]foo.bar:"string"')
|
||||
self.assertBadRequest(resp)
|
||||
self.assertEqual(exceptions.InvalidJSONPath.msg,
|
||||
resp.json()['message'])
|
||||
|
||||
def test_jsonpath_search_bad_json_string_value(self):
|
||||
self.setup_resources((
|
||||
('resource1', {'v3.0': TEST_DICT}),
|
||||
))
|
||||
|
||||
resp = self.get_resources(vars='foo.bar:string')
|
||||
self.assertBadRequest(resp)
|
||||
self.assertEqual(exceptions.InvalidJSONValue.msg,
|
||||
resp.json()['message'])
|
||||
|
||||
|
||||
class ProjectsJSONPathSearchTestCase(functional.TestCase,
|
||||
JSONPathSearchTestCaseMixin):
|
||||
resource = 'projects'
|
||||
|
||||
|
||||
class CloudsJSONPathSearchTestCase(functional.TestCase,
|
||||
JSONPathSearchTestCaseMixin):
|
||||
resource = 'clouds'
|
||||
|
||||
|
||||
class RegionsJSONPathSearchTestCase(functional.TestCase,
|
||||
JSONPathSearchTestCaseMixin):
|
||||
resource = 'regions'
|
||||
|
||||
|
||||
class CellsJSONPathSearchTestCase(functional.TestCase,
|
||||
JSONPathSearchTestCaseMixin):
|
||||
resource = 'cells'
|
||||
|
||||
|
||||
class NetworksJSONPathSearchTestCase(functional.TestCase,
|
||||
JSONPathSearchTestCaseMixin):
|
||||
resource = 'networks'
|
||||
|
||||
|
||||
class NetworkDevicesJSONPathSearchTestCase(functional.TestCase,
|
||||
JSONPathSearchTestCaseMixin):
|
||||
resource = 'network-devices'
|
||||
|
||||
|
||||
class HostsJSONPathSearchTestCase(functional.TestCase,
|
||||
JSONPathSearchTestCaseMixin):
|
||||
resource = 'hosts'
|
|
@ -1,30 +1,13 @@
|
|||
import copy
|
||||
|
||||
from craton.tests import functional
|
||||
from craton.tests.functional.test_variable_calls import \
|
||||
APIV1ResourceWithVariablesTestCase
|
||||
|
||||
|
||||
class ProjectTests(functional.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(ProjectTests, self).setUp()
|
||||
self.root_headers = copy.deepcopy(self.session.headers)
|
||||
self.root_headers[functional.HEADER_USERNAME] = \
|
||||
functional.FAKE_DATA_GEN_BOOTSTRAP_USERNAME
|
||||
self.root_headers[functional.HEADER_TOKEN] = \
|
||||
functional.FAKE_DATA_GEN_BOOTSTRAP_TOKEN
|
||||
|
||||
def tearDown(self):
|
||||
super(ProjectTests, self).tearDown()
|
||||
|
||||
|
||||
class TestPaginationOfProjects(ProjectTests):
|
||||
class TestPaginationOfProjects(functional.TestCase):
|
||||
def setUp(self):
|
||||
super(TestPaginationOfProjects, self).setUp()
|
||||
self.projects = [
|
||||
self.create_project('project-{}'.format(i),
|
||||
headers=self.root_headers)
|
||||
self.create_project('project-{}'.format(i))
|
||||
for i in range(0, 61)
|
||||
]
|
||||
|
||||
|
@ -38,7 +21,7 @@ class TestPaginationOfProjects(ProjectTests):
|
|||
self.assertEqual(30, len(projects))
|
||||
|
||||
def test_lists_projects_with_the_same_name(self):
|
||||
self.create_project('project-0', headers=self.root_headers)
|
||||
self.create_project('project-0')
|
||||
|
||||
response = self.get(self.url + '/v1/projects',
|
||||
name='project-0',
|
||||
|
@ -48,29 +31,26 @@ class TestPaginationOfProjects(ProjectTests):
|
|||
self.assertEqual(2, len(projects))
|
||||
|
||||
|
||||
class APIV1ProjectTest(ProjectTests, APIV1ResourceWithVariablesTestCase):
|
||||
class APIV1ProjectTest(APIV1ResourceWithVariablesTestCase):
|
||||
|
||||
resource = 'projects'
|
||||
|
||||
def test_project_create_with_variables(self):
|
||||
variables = {'a': 'b'}
|
||||
project_name = 'test'
|
||||
project = self.create_project(project_name,
|
||||
headers=self.root_headers,
|
||||
variables=variables)
|
||||
project = self.create_project(project_name, variables=variables)
|
||||
self.assertEqual(project_name, project['name'])
|
||||
self.assertEqual(variables, project['variables'])
|
||||
|
||||
def test_create_project_supports_vars_ops(self):
|
||||
project = self.create_project('test', headers=self.root_headers,
|
||||
variables={'a': 'b'})
|
||||
project = self.create_project('test', variables={'a': 'b'})
|
||||
self.assert_vars_get_expected(project['id'], {'a': 'b'})
|
||||
self.assert_vars_can_be_set(project['id'])
|
||||
self.assert_vars_can_be_deleted(project['id'])
|
||||
|
||||
def test_project_create_with_duplicate_name_works(self):
|
||||
project_name = 'test'
|
||||
self.create_project(project_name, headers=self.root_headers)
|
||||
self.create_project(project_name)
|
||||
url = self.url + '/v1/projects'
|
||||
payload = {'name': project_name}
|
||||
project = self.post(url, headers=self.root_headers, data=payload)
|
||||
|
@ -79,9 +59,9 @@ class APIV1ProjectTest(ProjectTests, APIV1ResourceWithVariablesTestCase):
|
|||
def test_project_get_all_with_name_filter(self):
|
||||
proj1 = 'test1'
|
||||
proj2 = 'test2'
|
||||
self.create_project(proj2, headers=self.root_headers)
|
||||
self.create_project(proj2)
|
||||
for i in range(3):
|
||||
self.create_project(proj1, headers=self.root_headers)
|
||||
self.create_project(proj1)
|
||||
url = self.url + '/v1/projects?name={}'.format(proj1)
|
||||
resp = self.get(url, headers=self.root_headers)
|
||||
projects = resp.json()['projects']
|
||||
|
@ -92,15 +72,14 @@ class APIV1ProjectTest(ProjectTests, APIV1ResourceWithVariablesTestCase):
|
|||
def test_get_project_details(self):
|
||||
project_name = 'test'
|
||||
project_vars = {"who": "that"}
|
||||
project = self.create_project(project_name, headers=self.root_headers,
|
||||
variables=project_vars)
|
||||
project = self.create_project(project_name, variables=project_vars)
|
||||
url = self.url + '/v1/projects/{}'.format(project['id'])
|
||||
project_with_detail = self.get(url, headers=self.root_headers)
|
||||
self.assertEqual(project_name, project_with_detail.json()['name'])
|
||||
self.assertEqual(project_vars, project_with_detail.json()['variables'])
|
||||
|
||||
def test_project_delete(self):
|
||||
project1 = self.create_project('test1', headers=self.root_headers)
|
||||
project1 = self.create_project('test1')
|
||||
url = self.url + '/v1/projects'
|
||||
projects = self.get(url, headers=self.root_headers)
|
||||
# NOTE(thomasem): Have to include the default project created by
|
||||
|
@ -115,7 +94,7 @@ class APIV1ProjectTest(ProjectTests, APIV1ResourceWithVariablesTestCase):
|
|||
|
||||
def test_project_variables_update(self):
|
||||
project_name = 'test'
|
||||
project = self.create_project(project_name, headers=self.root_headers)
|
||||
project = self.create_project(project_name)
|
||||
variables = {"bumbleywump": "cucumberpatch"}
|
||||
|
||||
put_url = self.url + '/v1/projects/{}/variables'.format(project['id'])
|
||||
|
@ -135,8 +114,7 @@ class APIV1ProjectTest(ProjectTests, APIV1ResourceWithVariablesTestCase):
|
|||
expected_vars = {'foo': 'bar'}
|
||||
variables.update(expected_vars)
|
||||
|
||||
project = self.create_project(project_name, headers=self.root_headers,
|
||||
variables=variables)
|
||||
project = self.create_project(project_name, variables=variables)
|
||||
self.assert_vars_get_expected(project['id'], variables)
|
||||
self.assert_vars_can_be_deleted(project['id'])
|
||||
|
||||
|
|
|
@ -1,22 +1,10 @@
|
|||
import copy
|
||||
|
||||
from craton.tests import functional
|
||||
|
||||
|
||||
class UserTests(functional.TestCase):
|
||||
def setUp(self):
|
||||
super(UserTests, self).setUp()
|
||||
self.root_headers = copy.deepcopy(self.session.headers)
|
||||
self.root_headers[functional.HEADER_USERNAME] = \
|
||||
functional.FAKE_DATA_GEN_BOOTSTRAP_USERNAME
|
||||
self.root_headers[functional.HEADER_TOKEN] = \
|
||||
functional.FAKE_DATA_GEN_BOOTSTRAP_TOKEN
|
||||
|
||||
def tearDown(self):
|
||||
super(UserTests, self).tearDown()
|
||||
|
||||
def test_create_user(self):
|
||||
project = self.create_project('test', headers=self.root_headers)
|
||||
project = self.create_project('test')
|
||||
url = self.url + '/v1/users'
|
||||
payload = {'username': 'testuser', 'project_id': project['id']}
|
||||
user = self.post(url, data=payload)
|
||||
|
@ -25,7 +13,7 @@ class UserTests(functional.TestCase):
|
|||
self.assertEqual(payload['project_id'], user.json()['project_id'])
|
||||
|
||||
def test_create_user_with_admin_priv(self):
|
||||
project = self.create_project('test', headers=self.root_headers)
|
||||
project = self.create_project('test')
|
||||
url = self.url + '/v1/users'
|
||||
payload = {'username': 'testuser', 'project_id': project['id'],
|
||||
'is_admin': True}
|
||||
|
|
|
@ -6,6 +6,7 @@ decorator>=3.4.0 # BSD
|
|||
Flask!=0.11,<1.0,>=0.10 # BSD
|
||||
Flask-RESTful>=0.3.5 # BSD
|
||||
jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
|
||||
jsonpath-rw>=1.2.0,<2.0 # Apache-2.0
|
||||
kazoo>=2.2 # Apache-2.0
|
||||
keystonemiddleware>=4.12.0 # Apache-2.0
|
||||
oslo.db>=4.15.0 # Apache-2.0
|
||||
|
|
|
@ -10,6 +10,7 @@ while [[ RET -ne 0 ]]; do
|
|||
RET=$?
|
||||
done
|
||||
|
||||
mysql -uroot -e "SET GLOBAL log_output = 'TABLE';SET GLOBAL general_log = 'ON';"
|
||||
mysql -uroot -e "CREATE DATABASE craton CHARACTER SET = 'utf8'"
|
||||
mysql -uroot -e "GRANT ALL PRIVILEGES ON craton.* TO 'craton'@'%' IDENTIFIED BY 'craton'"
|
||||
mysqladmin flush-privileges
|
||||
|
|
Loading…
Reference in New Issue