Merge "Make proxy command generic and user-definable"

This commit is contained in:
Jenkins 2014-11-05 11:20:10 +00:00 committed by Gerrit Code Review
commit 51b435fd81
8 changed files with 454 additions and 240 deletions

View File

@ -68,13 +68,46 @@ integration see the Sahara documentation sections
.. _Sahara extra repository: http://github.com/openstack/sahara-extra .. _Sahara extra repository: http://github.com/openstack/sahara-extra
Namespaces and non-root users Custom network topologies
----------------------------- -------------------------
In cases where namespaces are being used to access cluster VMs via private IPs, Sahara accesses VMs at several stages of cluster spawning, both via SSH and
rootwrap functionality is provided to allow users other than ``root`` access HTTP. When floating IPs are not assigned to instances, Sahara needs to be able
to the namespace related OS facilities. To use rootwrap the following to reach them another way. Floating IPs and network namespaces (see
configuration property is required to be set: :ref:`neutron-nova-network`) are automatically used when present.
When none of these are enabled, the ``proxy_command`` property can be used to
give Sahara a command to access VMs. This command is run on the Sahara host and
must open a netcat socket to the instance destination port. ``{host}`` and
``{port}`` keywords should be used to describe the destination, they will be
translated at runtime. Other keywords can be used: ``{tenant_id}``,
``{network_id}`` and ``{router_id}``.
For instance, the following configuration property in the Sahara configuration
file would be used if VMs are accessed through a relay machine:
.. sourcecode:: cfg
[DEFAULT]
proxy_command='ssh relay-machine-{tenant_id} nc {host} {port}'
Whereas the following property would be used to access VMs through a custom
network namespace:
.. sourcecode:: cfg
[DEFAULT]
proxy_command='ip netns exec ns_for_{network_id} nc {host} {port}'
Non-root users
--------------
In cases where a proxy command is being used to access cluster VMs (for
instance when using namespaces or when specifying a custom proxy command),
rootwrap functionality is provided to allow users other than ``root`` access to
the needed OS facilities. To use rootwrap the following configuration property
is required to be set:
.. sourcecode:: cfg .. sourcecode:: cfg
@ -100,12 +133,13 @@ steps:
``etc/sahara/rootwrap.conf`` to the system specific location, usually ``etc/sahara/rootwrap.conf`` to the system specific location, usually
``/etc/sahara``. This file contains the default configuration for rootwrap. ``/etc/sahara``. This file contains the default configuration for rootwrap.
* Copy the provided rootwrap filers file from the local project file * Copy the provided rootwrap filters file from the local project file
``etc/sahara/rootwrap.d/sahara.filters`` to the location specified in the ``etc/sahara/rootwrap.d/sahara.filters`` to the location specified in the
rootwrap configuration file, usually ``/etc/sahara/rootwrap.d``. This file rootwrap configuration file, usually ``/etc/sahara/rootwrap.d``. This file
contains the filters that will allow the ``sahara`` user to acces the contains the filters that will allow the ``sahara`` user to access the
``ip netns exec``, ``nc``, and ``kill`` commands through the rootwrap. It ``ip netns exec``, ``nc``, and ``kill`` commands through the rootwrap
should look similar to the followings: (depending on ``proxy_command`` you may need to set additional filters).
It should look similar to the followings:
.. sourcecode:: cfg .. sourcecode:: cfg

View File

@ -31,6 +31,8 @@ and the size of each volume.
All volumes are attached during Cluster creation/scaling operations. All volumes are attached during Cluster creation/scaling operations.
.. _neutron-nova-network:
Neutron and Nova Network support Neutron and Nova Network support
-------------------------------- --------------------------------
OpenStack clusters may use Nova or Neutron as a networking service. Sahara OpenStack clusters may use Nova or Neutron as a networking service. Sahara

View File

@ -521,6 +521,13 @@
# cluster. (integer value) # cluster. (integer value)
#cluster_remote_threshold=70 #cluster_remote_threshold=70
# Proxy command used to connect to instances. If set, this
# command should open a netcat socket, that Sahara will use
# for SSH and HTTP connections. Use {host} and {port} to
# describe the destination. Other available keywords:
# {tenant_id}, {network_id}, {router_id}. (string value)
#proxy_command=
[conductor] [conductor]

