36fc84bed3
Add the ability to edit clusters (only description and name at this moment)
in order to change their public/protected attributes later.

APIImpact
PATCH /v1.1/{tenant_id}/clusters/{cluster_id}

Partially implements: blueprint api-for-objects-update
Change-Id: I7b86362e4db576ba5cdb20dff1ae904497a1852a
1230 lines
40 KiB
Python
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implementation of SQLAlchemy backend."""

import sys
import threading

from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import session as db_session
from oslo_log import log as logging
import six
import sqlalchemy as sa

from sahara.db.sqlalchemy import models as m
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.i18n import _LW


LOG = logging.getLogger(__name__)

CONF = cfg.CONF

_FACADE = None
_LOCK = threading.Lock()


def _create_facade_lazily():
    # Double-checked locking: the unlocked test avoids taking the lock on
    # every call once the facade exists, and the second test under the lock
    # guards against two threads both seeing _FACADE as None.
    global _LOCK, _FACADE

    if _FACADE is None:
        with _LOCK:
            if _FACADE is None:
                _FACADE = db_session.EngineFacade.from_config(CONF,
                                                              sqlite_fk=True)
    return _FACADE


def get_engine():
    facade = _create_facade_lazily()
    return facade.get_engine()


def get_session(**kwargs):
    facade = _create_facade_lazily()
    return facade.get_session(**kwargs)


def cleanup():
    global _FACADE
    _FACADE = None


def get_backend():
    """The backend is this module itself."""
    return sys.modules[__name__]
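
# Illustrative note (a sketch, not part of this module's API): get_backend()
# is what allows a dispatching db api layer to route calls to this module;
# the session helpers are normally reached through the functions below
# rather than called by users directly. Assuming a configured CONF:
#
#     session = get_session()
#     with session.begin():
#         pass  # ORM work happens inside the transaction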


def model_query(model, context, session=None, project_only=True):
    """Query helper.

    :param model: base model to query
    :param context: context to query under
    :param project_only: if present and context is user-type, then restrict
            query to match the context's tenant_id.
    """
    session = session or get_session()

    query = session.query(model)

    if project_only and not context.is_admin:
        query = query.filter_by(tenant_id=context.tenant_id)

    return query
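
# Usage sketch (illustrative; 'ctx' stands for a sahara request context
# with 'is_admin' and 'tenant_id' attributes):
#
#     query = model_query(m.Cluster, ctx)
#     first_match = query.filter_by(name='my-cluster').first()
#
# With an admin context the tenant_id filter is skipped, so the query spans
# all tenants.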


def count_query(model, context, session=None, project_only=None):
    """Count query helper.

    :param model: base model to query
    :param context: context to query under
    :param project_only: if present and context is user-type, then restrict
            query to match the context's tenant_id.
    """
    return model_query(sa.func.count(model.id), context, session, project_only)
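
# Usage sketch (illustrative): the query selects func.count(model.id), so
# the count is the first column of the first row:
#
#     total = count_query(m.JobExecution, ctx).first()[0]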


def in_filter(query, cls, search_opts):
    """Add 'in' filters for specified columns.

    Add a sqlalchemy 'in' filter to the query for any entry in the
    'search_opts' dict where the key is the name of a column in
    'cls' and the value is a tuple.

    This allows the value of a column to be matched
    against multiple possible values (OR).

    Return the modified query and any entries in search_opts
    whose keys do not match columns or whose values are not
    tuples.

    :param query: a non-null query object
    :param cls: the database model class that filters will apply to
    :param search_opts: a dictionary whose key/value entries are interpreted
           as column names and search values
    :returns: a tuple containing the modified query and a dictionary of
              unused search_opts
    """
    if not search_opts:
        return query, search_opts

    remaining = {}
    for k, v in six.iteritems(search_opts):
        if type(v) == tuple and k in cls.__table__.columns:
            col = cls.__table__.columns[k]
            query = query.filter(col.in_(v))
        else:
            remaining[k] = v
    return query, remaining
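
# Usage sketch (illustrative): tuple values become IN filters; anything
# else is handed back for the caller to filter differently:
#
#     query = model_query(m.DataSource, ctx)
#     query, rest = in_filter(query, m.DataSource,
#                             {'type': ('swift', 'hdfs'), 'name': 'x'})
#     # rest == {'name': 'x'} -- 'name' was not given as a tuple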


def like_filter(query, cls, search_opts):
    """Add 'like' filters for specified columns.

    Add a sqlalchemy 'like' filter to the query for any entry in the
    'search_opts' dict where the key is the name of a column in
    'cls' and the value is a string containing '%'.

    This allows the value of a column to be matched
    against simple sql string patterns using LIKE and the
    '%' wildcard.

    Return the modified query and any entries in search_opts
    whose keys do not match columns or whose values are not
    strings containing '%'.

    :param query: a non-null query object
    :param cls: the database model class the filters will apply to
    :param search_opts: a dictionary whose key/value entries are interpreted
           as column names and search patterns
    :returns: a tuple containing the modified query and a dictionary of
              unused search_opts
    """
    if not search_opts:
        return query, search_opts

    remaining = {}
    for k, v in six.iteritems(search_opts):
        if isinstance(v, six.string_types) and (
                '%' in v and k in cls.__table__.columns):
            col = cls.__table__.columns[k]
            query = query.filter(col.like(v))
        else:
            remaining[k] = v
    return query, remaining
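
# Usage sketch (illustrative): only string values containing '%' are
# treated as LIKE patterns; plain strings fall through to 'remaining' for
# exact matching by the caller:
#
#     query, rest = like_filter(query, m.DataSource, {'name': 'train%'})
#     # matches names beginning with 'train'; rest is empty here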


def setup_db():
    try:
        engine = get_engine()
        m.Cluster.metadata.create_all(engine)
    except sa.exc.OperationalError as e:
        LOG.warning(_LW("Database registration exception: {exc}")
                    .format(exc=e))
        return False
    return True


def drop_db():
    try:
        engine = get_engine()
        m.Cluster.metadata.drop_all(engine)
    except Exception as e:
        LOG.warning(_LW("Database shutdown exception: {exc}").format(exc=e))
        return False
    return True


# Cluster ops

def _cluster_get(context, session, cluster_id):
    query = model_query(m.Cluster, context, session)
    return query.filter_by(id=cluster_id).first()


def cluster_get(context, cluster_id):
    return _cluster_get(context, get_session(), cluster_id)


def cluster_get_all(context, **kwargs):
    query = model_query(m.Cluster, context)
    return query.filter_by(**kwargs).all()


def cluster_create(context, values):
    values = values.copy()
    cluster = m.Cluster()
    node_groups = values.pop("node_groups", [])
    cluster.update(values)

    session = get_session()
    try:
        with session.begin():
            session.add(cluster)
            session.flush(objects=[cluster])

            for ng in node_groups:
                node_group = m.NodeGroup()
                node_group.update(ng)
                node_group.update({"cluster_id": cluster.id})
                session.add(node_group)

    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for object %(object)s. Failed on columns: "
              "%(columns)s") % {"object": e.value, "columns": e.columns})

    return cluster_get(context, cluster.id)
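
# Usage sketch (illustrative values; real callers pass validated API
# input). The flush() above is what populates cluster.id before the node
# groups are inserted in the same transaction:
#
#     cluster = cluster_create(ctx, {
#         'name': 'demo',
#         'tenant_id': ctx.tenant_id,
#         'node_groups': [{'name': 'worker', 'count': 2}],
#     })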


def cluster_update(context, cluster_id, values):
    session = get_session()

    try:
        with session.begin():
            cluster = _cluster_get(context, session, cluster_id)
            if cluster is None:
                raise ex.NotFoundException(cluster_id,
                                           _("Cluster id '%s' not found!"))
            cluster.update(values)
    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for Cluster: %s") % e.columns)

    return cluster
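
# Usage sketch: this is the backend for the new
# PATCH /v1.1/{tenant_id}/clusters/{cluster_id} call introduced by this
# change, which for now only accepts 'name' and 'description':
#
#     cluster_update(ctx, cluster_id, {'description': 'updated text'})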


def cluster_destroy(context, cluster_id):
    session = get_session()
    with session.begin():
        cluster = _cluster_get(context, session, cluster_id)
        if not cluster:
            raise ex.NotFoundException(cluster_id,
                                       _("Cluster id '%s' not found!"))

        session.delete(cluster)


# Node Group ops

def _node_group_get(context, session, node_group_id):
    query = model_query(m.NodeGroup, context, session)
    return query.filter_by(id=node_group_id).first()


def node_group_add(context, cluster_id, values):
    session = get_session()

    with session.begin():
        cluster = _cluster_get(context, session, cluster_id)
        if not cluster:
            raise ex.NotFoundException(cluster_id,
                                       _("Cluster id '%s' not found!"))

        node_group = m.NodeGroup()
        node_group.update({"cluster_id": cluster_id})
        node_group.update(values)
        session.add(node_group)

    return node_group.id


def node_group_update(context, node_group_id, values):
    session = get_session()
    with session.begin():
        node_group = _node_group_get(context, session, node_group_id)
        if not node_group:
            raise ex.NotFoundException(node_group_id,
                                       _("Node Group id '%s' not found!"))

        node_group.update(values)


def node_group_remove(context, node_group_id):
    session = get_session()

    with session.begin():
        node_group = _node_group_get(context, session, node_group_id)
        if not node_group:
            raise ex.NotFoundException(node_group_id,
                                       _("Node Group id '%s' not found!"))

        session.delete(node_group)


# Instance ops

def _instance_get(context, session, instance_id):
    query = model_query(m.Instance, context, session)
    return query.filter_by(id=instance_id).first()


def instance_add(context, node_group_id, values):
    session = get_session()

    with session.begin():
        node_group = _node_group_get(context, session, node_group_id)
        if not node_group:
            raise ex.NotFoundException(node_group_id,
                                       _("Node Group id '%s' not found!"))

        instance = m.Instance()
        instance.update({"node_group_id": node_group_id})
        instance.update(values)
        session.add(instance)

        node_group = _node_group_get(context, session, node_group_id)
        node_group.count += 1

    return instance.id
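
# Note: node_group.count is kept in step with the Instance rows here and
# in instance_remove() below, so callers must not adjust it separately.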


def instance_update(context, instance_id, values):
    session = get_session()
    with session.begin():
        instance = _instance_get(context, session, instance_id)
        if not instance:
            raise ex.NotFoundException(instance_id,
                                       _("Instance id '%s' not found!"))

        instance.update(values)


def instance_remove(context, instance_id):
    session = get_session()
    with session.begin():
        instance = _instance_get(context, session, instance_id)
        if not instance:
            raise ex.NotFoundException(instance_id,
                                       _("Instance id '%s' not found!"))

        session.delete(instance)

        node_group_id = instance.node_group_id
        node_group = _node_group_get(context, session, node_group_id)
        node_group.count -= 1


# Volumes ops

def append_volume(context, instance_id, volume_id):
    session = get_session()
    with session.begin():
        instance = _instance_get(context, session, instance_id)
        if not instance:
            raise ex.NotFoundException(instance_id,
                                       _("Instance id '%s' not found!"))

        instance.volumes.append(volume_id)


def remove_volume(context, instance_id, volume_id):
    session = get_session()
    with session.begin():
        instance = _instance_get(context, session, instance_id)
        if not instance:
            raise ex.NotFoundException(instance_id,
                                       _("Instance id '%s' not found!"))

        instance.volumes.remove(volume_id)


# Cluster Template ops

def _cluster_template_get(context, session, cluster_template_id):
    query = model_query(m.ClusterTemplate, context, session)
    return query.filter_by(id=cluster_template_id).first()


def cluster_template_get(context, cluster_template_id):
    return _cluster_template_get(context, get_session(), cluster_template_id)


def cluster_template_get_all(context, **kwargs):
    query = model_query(m.ClusterTemplate, context)
    return query.filter_by(**kwargs).all()


def cluster_template_create(context, values):
    values = values.copy()
    cluster_template = m.ClusterTemplate()
    node_groups = values.pop("node_groups") or []
    cluster_template.update(values)

    session = get_session()
    try:
        with session.begin():
            session.add(cluster_template)
            session.flush(objects=[cluster_template])

            for ng in node_groups:
                node_group = m.TemplatesRelation()
                node_group.update({"cluster_template_id": cluster_template.id})
                node_group.update(ng)
                session.add(node_group)

    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for object %(object)s. Failed on columns: "
              "%(columns)s") % {"object": e.value, "columns": e.columns})

    return cluster_template_get(context, cluster_template.id)


def cluster_template_destroy(context, cluster_template_id,
                             ignore_default=False):
    session = get_session()
    with session.begin():
        cluster_template = _cluster_template_get(context, session,
                                                 cluster_template_id)
        if not cluster_template:
            raise ex.NotFoundException(
                cluster_template_id,
                _("Cluster Template id '%s' not found!"))
        elif not ignore_default and cluster_template.is_default:
            raise ex.DeletionFailed(
                _("Cluster template id '%s' "
                  "is a default template") % cluster_template.id)

        session.delete(cluster_template)


def cluster_template_update(context, values, ignore_default=False):
    explicit_node_groups = "node_groups" in values
    if explicit_node_groups:
        node_groups = values.pop("node_groups")
        if node_groups is None:
            node_groups = []

    session = get_session()
    cluster_template_id = values['id']
    try:
        with session.begin():
            cluster_template = (_cluster_template_get(
                context, session, cluster_template_id))
            if not cluster_template:
                raise ex.NotFoundException(
                    cluster_template_id,
                    _("Cluster Template id '%s' not found!"))

            elif not ignore_default and cluster_template.is_default:
                raise ex.UpdateFailedException(
                    cluster_template_id,
                    _("ClusterTemplate id '%s' can not be updated. "
                      "It is a default template.")
                )

            if len(cluster_template.clusters) > 0:
                raise ex.UpdateFailedException(
                    cluster_template_id,
                    _("Cluster Template id '%s' can not be updated. "
                      "It is referenced by at least one cluster.")
                )
            cluster_template.update(values)
            # The flush here will cause a duplicate entry exception if
            # unique constraints are violated, before we go ahead and delete
            # the node group templates
            session.flush(objects=[cluster_template])

            # If node_groups has not been specified, then we are
            # keeping the old ones so don't delete!
            if explicit_node_groups:
                model_query(m.TemplatesRelation,
                            context, session=session).filter_by(
                    cluster_template_id=cluster_template_id).delete()

                for ng in node_groups:
                    node_group = m.TemplatesRelation()
                    node_group.update(ng)
                    node_group.update({"cluster_template_id":
                                       cluster_template_id})
                    session.add(node_group)

    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for ClusterTemplate: %s") % e.columns)

    return cluster_template_get(context, cluster_template_id)
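
# Usage sketch (illustrative): omitting 'node_groups' keeps the existing
# relations, while passing a list -- even an empty one -- replaces them:
#
#     cluster_template_update(ctx, {'id': ct_id, 'description': 'v2'})
#     cluster_template_update(ctx, {'id': ct_id, 'node_groups': []})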


# Node Group Template ops

def _node_group_template_get(context, session, node_group_template_id):
    query = model_query(m.NodeGroupTemplate, context, session)
    return query.filter_by(id=node_group_template_id).first()


def node_group_template_get(context, node_group_template_id):
    return _node_group_template_get(context, get_session(),
                                    node_group_template_id)


def node_group_template_get_all(context, **kwargs):
    query = model_query(m.NodeGroupTemplate, context)
    return query.filter_by(**kwargs).all()


def node_group_template_create(context, values):
    node_group_template = m.NodeGroupTemplate()
    node_group_template.update(values)

    session = get_session()
    try:
        with session.begin():
            session.add(node_group_template)
    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for NodeGroupTemplate: %s") % e.columns)

    return node_group_template


def node_group_template_destroy(context, node_group_template_id,
                                ignore_default=False):
    session = get_session()
    with session.begin():
        node_group_template = _node_group_template_get(context, session,
                                                       node_group_template_id)
        if not node_group_template:
            raise ex.NotFoundException(
                node_group_template_id,
                _("Node Group Template id '%s' not found!"))
        elif not ignore_default and node_group_template.is_default:
            raise ex.DeletionFailed(
                _("Node group template id '%s' "
                  "is a default template") % node_group_template_id)

        session.delete(node_group_template)


def node_group_template_update(context, values, ignore_default=False):
    session = get_session()
    try:
        with session.begin():
            ngt_id = values['id']
            ngt = _node_group_template_get(context, session, ngt_id)
            if not ngt:
                raise ex.NotFoundException(
                    ngt_id, _("NodeGroupTemplate id '%s' not found"))
            elif not ignore_default and ngt.is_default:
                raise ex.UpdateFailedException(
                    ngt_id,
                    _("NodeGroupTemplate id '%s' can not be updated. "
                      "It is a default template.")
                )

            # Check to see that the node group template to be updated is
            # not in use by an existing cluster.
            for template_relationship in ngt.templates_relations:
                if len(template_relationship.cluster_template.clusters) > 0:
                    raise ex.UpdateFailedException(
                        ngt_id,
                        _("NodeGroupTemplate id '%s' can not be updated. "
                          "It is referenced by an existing cluster.")
                    )

            ngt.update(values)
    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for NodeGroupTemplate: %s") % e.columns)

    return ngt


# Data Source ops

def _data_source_get(context, session, data_source_id):
    query = model_query(m.DataSource, context, session)
    return query.filter_by(id=data_source_id).first()


def data_source_get(context, data_source_id):
    return _data_source_get(context, get_session(), data_source_id)


def data_source_count(context, **kwargs):
    """Count DataSource objects filtered by search criteria in kwargs.

    Entries in kwargs indicate column names and search values.

    'in' filters will be used to search for any entries in kwargs
    that name DataSource columns and have values of type tuple. This
    allows column values to match multiple values (OR).

    'like' filters will be used for any entries in kwargs that
    name DataSource columns and have string values containing '%'.
    This allows column values to match simple wildcards.

    Any other entries in kwargs will be searched for using filter_by().
    """
    query = model_query(m.DataSource, context)
    query, kwargs = in_filter(query, m.DataSource, kwargs)
    query, kwargs = like_filter(query, m.DataSource, kwargs)

    # Use normal filter_by for remaining keys
    return query.filter_by(**kwargs).count()
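
# Usage sketch (illustrative), combining the three filter styles described
# in the docstring above:
#
#     n = data_source_count(ctx,
#                           type=('swift', 'hdfs'),   # 'in' filter
#                           name='input%',            # 'like' filter
#                           tenant_id=ctx.tenant_id)  # plain filter_by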


def data_source_get_all(context, **kwargs):
    query = model_query(m.DataSource, context)
    return query.filter_by(**kwargs).all()


def data_source_create(context, values):
    data_source = m.DataSource()
    data_source.update(values)

    session = get_session()
    try:
        with session.begin():
            session.add(data_source)
    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for DataSource: %s") % e.columns)

    return data_source


def data_source_destroy(context, data_source_id):
    session = get_session()
    try:
        with session.begin():
            data_source = _data_source_get(context, session, data_source_id)
            if not data_source:
                raise ex.NotFoundException(
                    data_source_id,
                    _("Data Source id '%s' not found!"))
            session.delete(data_source)
    except db_exc.DBError as e:
        msg = ("foreign key constraint" in six.text_type(e) and
               _(" on foreign key constraint") or "")
        raise ex.DeletionFailed(_("Data Source deletion failed%s") % msg)


def data_source_update(context, values):
    session = get_session()
    try:
        with session.begin():
            ds_id = values['id']
            data_source = _data_source_get(context, session, ds_id)
            if not data_source:
                raise ex.NotFoundException(
                    ds_id, _("DataSource id '%s' not found"))
            else:
                jobs = job_execution_get_all(context)
                pending_jobs = [job for job in jobs if
                                job.info["status"] == "PENDING"]
                for job in pending_jobs:
                    if job.data_source_urls:
                        if ds_id in job.data_source_urls:
                            raise ex.UpdateFailedException(
                                _("DataSource is used in a "
                                  "PENDING Job and can not be updated."))
                data_source.update(values)
    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for DataSource: %s") % e.columns)

    return data_source


# JobExecution ops

def _job_execution_get(context, session, job_execution_id):
    query = model_query(m.JobExecution, context, session)
    return query.filter_by(id=job_execution_id).first()


def job_execution_get(context, job_execution_id):
    return _job_execution_get(context, get_session(), job_execution_id)


def job_execution_get_all(context, **kwargs):
    """Get all JobExecutions filtered by **kwargs.

    kwargs key values may be the names of fields in a JobExecution
    plus the following special values with the indicated meaning:

    'cluster.name' -- name of the Cluster referenced by the JobExecution
    'job.name' -- name of the Job referenced by the JobExecution
    'status' -- JobExecution['info']['status']

    e.g. job_execution_get_all(cluster_id=12, input_id=123)
         job_execution_get_all(**{'cluster.name': 'test',
                                  'job.name': 'wordcount'})
    """
    query = model_query(m.JobExecution, context)

    # Remove the external fields if present, they'll
    # be handled with a join and filter
    externals = {k: kwargs.pop(k) for k in ['cluster.name',
                                            'job.name',
                                            'status'] if k in kwargs}

    # Filter JobExecution by the remaining kwargs. This has to be done
    # before application of the joins and filters because those
    # change the class that query.filter_by will apply to
    query = query.filter_by(**kwargs)

    # Now add the joins and filters for the externals
    if 'cluster.name' in externals:
        query = query.join(m.Cluster).filter(
            m.Cluster.name == externals['cluster.name'])

    if 'job.name' in externals:
        query = query.join(m.Job).filter(
            m.Job.name == externals['job.name'])

    res = query.all()

    # 'info' is a JsonDictType which is stored as a string.
    # It would be possible to search for the substring containing
    # the value of 'status' in 'info', but 'info' also contains
    # data returned from a client and not managed by Sahara.
    # In the case of Oozie jobs, for example, other fields (actions)
    # also contain 'status'. Therefore we can't filter on it reliably
    # by a substring search in the query.
    if 'status' in externals:
        status = externals['status'].lower()
        res = [je for je in res if (
            je['info'] and je['info'].get('status', '').lower() == status)]
    return res


def job_execution_count(context, **kwargs):
    query = count_query(m.JobExecution, context)
    return query.filter_by(**kwargs).first()[0]


def _get_config_section(configs, mapping_type):
    if mapping_type not in configs:
        configs[mapping_type] = [] if mapping_type == "args" else {}
    return configs[mapping_type]


def _merge_execution_interface(job_ex, job, execution_interface):
    """Merges the interface for a job execution with that of its job."""
    configs = job_ex.job_configs or {}
    nonexistent = object()
    positional_args = {}
    for arg in job.interface:
        value = nonexistent
        typed_configs = _get_config_section(configs, arg.mapping_type)
        # Interface args are our first choice for the value.
        if arg.name in execution_interface:
            value = execution_interface[arg.name]
        else:
            # If a default exists, we can use that, but...
            if arg.default is not None:
                value = arg.default
            # We should prefer an argument passed through the
            # job_configs that maps to the same location.
            if arg.mapping_type != "args":
                value = typed_configs.get(arg.location, value)
        if value is not nonexistent:
            if arg.mapping_type != "args":
                typed_configs[arg.location] = value
            else:
                positional_args[int(arg.location)] = value
    if positional_args:
        positional_args = [positional_args[i] for i
                           in range(len(positional_args))]
        configs["args"] = positional_args + configs["args"]
    if configs and not job_ex.job_configs:
        job_ex.job_configs = configs
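
# Illustrative example of the merge: for an interface argument with
# mapping_type='args' and location='0', a value supplied in
# execution_interface (or, failing that, the argument's default) ends up
# prepended to configs['args'], ahead of any args already present in the
# execution's job_configs.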


def job_execution_create(context, values):
    session = get_session()
    execution_interface = values.pop('interface', {})
    job_ex = m.JobExecution()
    job_ex.update(values)

    try:
        with session.begin():
            job_ex.interface = []
            job = _job_get(context, session, job_ex.job_id)
            if job.interface:
                _merge_execution_interface(job_ex, job, execution_interface)
            session.add(job_ex)
    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for JobExecution: %s") % e.columns)

    return job_ex


def job_execution_update(context, job_execution_id, values):
    session = get_session()

    with session.begin():
        job_ex = _job_execution_get(context, session, job_execution_id)
        if not job_ex:
            raise ex.NotFoundException(job_execution_id,
                                       _("JobExecution id '%s' not found!"))
        job_ex.update(values)
        session.add(job_ex)

    return job_ex


def job_execution_destroy(context, job_execution_id):
    session = get_session()
    with session.begin():
        job_ex = _job_execution_get(context, session, job_execution_id)
        if not job_ex:
            raise ex.NotFoundException(job_execution_id,
                                       _("JobExecution id '%s' not found!"))

        session.delete(job_ex)


# Job ops

def _job_get(context, session, job_id):
    query = model_query(m.Job, context, session)
    return query.filter_by(id=job_id).first()


def job_get(context, job_id):
    return _job_get(context, get_session(), job_id)


def job_get_all(context, **kwargs):
    query = model_query(m.Job, context)
    return query.filter_by(**kwargs).all()


def _append_job_binaries(context, session, from_list, to_list):
    for job_binary_id in from_list:
        job_binary = model_query(
            m.JobBinary, context, session).filter_by(id=job_binary_id).first()
        if job_binary is not None:
            to_list.append(job_binary)


def _append_interface(context, from_list, to_list):
    for order, argument_values in enumerate(from_list):
        argument_values['tenant_id'] = context.tenant_id
        argument_values['order'] = order
        argument = m.JobInterfaceArgument()
        argument.update(argument_values)
        to_list.append(argument)


def job_create(context, values):
    mains = values.pop("mains", [])
    libs = values.pop("libs", [])
    interface = values.pop("interface", [])

    session = get_session()
    try:
        with session.begin():
            job = m.Job()
            job.update(values)
            # These are 'lazy' objects. The initialization below ensures
            # that libs, mains, and interface are initialized within a
            # session even if the lists are empty.
            job.mains = []
            job.libs = []
            job.interface = []

            _append_job_binaries(context, session, mains, job.mains)
            _append_job_binaries(context, session, libs, job.libs)
            _append_interface(context, interface, job.interface)

            session.add(job)

    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for Job: %s") % e.columns)

    return job


def job_update(context, job_id, values):
    session = get_session()

    try:
        with session.begin():
            job = _job_get(context, session, job_id)
            if not job:
                raise ex.NotFoundException(job_id,
                                           _("Job id '%s' not found!"))
            job.update(values)
            session.add(job)
    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for Job: %s") % e.columns)

    return job


def job_destroy(context, job_id):
    session = get_session()
    try:
        with session.begin():
            job = _job_get(context, session, job_id)
            if not job:
                raise ex.NotFoundException(job_id,
                                           _("Job id '%s' not found!"))
            session.delete(job)
    except db_exc.DBError as e:
        msg = ("foreign key constraint" in six.text_type(e) and
               _(" on foreign key constraint") or "")
        raise ex.DeletionFailed(_("Job deletion failed%s") % msg)


# JobBinary ops

def _job_binary_get(context, session, job_binary_id):
    query = model_query(m.JobBinary, context, session)
    return query.filter_by(id=job_binary_id).first()


def job_binary_get_all(context, **kwargs):
    """Returns JobBinary objects that do not contain a data field.

    The data column uses deferred loading.
    """
    query = model_query(m.JobBinary, context)
    return query.filter_by(**kwargs).all()


def job_binary_get(context, job_binary_id):
    """Returns a JobBinary object that does not contain a data field.

    The data column uses deferred loading.
    """
    return _job_binary_get(context, get_session(), job_binary_id)


def job_binary_create(context, values):
    """Returns a JobBinary that does not contain a data field.

    The data column uses deferred loading.
    """
    job_binary = m.JobBinary()
    job_binary.update(values)

    session = get_session()
    try:
        with session.begin():
            session.add(job_binary)
    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for JobBinary: %s") % e.columns)

    return job_binary


def job_binary_update(context, values):
    """Returns a JobBinary updated with the provided values."""
    jb_id = values["id"]
    session = get_session()
    try:
        with session.begin():
            jb = _job_binary_get(context, session, jb_id)
            if not jb:
                raise ex.NotFoundException(
                    jb_id, _("JobBinary id '%s' not found"))
            # We do not want to update the url for internal binaries
            new_url = values.get("url", None)
            if new_url and "internal-db://" in jb["url"]:
                if jb["url"] != new_url:
                    raise ex.UpdateFailedException(
                        jb_id,
                        _("The url for JobBinary Id '%s' can not "
                          "be updated because it is an internal-db url."))
            jobs = job_execution_get_all(context)
            pending_jobs = [job for job in jobs if
                            job.info["status"] == "PENDING"]
            if len(pending_jobs) > 0:
                for job in pending_jobs:
                    if _check_job_binary_referenced(
                            context, session, jb_id, job.job_id):
                        raise ex.UpdateFailedException(
                            jb_id,
                            _("JobBinary Id '%s' is used in a PENDING job "
                              "and can not be updated."))
            jb.update(values)
    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for JobBinary: %s") % e.columns)

    return jb


def _check_job_binary_referenced(ctx, session, job_binary_id, job_id=None):
    args = {"JobBinary_id": job_binary_id}
    if job_id:
        args["Job_id"] = job_id
    mains = model_query(m.mains_association, ctx, session,
                        project_only=False).filter_by(**args)
    libs = model_query(m.libs_association, ctx, session,
                       project_only=False).filter_by(**args)

    return mains.first() is not None or libs.first() is not None


def job_binary_destroy(context, job_binary_id):
    session = get_session()
    with session.begin():
        job_binary = _job_binary_get(context, session, job_binary_id)
        if not job_binary:
            raise ex.NotFoundException(job_binary_id,
                                       _("JobBinary id '%s' not found!"))

        if _check_job_binary_referenced(context, session, job_binary_id):
            raise ex.DeletionFailed(
                _("JobBinary is referenced and cannot be deleted"))

        session.delete(job_binary)


# JobBinaryInternal ops

def _job_binary_internal_get(context, session, job_binary_internal_id):
    query = model_query(m.JobBinaryInternal, context, session)
    return query.filter_by(id=job_binary_internal_id).first()


def job_binary_internal_get_all(context, **kwargs):
    """Returns JobBinaryInternal objects that do not contain a data field.

    The data column uses deferred loading.
    """
    query = model_query(m.JobBinaryInternal, context)
    return query.filter_by(**kwargs).all()


def job_binary_internal_get(context, job_binary_internal_id):
    """Returns a JobBinaryInternal object that does not contain a data field.

    The data column uses deferred loading.
    """
    return _job_binary_internal_get(context, get_session(),
                                    job_binary_internal_id)


def job_binary_internal_get_raw_data(context, job_binary_internal_id):
    """Returns only the data field for the specified JobBinaryInternal."""
    query = model_query(m.JobBinaryInternal, context)
    res = query.filter_by(id=job_binary_internal_id).first()

    if res is not None:
        datasize_KB = res.datasize / 1024.0
        if datasize_KB > CONF.job_binary_max_KB:
            raise ex.DataTooBigException(
                round(datasize_KB, 1), CONF.job_binary_max_KB,
                _("Size of internal binary (%(size)sKB) is greater than the "
                  "maximum (%(maximum)sKB)"))

        # This assignment is sufficient to load the deferred column
        res = res.data
    return res


def job_binary_internal_create(context, values):
    """Returns a JobBinaryInternal that does not contain a data field.

    The data column uses deferred loading.
    """
    values["datasize"] = len(values["data"])
    datasize_KB = values["datasize"] / 1024.0
    if datasize_KB > CONF.job_binary_max_KB:
        raise ex.DataTooBigException(
            round(datasize_KB, 1), CONF.job_binary_max_KB,
            _("Size of internal binary (%(size)sKB) is greater "
              "than the maximum (%(maximum)sKB)"))

    job_binary_int = m.JobBinaryInternal()
    job_binary_int.update(values)

    session = get_session()
    try:
        with session.begin():
            session.add(job_binary_int)
    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for JobBinaryInternal: %s") % e.columns)

    return job_binary_internal_get(context, job_binary_int.id)
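
# Usage sketch (illustrative): 'data' must hold the raw bytes; 'datasize'
# is computed here, and CONF.job_binary_max_KB is enforced on both upload
# (here) and retrieval (job_binary_internal_get_raw_data above):
#
#     job_binary_internal_create(ctx, {'name': 'lib.jar', 'data': blob})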


def job_binary_internal_destroy(context, job_binary_internal_id):
    session = get_session()
    with session.begin():
        job_binary_internal = _job_binary_internal_get(context, session,
                                                       job_binary_internal_id)
        if not job_binary_internal:
            raise ex.NotFoundException(
                job_binary_internal_id,
                _("JobBinaryInternal id '%s' not found!"))

        session.delete(job_binary_internal)


def job_binary_internal_update(context, job_binary_internal_id, values):
    """Returns a JobBinaryInternal updated with the provided values."""
    session = get_session()
    try:
        with session.begin():
            j_b_i = _job_binary_internal_get(
                context, session, job_binary_internal_id)
            if not j_b_i:
                raise ex.NotFoundException(
                    job_binary_internal_id,
                    _("JobBinaryInternal id '%s' not found!"))
            j_b_i.update(values)
    except db_exc.DBDuplicateEntry as e:
        raise ex.DBDuplicateEntry(
            _("Duplicate entry for JobBinaryInternal: %s") % e.columns)

    return j_b_i


# Events ops


def _cluster_provision_step_get(context, session, provision_step_id):
    query = model_query(m.ClusterProvisionStep, context, session)
    return query.filter_by(id=provision_step_id).first()


def _cluster_provision_step_update(context, session, step_id):
    step = _cluster_provision_step_get(context, session, step_id)

    if step is None:
        raise ex.NotFoundException(
            step_id,
            _("Cluster Provision Step id '%s' not found!"))

    if step.successful is not None:
        return
    if len(step.events) == step.total:
        for event in step.events:
            session.delete(event)
        step.update({'successful': True})


def cluster_provision_step_add(context, cluster_id, values):
    session = get_session()

    with session.begin():
        cluster = _cluster_get(context, session, cluster_id)
        if not cluster:
            raise ex.NotFoundException(cluster_id,
                                       _("Cluster id '%s' not found!"))

        provision_step = m.ClusterProvisionStep()
        values['cluster_id'] = cluster_id
        values['tenant_id'] = context.tenant_id
        provision_step.update(values)
        session.add(provision_step)

    return provision_step.id


def cluster_provision_step_update(context, step_id):
    if CONF.disable_event_log:
        return
    session = get_session()
    with session.begin():
        _cluster_provision_step_update(context, session, step_id)


def cluster_provision_progress_update(context, cluster_id):
    if CONF.disable_event_log:
        return _cluster_get(context, get_session(), cluster_id)
    session = get_session()
    with session.begin():
        cluster = _cluster_get(context, session, cluster_id)

        if cluster is None:
            raise ex.NotFoundException(cluster_id,
                                       _("Cluster id '%s' not found!"))
        for step in cluster.provision_progress:
            if step.successful is None:
                _cluster_provision_step_update(context, session, step.id)
        result_cluster = _cluster_get(context, session, cluster_id)
    return result_cluster


def cluster_event_add(context, step_id, values):
    session = get_session()

    with session.begin():
        provision_step = _cluster_provision_step_get(
            context, session, step_id)

        if not provision_step:
            raise ex.NotFoundException(
                step_id,
                _("Cluster Provision Step id '%s' not found!"))

        event = m.ClusterEvent()
        values['step_id'] = step_id
        if not values['successful']:
            provision_step.update({'successful': False})
        event.update(values)
        session.add(event)

    return event.id