New version of HDP plugin

Current features:
* Set up Ambari server and Ambari agents from pre-built image
* Support Ambari 2.0 / 2.1
* Password generation for management console

partially implements bp: hdp-22-support

Change-Id: I53084a83cccde52654a42911f449cc8b7d769bea
This commit is contained in:
Sergey Reshetnyak 2015-05-14 15:33:00 +03:00
parent 0716c0d990
commit 3869ed88a0
11 changed files with 589 additions and 0 deletions

View File

View File

@ -0,0 +1,82 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_serialization import jsonutils
from requests import auth
class AmbariClient(object):
def __init__(self, instance, port="8080", **kwargs):
kwargs.setdefault("username", "admin")
kwargs.setdefault("password", "admin")
self._port = port
self._base_url = "http://{host}:{port}/api/v1".format(
host=instance.management_ip, port=port)
self._instance = instance
self._http_client = instance.remote().get_http_client(port)
self._headers = {"X-Requested-By": "sahara"}
self._auth = auth.HTTPBasicAuth(kwargs["username"], kwargs["password"])
self._default_client_args = {"verify": False, "auth": self._auth,
"headers": self._headers}
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
def close(self):
self._instance.remote().close_http_session(self._port)
def get(self, *args, **kwargs):
kwargs.update(self._default_client_args)
return self._http_client.get(*args, **kwargs)
def post(self, *args, **kwargs):
kwargs.update(self._default_client_args)
return self._http_client.post(*args, **kwargs)
def put(self, *args, **kwargs):
kwargs.update(self._default_client_args)
return self._http_client.put(*args, **kwargs)
def delete(self, *args, **kwargs):
kwargs.update(self._default_client_args)
return self._http_client.delete(*args, **kwargs)
@staticmethod
def check_response(resp):
resp.raise_for_status()
if resp.text:
return jsonutils.loads(resp.text)
def get_registered_hosts(self):
url = self._base_url + "/hosts"
resp = self.get(url)
data = self.check_response(resp)
return data.get("items", [])
def update_user_password(self, user, old_password, new_password):
url = self._base_url + "/users/%s" % user
data = jsonutils.dumps({
"Users": {
"old_password": old_password,
"password": new_password
}
})
resp = self.put(url, data=data)
self.check_response(resp)

View File

@ -0,0 +1,19 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# define service names
AMBARI_SERVER = "Ambari"

View File

@ -0,0 +1,113 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import telnetlib
from oslo_log import log as logging
from oslo_utils import uuidutils
from sahara import conductor
from sahara import context
from sahara.i18n import _
from sahara.plugins.ambari import client as ambari_client
from sahara.plugins.ambari import common as p_common
from sahara.plugins import exceptions as p_exc
from sahara.plugins import utils as plugin_utils
from sahara.utils import poll_utils
LOG = logging.getLogger(__name__)
conductor = conductor.API
def setup_ambari(cluster):
LOG.debug("Set up Ambari management console")
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
with ambari.remote() as r:
sudo = functools.partial(r.execute_command, run_as_root=True)
sudo("ambari-server setup -s -j"
" `cut -f2 -d \"=\" /etc/profile.d/99-java.sh`", timeout=1800)
sudo("service ambari-server start")
LOG.debug("Ambari management console installed")
def setup_agents(cluster):
LOG.debug("Set up Ambari agents")
manager_address = plugin_utils.get_instance(
cluster, p_common.AMBARI_SERVER).fqdn()
with context.ThreadGroup() as tg:
for inst in plugin_utils.get_instances(cluster):
tg.spawn("hwx-agent-setup-%s" % inst.id,
_setup_agent, inst, manager_address)
LOG.debug("Ambari agents has been installed")
def _setup_agent(instance, ambari_address):
with instance.remote() as r:
sudo = functools.partial(r.execute_command, run_as_root=True)
r.replace_remote_string("/etc/ambari-agent/conf/ambari-agent.ini",
"localhost", ambari_address)
sudo("service ambari-agent start")
# for correct installing packages
sudo("yum clean all")
def wait_ambari_accessible(cluster):
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
kwargs = {"host": ambari.management_ip, "port": 8080}
poll_utils.poll(_check_port_accessible, kwargs=kwargs, timeout=300)
def _check_port_accessible(host, port):
try:
conn = telnetlib.Telnet(host, port)
conn.close()
return True
except IOError:
return False
def update_default_ambari_password(cluster):
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
new_password = uuidutils.generate_uuid()
with ambari_client.AmbariClient(ambari) as client:
client.update_user_password("admin", "admin", new_password)
extra = cluster.extra.to_dict() if cluster.extra else {}
extra["ambari_password"] = new_password
ctx = context.ctx()
conductor.cluster_update(ctx, cluster, {"extra": extra})
cluster = conductor.cluster_get(ctx, cluster.id)
def wait_host_registration(cluster):
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
hosts = plugin_utils.get_instances(cluster)
password = cluster.extra["ambari_password"]
with ambari_client.AmbariClient(ambari, password=password) as client:
kwargs = {"client": client, "num_hosts": len(hosts)}
poll_utils.poll(_check_host_registration, kwargs=kwargs, timeout=600)
registered_hosts = client.get_registered_hosts()
registered_host_names = [h["Hosts"]["host_name"] for h in registered_hosts]
actual_host_names = [h.fqdn() for h in hosts]
if sorted(registered_host_names) != sorted(actual_host_names):
raise p_exc.HadoopProvisionError(
_("Host registration fails in Ambari"))
def _check_host_registration(client, num_hosts):
hosts = client.get_registered_hosts()
return len(hosts) == num_hosts