View File

@ -19,42 +19,15 @@ import testtools
from sahara.utils.openstack import neutron as neutron_client from sahara.utils.openstack import neutron as neutron_client
class NeutronClientRemoteWrapperTest(testtools.TestCase): class NeutronClientTest(testtools.TestCase):
@mock.patch("neutronclient.neutron.client.Client") @mock.patch("neutronclient.neutron.client.Client")
def test_get_router(self, patched): def test_get_router(self, patched):
patched.side_effect = _test_get_neutron_client patched.side_effect = _test_get_neutron_client
neutron = neutron_client.NeutronClientRemoteWrapper( neutron = neutron_client.NeutronClient(
'33b47310-b7a8-4559-bf95-45ba669a448e', None, None, None) '33b47310-b7a8-4559-bf95-45ba669a448e', None, None, None)
self.assertEqual('6c4d4e32-3667-4cd4-84ea-4cc1e98d18be', self.assertEqual('6c4d4e32-3667-4cd4-84ea-4cc1e98d18be',
neutron.get_router()) neutron.get_router())
@mock.patch("neutronclient.neutron.client.Client")
def test__get_adapters(self, patched):
patched.side_effect = _test_get_neutron_client
neutron = neutron_client.NeutronClientRemoteWrapper(
'33b47310-b7a8-4559-bf95-45ba669a448e', None, None, None)
host1 = '127.0.0.1'
port1 = '9999'
expected_adapters = [
neutron_client.NeutronHttpAdapter(neutron.get_router(),
host1, port1)]
# this should create an adapter and cache it
actual_adapters = neutron._get_adapters(host=host1, port=port1)
self.assertEqual(len(expected_adapters), len(actual_adapters))
self.assertEqual(expected_adapters[0].host,
actual_adapters[0].host)
self.assertEqual(expected_adapters[0].port,
actual_adapters[0].port)
# this should return all adapters for the host, which at this
# time only contains the single adapter
actual_adapters = neutron._get_adapters(host=host1)
self.assertEqual(len(expected_adapters), len(actual_adapters))
self.assertEqual(expected_adapters[0].host,
actual_adapters[0].host)
self.assertEqual(expected_adapters[0].port,
actual_adapters[0].port)
def _test_get_neutron_client(api_version, *args, **kwargs): def _test_get_neutron_client(api_version, *args, **kwargs):
return FakeNeutronClient() return FakeNeutronClient()

View File

