Filter incomplete tasks

Non-completed tasks are now detected during processing and skipped with the 'failed' status.

Blueprint: send-anon-usage

Change-Id: I876daccbc3bc8510b8d7cc5df3c949be907c509f
This commit is contained in:
Aleksey Kasatkin 2014-11-13 18:00:50 +02:00
parent 3a2f16b5ba
commit 79516ab7d4
3 changed files with 84 additions and 18 deletions

View File

@ -46,8 +46,17 @@ def post():
for chunk in util.split_collection(action_logs, chunk_size=1000):
existed_objs, action_logs_to_add = _separate_action_logs(chunk)
objects_info.extend(_extract_objects_info(existed_objs))
skipped_objs = []
for obj in action_logs_to_add:
obj['body'] = json.dumps(obj['body'])
if obj['body']['action_type'] == 'nailgun_task' and \
not obj['body']['end_timestamp']:
skipped_objs.append(obj)
else:
obj['body'] = json.dumps(obj['body'])
for obj in skipped_objs:
action_logs_to_add.remove(obj)
objects_info.extend(_extract_dicts_info(
skipped_objs, consts.ACTION_LOG_STATUSES.failed))
objects_info.extend(_save_action_logs(action_logs_to_add))
return 200, {'status': 'ok', 'action_logs': list(objects_info)}
@ -59,21 +68,18 @@ def _save_action_logs(action_logs):
return result
try:
db.session.execute(ActionLog.__table__.insert(), action_logs)
for action_log in action_logs:
result.append({
'master_node_uid': action_log['master_node_uid'],
'external_id': action_log['external_id'],
'status': consts.ACTION_LOG_STATUSES.added
})
result = _extract_dicts_info(
action_logs, consts.ACTION_LOG_STATUSES.added)
except Exception:
app.logger.exception("Processing of action logs chunk failed")
result = _handle_chunk_processing_error(action_logs)
result = _extract_dicts_info(
action_logs, consts.ACTION_LOG_STATUSES.failed)
return result
# NOTE(review): this span is a rendered diff hunk — removed and added
# variants of the same line sit on consecutive lines (the rename of
# 'existed_objects' to 'objects'); the post-change code is the second
# variant of each pair.
def _extract_objects_info(existed_objects):
def _extract_objects_info(objects):
# Builds per-object response entries (master_node_uid / external_id)
# for action-log ORM objects that already exist in the DB.
result = []
for obj in existed_objects:
for obj in objects:
result.append({
'master_node_uid': obj.master_node_uid,
'external_id': obj.external_id,
# NOTE(review): the dict's 'status' entry is cut off by the hunk
# boundary below — confirm against the full file before relying on it.
@ -82,13 +88,13 @@ def _extract_objects_info(existed_objects):
return result
def _handle_chunk_processing_error(chunk):
def _extract_dicts_info(dicts, status):
result = []
for action_log in chunk:
for action_log in dicts:
result.append({
'master_node_uid': action_log['master_node_uid'],
'external_id': action_log['external_id'],
'status': consts.ACTION_LOG_STATUSES.failed
'status': status
})
return result

View File

@ -46,7 +46,7 @@
"action_name": {"type": "string"},
"action_type": {"enum": ["nailgun_task"]},
"start_timestamp": {"type": "string"},
"end_timestamp": {"type": "string"},
"end_timestamp": {"type": ["string", "null"]},
"additional_info": {
"type": "object",
"properties": {

View File

@ -93,8 +93,8 @@ class TestActionLogs(DbTest):
"action_group": "",
"action_name": "",
"action_type": "nailgun_task",
"start_timestamp": "",
"end_timestamp": "",
"start_timestamp": "1",
"end_timestamp": "2",
"additional_info": {
"parent_task_id": 0,
"subtasks_ids": [],
@ -134,8 +134,8 @@ class TestActionLogs(DbTest):
"action_group": "",
"action_name": "",
"action_type": "nailgun_task",
"start_timestamp": "",
"end_timestamp": "",
"start_timestamp": "3",
"end_timestamp": "4",
"additional_info": {
"parent_task_id": 0,
"subtasks_ids": [],
@ -179,3 +179,63 @@ class TestActionLogs(DbTest):
{'action_logs': expected_logs}
)
self.check_response_error(resp, code=400)
def test_incomplete_tasks(self):
    """Logs of unfinished nailgun tasks must not be stored and must be
    acknowledged with the 'failed' status."""
    master_node_uid = 'x'
    action_logs = [
        {
            'master_node_uid': master_node_uid,
            'external_id': index,
            'body': {
                "id": index,
                "actor_id": "",
                "action_group": "cluster_changes",
                "action_name": "deployment",
                "action_type": "nailgun_task",
                "start_timestamp": "1",
                # roughly every third record lacks an end_timestamp,
                # i.e. the task is still running
                "end_timestamp": "2" if index % 3 else None,
                "additional_info": {
                    "parent_task_id": 0,
                    "subtasks_ids": [],
                    "operation": "deployment"
                },
                "is_sent": False,
                "cluster_id": 5
            }
        }
        for index in xrange(100)
    ]
    completed_count = len([rec for rec in action_logs
                           if rec["body"]["end_timestamp"] is not None])
    resp = self.post(
        '/api/v1/action_logs/',
        {'action_logs': action_logs}
    )
    self.check_response_ok(resp)
    # Only the completed tasks may reach the DB.
    saved_recs = db.session.query(ActionLog).filter(
        ActionLog.master_node_uid == master_node_uid)
    self.assertEqual(completed_count, saved_recs.count())
    for rec in saved_recs:
        self.assertIsNotNone(rec.body["end_timestamp"])
    # Every posted record must be acknowledged in the response,
    # either as 'added' or as 'failed'.
    resp_logs = json.loads(resp.data)['action_logs']
    self.assertEqual(len(action_logs), len(resp_logs))
    statuses = [entry['status'] for entry in resp_logs]
    passed = statuses.count(consts.ACTION_LOG_STATUSES.added)
    failed = statuses.count(consts.ACTION_LOG_STATUSES.failed)
    self.assertEqual(len(action_logs), passed + failed)
    self.assertEqual(completed_count, passed)