diff --git a/sahara/conductor/manager.py b/sahara/conductor/manager.py index 6f53e890..3f52ace6 100644 --- a/sahara/conductor/manager.py +++ b/sahara/conductor/manager.py @@ -44,6 +44,7 @@ NODE_GROUP_DEFAULTS = { "security_groups": None, "auto_security_group": False, "availability_zone": None, + "is_proxy_gateway": False, } INSTANCE_DEFAULTS = { diff --git a/sahara/conductor/objects.py b/sahara/conductor/objects.py index 6ddcfc10..706621ef 100644 --- a/sahara/conductor/objects.py +++ b/sahara/conductor/objects.py @@ -21,6 +21,8 @@ is to hide some necessary magic. Current module describes objects fields via docstrings and contains implementation of helper methods. """ +import random + from oslo.config import cfg from sahara.utils import configs @@ -61,6 +63,22 @@ class Cluster(object): cluster_template - ClusterTemplate object """ + def has_proxy_gateway(self): + for ng in self.node_groups: + if ng.is_proxy_gateway: + return True + + def get_proxy_gateway_node(self): + proxies = [] + for ng in self.node_groups: + if ng.is_proxy_gateway and ng.instances: + proxies += ng.instances + + if proxies: + return random.choice(proxies) + + return None + class NodeGroup(object): """An object representing Node Group. @@ -87,6 +105,8 @@ class NodeGroup(object): availability_zone - name of Nova availability zone where to spawn instances open_ports - List of ports that will be opened if auto_security_group is True + is_proxy_gateway - indicates if nodes from this node group should be used + as proxy to access other cluster nodes count instances - list of Instance objects @@ -185,6 +205,7 @@ class NodeGroupTemplate(object): security_groups auto_security_group availability_zone + is_proxy_gateway """ diff --git a/sahara/db/migration/alembic_migrations/versions/016_is_proxy_gateway.py b/sahara/db/migration/alembic_migrations/versions/016_is_proxy_gateway.py new file mode 100644 index 00000000..3d9cca2d --- /dev/null +++ b/sahara/db/migration/alembic_migrations/versions/016_is_proxy_gateway.py @@ -0,0 +1,44 @@ +# Copyright 2014 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Add is_proxy_gateway + +Revision ID: 016 +Revises: 015 +Create Date: 2014-11-10 12:47:17.871520 + +""" + +# revision identifiers, used by Alembic. +revision = '016' +down_revision = '015' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column('node_group_templates', + sa.Column('is_proxy_gateway', sa.Boolean())) + op.add_column('node_groups', + sa.Column('is_proxy_gateway', sa.Boolean())) + op.add_column('templates_relations', + sa.Column('is_proxy_gateway', sa.Boolean())) + + +def downgrade(): + op.drop_column('templates_relations', 'is_proxy_gateway') + op.drop_column('node_groups', 'is_proxy_gateway') + op.drop_column('node_group_templates', 'is_proxy_gateway') diff --git a/sahara/db/sqlalchemy/models.py b/sahara/db/sqlalchemy/models.py index d2748415..4c215d72 100644 --- a/sahara/db/sqlalchemy/models.py +++ b/sahara/db/sqlalchemy/models.py @@ -123,6 +123,7 @@ class NodeGroup(mb.SaharaBase): auto_security_group = sa.Column(sa.Boolean()) availability_zone = sa.Column(sa.String(255)) open_ports = sa.Column(st.JsonListType()) + is_proxy_gateway = sa.Column(sa.Boolean()) def to_dict(self): d = super(NodeGroup, self).to_dict() @@ -209,6 +210,7 @@ class NodeGroupTemplate(mb.SaharaBase): security_groups = sa.Column(st.JsonListType()) auto_security_group = sa.Column(sa.Boolean()) availability_zone = sa.Column(sa.String(255)) + is_proxy_gateway = sa.Column(sa.Boolean()) class TemplatesRelation(mb.SaharaBase): @@ -244,6 +246,7 @@ class TemplatesRelation(mb.SaharaBase): security_groups = sa.Column(st.JsonListType()) auto_security_group = sa.Column(sa.Boolean()) availability_zone = sa.Column(sa.String(255)) + is_proxy_gateway = sa.Column(sa.Boolean()) # EDP objects: DataSource, Job, Job Execution, JobBinary diff --git a/sahara/service/networks.py b/sahara/service/networks.py index a154aaa1..f460830e 100644 --- a/sahara/service/networks.py +++ b/sahara/service/networks.py @@ -48,7 +48,10 @@ def init_instances_ips(instance): else: management_ip = management_ip or address['addr'] - if not CONF.use_floating_ips: + cluster = instance.node_group.cluster + if (not CONF.use_floating_ips or + (cluster.has_proxy_gateway() and + not instance.node_group.is_proxy_gateway)): management_ip = internal_ip # NOTE(aignatov): Once bug #1262529 is fixed this 'if' block should be diff --git a/sahara/service/validations/base.py b/sahara/service/validations/base.py index 8ab4fef2..71cb26f1 100644 --- a/sahara/service/validations/base.py +++ b/sahara/service/validations/base.py @@ -291,7 +291,9 @@ def check_node_groups_in_cluster_templates(cluster_name, plugin_name, cluster_template_id): c_t = api.get_cluster_template(id=cluster_template_id) n_groups = c_t.to_wrapped_dict()['cluster_template']['node_groups'] - check_network_config(n_groups) + proxy_gateway_used = len([ng for ng in n_groups if + ng.get('is_proxy_gateway', False)]) > 0 + check_network_config(n_groups, proxy_gateway_used) for node_group in n_groups: check_node_group_basic_fields(plugin_name, hadoop_version, node_group) check_cluster_hostnames_lengths(cluster_name, n_groups) @@ -311,10 +313,14 @@ def check_node_group_template_exists(ng_tmpl_id): ng_tmpl_id, _("NodeGroup template with id '%s' not found")) -def check_network_config(node_groups): +def check_network_config(node_groups, proxy_gateway_used=False): if CONF.use_floating_ips and CONF.use_neutron: for ng in node_groups: - if not _get_floating_ip_pool(ng): + require_floating = True + if proxy_gateway_used: + require_floating = ng.get('is_proxy_gateway', False) + + if require_floating and not _get_floating_ip_pool(ng): raise ex.MissingFloatingNetworkException(ng.get('name')) diff --git a/sahara/service/validations/clusters.py b/sahara/service/validations/clusters.py index b34bc4a2..d0b4e0bc 100644 --- a/sahara/service/validations/clusters.py +++ b/sahara/service/validations/clusters.py @@ -81,7 +81,9 @@ def check_cluster_create(data, **kwargs): data['anti_affinity']) if data.get('node_groups'): - b.check_network_config(data['node_groups']) + proxy_gateway_used = len([ng for ng in data['node_groups'] if + ng.get('is_proxy_gateway', False)]) > 0 + b.check_network_config(data['node_groups'], proxy_gateway_used) b.check_cluster_hostnames_lengths(data['name'], data['node_groups']) neutron_net_id = _get_cluster_field(data, 'neutron_management_network') diff --git a/sahara/service/validations/clusters_scaling.py b/sahara/service/validations/clusters_scaling.py index 90ecf0c2..9a3df2e6 100644 --- a/sahara/service/validations/clusters_scaling.py +++ b/sahara/service/validations/clusters_scaling.py @@ -107,6 +107,7 @@ def check_cluster_scaling(data, cluster_id, **kwargs): if data.get("add_node_groups"): b.check_add_node_groups(cluster, data['add_node_groups']) - b.check_network_config(data['add_node_groups']) + b.check_network_config(data['add_node_groups'], + cluster.has_proxy_gateway()) b.check_cluster_hostnames_lengths(cluster.name, data['add_node_groups']) diff --git a/sahara/service/validations/node_group_templates.py b/sahara/service/validations/node_group_templates.py index 415637b4..8ba7ba35 100644 --- a/sahara/service/validations/node_group_templates.py +++ b/sahara/service/validations/node_group_templates.py @@ -87,6 +87,9 @@ NODE_GROUP_TEMPLATE_SCHEMA = { "availability_zone": { "type": "string", }, + "is_proxy_gateway": { + "type": "boolean" + }, }, "additionalProperties": False, "required": [ diff --git a/sahara/tests/unit/conductor/manager/test_clusters.py b/sahara/tests/unit/conductor/manager/test_clusters.py index 5762f34b..bc17362d 100644 --- a/sahara/tests/unit/conductor/manager/test_clusters.py +++ b/sahara/tests/unit/conductor/manager/test_clusters.py @@ -120,6 +120,7 @@ class ClusterTest(test_base.ConductorManagerTestCase): ng.pop("image_username") ng.pop("open_ports") ng.pop("auto_security_group") + ng.pop("is_proxy_gateway") ng.pop("tenant_id") ng.pop("availability_zone") diff --git a/sahara/tests/unit/conductor/manager/test_templates.py b/sahara/tests/unit/conductor/manager/test_templates.py index c596d959..2af63221 100644 --- a/sahara/tests/unit/conductor/manager/test_templates.py +++ b/sahara/tests/unit/conductor/manager/test_templates.py @@ -213,6 +213,7 @@ class ClusterTemplates(test_base.ConductorManagerTestCase): ng.pop("volumes_availability_zone") ng.pop("volume_type") ng.pop("auto_security_group") + ng.pop("is_proxy_gateway") self.assertEqual(SAMPLE_CLT["node_groups"], clt_db_obj["node_groups"]) diff --git a/sahara/tests/unit/db/migration/test_migrations.py b/sahara/tests/unit/db/migration/test_migrations.py index 43394272..18e23afa 100644 --- a/sahara/tests/unit/db/migration/test_migrations.py +++ b/sahara/tests/unit/db/migration/test_migrations.py @@ -434,6 +434,13 @@ class SaharaMigrationsCheckers(object): self.assertColumnCount(engine, 'cluster_events', events_columns) self.assertColumnsExists(engine, 'cluster_events', events_columns) + def _check_016(self, engine, date): + self.assertColumnExists(engine, 'node_group_templates', + 'is_proxy_gateway') + self.assertColumnExists(engine, 'node_groups', 'is_proxy_gateway') + self.assertColumnExists(engine, 'templates_relations', + 'is_proxy_gateway') + class TestMigrationsMySQL(SaharaMigrationsCheckers, base.BaseWalkMigrationTestCase, diff --git a/sahara/tests/unit/service/validation/test_cluster_create_validation.py b/sahara/tests/unit/service/validation/test_cluster_create_validation.py index 5417c97f..9480ace7 100644 --- a/sahara/tests/unit/service/validation/test_cluster_create_validation.py +++ b/sahara/tests/unit/service/validation/test_cluster_create_validation.py @@ -275,6 +275,74 @@ class TestClusterCreateValidation(u.ValidationTestCase): } ) + def test_cluster_create_missing_floating_pool(self): + self.override_config("use_neutron", True) + self._assert_create_object_validation( + data={ + 'name': "testname", + 'plugin_name': "vanilla", + 'hadoop_version': "1.2.1", + 'user_keypair_id': 'test_keypair', + 'default_image_id': '550e8400-e29b-41d4-a716-446655440000', + 'neutron_management_network': 'd9a3bebc-f788-4b81-' + '9a93-aa048022c1ca', + 'node_groups': [ + { + "name": "ng1", + "node_processes": ["namenode"], + "flavor_id": "42", + "count": 100, + 'security_groups': ['group1', 'group2'], + 'floating_ip_pool': + 'd9a3bebc-f788-4b81-9a93-aa048022c1ca' + }, + { + "name": "ng2", + "node_processes": ["datanode"], + "flavor_id": "42", + "count": 100, + 'security_groups': ['group1', 'group2'] + } + ] + }, + bad_req_i=(1, 'MISSING_FLOATING_NETWORK', + "Node Group ng2 is missing 'floating_ip_pool' " + "field") + ) + + def test_cluster_create_with_proxy_gateway(self): + self.override_config("use_neutron", True) + self._assert_create_object_validation( + data={ + 'name': "testname", + 'plugin_name': "vanilla", + 'hadoop_version': "1.2.1", + 'user_keypair_id': 'test_keypair', + 'default_image_id': '550e8400-e29b-41d4-a716-446655440000', + 'neutron_management_network': 'd9a3bebc-f788-4b81-' + '9a93-aa048022c1ca', + 'node_groups': [ + { + "name": "ng1", + "node_processes": ["namenode"], + "flavor_id": "42", + "count": 100, + 'security_groups': ['group1', 'group2'], + 'floating_ip_pool': + 'd9a3bebc-f788-4b81-9a93-aa048022c1ca', + "is_proxy_gateway": True + }, + { + "name": "ng2", + "node_processes": ["datanode"], + "flavor_id": "42", + "count": 100, + 'security_groups': ['group1', 'group2'] + } + ] + } + ) + def test_cluster_create_security_groups_by_ids(self): self.override_config("use_neutron", True) self._assert_create_object_validation( diff --git a/sahara/tests/unit/testutils.py b/sahara/tests/unit/testutils.py index 1625ebb0..de504371 100644 --- a/sahara/tests/unit/testutils.py +++ b/sahara/tests/unit/testutils.py @@ -36,7 +36,7 @@ def make_ng_dict(name, flavor, processes, count, instances=None, **kwargs): 'count': count, 'instances': instances, 'node_configs': {}, 'security_groups': None, 'auto_security_group': False, 'availability_zone': None, 'volumes_availability_zone': None, - 'open_ports': []} + 'open_ports': [], 'is_proxy_gateway': False} dct.update(kwargs) return dct diff --git a/sahara/tests/unit/utils/test_ssh_remote.py b/sahara/tests/unit/utils/test_ssh_remote.py index d358e6f8..7a9cb716 100644 --- a/sahara/tests/unit/utils/test_ssh_remote.py +++ b/sahara/tests/unit/utils/test_ssh_remote.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import shlex + import mock import testtools @@ -32,6 +34,12 @@ class FakeCluster(object): self.management_private_key = priv_key self.neutron_management_network = 'network1' + def has_proxy_gateway(self): + return False + + def get_proxy_gateway_node(self): + return None + class FakeNodeGroup(object): def __init__(self, user, priv_key): @@ -101,7 +109,8 @@ class TestInstanceInteropHelper(base.SaharaTestCase): # Test SSH remote.execute_command('/bin/true') self.run_in_subprocess.assert_any_call( - 42, ssh_remote._connect, ('10.0.0.1', 'user1', 'key1', None)) + 42, ssh_remote._connect, ('10.0.0.1', 'user1', 'key1', + None, None, None)) # Test HTTP remote.get_http_client(8080) self.assertFalse(p_adapter.called) @@ -109,8 +118,9 @@ class TestInstanceInteropHelper(base.SaharaTestCase): # When use_floating_ips=False and use_namespaces=True, a netcat socket # created with 'ip netns exec qrouter-...' should be used to access # instances. + @mock.patch('sahara.utils.ssh_remote._simple_exec_func') @mock.patch('sahara.utils.ssh_remote.ProxiedHTTPAdapter') - def test_use_namespaces(self, p_adapter): + def test_use_namespaces(self, p_adapter, p_simple_exec_func): self.override_config('use_floating_ips', False) self.override_config('use_namespaces', True) @@ -122,17 +132,20 @@ class TestInstanceInteropHelper(base.SaharaTestCase): self.run_in_subprocess.assert_any_call( 42, ssh_remote._connect, ('10.0.0.2', 'user2', 'key2', - 'ip netns exec qrouter-fakerouter nc 10.0.0.2 22')) + 'ip netns exec qrouter-fakerouter nc 10.0.0.2 22', None, None)) # Test HTTP remote.get_http_client(8080) p_adapter.assert_called_once_with( - 'ip netns exec qrouter-fakerouter nc 10.0.0.2 8080', + p_simple_exec_func(), '10.0.0.2', 8080) + p_simple_exec_func.assert_any_call( + shlex.split('ip netns exec qrouter-fakerouter nc 10.0.0.2 8080')) # When proxy_command is set, a user-defined netcat socket should be used to # access instances. + @mock.patch('sahara.utils.ssh_remote._simple_exec_func') @mock.patch('sahara.utils.ssh_remote.ProxiedHTTPAdapter') - def test_proxy_command(self, p_adapter): + def test_proxy_command(self, p_adapter, p_simple_exec_func): self.override_config('proxy_command', 'ssh fakerelay nc {host} {port}') instance = FakeInstance('inst3', '10.0.0.3', 'user3', 'key3') @@ -142,11 +155,14 @@ class TestInstanceInteropHelper(base.SaharaTestCase): remote.execute_command('/bin/true') self.run_in_subprocess.assert_any_call( 42, ssh_remote._connect, - ('10.0.0.3', 'user3', 'key3', 'ssh fakerelay nc 10.0.0.3 22')) + ('10.0.0.3', 'user3', 'key3', 'ssh fakerelay nc 10.0.0.3 22', + None, None)) # Test HTTP remote.get_http_client(8080) p_adapter.assert_called_once_with( - 'ssh fakerelay nc 10.0.0.3 8080', '10.0.0.3', 8080) + p_simple_exec_func(), '10.0.0.3', 8080) + p_simple_exec_func.assert_any_call( + shlex.split('ssh fakerelay nc 10.0.0.3 8080')) def test_proxy_command_bad(self): self.override_config('proxy_command', '{bad_kw} nc {host} {port}') diff --git a/sahara/utils/procutils.py b/sahara/utils/procutils.py index 769d1547..64cfc989 100644 --- a/sahara/utils/procutils.py +++ b/sahara/utils/procutils.py @@ -38,19 +38,20 @@ def start_subprocess(): stderr=subprocess.PIPE) -def run_in_subprocess(proc, func, args=(), kwargs={}): +def run_in_subprocess(proc, func, args=(), kwargs={}, interactive=False): try: pickle.dump(func, proc.stdin) pickle.dump(args, proc.stdin) pickle.dump(kwargs, proc.stdin) proc.stdin.flush() - result = pickle.load(proc.stdout) + if not interactive: + result = pickle.load(proc.stdout) - if 'exception' in result: - raise SubprocessException(result['exception']) + if 'exception' in result: + raise SubprocessException(result['exception']) - return result['output'] + return result['output'] finally: # NOTE(dmitryme): in openstack/common/processutils.py it # is suggested to sleep a little between calls to multiprocessing. diff --git a/sahara/utils/ssh_remote.py b/sahara/utils/ssh_remote.py index c0d1ec69..3d101e85 100644 --- a/sahara/utils/ssh_remote.py +++ b/sahara/utils/ssh_remote.py @@ -33,6 +33,8 @@ implementations which are run in a separate process. import os import shlex +import sys +import threading import time import uuid @@ -64,6 +66,7 @@ CONF = cfg.CONF _ssh = None +_proxy_ssh = None _sessions = {} @@ -73,26 +76,44 @@ INFRA = None _global_remote_semaphore = None -def _connect(host, username, private_key, proxy_command=None): +def _connect(host, username, private_key, proxy_command=None, + gateway_host=None, gateway_image_username=None): global _ssh + global _proxy_ssh LOG.debug('Creating SSH connection') if type(private_key) in [str, unicode]: private_key = crypto.to_paramiko_private_key(private_key) + _ssh = paramiko.SSHClient() _ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) proxy = None if proxy_command: - LOG.debug('creating proxy using command: {0}'.format(proxy_command)) + LOG.debug('creating proxy using command: %s', proxy_command) proxy = paramiko.ProxyCommand(proxy_command) + if gateway_host: + _proxy_ssh = paramiko.SSHClient() + _proxy_ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + LOG.debug('connecting to proxy gateway at: %s', gateway_host) + _proxy_ssh.connect(gateway_host, username=gateway_image_username, + pkey=private_key, sock=proxy) + + proxy = _proxy_ssh.get_transport().open_session() + proxy.exec_command("nc {0} 22".format(host)) + _ssh.connect(host, username=username, pkey=private_key, sock=proxy) def _cleanup(): global _ssh + global _proxy_ssh + _ssh.close() + if _proxy_ssh: + _proxy_ssh.close() def _read_paramimko_stream(recv_func): @@ -137,14 +158,54 @@ def _execute_command(cmd, run_as_root=False, get_stderr=False, return ret_code, stdout -def _get_http_client(host, port, proxy_command=None): +def _execute_command_interactive(cmd, run_as_root=False): + global _ssh + + chan = _ssh.get_transport().open_session() + if run_as_root: + chan.exec_command('sudo bash -c "%s"' % _escape_quotes(cmd)) + else: + chan.exec_command(cmd) + + _proxy_shell(chan) + + _ssh.close() + + +def _proxy_shell(chan): + def readall(): + while True: + d = sys.stdin.read(1) + if not d or chan.exit_status_ready(): + break + chan.send(d) + + reader = threading.Thread(target=readall) + reader.start() + + while True: + data = chan.recv(256) + if not data or chan.exit_status_ready(): + break + sys.stdout.write(data) + sys.stdout.flush() + + +def _get_http_client(host, port, proxy_command=None, gateway_host=None, + gateway_username=None, gateway_private_key=None): global _sessions _http_session = _sessions.get((host, port), None) LOG.debug('cached HTTP session for {0}:{1} is {2}'.format(host, port, _http_session)) if not _http_session: - if proxy_command: + if gateway_host: + _http_session = _get_proxy_gateway_http_session( + gateway_host, gateway_username, + gateway_private_key, host, port, proxy_command) + LOG.debug('created ssh proxied HTTP session for {0}:{1}' + .format(host, port)) + elif proxy_command: # can return a new session here because it actually uses # the same adapter (and same connection pools) for a given # host and port tuple @@ -301,20 +362,64 @@ def _release_remote_semaphore(): def _get_proxied_http_session(proxy_command, host, port=None): session = requests.Session() - adapter = ProxiedHTTPAdapter(proxy_command, host, port) + + adapter = ProxiedHTTPAdapter( + _simple_exec_func(shlex.split(proxy_command)), host, port) session.mount('http://{0}:{1}'.format(host, adapter.port), adapter) return session -class ProxiedHTTPAdapter(adapters.HTTPAdapter): - port = None - host = None +def _get_proxy_gateway_http_session(gateway_host, gateway_username, + gateway_private_key, host, port=None, + proxy_command=None): + session = requests.Session() + adapter = ProxiedHTTPAdapter( + _proxy_gateway_func(gateway_host, gateway_username, + gateway_private_key, host, + port, proxy_command), + host, port) + session.mount('http://{0}:{1}'.format(host, port), adapter) - def __init__(self, proxy_command, host, port): + return session + + +def _simple_exec_func(cmd): + def func(): + return e_subprocess.Popen(cmd, + stdin=e_subprocess.PIPE, + stdout=e_subprocess.PIPE, + stderr=e_subprocess.PIPE) + + return func + + +def _proxy_gateway_func(gateway_host, gateway_username, + gateway_private_key, host, + port, proxy_command): + def func(): + proc = procutils.start_subprocess() + + try: + conn_params = (gateway_host, gateway_username, gateway_private_key, + proxy_command, None, None) + procutils.run_in_subprocess(proc, _connect, conn_params) + cmd = "nc {host} {port}".format(host=host, port=port) + procutils.run_in_subprocess( + proc, _execute_command_interactive, (cmd,), interactive=True) + return proc + except Exception: + with excutils.save_and_reraise_exception(): + procutils.shutdown_subprocess(proc, _cleanup) + + return func + + +class ProxiedHTTPAdapter(adapters.HTTPAdapter): + def __init__(self, create_process_func, host, port): super(ProxiedHTTPAdapter, self).__init__() - LOG.debug('HTTP adapter created with cmd {0}'.format(proxy_command)) - self.cmd = shlex.split(proxy_command) + LOG.debug('HTTP adapter created for {0}:{1}'.format(host, port)) + self.create_process_func = create_process_func self.port = port self.host = host @@ -345,22 +450,19 @@ class ProxiedHTTPAdapter(adapters.HTTPAdapter): super(ProxiedHTTPAdapter, self).close() def _connect(self): - LOG.debug('Returning netcat socket with command {0}' - .format(self.cmd)) + LOG.debug('Returning netcat socket for {0}:{1}' + .format(self.host, self.port)) rootwrap_command = CONF.rootwrap_command if CONF.use_rootwrap else '' - return NetcatSocket(self.cmd, rootwrap_command) + return NetcatSocket(self.create_process_func, rootwrap_command) class NetcatSocket(object): def _create_process(self): - self.process = e_subprocess.Popen(self.cmd, - stdin=e_subprocess.PIPE, - stdout=e_subprocess.PIPE, - stderr=e_subprocess.PIPE) + self.process = self.create_process_func() - def __init__(self, cmd, rootwrap_command=None): - self.cmd = cmd + def __init__(self, create_process_func, rootwrap_command=None): + self.create_process_func = create_process_func self.rootwrap_command = rootwrap_command self._create_process() @@ -432,27 +534,29 @@ class InstanceInteropHelper(remote.Remote): finally: _release_remote_semaphore() - def get_neutron_info(self): + def get_neutron_info(self, instance=None): + if not instance: + instance = self.instance neutron_info = h.HashableDict() neutron_info['network'] = ( - self.instance.node_group.cluster.neutron_management_network) + instance.node_group.cluster.neutron_management_network) ctx = context.current() neutron_info['uri'] = base.url_for(ctx.service_catalog, 'network') neutron_info['token'] = ctx.auth_token neutron_info['tenant'] = ctx.tenant_name - neutron_info['host'] = self.instance.management_ip + neutron_info['host'] = instance.management_ip LOG.debug('Returning neutron info: {0}'.format(neutron_info)) return neutron_info - def _build_proxy_command(self, command, host=None, port=None, info=None, - rootwrap_command=None): + def _build_proxy_command(self, command, instance=None, port=None, + info=None, rootwrap_command=None): # Accepted keywords in the proxy command template: # {host}, {port}, {tenant_id}, {network_id}, {router_id} keywords = {} if not info: - info = self.get_neutron_info() + info = self.get_neutron_info(instance) keywords['tenant_id'] = context.current().tenant_id keywords['network_id'] = info['network'] @@ -462,7 +566,7 @@ class InstanceInteropHelper(remote.Remote): info['token'], info['tenant']) keywords['router_id'] = client.get_router() - keywords['host'] = host + keywords['host'] = instance.management_ip keywords['port'] = port try: @@ -476,6 +580,19 @@ class InstanceInteropHelper(remote.Remote): return command def _get_conn_params(self): + host_ng = self.instance.node_group + cluster = host_ng.cluster + access_instance = self.instance + proxy_gateway_node = cluster.get_proxy_gateway_node() + + gateway_host = None + gateway_image_username = None + if proxy_gateway_node and not host_ng.is_proxy_gateway: + access_instance = proxy_gateway_node + gateway_host = proxy_gateway_node.management_ip + ng = proxy_gateway_node.node_group + gateway_image_username = ng.image_username + proxy_command = None if CONF.proxy_command: # Build a session through a user-defined socket @@ -489,13 +606,15 @@ class InstanceInteropHelper(remote.Remote): if proxy_command: rootwrap = CONF.rootwrap_command if CONF.use_rootwrap else '' proxy_command = self._build_proxy_command( - proxy_command, host=self.instance.management_ip, port=22, + proxy_command, instance=access_instance, port=22, info=None, rootwrap_command=rootwrap) return (self.instance.management_ip, - self.instance.node_group.image_username, - self.instance.node_group.cluster.management_private_key, - proxy_command) + host_ng.image_username, + cluster.management_private_key, + proxy_command, + gateway_host, + gateway_image_username) def _run(self, func, *args, **kwargs): proc = procutils.start_subprocess() @@ -529,6 +648,23 @@ class InstanceInteropHelper(remote.Remote): def get_http_client(self, port, info=None): self._log_command('Retrieving HTTP session for {0}:{1}'.format( self.instance.management_ip, port)) + + host_ng = self.instance.node_group + cluster = host_ng.cluster + access_instance = self.instance + access_port = port + proxy_gateway_node = cluster.get_proxy_gateway_node() + + gateway_host = None + gateway_username = None + gateway_private_key = None + if proxy_gateway_node and not host_ng.is_proxy_gateway: + access_instance = proxy_gateway_node + access_port = 22 + gateway_host = proxy_gateway_node.management_ip + gateway_username = proxy_gateway_node.node_group.image_username + gateway_private_key = cluster.management_private_key + proxy_command = None if CONF.proxy_command: # Build a session through a user-defined socket @@ -536,7 +672,7 @@ class InstanceInteropHelper(remote.Remote): elif info or (CONF.use_namespaces and not CONF.use_floating_ips): # need neutron info if not info: - info = self.get_neutron_info() + info = self.get_neutron_info(access_instance) # Build a session through a netcat socket in the Neutron namespace proxy_command = ( 'ip netns exec qrouter-{router_id} nc {host} {port}') @@ -545,11 +681,13 @@ class InstanceInteropHelper(remote.Remote): if proxy_command: rootwrap = CONF.rootwrap_command if CONF.use_rootwrap else '' proxy_command = self._build_proxy_command( - proxy_command, host=self.instance.management_ip, port=port, + proxy_command, instance=access_instance, port=access_port, info=info, rootwrap_command=rootwrap) return _get_http_client(self.instance.management_ip, port, - proxy_command) + proxy_command, gateway_host, + gateway_username, + gateway_private_key) def close_http_session(self, port): global _sessions