@ -13,8 +13,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import mock
import testtools import testtools
from sahara import exceptions as ex
from sahara.tests.unit import base
from sahara.utils import ssh_remote from sahara.utils import ssh_remote
@ -22,3 +25,164 @@ class TestEscapeQuotes(testtools.TestCase):
def test_escape_quotes(self): def test_escape_quotes(self):
s = ssh_remote._escape_quotes('echo "\\"Hello, world!\\""') s = ssh_remote._escape_quotes('echo "\\"Hello, world!\\""')
self.assertEqual(s, r'echo \"\\\"Hello, world!\\\"\"') self.assertEqual(s, r'echo \"\\\"Hello, world!\\\"\"')
class TestHTTPRemoteWrapper(testtools.TestCase):
def test__get_adapters(self):
wrapper = ssh_remote.HTTPRemoteWrapper()
host1 = '127.0.0.1'
port1 = '9999'
proxy_command = ('ip netns exec qrouter-{router_id} nc {host} {port}'
.format(router_id='fake', host=host1, port=port1))
expected_adapters = [
ssh_remote.ProxiedHTTPAdapter(proxy_command, host1, port1)]
# this should create an adapter and cache it
actual_adapters = wrapper._get_adapters(proxy_command,
host=host1, port=port1)
self.assertEqual(len(expected_adapters), len(actual_adapters))
self.assertEqual(expected_adapters[0].host,
actual_adapters[0].host)
self.assertEqual(expected_adapters[0].port,
actual_adapters[0].port)
# this should return all adapters for the host, which at this
# time only contains the single adapter
actual_adapters = wrapper._get_adapters(proxy_command, host=host1)
self.assertEqual(len(expected_adapters), len(actual_adapters))
self.assertEqual(expected_adapters[0].host,
actual_adapters[0].host)
self.assertEqual(expected_adapters[0].port,
actual_adapters[0].port)
class FakeCluster(object):
def __init__(self, priv_key):
self.management_private_key = priv_key
self.neutron_management_network = 'network1'
class FakeNodeGroup(object):
def __init__(self, user, priv_key):
self.image_username = user
self.cluster = FakeCluster(priv_key)
class FakeInstance(object):
def __init__(self, inst_name, management_ip, user, priv_key):
self.instance_name = inst_name
self.management_ip = management_ip
self.node_group = FakeNodeGroup(user, priv_key)
class TestInstanceInteropHelper(base.SaharaTestCase):
def setUp(self):
super(TestInstanceInteropHelper, self).setUp()
p_sma = mock.patch('sahara.utils.ssh_remote._acquire_remote_semaphore')
p_sma.start()
p_smr = mock.patch('sahara.utils.ssh_remote._release_remote_semaphore')
p_smr.start()
p_neutron_router = mock.patch(
'sahara.utils.openstack.neutron.NeutronClient.get_router',
return_value='fakerouter')
p_neutron_router.start()
# During tests subprocesses are not used (because _sahara-subprocess
# is not installed in /bin and Mock objects cannot be pickled).
p_start_subp = mock.patch('sahara.utils.procutils.start_subprocess',
return_value=42)
p_start_subp.start()
p_run_subp = mock.patch('sahara.utils.procutils.run_in_subprocess')
self.run_in_subprocess = p_run_subp.start()
p_shut_subp = mock.patch('sahara.utils.procutils.shutdown_subprocess')
p_shut_subp.start()
self.patchers = [p_sma, p_smr, p_neutron_router, p_start_subp,
p_run_subp, p_shut_subp]
def tearDown(self):
for patcher in self.patchers:
patcher.stop()
super(TestInstanceInteropHelper, self).tearDown()
def setup_context(self, username="test_user", tenant_id="tenant_1",
token="test_auth_token", tenant_name='test_tenant',
**kwargs):
service_catalog = '''[
{ "type": "network",
"endpoints": [ { "region": "RegionOne",
"publicURL": "http://localhost/" } ] } ]'''
super(TestInstanceInteropHelper, self).setup_context(
username=username, tenant_id=tenant_id, token=token,
tenant_name=tenant_name, service_catalog=service_catalog, **kwargs)
# When use_floating_ips=True, no proxy should be used: _connect is called
# with proxy=None and ProxiedHTTPAdapter is not used.
@mock.patch('sahara.utils.ssh_remote.ProxiedHTTPAdapter')
def test_use_floating_ips(self, p_adapter):
self.override_config('use_floating_ips', True)
instance = FakeInstance('inst1', '10.0.0.1', 'user1', 'key1')
remote = ssh_remote.InstanceInteropHelper(instance)
# Test SSH
remote.execute_command('/bin/true')
self.run_in_subprocess.assert_any_call(
42, ssh_remote._connect, ('10.0.0.1', 'user1', 'key1', None))
# Test HTTP
remote.get_http_client(8080)
self.assertFalse(p_adapter.called)
# When use_floating_ips=False and use_namespaces=True, a netcat socket
# created with 'ip netns exec qrouter-...' should be used to access
# instances.
@mock.patch('sahara.utils.ssh_remote.ProxiedHTTPAdapter')
def test_use_namespaces(self, p_adapter):
self.override_config('use_floating_ips', False)
self.override_config('use_namespaces', True)
instance = FakeInstance('inst2', '10.0.0.2', 'user2', 'key2')
remote = ssh_remote.InstanceInteropHelper(instance)
# Test SSH
remote.execute_command('/bin/true')
self.run_in_subprocess.assert_any_call(
42, ssh_remote._connect,
('10.0.0.2', 'user2', 'key2',
'ip netns exec qrouter-fakerouter nc 10.0.0.2 22'))
# Test HTTP
remote.get_http_client(8080)
p_adapter.assert_called_once_with(
'ip netns exec qrouter-fakerouter nc 10.0.0.2 8080',
'10.0.0.2', 8080)
# When proxy_command is set, a user-defined netcat socket should be used to
# access instances.
@mock.patch('sahara.utils.ssh_remote.ProxiedHTTPAdapter')
def test_proxy_command(self, p_adapter):
self.override_config('proxy_command', 'ssh fakerelay nc {host} {port}')
instance = FakeInstance('inst3', '10.0.0.3', 'user3', 'key3')
remote = ssh_remote.InstanceInteropHelper(instance)
# Test SSH
remote.execute_command('/bin/true')
self.run_in_subprocess.assert_any_call(
42, ssh_remote._connect,
('10.0.0.3', 'user3', 'key3', 'ssh fakerelay nc 10.0.0.3 22'))
# Test HTTP
remote.get_http_client(8080)
p_adapter.assert_called_once_with(
'ssh fakerelay nc 10.0.0.3 8080', '10.0.0.3', 8080)
def test_proxy_command_bad(self):
self.override_config('proxy_command', '{bad_kw} nc {host} {port}')
instance = FakeInstance('inst4', '10.0.0.4', 'user4', 'key4')
remote = ssh_remote.InstanceInteropHelper(instance)
# Test SSH
self.assertRaises(ex.SystemError, remote.execute_command, '/bin/true')
# Test HTTP
self.assertRaises(ex.SystemError, remote.get_http_client, 8080)

