blueprint host-aggregates

This is the first of a series of commits that add the host-aggregates capability,
as described on the blueprint page.

More precisely, this commit introduces changes to the Nova model: model classes
related to aggregates have been added, together with DB API methods to interact
with the model; a SQLAlchemy migration script and a set of tests are also part
of this changeset.

The commits that follow will add:

- Extensions to OSAPI Admin, and related python-novaclient mappings
- Implementation of the XenAPI virt layer
- Integration of OSAPI and virt layer, via the compute_api
- smoketests
- openstack-manuals documentation

These commits will not necessarily be pushed for review in this exact order.
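For reviewers, a minimal usage sketch of the DB API added here (the aggregate
name, availability zone, host name and metadata values are made up; the calls
follow the signatures in nova/db/api.py below):

# Illustrative only: exercises the aggregate DB API introduced in this change.
from nova import context
from nova import db

ctxt = context.get_admin_context()
agg = db.aggregate_create(ctxt,
                          {'name': 'rack-1', 'availability_zone': 'nova'},
                          metadata={'pool': 'xen-pool-1'})
db.aggregate_host_add(ctxt, agg.id, 'host-01')
hosts = db.aggregate_host_get_all(ctxt, agg.id)
meta = db.aggregate_metadata_get(ctxt, agg.id)
db.aggregate_delete(ctxt, agg.id)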

Change-Id: Iceb27609dc53bf4305c02d7cbc436fba4c4a7256
Armando Migliaccio
2012-01-13 16:46:37 +00:00
parent 355c631785
commit 17135ed2c6
4 changed files with 412 additions and 0 deletions


@@ -0,0 +1,28 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Possible states for host aggregates.
An aggregate may be 'building', in which case the admin has triggered its
creation, but the underlying hypervisor pool has not actually being created
yet. An aggregate may be 'active', in which case the underlying hypervisor
pool is up and running. An aggregate may be in 'error' in all other cases.
"""
BUILDING = 'building'
ACTIVE = 'active'
ERROR = 'error'
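The intended lifecycle is: aggregate_create() (further down in this changeset)
stores BUILDING; the virt layer added in a follow-up commit is expected to move
the aggregate to ACTIVE once the hypervisor pool is up, or to ERROR otherwise.
The helper below is hypothetical and only illustrates that intent:

# Hypothetical helper, not part of this changeset.
from nova import db
from nova.compute import aggregate_states


def mark_pool_ready(context, aggregate_id):
    # Flip the aggregate out of BUILDING once the underlying hypervisor
    # pool is actually up and running.
    db.aggregate_update(context, aggregate_id,
                        {'operational_state': aggregate_states.ACTIVE})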


@@ -1741,6 +1741,65 @@ def sm_volume_get_all(context):
####################

def aggregate_create(context, values, metadata=None):
    """Create a new aggregate with metadata."""
    return IMPL.aggregate_create(context, values, metadata)


def aggregate_get(context, aggregate_id, read_deleted='no'):
    """Get a specific aggregate by id."""
    return IMPL.aggregate_get(context, aggregate_id, read_deleted)


def aggregate_update(context, aggregate_id, values):
    """Update the attributes of an aggregate. If values contains a metadata
    key, it updates the aggregate metadata too."""
    return IMPL.aggregate_update(context, aggregate_id, values)


def aggregate_delete(context, aggregate_id):
    """Delete an aggregate."""
    return IMPL.aggregate_delete(context, aggregate_id)


def aggregate_get_all(context, read_deleted='yes'):
    """Get all aggregates."""
    return IMPL.aggregate_get_all(context, read_deleted)


def aggregate_metadata_add(context, aggregate_id, metadata, set_delete=False):
    """Add/update metadata. If set_delete=True, metadata keys that are not
    in the given dict are deleted."""
    IMPL.aggregate_metadata_add(context, aggregate_id, metadata, set_delete)


def aggregate_metadata_get(context, aggregate_id, read_deleted='no'):
    """Get metadata for the specified aggregate."""
    return IMPL.aggregate_metadata_get(context, aggregate_id, read_deleted)


def aggregate_metadata_delete(context, aggregate_id, key):
    """Delete the given metadata key."""
    IMPL.aggregate_metadata_delete(context, aggregate_id, key)


def aggregate_host_add(context, aggregate_id, host):
    """Add host to the aggregate."""
    IMPL.aggregate_host_add(context, aggregate_id, host)


def aggregate_host_get_all(context, aggregate_id, read_deleted='yes'):
    """Get hosts for the specified aggregate."""
    return IMPL.aggregate_host_get_all(context, aggregate_id, read_deleted)


def aggregate_host_delete(context, aggregate_id, host):
    """Delete the given host from the aggregate."""
    IMPL.aggregate_host_delete(context, aggregate_id, host)


####################


def instance_fault_create(context, values):
    """Create a new Instance Fault."""
    return IMPL.instance_fault_create(context, values)
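To make the set_delete flag concrete, a hedged sketch (key names and values
are invented; the behaviour follows the sqlalchemy implementation in the next
file):

# Illustrative only.
from nova import context
from nova import db


def replace_aggregate_metadata(aggregate_id, new_meta):
    ctxt = context.get_admin_context()
    # Default call merges: existing keys are kept, given keys added/updated.
    db.aggregate_metadata_add(ctxt, aggregate_id, {'ha': 'enabled'})
    # With set_delete=True, existing keys missing from new_meta are marked
    # deleted, so the aggregate ends up with exactly new_meta.
    db.aggregate_metadata_add(ctxt, aggregate_id, new_meta, set_delete=True)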


@@ -20,6 +20,7 @@
"""Implementation of SQLAlchemy backend."""
import datetime
import functools
import re
import warnings
@@ -29,6 +30,7 @@ from nova import exception
from nova import flags
from nova import utils
from nova import log as logging
from nova.compute import aggregate_states
from nova.compute import vm_states
from nova.db.sqlalchemy import models
from nova.db.sqlalchemy.session import get_session
@@ -143,6 +145,20 @@ def require_volume_exists(f):
    return wrapper


def require_aggregate_exists(f):
    """Decorator to require the specified aggregate to exist.

    Requires the wrapped function to use context and aggregate_id as
    its first two arguments.
    """
    @functools.wraps(f)
    def wrapper(context, aggregate_id, *args, **kwargs):
        db.aggregate_get(context, aggregate_id)
        return f(context, aggregate_id, *args, **kwargs)
    return wrapper


def model_query(context, *args, **kwargs):
    """Query helper that accounts for context's `read_deleted` field.
@@ -3953,6 +3969,218 @@ def sm_volume_get_all(context):
################
def _aggregate_get_query(context, model_class, id_field, id,
                         session=None, read_deleted='yes'):
    return model_query(context, model_class, session=session,
                       read_deleted=read_deleted).filter(id_field == id)


@require_admin_context
def aggregate_create(context, values, metadata=None):
    try:
        aggregate = models.Aggregate()
        aggregate.update(values)
        aggregate.operational_state = aggregate_states.BUILDING
        aggregate.save()
    except exception.DBError:
        raise exception.AggregateNameExists(aggregate_name=values['name'])
    if metadata:
        aggregate_metadata_add(context, aggregate.id, metadata)
    return aggregate
@require_admin_context
def aggregate_get(context, aggregate_id, read_deleted='no'):
    aggregate = _aggregate_get_query(context,
                                     models.Aggregate,
                                     models.Aggregate.id, aggregate_id,
                                     read_deleted=read_deleted).first()
    if not aggregate:
        raise exception.AggregateNotFound(aggregate_id=aggregate_id)
    return aggregate


@require_admin_context
def aggregate_update(context, aggregate_id, values):
    session = get_session()
    aggregate = _aggregate_get_query(context,
                                     models.Aggregate,
                                     models.Aggregate.id, aggregate_id,
                                     session=session,
                                     read_deleted='no').first()
    if aggregate:
        metadata = values.get('metadata')
        if metadata is not None:
            aggregate_metadata_add(context,
                                   aggregate_id,
                                   values.pop('metadata'),
                                   set_delete=True)
        with session.begin():
            aggregate.update(values)
            aggregate.save(session=session)
        values['metadata'] = metadata
        return aggregate
    else:
        raise exception.AggregateNotFound(aggregate_id=aggregate_id)
@require_admin_context
def aggregate_delete(context, aggregate_id):
    query = _aggregate_get_query(context,
                                 models.Aggregate,
                                 models.Aggregate.id, aggregate_id,
                                 read_deleted='no')
    if query.first():
        query.update({'deleted': True,
                      'deleted_at': utils.utcnow(),
                      'updated_at': literal_column('updated_at')})
    else:
        raise exception.AggregateNotFound(aggregate_id=aggregate_id)


@require_admin_context
def aggregate_get_all(context, read_deleted='yes'):
    return model_query(context,
                       models.Aggregate,
                       read_deleted=read_deleted).all()
@require_admin_context
@require_aggregate_exists
def aggregate_metadata_get(context, aggregate_id, read_deleted='no'):
    rows = model_query(context,
                       models.AggregateMetadata,
                       read_deleted=read_deleted).\
                       filter_by(aggregate_id=aggregate_id).all()
    return dict([(r['key'], r['value']) for r in rows])


@require_admin_context
@require_aggregate_exists
def aggregate_metadata_delete(context, aggregate_id, key):
    query = _aggregate_get_query(context,
                                 models.AggregateMetadata,
                                 models.AggregateMetadata.aggregate_id,
                                 aggregate_id, read_deleted='no').\
                                 filter_by(key=key)
    if query.first():
        query.update({'deleted': True,
                      'deleted_at': utils.utcnow(),
                      'updated_at': literal_column('updated_at')})
    else:
        raise exception.AggregateMetadataNotFound(aggregate_id=aggregate_id,
                                                  metadata_key=key)


@require_admin_context
@require_aggregate_exists
def aggregate_metadata_get_item(context, aggregate_id, key,
                                session=None, read_deleted='yes'):
    result = _aggregate_get_query(context,
                                  models.AggregateMetadata,
                                  models.AggregateMetadata.aggregate_id,
                                  aggregate_id, session=session,
                                  read_deleted=read_deleted).\
                                  filter_by(key=key).first()
    if not result:
        raise exception.AggregateMetadataNotFound(metadata_key=key,
                                                  aggregate_id=aggregate_id)
    return result
@require_admin_context
@require_aggregate_exists
def aggregate_metadata_add(context, aggregate_id, metadata, set_delete=False):
    session = get_session()

    if set_delete:
        original_metadata = aggregate_metadata_get(context, aggregate_id)
        for meta_key, meta_value in original_metadata.iteritems():
            if meta_key not in metadata:
                meta_ref = aggregate_metadata_get_item(context, aggregate_id,
                                                       meta_key, session)
                meta_ref.update({'deleted': True})
                meta_ref.save(session=session)

    meta_ref = None

    for meta_key, meta_value in metadata.iteritems():
        item = {"value": meta_value}
        try:
            meta_ref = aggregate_metadata_get_item(context, aggregate_id,
                                                   meta_key, session)
            if meta_ref.deleted:
                item.update({'deleted': False, 'deleted_at': None,
                             'updated_at': literal_column('updated_at')})
        except exception.AggregateMetadataNotFound:
            meta_ref = models.AggregateMetadata()
            item.update({"key": meta_key, "aggregate_id": aggregate_id})

        meta_ref.update(item)
        meta_ref.save(session=session)

    return metadata
@require_admin_context
@require_aggregate_exists
def aggregate_host_get_all(context, aggregate_id, read_deleted='yes'):
    rows = model_query(context,
                       models.AggregateHost,
                       read_deleted=read_deleted).\
                       filter_by(aggregate_id=aggregate_id).all()
    return [r.host for r in rows]


@require_admin_context
@require_aggregate_exists
def aggregate_host_delete(context, aggregate_id, host):
    query = _aggregate_get_query(context,
                                 models.AggregateHost,
                                 models.AggregateHost.aggregate_id,
                                 aggregate_id,
                                 read_deleted='no').filter_by(host=host)
    if query.first():
        query.update({'deleted': True,
                      'deleted_at': utils.utcnow(),
                      'updated_at': literal_column('updated_at')})
    else:
        raise exception.AggregateHostNotFound(aggregate_id=aggregate_id,
                                              host=host)


@require_admin_context
@require_aggregate_exists
def aggregate_host_add(context, aggregate_id, host):
    host_ref = _aggregate_get_query(context,
                                    models.AggregateHost,
                                    models.AggregateHost.aggregate_id,
                                    aggregate_id,
                                    read_deleted='no').\
                                    filter_by(host=host).first()
    if not host_ref:
        try:
            host_ref = models.AggregateHost()
            values = {"host": host, "aggregate_id": aggregate_id, }
            host_ref.update(values)
            host_ref.save()
        except exception.DBError:
            raise exception.AggregateHostConflict(host=host)
    else:
        raise exception.AggregateHostExists(host=host,
                                            aggregate_id=aggregate_id)
    return host_ref


################


def instance_fault_create(context, values):
    """Create a new InstanceFault."""
    fault_ref = models.InstanceFault()


@@ -0,0 +1,97 @@
# Copyright (c) 2011 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Boolean, String, DateTime, Integer
from sqlalchemy import MetaData, Column, ForeignKey, Table

from nova import log as logging

meta = MetaData()

aggregates = Table('aggregates', meta,
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('deleted_at', DateTime(timezone=False)),
        Column('deleted', Boolean(create_constraint=True, name=None)),
        Column('id', Integer(),
               primary_key=True, nullable=False, autoincrement=True),
        Column('name',
               String(length=255, convert_unicode=False, assert_unicode=None,
                      unicode_error=None, _warn_on_bytestring=False),
               unique=True),
        Column('operational_state',
               String(length=255, convert_unicode=False, assert_unicode=None,
                      unicode_error=None, _warn_on_bytestring=False),
               nullable=False),
        Column('availability_zone',
               String(length=255, convert_unicode=False, assert_unicode=None,
                      unicode_error=None, _warn_on_bytestring=False),
               nullable=False),
        )

hosts = Table('aggregate_hosts', meta,
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('deleted_at', DateTime(timezone=False)),
        Column('deleted', Boolean(create_constraint=True, name=None)),
        Column('id', Integer(), primary_key=True, nullable=False),
        Column('host',
               String(length=255, convert_unicode=False, assert_unicode=None,
                      unicode_error=None, _warn_on_bytestring=False),
               unique=True),
        Column('aggregate_id', Integer(), ForeignKey('aggregates.id'),
               nullable=False),
        )

metadata = Table('aggregate_metadata', meta,
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('deleted_at', DateTime(timezone=False)),
        Column('deleted', Boolean(create_constraint=True, name=None)),
        Column('id', Integer(), primary_key=True, nullable=False),
        Column('aggregate_id',
               Integer(),
               ForeignKey('aggregates.id'),
               nullable=False),
        Column('key',
               String(length=255, convert_unicode=False, assert_unicode=None,
                      unicode_error=None, _warn_on_bytestring=False),
               nullable=False),
        Column('value',
               String(length=255, convert_unicode=False, assert_unicode=None,
                      unicode_error=None, _warn_on_bytestring=False),
               nullable=False))

tables = (aggregates, hosts, metadata)


def upgrade(migrate_engine):
    meta.bind = migrate_engine
    for table in tables:
        try:
            table.create()
        except Exception:
            logging.exception(repr(table))


def downgrade(migrate_engine):
    meta.bind = migrate_engine
    for table in tables:
        try:
            table.drop()
        except Exception:
            logging.exception(repr(table))
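The models.Aggregate, models.AggregateHost and models.AggregateMetadata
classes referenced by the sqlalchemy backend are not part of the hunks shown
here; below is a rough sketch of what they presumably look like, inferred from
the table definitions above (attribute names are assumptions):

# Inferred sketch only -- the real definitions live in
# nova/db/sqlalchemy/models.py and are not part of the diff shown here.
# created_at/updated_at/deleted_at/deleted come from NovaBase.
from sqlalchemy import Column, ForeignKey, Integer, String
from nova.db.sqlalchemy.models import BASE, NovaBase


class Aggregate(BASE, NovaBase):
    """Represents a cluster of hosts that exists in this zone."""
    __tablename__ = 'aggregates'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(255), unique=True)
    operational_state = Column(String(255), nullable=False)
    availability_zone = Column(String(255), nullable=False)


class AggregateHost(BASE, NovaBase):
    """Represents a host that is a member of an aggregate."""
    __tablename__ = 'aggregate_hosts'
    id = Column(Integer, primary_key=True)
    host = Column(String(255), unique=True)
    aggregate_id = Column(Integer, ForeignKey('aggregates.id'),
                          nullable=False)


class AggregateMetadata(BASE, NovaBase):
    """Represents a key/value pair for an aggregate."""
    __tablename__ = 'aggregate_metadata'
    id = Column(Integer, primary_key=True)
    key = Column(String(255), nullable=False)
    value = Column(String(255), nullable=False)
    aggregate_id = Column(Integer, ForeignKey('aggregates.id'),
                          nullable=False)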