Remove API V1 Support Part-1

Also support the sqlalchemy 2.0 in backend.

Zaqar has be planing to remove the V1 APIs support for a while.
As the first step, this patch will remove the v1 funcational test
in wsgi.

Change-Id: I0b7b57a5fb4d203441889b0166a4c35180de2466
This commit is contained in:
hwang 2024-05-06 14:00:47 -07:00
parent 47f8c9b3da
commit cf5f9a6128
10 changed files with 65 additions and 1146 deletions

View File

@ -41,17 +41,17 @@ def _match(project, queue):
class CatalogueController(base.CatalogueBase):
def list(self, project):
stmt = sa.sql.select([tables.Catalogue]).where(
stmt = sa.sql.select(tables.Catalogue).where(
tables.Catalogue.c.project == project
)
cursor = self.driver.run(stmt)
cursor = self.driver.fetch_all(stmt)
return (_normalize(v) for v in cursor)
def get(self, project, queue):
stmt = sa.sql.select([tables.Catalogue]).where(
stmt = sa.sql.select(tables.Catalogue).where(
_match(project, queue)
)
entry = self.driver.run(stmt).fetchone()
entry = self.driver.fetch_one(stmt)
if entry is None:
raise errors.QueueNotMapped(queue, project)

View File

@ -47,10 +47,6 @@ class ControlDriver(storage.ControlDriverBase):
if (uri.startswith('mysql://') or
uri.startswith('mysql+pymysql://')):
# oslo_db.create_engine makes a test connection, throw that out
# first. mysql time_zone can be added to oslo_db as a
# startup option
engine.dispose()
sa.event.listen(engine, 'connect',
self._mysql_on_connect)
@ -65,7 +61,18 @@ class ControlDriver(storage.ControlDriverBase):
# closes it once the operations are completed
# TODO(wangxiyuan): we should migrate to oslo.db asap.
def run(self, *args, **kwargs):
return self.engine.execute(*args, **kwargs)
with self.engine.connect() as conn:
result = conn.execute(*args, **kwargs)
conn.commit()
return result
def fetch_all(self, *args, **kwargs):
with self.engine.connect() as conn:
return conn.execute(*args, **kwargs).fetchall()
def fetch_one(self, *args, **kwargs):
with self.engine.connect() as conn:
return conn.execute(*args, **kwargs).fetchone()
def close(self):
pass

View File

@ -39,14 +39,14 @@ class FlavorsController(base.FlavorsBase):
# TODO(cpp-cabrera): optimization - limit the columns returned
# when detailed=False by specifying them in the select()
# clause
stmt = sa.sql.select([tables.Flavors]).where(
stmt = sa.sql.select(tables.Flavors).where(
sa.and_(tables.Flavors.c.name > marker,
tables.Flavors.c.project == project)
)
if limit > 0:
stmt = stmt.limit(limit)
cursor = self.driver.run(stmt)
cursor = self.driver.fetch_all(stmt)
marker_name = {}
@ -60,12 +60,12 @@ class FlavorsController(base.FlavorsBase):
@utils.raises_conn_error
def get(self, name, project=None, detailed=False):
stmt = sa.sql.select([tables.Flavors]).where(
stmt = sa.sql.select(tables.Flavors).where(
sa.and_(tables.Flavors.c.name == name,
tables.Flavors.c.project == project)
)
flavor = self.driver.run(stmt).fetchone()
flavor = self.driver.fetch_one(stmt)
if flavor is None:
raise errors.FlavorDoesNotExist(name)
@ -90,11 +90,11 @@ class FlavorsController(base.FlavorsBase):
@utils.raises_conn_error
def exists(self, name, project=None):
stmt = sa.sql.select([tables.Flavors.c.name]).where(
stmt = sa.sql.select(tables.Flavors.c.name).where(
sa.and_(tables.Flavors.c.name == name,
tables.Flavors.c.project == project)
).limit(1)
return self.driver.run(stmt).fetchone() is not None
return self.driver.fetch_one(stmt) is not None
@utils.raises_conn_error
def update(self, name, project=None, capabilities=None):

View File

@ -38,15 +38,15 @@ class PoolsController(base.PoolsBase):
# TODO(cpp-cabrera): optimization - limit the columns returned
# when detailed=False by specifying them in the select()
# clause
stmt = sa.sql.select([tables.Pools.c.name, tables.Pools.c.uri,
tables.Pools.c.weight,
tables.Pools.c.options,
tables.Pools.c.flavor]).where(
stmt = sa.sql.select(tables.Pools.c.name, tables.Pools.c.uri,
tables.Pools.c.weight,
tables.Pools.c.options,
tables.Pools.c.flavor).where(
tables.Pools.c.name > marker
)
if limit > 0:
stmt = stmt.limit(limit)
cursor = self.driver.run(stmt)
cursor = self.driver.fetch_all(stmt)
marker_name = {}
@ -63,37 +63,38 @@ class PoolsController(base.PoolsBase):
flavor_name = flavor.get("name", None) if flavor is not None\
else None
if flavor_name is not None:
stmt = sa.sql.select([tables.Pools]).where(
stmt = sa.sql.select(tables.Pools.c.name, tables.Pools.c.uri,
tables.Pools.c.weight,
tables.Pools.c.options,
tables.Pools.c.flavor).where(
tables.Pools.c.flavor == flavor_name
)
else:
stmt = sa.sql.select([tables.Pools])
stmt = sa.sql.select(tables.Pools.c.name, tables.Pools.c.uri,
tables.Pools.c.weight,
tables.Pools.c.options,
tables.Pools.c.flavor)
cursor = self.driver.run(stmt)
cursor = self.driver.fetch_all(stmt)
normalizer = functools.partial(_normalize, detailed=detailed)
return (normalizer(v) for v in cursor)
get_result = (normalizer(v) for v in cursor)
return get_result
@utils.raises_conn_error
def _get(self, name, detailed=False):
stmt = sa.sql.select([tables.Pools]).where(
stmt = sa.sql.select(tables.Pools.c.name, tables.Pools.c.uri,
tables.Pools.c.weight, tables.Pools.c.options,
tables.Pools.c.flavor).where(
tables.Pools.c.name == name
)
pool = self.driver.run(stmt).fetchone()
pool = self.driver.fetch_one(stmt)
if pool is None:
raise errors.PoolDoesNotExist(name)
return _normalize(pool, detailed)
def _ensure_group_exists(self, name):
try:
stmt = sa.sql.expression.insert(tables.PoolGroup).values(name=name)
self.driver.run(stmt)
return True
except oslo_db.exception.DBDuplicateEntry:
return False
# TODO(cpp-cabrera): rename to upsert
@utils.raises_conn_error
def _create(self, name, weight, uri, flavor=None,
@ -101,7 +102,7 @@ class PoolsController(base.PoolsBase):
opts = None if options is None else utils.json_encode(options)
try:
stmt = sa.sql.expression.insert(tables.Pools).values(
stmt = sa.sql.insert(tables.Pools).values(
name=name, weight=weight, uri=uri,
flavor=flavor, options=opts
)
@ -115,10 +116,9 @@ class PoolsController(base.PoolsBase):
@utils.raises_conn_error
def _exists(self, name):
stmt = sa.sql.select([tables.Pools.c.name]).where(
tables.Pools.c.name == name
).limit(1)
return self.driver.run(stmt).fetchone() is not None
stmt = sa.sql.select(tables.Pools.c.name).where(
tables.Pools.c.name == name).limit(1)
return self.driver.fetch_one(stmt) is not None
@utils.raises_conn_error
def _update(self, name, **kwargs):

View File

@ -30,19 +30,20 @@ class QueueController(storage.Queue):
if project is None:
project = ''
fields = [tables.Queues.c.name]
fields = tables.Queues.c.name
if detailed:
fields.append(tables.Queues.c.metadata)
fields = tables.Queues.c["name", "metadata"]
if marker:
sel = sa.sql.select(fields, sa.and_(
tables.Queues.c.project == project,
tables.Queues.c.name > marker))
sel = sa.sql.select(fields).where(sa.and_(
tables.Queues.c.project == project,
tables.Queues.c.name > marker))
else:
sel = sa.sql.select(fields, tables.Queues.c.project == project)
sel = sa.sql.select(fields).where(
tables.Queues.c.project == project)
sel = sel.order_by(sa.asc(tables.Queues.c.name)).limit(limit)
records = self.driver.run(sel)
records = self.driver.fetch_all(sel)
marker_name = {}
@ -60,12 +61,10 @@ class QueueController(storage.Queue):
if project is None:
project = ''
sel = sa.sql.select([tables.Queues.c.metadata], sa.and_(
tables.Queues.c.project == project,
tables.Queues.c.name == name
))
sel = sa.sql.select(tables.Queues.c.metadata).where(sa.and_(
tables.Queues.c.project == project, tables.Queues.c.name == name))
queue = self.driver.run(sel).fetchone()
queue = self.driver.fetch_one(sel)
if queue is None:
raise errors.QueueDoesNotExist(name, project)
@ -96,14 +95,10 @@ class QueueController(storage.Queue):
if project is None:
project = ''
sel = sa.sql.select([tables.Queues.c.id], sa.and_(
tables.Queues.c.project == project,
tables.Queues.c.name == name
))
res = self.driver.run(sel)
r = res.fetchone()
res.close()
return r is not None
sel = sa.sql.select(tables.Queues.c.id).where(sa.and_(
tables.Queues.c.project == project, tables.Queues.c.name == name))
res = self.driver.fetch_one(sel)
return res is not None
def set_metadata(self, name, metadata, project):
if project is None:
@ -138,9 +133,7 @@ class QueueController(storage.Queue):
def _calculate_resource_count(self, project=None):
if project is None:
project = ''
sel = sa.sql.select([sa.sql.func.count('*')],
tables.Queues.c.project == project)
res = self.driver.run(sel)
r = res.fetchone()
res.close()
return r is not None
sel = sa.sql.select(sa.sql.func.count('*')).where(
tables.Queues.c.project == project)
res = self.driver.fetch_one(sel)
return res is not None

View File

@ -21,7 +21,6 @@ import os
import jsonschema
from oslo_utils import timeutils
from zaqar.api.v1 import response as response_v1
from zaqar.api.v1_1 import response as response_v1_1
from zaqar.api.v2 import response as response_v2
from zaqar import bootstrap
@ -387,12 +386,6 @@ class ZaqarAdminServer(Server):
return server.run
class V1FunctionalTestBase(FunctionalTestBase):
def setUp(self):
super(V1FunctionalTestBase, self).setUp()
self.response = response_v1.ResponseSchema(self.limits)
class V1_1FunctionalTestBase(FunctionalTestBase):
def setUp(self):
super(V1_1FunctionalTestBase, self).setUp()

View File

@ -1,259 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import ddt
from zaqar.tests.functional import base
from zaqar.tests.functional import helpers
@ddt.ddt
class TestClaims(base.V1FunctionalTestBase):
"""Tests for Claims."""
server_class = base.ZaqarServer
def setUp(self):
super(TestClaims, self).setUp()
self.queue = uuid.uuid1()
self.queue_url = ("{url}/{version}/queues/{queue}".format(
url=self.cfg.zaqar.url,
version="v1",
queue=self.queue))
self.client.put(self.queue_url)
self.claim_url = self.queue_url + '/claims'
self.client.set_base_url(self.claim_url)
# Post Messages
url = self.queue_url + '/messages'
doc = helpers.create_message_body(
messagecount=self.limits.max_messages_per_page)
for i in range(10):
result = self.client.post(url, data=doc)
self.assertEqual(201, result.status_code)
@ddt.data({}, dict(limit=2))
def test_claim_messages(self, params):
"""Claim messages."""
message_count = params.get('limit',
self.limits.max_messages_per_claim_or_pop)
doc = {"ttl": 300, "grace": 100}
result = self.client.post(params=params, data=doc)
self.assertEqual(201, result.status_code)
actual_message_count = len(result.json())
self.assertMessageCount(actual_message_count, message_count)
response_headers = set(result.headers.keys())
self.assertIsSubset(self.headers_response_with_body, response_headers)
self.assertSchema(result.json(), 'claim_create')
test_claim_messages.tags = ['smoke', 'positive']
def test_query_claim(self):
"""Query Claim."""
params = {'limit': 1}
doc = {"ttl": 300, "grace": 100}
result = self.client.post(params=params, data=doc)
self.assertEqual(201, result.status_code)
location = result.headers['Location']
url = self.cfg.zaqar.url + location
result = self.client.get(url)
self.assertEqual(200, result.status_code)
self.assertSchema(result.json(), 'claim_get')
test_query_claim.tags = ['smoke', 'positive']
def test_claim_more_than_allowed(self):
"""Claim more than max allowed per request.
Zaqar allows a maximum of 20 messages per claim by default.
"""
params = {"limit": self.limits.max_messages_per_claim_or_pop + 1}
doc = {"ttl": 300, "grace": 100}
result = self.client.post(params=params, data=doc)
self.assertEqual(400, result.status_code)
test_claim_more_than_allowed.tags = ['negative']
def test_claim_patch(self):
"""Update Claim."""
# Test Setup - Post Claim
doc = {"ttl": 300, "grace": 400}
result = self.client.post(data=doc)
self.assertEqual(201, result.status_code)
# Patch Claim
claim_location = result.headers['Location']
url = self.cfg.zaqar.url + claim_location
doc_updated = {"ttl": 300}
result = self.client.patch(url, data=doc_updated)
self.assertEqual(204, result.status_code)
# verify that the claim TTL is updated
result = self.client.get(url)
new_ttl = result.json()['ttl']
self.assertEqual(300, new_ttl)
test_claim_patch.tags = ['smoke', 'positive']
def test_delete_claimed_message(self):
"""Delete message belonging to a Claim."""
# Test Setup - Post claim
doc = {"ttl": 60, "grace": 60}
result = self.client.post(data=doc)
self.assertEqual(201, result.status_code)
# Delete Claimed Messages
for rst in result.json():
href = rst['href']
url = self.cfg.zaqar.url + href
result = self.client.delete(url)
self.assertEqual(204, result.status_code)
test_delete_claimed_message.tags = ['smoke', 'positive']
def test_claim_release(self):
"""Release Claim."""
doc = {"ttl": 300, "grace": 100}
result = self.client.post(data=doc)
self.assertEqual(201, result.status_code)
# Extract claim location and construct the claim URL.
location = result.headers['Location']
url = self.cfg.zaqar.url + location
# Release Claim.
result = self.client.delete(url)
self.assertEqual(204, result.status_code)
test_claim_release.tags = ['smoke', 'positive']
@ddt.data(10000000000000000000, -100, 1, 59, 43201, -10000000000000000000)
def test_claim_invalid_ttl(self, ttl):
"""Post Claim with invalid TTL.
The request JSON body will have a TTL value
outside the allowed range.Allowed ttl values is
60 <= ttl <= 43200.
"""
doc = {"ttl": ttl, "grace": 100}
result = self.client.post(data=doc)
self.assertEqual(400, result.status_code)
test_claim_invalid_ttl.tags = ['negative']
@ddt.data(10000000000000000000, -100, 1, 59, 43201, -10000000000000000000)
def test_claim_invalid_grace(self, grace):
"""Post Claim with invalid grace.
The request JSON body will have a grace value
outside the allowed range.Allowed grace values is
60 <= grace <= 43200.
"""
doc = {"ttl": 100, "grace": grace}
result = self.client.post(data=doc)
self.assertEqual(400, result.status_code)
test_claim_invalid_grace.tags = ['negative']
@ddt.data(0, -100, 30, 10000000000000000000)
def test_claim_invalid_limit(self, grace):
"""Post Claim with invalid limit.
The request url will have a limit outside the allowed range.
Allowed limit values are 0 < limit <= 20(default max).
"""
doc = {"ttl": 100, "grace": grace}
result = self.client.post(data=doc)
self.assertEqual(400, result.status_code)
test_claim_invalid_limit.tags = ['negative']
@ddt.data(10000000000000000000, -100, 1, 59, 43201, -10000000000000000000)
def test_patch_claim_invalid_ttl(self, ttl):
"""Patch Claim with invalid TTL.
The request JSON body will have a TTL value
outside the allowed range.Allowed ttl values is
60 <= ttl <= 43200.
"""
doc = {"ttl": 100, "grace": 100}
result = self.client.post(data=doc)
self.assertEqual(201, result.status_code)
# Extract claim location and construct the claim URL.
location = result.headers['Location']
url = self.cfg.zaqar.url + location
# Patch Claim.
doc = {"ttl": ttl}
result = self.client.patch(url, data=doc)
self.assertEqual(400, result.status_code)
test_patch_claim_invalid_ttl.tags = ['negative']
def test_query_non_existing_claim(self):
"""Query Non Existing Claim."""
path = '/non-existing-claim'
result = self.client.get(path)
self.assertEqual(404, result.status_code)
test_query_non_existing_claim.tags = ['negative']
def test_patch_non_existing_claim(self):
"""Patch Non Existing Claim."""
path = '/non-existing-claim'
doc = {"ttl": 400}
result = self.client.patch(path, data=doc)
self.assertEqual(404, result.status_code)
test_patch_non_existing_claim.tags = ['negative']
def test_delete_non_existing_claim(self):
"""Patch Non Existing Claim."""
path = '/non-existing-claim'
result = self.client.delete(path)
self.assertEqual(204, result.status_code)
test_delete_non_existing_claim.tags = ['negative']
def tearDown(self):
"""Delete Queue after Claim Test."""
super(TestClaims, self).tearDown()
self.client.delete(self.queue_url)

View File

@ -1,379 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import ddt
from oslo_serialization import jsonutils
from zaqar.tests.functional import base
from zaqar.tests.functional import helpers
@ddt.ddt
class TestMessages(base.V1FunctionalTestBase):
"""Tests for Messages."""
server_class = base.ZaqarServer
def setUp(self):
super(TestMessages, self).setUp()
self.queue = uuid.uuid1()
self.queue_url = ("{url}/{version}/queues/{queue}".format(
url=self.cfg.zaqar.url,
version="v1",
queue=self.queue))
self.client.put(self.queue_url)
self.message_url = self.queue_url + '/messages'
self.client.set_base_url(self.message_url)
def tearDown(self):
self.client.delete(self.queue_url)
super(TestMessages, self).tearDown()
def _post_large_bulk_insert(self, offset):
"""Insert just under than max allowed messages."""
message1 = {"body": '', "ttl": 300}
message2 = {"body": '', "ttl": 120}
doc = [message1, message2]
overhead = len(jsonutils.dumps(doc))
half_size = (self.limits.max_messages_post_size - overhead) // 2
message1['body'] = helpers.generate_random_string(half_size)
message2['body'] = helpers.generate_random_string(half_size + offset)
return self.client.post(data=doc)
def test_message_single_insert(self):
"""Insert Single Message into the Queue.
This test also verifies that claimed messages are
retuned (or not) depending on the include_claimed flag.
"""
doc = helpers.create_message_body(messagecount=1)
result = self.client.post(data=doc)
self.assertEqual(201, result.status_code)
response_headers = set(result.headers.keys())
self.assertIsSubset(self.headers_response_with_body, response_headers)
# GET on posted message
href = result.json()['resources'][0]
url = self.cfg.zaqar.url + href
result = self.client.get(url)
self.assertEqual(200, result.status_code)
# Compare message metadata
result_body = result.json()['body']
posted_metadata = doc[0]['body']
self.assertEqual(posted_metadata, result_body)
# Post a claim & verify the include_claimed flag.
url = self.queue_url + '/claims'
doc = {"ttl": 300, "grace": 100}
result = self.client.post(url, data=doc)
self.assertEqual(201, result.status_code)
params = {'include_claimed': True,
'echo': True}
result = self.client.get(params=params)
self.assertEqual(200, result.status_code)
response_message_body = result.json()["messages"][0]["body"]
self.assertEqual(posted_metadata, response_message_body)
# By default, include_claimed = false
result = self.client.get(self.message_url)
self.assertEqual(204, result.status_code)
test_message_single_insert.tags = ['smoke', 'positive']
def test_message_bulk_insert(self):
"""Bulk Insert Messages into the Queue."""
message_count = self.limits.max_messages_per_page
doc = helpers.create_message_body(messagecount=message_count)
result = self.client.post(data=doc)
self.assertEqual(201, result.status_code)
# GET on posted messages
location = result.headers['location']
url = self.cfg.zaqar.url + location
result = self.client.get(url)
self.assertEqual(200, result.status_code)
self.skipTest('Bug #1273335 - Get set of messages returns wrong hrefs '
'(happens randomly)')
# Verify that the response json schema matches the expected schema
self.assertSchema(result.json(), 'message_get_many')
# Compare message metadata
result_body = [result.json()[i]['body']
for i in range(len(result.json()))]
result_body.sort()
posted_metadata = [doc[i]['body']
for i in range(message_count)]
posted_metadata.sort()
self.assertEqual(posted_metadata, result_body)
test_message_bulk_insert.tags = ['smoke', 'positive']
@ddt.data({}, {'limit': 5})
def test_get_message(self, params):
"""Get Messages."""
expected_msg_count = params.get('limit', 10)
# Test Setup
doc = helpers.create_message_body(
messagecount=self.limits.max_messages_per_page)
result = self.client.post(data=doc)
self.assertEqual(201, result.status_code)
url = ''
params['echo'] = True
# Follow the hrefs & perform GET, till the end of messages i.e. http
# 204
while result.status_code in [201, 200]:
result = self.client.get(url, params=params)
self.assertIn(result.status_code, [200, 204])
if result.status_code == 200:
actual_msg_count = len(result.json()['messages'])
self.assertMessageCount(actual_msg_count, expected_msg_count)
self.assertSchema(result.json(), 'message_list')
href = result.json()['links'][0]['href']
url = self.cfg.zaqar.url + href
self.assertEqual(204, result.status_code)
test_get_message.tags = ['smoke', 'positive']
def test_message_delete(self):
"""Delete Message."""
# Test Setup
doc = helpers.create_message_body(messagecount=1)
result = self.client.post(data=doc)
self.assertEqual(201, result.status_code)
# Delete posted message
href = result.json()['resources'][0]
url = self.cfg.zaqar.url + href
result = self.client.delete(url)
self.assertEqual(204, result.status_code)
result = self.client.get(url)
self.assertEqual(404, result.status_code)
test_message_delete.tags = ['smoke', 'positive']
def test_message_bulk_delete(self):
"""Bulk Delete Messages."""
doc = helpers.create_message_body(messagecount=10)
result = self.client.post(data=doc)
self.assertEqual(201, result.status_code)
# Delete posted messages
location = result.headers['Location']
url = self.cfg.zaqar.url + location
result = self.client.delete(url)
self.assertEqual(204, result.status_code)
result = self.client.get(url)
self.assertEqual(204, result.status_code)
test_message_bulk_delete.tags = ['smoke', 'positive']
def test_message_delete_nonexisting(self):
"""Delete non-existing Messages."""
result = self.client.delete('/non-existing')
self.assertEqual(204, result.status_code)
test_message_delete_nonexisting.tags = ['negative']
def test_message_partial_delete(self):
"""Delete Messages will be partially successful."""
doc = helpers.create_message_body(messagecount=3)
result = self.client.post(data=doc)
self.assertEqual(201, result.status_code)
# Delete posted message
location = result.headers['Location']
url = self.cfg.zaqar.url + location
url += ',nonexisting'
result = self.client.delete(url)
self.assertEqual(204, result.status_code)
test_message_partial_delete.tags = ['negative']
def test_message_partial_get(self):
"""Get Messages will be partially successful."""
doc = helpers.create_message_body(messagecount=3)
result = self.client.post(data=doc)
self.assertEqual(201, result.status_code)
# Get posted message and a nonexisting message
location = result.headers['Location']
url = self.cfg.zaqar.url + location
url += ',nonexisting'
result = self.client.get(url)
self.assertEqual(200, result.status_code)
self.assertSchema(result.json(), "message_get_many")
test_message_partial_get.tags = ['negative']
@ddt.data(-10, -1, 0)
def test_message_bulk_insert_large_bodies(self, offset):
"""Insert just under than max allowed messages."""
result = self._post_large_bulk_insert(offset)
self.assertEqual(201, result.status_code)
test_message_bulk_insert_large_bodies.tags = ['positive']
@ddt.data(1, 10)
def test_message_bulk_insert_large_bodies_(self, offset):
"""Insert just under than max allowed messages."""
result = self._post_large_bulk_insert(offset)
self.assertEqual(400, result.status_code)
test_message_bulk_insert_large_bodies_.tags = ['negative']
def test_message_bulk_insert_oversized(self):
"""Insert more than max allowed size."""
doc = '[{{"body": "{0}", "ttl": 300}}, {{"body": "{1}", "ttl": 120}}]'
overhead = len(doc.format('', ''))
half_size = (self.limits.max_messages_post_size - overhead) // 2
doc = doc.format(helpers.generate_random_string(half_size),
helpers.generate_random_string(half_size + 1))
result = self.client.post(data=doc)
self.assertEqual(400, result.status_code)
test_message_bulk_insert_oversized.tags = ['negative']
@ddt.data(10000000000000000000, -100, 0, 30, -10000000000000000000)
def test_message_get_invalid_limit(self, limit):
"""Get Messages with invalid value for limit.
Allowed values for limit are 0 < limit <= 20(configurable).
"""
params = {'limit': limit}
result = self.client.get(params=params)
self.assertEqual(400, result.status_code)
test_message_get_invalid_limit.tags = ['negative']
def test_message_bulk_delete_negative(self):
"""Delete more messages than allowed in a single request.
By default, max messages that can be deleted in a single
request is 20.
"""
url = (self.message_url + '?ids=' +
','.join(str(i) for i in
range(self.limits.max_messages_per_page + 1)))
result = self.client.delete(url)
self.assertEqual(400, result.status_code)
test_message_bulk_delete_negative.tags = ['negative']
def test_message_bulk_get_negative(self):
"""GET more messages by id than allowed in a single request.
By default, max messages that can be fetched in a single
request is 20.
"""
url = (self.message_url + '?ids=' +
','.join(str(i) for i in
range(self.limits.max_messages_per_page + 1)))
result = self.client.get(url)
self.assertEqual(400, result.status_code)
test_message_bulk_get_negative.tags = ['negative']
def test_get_messages_malformed_marker(self):
"""Get messages with non-existing marker."""
url = self.message_url + '?marker=invalid'
result = self.client.get(url)
self.assertEqual(204, result.status_code)
test_get_messages_malformed_marker.tags = ['negative']
@ddt.data(None, '1234', 'aa2-bb3',
'103e09c6-31b7-11e3-86bc-b8ca3ad0f5d81',
'103e09c6-31b7-11e3-86bc-b8ca3ad0f5d')
def test_get_messages_invalid_client_id(self, client_id):
"""Get messages with invalid client id."""
url = self.message_url
header = helpers.create_zaqar_headers(self.cfg)
header['Client-ID'] = client_id
result = self.client.get(url, headers=header)
self.assertEqual(400, result.status_code)
test_get_messages_invalid_client_id.tags = ['negative']
def test_query_non_existing_message(self):
"""Get Non Existing Message."""
path = '/non-existing-message'
result = self.client.get(path)
self.assertEqual(404, result.status_code)
test_query_non_existing_message.tags = ['negative']
def test_query_non_existing_message_set(self):
"""Get Set of Non Existing Messages."""
path = '?ids=not_there1,not_there2'
result = self.client.get(path)
self.assertEqual(204, result.status_code)
test_query_non_existing_message_set.tags = ['negative']
def test_delete_non_existing_message(self):
"""Delete Non Existing Message."""
path = '/non-existing-message'
result = self.client.delete(path)
self.assertEqual(204, result.status_code)
test_delete_non_existing_message.tags = ['negative']

View File

@ -1,436 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import ddt
from zaqar.tests.functional import base # noqa
from zaqar.tests.functional import helpers
class NamedBinaryStr(bytes):
"""Wrapper for bytes to facilitate overriding __name__."""
class NamedUnicodeStr(str):
"""Unicode string look-alike to facilitate overriding __name__."""
def __init__(self, value):
self._value = value
def __str__(self):
return self._value
def encode(self, enc):
return self._value.encode(enc)
def __format__(self, formatstr):
"""Workaround for ddt bug.
DDT will always call __format__ even when __name__ exists,
which blows up for Unicode strings under Py2.
"""
return ''
class NamedDict(dict):
"""Wrapper for dict to facilitate overriding __name__."""
def annotated(test_name, test_input):
if isinstance(test_input, dict):
annotated_input = NamedDict(test_input)
elif isinstance(test_input, str):
annotated_input = NamedUnicodeStr(test_input)
else:
annotated_input = NamedBinaryStr(test_input)
setattr(annotated_input, '__name__', test_name)
return annotated_input
@ddt.ddt
class TestInsertQueue(base.V1FunctionalTestBase):
"""Tests for Insert queue."""
server_class = base.ZaqarServer
def setUp(self):
super(TestInsertQueue, self).setUp()
self.base_url = '{0}/{1}'.format(self.cfg.zaqar.url,
"v1")
self.header = helpers.create_zaqar_headers(self.cfg)
self.headers_response_empty = {'location'}
self.client.set_base_url(self.base_url)
self.header = helpers.create_zaqar_headers(self.cfg)
@ddt.data('qtestqueue', 'TESTqueue', 'hyphen-name', '_undersore',
annotated('test_insert_queue_long_name', 'i' * 64))
def test_insert_queue(self, queue_name):
"""Create Queue."""
self.url = self.base_url + '/queues/' + queue_name
self.addCleanup(self.client.delete, self.url)
result = self.client.put(self.url)
self.assertEqual(201, result.status_code)
response_headers = set(result.headers.keys())
self.assertIsSubset(self.headers_response_empty, response_headers)
self.url = self.url + '/metadata'
result = self.client.get(self.url)
self.assertEqual(200, result.status_code)
self.assertEqual({}, result.json())
test_insert_queue.tags = ['positive', 'smoke']
@ddt.data(annotated('test_insert_queue_non_ascii_name',
u'\u6c49\u5b57\u6f22\u5b57'),
'@$@^qw',
annotated('test_insert_queue_invalid_name_length', 'i' * 65))
def test_insert_queue_invalid_name(self, queue_name):
"""Create Queue."""
self.url = self.base_url + '/queues/' + queue_name
self.addCleanup(self.client.delete, self.url)
result = self.client.put(self.url)
self.assertEqual(400, result.status_code)
self.url = self.url + '/metadata'
result = self.client.get(self.url)
self.assertEqual(400, result.status_code)
test_insert_queue_invalid_name.tags = ['negative']
def test_insert_queue_header_plaintext(self):
"""Insert Queue with 'Accept': 'plain/text'."""
path = '/queues/plaintextheader'
self.addCleanup(self.client.delete, path)
header = {"Accept": 'plain/text'}
result = self.client.put(path, headers=header)
self.assertEqual(406, result.status_code)
test_insert_queue_header_plaintext.tags = ['negative']
def test_insert_queue_header_asterisk(self):
"""Insert Queue with 'Accept': '*/*'."""
path = '/queues/asteriskinheader'
headers = {"Accept": '*/*'}
self.addCleanup(self.client.delete, url=path, headers=headers)
result = self.client.put(path, headers=headers)
self.assertEqual(201, result.status_code)
test_insert_queue_header_asterisk.tags = ['positive']
def test_insert_queue_with_metadata(self):
"""Insert queue with a non-empty request body."""
self.url = self.base_url + '/queues/hasmetadata'
doc = {"queue": "Has Metadata"}
self.addCleanup(self.client.delete, self.url)
result = self.client.put(self.url, data=doc)
self.assertEqual(201, result.status_code)
self.url = self.base_url + '/queues/hasmetadata/metadata'
result = self.client.get(self.url)
self.assertEqual(200, result.status_code)
self.assertEqual({}, result.json())
test_insert_queue_with_metadata.tags = ['negative']
def tearDown(self):
super(TestInsertQueue, self).tearDown()
@ddt.ddt
class TestQueueMetaData(base.V1FunctionalTestBase):
"""Tests for queue metadata."""
server_class = base.ZaqarServer
def setUp(self):
super(TestQueueMetaData, self).setUp()
self.base_url = '{0}/{1}'.format(self.cfg.zaqar.url,
"v1")
self.queue_url = self.base_url + '/queues/{0}'.format(uuid.uuid1())
self.client.put(self.queue_url)
self.queue_metadata_url = self.queue_url + '/metadata'
self.client.set_base_url(self.queue_metadata_url)
@ddt.data({},
{'@queue': 'Top Level field with @'},
annotated('test_insert_queue_metadata_unicode', {
u'\u6c49\u5b57': u'Unicode: \u6c49\u5b57'
}),
{'queue': '#$%^&Apple'},
annotated('test_insert_queue_metadata_huge',
{"queue": "i" * 65000}))
def test_insert_queue_metadata(self, doc):
"""Insert Queue with empty json."""
result = self.client.put(data=doc)
self.assertEqual(204, result.status_code)
result = self.client.get()
self.assertEqual(200, result.status_code)
doc_decoded = {}
for k, v in doc.items():
if isinstance(k, bytes):
k = k.decode('utf-8')
if isinstance(v, bytes):
v = v.decode('utf-8')
doc_decoded[k] = v
self.assertEqual(result.json(), doc_decoded)
test_insert_queue_metadata.tags = ['smoke', 'positive']
@ddt.data('not_a_dict',
annotated('test_insert_queue_invalid_metadata_huge',
{"queue": "i" * 65537}))
def test_insert_queue_invalid_metadata(self, doc):
"""Insert invalid metadata."""
result = self.client.put(data=doc)
self.assertEqual(400, result.status_code)
test_insert_queue_invalid_metadata.tags = ['negative']
def tearDown(self):
super(TestQueueMetaData, self).tearDown()
self.client.delete(self.queue_url)
@ddt.ddt
class TestQueueMisc(base.V1FunctionalTestBase):
server_class = base.ZaqarServer
def setUp(self):
super(TestQueueMisc, self).setUp()
self.base_url = self.cfg.zaqar.url
self.client.set_base_url(self.base_url)
self.queue_url = (self.base_url + '/{0}/queues/{1}'
.format("v1", uuid.uuid1()))
def test_list_queues(self):
"""List Queues."""
self.client.put(self.queue_url)
self.addCleanup(self.client.delete, self.queue_url)
result = self.client.get('/{0}/queues'
.format('v1'))
self.assertEqual(200, result.status_code)
self.assertSchema(result.json(), 'queue_list')
test_list_queues.tags = ['smoke', 'positive']
def test_list_queues_detailed(self):
"""List Queues with detailed = True."""
self.client.put(self.queue_url)
self.addCleanup(self.client.delete, self.queue_url)
params = {'detailed': True}
result = self.client.get('/{0}/queues'
.format("v1"),
params=params)
self.assertEqual(200, result.status_code)
self.assertSchema(result.json(), 'queue_list')
response_keys = result.json()['queues'][0].keys()
self.assertIn('metadata', response_keys)
test_list_queues_detailed.tags = ['smoke', 'positive']
@ddt.data(0, -1, 1001)
def test_list_queue_invalid_limit(self, limit):
"""List Queues with a limit value that is not allowed."""
params = {'limit': limit}
result = self.client.get('/{0}/queues'
.format("v1"),
params=params)
self.assertEqual(400, result.status_code)
test_list_queue_invalid_limit.tags = ['negative']
def test_check_health(self):
"""Test health endpoint."""
result = self.client.get('/{0}/health'
.format("v1"))
self.assertEqual(204, result.status_code)
test_check_health.tags = ['positive']
def test_check_queue_exists(self):
"""Checks if queue exists."""
self.client.put(self.queue_url)
self.addCleanup(self.client.delete, self.queue_url)
result = self.client.get(self.queue_url)
self.assertEqual(204, result.status_code)
result = self.client.head(self.queue_url)
self.assertEqual(204, result.status_code)
test_check_queue_exists.tags = ['positive']
def test_check_queue_exists_negative(self):
"""Checks non-existing queue."""
path = '/{0}/queues/nonexistingqueue'.format("v1")
result = self.client.get(path)
self.assertEqual(404, result.status_code)
result = self.client.head(path)
self.assertEqual(404, result.status_code)
test_check_queue_exists_negative.tags = ['negative']
def test_get_queue_malformed_marker(self):
"""List queues with invalid marker."""
path = '/{0}/queues?marker=zzz'.format("v1")
result = self.client.get(path)
self.assertEqual(204, result.status_code)
test_get_queue_malformed_marker.tags = ['negative']
def test_get_stats_empty_queue(self):
"""Get queue stats on an empty queue."""
result = self.client.put(self.queue_url)
self.addCleanup(self.client.delete, self.queue_url)
self.assertEqual(201, result.status_code)
stats_url = self.queue_url + '/stats'
# Get stats on an empty queue
result = self.client.get(stats_url)
self.assertEqual(200, result.status_code)
expected_response = {'messages':
{'claimed': 0, 'total': 0, 'free': 0}}
self.assertEqual(expected_response, result.json())
test_get_stats_empty_queue.tags = ['positive']
@ddt.data(0, 1)
def test_get_queue_stats_claimed(self, claimed):
"""Get stats on a queue."""
result = self.client.put(self.queue_url)
self.addCleanup(self.client.delete, self.queue_url)
self.assertEqual(201, result.status_code)
# Post Messages to the test queue
doc = helpers.create_message_body(
messagecount=self.limits.max_messages_per_claim_or_pop)
message_url = self.queue_url + '/messages'
result = self.client.post(message_url, data=doc)
self.assertEqual(201, result.status_code)
if claimed > 0:
claim_url = self.queue_url + '/claims?limit=' + str(claimed)
doc = {'ttl': 300, 'grace': 300}
result = self.client.post(claim_url, data=doc)
self.assertEqual(201, result.status_code)
# Get stats on the queue.
stats_url = self.queue_url + '/stats'
result = self.client.get(stats_url)
self.assertEqual(200, result.status_code)
self.assertQueueStats(result.json(), claimed)
test_get_queue_stats_claimed.tags = ['positive']
def tearDown(self):
super(TestQueueMisc, self).tearDown()
class TestQueueNonExisting(base.V1FunctionalTestBase):
"""Test Actions on non existing queue."""
server_class = base.ZaqarServer
def setUp(self):
super(TestQueueNonExisting, self).setUp()
self.base_url = '{0}/{1}'.format(self.cfg.zaqar.url, "v1")
self.queue_url = (self.base_url +
'/queues/0a5b1b85-4263-11e3-b034-28cfe91478b9')
self.client.set_base_url(self.queue_url)
self.header = helpers.create_zaqar_headers(self.cfg)
self.headers_response_empty = {'location'}
self.header = helpers.create_zaqar_headers(self.cfg)
def test_get_queue(self):
"""Get non existing Queue."""
result = self.client.get()
self.assertEqual(404, result.status_code)
def test_get_stats(self):
"""Get stats on non existing Queue."""
result = self.client.get('/stats')
self.assertEqual(404, result.status_code)
def test_get_metadata(self):
"""Get metadata on non existing Queue."""
result = self.client.get('/metadata')
self.assertEqual(404, result.status_code)
def test_get_messages(self):
"""Get messages on non existing Queue."""
result = self.client.get('/messages')
self.assertEqual(204, result.status_code)
def test_post_messages(self):
"""Post messages to a non existing Queue."""
doc = [{"ttl": 200, "body": {"Home": ""}}]
result = self.client.post('/messages', data=doc)
self.assertEqual(404, result.status_code)
def test_claim_messages(self):
"""Claim messages from a non existing Queue."""
doc = {"ttl": 200, "grace": 300}
result = self.client.post('/claims', data=doc)
self.assertEqual(204, result.status_code)
def test_delete_queue(self):
"""Delete non existing Queue."""
result = self.client.delete()
self.assertEqual(204, result.status_code)