View File

@ -13,14 +13,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os
import shlex
from eventlet.green import subprocess as e_subprocess
from neutronclient.neutron import client as neutron_cli from neutronclient.neutron import client as neutron_cli
import requests
from requests import adapters
import six
from sahara import context from sahara import context
from sahara import exceptions as ex from sahara import exceptions as ex
@ -44,9 +38,8 @@ def client():
return neutron_cli.Client('2.0', **args) return neutron_cli.Client('2.0', **args)
class NeutronClientRemoteWrapper(object): class NeutronClient(object):
neutron = None neutron = None
adapters = {}
routers = {} routers = {}
def __init__(self, network, uri, token, tenant_name): def __init__(self, network, uri, token, tenant_name):
@ -57,8 +50,7 @@ class NeutronClientRemoteWrapper(object):
self.network = network self.network = network
def get_router(self): def get_router(self):
matching_router = NeutronClientRemoteWrapper.routers.get(self.network, matching_router = NeutronClient.routers.get(self.network, None)
None)
if matching_router: if matching_router:
LOG.debug('Returning cached qrouter') LOG.debug('Returning cached qrouter')
return matching_router['id'] return matching_router['id']
@ -71,8 +63,7 @@ class NeutronClientRemoteWrapper(object):
if port['network_id'] == self.network), None) if port['network_id'] == self.network), None)
if port: if port:
matching_router = router matching_router = router
NeutronClientRemoteWrapper.routers[ NeutronClient.routers[self.network] = matching_router
self.network] = matching_router
break break
if not matching_router: if not matching_router:
@ -80,149 +71,3 @@ class NeutronClientRemoteWrapper(object):
'%s is not found') % self.network) '%s is not found') % self.network)
return matching_router['id'] return matching_router['id']
def get_http_session(self, host, port=None, use_rootwrap=False,
rootwrap_command=None, *args, **kwargs):
session = requests.Session()
adapters = self._get_adapters(host, port=port,
use_rootwrap=use_rootwrap,
rootwrap_command=rootwrap_command,
*args, **kwargs)
for adapter in adapters:
session.mount('http://{0}:{1}'.format(host, adapter.port), adapter)
return session
def _get_adapters(self, host, port=None, use_rootwrap=False,
rootwrap_command=None, *args, **kwargs):
LOG.debug('Retrieving neutron adapters for {0}:{1}'.format(host, port))
adapters = []
if not port:
# returning all registered adapters for given host
adapters = [adapter for adapter in six.itervalues(self.adapters)
if adapter.host == host]
else:
# need to retrieve or create specific adapter
adapter = self.adapters.get((host, port), None)
if not adapter:
LOG.debug('Creating neutron adapter for {0}:{1}'
.format(host, port))
qrouter = self.get_router()
kwargs['use_rootwrap'] = use_rootwrap
kwargs['rootwrap_command'] = rootwrap_command
adapter = (
NeutronHttpAdapter(qrouter, host, port, *args, **kwargs))
self.adapters[(host, port)] = adapter
adapters = [adapter]
return adapters
class NeutronHttpAdapter(adapters.HTTPAdapter):
port = None
host = None
def __init__(self, qrouter, host, port, use_rootwrap=False,
rootwrap_command=None, *args, **kwargs):
super(NeutronHttpAdapter, self).__init__(*args, **kwargs)
command = '{0} ip netns exec qrouter-{1} nc {2} {3}'.format(
rootwrap_command if use_rootwrap else '',
qrouter, host, port)
LOG.debug('Neutron adapter created with cmd {0}'.format(command))
self.cmd = shlex.split(command)
self.port = port
self.host = host
self.rootwrap_command = rootwrap_command if use_rootwrap else None
def get_connection(self, url, proxies=None):
pool_conn = (
super(NeutronHttpAdapter, self).get_connection(url, proxies))
if hasattr(pool_conn, '_get_conn'):
http_conn = pool_conn._get_conn()
if http_conn.sock is None:
if hasattr(http_conn, 'connect'):
sock = self._connect()
LOG.debug('HTTP connection {0} getting new '
'netcat socket {1}'.format(http_conn, sock))
http_conn.sock = sock
else:
if hasattr(http_conn.sock, 'is_netcat_socket'):
LOG.debug('pooled http connection has existing '
'netcat socket. resetting pipe...')
http_conn.sock.reset()
pool_conn._put_conn(http_conn)
return pool_conn
def close(self):
LOG.debug('Closing neutron adapter for {0}:{1}'
.format(self.host, self.port))
super(NeutronHttpAdapter, self).close()
def _connect(self):
LOG.debug('returning netcat socket with command {0}'
.format(self.cmd))
return NetcatSocket(self.cmd, rootwrap_command=self.rootwrap_command)
class NetcatSocket(object):
def _create_process(self):
self.process = e_subprocess.Popen(self.cmd,
stdin=e_subprocess.PIPE,
stdout=e_subprocess.PIPE,
stderr=e_subprocess.PIPE)
def __init__(self, cmd, rootwrap_command=None):
self.cmd = cmd
self.rootwrap_command = rootwrap_command
self._create_process()
def send(self, content):
try:
self.process.stdin.write(content)
self.process.stdin.flush()
except IOError as e:
raise ex.SystemError(e)
return len(content)
def sendall(self, content):
return self.send(content)
def makefile(self, mode, *arg):
if mode.startswith('r'):
return self.process.stdout
if mode.startswith('w'):
return self.process.stdin
raise ex.IncorrectStateError(_("Unknown file mode %s") % mode)
def recv(self, size):
try:
return os.read(self.process.stdout.fileno(), size)
except IOError as e:
raise ex.SystemError(e)
def _terminate(self):
if self.rootwrap_command:
os.system('{0} kill {1}'.format(self.rootwrap_command,
self.process.pid))
else:
self.process.terminate()
def close(self):
LOG.debug('Socket close called')
self._terminate()
def settimeout(self, timeout):
pass
def fileno(self):
return self.process.stdin.fileno()
def is_netcat_socket(self):
return True
def reset(self):
self._terminate()
self._create_process()

