Rename oozie_job_id

The "oozie_job_id" column in table "job_executions" represents
oozie_job_id only when the edp engine is oozie. When it is spark
engin, oozie_job_id = pid@instance_id, when it is storm engine,
oozie_job_id = topology_name@instance_id.

Rename oozie_job_id to engine_job_id to aviod confusing.

Change-Id: I2671b91a315b2c7a2b805ce4d494252860a7fe6c
Closes-bug: 1479575
This commit is contained in:
Li, Chen 2015-07-27 15:29:20 +08:00
parent a580ceb7f3
commit 6c0db8e84a
12 changed files with 69 additions and 18 deletions

View File

@ -248,7 +248,7 @@ class JobExecution(object):
end_time
cluster_id
info
oozie_job_id
engine_job_id
return_code
job_configs
interface

View File

@ -0,0 +1,35 @@
# Copyright 2015 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Rename oozie_job_id
Revision ID: 027
Revises: 026
Create Date: 2015-07-27 14:31:02.413053
"""
# revision identifiers, used by Alembic.
revision = '027'
down_revision = '026'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.alter_column('job_executions', 'oozie_job_id',
new_column_name="engine_job_id",
type_=sa.String(length=100))

View File

@ -314,7 +314,7 @@ class JobExecution(mb.SaharaBase):
cluster_id = sa.Column(sa.String(36),
sa.ForeignKey('clusters.id'))
info = sa.Column(st.JsonDictType())
oozie_job_id = sa.Column(sa.String(100))
engine_job_id = sa.Column(sa.String(100))
return_code = sa.Column(sa.String(80))
job_configs = sa.Column(st.JsonDictType())
extra = sa.Column(st.JsonDictType())
@ -322,6 +322,16 @@ class JobExecution(mb.SaharaBase):
is_public = sa.Column(sa.Boolean())
is_protected = sa.Column(sa.Boolean())
def to_dict(self):
d = super(JobExecution, self).to_dict()
# The oozie_job_id filed is renamed to engine_job_id
# to make this field more universal. But, we need to
# carry both engine_job_id and oozie_job_id until we
# can deprecate "oozie_job_id".
d['oozie_job_id'] = self.engine_job_id
return d
mains_association = sa.Table("mains_association",
mb.SaharaBase.metadata,

View File

@ -25,7 +25,7 @@ class FakeJobEngine(base_engine.JobEngine):
pass
def run_job(self, job_execution):
return 'oozie_job_id', edp.JOB_STATUS_SUCCEEDED, None
return 'engine_job_id', edp.JOB_STATUS_SUCCEEDED, None
def validate_job_execution(self, cluster, job, data):
pass

View File

@ -107,7 +107,7 @@ def _run_job(job_execution_id):
# Set the job id and the start time
# Optionally, update the status and the 'extra' field
update_dict = {'oozie_job_id': jid,
update_dict = {'engine_job_id': jid,
'start_time': datetime.datetime.now()}
if status:
update_dict['info'] = {'status': status}

View File

@ -90,13 +90,13 @@ class OozieJobEngine(base_engine.JobEngine):
return "%s/workflow.xml" % job_dir
def cancel_job(self, job_execution):
if job_execution.oozie_job_id is not None:
if job_execution.engine_job_id is not None:
client = self._get_client()
client.kill_job(job_execution)
return client.get_job_status(job_execution)
def get_job_status(self, job_execution):
if job_execution.oozie_job_id is not None:
if job_execution.engine_job_id is not None:
return self._get_client().get_job_status(job_execution)
def run_job(self, job_execution):

View File

@ -46,13 +46,13 @@ class OozieClient(object):
def kill_job(self, job_execution):
session = self._get_http_session(job_execution.extra.get('neutron'))
resp = session.put(self.job_url % job_execution.oozie_job_id +
resp = session.put(self.job_url % job_execution.engine_job_id +
"?action=kill")
_check_status_code(resp, 200)
def get_job_status(self, job_execution, job_id=None):
if job_id is None:
job_id = job_execution.oozie_job_id
job_id = job_execution.engine_job_id
session = self._get_http_session(job_execution.extra.get('neutron'))
resp = session.get(self.job_url % job_id + "?show=info")
_check_status_code(resp, 200)
@ -60,7 +60,7 @@ class OozieClient(object):
def get_job_logs(self, job_execution):
session = self._get_http_session(job_execution.extra.get('neutron'))
resp = session.get(self.job_url % job_execution.oozie_job_id +
resp = session.get(self.job_url % job_execution.engine_job_id +
"?show=log")
_check_status_code(resp, 200)
return resp.text

View File

@ -63,7 +63,7 @@ class SparkJobEngine(base_engine.JobEngine):
return "", ""
def _get_instance_if_running(self, job_execution):
pid, inst_id = self._get_pid_and_inst_id(job_execution.oozie_job_id)
pid, inst_id = self._get_pid_and_inst_id(job_execution.engine_job_id)
if not pid or not inst_id or (
job_execution.info['status'] in edp.JOB_STATUSES_TERMINATED):
return None, None

View File

@ -53,7 +53,7 @@ class StormJobEngine(base_engine.JobEngine):
def _get_instance_if_running(self, job_execution):
topology_name, inst_id = self._get_topology_and_inst_id(
job_execution.oozie_job_id)
job_execution.engine_job_id)
if not topology_name or not inst_id or (
job_execution.info['status'] in edp.JOB_STATUSES_TERMINATED):
return None, None
@ -70,7 +70,7 @@ class StormJobEngine(base_engine.JobEngine):
def _get_topology_name(self, job_execution):
topology_name, inst_id = self._get_topology_and_inst_id(
job_execution.oozie_job_id)
job_execution.engine_job_id)
return topology_name

View File

@ -534,6 +534,12 @@ class SaharaMigrationsCheckers(object):
self.assertColumnExists(engine, table, 'is_public')
self.assertColumnExists(engine, table, 'is_protected')
def _check_027(self, engine, data):
self.assertColumnNotExists(engine, 'job_executions',
'oozie_job_id')
self.assertColumnExists(engine, 'job_executions',
'engine_job_id')
class TestMigrationsMySQL(SaharaMigrationsCheckers,
base.BaseWalkMigrationTestCase,

View File

@ -77,11 +77,11 @@ class TestSpark(base.SaharaTestCase):
job_exec = mock.Mock()
eng = se.SparkJobEngine("cluster")
job_exec.oozie_job_id = "invalid id"
job_exec.engine_job_id = "invalid id"
self.assertEqual((None, None),
eng._get_instance_if_running(job_exec))
job_exec.oozie_job_id = "pid@inst_id"
job_exec.engine_job_id = "pid@inst_id"
for state in edp.JOB_STATUSES_TERMINATED:
job_exec.info = {'status': state}
self.assertEqual((None, None),

View File

@ -70,11 +70,11 @@ class TestStorm(base.SaharaTestCase):
job_exec = mock.Mock()
eng = se.StormJobEngine("cluster")
job_exec.oozie_job_id = "invalid id"
job_exec.engine_job_id = "invalid id"
self.assertEqual((None, None),
eng._get_instance_if_running(job_exec))
job_exec.oozie_job_id = "topology_name@inst_id"
job_exec.engine_job_id = "topology_name@inst_id"
for state in edp.JOB_STATUSES_TERMINATED:
job_exec.info = {'status': state}
self.assertEqual((None, None),
@ -119,7 +119,7 @@ class TestStorm(base.SaharaTestCase):
get_instances.return_value = ["instance"]
# Pretend process is running
job_exec.oozie_job_id = "topology_name@inst_id"
job_exec.engine_job_id = "topology_name@inst_id"
job_exec.info = {'status': edp.JOB_STATUS_RUNNING}
job_exec.job_configs = {"configs": {"topology_name": "topology_name"}}
status = eng._get_job_status_from_remote(job_exec)
@ -199,7 +199,7 @@ class TestStorm(base.SaharaTestCase):
return_value=master_instance)
eng = se.StormJobEngine("cluster")
job_exec = mock.Mock()
job_exec.oozie_job_id = "topology_name@inst_id"
job_exec.engine_job_id = "topology_name@inst_id"
job_exec.info = {'status': edp.JOB_STATUS_RUNNING}
job_exec.job_configs = {"configs": {"topology_name": "topology_name"}}
status = eng.cancel_job(job_exec)