Add cluster table and related methods

This patch adds a new table called clusters with its ORM representation
class Cluster, and related DB methods.

It also updates the DB tables of resources from Cinder Volume nodes that
are addressed by host so they include a reference to the cluster
(cluster_name), and adds the related DB methods.

This is part of the effort to support HA Active-Active in c-vol nodes.

Specs: https://review.openstack.org/327283
Change-Id: I10653d4a5fe4cb3fd1f8ccf1224938451753907e
Implements: blueprint cinder-volume-active-active-support
Gorka Eguileor 2016-05-23 14:12:19 +02:00
parent b03f539e6e
commit 57ea6967bf
12 changed files with 1070 additions and 48 deletions


@ -99,27 +99,33 @@ def service_destroy(context, service_id):
return IMPL.service_destroy(context, service_id)
def service_get(context, service_id=None, **filters):
def service_get(context, service_id=None, backend_match_level=None, **filters):
"""Get a service that matches the criteria.
A possible filter is is_up=True, which will filter out services that are down.
:param service_id: Id of the service.
:param filters: Filters for the query in the form of key/value.
:param backend_match_level: 'pool', 'backend', or 'host' for host and
cluster filters (as defined in _filter_host
method)
:raise ServiceNotFound: If service doesn't exist.
"""
return IMPL.service_get(context, service_id, **filters)
return IMPL.service_get(context, service_id, backend_match_level,
**filters)
def service_get_all(context, **filters):
def service_get_all(context, backend_match_level=None, **filters):
"""Get all services that match the criteria.
A possible filter is is_up=True, which will filter out services that are down.
:param filters: Filters for the query in the form of key/value arguments.
:param backend_match_level: 'pool', 'backend', or 'host' for host and
cluster filters (as defined in _filter_host
method)
"""
return IMPL.service_get_all(context, **filters)
return IMPL.service_get_all(context, backend_match_level, **filters)
def service_create(context, values):
@ -138,6 +144,70 @@ def service_update(context, service_id, values):
###############
def cluster_get(context, id=None, is_up=None, get_services=False,
services_summary=False, read_deleted='no',
name_match_level=None, **filters):
"""Get a cluster that matches the criteria.
:param id: Id of the cluster.
:param is_up: Boolean value to filter based on the cluster's up status.
:param get_services: If we want to load all services from this cluster.
:param services_summary: If we want to load num_hosts and
num_down_hosts fields.
:param read_deleted: Filtering based on delete status. Default value is
"no".
:param name_match_level: 'pool', 'backend', or 'host' for name filter (as
defined in _filter_host method)
:param filters: Field based filters in the form of key/value.
:raise ClusterNotFound: If cluster doesn't exist.
"""
return IMPL.cluster_get(context, id, is_up, get_services, services_summary,
read_deleted, name_match_level, **filters)
def cluster_get_all(context, is_up=None, get_services=False,
services_summary=False, read_deleted='no',
name_match_level=None, **filters):
"""Get all clusters that match the criteria.
:param is_up: Boolean value to filter based on the cluster's up status.
:param get_services: If we want to load all services from this cluster.
:param services_summary: If we want to load num_hosts and
num_down_hosts fields.
:param read_deleted: Filtering based on delete status. Default value is
"no".
:param name_match_level: 'pool', 'backend', or 'host' for name filter (as
defined in _filter_host method)
:param filters: Field based filters in the form of key/value.
"""
return IMPL.cluster_get_all(context, is_up, get_services, services_summary,
read_deleted, name_match_level, **filters)
def cluster_create(context, values):
"""Create a cluster from the values dictionary."""
return IMPL.cluster_create(context, values)
def cluster_update(context, id, values):
"""Set the given properties on an cluster and update it.
Raises ClusterNotFound if cluster does not exist.
"""
return IMPL.cluster_update(context, id, values)
def cluster_destroy(context, id):
"""Destroy the cluster or raise if it does not exist or has hosts.
:raise ClusterNotFound: If cluster doesn't exist.
"""
return IMPL.cluster_destroy(context, id)
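
For orientation, a minimal usage sketch of the new cluster API exposed by this
module; the admin context, cluster name, and binary value are illustrative
assumptions, and a configured Cinder database is presumed.

from cinder import context
from cinder import db
from cinder import exception

ctxt = context.get_admin_context()

# Create a cluster; when 'disabled' is not provided it defaults to the
# inverse of CONF.enable_new_services.
cluster = db.cluster_create(ctxt, {'name': 'mycluster@lvmdriver-1',
                                   'binary': 'cinder-volume'})

# Re-read it with the summary columns (num_hosts, num_down_hosts) loaded.
cluster = db.cluster_get(ctxt, cluster.id, services_summary=True)

# List only clusters with at least one service still heartbeating.
up_clusters = db.cluster_get_all(ctxt, is_up=True)

# Destroy fails with ClusterHasHosts while services still reference it.
try:
    db.cluster_destroy(ctxt, cluster.id)
except exception.ClusterHasHosts:
    pass
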
###############
def volume_attach(context, values):
"""Attach a volume."""
return IMPL.volume_attach(context, values)
@ -220,6 +290,25 @@ def volume_update(context, volume_id, values):
return IMPL.volume_update(context, volume_id, values)
def volume_include_in_cluster(context, cluster, partial_rename=True,
**filters):
"""Include all volumes matching the filters into a cluster.
When partial_rename is set we will not set cluster_name directly to the
cluster parameter value; instead we will replace the provided cluster_name
or host filter value with cluster.
This is useful when we want to replace just the cluster name but leave
the backend and pool information as they are. If we are filtering by
cluster_name, that same DB field is used as the base for the replacement
and the rest of it is left as is; likewise if we filter by host.
Returns the number of volumes that have been changed.
"""
return IMPL.volume_include_in_cluster(context, cluster, partial_rename,
**filters)
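
A hedged sketch of the rename semantics described in the docstring above,
reusing its example values; it assumes an admin context ctxt and an existing
volume whose host is 'localhost@lvmdriver-1#pool1'.

# Partial rename keyed on a backend-level host filter keeps the pool part:
db.volume_include_in_cluster(ctxt, 'mycluster@lvmdriver-1',
                             partial_rename=True,
                             host='localhost@lvmdriver-1')
# -> cluster_name stored as 'mycluster@lvmdriver-1#pool1'

# A full rename stores the cluster parameter verbatim instead:
db.volume_include_in_cluster(ctxt, 'mycluster@lvmdriver-1',
                             partial_rename=False,
                             host='localhost@lvmdriver-1#pool1')
# -> cluster_name stored as 'mycluster@lvmdriver-1'
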
def volume_attachment_update(context, attachment_id, values):
return IMPL.volume_attachment_update(context, attachment_id, values)
@ -1046,6 +1135,26 @@ def cg_creating_from_src(cg_id=None, cgsnapshot_id=None):
return IMPL.cg_creating_from_src(cg_id, cgsnapshot_id)
def consistencygroup_include_in_cluster(context, cluster, partial_rename=True,
**filters):
"""Include all consistency groups matching the filters into a cluster.
When partial_rename is set we will not set cluster_name directly to the
cluster parameter value; instead we will replace the provided cluster_name
or host filter value with cluster.
This is useful when we want to replace just the cluster name but leave
the backend and pool information as they are. If we are filtering by
cluster_name, that same DB field is used as the base for the replacement
and the rest of it is left as is; likewise if we filter by host.
Returns the number of consistency groups that have been changed.
"""
return IMPL.consistencygroup_include_in_cluster(context, cluster,
partial_rename,
**filters)
###################


@ -43,7 +43,7 @@ import sqlalchemy
from sqlalchemy import MetaData
from sqlalchemy import or_, and_, case
from sqlalchemy.orm import aliased
from sqlalchemy.orm import joinedload, joinedload_all
from sqlalchemy.orm import joinedload, joinedload_all, undefer_group
from sqlalchemy.orm import RelationshipProperty
from sqlalchemy.schema import Table
from sqlalchemy import sql
@ -61,6 +61,7 @@ from cinder.db.sqlalchemy import models
from cinder import exception
from cinder.i18n import _, _LW, _LE, _LI
from cinder.objects import fields
from cinder import utils
CONF = cfg.CONF
@ -366,8 +367,54 @@ def _clean_filters(filters):
return {k: v for k, v in filters.items() if v is not None}
def _filter_host(field, value, match_level=None):
"""Generate a filter condition for host and cluster fields.
Levels are:
- 'pool': Will search for an exact match
- 'backend': Will search for exact match and value#*
- 'host': Will search for exact match, value@* and value#*
If no level is provided we'll determine it based on the value we want to
match:
- 'pool': If '#' is present in value
- 'backend': If '@' is present in value and '#' is not present
- 'host': In any other case
:param field: ORM field. Ex: objects.Volume.model.host
:param value: String to compare with
:param match_level: 'pool', 'backend', or 'host'
"""
# If we don't set level we'll try to determine it automatically. LIKE
# operations are expensive, so we try to reduce them to the minimum.
if match_level is None:
if '#' in value:
match_level = 'pool'
elif '@' in value:
match_level = 'backend'
else:
match_level = 'host'
# MySQL does not do case-sensitive filtering, so we force it
if CONF.database.connection.startswith('mysql:'):
cmp_value = func.binary(value)
like_op = 'LIKE_BINARY'
else:
cmp_value = value
like_op = 'LIKE'
conditions = [field == cmp_value]
if match_level != 'pool':
conditions.append(field.op(like_op)(value + '#%'))
if match_level == 'host':
conditions.append(field.op(like_op)(value + '@%'))
return or_(*conditions)
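
To make the match levels concrete, here is a standalone SQLAlchemy sketch that
reproduces the condition _filter_host builds at the 'host' level on a
non-MySQL backend; the services table definition is a stand-in, not part of
the patch.

from sqlalchemy import Column, MetaData, String, Table, or_

meta = MetaData()
services = Table('services', meta, Column('host', String(255)))

value = 'host1'           # no '@' or '#', so the level defaults to 'host'
host = services.c.host
cond = or_(host == value,
           host.like(value + '#%'),    # e.g. 'host1#pool'
           host.like(value + '@%'))    # e.g. 'host1@backend#pool'
print(cond)
# roughly: services.host = :host_1 OR services.host LIKE :host_2
#                                  OR services.host LIKE :host_3
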
def _service_query(context, session=None, read_deleted='no', host=None,
is_up=None, **filters):
cluster_name=None, is_up=None, backend_match_level=None,
**filters):
filters = _clean_filters(filters)
if filters and not is_valid_model_filters(models.Service, filters):
return None
@ -375,26 +422,21 @@ def _service_query(context, session=None, read_deleted='no', host=None,
query = model_query(context, models.Service, session=session,
read_deleted=read_deleted)
# Host is a particular case of filter, because we must retrieve not only
# exact matches (single backend configuration), but also match hosts that
# have the backend defined (multi backend configuration).
# Host and cluster are particular cases of filters, because we must
# retrieve not only exact matches (single backend configuration), but also
# match those that have the backend defined (multi backend configuration).
if host:
host_attr = models.Service.host
# Mysql is not doing case sensitive filtering, so we force it
if CONF.database.connection.startswith('mysql:'):
conditions = or_(host_attr == func.binary(host),
host_attr.op('LIKE BINARY')(host + '@%'))
else:
conditions = or_(host_attr == host,
host_attr.op('LIKE')(host + '@%'))
query = query.filter(conditions)
query = query.filter(_filter_host(models.Service.host, host,
backend_match_level))
if cluster_name:
query = query.filter(_filter_host(models.Service.cluster_name,
cluster_name, backend_match_level))
if filters:
query = query.filter_by(**filters)
if is_up is not None:
date_limit = (timeutils.utcnow() -
dt.timedelta(seconds=CONF.service_down_time))
date_limit = utils.service_expired_time()
svc = models.Service
filter_ = or_(
and_(svc.created_at.isnot(None), svc.created_at >= date_limit),
@ -414,16 +456,20 @@ def service_destroy(context, service_id):
@require_admin_context
def service_get(context, service_id=None, **filters):
def service_get(context, service_id=None, backend_match_level=None, **filters):
"""Get a service that matches the criteria.
A possible filter is is_up=True, which will filter out services that are down.
:param service_id: Id of the service.
:param filters: Filters for the query in the form of key/value.
:param backend_match_level: 'pool', 'backend', or 'host' for host and
cluster filters (as defined in _filter_host
method)
:raise ServiceNotFound: If service doesn't exist.
"""
query = _service_query(context, id=service_id, **filters)
query = _service_query(context, backend_match_level=backend_match_level,
id=service_id, **filters)
service = None if not query else query.first()
if not service:
serv_id = service_id or filters.get('topic') or filters.get('binary')
@ -433,14 +479,18 @@ def service_get(context, service_id=None, **filters):
@require_admin_context
def service_get_all(context, **filters):
def service_get_all(context, backend_match_level=None, **filters):
"""Get all services that match the criteria.
A possible filter is is_up=True, which will filter out services that are down.
:param filters: Filters for the query in the form of key/value.
:param backend_match_level: 'pool', 'backend', or 'host' for host and
cluster filters (as defined in _filter_host
method)
"""
query = _service_query(context, **filters)
query = _service_query(context, backend_match_level=backend_match_level,
**filters)
return [] if not query else query.all()
@ -471,6 +521,149 @@ def service_update(context, service_id, values):
raise exception.ServiceNotFound(service_id=service_id)
###################
def _cluster_query(context, is_up=None, get_services=False,
services_summary=False, read_deleted='no',
name_match_level=None, name=None, session=None, **filters):
filters = _clean_filters(filters)
if filters and not is_valid_model_filters(models.Cluster, filters):
return None
query = model_query(context, models.Cluster, session=session,
read_deleted=read_deleted)
# Cluster name is a special filter case, because we must match not only
# exact names but also names that include the backend
if name:
query = query.filter(_filter_host(models.Cluster.name, name,
name_match_level))
if filters:
query = query.filter_by(**filters)
if services_summary:
query = query.options(undefer_group('services_summary'))
# We bind the expiration time to now (as it changes with each query),
# which is required by num_down_hosts
query = query.params(expired=utils.service_expired_time())
elif 'num_down_hosts' in filters:
query = query.params(expired=utils.service_expired_time())
if get_services:
query = query.options(joinedload_all('services'))
if is_up is not None:
date_limit = utils.service_expired_time()
filter_ = and_(models.Cluster.last_heartbeat.isnot(None),
models.Cluster.last_heartbeat >= date_limit)
query = query.filter(filter_ == is_up)
return query
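
The services_summary handling above relies on a deferred column property that
contains bindparam('expired') (see the Cluster model later in this patch),
which is why the query has to supply the value through .params(). Below is a
self-contained sketch of that pattern with stand-in models, using the same
SQLAlchemy 1.x idioms as the patch; it is illustrative, not Cinder code.

import datetime

from sqlalchemy import (Column, DateTime, Integer, String, bindparam,
                        create_engine, func, select)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import column_property, sessionmaker, undefer_group

Base = declarative_base()

class Svc(Base):                      # stand-in for Service
    __tablename__ = 'svcs'
    id = Column(Integer, primary_key=True)
    cluster_name = Column(String(255))
    updated_at = Column(DateTime)

class Clr(Base):                      # stand-in for Cluster
    __tablename__ = 'clrs'
    id = Column(Integer, primary_key=True)
    name = Column(String(255))
    # Deferred summary column; :expired must be provided at query time.
    num_down_hosts = column_property(
        select([func.count(Svc.id)]).
        where((Svc.cluster_name == name) &
              (Svc.updated_at < bindparam('expired'))).
        correlate_except(Svc),
        group='services_summary', deferred=True)

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

expired = datetime.datetime.utcnow() - datetime.timedelta(seconds=60)
query = (session.query(Clr)
         .options(undefer_group('services_summary'))
         .params(expired=expired))    # same role as query.params() above
print(query.all())
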
@require_admin_context
def cluster_get(context, id=None, is_up=None, get_services=False,
services_summary=False, read_deleted='no',
name_match_level=None, **filters):
"""Get a cluster that matches the criteria.
:param id: Id of the cluster.
:param is_up: Boolean value to filter based on the cluster's up status.
:param get_services: If we want to load all services from this cluster.
:param services_summary: If we want to load num_hosts and
num_down_hosts fields.
:param read_deleted: Filtering based on delete status. Default value is
"no".
:param filters: Field based filters in the form of key/value.
:param name_match_level: 'pool', 'backend', or 'host' for name filter (as
defined in _filter_host method)
:raise ClusterNotFound: If cluster doesn't exist.
"""
query = _cluster_query(context, is_up, get_services, services_summary,
read_deleted, name_match_level, id=id, **filters)
cluster = None if not query else query.first()
if not cluster:
cluster_id = id or six.text_type(filters)
raise exception.ClusterNotFound(id=cluster_id)
return cluster
@require_admin_context
def cluster_get_all(context, is_up=None, get_services=False,
services_summary=False, read_deleted='no',
name_match_level=None, **filters):
"""Get all clusters that match the criteria.
:param is_up: Boolean value to filter based on the cluster's up status.
:param get_services: If we want to load all services from this cluster.
:param services_summary: If we want to load num_hosts and
num_down_hosts fields.
:param read_deleted: Filtering based on delete status. Default value is
"no".
:param name_match_level: 'pool', 'backend', or 'host' for name filter (as
defined in _filter_host method)
:param filters: Field based filters in the form of key/value.
"""
query = _cluster_query(context, is_up, get_services, services_summary,
read_deleted, name_match_level, **filters)
return [] if not query else query.all()
@require_admin_context
def cluster_create(context, values):
"""Create a cluster from the values dictionary."""
cluster_ref = models.Cluster()
cluster_ref.update(values)
# Provided disabled value takes precedence
if values.get('disabled') is None:
cluster_ref.disabled = not CONF.enable_new_services
session = get_session()
try:
with session.begin():
cluster_ref.save(session)
# We mark that the newly created cluster has no hosts to prevent
# problems at the OVO level
cluster_ref.last_heartbeat = None
return cluster_ref
# If we hit a race condition (another non-deleted cluster exists with the
# same name) raise a Duplicate exception.
except db_exc.DBDuplicateEntry:
raise exception.ClusterExists(name=values.get('name'))
@require_admin_context
@_retry_on_deadlock
def cluster_update(context, id, values):
"""Set the given properties on an cluster and update it.
Raises ClusterNotFound if cluster does not exist.
"""
query = _cluster_query(context, id=id)
result = query.update(values)
if not result:
raise exception.ClusterNotFound(id=id)
@require_admin_context
def cluster_destroy(context, id):
"""Destroy the cluster or raise if it does not exist or has hosts."""
query = _cluster_query(context, id=id)
query = query.filter(models.Cluster.num_hosts == 0)
# If the update doesn't succeed we don't know if it's because the
# cluster doesn't exist or because it has hosts.
result = query.update(models.Cluster.delete_values(),
synchronize_session=False)
if not result:
# This will fail if the cluster doesn't exist, raising the right
# exception
cluster_get(context, id=id)
# If it doesn't fail, then the problem is that there are hosts
raise exception.ClusterHasHosts(id=id)
###################
@ -1316,6 +1509,51 @@ def volume_destroy(context, volume_id):
return updated_values
def _include_in_cluster(context, cluster, model, partial_rename, filters):
"""Generic include in cluster method.
When we include resources in a cluster we have to be careful to preserve
the addressing sections that have not been provided. That's why we allow
partial_rename, so we can preserve the backend and pool if we are only
providing host/cluster level information, and preserve pool information if
we only provide backend level information.
For example when we include a host in a cluster we receive calls with
filters like {'host': 'localhost@lvmdriver-1'} and cluster with something
like 'mycluster@lvmdriver-1'. Since in the DB the resources will have the
host field set to something like 'localhost@lvmdriver-1#lvmdriver-1', we
want to include the original pool in the new cluster_name, so we store
'mycluster@lvmdriver-1#lvmdriver-1' in cluster_name.
"""
filters = _clean_filters(filters)
if filters and not is_valid_model_filters(model, filters):
return None
query = model_query(context, model)
# cluster_name and host are special filter cases
for field in {'cluster_name', 'host'}.intersection(filters):
value = filters.pop(field)
# We do a special backend filter
query = query.filter(_filter_host(getattr(model, field), value))
# If we want to do a partial rename and we haven't set the cluster
# already, the value we want to set is a SQL replace of the existing
# field value.
if partial_rename and isinstance(cluster, six.string_types):
cluster = func.replace(getattr(model, field), value, cluster)
query = query.filter_by(**filters)
result = query.update({'cluster_name': cluster}, synchronize_session=False)
return result
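
For clarity, a standalone SQLAlchemy core sketch of the UPDATE that the
partial rename above boils down to; the table definition and values are
illustrative, not taken from the patch.

from sqlalchemy import Column, MetaData, String, Table, func, or_, update

meta = MetaData()
volumes = Table('volumes', meta,
                Column('host', String(255)),
                Column('cluster_name', String(255)))

value, cluster = 'localhost@lvmdriver-1', 'mycluster@lvmdriver-1'
stmt = (update(volumes)
        .where(or_(volumes.c.host == value,
                   volumes.c.host.like(value + '#%')))
        .values(cluster_name=func.replace(volumes.c.host, value, cluster)))
print(stmt)
# roughly:
#   UPDATE volumes
#   SET cluster_name=replace(volumes.host, :replace_1, :replace_2)
#   WHERE volumes.host = :host_1 OR volumes.host LIKE :host_2
# so a host of 'localhost@lvmdriver-1#pool1' ends up with
# cluster_name 'mycluster@lvmdriver-1#pool1'.
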
@require_admin_context
def volume_include_in_cluster(context, cluster, partial_rename=True,
**filters):
"""Include all volumes matching the filters into a cluster."""
return _include_in_cluster(context, cluster, models.Volume,
partial_rename, filters)
@require_admin_context
def volume_detached(context, volume_id, attachment_id):
"""This updates a volume attachment and marks it as detached.
@ -4304,6 +4542,14 @@ def cg_creating_from_src(cg_id=None, cgsnapshot_id=None):
return sql.exists([subq]).where(match_id)
@require_admin_context
def consistencygroup_include_in_cluster(context, cluster,
partial_rename=True, **filters):
"""Include all consistency groups matching the filters into a cluster."""
return _include_in_cluster(context, cluster, models.ConsistencyGroup,
partial_rename, filters)
###############################
@ -4487,7 +4733,7 @@ def purge_deleted_rows(context, age_in_days):
# Reorder the list so the volumes and volume_types tables are last
# to avoid FK constraints
for table in ("volume_types", "snapshots", "volumes"):
for table in ("volume_types", "snapshots", "volumes", "clusters"):
tables.remove(table)
tables.append(table)


@ -0,0 +1,58 @@
# Copyright (c) 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Boolean, Column, DateTime, Integer
from sqlalchemy import MetaData, String, Table, UniqueConstraint
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
# New cluster table
cluster = Table(
'clusters', meta,
# Inherited fields from CinderBase
Column('created_at', DateTime(timezone=False)),
Column('updated_at', DateTime(timezone=False)),
Column('deleted_at', DateTime(timezone=False)),
Column('deleted', Boolean(), default=False),
# Cluster specific fields
Column('id', Integer, primary_key=True, nullable=False),
Column('name', String(255), nullable=False),
Column('binary', String(255), nullable=False),
Column('disabled', Boolean(), default=False),
Column('disabled_reason', String(255)),
Column('race_preventer', Integer, nullable=False, default=0),
# To remove potential races on creation we have a unique constraint on
# the name, binary, and race_preventer fields, and race_preventer is set
# to 0 on creation, so 2 clusters with the same name will violate the
# constraint. On deletion we change this field to the same value as the
# id, which is unique and will not conflict with the creation of another
# cluster with the same name.
UniqueConstraint('name', 'binary', 'race_preventer'),
mysql_engine='InnoDB',
mysql_charset='utf8',
)
cluster.create()
# Add the cluster_name column to the services, consistencygroups, and
# volumes tables.
for table_name in ('services', 'consistencygroups', 'volumes'):
table = Table(table_name, meta, autoload=True)
cluster_name = Column('cluster_name', String(255), nullable=True)
table.create_column(cluster_name)


@ -23,10 +23,12 @@ SQLAlchemy models for cinder data.
from oslo_config import cfg
from oslo_db.sqlalchemy import models
from oslo_utils import timeutils
from sqlalchemy import and_, func, select
from sqlalchemy import bindparam
from sqlalchemy import Column, Integer, String, Text, schema
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import ForeignKey, DateTime, Boolean
from sqlalchemy.orm import relationship, backref, validates
from sqlalchemy import ForeignKey, DateTime, Boolean, UniqueConstraint
from sqlalchemy.orm import backref, column_property, relationship, validates
CONF = cfg.CONF
@ -63,8 +65,13 @@ class Service(BASE, CinderBase):
__tablename__ = 'services'
id = Column(Integer, primary_key=True)
cluster_name = Column(String(255), nullable=True)
host = Column(String(255)) # , ForeignKey('hosts.id'))
binary = Column(String(255))
# We want to overwrite the default updated_at definition so we timestamp
# at creation as well; that way we only need to check updated_at for the
# heartbeat
updated_at = Column(DateTime, default=timeutils.utcnow,
onupdate=timeutils.utcnow)
topic = Column(String(255))
report_count = Column(Integer, nullable=False, default=0)
disabled = Column(Boolean, default=False)
@ -87,6 +94,63 @@ class Service(BASE, CinderBase):
active_backend_id = Column(String(255))
frozen = Column(Boolean, nullable=False, default=False)
cluster = relationship('Cluster',
backref='services',
foreign_keys=cluster_name,
primaryjoin='and_('
'Service.cluster_name == Cluster.name,'
'Service.deleted == False)')
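
A brief sketch of how this relationship is meant to be navigated once a
session is available; it assumes a populated database and uses get_session()
from cinder.db.sqlalchemy.api purely for illustration.

from cinder.db.sqlalchemy import api as db_api
from cinder.db.sqlalchemy import models

session = db_api.get_session()
service = session.query(models.Service).filter_by(deleted=False).first()
if service and service.cluster:
    # Follows the primaryjoin above: same cluster_name, non-deleted service.
    print(service.cluster.name)
    # And back again through the 'services' backref on Cluster.
    print([svc.host for svc in service.cluster.services])
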
class Cluster(BASE, CinderBase):
"""Represents a cluster of hosts."""
__tablename__ = 'clusters'
# To remove potential races on creation we have a unique constraint on the
# name, binary, and race_preventer fields, and race_preventer is set to 0
# on creation, so 2 clusters with the same name will violate the
# constraint. On deletion we change this field to the same value as the
# id, which is unique and will not conflict with the creation of another
# cluster with the same name.
__table_args__ = (UniqueConstraint('name', 'binary', 'race_preventer'),)
id = Column(Integer, primary_key=True)
# NOTE(geguileo): Name is constructed in the same way as Service.host, but
# using the cluster configuration option instead of host.
name = Column(String(255), nullable=False)
binary = Column(String(255), nullable=False)
disabled = Column(Boolean, default=False)
disabled_reason = Column(String(255))
race_preventer = Column(Integer, nullable=False, default=0)
# Last heartbeat reported by any of the services of this cluster. This is
# not deferred since we always want to load this field.
last_heartbeat = column_property(
select([func.max(Service.updated_at)]).
where(and_(Service.cluster_name == name, ~Service.deleted)).
correlate_except(Service), deferred=False)
# Number of existing services for this cluster
num_hosts = column_property(
select([func.count(Service.id)]).
where(and_(Service.cluster_name == name, ~Service.deleted)).
correlate_except(Service),
group='services_summary', deferred=True)
# Number of services that are down for this cluster
num_down_hosts = column_property(
select([func.count(Service.id)]).
where(and_(Service.cluster_name == name,
~Service.deleted,
Service.updated_at < bindparam('expired'))).
correlate_except(Service),
group='services_summary', deferred=True)
@staticmethod
def delete_values():
return {'race_preventer': Cluster.id,
'deleted': True,
'deleted_at': timeutils.utcnow()}
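
The race_preventer trick is easiest to see in isolation. The following
self-contained sqlite3 sketch (not Cinder code) reproduces the lifecycle that
the comment above and delete_values() describe: rows are created with
race_preventer 0, and deletion moves the unique id into that column, freeing
the (name, binary, 0) slot for a new cluster with the same name.

import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE clusters ('
             'id INTEGER PRIMARY KEY, name TEXT, binary TEXT, '
             'race_preventer INTEGER DEFAULT 0, deleted BOOLEAN DEFAULT 0, '
             'UNIQUE (name, binary, race_preventer))')

conn.execute("INSERT INTO clusters (name, binary) "
             "VALUES ('c1', 'cinder-volume')")
try:
    # A second live cluster with the same name violates the constraint.
    conn.execute("INSERT INTO clusters (name, binary) "
                 "VALUES ('c1', 'cinder-volume')")
except sqlite3.IntegrityError:
    pass

# Soft delete: race_preventer takes the row id, as delete_values() does.
conn.execute("UPDATE clusters SET deleted = 1, race_preventer = id "
             "WHERE name = 'c1' AND deleted = 0")

# Now the same name can be created again without hitting the constraint.
conn.execute("INSERT INTO clusters (name, binary) "
             "VALUES ('c1', 'cinder-volume')")
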
class ConsistencyGroup(BASE, CinderBase):
"""Represents a consistencygroup."""
@ -96,6 +160,7 @@ class ConsistencyGroup(BASE, CinderBase):
user_id = Column(String(255), nullable=False)
project_id = Column(String(255), nullable=False)
cluster_name = Column(String(255), nullable=True)
host = Column(String(255))
availability_zone = Column(String(255))
name = Column(String(255))
@ -150,6 +215,7 @@ class Volume(BASE, CinderBase):
snapshot_id = Column(String(36))
cluster_name = Column(String(255), nullable=True)
host = Column(String(255)) # , ForeignKey('hosts.id'))
size = Column(Integer)
availability_zone = Column(String(255)) # TODO(vish): foreign key?
@ -654,7 +720,8 @@ def register_models():
VolumeTypes,
VolumeGlanceMetadata,
ConsistencyGroup,
Cgsnapshot
Cgsnapshot,
Cluster,
)
engine = create_engine(CONF.database.connection, echo=False)
for model in models:


@ -284,6 +284,10 @@ class RPCTimeout(CinderException):
code = 502
class Duplicate(CinderException):
pass
class NotFound(CinderException):
message = _("Resource could not be found.")
code = 404
@ -394,6 +398,18 @@ class ServiceTooOld(Invalid):
message = _("Service is too old to fulfil this request.")
class ClusterNotFound(NotFound):
message = _('Cluster %(id)s could not be found.')
class ClusterHasHosts(Invalid):
message = _("Cluster %(id)s still has hosts.")
class ClusterExists(Duplicate):
message = _("Cluster %(name)s already exists.")
class HostNotFound(NotFound):
message = _("Host %(host)s could not be found.")
@ -452,10 +468,6 @@ class FileNotFound(NotFound):
message = _("File %(file_path)s could not be found.")
class Duplicate(CinderException):
pass
class VolumeTypeExists(Duplicate):
message = _("Volume Type %(id)s already exists.")


@ -0,0 +1,298 @@
# Copyright (c) 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for cluster table related operations."""
import datetime
import mock
from oslo_config import cfg
from oslo_utils import timeutils
from sqlalchemy.orm import exc
from cinder import db
from cinder import exception
from cinder.tests.unit import test_db_api
CONF = cfg.CONF
class ClusterTestCase(test_db_api.BaseTest):
"""Unit tests for cinder.db.api.cluster_*."""
def _default_cluster_values(self):
return {
'name': 'cluster_name',
'binary': 'cinder-volume',
'disabled': False,
'disabled_reason': None,
'deleted': False,
'updated_at': None,
'deleted_at': None,
}
def _create_cluster(self, **values):
create_values = self._default_cluster_values()
create_values.update(values)
cluster = db.cluster_create(self.ctxt, create_values)
return db.cluster_get(self.ctxt, cluster.id, services_summary=True)
def _create_populated_cluster(self, num_services, num_down_svcs=0,
**values):
"""Helper method that creates a cluster with up and down services."""
up_time = timeutils.utcnow()
down_time = (up_time -
datetime.timedelta(seconds=CONF.service_down_time + 1))
cluster = self._create_cluster(**values)
svcs = [
db.service_create(
self.ctxt,
{'cluster_name': cluster.name,
'updated_at': down_time if i < num_down_svcs else up_time})
for i in range(num_services)
]
return cluster, svcs
def test_cluster_create_and_get(self):
"""Basic cluster creation test."""
values = self._default_cluster_values()
cluster = db.cluster_create(self.ctxt, values)
values['last_heartbeat'] = None
self.assertEqual(0, cluster.race_preventer)
for k, v in values.items():
self.assertEqual(v, getattr(cluster, k))
db_cluster = db.cluster_get(self.ctxt, cluster.id,
services_summary=True)
for k, v in values.items():
self.assertEqual(v, getattr(db_cluster, k))
self.assertEqual(0, db_cluster.race_preventer)
def test_cluster_create_cfg_disabled(self):
"""Test that create uses enable_new_services configuration option."""
self.override_config('enable_new_services', False)
cluster = self._create_cluster(disabled=None)
self.assertTrue(cluster.disabled)
def test_cluster_create_disabled_preference(self):
"""Test that provided disabled value has highest priority on create."""
self.override_config('enable_new_services', False)
cluster = self._create_cluster()
self.assertFalse(cluster.disabled)
def test_cluster_create_duplicate(self):
"""Test that unique constraints are working.
To remove potential races on creation we have a unique constraint on the
name, binary, and race_preventer fields, and race_preventer is set to 0 on
creation, so 2 clusters with the same name will violate the constraint. On
deletion we change this field to the same value as the id, which is unique
and will not conflict with the creation of another cluster with the same
name.
"""
cluster = self._create_cluster()
self.assertRaises(exception.ClusterExists,
self._create_cluster,
name=cluster.name)
def test_cluster_create_not_duplicate(self):
"""Test that unique constraints will work with delete operation.
To remove potential races on creation we have a unique constraint on the
name, binary, and race_preventer fields, and race_preventer is set to 0 on
creation, so 2 clusters with the same name will violate the constraint. On
deletion we change this field to the same value as the id, which is unique
and will not conflict with the creation of another cluster with the same
name.
"""
cluster = self._create_cluster()
self.assertIsNone(db.cluster_destroy(self.ctxt, cluster.id))
self.assertIsNotNone(self._create_cluster(name=cluster.name))
def test_cluster_get_fail(self):
"""Test that cluster get will fail if the cluster doesn't exists."""
self._create_cluster(name='cluster@backend')
self.assertRaises(exception.ClusterNotFound,
db.cluster_get, self.ctxt, 'name=cluster@backend2')
def test_cluster_get_by_name(self):
"""Getting a cluster by name will include backends if not specified."""
cluster = self._create_cluster(name='cluster@backend')
# Get without the backend
db_cluster = db.cluster_get(self.ctxt, name='cluster')
self.assertEqual(cluster.id, db_cluster.id)
# Get with the backend detail
db_cluster = db.cluster_get(self.ctxt, name='cluster@backend')
self.assertEqual(cluster.id, db_cluster.id)
def test_cluster_get_without_summary(self):
"""Test getting cluster without summary information."""
cluster = self._create_cluster()
db_cluster = db.cluster_get(self.ctxt, cluster.id)
self.assertRaises(exc.DetachedInstanceError,
getattr, db_cluster, 'num_hosts')
self.assertRaises(exc.DetachedInstanceError,
getattr, db_cluster, 'num_down_hosts')
self.assertIsNone(db_cluster.last_heartbeat)
def test_cluster_get_with_summary_empty_cluster(self):
"""Test getting empty cluster with summary information."""
cluster = self._create_cluster()
db_cluster = db.cluster_get(self.ctxt, cluster.id,
services_summary=True)
self.assertEqual(0, db_cluster.num_hosts)
self.assertEqual(0, db_cluster.num_down_hosts)
self.assertIsNone(db_cluster.last_heartbeat)
def test_cluster_get_with_summary(self):
"""Test getting cluster with summary information."""
cluster, svcs = self._create_populated_cluster(3, 1)
db_cluster = db.cluster_get(self.ctxt, cluster.id,
services_summary=True)
self.assertEqual(3, db_cluster.num_hosts)
self.assertEqual(1, db_cluster.num_down_hosts)
self.assertEqual(svcs[1].updated_at, db_cluster.last_heartbeat)
def test_cluster_get_is_up_on_empty_cluster(self):
"""Test is_up filter works on empty clusters."""
cluster = self._create_cluster()
db_cluster = db.cluster_get(self.ctxt, cluster.id, is_up=False)
self.assertEqual(cluster.id, db_cluster.id)
self.assertRaises(exception.ClusterNotFound,
db.cluster_get, self.ctxt, cluster.id, is_up=True)
def test_cluster_get_services_on_empty_cluster(self):
"""Test get_services filter works on empty clusters."""
cluster = self._create_cluster()
db_cluster = db.cluster_get(self.ctxt, cluster.id, get_services=True)
self.assertEqual(cluster.id, db_cluster.id)
self.assertListEqual([], db_cluster.services)
def test_cluster_get_services(self):
"""Test services is properly populated on non empty cluster."""
# We create another cluster to see we do the selection correctly
self._create_populated_cluster(2, name='cluster2')
# We create our cluster with 2 up nodes and 1 down
cluster, svcs = self._create_populated_cluster(3, 1)
# Add a deleted service to the cluster
db.service_create(self.ctxt,
{'cluster_name': cluster.name,
'deleted': True})
db_cluster = db.cluster_get(self.ctxt, name=cluster.name,
get_services=True)
self.assertEqual(3, len(db_cluster.services))
self.assertSetEqual({svc.id for svc in svcs},
{svc.id for svc in db_cluster.services})
def test_cluster_get_is_up_all_are_down(self):
"""Test that is_up filter works when all services are down."""
cluster, svcs = self._create_populated_cluster(3, 3)
self.assertRaises(exception.ClusterNotFound,
db.cluster_get, self.ctxt, cluster.id, is_up=True)
db_cluster = db.cluster_get(self.ctxt, name=cluster.name, is_up=False)
self.assertEqual(cluster.id, db_cluster.id)
def test_cluster_get_by_num_down_hosts(self):
"""Test cluster_get by subquery field num_down_hosts."""
cluster, svcs = self._create_populated_cluster(3, 2)
result = db.cluster_get(self.ctxt, num_down_hosts=2)
self.assertEqual(cluster.id, result.id)
def test_cluster_get_by_num_hosts(self):
"""Test cluster_get by subquery field num_hosts."""
cluster, svcs = self._create_populated_cluster(3, 2)
result = db.cluster_get(self.ctxt, num_hosts=3)
self.assertEqual(cluster.id, result.id)
def test_cluster_destroy(self):
"""Test basic cluster destroy."""
cluster = self._create_cluster()
# On creation race_preventer is marked with a 0
self.assertEqual(0, cluster.race_preventer)
db.cluster_destroy(self.ctxt, cluster.id)
db_cluster = db.cluster_get(self.ctxt, cluster.id, read_deleted='yes')
self.assertTrue(db_cluster.deleted)
self.assertIsNotNone(db_cluster.deleted_at)
# On deletion race_preventer is marked with the id
self.assertEqual(cluster.id, db_cluster.race_preventer)
def test_cluster_destroy_non_existent(self):
"""Test destroying non existent cluster."""
self.assertRaises(exception.ClusterNotFound,
db.cluster_destroy, self.ctxt, 0)
def test_cluster_destroy_has_services(self):
"""Test that we cannot delete a cluster with non deleted services."""
cluster, svcs = self._create_populated_cluster(3, 1)
self.assertRaises(exception.ClusterHasHosts,
db.cluster_destroy, self.ctxt, cluster.id)
def test_cluster_update_non_existent(self):
"""Test that we raise an exception on updating non existent cluster."""
self.assertRaises(exception.ClusterNotFound,
db.cluster_update, self.ctxt, 0, {'disabled': True})
def test_cluster_update(self):
"""Test basic cluster update."""
cluster = self._create_cluster()
self.assertFalse(cluster.disabled)
db.cluster_update(self.ctxt, cluster.id, {'disabled': True})
db_cluster = db.cluster_get(self.ctxt, cluster.id)
self.assertTrue(db_cluster.disabled)
def test_cluster_get_all_empty(self):
"""Test basic empty cluster get_all."""
self.assertListEqual([], db.cluster_get_all(self.ctxt))
def test_cluster_get_all_matches(self):
"""Basic test of get_all with a matching filter."""
cluster1, svcs = self._create_populated_cluster(3, 1)
cluster2, svcs = self._create_populated_cluster(3, 2, name='cluster2')
cluster3, svcs = self._create_populated_cluster(3, 3, name='cluster3')
expected = {cluster1.id, cluster2.id}
result = db.cluster_get_all(self.ctxt, is_up=True)
self.assertEqual(len(expected), len(result))
self.assertSetEqual(expected, {cluster.id for cluster in result})
def test_cluster_get_all_no_match(self):
"""Basic test of get_all with a non matching filter."""
cluster1, svcs = self._create_populated_cluster(3, 3)
result = db.cluster_get_all(self.ctxt, is_up=True)
self.assertListEqual([], result)
@mock.patch('cinder.db.sqlalchemy.api._cluster_query')
def test_cluster_get_all_passes_parameters(self, cluster_query_mock):
"""Test that get_all passes all parameters.
Since we have already tested all filters and parameters with
the cluster_get method, all we have to do for get_all is to check that we
are passing them to the query building method.
"""
args = (mock.sentinel.read_deleted, mock.sentinel.get_services,
mock.sentinel.services_summary, mock.sentinel.is_up,
mock.sentinel.name_match_level)
filters = {'session': mock.sentinel.session,
'name': mock.sentinel.name,
'disabled': mock.sentinel.disabled,
'disabled_reason': mock.sentinel.disabled_reason,
'race_preventer': mock.sentinel.race_preventer,
'last_heartbeat': mock.sentinel.last_heartbeat,
'num_hosts': mock.sentinel.num_hosts,
'num_down_hosts': mock.sentinel.num_down_hosts}
db.cluster_get_all(self.ctxt, *args, **filters)
cluster_query_mock.assert_called_once_with(self.ctxt, *args, **filters)


@ -50,7 +50,9 @@ class AllocatedCapacityWeigherTestCase(test.TestCase):
disabled=disabled)
host_states = self.host_manager.get_all_host_states(ctxt)
_mock_service_get_all.assert_called_once_with(
ctxt, topic=CONF.volume_topic, disabled=disabled)
ctxt,
None, # backend_match_level
topic=CONF.volume_topic, disabled=disabled)
return host_states
def test_default_of_spreading_first(self):


@ -50,7 +50,9 @@ class CapacityWeigherTestCase(test.TestCase):
disabled=disabled)
host_states = self.host_manager.get_all_host_states(ctxt)
_mock_service_get_all.assert_called_once_with(
ctxt, topic=CONF.volume_topic, disabled=disabled)
ctxt,
None, # backend_match_level
topic=CONF.volume_topic, disabled=disabled)
return host_states
# If thin_provisioning_support = False, use the following formula:


@ -76,7 +76,10 @@ class VolumeNumberWeigherTestCase(test.TestCase):
disabled=disabled)
host_states = self.host_manager.get_all_host_states(ctxt)
_mock_service_get_all.assert_called_once_with(
ctxt, topic=CONF.volume_topic, disabled=disabled)
ctxt,
None, # backend_match_level
topic=CONF.volume_topic,
disabled=disabled)
return host_states
def test_volume_number_weight_multiplier1(self):


@ -16,6 +16,7 @@
import datetime
import ddt
import enum
import mock
from oslo_config import cfg
@ -93,12 +94,14 @@ class ModelsObjectComparatorMixin(object):
for key, value in obj1.items():
self.assertEqual(value, obj2[key])
def _assertEqualListsOfObjects(self, objs1, objs2, ignored_keys=None):
def _assertEqualListsOfObjects(self, objs1, objs2, ignored_keys=None,
msg=None):
obj_to_dict = lambda o: self._dict_from_object(o, ignored_keys)
sort_key = lambda d: [d[k] for k in sorted(d)]
conv_and_sort = lambda obj: sorted(map(obj_to_dict, obj), key=sort_key)
self.assertListEqual(conv_and_sort(objs1), conv_and_sort(objs2))
self.assertListEqual(conv_and_sort(objs1), conv_and_sort(objs2),
msg=msg)
def _assertEqualListsOfPrimitivesAsSets(self, primitives1, primitives2):
self.assertEqual(len(primitives1), len(primitives2))
@ -122,6 +125,7 @@ class DBAPIServiceTestCase(BaseTest):
def _get_base_values(self):
return {
'host': 'fake_host',
'cluster_name': None,
'binary': 'fake_binary',
'topic': 'fake_topic',
'report_count': 3,
@ -139,18 +143,21 @@ class DBAPIServiceTestCase(BaseTest):
return service
def test_service_create(self):
service = self._create_service({})
# Add a cluster value to the service
values = {'cluster_name': 'cluster'}
service = self._create_service(values)
self.assertIsNotNone(service['id'])
for key, value in self._get_base_values().items():
expected = self._get_base_values()
expected.update(values)
for key, value in expected.items():
self.assertEqual(value, service[key])
@mock.patch('oslo_utils.timeutils.utcnow', return_value=UTC_NOW)
def test_service_destroy(self, utcnow_mock):
def test_service_destroy(self):
service1 = self._create_service({})
service2 = self._create_service({'host': 'fake_host2'})
self.assertDictEqual(
{'deleted': True, 'deleted_at': UTC_NOW},
{'deleted': True, 'deleted_at': mock.ANY},
db.service_destroy(self.ctxt, service1['id']))
self.assertRaises(exception.ServiceNotFound,
db.service_get, self.ctxt, service1['id'])
@ -181,6 +188,16 @@ class DBAPIServiceTestCase(BaseTest):
real_service1 = db.service_get(self.ctxt, service1['id'])
self._assertEqualObjects(service1, real_service1)
def test_service_get_by_cluster(self):
service = self._create_service({'cluster_name': 'cluster@backend'})
# Search with an exact match
real_service = db.service_get(self.ctxt,
cluster_name='cluster@backend')
self._assertEqualObjects(service, real_service)
# Search without the backend
real_service = db.service_get(self.ctxt, cluster_name='cluster')
self._assertEqualObjects(service, real_service)
def test_service_get_not_found_exception(self):
self.assertRaises(exception.ServiceNotFound,
db.service_get, self.ctxt, 100500)
@ -195,17 +212,19 @@ class DBAPIServiceTestCase(BaseTest):
expired = (datetime.datetime.utcnow()
- datetime.timedelta(seconds=CONF.service_down_time + 1))
values = [
# Now we also set updated_at at creation time, so this one is up.
{'host': 'host1', 'binary': 'b1', 'created_at': expired},
{'host': 'host1@ceph', 'binary': 'b2'},
{'host': 'host2', 'binary': 'b2'},
{'disabled': True, 'created_at': expired, 'updated_at': expired}
{'disabled': True, 'created_at': expired, 'updated_at': expired},
]
services = [self._create_service(vals) for vals in values]
disabled_services = services[-1:]
non_disabled_services = services[:-1]
up_services = services[1:3]
down_services = [services[0], services[3]]
up_services = services[0:3]
down_services = [services[3]]
expected = services[:2]
expected_bin = services[1:3]
compares = [
@ -218,8 +237,9 @@ class DBAPIServiceTestCase(BaseTest):
(up_services, db.service_get_all(self.ctxt, is_up=True)),
(down_services, db.service_get_all(self.ctxt, is_up=False)),
]
for comp in compares:
self._assertEqualListsOfObjects(*comp)
for i, comp in enumerate(compares):
self._assertEqualListsOfObjects(*comp,
msg='Error comparing %s' % i)
def test_service_get_all_by_topic(self):
values = [
@ -258,6 +278,18 @@ class DBAPIServiceTestCase(BaseTest):
service2 = db.service_get(self.ctxt, host='host2', binary='b')
self._assertEqualObjects(services[1], service2)
def test_service_get_all_by_cluster(self):
values = [
{'host': 'host1', 'cluster_name': 'cluster'},
{'host': 'host2', 'cluster_name': 'cluster'},
{'host': 'host3', 'cluster_name': 'cluster@backend'},
{'host': 'host4', 'cluster_name': 'cluster2'},
]
services = [self._create_service(vals) for vals in values]
expected = services[:3]
real = db.service_get_all(self.ctxt, cluster_name='cluster')
self._assertEqualListsOfObjects(expected, real)
def test_service_get_by_args_not_found_exception(self):
self.assertRaises(exception.ServiceNotFound,
db.service_get,
@ -278,6 +310,7 @@ class DBAPIServiceTestCase(BaseTest):
self.assertEqual('binary', binary_op.name)
@ddt.ddt
class DBAPIVolumeTestCase(BaseTest):
"""Unit tests for cinder.db.api.volume_*."""
@ -1211,6 +1244,79 @@ class DBAPIVolumeTestCase(BaseTest):
['desc'], filters=filters)
self._assertEqualListsOfObjects([], volumes)
def _create_volumes_to_test_include_in(self):
"""Helper method for test_volume_include_in_* tests."""
return [
db.volume_create(self.ctxt,
{'host': 'host1@backend1#pool1',
'cluster_name': 'cluster1@backend1#pool1'}),
db.volume_create(self.ctxt,
{'host': 'host1@backend2#pool2',
'cluster_name': 'cluster1@backend2#pool2'}),
db.volume_create(self.ctxt,
{'host': 'host2@backend#poo1',
'cluster_name': 'cluster2@backend#pool'}),
]
@ddt.data('host1@backend1#pool1', 'host1@backend1')
def test_volume_include_in_cluster_by_host(self, host):
"""Basic volume include test filtering by host and with full rename."""
vol = self._create_volumes_to_test_include_in()[0]
cluster_name = 'my_cluster'
result = db.volume_include_in_cluster(self.ctxt, cluster_name,
partial_rename=False,
host=host)
self.assertEqual(1, result)
db_vol = db.volume_get(self.ctxt, vol.id)
self.assertEqual(cluster_name, db_vol.cluster_name)
def test_volume_include_in_cluster_by_host_multiple(self):
"""Partial cluster rename filtering with host level info."""
vols = self._create_volumes_to_test_include_in()[0:2]
host = 'host1'
cluster_name = 'my_cluster'
result = db.volume_include_in_cluster(self.ctxt, cluster_name,
partial_rename=True,
host=host)
self.assertEqual(2, result)
db_vols = [db.volume_get(self.ctxt, vols[0].id),
db.volume_get(self.ctxt, vols[1].id)]
for i in range(2):
self.assertEqual(cluster_name + vols[i].host[len(host):],
db_vols[i].cluster_name)
@ddt.data('cluster1@backend1#pool1', 'cluster1@backend1')
def test_volume_include_in_cluster_by_cluster_name(self, cluster_name):
"""Basic volume include test filtering by cluster with full rename."""
vol = self._create_volumes_to_test_include_in()[0]
new_cluster_name = 'cluster_new@backend1#pool'
result = db.volume_include_in_cluster(self.ctxt, new_cluster_name,
partial_rename=False,
cluster_name=cluster_name)
self.assertEqual(1, result)
db_vol = db.volume_get(self.ctxt, vol.id)
self.assertEqual(new_cluster_name, db_vol.cluster_name)
def test_volume_include_in_cluster_by_cluster_multiple(self):
"""Partial rename filtering with cluster with host level info."""
vols = self._create_volumes_to_test_include_in()[0:2]
cluster_name = 'cluster1'
new_cluster_name = 'my_cluster'
result = db.volume_include_in_cluster(self.ctxt, new_cluster_name,
partial_rename=True,
cluster_name=cluster_name)
self.assertEqual(2, result)
db_vols = [db.volume_get(self.ctxt, vols[0].id),
db.volume_get(self.ctxt, vols[1].id)]
for i in range(2):
self.assertEqual(
new_cluster_name + vols[i].cluster_name[len(cluster_name):],
db_vols[i].cluster_name)
class DBAPISnapshotTestCase(BaseTest):
@ -1444,6 +1550,87 @@ class DBAPISnapshotTestCase(BaseTest):
self.assertEqual(should_be, db.snapshot_metadata_get(self.ctxt, 1))
@ddt.ddt
class DBAPIConsistencygroupTestCase(BaseTest):
def _create_cgs_to_test_include_in(self):
"""Helper method for test_consistencygroup_include_in_* tests."""
return [
db.consistencygroup_create(
self.ctxt, {'host': 'host1@backend1#pool1',
'cluster_name': 'cluster1@backend1#pool1'}),
db.consistencygroup_create(
self.ctxt, {'host': 'host1@backend2#pool2',
'cluster_name': 'cluster1@backend2#pool1'}),
db.consistencygroup_create(
self.ctxt, {'host': 'host2@backend#poo1',
'cluster_name': 'cluster2@backend#pool'}),
]
@ddt.data('host1@backend1#pool1', 'host1@backend1')
def test_consistencygroup_include_in_cluster_by_host(self, host):
"""Basic CG include test filtering by host and with full rename."""
cg = self._create_cgs_to_test_include_in()[0]
cluster_name = 'my_cluster'
result = db.consistencygroup_include_in_cluster(self.ctxt,
cluster_name,
partial_rename=False,
host=host)
self.assertEqual(1, result)
db_cg = db.consistencygroup_get(self.ctxt, cg.id)
self.assertEqual(cluster_name, db_cg.cluster_name)
def test_consistencygroup_include_in_cluster_by_host_multiple(self):
"""Partial cluster rename filtering with host level info."""
cgs = self._create_cgs_to_test_include_in()[0:2]
host = 'host1'
cluster_name = 'my_cluster'
result = db.consistencygroup_include_in_cluster(self.ctxt,
cluster_name,
partial_rename=True,
host=host)
self.assertEqual(2, result)
db_cgs = [db.consistencygroup_get(self.ctxt, cgs[0].id),
db.consistencygroup_get(self.ctxt, cgs[1].id)]
for i in range(2):
self.assertEqual(cluster_name + cgs[i].host[len(host):],
db_cgs[i].cluster_name)
@ddt.data('cluster1@backend1#pool1', 'cluster1@backend1')
def test_consistencygroup_include_in_cluster_by_cluster_name(self,
cluster_name):
"""Basic CG include test filtering by cluster with full rename."""
cg = self._create_cgs_to_test_include_in()[0]
new_cluster_name = 'cluster_new@backend1#pool'
result = db.consistencygroup_include_in_cluster(
self.ctxt, new_cluster_name, partial_rename=False,
cluster_name=cluster_name)
self.assertEqual(1, result)
db_cg = db.consistencygroup_get(self.ctxt, cg.id)
self.assertEqual(new_cluster_name, db_cg.cluster_name)
def test_consistencygroup_include_in_cluster_by_cluster_multiple(self):
"""Partial rename filtering with cluster with host level info."""
cgs = self._create_cgs_to_test_include_in()[0:2]
cluster_name = 'cluster1'
new_cluster_name = 'my_cluster'
result = db.consistencygroup_include_in_cluster(
self.ctxt, new_cluster_name, partial_rename=True,
cluster_name=cluster_name)
self.assertEqual(2, result)
db_cgs = [db.consistencygroup_get(self.ctxt, cgs[0].id),
db.consistencygroup_get(self.ctxt, cgs[1].id)]
for i in range(2):
self.assertEqual(
new_cluster_name + cgs[i].cluster_name[len(cluster_name):],
db_cgs[i].cluster_name)
class DBAPICgsnapshotTestCase(BaseTest):
"""Tests for cinder.db.api.cgsnapshot_*."""


@ -839,6 +839,39 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
self.assertIsInstance(messages.c.resource_type.type,
self.VARCHAR_TYPE)
def _check_075(self, engine, data):
"""Test adding cluster table and cluster_id fields."""
self.assertTrue(engine.dialect.has_table(engine.connect(), 'clusters'))
clusters = db_utils.get_table(engine, 'clusters')
# Inherited fields from CinderBase
self.assertIsInstance(clusters.c.created_at.type,
self.TIME_TYPE)
self.assertIsInstance(clusters.c.updated_at.type,
self.TIME_TYPE)
self.assertIsInstance(clusters.c.deleted_at.type,
self.TIME_TYPE)
self.assertIsInstance(clusters.c.deleted.type,
self.BOOL_TYPE)
# Cluster specific fields
self.assertIsInstance(clusters.c.id.type,
self.INTEGER_TYPE)
self.assertIsInstance(clusters.c.name.type,
self.VARCHAR_TYPE)
self.assertIsInstance(clusters.c.binary.type,
self.VARCHAR_TYPE)
self.assertIsInstance(clusters.c.disabled.type,
self.BOOL_TYPE)
self.assertIsInstance(clusters.c.disabled_reason.type,
self.VARCHAR_TYPE)
# Check that we have added cluster_name field to all required tables
for table_name in ('services', 'consistencygroups', 'volumes'):
table = db_utils.get_table(engine, table_name)
self.assertIsInstance(table.c.cluster_name.type,
self.VARCHAR_TYPE)
def test_walk_versions(self):
self.walk_versions(False, False)


@ -1025,3 +1025,8 @@ def validate_dictionary_string_length(specs):
if value is not None:
check_string_length(value, 'Value for key "%s"' % key,
min_length=0, max_length=255)
def service_expired_time():
return (timeutils.utcnow() -
datetime.timedelta(seconds=CONF.service_down_time))
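
A tiny illustration of how this cut-off feeds the is_up filters added above;
the values are invented and 60 is Cinder's default service_down_time.

import datetime

from oslo_utils import timeutils

cutoff = timeutils.utcnow() - datetime.timedelta(seconds=60)
last_heartbeat = timeutils.utcnow() - datetime.timedelta(seconds=90)
# updated_at (or created_at) older than the cut-off means the service or
# cluster is considered down.
print(last_heartbeat >= cutoff)   # False -> counts as down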