Merge "Refactor event-log code"
This commit is contained in:
commit
0b01a79877
@ -73,7 +73,7 @@ class Context(context.RequestContext):
|
|||||||
if current_instance_info is not None:
|
if current_instance_info is not None:
|
||||||
self.current_instance_info = current_instance_info
|
self.current_instance_info = current_instance_info
|
||||||
else:
|
else:
|
||||||
self.current_instance_info = []
|
self.current_instance_info = InstanceInfo()
|
||||||
|
|
||||||
def clone(self):
|
def clone(self):
|
||||||
return Context(
|
return Context(
|
||||||
@ -273,6 +273,15 @@ def sleep(seconds=0):
|
|||||||
time.sleep(seconds)
|
time.sleep(seconds)
|
||||||
|
|
||||||
|
|
||||||
|
class InstanceInfo(object):
|
||||||
|
def __init__(self, cluster_id=None, instance_id=None, instance_name=None,
|
||||||
|
node_group_id=None):
|
||||||
|
self.cluster_id = cluster_id
|
||||||
|
self.instance_id = instance_id
|
||||||
|
self.instance_name = instance_name
|
||||||
|
self.node_group_id = node_group_id
|
||||||
|
|
||||||
|
|
||||||
class InstanceInfoManager(object):
|
class InstanceInfoManager(object):
|
||||||
def __init__(self, instance_info):
|
def __init__(self, instance_info):
|
||||||
self.prev_instance_info = current().current_instance_info
|
self.prev_instance_info = current().current_instance_info
|
||||||
|
@ -236,8 +236,8 @@ class DirectEngine(e.Engine):
|
|||||||
instance_name = g.generate_instance_name(
|
instance_name = g.generate_instance_name(
|
||||||
cluster.name, node_group.name, idx)
|
cluster.name, node_group.name, idx)
|
||||||
|
|
||||||
current_instance_info = [
|
current_instance_info = context.InstanceInfo(
|
||||||
cluster.id, None, instance_name, node_group.id]
|
cluster.id, None, instance_name, node_group.id)
|
||||||
|
|
||||||
with context.InstanceInfoManager(current_instance_info):
|
with context.InstanceInfoManager(current_instance_info):
|
||||||
instance_id = self._run_instance(
|
instance_id = self._run_instance(
|
||||||
@ -323,7 +323,7 @@ class DirectEngine(e.Engine):
|
|||||||
names.append(group.name)
|
names.append(group.name)
|
||||||
return names
|
return names
|
||||||
|
|
||||||
@cpo.event_wrapper_without_instance(mark_successful_on_exit=True)
|
@cpo.event_wrapper(mark_successful_on_exit=True)
|
||||||
def _run_instance(self, cluster, node_group, idx, aa_group=None,
|
def _run_instance(self, cluster, node_group, idx, aa_group=None,
|
||||||
old_aa_groups=None):
|
old_aa_groups=None):
|
||||||
"""Create instance using nova client and persist them into DB."""
|
"""Create instance using nova client and persist them into DB."""
|
||||||
|
@ -199,7 +199,8 @@ class _CreateLauncher(HeatEngine):
|
|||||||
DISABLE_ROLLBACK = True
|
DISABLE_ROLLBACK = True
|
||||||
inst_ids = []
|
inst_ids = []
|
||||||
|
|
||||||
@cpo.event_wrapper_without_instance(mark_successful_on_exit=True)
|
@cpo.event_wrapper(
|
||||||
|
True, step=_('Create Heat stack'), param=('cluster', 1))
|
||||||
def create_instances(self, cluster, target_count):
|
def create_instances(self, cluster, target_count):
|
||||||
tmpl = heat.ClusterTemplate(cluster)
|
tmpl = heat.ClusterTemplate(cluster)
|
||||||
|
|
||||||
@ -213,8 +214,6 @@ class _CreateLauncher(HeatEngine):
|
|||||||
# create all instances
|
# create all instances
|
||||||
cluster = g.change_cluster_status(cluster, self.STAGES[0])
|
cluster = g.change_cluster_status(cluster, self.STAGES[0])
|
||||||
|
|
||||||
cpo.add_provisioning_step(cluster.id, _("Create Heat stack"), 1)
|
|
||||||
with context.InstanceInfoManager([cluster.id, None, None, None]):
|
|
||||||
self.create_instances(cluster, target_count)
|
self.create_instances(cluster, target_count)
|
||||||
|
|
||||||
# wait for all instances are up and networks ready
|
# wait for all instances are up and networks ready
|
||||||
|
@ -130,15 +130,14 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
|
|||||||
self.assertEqual(1, len(cpo.get_cluster_events(cluster.id, step_id1)))
|
self.assertEqual(1, len(cpo.get_cluster_events(cluster.id, step_id1)))
|
||||||
self.assertEqual(1, len(cpo.get_cluster_events(cluster.id, step_id2)))
|
self.assertEqual(1, len(cpo.get_cluster_events(cluster.id, step_id2)))
|
||||||
|
|
||||||
def _make_checks(self, instance, sleep=True):
|
def _make_checks(self, instance_info, sleep=True):
|
||||||
ctx = context.ctx()
|
ctx = context.ctx()
|
||||||
|
|
||||||
if sleep:
|
if sleep:
|
||||||
context.sleep(2)
|
context.sleep(2)
|
||||||
|
|
||||||
current_instance_info = ctx.current_instance_info
|
current_instance_info = ctx.current_instance_info
|
||||||
expected = [None, instance.id, instance.name, None]
|
self.assertEqual(instance_info, current_instance_info)
|
||||||
self.assertEqual(expected, current_instance_info)
|
|
||||||
|
|
||||||
def test_instance_context_manager(self):
|
def test_instance_context_manager(self):
|
||||||
fake_instances = [FakeInstance() for _ in range(50)]
|
fake_instances = [FakeInstance() for _ in range(50)]
|
||||||
@ -146,14 +145,16 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
|
|||||||
# check that InstanceContextManager works fine sequentially
|
# check that InstanceContextManager works fine sequentially
|
||||||
|
|
||||||
for instance in fake_instances:
|
for instance in fake_instances:
|
||||||
info = [None, instance.id, instance.name, None]
|
info = context.InstanceInfo(
|
||||||
|
None, instance.id, instance.name, None)
|
||||||
with context.InstanceInfoManager(info):
|
with context.InstanceInfoManager(info):
|
||||||
self._make_checks(instance, sleep=False)
|
self._make_checks(info, sleep=False)
|
||||||
|
|
||||||
# check that InstanceContextManager works fine in parallel
|
# check that InstanceContextManager works fine in parallel
|
||||||
|
|
||||||
with context.ThreadGroup() as tg:
|
with context.ThreadGroup() as tg:
|
||||||
for instance in fake_instances:
|
for instance in fake_instances:
|
||||||
info = [None, instance.id, instance.name, None]
|
info = context.InstanceInfo(
|
||||||
|
None, instance.id, instance.name, None)
|
||||||
with context.InstanceInfoManager(info):
|
with context.InstanceInfoManager(info):
|
||||||
tg.spawn("make_checks", self._make_checks, instance)
|
tg.spawn("make_checks", self._make_checks, info)
|
||||||
|
@ -22,27 +22,33 @@ import six
|
|||||||
from sahara import conductor as c
|
from sahara import conductor as c
|
||||||
from sahara.conductor import resource
|
from sahara.conductor import resource
|
||||||
from sahara import context
|
from sahara import context
|
||||||
from sahara import exceptions
|
|
||||||
from sahara.i18n import _
|
|
||||||
|
|
||||||
conductor = c.API
|
conductor = c.API
|
||||||
|
|
||||||
|
|
||||||
|
def _get_cluster_id(instance):
|
||||||
|
# If instance is InstanceInfo from context, then get cluster_id directly
|
||||||
|
if hasattr(instance, 'node_group'):
|
||||||
|
return instance.node_group.cluster_id
|
||||||
|
else:
|
||||||
|
return instance.cluster_id
|
||||||
|
|
||||||
|
|
||||||
def add_successful_event(instance):
|
def add_successful_event(instance):
|
||||||
cluster_id = instance.node_group.cluster_id
|
cluster_id = _get_cluster_id(instance)
|
||||||
step_id = get_current_provisioning_step(cluster_id)
|
step_id = get_current_provisioning_step(cluster_id)
|
||||||
if step_id:
|
if step_id:
|
||||||
conductor.cluster_event_add(context.ctx(), step_id, {
|
conductor.cluster_event_add(context.ctx(), step_id, {
|
||||||
'successful': True,
|
'successful': True,
|
||||||
'node_group_id': instance.node_group_id,
|
'node_group_id': instance.node_group_id,
|
||||||
'instance_id': instance.id,
|
'instance_id': instance.instance_id,
|
||||||
'instance_name': instance.instance_name,
|
'instance_name': instance.instance_name,
|
||||||
'event_info': None,
|
'event_info': None,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
def add_fail_event(instance, exception):
|
def add_fail_event(instance, exception):
|
||||||
cluster_id = instance.node_group.cluster_id
|
cluster_id = _get_cluster_id(instance)
|
||||||
step_id = get_current_provisioning_step(cluster_id)
|
step_id = get_current_provisioning_step(cluster_id)
|
||||||
event_info = six.text_type(exception)
|
event_info = six.text_type(exception)
|
||||||
|
|
||||||
@ -50,7 +56,7 @@ def add_fail_event(instance, exception):
|
|||||||
conductor.cluster_event_add(context.ctx(), step_id, {
|
conductor.cluster_event_add(context.ctx(), step_id, {
|
||||||
'successful': False,
|
'successful': False,
|
||||||
'node_group_id': instance.node_group_id,
|
'node_group_id': instance.node_group_id,
|
||||||
'instance_id': instance.id,
|
'instance_id': instance.instance_id,
|
||||||
'instance_name': instance.instance_name,
|
'instance_name': instance.instance_name,
|
||||||
'event_info': event_info,
|
'event_info': event_info,
|
||||||
})
|
})
|
||||||
@ -133,27 +139,31 @@ def get_cluster_events(cluster_id, provision_step=None):
|
|||||||
return events
|
return events
|
||||||
|
|
||||||
|
|
||||||
def event_wrapper(mark_successful_on_exit):
|
def event_wrapper(mark_successful_on_exit, **spec):
|
||||||
|
""""General event-log wrapper
|
||||||
|
|
||||||
|
:param mark_successful_on_exit: should we send success event
|
||||||
|
after execution of function
|
||||||
|
|
||||||
|
:param spec: extra specification
|
||||||
|
:parameter step: provisioning step name (only for provisioning
|
||||||
|
steps with only one event)
|
||||||
|
:parameter param: tuple (name, pos) with parameter specification,
|
||||||
|
where 'name' is the name of the parameter of function, 'pos' is the
|
||||||
|
position of the parameter of function. This parameter is used to
|
||||||
|
extract info about Instance or Cluster.
|
||||||
|
"""
|
||||||
|
|
||||||
def decorator(func):
|
def decorator(func):
|
||||||
@functools.wraps(func)
|
@functools.wraps(func)
|
||||||
def handler(*args, **kwargs):
|
def handler(*args, **kwargs):
|
||||||
# NOTE (vgridnev): We should know information about instance,
|
step_name = spec.get('step', None)
|
||||||
# so we should find instance in args or kwargs.
|
instance = _find_in_args(spec, *args, **kwargs)
|
||||||
# Also, we import sahara.conductor.resource
|
|
||||||
# to check some object is Instance
|
|
||||||
|
|
||||||
instance = None
|
if step_name:
|
||||||
for arg in args:
|
# It's single process, let's add provisioning step here
|
||||||
if isinstance(arg, resource.InstanceResource):
|
cluster_id = _get_cluster_id(instance)
|
||||||
instance = arg
|
add_provisioning_step(cluster_id, step_name, 1)
|
||||||
|
|
||||||
for kw_arg in kwargs.values():
|
|
||||||
if isinstance(kw_arg, resource.InstanceResource):
|
|
||||||
instance = kw_arg
|
|
||||||
|
|
||||||
if instance is None:
|
|
||||||
raise exceptions.InvalidDataException(
|
|
||||||
_("Function should have an Instance as argument"))
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
value = func(*args, **kwargs)
|
value = func(*args, **kwargs)
|
||||||
@ -169,39 +179,50 @@ def event_wrapper(mark_successful_on_exit):
|
|||||||
return decorator
|
return decorator
|
||||||
|
|
||||||
|
|
||||||
def event_wrapper_without_instance(mark_successful_on_exit):
|
def _get_info_from_instance(arg):
|
||||||
def decorator(func):
|
if isinstance(arg, resource.InstanceResource):
|
||||||
@functools.wraps(func)
|
return arg
|
||||||
def handler(*args, **kwargs):
|
return None
|
||||||
ctx = context.ctx()
|
|
||||||
(cluster_id, instance_id, instance_name,
|
|
||||||
node_group_id) = ctx.current_instance_info
|
|
||||||
step_id = get_current_provisioning_step(cluster_id)
|
|
||||||
|
|
||||||
try:
|
|
||||||
value = func(*args, **kwargs)
|
|
||||||
except Exception as e:
|
|
||||||
with excutils.save_and_reraise_exception():
|
|
||||||
conductor.cluster_event_add(
|
|
||||||
context.ctx(),
|
|
||||||
step_id, {
|
|
||||||
'successful': False,
|
|
||||||
'node_group_id': node_group_id,
|
|
||||||
'instance_id': instance_id,
|
|
||||||
'instance_name': instance_name,
|
|
||||||
'event_info': six.text_type(e),
|
|
||||||
})
|
|
||||||
|
|
||||||
if mark_successful_on_exit:
|
def _get_info_from_cluster(arg):
|
||||||
conductor.cluster_event_add(
|
if isinstance(arg, resource.ClusterResource):
|
||||||
context.ctx(),
|
return context.InstanceInfo(arg.id)
|
||||||
step_id, {
|
return None
|
||||||
'successful': True,
|
|
||||||
'node_group_id': node_group_id,
|
|
||||||
'instance_id': instance_id,
|
|
||||||
'instance_name': instance_name,
|
|
||||||
})
|
|
||||||
|
|
||||||
|
|
||||||
|
def _get_info_from_obj(arg):
|
||||||
|
functions = [_get_info_from_instance, _get_info_from_cluster]
|
||||||
|
|
||||||
|
for func in functions:
|
||||||
|
value = func(arg)
|
||||||
|
if value:
|
||||||
return value
|
return value
|
||||||
return handler
|
return None
|
||||||
return decorator
|
|
||||||
|
|
||||||
|
def _find_in_args(spec, *args, **kwargs):
|
||||||
|
param_values = spec.get('param', None)
|
||||||
|
|
||||||
|
if param_values:
|
||||||
|
p_name, p_pos = param_values
|
||||||
|
obj = kwargs.get(p_name, None)
|
||||||
|
if obj:
|
||||||
|
return _get_info_from_obj(obj)
|
||||||
|
return _get_info_from_obj(args[p_pos])
|
||||||
|
|
||||||
|
# If param is not specified, let's search instance in args
|
||||||
|
|
||||||
|
for arg in args:
|
||||||
|
val = _get_info_from_instance(arg)
|
||||||
|
if val:
|
||||||
|
return val
|
||||||
|
|
||||||
|
for arg in kwargs.values():
|
||||||
|
val = _get_info_from_instance(arg)
|
||||||
|
if val:
|
||||||
|
return val
|
||||||
|
|
||||||
|
# If instance not found in args, let's get instance info from context
|
||||||
|
|
||||||
|
return context.ctx().current_instance_info
|
||||||
|
Loading…
Reference in New Issue
Block a user