QueueController for mongodb storage

This patch introduces a queue controller for the mongodb storage
backend.

Implements blueprint storage-mongodb

Change-Id: I473d0514b1948bedeae66c3b737d9e18e088c796
This commit is contained in:
Flaper Fesp 2013-02-28 18:19:08 +01:00
parent cd6a06a7dd
commit b8f510ccf7
8 changed files with 261 additions and 3 deletions

View File

@ -1 +1,3 @@
"""MongoDB Storage Driver for Marconi"""
from marconi.storage.mongodb.base import Driver # NOQA

View File

@ -0,0 +1,62 @@
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Mongodb storage driver implementation
"""
import pymongo
import pymongo.errors
from marconi.common import config
from marconi import storage
from marconi.storage.mongodb import controllers
options = {
"uri": None,
"database": "marconi",
}
cfg = config.namespace('drivers:storage:mongodb').from_options(**options)
class Driver(storage.DriverBase):
def __init__(self):
self._database = None
@property
def db(self):
"""
Property for lazy instantiation of
mongodb's database.
"""
if not self._database:
conn = pymongo.MongoClient(cfg.uri)
self._database = conn[cfg.database]
return self._database
@property
def queue_controller(self):
return controllers.QueueController(self)
@property
def message_controller(self):
return controllers.MessageController(self)
@property
def claim_controller(self):
return controllers.ClaimController(self)

View File

@ -0,0 +1,88 @@
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Mongodb storage controllers implementation
Fields Mapping:
In order to reduce the disk / memory space used,
fields name will be, most of the time, the first
letter of their long name. Fields mapping will be
updated and documented in each class.
"""
from marconi import storage
class QueueController(storage.QueueBase):
"""
Queues:
Name Field
----------------
tenant -> t
metadata -> m
name -> n
"""
def __init__(self, *args, **kwargs):
super(QueueController, self).__init__(*args, **kwargs)
# NOTE(flaper87): This creates a unique compound index for
# tenant and name. Using tenant as the first field of the
# index allows for querying by tenant and tenant+name.
# This is also useful for retrieving the queues list for
# as specific tenant, for example. Order Matters!
self._col.ensure_index([("t", 1), ("n", 1)], unique=True)
@property
def _col(self):
return self.driver.db["queues"]
def list(self, tenant=None):
cursor = self._col.find({"t": tenant}, fields=["n", "m"])
for queue in cursor:
queue["name"] = queue.pop("n")
queue["metadata"] = queue.pop("m", {})
yield queue
def get(self, name, tenant=None):
queue = self._col.find_one({"t": tenant, "n": name}, fields=["m"])
if queue is None:
msg = (_("Queue %(name)s does not exist for tenant %(tenant)s") %
dict(name=name, tenant=tenant))
raise storage.exceptions.DoesNotExist(msg)
return queue.get("m", {})
def upsert(self, name, metadata, tenant=None):
super(QueueController, self).upsert(name, metadata, tenant)
rst = self._col.update({"t": tenant, "n": name},
{"$set": {"m": metadata}},
multi=False,
upsert=True,
manipulate=False)
return not rst["updatedExisting"]
def delete(self, name, tenant=None):
super(QueueController, self).delete(name, tenant)
self._col.remove({"t": tenant, "n": name})
def stats(self, name, tenant=None):
pass
def actions(self, name, tenant=None, marker=None, limit=10):
pass

View File

@ -0,0 +1,10 @@
[drivers]
transport = wsgi
storage = mongodb
[drivers:transport:wsgi]
port = 8888
[drivers:storage:mongodb]
uri = "mongodb://127.0.0.1:27017"
database = "marconi_test"

View File

@ -44,6 +44,20 @@ class QueueControllerTest(ControllerBaseTest):
"""
controller_base_class = storage.QueueBase
def test_list(self):
num = 4
for queue in xrange(num):
self.controller.upsert(queue, {}, tenant=self.tenant)
queues = self.controller.list(tenant=self.tenant)
counter = 0
for queue in queues:
self.assertIn("name", queue)
self.assertIn("metadata", queue)
counter += 1
self.assertEqual(counter, num)
def test_queue_lifecycle(self):
# Test Queue Creation
created = self.controller.upsert("test", tenant=self.tenant,
@ -53,7 +67,7 @@ class QueueControllerTest(ControllerBaseTest):
# Test Queue retrieval
queue = self.controller.get("test", tenant=self.tenant)
self.assertEqual(queue["name"], "test")
self.assertIsNotNone(queue)
# Test Queue Update
created = self.controller.upsert("test", tenant=self.tenant,
@ -61,14 +75,15 @@ class QueueControllerTest(ControllerBaseTest):
self.assertFalse(created)
queue = self.controller.get("test", tenant=self.tenant)
self.assertEqual(queue["metadata"]["meta"], "test_meta")
self.assertEqual(queue["meta"], "test_meta")
# Test Queue Deletion
self.controller.delete("test", tenant=self.tenant)
# Test DoesNotExist Exception
self.assertRaises(storage.exceptions.DoesNotExist,
self.controller.get, "test", tenant=self.tenant)
self.controller.get, "test",
tenant=self.tenant)
class MessageControllerTest(ControllerBaseTest):

View File

@ -0,0 +1,62 @@
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from marconi.common import config
from marconi.storage import mongodb
from marconi.storage.mongodb import controllers
from marconi.tests.storage import base
from marconi.tests.util import suite
cfg = config.namespace("drivers:storage:mongodb").from_options()
class MongodbDriverTest(suite.TestSuite):
def setUp(self):
if not os.environ.get("MONGODB_TEST_LIVE"):
self.skipTest("No MongoDB instance running")
super(MongodbDriverTest, self).setUp()
self.load_conf("wsgi_mongodb.conf")
def test_db_instance(self):
driver = mongodb.Driver()
db = driver.db
self.assertEquals(db.name, cfg.database)
class MongodbQueueTests(base.QueueControllerTest):
driver_class = mongodb.Driver
controller_class = controllers.QueueController
def setUp(self):
if not os.environ.get("MONGODB_TEST_LIVE"):
self.skipTest("No MongoDB instance running")
super(MongodbQueueTests, self).setUp()
self.load_conf("wsgi_mongodb.conf")
def tearDown(self):
self.controller._col.drop()
super(MongodbQueueTests, self).tearDown()
def test_indexes(self):
col = self.controller._col
indexes = col.index_information()
self.assertIn("t_1_n_1", indexes)

View File

@ -17,6 +17,12 @@ import os
import testtools
from marconi.common import config
cfg = config.project()
class TestSuite(testtools.TestCase):
"""
Child class of testtools.TestCase for testing Marconi
@ -46,5 +52,17 @@ class TestSuite(testtools.TestCase):
parent = os.path.dirname(self._my_dir())
return os.path.join(parent, 'etc', filename)
def load_conf(self, filename):
"""
Loads `filename` configuration file.
:param filename: Name of the conf file to find (e.g.,
"wsgi_memory.conf")
:returns: Project's config object.
"""
cfg.load(self.conf_path(filename))
return cfg
def _my_dir(self):
return os.path.abspath(os.path.dirname(__file__))

View File

@ -1,2 +1,3 @@
cliff
oslo.config>=1.1.0
pymongo