Merge "Fix Senlin performance issues"
commit 71d9a66abe

releasenotes/notes/bug-1817604-41d4b8f6c6f920e4.yaml (new file, +9)
@@ -0,0 +1,9 @@
---
fixes:
  - |
    [`bug 1817604 <https://bugs.launchpad.net/senlin/+bug/1817604>`_]
    Fixes major performance bugs within senlin by improving database
    interaction. This was completed by updating the database models to
    properly take advantage of relationships. Additionally removes
    unnecessary database calls and prefers joins instead to retrieve
    object data.
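To illustrate the approach this note describes (a minimal sketch only, not part of the commit): rather than a single generic model_query() that eagerly loads every relationship via joinedload_all('*'), each model gets its own query helper that joins just the relationships it needs, so callers fetch an object together with its related rows in one round trip. The names below mirror the helpers introduced in the diff that follows; the session_for_read() context manager and the models module are assumed to be importable from Senlin's existing DB layer.

# Illustrative sketch only; session_for_read() and the SQLAlchemy models are
# assumed from senlin.db.sqlalchemy, as used throughout the diff below.
from sqlalchemy.orm import joinedload

from senlin.db.sqlalchemy import api as db_session
from senlin.db.sqlalchemy import models


def cluster_model_query():
    # Join nodes, profile and policies up front so reading them later does
    # not trigger one extra SELECT per relationship.
    with db_session.session_for_read() as session:
        return session.query(models.Cluster).options(
            joinedload(models.Cluster.nodes),
            joinedload(models.Cluster.profile),
            joinedload(models.Cluster.policies),
        )


def cluster_get(cluster_id):
    # A single query returns the cluster with its related rows already loaded.
    return cluster_model_query().get(cluster_id)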
@@ -29,7 +29,7 @@ from oslo_log import log as logging
from oslo_utils import timeutils
import osprofiler.sqlalchemy
import sqlalchemy
from sqlalchemy.orm import joinedload_all
from sqlalchemy.orm import joinedload
from sqlalchemy.sql.expression import func

from senlin.common import consts
@@ -87,14 +87,9 @@ def retry_on_deadlock(f):
        max_retry_interval=CONF.database_max_retry_interval)(f)


def model_query(context, *args):
    with session_for_read() as session:
        query = session.query(*args).options(joinedload_all('*'))
        return query


def query_by_short_id(context, model, short_id, project_safe=True):
    q = model_query(context, model)
def query_by_short_id(context, model_query, model, short_id,
                      project_safe=True):
    q = model_query()
    q = q.filter(model.id.like('%s%%' % short_id))

    if project_safe:
@@ -108,8 +103,8 @@ def query_by_short_id(context, model, short_id, project_safe=True):
        raise exception.MultipleChoices(arg=short_id)


def query_by_name(context, model, name, project_safe=True):
    q = model_query(context, model)
def query_by_name(context, model_query, name, project_safe=True):
    q = model_query()
    q = q.filter_by(name=name)

    if project_safe:
@@ -124,17 +119,27 @@ def query_by_name(context, model, name, project_safe=True):


# Clusters
def cluster_model_query():
    with session_for_read() as session:
        query = session.query(models.Cluster).options(
            joinedload(models.Cluster.nodes),
            joinedload(models.Cluster.profile),
            joinedload(models.Cluster.policies)
        )
        return query


@retry_on_deadlock
def cluster_create(context, values):
    with session_for_write() as session:
        cluster_ref = models.Cluster()
        cluster_ref.update(values)
        session.add(cluster_ref)
        return cluster_ref
    return cluster_get(context, cluster_ref.id)


def cluster_get(context, cluster_id, project_safe=True):
    cluster = model_query(context, models.Cluster).get(cluster_id)
    cluster = cluster_model_query().get(cluster_id)

    if cluster is None:
        return None
@@ -146,17 +151,17 @@ def cluster_get(context, cluster_id, project_safe=True):


def cluster_get_by_name(context, name, project_safe=True):
    return query_by_name(context, models.Cluster, name,
    return query_by_name(context, cluster_model_query, name,
                         project_safe=project_safe)


def cluster_get_by_short_id(context, short_id, project_safe=True):
    return query_by_short_id(context, models.Cluster, short_id,
                             project_safe=project_safe)
    return query_by_short_id(context, cluster_model_query, models.Cluster,
                             short_id, project_safe=project_safe)


def _query_cluster_get_all(context, project_safe=True):
    query = model_query(context, models.Cluster)
    query = cluster_model_query()

    if project_safe:
        query = query.filter_by(project=context.project_id)
@@ -171,7 +176,7 @@ def cluster_get_all(context, limit=None, marker=None, sort=None, filters=None,

    keys, dirs = utils.get_sort_params(sort, consts.CLUSTER_INIT_AT)
    if marker:
        marker = model_query(context, models.Cluster).get(marker)
        marker = cluster_model_query().get(marker)

    return sa_utils.paginate_query(query, models.Cluster, limit, keys,
                                   marker=marker, sort_dirs=dirs).all()
@@ -233,6 +238,14 @@ def cluster_delete(context, cluster_id):


# Nodes
def node_model_query():
    with session_for_read() as session:
        query = session.query(models.Node).options(
            joinedload(models.Node.profile)
        )
        return query


@retry_on_deadlock
def node_create(context, values):
    # This operation is always called with cluster and node locked
@@ -244,7 +257,7 @@ def node_create(context, values):


def node_get(context, node_id, project_safe=True):
    node = model_query(context, models.Node).get(node_id)
    node = node_model_query().get(node_id)
    if not node:
        return None

@@ -256,16 +269,17 @@ def node_get(context, node_id, project_safe=True):


def node_get_by_name(context, name, project_safe=True):
    return query_by_name(context, models.Node, name, project_safe=project_safe)
    return query_by_name(context, node_model_query, name,
                         project_safe=project_safe)


def node_get_by_short_id(context, short_id, project_safe=True):
    return query_by_short_id(context, models.Node, short_id,
    return query_by_short_id(context, node_model_query, models.Node, short_id,
                             project_safe=project_safe)


def _query_node_get_all(context, project_safe=True, cluster_id=None):
    query = model_query(context, models.Node)
    query = node_model_query()

    if cluster_id is not None:
        query = query.filter_by(cluster_id=cluster_id)
@@ -286,7 +300,7 @@ def node_get_all(context, cluster_id=None, limit=None, marker=None, sort=None,

    keys, dirs = utils.get_sort_params(sort, consts.NODE_INIT_AT)
    if marker:
        marker = model_query(context, models.Node).get(marker)
        marker = node_model_query().get(marker)
    return sa_utils.paginate_query(query, models.Node, limit, keys,
                                   marker=marker, sort_dirs=dirs).all()
@@ -314,7 +328,7 @@ def node_ids_by_cluster(context, cluster_id, filters=None):

def node_count_by_cluster(context, cluster_id, **kwargs):
    project_safe = kwargs.pop('project_safe', True)
    query = model_query(context, models.Node)
    query = node_model_query()
    query = query.filter_by(cluster_id=cluster_id)
    query = query.filter_by(**kwargs)
    if project_safe:
@@ -575,6 +589,14 @@ def node_lock_steal(node_id, action_id):


# Policies
def policy_model_query():
    with session_for_read() as session:
        query = session.query(models.Policy).options(
            joinedload(models.Policy.bindings)
        )
        return query


@retry_on_deadlock
def policy_create(context, values):
    with session_for_write() as session:
@@ -585,7 +607,7 @@ def policy_create(context, values):


def policy_get(context, policy_id, project_safe=True):
    policy = model_query(context, models.Policy)
    policy = policy_model_query()
    policy = policy.filter_by(id=policy_id).first()

    if policy is None:
@@ -599,18 +621,18 @@ def policy_get(context, policy_id, project_safe=True):


def policy_get_by_name(context, name, project_safe=True):
    return query_by_name(context, models.Policy, name,
    return query_by_name(context, policy_model_query, name,
                         project_safe=project_safe)


def policy_get_by_short_id(context, short_id, project_safe=True):
    return query_by_short_id(context, models.Policy, short_id,
                             project_safe=project_safe)
    return query_by_short_id(context, policy_model_query, models.Policy,
                             short_id, project_safe=project_safe)


def policy_get_all(context, limit=None, marker=None, sort=None, filters=None,
                   project_safe=True):
    query = model_query(context, models.Policy)
    query = policy_model_query()

    if project_safe:
        query = query.filter_by(project=context.project_id)
@@ -620,7 +642,7 @@ def policy_get_all(context, limit=None, marker=None, sort=None, filters=None,

    keys, dirs = utils.get_sort_params(sort, consts.POLICY_CREATED_AT)
    if marker:
        marker = model_query(context, models.Policy).get(marker)
        marker = policy_model_query().get(marker)
    return sa_utils.paginate_query(query, models.Policy, limit, keys,
                                   marker=marker, sort_dirs=dirs).all()
@@ -653,8 +675,14 @@ def policy_delete(context, policy_id):


# Cluster-Policy Associations
def cluster_policy_model_query():
    with session_for_read() as session:
        query = session.query(models.ClusterPolicies)
        return query


def cluster_policy_get(context, cluster_id, policy_id):
    query = model_query(context, models.ClusterPolicies)
    query = cluster_policy_model_query()
    bindings = query.filter_by(cluster_id=cluster_id,
                               policy_id=policy_id)
    return bindings.first()
@@ -699,7 +727,7 @@ def cluster_policy_ids_by_cluster(context, cluster_id):

def cluster_policy_get_by_type(context, cluster_id, policy_type, filters=None):

    query = model_query(context, models.ClusterPolicies)
    query = cluster_policy_model_query()
    query = query.filter_by(cluster_id=cluster_id)

    key_enabled = consts.CP_ENABLED
@@ -715,7 +743,7 @@ def cluster_policy_get_by_type(context, cluster_id, policy_type, filters=None):

def cluster_policy_get_by_name(context, cluster_id, policy_name, filters=None):

    query = model_query(context, models.ClusterPolicies)
    query = cluster_policy_model_query()
    query = query.filter_by(cluster_id=cluster_id)

    key_enabled = consts.CP_ENABLED
@@ -812,6 +840,12 @@ def cluster_remove_dependents(context, cluster_id, profile_id):


# Profiles
def profile_model_query():
    with session_for_read() as session:
        query = session.query(models.Profile)
        return query


@retry_on_deadlock
def profile_create(context, values):
    with session_for_write() as session:
@@ -822,7 +856,7 @@ def profile_create(context, values):


def profile_get(context, profile_id, project_safe=True):
    query = model_query(context, models.Profile)
    query = profile_model_query()
    profile = query.get(profile_id)

    if profile is None:
@@ -836,18 +870,18 @@ def profile_get(context, profile_id, project_safe=True):


def profile_get_by_name(context, name, project_safe=True):
    return query_by_name(context, models.Profile, name,
    return query_by_name(context, profile_model_query, name,
                         project_safe=project_safe)


def profile_get_by_short_id(context, short_id, project_safe=True):
    return query_by_short_id(context, models.Profile, short_id,
                             project_safe=project_safe)
    return query_by_short_id(context, profile_model_query, models.Profile,
                             short_id, project_safe=project_safe)


def profile_get_all(context, limit=None, marker=None, sort=None, filters=None,
                    project_safe=True):
    query = model_query(context, models.Profile)
    query = profile_model_query()

    if project_safe:
        query = query.filter_by(project=context.project_id)
@@ -857,7 +891,7 @@ def profile_get_all(context, limit=None, marker=None, sort=None, filters=None,

    keys, dirs = utils.get_sort_params(sort, consts.PROFILE_CREATED_AT)
    if marker:
        marker = model_query(context, models.Profile).get(marker)
        marker = profile_model_query().get(marker)
    return sa_utils.paginate_query(query, models.Profile, limit, keys,
                                   marker=marker, sort_dirs=dirs).all()
@@ -895,6 +929,12 @@ def profile_delete(context, profile_id):


# Credentials
def credential_model_query():
    with session_for_read() as session:
        query = session.query(models.Credential)
        return query


@retry_on_deadlock
def cred_create(context, values):
    with session_for_write() as session:
@@ -905,7 +945,7 @@ def cred_create(context, values):


def cred_get(context, user, project):
    return model_query(context, models.Credential).get((user, project))
    return credential_model_query().get((user, project))


@retry_on_deadlock
@@ -937,6 +977,14 @@ def cred_create_update(context, values):


# Events
def event_model_query():
    with session_for_read() as session:
        query = session.query(models.Event).options(
            joinedload(models.Event.cluster)
        )
        return query


@retry_on_deadlock
def event_create(context, values):
    with session_for_write() as session:
@@ -948,7 +996,7 @@ def event_create(context, values):

@retry_on_deadlock
def event_get(context, event_id, project_safe=True):
    event = model_query(context, models.Event).get(event_id)
    event = event_model_query().get(event_id)
    if project_safe and event is not None:
        if event.project != context.project_id:
            return None
@@ -957,8 +1005,8 @@ def event_get(context, event_id, project_safe=True):


def event_get_by_short_id(context, short_id, project_safe=True):
    return query_by_short_id(context, models.Event, short_id,
                             project_safe=project_safe)
    return query_by_short_id(context, event_model_query, models.Event,
                             short_id, project_safe=project_safe)


def _event_filter_paginate_query(context, query, filters=None,
@@ -968,14 +1016,14 @@ def _event_filter_paginate_query(context, query, filters=None,

    keys, dirs = utils.get_sort_params(sort, consts.EVENT_TIMESTAMP)
    if marker:
        marker = model_query(context, models.Event).get(marker)
        marker = event_model_query().get(marker)
    return sa_utils.paginate_query(query, models.Event, limit, keys,
                                   marker=marker, sort_dirs=dirs).all()


def event_get_all(context, limit=None, marker=None, sort=None, filters=None,
                  project_safe=True):
    query = model_query(context, models.Event)
    query = event_model_query()
    if project_safe:
        query = query.filter_by(project=context.project_id)

@@ -984,7 +1032,7 @@ def event_get_all(context, limit=None, marker=None, sort=None, filters=None,


def event_count_by_cluster(context, cluster_id, project_safe=True):
    query = model_query(context, models.Event)
    query = event_model_query()

    if project_safe:
        query = query.filter_by(project=context.project_id)
@@ -995,7 +1043,7 @@ def event_count_by_cluster(context, cluster_id, project_safe=True):

def event_get_all_by_cluster(context, cluster_id, limit=None, marker=None,
                             sort=None, filters=None, project_safe=True):
    query = model_query(context, models.Event)
    query = event_model_query()
    query = query.filter_by(cluster_id=cluster_id)

    if project_safe:
@@ -1036,13 +1084,22 @@ def event_purge(project, granularity='days', age=30):


# Actions
def action_model_query():
    with session_for_read() as session:
        query = session.query(models.Action).options(
            joinedload(models.Action.dep_on),
            joinedload(models.Action.dep_by)
        )
        return query


@retry_on_deadlock
def action_create(context, values):
    with session_for_write() as session:
        action = models.Action()
        action.update(values)
        session.add(action)
        return action
    return action_get(context, action.id)


@retry_on_deadlock
@@ -1057,73 +1114,68 @@ def action_update(context, action_id, values):


def action_get(context, action_id, project_safe=True, refresh=False):
    with session_for_read() as session:
        action = session.query(models.Action).get(action_id)
        if action is None:
    action = action_model_query().get(action_id)
    if action is None:
        return None

    if project_safe:
        if action.project != context.project_id:
            return None

        if project_safe:
            if action.project != context.project_id:
                return None

        session.refresh(action)
        return action
    return action


def action_list_active_scaling(context, cluster_id=None, project_safe=True):
    with session_for_read() as session:
        query = session.query(models.Action)
        if project_safe:
            query = query.filter_by(project=context.project_id)
        if cluster_id:
            query = query.filter_by(target=cluster_id)
        query = query.filter(
            models.Action.status.in_(
                [consts.ACTION_READY,
                 consts.ACTION_WAITING,
                 consts.ACTION_RUNNING,
                 consts.ACTION_WAITING_LIFECYCLE_COMPLETION]))
        query = query.filter(
            models.Action.action.in_(consts.CLUSTER_SCALE_ACTIONS))
        scaling_actions = query.all()
        return scaling_actions
    query = action_model_query()
    if project_safe:
        query = query.filter_by(project=context.project_id)
    if cluster_id:
        query = query.filter_by(target=cluster_id)
    query = query.filter(
        models.Action.status.in_(
            [consts.ACTION_READY,
             consts.ACTION_WAITING,
             consts.ACTION_RUNNING,
             consts.ACTION_WAITING_LIFECYCLE_COMPLETION]))
    query = query.filter(
        models.Action.action.in_(consts.CLUSTER_SCALE_ACTIONS))
    scaling_actions = query.all()
    return scaling_actions


def action_get_by_name(context, name, project_safe=True):
    return query_by_name(context, models.Action, name,
    return query_by_name(context, action_model_query, name,
                         project_safe=project_safe)


def action_get_by_short_id(context, short_id, project_safe=True):
    return query_by_short_id(context, models.Action, short_id,
                             project_safe=project_safe)
    return query_by_short_id(context, action_model_query, models.Action,
                             short_id, project_safe=project_safe)


def action_get_all_by_owner(context, owner_id):
    query = model_query(context, models.Action).filter_by(owner=owner_id)
    query = action_model_query().filter_by(owner=owner_id)
    return query.all()


def action_get_all_active_by_target(context, target_id, project_safe=True):
    with session_for_read() as session:
        query = session.query(models.Action)
        if project_safe:
            query = query.filter_by(project=context.project_id)
        query = query.filter_by(target=target_id)
        query = query.filter(
            models.Action.status.in_(
                [consts.ACTION_READY,
                 consts.ACTION_WAITING,
                 consts.ACTION_RUNNING,
                 consts.ACTION_WAITING_LIFECYCLE_COMPLETION]))
        actions = query.all()
        return actions
    query = action_model_query()
    if project_safe:
        query = query.filter_by(project=context.project_id)
    query = query.filter_by(target=target_id)
    query = query.filter(
        models.Action.status.in_(
            [consts.ACTION_READY,
             consts.ACTION_WAITING,
             consts.ACTION_RUNNING,
             consts.ACTION_WAITING_LIFECYCLE_COMPLETION]))
    actions = query.all()
    return actions


def action_get_all(context, filters=None, limit=None, marker=None, sort=None,
                   project_safe=True):

    query = model_query(context, models.Action)
    query = action_model_query()
    if project_safe:
        query = query.filter_by(project=context.project_id)

@@ -1132,7 +1184,7 @@ def action_get_all(context, filters=None, limit=None, marker=None, sort=None,

    keys, dirs = utils.get_sort_params(sort, consts.ACTION_CREATED_AT)
    if marker:
        marker = model_query(context, models.Action).get(marker)
        marker = action_model_query().get(marker)
    return sa_utils.paginate_query(query, models.Action, limit, keys,
                                   marker=marker, sort_dirs=dirs).all()
@@ -1155,26 +1207,30 @@ def action_check_status(context, action_id, timestamp):
    return action.status


def action_dependency_model_query():
    with session_for_read() as session:
        query = session.query(models.ActionDependency)
        return query


@retry_on_deadlock
def dependency_get_depended(context, action_id):
    with session_for_read() as session:
        q = session.query(models.ActionDependency).filter_by(
            dependent=action_id)
        return [d.depended for d in q.all()]
    q = action_dependency_model_query().filter_by(
        dependent=action_id)
    return [d.depended for d in q.all()]


@retry_on_deadlock
def dependency_get_dependents(context, action_id):
    with session_for_read() as session:
        q = session.query(models.ActionDependency).filter_by(
            depended=action_id)
        return [d.dependent for d in q.all()]
    q = action_dependency_model_query().filter_by(
        depended=action_id)
    return [d.dependent for d in q.all()]


@retry_on_deadlock
def dependency_add(context, depended, dependent):
    if isinstance(depended, list) and isinstance(dependent, list):
        raise exception.NotSupport(
        raise exception.Error(
            'Multiple dependencies between lists not support')

    with session_for_write() as session:
@@ -1372,7 +1428,7 @@ def action_abandon(context, action_id, values=None):

@retry_on_deadlock
def action_lock_check(context, action_id, owner=None):
    action = model_query(context, models.Action).get(action_id)
    action = action_model_query().get(action_id)
    if not action:
        raise exception.ResourceNotFound(type='action', id=action_id)

@@ -1394,7 +1450,7 @@ def action_signal(context, action_id, value):


def action_signal_query(context, action_id):
    action = model_query(context, models.Action).get(action_id)
    action = action_model_query().get(action_id)
    if not action:
        return None
@@ -1462,6 +1518,12 @@ def action_purge(project, granularity='days', age=30):


# Receivers
def receiver_model_query():
    with session_for_read() as session:
        query = session.query(models.Receiver)
        return query


@retry_on_deadlock
def receiver_create(context, values):
    with session_for_write() as session:
@@ -1472,7 +1534,7 @@ def receiver_create(context, values):


def receiver_get(context, receiver_id, project_safe=True):
    receiver = model_query(context, models.Receiver).get(receiver_id)
    receiver = receiver_model_query().get(receiver_id)
    if not receiver:
        return None

@@ -1485,7 +1547,7 @@ def receiver_get(context, receiver_id, project_safe=True):

def receiver_get_all(context, limit=None, marker=None, filters=None, sort=None,
                     project_safe=True):
    query = model_query(context, models.Receiver)
    query = receiver_model_query()
    if project_safe:
        query = query.filter_by(project=context.project_id)

@@ -1494,19 +1556,19 @@ def receiver_get_all(context, limit=None, marker=None, filters=None, sort=None,

    keys, dirs = utils.get_sort_params(sort, consts.RECEIVER_NAME)
    if marker:
        marker = model_query(context, models.Receiver).get(marker)
        marker = receiver_model_query().get(marker)
    return sa_utils.paginate_query(query, models.Receiver, limit, keys,
                                   marker=marker, sort_dirs=dirs).all()


def receiver_get_by_name(context, name, project_safe=True):
    return query_by_name(context, models.Receiver, name,
    return query_by_name(context, receiver_model_query, name,
                         project_safe=project_safe)


def receiver_get_by_short_id(context, short_id, project_safe=True):
    return query_by_short_id(context, models.Receiver, short_id,
                             project_safe=project_safe)
    return query_by_short_id(context, receiver_model_query, models.Receiver,
                             short_id, project_safe=project_safe)


@retry_on_deadlock
@@ -1644,6 +1706,12 @@ def gc_by_engine(engine_id):


# HealthRegistry
def health_registry_model_query():
    with session_for_read() as session:
        query = session.query(models.HealthRegistry)
        return query


@retry_on_deadlock
def registry_create(context, cluster_id, check_type, interval, params,
                    engine_id, enabled=True):
@@ -1704,7 +1772,7 @@ def registry_get(context, cluster_id):


def registry_get_by_param(context, params):
    query = model_query(context, models.HealthRegistry)
    query = health_registry_model_query()
    obj = utils.exact_filter(query, models.HealthRegistry, params).first()
    return obj
@@ -95,6 +95,8 @@ class Cluster(BASE, TimestampMixin, models.ModelBase):
    dependents = Column(types.Dict)
    config = Column(types.Dict)

    profile = relationship(Profile)


class Node(BASE, TimestampMixin, models.ModelBase):
    """Node objects."""
@@ -121,6 +123,9 @@ class Node(BASE, TimestampMixin, models.ModelBase):
    data = Column(types.Dict)
    dependents = Column(types.Dict)
    profile = relationship(Profile, backref=backref('nodes'))
    cluster = relationship(Cluster, backref=backref('nodes'),
                           foreign_keys=[cluster_id],
                           primaryjoin='Cluster.id == Node.cluster_id')


class ClusterLock(BASE, models.ModelBase):
@@ -241,6 +246,13 @@ class Action(BASE, TimestampMixin, models.ModelBase):
    project = Column(String(32))
    domain = Column(String(32))

    dep_on = relationship(
        ActionDependency,
        primaryjoin="Action.id == ActionDependency.dependent")
    dep_by = relationship(
        ActionDependency,
        primaryjoin="Action.id == ActionDependency.depended")


class Event(BASE, models.ModelBase):
    """Events generated by the Senlin engine."""
@@ -18,7 +18,6 @@ from senlin.common import exception
from senlin.common import utils
from senlin.db import api as db_api
from senlin.objects import base
from senlin.objects import dependency as dobj
from senlin.objects import fields
@@ -49,6 +48,8 @@ class Action(base.SenlinObject, base.VersionedObjectDictCompat):
        'user': fields.StringField(),
        'project': fields.StringField(),
        'domain': fields.StringField(nullable=True),
        'dep_on': fields.CustomListField(attr_name='depended', nullable=True),
        'dep_by': fields.CustomListField(attr_name='dependent', nullable=True),
    }

    @classmethod
@@ -187,12 +188,6 @@ class Action(base.SenlinObject, base.VersionedObjectDictCompat):
            status=status)

    def to_dict(self):
        if self.id:
            dep_on = dobj.Dependency.get_depended(self.context, self.id)
            dep_by = dobj.Dependency.get_dependents(self.context, self.id)
        else:
            dep_on = []
            dep_by = []
        action_dict = {
            'id': self.id,
            'name': self.name,
@@ -208,8 +203,8 @@ class Action(base.SenlinObject, base.VersionedObjectDictCompat):
            'status_reason': self.status_reason,
            'inputs': self.inputs,
            'outputs': self.outputs,
            'depends_on': dep_on,
            'depended_by': dep_by,
            'depends_on': self.dep_on,
            'depended_by': self.dep_by,
            'created_at': utils.isotime(self.created_at),
            'updated_at': utils.isotime(self.updated_at),
            'data': self.data,
@@ -15,7 +15,6 @@
from oslo_utils import timeutils
from oslo_utils import uuidutils

from senlin.common import context as senlin_context
from senlin.common import exception as exc
from senlin.common import utils
from senlin.db import api as db_api
@@ -49,14 +48,34 @@ class Cluster(base.SenlinObject, base.VersionedObjectDictCompat):
        'domain': fields.StringField(nullable=True),
        'dependents': fields.JsonField(nullable=True),
        'config': fields.JsonField(nullable=True),
        'profile_name': fields.StringField(),
        'nodes': fields.CustomListField(attr_name='id', nullable=True),
        'policies': fields.CustomListField(attr_name='id', nullable=True),
    }

    @staticmethod
    def _from_db_object(context, obj, db_obj):
        if db_obj is None:
            return None
        for field in obj.fields:
            if field == 'metadata':
                obj['metadata'] = db_obj['meta_data']
            elif field == 'profile_name':
                obj['profile_name'] = db_obj['profile'].name
            else:
                obj[field] = db_obj[field]

        obj._context = context
        obj.obj_reset_changes()

        return obj

    @classmethod
    def create(cls, context, values):
        values = cls._transpose_metadata(values)
        values['init_at'] = timeutils.utcnow(True)
        obj = db_api.cluster_create(context, values)
        return cls._from_db_object(context, cls(context), obj)
        return cls._from_db_object(context, cls(), obj)

    @classmethod
    def find(cls, context, identity, project_safe=True):
@@ -118,9 +137,6 @@ class Cluster(base.SenlinObject, base.VersionedObjectDictCompat):
        db_api.cluster_delete(context, obj_id)

    def to_dict(self):
        context = senlin_context.get_admin_context()
        profile = db_api.profile_get(context, self.profile_id,
                                     project_safe=False)
        return {
            'id': self.id,
            'name': self.name,
@@ -141,7 +157,7 @@ class Cluster(base.SenlinObject, base.VersionedObjectDictCompat):
            'data': self.data or {},
            'dependents': self.dependents or {},
            'config': self.config or {},
            'profile_name': profile.name,
            'nodes': db_api.node_ids_by_cluster(context, self.id),
            'policies': db_api.cluster_policy_ids_by_cluster(context, self.id)
            'profile_name': self.profile_name,
            'nodes': self.nodes,
            'policies': self.policies
        }
@@ -519,3 +519,17 @@ class ReceiverTypeField(fields.AutoTypedField):
class NodeReplaceMapField(fields.AutoTypedField):

    AUTO_TYPE = UniqueDict(fields.String())


class CustomListField(ListField):

    def __init__(self, attr_name, **kwargs):
        self.attr_name = attr_name
        super(CustomListField, self).__init__(**kwargs)

    def coerce(self, obj, attr, value):
        objs = super(CustomListField, self).coerce(obj, attr, value)
        custom_list = []
        for i in objs:
            custom_list.append(getattr(i, self.attr_name))
        return custom_list
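A brief usage note (a sketch, not part of the diff): the Cluster object hunk earlier in this change declares its list-valued fields with this class, so the rows eagerly joined by the new DB queries collapse to plain id lists on the versioned object, for example:

# Mirrors the field declarations in the cluster object hunk above; the
# surrounding fields dict, class and imports are assumed for illustration.
from senlin.objects import fields

cluster_list_fields = {
    'nodes': fields.CustomListField(attr_name='id', nullable=True),
    'policies': fields.CustomListField(attr_name='id', nullable=True),
}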
@@ -29,7 +29,6 @@ class Node(base.SenlinObject, base.VersionedObjectDictCompat):
        'id': fields.UUIDField(),
        'name': fields.StringField(),
        'profile_id': fields.UUIDField(),
        # This field is treated as string because we may store '' into it
        'cluster_id': fields.StringField(),
        'physical_id': fields.StringField(nullable=True),
        'index': fields.IntegerField(),
@@ -100,6 +100,7 @@ class DBAPIClusterTest(base.SenlinTestCase):
        self.assertIsNone(db_api.cluster_get_by_name(self.ctx, cluster.name))

    def test_cluster_get_by_name_diff_project(self):
        self.ctx.project_id = UUID2
        cluster1 = shared.create_cluster(self.ctx, self.profile,
                                         name='cluster_A',
                                         project=UUID2)
@@ -109,6 +110,7 @@ class DBAPIClusterTest(base.SenlinTestCase):
        shared.create_cluster(self.ctx, self.profile, name='cluster_B',
                              project=UUID2)

        self.ctx.project_id = UUID1
        res = db_api.cluster_get_by_name(self.ctx, 'cluster_A')
        self.assertIsNone(res)
@@ -134,6 +134,7 @@ class TestCluster(base.SenlinTestCase):
        self.assertEqual([], rt['policies'])

    def test_store_for_create(self):
        utils.create_profile(self.context, PROFILE_ID)
        cluster = cm.Cluster('test-cluster', 0, PROFILE_ID,
                             user=self.context.user_id,
                             project=self.context.project_id)
@@ -168,6 +169,7 @@ class TestCluster(base.SenlinTestCase):
        self.assertEqual({}, result.metadata)

    def test_store_for_update(self):
        utils.create_profile(self.context, PROFILE_ID)
        cluster = cm.Cluster('test-cluster', 0, PROFILE_ID,
                             user=self.context.user_id,
                             project=self.context.project_id)
@@ -50,6 +50,7 @@ class TestClusterPolicy(base.SenlinTestCase):
        self.assertEqual('', cp.policy_name)

    def test_cluster_policy_store(self):
        utils.create_profile(self.context, PROFILE_ID)
        cluster = utils.create_cluster(self.context, CLUSTER_ID, PROFILE_ID)
        policy = utils.create_policy(self.context, POLICY_ID)
        values = {
@@ -96,6 +97,7 @@ class TestClusterPolicy(base.SenlinTestCase):
                         "specified cluster 'some-cluster'.",
                         six.text_type(ex))

        utils.create_profile(self.context, PROFILE_ID)
        cluster = utils.create_cluster(self.context, CLUSTER_ID, PROFILE_ID)
        policy = utils.create_policy(self.context, POLICY_ID)
@@ -15,9 +15,11 @@ from oslo_config import cfg
from oslo_utils import timeutils
from oslo_utils import uuidutils

import six

from senlin.common import exception as exc
from senlin.db import api as db_api
from senlin.objects import cluster as co
from senlin.objects import cluster_policy as cpo
from senlin.tests.unit.common import base
from senlin.tests.unit.common import utils
@@ -96,11 +98,17 @@ class TestCluster(base.SenlinTestCase):
        mock_get_short_id.assert_called_once_with(self.ctx, 'bogus',
                                                  project_safe=True)

    @mock.patch.object(db_api, 'cluster_policy_ids_by_cluster')
    @mock.patch.object(db_api, 'node_ids_by_cluster')
    @mock.patch.object(db_api, 'profile_get')
    def test_to_dict(self, mock_profile, mock_nodes, mock_bindings):
    def test_to_dict(self):
        PROFILE_ID = '96f4df4b-889e-4184-ba8d-b5ca122f95bb'
        POLICY1_ID = '2c5139a6-24ba-4a6f-bd53-a268f61536de'
        POLICY2_ID = '2c5139a6-24ba-4a6f-bd53-a268f61536d3'
        NODE1_ID = '26f4df4b-889e-4184-ba8d-b5ca122f9566'
        NODE2_ID = '26f4df4b-889e-4184-ba8d-b5ca122f9567'

        utils.create_profile(self.ctx, PROFILE_ID)
        policy_1 = utils.create_policy(self.ctx, POLICY1_ID, 'P1')
        policy_2 = utils.create_policy(self.ctx, POLICY2_ID, 'P2')

        values = {
            'profile_id': PROFILE_ID,
            'name': 'test-cluster',
@@ -114,11 +122,21 @@ class TestCluster(base.SenlinTestCase):
            'project': self.ctx.project_id,
        }
        cluster = co.Cluster.create(self.ctx, values)
        fake_profile = mock.Mock()
        fake_profile.name = 'PROFILEABC'
        mock_profile.return_value = fake_profile
        mock_nodes.return_value = ['N1', 'N2']
        mock_bindings.return_value = ['P1', 'P2']
        p1 = cpo.ClusterPolicy(cluster_id=cluster.id, policy_id=policy_1.id,
                               enabled=True, id=uuidutils.generate_uuid(),
                               last_op=None)
        p2 = cpo.ClusterPolicy(cluster_id=cluster.id, policy_id=policy_2.id,
                               enabled=True, id=uuidutils.generate_uuid(),
                               last_op=None)
        values = {
            'priority': 12,
            'enabled': True,
        }
        p1.create(self.ctx, cluster.id, POLICY1_ID, values)
        p2.create(self.ctx, cluster.id, POLICY2_ID, values)
        utils.create_node(self.ctx, NODE1_ID, PROFILE_ID, cluster.id)
        utils.create_node(self.ctx, NODE2_ID, PROFILE_ID, cluster.id)
        cluster = co.Cluster.get(self.ctx, cluster.id)
        expected = {
            'id': cluster.id,
            'name': cluster.name,
@@ -133,15 +151,18 @@ class TestCluster(base.SenlinTestCase):
            'max_size': -1,
            'desired_capacity': 1,
            'timeout': cfg.CONF.default_action_timeout,
            'status': 'INIT',
            'status': six.text_type('INIT'),
            'status_reason': None,
            'metadata': {},
            'data': {},
            'dependents': {},
            'config': {},
            'nodes': ['N1', 'N2'],
            'policies': ['P1', 'P2'],
            'profile_name': 'PROFILEABC',
            'nodes': [mock.ANY, mock.ANY],
            'policies': [mock.ANY, mock.ANY],
            'profile_name': six.text_type('test-profile'),
        }
        cluster_dict = cluster.to_dict()

        self.assertEqual(expected, cluster.to_dict())
        self.assertEqual(expected, cluster_dict)
        self.assertEqual(2, len(cluster_dict['nodes']))
        self.assertEqual(2, len(cluster_dict['policies']))
@@ -749,3 +749,19 @@ class TestReceiverType(TestField):
            },
            self.field.get_schema()
        )


class TestCustomField(TestField):
    def setUp(self):
        super(TestCustomField, self).setUp()
        self.field = senlin_fields.CustomListField(attr_name='dependant')
        dep = mock.Mock()
        dep.dependant = '123'
        self.coerce_good_values = [([dep], ['123']), ([dep], ['123'])]
        self.coerce_bad_values = ['BOGUS']

        self.to_primitive_values = [([dep], [dep])]
        self.from_primitive_values = [([dep], [dep])]

    def test_stringify(self):
        self.assertEqual('[abc,def]', self.field.stringify(['abc', 'def']))