diff --git a/sahara/plugins/ambari/__init__.py b/sahara/plugins/ambari/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sahara/plugins/ambari/client.py b/sahara/plugins/ambari/client.py new file mode 100644 index 00000000..eca97ae7 --- /dev/null +++ b/sahara/plugins/ambari/client.py @@ -0,0 +1,82 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from oslo_serialization import jsonutils +from requests import auth + + +class AmbariClient(object): + def __init__(self, instance, port="8080", **kwargs): + kwargs.setdefault("username", "admin") + kwargs.setdefault("password", "admin") + + self._port = port + self._base_url = "http://{host}:{port}/api/v1".format( + host=instance.management_ip, port=port) + self._instance = instance + self._http_client = instance.remote().get_http_client(port) + self._headers = {"X-Requested-By": "sahara"} + self._auth = auth.HTTPBasicAuth(kwargs["username"], kwargs["password"]) + self._default_client_args = {"verify": False, "auth": self._auth, + "headers": self._headers} + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + + def close(self): + self._instance.remote().close_http_session(self._port) + + def get(self, *args, **kwargs): + kwargs.update(self._default_client_args) + return self._http_client.get(*args, **kwargs) + + def post(self, *args, **kwargs): + kwargs.update(self._default_client_args) + return self._http_client.post(*args, **kwargs) + + def put(self, *args, **kwargs): + kwargs.update(self._default_client_args) + return self._http_client.put(*args, **kwargs) + + def delete(self, *args, **kwargs): + kwargs.update(self._default_client_args) + return self._http_client.delete(*args, **kwargs) + + @staticmethod + def check_response(resp): + resp.raise_for_status() + if resp.text: + return jsonutils.loads(resp.text) + + def get_registered_hosts(self): + url = self._base_url + "/hosts" + resp = self.get(url) + data = self.check_response(resp) + return data.get("items", []) + + def update_user_password(self, user, old_password, new_password): + url = self._base_url + "/users/%s" % user + data = jsonutils.dumps({ + "Users": { + "old_password": old_password, + "password": new_password + } + }) + resp = self.put(url, data=data) + self.check_response(resp) diff --git a/sahara/plugins/ambari/common.py b/sahara/plugins/ambari/common.py new file mode 100644 index 00000000..242d0e67 --- /dev/null +++ b/sahara/plugins/ambari/common.py @@ -0,0 +1,19 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# define service names + +AMBARI_SERVER = "Ambari" diff --git a/sahara/plugins/ambari/deploy.py b/sahara/plugins/ambari/deploy.py new file mode 100644 index 00000000..1fd00cd5 --- /dev/null +++ b/sahara/plugins/ambari/deploy.py @@ -0,0 +1,113 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import functools +import telnetlib + +from oslo_log import log as logging +from oslo_utils import uuidutils + +from sahara import conductor +from sahara import context +from sahara.i18n import _ +from sahara.plugins.ambari import client as ambari_client +from sahara.plugins.ambari import common as p_common +from sahara.plugins import exceptions as p_exc +from sahara.plugins import utils as plugin_utils +from sahara.utils import poll_utils + + +LOG = logging.getLogger(__name__) +conductor = conductor.API + + +def setup_ambari(cluster): + LOG.debug("Set up Ambari management console") + ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) + with ambari.remote() as r: + sudo = functools.partial(r.execute_command, run_as_root=True) + sudo("ambari-server setup -s -j" + " `cut -f2 -d \"=\" /etc/profile.d/99-java.sh`", timeout=1800) + sudo("service ambari-server start") + LOG.debug("Ambari management console installed") + + +def setup_agents(cluster): + LOG.debug("Set up Ambari agents") + manager_address = plugin_utils.get_instance( + cluster, p_common.AMBARI_SERVER).fqdn() + with context.ThreadGroup() as tg: + for inst in plugin_utils.get_instances(cluster): + tg.spawn("hwx-agent-setup-%s" % inst.id, + _setup_agent, inst, manager_address) + LOG.debug("Ambari agents has been installed") + + +def _setup_agent(instance, ambari_address): + with instance.remote() as r: + sudo = functools.partial(r.execute_command, run_as_root=True) + r.replace_remote_string("/etc/ambari-agent/conf/ambari-agent.ini", + "localhost", ambari_address) + sudo("service ambari-agent start") + # for correct installing packages + sudo("yum clean all") + + +def wait_ambari_accessible(cluster): + ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) + kwargs = {"host": ambari.management_ip, "port": 8080} + poll_utils.poll(_check_port_accessible, kwargs=kwargs, timeout=300) + + +def _check_port_accessible(host, port): + try: + conn = telnetlib.Telnet(host, port) + conn.close() + return True + except IOError: + return False + + +def update_default_ambari_password(cluster): + ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) + new_password = uuidutils.generate_uuid() + with ambari_client.AmbariClient(ambari) as client: + client.update_user_password("admin", "admin", new_password) + extra = cluster.extra.to_dict() if cluster.extra else {} + extra["ambari_password"] = new_password + ctx = context.ctx() + conductor.cluster_update(ctx, cluster, {"extra": extra}) + cluster = conductor.cluster_get(ctx, cluster.id) + + +def wait_host_registration(cluster): + ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) + hosts = plugin_utils.get_instances(cluster) + password = cluster.extra["ambari_password"] + with ambari_client.AmbariClient(ambari, password=password) as client: + kwargs = {"client": client, "num_hosts": len(hosts)} + poll_utils.poll(_check_host_registration, kwargs=kwargs, timeout=600) + registered_hosts = client.get_registered_hosts() + registered_host_names = [h["Hosts"]["host_name"] for h in registered_hosts] + actual_host_names = [h.fqdn() for h in hosts] + if sorted(registered_host_names) != sorted(actual_host_names): + raise p_exc.HadoopProvisionError( + _("Host registration fails in Ambari")) + + +def _check_host_registration(client, num_hosts): + hosts = client.get_registered_hosts() + return len(hosts) == num_hosts diff --git a/sahara/plugins/ambari/plugin.py b/sahara/plugins/ambari/plugin.py new file mode 100644 index 00000000..8f57a30e --- /dev/null +++ b/sahara/plugins/ambari/plugin.py @@ -0,0 +1,105 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from sahara import conductor +from sahara import context +from sahara.i18n import _ +from sahara.plugins.ambari import common as p_common +from sahara.plugins.ambari import deploy +from sahara.plugins.ambari import validation +from sahara.plugins import provisioning as p +from sahara.plugins import utils as plugin_utils + + +conductor = conductor.API + + +class AmbariPluginProvider(p.ProvisioningPluginBase): + + def get_title(self): + return "HDP Plugin" + + def get_description(self): + return _("HDP plugin with Ambari") + + def get_versions(self): + return ["2.3", "2.2"] + + def get_node_processes(self, hadoop_version): + return { + "Ambari": [p_common.AMBARI_SERVER] + } + + def get_configs(self, hadoop_version): + return [] + + def configure_cluster(self, cluster): + deploy.setup_ambari(cluster) + deploy.setup_agents(cluster) + deploy.wait_ambari_accessible(cluster) + deploy.update_default_ambari_password(cluster) + cluster = conductor.cluster_get(context.ctx(), cluster.id) + deploy.wait_host_registration(cluster) + + def start_cluster(self, cluster): + self._set_cluster_info(cluster) + + def _set_cluster_info(self, cluster): + ambari_ip = plugin_utils.get_instance( + cluster, p_common.AMBARI_SERVER).management_ip + ambari_port = "8080" + info = { + p_common.AMBARI_SERVER: { + "Web UI": "http://{host}:{port}".format(host=ambari_ip, + port=ambari_port), + "Username": "admin", + "Password": cluster.extra["ambari_password"] + } + } + info.update(cluster.info.to_dict()) + ctx = context.ctx() + conductor.cluster_update(ctx, cluster, {"info": info}) + cluster = conductor.cluster_get(ctx, cluster.id) + + def validate(self, cluster): + validation.validate_creation(cluster.id) + + def scale_cluster(self, cluster, instances): + pass + + def decommission_nodes(self, cluster, instances): + pass + + def validate_scaling(self, cluster, existing, additional): + pass + + def get_edp_engine(self, cluster, job_type): + pass + + def get_edp_job_types(self, versions=None): + pass + + def get_edp_config_hints(self, job_type, version): + pass + + def get_open_ports(self, node_group): + ports_map = { + p_common.AMBARI_SERVER: [8080] + } + ports = [] + for service in node_group.node_processes: + ports.extend(ports_map.get(service, [])) + return ports diff --git a/sahara/plugins/ambari/validation.py b/sahara/plugins/ambari/validation.py new file mode 100644 index 00000000..8ad7bb6f --- /dev/null +++ b/sahara/plugins/ambari/validation.py @@ -0,0 +1,36 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from sahara import conductor +from sahara import context +from sahara.plugins.ambari import common +from sahara.plugins import exceptions as ex +from sahara.plugins import utils + + +conductor = conductor.API + + +def validate_creation(cluster_id): + ctx = context.ctx() + cluster = conductor.cluster_get(ctx, cluster_id) + _check_ambari(cluster) + + +def _check_ambari(cluster): + count = utils.get_instances_count(cluster, common.AMBARI_SERVER) + if count != 1: + raise ex.InvalidComponentCountException(common.AMBARI_SERVER, 1, count) diff --git a/sahara/tests/unit/plugins/ambari/__init__.py b/sahara/tests/unit/plugins/ambari/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sahara/tests/unit/plugins/ambari/test_client.py b/sahara/tests/unit/plugins/ambari/test_client.py new file mode 100644 index 00000000..f3edf6d8 --- /dev/null +++ b/sahara/tests/unit/plugins/ambari/test_client.py @@ -0,0 +1,145 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import mock +from oslo_serialization import jsonutils + +from sahara.plugins.ambari import client as ambari_client +from sahara.tests.unit import base + + +class AmbariClientTestCase(base.SaharaTestCase): + def setUp(self): + super(AmbariClientTestCase, self).setUp() + + self.http_client = mock.Mock() + self.http_client.get = mock.Mock() + self.http_client.post = mock.Mock() + self.http_client.put = mock.Mock() + self.http_client.delete = mock.Mock() + + self.headers = {"X-Requested-By": "sahara"} + + self.remote = mock.Mock() + self.remote.get_http_client.return_value = self.http_client + + self.instance = mock.Mock() + self.instance.remote.return_value = self.remote + self.instance.management_ip = "1.2.3.4" + + def test_init_client_default(self): + client = ambari_client.AmbariClient(self.instance) + self.assertEqual(self.http_client, client._http_client) + self.assertEqual("http://1.2.3.4:8080/api/v1", client._base_url) + self.assertEqual("admin", client._auth.username) + self.assertEqual("admin", client._auth.password) + self.remote.get_http_client.assert_called_with("8080") + + def test_init_client_manual(self): + client = ambari_client.AmbariClient(self.instance, port="1234", + username="user", password="pass") + self.assertEqual("http://1.2.3.4:1234/api/v1", client._base_url) + self.assertEqual("user", client._auth.username) + self.assertEqual("pass", client._auth.password) + self.remote.get_http_client.assert_called_with("1234") + + def test_close_http_session(self): + with ambari_client.AmbariClient(self.instance): + pass + self.remote.close_http_session.assert_called_with("8080") + + def test_get_method(self): + client = ambari_client.AmbariClient(self.instance) + client.get("http://spam") + self.http_client.get.assert_called_with( + "http://spam", verify=False, auth=client._auth, + headers=self.headers) + + def test_post_method(self): + client = ambari_client.AmbariClient(self.instance) + client.post("http://spam", data="data") + self.http_client.post.assert_called_with( + "http://spam", data="data", verify=False, auth=client._auth, + headers=self.headers) + + def test_put_method(self): + client = ambari_client.AmbariClient(self.instance) + client.put("http://spam", data="data") + self.http_client.put.assert_called_with( + "http://spam", data="data", verify=False, auth=client._auth, + headers=self.headers) + + def test_delete_method(self): + client = ambari_client.AmbariClient(self.instance) + client.delete("http://spam") + self.http_client.delete.assert_called_with( + "http://spam", verify=False, auth=client._auth, + headers=self.headers) + + def test_get_registered_hosts(self): + client = ambari_client.AmbariClient(self.instance) + resp_data = """{ + "href" : "http://1.2.3.4:8080/api/v1/hosts", + "items" : [ + { + "href" : "http://1.2.3.4:8080/api/v1/hosts/host1", + "Hosts" : { + "host_name" : "host1" + } + }, + { + "href" : "http://1.2.3.4:8080/api/v1/hosts/host2", + "Hosts" : { + "host_name" : "host2" + } + }, + { + "href" : "http://1.2.3.4:8080/api/v1/hosts/host3", + "Hosts" : { + "host_name" : "host3" + } + } + ] +}""" + resp = mock.Mock() + resp.text = resp_data + resp.status_code = 200 + self.http_client.get.return_value = resp + hosts = client.get_registered_hosts() + self.http_client.get.assert_called_with( + "http://1.2.3.4:8080/api/v1/hosts", verify=False, + auth=client._auth, headers=self.headers) + self.assertEqual(3, len(hosts)) + self.assertEqual("host1", hosts[0]["Hosts"]["host_name"]) + self.assertEqual("host2", hosts[1]["Hosts"]["host_name"]) + self.assertEqual("host3", hosts[2]["Hosts"]["host_name"]) + + def test_update_user_password(self): + client = ambari_client.AmbariClient(self.instance) + resp = mock.Mock() + resp.text = "" + resp.status_code = 200 + self.http_client.put.return_value = resp + client.update_user_password("bart", "old_pw", "new_pw") + exp_req = jsonutils.dumps({ + "Users": { + "old_password": "old_pw", + "password": "new_pw" + } + }) + self.http_client.put.assert_called_with( + "http://1.2.3.4:8080/api/v1/users/bart", data=exp_req, + verify=False, auth=client._auth, headers=self.headers) diff --git a/sahara/tests/unit/plugins/ambari/test_open_ports.py b/sahara/tests/unit/plugins/ambari/test_open_ports.py new file mode 100644 index 00000000..882f24e7 --- /dev/null +++ b/sahara/tests/unit/plugins/ambari/test_open_ports.py @@ -0,0 +1,33 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import mock + +from sahara.plugins.ambari import common as p_common +from sahara.plugins.ambari import plugin +from sahara.tests.unit import base + + +class GetPortsTestCase(base.SaharaTestCase): + def setUp(self): + super(GetPortsTestCase, self).setUp() + self.plugin = plugin.AmbariPluginProvider() + + def test_get_ambari_port(self): + ng = mock.Mock() + ng.node_processes = [p_common.AMBARI_SERVER] + ports = self.plugin.get_open_ports(ng) + self.assertEqual([8080], ports) diff --git a/sahara/tests/unit/plugins/ambari/test_validation.py b/sahara/tests/unit/plugins/ambari/test_validation.py new file mode 100644 index 00000000..ad6ff0d7 --- /dev/null +++ b/sahara/tests/unit/plugins/ambari/test_validation.py @@ -0,0 +1,55 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import mock + +from sahara.plugins.ambari import common as p_common +from sahara.plugins.ambari import plugin +from sahara.plugins import exceptions +from sahara.tests.unit import base + + +def make_cluster(processes_map): + m = mock.Mock() + ngs = [] + for count, processes in processes_map.items(): + ng = mock.Mock() + ng.count = count + ng.node_processes = processes + ngs.append(ng) + m.node_groups = ngs + return m + + +class AmbariValidationTestCase(base.SaharaTestCase): + def setUp(self): + super(AmbariValidationTestCase, self).setUp() + self.plugin = plugin.AmbariPluginProvider() + + def test_cluster_with_ambari(self): + cluster = make_cluster({1: [p_common.AMBARI_SERVER]}) + with mock.patch("sahara.plugins.ambari.validation.conductor") as p: + p.cluster_get = mock.Mock() + p.cluster_get.return_value = cluster + self.assertIsNone(self.plugin.validate(cluster)) + + def test_cluster_without_ambari(self): + cluster = make_cluster({1: ["spam"]}) + with mock.patch("sahara.plugins.ambari.validation.conductor") as p: + p.cluster_get = mock.Mock() + p.cluster_get.return_value = cluster + self.assertRaises(exceptions.InvalidComponentCountException, + self.plugin.validate, cluster) diff --git a/setup.cfg b/setup.cfg index 33ca7134..94086645 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,6 +40,7 @@ console_scripts = sahara.cluster.plugins = vanilla = sahara.plugins.vanilla.plugin:VanillaProvider hdp = sahara.plugins.hdp.ambariplugin:AmbariPlugin + ambari = sahara.plugins.ambari.plugin:AmbariPluginProvider mapr = sahara.plugins.mapr.plugin:MapRPlugin cdh = sahara.plugins.cdh.plugin:CDHPluginProvider fake = sahara.plugins.fake.plugin:FakePluginProvider