diff --git a/doc/source/userdoc/advanced.configuration.guide.rst b/doc/source/userdoc/advanced.configuration.guide.rst index 9c3576e5..873f9017 100644 --- a/doc/source/userdoc/advanced.configuration.guide.rst +++ b/doc/source/userdoc/advanced.configuration.guide.rst @@ -68,13 +68,46 @@ integration see the Sahara documentation sections .. _Sahara extra repository: http://github.com/openstack/sahara-extra -Namespaces and non-root users ------------------------------ +Custom network topologies +------------------------- -In cases where namespaces are being used to access cluster VMs via private IPs, -rootwrap functionality is provided to allow users other than ``root`` access -to the namespace related OS facilities. To use rootwrap the following -configuration property is required to be set: +Sahara accesses VMs at several stages of cluster spawning, both via SSH and +HTTP. When floating IPs are not assigned to instances, Sahara needs to be able +to reach them another way. Floating IPs and network namespaces (see +:ref:`neutron-nova-network`) are automatically used when present. + +When none of these are enabled, the ``proxy_command`` property can be used to +give Sahara a command to access VMs. This command is run on the Sahara host and +must open a netcat socket to the instance destination port. ``{host}`` and +``{port}`` keywords should be used to describe the destination, they will be +translated at runtime. Other keywords can be used: ``{tenant_id}``, +``{network_id}`` and ``{router_id}``. + +For instance, the following configuration property in the Sahara configuration +file would be used if VMs are accessed through a relay machine: + +.. sourcecode:: cfg + + [DEFAULT] + proxy_command='ssh relay-machine-{tenant_id} nc {host} {port}' + +Whereas the following property would be used to access VMs through a custom +network namespace: + +.. sourcecode:: cfg + + [DEFAULT] + proxy_command='ip netns exec ns_for_{network_id} nc {host} {port}' + + +Non-root users +-------------- + +In cases where a proxy command is being used to access cluster VMs (for +instance when using namespaces or when specifying a custom proxy command), +rootwrap functionality is provided to allow users other than ``root`` access to +the needed OS facilities. To use rootwrap the following configuration property +is required to be set: .. sourcecode:: cfg @@ -100,12 +133,13 @@ steps: ``etc/sahara/rootwrap.conf`` to the system specific location, usually ``/etc/sahara``. This file contains the default configuration for rootwrap. -* Copy the provided rootwrap filers file from the local project file +* Copy the provided rootwrap filters file from the local project file ``etc/sahara/rootwrap.d/sahara.filters`` to the location specified in the rootwrap configuration file, usually ``/etc/sahara/rootwrap.d``. This file - contains the filters that will allow the ``sahara`` user to acces the - ``ip netns exec``, ``nc``, and ``kill`` commands through the rootwrap. It - should look similar to the followings: + contains the filters that will allow the ``sahara`` user to access the + ``ip netns exec``, ``nc``, and ``kill`` commands through the rootwrap + (depending on ``proxy_command`` you may need to set additional filters). + It should look similar to the followings: .. sourcecode:: cfg diff --git a/doc/source/userdoc/features.rst b/doc/source/userdoc/features.rst index d9766a36..140e97b6 100644 --- a/doc/source/userdoc/features.rst +++ b/doc/source/userdoc/features.rst @@ -31,6 +31,8 @@ and the size of each volume. All volumes are attached during Cluster creation/scaling operations. +.. _neutron-nova-network: + Neutron and Nova Network support -------------------------------- OpenStack clusters may use Nova or Neutron as a networking service. Sahara diff --git a/etc/sahara/sahara.conf.sample b/etc/sahara/sahara.conf.sample index 2d4ec203..4cd3b82d 100644 --- a/etc/sahara/sahara.conf.sample +++ b/etc/sahara/sahara.conf.sample @@ -521,6 +521,13 @@ # cluster. (integer value) #cluster_remote_threshold=70 +# Proxy command used to connect to instances. If set, this +# command should open a netcat socket, that Sahara will use +# for SSH and HTTP connections. Use {host} and {port} to +# describe the destination. Other available keywords: +# {tenant_id}, {network_id}, {router_id}. (string value) +#proxy_command= + [conductor] diff --git a/sahara/tests/unit/utils/test_neutron.py b/sahara/tests/unit/utils/test_neutron.py index 6333b48e..0d94a4a0 100644 --- a/sahara/tests/unit/utils/test_neutron.py +++ b/sahara/tests/unit/utils/test_neutron.py @@ -19,42 +19,15 @@ import testtools from sahara.utils.openstack import neutron as neutron_client -class NeutronClientRemoteWrapperTest(testtools.TestCase): +class NeutronClientTest(testtools.TestCase): @mock.patch("neutronclient.neutron.client.Client") def test_get_router(self, patched): patched.side_effect = _test_get_neutron_client - neutron = neutron_client.NeutronClientRemoteWrapper( + neutron = neutron_client.NeutronClient( '33b47310-b7a8-4559-bf95-45ba669a448e', None, None, None) self.assertEqual('6c4d4e32-3667-4cd4-84ea-4cc1e98d18be', neutron.get_router()) - @mock.patch("neutronclient.neutron.client.Client") - def test__get_adapters(self, patched): - patched.side_effect = _test_get_neutron_client - neutron = neutron_client.NeutronClientRemoteWrapper( - '33b47310-b7a8-4559-bf95-45ba669a448e', None, None, None) - host1 = '127.0.0.1' - port1 = '9999' - expected_adapters = [ - neutron_client.NeutronHttpAdapter(neutron.get_router(), - host1, port1)] - # this should create an adapter and cache it - actual_adapters = neutron._get_adapters(host=host1, port=port1) - self.assertEqual(len(expected_adapters), len(actual_adapters)) - self.assertEqual(expected_adapters[0].host, - actual_adapters[0].host) - self.assertEqual(expected_adapters[0].port, - actual_adapters[0].port) - - # this should return all adapters for the host, which at this - # time only contains the single adapter - actual_adapters = neutron._get_adapters(host=host1) - self.assertEqual(len(expected_adapters), len(actual_adapters)) - self.assertEqual(expected_adapters[0].host, - actual_adapters[0].host) - self.assertEqual(expected_adapters[0].port, - actual_adapters[0].port) - def _test_get_neutron_client(api_version, *args, **kwargs): return FakeNeutronClient() diff --git a/sahara/tests/unit/utils/test_ssh_remote.py b/sahara/tests/unit/utils/test_ssh_remote.py index 9b514178..03e5b653 100644 --- a/sahara/tests/unit/utils/test_ssh_remote.py +++ b/sahara/tests/unit/utils/test_ssh_remote.py @@ -13,8 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import mock import testtools +from sahara import exceptions as ex +from sahara.tests.unit import base from sahara.utils import ssh_remote @@ -22,3 +25,164 @@ class TestEscapeQuotes(testtools.TestCase): def test_escape_quotes(self): s = ssh_remote._escape_quotes('echo "\\"Hello, world!\\""') self.assertEqual(s, r'echo \"\\\"Hello, world!\\\"\"') + + +class TestHTTPRemoteWrapper(testtools.TestCase): + def test__get_adapters(self): + wrapper = ssh_remote.HTTPRemoteWrapper() + host1 = '127.0.0.1' + port1 = '9999' + proxy_command = ('ip netns exec qrouter-{router_id} nc {host} {port}' + .format(router_id='fake', host=host1, port=port1)) + expected_adapters = [ + ssh_remote.ProxiedHTTPAdapter(proxy_command, host1, port1)] + # this should create an adapter and cache it + actual_adapters = wrapper._get_adapters(proxy_command, + host=host1, port=port1) + self.assertEqual(len(expected_adapters), len(actual_adapters)) + self.assertEqual(expected_adapters[0].host, + actual_adapters[0].host) + self.assertEqual(expected_adapters[0].port, + actual_adapters[0].port) + + # this should return all adapters for the host, which at this + # time only contains the single adapter + actual_adapters = wrapper._get_adapters(proxy_command, host=host1) + self.assertEqual(len(expected_adapters), len(actual_adapters)) + self.assertEqual(expected_adapters[0].host, + actual_adapters[0].host) + self.assertEqual(expected_adapters[0].port, + actual_adapters[0].port) + + +class FakeCluster(object): + def __init__(self, priv_key): + self.management_private_key = priv_key + self.neutron_management_network = 'network1' + + +class FakeNodeGroup(object): + def __init__(self, user, priv_key): + self.image_username = user + self.cluster = FakeCluster(priv_key) + + +class FakeInstance(object): + def __init__(self, inst_name, management_ip, user, priv_key): + self.instance_name = inst_name + self.management_ip = management_ip + self.node_group = FakeNodeGroup(user, priv_key) + + +class TestInstanceInteropHelper(base.SaharaTestCase): + def setUp(self): + super(TestInstanceInteropHelper, self).setUp() + + p_sma = mock.patch('sahara.utils.ssh_remote._acquire_remote_semaphore') + p_sma.start() + p_smr = mock.patch('sahara.utils.ssh_remote._release_remote_semaphore') + p_smr.start() + + p_neutron_router = mock.patch( + 'sahara.utils.openstack.neutron.NeutronClient.get_router', + return_value='fakerouter') + p_neutron_router.start() + + # During tests subprocesses are not used (because _sahara-subprocess + # is not installed in /bin and Mock objects cannot be pickled). + p_start_subp = mock.patch('sahara.utils.procutils.start_subprocess', + return_value=42) + p_start_subp.start() + p_run_subp = mock.patch('sahara.utils.procutils.run_in_subprocess') + self.run_in_subprocess = p_run_subp.start() + p_shut_subp = mock.patch('sahara.utils.procutils.shutdown_subprocess') + p_shut_subp.start() + + self.patchers = [p_sma, p_smr, p_neutron_router, p_start_subp, + p_run_subp, p_shut_subp] + + def tearDown(self): + for patcher in self.patchers: + patcher.stop() + super(TestInstanceInteropHelper, self).tearDown() + + def setup_context(self, username="test_user", tenant_id="tenant_1", + token="test_auth_token", tenant_name='test_tenant', + **kwargs): + service_catalog = '''[ + { "type": "network", + "endpoints": [ { "region": "RegionOne", + "publicURL": "http://localhost/" } ] } ]''' + super(TestInstanceInteropHelper, self).setup_context( + username=username, tenant_id=tenant_id, token=token, + tenant_name=tenant_name, service_catalog=service_catalog, **kwargs) + + # When use_floating_ips=True, no proxy should be used: _connect is called + # with proxy=None and ProxiedHTTPAdapter is not used. + @mock.patch('sahara.utils.ssh_remote.ProxiedHTTPAdapter') + def test_use_floating_ips(self, p_adapter): + self.override_config('use_floating_ips', True) + + instance = FakeInstance('inst1', '10.0.0.1', 'user1', 'key1') + remote = ssh_remote.InstanceInteropHelper(instance) + + # Test SSH + remote.execute_command('/bin/true') + self.run_in_subprocess.assert_any_call( + 42, ssh_remote._connect, ('10.0.0.1', 'user1', 'key1', None)) + # Test HTTP + remote.get_http_client(8080) + self.assertFalse(p_adapter.called) + + # When use_floating_ips=False and use_namespaces=True, a netcat socket + # created with 'ip netns exec qrouter-...' should be used to access + # instances. + @mock.patch('sahara.utils.ssh_remote.ProxiedHTTPAdapter') + def test_use_namespaces(self, p_adapter): + self.override_config('use_floating_ips', False) + self.override_config('use_namespaces', True) + + instance = FakeInstance('inst2', '10.0.0.2', 'user2', 'key2') + remote = ssh_remote.InstanceInteropHelper(instance) + + # Test SSH + remote.execute_command('/bin/true') + self.run_in_subprocess.assert_any_call( + 42, ssh_remote._connect, + ('10.0.0.2', 'user2', 'key2', + 'ip netns exec qrouter-fakerouter nc 10.0.0.2 22')) + # Test HTTP + remote.get_http_client(8080) + p_adapter.assert_called_once_with( + 'ip netns exec qrouter-fakerouter nc 10.0.0.2 8080', + '10.0.0.2', 8080) + + # When proxy_command is set, a user-defined netcat socket should be used to + # access instances. + @mock.patch('sahara.utils.ssh_remote.ProxiedHTTPAdapter') + def test_proxy_command(self, p_adapter): + self.override_config('proxy_command', 'ssh fakerelay nc {host} {port}') + + instance = FakeInstance('inst3', '10.0.0.3', 'user3', 'key3') + remote = ssh_remote.InstanceInteropHelper(instance) + + # Test SSH + remote.execute_command('/bin/true') + self.run_in_subprocess.assert_any_call( + 42, ssh_remote._connect, + ('10.0.0.3', 'user3', 'key3', 'ssh fakerelay nc 10.0.0.3 22')) + # Test HTTP + remote.get_http_client(8080) + p_adapter.assert_called_once_with( + 'ssh fakerelay nc 10.0.0.3 8080', '10.0.0.3', 8080) + + def test_proxy_command_bad(self): + self.override_config('proxy_command', '{bad_kw} nc {host} {port}') + + instance = FakeInstance('inst4', '10.0.0.4', 'user4', 'key4') + remote = ssh_remote.InstanceInteropHelper(instance) + + # Test SSH + self.assertRaises(ex.SystemError, remote.execute_command, '/bin/true') + # Test HTTP + self.assertRaises(ex.SystemError, remote.get_http_client, 8080) diff --git a/sahara/utils/openstack/neutron.py b/sahara/utils/openstack/neutron.py index 896a4147..77f757a8 100644 --- a/sahara/utils/openstack/neutron.py +++ b/sahara/utils/openstack/neutron.py @@ -13,14 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import shlex -from eventlet.green import subprocess as e_subprocess from neutronclient.neutron import client as neutron_cli -import requests -from requests import adapters -import six from sahara import context from sahara import exceptions as ex @@ -44,9 +38,8 @@ def client(): return neutron_cli.Client('2.0', **args) -class NeutronClientRemoteWrapper(object): +class NeutronClient(object): neutron = None - adapters = {} routers = {} def __init__(self, network, uri, token, tenant_name): @@ -57,8 +50,7 @@ class NeutronClientRemoteWrapper(object): self.network = network def get_router(self): - matching_router = NeutronClientRemoteWrapper.routers.get(self.network, - None) + matching_router = NeutronClient.routers.get(self.network, None) if matching_router: LOG.debug('Returning cached qrouter') return matching_router['id'] @@ -71,8 +63,7 @@ class NeutronClientRemoteWrapper(object): if port['network_id'] == self.network), None) if port: matching_router = router - NeutronClientRemoteWrapper.routers[ - self.network] = matching_router + NeutronClient.routers[self.network] = matching_router break if not matching_router: @@ -80,149 +71,3 @@ class NeutronClientRemoteWrapper(object): '%s is not found') % self.network) return matching_router['id'] - - def get_http_session(self, host, port=None, use_rootwrap=False, - rootwrap_command=None, *args, **kwargs): - session = requests.Session() - adapters = self._get_adapters(host, port=port, - use_rootwrap=use_rootwrap, - rootwrap_command=rootwrap_command, - *args, **kwargs) - for adapter in adapters: - session.mount('http://{0}:{1}'.format(host, adapter.port), adapter) - - return session - - def _get_adapters(self, host, port=None, use_rootwrap=False, - rootwrap_command=None, *args, **kwargs): - LOG.debug('Retrieving neutron adapters for {0}:{1}'.format(host, port)) - adapters = [] - if not port: - # returning all registered adapters for given host - adapters = [adapter for adapter in six.itervalues(self.adapters) - if adapter.host == host] - else: - # need to retrieve or create specific adapter - adapter = self.adapters.get((host, port), None) - if not adapter: - LOG.debug('Creating neutron adapter for {0}:{1}' - .format(host, port)) - qrouter = self.get_router() - kwargs['use_rootwrap'] = use_rootwrap - kwargs['rootwrap_command'] = rootwrap_command - adapter = ( - NeutronHttpAdapter(qrouter, host, port, *args, **kwargs)) - self.adapters[(host, port)] = adapter - adapters = [adapter] - - return adapters - - -class NeutronHttpAdapter(adapters.HTTPAdapter): - port = None - host = None - - def __init__(self, qrouter, host, port, use_rootwrap=False, - rootwrap_command=None, *args, **kwargs): - super(NeutronHttpAdapter, self).__init__(*args, **kwargs) - command = '{0} ip netns exec qrouter-{1} nc {2} {3}'.format( - rootwrap_command if use_rootwrap else '', - qrouter, host, port) - LOG.debug('Neutron adapter created with cmd {0}'.format(command)) - self.cmd = shlex.split(command) - self.port = port - self.host = host - self.rootwrap_command = rootwrap_command if use_rootwrap else None - - def get_connection(self, url, proxies=None): - pool_conn = ( - super(NeutronHttpAdapter, self).get_connection(url, proxies)) - if hasattr(pool_conn, '_get_conn'): - http_conn = pool_conn._get_conn() - if http_conn.sock is None: - if hasattr(http_conn, 'connect'): - sock = self._connect() - LOG.debug('HTTP connection {0} getting new ' - 'netcat socket {1}'.format(http_conn, sock)) - http_conn.sock = sock - else: - if hasattr(http_conn.sock, 'is_netcat_socket'): - LOG.debug('pooled http connection has existing ' - 'netcat socket. resetting pipe...') - http_conn.sock.reset() - - pool_conn._put_conn(http_conn) - - return pool_conn - - def close(self): - LOG.debug('Closing neutron adapter for {0}:{1}' - .format(self.host, self.port)) - super(NeutronHttpAdapter, self).close() - - def _connect(self): - LOG.debug('returning netcat socket with command {0}' - .format(self.cmd)) - return NetcatSocket(self.cmd, rootwrap_command=self.rootwrap_command) - - -class NetcatSocket(object): - - def _create_process(self): - self.process = e_subprocess.Popen(self.cmd, - stdin=e_subprocess.PIPE, - stdout=e_subprocess.PIPE, - stderr=e_subprocess.PIPE) - - def __init__(self, cmd, rootwrap_command=None): - self.cmd = cmd - self.rootwrap_command = rootwrap_command - self._create_process() - - def send(self, content): - try: - self.process.stdin.write(content) - self.process.stdin.flush() - except IOError as e: - raise ex.SystemError(e) - return len(content) - - def sendall(self, content): - return self.send(content) - - def makefile(self, mode, *arg): - if mode.startswith('r'): - return self.process.stdout - if mode.startswith('w'): - return self.process.stdin - raise ex.IncorrectStateError(_("Unknown file mode %s") % mode) - - def recv(self, size): - try: - return os.read(self.process.stdout.fileno(), size) - except IOError as e: - raise ex.SystemError(e) - - def _terminate(self): - if self.rootwrap_command: - os.system('{0} kill {1}'.format(self.rootwrap_command, - self.process.pid)) - else: - self.process.terminate() - - def close(self): - LOG.debug('Socket close called') - self._terminate() - - def settimeout(self, timeout): - pass - - def fileno(self): - return self.process.stdin.fileno() - - def is_netcat_socket(self): - return True - - def reset(self): - self._terminate() - self._create_process() diff --git a/sahara/utils/remote.py b/sahara/utils/remote.py index 55e1fe47..5cdc7163 100644 --- a/sahara/utils/remote.py +++ b/sahara/utils/remote.py @@ -32,6 +32,12 @@ ssh_opts = [ cfg.IntOpt('cluster_remote_threshold', default=70, help='The same as global_remote_threshold, but for ' 'a single cluster.'), + cfg.StrOpt('proxy_command', default='', + help='Proxy command used to connect to instances. If set, this ' + 'command should open a netcat socket, that Sahara will use for ' + 'SSH and HTTP connections. Use {host} and {port} to describe ' + 'the destination. Other available keywords: {tenant_id}, ' + '{network_id}, {router_id}.'), ] diff --git a/sahara/utils/ssh_remote.py b/sahara/utils/ssh_remote.py index 60f96ece..73aa1b79 100644 --- a/sahara/utils/ssh_remote.py +++ b/sahara/utils/ssh_remote.py @@ -32,15 +32,19 @@ implementations which are run in a separate process. """ import logging +import os +import shlex import time import uuid +from eventlet.green import subprocess as e_subprocess from eventlet import semaphore from eventlet import timeout as e_timeout from oslo.config import cfg from oslo.utils import excutils import paramiko import requests +from requests import adapters import six from sahara import context @@ -69,33 +73,20 @@ INFRA = None _global_remote_semaphore = None -def _get_proxy(neutron_info): - client = neutron.NeutronClientRemoteWrapper(neutron_info['network'], - neutron_info['uri'], - neutron_info['token'], - neutron_info['tenant']) - qrouter = client.get_router() - proxy = paramiko.ProxyCommand('{0} ip netns exec qrouter-{1} nc {2} 22' - .format(neutron_info['rootwrap_command'] - if neutron_info['use_rootwrap'] - else '', - qrouter, neutron_info['host'])) - - return proxy - - -def _connect(host, username, private_key, neutron_info=None): +def _connect(host, username, private_key, proxy_command=None): global _ssh LOG.debug('Creating SSH connection') - proxy = None if type(private_key) in [str, unicode]: private_key = crypto.to_paramiko_private_key(private_key) _ssh = paramiko.SSHClient() _ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - if neutron_info: - LOG.debug('creating proxy using info: {0}'.format(neutron_info)) - proxy = _get_proxy(neutron_info) + + proxy = None + if proxy_command: + LOG.debug('creating proxy using command: {0}'.format(proxy_command)) + proxy = paramiko.ProxyCommand(proxy_command) + _ssh.connect(host, username=username, pkey=private_key, sock=proxy) @@ -146,28 +137,24 @@ def _execute_command(cmd, run_as_root=False, get_stderr=False, return ret_code, stdout -def _get_http_client(host, port, neutron_info, *args, **kwargs): +def _get_http_client(host, port, proxy_command=None, *args, **kwargs): global _sessions _http_session = _sessions.get((host, port), None) LOG.debug('cached HTTP session for {0}:{1} is {2}'.format(host, port, _http_session)) if not _http_session: - if neutron_info: - neutron_client = neutron.NeutronClientRemoteWrapper( - neutron_info['network'], neutron_info['uri'], - neutron_info['token'], neutron_info['tenant']) + if proxy_command: # can return a new session here because it actually uses # the same adapter (and same connection pools) for a given # host and port tuple - _http_session = neutron_client.get_http_session( - host, port=port, use_rootwrap=CONF.use_rootwrap, - rootwrap_command=CONF.rootwrap_command, *args, **kwargs) - LOG.debug('created neutron based HTTP session for {0}:{1}' + _http_session = HTTPRemoteWrapper().get_http_session( + proxy_command, host, port=port, *args, **kwargs) + LOG.debug('created proxied HTTP session for {0}:{1}' .format(host, port)) else: - # need to cache the session for the non-neutron or neutron - # floating ip cases so that a new session with a new HTTPAdapter + # need to cache the sessions that are not proxied through + # HTTPRemoteWrapper so that a new session with a new HTTPAdapter # and associated pools is not recreated for each HTTP invocation _http_session = requests.Session() LOG.debug('created standard HTTP session for {0}:{1}' @@ -312,6 +299,146 @@ def _release_remote_semaphore(): context.current().remote_semaphore.release() +class HTTPRemoteWrapper(object): + adapters = {} + + def get_http_session(self, proxy_command, host, port=None, + *args, **kwargs): + session = requests.Session() + adapters = self._get_adapters(proxy_command, host, port=port, + *args, **kwargs) + for adapter in adapters: + session.mount('http://{0}:{1}'.format(host, adapter.port), adapter) + + return session + + def _get_adapters(self, proxy_command, host, port=None, *args, **kwargs): + LOG.debug('Retrieving HTTP adapters for {0}:{1}'.format(host, port)) + adapters = [] + if not port: + # returning all registered adapters for given host + adapters = [adapter for adapter in six.itervalues(self.adapters) + if adapter.host == host] + else: + # need to retrieve or create specific adapter + adapter = self.adapters.get((host, port), None) + if not adapter: + LOG.debug('Creating HTTP adapter for {0}:{1}' + .format(host, port)) + adapter = ProxiedHTTPAdapter(proxy_command, host, port, + *args, **kwargs) + self.adapters[(host, port)] = adapter + adapters = [adapter] + + return adapters + + +class ProxiedHTTPAdapter(adapters.HTTPAdapter): + port = None + host = None + + def __init__(self, proxy_command, host, port, *args, **kwargs): + super(ProxiedHTTPAdapter, self).__init__(*args, **kwargs) + LOG.debug('HTTP adapter created with cmd {0}'.format(proxy_command)) + self.cmd = shlex.split(proxy_command) + self.port = port + self.host = host + + def get_connection(self, url, proxies=None): + pool_conn = ( + super(ProxiedHTTPAdapter, self).get_connection(url, proxies)) + if hasattr(pool_conn, '_get_conn'): + http_conn = pool_conn._get_conn() + if http_conn.sock is None: + if hasattr(http_conn, 'connect'): + sock = self._connect() + LOG.debug('HTTP connection {0} getting new ' + 'netcat socket {1}'.format(http_conn, sock)) + http_conn.sock = sock + else: + if hasattr(http_conn.sock, 'is_netcat_socket'): + LOG.debug('pooled http connection has existing ' + 'netcat socket. resetting pipe...') + http_conn.sock.reset() + + pool_conn._put_conn(http_conn) + + return pool_conn + + def close(self): + LOG.debug('Closing HTTP adapter for {0}:{1}' + .format(self.host, self.port)) + super(ProxiedHTTPAdapter, self).close() + + def _connect(self): + LOG.debug('Returning netcat socket with command {0}' + .format(self.cmd)) + rootwrap_command = CONF.rootwrap_command if CONF.use_rootwrap else '' + return NetcatSocket(self.cmd, rootwrap_command) + + +class NetcatSocket(object): + + def _create_process(self): + self.process = e_subprocess.Popen(self.cmd, + stdin=e_subprocess.PIPE, + stdout=e_subprocess.PIPE, + stderr=e_subprocess.PIPE) + + def __init__(self, cmd, rootwrap_command=None): + self.cmd = cmd + self.rootwrap_command = rootwrap_command + self._create_process() + + def send(self, content): + try: + self.process.stdin.write(content) + self.process.stdin.flush() + except IOError as e: + raise ex.SystemError(e) + return len(content) + + def sendall(self, content): + return self.send(content) + + def makefile(self, mode, *arg): + if mode.startswith('r'): + return self.process.stdout + if mode.startswith('w'): + return self.process.stdin + raise ex.IncorrectStateError(_("Unknown file mode %s") % mode) + + def recv(self, size): + try: + return os.read(self.process.stdout.fileno(), size) + except IOError as e: + raise ex.SystemError(e) + + def _terminate(self): + if self.rootwrap_command: + os.system('{0} kill {1}'.format(self.rootwrap_command, + self.process.pid)) + else: + self.process.terminate() + + def close(self): + LOG.debug('Socket close called') + self._terminate() + + def settimeout(self, timeout): + pass + + def fileno(self): + return self.process.stdin.fileno() + + def is_netcat_socket(self): + return True + + def reset(self): + self._terminate() + self._create_process() + + class InstanceInteropHelper(remote.Remote): def __init__(self, instance): self.instance = instance @@ -340,19 +467,61 @@ class InstanceInteropHelper(remote.Remote): neutron_info['token'] = ctx.token neutron_info['tenant'] = ctx.tenant_name neutron_info['host'] = self.instance.management_ip - neutron_info['use_rootwrap'] = CONF.use_rootwrap - neutron_info['rootwrap_command'] = CONF.rootwrap_command LOG.debug('Returning neutron info: {0}'.format(neutron_info)) return neutron_info - def _get_conn_params(self): - info = None - if CONF.use_namespaces and not CONF.use_floating_ips: + def _build_proxy_command(self, command, host=None, port=None, info=None, + rootwrap_command=None): + # Accepted keywords in the proxy command template: + # {host}, {port}, {tenant_id}, {network_id}, {router_id} + keywords = {} + + if not info: info = self.get_neutron_info() + keywords['tenant_id'] = context.current().tenant_id + keywords['network_id'] = info['network'] + + # Query Neutron only if needed + if '{router_id}' in command: + client = neutron.NeutronClient(info['network'], info['uri'], + info['token'], info['tenant']) + keywords['router_id'] = client.get_router() + + keywords['host'] = host + keywords['port'] = port + + try: + command = command.format(**keywords) + except KeyError as e: + LOG.error(_('Invalid keyword in proxy_command: %s'), str(e)) + # Do not give more details to the end-user + raise ex.SystemError('Misconfiguration') + if rootwrap_command: + command = '{0} {1}'.format(rootwrap_command, command) + return command + + def _get_conn_params(self): + proxy_command = None + if CONF.proxy_command: + # Build a session through a user-defined socket + proxy_command = CONF.proxy_command + elif CONF.use_namespaces and not CONF.use_floating_ips: + # Build a session through a netcat socket in the Neutron namespace + proxy_command = ( + 'ip netns exec qrouter-{router_id} nc {host} {port}') + # proxy_command is currently a template, turn it into a real command + # i.e. dereference {host}, {port}, etc. + if proxy_command: + rootwrap = CONF.rootwrap_command if CONF.use_rootwrap else '' + proxy_command = self._build_proxy_command( + proxy_command, host=self.instance.management_ip, port=22, + info=None, rootwrap_command=rootwrap) + return (self.instance.management_ip, self.instance.node_group.image_username, - self.instance.node_group.cluster.management_private_key, info) + self.instance.node_group.cluster.management_private_key, + proxy_command) def _run(self, func, *args, **kwargs): proc = procutils.start_subprocess() @@ -384,15 +553,29 @@ class InstanceInteropHelper(remote.Remote): _release_remote_semaphore() def get_http_client(self, port, info=None, *args, **kwargs): - self._log_command('Retrieving http session for {0}:{1}'.format( - self.instance.management_ip, - port)) - if CONF.use_namespaces and not CONF.use_floating_ips: + self._log_command('Retrieving HTTP session for {0}:{1}'.format( + self.instance.management_ip, port)) + proxy_command = None + if CONF.proxy_command: + # Build a session through a user-defined socket + proxy_command = CONF.proxy_command + elif info or (CONF.use_namespaces and not CONF.use_floating_ips): # need neutron info if not info: info = self.get_neutron_info() - return _get_http_client(self.instance.management_ip, port, info, - *args, **kwargs) + # Build a session through a netcat socket in the Neutron namespace + proxy_command = ( + 'ip netns exec qrouter-{router_id} nc {host} {port}') + # proxy_command is currently a template, turn it into a real command + # i.e. dereference {host}, {port}, etc. + if proxy_command: + rootwrap = CONF.rootwrap_command if CONF.use_rootwrap else '' + proxy_command = self._build_proxy_command( + proxy_command, host=self.instance.management_ip, port=port, + info=info, rootwrap_command=rootwrap) + + return _get_http_client(self.instance.management_ip, port, + proxy_command, *args, **kwargs) def close_http_session(self, port): global _sessions