Fork v1_1 of the API into v2

This patch forks the existing v1_1 of the API and opens the
implementation for v2. Unfortunately, this is done by copying and
pasting the existing v1_1. Hopefully, we'll have a nicer way to do this
in the future.

Change-Id: I611d3b62d5d9866b65061542cedcb1f4e3e61629
This commit is contained in:
Flavio Percoco 2015-01-08 11:33:59 +01:00
parent f2a6bcca1a
commit 6222cc6654
29 changed files with 4491 additions and 5 deletions

View File

@ -38,7 +38,7 @@ EXPECTED_VERSIONS = [
},
{
'id': '1.1',
'status': 'CURRENT',
'status': 'SUPPORTED',
'updated': '2014-9-24T04:06:47Z',
'media-types': [
{
@ -52,6 +52,23 @@ EXPECTED_VERSIONS = [
'rel': 'self'
}
]
},
{
'id': '2',
'status': 'CURRENT',
'updated': '2014-9-24T04:06:47Z',
'media-types': [
{
'base': 'application/json',
'type': 'application/vnd.openstack.messaging-v2+json'
}
],
'links': [
{
'href': '/v2/',
'rel': 'self'
}
]
}
]
@ -65,5 +82,5 @@ class TestVersion(base.TestBase):
versions = jsonutils.loads(response[0])['versions']
self.assertEqual(self.srmock.status, falcon.HTTP_300)
self.assertEqual(len(versions), 2)
self.assertEqual(len(versions), 3)
self.assertEqual(EXPECTED_VERSIONS, versions)

View File

@ -0,0 +1,231 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import uuid
import ddt
import falcon
from oslo.serialization import jsonutils
from zaqar.tests.unit.transport.wsgi import base
from zaqar.tests.unit.transport.wsgi import v2_0
# --------------------------------------------------------------------------
# Identical or just minor variations across versions
# --------------------------------------------------------------------------
URL_PREFIX = '/v2'
class TestAuth(v2_0.TestAuth):
url_prefix = URL_PREFIX
class TestClaimsFaultyDriver(v2_0.TestClaimsFaultyDriver):
url_prefix = URL_PREFIX
class TestClaimsMongoDB(v2_0.TestClaimsMongoDB):
url_prefix = URL_PREFIX
class TestClaimsSqlalchemy(v2_0.TestClaimsSqlalchemy):
url_prefix = URL_PREFIX
class TestDefaultLimits(v2_0.TestDefaultLimits):
url_prefix = URL_PREFIX
class TestHomeDocument(v2_0.TestHomeDocument):
url_prefix = URL_PREFIX
class TestMediaType(v2_0.TestMediaType):
url_prefix = URL_PREFIX
class TestMessagesFaultyDriver(v2_0.TestMessagesFaultyDriver):
url_prefix = URL_PREFIX
class TestMessagesMongoDB(v2_0.TestMessagesMongoDB):
url_prefix = URL_PREFIX
class TestMessagesMongoDBPooled(v2_0.TestMessagesMongoDBPooled):
url_prefix = URL_PREFIX
class TestMessagesSqlalchemy(v2_0.TestMessagesSqlalchemy):
url_prefix = URL_PREFIX
class TestQueueFaultyDriver(v2_0.TestQueueFaultyDriver):
url_prefix = URL_PREFIX
# TODO(kgriffs): Having to list a separate test for each backend is
# sort of a pain; is there a better way?
class TestQueueLifecycleMongoDB(v2_0.TestQueueLifecycleMongoDB):
url_prefix = URL_PREFIX
class TestQueueLifecycleSqlalchemy(v2_0.TestQueueLifecycleSqlalchemy):
url_prefix = URL_PREFIX
class TestPoolsMongoDB(v2_0.TestPoolsMongoDB):
url_prefix = URL_PREFIX
class TestPoolsSqlalchemy(v2_0.TestPoolsSqlalchemy):
url_prefix = URL_PREFIX
class TestValidation(v2_0.TestValidation):
url_prefix = URL_PREFIX
class TestFlavorsMongoDB(v2_0.TestFlavorsMongoDB):
url_prefix = URL_PREFIX
# --------------------------------------------------------------------------
# v1.1 & v2 only
# --------------------------------------------------------------------------
class TestPing(base.V2Base):
config_file = 'wsgi_sqlalchemy.conf'
def test_get(self):
# TODO(kgriffs): Make use of setUp for setting the URL prefix
# so we can just say something like:
#
# response = self.simulate_get('/ping')
#
response = self.simulate_get('/v2/ping')
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.assertEqual(response, [])
def test_head(self):
response = self.simulate_head('/v2/ping')
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.assertEqual(response, [])
class TestHealthMongoDB(v2_0.TestHealthMongoDB):
url_prefix = URL_PREFIX
class TestHealthFaultyDriver(v2_0.TestHealthFaultyDriver):
url_prefix = URL_PREFIX
@ddt.ddt
class TestMessages(base.V2Base):
config_file = 'wsgi_sqlalchemy.conf'
def setUp(self):
super(TestMessages, self).setUp()
self.queue_path = '/v2/queues/test-queue'
self.messages_path = self.queue_path + '/messages'
self.project_id = 'e8ba1038'
self.headers = {'Client-ID': str(uuid.uuid4())}
self.simulate_put(self.queue_path, self.project_id)
def tearDown(self):
self.simulate_delete(self.queue_path, self.project_id)
super(TestMessages, self).tearDown()
def _post_messages(self, target, repeat=1):
doc = {'messages': [{'body': 239, 'ttl': 300}] * repeat}
body = jsonutils.dumps(doc)
return self.simulate_post(target, self.project_id, body=body,
headers=self.headers)
def _get_msg_id(self, headers):
return self._get_msg_ids(headers)[0]
def _get_msg_ids(self, headers):
return headers['Location'].rsplit('=', 1)[-1].split(',')
@ddt.data(1, 2, 10)
def test_pop(self, message_count):
self._post_messages(self.messages_path, repeat=message_count)
msg_id = self._get_msg_id(self.srmock.headers_dict)
target = self.messages_path + '/' + msg_id
self.simulate_get(target, self.project_id)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
query_string = 'pop=' + str(message_count)
result = self.simulate_delete(self.messages_path, self.project_id,
query_string=query_string)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
result_doc = jsonutils.loads(result[0])
self.assertEqual(len(result_doc['messages']), message_count)
self.simulate_get(target, self.project_id)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
@ddt.data('', 'pop=1000000', 'pop=10&ids=1', 'pop=-1')
def test_pop_invalid(self, query_string):
self.simulate_delete(self.messages_path, self.project_id,
query_string=query_string)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_pop_empty_queue(self):
query_string = 'pop=1'
result = self.simulate_delete(self.messages_path, self.project_id,
query_string=query_string)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
result_doc = jsonutils.loads(result[0])
self.assertEqual(result_doc['messages'], [])
def test_pop_single_message(self):
self._post_messages(self.messages_path, repeat=5)
msg_id = self._get_msg_id(self.srmock.headers_dict)
target = self.messages_path + '/' + msg_id
self.simulate_get(target, self.project_id)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
# Pop Single message from the queue
query_string = 'pop=1'
result = self.simulate_delete(self.messages_path, self.project_id,
query_string=query_string)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
# Get messages from the queue & verify message count
query_string = 'echo=True'
result = self.simulate_get(self.messages_path, self.project_id,
query_string=query_string,
headers=self.headers)
result_doc = jsonutils.loads(result[0])
actual_msg_count = len(result_doc['messages'])
expected_msg_count = 4
self.assertEqual(actual_msg_count, expected_msg_count)

View File

