Merge "Add Impala 2.2 to MapR plugin"
This commit is contained in:
commit
4c13275c3b
@ -18,10 +18,10 @@ import sahara.plugins.mapr.domain.node_process as np
|
||||
import sahara.plugins.mapr.domain.service as s
|
||||
import sahara.plugins.mapr.services.hive.hive as hive
|
||||
import sahara.plugins.mapr.util.commands as cmd
|
||||
import sahara.plugins.mapr.util.maprfs_helper as mfs
|
||||
import sahara.plugins.mapr.util.validation_utils as vu
|
||||
import sahara.utils.files as files
|
||||
|
||||
|
||||
IMPALA_SERVER = np.NodeProcess(
|
||||
name='impalaserver',
|
||||
ui_name='Impala-Server',
|
||||
@ -56,6 +56,28 @@ class Impala(s.Service):
|
||||
def _get_impala_env_props(self, context):
|
||||
return {}
|
||||
|
||||
def post_start(self, cluster_context, instances):
|
||||
self._copy_hive_site(cluster_context)
|
||||
|
||||
def _copy_hive_site(self, cluster_context):
|
||||
hive_site_path = self._hive(cluster_context).conf_dir(
|
||||
cluster_context) + "/hive-site.xml"
|
||||
path = self.conf_dir(cluster_context) + "/hive-site.xml"
|
||||
with cluster_context.get_instance(hive.HIVE_METASTORE).remote() as r1:
|
||||
for instance in cluster_context.get_instances(IMPALA_SERVER):
|
||||
with instance.remote() as r2:
|
||||
mfs.exchange(r1, r2, hive_site_path, path, 'mapr')
|
||||
with cluster_context.get_instance(IMPALA_CATALOG).remote() as r3:
|
||||
mfs.exchange(r1, r3, hive_site_path, path, 'mapr')
|
||||
with cluster_context.get_instance(
|
||||
IMPALA_STATE_STORE).remote() as r4:
|
||||
mfs.exchange(r1, r4, hive_site_path, path, 'mapr')
|
||||
|
||||
# hive service instance
|
||||
def _hive(self, context):
|
||||
hive_version = context.get_chosen_service_version('Hive')
|
||||
return context._find_service_instance('Hive', hive_version)
|
||||
|
||||
def get_config_files(self, cluster_context, configs, instance=None):
|
||||
defaults = 'plugins/mapr/services/impala/resources/impala-env.sh.j2'
|
||||
|
||||
@ -84,7 +106,6 @@ class ImpalaV141(Impala):
|
||||
]
|
||||
self._validation_rules = [
|
||||
vu.depends_on(hive.HiveV013(), self),
|
||||
vu.on_same_node(IMPALA_CATALOG, hive.HIVE_SERVER_2),
|
||||
vu.exactly(1, IMPALA_STATE_STORE),
|
||||
vu.exactly(1, IMPALA_CATALOG),
|
||||
vu.at_least(1, IMPALA_SERVER),
|
||||
@ -96,3 +117,38 @@ class ImpalaV141(Impala):
|
||||
'statestore_host': context.get_instance_ip(IMPALA_STATE_STORE),
|
||||
'catalog_host': context.get_instance_ip(IMPALA_CATALOG),
|
||||
}
|
||||
|
||||
|
||||
class ImpalaV220(Impala):
|
||||
def __init__(self):
|
||||
super(ImpalaV220, self).__init__()
|
||||
self._version = '2.2.0'
|
||||
self._dependencies = [
|
||||
('mapr-hive', hive.HiveV12().version),
|
||||
('mapr-impala', self.version),
|
||||
]
|
||||
self._validation_rules = [
|
||||
vu.depends_on(hive.HiveV12(), self),
|
||||
vu.exactly(1, IMPALA_STATE_STORE),
|
||||
vu.exactly(1, IMPALA_CATALOG),
|
||||
vu.at_least(1, IMPALA_SERVER),
|
||||
vu.required_os('centos', self)
|
||||
]
|
||||
|
||||
def _get_impala_env_props(self, context):
|
||||
return {
|
||||
'impala_version': self.version,
|
||||
'statestore_host': context.get_instance_ip(IMPALA_STATE_STORE),
|
||||
'catalog_host': context.get_instance_ip(IMPALA_CATALOG),
|
||||
}
|
||||
|
||||
def _get_packages(self, cluster_context, node_processes):
|
||||
result = []
|
||||
|
||||
result += self.dependencies
|
||||
result += [(np.package, self.version) for np in node_processes]
|
||||
# gets the latest version
|
||||
hbase_version = cluster_context.get_chosen_service_version('HBase')
|
||||
result += [('mapr-hbase', hbase_version)]
|
||||
|
||||
return result
|
||||
|
@ -19,6 +19,7 @@ from sahara.conductor import resource as r
|
||||
import sahara.exceptions as ex
|
||||
from sahara.i18n import _
|
||||
import sahara.plugins.exceptions as e
|
||||
from sahara.service.api import v10 as api
|
||||
import sahara.utils.openstack.nova as nova
|
||||
|
||||
|
||||
@ -105,6 +106,18 @@ class NoVolumesException(ex.SaharaException):
|
||||
self.code = NoVolumesException.ERROR_CODE
|
||||
|
||||
|
||||
class NotRequiredImageException(ex.SaharaException):
|
||||
MESSAGE = _('Service %(service)s requires %(os)s OS.'
|
||||
' Use %(os)s image and add "%(os)s" tag to it.')
|
||||
ERROR_CODE = "INVALID_IMAGE"
|
||||
|
||||
def __init__(self, service, os):
|
||||
super(NotRequiredImageException, self).__init__()
|
||||
self.message = NotRequiredImageException.MESSAGE % {'service': service,
|
||||
'os': os}
|
||||
self.code = NotRequiredImageException.ERROR_CODE
|
||||
|
||||
|
||||
def at_least(count, component):
|
||||
def validate(cluster_context, component, count):
|
||||
actual_count = cluster_context.get_instances_count(component)
|
||||
@ -193,6 +206,21 @@ def assert_present(service, cluster_context):
|
||||
raise e.RequiredServiceMissingException(service.ui_name)
|
||||
|
||||
|
||||
def required_os(os, required_by):
|
||||
def validate(cluster_context, os, required_by):
|
||||
for ng in cluster_context.get_node_groups():
|
||||
nps = ng.node_processes
|
||||
for node_process in required_by.node_processes:
|
||||
if node_process.ui_name in nps:
|
||||
image_id = (ng.image_id or
|
||||
cluster_context.cluster.default_image_id)
|
||||
if not image_has_tag(image_id, os):
|
||||
raise NotRequiredImageException(required_by.ui_name,
|
||||
os)
|
||||
|
||||
return ft.partial(validate, os=os, required_by=required_by)
|
||||
|
||||
|
||||
def create_fake_cluster(cluster, existing, additional):
|
||||
counts = existing.copy()
|
||||
counts.update(additional)
|
||||
@ -227,3 +255,8 @@ def has_volumes():
|
||||
raise NoVolumesException(node_group.name)
|
||||
|
||||
return validate
|
||||
|
||||
|
||||
def image_has_tag(image_id, tag):
|
||||
image = api.get_registered_image(image_id)
|
||||
return tag in image.tags
|
||||
|
@ -20,6 +20,7 @@ from sahara.plugins.mapr.services.hbase import hbase
|
||||
from sahara.plugins.mapr.services.hive import hive
|
||||
from sahara.plugins.mapr.services.httpfs import httpfs
|
||||
from sahara.plugins.mapr.services.hue import hue
|
||||
from sahara.plugins.mapr.services.impala import impala
|
||||
from sahara.plugins.mapr.services.mahout import mahout
|
||||
from sahara.plugins.mapr.services.management import management as mng
|
||||
from sahara.plugins.mapr.services.maprfs import maprfs
|
||||
@ -48,6 +49,7 @@ class VersionHandler(bvh.BaseVersionHandler):
|
||||
self._services = [
|
||||
hive.HiveV013(),
|
||||
hive.HiveV12(),
|
||||
impala.ImpalaV220(),
|
||||
pig.PigV014(),
|
||||
pig.PigV015(),
|
||||
flume.FlumeV16(),
|
||||
|
Loading…
x
Reference in New Issue
Block a user