diff --git a/doc/source/devref/plugin.spi.rst b/doc/source/devref/plugin.spi.rst index 67a87a1b..237a16af 100644 --- a/doc/source/devref/plugin.spi.rst +++ b/doc/source/devref/plugin.spi.rst @@ -121,6 +121,13 @@ When user terminates cluster, Savanna simply shuts down all the cluster VMs. Thi *Returns*: None +get_oozie_server(cluster) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Returns the instance object for the host running the Oozie server (this service may be referenced by a vendor-dependent identifier) + +*Returns*: The Oozie server instance object + Object Model ============ diff --git a/savanna/context.py b/savanna/context.py index 9f682128..dfbff1ef 100644 --- a/savanna/context.py +++ b/savanna/context.py @@ -79,6 +79,10 @@ class Context(object): 'roles': self.roles, } + def is_auth_capable(self): + return self.service_catalog and self.token and self.tenant_id and \ + self.user_id + def get_admin_context(): return Context(is_admin=True) diff --git a/savanna/db/migration/alembic_migrations/versions/002_add_job_exec_extra.py b/savanna/db/migration/alembic_migrations/versions/002_add_job_exec_extra.py new file mode 100644 index 00000000..338dd7ce --- /dev/null +++ b/savanna/db/migration/alembic_migrations/versions/002_add_job_exec_extra.py @@ -0,0 +1,40 @@ +# Copyright 2014 Openstack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""add_job_exec_extra + +Revision ID: 002 +Revises: 001 +Create Date: 2014-01-23 23:25:13.225628 + +""" + +# revision identifiers, used by Alembic. +revision = '002' +down_revision = '001' + +from alembic import op +import sqlalchemy as sa + +from savanna.db.sqlalchemy import types as st + + +def upgrade(): + op.add_column('job_executions', sa.Column('extra', st.JsonEncoded(), + nullable=True)) + + +def downgrade(): + op.drop_column('job_executions', 'extra') diff --git a/savanna/db/sqlalchemy/models.py b/savanna/db/sqlalchemy/models.py index 8d7ca3b1..14bc8b8e 100644 --- a/savanna/db/sqlalchemy/models.py +++ b/savanna/db/sqlalchemy/models.py @@ -268,7 +268,7 @@ class JobExecution(mb.SavannaBase): job_configs = sa.Column(st.JsonDictType()) main_class = sa.Column(sa.Text()) java_opts = sa.Column(sa.Text()) - + extra = sa.Column(st.JsonDictType()) mains_association = sa.Table("mains_association", mb.SavannaBase.metadata, diff --git a/savanna/plugins/hdp/ambariplugin.py b/savanna/plugins/hdp/ambariplugin.py index 05bfba44..541303eb 100644 --- a/savanna/plugins/hdp/ambariplugin.py +++ b/savanna/plugins/hdp/ambariplugin.py @@ -20,6 +20,7 @@ from savanna import context from savanna import exceptions as exc from savanna.openstack.common import log as logging from savanna.plugins.general import exceptions as ex +from savanna.plugins.general import utils as u from savanna.plugins.hdp import hadoopserver as h from savanna.plugins.hdp import savannautils as utils from savanna.plugins.hdp.versions import versionhandlerfactory as vhf @@ -138,6 +139,9 @@ class AmbariPlugin(p.ProvisioningPluginBase): "node_groups": node_groups, "cluster_configs": cluster_configs}) + def get_oozie_server(self, cluster): + return u.get_instance(cluster, "oozie_server") + def update_infra(self, cluster): pass diff --git a/savanna/plugins/hdp/versions/version_1_3_2/services.py b/savanna/plugins/hdp/versions/version_1_3_2/services.py index 7a899c08..d8a9de33 100644 --- 
a/savanna/plugins/hdp/versions/version_1_3_2/services.py +++ b/savanna/plugins/hdp/versions/version_1_3_2/services.py @@ -614,11 +614,16 @@ class OozieService(Service): def finalize_configuration(self, cluster_spec): oozie_servers = cluster_spec.determine_component_hosts('OOZIE_SERVER') if oozie_servers: + oozie_server = oozie_servers.pop() + name_list = [oozie_server.fqdn(), oozie_server.internal_ip, + oozie_server.management_ip] self._replace_config_token( - cluster_spec, '%OOZIE_HOST%', oozie_servers.pop().fqdn(), + cluster_spec, '%OOZIE_HOST%', oozie_server.fqdn(), {'global': ['oozie_hostname'], - 'core-site': ['hadoop.proxyuser.oozie.hosts'], 'oozie-site': ['oozie.base.url']}) + self._replace_config_token( + cluster_spec, '%OOZIE_HOST%', ",".join(name_list), + {'core-site': ['hadoop.proxyuser.oozie.hosts']}) def finalize_ng_components(self, cluster_spec): oozie_ng = cluster_spec.get_node_groups_containing_component( diff --git a/savanna/plugins/intel/plugin.py b/savanna/plugins/intel/plugin.py index 170e1c4c..91ba5b8e 100644 --- a/savanna/plugins/intel/plugin.py +++ b/savanna/plugins/intel/plugin.py @@ -179,3 +179,6 @@ class IDHProvider(p.ProvisioningPluginBase): ctx = context.ctx() conductor.cluster_update(ctx, cluster, {'info': info}) + + def get_oozie_server(self, cluster): + return u.get_instance(cluster, "oozie") diff --git a/savanna/plugins/provisioning.py b/savanna/plugins/provisioning.py index 305ee127..79d68607 100644 --- a/savanna/plugins/provisioning.py +++ b/savanna/plugins/provisioning.py @@ -66,6 +66,10 @@ class ProvisioningPluginBase(plugins_base.PluginInterface): def scale_cluster(self, cluster, instances): pass + @plugins_base.optional + def get_oozie_server(self, cluster): + pass + @plugins_base.required_with_default def decommission_nodes(self, cluster, instances): pass diff --git a/savanna/plugins/vanilla/plugin.py b/savanna/plugins/vanilla/plugin.py index a530cc3d..03e678bf 100644 --- a/savanna/plugins/vanilla/plugin.py +++ 
b/savanna/plugins/vanilla/plugin.py @@ -56,6 +56,9 @@ class VanillaProvider(p.ProvisioningPluginBase): "1.2.1 cluster without any management consoles. Also it can " "deploy Oozie 4.0.0 and Hive 0.11.0") + def get_oozie_server(self, cluster): + return utils.get_instance(cluster, "oozie") + def get_versions(self): return ['1.2.1'] diff --git a/savanna/service/edp/api.py b/savanna/service/edp/api.py index 939a6107..247fe3e6 100644 --- a/savanna/service/edp/api.py +++ b/savanna/service/edp/api.py @@ -13,9 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +from oslo.config import cfg + from savanna import conductor as c from savanna import context from savanna.openstack.common import log as logging +from savanna.plugins import base as plugin_base from savanna.service.edp.binary_retrievers import dispatch from savanna.service.edp import job_manager as manager from savanna.service.edp.workflow_creator import workflow_factory as w_f @@ -23,6 +26,7 @@ from savanna.service.edp.workflow_creator import workflow_factory as w_f conductor = c.API LOG = logging.getLogger(__name__) +CONF = cfg.CONF def get_job_config_hints(job_type): @@ -41,6 +45,17 @@ def execute_job(job_id, data): if "args" in configs and type(configs["args"]) is dict: configs["args"] = [] + ctx = context.current() + cluster = conductor.cluster_get(ctx, cluster_id) + plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name) + instance = plugin.get_oozie_server(cluster) + + extra = {} + info = None + if CONF.use_namespaces and not CONF.use_floating_ips: + info = instance.remote().get_neutron_info() + extra['neutron'] = info + # Not in Java job types but present for all others input_id = data.get('input_id', None) output_id = data.get('output_id', None) @@ -55,8 +70,8 @@ def execute_job(job_id, data): 'java_opts': java_opts, 'input_id': input_id, 'output_id': output_id, 'job_id': job_id, 'cluster_id': cluster_id, - 'info': {'status': 'Pending'}, 
'job_configs': configs} - + 'info': {'status': 'Pending'}, 'job_configs': configs, + 'extra': extra} job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict) context.spawn("Starting Job Execution %s" % job_execution.id, diff --git a/savanna/service/edp/job_manager.py b/savanna/service/edp/job_manager.py index 2a94f175..2f6b9fd6 100644 --- a/savanna/service/edp/job_manager.py +++ b/savanna/service/edp/job_manager.py @@ -61,8 +61,9 @@ def get_job_status(job_execution_id): if cluster is None or cluster.status != 'Active': return job_execution - client = o.OozieClient(cluster['info']['JobFlow']['Oozie'] + "/oozie") - job_info = client.get_job_status(job_execution.oozie_job_id) + client = o.OozieClient(cluster['info']['JobFlow']['Oozie'] + "/oozie", + _get_oozie_server(cluster)) + job_info = client.get_job_status(job_execution) update = {"info": job_info} if job_info['status'] in terminated_job_states: update['end_time'] = datetime.datetime.now() @@ -82,15 +83,21 @@ def update_job_statuses(): (je.id, e)) +def _get_oozie_server(cluster): + plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name) + return plugin.get_oozie_server(cluster) + + def cancel_job(job_execution_id): ctx = context.ctx() job_execution = conductor.job_execution_get(ctx, job_execution_id) cluster = conductor.cluster_get(ctx, job_execution.cluster_id) - client = o.OozieClient(cluster['info']['JobFlow']['Oozie'] + "/oozie/") - client.kill_job(job_execution.oozie_job_id) + client = o.OozieClient(cluster['info']['JobFlow']['Oozie'] + "/oozie/", + _get_oozie_server(cluster)) + client.kill_job(job_execution) - job_info = client.get_job_status(job_execution.oozie_job_id) + job_info = client.get_job_status(job_execution) update = {"info": job_info, "end_time": datetime.datetime.now()} job_execution = conductor.job_execution_update(ctx, job_execution, @@ -141,7 +148,8 @@ def run_job(job_execution): jt_path = cluster['info']['MapReduce']['JobTracker'] nn_path = 
cluster['info']['HDFS']['NameNode'] - client = o.OozieClient(cluster['info']['JobFlow']['Oozie'] + "/oozie/") + client = o.OozieClient(cluster['info']['JobFlow']['Oozie'] + "/oozie/", + _get_oozie_server(cluster)) job_parameters = {"jobTracker": jt_path, "nameNode": nn_path, "user.name": hdfs_user, @@ -149,13 +157,14 @@ def run_job(job_execution): "%s%s" % (nn_path, path_to_workflow), "oozie.use.system.libpath": "true"} - oozie_job_id = client.add_job(x.create_hadoop_xml(job_parameters)) - client.run_job(oozie_job_id) + oozie_job_id = client.add_job(x.create_hadoop_xml(job_parameters), + job_execution) job_execution = conductor.job_execution_update(ctx, job_execution, {'oozie_job_id': oozie_job_id, 'start_time': datetime.datetime.now()}) + client.run_job(job_execution, oozie_job_id) return job_execution diff --git a/savanna/service/edp/oozie.py b/savanna/service/edp/oozie.py index 14698533..6266ebf0 100644 --- a/savanna/service/edp/oozie.py +++ b/savanna/service/edp/oozie.py @@ -15,39 +15,52 @@ import json import re -import requests +from six.moves.urllib import parse as urlparse import urllib import savanna.exceptions as ex class OozieClient(object): - def __init__(self, url): + def __init__(self, url, oozie_server): self.job_url = url + "/v1/job/%s" self.jobs_url = url + "/v1/jobs" + self.oozie_server = oozie_server + self.port = urlparse.urlparse(url).port - def add_job(self, job_config): - resp = requests.post(self.jobs_url, job_config, headers={ + def _get_http_session(self, info=None): + return self.oozie_server.remote().get_http_client(self.port, info=info) + + def add_job(self, job_config, job_execution): + session = self._get_http_session(job_execution.extra.get('neutron')) + resp = session.post(self.jobs_url, data=job_config, headers={ "Content-Type": "application/xml;charset=UTF-8" }) _check_status_code(resp, 201) return get_json(resp)['id'] - def run_job(self, job_id): - resp = requests.put(self.job_url % job_id + "?action=start") + def run_job(self, 
job_execution, job_id): + session = self._get_http_session(job_execution.extra.get('neutron')) + resp = session.put(self.job_url % job_id + "?action=start") _check_status_code(resp, 200) - def kill_job(self, job_id): - resp = requests.put(self.job_url % job_id + "?action=kill") + def kill_job(self, job_execution): + session = self._get_http_session(job_execution.extra.get('neutron')) + resp = session.put(self.job_url % job_execution.oozie_job_id + + "?action=kill") _check_status_code(resp, 200) - def get_job_status(self, job_id): - resp = requests.get(self.job_url % job_id + "?show=info") + def get_job_status(self, job_execution): + session = self._get_http_session(job_execution.extra.get('neutron')) + resp = session.get(self.job_url % job_execution.oozie_job_id + + "?show=info") _check_status_code(resp, 200) return get_json(resp) - def get_job_logs(self, job_id): - resp = requests.get(self.job_url % job_id + "?show=log") + def get_job_logs(self, job_execution): + session = self._get_http_session(job_execution.extra.get('neutron')) + resp = session.get(self.job_url % job_execution.oozie_job_id + + "?show=log") _check_status_code(resp, 200) return resp.text @@ -57,7 +70,8 @@ class OozieClient(object): f = ";".join([k + "=" + v for k, v in filter.items()]) url += "&filter=" + urllib.quote(f) - resp = requests.get(url) + session = self._get_http_session() + resp = session.get(url) _check_status_code(resp, 200) return get_json(resp) diff --git a/savanna/tests/unit/db/migration/test_migrations.py b/savanna/tests/unit/db/migration/test_migrations.py index dc39fd51..391aa619 100644 --- a/savanna/tests/unit/db/migration/test_migrations.py +++ b/savanna/tests/unit/db/migration/test_migrations.py @@ -323,3 +323,6 @@ class TestMigrations(base.BaseWalkMigrationTestCase, base.CommonTestsMixIn): ] self.assertColumnsExists(engine, 'instances', instances_columns) self.assertColumnCount(engine, 'instances', instances_columns) + + def _check_002(self, engine, date): + 
self.assertColumnExists(engine, 'job_executions', 'extra') diff --git a/savanna/tests/unit/plugins/hdp/test_ambariplugin.py b/savanna/tests/unit/plugins/hdp/test_ambariplugin.py index e5d30597..c82a39e9 100644 --- a/savanna/tests/unit/plugins/hdp/test_ambariplugin.py +++ b/savanna/tests/unit/plugins/hdp/test_ambariplugin.py @@ -288,6 +288,25 @@ class AmbariPluginTest(unittest2.TestCase): self.assertEqual('admin', ambari_info.user) self.assertEqual('admin', ambari_info.password) + def test_get_oozie_server(self): + test_host = base.TestServer( + 'host1', 'test-master', '11111', 3, '111.11.1111', + '222.11.1111') + + node_group = base.TestNodeGroup( + 'ng1', [test_host], ["AMBARI_SERVER", "NAMENODE", "DATANODE", + "JOBTRACKER", "TASKTRACKER", "OOZIE_SERVER"]) + cluster = base.TestCluster([node_group]) + plugin = ap.AmbariPlugin() + + self.assertIsNotNone(plugin.get_oozie_server(cluster)) + + node_group = base.TestNodeGroup( + 'ng1', [test_host], ["AMBARI_SERVER", "NAMENODE", "DATANODE", + "JOBTRACKER", "TASKTRACKER", "NOT_OOZIE"]) + cluster = base.TestCluster([node_group]) + self.assertIsNone(plugin.get_oozie_server(cluster)) + def _get_test_request(self, host, port): request = base.TestRequest() self.requests.append(request) diff --git a/savanna/tests/unit/plugins/hdp/test_clusterspec.py b/savanna/tests/unit/plugins/hdp/test_clusterspec.py index c861279f..090a5956 100644 --- a/savanna/tests/unit/plugins/hdp/test_clusterspec.py +++ b/savanna/tests/unit/plugins/hdp/test_clusterspec.py @@ -291,7 +291,7 @@ class ClusterSpecTest(unittest2.TestCase): self.assertEqual(config['global']['oozie_hostname'], 'oozie_host.novalocal') self.assertEqual(config['core-site']['hadoop.proxyuser.oozie.hosts'], - 'oozie_host.novalocal') + 'oozie_host.novalocal,222.11.9999,111.11.9999') # test swift properties self.assertEqual('swift_prop_value', diff --git a/savanna/tests/unit/test_context.py b/savanna/tests/unit/test_context.py index 909f1703..7eab819b 100644 --- 
a/savanna/tests/unit/test_context.py +++ b/savanna/tests/unit/test_context.py @@ -117,6 +117,19 @@ class ContextTest(unittest2.TestCase): self.assertIsNotNone(tg.exc) self.assertEqual(tg.failed_thread, 'test thread') + def test_is_auth_capable_for_admin_ctx(self): + ctx = context.ctx() + self.assertFalse(ctx.is_auth_capable()) + + def test_is_auth_capable_for_user_ctx(self): + existing_ctx = context.ctx() + try: + ctx = context.Context('test_user', 'tenant_1', 'test_auth_token', + {"network": "aURL"}, remote_semaphore='123') + self.assertTrue(ctx.is_auth_capable()) + finally: + context.set_ctx(existing_ctx) + class TestException(Exception): pass diff --git a/savanna/tests/unit/utils/test_hashabledict.py b/savanna/tests/unit/utils/test_hashabledict.py new file mode 100644 index 00000000..22289279 --- /dev/null +++ b/savanna/tests/unit/utils/test_hashabledict.py @@ -0,0 +1,28 @@ +# Copyright (c) 2013 Hortonworks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import unittest2 + +from savanna.utils import hashabledict as h + + +class HashableDictTest(unittest2.TestCase): + + def test_is_hashable(self): + hd = h.HashableDict() + hd['one'] = 'oneValue' + + self.assertTrue(isinstance(hd, collections.Hashable)) diff --git a/savanna/utils/hashabledict.py b/savanna/utils/hashabledict.py new file mode 100644 index 00000000..df755681 --- /dev/null +++ b/savanna/utils/hashabledict.py @@ -0,0 +1,21 @@ +# Copyright (c) 2013 Hortonworks, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six + + +class HashableDict(dict): + def __hash__(self): + return hash((frozenset(self), frozenset(six.itervalues(self)))) diff --git a/savanna/utils/remote.py b/savanna/utils/remote.py index 50bcc7e1..ba93ce2c 100644 --- a/savanna/utils/remote.py +++ b/savanna/utils/remote.py @@ -45,6 +45,7 @@ from savanna import context from savanna import exceptions as ex from savanna.openstack.common import excutils from savanna.utils import crypto +from savanna.utils import hashabledict as h from savanna.utils.openstack import base from savanna.utils.openstack import neutron from savanna.utils import procutils @@ -307,8 +308,8 @@ class InstanceInteropHelper(object): finally: _release_remote_semaphore() - def _get_neutron_info(self): - neutron_info = HashableDict() + def get_neutron_info(self): + neutron_info = h.HashableDict() neutron_info['network'] = \ self.instance.node_group.cluster.neutron_management_network ctx = context.current() @@ -323,7 +324,7 @@ class InstanceInteropHelper(object): def _get_conn_params(self): info = None if CONF.use_namespaces and not CONF.use_floating_ips: - info = self._get_neutron_info() + info = self.get_neutron_info() return (self.instance.management_ip, self.instance.node_group.image_username, self.instance.node_group.cluster.management_private_key, info) @@ -357,12 +358,13 @@ class InstanceInteropHelper(object): finally: _release_remote_semaphore() - def get_http_client(self, port): + 
def get_http_client(self, port, info=None): self._log_command('Retrieving http session for {0}:{1}' .format(self.instance.management_ip, port)) - info = None if CONF.use_namespaces and not CONF.use_floating_ips: - info = self._get_neutron_info() + # need neutron info + if not info: + info = self.get_neutron_info() return _get_http_client(self.instance.management_ip, port, info) def close_http_sessions(self): @@ -452,8 +454,3 @@ class BulkInstanceInteropHelper(InstanceInteropHelper): def _run_s(self, func, timeout, *args, **kwargs): return self._run_with_log(func, timeout, *args, **kwargs) - - -class HashableDict(dict): - def __hash__(self): - return hash((frozenset(self), frozenset(six.itervalues(self))))