Support redis as mgmt storage backend

Change-Id: Id67109a8707f2c172132450913ad2783bd09ca05
Implements: blueprint support-redis-as-management-storage-backend
This commit is contained in:
gengchc2 2017-11-05 01:26:02 -08:00 committed by wangxiyuan
parent 5250d940bc
commit 81c822c3d6
9 changed files with 995 additions and 16 deletions

View File

@ -206,6 +206,11 @@ class PoolAlreadyExists(Conflict):
msg_format = u'The database URI is in use by another pool.'
class PoolRedisNotSupportGroup(ExceptionBase):
msg_format = (u'Redis not support pool_goup, please use flavor ')
class SubscriptionAlreadyExists(Conflict):
msg_format = (u'Such subscription already exists. Subscriptions '

View File

@ -34,8 +34,21 @@ Supported Features
.. [1] This depends on the backing Redis store performance. For more
information, see `Redis' benchmarks <http://redis.io/topics/benchmarks>`_.
Redis is only a storage driver, and can't be used as the sole backend for a
Zaqar deployment.
Redis can be used both a storage driver and management driver.
For the management driver, you need to enable the redis storage options
in redis.conf. Redis persistent storage supports two ways: RDB and AOF.
The following is RDB way:
The configuration is as follows:
save <seconds> <changes>
E.g
save 900 1
save 300 10
save 60 10000
NOTE: save time, the above means that a changed key interval 900s
for persistent storage; 10 changed keys 300s for storage;
10000 changed keys 60s for storage.
Unsupported Features
--------------------
@ -45,9 +58,11 @@ Unsupported Features
.. [2] As an in-memory store, Redis doesn't support the durability guarantees
the MongoDB or SQLAlchemy backends do.
Redis is not supported as the backend for the Management Store, which means
either MongoDB or SQLAlchemy are required in addition to Redis for a working
deployment.
"""
from zaqar.storage.redis import driver
# Hoist classes into package namespace
ControlDriver = driver.ControlDriver
DataDriver = driver.DataDriver

View File

@ -0,0 +1,247 @@
# Copyright (c) 2017 ZTE Corporation..
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Redis storage controller for the queues catalogue.
Serves to construct an association between a project + queue -> pool.
::
{
'p_q': project_queue :: six.text_type,
's': pool_identifier :: six.text_type
}
"""
from oslo_log import log as logging
import redis
import six
from zaqar.i18n import _
from zaqar.storage import base
from zaqar.storage import errors
from zaqar.storage.redis import utils
LOG = logging.getLogger(__name__)
CATALOGUE_SUFFIX = 'catalogue'
COUNTING_BATCH_SIZE = 100
class CatalogueController(base.CatalogueBase):
"""Implements Catalogue resource operations using Redis.
* Project Index (Redis sorted set):
Set of all queue_ids for the given project, ordered by name.
Key: <project_id>.catalogue
+--------+-----------------------------+
| Id | Value |
+========+=============================+
| name | <project_id>.<queue_name> |
+--------+-----------------------------+
* Queue and pool Information (Redis hash):
Key: <project_id>.<queue_name>.catalogue
+----------------------+---------+
| Name | Field |
+======================+=========+
| Project | p |
+----------------------+---------+
| Queue | p_q |
+----------------------+---------+
| Pool | p_p |
+----------------------+---------+
"""
def __init__(self, *args, **kwargs):
super(CatalogueController, self).__init__(*args, **kwargs)
self._client = self.driver.connection
@utils.raises_conn_error
@utils.retries_on_connection_error
def _insert(self, project, queue, pool):
queue_key = utils.scope_queue_name(queue, project)
catalogue_project_key = utils.scope_pool_catalogue(project,
CATALOGUE_SUFFIX)
catalogue_queue_key = utils.scope_pool_catalogue(queue_key,
CATALOGUE_SUFFIX)
# Check if the queue already exists.
if self._exists(queue, project):
return False
catalogue = {
'p': project,
'p_q': queue,
'p_p': pool
}
# Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe:
pipe.zadd(catalogue_project_key, 1, queue_key)
pipe.hmset(catalogue_queue_key, catalogue)
try:
pipe.execute()
except redis.exceptions.ResponseError:
msgtmpl = _(u'CatalogueController:insert %(prj)s:'
'%(queue)s %(pool)s failed')
LOG.exception(msgtmpl,
{'prj': project, 'queue': queue, 'pool': pool})
return False
msgtmpl = _(u'CatalogueController:insert %(prj)s:%(queue)s'
':%(pool)s, success')
LOG.info(msgtmpl,
{'prj': project, 'queue': queue, 'pool': pool})
return True
@utils.raises_conn_error
@utils.retries_on_connection_error
def list(self, project):
catalogue_project_key = utils.scope_pool_catalogue(project,
CATALOGUE_SUFFIX)
ctlgs = []
offset = 0
while True:
queues = self._client.zrange(catalogue_project_key, offset,
offset + COUNTING_BATCH_SIZE - 1)
if not queues:
break
offset += len(queues)
for queue in queues:
catalogue_queue_key =\
utils.scope_pool_catalogue(queue,
CATALOGUE_SUFFIX)
ctlg = self._client.hgetall(catalogue_queue_key)
ctlgs.append(ctlg)
return (_normalize(v) for v in ctlgs)
@utils.raises_conn_error
@utils.retries_on_connection_error
def get(self, project, queue):
queue_key = utils.scope_queue_name(queue, project)
catalogue_queue_key = \
utils.scope_pool_catalogue(queue_key,
CATALOGUE_SUFFIX)
ctlg = self._client.hgetall(catalogue_queue_key)
if ctlg is None or len(ctlg) == 0:
raise errors.QueueNotMapped(queue, project)
return _normalize(ctlg)
@utils.raises_conn_error
@utils.retries_on_connection_error
def _exists(self, project, queue):
queue_key = utils.scope_queue_name(queue, project)
catalogue_queue_key = \
utils.scope_pool_catalogue(queue_key,
CATALOGUE_SUFFIX)
return self._client.exists(catalogue_queue_key)
@utils.raises_conn_error
@utils.retries_on_connection_error
def exists(self, project, queue):
return self._exists(project, queue)
def insert(self, project, queue, pool):
self._insert(project, queue, pool)
@utils.raises_conn_error
@utils.retries_on_connection_error
def delete(self, project, queue):
# (gengchc): Check if the queue already exists.
if not self._exists(project, queue):
return True
queue_key = utils.scope_queue_name(queue, project)
catalogue_project_key = utils.scope_pool_catalogue(project,
CATALOGUE_SUFFIX)
catalogue_queue_key = utils.scope_pool_catalogue(queue_key,
CATALOGUE_SUFFIX)
# (gengchc) Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe:
pipe.zrem(catalogue_project_key, queue_key)
pipe.delete(catalogue_queue_key)
try:
pipe.execute()
except redis.exceptions.ResponseError:
msgtmpl = _(u'CatalogueController:delete %(prj)s'
':%(queue)s failed')
LOG.info(msgtmpl,
{'prj': project, 'queue': queue})
return False
msgtmpl = _(u'CatalogueController:delete %(prj)s:%(queue)s success')
LOG.info(msgtmpl,
{'prj': project, 'queue': queue})
@utils.raises_conn_error
@utils.retries_on_connection_error
def _update(self, project, queue, pool):
# Check if the queue already exists.
if not self._exists(project, queue):
raise errors.QueueNotMapped(queue, project)
queue_key = utils.scope_queue_name(queue, project)
catalogue_queue_key = utils.scope_pool_catalogue(queue_key,
CATALOGUE_SUFFIX)
with self._client.pipeline() as pipe:
pipe.hset(catalogue_queue_key, "pl", pool)
try:
pipe.execute()
except redis.exceptions.ResponseError:
msgtmpl = _(u'CatalogueController:_update %(prj)s'
':%(queue)s:%(pool)s failed')
LOG.exception(msgtmpl,
{'prj': project, 'queue': queue, 'pool': pool})
return False
msgtmpl = _(u'CatalogueController:_update %(prj)s:%(queue)s'
':%(pool)s')
LOG.info(msgtmpl,
{'prj': project, 'queue': queue, 'pool': pool})
@utils.raises_conn_error
@utils.retries_on_connection_error
def update(self, project, queue, pool=None):
if pool is None:
return False
self._update(project, queue, pool)
@utils.raises_conn_error
@utils.retries_on_connection_error
def drop_all(self):
allcatalogueobj_key = self._client.keys(pattern='*catalog')
if len(allcatalogueobj_key) == 0:
return
with self._client.pipeline() as pipe:
for key in allcatalogueobj_key:
pipe.delete(key)
try:
pipe.execute()
except redis.exceptions.ResponseError:
return False
def _normalize(entry):
return {
'queue': six.text_type(entry['p_q']),
'project': six.text_type(entry['p']),
'pool': six.text_type(entry['p_p'])
}

View File

@ -12,13 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from zaqar.storage.redis import catalogue
from zaqar.storage.redis import claims
from zaqar.storage.redis import flavors
from zaqar.storage.redis import messages
from zaqar.storage.redis import pools
from zaqar.storage.redis import queues
from zaqar.storage.redis import subscriptions
QueueController = queues.QueueController
MessageController = messages.MessageController
CatalogueController = catalogue.CatalogueController
ClaimController = claims.ClaimController
FlavorsController = flavors.FlavorsController
MessageController = messages.MessageController
QueueController = queues.QueueController
PoolsController = pools.PoolsController
SubscriptionController = subscriptions.SubscriptionController

View File

@ -258,15 +258,31 @@ class ControlDriver(storage.ControlDriverBase):
@property
def pools_controller(self):
raise NotImplementedError()
controller = controllers.PoolsController(self)
if (self.conf.profiler.enabled and
self.conf.profiler.trace_management_store):
return profiler.trace_cls("redis_pools_controller")(controller)
else:
return controller
@property
def catalogue_controller(self):
raise NotImplementedError()
controller = controllers.CatalogueController(self)
if (self.conf.profiler.enabled and
self.conf.profiler.trace_management_store):
return profiler.trace_cls("redis_catalogue_"
"controller")(controller)
else:
return controller
@property
def flavors_controller(self):
raise NotImplementedError()
controller = controllers.FlavorsController(self)
if (self.conf.profiler.enabled and
self.conf.profiler.trace_management_store):
return profiler.trace_cls("redis_flavors_controller")(controller)
else:
return controller
def _get_redis_client(driver):

View File

@ -0,0 +1,181 @@
# Copyright (c) 2017 ZTE Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import functools
import msgpack
import redis
from zaqar.storage import base
from zaqar.storage import errors
from zaqar.storage.redis import utils
class FlavorsController(base.FlavorsBase):
"""Implements flavor resource operations using Redis.
Redis Data Structures:
1 All flavor_ids (Redis sorted set):
Set of all flavor_ids, ordered by name. Used to
delete the all records of table flavors
Key: flavors
+--------+-----------------------------+
| Id | Value |
+========+=============================+
| name | <flavor> |
+--------+-----------------------------+
2 Project Index (Redis sorted set):
Set of all flavors for the given project, ordered by name.
Key: <project_id>.flavors
+--------+-----------------------------+
| Id | Value |
+========+=============================+
| name | <flavor> |
+--------+-----------------------------+
3 Flavor Information (Redis hash):
Key: <flavor_id>.flavors
+----------------------+---------+
| Name | Field |
+======================+=========+
| flavor | f |
+----------------------+---------+
| project | p |
+----------------------+---------+
| capabilities | c |
+----------------------+---------+
"""
def __init__(self, *args, **kwargs):
super(FlavorsController, self).__init__(*args, **kwargs)
self._client = self.driver.connection
self._packer = msgpack.Packer(encoding='utf-8',
use_bin_type=True).pack
self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
@utils.raises_conn_error
def list(self, project=None, marker=None, limit=10, detailed=False):
client = self._client
subset_key = utils.flavor_project_subset_key(project)
marker_key = utils.flavor_name_hash_key(marker)
rank = client.zrank(subset_key, marker_key)
start = rank + 1 if rank is not None else 0
cursor = (f for f in client.zrange(subset_key, start,
start + limit - 1))
marker_next = {}
def normalizer(flavor):
marker_next['next'] = flavor['f']
return self._normalize(flavor, detailed=detailed)
yield utils.FlavorListCursor(self._client, cursor, normalizer)
yield marker_next and marker_next['next']
@utils.raises_conn_error
def get(self, name, project=None, detailed=False):
hash_key = utils.flavor_name_hash_key(name)
flavors = self._client.hgetall(hash_key)
if flavors is None or len(flavors) == 0:
raise errors.FlavorDoesNotExist(name)
return self._normalize(flavors, detailed)
@utils.raises_conn_error
def create(self, name, project=None, capabilities=None):
capabilities = {} if capabilities is None else capabilities
subset_key = utils.flavor_project_subset_key(project)
set_key = utils.flavor_set_key()
hash_key = utils.flavor_name_hash_key(name)
flavors = self._client.hgetall(hash_key)
if len(flavors) == 0:
flavors = {
'f': name,
'p': project,
'c': self._packer(capabilities or {}),
}
# Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe:
pipe.zadd(set_key, 1, hash_key)
pipe.zadd(subset_key, 1, hash_key)
pipe.hmset(hash_key, flavors)
pipe.execute()
else:
with self._client.pipeline() as pipe:
pipe.hset(hash_key, "c", self._packer(capabilities))
pipe.hset(hash_key, "p", project)
pipe.execute()
@utils.raises_conn_error
def exists(self, name, project=None):
set_key = utils.flavor_set_key()
hash_key = utils.flavor_name_hash_key(name)
return self._client.zrank(set_key, hash_key) is not None
@utils.raises_conn_error
def update(self, name, project=None, capabilities=None):
hash_key = utils.flavor_name_hash_key(name)
with self._client.pipeline() as pipe:
pipe.hset(hash_key, "c", self._packer(capabilities))
pipe.hset(hash_key, "p", project)
try:
pipe.execute()
except redis.exceptions.ResponseError:
raise errors.FlavorDoesNotExist(name)
@utils.raises_conn_error
def delete(self, name, project=None):
subset_key = utils.flavor_project_subset_key(project)
set_key = utils.flavor_set_key()
hash_key = utils.flavor_name_hash_key(name)
if self._client.zrank(subset_key, hash_key) is not None:
with self._client.pipeline() as pipe:
pipe.zrem(set_key, hash_key)
pipe.zrem(subset_key, hash_key)
pipe.delete(hash_key)
pipe.execute()
@utils.raises_conn_error
def drop_all(self):
allflavor_key = self._client.keys(pattern='*flavors')
if len(allflavor_key) == 0:
return
with self._client.pipeline() as pipe:
for key in allflavor_key:
pipe.delete(key)
try:
pipe.execute()
except redis.exceptions.ResponseError:
return False
def _normalize(self, flavor, detailed=False):
ret = {
'name': flavor['f'],
}
if detailed:
ret['capabilities'] = self._unpacker(flavor['c'])
return ret

View File

@ -0,0 +1,265 @@
# Copyright (c) 2017 ZTE Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""pools: an implementation of the pool management storage
controller for redis.
Schema:
'n': name :: six.text_type
'u': uri :: six.text_type
'w': weight :: int
'o': options :: dict
"""
import functools
import msgpack
from oslo_log import log as logging
import redis
from zaqar.common import utils as common_utils
from zaqar.storage import base
from zaqar.storage import errors
from zaqar.storage.redis import utils
LOG = logging.getLogger(__name__)
class PoolsController(base.PoolsBase):
"""Implements Pools resource operations using Redis.
* All pool (Redis sorted set):
Set of all pool_ids, ordered by name. Used to delete the all
records of table pools.
Key: pools
+--------+-----------------------------+
| Id | Value |
+========+=============================+
| name | <pool> |
+--------+-----------------------------+
* Flavor Index (Redis sorted set):
Set of all pool_ids for the given flavor, ordered by name.
Key: <flavor>.pools
+--------+-----------------------------+
| Id | Value |
+========+=============================+
| name | <pool> |
+--------+-----------------------------+
* Pools Information (Redis hash):
Key: <pool>.pools
+----------------------+---------+
| Name | Field |
+======================+=========+
| pool | pl |
+----------------------+---------+
| uri | u |
+----------------------+---------+
| weight | w |
+----------------------+---------+
| options | o |
+----------------------+---------+
| flavor | f |
+----------------------+---------+
"""
def __init__(self, *args, **kwargs):
super(PoolsController, self).__init__(*args, **kwargs)
self._client = self.driver.connection
self.flavor_ctl = self.driver.flavors_controller
self._packer = msgpack.Packer(encoding='utf-8',
use_bin_type=True).pack
self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
@utils.raises_conn_error
@utils.retries_on_connection_error
def _list(self, marker=None, limit=10, detailed=False):
client = self._client
set_key = utils.pools_set_key()
marker_key = utils.pools_name_hash_key(marker)
rank = client.zrank(set_key, marker_key)
start = rank + 1 if rank is not None else 0
cursor = (f for f in client.zrange(set_key, start,
start + limit - 1))
marker_next = {}
def normalizer(pools):
marker_next['next'] = pools['pl']
return self._normalize(pools, detailed=detailed)
yield utils.PoolsListCursor(self._client, cursor, normalizer)
yield marker_next and marker_next['next']
@utils.raises_conn_error
@utils.retries_on_connection_error
def _get(self, name, detailed=False):
pool_key = utils.pools_name_hash_key(name)
pool = self._client.hgetall(pool_key)
if pool is None or len(pool) == 0:
raise errors.PoolDoesNotExist(name)
return self._normalize(pool, detailed)
@utils.raises_conn_error
@utils.retries_on_connection_error
def _get_pools_by_flavor(self, flavor=None, detailed=False):
cursor = None
if flavor is None or flavor.get('name') is None:
set_key = utils.pools_set_key()
cursor = (pl for pl in self._client.zrange(set_key, 0, -1))
elif flavor.get('name') is not None:
subset_key = utils.pools_subset_key(flavor['name'])
cursor = (pl for pl in self._client.zrange(subset_key, 0, -1))
if cursor is None:
return []
normalizer = functools.partial(self._normalize, detailed=detailed)
return utils.PoolsListCursor(self._client, cursor, normalizer)
@utils.raises_conn_error
@utils.retries_on_connection_error
def _create(self, name, weight, uri, group=None, flavor=None,
options=None):
if group is not None:
raise errors.PoolRedisNotSupportGroup
flavor = flavor if flavor is not None else None
options = {} if options is None else options
pool_key = utils.pools_name_hash_key(name)
subset_key = utils.pools_subset_key(flavor)
set_key = utils.pools_set_key()
if self._exists(name):
self._update(name, weight=weight, uri=uri,
flavor=flavor, options=options)
return
pool = {
'pl': name,
'u': uri,
'w': weight,
'o': self._packer(options),
'f': flavor
}
# Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe:
pipe.zadd(set_key, 1, pool_key)
if flavor is not None:
pipe.zadd(subset_key, 1, pool_key)
pipe.hmset(pool_key, pool)
pipe.execute()
@utils.raises_conn_error
@utils.retries_on_connection_error
def _exists(self, name):
pool_key = utils.pools_name_hash_key(name)
return self._client.exists(pool_key)
@utils.raises_conn_error
@utils.retries_on_connection_error
def _update(self, name, **kwargs):
names = ('uri', 'weight', 'flavor', 'options')
fields = common_utils.fields(kwargs, names,
pred=lambda x: x is not None,
key_transform=lambda x: x[0])
assert fields, ('`weight`, `uri`, `flavor`, '
'or `options` not found in kwargs')
if 'o' in fields:
new_options = fields.get('o', None)
fields['o'] = self._packer(new_options)
pool_key = utils.pools_name_hash_key(name)
# (gengchc2): Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe:
# (gengchc2): If flavor is changed, we need to change.pool key
# in pools subset.
if 'f' in fields:
flavor_old = self._get(name).get('flavor')
flavor_new = fields['f']
if flavor_old != flavor_new:
if flavor_new is not None:
new_subset_key = utils.pools_subset_key(flavor_new)
pipe.zadd(new_subset_key, 1, pool_key)
# (gengchc2) remove pool from flavor_old.pools subset
if flavor_old is not None:
old_subset_key = utils.pools_subset_key(flavor_old)
pipe.zrem(old_subset_key, pool_key)
pipe.hmset(pool_key, fields)
pipe.execute()
@utils.raises_conn_error
@utils.retries_on_connection_error
def _delete(self, name):
try:
pool = self.get(name)
flavor = pool.get("flavor", None)
# NOTE(gengchc2): If this is the only pool in the
# flavor and it's being used by a flavor, don't allow
# it to be deleted.
if flavor is not None:
flavor1 = {}
flavor1['name'] = flavor
pools_in_flavor = list(self.get_pools_by_flavor(
flavor=flavor1))
if self.flavor_ctl.exists(flavor)\
and len(pools_in_flavor) == 1:
raise errors.PoolInUseByFlavor(name, flavor)
pool_key = utils.pools_name_hash_key(name)
subset_key = utils.pools_subset_key(flavor)
set_key = utils.pools_set_key()
with self._client.pipeline() as pipe:
if flavor is not None:
pipe.zrem(subset_key, pool_key)
pipe.zrem(set_key, pool_key)
pipe.delete(pool_key)
pipe.execute()
except errors.PoolDoesNotExist:
pass
@utils.raises_conn_error
@utils.retries_on_connection_error
def _drop_all(self):
poolsobj_key = self._client.keys(pattern='*pools')
if len(poolsobj_key) == 0:
return
with self._client.pipeline() as pipe:
for key in poolsobj_key:
pipe.delete(key)
try:
pipe.execute()
except redis.exceptions.ResponseError:
return False
def _normalize(self, pool, detailed=False):
ret = {
'name': pool['pl'],
'uri': pool['u'],
'weight': int(pool['w']),
'flavor': pool['f']
}
if detailed:
ret['options'] = self._unpacker(pool['o'])
return ret

View File

@ -28,6 +28,8 @@ from zaqar.storage import errors
LOG = logging.getLogger(__name__)
MESSAGE_IDS_SUFFIX = 'messages'
SUBSCRIPTION_IDS_SUFFIX = 'subscriptions'
FLAVORS_IDS_SUFFIX = 'flavors'
POOLS_IDS_SUFFIX = 'pools'
def descope_queue_name(scoped_name):
@ -275,3 +277,145 @@ class SubscriptionListCursor(object):
def __next__(self):
return self.next()
def scope_flavors_ids_set(flavors_suffix=''):
"""Scope flavors set with '.'
Returns a scoped name for the list of flavors in the form
suffix
"""
return flavors_suffix
def scope_project_flavors_ids_set(project=None,
flavors_suffix=''):
"""Scope flavors set with '.'
Returns a scoped name for the list of flavors in the form
project-id_suffix
"""
return (normalize_none_str(project) + '.' + flavors_suffix)
def scope_name_flavors_ids_set(name=None,
flavors_suffix=''):
"""Scope flavors set with '.'
Returns a scoped name for the list of flavors in the form
flavors_name_suffix
"""
return (normalize_none_str(name) + '.' + flavors_suffix)
def flavor_set_key():
return scope_flavors_ids_set(FLAVORS_IDS_SUFFIX)
def flavor_project_subset_key(project=None):
return scope_project_flavors_ids_set(project,
FLAVORS_IDS_SUFFIX)
def flavor_name_hash_key(name=None):
return scope_name_flavors_ids_set(name,
FLAVORS_IDS_SUFFIX)
class FlavorListCursor(object):
def __init__(self, client, flavors, denormalizer):
self.flavor_iter = flavors
self.denormalizer = denormalizer
self.client = client
def __iter__(self):
return self
@raises_conn_error
def next(self):
curr = next(self.flavor_iter)
flavor = self.client.hmget(curr, ['f', 'p', 'c'])
flavor_dict = {}
flavor_dict['f'] = flavor[0]
flavor_dict['p'] = flavor[1]
flavor_dict['c'] = flavor[2]
return self.denormalizer(flavor_dict)
def __next__(self):
return self.next()
def scope_pools_ids_set(pools_suffix=''):
"""Scope pools set with '.'
Returns a scoped name for the list of pools in the form
suffix
"""
return pools_suffix
def scope_flavor_pools_ids_set(flavor=None,
pools_suffix=''):
"""Scope pools set with '.'
Returns a scoped name for the list of pools in the form
project-id_suffix
"""
return (normalize_none_str(flavor) + '.' +
pools_suffix)
def scope_name_pools_ids_set(name=None,
pools_suffix=''):
"""Scope pools set with '.'
Returns a scoped name for the list of pools in the form
pools_name_suffix
"""
return (normalize_none_str(name) + '.' +
pools_suffix)
def pools_set_key():
return scope_pools_ids_set(POOLS_IDS_SUFFIX)
def pools_subset_key(flavor=None):
return scope_flavor_pools_ids_set(flavor,
POOLS_IDS_SUFFIX)
def pools_name_hash_key(name=None):
return scope_name_pools_ids_set(name,
POOLS_IDS_SUFFIX)
class PoolsListCursor(object):
def __init__(self, client, pools, denormalizer):
self.pools_iter = pools
self.denormalizer = denormalizer
self.client = client
def __iter__(self):
return self
@raises_conn_error
def next(self):
curr = next(self.pools_iter)
pools = self.client.hmget(curr, ['pl', 'u', 'w', 'f', 'o'])
pool_dict = {}
pool_dict['pl'] = pools[0]
pool_dict['u'] = pools[1]
pool_dict['w'] = pools[2]
pool_dict['f'] = pools[3]
pool_dict['o'] = pools[4]
return self.denormalizer(pool_dict)
def __next__(self):
return self.next()

View File

@ -24,7 +24,7 @@ import redis
from zaqar.common import cache as oslo_cache
from zaqar.common import errors
from zaqar import storage
from zaqar.storage import mongodb
from zaqar.storage import pooling
from zaqar.storage.redis import controllers
from zaqar.storage.redis import driver
from zaqar.storage.redis import messages
@ -288,7 +288,7 @@ class RedisQueuesTest(base.QueueControllerTest):
driver_class = driver.DataDriver
config_file = 'wsgi_redis.conf'
controller_class = controllers.QueueController
control_driver_class = mongodb.ControlDriver
control_driver_class = driver.ControlDriver
def setUp(self):
super(RedisQueuesTest, self).setUp()
@ -305,7 +305,7 @@ class RedisMessagesTest(base.MessageControllerTest):
driver_class = driver.DataDriver
config_file = 'wsgi_redis.conf'
controller_class = controllers.MessageController
control_driver_class = mongodb.ControlDriver
control_driver_class = driver.ControlDriver
gc_interval = 1
def setUp(self):
@ -372,7 +372,7 @@ class RedisClaimsTest(base.ClaimControllerTest):
driver_class = driver.DataDriver
config_file = 'wsgi_redis.conf'
controller_class = controllers.ClaimController
control_driver_class = mongodb.ControlDriver
control_driver_class = driver.ControlDriver
def setUp(self):
super(RedisClaimsTest, self).setUp()
@ -473,3 +473,104 @@ class RedisSubscriptionTests(base.SubscriptionControllerTest):
config_file = 'wsgi_redis.conf'
controller_class = controllers.SubscriptionController
control_driver_class = driver.ControlDriver
@testing.requires_redis
class RedisPoolsTests(base.PoolsControllerTest):
config_file = 'wsgi_redis.conf'
driver_class = driver.ControlDriver
controller_class = controllers.PoolsController
control_driver_class = driver.ControlDriver
def setUp(self):
super(RedisPoolsTests, self).setUp()
self.pools_controller = self.driver.pools_controller
# Let's create one pool
self.pool = str(uuid.uuid1())
self.pools_controller.create(self.pool, 100, 'localhost', options={})
self.pool1 = str(uuid.uuid1())
self.flavor = str(uuid.uuid1())
self.flavors_controller.create(self.flavor,
project=self.project,
capabilities={})
self.pools_controller.create(self.pool1, 100, 'localhost1',
flavor=self.flavor, options={})
self.flavors_controller = self.driver.flavors_controller
def tearDown(self):
self.pools_controller.drop_all()
super(RedisPoolsTests, self).tearDown()
def test_delete_pool_used_by_flavor(self):
with testing.expect(storage.errors.PoolInUseByFlavor):
self.pools_controller.delete(self.pool1)
def test_mismatching_capabilities_fifo(self):
# NOTE(gengchc2): The fifo function is not implemented
# in redis, we skip it.
self.skip("The fifo function is not implemented")
def test_mismatching_capabilities1(self):
# NOTE(gengchc2): This test is used for testing mismatchming
# capabilities in pool with flavor
with testing.expect(storage.errors.PoolCapabilitiesMismatch):
self.pools_controller.create(str(uuid.uuid1()),
100, 'mongodb://localhost',
flavor=self.flavor,
options={})
@testing.requires_redis
class RedisCatalogueTests(base.CatalogueControllerTest):
driver_class = driver.ControlDriver
controller_class = controllers.CatalogueController
control_driver_class = driver.ControlDriver
config_file = 'wsgi_redis.conf'
def setUp(self):
super(RedisCatalogueTests, self).setUp()
self.addCleanup(self.controller.drop_all)
@testing.requires_redis
class PooledMessageTests(base.MessageControllerTest):
config_file = 'wsgi_redis_pooled.conf'
controller_class = pooling.MessageController
driver_class = pooling.DataDriver
control_driver_class = driver.ControlDriver
controller_base_class = storage.Message
# NOTE(kgriffs): Redis's TTL scavenger only runs once a minute
gc_interval = 60
@testing.requires_redis
class PooledClaimsTests(base.ClaimControllerTest):
config_file = 'wsgi_redis_pooled.conf'
controller_class = pooling.ClaimController
driver_class = pooling.DataDriver
control_driver_class = driver.ControlDriver
controller_base_class = storage.Claim
def setUp(self):
super(PooledClaimsTests, self).setUp()
self.connection = self.controller._pool_catalog.lookup(
self.queue_name, self.project)._storage.\
claim_controller.driver.connection
def tearDown(self):
super(PooledClaimsTests, self).tearDown()
self.connection.flushdb()
# NOTE(gengchc2): Unittest for new flavor configure scenario.
@testing.requires_redis
class RedisFlavorsTest1(base.FlavorsControllerTest1):
driver_class = driver.ControlDriver
controller_class = controllers.FlavorsController
control_driver_class = driver.ControlDriver
config_file = 'wsgi_redis.conf'
def setUp(self):
super(RedisFlavorsTest1, self).setUp()
self.addCleanup(self.controller.drop_all)