Move the savanna subdir to sahara

Rename the subdirectory and replace all instances
of "import savanna" with "import sahara" and all
instances of "from savanna" with "from sahara".

* Replaced mock patches like mock.patch('savanna...
* Updated the config generator script
* Renamed entry points in setup.cfg
* Fixed hacking checks
* Manually renamed references in alembic scripts so migrations keep working
* Fixed doc building
* Renamed itests directories
* Updated .gitignore
* Removed the locale dir after rebase

Co-Authored-By: Alexander Ignatov <aignatov@mirantis.com>

Change-Id: Ia77252c24046c3e7283c0a7b96d11636020b949c
Partially implements: blueprint savanna-renaming-service
Author: Trevor McKay
Date: 2014-03-17 14:23:00 -04:00
Committed-by: Alexander Ignatov
Parent: 7142184703
Commit: d39024a207
404 changed files with 1819 additions and 1248 deletions

sahara/db/__init__.py (new file, 20 lines)

@@ -0,0 +1,20 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
DB abstraction for Sahara
"""
from sahara.db.api import * # noqa
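
Since api.py re-exports everything here, callers go through the package rather than the module. A minimal hedged sketch of a consumer (the context helper is assumed to come from sahara.context):

```python
# Minimal sketch of a consumer, assuming a configured database backend.
# Functions are defined in sahara/db/api.py (next file) but, via the
# star-import above, are called through the sahara.db package.
from sahara import context  # assumed: provides the request context
from sahara import db

ctx = context.ctx()
cluster = db.cluster_get(ctx, 'some-cluster-id')  # dict or None (see @to_dict)
if cluster:
    print(cluster['name'], cluster['status'])
```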

sahara/db/api.py (new file, 380 lines)

@@ -0,0 +1,380 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines interface for DB access.
Functions in this module are imported into the sahara.db namespace. Call these
functions from sahara.db namespace, not the sahara.db.api namespace.
All functions in this module return objects that implement a dictionary-like
interface.
**Related Flags**
:db_backend: string to lookup in the list of LazyPluggable backends.
`sqlalchemy` is the only supported backend right now.
:sql_connection: string specifying the sqlalchemy connection to use, like:
`sqlite:///var/lib/sahara/sahara.sqlite`.
"""
from oslo.config import cfg
from sahara.openstack.common.db import api as db_api
from sahara.openstack.common import log as logging
CONF = cfg.CONF
_BACKEND_MAPPING = {
'sqlalchemy': 'sahara.db.sqlalchemy.api',
}
IMPL = db_api.DBAPI(backend_mapping=_BACKEND_MAPPING)
LOG = logging.getLogger(__name__)
def setup_db():
"""Set up database, create tables, etc.
Return True on success, False otherwise
"""
return IMPL.setup_db()
def drop_db():
"""Drop database.
Return True on success, False otherwise
"""
return IMPL.drop_db()
## Helpers for building constraints / equality checks
def constraint(**conditions):
"""Return a constraint object suitable for use with some updates."""
return IMPL.constraint(**conditions)
def equal_any(*values):
"""Return an equality condition object suitable for use in a constraint.
Equal_any conditions require that a model object's attribute equal any
one of the given values.
"""
return IMPL.equal_any(*values)
def not_equal(*values):
"""Return an inequality condition object suitable for use in a constraint.
Not_equal conditions require that a model object's attribute differs from
all of the given values.
"""
return IMPL.not_equal(*values)
def to_dict(func):
def decorator(*args, **kwargs):
res = func(*args, **kwargs)
if isinstance(res, list):
return [item.to_dict() for item in res]
if res:
return res.to_dict()
else:
return None
return decorator
## Cluster ops
@to_dict
def cluster_get(context, cluster):
"""Return the cluster or None if it does not exist."""
return IMPL.cluster_get(context, cluster)
@to_dict
def cluster_get_all(context, **kwargs):
"""Get all clusters filtered by **kwargs e.g.
cluster_get_all(plugin_name='vanilla', hadoop_version='1.1')
"""
return IMPL.cluster_get_all(context, **kwargs)
@to_dict
def cluster_create(context, values):
"""Create a cluster from the values dictionary."""
return IMPL.cluster_create(context, values)
@to_dict
def cluster_update(context, cluster, values):
"""Set the given properties on cluster and update it."""
return IMPL.cluster_update(context, cluster, values)
def cluster_destroy(context, cluster):
"""Destroy the cluster or raise if it does not exist."""
IMPL.cluster_destroy(context, cluster)
## Node Group ops
def node_group_add(context, cluster, values):
"""Create a Node Group from the values dictionary."""
return IMPL.node_group_add(context, cluster, values)
def node_group_update(context, node_group, values):
"""Set the given properties on node_group and update it."""
IMPL.node_group_update(context, node_group, values)
def node_group_remove(context, node_group):
"""Destroy the node_group or raise if it does not exist."""
IMPL.node_group_remove(context, node_group)
## Instance ops
def instance_add(context, node_group, values):
"""Create an Instance from the values dictionary."""
return IMPL.instance_add(context, node_group, values)
def instance_update(context, instance, values):
"""Set the given properties on Instance and update it."""
IMPL.instance_update(context, instance, values)
def instance_remove(context, instance):
"""Destroy the Instance or raise if it does not exist."""
IMPL.instance_remove(context, instance)
## Volumes ops
def append_volume(context, instance, volume_id):
"""Append volume_id to instance."""
IMPL.append_volume(context, instance, volume_id)
def remove_volume(context, instance, volume_id):
"""Remove volume_id in instance."""
IMPL.remove_volume(context, instance, volume_id)
## Cluster Template ops
@to_dict
def cluster_template_get(context, cluster_template):
"""Return the cluster_template or None if it does not exist."""
return IMPL.cluster_template_get(context, cluster_template)
@to_dict
def cluster_template_get_all(context):
"""Get all cluster_templates."""
return IMPL.cluster_template_get_all(context)
@to_dict
def cluster_template_create(context, values):
"""Create a cluster_template from the values dictionary."""
return IMPL.cluster_template_create(context, values)
def cluster_template_destroy(context, cluster_template):
"""Destroy the cluster_template or raise if it does not exist."""
IMPL.cluster_template_destroy(context, cluster_template)
## Node Group Template ops
@to_dict
def node_group_template_get(context, node_group_template):
"""Return the Node Group Template or None if it does not exist."""
return IMPL.node_group_template_get(context, node_group_template)
@to_dict
def node_group_template_get_all(context):
"""Get all Node Group Templates."""
return IMPL.node_group_template_get_all(context)
@to_dict
def node_group_template_create(context, values):
"""Create a Node Group Template from the values dictionary."""
return IMPL.node_group_template_create(context, values)
def node_group_template_destroy(context, node_group_template):
"""Destroy the Node Group Template or raise if it does not exist."""
IMPL.node_group_template_destroy(context, node_group_template)
## Data Source ops
@to_dict
def data_source_get(context, data_source):
"""Return the Data Source or None if it does not exist."""
return IMPL.data_source_get(context, data_source)
@to_dict
def data_source_get_all(context):
"""Get all Data Sources."""
return IMPL.data_source_get_all(context)
@to_dict
def data_source_create(context, values):
"""Create a Data Source from the values dictionary."""
return IMPL.data_source_create(context, values)
def data_source_destroy(context, data_source):
"""Destroy the Data Source or raise if it does not exist."""
IMPL.data_source_destroy(context, data_source)
## JobExecutions ops
@to_dict
def job_execution_get(context, job_execution):
"""Return the JobExecution or None if it does not exist."""
return IMPL.job_execution_get(context, job_execution)
@to_dict
def job_execution_get_all(context, **kwargs):
"""Get all JobExecutions filtered by **kwargs e.g.
job_execution_get_all(cluster_id=12, input_id=123)
"""
return IMPL.job_execution_get_all(context, **kwargs)
def job_execution_count(context, **kwargs):
"""Count number of JobExecutions filtered by **kwargs e.g.
job_execution_count(cluster_id=12, input_id=123)
"""
return IMPL.job_execution_count(context, **kwargs)
@to_dict
def job_execution_create(context, values):
"""Create a JobExecution from the values dictionary."""
return IMPL.job_execution_create(context, values)
@to_dict
def job_execution_update(context, job_execution, values):
"""Create a JobExecution from the values dictionary."""
return IMPL.job_execution_update(context, job_execution, values)
def job_execution_destroy(context, job_execution):
"""Destroy the JobExecution or raise if it does not exist."""
IMPL.job_execution_destroy(context, job_execution)
## Job ops
@to_dict
def job_get(context, job):
"""Return the Job or None if it does not exist."""
return IMPL.job_get(context, job)
@to_dict
def job_get_all(context):
"""Get all Jobs."""
return IMPL.job_get_all(context)
@to_dict
def job_create(context, values):
"""Create a Job from the values dictionary."""
return IMPL.job_create(context, values)
def job_update(context, job, values):
"""Update a Job from the values dictionary."""
return IMPL.job_update(context, job, values)
def job_destroy(context, job):
"""Destroy the Job or raise if it does not exist."""
IMPL.job_destroy(context, job)
@to_dict
def job_binary_get_all(context):
"""Get all JobBinarys."""
return IMPL.job_binary_get_all(context)
@to_dict
def job_binary_get(context, job_binary):
"""Return the JobBinary or None if it does not exist."""
return IMPL.job_binary_get(context, job_binary)
@to_dict
def job_binary_create(context, values):
"""Create a JobBinary from the values dictionary."""
return IMPL.job_binary_create(context, values)
def job_binary_destroy(context, job_binary):
"""Destroy the JobBinary or raise if it does not exist."""
IMPL.job_binary_destroy(context, job_binary)
@to_dict
def job_binary_internal_get_all(context):
"""Get all JobBinaryInternals."""
return IMPL.job_binary_internal_get_all(context)
@to_dict
def job_binary_internal_get(context, job_binary_internal):
"""Return the JobBinaryInternal or None if it does not exist."""
return IMPL.job_binary_internal_get(context, job_binary_internal)
@to_dict
def job_binary_internal_create(context, values):
"""Create a JobBinaryInternal from the values dictionary."""
return IMPL.job_binary_internal_create(context, values)
def job_binary_internal_destroy(context, job_binary_internal):
"""Destroy the JobBinaryInternal or raise if it does not exist."""
IMPL.job_binary_internal_destroy(context, job_binary_internal)
def job_binary_internal_get_raw_data(context, job_binary_internal_id):
"""Return the binary data field from the specified JobBinaryInternal."""
return IMPL.job_binary_internal_get_raw_data(context,
job_binary_internal_id)
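
The IMPL object is where the indirection happens: oslo's DBAPI resolves the configured backend name through _BACKEND_MAPPING and forwards calls to the matching module. A rough illustration of that mechanism (a simplified stand-in, not the oslo implementation itself):

```python
# Rough, simplified stand-in for sahara.openstack.common.db.api.DBAPI:
# resolve a backend name to a module path lazily, then proxy attributes.
import importlib

class LazyDBAPI(object):  # hypothetical name, for illustration only
    def __init__(self, backend_mapping, backend_name='sqlalchemy'):
        self._module_path = backend_mapping[backend_name]
        self._module = None

    def __getattr__(self, name):
        if self._module is None:
            self._module = importlib.import_module(self._module_path)
        return getattr(self._module, name)

IMPL = LazyDBAPI({'sqlalchemy': 'sahara.db.sqlalchemy.api'})
# IMPL.cluster_get(ctx, cid) now dispatches to
# sahara.db.sqlalchemy.api.cluster_get(ctx, cid)
```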

sahara/db/base.py (new file, 35 lines)

@@ -0,0 +1,35 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base class for classes that need modular database access."""
from oslo.config import cfg
from sahara.openstack.common import importutils
db_driver_opt = cfg.StrOpt('db_driver',
default='sahara.db',
help='Driver to use for database access.')
CONF = cfg.CONF
CONF.register_opt(db_driver_opt)
class Base(object):
"""DB driver is injected in the init method."""
def __init__(self):
self.db = importutils.import_module(CONF.db_driver)
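
Anything that mixes in Base gets its database handle injected from configuration rather than through a hard-coded import. A short hypothetical consumer:

```python
# Hypothetical consumer of Base: `self.db` is whatever module
# CONF.db_driver names (sahara.db by default), injected at __init__.
from sahara.db import base

class ClusterManager(base.Base):
    def get_cluster(self, ctx, cluster_id):
        return self.db.cluster_get(ctx, cluster_id)
```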

sahara/db/migration/alembic.ini (new file, 54 lines; path inferred)

@@ -0,0 +1,54 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = sahara/db/migration/alembic_migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# max length of characters to apply to the
# "slug" field
#truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
sqlalchemy.url =
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

sahara/db/migration/alembic_migrations/README (new file, 85 lines; path inferred)

@@ -0,0 +1,85 @@
<!--
Copyright 2012 New Dream Network, LLC (DreamHost)
Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
-->
The migrations in the alembic_migrations/versions directory contain the changes needed to migrate
from older Savanna releases to newer versions. A migration occurs by executing
a script that details the changes needed to upgrade/downgrade the database. The
migration scripts are ordered so that multiple scripts can run sequentially to
update the database. The scripts are executed by Savanna's migration wrapper
which uses the Alembic library to manage the migration. Savanna supports
migration from Icehouse or later.
If you are a deployer or developer and want to migrate from Icehouse to a
later release, you must first add version tracking to the database:
```
$ savanna-db-manage --config-file /path/to/savanna.conf stamp icehouse
```
You can upgrade to the latest database version via:
```
$ savanna-db-manage --config-file /path/to/savanna.conf upgrade head
```
To check the current database version:
```
$ savanna-db-manage --config-file /path/to/savanna.conf current
```
To create a script to run the migration offline:
```
$ savanna-db-manage --config-file /path/to/savanna.conf upgrade head --sql
```
To run the offline migration between specific migration versions:
```
$ savanna-db-manage --config-file /path/to/savanna.conf upgrade \
<start version>:<end version> --sql
```
Upgrade the database incrementally:
```
$ savanna-db-manage --config-file /path/to/savanna.conf upgrade --delta \
<# of revs>
```
Downgrade the database by a certain number of revisions:
```
$ savanna-db-manage --config-file /path/to/savanna.conf downgrade --delta \
<# of revs>
```
Create new revision:
```
$ savanna-db-manage --config-file /path/to/savanna.conf revision \
-m "description of revision" --autogenerate
```
Create a blank file:
```
$ savanna-db-manage --config-file /path/to/savanna.conf revision \
-m "description of revision"
```
To check whether the migration timeline has branched, you can run this command:
```
$ savanna-db-manage --config-file /path/to/savanna.conf check_migration
```
If the migration path does branch, you can find the branch point via:
```
$ savanna-db-manage --config-file /path/to/savanna.conf history
```
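
The same first-time flow (stamp, then upgrade) can also be driven through the Alembic Python API instead of the savanna-db-manage wrapper. A sketch, assuming the config wiring that cli.py performs later in this commit; SERVICE_CONF and the ini path are assumptions:

```python
# Sketch only: drive the documented stamp/upgrade flow via the
# Alembic API. SERVICE_CONF is assumed to be an oslo CONF object
# with database.connection set; the ini path is assumed.
from alembic import command
from alembic.config import Config

cfg = Config('sahara/db/migration/alembic.ini')
cfg.set_main_option('script_location',
                    'sahara.db.migration:alembic_migrations')
cfg.savanna_config = SERVICE_CONF  # env.py reads the connection from here

command.stamp(cfg, 'icehouse')   # add version tracking to an existing DB
command.upgrade(cfg, 'head')     # then walk forward to the latest revision
```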

sahara/db/migration/alembic_migrations/env.py (new file, 95 lines; path inferred)

@@ -0,0 +1,95 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Based on Neutron's migration/cli.py
from __future__ import with_statement
from logging import config as c
from alembic import context
from sqlalchemy import create_engine
from sqlalchemy import pool
from sahara.db.sqlalchemy import model_base
from sahara.openstack.common import importutils
importutils.import_module('sahara.db.sqlalchemy.models')
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
savanna_config = config.savanna_config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
c.fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = model_base.SaharaBase.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
context.configure(url=savanna_config.database.connection)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
engine = create_engine(
savanna_config.database.connection,
poolclass=pool.NullPool)
connection = engine.connect()
context.configure(
connection=connection,
target_metadata=target_metadata)
try:
with context.begin_transaction():
context.run_migrations()
finally:
connection.close()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

sahara/db/migration/alembic_migrations/script.py.mako (new file, 37 lines; path inferred)

@@ -0,0 +1,37 @@
# Copyright ${create_date.year} OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision}
Create Date: ${create_date}
"""
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

sahara/db/migration/alembic_migrations/versions/001_initial.py (new file, 305 lines; path inferred)

@@ -0,0 +1,305 @@
# Copyright 2014 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""initial
Revision ID: 572fe6dceb6d
Revises: None
Create Date: 2014-01-17 17:20:38.500595
"""
# revision identifiers, used by Alembic.
revision = '001'
down_revision = None
from alembic import op
import sqlalchemy as sa
from sahara.db.sqlalchemy import types as st
MYSQL_ENGINE = 'InnoDB'
MYSQL_CHARSET = 'utf8'
def upgrade():
op.create_table(
'job_binary_internal',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('tenant_id', sa.String(length=36), nullable=True),
sa.Column('name', sa.String(length=80), nullable=False),
sa.Column('data', sa.LargeBinary(), nullable=True),
sa.Column('datasize', sa.BIGINT(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name', 'tenant_id'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET
)
op.create_table(
'node_group_templates',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('name', sa.String(length=80), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('tenant_id', sa.String(length=36), nullable=True),
sa.Column('flavor_id', sa.String(length=36), nullable=False),
sa.Column('image_id', sa.String(length=36), nullable=True),
sa.Column('plugin_name', sa.String(length=80), nullable=False),
sa.Column('hadoop_version', sa.String(length=80), nullable=False),
sa.Column('node_processes', st.JsonDictType(), nullable=True),
sa.Column('node_configs', st.JsonDictType(), nullable=True),
sa.Column('volumes_per_node', sa.Integer(), nullable=False),
sa.Column('volumes_size', sa.Integer(), nullable=True),
sa.Column('volume_mount_prefix', sa.String(length=80), nullable=True),
sa.Column('floating_ip_pool', sa.String(length=36), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name', 'tenant_id'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET
)
op.create_table(
'data_sources',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('tenant_id', sa.String(length=36), nullable=True),
sa.Column('name', sa.String(length=80), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('type', sa.String(length=80), nullable=False),
sa.Column('url', sa.String(length=256), nullable=False),
sa.Column('credentials', st.JsonDictType(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name', 'tenant_id'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET
)
op.create_table(
'cluster_templates',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('name', sa.String(length=80), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('cluster_configs', st.JsonDictType(), nullable=True),
sa.Column('default_image_id', sa.String(length=36), nullable=True),
sa.Column('anti_affinity', st.JsonDictType(), nullable=True),
sa.Column('tenant_id', sa.String(length=36), nullable=True),
sa.Column('neutron_management_network',
sa.String(length=36), nullable=True),
sa.Column('plugin_name', sa.String(length=80), nullable=False),
sa.Column('hadoop_version', sa.String(length=80), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name', 'tenant_id'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET
)
op.create_table(
'job_binaries',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('tenant_id', sa.String(length=36), nullable=True),
sa.Column('name', sa.String(length=80), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('url', sa.String(length=256), nullable=False),
sa.Column('extra', st.JsonDictType(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name', 'tenant_id'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET
)
op.create_table(
'jobs',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('tenant_id', sa.String(length=36), nullable=True),
sa.Column('name', sa.String(length=80), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('type', sa.String(length=80), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name', 'tenant_id'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET
)
op.create_table(
'templates_relations',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('tenant_id', sa.String(length=36), nullable=True),
sa.Column('name', sa.String(length=80), nullable=False),
sa.Column('flavor_id', sa.String(length=36), nullable=False),
sa.Column('image_id', sa.String(length=36), nullable=True),
sa.Column('node_processes', st.JsonDictType(), nullable=True),
sa.Column('node_configs', st.JsonDictType(), nullable=True),
sa.Column('volumes_per_node', sa.Integer(), nullable=True),
sa.Column('volumes_size', sa.Integer(), nullable=True),
sa.Column('volume_mount_prefix', sa.String(length=80), nullable=True),
sa.Column('count', sa.Integer(), nullable=False),
sa.Column('cluster_template_id', sa.String(length=36), nullable=True),
sa.Column('node_group_template_id',
sa.String(length=36), nullable=True),
sa.Column('floating_ip_pool', sa.String(length=36), nullable=True),
sa.ForeignKeyConstraint(['cluster_template_id'],
['cluster_templates.id'], ),
sa.ForeignKeyConstraint(['node_group_template_id'],
['node_group_templates.id'], ),
sa.PrimaryKeyConstraint('id'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET
)
op.create_table(
'mains_association',
sa.Column('Job_id', sa.String(length=36), nullable=True),
sa.Column('JobBinary_id', sa.String(length=36), nullable=True),
sa.ForeignKeyConstraint(['JobBinary_id'], ['job_binaries.id'], ),
sa.ForeignKeyConstraint(['Job_id'], ['jobs.id'], ),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET
)
op.create_table(
'libs_association',
sa.Column('Job_id', sa.String(length=36), nullable=True),
sa.Column('JobBinary_id', sa.String(length=36), nullable=True),
sa.ForeignKeyConstraint(['JobBinary_id'], ['job_binaries.id'], ),
sa.ForeignKeyConstraint(['Job_id'], ['jobs.id'], ),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET
)
op.create_table(
'clusters',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('name', sa.String(length=80), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('tenant_id', sa.String(length=36), nullable=True),
sa.Column('trust_id', sa.String(length=36), nullable=True),
sa.Column('is_transient', sa.Boolean(), nullable=True),
sa.Column('plugin_name', sa.String(length=80), nullable=False),
sa.Column('hadoop_version', sa.String(length=80), nullable=False),
sa.Column('cluster_configs', st.JsonDictType(), nullable=True),
sa.Column('default_image_id', sa.String(length=36), nullable=True),
sa.Column('neutron_management_network',
sa.String(length=36), nullable=True),
sa.Column('anti_affinity', st.JsonDictType(), nullable=True),
sa.Column('management_private_key', sa.Text(), nullable=False),
sa.Column('management_public_key', sa.Text(), nullable=False),
sa.Column('user_keypair_id', sa.String(length=80), nullable=True),
sa.Column('status', sa.String(length=80), nullable=True),
sa.Column('status_description', sa.String(length=200), nullable=True),
sa.Column('info', st.JsonDictType(), nullable=True),
sa.Column('extra', st.JsonDictType(), nullable=True),
sa.Column('cluster_template_id', sa.String(length=36), nullable=True),
sa.ForeignKeyConstraint(['cluster_template_id'],
['cluster_templates.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name', 'tenant_id'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET
)
op.create_table(
'node_groups',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('name', sa.String(length=80), nullable=False),
sa.Column('tenant_id', sa.String(length=36), nullable=True),
sa.Column('flavor_id', sa.String(length=36), nullable=False),
sa.Column('image_id', sa.String(length=36), nullable=True),
sa.Column('image_username', sa.String(length=36), nullable=True),
sa.Column('node_processes', st.JsonDictType(), nullable=True),
sa.Column('node_configs', st.JsonDictType(), nullable=True),
sa.Column('volumes_per_node', sa.Integer(), nullable=True),
sa.Column('volumes_size', sa.Integer(), nullable=True),
sa.Column('volume_mount_prefix', sa.String(length=80), nullable=True),
sa.Column('count', sa.Integer(), nullable=False),
sa.Column('cluster_id', sa.String(length=36), nullable=True),
sa.Column('node_group_template_id',
sa.String(length=36), nullable=True),
sa.Column('floating_ip_pool', sa.String(length=36), nullable=True),
sa.ForeignKeyConstraint(['cluster_id'], ['clusters.id'], ),
sa.ForeignKeyConstraint(['node_group_template_id'],
['node_group_templates.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name', 'cluster_id'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET
)
op.create_table(
'job_executions',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('tenant_id', sa.String(length=36), nullable=True),
sa.Column('job_id', sa.String(length=36), nullable=True),
sa.Column('input_id', sa.String(length=36), nullable=True),
sa.Column('output_id', sa.String(length=36), nullable=True),
sa.Column('start_time', sa.DateTime(), nullable=True),
sa.Column('end_time', sa.DateTime(), nullable=True),
sa.Column('cluster_id', sa.String(length=36), nullable=True),
sa.Column('info', st.JsonDictType(), nullable=True),
sa.Column('progress', sa.Float(), nullable=True),
sa.Column('oozie_job_id', sa.String(length=100), nullable=True),
sa.Column('return_code', sa.String(length=80), nullable=True),
sa.Column('job_configs', st.JsonDictType(), nullable=True),
sa.Column('main_class', sa.Text(), nullable=True),
sa.Column('java_opts', sa.Text(), nullable=True),
sa.ForeignKeyConstraint(['cluster_id'], ['clusters.id'], ),
sa.ForeignKeyConstraint(['input_id'], ['data_sources.id'], ),
sa.ForeignKeyConstraint(['job_id'], ['jobs.id'], ),
sa.ForeignKeyConstraint(['output_id'], ['data_sources.id'], ),
sa.PrimaryKeyConstraint('id'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET
)
op.create_table(
'instances',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('tenant_id', sa.String(length=36), nullable=True),
sa.Column('node_group_id', sa.String(length=36), nullable=True),
sa.Column('instance_id', sa.String(length=36), nullable=True),
sa.Column('instance_name', sa.String(length=80), nullable=False),
sa.Column('internal_ip', sa.String(length=15), nullable=True),
sa.Column('management_ip', sa.String(length=15), nullable=True),
sa.Column('volumes', st.JsonDictType(), nullable=True),
sa.ForeignKeyConstraint(['node_group_id'], ['node_groups.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('instance_id', 'node_group_id'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET
)
def downgrade():
op.drop_table('instances')
op.drop_table('job_executions')
op.drop_table('node_groups')
op.drop_table('clusters')
op.drop_table('libs_association')
op.drop_table('mains_association')
op.drop_table('templates_relations')
op.drop_table('jobs')
op.drop_table('job_binaries')
op.drop_table('cluster_templates')
op.drop_table('data_sources')
op.drop_table('node_group_templates')
op.drop_table('job_binary_internal')

sahara/db/migration/alembic_migrations/versions/002_add_job_exec_extra.py (new file, 40 lines; path inferred)

@@ -0,0 +1,40 @@
# Copyright 2014 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""add_job_exec_extra
Revision ID: 44f8293d129d
Revises: 001
Create Date: 2014-01-23 23:25:13.225628
"""
# revision identifiers, used by Alembic.
revision = '002'
down_revision = '001'
from alembic import op
import sqlalchemy as sa
from sahara.db.sqlalchemy import types as st
def upgrade():
op.add_column('job_executions', sa.Column('extra', st.JsonEncoded(),
nullable=True))
def downgrade():
op.drop_column('job_executions', 'extra')

sahara/db/migration/alembic_migrations/versions/003_remove_java_job_fields.py (new file, 41 lines; path inferred)

@@ -0,0 +1,41 @@
# Copyright 2014 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""remove java job fields
Revision ID: 2a26b2a052a
Revises: 002
Create Date: 2014-01-31 14:26:57.404894
"""
# revision identifiers, used by Alembic.
revision = '003'
down_revision = '002'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.drop_column('job_executions', u'java_opts')
op.drop_column('job_executions', u'main_class')
def downgrade():
op.add_column('job_executions',
sa.Column(u'main_class', sa.TEXT(), nullable=True))
op.add_column('job_executions',
sa.Column(u'java_opts', sa.TEXT(), nullable=True))

sahara/db/migration/alembic_migrations/versions/004_jar_to_mapreduce.py (new file, 36 lines; path inferred)

@@ -0,0 +1,36 @@
# Copyright 2014 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""jar_to_mapreduce
Revision ID: 004
Revises: 003
Create Date: 2014-02-11 13:31:00
"""
# revision identifiers, used by Alembic.
revision = '004'
down_revision = '003'
from alembic import op
def upgrade():
op.execute("UPDATE jobs SET type='MapReduce' where type='Jar'")
def downgrade():
op.execute("UPDATE jobs SET type='Jar' where type='MapReduce'")
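
A hypothetical fifth revision, following the conventions the four scripts above establish (sequential string ids, paired upgrade/downgrade, the header generated from script.py.mako). Nothing below is part of the commit; the column is invented for illustration:

```python
"""example_add_cluster_note

Revision ID: 005
Revises: 004
"""

# revision identifiers, used by Alembic.
revision = '005'
down_revision = '004'

from alembic import op
import sqlalchemy as sa

def upgrade():
    # hypothetical column, for illustration only
    op.add_column('clusters', sa.Column('note', sa.Text(), nullable=True))

def downgrade():
    op.drop_column('clusters', 'note')
```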

sahara/db/migration/cli.py (new file, 111 lines)

@@ -0,0 +1,111 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from alembic import command as alembic_cmd
from alembic import config as alembic_cfg
from alembic import util as alembic_u
from oslo.config import cfg
CONF = cfg.CONF
CONF.import_opt("connection", "sahara.openstack.common.db.sqlalchemy.session",
group="database")
def do_alembic_command(config, cmd, *args, **kwargs):
try:
getattr(alembic_cmd, cmd)(config, *args, **kwargs)
except alembic_u.CommandError as e:
alembic_u.err(str(e))
def do_check_migration(config, _cmd):
do_alembic_command(config, 'branches')
def do_upgrade_downgrade(config, cmd):
if not CONF.command.revision and not CONF.command.delta:
raise SystemExit('You must provide a revision or relative delta')
revision = CONF.command.revision
if CONF.command.delta:
sign = '+' if CONF.command.name == 'upgrade' else '-'
revision = sign + str(CONF.command.delta)
do_alembic_command(config, cmd, revision, sql=CONF.command.sql)
def do_stamp(config, cmd):
do_alembic_command(config, cmd,
CONF.command.revision,
sql=CONF.command.sql)
def do_revision(config, cmd):
do_alembic_command(config, cmd,
message=CONF.command.message,
autogenerate=CONF.command.autogenerate,
sql=CONF.command.sql)
def add_command_parsers(subparsers):
for name in ['current', 'history', 'branches']:
parser = subparsers.add_parser(name)
parser.set_defaults(func=do_alembic_command)
parser = subparsers.add_parser('check_migration')
parser.set_defaults(func=do_check_migration)
for name in ['upgrade', 'downgrade']:
parser = subparsers.add_parser(name)
parser.add_argument('--delta', type=int)
parser.add_argument('--sql', action='store_true')
parser.add_argument('revision', nargs='?')
parser.set_defaults(func=do_upgrade_downgrade)
parser = subparsers.add_parser('stamp')
parser.add_argument('--sql', action='store_true')
parser.add_argument('revision')
parser.set_defaults(func=do_stamp)
parser = subparsers.add_parser('revision')
parser.add_argument('-m', '--message')
parser.add_argument('--autogenerate', action='store_true')
parser.add_argument('--sql', action='store_true')
parser.set_defaults(func=do_revision)
command_opt = cfg.SubCommandOpt('command',
title='Command',
help='Available commands',
handler=add_command_parsers)
CONF.register_cli_opt(command_opt)
def main():
config = alembic_cfg.Config(
os.path.join(os.path.dirname(__file__), 'alembic.ini')
)
config.set_main_option('script_location',
'sahara.db.migration:alembic_migrations')
# attach the Sahara conf to the Alembic conf
config.savanna_config = CONF
CONF(project='sahara')
CONF.command.func(config, CONF.command.name)
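
A hedged sketch of what reaches main() when the renamed console script runs; the script name is assumed from the setup.cfg entry-point rename mentioned in the commit message:

```python
# Hypothetical, equivalent to running:
#   sahara-db-manage --config-file /etc/sahara/sahara.conf upgrade head
# CONF(project='sahara') parses sys.argv, so the subcommand machinery
# above ends up calling do_upgrade_downgrade(config, 'upgrade').
import sys
from sahara.db.migration import cli

sys.argv = ['sahara-db-manage',
            '--config-file', '/etc/sahara/sahara.conf',
            'upgrade', 'head']
cli.main()
```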


sahara/db/sqlalchemy/api.py (new file, 748 lines)

@@ -0,0 +1,748 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of SQLAlchemy backend."""
import sys
from oslo.config import cfg
import sqlalchemy as sa
from sahara.db.sqlalchemy import models as m
from sahara import exceptions as ex
from sahara.openstack.common.db import exception as db_exc
from sahara.openstack.common.db.sqlalchemy import session as db_session
from sahara.openstack.common import log as logging
LOG = logging.getLogger(__name__)
get_engine = db_session.get_engine
get_session = db_session.get_session
CONF = cfg.CONF
def get_backend():
"""The backend is this module itself."""
return sys.modules[__name__]
def model_query(model, context, session=None, project_only=True):
"""Query helper.
:param model: base model to query
:param context: context to query under
:param project_only: if present and context is user-type, then restrict
query to match the context's tenant_id.
"""
session = session or get_session()
query = session.query(model)
if project_only and not context.is_admin:
query = query.filter_by(tenant_id=context.tenant_id)
return query
def count_query(model, context, session=None, project_only=None):
"""Count query helper.
:param model: base model to query
:param context: context to query under
:param project_only: if present and context is user-type, then restrict
query to match the context's tenant_id.
"""
return model_query(sa.func.count(model.id), context, session, project_only)
def setup_db():
try:
engine = db_session.get_engine(sqlite_fk=True)
m.Cluster.metadata.create_all(engine)
except sa.exc.OperationalError as e:
LOG.exception("Database registration exception: %s", e)
return False
return True
def drop_db():
try:
engine = db_session.get_engine(sqlite_fk=True)
m.Cluster.metadata.drop_all(engine)
except Exception as e:
LOG.exception("Database shutdown exception: %s", e)
return False
return True
## Helpers for building constraints / equality checks
def constraint(**conditions):
return Constraint(conditions)
def equal_any(*values):
return EqualityCondition(values)
def not_equal(*values):
return InequalityCondition(values)
class Constraint(object):
def __init__(self, conditions):
self.conditions = conditions
def apply(self, model, query):
for key, condition in self.conditions.iteritems():
for clause in condition.clauses(getattr(model, key)):
query = query.filter(clause)
return query
class EqualityCondition(object):
def __init__(self, values):
self.values = values
def clauses(self, field):
return sa.or_([field == value for value in self.values])
class InequalityCondition(object):
def __init__(self, values):
self.values = values
def clauses(self, field):
return [field != value for value in self.values]
## Cluster ops
def _cluster_get(context, session, cluster_id):
query = model_query(m.Cluster, context, session)
return query.filter_by(id=cluster_id).first()
def cluster_get(context, cluster_id):
return _cluster_get(context, get_session(), cluster_id)
def cluster_get_all(context, **kwargs):
query = model_query(m.Cluster, context)
return query.filter_by(**kwargs).all()
def cluster_create(context, values):
values = values.copy()
cluster = m.Cluster()
node_groups = values.pop("node_groups", [])
cluster.update(values)
session = get_session()
with session.begin():
try:
cluster.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for Cluster: %s"
% e.columns)
try:
for ng in node_groups:
node_group = m.NodeGroup()
node_group.update({"cluster_id": cluster.id})
node_group.update(ng)
node_group.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for NodeGroup: %s"
% e.columns)
return cluster_get(context, cluster.id)
def cluster_update(context, cluster_id, values):
session = get_session()
with session.begin():
cluster = _cluster_get(context, session, cluster_id)
if cluster is None:
raise ex.NotFoundException(cluster_id,
"Cluster id '%s' not found!")
cluster.update(values)
return cluster
def cluster_destroy(context, cluster_id):
session = get_session()
with session.begin():
cluster = _cluster_get(context, session, cluster_id)
if not cluster:
raise ex.NotFoundException(cluster_id,
"Cluster id '%s' not found!")
session.delete(cluster)
## Node Group ops
def _node_group_get(context, session, node_group_id):
query = model_query(m.NodeGroup, context, session)
return query.filter_by(id=node_group_id).first()
def node_group_add(context, cluster_id, values):
session = get_session()
with session.begin():
cluster = _cluster_get(context, session, cluster_id)
if not cluster:
raise ex.NotFoundException(cluster_id,
"Cluster id '%s' not found!")
node_group = m.NodeGroup()
node_group.update({"cluster_id": cluster_id})
node_group.update(values)
session.add(node_group)
return node_group.id
def node_group_update(context, node_group_id, values):
session = get_session()
with session.begin():
node_group = _node_group_get(context, session, node_group_id)
if not node_group:
raise ex.NotFoundException(node_group_id,
"Node Group id '%s' not found!")
node_group.update(values)
def node_group_remove(context, node_group_id):
session = get_session()
with session.begin():
node_group = _node_group_get(context, session, node_group_id)
if not node_group:
raise ex.NotFoundException(node_group_id,
"Node Group id '%s' not found!")
session.delete(node_group)
## Instance ops
def _instance_get(context, session, instance_id):
query = model_query(m.Instance, context, session)
return query.filter_by(id=instance_id).first()
def instance_add(context, node_group_id, values):
session = get_session()
with session.begin():
node_group = _node_group_get(context, session, node_group_id)
if not node_group:
raise ex.NotFoundException(node_group_id,
"Node Group id '%s' not found!")
instance = m.Instance()
instance.update({"node_group_id": node_group_id})
instance.update(values)
session.add(instance)
node_group = _node_group_get(context, session, node_group_id)
node_group.count += 1
return instance.id
def instance_update(context, instance_id, values):
session = get_session()
with session.begin():
instance = _instance_get(context, session, instance_id)
if not instance:
raise ex.NotFoundException(instance_id,
"Instance id '%s' not found!")
instance.update(values)
def instance_remove(context, instance_id):
session = get_session()
with session.begin():
instance = _instance_get(context, session, instance_id)
if not instance:
raise ex.NotFoundException(instance_id,
"Instance id '%s' not found!")
session.delete(instance)
node_group_id = instance.node_group_id
node_group = _node_group_get(context, session, node_group_id)
node_group.count -= 1
## Volumes ops
def append_volume(context, instance_id, volume_id):
session = get_session()
with session.begin():
instance = _instance_get(context, session, instance_id)
if not instance:
raise ex.NotFoundException(instance_id,
"Instance id '%s' not found!")
instance.volumes.append(volume_id)
def remove_volume(context, instance_id, volume_id):
session = get_session()
with session.begin():
instance = _instance_get(context, session, instance_id)
if not instance:
raise ex.NotFoundException(instance_id,
"Instance id '%s' not found!")
instance.volumes.remove(volume_id)
## Cluster Template ops
def _cluster_template_get(context, session, cluster_template_id):
query = model_query(m.ClusterTemplate, context, session)
return query.filter_by(id=cluster_template_id).first()
def cluster_template_get(context, cluster_template_id):
return _cluster_template_get(context, get_session(), cluster_template_id)
def cluster_template_get_all(context):
query = model_query(m.ClusterTemplate, context)
return query.all()
def cluster_template_create(context, values):
values = values.copy()
cluster_template = m.ClusterTemplate()
node_groups = values.pop("node_groups") or []
cluster_template.update(values)
session = get_session()
with session.begin():
try:
cluster_template.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for ClusterTemplate: %s"
% e.columns)
try:
for ng in node_groups:
node_group = m.TemplatesRelation()
node_group.update({"cluster_template_id": cluster_template.id})
node_group.update(ng)
node_group.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for TemplatesRelation:"
"%s" % e.columns)
return cluster_template_get(context, cluster_template.id)
def cluster_template_destroy(context, cluster_template_id):
session = get_session()
with session.begin():
cluster_template = _cluster_template_get(context, session,
cluster_template_id)
if not cluster_template:
raise ex.NotFoundException(cluster_template_id,
"Cluster Template id '%s' not found!")
session.delete(cluster_template)
## Node Group Template ops
def _node_group_template_get(context, session, node_group_template_id):
query = model_query(m.NodeGroupTemplate, context, session)
return query.filter_by(id=node_group_template_id).first()
def node_group_template_get(context, node_group_template_id):
return _node_group_template_get(context, get_session(),
node_group_template_id)
def node_group_template_get_all(context):
query = model_query(m.NodeGroupTemplate, context)
return query.all()
def node_group_template_create(context, values):
node_group_template = m.NodeGroupTemplate()
node_group_template.update(values)
try:
node_group_template.save()
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for NodeGroupTemplate: %s"
% e.columns)
return node_group_template
def node_group_template_destroy(context, node_group_template_id):
session = get_session()
with session.begin():
node_group_template = _node_group_template_get(context, session,
node_group_template_id)
if not node_group_template:
raise ex.NotFoundException(
node_group_template_id,
"Node Group Template id '%s' not found!")
session.delete(node_group_template)
## Data Source ops
def _data_source_get(context, session, data_source_id):
query = model_query(m.DataSource, context, session)
return query.filter_by(id=data_source_id).first()
def data_source_get(context, data_source_id):
return _data_source_get(context, get_session(), data_source_id)
def data_source_get_all(context):
query = model_query(m.DataSource, context)
return query.all()
def data_source_create(context, values):
data_source = m.DataSource()
data_source.update(values)
try:
data_source.save()
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for DataSource: %s"
% e.columns)
return data_source
def data_source_destroy(context, data_source_id):
session = get_session()
try:
with session.begin():
data_source = _data_source_get(context, session, data_source_id)
if not data_source:
raise ex.NotFoundException(data_source_id,
"Data Source id '%s' not found!")
session.delete(data_source)
except db_exc.DBError as e:
msg = "foreign key constraint" in e.message and\
" on foreign key constraint" or ""
raise ex.DeletionFailed("Data Source deletion failed%s" % msg)
## JobExecution ops
def _job_execution_get(context, session, job_execution_id):
query = model_query(m.JobExecution, context, session)
return query.filter_by(id=job_execution_id).first()
def job_execution_get(context, job_execution_id):
return _job_execution_get(context, get_session(), job_execution_id)
def job_execution_get_all(context, **kwargs):
query = model_query(m.JobExecution, context)
return query.filter_by(**kwargs).all()
def job_execution_count(context, **kwargs):
query = count_query(m.JobExecution, context)
return query.filter_by(**kwargs).first()[0]
def job_execution_create(context, values):
session = get_session()
with session.begin():
job_ex = m.JobExecution()
job_ex.update(values)
try:
job_ex.save()
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for JobExecution: %s"
% e.columns)
return job_ex
def job_execution_update(context, job_execution_id, values):
session = get_session()
with session.begin():
job_ex = _job_execution_get(context, session, job_execution_id)
if not job_ex:
raise ex.NotFoundException(job_execution_id,
"JobExecution id '%s' not found!")
job_ex.update(values)
return job_ex
def job_execution_destroy(context, job_execution_id):
session = get_session()
with session.begin():
job_ex = _job_execution_get(context, session, job_execution_id)
if not job_ex:
raise ex.NotFoundException(job_execution_id,
"JobExecution id '%s' not found!")
session.delete(job_ex)
## Job ops
def _job_get(context, session, job_id):
query = model_query(m.Job, context, session)
return query.filter_by(id=job_id).first()
def job_get(context, job_id):
return _job_get(context, get_session(), job_id)
def job_get_all(context):
query = model_query(m.Job, context)
return query.all()
def _append_job_binaries(context, session, from_list, to_list):
for job_binary_id in from_list:
job_binary = model_query(
m.JobBinary, context, session).filter_by(id=job_binary_id).first()
if job_binary is not None:
to_list.append(job_binary)
def job_create(context, values):
mains = values.pop("mains", [])
libs = values.pop("libs", [])
session = get_session()
with session.begin():
job = m.Job()
job.update(values)
# libs and mains are 'lazy' objects. The initialization below
# is needed here because it provides libs and mains to be initialized
# within a session even if the lists are empty
job.mains = []
job.libs = []
try:
_append_job_binaries(context, session, mains, job.mains)
_append_job_binaries(context, session, libs, job.libs)
job.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for Job: %s"
% e.columns)
return job
def job_update(context, job_id, values):
session = get_session()
with session.begin():
job = _job_get(context, session, job_id)
if not job:
raise ex.NotFoundException(job_id,
"Job id '%s' not found!")
job.update(values)
return job
def job_destroy(context, job_id):
session = get_session()
try:
with session.begin():
job = _job_get(context, session, job_id)
if not job:
raise ex.NotFoundException(job_id,
"Job id '%s' not found!")
session.delete(job)
except db_exc.DBError as e:
msg = "foreign key constraint" in e.message and\
" on foreign key constraint" or ""
raise ex.DeletionFailed("Job deletion failed%s" % msg)
## JobBinary ops
def _job_binary_get(context, session, job_binary_id):
query = model_query(m.JobBinary, context, session)
return query.filter_by(id=job_binary_id).first()
def job_binary_get_all(context):
"""Returns JobBinary objects that do not contain a data field
The data column uses deferred loading.
"""
query = model_query(m.JobBinary, context)
return query.all()
def job_binary_get(context, job_binary_id):
"""Returns a JobBinary object that does not contain a data field
The data column uses deferred loading.
"""
return _job_binary_get(context, get_session(), job_binary_id)
def job_binary_create(context, values):
"""Returns a JobBinary that does not contain a data field
The data column uses deferred loading.
"""
job_binary = m.JobBinary()
job_binary.update(values)
try:
job_binary.save()
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for JobBinary: %s"
% e.columns)
return job_binary
def _check_job_binary_referenced(ctx, session, job_binary_id):
args = {"JobBinary_id": job_binary_id}
mains = model_query(m.mains_association, ctx, session,
project_only=False).filter_by(**args)
libs = model_query(m.libs_association, ctx, session,
project_only=False).filter_by(**args)
return mains.first() is not None or libs.first() is not None
def job_binary_destroy(context, job_binary_id):
session = get_session()
with session.begin():
job_binary = _job_binary_get(context, session, job_binary_id)
if not job_binary:
raise ex.NotFoundException(job_binary_id,
"JobBinary id '%s' not found!")
if _check_job_binary_referenced(context, session, job_binary_id):
raise ex.DeletionFailed("JobBinary is referenced"
"and cannot be deleted")
session.delete(job_binary)
## JobBinaryInternal ops
def _job_binary_internal_get(context, session, job_binary_internal_id):
query = model_query(m.JobBinaryInternal, context, session)
return query.filter_by(id=job_binary_internal_id).first()
def job_binary_internal_get_all(context):
"""Returns JobBinaryInternal objects that do not contain a data field
The data column uses deferred loading.
"""
query = model_query(m.JobBinaryInternal, context)
return query.all()
def job_binary_internal_get(context, job_binary_internal_id):
"""Returns a JobBinaryInternal object that does not contain a data field
The data column uses deferred loading.
"""
return _job_binary_internal_get(context, get_session(),
job_binary_internal_id)
def job_binary_internal_get_raw_data(context, job_binary_internal_id):
"""Returns only the data field for the specified JobBinaryInternal."""
query = model_query(m.JobBinaryInternal, context)
res = query.filter_by(id=job_binary_internal_id).first()
if res is not None:
datasize_KB = res.datasize / 1024.0
if datasize_KB > CONF.job_binary_max_KB:
raise ex.DataTooBigException(round(datasize_KB, 1),
CONF.job_binary_max_KB,
"Size of internal binary (%sKB) is "
"greater than the maximum (%sKB)")
# This assignment is sufficient to load the deferred column
res = res.data
return res


def job_binary_internal_create(context, values):
    """Returns a JobBinaryInternal that does not contain a data field.

    The data column uses deferred loading.
    """
    values["datasize"] = len(values["data"])
    datasize_KB = values["datasize"] / 1024.0
    if datasize_KB > CONF.job_binary_max_KB:
        raise ex.DataTooBigException(round(datasize_KB, 1),
                                     CONF.job_binary_max_KB,
                                     "Size of internal binary (%sKB) is "
                                     "greater than the maximum (%sKB)")

    job_binary_int = m.JobBinaryInternal()
    job_binary_int.update(values)
    try:
        job_binary_int.save()
    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry("Duplicate entry for JobBinaryInternal: %s"
                                  % e.columns)

    return job_binary_internal_get(context, job_binary_int.id)
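
# Worked example for the size guard above: a 6 291 456-byte payload gives
# datasize_KB = 6291456 / 1024.0 = 6144.0. Assuming job_binary_max_KB is
# left at a 5120 KB limit (an assumption; operators may configure another
# value), DataTooBigException(6144.0, 5120, ...) is raised.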


def job_binary_internal_destroy(context, job_binary_internal_id):
    session = get_session()
    with session.begin():
        job_binary_internal = _job_binary_internal_get(context, session,
                                                       job_binary_internal_id)
        if not job_binary_internal:
            raise ex.NotFoundException(job_binary_internal_id,
                                       "JobBinaryInternal id '%s' not found!")

        session.delete(job_binary_internal)

48 sahara/db/sqlalchemy/model_base.py Normal file
View File

@@ -0,0 +1,48 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sqlalchemy.ext import declarative
from sqlalchemy.orm import attributes

from sahara.openstack.common.db.sqlalchemy import models as oslo_models


class _SaharaBase(oslo_models.ModelBase, oslo_models.TimestampMixin):
    """Base class for all SQLAlchemy DB Models."""

    def to_dict(self):
        """sqlalchemy based automatic to_dict method."""
        d = {}

        # if a column is unloaded at this point, it is
        # probably deferred. We do not want to access it
        # here and thereby cause it to load...
        unloaded = attributes.instance_state(self).unloaded

        for col in self.__table__.columns:
            if col.name not in unloaded:
                d[col.name] = getattr(self, col.name)

        datetime_to_str(d, 'created_at')
        datetime_to_str(d, 'updated_at')

        return d


def datetime_to_str(dct, attr_name):
    if dct.get(attr_name) is not None:
        dct[attr_name] = dct[attr_name].isoformat(' ')


SaharaBase = declarative.declarative_base(cls=_SaharaBase)
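
# Minimal usage sketch (illustrative, not part of the original file): a
# model deriving from SaharaBase gets created_at/updated_at columns from
# TimestampMixin, plus a to_dict() that skips unloaded (deferred) columns,
# so serializing a row never forces a deferred blob to load:
#
#     class Example(SaharaBase):          # hypothetical model
#         __tablename__ = 'examples'
#         id = sa.Column(sa.Integer, primary_key=True)  # needs sqlalchemy as sa
#
#     Example(id=1).to_dict()  # id plus timestamps; deferred columns omitted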

354 sahara/db/sqlalchemy/models.py Normal file
View File

@@ -0,0 +1,354 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid

import six
import sqlalchemy as sa
from sqlalchemy.orm import relationship

from sahara.db.sqlalchemy import model_base as mb
from sahara.db.sqlalchemy import types as st


## Helpers

def _generate_unicode_uuid():
    return six.text_type(uuid.uuid4())


def _id_column():
    return sa.Column(sa.String(36),
                     primary_key=True,
                     default=_generate_unicode_uuid)


## Main objects: Cluster, NodeGroup, Instance

class Cluster(mb.SaharaBase):
"""Contains all info about cluster."""
__tablename__ = 'clusters'
__table_args__ = (
sa.UniqueConstraint('name', 'tenant_id'),
)
id = _id_column()
name = sa.Column(sa.String(80), nullable=False)
description = sa.Column(sa.Text)
tenant_id = sa.Column(sa.String(36))
trust_id = sa.Column(sa.String(36))
is_transient = sa.Column(sa.Boolean, default=False)
plugin_name = sa.Column(sa.String(80), nullable=False)
hadoop_version = sa.Column(sa.String(80), nullable=False)
cluster_configs = sa.Column(st.JsonDictType())
default_image_id = sa.Column(sa.String(36))
neutron_management_network = sa.Column(sa.String(36))
anti_affinity = sa.Column(st.JsonListType())
management_private_key = sa.Column(sa.Text, nullable=False)
management_public_key = sa.Column(sa.Text, nullable=False)
user_keypair_id = sa.Column(sa.String(80))
status = sa.Column(sa.String(80))
status_description = sa.Column(sa.String(200))
info = sa.Column(st.JsonDictType())
extra = sa.Column(st.JsonDictType())
node_groups = relationship('NodeGroup', cascade="all,delete",
backref='cluster', lazy='joined')
cluster_template_id = sa.Column(sa.String(36),
sa.ForeignKey('cluster_templates.id'))
cluster_template = relationship('ClusterTemplate',
backref="clusters", lazy='joined')
def to_dict(self):
d = super(Cluster, self).to_dict()
d['node_groups'] = [ng.to_dict() for ng in self.node_groups]
return d


class NodeGroup(mb.SaharaBase):
"""Specifies group of nodes within a cluster."""
__tablename__ = 'node_groups'
__table_args__ = (
sa.UniqueConstraint('name', 'cluster_id'),
)
id = _id_column()
name = sa.Column(sa.String(80), nullable=False)
tenant_id = sa.Column(sa.String(36))
flavor_id = sa.Column(sa.String(36), nullable=False)
image_id = sa.Column(sa.String(36))
image_username = sa.Column(sa.String(36))
node_processes = sa.Column(st.JsonListType())
node_configs = sa.Column(st.JsonDictType())
volumes_per_node = sa.Column(sa.Integer)
volumes_size = sa.Column(sa.Integer)
volume_mount_prefix = sa.Column(sa.String(80))
count = sa.Column(sa.Integer, nullable=False)
instances = relationship('Instance', cascade="all,delete",
backref='node_group',
order_by="Instance.instance_name", lazy='joined')
cluster_id = sa.Column(sa.String(36), sa.ForeignKey('clusters.id'))
node_group_template_id = sa.Column(sa.String(36),
sa.ForeignKey(
'node_group_templates.id'))
node_group_template = relationship('NodeGroupTemplate',
backref="node_groups", lazy='joined')
floating_ip_pool = sa.Column(sa.String(36))

    def to_dict(self):
d = super(NodeGroup, self).to_dict()
d['instances'] = [i.to_dict() for i in self.instances]
return d


class Instance(mb.SaharaBase):
"""An OpenStack instance created for the cluster."""
__tablename__ = 'instances'
__table_args__ = (
sa.UniqueConstraint('instance_id', 'node_group_id'),
)
id = _id_column()
tenant_id = sa.Column(sa.String(36))
node_group_id = sa.Column(sa.String(36), sa.ForeignKey('node_groups.id'))
instance_id = sa.Column(sa.String(36))
instance_name = sa.Column(sa.String(80), nullable=False)
internal_ip = sa.Column(sa.String(15))
management_ip = sa.Column(sa.String(15))
volumes = sa.Column(st.JsonListType())


## Template objects: ClusterTemplate, NodeGroupTemplate, TemplatesRelation

class ClusterTemplate(mb.SaharaBase):
"""Template for Cluster."""
__tablename__ = 'cluster_templates'
__table_args__ = (
sa.UniqueConstraint('name', 'tenant_id'),
)
id = _id_column()
name = sa.Column(sa.String(80), nullable=False)
description = sa.Column(sa.Text)
cluster_configs = sa.Column(st.JsonDictType())
default_image_id = sa.Column(sa.String(36))
anti_affinity = sa.Column(st.JsonListType())
tenant_id = sa.Column(sa.String(36))
neutron_management_network = sa.Column(sa.String(36))
plugin_name = sa.Column(sa.String(80), nullable=False)
hadoop_version = sa.Column(sa.String(80), nullable=False)
node_groups = relationship('TemplatesRelation', cascade="all,delete",
backref='cluster_template', lazy='joined')

    def to_dict(self):
d = super(ClusterTemplate, self).to_dict()
d['node_groups'] = [tr.to_dict() for tr in
self.node_groups]
return d


class NodeGroupTemplate(mb.SaharaBase):
"""Template for NodeGroup."""
__tablename__ = 'node_group_templates'
__table_args__ = (
sa.UniqueConstraint('name', 'tenant_id'),
)
id = _id_column()
name = sa.Column(sa.String(80), nullable=False)
description = sa.Column(sa.Text)
tenant_id = sa.Column(sa.String(36))
flavor_id = sa.Column(sa.String(36), nullable=False)
image_id = sa.Column(sa.String(36))
plugin_name = sa.Column(sa.String(80), nullable=False)
hadoop_version = sa.Column(sa.String(80), nullable=False)
node_processes = sa.Column(st.JsonListType())
node_configs = sa.Column(st.JsonDictType())
volumes_per_node = sa.Column(sa.Integer, nullable=False)
volumes_size = sa.Column(sa.Integer)
volume_mount_prefix = sa.Column(sa.String(80))
floating_ip_pool = sa.Column(sa.String(36))


class TemplatesRelation(mb.SaharaBase):
    """NodeGroupTemplate - ClusterTemplate relationship.

    In essence, it is a template for a NodeGroup within a Cluster.
    """
__tablename__ = 'templates_relations'
id = _id_column()
tenant_id = sa.Column(sa.String(36))
name = sa.Column(sa.String(80), nullable=False)
flavor_id = sa.Column(sa.String(36), nullable=False)
image_id = sa.Column(sa.String(36))
node_processes = sa.Column(st.JsonListType())
node_configs = sa.Column(st.JsonDictType())
volumes_per_node = sa.Column(sa.Integer)
volumes_size = sa.Column(sa.Integer)
volume_mount_prefix = sa.Column(sa.String(80))
count = sa.Column(sa.Integer, nullable=False)
cluster_template_id = sa.Column(sa.String(36),
sa.ForeignKey('cluster_templates.id'))
node_group_template_id = sa.Column(sa.String(36),
sa.ForeignKey(
'node_group_templates.id'))
node_group_template = relationship('NodeGroupTemplate',
backref="templates_relations",
lazy='joined')
floating_ip_pool = sa.Column(sa.String(36))


## EDP objects: DataSource, Job, Job Execution, JobBinary

class DataSource(mb.SaharaBase):
    """DataSource - represents different types of data sources,
    e.g. Swift, Cassandra, etc.
    """
__tablename__ = 'data_sources'
__table_args__ = (
sa.UniqueConstraint('name', 'tenant_id'),
)
id = _id_column()
tenant_id = sa.Column(sa.String(36))
name = sa.Column(sa.String(80), nullable=False)
description = sa.Column(sa.Text())
type = sa.Column(sa.String(80), nullable=False)
url = sa.Column(sa.String(256), nullable=False)
credentials = sa.Column(st.JsonDictType())


class JobExecution(mb.SaharaBase):
    """JobExecution - represents a job execution on a specific cluster."""
__tablename__ = 'job_executions'
id = _id_column()
tenant_id = sa.Column(sa.String(36))
job_id = sa.Column(sa.String(36),
sa.ForeignKey('jobs.id'))
input_id = sa.Column(sa.String(36),
sa.ForeignKey('data_sources.id'))
output_id = sa.Column(sa.String(36),
sa.ForeignKey('data_sources.id'))
start_time = sa.Column(sa.DateTime())
end_time = sa.Column(sa.DateTime())
cluster_id = sa.Column(sa.String(36),
sa.ForeignKey('clusters.id'))
info = sa.Column(st.JsonDictType())
progress = sa.Column(sa.Float)
oozie_job_id = sa.Column(sa.String(100))
return_code = sa.Column(sa.String(80))
job_configs = sa.Column(st.JsonDictType())
extra = sa.Column(st.JsonDictType())


mains_association = sa.Table("mains_association",
mb.SaharaBase.metadata,
sa.Column("Job_id",
sa.String(36),
sa.ForeignKey("jobs.id")),
sa.Column("JobBinary_id",
sa.String(36),
sa.ForeignKey("job_binaries.id"))
)


libs_association = sa.Table("libs_association",
mb.SaharaBase.metadata,
sa.Column("Job_id",
sa.String(36),
sa.ForeignKey("jobs.id")),
sa.Column("JobBinary_id",
sa.String(36),
sa.ForeignKey("job_binaries.id"))
)
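
# The two tables above implement the many-to-many Job <-> JobBinary link:
# roughly, "mains" hold the binaries a job runs directly and "libs" hold
# supporting libraries. The Job model below attaches both via
# relationship(..., secondary=...).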


class Job(mb.SaharaBase):
    """Job - description of a job and references to its binaries."""
__tablename__ = 'jobs'
__table_args__ = (
sa.UniqueConstraint('name', 'tenant_id'),
)
id = _id_column()
tenant_id = sa.Column(sa.String(36))
name = sa.Column(sa.String(80), nullable=False)
description = sa.Column(sa.Text())
type = sa.Column(sa.String(80), nullable=False)
mains = relationship("JobBinary",
secondary=mains_association, lazy="joined")
libs = relationship("JobBinary",
secondary=libs_association, lazy="joined")

    def to_dict(self):
d = super(Job, self).to_dict()
d['mains'] = [jb.to_dict() for jb in self.mains]
d['libs'] = [jb.to_dict() for jb in self.libs]
return d


class JobBinaryInternal(mb.SaharaBase):
    """JobBinaryInternal - raw binary storage for executable jobs."""
__tablename__ = 'job_binary_internal'
__table_args__ = (
sa.UniqueConstraint('name', 'tenant_id'),
)
id = _id_column()
tenant_id = sa.Column(sa.String(36))
name = sa.Column(sa.String(80), nullable=False)
data = sa.orm.deferred(sa.Column(sa.LargeBinary))
datasize = sa.Column(sa.BIGINT)


class JobBinary(mb.SaharaBase):
    """JobBinary - location (URL) of an executable job binary."""
__tablename__ = 'job_binaries'
__table_args__ = (
sa.UniqueConstraint('name', 'tenant_id'),
)
id = _id_column()
tenant_id = sa.Column(sa.String(36))
name = sa.Column(sa.String(80), nullable=False)
description = sa.Column(sa.Text())
url = sa.Column(sa.String(256), nullable=False)
extra = sa.Column(st.JsonDictType())

110 sahara/db/sqlalchemy/types.py Normal file
View File

@@ -0,0 +1,110 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sqlalchemy as sa
from sqlalchemy.ext import mutable

from sahara.openstack.common import jsonutils


class JsonEncoded(sa.TypeDecorator):
    """Represents an immutable structure as a json-encoded string."""

    impl = sa.Text

    def process_bind_param(self, value, dialect):
        if value is not None:
            value = jsonutils.dumps(value)
        return value

    def process_result_value(self, value, dialect):
        if value is not None:
            value = jsonutils.loads(value)
        return value
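
# Round-trip sketch: a Python structure is JSON-encoded on the way into
# the database and decoded on the way back out, e.g.
#
#     {'a': 1}   --process_bind_param-->   '{"a": 1}'   (stored as TEXT)
#     '{"a": 1}' --process_result_value--> {'a': 1}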


# TODO(slukjanov): verify this implementation
class MutableDict(mutable.Mutable, dict):
    @classmethod
    def coerce(cls, key, value):
        """Convert plain dictionaries to MutableDict."""
        if not isinstance(value, MutableDict):
            if isinstance(value, dict):
                return MutableDict(value)

            # this call will raise ValueError
            return mutable.Mutable.coerce(key, value)
        else:
            return value

    def update(self, e=None, **f):
        """Detect dictionary update events and emit change events."""
        dict.update(self, e, **f)
        self.changed()

    def __setitem__(self, key, value):
        """Detect dictionary set events and emit change events."""
        dict.__setitem__(self, key, value)
        self.changed()

    def __delitem__(self, key):
        """Detect dictionary del events and emit change events."""
        dict.__delitem__(self, key)
        self.changed()


# TODO(slukjanov): verify this implementation
class MutableList(mutable.Mutable, list):
    @classmethod
    def coerce(cls, key, value):
        """Convert plain lists to MutableList."""
        if not isinstance(value, MutableList):
            if isinstance(value, list):
                return MutableList(value)

            # this call will raise ValueError
            return mutable.Mutable.coerce(key, value)
        else:
            return value

    def __add__(self, value):
        """Detect list add events and emit change events."""
        # list.__add__ returns a new list rather than mutating in place,
        # so propagate the result instead of silently returning None.
        result = list.__add__(self, value)
        self.changed()
        return result

    def append(self, value):
        """Detect list add events and emit change events."""
        list.append(self, value)
        self.changed()

    def __setitem__(self, key, value):
        """Detect list set events and emit change events."""
        list.__setitem__(self, key, value)
        self.changed()

    def __delitem__(self, i):
        """Detect list del events and emit change events."""
        list.__delitem__(self, i)
        self.changed()
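
# Why coerce()/changed() matter (sketch): by default SQLAlchemy only sees
# reassignment of a column attribute, not in-place edits. Wrapping values
# in MutableDict/MutableList makes mutations observable, e.g. for the
# Cluster.cluster_configs column defined in models.py:
#
#     cluster.cluster_configs['HDFS'] = {'dfs.replication': 3}
#     # MutableDict.__setitem__ -> self.changed() -> row flagged dirty
#     session.commit()  # the JSON column is rewritten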


def JsonDictType():
    """Returns an SQLAlchemy Column Type suitable to store a Json dict."""
    return MutableDict.as_mutable(JsonEncoded)


def JsonListType():
    """Returns an SQLAlchemy Column Type suitable to store a Json array."""
    return MutableList.as_mutable(JsonEncoded)
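
# Illustrative column declarations (mirroring how sahara.db.sqlalchemy.models
# uses these helpers):
#
#     import sqlalchemy as sa
#     from sahara.db.sqlalchemy import types as st
#
#     cluster_configs = sa.Column(st.JsonDictType())   # mutable JSON dict
#     anti_affinity = sa.Column(st.JsonListType())     # mutable JSON list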