Add Impala 2.2 to MapR plugin
- Impala node processes can be installed on different nodes - OS precondition is checked for impala. - Impala 2.2 should be supported for 5.1.0 Change-Id: I21bd64b9c529a3ed25296b087f08c793ead44aee Closes-Bug: #1580979
This commit is contained in:
parent
9a1300b1a7
commit
9d6b648ec7
3
releasenotes/notes/add-impala-2.2-c1649599649aff5c.yaml
Normal file
3
releasenotes/notes/add-impala-2.2-c1649599649aff5c.yaml
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- Add impala 2.2 to MapR plugin
|
@ -18,10 +18,10 @@ import sahara.plugins.mapr.domain.node_process as np
|
|||||||
import sahara.plugins.mapr.domain.service as s
|
import sahara.plugins.mapr.domain.service as s
|
||||||
import sahara.plugins.mapr.services.hive.hive as hive
|
import sahara.plugins.mapr.services.hive.hive as hive
|
||||||
import sahara.plugins.mapr.util.commands as cmd
|
import sahara.plugins.mapr.util.commands as cmd
|
||||||
|
import sahara.plugins.mapr.util.maprfs_helper as mfs
|
||||||
import sahara.plugins.mapr.util.validation_utils as vu
|
import sahara.plugins.mapr.util.validation_utils as vu
|
||||||
import sahara.utils.files as files
|
import sahara.utils.files as files
|
||||||
|
|
||||||
|
|
||||||
IMPALA_SERVER = np.NodeProcess(
|
IMPALA_SERVER = np.NodeProcess(
|
||||||
name='impalaserver',
|
name='impalaserver',
|
||||||
ui_name='Impala-Server',
|
ui_name='Impala-Server',
|
||||||
@ -56,6 +56,28 @@ class Impala(s.Service):
|
|||||||
def _get_impala_env_props(self, context):
|
def _get_impala_env_props(self, context):
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
def post_start(self, cluster_context, instances):
|
||||||
|
self._copy_hive_site(cluster_context)
|
||||||
|
|
||||||
|
def _copy_hive_site(self, cluster_context):
|
||||||
|
hive_site_path = self._hive(cluster_context).conf_dir(
|
||||||
|
cluster_context) + "/hive-site.xml"
|
||||||
|
path = self.conf_dir(cluster_context) + "/hive-site.xml"
|
||||||
|
with cluster_context.get_instance(hive.HIVE_METASTORE).remote() as r1:
|
||||||
|
for instance in cluster_context.get_instances(IMPALA_SERVER):
|
||||||
|
with instance.remote() as r2:
|
||||||
|
mfs.exchange(r1, r2, hive_site_path, path, 'mapr')
|
||||||
|
with cluster_context.get_instance(IMPALA_CATALOG).remote() as r3:
|
||||||
|
mfs.exchange(r1, r3, hive_site_path, path, 'mapr')
|
||||||
|
with cluster_context.get_instance(
|
||||||
|
IMPALA_STATE_STORE).remote() as r4:
|
||||||
|
mfs.exchange(r1, r4, hive_site_path, path, 'mapr')
|
||||||
|
|
||||||
|
# hive service instance
|
||||||
|
def _hive(self, context):
|
||||||
|
hive_version = context.get_chosen_service_version('Hive')
|
||||||
|
return context._find_service_instance('Hive', hive_version)
|
||||||
|
|
||||||
def get_config_files(self, cluster_context, configs, instance=None):
|
def get_config_files(self, cluster_context, configs, instance=None):
|
||||||
defaults = 'plugins/mapr/services/impala/resources/impala-env.sh.j2'
|
defaults = 'plugins/mapr/services/impala/resources/impala-env.sh.j2'
|
||||||
|
|
||||||
@ -84,7 +106,6 @@ class ImpalaV141(Impala):
|
|||||||
]
|
]
|
||||||
self._validation_rules = [
|
self._validation_rules = [
|
||||||
vu.depends_on(hive.HiveV013(), self),
|
vu.depends_on(hive.HiveV013(), self),
|
||||||
vu.on_same_node(IMPALA_CATALOG, hive.HIVE_SERVER_2),
|
|
||||||
vu.exactly(1, IMPALA_STATE_STORE),
|
vu.exactly(1, IMPALA_STATE_STORE),
|
||||||
vu.exactly(1, IMPALA_CATALOG),
|
vu.exactly(1, IMPALA_CATALOG),
|
||||||
vu.at_least(1, IMPALA_SERVER),
|
vu.at_least(1, IMPALA_SERVER),
|
||||||
@ -96,3 +117,38 @@ class ImpalaV141(Impala):
|
|||||||
'statestore_host': context.get_instance_ip(IMPALA_STATE_STORE),
|
'statestore_host': context.get_instance_ip(IMPALA_STATE_STORE),
|
||||||
'catalog_host': context.get_instance_ip(IMPALA_CATALOG),
|
'catalog_host': context.get_instance_ip(IMPALA_CATALOG),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class ImpalaV220(Impala):
|
||||||
|
def __init__(self):
|
||||||
|
super(ImpalaV220, self).__init__()
|
||||||
|
self._version = '2.2.0'
|
||||||
|
self._dependencies = [
|
||||||
|
('mapr-hive', hive.HiveV12().version),
|
||||||
|
('mapr-impala', self.version),
|
||||||
|
]
|
||||||
|
self._validation_rules = [
|
||||||
|
vu.depends_on(hive.HiveV12(), self),
|
||||||
|
vu.exactly(1, IMPALA_STATE_STORE),
|
||||||
|
vu.exactly(1, IMPALA_CATALOG),
|
||||||
|
vu.at_least(1, IMPALA_SERVER),
|
||||||
|
vu.required_os('centos', self)
|
||||||
|
]
|
||||||
|
|
||||||
|
def _get_impala_env_props(self, context):
|
||||||
|
return {
|
||||||
|
'impala_version': self.version,
|
||||||
|
'statestore_host': context.get_instance_ip(IMPALA_STATE_STORE),
|
||||||
|
'catalog_host': context.get_instance_ip(IMPALA_CATALOG),
|
||||||
|
}
|
||||||
|
|
||||||
|
def _get_packages(self, cluster_context, node_processes):
|
||||||
|
result = []
|
||||||
|
|
||||||
|
result += self.dependencies
|
||||||
|
result += [(np.package, self.version) for np in node_processes]
|
||||||
|
# gets the latest version
|
||||||
|
hbase_version = cluster_context.get_chosen_service_version('HBase')
|
||||||
|
result += [('mapr-hbase', hbase_version)]
|
||||||
|
|
||||||
|
return result
|
||||||
|
@ -19,6 +19,7 @@ from sahara.conductor import resource as r
|
|||||||
import sahara.exceptions as ex
|
import sahara.exceptions as ex
|
||||||
from sahara.i18n import _
|
from sahara.i18n import _
|
||||||
import sahara.plugins.exceptions as e
|
import sahara.plugins.exceptions as e
|
||||||
|
from sahara.service.api import v10 as api
|
||||||
import sahara.utils.openstack.nova as nova
|
import sahara.utils.openstack.nova as nova
|
||||||
|
|
||||||
|
|
||||||
@ -105,6 +106,18 @@ class NoVolumesException(ex.SaharaException):
|
|||||||
self.code = NoVolumesException.ERROR_CODE
|
self.code = NoVolumesException.ERROR_CODE
|
||||||
|
|
||||||
|
|
||||||
|
class NotRequiredImageException(ex.SaharaException):
|
||||||
|
MESSAGE = _('Service %(service)s requires %(os)s OS.'
|
||||||
|
' Use %(os)s image and add "%(os)s" tag to it.')
|
||||||
|
ERROR_CODE = "INVALID_IMAGE"
|
||||||
|
|
||||||
|
def __init__(self, service, os):
|
||||||
|
super(NotRequiredImageException, self).__init__()
|
||||||
|
self.message = NotRequiredImageException.MESSAGE % {'service': service,
|
||||||
|
'os': os}
|
||||||
|
self.code = NotRequiredImageException.ERROR_CODE
|
||||||
|
|
||||||
|
|
||||||
def at_least(count, component):
|
def at_least(count, component):
|
||||||
def validate(cluster_context, component, count):
|
def validate(cluster_context, component, count):
|
||||||
actual_count = cluster_context.get_instances_count(component)
|
actual_count = cluster_context.get_instances_count(component)
|
||||||
@ -193,6 +206,21 @@ def assert_present(service, cluster_context):
|
|||||||
raise e.RequiredServiceMissingException(service.ui_name)
|
raise e.RequiredServiceMissingException(service.ui_name)
|
||||||
|
|
||||||
|
|
||||||
|
def required_os(os, required_by):
|
||||||
|
def validate(cluster_context, os, required_by):
|
||||||
|
for ng in cluster_context.get_node_groups():
|
||||||
|
nps = ng.node_processes
|
||||||
|
for node_process in required_by.node_processes:
|
||||||
|
if node_process.ui_name in nps:
|
||||||
|
image_id = (ng.image_id or
|
||||||
|
cluster_context.cluster.default_image_id)
|
||||||
|
if not image_has_tag(image_id, os):
|
||||||
|
raise NotRequiredImageException(required_by.ui_name,
|
||||||
|
os)
|
||||||
|
|
||||||
|
return ft.partial(validate, os=os, required_by=required_by)
|
||||||
|
|
||||||
|
|
||||||
def create_fake_cluster(cluster, existing, additional):
|
def create_fake_cluster(cluster, existing, additional):
|
||||||
counts = existing.copy()
|
counts = existing.copy()
|
||||||
counts.update(additional)
|
counts.update(additional)
|
||||||
@ -227,3 +255,8 @@ def has_volumes():
|
|||||||
raise NoVolumesException(node_group.name)
|
raise NoVolumesException(node_group.name)
|
||||||
|
|
||||||
return validate
|
return validate
|
||||||
|
|
||||||
|
|
||||||
|
def image_has_tag(image_id, tag):
|
||||||
|
image = api.get_registered_image(image_id)
|
||||||
|
return tag in image.tags
|
||||||
|
@ -20,6 +20,7 @@ from sahara.plugins.mapr.services.hbase import hbase
|
|||||||
from sahara.plugins.mapr.services.hive import hive
|
from sahara.plugins.mapr.services.hive import hive
|
||||||
from sahara.plugins.mapr.services.httpfs import httpfs
|
from sahara.plugins.mapr.services.httpfs import httpfs
|
||||||
from sahara.plugins.mapr.services.hue import hue
|
from sahara.plugins.mapr.services.hue import hue
|
||||||
|
from sahara.plugins.mapr.services.impala import impala
|
||||||
from sahara.plugins.mapr.services.mahout import mahout
|
from sahara.plugins.mapr.services.mahout import mahout
|
||||||
from sahara.plugins.mapr.services.management import management as mng
|
from sahara.plugins.mapr.services.management import management as mng
|
||||||
from sahara.plugins.mapr.services.maprfs import maprfs
|
from sahara.plugins.mapr.services.maprfs import maprfs
|
||||||
@ -48,6 +49,7 @@ class VersionHandler(bvh.BaseVersionHandler):
|
|||||||
self._services = [
|
self._services = [
|
||||||
hive.HiveV013(),
|
hive.HiveV013(),
|
||||||
hive.HiveV12(),
|
hive.HiveV12(),
|
||||||
|
impala.ImpalaV220(),
|
||||||
pig.PigV014(),
|
pig.PigV014(),
|
||||||
pig.PigV015(),
|
pig.PigV015(),
|
||||||
flume.FlumeV16(),
|
flume.FlumeV16(),
|
||||||
|
Loading…
Reference in New Issue
Block a user