From ddb24ff5b10603c5f9497a5195cf393e1a7ba1d6 Mon Sep 17 00:00:00 2001 From: Flaper Fesp Date: Tue, 7 May 2013 01:08:33 +0200 Subject: [PATCH] Rename tenant into project Keystone will replace tenant with project during Havana. Some of this work already started and is moving forward. We can go ahead and replace it now before the tree gets bigger. Change-Id: I1d0494112f6a65cc4ee5390eee782e24790ca5b7 --- marconi/storage/base.py | 56 +++++------ marconi/storage/exceptions.py | 18 ++-- marconi/storage/mongodb/controllers.py | 100 ++++++++++---------- marconi/storage/sqlite/controllers.py | 96 ++++++++++--------- marconi/tests/storage/base.py | 105 +++++++++++---------- marconi/tests/storage/test_base.py | 24 ++--- marconi/tests/storage/test_impl_mongodb.py | 10 +- marconi/tests/util/faulty_storage.py | 20 ++-- marconi/transport/wsgi/claims.py | 16 ++-- marconi/transport/wsgi/driver.py | 14 +-- marconi/transport/wsgi/messages.py | 16 ++-- marconi/transport/wsgi/queues.py | 16 ++-- marconi/transport/wsgi/stats.py | 4 +- 13 files changed, 249 insertions(+), 246 deletions(-) diff --git a/marconi/storage/base.py b/marconi/storage/base.py index 81063ac84..258c189f4 100644 --- a/marconi/storage/base.py +++ b/marconi/storage/base.py @@ -69,12 +69,12 @@ class QueueBase(ControllerBase): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def list(self, tenant=None, marker=None, + def list(self, project=None, marker=None, limit=10, detailed=False): """ Base method for listing queues. - :param tenant: Tenant id + :param project: Project id :param marker: The last queue name :param limit: (Default 10) Max number :param detailed: Whether metadata is included @@ -85,12 +85,12 @@ class QueueBase(ControllerBase): raise NotImplementedError @abc.abstractmethod - def get(self, name, tenant=None): + def get(self, name, project=None): """ Base method for queue retrieval. :param name: The queue name - :param tenant: Tenant id + :param project: Project id :returns: Dictionary containing queue metadata :raises: DoesNotExist @@ -98,14 +98,14 @@ class QueueBase(ControllerBase): raise NotImplementedError @abc.abstractmethod - def upsert(self, name, metadata, tenant=None): + def upsert(self, name, metadata, project=None): """ This methods handles both creates and updates operations for queues. :param name: The queue name :param metadata: Arbitrary metadata - :param tenant: Tenant id + :param project: Project id :returns: True if a queue was created and False if it was updated. """ @@ -113,34 +113,34 @@ class QueueBase(ControllerBase): assert isinstance(metadata, dict), msg @abc.abstractmethod - def delete(self, name, tenant=None): + def delete(self, name, project=None): """ Base method for queue deletion. :param name: The queue name - :param tenant: Tenant id + :param project: Project id """ raise NotImplementedError @abc.abstractmethod - def stats(self, name, tenant=None): + def stats(self, name, project=None): """ Base method for queue stats. :param name: The queue name - :param tenant: Tenant id + :param project: Project id :returns: Dictionary with the queue stats """ raise NotImplementedError @abc.abstractmethod - def actions(self, name, tenant=None, marker=None, limit=10): + def actions(self, name, project=None, marker=None, limit=10): """ Base method for queue actions. :param name: Queue name - :param tenant: Tenant id + :param project: Project id :param marker: Tail identifier :param limit: (Default 10) Max number of messages to retrieve. @@ -157,14 +157,14 @@ class MessageBase(ControllerBase): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def list(self, queue, tenant=None, marker=None, + def list(self, queue, project=None, marker=None, limit=10, echo=False, client_uuid=None): """ Base message list method :param queue: Name of the queue to get the message from. - :param tenant: Tenant id + :param project: Project id :param marker: Tail identifier :param limit: (Default 10) specifies up to 100 messages to return. @@ -178,13 +178,13 @@ class MessageBase(ControllerBase): """ raise NotImplementedError - def get(self, queue, message_id, tenant=None): + def get(self, queue, message_id, project=None): """ Base message get method :param queue: Name of the queue to get the message from. - :param tenant: Tenant id + :param project: Project id :param message_id: Message ID :returns: Dictionary containing message data @@ -193,7 +193,7 @@ class MessageBase(ControllerBase): raise NotImplementedError @abc.abstractmethod - def post(self, queue, messages, client_uuid, tenant=None): + def post(self, queue, messages, client_uuid, project=None): """ Base message post method @@ -205,21 +205,21 @@ class MessageBase(ControllerBase): :param messages: Messages to post to queue, it can be a list of 1 or more elements. :param client_uuid: Client's unique identifier. - :param tenant: Tenant id + :param project: Project id :returns: List of message ids """ raise NotImplementedError @abc.abstractmethod - def delete(self, queue, message_id, tenant=None, claim=None): + def delete(self, queue, message_id, project=None, claim=None): """ Base message delete method :param queue: Name of the queue to post message to. :param message_id: Message to be deleted - :param tenant: Tenant id + :param project: Project id :param claim: Claim this message belongs to. When specified, claim must be valid and message_id must belong to @@ -233,14 +233,14 @@ class ClaimBase(ControllerBase): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def get(self, queue, claim_id, tenant=None): + def get(self, queue, claim_id, project=None): """ Base claim get method :param queue: Name of the queue this claim belongs to. :param claim_id: The claim id - :param tenant: Tenant id + :param project: Project id :returns: (Claim's metadata, claimed messages) :raises: DoesNotExist @@ -248,7 +248,7 @@ class ClaimBase(ControllerBase): raise NotImplementedError @abc.abstractmethod - def create(self, queue, metadata, tenant=None, limit=10): + def create(self, queue, metadata, project=None, limit=10): """ Base claim create method @@ -256,7 +256,7 @@ class ClaimBase(ControllerBase): claim belongs to. :param metadata: Claim's parameters to be stored. - :param tenant: Tenant id + :param project: Project id :param limit: (Default 10) Max number of messages to claim. @@ -265,7 +265,7 @@ class ClaimBase(ControllerBase): raise NotImplementedError @abc.abstractmethod - def update(self, queue, claim_id, metadata, tenant=None): + def update(self, queue, claim_id, metadata, project=None): """ Base claim update method @@ -274,18 +274,18 @@ class ClaimBase(ControllerBase): :param claim_id: Claim to be updated :param metadata: Claim's parameters to be updated. - :param tenant: Tenant id + :param project: Project id """ raise NotImplementedError @abc.abstractmethod - def delete(self, queue, claim_id, tenant=None): + def delete(self, queue, claim_id, project=None): """ Base claim delete method :param queue: Name of the queue this claim belongs to. :param claim_id: Claim to be deleted - :param tenant: Tenant id + :param project: Project id """ raise NotImplementedError diff --git a/marconi/storage/exceptions.py b/marconi/storage/exceptions.py index 89ccaf6a8..2b247e284 100644 --- a/marconi/storage/exceptions.py +++ b/marconi/storage/exceptions.py @@ -24,27 +24,27 @@ class NotPermitted(Exception): class QueueDoesNotExist(DoesNotExist): - def __init__(self, name, tenant): - msg = (_("Queue %(name)s does not exist for tenant %(tenant)s") % - dict(name=name, tenant=tenant)) + def __init__(self, name, project): + msg = (_("Queue %(name)s does not exist for project %(project)s") % + dict(name=name, project=project)) super(QueueDoesNotExist, self).__init__(msg) class MessageDoesNotExist(DoesNotExist): - def __init__(self, mid, queue, tenant): + def __init__(self, mid, queue, project): msg = (_("Message %(mid)s does not exist in " - "queue %(queue)s of tenant %(tenant)s") % - dict(mid=mid, queue=queue, tenant=tenant)) + "queue %(queue)s of project %(project)s") % + dict(mid=mid, queue=queue, project=project)) super(MessageDoesNotExist, self).__init__(msg) class ClaimDoesNotExist(DoesNotExist): - def __init__(self, cid, queue, tenant): + def __init__(self, cid, queue, project): msg = (_("Claim %(cid)s does not exist in " - "queue %(queue)s of tenant %(tenant)s") % - dict(cid=cid, queue=queue, tenant=tenant)) + "queue %(queue)s of project %(project)s") % + dict(cid=cid, queue=queue, project=project)) super(ClaimDoesNotExist, self).__init__(msg) diff --git a/marconi/storage/mongodb/controllers.py b/marconi/storage/mongodb/controllers.py index b5aab1c33..d0d7d8ea2 100644 --- a/marconi/storage/mongodb/controllers.py +++ b/marconi/storage/mongodb/controllers.py @@ -38,7 +38,7 @@ class QueueController(storage.QueueBase): Queues: Name Field ---------------- - tenant -> t + project -> p metadata -> m name -> n @@ -49,15 +49,15 @@ class QueueController(storage.QueueBase): self._col = self.driver.db["queues"] # NOTE(flaper87): This creates a unique compound index for - # tenant and name. Using tenant as the first field of the - # index allows for querying by tenant and tenant+name. + # project and name. Using project as the first field of the + # index allows for querying by project and project+name. # This is also useful for retrieving the queues list for - # as specific tenant, for example. Order Matters! - self._col.ensure_index([("t", 1), ("n", 1)], unique=True) + # as specific project, for example. Order Matters! + self._col.ensure_index([("p", 1), ("n", 1)], unique=True) - def list(self, tenant=None, marker=None, + def list(self, project=None, marker=None, limit=10, detailed=False): - query = {"t": tenant} + query = {"p": project} if marker: query["n"] = {"$gt": marker} @@ -80,29 +80,29 @@ class QueueController(storage.QueueBase): yield normalizer(cursor) yield marker_name["next"] - def _get(self, name, tenant=None, fields={"m": 1, "_id": 0}): - queue = self._col.find_one({"t": tenant, "n": name}, fields=fields) + def _get(self, name, project=None, fields={"m": 1, "_id": 0}): + queue = self._col.find_one({"p": project, "n": name}, fields=fields) if queue is None: - raise exceptions.QueueDoesNotExist(name, tenant) + raise exceptions.QueueDoesNotExist(name, project) return queue - def get_id(self, name, tenant=None): + def get_id(self, name, project=None): """ Just like `get` method but returns the queue's id :returns: Queue's `ObjectId` """ - queue = self._get(name, tenant, fields=["_id"]) + queue = self._get(name, project, fields=["_id"]) return queue.get("_id") - def get(self, name, tenant=None): - queue = self._get(name, tenant) + def get(self, name, project=None): + queue = self._get(name, project) return queue.get("m", {}) - def upsert(self, name, metadata, tenant=None): - super(QueueController, self).upsert(name, metadata, tenant) + def upsert(self, name, metadata, project=None): + super(QueueController, self).upsert(name, metadata, project) - rst = self._col.update({"t": tenant, "n": name}, + rst = self._col.update({"p": project, "n": name}, {"$set": {"m": metadata}}, multi=False, upsert=True, @@ -110,12 +110,12 @@ class QueueController(storage.QueueBase): return not rst["updatedExisting"] - def delete(self, name, tenant=None): - self.driver.message_controller.purge_queue(name, tenant) - self._col.remove({"t": tenant, "n": name}) + def delete(self, name, project=None): + self.driver.message_controller.purge_queue(name, project) + self._col.remove({"p": project, "n": name}) - def stats(self, name, tenant=None): - qid = self.get_id(name, tenant) + def stats(self, name, project=None): + qid = self.get_id(name, project) msg_ctrl = self.driver.message_controller active = msg_ctrl.active(qid) claimed = msg_ctrl.claimed(qid) @@ -128,7 +128,7 @@ class QueueController(storage.QueueBase): } } - def actions(self, name, tenant=None, marker=None, limit=10): + def actions(self, name, project=None, marker=None, limit=10): raise NotImplementedError @@ -176,9 +176,9 @@ class MessageController(storage.MessageBase): ("c.e", 1), ("_id", -1)], background=True) - def _get_queue_id(self, queue, tenant): + def _get_queue_id(self, queue, project): queue_controller = self.driver.queue_controller - return queue_controller.get_id(queue, tenant) + return queue_controller.get_id(queue, project) def all(self): return self._col.find() @@ -247,11 +247,11 @@ class MessageController(storage.MessageBase): {"$set": {"c": {"id": None, "e": 0}}}, upsert=False, multi=True) - def list(self, queue, tenant=None, marker=None, + def list(self, queue, project=None, marker=None, limit=10, echo=False, client_uuid=None): try: - qid = self._get_queue_id(queue, tenant) + qid = self._get_queue_id(queue, project) messages = self.active(qid, marker, echo, client_uuid) except ValueError: return @@ -276,18 +276,18 @@ class MessageController(storage.MessageBase): yield utils.HookedCursor(messages, denormalizer) yield str(marker_id['next']) - def get(self, queue, message_id, tenant=None): + def get(self, queue, message_id, project=None): # Base query, always check expire time try: mid = utils.to_oid(message_id) except ValueError: - raise exceptions.MessageDoesNotExist(message_id, queue, tenant) + raise exceptions.MessageDoesNotExist(message_id, queue, project) now = timeutils.utcnow() query = { - "q": self._get_queue_id(queue, tenant), + "q": self._get_queue_id(queue, project), "e": {"$gt": now}, "_id": mid } @@ -295,7 +295,7 @@ class MessageController(storage.MessageBase): message = self._col.find_one(query) if message is None: - raise exceptions.MessageDoesNotExist(message_id, queue, tenant) + raise exceptions.MessageDoesNotExist(message_id, queue, project) oid = message["_id"] age = now - utils.oid_utc(oid) @@ -307,8 +307,8 @@ class MessageController(storage.MessageBase): "body": message["b"], } - def post(self, queue, messages, client_uuid, tenant=None): - qid = self._get_queue_id(queue, tenant) + def post(self, queue, messages, client_uuid, project=None): + qid = self._get_queue_id(queue, project) now = timeutils.utcnow() @@ -329,7 +329,7 @@ class MessageController(storage.MessageBase): ids = self._col.insert(denormalizer(messages)) return map(str, ids) - def delete(self, queue, message_id, tenant=None, claim=None): + def delete(self, queue, message_id, project=None, claim=None): try: try: mid = utils.to_oid(message_id) @@ -337,7 +337,7 @@ class MessageController(storage.MessageBase): return query = { - "q": self._get_queue_id(queue, tenant), + "q": self._get_queue_id(queue, project), "_id": mid } @@ -365,9 +365,9 @@ class MessageController(storage.MessageBase): except exceptions.QueueDoesNotExist: pass - def purge_queue(self, queue, tenant=None): + def purge_queue(self, queue, project=None): try: - qid = self._get_queue_id(queue, tenant) + qid = self._get_queue_id(queue, project) self._col.remove({"q": qid}, w=0) except exceptions.QueueDoesNotExist: pass @@ -393,15 +393,15 @@ class ClaimController(storage.ClaimBase): the claim id and it's expiration timestamp. """ - def _get_queue_id(self, queue, tenant): + def _get_queue_id(self, queue, project): queue_controller = self.driver.queue_controller - return queue_controller.get_id(queue, tenant) + return queue_controller.get_id(queue, project) - def get(self, queue, claim_id, tenant=None): + def get(self, queue, claim_id, project=None): msg_ctrl = self.driver.message_controller # Check whether the queue exists or not - qid = self._get_queue_id(queue, tenant) + qid = self._get_queue_id(queue, project) # Base query, always check expire time now = timeutils.utcnow() @@ -435,11 +435,11 @@ class ClaimController(storage.ClaimBase): "id": str(claim["id"]), } except StopIteration: - raise exceptions.ClaimDoesNotExist(cid, queue, tenant) + raise exceptions.ClaimDoesNotExist(cid, queue, project) return (claim, messages) - def create(self, queue, metadata, tenant=None, limit=10): + def create(self, queue, metadata, project=None, limit=10): """ This implementation was done in a best-effort fashion. In order to create a claim we need to get a list @@ -460,7 +460,7 @@ class ClaimController(storage.ClaimBase): # We don't need the qid here but # we need to verify it exists. - qid = self._get_queue_id(queue, tenant) + qid = self._get_queue_id(queue, project) ttl = int(metadata.get("ttl", 60)) oid = objectid.ObjectId() @@ -513,14 +513,14 @@ class ClaimController(storage.ClaimBase): upsert=False, multi=True) if updated != 0: - claim, messages = self.get(queue, oid, tenant=tenant) + claim, messages = self.get(queue, oid, project=project) return (str(oid), messages) - def update(self, queue, claim_id, metadata, tenant=None): + def update(self, queue, claim_id, metadata, project=None): try: cid = utils.to_oid(claim_id) except ValueError: - raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) + raise exceptions.ClaimDoesNotExist(claim_id, queue, project) now = timeutils.utcnow() ttl = int(metadata.get("ttl", 60)) @@ -532,14 +532,14 @@ class ClaimController(storage.ClaimBase): msg = _("New ttl will make the claim expires") raise ValueError(msg) - qid = self._get_queue_id(queue, tenant) + qid = self._get_queue_id(queue, project) msg_ctrl = self.driver.message_controller claimed = msg_ctrl.claimed(qid, cid, expires=now, limit=1) try: claimed.next() except StopIteration: - raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) + raise exceptions.ClaimDoesNotExist(claim_id, queue, project) meta = { "id": cid, @@ -561,6 +561,6 @@ class ClaimController(storage.ClaimBase): {"$set": {"e": expires, "t": ttl}}, upsert=False, multi=True) - def delete(self, queue, claim_id, tenant=None): + def delete(self, queue, claim_id, project=None): msg_ctrl = self.driver.message_controller msg_ctrl.unclaim(claim_id) diff --git a/marconi/storage/sqlite/controllers.py b/marconi/storage/sqlite/controllers.py index aa82c8099..465a114ef 100644 --- a/marconi/storage/sqlite/controllers.py +++ b/marconi/storage/sqlite/controllers.py @@ -26,23 +26,23 @@ class Queue(base.QueueBase): if not exists Queues ( id INTEGER, - tenant TEXT, + project TEXT, name TEXT, metadata DOCUMENT, PRIMARY KEY(id), - UNIQUE(tenant, name) + UNIQUE(project, name) ) ''') - def list(self, tenant, marker=None, + def list(self, project, marker=None, limit=10, detailed=False): sql = ((''' select name from Queues''' if not detailed else ''' select name, metadata from Queues''') + ''' - where tenant = ?''') - args = [tenant] + where project = ?''') + args = [project] if marker: sql += ''' @@ -67,37 +67,37 @@ class Queue(base.QueueBase): yield it() yield marker_name['next'] - def get(self, name, tenant): + def get(self, name, project): try: return self.driver.get(''' select metadata from Queues - where tenant = ? and name = ?''', tenant, name)[0] + where project = ? and name = ?''', project, name)[0] except _NoResult: - raise exceptions.QueueDoesNotExist(name, tenant) + raise exceptions.QueueDoesNotExist(name, project) - def upsert(self, name, metadata, tenant): + def upsert(self, name, metadata, project): with self.driver('immediate'): previous_record = self.driver.run(''' select id from Queues - where tenant = ? and name = ? - ''', tenant, name).fetchone() + where project = ? and name = ? + ''', project, name).fetchone() self.driver.run(''' replace into Queues values (null, ?, ?, ?) - ''', tenant, name, self.driver.pack(metadata)) + ''', project, name, self.driver.pack(metadata)) return previous_record is None - def delete(self, name, tenant): + def delete(self, name, project): self.driver.run(''' delete from Queues - where tenant = ? and name = ?''', tenant, name) + where project = ? and name = ?''', project, name) - def stats(self, name, tenant): + def stats(self, name, project): with self.driver('deferred'): - qid = _get_qid(self.driver, name, tenant) + qid = _get_qid(self.driver, name, project) claimed, free = self.driver.get(''' select * from (select count(msgid) @@ -121,7 +121,7 @@ class Queue(base.QueueBase): 'actions': 0, } - def actions(self, name, tenant, marker=None, limit=10): + def actions(self, name, project, marker=None, limit=10): raise NotImplementedError @@ -143,15 +143,15 @@ class Message(base.MessageBase): ) ''') - def get(self, queue, message_id, tenant): + def get(self, queue, message_id, project): try: content, ttl, age = self.driver.get(''' select content, ttl, julianday() * 86400.0 - created from Queues as Q join Messages as M on qid = Q.id where ttl > julianday() * 86400.0 - created - and M.id = ? and tenant = ? and name = ? - ''', _msgid_decode(message_id), tenant, queue) + and M.id = ? and project = ? and name = ? + ''', _msgid_decode(message_id), project, queue) return { 'id': message_id, @@ -161,9 +161,9 @@ class Message(base.MessageBase): } except (_NoResult, _BadID): - raise exceptions.MessageDoesNotExist(message_id, queue, tenant) + raise exceptions.MessageDoesNotExist(message_id, queue, project) - def list(self, queue, tenant, marker=None, + def list(self, queue, project, marker=None, limit=10, echo=False, client_uuid=None): with self.driver('deferred'): try: @@ -172,7 +172,7 @@ class Message(base.MessageBase): from Messages where ttl > julianday() * 86400.0 - created and qid = ?''' - args = [_get_qid(self.driver, queue, tenant)] + args = [_get_qid(self.driver, queue, project)] if not echo: sql += ''' @@ -207,9 +207,9 @@ class Message(base.MessageBase): except _BadID: return - def post(self, queue, messages, client_uuid, tenant): + def post(self, queue, messages, client_uuid, project): with self.driver('immediate'): - qid = _get_qid(self.driver, queue, tenant) + qid = _get_qid(self.driver, queue, project) # cleanup all expired messages in this queue @@ -237,7 +237,7 @@ class Message(base.MessageBase): return map(_msgid_encode, range(unused, my['newid'])) - def delete(self, queue, message_id, tenant, claim=None): + def delete(self, queue, message_id, project, claim=None): try: id = _msgid_decode(message_id) @@ -246,8 +246,8 @@ class Message(base.MessageBase): delete from Messages where id = ? and qid = (select id from Queues - where tenant = ? and name = ?) - ''', id, tenant, queue) + where project = ? and name = ?) + ''', id, project, queue) return with self.driver('immediate'): @@ -256,8 +256,8 @@ class Message(base.MessageBase): from Queues as Q join Messages as M on qid = Q.id where ttl > julianday() * 86400.0 - created - and M.id = ? and tenant = ? and name = ? - ''', id, tenant, queue) + and M.id = ? and project = ? and name = ? + ''', id, project, queue) if not message_exists: return @@ -312,7 +312,7 @@ class Claim(base.ClaimBase): ) ''') - def get(self, queue, claim_id, tenant): + def get(self, queue, claim_id, project): with self.driver('deferred'): try: id, ttl, age = self.driver.get(''' @@ -320,8 +320,8 @@ class Claim(base.ClaimBase): from Queues as Q join Claims as C on Q.id = C.qid where C.ttl > julianday() * 86400.0 - C.created - and C.id = ? and tenant = ? and name = ? - ''', _cid_decode(claim_id), tenant, queue) + and C.id = ? and project = ? and name = ? + ''', _cid_decode(claim_id), project, queue) return ( { @@ -333,11 +333,11 @@ class Claim(base.ClaimBase): ) except (_NoResult, _BadID): - raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) + raise exceptions.ClaimDoesNotExist(claim_id, queue, project) - def create(self, queue, metadata, tenant, limit=10): + def create(self, queue, metadata, project, limit=10): with self.driver('immediate'): - qid = _get_qid(self.driver, queue, tenant) + qid = _get_qid(self.driver, queue, project) # cleanup all expired claims in this queue @@ -383,7 +383,7 @@ class Claim(base.ClaimBase): 'body': content, } - def update(self, queue, claim_id, metadata, tenant): + def update(self, queue, claim_id, metadata, project): try: id = _cid_decode(claim_id) @@ -397,16 +397,18 @@ class Claim(base.ClaimBase): where ttl > julianday() * 86400.0 - created and id = ? and qid = (select id from Queues - where tenant = ? and name = ?) - ''', metadata['ttl'], id, tenant, queue) + where project = ? and name = ?) + ''', metadata['ttl'], id, project, queue) if not self.driver.affected: - raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) + raise exceptions.ClaimDoesNotExist(claim_id, + queue, + project) self.__update_claimed(id, metadata['ttl']) except _BadID: - raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) + raise exceptions.ClaimDoesNotExist(claim_id, queue, project) def __update_claimed(self, cid, ttl): # Precondition: cid is not expired @@ -419,14 +421,14 @@ class Claim(base.ClaimBase): where cid = ?) ''', ttl, ttl, cid) - def delete(self, queue, claim_id, tenant): + def delete(self, queue, claim_id, project): try: self.driver.run(''' delete from Claims where id = ? and qid = (select id from Queues - where tenant = ? and name = ?) - ''', _cid_decode(claim_id), tenant, queue) + where project = ? and name = ?) + ''', _cid_decode(claim_id), project, queue) except _BadID: pass @@ -440,14 +442,14 @@ class _BadID(Exception): pass -def _get_qid(driver, queue, tenant): +def _get_qid(driver, queue, project): try: return driver.get(''' select id from Queues - where tenant = ? and name = ?''', tenant, queue)[0] + where project = ? and name = ?''', project, queue)[0] except _NoResult: - raise exceptions.QueueDoesNotExist(queue, tenant) + raise exceptions.QueueDoesNotExist(queue, project) # The utilities below make the database IDs opaque to the users diff --git a/marconi/tests/storage/base.py b/marconi/tests/storage/base.py index 4962d08e5..4dd52909d 100644 --- a/marconi/tests/storage/base.py +++ b/marconi/tests/storage/base.py @@ -20,7 +20,7 @@ from marconi.tests import util as testing class ControllerBaseTest(testing.TestBase): - tenant = "tenant" + project = "project" driver_class = None controller_class = None controller_base_class = None @@ -53,9 +53,9 @@ class QueueControllerTest(ControllerBaseTest): def test_list(self): num = 15 for queue in xrange(num): - self.controller.upsert(queue, {}, tenant=self.tenant) + self.controller.upsert(queue, {}, project=self.project) - interaction = self.controller.list(tenant=self.tenant, + interaction = self.controller.list(project=self.project, detailed=True) queues = list(interaction.next()) @@ -64,7 +64,7 @@ class QueueControllerTest(ControllerBaseTest): 'metadata' in queue, queues)), True) self.assertEquals(len(queues), 10) - interaction = self.controller.list(tenant=self.tenant, + interaction = self.controller.list(project=self.project, marker=interaction.next()) queues = list(interaction.next()) @@ -75,37 +75,37 @@ class QueueControllerTest(ControllerBaseTest): def test_queue_lifecycle(self): # Test Queue Creation - created = self.controller.upsert("test", tenant=self.tenant, + created = self.controller.upsert("test", project=self.project, metadata=dict(topic="test_queue")) self.assertTrue(created) # Test Queue retrieval - queue = self.controller.get("test", tenant=self.tenant) + queue = self.controller.get("test", project=self.project) self.assertIsNotNone(queue) # Test Queue Update - created = self.controller.upsert("test", tenant=self.tenant, + created = self.controller.upsert("test", project=self.project, metadata=dict(meta="test_meta")) self.assertFalse(created) - queue = self.controller.get("test", tenant=self.tenant) + queue = self.controller.get("test", project=self.project) self.assertEqual(queue["meta"], "test_meta") # Test Queue Statistic _insert_fixtures(self.message_controller, "test", - tenant=self.tenant, client_uuid="my_uuid", num=12) + project=self.project, client_uuid="my_uuid", num=12) - countof = self.controller.stats("test", tenant=self.tenant) + countof = self.controller.stats("test", project=self.project) self.assertEqual(countof['messages']['free'], 12) # Test Queue Deletion - self.controller.delete("test", tenant=self.tenant) + self.controller.delete("test", project=self.project) # Test DoesNotExist Exception self.assertRaises(storage.exceptions.DoesNotExist, self.controller.get, "test", - tenant=self.tenant) + project=self.project) class MessageControllerTest(ControllerBaseTest): @@ -126,10 +126,10 @@ class MessageControllerTest(ControllerBaseTest): self.queue_controller = self.driver.queue_controller self.claim_controller = self.driver.claim_controller self.queue_controller.upsert(self.queue_name, {}, - tenant=self.tenant) + project=self.project) def tearDown(self): - self.queue_controller.delete(self.queue_name, tenant=self.tenant) + self.queue_controller.delete(self.queue_name, project=self.project) super(MessageControllerTest, self).tearDown() def test_message_lifecycle(self): @@ -147,25 +147,25 @@ class MessageControllerTest(ControllerBaseTest): # Test Message Creation created = list(self.controller.post(queue_name, messages, - tenant=self.tenant, + project=self.project, client_uuid="unused")) self.assertEqual(len(created), 1) # Test Message Get - self.controller.get(queue_name, created[0], tenant=self.tenant) + self.controller.get(queue_name, created[0], project=self.project) # Test Message Deletion - self.controller.delete(queue_name, created[0], tenant=self.tenant) + self.controller.delete(queue_name, created[0], project=self.project) # Test DoesNotExist self.assertRaises(storage.exceptions.DoesNotExist, self.controller.get, queue_name, message_id=created[0], - tenant=self.tenant) + project=self.project) def test_get_multi(self): _insert_fixtures(self.controller, self.queue_name, - tenant=self.tenant, client_uuid="my_uuid", num=15) + project=self.project, client_uuid="my_uuid", num=15) def load_messages(expected, *args, **kwargs): interaction = self.controller.list(*args, **kwargs) @@ -174,75 +174,76 @@ class MessageControllerTest(ControllerBaseTest): return interaction # Test all messages, echo False and uuid - load_messages(0, self.queue_name, tenant=self.tenant, + load_messages(0, self.queue_name, project=self.project, client_uuid="my_uuid") # Test all messages and limit - load_messages(15, self.queue_name, tenant=self.tenant, limit=20, + load_messages(15, self.queue_name, project=self.project, limit=20, echo=True) # Test all messages, echo True, and uuid interaction = load_messages(10, self.queue_name, echo=True, - tenant=self.tenant, client_uuid="my_uuid") + project=self.project, + client_uuid="my_uuid") # Test all messages, echo True, uuid and marker - load_messages(5, self.queue_name, echo=True, tenant=self.tenant, + load_messages(5, self.queue_name, echo=True, project=self.project, marker=interaction.next(), client_uuid="my_uuid") def test_claim_effects(self): _insert_fixtures(self.controller, self.queue_name, - tenant=self.tenant, client_uuid="my_uuid", num=12) + project=self.project, client_uuid="my_uuid", num=12) meta = {"ttl": 70} another_cid, _ = self.claim_controller.create(self.queue_name, meta, - tenant=self.tenant) + project=self.project) cid, msgs = self.claim_controller.create(self.queue_name, meta, - tenant=self.tenant) + project=self.project) [msg1, msg2] = msgs # A wrong claim does not ensure the message deletion with testing.expected(storage.exceptions.NotPermitted): self.controller.delete(self.queue_name, msg1["id"], - tenant=self.tenant, + project=self.project, claim=another_cid) # Make sure a message can be deleted with a claim self.controller.delete(self.queue_name, msg1["id"], - tenant=self.tenant, + project=self.project, claim=cid) with testing.expected(storage.exceptions.DoesNotExist): self.controller.get(self.queue_name, msg1["id"], - tenant=self.tenant) + project=self.project) # Make sure such a deletion is idempotent self.controller.delete(self.queue_name, msg1["id"], - tenant=self.tenant, + project=self.project, claim=cid) # A non-existing claim does not ensure the message deletion self.claim_controller.delete(self.queue_name, cid, - tenant=self.tenant) + project=self.project) with testing.expected(storage.exceptions.NotPermitted): self.controller.delete(self.queue_name, msg2["id"], - tenant=self.tenant, + project=self.project, claim=cid) def test_expired_message(self): messages = [{'body': 3.14, 'ttl': 0}] [msgid] = self.controller.post(self.queue_name, messages, - tenant=self.tenant, + project=self.project, client_uuid='my_uuid') with testing.expected(storage.exceptions.DoesNotExist): self.controller.get(self.queue_name, msgid, - tenant=self.tenant) + project=self.project) countof = self.queue_controller.stats(self.queue_name, - tenant=self.tenant) + project=self.project) self.assertEquals(countof['messages']['free'], 0) def test_illformed_id(self): @@ -263,12 +264,12 @@ class MessageControllerTest(ControllerBaseTest): self.queue_controller.upsert('unused', {}, '480924') [msgid] = self.controller.post('unused', [{'body': {}, 'ttl': 10}], - tenant='480924', + project='480924', client_uuid='unused') with testing.expected(exceptions.NotPermitted): self.controller.delete('unused', msgid, - tenant='480924', + project='480924', claim='illformed') @@ -290,21 +291,21 @@ class ClaimControllerTest(ControllerBaseTest): self.queue_controller = self.driver.queue_controller self.message_controller = self.driver.message_controller self.queue_controller.upsert(self.queue_name, {}, - tenant=self.tenant) + project=self.project) def tearDown(self): - self.queue_controller.delete(self.queue_name, tenant=self.tenant) + self.queue_controller.delete(self.queue_name, project=self.project) super(ClaimControllerTest, self).tearDown() def test_claim_lifecycle(self): _insert_fixtures(self.message_controller, self.queue_name, - tenant=self.tenant, client_uuid="my_uuid", num=20) + project=self.project, client_uuid="my_uuid", num=20) meta = {"ttl": 70} # Make sure create works claim_id, messages = self.controller.create(self.queue_name, meta, - tenant=self.tenant, + project=self.project, limit=15) messages = list(messages) @@ -312,13 +313,13 @@ class ClaimControllerTest(ControllerBaseTest): # Ensure Queue stats countof = self.queue_controller.stats(self.queue_name, - tenant=self.tenant) + project=self.project) self.assertEqual(countof['messages']['claimed'], 15) self.assertEqual(countof['messages']['free'], 5) # Make sure get works claim, messages2 = self.controller.get(self.queue_name, claim_id, - tenant=self.tenant) + project=self.project) messages2 = list(messages2) self.assertEquals(len(messages2), 15) @@ -328,11 +329,11 @@ class ClaimControllerTest(ControllerBaseTest): new_meta = {"ttl": 100} self.controller.update(self.queue_name, claim_id, - new_meta, tenant=self.tenant) + new_meta, project=self.project) # Make sure update works claim, messages2 = self.controller.get(self.queue_name, claim_id, - tenant=self.tenant) + project=self.project) messages2 = list(messages2) self.assertEquals(len(messages2), 15) @@ -344,25 +345,25 @@ class ClaimControllerTest(ControllerBaseTest): # Make sure delete works self.controller.delete(self.queue_name, claim_id, - tenant=self.tenant) + project=self.project) self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.controller.get, self.queue_name, - claim_id, tenant=self.tenant) + claim_id, project=self.project) def test_expired_claim(self): meta = {"ttl": 0} claim_id, messages = self.controller.create(self.queue_name, meta, - tenant=self.tenant) + project=self.project) with testing.expected(storage.exceptions.DoesNotExist): self.controller.get(self.queue_name, claim_id, - tenant=self.tenant) + project=self.project) with testing.expected(storage.exceptions.DoesNotExist): self.controller.update(self.queue_name, claim_id, - meta, tenant=self.tenant) + meta, project=self.project) def test_illformed_id(self): # any ill-formed IDs should be regarded as non-existing ones. @@ -375,7 +376,7 @@ class ClaimControllerTest(ControllerBaseTest): {'ttl': 40}, '480924') -def _insert_fixtures(controller, queue_name, tenant=None, +def _insert_fixtures(controller, queue_name, project=None, client_uuid=None, num=4): def messages(): @@ -386,4 +387,4 @@ def _insert_fixtures(controller, queue_name, tenant=None, "event": "Event number %s" % n }} controller.post(queue_name, messages(), - tenant=tenant, client_uuid=client_uuid) + project=project, client_uuid=client_uuid) diff --git a/marconi/tests/storage/test_base.py b/marconi/tests/storage/test_base.py index 9429d7848..61f7af686 100644 --- a/marconi/tests/storage/test_base.py +++ b/marconi/tests/storage/test_base.py @@ -35,24 +35,24 @@ class Driver(storage.DriverBase): class QueueController(storage.QueueBase): - def list(self, tenant=None): - super(QueueController, self).list(tenant) + def list(self, project=None): + super(QueueController, self).list(project) - def get(self, name, tenant=None): - super(QueueController, self).get(name, tenant=tenant) + def get(self, name, project=None): + super(QueueController, self).get(name, project=project) - def upsert(self, name, metadata, tenant=None): - super(QueueController, self).upsert(name, tenant=tenant, + def upsert(self, name, metadata, project=None): + super(QueueController, self).upsert(name, project=project, metadata=metadata) - def delete(self, name, tenant=None): - super(QueueController, self).delete(name, tenant=tenant) + def delete(self, name, project=None): + super(QueueController, self).delete(name, project=project) - def stats(self, name, tenant=None): - super(QueueController, self).stats(name, tenant=tenant) + def stats(self, name, project=None): + super(QueueController, self).stats(name, project=project) - def actions(self, name, tenant=None, marker=None, limit=10): - super(QueueController, self).actions(name, tenant=tenant, + def actions(self, name, project=None, marker=None, limit=10): + super(QueueController, self).actions(name, project=project, marker=marker, limit=limit) diff --git a/marconi/tests/storage/test_impl_mongodb.py b/marconi/tests/storage/test_impl_mongodb.py index 007cd5590..b8dec942c 100644 --- a/marconi/tests/storage/test_impl_mongodb.py +++ b/marconi/tests/storage/test_impl_mongodb.py @@ -61,7 +61,7 @@ class MongodbQueueTests(base.QueueControllerTest): def test_indexes(self): col = self.controller._col indexes = col.index_information() - self.assertIn("t_1_n_1", indexes) + self.assertIn("p_1_n_1", indexes) def test_messages_purged(self): queue_name = "test" @@ -119,18 +119,18 @@ class MongodbClaimTests(base.ClaimControllerTest): epoch = '000000000000000000000000' self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.controller.get, self.queue_name, - epoch, tenant=self.tenant) + epoch, project=self.project) claim_id, messages = self.controller.create(self.queue_name, {"ttl": 1}, - tenant=self.tenant) + project=self.project) # Lets let it expire time.sleep(1) self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.controller.update, self.queue_name, - claim_id, {}, tenant=self.tenant) + claim_id, {}, project=self.project) self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.controller.update, self.queue_name, - claim_id, {}, tenant=self.tenant) + claim_id, {}, project=self.project) diff --git a/marconi/tests/util/faulty_storage.py b/marconi/tests/util/faulty_storage.py index 14bd9b020..049db7bf5 100644 --- a/marconi/tests/util/faulty_storage.py +++ b/marconi/tests/util/faulty_storage.py @@ -35,22 +35,22 @@ class QueueController(storage.QueueBase): def __init__(self, driver): pass - def list(self, tenant=None): + def list(self, project=None): raise NotImplementedError() - def get(self, name, tenant=None): + def get(self, name, project=None): raise NotImplementedError() - def upsert(self, name, metadata, tenant=None): + def upsert(self, name, metadata, project=None): raise NotImplementedError() - def delete(self, name, tenant=None): + def delete(self, name, project=None): raise NotImplementedError() - def stats(self, name, tenant=None): + def stats(self, name, project=None): raise NotImplementedError() - def actions(self, name, tenant=None, marker=None, limit=10): + def actions(self, name, project=None, marker=None, limit=10): raise NotImplementedError() @@ -58,16 +58,16 @@ class MessageController(storage.MessageBase): def __init__(self, driver): pass - def get(self, queue, tenant=None, message_id=None, + def get(self, queue, project=None, message_id=None, marker=None, echo=False, client_uuid=None): raise NotImplementedError() - def list(self, queue, tenant=None, marker=None, + def list(self, queue, project=None, marker=None, limit=10, echo=False, client_uuid=None): raise NotImplementedError() - def post(self, queue, messages, tenant=None): + def post(self, queue, messages, project=None): raise NotImplementedError() - def delete(self, queue, message_id, tenant=None, claim=None): + def delete(self, queue, message_id, project=None, claim=None): raise NotImplementedError() diff --git a/marconi/transport/wsgi/claims.py b/marconi/transport/wsgi/claims.py index 3274ad4f8..30fc3ee2e 100644 --- a/marconi/transport/wsgi/claims.py +++ b/marconi/transport/wsgi/claims.py @@ -31,7 +31,7 @@ class CollectionResource(object): def __init__(self, claim_controller): self.claim_ctrl = claim_controller - def on_post(self, req, resp, tenant_id, queue_name): + def on_post(self, req, resp, project_id, queue_name): if req.content_length is None or req.content_length == 0: raise falcon.HTTPBadRequest(_('Bad request'), _('Missing claim metadata.')) @@ -46,7 +46,7 @@ class CollectionResource(object): cid, msgs = self.claim_ctrl.create( queue_name, metadata=metadata, - tenant=tenant_id, + project=project_id, **kwargs) resp_msgs = list(msgs) @@ -83,12 +83,12 @@ class ItemResource(object): def __init__(self, claim_controller): self.claim_ctrl = claim_controller - def on_get(self, req, resp, tenant_id, queue_name, claim_id): + def on_get(self, req, resp, project_id, queue_name, claim_id): try: meta, msgs = self.claim_ctrl.get( queue_name, claim_id=claim_id, - tenant=tenant_id) + project=project_id) meta['messages'] = list(msgs) for msg in meta['messages']: @@ -112,7 +112,7 @@ class ItemResource(object): msg = _('Please try again in a few seconds.') raise falcon.HTTPServiceUnavailable(title, msg, 30) - def on_patch(self, req, resp, tenant_id, queue_name, claim_id): + def on_patch(self, req, resp, project_id, queue_name, claim_id): if req.content_length is None or req.content_length == 0: raise falcon.HTTPBadRequest(_('Bad request'), _('Missing claim metadata.')) @@ -122,7 +122,7 @@ class ItemResource(object): self.claim_ctrl.update(queue_name, claim_id=claim_id, metadata=metadata, - tenant=tenant_id) + project=project_id) resp.status = falcon.HTTP_204 @@ -139,11 +139,11 @@ class ItemResource(object): msg = _('Please try again in a few seconds.') raise falcon.HTTPServiceUnavailable(title, msg, 30) - def on_delete(self, req, resp, tenant_id, queue_name, claim_id): + def on_delete(self, req, resp, project_id, queue_name, claim_id): try: self.claim_ctrl.delete(queue_name, claim_id=claim_id, - tenant=tenant_id) + project=project_id) resp.status = falcon.HTTP_204 diff --git a/marconi/transport/wsgi/driver.py b/marconi/transport/wsgi/driver.py index 46741d8b3..2b607bda6 100644 --- a/marconi/transport/wsgi/driver.py +++ b/marconi/transport/wsgi/driver.py @@ -39,17 +39,17 @@ class Driver(transport.DriverBase): claim_item = transport.wsgi.claims.ItemResource(claim_controller) self.app = api = falcon.API() - api.add_route('/v1/{tenant_id}/queues', queue_collection) - api.add_route('/v1/{tenant_id}/queues/{queue_name}', queue_item) - api.add_route('/v1/{tenant_id}/queues/{queue_name}' + api.add_route('/v1/{project_id}/queues', queue_collection) + api.add_route('/v1/{project_id}/queues/{queue_name}', queue_item) + api.add_route('/v1/{project_id}/queues/{queue_name}' '/stats', stats_endpoint) - api.add_route('/v1/{tenant_id}/queues/{queue_name}' + api.add_route('/v1/{project_id}/queues/{queue_name}' '/messages', msg_collection) - api.add_route('/v1/{tenant_id}/queues/{queue_name}' + api.add_route('/v1/{project_id}/queues/{queue_name}' '/messages/{message_id}', msg_item) - api.add_route('/v1/{tenant_id}/queues/{queue_name}' + api.add_route('/v1/{project_id}/queues/{queue_name}' '/claims', claim_collection) - api.add_route('/v1/{tenant_id}/queues/{queue_name}' + api.add_route('/v1/{project_id}/queues/{queue_name}' '/claims/{claim_id}', claim_item) def listen(self): diff --git a/marconi/transport/wsgi/messages.py b/marconi/transport/wsgi/messages.py index d2ecc10f6..d7f356c96 100644 --- a/marconi/transport/wsgi/messages.py +++ b/marconi/transport/wsgi/messages.py @@ -31,7 +31,7 @@ class CollectionResource(object): def __init__(self, message_controller): self.msg_ctrl = message_controller - def on_post(self, req, resp, tenant_id, queue_name): + def on_post(self, req, resp, project_id, queue_name): uuid = req.get_header('Client-ID', required=True) if req.content_length is None or req.content_length == 0: @@ -54,7 +54,7 @@ class CollectionResource(object): ls = filtered(helpers.read_json(req.stream)) ns = self.msg_ctrl.post(queue_name, messages=ls, - tenant=tenant_id, + project=project_id, client_uuid=uuid) resp.location = req.path + '/' + ','.join( @@ -74,7 +74,7 @@ class CollectionResource(object): msg = _('Please try again in a few seconds.') raise falcon.HTTPServiceUnavailable(title, msg, 30) - def on_get(self, req, resp, tenant_id, queue_name): + def on_get(self, req, resp, project_id, queue_name): uuid = req.get_header('Client-ID', required=True) #TODO(zyuan): where do we define the limits? @@ -86,7 +86,7 @@ class CollectionResource(object): try: interaction = self.msg_ctrl.list(queue_name, - tenant=tenant_id, + project=project_id, client_uuid=uuid, **kwargs) resp_dict = { @@ -129,11 +129,11 @@ class ItemResource(object): def __init__(self, message_controller): self.msg_ctrl = message_controller - def on_get(self, req, resp, tenant_id, queue_name, message_id): + def on_get(self, req, resp, project_id, queue_name, message_id): try: msg = self.msg_ctrl.get(queue_name, message_id=message_id, - tenant=tenant_id) + project=project_id) msg['href'] = req.path del msg['id'] @@ -151,11 +151,11 @@ class ItemResource(object): msg = _('Please try again in a few seconds.') raise falcon.HTTPServiceUnavailable(title, msg, 30) - def on_delete(self, req, resp, tenant_id, queue_name, message_id): + def on_delete(self, req, resp, project_id, queue_name, message_id): try: self.msg_ctrl.delete(queue_name, message_id=message_id, - tenant=tenant_id, + project=project_id, claim=req.get_param('claim_id')) resp.status = falcon.HTTP_204 diff --git a/marconi/transport/wsgi/queues.py b/marconi/transport/wsgi/queues.py index 8032653f1..9ba569144 100644 --- a/marconi/transport/wsgi/queues.py +++ b/marconi/transport/wsgi/queues.py @@ -32,7 +32,7 @@ class ItemResource(object): def __init__(self, queue_controller): self.queue_ctrl = queue_controller - def on_put(self, req, resp, tenant_id, queue_name): + def on_put(self, req, resp, project_id, queue_name): if req.content_length > transport.MAX_QUEUE_METADATA_SIZE: raise falcon.HTTPBadRequest(_('Bad request'), _('Queue metadata size is too large.')) @@ -45,7 +45,7 @@ class ItemResource(object): metadata = _filtered(helpers.read_json(req.stream)) created = self.queue_ctrl.upsert(queue_name, metadata=metadata, - tenant=tenant_id) + project=project_id) except helpers.MalformedJSON: raise falcon.HTTPBadRequest(_('Bad request'), _('Malformed queue metadata.')) @@ -59,10 +59,10 @@ class ItemResource(object): resp.status = falcon.HTTP_201 if created else falcon.HTTP_204 resp.location = req.path - def on_get(self, req, resp, tenant_id, queue_name): + def on_get(self, req, resp, project_id, queue_name): try: doc = self.queue_ctrl.get(queue_name, - tenant=tenant_id) + project=project_id) resp.content_location = req.relative_uri resp.body = helpers.to_json(doc) @@ -76,10 +76,10 @@ class ItemResource(object): msg = _('Please try again in a few seconds.') raise falcon.HTTPServiceUnavailable(title, msg, 30) - def on_delete(self, req, resp, tenant_id, queue_name): + def on_delete(self, req, resp, project_id, queue_name): try: self.queue_ctrl.delete(queue_name, - tenant=tenant_id) + project=project_id) except Exception as ex: LOG.exception(ex) @@ -97,7 +97,7 @@ class CollectionResource(object): def __init__(self, queue_controller): self.queue_ctrl = queue_controller - def on_get(self, req, resp, tenant_id): + def on_get(self, req, resp, project_id): #TODO(zyuan): where do we define the limits? kwargs = helpers.purge({ 'marker': req.get_param('marker'), @@ -106,7 +106,7 @@ class CollectionResource(object): }) try: - interaction = self.queue_ctrl.list(tenant=tenant_id, **kwargs) + interaction = self.queue_ctrl.list(project=project_id, **kwargs) resp_dict = { 'queues': list(interaction.next()) diff --git a/marconi/transport/wsgi/stats.py b/marconi/transport/wsgi/stats.py index 19b65e816..e480b2835 100644 --- a/marconi/transport/wsgi/stats.py +++ b/marconi/transport/wsgi/stats.py @@ -30,10 +30,10 @@ class Resource(object): def __init__(self, queue_controller): self.queue_ctrl = queue_controller - def on_get(self, req, resp, tenant_id, queue_name): + def on_get(self, req, resp, project_id, queue_name): try: resp_dict = self.queue_ctrl.stats(queue_name, - tenant=tenant_id) + project=project_id) resp.content_location = req.path resp.body = helpers.to_json(resp_dict)