Fix management IPs usage

Fixed improper usage of management IPs.
Almost all cluster configuration now relies on internal IPs.

Closes-Bug: #1449506
Closes-Bug: #1439461

Change-Id: Ib89844ced79e9ce15c8f6fd186e81b1df8c5fd94
This commit is contained in:
Artem Osadchyi 2015-04-23 16:45:15 +03:00
parent b3128829e8
commit d1d8b79abc
13 changed files with 58 additions and 45 deletions

View File

@ -206,13 +206,14 @@ class BaseConfigurer(ac.AbstractConfigurer):
def _update_cluster_info(self, cluster_context):
LOG.debug('Updating UI information.')
info = dict()
info = {}
for service in cluster_context.cluster_services:
for uri_info in service.ui_info:
title, process, url = uri_info
instance = cluster_context.get_instance(process)
info.update({
title: {
'WebUI': url % cluster_context.get_instance_ip(process)
'WebUI': url % instance.management_ip
}
})
@ -258,7 +259,7 @@ class BaseConfigurer(ac.AbstractConfigurer):
db_specs = dict(mysql.MySQL.METRICS_SPECS._asdict())
db_specs.update({
'host': mysql.MySQL.get_db_instance(cluster_context).fqdn(),
'host': mysql.MySQL.get_db_instance(cluster_context).internal_ip,
'port': mysql.MySQL.MYSQL_SERVER_PORT,
})

View File

@ -217,11 +217,11 @@ class BaseClusterContext(cc.AbstractClusterContext):
return i[0] if i else None
def get_instances_ip(self, node_process):
return [i.management_ip for i in self.get_instances(node_process)]
return [i.internal_ip for i in self.get_instances(node_process)]
def get_instance_ip(self, node_process):
i = self.get_instance(node_process)
return i.management_ip if i else None
return i.internal_ip if i else None
def get_zookeeper_nodes_ip_with_port(self, separator=','):
return separator.join(['%s:%s' % (ip, mng.ZK_CLIENT_PORT)

View File

@ -53,7 +53,7 @@ class BaseNodeManager(s.AbstractNodeManager):
with random.choice(cldb_instances).remote() as cldb_remote:
for instance in instances:
with instance.remote() as r:
command = GET_SERVER_ID_CMD % instance.management_ip
command = GET_SERVER_ID_CMD % instance.internal_ip
ec, out = r.execute_command(command, run_as_root=True)
command = MOVE_NODE_CMD % out.strip()
cldb_remote.execute_command(command, run_as_root=True)
@ -65,7 +65,7 @@ class BaseNodeManager(s.AbstractNodeManager):
with random.choice(cldb_instances).remote() as cldb_remote:
for instance in instances:
args = {
'ip': instance.management_ip,
'ip': instance.internal_ip,
'nodes': instance.fqdn(),
'zookeepers': c_context.get_zookeeper_nodes_ip_with_port(),
}
@ -109,11 +109,11 @@ class BaseNodeManager(s.AbstractNodeManager):
ips = [n['ip'] for n in resp['data']]
retry_count += 1
for i in instances:
if (i.management_ip not in ips
if (i.internal_ip not in ips
and retry_count > DEFAULT_RETRY_COUNT):
raise ex.HadoopProvisionError(_(
"Node failed to connect to CLDB: %s") %
i.management_ip)
i.internal_ip)
break
else:
context.sleep(DELAY)
@ -161,7 +161,7 @@ class BaseNodeManager(s.AbstractNodeManager):
cmd = cmd % args
LOG.debug(
'Executing "{command}" on node={ip}'.format(
command=cmd, ip=instance.management_ip))
command=cmd, ip=instance.internal_ip))
r.execute_command(cmd, run_as_root=True)
def _start_service(self, instance, service):

View File