View File

@ -32,6 +32,12 @@ ssh_opts = [
cfg.IntOpt('cluster_remote_threshold', default=70, cfg.IntOpt('cluster_remote_threshold', default=70,
help='The same as global_remote_threshold, but for ' help='The same as global_remote_threshold, but for '
'a single cluster.'), 'a single cluster.'),
cfg.StrOpt('proxy_command', default='',
help='Proxy command used to connect to instances. If set, this '
'command should open a netcat socket, that Sahara will use for '
'SSH and HTTP connections. Use {host} and {port} to describe '
'the destination. Other available keywords: {tenant_id}, '
'{network_id}, {router_id}.'),
] ]

View File

@ -32,15 +32,19 @@ implementations which are run in a separate process.
""" """
import logging import logging
import os
import shlex
import time import time
import uuid import uuid
from eventlet.green import subprocess as e_subprocess
from eventlet import semaphore from eventlet import semaphore
from eventlet import timeout as e_timeout from eventlet import timeout as e_timeout
from oslo.config import cfg from oslo.config import cfg
from oslo.utils import excutils from oslo.utils import excutils
import paramiko import paramiko
import requests import requests
from requests import adapters
import six import six
from sahara import context from sahara import context
@ -69,33 +73,20 @@ INFRA = None
_global_remote_semaphore = None _global_remote_semaphore = None
def _get_proxy(neutron_info): def _connect(host, username, private_key, proxy_command=None):
client = neutron.NeutronClientRemoteWrapper(neutron_info['network'],
neutron_info['uri'],
neutron_info['token'],
neutron_info['tenant'])
qrouter = client.get_router()
proxy = paramiko.ProxyCommand('{0} ip netns exec qrouter-{1} nc {2} 22'
.format(neutron_info['rootwrap_command']
if neutron_info['use_rootwrap']
else '',
qrouter, neutron_info['host']))
return proxy
def _connect(host, username, private_key, neutron_info=None):
global _ssh global _ssh
LOG.debug('Creating SSH connection') LOG.debug('Creating SSH connection')
proxy = None
if type(private_key) in [str, unicode]: if type(private_key) in [str, unicode]:
private_key = crypto.to_paramiko_private_key(private_key) private_key = crypto.to_paramiko_private_key(private_key)
_ssh = paramiko.SSHClient() _ssh = paramiko.SSHClient()
_ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) _ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if neutron_info:
LOG.debug('creating proxy using info: {0}'.format(neutron_info)) proxy = None
proxy = _get_proxy(neutron_info) if proxy_command:
LOG.debug('creating proxy using command: {0}'.format(proxy_command))
proxy = paramiko.ProxyCommand(proxy_command)
_ssh.connect(host, username=username, pkey=private_key, sock=proxy) _ssh.connect(host, username=username, pkey=private_key, sock=proxy)
@ -146,28 +137,24 @@ def _execute_command(cmd, run_as_root=False, get_stderr=False,
return ret_code, stdout return ret_code, stdout
def _get_http_client(host, port, neutron_info, *args, **kwargs): def _get_http_client(host, port, proxy_command=None, *args, **kwargs):
global _sessions global _sessions
_http_session = _sessions.get((host, port), None) _http_session = _sessions.get((host, port), None)
LOG.debug('cached HTTP session for {0}:{1} is {2}'.format(host, port, LOG.debug('cached HTTP session for {0}:{1} is {2}'.format(host, port,
_http_session)) _http_session))
if not _http_session: if not _http_session:
if neutron_info: if proxy_command:
neutron_client = neutron.NeutronClientRemoteWrapper(
neutron_info['network'], neutron_info['uri'],
neutron_info['token'], neutron_info['tenant'])
# can return a new session here because it actually uses # can return a new session here because it actually uses
# the same adapter (and same connection pools) for a given # the same adapter (and same connection pools) for a given
# host and port tuple # host and port tuple
_http_session = neutron_client.get_http_session( _http_session = HTTPRemoteWrapper().get_http_session(
host, port=port, use_rootwrap=CONF.use_rootwrap, proxy_command, host, port=port, *args, **kwargs)
rootwrap_command=CONF.rootwrap_command, *args, **kwargs) LOG.debug('created proxied HTTP session for {0}:{1}'
LOG.debug('created neutron based HTTP session for {0}:{1}'
.format(host, port)) .format(host, port))
else: else:
# need to cache the session for the non-neutron or neutron # need to cache the sessions that are not proxied through
# floating ip cases so that a new session with a new HTTPAdapter # HTTPRemoteWrapper so that a new session with a new HTTPAdapter
# and associated pools is not recreated for each HTTP invocation # and associated pools is not recreated for each HTTP invocation
_http_session = requests.Session() _http_session = requests.Session()
LOG.debug('created standard HTTP session for {0}:{1}' LOG.debug('created standard HTTP session for {0}:{1}'
@ -312,6 +299,146 @@ def _release_remote_semaphore():
context.current().remote_semaphore.release() context.current().remote_semaphore.release()
class HTTPRemoteWrapper(object):
adapters = {}
def get_http_session(self, proxy_command, host, port=None,
*args, **kwargs):
session = requests.Session()
adapters = self._get_adapters(proxy_command, host, port=port,
*args, **kwargs)
for adapter in adapters:
session.mount('http://{0}:{1}'.format(host, adapter.port), adapter)
return session
def _get_adapters(self, proxy_command, host, port=None, *args, **kwargs):
LOG.debug('Retrieving HTTP adapters for {0}:{1}'.format(host, port))
adapters = []
if not port:
# returning all registered adapters for given host
adapters = [adapter for adapter in six.itervalues(self.adapters)
if adapter.host == host]
else:
# need to retrieve or create specific adapter
adapter = self.adapters.get((host, port), None)
if not adapter:
LOG.debug('Creating HTTP adapter for {0}:{1}'
.format(host, port))
adapter = ProxiedHTTPAdapter(proxy_command, host, port,
*args, **kwargs)
self.adapters[(host, port)] = adapter
adapters = [adapter]
return adapters
class ProxiedHTTPAdapter(adapters.HTTPAdapter):
port = None
host = None
def __init__(self, proxy_command, host, port, *args, **kwargs):
super(ProxiedHTTPAdapter, self).__init__(*args, **kwargs)
LOG.debug('HTTP adapter created with cmd {0}'.format(proxy_command))
self.cmd = shlex.split(proxy_command)
self.port = port
self.host = host
def get_connection(self, url, proxies=None):
pool_conn = (
super(ProxiedHTTPAdapter, self).get_connection(url, proxies))
if hasattr(pool_conn, '_get_conn'):
http_conn = pool_conn._get_conn()
if http_conn.sock is None:
if hasattr(http_conn, 'connect'):
sock = self._connect()
LOG.debug('HTTP connection {0} getting new '
'netcat socket {1}'.format(http_conn, sock))
http_conn.sock = sock
else:
if hasattr(http_conn.sock, 'is_netcat_socket'):
LOG.debug('pooled http connection has existing '
'netcat socket. resetting pipe...')
http_conn.sock.reset()
pool_conn._put_conn(http_conn)
return pool_conn
def close(self):
LOG.debug('Closing HTTP adapter for {0}:{1}'
.format(self.host, self.port))
super(ProxiedHTTPAdapter, self).close()
def _connect(self):
LOG.debug('Returning netcat socket with command {0}'
.format(self.cmd))
rootwrap_command = CONF.rootwrap_command if CONF.use_rootwrap else ''
return NetcatSocket(self.cmd, rootwrap_command)
class NetcatSocket(object):
def _create_process(self):
self.process = e_subprocess.Popen(self.cmd,
stdin=e_subprocess.PIPE,
stdout=e_subprocess.PIPE,
stderr=e_subprocess.PIPE)
def __init__(self, cmd, rootwrap_command=None):
self.cmd = cmd
self.rootwrap_command = rootwrap_command
self._create_process()
def send(self, content):
try:
self.process.stdin.write(content)
self.process.stdin.flush()
except IOError as e:
raise ex.SystemError(e)
return len(content)
def sendall(self, content):
return self.send(content)
def makefile(self, mode, *arg):
if mode.startswith('r'):
return self.process.stdout
if mode.startswith('w'):
return self.process.stdin
raise ex.IncorrectStateError(_("Unknown file mode %s") % mode)
def recv(self, size):
try:
return os.read(self.process.stdout.fileno(), size)
except IOError as e:
raise ex.SystemError(e)
def _terminate(self):
if self.rootwrap_command:
os.system('{0} kill {1}'.format(self.rootwrap_command,
self.process.pid))
else:
self.process.terminate()
def close(self):
LOG.debug('Socket close called')
self._terminate()
def settimeout(self, timeout):
pass
def fileno(self):
return self.process.stdin.fileno()
def is_netcat_socket(self):
return True
def reset(self):
self._terminate()
self._create_process()
class InstanceInteropHelper(remote.Remote): class InstanceInteropHelper(remote.Remote):
def __init__(self, instance): def __init__(self, instance):
self.instance = instance self.instance = instance
@ -340,19 +467,61 @@ class InstanceInteropHelper(remote.Remote):
neutron_info['token'] = ctx.token neutron_info['token'] = ctx.token
neutron_info['tenant'] = ctx.tenant_name neutron_info['tenant'] = ctx.tenant_name
neutron_info['host'] = self.instance.management_ip neutron_info['host'] = self.instance.management_ip
neutron_info['use_rootwrap'] = CONF.use_rootwrap
neutron_info['rootwrap_command'] = CONF.rootwrap_command
LOG.debug('Returning neutron info: {0}'.format(neutron_info)) LOG.debug('Returning neutron info: {0}'.format(neutron_info))
return neutron_info return neutron_info
def _get_conn_params(self): def _build_proxy_command(self, command, host=None, port=None, info=None,
info = None rootwrap_command=None):
if CONF.use_namespaces and not CONF.use_floating_ips: # Accepted keywords in the proxy command template:
# {host}, {port}, {tenant_id}, {network_id}, {router_id}
keywords = {}
if not info:
info = self.get_neutron_info() info = self.get_neutron_info()
keywords['tenant_id'] = context.current().tenant_id
keywords['network_id'] = info['network']
# Query Neutron only if needed
if '{router_id}' in command:
client = neutron.NeutronClient(info['network'], info['uri'],
info['token'], info['tenant'])
keywords['router_id'] = client.get_router()
keywords['host'] = host
keywords['port'] = port
try:
command = command.format(**keywords)
except KeyError as e:
LOG.error(_('Invalid keyword in proxy_command: %s'), str(e))
# Do not give more details to the end-user
raise ex.SystemError('Misconfiguration')
if rootwrap_command:
command = '{0} {1}'.format(rootwrap_command, command)
return command
def _get_conn_params(self):
proxy_command = None
if CONF.proxy_command:
# Build a session through a user-defined socket
proxy_command = CONF.proxy_command
elif CONF.use_namespaces and not CONF.use_floating_ips:
# Build a session through a netcat socket in the Neutron namespace
proxy_command = (
'ip netns exec qrouter-{router_id} nc {host} {port}')
# proxy_command is currently a template, turn it into a real command
# i.e. dereference {host}, {port}, etc.
if proxy_command:
rootwrap = CONF.rootwrap_command if CONF.use_rootwrap else ''
proxy_command = self._build_proxy_command(
proxy_command, host=self.instance.management_ip, port=22,
info=None, rootwrap_command=rootwrap)
return (self.instance.management_ip, return (self.instance.management_ip,
self.instance.node_group.image_username, self.instance.node_group.image_username,
self.instance.node_group.cluster.management_private_key, info) self.instance.node_group.cluster.management_private_key,
proxy_command)
def _run(self, func, *args, **kwargs): def _run(self, func, *args, **kwargs):
proc = procutils.start_subprocess() proc = procutils.start_subprocess()
@ -384,15 +553,29 @@ class InstanceInteropHelper(remote.Remote):
_release_remote_semaphore() _release_remote_semaphore()
def get_http_client(self, port, info=None, *args, **kwargs): def get_http_client(self, port, info=None, *args, **kwargs):
self._log_command('Retrieving http session for {0}:{1}'.format( self._log_command('Retrieving HTTP session for {0}:{1}'.format(
self.instance.management_ip, self.instance.management_ip, port))
port)) proxy_command = None
if CONF.use_namespaces and not CONF.use_floating_ips: if CONF.proxy_command:
# Build a session through a user-defined socket
proxy_command = CONF.proxy_command
elif info or (CONF.use_namespaces and not CONF.use_floating_ips):
# need neutron info # need neutron info
if not info: if not info:
info = self.get_neutron_info() info = self.get_neutron_info()
return _get_http_client(self.instance.management_ip, port, info, # Build a session through a netcat socket in the Neutron namespace
*args, **kwargs) proxy_command = (
'ip netns exec qrouter-{router_id} nc {host} {port}')
# proxy_command is currently a template, turn it into a real command
# i.e. dereference {host}, {port}, etc.
if proxy_command:
rootwrap = CONF.rootwrap_command if CONF.use_rootwrap else ''
proxy_command = self._build_proxy_command(
proxy_command, host=self.instance.management_ip, port=port,
info=info, rootwrap_command=rootwrap)
return _get_http_client(self.instance.management_ip, port,
proxy_command, *args, **kwargs)
def close_http_session(self, port): def close_http_session(self, port):
global _sessions global _sessions