View File

@ -0,0 +1,105 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara import conductor
from sahara import context
from sahara.i18n import _
from sahara.plugins.ambari import common as p_common
from sahara.plugins.ambari import deploy
from sahara.plugins.ambari import validation
from sahara.plugins import provisioning as p
from sahara.plugins import utils as plugin_utils
conductor = conductor.API
class AmbariPluginProvider(p.ProvisioningPluginBase):
def get_title(self):
return "HDP Plugin"
def get_description(self):
return _("HDP plugin with Ambari")
def get_versions(self):
return ["2.3", "2.2"]
def get_node_processes(self, hadoop_version):
return {
"Ambari": [p_common.AMBARI_SERVER]
}
def get_configs(self, hadoop_version):
return []
def configure_cluster(self, cluster):
deploy.setup_ambari(cluster)
deploy.setup_agents(cluster)
deploy.wait_ambari_accessible(cluster)
deploy.update_default_ambari_password(cluster)
cluster = conductor.cluster_get(context.ctx(), cluster.id)
deploy.wait_host_registration(cluster)
def start_cluster(self, cluster):
self._set_cluster_info(cluster)
def _set_cluster_info(self, cluster):
ambari_ip = plugin_utils.get_instance(
cluster, p_common.AMBARI_SERVER).management_ip
ambari_port = "8080"
info = {
p_common.AMBARI_SERVER: {
"Web UI": "http://{host}:{port}".format(host=ambari_ip,
port=ambari_port),
"Username": "admin",
"Password": cluster.extra["ambari_password"]
}
}
info.update(cluster.info.to_dict())
ctx = context.ctx()
conductor.cluster_update(ctx, cluster, {"info": info})
cluster = conductor.cluster_get(ctx, cluster.id)
def validate(self, cluster):
validation.validate_creation(cluster.id)
def scale_cluster(self, cluster, instances):
pass
def decommission_nodes(self, cluster, instances):
pass
def validate_scaling(self, cluster, existing, additional):
pass
def get_edp_engine(self, cluster, job_type):
pass
def get_edp_job_types(self, versions=None):
pass
def get_edp_config_hints(self, job_type, version):
pass
def get_open_ports(self, node_group):
ports_map = {
p_common.AMBARI_SERVER: [8080]
}
ports = []
for service in node_group.node_processes:
ports.extend(ports_map.get(service, []))
return ports

View File

@ -0,0 +1,36 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara import conductor
from sahara import context
from sahara.plugins.ambari import common
from sahara.plugins import exceptions as ex
from sahara.plugins import utils
conductor = conductor.API
def validate_creation(cluster_id):
ctx = context.ctx()
cluster = conductor.cluster_get(ctx, cluster_id)
_check_ambari(cluster)
def _check_ambari(cluster):
count = utils.get_instances_count(cluster, common.AMBARI_SERVER)
if count != 1:
raise ex.InvalidComponentCountException(common.AMBARI_SERVER, 1, count)

View File