@ -66,7 +66,7 @@ def extract_project_id(req, resp, params):
X-PROJECT-ID cannot be an empty string. Specify the right header X-PROJECT-ID
and retry.'''))
# TODO(flaper87): Make version comparison smarter to support v2.
# TODO(flaper87): Make version comparison smarter to support v2_0.
if not params['project_id'] and 'v1.1' in req.path:
raise falcon.HTTPBadRequest('Project-Id Missing',
_(u'The header X-PROJECT-ID was missing'))

View File

@ -153,3 +153,17 @@ class V1_1BaseFaulty(TestBaseFaulty):
Should contain methods specific to V1.1 exception testing
"""
pass
class V2Base(V1_1Base):
"""Base class for V2 API Tests.
Should contain methods specific to V2 of the API
"""
class V2BaseFaulty(V1_1BaseFaulty):
"""Base class for V2 API Faulty Tests.
Should contain methods specific to V2 exception testing
"""

View File

@ -0,0 +1,46 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
from zaqar.tests.unit.transport.wsgi.v2_0 import test_auth
from zaqar.tests.unit.transport.wsgi.v2_0 import test_claims
from zaqar.tests.unit.transport.wsgi.v2_0 import test_default_limits
from zaqar.tests.unit.transport.wsgi.v2_0 import test_flavors
from zaqar.tests.unit.transport.wsgi.v2_0 import test_health
from zaqar.tests.unit.transport.wsgi.v2_0 import test_home
from zaqar.tests.unit.transport.wsgi.v2_0 import test_media_type
from zaqar.tests.unit.transport.wsgi.v2_0 import test_messages
from zaqar.tests.unit.transport.wsgi.v2_0 import test_pools
from zaqar.tests.unit.transport.wsgi.v2_0 import test_queue_lifecycle as l
from zaqar.tests.unit.transport.wsgi.v2_0 import test_validation
TestAuth = test_auth.TestAuth
TestClaimsFaultyDriver = test_claims.TestClaimsFaultyDriver
TestClaimsMongoDB = test_claims.TestClaimsMongoDB
TestClaimsSqlalchemy = test_claims.TestClaimsSqlalchemy
TestDefaultLimits = test_default_limits.TestDefaultLimits
TestHealthMongoDB = test_health.TestHealthMongoDB
TestHealthFaultyDriver = test_health.TestHealthFaultyDriver
TestHomeDocument = test_home.TestHomeDocument
TestMediaType = test_media_type.TestMediaType
TestMessagesFaultyDriver = test_messages.TestMessagesFaultyDriver
TestMessagesMongoDB = test_messages.TestMessagesMongoDB
TestMessagesMongoDBPooled = test_messages.TestMessagesMongoDBPooled
TestMessagesSqlalchemy = test_messages.TestMessagesSqlalchemy
TestQueueFaultyDriver = l.TestQueueLifecycleFaultyDriver
TestQueueLifecycleMongoDB = l.TestQueueLifecycleMongoDB
TestQueueLifecycleSqlalchemy = l.TestQueueLifecycleSqlalchemy
TestPoolsMongoDB = test_pools.TestPoolsMongoDB
TestPoolsSqlalchemy = test_pools.TestPoolsSqlalchemy
TestValidation = test_validation.TestValidation
TestFlavorsMongoDB = test_flavors.TestFlavorsMongoDB

View File

@ -0,0 +1,43 @@
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test Auth."""
import uuid
import falcon
from falcon import testing
from keystonemiddleware import auth_token
from zaqar.tests.unit.transport.wsgi import base
class TestAuth(base.V2Base):
config_file = 'keystone_auth.conf'
def setUp(self):
super(TestAuth, self).setUp()
self.headers = {'Client-ID': str(uuid.uuid4())}
def test_auth_install(self):
self.assertIsInstance(self.app, auth_token.AuthProtocol)
def test_non_authenticated(self):
env = testing.create_environ(self.url_prefix + '/480924/queues/',
method='GET',
headers=self.headers)
self.app(env, self.srmock)
self.assertEqual(self.srmock.status, falcon.HTTP_401)

View File

@ -0,0 +1,329 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import uuid
import ddt
import falcon
import mock
from oslo.serialization import jsonutils
from oslo.utils import timeutils
from testtools import matchers
from zaqar import tests as testing
from zaqar.tests.unit.transport.wsgi import base
@ddt.ddt
class ClaimsBaseTest(base.V2Base):
def setUp(self):
super(ClaimsBaseTest, self).setUp()
self.default_claim_ttl = self.boot.transport._defaults.claim_ttl
self.project_id = '737_abc8332832'
self.headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': self.project_id
}
self.queue_path = self.url_prefix + '/queues/fizbit'
self.claims_path = self.queue_path + '/claims'
self.messages_path = self.queue_path + '/messages'
doc = json.dumps({"_ttl": 60})
self.simulate_put(self.queue_path, body=doc, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
doc = json.dumps({'messages': [{'body': 239, 'ttl': 300}] * 10})
self.simulate_post(self.queue_path + '/messages',
body=doc, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def tearDown(self):
self.simulate_delete(self.queue_path, headers=self.headers)
super(ClaimsBaseTest, self).tearDown()
@ddt.data('[', '[]', '.', '"fail"')
def test_bad_claim(self, doc):
self.simulate_post(self.claims_path, body=doc, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
href = self._get_a_claim()
self.simulate_patch(href, body=doc, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_exceeded_claim(self):
self.simulate_post(self.claims_path,
body='{"ttl": 100, "grace": 60}',
query_string='limit=21', headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data((-1, -1), (59, 60), (60, 59), (60, 43201), (43201, 60))
def test_unacceptable_ttl_or_grace(self, ttl_grace):
ttl, grace = ttl_grace
self.simulate_post(self.claims_path,
body=json.dumps({'ttl': ttl, 'grace': grace}),
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 59, 43201)
def test_unacceptable_new_ttl(self, ttl):
href = self._get_a_claim()
self.simulate_patch(href,
body=json.dumps({'ttl': ttl}),
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_default_ttl_and_grace(self):
self.simulate_post(self.claims_path,
body='{}', headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
body = self.simulate_get(self.srmock.headers_dict['location'],
headers=self.headers)
claim = jsonutils.loads(body[0])
self.assertEqual(self.default_claim_ttl, claim['ttl'])
def _get_a_claim(self):
doc = '{"ttl": 100, "grace": 60}'
self.simulate_post(self.claims_path, body=doc, headers=self.headers)
return self.srmock.headers_dict['Location']
def test_lifecycle(self):
doc = '{"ttl": 100, "grace": 60}'
# First, claim some messages
body = self.simulate_post(self.claims_path, body=doc,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
claimed = jsonutils.loads(body[0])['messages']
claim_href = self.srmock.headers_dict['Location']
message_href, params = claimed[0]['href'].split('?')
# No more messages to claim
self.simulate_post(self.claims_path, body=doc,
query_string='limit=3', headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
# Listing messages, by default, won't include claimed, will echo
body = self.simulate_get(self.messages_path,
headers=self.headers,
query_string="echo=true")
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self._empty_message_list(body)
# Listing messages, by default, won't include claimed, won't echo
body = self.simulate_get(self.messages_path,
headers=self.headers,
query_string="echo=false")
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self._empty_message_list(body)
# List messages, include_claimed, but don't echo
body = self.simulate_get(self.messages_path,
query_string='include_claimed=true'
'&echo=false',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self._empty_message_list(body)
# List messages with a different client-id and echo=false.
# Should return some messages
headers = self.headers.copy()
headers["Client-ID"] = str(uuid.uuid4())
body = self.simulate_get(self.messages_path,
query_string='include_claimed=true'
'&echo=false',
headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
# Include claimed messages this time, and echo
body = self.simulate_get(self.messages_path,
query_string='include_claimed=true'
'&echo=true',
headers=self.headers)
listed = jsonutils.loads(body[0])
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self.assertEqual(len(listed['messages']), len(claimed))
now = timeutils.utcnow() + datetime.timedelta(seconds=10)
timeutils_utcnow = 'zaqar.openstack.common.timeutils.utcnow'
with mock.patch(timeutils_utcnow) as mock_utcnow:
mock_utcnow.return_value = now
body = self.simulate_get(claim_href, headers=self.headers)
claim = jsonutils.loads(body[0])
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self.assertEqual(claim['ttl'], 100)
# NOTE(cpp-cabrera): verify that claim age is non-negative
self.assertThat(claim['age'], matchers.GreaterThan(-1))
# Try to delete the message without submitting a claim_id
self.simulate_delete(message_href, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_403)
# Delete the message and its associated claim
self.simulate_delete(message_href,
query_string=params, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
# Try to get it from the wrong project
headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': 'bogusproject'
}
self.simulate_get(message_href, query_string=params, headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
# Get the message
self.simulate_get(message_href, query_string=params,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
# Update the claim
new_claim_ttl = '{"ttl": 60, "grace": 60}'
creation = timeutils.utcnow()
self.simulate_patch(claim_href, body=new_claim_ttl,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
# Get the claimed messages (again)
body = self.simulate_get(claim_href, headers=self.headers)
query = timeutils.utcnow()
claim = jsonutils.loads(body[0])
message_href, params = claim['messages'][0]['href'].split('?')
self.assertEqual(claim['ttl'], 60)
estimated_age = timeutils.delta_seconds(creation, query)
self.assertTrue(estimated_age > claim['age'])
# Delete the claim
self.simulate_delete(claim['href'], headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
# Try to delete a message with an invalid claim ID
self.simulate_delete(message_href,
query_string=params, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
# Make sure it wasn't deleted!
self.simulate_get(message_href, query_string=params,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
# Try to get a claim that doesn't exist
self.simulate_get(claim['href'], headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
# Try to update a claim that doesn't exist
self.simulate_patch(claim['href'], body=doc,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_post_claim_nonexistent_queue(self):
path = self.url_prefix + '/queues/nonexistent/claims'
self.simulate_post(path,
body='{"ttl": 100, "grace": 60}',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
def test_get_claim_nonexistent_queue(self):
path = self.url_prefix + '/queues/nonexistent/claims/aaabbbba'
self.simulate_get(path, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
# NOTE(cpp-cabrera): regression test against bug #1203842
def test_get_nonexistent_claim_404s(self):
self.simulate_get(self.claims_path + '/a', headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_delete_nonexistent_claim_204s(self):
self.simulate_delete(self.claims_path + '/a',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
def test_patch_nonexistent_claim_404s(self):
patch_data = json.dumps({'ttl': 100})
self.simulate_patch(self.claims_path + '/a', body=patch_data,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
class TestClaimsMongoDB(ClaimsBaseTest):
config_file = 'wsgi_mongodb.conf'
@testing.requires_mongodb
def setUp(self):
super(TestClaimsMongoDB, self).setUp()
def tearDown(self):
storage = self.boot.storage._storage
connection = storage.connection
connection.drop_database(storage.queues_database)
for db in storage.message_databases:
connection.drop_database(db)
super(TestClaimsMongoDB, self).tearDown()
class TestClaimsSqlalchemy(ClaimsBaseTest):
config_file = 'wsgi_sqlalchemy.conf'
class TestClaimsFaultyDriver(base.V2BaseFaulty):
config_file = 'wsgi_faulty.conf'
def test_simple(self):
self.project_id = '480924abc_'
self.headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': self.project_id
}
claims_path = self.url_prefix + '/queues/fizbit/claims'
doc = '{"ttl": 100, "grace": 60}'
self.simulate_post(claims_path, body=doc, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_503)
self.simulate_get(claims_path + '/nichts', headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_503)
self.simulate_patch(claims_path + '/nichts', body=doc,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_503)
self.simulate_delete(claims_path + '/foo', headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_503)

View File

@ -0,0 +1,124 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import uuid
import falcon
from oslo.serialization import jsonutils
from zaqar import storage
from zaqar.tests.unit.transport.wsgi import base
class TestDefaultLimits(base.V2Base):
config_file = 'wsgi_sqlalchemy_default_limits.conf'
def setUp(self):
super(TestDefaultLimits, self).setUp()
self.headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': '838383abc_'
}
self.queue_path = self.url_prefix + '/queues'
self.q1_queue_path = self.queue_path + '/q1'
self.messages_path = self.q1_queue_path + '/messages'
self.claims_path = self.q1_queue_path + '/claims'
self.simulate_put(self.q1_queue_path, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def tearDown(self):
self.simulate_delete(self.queue_path, headers=self.headers)
super(TestDefaultLimits, self).tearDown()
def test_queue_listing(self):
# 2 queues to list
self.simulate_put(self.queue_path + '/q2', headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
with self._prepare_queues(storage.DEFAULT_QUEUES_PER_PAGE + 1):
result = self.simulate_get(self.queue_path, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
queues = jsonutils.loads(result[0])['queues']
self.assertEqual(len(queues), storage.DEFAULT_QUEUES_PER_PAGE)
def test_message_listing_different_id(self):
self._prepare_messages(storage.DEFAULT_MESSAGES_PER_PAGE + 1)
headers = self.headers.copy()
headers['Client-ID'] = str(uuid.uuid4())
result = self.simulate_get(self.messages_path,
headers=headers,
query_string='echo=false')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
messages = jsonutils.loads(result[0])['messages']
self.assertEqual(len(messages), storage.DEFAULT_MESSAGES_PER_PAGE)
def test_message_listing_same_id(self):
self._prepare_messages(storage.DEFAULT_MESSAGES_PER_PAGE + 1)
result = self.simulate_get(self.messages_path,
headers=self.headers,
query_string='echo=false')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self._empty_message_list(result)
self._prepare_messages(storage.DEFAULT_MESSAGES_PER_PAGE + 1)
result = self.simulate_get(self.messages_path,
headers=self.headers,
query_string='echo=true')
messages = jsonutils.loads(result[0])['messages']
self.assertEqual(len(messages), storage.DEFAULT_MESSAGES_PER_PAGE)
def test_claim_creation(self):
self._prepare_messages(storage.DEFAULT_MESSAGES_PER_CLAIM + 1)
result = self.simulate_post(self.claims_path,
body='{"ttl": 60, "grace": 60}',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
messages = jsonutils.loads(result[0])['messages']
self.assertEqual(len(messages), storage.DEFAULT_MESSAGES_PER_CLAIM)
@contextlib.contextmanager
def _prepare_queues(self, count):
queue_paths = [self.queue_path + '/multi-{0}'.format(i)
for i in range(count)]
for path in queue_paths:
self.simulate_put(path, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
yield
for path in queue_paths:
self.simulate_delete(path, headers=self.headers)
def _prepare_messages(self, count):
doc = {'messages': [{'body': 239, 'ttl': 300}] * count}
body = jsonutils.dumps(doc)
self.simulate_post(self.messages_path, body=body,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)

View File

@ -0,0 +1,339 @@
# Copyright (c) 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import contextlib
import uuid
import ddt
import falcon
from oslo.serialization import jsonutils
from zaqar import tests as testing
from zaqar.tests.unit.transport.wsgi import base
@contextlib.contextmanager
def flavor(test, name, pool, capabilities={}):
"""A context manager for constructing a flavor for use in testing.
Deletes the flavor after exiting the context.
:param test: Must expose simulate_* methods
:param name: Name for this flavor
:type name: six.text_type
:type pool: six.text_type
:type capabilities: dict
:returns: (name, uri, capabilities)
:rtype: see above
"""
doc = {'pool': pool, 'capabilities': capabilities}
path = test.url_prefix + '/flavors/' + name
test.simulate_put(path, body=jsonutils.dumps(doc))
try:
yield name, pool, capabilities
finally:
test.simulate_delete(path)
@contextlib.contextmanager
def flavors(test, count, pool):
"""A context manager for constructing flavors for use in testing.
Deletes the flavors after exiting the context.
:param test: Must expose simulate_* methods
:param count: Number of pools to create
:type count: int
:returns: (paths, pool, capabilities)
:rtype: ([six.text_type], [six.text_type], [dict])
"""
base = test.url_prefix + '/flavors/'
args = sorted([(base + str(i), {str(i): i}, str(i)) for i in range(count)],
key=lambda tup: tup[2])
for path, capabilities, _ in args:
doc = {'pool': pool, 'capabilities': capabilities}
test.simulate_put(path, body=jsonutils.dumps(doc))
try:
yield args
finally:
for path, _, _ in args:
test.simulate_delete(path)
@ddt.ddt
class FlavorsBaseTest(base.V2Base):
def setUp(self):
super(FlavorsBaseTest, self).setUp()
self.queue = 'test-queue'
self.queue_path = self.url_prefix + '/queues/' + self.queue
self.pool = 'mypool'
self.pool_group = 'mypool-group'
self.pool_path = self.url_prefix + '/pools/' + self.pool
self.pool_doc = {'weight': 100,
'group': self.pool_group,
'uri': 'sqlite://:memory:'}
self.simulate_put(self.pool_path, body=jsonutils.dumps(self.pool_doc))
self.flavor = 'test-flavor'
self.doc = {'capabilities': {}, 'pool': self.pool_group}
self.flavor_path = self.url_prefix + '/flavors/' + self.flavor
self.simulate_put(self.flavor_path, body=jsonutils.dumps(self.doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def tearDown(self):
super(FlavorsBaseTest, self).tearDown()
self.simulate_delete(self.flavor_path)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.simulate_delete(self.queue_path)
def test_put_flavor_works(self):
name = str(uuid.uuid1())
with flavor(self, name, self.doc['pool']):
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def test_put_raises_if_missing_fields(self):
path = self.url_prefix + '/flavors/' + str(uuid.uuid1())
self.simulate_put(path, body=jsonutils.dumps({}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
self.simulate_put(path,
body=jsonutils.dumps({'capabilities': {}}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(1, 2**32+1, [])
def test_put_raises_if_invalid_pool(self, pool):
path = self.url_prefix + '/flavors/' + str(uuid.uuid1())
self.simulate_put(path,
body=jsonutils.dumps({'pool': pool}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 'wee', [])
def test_put_raises_if_invalid_capabilities(self, capabilities):
path = self.url_prefix + '/flavors/' + str(uuid.uuid1())
doc = {'pool': 'a', 'capabilities': capabilities}
self.simulate_put(path, body=jsonutils.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_put_existing_overwrites(self):
# NOTE(cabrera): setUp creates default flavor
expect = self.doc
self.simulate_put(self.flavor_path,
body=jsonutils.dumps(expect))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
result = self.simulate_get(self.flavor_path)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
doc = jsonutils.loads(result[0])
self.assertEqual(doc['pool'], expect['pool'])
def test_create_flavor_no_pool(self):
self.simulate_delete(self.flavor_path)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.simulate_delete(self.pool_path)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.simulate_put(self.flavor_path, body=jsonutils.dumps(self.doc))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_delete_works(self):
self.simulate_delete(self.flavor_path)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.simulate_get(self.flavor_path)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_get_nonexisting_raises_404(self):
self.simulate_get(self.url_prefix + '/flavors/nonexisting')
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def _flavor_expect(self, flavor, xhref, xpool):
self.assertIn('href', flavor)
self.assertEqual(flavor['href'], xhref)
self.assertIn('pool', flavor)
self.assertEqual(flavor['pool'], xpool)
def test_get_works(self):
result = self.simulate_get(self.flavor_path)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
pool = jsonutils.loads(result[0])
self._flavor_expect(pool, self.flavor_path, self.doc['pool'])
def test_detailed_get_works(self):
result = self.simulate_get(self.flavor_path,
query_string='?detailed=True')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
pool = jsonutils.loads(result[0])
self._flavor_expect(pool, self.flavor_path, self.doc['pool'])
self.assertIn('capabilities', pool)
self.assertEqual(pool['capabilities'], {})
def test_patch_raises_if_missing_fields(self):
self.simulate_patch(self.flavor_path,
body=jsonutils.dumps({'location': 1}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def _patch_test(self, doc):
self.simulate_patch(self.flavor_path,
body=jsonutils.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_200)
result = self.simulate_get(self.flavor_path,
query_string='?detailed=True')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
pool = jsonutils.loads(result[0])
self._flavor_expect(pool, self.flavor_path, doc['pool'])
self.assertEqual(pool['capabilities'], doc['capabilities'])
def test_patch_works(self):
doc = {'pool': 'my-pool', 'capabilities': {'a': 1}}
self._patch_test(doc)
def test_patch_works_with_extra_fields(self):
doc = {'pool': 'my-pool', 'capabilities': {'a': 1},
'location': 100, 'partition': 'taco'}
self._patch_test(doc)
@ddt.data(-1, 2**32+1, [])
def test_patch_raises_400_on_invalid_pool(self, pool):
self.simulate_patch(self.flavor_path,
body=jsonutils.dumps({'pool': pool}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 'wee', [])
def test_patch_raises_400_on_invalid_capabilities(self, capabilities):
doc = {'capabilities': capabilities}
self.simulate_patch(self.flavor_path, body=jsonutils.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_patch_raises_404_if_flavor_not_found(self):
self.simulate_patch(self.url_prefix + '/flavors/notexists',
body=jsonutils.dumps({'pool': 'test'}))
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_empty_listing(self):
self.simulate_delete(self.flavor_path)
result = self.simulate_get(self.url_prefix + '/flavors')
results = jsonutils.loads(result[0])
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self.assertTrue(len(results['flavors']) == 0)
self.assertIn('links', results)
def _listing_test(self, count=10, limit=10,
marker=None, detailed=False):
# NOTE(cpp-cabrera): delete initial flavor - it will interfere
# with listing tests
self.simulate_delete(self.flavor_path)
query = '?limit={0}&detailed={1}'.format(limit, detailed)
if marker:
query += '&marker={2}'.format(marker)
with flavors(self, count, self.doc['pool']) as expected:
result = self.simulate_get(self.url_prefix + '/flavors',
query_string=query)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
results = jsonutils.loads(result[0])
self.assertIsInstance(results, dict)
self.assertIn('flavors', results)
self.assertIn('links', results)
flavors_list = results['flavors']
link = results['links'][0]
self.assertEqual('next', link['rel'])
href = falcon.uri.parse_query_string(link['href'])
self.assertIn('marker', href)
self.assertEqual(href['limit'], str(limit))
self.assertEqual(href['detailed'], str(detailed).lower())
next_query_string = ('?marker={marker}&limit={limit}'
'&detailed={detailed}').format(**href)
next_result = self.simulate_get(link['href'].split('?')[0],
query_string=next_query_string)
next_flavors = jsonutils.loads(next_result[0])
next_flavors_list = next_flavors['flavors']
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self.assertIn('links', next_flavors)
if limit < count:
self.assertEqual(len(next_flavors_list),
min(limit, count-limit))
else:
self.assertTrue(len(next_flavors_list) == 0)
self.assertEqual(len(flavors_list), min(limit, count))
for i, s in enumerate(flavors_list + next_flavors_list):
expect = expected[i]
path, capabilities = expect[:2]
self._flavor_expect(s, path, self.doc['pool'])
if detailed:
self.assertIn('capabilities', s)
self.assertEqual(s['capabilities'], capabilities)
else:
self.assertNotIn('capabilities', s)
def test_listing_works(self):
self._listing_test()
def test_detailed_listing_works(self):
self._listing_test(detailed=True)
@ddt.data(1, 5, 10, 15)
def test_listing_works_with_limit(self, limit):
self._listing_test(count=15, limit=limit)
def test_listing_marker_is_respected(self):
self.simulate_delete(self.flavor_path)
with flavors(self, 10, self.doc['pool']) as expected:
result = self.simulate_get(self.url_prefix + '/flavors',
query_string='?marker=3')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
flavor_list = jsonutils.loads(result[0])['flavors']
self.assertEqual(len(flavor_list), 6)
path, capabilities = expected[4][:2]
self._flavor_expect(flavor_list[0], path, self.doc['pool'])
def test_queue_create_works(self):
metadata = {'_flavor': self.flavor}
self.simulate_put(self.queue_path, body=jsonutils.dumps(metadata))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def test_queue_create_no_flavor(self):
metadata = {'_flavor': self.flavor}
self.simulate_delete(self.flavor_path)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.simulate_put(self.queue_path, body=jsonutils.dumps(metadata))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
class TestFlavorsMongoDB(FlavorsBaseTest):
config_file = 'wsgi_mongodb_pooled.conf'
@testing.requires_mongodb
def setUp(self):
super(TestFlavorsMongoDB, self).setUp()

View File

@ -0,0 +1,98 @@
# Copyright 2014 Catalyst IT Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ddt
import falcon
import mock
from oslo.serialization import jsonutils
from zaqar.storage import errors
import zaqar.storage.mongodb as mongo
from zaqar import tests as testing
from zaqar.tests.unit.transport.wsgi import base
@ddt.ddt
class TestHealth(base.TestBase):
def setUp(self):
super(TestHealth, self).setUp()
def test_basic(self):
path = self.url_prefix + '/health'
body = self.simulate_get(path)
health = jsonutils.loads(body[0])
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self.assertTrue(health['storage_reachable'])
self.assertIsNotNone(health['message_volume'])
for op in health['operation_status']:
self.assertTrue(health['operation_status'][op]['succeeded'])
@mock.patch.object(mongo.driver.DataDriver, '_health')
def test_message_volume(self, mock_driver_get):
def _health():
KPI = {}
KPI['message_volume'] = {'free': 1, 'claimed': 2, 'total': 3}
return KPI
mock_driver_get.side_effect = _health
path = self.url_prefix + '/health'
body = self.simulate_get(path)
health = jsonutils.loads(body[0])
self.assertEqual(self.srmock.status, falcon.HTTP_200)
message_volume = health['message_volume']
self.assertEqual(message_volume['free'], 1)
self.assertEqual(message_volume['claimed'], 2)
self.assertEqual(message_volume['total'], 3)
@mock.patch.object(mongo.messages.MessageController, 'delete')
def test_operation_status(self, mock_messages_delete):
mock_messages_delete.side_effect = errors.NotPermitted()
path = self.url_prefix + '/health'
body = self.simulate_get(path)
health = jsonutils.loads(body[0])
self.assertEqual(self.srmock.status, falcon.HTTP_200)
op_status = health['operation_status']
for op in op_status.keys():
if op == 'delete_messages':
self.assertFalse(op_status[op]['succeeded'])
self.assertIsNotNone(op_status[op]['ref'])
else:
self.assertTrue(op_status[op]['succeeded'])
class TestHealthMongoDB(TestHealth):
config_file = 'wsgi_mongodb.conf'
@testing.requires_mongodb
def setUp(self):
super(TestHealthMongoDB, self).setUp()
def tearDown(self):
super(TestHealthMongoDB, self).tearDown()
class TestHealthFaultyDriver(base.TestBaseFaulty):
config_file = 'wsgi_faulty.conf'
def test_simple(self):
path = self.url_prefix + '/health'
self.simulate_get(path)
self.assertEqual(self.srmock.status, falcon.HTTP_503)

View File

@ -0,0 +1,67 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import uuid
import falcon
from oslo.serialization import jsonutils
import six.moves.urllib.parse as urlparse
from zaqar.tests.unit.transport.wsgi import base
class TestHomeDocument(base.V2Base):
config_file = 'wsgi_sqlalchemy.conf'
def test_json_response(self):
self.headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': '8383830383abc_'
}
body = self.simulate_get(self.url_prefix, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
content_type = self.srmock.headers_dict['Content-Type']
self.assertEqual(content_type, 'application/json-home')
try:
jsonutils.loads(body[0])
except ValueError:
self.fail('Home document is not valid JSON')
def test_href_template(self):
self.headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': '8383830383'
}
body = self.simulate_get(self.url_prefix, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
resp = jsonutils.loads(body[0])
queue_href_template = resp['resources']['rel/queue']['href-template']
path_1 = 'https://zaqar.example.com' + self.url_prefix
path_2 = 'https://zaqar.example.com' + self.url_prefix + '/'
# Verify all the href template start with the correct version prefix
for resource in list(resp['resources']):
self.assertTrue(resp['resources'][resource]['href-template'].
startswith(self.url_prefix))
url = urlparse.urljoin(path_1, queue_href_template)
expected = ('https://zaqar.example.com' + self.url_prefix +
'/queues/foo')
self.assertEqual(url.format(queue_name='foo'), expected)
url = urlparse.urljoin(path_2, queue_href_template)
self.assertEqual(url.format(queue_name='foo'), expected)

View File

@ -0,0 +1,50 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import falcon
from falcon import testing
from zaqar.tests.unit.transport.wsgi import base
class TestMediaType(base.V2Base):
config_file = 'wsgi_sqlalchemy.conf'
def test_json_only_endpoints(self):
endpoints = (
('GET', self.url_prefix + '/queues'),
('GET', self.url_prefix + '/queues/nonexistent/stats'),
('POST', self.url_prefix + '/queues/nonexistent/messages'),
('GET', self.url_prefix + '/queues/nonexistent/messages/deadbeaf'),
('POST', self.url_prefix + '/queues/nonexistent/claims'),
('GET', self.url_prefix + '/queues/nonexistent/claims/0ad'),
('GET', self.url_prefix + '/health'),
)
for method, endpoint in endpoints:
headers = {
'Client-ID': str(uuid.uuid4()),
'Accept': 'application/xml',
}
env = testing.create_environ(endpoint,
method=method,
headers=headers)
self.app(env, self.srmock)
self.assertEqual(self.srmock.status, falcon.HTTP_406)

View File

@ -0,0 +1,573 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import uuid
import ddt
import falcon
import mock
from oslo.serialization import jsonutils
from oslo.utils import timeutils
import six
from testtools import matchers
from zaqar import tests as testing
from zaqar.tests.unit.transport.wsgi import base
from zaqar.transport import validation
@ddt.ddt
class MessagesBaseTest(base.V2Base):
def setUp(self):
super(MessagesBaseTest, self).setUp()
self.default_message_ttl = self.boot.transport._defaults.message_ttl
if self.conf.pooling:
for i in range(4):
uri = self.conf['drivers:management_store:mongodb'].uri
doc = {'weight': 100, 'uri': uri}
self.simulate_put(self.url_prefix + '/pools/' + str(i),
body=jsonutils.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
self.project_id = '7e55e1a7e'
self.headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': self.project_id
}
# TODO(kgriffs): Add support in self.simulate_* for a "base path"
# so that we don't have to concatenate against self.url_prefix
# all over the place.
self.queue_path = self.url_prefix + '/queues/fizbit'
self.messages_path = self.queue_path + '/messages'
doc = '{"_ttl": 60}'
self.simulate_put(self.queue_path, body=doc, headers=self.headers)
def tearDown(self):
self.simulate_delete(self.queue_path, headers=self.headers)
if self.conf.pooling:
for i in range(4):
self.simulate_delete(self.url_prefix + '/pools/' + str(i),
headers=self.headers)
super(MessagesBaseTest, self).tearDown()
def _test_post(self, sample_messages):
sample_doc = jsonutils.dumps({'messages': sample_messages})
result = self.simulate_post(self.messages_path,
body=sample_doc, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
result_doc = jsonutils.loads(result[0])
msg_ids = self._get_msg_ids(self.srmock.headers_dict)
self.assertEqual(len(msg_ids), len(sample_messages))
expected_resources = [six.text_type(self.messages_path + '/' + id)
for id in msg_ids]
self.assertEqual(expected_resources, result_doc['resources'])
# NOTE(kgriffs): As of v1.1, "partial" is no longer given
# in the response document.
self.assertNotIn('partial', result_doc)
self.assertEqual(len(msg_ids), len(sample_messages))
lookup = dict([(m['ttl'], m['body']) for m in sample_messages])
# Test GET on the message resource directly
# NOTE(cpp-cabrera): force the passing of time to age a message
timeutils_utcnow = 'zaqar.openstack.common.timeutils.utcnow'
now = timeutils.utcnow() + datetime.timedelta(seconds=10)
with mock.patch(timeutils_utcnow) as mock_utcnow:
mock_utcnow.return_value = now
for msg_id in msg_ids:
message_uri = self.messages_path + '/' + msg_id
headers = self.headers.copy()
headers['X-Project-ID'] = '777777'
# Wrong project ID
self.simulate_get(message_uri, headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
# Correct project ID
result = self.simulate_get(message_uri, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
# Check message properties
message = jsonutils.loads(result[0])
self.assertEqual(message['href'], message_uri)
self.assertEqual(message['body'], lookup[message['ttl']])
self.assertEqual(msg_id, message['id'])
# no negative age
# NOTE(cpp-cabrera): testtools lacks GreaterThanEqual on py26
self.assertThat(message['age'],
matchers.GreaterThan(-1))
# Test bulk GET
query_string = 'ids=' + ','.join(msg_ids)
result = self.simulate_get(self.messages_path,
query_string=query_string,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
result_doc = jsonutils.loads(result[0])
expected_ttls = set(m['ttl'] for m in sample_messages)
actual_ttls = set(m['ttl'] for m in result_doc['messages'])
self.assertFalse(expected_ttls - actual_ttls)
actual_ids = set(m['id'] for m in result_doc['messages'])
self.assertFalse(set(msg_ids) - actual_ids)
def test_exceeded_payloads(self):
# Get a valid message id
self._post_messages(self.messages_path)
msg_id = self._get_msg_id(self.srmock.headers_dict)
# Bulk GET restriction
query_string = 'ids=' + ','.join([msg_id] * 21)
self.simulate_get(self.messages_path,
query_string=query_string, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
# Listing restriction
self.simulate_get(self.messages_path,
query_string='limit=21',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
# Bulk deletion restriction
query_string = 'ids=' + ','.join([msg_id] * 22)
self.simulate_delete(self.messages_path,
query_string=query_string, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_post_single(self):
sample_messages = [
{'body': {'key': 'value'}, 'ttl': 200},
]
self._test_post(sample_messages)
def test_post_multiple(self):
sample_messages = [
{'body': 239, 'ttl': 100},
{'body': {'key': 'value'}, 'ttl': 200},
{'body': [1, 3], 'ttl': 300},
]
self._test_post(sample_messages)
def test_post_optional_ttl(self):
sample_messages = {
'messages': [
{'body': 239},
{'body': {'key': 'value'}, 'ttl': 200},
],
}
# Manually check default TTL is max from config
sample_doc = jsonutils.dumps(sample_messages)
result = self.simulate_post(self.messages_path,
body=sample_doc, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
result_doc = jsonutils.loads(result[0])
href = result_doc['resources'][0]
result = self.simulate_get(href, headers=self.headers)
message = jsonutils.loads(result[0])
self.assertEqual(self.default_message_ttl, message['ttl'])
def test_post_to_non_ascii_queue(self):
# NOTE(kgriffs): This test verifies that routes with
# embedded queue name params go through the validation
# hook, regardless of the target resource.
path = self.url_prefix + u'/queues/non-ascii-n\u0153me/messages'
if six.PY2:
path = path.encode('utf-8')
self._post_messages(path)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_post_with_long_queue_name(self):
# NOTE(kgriffs): This test verifies that routes with
# embedded queue name params go through the validation
# hook, regardless of the target resource.
queues_path = self.url_prefix + '/queues/'
game_title = 'v' * validation.QUEUE_NAME_MAX_LEN
self._post_messages(queues_path + game_title + '/messages')
self.assertEqual(self.srmock.status, falcon.HTTP_201)
game_title += 'v'
self._post_messages(queues_path + game_title + '/messages')
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_post_to_missing_queue(self):
self._post_messages(self.url_prefix + '/queues/nonexistent/messages')
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def test_get_from_missing_queue(self):
body = self.simulate_get(self.url_prefix +
'/queues/nonexistent/messages',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self._empty_message_list(body)
@ddt.data('', '0xdeadbeef', '550893e0-2b6e-11e3-835a-5cf9dd72369')
def test_bad_client_id(self, text_id):
self.simulate_post(self.queue_path + '/messages',
body='{"ttl": 60, "body": ""}',
headers={'Client-ID': text_id})
self.assertEqual(self.srmock.status, falcon.HTTP_400)
self.simulate_get(self.queue_path + '/messages',
query_string='limit=3&echo=true',
headers={'Client-ID': text_id})
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(None, '[', '[]', '{}', '.')
def test_post_bad_message(self, document):
self.simulate_post(self.queue_path + '/messages',
body=document,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 59, 1209601)
def test_unacceptable_ttl(self, ttl):
doc = {'messages': [{'ttl': ttl, 'body': None}]}
self.simulate_post(self.queue_path + '/messages',
body=jsonutils.dumps(doc),
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_exceeded_message_posting(self):
# Total (raw request) size
doc = {'messages': [{'body': "some body", 'ttl': 100}] * 20}
body = jsonutils.dumps(doc, indent=4)
max_len = self.transport_cfg.max_messages_post_size
long_body = body + (' ' * (max_len - len(body) + 1))
self.simulate_post(self.queue_path + '/messages',
body=long_body,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data('{"overflow": 9223372036854775808}',
'{"underflow": -9223372036854775809}')
def test_unsupported_json(self, document):
self.simulate_post(self.queue_path + '/messages',
body=document,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_delete(self):
self._post_messages(self.messages_path)
msg_id = self._get_msg_id(self.srmock.headers_dict)
target = self.messages_path + '/' + msg_id
self.simulate_get(target, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self.simulate_delete(target, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.simulate_get(target, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
# Safe to delete non-existing ones
self.simulate_delete(target, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
def test_bulk_delete(self):
path = self.queue_path + '/messages'
self._post_messages(path, repeat=5)
[target, params] = self.srmock.headers_dict['location'].split('?')
# Deleting the whole collection is denied
self.simulate_delete(path, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
self.simulate_delete(target, query_string=params, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.simulate_get(target, query_string=params, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
# Safe to delete non-existing ones
self.simulate_delete(target, query_string=params, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
# Even after the queue is gone
self.simulate_delete(self.queue_path, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.simulate_delete(target, query_string=params, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
def test_list(self):
path = self.queue_path + '/messages'
self._post_messages(path, repeat=10)
query_string = 'limit=3&echo=true'
body = self.simulate_get(path,
query_string=query_string,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
cnt = 0
while jsonutils.loads(body[0])['messages'] != []:
contents = jsonutils.loads(body[0])
[target, params] = contents['links'][0]['href'].split('?')
for msg in contents['messages']:
self.simulate_get(msg['href'], headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
body = self.simulate_get(target,
query_string=params,
headers=self.headers)
cnt += 1
self.assertEqual(cnt, 4)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self._empty_message_list(body)
# Stats
body = self.simulate_get(self.queue_path + '/stats',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
message_stats = jsonutils.loads(body[0])['messages']
# NOTE(kgriffs): The other parts of the stats are tested
# in tests.storage.base and so are not repeated here.
expected_pattern = self.queue_path + '/messages/[^/]+$'
for message_stat_name in ('oldest', 'newest'):
self.assertThat(message_stats[message_stat_name]['href'],
matchers.MatchesRegex(expected_pattern))
# NOTE(kgriffs): Try to get messages for a missing queue
body = self.simulate_get(self.url_prefix +
'/queues/nonexistent/messages',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self._empty_message_list(body)
def test_list_with_bad_marker(self):
path = self.queue_path + '/messages'
self._post_messages(path, repeat=5)
query_string = 'limit=3&echo=true&marker=sfhlsfdjh2048'
body = self.simulate_get(path,
query_string=query_string,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self._empty_message_list(body)
def test_no_uuid(self):
headers = {
'Client-ID': "textid",
'X-Project-ID': '7e7e7e'
}
path = self.queue_path + '/messages'
self.simulate_post(path, body='[{"body": 0, "ttl": 100}]',
headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
self.simulate_get(path, headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_get_claimed_contains_claim_id_in_href(self):
path = self.queue_path
res = self._post_messages(path + '/messages', repeat=5)
for url in jsonutils.loads(res[0])['resources']:
message = self.simulate_get(url)
self.assertNotIn('claim_id', jsonutils.loads(message[0])['href'])
self.simulate_post(path + '/claims',
body='{"ttl": 100, "grace": 100}',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
for url in jsonutils.loads(res[0])['resources']:
message = self.simulate_get(url)
self.assertIn('claim_id', jsonutils.loads(message[0])['href'])
# NOTE(cpp-cabrera): regression test against bug #1210633
def test_when_claim_deleted_then_messages_unclaimed(self):
path = self.queue_path
self._post_messages(path + '/messages', repeat=5)
# post claim
self.simulate_post(path + '/claims',
body='{"ttl": 100, "grace": 100}',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
location = self.srmock.headers_dict['location']
# release claim
self.simulate_delete(location, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
# get unclaimed messages
self.simulate_get(path + '/messages',
query_string='echo=true',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
# NOTE(cpp-cabrera): regression test against bug #1203842
def test_get_nonexistent_message_404s(self):
path = self.url_prefix + '/queues/notthere/messages/a'
self.simulate_get(path, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_get_multiple_invalid_messages_404s(self):
path = self.url_prefix + '/queues/notthere/messages'
self.simulate_get(path, query_string='ids=a,b,c',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_delete_multiple_invalid_messages_204s(self):
path = self.url_prefix + '/queues/notthere/messages'
self.simulate_delete(path, query_string='ids=a,b,c',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
def test_delete_message_with_invalid_claim_doesnt_delete_message(self):
path = self.queue_path
resp = self._post_messages(path + '/messages', 1)
location = jsonutils.loads(resp[0])['resources'][0]
self.simulate_delete(location, query_string='claim_id=invalid',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
self.simulate_get(location, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
def test_no_duplicated_messages_path_in_href(self):
"""Test for bug 1240897."""
path = self.queue_path + '/messages'
self._post_messages(path, repeat=1)
msg_id = self._get_msg_id(self.srmock.headers_dict)
query_string = 'ids=%s' % msg_id
body = self.simulate_get(path,
query_string=query_string,
headers=self.headers)
messages = jsonutils.loads(body[0])
self.assertNotIn(self.queue_path + '/messages/messages',
messages['messages'][0]['href'])
def _post_messages(self, target, repeat=1):
doc = {'messages': [{'body': 239, 'ttl': 300}] * repeat}
body = jsonutils.dumps(doc)
return self.simulate_post(target, body=body, headers=self.headers)
def _get_msg_id(self, headers):
return self._get_msg_ids(headers)[0]
def _get_msg_ids(self, headers):
return headers['location'].rsplit('=', 1)[-1].split(',')
class TestMessagesSqlalchemy(MessagesBaseTest):
config_file = 'wsgi_sqlalchemy.conf'
class TestMessagesMongoDB(MessagesBaseTest):
config_file = 'wsgi_mongodb.conf'
@testing.requires_mongodb
def setUp(self):
super(TestMessagesMongoDB, self).setUp()
def tearDown(self):
super(TestMessagesMongoDB, self).tearDown()
class TestMessagesMongoDBPooled(MessagesBaseTest):
config_file = 'wsgi_mongodb_pooled.conf'
@testing.requires_mongodb
def setUp(self):
super(TestMessagesMongoDBPooled, self).setUp()
def tearDown(self):
super(TestMessagesMongoDBPooled, self).tearDown()
# TODO(cpp-cabrera): remove this skipTest once pooled queue
# listing is implemented
def test_list(self):
self.skipTest("Need to implement pooled queue listing.")
class TestMessagesFaultyDriver(base.V2BaseFaulty):
config_file = 'wsgi_faulty.conf'
def test_simple(self):
project_id = 'xyz'
path = self.url_prefix + '/queues/fizbit/messages'
body = '{"messages": [{"body": 239, "ttl": 100}]}'
headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': project_id
}
self.simulate_post(path,
body=body,
headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_503)
self.simulate_get(path,
headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_503)
self.simulate_get(path + '/nonexistent', headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_503)
self.simulate_delete(path + '/nada', headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_503)

View File

@ -0,0 +1,342 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import contextlib
import uuid
import ddt
import falcon
from oslo.serialization import jsonutils
from zaqar import tests as testing
from zaqar.tests.unit.transport.wsgi import base
@contextlib.contextmanager
def pool(test, name, weight, uri, group=None, options={}):
"""A context manager for constructing a pool for use in testing.
Deletes the pool after exiting the context.
:param test: Must expose simulate_* methods
:param name: Name for this pool
:type name: six.text_type
:type weight: int
:type uri: six.text_type
:type options: dict
:returns: (name, weight, uri, options)
:rtype: see above
"""
doc = {'weight': weight, 'uri': uri,
'group': group, 'options': options}
path = test.url_prefix + '/pools/' + name
test.simulate_put(path, body=jsonutils.dumps(doc))
try:
yield name, weight, uri, group, options
finally:
test.simulate_delete(path)
@contextlib.contextmanager
def pools(test, count, uri, group):
"""A context manager for constructing pools for use in testing.
Deletes the pools after exiting the context.
:param test: Must expose simulate_* methods
:param count: Number of pools to create
:type count: int
:returns: (paths, weights, uris, options)
:rtype: ([six.text_type], [int], [six.text_type], [dict])
"""
base = test.url_prefix + '/pools/'
args = [(base + str(i), i,
{str(i): i})
for i in range(count)]
for path, weight, option in args:
doc = {'weight': weight, 'uri': uri,
'group': group, 'options': option}
test.simulate_put(path, body=jsonutils.dumps(doc))
try:
yield args
finally:
for path, _, _ in args:
test.simulate_delete(path)
@ddt.ddt
class PoolsBaseTest(base.V2Base):
def setUp(self):
super(PoolsBaseTest, self).setUp()
self.doc = {'weight': 100,
'group': 'mygroup',
'uri': 'sqlite://:memory:'}
self.pool = self.url_prefix + '/pools/' + str(uuid.uuid1())
self.simulate_put(self.pool, body=jsonutils.dumps(self.doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def tearDown(self):
super(PoolsBaseTest, self).tearDown()
self.simulate_delete(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
def test_put_pool_works(self):
name = str(uuid.uuid1())
weight, uri = self.doc['weight'], self.doc['uri']
with pool(self, name, weight, uri, group='my-group'):
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def test_put_raises_if_missing_fields(self):
path = self.url_prefix + '/pools/' + str(uuid.uuid1())
self.simulate_put(path, body=jsonutils.dumps({'weight': 100}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
self.simulate_put(path,
body=jsonutils.dumps(
{'uri': 'sqlite://:memory:'}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 2**32+1, 'big')
def test_put_raises_if_invalid_weight(self, weight):
path = self.url_prefix + '/pools/' + str(uuid.uuid1())
doc = {'weight': weight, 'uri': 'a'}
self.simulate_put(path,
body=jsonutils.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 2**32+1, [], 'localhost:27017')
def test_put_raises_if_invalid_uri(self, uri):
path = self.url_prefix + '/pools/' + str(uuid.uuid1())
self.simulate_put(path,
body=jsonutils.dumps({'weight': 1, 'uri': uri}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 'wee', [])
def test_put_raises_if_invalid_options(self, options):
path = self.url_prefix + '/pools/' + str(uuid.uuid1())
doc = {'weight': 1, 'uri': 'a', 'options': options}
self.simulate_put(path, body=jsonutils.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_put_existing_overwrites(self):
# NOTE(cabrera): setUp creates default pool
expect = self.doc
self.simulate_put(self.pool,
body=jsonutils.dumps(expect))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
result = self.simulate_get(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
doc = jsonutils.loads(result[0])
self.assertEqual(doc['weight'], expect['weight'])
self.assertEqual(doc['uri'], expect['uri'])
def test_delete_works(self):
self.simulate_delete(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.simulate_get(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_get_nonexisting_raises_404(self):
self.simulate_get(self.url_prefix + '/pools/nonexisting')
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def _pool_expect(self, pool, xhref, xweight, xuri):
self.assertIn('href', pool)
self.assertEqual(pool['href'], xhref)
self.assertIn('weight', pool)
self.assertEqual(pool['weight'], xweight)
self.assertIn('uri', pool)
self.assertEqual(pool['uri'], xuri)
def test_get_works(self):
result = self.simulate_get(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
pool = jsonutils.loads(result[0])
self._pool_expect(pool, self.pool, self.doc['weight'],
self.doc['uri'])
def test_detailed_get_works(self):
result = self.simulate_get(self.pool,
query_string='?detailed=True')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
pool = jsonutils.loads(result[0])
self._pool_expect(pool, self.pool, self.doc['weight'],
self.doc['uri'])
self.assertIn('options', pool)
self.assertEqual(pool['options'], {})
def test_patch_raises_if_missing_fields(self):
self.simulate_patch(self.pool,
body=jsonutils.dumps({'location': 1}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def _patch_test(self, doc):
self.simulate_patch(self.pool,
body=jsonutils.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_200)
result = self.simulate_get(self.pool,
query_string='?detailed=True')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
pool = jsonutils.loads(result[0])
self._pool_expect(pool, self.pool, doc['weight'],
doc['uri'])
self.assertEqual(pool['options'], doc['options'])
def test_patch_works(self):
doc = {'weight': 101, 'uri': 'sqlite://:memory:', 'options': {'a': 1}}
self._patch_test(doc)
def test_patch_works_with_extra_fields(self):
doc = {'weight': 101, 'uri': 'sqlite://:memory:', 'options': {'a': 1},
'location': 100, 'partition': 'taco'}
self._patch_test(doc)
@ddt.data(-1, 2**32+1, 'big')
def test_patch_raises_400_on_invalid_weight(self, weight):
self.simulate_patch(self.pool,
body=jsonutils.dumps({'weight': weight}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 2**32+1, [], 'localhost:27017')
def test_patch_raises_400_on_invalid_uri(self, uri):
self.simulate_patch(self.pool,
body=jsonutils.dumps({'uri': uri}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 'wee', [])
def test_patch_raises_400_on_invalid_options(self, options):
self.simulate_patch(self.pool,
body=jsonutils.dumps({'options': options}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_patch_raises_404_if_pool_not_found(self):
self.simulate_patch(self.url_prefix + '/pools/notexists',
body=jsonutils.dumps({'weight': 1}))
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_empty_listing(self):
self.simulate_delete(self.pool)
result = self.simulate_get(self.url_prefix + '/pools')
results = jsonutils.loads(result[0])
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self.assertTrue(len(results['pools']) == 0)
self.assertIn('links', results)
def _listing_test(self, count=10, limit=10,
marker=None, detailed=False):
# NOTE(cpp-cabrera): delete initial pool - it will interfere
# with listing tests
self.simulate_delete(self.pool)
query = '?limit={0}&detailed={1}'.format(limit, detailed)
if marker:
query += '&marker={0}'.format(marker)
with pools(self, count, self.doc['uri'], 'my-group') as expected:
result = self.simulate_get(self.url_prefix + '/pools',
query_string=query)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
results = jsonutils.loads(result[0])
self.assertIsInstance(results, dict)
self.assertIn('pools', results)
self.assertIn('links', results)
pool_list = results['pools']
link = results['links'][0]
self.assertEqual('next', link['rel'])
href = falcon.uri.parse_query_string(link['href'])
self.assertIn('marker', href)
self.assertEqual(href['limit'], str(limit))
self.assertEqual(href['detailed'], str(detailed).lower())
next_query_string = ('?marker={marker}&limit={limit}'
'&detailed={detailed}').format(**href)
next_result = self.simulate_get(link['href'].split('?')[0],
query_string=next_query_string)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
next_pool = jsonutils.loads(next_result[0])
next_pool_list = next_pool['pools']
self.assertIn('links', next_pool)
if limit < count:
self.assertEqual(len(next_pool_list),
min(limit, count-limit))
else:
# NOTE(jeffrey4l): when limit >= count, there will be no
# pools in the 2nd page.
self.assertTrue(len(next_pool_list) == 0)
self.assertEqual(len(pool_list), min(limit, count))
for s in pool_list + next_pool_list:
# NOTE(flwang): It can't assumed that both sqlalchemy and
# mongodb can return query result with the same order. Just
# like the order they're inserted. Actually, sqlalchemy can't
# guarantee that. So we're leveraging the relationship between
# pool weight and the index of pools fixture to get the
# right pool to verify.
expect = expected[s['weight']]
path, weight, group = expect[:3]
self._pool_expect(s, path, weight, self.doc['uri'])
if detailed:
self.assertIn('options', s)
self.assertEqual(s['options'], expect[-1])
else:
self.assertNotIn('options', s)
def test_listing_works(self):
self._listing_test()
def test_detailed_listing_works(self):
self._listing_test(detailed=True)
@ddt.data(1, 5, 10, 15)
def test_listing_works_with_limit(self, limit):
self._listing_test(count=15, limit=limit)
def test_listing_marker_is_respected(self):
self.simulate_delete(self.pool)
with pools(self, 10, self.doc['uri'], 'my-group') as expected:
result = self.simulate_get(self.url_prefix + '/pools',
query_string='?marker=3')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
pool_list = jsonutils.loads(result[0])['pools']
self.assertEqual(len(pool_list), 6)
path, weight = expected[4][:2]
self._pool_expect(pool_list[0], path, weight, self.doc['uri'])
class TestPoolsMongoDB(PoolsBaseTest):
config_file = 'wsgi_mongodb_pooled.conf'
@testing.requires_mongodb
def setUp(self):
super(TestPoolsMongoDB, self).setUp()
class TestPoolsSqlalchemy(PoolsBaseTest):
config_file = 'wsgi_sqlalchemy_pooled.conf'
def setUp(self):
super(TestPoolsSqlalchemy, self).setUp()

View File

@ -0,0 +1,380 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import uuid
import ddt
import falcon
from oslo.serialization import jsonutils
import six
from zaqar import tests as testing
from zaqar.tests.unit.transport.wsgi import base
@ddt.ddt
class QueueLifecycleBaseTest(base.V2Base):
config_file = None
def setUp(self):
super(QueueLifecycleBaseTest, self).setUp()
self.queue_path = self.url_prefix + '/queues'
self.gumshoe_queue_path = self.queue_path + '/gumshoe'
self.fizbat_queue_path = self.queue_path + '/fizbat'
self.headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': '3387309841abc_'
}
def test_empty_project_id(self):
headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': ''
}
self.simulate_put(self.gumshoe_queue_path, headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
self.simulate_delete(self.gumshoe_queue_path, headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data('480924', 'foo')
def test_basics_thoroughly(self, project_id):
headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': project_id
}
gumshoe_queue_path_stats = self.gumshoe_queue_path + '/stats'
# Stats are empty - queue not created yet
self.simulate_get(gumshoe_queue_path_stats, headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
# Create
doc = '{"messages": {"ttl": 600}}'
self.simulate_put(self.gumshoe_queue_path,
headers=headers, body=doc)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
location = self.srmock.headers_dict['Location']
self.assertEqual(location, self.gumshoe_queue_path)
# Fetch metadata
result = self.simulate_get(self.gumshoe_queue_path,
headers=headers)
result_doc = jsonutils.loads(result[0])
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self.assertEqual(result_doc, jsonutils.loads(doc))
# Stats empty queue
self.simulate_get(gumshoe_queue_path_stats, headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
# Delete
self.simulate_delete(self.gumshoe_queue_path, headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
# Get non-existent stats
self.simulate_get(gumshoe_queue_path_stats, headers=headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
def test_name_restrictions(self):
self.simulate_put(self.queue_path + '/Nice-Boat_2',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
self.simulate_put(self.queue_path + '/Nice-Bo@t',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
self.simulate_put(self.queue_path + '/_' + 'niceboat' * 8,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_project_id_restriction(self):
muvluv_queue_path = self.queue_path + '/Muv-Luv'
self.simulate_put(muvluv_queue_path,
headers={'Client-ID': str(uuid.uuid4()),
'X-Project-ID': 'JAM Project' * 24})
self.assertEqual(self.srmock.status, falcon.HTTP_400)
# no charset restrictions
self.simulate_put(muvluv_queue_path,
headers={'Client-ID': str(uuid.uuid4()),
'X-Project-ID': 'JAM Project'})
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def test_non_ascii_name(self):
test_params = ((u'/queues/non-ascii-n\u0153me', 'utf-8'),
(u'/queues/non-ascii-n\xc4me', 'iso8859-1'))
for uri, enc in test_params:
uri = self.url_prefix + uri
if six.PY2:
uri = uri.encode(enc)
self.simulate_put(uri, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
self.simulate_delete(uri, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_no_metadata(self):
self.simulate_put(self.fizbat_queue_path,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
self.simulate_put(self.fizbat_queue_path, body='',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
@ddt.data('{', '[]', '.', ' ')
def test_bad_metadata(self, document):
self.simulate_put(self.fizbat_queue_path,
headers=self.headers,
body=document)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_too_much_metadata(self):
self.simulate_put(self.fizbat_queue_path, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}'
max_size = self.transport_cfg.max_queue_metadata
padding_len = max_size - (len(doc) - 10) + 1
doc = doc.format(pad='x' * padding_len)
self.simulate_put(self.fizbat_queue_path,
headers=self.headers,
body=doc)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_way_too_much_metadata(self):
self.simulate_put(self.fizbat_queue_path, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}'
max_size = self.transport_cfg.max_queue_metadata
padding_len = max_size * 100
doc = doc.format(pad='x' * padding_len)
self.simulate_put(self.fizbat_queue_path,
headers=self.headers, body=doc)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_custom_metadata(self):
# Set
doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}'
max_size = self.transport_cfg.max_queue_metadata
padding_len = max_size - (len(doc) - 2)
doc = doc.format(pad='x' * padding_len)
self.simulate_put(self.fizbat_queue_path,
headers=self.headers,
body=doc)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
# Get
result = self.simulate_get(self.fizbat_queue_path,
headers=self.headers)
result_doc = jsonutils.loads(result[0])
self.assertEqual(result_doc, jsonutils.loads(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_200)
def test_update_metadata(self):
self.skip("This should use patch instead")
xyz_queue_path = self.url_prefix + '/queues/xyz'
xyz_queue_path_metadata = xyz_queue_path
# Create
self.simulate_put(xyz_queue_path, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
# Set meta
doc1 = '{"messages": {"ttl": 600}}'
self.simulate_put(xyz_queue_path_metadata,
headers=self.headers,
body=doc1)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
# Update
doc2 = '{"messages": {"ttl": 100}}'
self.simulate_put(xyz_queue_path_metadata,
headers=self.headers,
body=doc2)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
# Get
result = self.simulate_get(xyz_queue_path_metadata,
headers=self.headers)
result_doc = jsonutils.loads(result[0])
self.assertEqual(result_doc, jsonutils.loads(doc2))
def test_list(self):
arbitrary_number = 644079696574693
project_id = str(arbitrary_number)
client_id = str(uuid.uuid4())
header = {
'X-Project-ID': project_id,
'Client-ID': client_id
}
# NOTE(kgriffs): It's important that this one sort after the one
# above. This is in order to prove that bug/1236605 is fixed, and
# stays fixed!
alt_project_id = str(arbitrary_number + 1)
# List empty
result = self.simulate_get(self.queue_path, headers=header)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
results = jsonutils.loads(result[0])
self.assertEqual(results['queues'], [])
self.assertIn('links', results)
link = results['links'][0]
self.assertEqual('next', link['rel'])
href = falcon.uri.parse_query_string(link['href'])
self.assertNotIn('marker', href)
# Payload exceeded
self.simulate_get(self.queue_path, headers=header,
query_string='limit=21')
self.assertEqual(self.srmock.status, falcon.HTTP_400)
# Create some
def create_queue(name, project_id, body):
altheader = {'Client-ID': client_id}
if project_id is not None:
altheader['X-Project-ID'] = project_id
uri = self.queue_path + '/' + name
self.simulate_put(uri, headers=altheader, body=body)
create_queue('q1', project_id, '{"node": 31}')
create_queue('q2', project_id, '{"node": 32}')
create_queue('q3', project_id, '{"node": 33}')
create_queue('q3', alt_project_id, '{"alt": 1}')
# List (limit)
result = self.simulate_get(self.queue_path, headers=header,
query_string='limit=2')
result_doc = jsonutils.loads(result[0])
self.assertEqual(len(result_doc['queues']), 2)
# List (no metadata, get all)
result = self.simulate_get(self.queue_path,
headers=header, query_string='limit=5')
result_doc = jsonutils.loads(result[0])
[target, params] = result_doc['links'][0]['href'].split('?')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
# Ensure we didn't pick up the queue from the alt project.
queues = result_doc['queues']
self.assertEqual(len(queues), 3)
# List with metadata
result = self.simulate_get(self.queue_path, headers=header,
query_string='detailed=true')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
result_doc = jsonutils.loads(result[0])
[target, params] = result_doc['links'][0]['href'].split('?')
queue = result_doc['queues'][0]
result = self.simulate_get(queue['href'], headers=header)
result_doc = jsonutils.loads(result[0])
self.assertEqual(result_doc, queue['metadata'])
self.assertEqual(result_doc, {'node': 31})
# List tail
self.simulate_get(target, headers=header, query_string=params)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
# List manually-constructed tail
self.simulate_get(target, headers=header, query_string='marker=zzz')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):
config_file = 'wsgi_mongodb.conf'
@testing.requires_mongodb
def setUp(self):
super(TestQueueLifecycleMongoDB, self).setUp()
def tearDown(self):
storage = self.boot.storage._storage
connection = storage.connection
connection.drop_database(storage.queues_database)
for db in storage.message_databases:
connection.drop_database(db)
super(TestQueueLifecycleMongoDB, self).tearDown()
class TestQueueLifecycleSqlalchemy(QueueLifecycleBaseTest):
config_file = 'wsgi_sqlalchemy.conf'
class TestQueueLifecycleFaultyDriver(base.V2BaseFaulty):
config_file = 'wsgi_faulty.conf'
def test_simple(self):
self.headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': '338730984abc_1'
}
gumshoe_queue_path = self.url_prefix + '/queues/gumshoe'
doc = '{"messages": {"ttl": 600}}'
self.simulate_put(gumshoe_queue_path,
headers=self.headers,
body=doc)
self.assertEqual(self.srmock.status, falcon.HTTP_503)
location = ('Location', gumshoe_queue_path)
self.assertNotIn(location, self.srmock.headers)
result = self.simulate_get(gumshoe_queue_path,
headers=self.headers)
result_doc = jsonutils.loads(result[0])
self.assertEqual(self.srmock.status, falcon.HTTP_503)
self.assertNotEqual(result_doc, jsonutils.loads(doc))
self.simulate_get(gumshoe_queue_path + '/stats',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_503)
self.simulate_get(self.url_prefix + '/queues',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_503)
self.simulate_delete(gumshoe_queue_path, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_503)

View File

@ -0,0 +1,91 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import uuid
import falcon
from zaqar.tests.unit.transport.wsgi import base
class TestValidation(base.V2Base):
config_file = 'wsgi_sqlalchemy_validation.conf'
def setUp(self):
super(TestValidation, self).setUp()
self.project_id = '7e55e1a7e'
self.queue_path = self.url_prefix + '/queues/noein'
self.simulate_put(self.queue_path, self.project_id)
self.headers = {
'Client-ID': str(uuid.uuid4()),
}
def tearDown(self):
self.simulate_delete(self.queue_path, self.project_id)
super(TestValidation, self).tearDown()
def test_metadata_deserialization(self):
# Normal case
self.simulate_put(self.queue_path,
self.project_id,
body='{"timespace": "Shangri-la"}')
self.assertEqual(self.srmock.status, falcon.HTTP_204)
# Too long
max_queue_metadata = 64
doc_tmpl = '{{"Dragon Torc":"{0}"}}'
doc_tmpl_ws = '{{ "Dragon Torc" : "{0}" }}' # with whitespace
envelope_length = len(doc_tmpl.format(''))
for tmpl in doc_tmpl, doc_tmpl_ws:
gen = '0' * (max_queue_metadata - envelope_length + 1)
doc = tmpl.format(gen)
self.simulate_put(self.queue_path,
self.project_id,
body=doc)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_message_deserialization(self):
# Normal case
body = '{"messages": [{"body": "Dragon Knights", "ttl": 100}]}'
self.simulate_post(self.queue_path + '/messages',
self.project_id, body=body,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_201)
# Both messages' size are too long
max_messages_post_size = 256
obj = {'a': 0, 'b': ''}
envelope_length = len(json.dumps(obj, separators=(',', ':')))
obj['b'] = 'x' * (max_messages_post_size - envelope_length + 1)
for long_body in ('a' * (max_messages_post_size - 2 + 1), obj):
doc = json.dumps([{'body': long_body, 'ttl': 100}])
self.simulate_post(self.queue_path + '/messages',
self.project_id,
body=doc,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)

View File

@ -28,6 +28,7 @@ from zaqar.transport import auth
from zaqar.transport import validation
from zaqar.transport.wsgi import v1_0
from zaqar.transport.wsgi import v1_1
from zaqar.transport.wsgi import v2_0
from zaqar.transport.wsgi import version
_WSGI_OPTIONS = (
@ -83,6 +84,7 @@ class Driver(transport.DriverBase):
catalog = [
('/v1', v1_0.public_endpoints(self, self._conf)),
('/v1.1', v1_1.public_endpoints(self, self._conf)),
('/v2', v2_0.public_endpoints(self, self._conf)),
('/', [('', version.Resource())])
]
@ -90,6 +92,7 @@ class Driver(transport.DriverBase):
catalog.extend([
('/v1', v1_0.private_endpoints(self, self._conf)),
('/v1.1', v1_1.private_endpoints(self, self._conf)),
('/v2', v2_0.private_endpoints(self, self._conf)),
])
self.app = falcon.API(before=self.before_hooks)

View File

@ -24,7 +24,7 @@ from zaqar.transport.wsgi.v1_1 import stats
VERSION = {
'id': '1.1',
'status': 'CURRENT',
'status': 'SUPPORTED',
'updated': '2014-9-24T04:06:47Z',
'media-types': [
{

View File

@ -0,0 +1,121 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
from zaqar.transport.wsgi.v2_0 import claims
from zaqar.transport.wsgi.v2_0 import flavors
from zaqar.transport.wsgi.v2_0 import health
from zaqar.transport.wsgi.v2_0 import homedoc
from zaqar.transport.wsgi.v2_0 import messages
from zaqar.transport.wsgi.v2_0 import ping
from zaqar.transport.wsgi.v2_0 import pools
from zaqar.transport.wsgi.v2_0 import queues
from zaqar.transport.wsgi.v2_0 import stats
VERSION = {
'id': '2',
'status': 'CURRENT',
'updated': '2014-9-24T04:06:47Z',
'media-types': [
{
'base': 'application/json',
'type': 'application/vnd.openstack.messaging-v2+json'
}
],
'links': [
{
'href': '/v2/',
'rel': 'self'
}
]
}
def public_endpoints(driver, conf):
queue_controller = driver._storage.queue_controller
message_controller = driver._storage.message_controller
claim_controller = driver._storage.claim_controller
defaults = driver._defaults
return [
# Home
('/',
homedoc.Resource(conf)),
# Queues Endpoints
('/queues',
queues.CollectionResource(driver._validate,
queue_controller)),
('/queues/{queue_name}',
queues.ItemResource(driver._validate,
queue_controller,
message_controller)),
('/queues/{queue_name}/stats',
stats.Resource(queue_controller)),
# Messages Endpoints
('/queues/{queue_name}/messages',
messages.CollectionResource(driver._wsgi_conf,
driver._validate,
message_controller,
queue_controller,
defaults.message_ttl)),
('/queues/{queue_name}/messages/{message_id}',
messages.ItemResource(message_controller)),
# Claims Endpoints
('/queues/{queue_name}/claims',
claims.CollectionResource(driver._wsgi_conf,
driver._validate,
claim_controller,
defaults.claim_ttl,
defaults.claim_grace)),
('/queues/{queue_name}/claims/{claim_id}',
claims.ItemResource(driver._wsgi_conf,
driver._validate,
claim_controller,
defaults.claim_ttl,
defaults.claim_grace)),
# Ping
('/ping',
ping.Resource(driver._storage))
]
def private_endpoints(driver, conf):
catalogue = [
# Health
('/health',
health.Resource(driver._storage)),
]
if conf.pooling:
pools_controller = driver._control.pools_controller
flavors_controller = driver._control.flavors_controller
catalogue.extend([
('/pools',
pools.Listing(pools_controller)),
('/pools/{pool}',
pools.Resource(pools_controller)),
('/flavors',
flavors.Listing(flavors_controller)),
('/flavors/{flavor}',
flavors.Resource(flavors_controller)),
])
return catalogue

View File

@ -0,0 +1,215 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import six
from zaqar.i18n import _
import zaqar.openstack.common.log as logging
from zaqar.storage import errors as storage_errors
from zaqar.transport import utils
from zaqar.transport import validation
from zaqar.transport.wsgi import errors as wsgi_errors
from zaqar.transport.wsgi import utils as wsgi_utils
LOG = logging.getLogger(__name__)
class CollectionResource(object):
__slots__ = (
'_claim_controller',
'_validate',
'_claim_post_spec',
'_default_meta',
)
def __init__(self, wsgi_conf, validate, claim_controller,
default_claim_ttl, default_grace_ttl):
self._claim_controller = claim_controller
self._validate = validate
self._claim_post_spec = (
('ttl', int, default_claim_ttl),
('grace', int, default_grace_ttl),
)
# NOTE(kgriffs): Create this once up front, rather than creating
# a new dict every time, for the sake of performance.
self._default_meta = {
'ttl': default_claim_ttl,
'grace': default_grace_ttl,
}
def on_post(self, req, resp, project_id, queue_name):
LOG.debug(u'Claims collection POST - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
# Check for an explicit limit on the # of messages to claim
limit = req.get_param_as_int('limit')
claim_options = {} if limit is None else {'limit': limit}
# NOTE(kgriffs): Clients may or may not actually include the
# Content-Length header when the body is empty; the following
# check works for both 0 and None.
if not req.content_length:
# No values given, so use defaults
metadata = self._default_meta
else:
# Read claim metadata (e.g., TTL) and raise appropriate
# HTTP errors as needed.
document = wsgi_utils.deserialize(req.stream, req.content_length)
metadata = wsgi_utils.sanitize(document, self._claim_post_spec)
# Claim some messages
try:
self._validate.claim_creation(metadata, limit=limit)
cid, msgs = self._claim_controller.create(
queue_name,
metadata=metadata,
project=project_id,
**claim_options)
# Buffer claimed messages
# TODO(kgriffs): optimize, along with serialization (below)
resp_msgs = list(msgs)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except Exception as ex:
LOG.exception(ex)
description = _(u'Claim could not be created.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Serialize claimed messages, if any. This logic assumes
# the storage driver returned well-formed messages.
if len(resp_msgs) != 0:
base_path = req.path.rpartition('/')[0]
resp_msgs = [wsgi_utils.format_message_v1_1(msg, base_path, cid)
for msg in resp_msgs]
resp.location = req.path + '/' + cid
resp.body = utils.to_json({'messages': resp_msgs})
resp.status = falcon.HTTP_201
else:
resp.status = falcon.HTTP_204
class ItemResource(object):
__slots__ = ('_claim_controller', '_validate', '_claim_patch_spec')
def __init__(self, wsgi_conf, validate, claim_controller,
default_claim_ttl, default_grace_ttl):
self._claim_controller = claim_controller
self._validate = validate
self._claim_patch_spec = (
('ttl', int, default_claim_ttl),
('grace', int, default_grace_ttl),
)
def on_get(self, req, resp, project_id, queue_name, claim_id):
LOG.debug(u'Claim item GET - claim: %(claim_id)s, '
u'queue: %(queue_name)s, project: %(project_id)s',
{'queue_name': queue_name,
'project_id': project_id,
'claim_id': claim_id})
try:
meta, msgs = self._claim_controller.get(
queue_name,
claim_id=claim_id,
project=project_id)
# Buffer claimed messages
# TODO(kgriffs): Optimize along with serialization (see below)
meta['messages'] = list(msgs)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Claim could not be queried.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Serialize claimed messages
# TODO(kgriffs): Optimize
base_path = req.path.rsplit('/', 2)[0]
meta['messages'] = [wsgi_utils.format_message_v1_1(msg, base_path,
claim_id)
for msg in meta['messages']]
meta['href'] = req.path
del meta['id']
resp.body = utils.to_json(meta)
# status defaults to 200
def on_patch(self, req, resp, project_id, queue_name, claim_id):
LOG.debug(u'Claim Item PATCH - claim: %(claim_id)s, '
u'queue: %(queue_name)s, project:%(project_id)s' %
{'queue_name': queue_name,
'project_id': project_id,
'claim_id': claim_id})
# Read claim metadata (e.g., TTL) and raise appropriate
# HTTP errors as needed.
document = wsgi_utils.deserialize(req.stream, req.content_length)
metadata = wsgi_utils.sanitize(document, self._claim_patch_spec)
try:
self._validate.claim_updating(metadata)
self._claim_controller.update(queue_name,
claim_id=claim_id,
metadata=metadata,
project=project_id)
resp.status = falcon.HTTP_204
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Claim could not be updated.')
raise wsgi_errors.HTTPServiceUnavailable(description)
def on_delete(self, req, resp, project_id, queue_name, claim_id):
LOG.debug(u'Claim item DELETE - claim: %(claim_id)s, '
u'queue: %(queue_name)s, project: %(project_id)s' %
{'queue_name': queue_name,
'project_id': project_id,
'claim_id': claim_id})
try:
self._claim_controller.delete(queue_name,
claim_id=claim_id,
project=project_id)
resp.status = falcon.HTTP_204
except Exception as ex:
LOG.exception(ex)
description = _(u'Claim could not be deleted.')
raise wsgi_errors.HTTPServiceUnavailable(description)

View File

@ -0,0 +1,208 @@
# Copyright (c) 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import jsonschema
from zaqar.common.api.schemas import flavors as schema
from zaqar.common import utils as common_utils
from zaqar.i18n import _
from zaqar.openstack.common import log
from zaqar.storage import errors
from zaqar.transport import utils as transport_utils
from zaqar.transport.wsgi import errors as wsgi_errors
from zaqar.transport.wsgi import utils as wsgi_utils
LOG = log.getLogger(__name__)
class Listing(object):
"""A resource to list registered flavors
:param flavors_controller: means to interact with storage
"""
def __init__(self, flavors_controller):
self._ctrl = flavors_controller
def on_get(self, request, response, project_id):
"""Returns a flavor listing as objects embedded in an object:
::
{
"flavors": [
{"href": "", "capabilities": {}, "pool": ""},
...
],
"links": [
{"rel": "next", "href": ""},
...
]
}
:returns: HTTP | 200
"""
LOG.debug(u'LIST flavors for project_id %s' % project_id)
store = {}
request.get_param('marker', store=store)
request.get_param_as_int('limit', store=store)
request.get_param_as_bool('detailed', store=store)
cursor = self._ctrl.list(project=project_id, **store)
flavors = list(next(cursor))
results = {}
if flavors:
store['marker'] = next(cursor)
for entry in flavors:
entry['href'] = request.path + '/' + entry.pop('name')
results['links'] = [
{
'rel': 'next',
'href': request.path + falcon.to_query_str(store)
}
]
results['flavors'] = flavors
response.body = transport_utils.to_json(results)
response.status = falcon.HTTP_200
class Resource(object):
"""A handler for individual flavor.
:param flavors_controller: means to interact with storage
"""
def __init__(self, flavors_controller):
self._ctrl = flavors_controller
validator_type = jsonschema.Draft4Validator
self._validators = {
'create': validator_type(schema.create),
'pool': validator_type(schema.patch_pool),
'capabilities': validator_type(schema.patch_capabilities),
}
def on_get(self, request, response, project_id, flavor):
"""Returns a JSON object for a single flavor entry:
::
{"pool": "", capabilities: {...}}
:returns: HTTP | [200, 404]
"""
LOG.debug(u'GET flavor - name: %s', flavor)
data = None
detailed = request.get_param_as_bool('detailed') or False
try:
data = self._ctrl.get(flavor,
project=project_id,
detailed=detailed)
except errors.FlavorDoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
data['href'] = request.path
# remove the name entry - it isn't needed on GET
del data['name']
response.body = transport_utils.to_json(data)
def on_put(self, request, response, project_id, flavor):
"""Registers a new flavor. Expects the following input:
::
{"pool": "my-pool", "capabilities": {}}
A capabilities object may also be provided.
:returns: HTTP | [201, 400]
"""
LOG.debug(u'PUT flavor - name: %s', flavor)
data = wsgi_utils.load(request)
wsgi_utils.validate(self._validators['create'], data)
try:
self._ctrl.create(flavor,
pool=data['pool'],
project=project_id,
capabilities=data['capabilities'])
response.status = falcon.HTTP_201
response.location = request.path
except errors.PoolDoesNotExist as ex:
LOG.exception(ex)
description = (_(u'Flavor {flavor} could not be created. '
u'Pool {pool} does not exist') %
dict(flavor=flavor, pool=data['pool']))
raise falcon.HTTPBadRequest(_('Unable to create'), description)
def on_delete(self, request, response, project_id, flavor):
"""Deregisters a flavor.
:returns: HTTP | [204]
"""
LOG.debug(u'DELETE flavor - name: %s', flavor)
self._ctrl.delete(flavor, project=project_id)
response.status = falcon.HTTP_204
def on_patch(self, request, response, project_id, flavor):
"""Allows one to update a flavors's pool and/or capabilities.
This method expects the user to submit a JSON object
containing at least one of: 'pool', 'capabilities'. If
none are found, the request is flagged as bad. There is also
strict format checking through the use of
jsonschema. Appropriate errors are returned in each case for
badly formatted input.
:returns: HTTP | [200, 400]
"""
LOG.debug(u'PATCH flavor - name: %s', flavor)
data = wsgi_utils.load(request)
EXPECT = ('pool', 'capabilities')
if not any([(field in data) for field in EXPECT]):
LOG.debug(u'PATCH flavor, bad params')
raise wsgi_errors.HTTPBadRequestBody(
'One of `pool` or `capabilities` needs '
'to be specified'
)
for field in EXPECT:
wsgi_utils.validate(self._validators[field], data)
fields = common_utils.fields(data, EXPECT,
pred=lambda v: v is not None)
try:
self._ctrl.update(flavor, project=project_id, **fields)
except errors.FlavorDoesNotExist as ex:
LOG.exception(ex)
raise falcon.HTTPNotFound()

View File

@ -0,0 +1,38 @@
# Copyright (c) 2014 Rackspace, Inc.
# Copyright 2014 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
from zaqar.i18n import _
from zaqar.openstack.common import log as logging
from zaqar.transport import utils
from zaqar.transport.wsgi import errors as wsgi_errors
LOG = logging.getLogger(__name__)
class Resource(object):
__slots__ = ('_driver',)
def __init__(self, driver):
self._driver = driver
def on_get(self, req, resp, **kwargs):
try:
resp_dict = self._driver.health()
resp.body = utils.to_json(resp_dict)
except Exception as ex:
LOG.exception(ex)
description = _(u'Health status could not be read.')
raise wsgi_errors.HTTPServiceUnavailable(description)

View File

@ -0,0 +1,268 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import json
# NOTE(kgriffs): http://tools.ietf.org/html/draft-nottingham-json-home-03
JSON_HOME = {
'resources': {
# -----------------------------------------------------------------
# Queues
# -----------------------------------------------------------------
'rel/queues': {
'href-template': '/v2/queues{?marker,limit,detailed}',
'href-vars': {
'marker': 'param/marker',
'limit': 'param/queue_limit',
'detailed': 'param/detailed',
},
'hints': {
'allow': ['GET'],
'formats': {
'application/json': {},
},
},
},
'rel/queue': {
'href-template': '/v2/queues/{queue_name}',
'href-vars': {
'queue_name': 'param/queue_name',
},
'hints': {
'allow': ['PUT', 'DELETE'],
'formats': {
'application/json': {},
},
},
},
'rel/queue_stats': {
'href-template': '/v2/queues/{queue_name}/stats',
'href-vars': {
'queue_name': 'param/queue_name',
},
'hints': {
'allow': ['GET'],
'formats': {
'application/json': {},
},
},
},
# -----------------------------------------------------------------
# Messages
# -----------------------------------------------------------------
'rel/messages': {
'href-template': ('/v2/queues/{queue_name}/messages'
'{?marker,limit,echo,include_claimed}'),
'href-vars': {
'queue_name': 'param/queue_name',
'marker': 'param/marker',
'limit': 'param/messages_limit',
'echo': 'param/echo',
'include_claimed': 'param/include_claimed',
},
'hints': {
'allow': ['GET'],
'formats': {
'application/json': {},
},
},
},
'rel/post_messages': {
'href-template': '/v2/queues/{queue_name}/messages',
'href-vars': {
'queue_name': 'param/queue_name',
},
'hints': {
'allow': ['POST'],
'formats': {
'application/json': {},
},
'accept-post': ['application/json'],
},
},
'rel/messages_delete': {
'href-template': '/v2/queues/{queue_name}/messages{?ids,pop}',
'href-vars': {
'queue_name': 'param/queue_name',
'ids': 'param/ids',
'pop': 'param/pop'
},
'hints': {
'allow': [
'DELETE'
],
'formats': {
'application/json': {}
}
}
},
'rel/message_delete': {
'href-template': '/v2/queues/{queue_name}/messages/{message_id}{?claim}', # noqa
'href-vars': {
'queue_name': 'param/queue_name',
'message_id': 'param/message_id',
'claim': 'param/claim_id'
},
'hints': {
'allow': [
'DELETE'
],
'formats': {
'application/json': {}
}
}
},
# -----------------------------------------------------------------
# Claims
# -----------------------------------------------------------------
'rel/claim': {
'href-template': '/v2/queues/{queue_name}/claims/{claim_id}',
'href-vars': {
'queue_name': 'param/queue_name',
'claim_id': 'param/claim_id',
},
'hints': {
'allow': ['GET'],
'formats': {
'application/json': {},
},
},
},
'rel/post_claim': {
'href-template': '/v2/queues/{queue_name}/claims{?limit}',
'href-vars': {
'queue_name': 'param/queue_name',
'limit': 'param/claim_limit',
},
'hints': {
'allow': ['POST'],
'formats': {
'application/json': {},
},
'accept-post': ['application/json']
},
},
'rel/patch_claim': {
'href-template': '/v2/queues/{queue_name}/claims/{claim_id}',
'href-vars': {
'queue_name': 'param/queue_name',
'claim_id': 'param/claim_id',
},
'hints': {
'allow': ['PATCH'],
'formats': {
'application/json': {},
},
'accept-post': ['application/json']
},
},
'rel/delete_claim': {
'href-template': '/v2/queues/{queue_name}/claims/{claim_id}',
'href-vars': {
'queue_name': 'param/queue_name',
'claim_id': 'param/claim_id',
},
'hints': {
'allow': ['DELETE'],
'formats': {
'application/json': {},
},
},
},
}
}
ADMIN_RESOURCES = {
# -----------------------------------------------------------------
# Pools
# -----------------------------------------------------------------
'rel/pools': {
'href-template': '/v2/pools{?detailed,limit,marker}',
'href-vars': {
'detailed': 'param/detailed',
'limit': 'param/pool_limit',
'marker': 'param/marker',
},
'hints': {
'allow': ['GET'],
'formats': {
'application/json': {},
},
},
},
'rel/pool': {
'href-template': '/v2/pools/{pool_name}',
'href-vars': {
'pool_name': 'param/pool_name',
},
'hints': {
'allow': ['GET', 'PUT', 'PATCH', 'DELETE'],
'formats': {
'application/json': {},
},
},
},
# -----------------------------------------------------------------
# Flavors
# -----------------------------------------------------------------
'rel/flavors': {
'href-template': '/v2/flavors{?detailed,limit,marker}',
'href-vars': {
'detailed': 'param/detailed',
'limit': 'param/flavor_limit',
'marker': 'param/marker',
},
'hints': {
'allow': ['GET'],
'formats': {
'application/json': {},
},
},
},
'rel/flavor': {
'href-template': '/v2/flavors/{flavor_name}',
'href-vars': {
'flavor_name': 'param/flavor_name',
},
'hints': {
'allow': ['GET', 'PUT', 'PATCH', 'DELETE'],
'formats': {
'application/json': {},
},
},
},
}
class Resource(object):
def __init__(self, conf):
if conf.admin_mode:
JSON_HOME['resources'].update(ADMIN_RESOURCES)
document = json.dumps(JSON_HOME, ensure_ascii=False, indent=4)
self.document_utf8 = document.encode('utf-8')
def on_get(self, req, resp, project_id):
resp.data = self.document_utf8
resp.content_type = 'application/json-home'
resp.cache_control = ['max-age=86400']
# status defaults to 200

View File

@ -0,0 +1,376 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import six
from zaqar.common.transport.wsgi import helpers as wsgi_helpers
from zaqar.i18n import _
import zaqar.openstack.common.log as logging
from zaqar.storage import errors as storage_errors
from zaqar.transport import utils
from zaqar.transport import validation
from zaqar.transport.wsgi import errors as wsgi_errors
from zaqar.transport.wsgi import utils as wsgi_utils
LOG = logging.getLogger(__name__)
class CollectionResource(object):
__slots__ = (
'_message_controller',
'_queue_controller',
'_wsgi_conf',
'_validate',
'_message_post_spec',
)
def __init__(self, wsgi_conf, validate,
message_controller, queue_controller,
default_message_ttl):
self._wsgi_conf = wsgi_conf
self._validate = validate
self._message_controller = message_controller
self._queue_controller = queue_controller
self._message_post_spec = (
('ttl', int, default_message_ttl),
('body', '*', None),
)
# ----------------------------------------------------------------------
# Helpers
# ----------------------------------------------------------------------
def _get_by_id(self, base_path, project_id, queue_name, ids):
"""Returns one or more messages from the queue by ID."""
try:
self._validate.message_listing(limit=len(ids))
messages = self._message_controller.bulk_get(
queue_name,
message_ids=ids,
project=project_id)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except Exception as ex:
LOG.exception(ex)
description = _(u'Message could not be retrieved.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Prepare response
messages = list(messages)
if not messages:
return None
messages = [wsgi_utils.format_message_v1_1(m, base_path, m['claim_id'])
for m in messages]
return {'messages': messages}
def _get(self, req, project_id, queue_name):
client_uuid = wsgi_helpers.get_client_uuid(req)
kwargs = {}
# NOTE(kgriffs): This syntax ensures that
# we don't clobber default values with None.
req.get_param('marker', store=kwargs)
req.get_param_as_int('limit', store=kwargs)
req.get_param_as_bool('echo', store=kwargs)
req.get_param_as_bool('include_claimed', store=kwargs)
try:
self._validate.message_listing(**kwargs)
results = self._message_controller.list(
queue_name,
project=project_id,
client_uuid=client_uuid,
**kwargs)
# Buffer messages
cursor = next(results)
messages = list(cursor)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Messages could not be listed.')
raise wsgi_errors.HTTPServiceUnavailable(description)
if not messages:
messages = []
else:
# Found some messages, so prepare the response
kwargs['marker'] = next(results)
base_path = req.path.rsplit('/', 1)[0]
messages = [wsgi_utils.format_message_v1_1(m, base_path,
m['claim_id'])
for m in messages]
return {
'messages': messages,
'links': [
{
'rel': 'next',
'href': req.path + falcon.to_query_str(kwargs)
}
]
}
# ----------------------------------------------------------------------
# Interface
# ----------------------------------------------------------------------
def on_post(self, req, resp, project_id, queue_name):
LOG.debug(u'Messages collection POST - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
client_uuid = wsgi_helpers.get_client_uuid(req)
try:
# Place JSON size restriction before parsing
self._validate.message_length(req.content_length)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
# Deserialize and validate the incoming messages
document = wsgi_utils.deserialize(req.stream, req.content_length)
if 'messages' not in document:
description = _(u'No messages were found in the request body.')
raise wsgi_errors.HTTPBadRequestAPI(description)
messages = wsgi_utils.sanitize(document['messages'],
self._message_post_spec,
doctype=wsgi_utils.JSONArray)
try:
self._validate.message_posting(messages)
if not self._queue_controller.exists(queue_name, project_id):
self._queue_controller.create(queue_name, project=project_id)
message_ids = self._message_controller.post(
queue_name,
messages=messages,
project=project_id,
client_uuid=client_uuid)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except storage_errors.MessageConflict as ex:
LOG.exception(ex)
description = _(u'No messages could be enqueued.')
raise wsgi_errors.HTTPServiceUnavailable(description)
except Exception as ex:
LOG.exception(ex)
description = _(u'Messages could not be enqueued.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Prepare the response
ids_value = ','.join(message_ids)
resp.location = req.path + '?ids=' + ids_value
hrefs = [req.path + '/' + id for id in message_ids]
body = {'resources': hrefs}
resp.body = utils.to_json(body)
resp.status = falcon.HTTP_201
def on_get(self, req, resp, project_id, queue_name):
LOG.debug(u'Messages collection GET - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
ids = req.get_param_as_list('ids')
if ids is None:
response = self._get(req, project_id, queue_name)
else:
response = self._get_by_id(req.path.rsplit('/', 1)[0], project_id,
queue_name, ids)
if response is None:
# NOTE(TheSriram): Trying to get a message by id, should
# return the message if its present, otherwise a 404 since
# the message might have been deleted.
resp.status = falcon.HTTP_404
else:
resp.body = utils.to_json(response)
# status defaults to 200
def on_delete(self, req, resp, project_id, queue_name):
LOG.debug(u'Messages collection DELETE - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
ids = req.get_param_as_list('ids')
pop_limit = req.get_param_as_int('pop')
try:
self._validate.message_deletion(ids, pop_limit)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
if ids:
resp.status = self._delete_messages_by_id(queue_name, ids,
project_id)
elif pop_limit:
resp.status, resp.body = self._pop_messages(queue_name,
project_id,
pop_limit)
def _delete_messages_by_id(self, queue_name, ids, project_id):
try:
self._message_controller.bulk_delete(
queue_name,
message_ids=ids,
project=project_id)
except Exception as ex:
LOG.exception(ex)
description = _(u'Messages could not be deleted.')
raise wsgi_errors.HTTPServiceUnavailable(description)
return falcon.HTTP_204
def _pop_messages(self, queue_name, project_id, pop_limit):
try:
LOG.debug(u'POP messages - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
messages = self._message_controller.pop(
queue_name,
project=project_id,
limit=pop_limit)
except Exception as ex:
LOG.exception(ex)
description = _(u'Messages could not be popped.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Prepare response
if not messages:
messages = []
body = {'messages': messages}
body = utils.to_json(body)
return falcon.HTTP_200, body
class ItemResource(object):
__slots__ = ('_message_controller')
def __init__(self, message_controller):
self._message_controller = message_controller
def on_get(self, req, resp, project_id, queue_name, message_id):
LOG.debug(u'Messages item GET - message: %(message)s, '
u'queue: %(queue)s, project: %(project)s',
{'message': message_id,
'queue': queue_name,
'project': project_id})
try:
message = self._message_controller.get(
queue_name,
message_id,
project=project_id)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Message could not be retrieved.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Prepare response
message['href'] = req.path
message = wsgi_utils.format_message_v1_1(message,
req.path.rsplit('/', 2)[0],
message['claim_id'])
resp.body = utils.to_json(message)
# status defaults to 200
def on_delete(self, req, resp, project_id, queue_name, message_id):
LOG.debug(u'Messages item DELETE - message: %(message)s, '
u'queue: %(queue)s, project: %(project)s',
{'message': message_id,
'queue': queue_name,
'project': project_id})
error_title = _(u'Unable to delete')
try:
self._message_controller.delete(
queue_name,
message_id=message_id,
project=project_id,
claim=req.get_param('claim_id'))
except storage_errors.MessageNotClaimed as ex:
LOG.debug(ex)
description = _(u'A claim was specified, but the message '
u'is not currently claimed.')
raise falcon.HTTPBadRequest(error_title, description)
except storage_errors.ClaimDoesNotExist as ex:
LOG.debug(ex)
description = _(u'The specified claim does not exist or '
u'has expired.')
raise falcon.HTTPBadRequest(error_title, description)
except storage_errors.NotPermitted as ex:
LOG.debug(ex)
description = _(u'This message is claimed; it cannot be '
u'deleted without a valid claim ID.')
raise falcon.HTTPForbidden(error_title, description)
except Exception as ex:
LOG.exception(ex)
description = _(u'Message could not be deleted.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Alles guete
resp.status = falcon.HTTP_204

View File

@ -0,0 +1,30 @@
# Copyright 2014 IBM Corp. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import falcon
class Resource(object):
__slots__ = ('_driver',)
def __init__(self, driver):
self._driver = driver
def on_get(self, req, resp, **kwargs):
resp.status = (falcon.HTTP_204 if self._driver.is_alive()
else falcon.HTTP_503)
def on_head(self, req, resp, **kwargs):
resp.status = falcon.HTTP_204

View File

@ -0,0 +1,245 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""pools: a resource to handle storage pool management
A pool is added by an operator by interacting with the
pooling-related endpoints. When specifying a pool, the
following fields are required:
::
{
"name": string,
"weight": integer,
"uri": string::uri
}
Furthermore, depending on the underlying storage type of pool being
registered, there is an optional field:
::
{
"options": {...}
}
"""
import falcon
import jsonschema
from zaqar.common.api.schemas import pools as schema
from zaqar.common import utils as common_utils
from zaqar.i18n import _
from zaqar.openstack.common import log
from zaqar.storage import errors
from zaqar.storage import utils as storage_utils
from zaqar.transport import utils as transport_utils
from zaqar.transport.wsgi import errors as wsgi_errors
from zaqar.transport.wsgi import utils as wsgi_utils
LOG = log.getLogger(__name__)
class Listing(object):
"""A resource to list registered pools
:param pools_controller: means to interact with storage
"""
def __init__(self, pools_controller):
self._ctrl = pools_controller
def on_get(self, request, response, project_id):
"""Returns a pool listing as objects embedded in an object:
::
{
"pools": [
{"href": "", "weight": 100, "uri": ""},
...
],
"links": [
{"href": "", "rel": "next"}
]
}
:returns: HTTP | 200
"""
LOG.debug(u'LIST pools')
store = {}
request.get_param('marker', store=store)
request.get_param_as_int('limit', store=store)
request.get_param_as_bool('detailed', store=store)
cursor = self._ctrl.list(**store)
pools = list(next(cursor))
results = {}
if pools:
store['marker'] = next(cursor)
for entry in pools:
entry['href'] = request.path + '/' + entry.pop('name')
results['links'] = [
{
'rel': 'next',
'href': request.path + falcon.to_query_str(store)
}
]
results['pools'] = pools
response.content_location = request.relative_uri
response.body = transport_utils.to_json(results)
response.status = falcon.HTTP_200
class Resource(object):
"""A handler for individual pool.
:param pools_controller: means to interact with storage
"""
def __init__(self, pools_controller):
self._ctrl = pools_controller
validator_type = jsonschema.Draft4Validator
self._validators = {
'weight': validator_type(schema.patch_weight),
'uri': validator_type(schema.patch_uri),
'group': validator_type(schema.patch_uri),
'options': validator_type(schema.patch_options),
'create': validator_type(schema.create)
}
def on_get(self, request, response, project_id, pool):
"""Returns a JSON object for a single pool entry:
::
{"weight": 100, "uri": "", options: {...}}
:returns: HTTP | [200, 404]
"""
LOG.debug(u'GET pool - name: %s', pool)
data = None
detailed = request.get_param_as_bool('detailed') or False
try:
data = self._ctrl.get(pool, detailed)
except errors.PoolDoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
data['href'] = request.path
# remove the name entry - it isn't needed on GET
del data['name']
response.body = transport_utils.to_json(data)
def on_put(self, request, response, project_id, pool):
"""Registers a new pool. Expects the following input:
::
{"weight": 100, "uri": ""}
An options object may also be provided.
:returns: HTTP | [201, 204]
"""
LOG.debug(u'PUT pool - name: %s', pool)
conf = self._ctrl.driver.conf
data = wsgi_utils.load(request)
wsgi_utils.validate(self._validators['create'], data)
if not storage_utils.can_connect(data['uri'], conf=conf):
raise wsgi_errors.HTTPBadRequestBody(
'cannot connect to %s' % data['uri']
)
self._ctrl.create(pool, weight=data['weight'],
uri=data['uri'],
group=data.get('group'),
options=data.get('options', {}))
response.status = falcon.HTTP_201
response.location = request.path
def on_delete(self, request, response, project_id, pool):
"""Deregisters a pool.
:returns: HTTP | [204, 403]
"""
LOG.debug(u'DELETE pool - name: %s', pool)
try:
self._ctrl.delete(pool)
except errors.PoolInUseByFlavor as ex:
LOG.exception(ex)
title = _(u'Unable to delete')
description = _(u'This pool is used by flavors {flavor}; '
u'It cannot be deleted.')
description = description.format(flavor=ex.flavor)
raise falcon.HTTPForbidden(title, description)
response.status = falcon.HTTP_204
def on_patch(self, request, response, project_id, pool):
"""Allows one to update a pool's weight, uri, and/or options.
This method expects the user to submit a JSON object
containing at least one of: 'uri', 'weight', 'group', 'options'. If
none are found, the request is flagged as bad. There is also
strict format checking through the use of
jsonschema. Appropriate errors are returned in each case for
badly formatted input.
:returns: HTTP | 200,400
"""
LOG.debug(u'PATCH pool - name: %s', pool)
data = wsgi_utils.load(request)
EXPECT = ('weight', 'uri', 'group', 'options')
if not any([(field in data) for field in EXPECT]):
LOG.debug(u'PATCH pool, bad params')
raise wsgi_errors.HTTPBadRequestBody(
'One of `uri`, `weight`, `group`, or `options` needs '
'to be specified'
)
for field in EXPECT:
wsgi_utils.validate(self._validators[field], data)
conf = self._ctrl.driver.conf
if 'uri' in data and not storage_utils.can_connect(data['uri'],
conf=conf):
raise wsgi_errors.HTTPBadRequestBody(
'cannot connect to %s' % data['uri']
)
fields = common_utils.fields(data, EXPECT,
pred=lambda v: v is not None)
try:
self._ctrl.update(pool, **fields)
except errors.PoolDoesNotExist as ex:
LOG.exception(ex)
raise falcon.HTTPNotFound()

View File

@ -0,0 +1,163 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import six
from zaqar.i18n import _
import zaqar.openstack.common.log as logging
from zaqar.storage import errors as storage_errors
from zaqar.transport import utils
from zaqar.transport import validation
from zaqar.transport.wsgi import errors as wsgi_errors
from zaqar.transport.wsgi import utils as wsgi_utils
LOG = logging.getLogger(__name__)
class ItemResource(object):
__slots__ = ('_validate', '_queue_controller', '_message_controller')
def __init__(self, validate, queue_controller, message_controller):
self._validate = validate
self._queue_controller = queue_controller
self._message_controller = message_controller
def on_get(self, req, resp, project_id, queue_name):
LOG.debug(u'Queue metadata GET - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
try:
resp_dict = self._queue_controller.get(queue_name,
project=project_id)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Queue metadata could not be retrieved.')
raise wsgi_errors.HTTPServiceUnavailable(description)
resp.body = utils.to_json(resp_dict)
# status defaults to 200
def on_put(self, req, resp, project_id, queue_name):
LOG.debug(u'Queue item PUT - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
try:
# Place JSON size restriction before parsing
self._validate.queue_metadata_length(req.content_length)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
# Deserialize queue metadata
metadata = None
if req.content_length:
document = wsgi_utils.deserialize(req.stream, req.content_length)
metadata = wsgi_utils.sanitize(document, spec=None)
try:
created = self._queue_controller.create(queue_name,
metadata=metadata,
project=project_id)
except storage_errors.FlavorDoesNotExist as ex:
LOG.exception(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except Exception as ex:
LOG.exception(ex)
description = _(u'Queue could not be created.')
raise wsgi_errors.HTTPServiceUnavailable(description)
resp.status = falcon.HTTP_201 if created else falcon.HTTP_204
resp.location = req.path
def on_delete(self, req, resp, project_id, queue_name):
LOG.debug(u'Queue item DELETE - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
try:
self._queue_controller.delete(queue_name, project=project_id)
except Exception as ex:
LOG.exception(ex)
description = _(u'Queue could not be deleted.')
raise wsgi_errors.HTTPServiceUnavailable(description)
resp.status = falcon.HTTP_204
class CollectionResource(object):
__slots__ = ('_queue_controller', '_validate')
def __init__(self, validate, queue_controller):
self._queue_controller = queue_controller
self._validate = validate
def on_get(self, req, resp, project_id):
LOG.debug(u'Queue collection GET - project: %(project)s',
{'project': project_id})
kwargs = {}
# NOTE(kgriffs): This syntax ensures that
# we don't clobber default values with None.
req.get_param('marker', store=kwargs)
req.get_param_as_int('limit', store=kwargs)
req.get_param_as_bool('detailed', store=kwargs)
try:
self._validate.queue_listing(**kwargs)
results = self._queue_controller.list(project=project_id, **kwargs)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except Exception as ex:
LOG.exception(ex)
description = _(u'Queues could not be listed.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Buffer list of queues
queues = list(next(results))
# Got some. Prepare the response.
kwargs['marker'] = next(results) or kwargs.get('marker', '')
for each_queue in queues:
each_queue['href'] = req.path + '/' + each_queue['name']
response_body = {
'queues': queues,
'links': [
{
'rel': 'next',
'href': req.path + falcon.to_query_str(kwargs)
}
]
}
resp.body = utils.to_json(response_body)
# status defaults to 200

View File

@ -0,0 +1,73 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from zaqar.i18n import _
import zaqar.openstack.common.log as logging
from zaqar.storage import errors as storage_errors
from zaqar.transport import utils
from zaqar.transport.wsgi import errors as wsgi_errors
LOG = logging.getLogger(__name__)
class Resource(object):
__slots__ = ('_queue_ctrl')
def __init__(self, queue_controller):
self._queue_ctrl = queue_controller
def on_get(self, req, resp, project_id, queue_name):
try:
resp_dict = self._queue_ctrl.stats(queue_name,
project=project_id)
message_stats = resp_dict['messages']
if message_stats['total'] != 0:
base_path = req.path[:req.path.rindex('/')] + '/messages/'
newest = message_stats['newest']
newest['href'] = base_path + newest['id']
del newest['id']
oldest = message_stats['oldest']
oldest['href'] = base_path + oldest['id']
del oldest['id']
resp.body = utils.to_json(resp_dict)
# status defaults to 200
except storage_errors.QueueDoesNotExist as ex:
resp_dict = {
'messages': {
'claimed': 0,
'free': 0,
'total': 0
}
}
resp.body = utils.to_json(resp_dict)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Queue stats could not be read.')
raise wsgi_errors.HTTPServiceUnavailable(description)

View File

@ -17,11 +17,13 @@ import falcon
from zaqar.transport import utils
from zaqar.transport.wsgi import v1_0
from zaqar.transport.wsgi import v1_1
from zaqar.transport.wsgi import v2_0
VERSIONS = {
'versions': [
v1_0.VERSION,
v1_1.VERSION
v1_1.VERSION,
v2_0.VERSION
]
}