From 79516ab7d4f3db493b6a708beeefa555446e42d7 Mon Sep 17 00:00:00 2001 From: Aleksey Kasatkin Date: Thu, 13 Nov 2014 18:00:50 +0200 Subject: [PATCH] Filter incomplete tasks Non-completed tasks are verified properly and skipped with 'failed' status Blueprint: send-anon-usage Change-Id: I876daccbc3bc8510b8d7cc5df3c949be907c509f --- .../collector/api/resources/action_logs.py | 32 +++++---- .../collector/api/schemas/action_logs.json | 2 +- .../test/resources/test_action_logs.py | 68 +++++++++++++++++-- 3 files changed, 84 insertions(+), 18 deletions(-) diff --git a/collector/collector/api/resources/action_logs.py b/collector/collector/api/resources/action_logs.py index f6163a7..33dc8ae 100644 --- a/collector/collector/api/resources/action_logs.py +++ b/collector/collector/api/resources/action_logs.py @@ -46,8 +46,17 @@ def post(): for chunk in util.split_collection(action_logs, chunk_size=1000): existed_objs, action_logs_to_add = _separate_action_logs(chunk) objects_info.extend(_extract_objects_info(existed_objs)) + skipped_objs = [] for obj in action_logs_to_add: - obj['body'] = json.dumps(obj['body']) + if obj['body']['action_type'] == 'nailgun_task' and \ + not obj['body']['end_timestamp']: + skipped_objs.append(obj) + else: + obj['body'] = json.dumps(obj['body']) + for obj in skipped_objs: + action_logs_to_add.remove(obj) + objects_info.extend(_extract_dicts_info( + skipped_objs, consts.ACTION_LOG_STATUSES.failed)) objects_info.extend(_save_action_logs(action_logs_to_add)) return 200, {'status': 'ok', 'action_logs': list(objects_info)} @@ -59,21 +68,18 @@ def _save_action_logs(action_logs): return result try: db.session.execute(ActionLog.__table__.insert(), action_logs) - for action_log in action_logs: - result.append({ - 'master_node_uid': action_log['master_node_uid'], - 'external_id': action_log['external_id'], - 'status': consts.ACTION_LOG_STATUSES.added - }) + result = _extract_dicts_info( + action_logs, consts.ACTION_LOG_STATUSES.added) except Exception: app.logger.exception("Processing of action logs chunk failed") - result = _handle_chunk_processing_error(action_logs) + result = _extract_dicts_info( + action_logs, consts.ACTION_LOG_STATUSES.failed) return result -def _extract_objects_info(existed_objects): +def _extract_objects_info(objects): result = [] - for obj in existed_objects: + for obj in objects: result.append({ 'master_node_uid': obj.master_node_uid, 'external_id': obj.external_id, @@ -82,13 +88,13 @@ def _extract_objects_info(existed_objects): return result -def _handle_chunk_processing_error(chunk): +def _extract_dicts_info(dicts, status): result = [] - for action_log in chunk: + for action_log in dicts: result.append({ 'master_node_uid': action_log['master_node_uid'], 'external_id': action_log['external_id'], - 'status': consts.ACTION_LOG_STATUSES.failed + 'status': status }) return result diff --git a/collector/collector/api/schemas/action_logs.json b/collector/collector/api/schemas/action_logs.json index bc1acf0..60abe7e 100644 --- a/collector/collector/api/schemas/action_logs.json +++ b/collector/collector/api/schemas/action_logs.json @@ -46,7 +46,7 @@ "action_name": {"type": "string"}, "action_type": {"enum": ["nailgun_task"]}, "start_timestamp": {"type": "string"}, - "end_timestamp": {"type": "string"}, + "end_timestamp": {"type": ["string", "null"]}, "additional_info": { "type": "object", "properties": { diff --git a/collector/collector/test/resources/test_action_logs.py b/collector/collector/test/resources/test_action_logs.py index 61c683d..dfb0e4c 100644 --- a/collector/collector/test/resources/test_action_logs.py +++ b/collector/collector/test/resources/test_action_logs.py @@ -93,8 +93,8 @@ class TestActionLogs(DbTest): "action_group": "", "action_name": "", "action_type": "nailgun_task", - "start_timestamp": "", - "end_timestamp": "", + "start_timestamp": "1", + "end_timestamp": "2", "additional_info": { "parent_task_id": 0, "subtasks_ids": [], @@ -134,8 +134,8 @@ class TestActionLogs(DbTest): "action_group": "", "action_name": "", "action_type": "nailgun_task", - "start_timestamp": "", - "end_timestamp": "", + "start_timestamp": "3", + "end_timestamp": "4", "additional_info": { "parent_task_id": 0, "subtasks_ids": [], @@ -179,3 +179,63 @@ class TestActionLogs(DbTest): {'action_logs': expected_logs} ) self.check_response_error(resp, code=400) + + def test_incomplete_tasks(self): + master_node_uid = 'x' + action_logs = [ + { + 'master_node_uid': master_node_uid, + 'external_id': i, + 'body': { + "id": i, + "actor_id": "", + "action_group": "cluster_changes", + "action_name": "deployment", + "action_type": "nailgun_task", + "start_timestamp": "1", + # about 1/3 is incomplete + "end_timestamp": "2" if i % 3 else None, + "additional_info": { + "parent_task_id": 0, + "subtasks_ids": [], + "operation": "deployment" + }, + "is_sent": False, + "cluster_id": 5 + } + } + for i in xrange(100)] + completed_count = sum(rec["body"]["end_timestamp"] is not None + for rec in action_logs) + resp = self.post( + '/api/v1/action_logs/', + {'action_logs': action_logs} + ) + self.check_response_ok(resp) + + log_recs = db.session.query(ActionLog).filter( + ActionLog.master_node_uid == master_node_uid) + self.assertEqual( + log_recs.count(), + completed_count + ) + for rec in log_recs: + self.assertIsNotNone(rec.body["end_timestamp"]) + + resp_logs = json.loads(resp.data)['action_logs'] + self.assertEqual( + len(resp_logs), + len(action_logs) + ) + passed = sum(r['status'] == consts.ACTION_LOG_STATUSES.added + for r in resp_logs) + failed = sum(r['status'] == consts.ACTION_LOG_STATUSES.failed + for r in resp_logs) + self.assertEqual( + passed + failed, + len(action_logs) + ) + self.assertEqual( + passed, + completed_count + )