storage: pass the conf object to drivers
Some storage drivers such as SQL need it. Change-Id: I4ef122003d372976a4e401aee223e971133dd8f7
This commit is contained in:
parent
5760b50204
commit
5c995c3924
@ -25,7 +25,8 @@ class Connection(object):
|
|||||||
'storage': {'production_ready': False},
|
'storage': {'production_ready': False},
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, url):
|
@staticmethod
|
||||||
|
def __init__(url, conf):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -70,7 +70,7 @@ class Connection(base.Connection):
|
|||||||
# it is only searchable after periodic refreshes.
|
# it is only searchable after periodic refreshes.
|
||||||
_refresh_on_write = False
|
_refresh_on_write = False
|
||||||
|
|
||||||
def __init__(self, url):
|
def __init__(self, url, conf):
|
||||||
url_split = netutils.urlsplit(url)
|
url_split = netutils.urlsplit(url)
|
||||||
self.conn = es.Elasticsearch(url_split.netloc)
|
self.conn = es.Elasticsearch(url_split.netloc)
|
||||||
|
|
||||||
|
@ -67,9 +67,6 @@ class Connection(hbase_base.Connection, base.Connection):
|
|||||||
|
|
||||||
EVENT_TABLE = "event"
|
EVENT_TABLE = "event"
|
||||||
|
|
||||||
def __init__(self, url):
|
|
||||||
super(Connection, self).__init__(url)
|
|
||||||
|
|
||||||
def upgrade(self):
|
def upgrade(self):
|
||||||
tables = [self.EVENT_TABLE]
|
tables = [self.EVENT_TABLE]
|
||||||
column_families = {'f': dict(max_versions=1)}
|
column_families = {'f': dict(max_versions=1)}
|
||||||
|
@ -27,7 +27,7 @@ class Connection(pymongo_base.Connection):
|
|||||||
|
|
||||||
CONNECTION_POOL = pymongo_utils.ConnectionPool()
|
CONNECTION_POOL = pymongo_utils.ConnectionPool()
|
||||||
|
|
||||||
def __init__(self, url):
|
def __init__(self, url, conf):
|
||||||
|
|
||||||
# NOTE(jd) Use our own connection pooling on top of the Pymongo one.
|
# NOTE(jd) Use our own connection pooling on top of the Pymongo one.
|
||||||
# We need that otherwise we overflow the MongoDB instance with new
|
# We need that otherwise we overflow the MongoDB instance with new
|
||||||
|
@ -16,7 +16,6 @@
|
|||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
from oslo_config import cfg
|
|
||||||
from oslo_db import exception as dbexc
|
from oslo_db import exception as dbexc
|
||||||
from oslo_db.sqlalchemy import session as db_session
|
from oslo_db.sqlalchemy import session as db_session
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
@ -123,12 +122,12 @@ class Connection(base.Connection):
|
|||||||
AVAILABLE_STORAGE_CAPABILITIES,
|
AVAILABLE_STORAGE_CAPABILITIES,
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, url):
|
def __init__(self, url, conf):
|
||||||
# Set max_retries to 0, since oslo.db in certain cases may attempt
|
# Set max_retries to 0, since oslo.db in certain cases may attempt
|
||||||
# to retry making the db connection retried max_retries ^ 2 times
|
# to retry making the db connection retried max_retries ^ 2 times
|
||||||
# in failure case and db reconnection has already been implemented
|
# in failure case and db reconnection has already been implemented
|
||||||
# in storage.__init__.get_connection_from_config function
|
# in storage.__init__.get_connection_from_config function
|
||||||
options = dict(cfg.CONF.database.items())
|
options = dict(conf)
|
||||||
options['max_retries'] = 0
|
options['max_retries'] = 0
|
||||||
# oslo.db doesn't support options defined by Panko
|
# oslo.db doesn't support options defined by Panko
|
||||||
for opt in storage.OPTS:
|
for opt in storage.OPTS:
|
||||||
|
@ -64,12 +64,12 @@ def get_connection_from_config(conf):
|
|||||||
def _inner():
|
def _inner():
|
||||||
url = (getattr(conf.database, 'event_connection') or
|
url = (getattr(conf.database, 'event_connection') or
|
||||||
conf.database.connection)
|
conf.database.connection)
|
||||||
return get_connection(url)
|
return get_connection(url, conf)
|
||||||
|
|
||||||
return _inner()
|
return _inner()
|
||||||
|
|
||||||
|
|
||||||
def get_connection(url):
|
def get_connection(url, conf):
|
||||||
"""Return an open connection to the database."""
|
"""Return an open connection to the database."""
|
||||||
connection_scheme = urlparse.urlparse(url).scheme
|
connection_scheme = urlparse.urlparse(url).scheme
|
||||||
# SqlAlchemy connections specify may specify a 'dialect' or
|
# SqlAlchemy connections specify may specify a 'dialect' or
|
||||||
@ -79,4 +79,4 @@ def get_connection(url):
|
|||||||
LOG.debug('looking for %(name)r driver in panko.event.storage',
|
LOG.debug('looking for %(name)r driver in panko.event.storage',
|
||||||
{'name': engine_name})
|
{'name': engine_name})
|
||||||
mgr = driver.DriverManager('panko.event.storage', engine_name)
|
mgr = driver.DriverManager('panko.event.storage', engine_name)
|
||||||
return mgr.driver(url)
|
return mgr.driver(url, conf)
|
||||||
|
@ -38,8 +38,9 @@ except ImportError:
|
|||||||
|
|
||||||
class MongoDbManager(fixtures.Fixture):
|
class MongoDbManager(fixtures.Fixture):
|
||||||
|
|
||||||
def __init__(self, url):
|
def __init__(self, url, conf):
|
||||||
self._url = url
|
self._url = url
|
||||||
|
self.conf = conf
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(MongoDbManager, self).setUp()
|
super(MongoDbManager, self).setUp()
|
||||||
@ -49,7 +50,7 @@ class MongoDbManager(fixtures.Fixture):
|
|||||||
message='.*you must provide a username and password.*')
|
message='.*you must provide a username and password.*')
|
||||||
try:
|
try:
|
||||||
self.event_connection = storage.get_connection(
|
self.event_connection = storage.get_connection(
|
||||||
self.url)
|
self.url, self.conf)
|
||||||
except storage.StorageBadVersion as e:
|
except storage.StorageBadVersion as e:
|
||||||
raise testcase.TestSkipped(six.text_type(e))
|
raise testcase.TestSkipped(six.text_type(e))
|
||||||
|
|
||||||
@ -62,7 +63,7 @@ class MongoDbManager(fixtures.Fixture):
|
|||||||
|
|
||||||
|
|
||||||
class SQLManager(fixtures.Fixture):
|
class SQLManager(fixtures.Fixture):
|
||||||
def __init__(self, url):
|
def __init__(self, url, conf):
|
||||||
db_name = 'panko_%s' % uuid.uuid4().hex
|
db_name = 'panko_%s' % uuid.uuid4().hex
|
||||||
engine = sqlalchemy.create_engine(url)
|
engine = sqlalchemy.create_engine(url)
|
||||||
conn = engine.connect()
|
conn = engine.connect()
|
||||||
@ -72,10 +73,11 @@ class SQLManager(fixtures.Fixture):
|
|||||||
parsed = list(urlparse.urlparse(url))
|
parsed = list(urlparse.urlparse(url))
|
||||||
parsed[2] = '/' + db_name
|
parsed[2] = '/' + db_name
|
||||||
self.url = urlparse.urlunparse(parsed)
|
self.url = urlparse.urlunparse(parsed)
|
||||||
|
self.conf = conf
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(SQLManager, self).setUp()
|
super(SQLManager, self).setUp()
|
||||||
self.event_connection = storage.get_connection(self.url)
|
self.event_connection = storage.get_connection(self.url, self.conf)
|
||||||
|
|
||||||
|
|
||||||
class PgSQLManager(SQLManager):
|
class PgSQLManager(SQLManager):
|
||||||
@ -93,13 +95,14 @@ class MySQLManager(SQLManager):
|
|||||||
|
|
||||||
|
|
||||||
class ElasticSearchManager(fixtures.Fixture):
|
class ElasticSearchManager(fixtures.Fixture):
|
||||||
def __init__(self, url):
|
def __init__(self, url, conf):
|
||||||
self.url = url
|
self.url = url
|
||||||
|
self.conf = conf
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(ElasticSearchManager, self).setUp()
|
super(ElasticSearchManager, self).setUp()
|
||||||
self.event_connection = storage.get_connection(
|
self.event_connection = storage.get_connection(
|
||||||
self.url)
|
self.url, self.conf)
|
||||||
# prefix each test with unique index name
|
# prefix each test with unique index name
|
||||||
self.event_connection.index_name = 'events_%s' % uuid.uuid4().hex
|
self.event_connection.index_name = 'events_%s' % uuid.uuid4().hex
|
||||||
# force index on write so data is queryable right away
|
# force index on write so data is queryable right away
|
||||||
@ -107,13 +110,14 @@ class ElasticSearchManager(fixtures.Fixture):
|
|||||||
|
|
||||||
|
|
||||||
class HBaseManager(fixtures.Fixture):
|
class HBaseManager(fixtures.Fixture):
|
||||||
def __init__(self, url):
|
def __init__(self, url, conf):
|
||||||
self._url = url
|
self._url = url
|
||||||
|
self.conf = conf
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(HBaseManager, self).setUp()
|
super(HBaseManager, self).setUp()
|
||||||
self.event_connection = storage.get_connection(
|
self.event_connection = storage.get_connection(
|
||||||
self.url)
|
self.url, self.conf)
|
||||||
# Unique prefix for each test to keep data is distinguished because
|
# Unique prefix for each test to keep data is distinguished because
|
||||||
# all test data is stored in one table
|
# all test data is stored in one table
|
||||||
data_prefix = str(uuid.uuid4().hex)
|
data_prefix = str(uuid.uuid4().hex)
|
||||||
@ -145,13 +149,14 @@ class HBaseManager(fixtures.Fixture):
|
|||||||
|
|
||||||
class SQLiteManager(fixtures.Fixture):
|
class SQLiteManager(fixtures.Fixture):
|
||||||
|
|
||||||
def __init__(self, url):
|
def __init__(self, url, conf):
|
||||||
self.url = url
|
self.url = url
|
||||||
|
self.conf = conf
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(SQLiteManager, self).setUp()
|
super(SQLiteManager, self).setUp()
|
||||||
self.event_connection = storage.get_connection(
|
self.event_connection = storage.get_connection(
|
||||||
self.url)
|
self.url, self.conf)
|
||||||
|
|
||||||
|
|
||||||
@six.add_metaclass(test_base.SkipNotImplementedMeta)
|
@six.add_metaclass(test_base.SkipNotImplementedMeta)
|
||||||
@ -191,7 +196,7 @@ class TestBase(test_base.BaseTestCase):
|
|||||||
if not manager:
|
if not manager:
|
||||||
self.skipTest("missing driver manager: %s" % engine)
|
self.skipTest("missing driver manager: %s" % engine)
|
||||||
|
|
||||||
self.db_manager = manager(db_url)
|
self.db_manager = manager(db_url, self.CONF)
|
||||||
|
|
||||||
self.useFixture(self.db_manager)
|
self.useFixture(self.db_manager)
|
||||||
|
|
||||||
@ -206,7 +211,7 @@ class TestBase(test_base.BaseTestCase):
|
|||||||
self.event_conn = None
|
self.event_conn = None
|
||||||
super(TestBase, self).tearDown()
|
super(TestBase, self).tearDown()
|
||||||
|
|
||||||
def _get_connection(self, url):
|
def _get_connection(self, url, conf):
|
||||||
return self.event_conn
|
return self.event_conn
|
||||||
|
|
||||||
|
|
||||||
|
@ -29,12 +29,12 @@ import six
|
|||||||
|
|
||||||
class EngineTest(base.BaseTestCase):
|
class EngineTest(base.BaseTestCase):
|
||||||
def test_get_connection(self):
|
def test_get_connection(self):
|
||||||
engine = storage.get_connection('log://localhost')
|
engine = storage.get_connection('log://localhost', None)
|
||||||
self.assertIsInstance(engine, impl_log.Connection)
|
self.assertIsInstance(engine, impl_log.Connection)
|
||||||
|
|
||||||
def test_get_connection_no_such_engine(self):
|
def test_get_connection_no_such_engine(self):
|
||||||
try:
|
try:
|
||||||
storage.get_connection('no-such-engine://localhost')
|
storage.get_connection('no-such-engine://localhost', None)
|
||||||
except RuntimeError as err:
|
except RuntimeError as err:
|
||||||
self.assertIn('no-such-engine', six.text_type(err))
|
self.assertIn('no-such-engine', six.text_type(err))
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ def main(argv):
|
|||||||
url = ("%s?table_prefix=%s" %
|
url = ("%s?table_prefix=%s" %
|
||||||
(os.getenv("PANKO_TEST_STORAGE_URL"),
|
(os.getenv("PANKO_TEST_STORAGE_URL"),
|
||||||
os.getenv("PANKO_TEST_HBASE_TABLE_PREFIX", "test")))
|
os.getenv("PANKO_TEST_HBASE_TABLE_PREFIX", "test")))
|
||||||
event_conn = storage.get_connection(url)
|
event_conn = storage.get_connection(url, None)
|
||||||
for arg in argv:
|
for arg in argv:
|
||||||
if arg == "--upgrade":
|
if arg == "--upgrade":
|
||||||
event_conn.upgrade()
|
event_conn.upgrade()
|
||||||
|
Loading…
Reference in New Issue
Block a user