@ -59,7 +59,7 @@ class NodeProcess(object):
self.execute_action(instances, Action.STOP)
def execute_action(self, instances, action):
nodes = ','.join(map(lambda i: i.management_ip, instances))
nodes = ','.join(map(lambda i: i.internal_ip, instances))
args = {'service': self.name, 'action': action.name, 'nodes': nodes}
command = WARDEN_MANAGED_CMD % args
with instances[0].remote() as r:
@ -85,7 +85,7 @@ class NodeProcess(object):
util.execute_on_instances(instances, poll_status)
def status(self, instance):
command = 'maprcli service list -node %s -json' % instance.fqdn()
command = 'maprcli service list -node %s -json' % instance.internal_ip
with instance.remote() as remote:
ec, out = remote.execute_command(util._run_as('mapr', command))
node_processes = json.loads(out)['data']

View File

@ -85,7 +85,7 @@ class Hive(s.Service):
jdbc_uri = ('jdbc:mysql://%(db_host)s:%(db_port)s/%(db_name)s?'
'createDatabaseIfNotExist=true')
jdbc_args = {
'db_host': mysql.MySQL.get_db_instance(context).fqdn(),
'db_host': mysql.MySQL.get_db_instance(context).internal_ip,
'db_port': mysql.MySQL.MYSQL_SERVER_PORT,
'db_name': mysql.MySQL.METASTORE_SPECS.db_name,
}

View File

