Refactor event-log code

Now the event-log code has a few bad points:
1) We push a plain list into the context without any extra info
2) Name event_wrapper_without_instance looks strange
3) Code duplications

This patch removes these bad points and makes the event-log code better.

Partially implements blueprint event-log

Change-Id: I075d256b4251159b71954e2f7e020cfd4d5e6b9b
This commit is contained in:
Vitaly Gridnev 2015-02-05 13:59:56 +03:00
parent 3f63733f9f
commit aa997bb6b8
5 changed files with 100 additions and 70 deletions

View File

@ -73,7 +73,7 @@ class Context(context.RequestContext):
if current_instance_info is not None: if current_instance_info is not None:
self.current_instance_info = current_instance_info self.current_instance_info = current_instance_info
else: else:
self.current_instance_info = [] self.current_instance_info = InstanceInfo()
def clone(self): def clone(self):
return Context( return Context(
@ -273,6 +273,15 @@ def sleep(seconds=0):
time.sleep(seconds) time.sleep(seconds)
class InstanceInfo(object):
def __init__(self, cluster_id=None, instance_id=None, instance_name=None,
node_group_id=None):
self.cluster_id = cluster_id
self.instance_id = instance_id
self.instance_name = instance_name
self.node_group_id = node_group_id
class InstanceInfoManager(object): class InstanceInfoManager(object):
def __init__(self, instance_info): def __init__(self, instance_info):
self.prev_instance_info = current().current_instance_info self.prev_instance_info = current().current_instance_info

View File

@ -236,8 +236,8 @@ class DirectEngine(e.Engine):
instance_name = g.generate_instance_name( instance_name = g.generate_instance_name(
cluster.name, node_group.name, idx) cluster.name, node_group.name, idx)
current_instance_info = [ current_instance_info = context.InstanceInfo(
cluster.id, None, instance_name, node_group.id] cluster.id, None, instance_name, node_group.id)
with context.InstanceInfoManager(current_instance_info): with context.InstanceInfoManager(current_instance_info):
instance_id = self._run_instance( instance_id = self._run_instance(
@ -323,7 +323,7 @@ class DirectEngine(e.Engine):
names.append(group.name) names.append(group.name)
return names return names
@cpo.event_wrapper_without_instance(mark_successful_on_exit=True) @cpo.event_wrapper(mark_successful_on_exit=True)
def _run_instance(self, cluster, node_group, idx, aa_group=None, def _run_instance(self, cluster, node_group, idx, aa_group=None,
old_aa_groups=None): old_aa_groups=None):
"""Create instance using nova client and persist them into DB.""" """Create instance using nova client and persist them into DB."""

View File

@ -199,7 +199,8 @@ class _CreateLauncher(HeatEngine):
DISABLE_ROLLBACK = True DISABLE_ROLLBACK = True
inst_ids = [] inst_ids = []
@cpo.event_wrapper_without_instance(mark_successful_on_exit=True) @cpo.event_wrapper(
True, step=_('Create Heat stack'), param=('cluster', 1))
def create_instances(self, cluster, target_count): def create_instances(self, cluster, target_count):
tmpl = heat.ClusterTemplate(cluster) tmpl = heat.ClusterTemplate(cluster)
@ -213,8 +214,6 @@ class _CreateLauncher(HeatEngine):
# create all instances # create all instances
cluster = g.change_cluster_status(cluster, self.STAGES[0]) cluster = g.change_cluster_status(cluster, self.STAGES[0])
cpo.add_provisioning_step(cluster.id, _("Create Heat stack"), 1)
with context.InstanceInfoManager([cluster.id, None, None, None]):
self.create_instances(cluster, target_count) self.create_instances(cluster, target_count)
# wait for all instances are up and networks ready # wait for all instances are up and networks ready

View File

@ -130,15 +130,14 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
self.assertEqual(1, len(cpo.get_cluster_events(cluster.id, step_id1))) self.assertEqual(1, len(cpo.get_cluster_events(cluster.id, step_id1)))
self.assertEqual(1, len(cpo.get_cluster_events(cluster.id, step_id2))) self.assertEqual(1, len(cpo.get_cluster_events(cluster.id, step_id2)))
def _make_checks(self, instance, sleep=True): def _make_checks(self, instance_info, sleep=True):
ctx = context.ctx() ctx = context.ctx()
if sleep: if sleep:
context.sleep(2) context.sleep(2)
current_instance_info = ctx.current_instance_info current_instance_info = ctx.current_instance_info
expected = [None, instance.id, instance.name, None] self.assertEqual(instance_info, current_instance_info)
self.assertEqual(expected, current_instance_info)
def test_instance_context_manager(self): def test_instance_context_manager(self):
fake_instances = [FakeInstance() for _ in range(50)] fake_instances = [FakeInstance() for _ in range(50)]
@ -146,14 +145,16 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
# check that InstanceContextManager works fine sequentially # check that InstanceContextManager works fine sequentially
for instance in fake_instances: for instance in fake_instances:
info = [None, instance.id, instance.name, None] info = context.InstanceInfo(
None, instance.id, instance.name, None)
with context.InstanceInfoManager(info): with context.InstanceInfoManager(info):
self._make_checks(instance, sleep=False) self._make_checks(info, sleep=False)
# check that InstanceContextManager works fine in parallel # check that InstanceContextManager works fine in parallel
with context.ThreadGroup() as tg: with context.ThreadGroup() as tg:
for instance in fake_instances: for instance in fake_instances:
info = [None, instance.id, instance.name, None] info = context.InstanceInfo(
None, instance.id, instance.name, None)
with context.InstanceInfoManager(info): with context.InstanceInfoManager(info):
tg.spawn("make_checks", self._make_checks, instance) tg.spawn("make_checks", self._make_checks, info)

View File

@ -22,27 +22,33 @@ import six
from sahara import conductor as c from sahara import conductor as c
from sahara.conductor import resource from sahara.conductor import resource
from sahara import context from sahara import context
from sahara import exceptions
from sahara.i18n import _
conductor = c.API conductor = c.API
def _get_cluster_id(instance):
# If instance is InstanceInfo from context, then get cluster_id directly
if hasattr(instance, 'node_group'):
return instance.node_group.cluster_id
else:
return instance.cluster_id
def add_successful_event(instance): def add_successful_event(instance):
cluster_id = instance.node_group.cluster_id cluster_id = _get_cluster_id(instance)
step_id = get_current_provisioning_step(cluster_id) step_id = get_current_provisioning_step(cluster_id)
if step_id: if step_id:
conductor.cluster_event_add(context.ctx(), step_id, { conductor.cluster_event_add(context.ctx(), step_id, {
'successful': True, 'successful': True,
'node_group_id': instance.node_group_id, 'node_group_id': instance.node_group_id,
'instance_id': instance.id, 'instance_id': instance.instance_id,
'instance_name': instance.instance_name, 'instance_name': instance.instance_name,
'event_info': None, 'event_info': None,
}) })
def add_fail_event(instance, exception): def add_fail_event(instance, exception):
cluster_id = instance.node_group.cluster_id cluster_id = _get_cluster_id(instance)
step_id = get_current_provisioning_step(cluster_id) step_id = get_current_provisioning_step(cluster_id)
event_info = six.text_type(exception) event_info = six.text_type(exception)
@ -50,7 +56,7 @@ def add_fail_event(instance, exception):
conductor.cluster_event_add(context.ctx(), step_id, { conductor.cluster_event_add(context.ctx(), step_id, {
'successful': False, 'successful': False,
'node_group_id': instance.node_group_id, 'node_group_id': instance.node_group_id,
'instance_id': instance.id, 'instance_id': instance.instance_id,
'instance_name': instance.instance_name, 'instance_name': instance.instance_name,
'event_info': event_info, 'event_info': event_info,
}) })
@ -133,27 +139,31 @@ def get_cluster_events(cluster_id, provision_step=None):
return events return events
def event_wrapper(mark_successful_on_exit): def event_wrapper(mark_successful_on_exit, **spec):
""""General event-log wrapper
:param mark_successful_on_exit: should we send success event
after execution of function
:param spec: extra specification
:parameter step: provisioning step name (only for provisioning
steps with only one event)
:parameter param: tuple (name, pos) with parameter specification,
where 'name' is the name of the parameter of function, 'pos' is the
position of the parameter of function. This parameter is used to
extract info about Instance or Cluster.
"""
def decorator(func): def decorator(func):
@functools.wraps(func) @functools.wraps(func)
def handler(*args, **kwargs): def handler(*args, **kwargs):
# NOTE (vgridnev): We should know information about instance, step_name = spec.get('step', None)
# so we should find instance in args or kwargs. instance = _find_in_args(spec, *args, **kwargs)
# Also, we import sahara.conductor.resource
# to check some object is Instance
instance = None if step_name:
for arg in args: # It's single process, let's add provisioning step here
if isinstance(arg, resource.InstanceResource): cluster_id = _get_cluster_id(instance)
instance = arg add_provisioning_step(cluster_id, step_name, 1)
for kw_arg in kwargs.values():
if isinstance(kw_arg, resource.InstanceResource):
instance = kw_arg
if instance is None:
raise exceptions.InvalidDataException(
_("Function should have an Instance as argument"))
try: try:
value = func(*args, **kwargs) value = func(*args, **kwargs)
@ -169,39 +179,50 @@ def event_wrapper(mark_successful_on_exit):
return decorator return decorator
def event_wrapper_without_instance(mark_successful_on_exit): def _get_info_from_instance(arg):
def decorator(func): if isinstance(arg, resource.InstanceResource):
@functools.wraps(func) return arg
def handler(*args, **kwargs): return None
ctx = context.ctx()
(cluster_id, instance_id, instance_name,
node_group_id) = ctx.current_instance_info
step_id = get_current_provisioning_step(cluster_id)
try:
value = func(*args, **kwargs)
except Exception as e:
with excutils.save_and_reraise_exception():
conductor.cluster_event_add(
context.ctx(),
step_id, {
'successful': False,
'node_group_id': node_group_id,
'instance_id': instance_id,
'instance_name': instance_name,
'event_info': six.text_type(e),
})
if mark_successful_on_exit: def _get_info_from_cluster(arg):
conductor.cluster_event_add( if isinstance(arg, resource.ClusterResource):
context.ctx(), return context.InstanceInfo(arg.id)
step_id, { return None
'successful': True,
'node_group_id': node_group_id,
'instance_id': instance_id,
'instance_name': instance_name,
})
def _get_info_from_obj(arg):
functions = [_get_info_from_instance, _get_info_from_cluster]
for func in functions:
value = func(arg)
if value:
return value return value
return handler return None
return decorator
def _find_in_args(spec, *args, **kwargs):
param_values = spec.get('param', None)
if param_values:
p_name, p_pos = param_values
obj = kwargs.get(p_name, None)
if obj:
return _get_info_from_obj(obj)
return _get_info_from_obj(args[p_pos])
# If param is not specified, let's search instance in args
for arg in args:
val = _get_info_from_instance(arg)
if val:
return val
for arg in kwargs.values():
val = _get_info_from_instance(arg)
if val:
return val
# If instance not found in args, let's get instance info from context
return context.ctx().current_instance_info