@ -0,0 +1,145 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_serialization import jsonutils
from sahara.plugins.ambari import client as ambari_client
from sahara.tests.unit import base
class AmbariClientTestCase(base.SaharaTestCase):
def setUp(self):
super(AmbariClientTestCase, self).setUp()
self.http_client = mock.Mock()
self.http_client.get = mock.Mock()
self.http_client.post = mock.Mock()
self.http_client.put = mock.Mock()
self.http_client.delete = mock.Mock()
self.headers = {"X-Requested-By": "sahara"}
self.remote = mock.Mock()
self.remote.get_http_client.return_value = self.http_client
self.instance = mock.Mock()
self.instance.remote.return_value = self.remote
self.instance.management_ip = "1.2.3.4"
def test_init_client_default(self):
client = ambari_client.AmbariClient(self.instance)
self.assertEqual(self.http_client, client._http_client)
self.assertEqual("http://1.2.3.4:8080/api/v1", client._base_url)
self.assertEqual("admin", client._auth.username)
self.assertEqual("admin", client._auth.password)
self.remote.get_http_client.assert_called_with("8080")
def test_init_client_manual(self):
client = ambari_client.AmbariClient(self.instance, port="1234",
username="user", password="pass")
self.assertEqual("http://1.2.3.4:1234/api/v1", client._base_url)
self.assertEqual("user", client._auth.username)
self.assertEqual("pass", client._auth.password)
self.remote.get_http_client.assert_called_with("1234")
def test_close_http_session(self):
with ambari_client.AmbariClient(self.instance):
pass
self.remote.close_http_session.assert_called_with("8080")
def test_get_method(self):
client = ambari_client.AmbariClient(self.instance)
client.get("http://spam")
self.http_client.get.assert_called_with(
"http://spam", verify=False, auth=client._auth,
headers=self.headers)
def test_post_method(self):
client = ambari_client.AmbariClient(self.instance)
client.post("http://spam", data="data")
self.http_client.post.assert_called_with(
"http://spam", data="data", verify=False, auth=client._auth,
headers=self.headers)
def test_put_method(self):
client = ambari_client.AmbariClient(self.instance)
client.put("http://spam", data="data")
self.http_client.put.assert_called_with(
"http://spam", data="data", verify=False, auth=client._auth,
headers=self.headers)
def test_delete_method(self):
client = ambari_client.AmbariClient(self.instance)
client.delete("http://spam")
self.http_client.delete.assert_called_with(
"http://spam", verify=False, auth=client._auth,
headers=self.headers)
def test_get_registered_hosts(self):
client = ambari_client.AmbariClient(self.instance)
resp_data = """{
"href" : "http://1.2.3.4:8080/api/v1/hosts",
"items" : [
{
"href" : "http://1.2.3.4:8080/api/v1/hosts/host1",
"Hosts" : {
"host_name" : "host1"
}
},
{
"href" : "http://1.2.3.4:8080/api/v1/hosts/host2",
"Hosts" : {
"host_name" : "host2"
}
},
{
"href" : "http://1.2.3.4:8080/api/v1/hosts/host3",
"Hosts" : {
"host_name" : "host3"
}
}
]
}"""
resp = mock.Mock()
resp.text = resp_data
resp.status_code = 200
self.http_client.get.return_value = resp
hosts = client.get_registered_hosts()
self.http_client.get.assert_called_with(
"http://1.2.3.4:8080/api/v1/hosts", verify=False,
auth=client._auth, headers=self.headers)
self.assertEqual(3, len(hosts))
self.assertEqual("host1", hosts[0]["Hosts"]["host_name"])
self.assertEqual("host2", hosts[1]["Hosts"]["host_name"])
self.assertEqual("host3", hosts[2]["Hosts"]["host_name"])
def test_update_user_password(self):
client = ambari_client.AmbariClient(self.instance)
resp = mock.Mock()
resp.text = ""
resp.status_code = 200
self.http_client.put.return_value = resp
client.update_user_password("bart", "old_pw", "new_pw")
exp_req = jsonutils.dumps({
"Users": {
"old_password": "old_pw",
"password": "new_pw"
}
})
self.http_client.put.assert_called_with(
"http://1.2.3.4:8080/api/v1/users/bart", data=exp_req,
verify=False, auth=client._auth, headers=self.headers)

View File

@ -0,0 +1,33 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from sahara.plugins.ambari import common as p_common
from sahara.plugins.ambari import plugin
from sahara.tests.unit import base
class GetPortsTestCase(base.SaharaTestCase):
def setUp(self):
super(GetPortsTestCase, self).setUp()
self.plugin = plugin.AmbariPluginProvider()
def test_get_ambari_port(self):
ng = mock.Mock()
ng.node_processes = [p_common.AMBARI_SERVER]
ports = self.plugin.get_open_ports(ng)
self.assertEqual([8080], ports)

View File

@ -0,0 +1,55 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from sahara.plugins.ambari import common as p_common
from sahara.plugins.ambari import plugin
from sahara.plugins import exceptions
from sahara.tests.unit import base
def make_cluster(processes_map):
m = mock.Mock()
ngs = []
for count, processes in processes_map.items():
ng = mock.Mock()
ng.count = count
ng.node_processes = processes
ngs.append(ng)
m.node_groups = ngs
return m
class AmbariValidationTestCase(base.SaharaTestCase):
def setUp(self):
super(AmbariValidationTestCase, self).setUp()
self.plugin = plugin.AmbariPluginProvider()
def test_cluster_with_ambari(self):
cluster = make_cluster({1: [p_common.AMBARI_SERVER]})
with mock.patch("sahara.plugins.ambari.validation.conductor") as p:
p.cluster_get = mock.Mock()
p.cluster_get.return_value = cluster
self.assertIsNone(self.plugin.validate(cluster))
def test_cluster_without_ambari(self):
cluster = make_cluster({1: ["spam"]})
with mock.patch("sahara.plugins.ambari.validation.conductor") as p:
p.cluster_get = mock.Mock()
p.cluster_get.return_value = cluster
self.assertRaises(exceptions.InvalidComponentCountException,
self.plugin.validate, cluster)

View File

@ -40,6 +40,7 @@ console_scripts =
sahara.cluster.plugins =
vanilla = sahara.plugins.vanilla.plugin:VanillaProvider
hdp = sahara.plugins.hdp.ambariplugin:AmbariPlugin
ambari = sahara.plugins.ambari.plugin:AmbariPluginProvider
mapr = sahara.plugins.mapr.plugin:MapRPlugin
cdh = sahara.plugins.cdh.plugin:CDHPluginProvider
fake = sahara.plugins.fake.plugin:FakePluginProvider