@ -87,7 +87,7 @@ class Hue(s.Service):
rdbms_specs = mysql.MySQL.RDBMS_SPECS
result = {
'db_host': db_instance.fqdn(),
'db_host': db_instance.internal_ip,
'hue_name': hue_specs.db_name,
'hue_user': hue_specs.user,
'hue_password': hue_specs.password,
@ -111,7 +111,7 @@ class Hue(s.Service):
if hive_host:
hive_service = context.get_service(hive.HIVE_METASTORE)
result.update({
'hive_host': hive_host.management_ip,
'hive_host': hive_host.internal_ip,
'hive_version': hive_service.version,
'hive_conf_dir': hive_service.conf_dir(context),
})

View File

@ -128,7 +128,7 @@ class MySQL(s.Service):
args = {
'user': MySQL.METASTORE_SPECS.user,
'password': MySQL.METASTORE_SPECS.password,
'host': instance.management_ip,
'host': instance.internal_ip,
'path': script.remote_path
}
cmd = 'mysql -h{host} -u{user} -p{password} < {path}'
@ -197,7 +197,7 @@ class MySQL(s.Service):
@staticmethod
def _grant_access(instance, specs, instances):
f_name = 'grant_access_%s.sql' % specs.db_name
ips = [i.management_ip for i in instances]
ips = [i.internal_ip for i in instances]
user_hosts = MySQL.get_user_hosts(instance, specs.user)
script = MySQL._create_script_obj(f_name, 'grant_access.sql',
hosts=set(ips)-set(user_hosts),

View File

@ -74,7 +74,7 @@ class Oozie(s.Service):
jdbc_uri = ('jdbc:mysql://%(db_host)s:%(db_port)s/%(db_name)s?'
'createDatabaseIfNotExist=true')
jdbc_args = {
'db_host': mysql.MySQL.get_db_instance(context).fqdn(),
'db_host': mysql.MySQL.get_db_instance(context).internal_ip,
'db_port': mysql.MySQL.MYSQL_SERVER_PORT,
'db_name': mysql.MySQL.OOZIE_SPECS.db_name,
}

View File

@ -33,7 +33,7 @@ class SparkMaster(np.NodeProcess):
_submit_port = SPARK_MASTER_PORT
def submit_url(self, cluster_context):
host = cluster_context.get_instance(self).fqdn()
host = cluster_context.get_instance(self).internal_ip
args = {'host': host, 'port': self.submit_port(cluster_context)}
return 'spark://%(host)s:%(port)s' % args
@ -158,7 +158,7 @@ class Spark(s.Service):
def _generate_slaves_file(self, cluster_context):
slaves = cluster_context.get_instances(SPARK_SLAVE)
return '\n'.join(map(lambda i: i.fqdn(), slaves))
return '\n'.join(map(lambda i: i.internal_ip, slaves))
def _create_hadoop_spark_dirs(self, cluster_context):
path = '/apps/spark'

View File

@ -94,7 +94,8 @@ def run_script(instance, script, run_as=None, *args, **kwargs):
script = files.get_file_text(script) % kwargs
r.write_file_to(path, script, run_as_root=(run_as == 'root'))
r.execute_command(_run_as(run_as, 'chmod +x %s' % path))
r.execute_command(_run_as(run_as, '%s %s' % (path, ' '.join(args))))
r.execute_command(_run_as(run_as, '%s %s' % (path, ' '.join(args))),
timeout=3600)
# FIXME(aosadchyi): reuse existing remote
remove(instance, path, run_as=run_as)

View File

@ -47,7 +47,7 @@ class Context(bc.BaseClusterContext):
@property
def resource_manager_uri(self):
if not self._resource_manager_uri:
ip = self.get_instance(yarn.RESOURCE_MANAGER).management_ip
ip = self.get_instance(yarn.RESOURCE_MANAGER).internal_ip
self._resource_manager_uri = '%s:8032' % ip
return self._resource_manager_uri

View File

@ -48,7 +48,7 @@ class Context(bc.BaseClusterContext):
def resource_manager_uri(self):
# FIXME(aosadchyi): Wait for RM HA to work properly
if not self._resource_manager_uri:
ip = self.get_instance(yarn.RESOURCE_MANAGER).management_ip
ip = self.get_instance(yarn.RESOURCE_MANAGER).internal_ip
self._resource_manager_uri = '%s:8032' % ip
return self._resource_manager_uri

View File

@ -28,13 +28,18 @@ from sahara.tests.unit import base as b
from sahara.tests.unit import testutils as tu
MANAGEMENT_IP = '1.1.1.1'
INTERNAL_IP = '1.1.1.2'
class TestClusterContext(b.SaharaTestCase):
def __init__(self, *args, **kwds):
super(TestClusterContext, self).__init__(*args, **kwds)
self.fake_np = np.NodeProcess('fake', 'foo', 'bar')
def _get_context(self):
i1 = tu.make_inst_dict('id_1', 'instance_1', '1.1.1.1')
i1 = tu.make_inst_dict('id_1', 'instance_1', MANAGEMENT_IP)
i1['internal_ip'] = INTERNAL_IP
master_proc = [
yarn.RESOURCE_MANAGER.ui_name,
yarn.NODE_MANAGER.ui_name,
@ -65,7 +70,8 @@ class TestClusterContext(b.SaharaTestCase):
def test_get_oozie_server_uri(self):
ctx = self._get_context()
self.assertEqual('http://1.1.1.1:11000/oozie', ctx.oozie_server_uri)
expected = 'http://%s:11000/oozie' % MANAGEMENT_IP
self.assertEqual(expected, ctx.oozie_server_uri)
def test_oozie_server(self):
ctx = self._get_context()
@ -74,7 +80,8 @@ class TestClusterContext(b.SaharaTestCase):
def test_oozie_http(self):
ctx = self._get_context()
self.assertEqual('1.1.1.1:11000', ctx.oozie_http)
expected = '%s:11000' % MANAGEMENT_IP
self.assertEqual(expected, ctx.oozie_http)
def test_configure_sh(self):
ctx = self._get_context()
@ -83,10 +90,10 @@ class TestClusterContext(b.SaharaTestCase):
r'(-no-autostart)\s+(-f)\s+(-RM (\S+))\s(-HS (\S+))')
self.assertRegex(conf_sh, pattern)
self.assertIn('/opt/mapr/server/configure.sh', conf_sh)
self.assertIn('-C 1.1.1.1', conf_sh)
self.assertIn('-Z 1.1.1.1', conf_sh)
self.assertIn('-RM 1.1.1.1', conf_sh)
self.assertIn('-HS 1.1.1.1', conf_sh)
self.assertIn('-C %s' % INTERNAL_IP, conf_sh)
self.assertIn('-Z %s' % INTERNAL_IP, conf_sh)
self.assertIn('-RM %s' % INTERNAL_IP, conf_sh)
self.assertIn('-HS %s' % INTERNAL_IP, conf_sh)
self.assertIn('-no-autostart', conf_sh)
self.assertIn('-N ' + ctx.cluster.name, conf_sh)
@ -124,29 +131,33 @@ class TestClusterContext(b.SaharaTestCase):
ctx = self._get_context()
ip_list_1 = ctx.get_instances_ip(yarn.RESOURCE_MANAGER)
self.assertEqual(1, len(ip_list_1))
self.assertIn('1.1.1.1', ip_list_1)
self.assertIn(INTERNAL_IP, ip_list_1)
ip_list_2 = ctx.get_instances_ip(yarn.RESOURCE_MANAGER.ui_name)
self.assertEqual(1, len(ip_list_2))
self.assertIn('1.1.1.1', ip_list_2)
self.assertIn(INTERNAL_IP, ip_list_2)
empty_list = ctx.get_instances_ip(self.fake_np)
self.assertEqual(0, len(empty_list))
def test_get_instance_ip(self):
ctx = self._get_context()
ip_1 = ctx.get_instance_ip(yarn.RESOURCE_MANAGER)
self.assertEqual('1.1.1.1', ip_1)
self.assertEqual(INTERNAL_IP, ip_1)
ip_2 = ctx.get_instance_ip(yarn.RESOURCE_MANAGER.ui_name)
self.assertEqual('1.1.1.1', ip_2)
self.assertEqual(INTERNAL_IP, ip_2)
none_ip = ctx.get_instance_ip(self.fake_np)
self.assertIsNone(none_ip)
def test_get_zookeeper_nodes_ip_with_port(self):
ctx = self._get_context()
self.assertEqual('1.1.1.1:5181',
ctx.get_zookeeper_nodes_ip_with_port())
expected = '%s:5181' % INTERNAL_IP
actual = ctx.get_zookeeper_nodes_ip_with_port()
self.assertEqual(expected, actual)
management.ZK_CLIENT_PORT = '0000'
self.assertEqual('1.1.1.1:0000',
ctx.get_zookeeper_nodes_ip_with_port())
expected = '%s:0000' % INTERNAL_IP
actual = ctx.get_zookeeper_nodes_ip_with_port()
self.assertEqual(expected, actual)
def test_filter_instances(self):
ctx = self._get_context()
@ -233,32 +244,32 @@ class TestClusterContext(b.SaharaTestCase):
ctx = self._get_context()
cldb_list_1 = ctx.get_cldb_nodes_ip()
self.assertEqual(1, len(cldb_list_1.split(',')))
self.assertIn('1.1.1.1', cldb_list_1)
self.assertIn(INTERNAL_IP, cldb_list_1)
cldb_list_2 = ctx.get_cldb_nodes_ip()
self.assertEqual(1, len(cldb_list_2.split(',')))
self.assertIn('1.1.1.1', cldb_list_2)
self.assertIn(INTERNAL_IP, cldb_list_2)
sep = ':'
cldb_list_3 = ctx.get_cldb_nodes_ip(sep)
self.assertEqual(1, len(cldb_list_3.split(sep)))
self.assertIn('1.1.1.1', cldb_list_3)
self.assertIn(INTERNAL_IP, cldb_list_3)
def test_get_zookeeper_nodes_ip(self):
ctx = self._get_context()
zk_list_1 = ctx.get_zookeeper_nodes_ip()
self.assertEqual(1, len(zk_list_1.split(',')))
self.assertIn('1.1.1.1', zk_list_1)
self.assertIn(INTERNAL_IP, zk_list_1)
zk_list_2 = ctx.get_zookeeper_nodes_ip()
self.assertEqual(1, len(zk_list_2.split(',')))
self.assertIn('1.1.1.1', zk_list_2)
self.assertIn(INTERNAL_IP, zk_list_2)
sep = ':'
zk_list_3 = ctx.get_zookeeper_nodes_ip(sep)
self.assertEqual(1, len(zk_list_3.split(sep)))
self.assertIn('1.1.1.1', zk_list_3)
self.assertIn(INTERNAL_IP, zk_list_3)
def test_get_resourcemanager_ip(self):
ctx = self._get_context()
ip = ctx.get_resourcemanager_ip()
self.assertEqual('1.1.1.1', ip)
self.assertEqual(INTERNAL_IP, ip)
def test_get_historyserver_ip(self):
ctx = self._get_context()