From 6c0db8e84a8262e4357b60b430b85c8e11447c59 Mon Sep 17 00:00:00 2001 From: "Li, Chen" Date: Mon, 27 Jul 2015 15:29:20 +0800 Subject: [PATCH] Rename oozie_job_id The "oozie_job_id" column in table "job_executions" represents oozie_job_id only when the edp engine is oozie. When it is spark engine, oozie_job_id = pid@instance_id, when it is storm engine, oozie_job_id = topology_name@instance_id. Rename oozie_job_id to engine_job_id to avoid confusion. Change-Id: I2671b91a315b2c7a2b805ce4d494252860a7fe6c Closes-bug: 1479575 --- sahara/conductor/objects.py | 2 +- .../versions/027_rename_oozie_job_id.py | 35 +++++++++++++++++++ sahara/db/sqlalchemy/models.py | 12 ++++++- sahara/plugins/fake/edp_engine.py | 2 +- sahara/service/edp/job_manager.py | 2 +- sahara/service/edp/oozie/engine.py | 4 +-- sahara/service/edp/oozie/oozie.py | 6 ++-- sahara/service/edp/spark/engine.py | 2 +- sahara/service/edp/storm/engine.py | 4 +-- .../unit/db/migration/test_migrations.py | 6 ++++ sahara/tests/unit/service/edp/spark/base.py | 4 +-- .../unit/service/edp/storm/test_storm.py | 8 ++--- 12 files changed, 69 insertions(+), 18 deletions(-) create mode 100644 sahara/db/migration/alembic_migrations/versions/027_rename_oozie_job_id.py diff --git a/sahara/conductor/objects.py b/sahara/conductor/objects.py index 6c9055b0..1ab88de4 100644 --- a/sahara/conductor/objects.py +++ b/sahara/conductor/objects.py @@ -248,7 +248,7 @@ class JobExecution(object): end_time cluster_id info - oozie_job_id + engine_job_id return_code job_configs interface diff --git a/sahara/db/migration/alembic_migrations/versions/027_rename_oozie_job_id.py b/sahara/db/migration/alembic_migrations/versions/027_rename_oozie_job_id.py new file mode 100644 index 00000000..0d318955 --- /dev/null +++ b/sahara/db/migration/alembic_migrations/versions/027_rename_oozie_job_id.py @@ -0,0 +1,35 @@ +# Copyright 2015 OpenStack Foundation. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Rename oozie_job_id + +Revision ID: 027 +Revises: 026 +Create Date: 2015-07-27 14:31:02.413053 + +""" + +# revision identifiers, used by Alembic. +revision = '027' +down_revision = '026' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.alter_column('job_executions', 'oozie_job_id', + new_column_name="engine_job_id", + type_=sa.String(length=100)) diff --git a/sahara/db/sqlalchemy/models.py b/sahara/db/sqlalchemy/models.py index 95cc8198..5a40c8ac 100644 --- a/sahara/db/sqlalchemy/models.py +++ b/sahara/db/sqlalchemy/models.py @@ -314,7 +314,7 @@ class JobExecution(mb.SaharaBase): cluster_id = sa.Column(sa.String(36), sa.ForeignKey('clusters.id')) info = sa.Column(st.JsonDictType()) - oozie_job_id = sa.Column(sa.String(100)) + engine_job_id = sa.Column(sa.String(100)) return_code = sa.Column(sa.String(80)) job_configs = sa.Column(st.JsonDictType()) extra = sa.Column(st.JsonDictType()) @@ -322,6 +322,16 @@ class JobExecution(mb.SaharaBase): is_public = sa.Column(sa.Boolean()) is_protected = sa.Column(sa.Boolean()) + def to_dict(self): + d = super(JobExecution, self).to_dict() + # The oozie_job_id field is renamed to engine_job_id + # to make this field more universal. But, we need to + # carry both engine_job_id and oozie_job_id until we + # can deprecate "oozie_job_id". 
+ d['oozie_job_id'] = self.engine_job_id + + return d + mains_association = sa.Table("mains_association", mb.SaharaBase.metadata, diff --git a/sahara/plugins/fake/edp_engine.py b/sahara/plugins/fake/edp_engine.py index 8090585e..4716475e 100644 --- a/sahara/plugins/fake/edp_engine.py +++ b/sahara/plugins/fake/edp_engine.py @@ -25,7 +25,7 @@ class FakeJobEngine(base_engine.JobEngine): pass def run_job(self, job_execution): - return 'oozie_job_id', edp.JOB_STATUS_SUCCEEDED, None + return 'engine_job_id', edp.JOB_STATUS_SUCCEEDED, None def validate_job_execution(self, cluster, job, data): pass diff --git a/sahara/service/edp/job_manager.py b/sahara/service/edp/job_manager.py index fbd3fc42..5ddcf392 100644 --- a/sahara/service/edp/job_manager.py +++ b/sahara/service/edp/job_manager.py @@ -107,7 +107,7 @@ def _run_job(job_execution_id): # Set the job id and the start time # Optionally, update the status and the 'extra' field - update_dict = {'oozie_job_id': jid, + update_dict = {'engine_job_id': jid, 'start_time': datetime.datetime.now()} if status: update_dict['info'] = {'status': status} diff --git a/sahara/service/edp/oozie/engine.py b/sahara/service/edp/oozie/engine.py index fa8f7d48..9b174713 100644 --- a/sahara/service/edp/oozie/engine.py +++ b/sahara/service/edp/oozie/engine.py @@ -90,13 +90,13 @@ class OozieJobEngine(base_engine.JobEngine): return "%s/workflow.xml" % job_dir def cancel_job(self, job_execution): - if job_execution.oozie_job_id is not None: + if job_execution.engine_job_id is not None: client = self._get_client() client.kill_job(job_execution) return client.get_job_status(job_execution) def get_job_status(self, job_execution): - if job_execution.oozie_job_id is not None: + if job_execution.engine_job_id is not None: return self._get_client().get_job_status(job_execution) def run_job(self, job_execution): diff --git a/sahara/service/edp/oozie/oozie.py b/sahara/service/edp/oozie/oozie.py index 701bcd81..c173ed03 100644 --- 
a/sahara/service/edp/oozie/oozie.py +++ b/sahara/service/edp/oozie/oozie.py @@ -46,13 +46,13 @@ class OozieClient(object): def kill_job(self, job_execution): session = self._get_http_session(job_execution.extra.get('neutron')) - resp = session.put(self.job_url % job_execution.oozie_job_id + + resp = session.put(self.job_url % job_execution.engine_job_id + "?action=kill") _check_status_code(resp, 200) def get_job_status(self, job_execution, job_id=None): if job_id is None: - job_id = job_execution.oozie_job_id + job_id = job_execution.engine_job_id session = self._get_http_session(job_execution.extra.get('neutron')) resp = session.get(self.job_url % job_id + "?show=info") _check_status_code(resp, 200) @@ -60,7 +60,7 @@ class OozieClient(object): def get_job_logs(self, job_execution): session = self._get_http_session(job_execution.extra.get('neutron')) - resp = session.get(self.job_url % job_execution.oozie_job_id + + resp = session.get(self.job_url % job_execution.engine_job_id + "?show=log") _check_status_code(resp, 200) return resp.text diff --git a/sahara/service/edp/spark/engine.py b/sahara/service/edp/spark/engine.py index bfdcffea..9137ce3e 100644 --- a/sahara/service/edp/spark/engine.py +++ b/sahara/service/edp/spark/engine.py @@ -63,7 +63,7 @@ class SparkJobEngine(base_engine.JobEngine): return "", "" def _get_instance_if_running(self, job_execution): - pid, inst_id = self._get_pid_and_inst_id(job_execution.oozie_job_id) + pid, inst_id = self._get_pid_and_inst_id(job_execution.engine_job_id) if not pid or not inst_id or ( job_execution.info['status'] in edp.JOB_STATUSES_TERMINATED): return None, None diff --git a/sahara/service/edp/storm/engine.py b/sahara/service/edp/storm/engine.py index 14056e41..64e66e6e 100644 --- a/sahara/service/edp/storm/engine.py +++ b/sahara/service/edp/storm/engine.py @@ -53,7 +53,7 @@ class StormJobEngine(base_engine.JobEngine): def _get_instance_if_running(self, job_execution): topology_name, inst_id = 
self._get_topology_and_inst_id( - job_execution.oozie_job_id) + job_execution.engine_job_id) if not topology_name or not inst_id or ( job_execution.info['status'] in edp.JOB_STATUSES_TERMINATED): return None, None @@ -70,7 +70,7 @@ class StormJobEngine(base_engine.JobEngine): def _get_topology_name(self, job_execution): topology_name, inst_id = self._get_topology_and_inst_id( - job_execution.oozie_job_id) + job_execution.engine_job_id) return topology_name diff --git a/sahara/tests/unit/db/migration/test_migrations.py b/sahara/tests/unit/db/migration/test_migrations.py index ca1055c0..0ccd3cb4 100644 --- a/sahara/tests/unit/db/migration/test_migrations.py +++ b/sahara/tests/unit/db/migration/test_migrations.py @@ -534,6 +534,12 @@ class SaharaMigrationsCheckers(object): self.assertColumnExists(engine, table, 'is_public') self.assertColumnExists(engine, table, 'is_protected') + def _check_027(self, engine, data): + self.assertColumnNotExists(engine, 'job_executions', + 'oozie_job_id') + self.assertColumnExists(engine, 'job_executions', + 'engine_job_id') + class TestMigrationsMySQL(SaharaMigrationsCheckers, base.BaseWalkMigrationTestCase, diff --git a/sahara/tests/unit/service/edp/spark/base.py b/sahara/tests/unit/service/edp/spark/base.py index f3adee33..f50bbfc8 100644 --- a/sahara/tests/unit/service/edp/spark/base.py +++ b/sahara/tests/unit/service/edp/spark/base.py @@ -77,11 +77,11 @@ class TestSpark(base.SaharaTestCase): job_exec = mock.Mock() eng = se.SparkJobEngine("cluster") - job_exec.oozie_job_id = "invalid id" + job_exec.engine_job_id = "invalid id" self.assertEqual((None, None), eng._get_instance_if_running(job_exec)) - job_exec.oozie_job_id = "pid@inst_id" + job_exec.engine_job_id = "pid@inst_id" for state in edp.JOB_STATUSES_TERMINATED: job_exec.info = {'status': state} self.assertEqual((None, None), diff --git a/sahara/tests/unit/service/edp/storm/test_storm.py b/sahara/tests/unit/service/edp/storm/test_storm.py index 09e8e465..449a813a 100644 --- 
a/sahara/tests/unit/service/edp/storm/test_storm.py +++ b/sahara/tests/unit/service/edp/storm/test_storm.py @@ -70,11 +70,11 @@ class TestStorm(base.SaharaTestCase): job_exec = mock.Mock() eng = se.StormJobEngine("cluster") - job_exec.oozie_job_id = "invalid id" + job_exec.engine_job_id = "invalid id" self.assertEqual((None, None), eng._get_instance_if_running(job_exec)) - job_exec.oozie_job_id = "topology_name@inst_id" + job_exec.engine_job_id = "topology_name@inst_id" for state in edp.JOB_STATUSES_TERMINATED: job_exec.info = {'status': state} self.assertEqual((None, None), @@ -119,7 +119,7 @@ class TestStorm(base.SaharaTestCase): get_instances.return_value = ["instance"] # Pretend process is running - job_exec.oozie_job_id = "topology_name@inst_id" + job_exec.engine_job_id = "topology_name@inst_id" job_exec.info = {'status': edp.JOB_STATUS_RUNNING} job_exec.job_configs = {"configs": {"topology_name": "topology_name"}} status = eng._get_job_status_from_remote(job_exec) @@ -199,7 +199,7 @@ class TestStorm(base.SaharaTestCase): return_value=master_instance) eng = se.StormJobEngine("cluster") job_exec = mock.Mock() - job_exec.oozie_job_id = "topology_name@inst_id" + job_exec.engine_job_id = "topology_name@inst_id" job_exec.info = {'status': edp.JOB_STATUS_RUNNING} job_exec.job_configs = {"configs": {"topology_name": "topology_name"}} status = eng.cancel_job(job_exec)