Merge "Add indirect VMs access implementation"
commit c2a758de7f
@@ -44,6 +44,7 @@ NODE_GROUP_DEFAULTS = {
    "security_groups": None,
    "auto_security_group": False,
    "availability_zone": None,
    "is_proxy_gateway": False,
}

INSTANCE_DEFAULTS = {
@@ -21,6 +21,8 @@ is to hide some necessary magic. Current module describes objects
fields via docstrings and contains implementation of helper methods.
"""

import random

from oslo.config import cfg

from sahara.utils import configs
@@ -61,6 +63,22 @@ class Cluster(object):
    cluster_template - ClusterTemplate object
    """

    def has_proxy_gateway(self):
        for ng in self.node_groups:
            if ng.is_proxy_gateway:
                return True

    def get_proxy_gateway_node(self):
        proxies = []
        for ng in self.node_groups:
            if ng.is_proxy_gateway and ng.instances:
                proxies += ng.instances

        if proxies:
            return random.choice(proxies)

        return None


class NodeGroup(object):
    """An object representing Node Group.
@@ -87,6 +105,8 @@ class NodeGroup(object):
    availability_zone - name of Nova availability zone where to spawn instances
    open_ports - List of ports that will be opened if auto_security_group is
                 True
    is_proxy_gateway - indicates if nodes from this node group should be used
                       as proxy to access other cluster nodes

    count
    instances - list of Instance objects
@@ -185,6 +205,7 @@ class NodeGroupTemplate(object):
    security_groups
    auto_security_group
    availability_zone
    is_proxy_gateway
    """

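The two Cluster helpers above carry the core of the feature: any node group flagged with is_proxy_gateway nominates its instances as jump hosts, and get_proxy_gateway_node() picks one of them at random. A minimal standalone sketch of the same selection logic, using hypothetical stand-in objects rather than Sahara's conductor models:

import collections
import random

NodeGroup = collections.namedtuple('NodeGroup', ['is_proxy_gateway', 'instances'])


def get_proxy_gateway_node(node_groups):
    # Collect instances from every node group flagged as a proxy gateway.
    proxies = []
    for ng in node_groups:
        if ng.is_proxy_gateway and ng.instances:
            proxies += ng.instances
    # Spread access over the gateways by picking one at random.
    return random.choice(proxies) if proxies else None


# 'gw-1'/'gw-2' act as jump hosts for the worker group below.
groups = [NodeGroup(True, ['gw-1', 'gw-2']), NodeGroup(False, ['worker-1'])]
print(get_proxy_gateway_node(groups))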
@@ -0,0 +1,44 @@
# Copyright 2014 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Add is_proxy_gateway

Revision ID: 016
Revises: 015
Create Date: 2014-11-10 12:47:17.871520

"""

# revision identifiers, used by Alembic.
revision = '016'
down_revision = '015'

from alembic import op
import sqlalchemy as sa


def upgrade():
    op.add_column('node_group_templates',
                  sa.Column('is_proxy_gateway', sa.Boolean()))
    op.add_column('node_groups',
                  sa.Column('is_proxy_gateway', sa.Boolean()))
    op.add_column('templates_relations',
                  sa.Column('is_proxy_gateway', sa.Boolean()))


def downgrade():
    op.drop_column('templates_relations', 'is_proxy_gateway')
    op.drop_column('node_groups', 'is_proxy_gateway')
    op.drop_column('node_group_templates', 'is_proxy_gateway')
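The new migration only adds a Boolean column to the three template/group tables. A hedged way to check the result after running the migration, assuming a SQLAlchemy engine pointed at the Sahara database (the URL below is a placeholder):

import sqlalchemy as sa

engine = sa.create_engine('sqlite:////tmp/sahara.db')  # placeholder URL
inspector = sa.inspect(engine)

for table in ('node_group_templates', 'node_groups', 'templates_relations'):
    columns = [col['name'] for col in inspector.get_columns(table)]
    print(table, 'is_proxy_gateway' in columns)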
@@ -123,6 +123,7 @@ class NodeGroup(mb.SaharaBase):
    auto_security_group = sa.Column(sa.Boolean())
    availability_zone = sa.Column(sa.String(255))
    open_ports = sa.Column(st.JsonListType())
    is_proxy_gateway = sa.Column(sa.Boolean())

    def to_dict(self):
        d = super(NodeGroup, self).to_dict()
@@ -209,6 +210,7 @@ class NodeGroupTemplate(mb.SaharaBase):
    security_groups = sa.Column(st.JsonListType())
    auto_security_group = sa.Column(sa.Boolean())
    availability_zone = sa.Column(sa.String(255))
    is_proxy_gateway = sa.Column(sa.Boolean())


class TemplatesRelation(mb.SaharaBase):
@@ -244,6 +246,7 @@ class TemplatesRelation(mb.SaharaBase):
    security_groups = sa.Column(st.JsonListType())
    auto_security_group = sa.Column(sa.Boolean())
    availability_zone = sa.Column(sa.String(255))
    is_proxy_gateway = sa.Column(sa.Boolean())


# EDP objects: DataSource, Job, Job Execution, JobBinary
@@ -48,7 +48,10 @@ def init_instances_ips(instance):
        else:
            management_ip = management_ip or address['addr']

    if not CONF.use_floating_ips:
    cluster = instance.node_group.cluster
    if (not CONF.use_floating_ips or
            (cluster.has_proxy_gateway() and
             not instance.node_group.is_proxy_gateway)):
        management_ip = internal_ip

    # NOTE(aignatov): Once bug #1262529 is fixed this 'if' block should be
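The rewritten condition means an instance keeps its internal address as management_ip either when floating IPs are disabled, or when the cluster has a proxy gateway and the instance itself is not one of the gateways. A small sketch of just that predicate (names are illustrative, not Sahara code):

def use_internal_ip(use_floating_ips, cluster_has_proxy, is_proxy_gateway):
    # Fall back to the internal IP unless floating IPs are in use and the
    # node is directly reachable (no proxy gateway, or the node is the
    # gateway itself).
    return (not use_floating_ips or
            (cluster_has_proxy and not is_proxy_gateway))


# Non-gateway nodes behind a proxy gateway keep internal addresses:
assert use_internal_ip(True, True, False) is True
# The gateway node itself still uses its floating address:
assert use_internal_ip(True, True, True) is False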
@@ -291,7 +291,9 @@ def check_node_groups_in_cluster_templates(cluster_name, plugin_name,
                                           cluster_template_id):
    c_t = api.get_cluster_template(id=cluster_template_id)
    n_groups = c_t.to_wrapped_dict()['cluster_template']['node_groups']
    check_network_config(n_groups)
    proxy_gateway_used = len([ng for ng in n_groups if
                              ng.get('is_proxy_gateway', False)]) > 0
    check_network_config(n_groups, proxy_gateway_used)
    for node_group in n_groups:
        check_node_group_basic_fields(plugin_name, hadoop_version, node_group)
    check_cluster_hostnames_lengths(cluster_name, n_groups)
@@ -311,10 +313,14 @@ def check_node_group_template_exists(ng_tmpl_id):
        ng_tmpl_id, _("NodeGroup template with id '%s' not found"))


def check_network_config(node_groups):
def check_network_config(node_groups, proxy_gateway_used=False):
    if CONF.use_floating_ips and CONF.use_neutron:
        for ng in node_groups:
            if not _get_floating_ip_pool(ng):
            require_floating = True
            if proxy_gateway_used:
                require_floating = ng.get('is_proxy_gateway', False)

            if require_floating and not _get_floating_ip_pool(ng):
                raise ex.MissingFloatingNetworkException(ng.get('name'))

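With proxy_gateway_used set, only the node groups that actually act as gateways have to provide a floating_ip_pool; everything else may stay on the internal network. A simplified stand-in for the check above (it assumes floating IPs and Neutron are enabled and treats node groups as plain dicts):

class MissingFloatingNetworkException(Exception):
    pass


def check_network_config(node_groups, proxy_gateway_used=False):
    for ng in node_groups:
        require_floating = True
        if proxy_gateway_used:
            require_floating = ng.get('is_proxy_gateway', False)
        if require_floating and not ng.get('floating_ip_pool'):
            raise MissingFloatingNetworkException(ng.get('name'))


# Only the gateway group needs a pool when a proxy gateway is used:
check_network_config(
    [{'name': 'gw', 'is_proxy_gateway': True, 'floating_ip_pool': 'pool-1'},
     {'name': 'workers'}],
    proxy_gateway_used=True)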
@@ -81,7 +81,9 @@ def check_cluster_create(data, **kwargs):
                               data['anti_affinity'])

    if data.get('node_groups'):
        b.check_network_config(data['node_groups'])
        proxy_gateway_used = len([ng for ng in data['node_groups'] if
                                  ng.get('is_proxy_gateway', False)]) > 0
        b.check_network_config(data['node_groups'], proxy_gateway_used)
        b.check_cluster_hostnames_lengths(data['name'], data['node_groups'])

    neutron_net_id = _get_cluster_field(data, 'neutron_management_network')
@@ -107,6 +107,7 @@ def check_cluster_scaling(data, cluster_id, **kwargs):

    if data.get("add_node_groups"):
        b.check_add_node_groups(cluster, data['add_node_groups'])
        b.check_network_config(data['add_node_groups'])
        b.check_network_config(data['add_node_groups'],
                               cluster.has_proxy_gateway())
        b.check_cluster_hostnames_lengths(cluster.name,
                                          data['add_node_groups'])
@@ -87,6 +87,9 @@ NODE_GROUP_TEMPLATE_SCHEMA = {
        "availability_zone": {
            "type": "string",
        },
        "is_proxy_gateway": {
            "type": "boolean"
        },
    },
    "additionalProperties": False,
    "required": [
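The schema change just adds a plain boolean key. A hedged illustration with the jsonschema library and a trimmed-down stand-in for NODE_GROUP_TEMPLATE_SCHEMA (the real schema has many more properties and required fields):

import jsonschema

schema = {
    "type": "object",
    "properties": {
        "availability_zone": {"type": "string"},
        "is_proxy_gateway": {"type": "boolean"},
    },
    "additionalProperties": False,
}

jsonschema.validate({"is_proxy_gateway": True}, schema)       # accepted
try:
    jsonschema.validate({"is_proxy_gateway": "yes"}, schema)  # wrong type
except jsonschema.ValidationError as e:
    print(e.message)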
@@ -120,6 +120,7 @@ class ClusterTest(test_base.ConductorManagerTestCase):
            ng.pop("image_username")
            ng.pop("open_ports")
            ng.pop("auto_security_group")
            ng.pop("is_proxy_gateway")
            ng.pop("tenant_id")
            ng.pop("availability_zone")

@@ -213,6 +213,7 @@ class ClusterTemplates(test_base.ConductorManagerTestCase):
            ng.pop("volumes_availability_zone")
            ng.pop("volume_type")
            ng.pop("auto_security_group")
            ng.pop("is_proxy_gateway")

        self.assertEqual(SAMPLE_CLT["node_groups"],
                         clt_db_obj["node_groups"])
@@ -434,6 +434,13 @@ class SaharaMigrationsCheckers(object):
        self.assertColumnCount(engine, 'cluster_events', events_columns)
        self.assertColumnsExists(engine, 'cluster_events', events_columns)

    def _check_016(self, engine, date):
        self.assertColumnExists(engine, 'node_group_templates',
                                'is_proxy_gateway')
        self.assertColumnExists(engine, 'node_groups', 'is_proxy_gateway')
        self.assertColumnExists(engine, 'templates_relations',
                                'is_proxy_gateway')


class TestMigrationsMySQL(SaharaMigrationsCheckers,
                          base.BaseWalkMigrationTestCase,
@@ -275,6 +275,74 @@ class TestClusterCreateValidation(u.ValidationTestCase):
            }
        )

    def test_cluster_create_missing_floating_pool(self):
        self.override_config("use_neutron", True)
        self._assert_create_object_validation(
            data={
                'name': "testname",
                'plugin_name': "vanilla",
                'hadoop_version': "1.2.1",
                'user_keypair_id': 'test_keypair',
                'default_image_id': '550e8400-e29b-41d4-a716-446655440000',
                'neutron_management_network': 'd9a3bebc-f788-4b81-'
                                              '9a93-aa048022c1ca',
                'node_groups': [
                    {
                        "name": "ng1",
                        "node_processes": ["namenode"],
                        "flavor_id": "42",
                        "count": 100,
                        'security_groups': ['group1', 'group2'],
                        'floating_ip_pool':
                            'd9a3bebc-f788-4b81-9a93-aa048022c1ca'
                    },
                    {
                        "name": "ng2",
                        "node_processes": ["datanode"],
                        "flavor_id": "42",
                        "count": 100,
                        'security_groups': ['group1', 'group2']
                    }
                ]
            },
            bad_req_i=(1, 'MISSING_FLOATING_NETWORK',
                       "Node Group ng2 is missing 'floating_ip_pool' "
                       "field")
        )

    def test_cluster_create_with_proxy_gateway(self):
        self.override_config("use_neutron", True)
        self._assert_create_object_validation(
            data={
                'name': "testname",
                'plugin_name': "vanilla",
                'hadoop_version': "1.2.1",
                'user_keypair_id': 'test_keypair',
                'default_image_id': '550e8400-e29b-41d4-a716-446655440000',
                'neutron_management_network': 'd9a3bebc-f788-4b81-'
                                              '9a93-aa048022c1ca',
                'node_groups': [
                    {
                        "name": "ng1",
                        "node_processes": ["namenode"],
                        "flavor_id": "42",
                        "count": 100,
                        'security_groups': ['group1', 'group2'],
                        'floating_ip_pool':
                            'd9a3bebc-f788-4b81-9a93-aa048022c1ca',
                        "is_proxy_gateway": True
                    },
                    {
                        "name": "ng2",
                        "node_processes": ["datanode"],
                        "flavor_id": "42",
                        "count": 100,
                        'security_groups': ['group1', 'group2']
                    }
                ]
            }
        )

    def test_cluster_create_security_groups_by_ids(self):
        self.override_config("use_neutron", True)
        self._assert_create_object_validation(
@@ -36,7 +36,7 @@ def make_ng_dict(name, flavor, processes, count, instances=None, **kwargs):
           'count': count, 'instances': instances, 'node_configs': {},
           'security_groups': None, 'auto_security_group': False,
           'availability_zone': None, 'volumes_availability_zone': None,
           'open_ports': []}
           'open_ports': [], 'is_proxy_gateway': False}
    dct.update(kwargs)
    return dct

@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import shlex

import mock
import testtools

@@ -32,6 +34,12 @@ class FakeCluster(object):
        self.management_private_key = priv_key
        self.neutron_management_network = 'network1'

    def has_proxy_gateway(self):
        return False

    def get_proxy_gateway_node(self):
        return None


class FakeNodeGroup(object):
    def __init__(self, user, priv_key):
@@ -101,7 +109,8 @@ class TestInstanceInteropHelper(base.SaharaTestCase):
        # Test SSH
        remote.execute_command('/bin/true')
        self.run_in_subprocess.assert_any_call(
            42, ssh_remote._connect, ('10.0.0.1', 'user1', 'key1', None))
            42, ssh_remote._connect, ('10.0.0.1', 'user1', 'key1',
                                      None, None, None))
        # Test HTTP
        remote.get_http_client(8080)
        self.assertFalse(p_adapter.called)
@@ -109,8 +118,9 @@ class TestInstanceInteropHelper(base.SaharaTestCase):
    # When use_floating_ips=False and use_namespaces=True, a netcat socket
    # created with 'ip netns exec qrouter-...' should be used to access
    # instances.
    @mock.patch('sahara.utils.ssh_remote._simple_exec_func')
    @mock.patch('sahara.utils.ssh_remote.ProxiedHTTPAdapter')
    def test_use_namespaces(self, p_adapter):
    def test_use_namespaces(self, p_adapter, p_simple_exec_func):
        self.override_config('use_floating_ips', False)
        self.override_config('use_namespaces', True)

@@ -122,17 +132,20 @@ class TestInstanceInteropHelper(base.SaharaTestCase):
        self.run_in_subprocess.assert_any_call(
            42, ssh_remote._connect,
            ('10.0.0.2', 'user2', 'key2',
             'ip netns exec qrouter-fakerouter nc 10.0.0.2 22'))
             'ip netns exec qrouter-fakerouter nc 10.0.0.2 22', None, None))
        # Test HTTP
        remote.get_http_client(8080)
        p_adapter.assert_called_once_with(
            'ip netns exec qrouter-fakerouter nc 10.0.0.2 8080',
            p_simple_exec_func(),
            '10.0.0.2', 8080)
        p_simple_exec_func.assert_any_call(
            shlex.split('ip netns exec qrouter-fakerouter nc 10.0.0.2 8080'))

    # When proxy_command is set, a user-defined netcat socket should be used to
    # access instances.
    @mock.patch('sahara.utils.ssh_remote._simple_exec_func')
    @mock.patch('sahara.utils.ssh_remote.ProxiedHTTPAdapter')
    def test_proxy_command(self, p_adapter):
    def test_proxy_command(self, p_adapter, p_simple_exec_func):
        self.override_config('proxy_command', 'ssh fakerelay nc {host} {port}')

        instance = FakeInstance('inst3', '10.0.0.3', 'user3', 'key3')
@@ -142,11 +155,14 @@ class TestInstanceInteropHelper(base.SaharaTestCase):
        remote.execute_command('/bin/true')
        self.run_in_subprocess.assert_any_call(
            42, ssh_remote._connect,
            ('10.0.0.3', 'user3', 'key3', 'ssh fakerelay nc 10.0.0.3 22'))
            ('10.0.0.3', 'user3', 'key3', 'ssh fakerelay nc 10.0.0.3 22',
             None, None))
        # Test HTTP
        remote.get_http_client(8080)
        p_adapter.assert_called_once_with(
            'ssh fakerelay nc 10.0.0.3 8080', '10.0.0.3', 8080)
            p_simple_exec_func(), '10.0.0.3', 8080)
        p_simple_exec_func.assert_any_call(
            shlex.split('ssh fakerelay nc 10.0.0.3 8080'))

    def test_proxy_command_bad(self):
        self.override_config('proxy_command', '{bad_kw} nc {host} {port}')
@@ -38,19 +38,20 @@ def start_subprocess():
                            stderr=subprocess.PIPE)


def run_in_subprocess(proc, func, args=(), kwargs={}):
def run_in_subprocess(proc, func, args=(), kwargs={}, interactive=False):
    try:
        pickle.dump(func, proc.stdin)
        pickle.dump(args, proc.stdin)
        pickle.dump(kwargs, proc.stdin)
        proc.stdin.flush()

        result = pickle.load(proc.stdout)
        if not interactive:
            result = pickle.load(proc.stdout)

        if 'exception' in result:
            raise SubprocessException(result['exception'])
            if 'exception' in result:
                raise SubprocessException(result['exception'])

        return result['output']
            return result['output']
    finally:
        # NOTE(dmitryme): in openstack/common/processutils.py it
        # is suggested to sleep a little between calls to multiprocessing.
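The new interactive flag changes the calling convention: instead of blocking on a pickled result, the caller leaves the child process running and talks to it through its raw stdin/stdout. A tiny self-contained illustration of that "leave the pipe open" pattern (using a plain cat child rather than Sahara's pickled-function protocol):

import subprocess

proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE)
# Interactive use: write and read the raw stream; there is no final
# result object to unpickle.
proc.stdin.write(b'ping\n')
proc.stdin.flush()
print(proc.stdout.readline())  # b'ping\n'
proc.stdin.close()
proc.wait()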
@@ -33,6 +33,8 @@ implementations which are run in a separate process.

import os
import shlex
import sys
import threading
import time
import uuid

@@ -64,6 +66,7 @@ CONF = cfg.CONF


_ssh = None
_proxy_ssh = None
_sessions = {}


@@ -73,26 +76,44 @@ INFRA = None
_global_remote_semaphore = None


def _connect(host, username, private_key, proxy_command=None):
def _connect(host, username, private_key, proxy_command=None,
             gateway_host=None, gateway_image_username=None):
    global _ssh
    global _proxy_ssh

    LOG.debug('Creating SSH connection')
    if type(private_key) in [str, unicode]:
        private_key = crypto.to_paramiko_private_key(private_key)

    _ssh = paramiko.SSHClient()
    _ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    proxy = None
    if proxy_command:
        LOG.debug('creating proxy using command: {0}'.format(proxy_command))
        LOG.debug('creating proxy using command: %s', proxy_command)
        proxy = paramiko.ProxyCommand(proxy_command)

    if gateway_host:
        _proxy_ssh = paramiko.SSHClient()
        _proxy_ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        LOG.debug('connecting to proxy gateway at: %s', gateway_host)
        _proxy_ssh.connect(gateway_host, username=gateway_image_username,
                           pkey=private_key, sock=proxy)

        proxy = _proxy_ssh.get_transport().open_session()
        proxy.exec_command("nc {0} 22".format(host))

    _ssh.connect(host, username=username, pkey=private_key, sock=proxy)


def _cleanup():
    global _ssh
    global _proxy_ssh

    _ssh.close()
    if _proxy_ssh:
        _proxy_ssh.close()


def _read_paramimko_stream(recv_func):
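The gateway branch in _connect() builds a classic two-hop SSH connection: open a session to the proxy gateway, run netcat towards the private instance on an exec channel, and hand that channel to paramiko as the socket for the second connection. A hedged standalone sketch of the same pattern (addresses, user name and key path are placeholders; host keys are auto-accepted as in the code above):

import paramiko

pkey = paramiko.RSAKey.from_private_key_file('/path/to/cluster_key')

gateway = paramiko.SSHClient()
gateway.set_missing_host_key_policy(paramiko.AutoAddPolicy())
gateway.connect('198.51.100.10', username='ubuntu', pkey=pkey)

# Run netcat on the gateway and use that channel as the transport socket
# for the SSH connection to the internal-only instance.
channel = gateway.get_transport().open_session()
channel.exec_command('nc 10.0.0.5 22')

target = paramiko.SSHClient()
target.set_missing_host_key_policy(paramiko.AutoAddPolicy())
target.connect('10.0.0.5', username='ubuntu', pkey=pkey, sock=channel)

_, stdout, _ = target.exec_command('hostname')
print(stdout.read())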
@@ -137,14 +158,54 @@ def _execute_command(cmd, run_as_root=False, get_stderr=False,
    return ret_code, stdout


def _get_http_client(host, port, proxy_command=None):
def _execute_command_interactive(cmd, run_as_root=False):
    global _ssh

    chan = _ssh.get_transport().open_session()
    if run_as_root:
        chan.exec_command('sudo bash -c "%s"' % _escape_quotes(cmd))
    else:
        chan.exec_command(cmd)

    _proxy_shell(chan)

    _ssh.close()


def _proxy_shell(chan):
    def readall():
        while True:
            d = sys.stdin.read(1)
            if not d or chan.exit_status_ready():
                break
            chan.send(d)

    reader = threading.Thread(target=readall)
    reader.start()

    while True:
        data = chan.recv(256)
        if not data or chan.exit_status_ready():
            break
        sys.stdout.write(data)
        sys.stdout.flush()


def _get_http_client(host, port, proxy_command=None, gateway_host=None,
                     gateway_username=None, gateway_private_key=None):
    global _sessions

    _http_session = _sessions.get((host, port), None)
    LOG.debug('cached HTTP session for {0}:{1} is {2}'.format(host, port,
                                                              _http_session))
    if not _http_session:
        if proxy_command:
        if gateway_host:
            _http_session = _get_proxy_gateway_http_session(
                gateway_host, gateway_username,
                gateway_private_key, host, port, proxy_command)
            LOG.debug('created ssh proxied HTTP session for {0}:{1}'
                      .format(host, port))
        elif proxy_command:
            # can return a new session here because it actually uses
            # the same adapter (and same connection pools) for a given
            # host and port tuple
@@ -301,20 +362,64 @@ def _release_remote_semaphore():

def _get_proxied_http_session(proxy_command, host, port=None):
    session = requests.Session()
    adapter = ProxiedHTTPAdapter(proxy_command, host, port)
    adapter = ProxiedHTTPAdapter(
        _simple_exec_func(shlex.split(proxy_command)), host, port)
    session.mount('http://{0}:{1}'.format(host, adapter.port), adapter)

    return session


class ProxiedHTTPAdapter(adapters.HTTPAdapter):
    port = None
    host = None
def _get_proxy_gateway_http_session(gateway_host, gateway_username,
                                    gateway_private_key, host, port=None,
                                    proxy_command=None):
    session = requests.Session()
    adapter = ProxiedHTTPAdapter(
        _proxy_gateway_func(gateway_host, gateway_username,
                            gateway_private_key, host,
                            port, proxy_command),
        host, port)
    session.mount('http://{0}:{1}'.format(host, port), adapter)

    def __init__(self, proxy_command, host, port):
    return session


def _simple_exec_func(cmd):
    def func():
        return e_subprocess.Popen(cmd,
                                  stdin=e_subprocess.PIPE,
                                  stdout=e_subprocess.PIPE,
                                  stderr=e_subprocess.PIPE)

    return func


def _proxy_gateway_func(gateway_host, gateway_username,
                        gateway_private_key, host,
                        port, proxy_command):
    def func():
        proc = procutils.start_subprocess()

        try:
            conn_params = (gateway_host, gateway_username, gateway_private_key,
                           proxy_command, None, None)
            procutils.run_in_subprocess(proc, _connect, conn_params)
            cmd = "nc {host} {port}".format(host=host, port=port)
            procutils.run_in_subprocess(
                proc, _execute_command_interactive, (cmd,), interactive=True)
            return proc
        except Exception:
            with excutils.save_and_reraise_exception():
                procutils.shutdown_subprocess(proc, _cleanup)

    return func


class ProxiedHTTPAdapter(adapters.HTTPAdapter):
    def __init__(self, create_process_func, host, port):
        super(ProxiedHTTPAdapter, self).__init__()
        LOG.debug('HTTP adapter created with cmd {0}'.format(proxy_command))
        self.cmd = shlex.split(proxy_command)
        LOG.debug('HTTP adapter created for {0}:{1}'.format(host, port))
        self.create_process_func = create_process_func
        self.port = port
        self.host = host

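The refactored adapter no longer stores a shlex-split command; it stores a zero-argument factory (either _simple_exec_func or _proxy_gateway_func) and creates the transport process only when a connection is actually needed. The mounting side is plain requests machinery, illustrated here with the stock HTTPAdapter (the prefix and host are placeholders):

import requests
from requests import adapters

session = requests.Session()
# requests picks the adapter whose URL prefix matches, so one Session can
# route traffic for a specific host:port through a custom transport.
session.mount('http://10.0.0.5:8080', adapters.HTTPAdapter(max_retries=3))

for prefix in session.adapters:
    print(prefix)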
@@ -345,22 +450,19 @@ class ProxiedHTTPAdapter(adapters.HTTPAdapter):
        super(ProxiedHTTPAdapter, self).close()

    def _connect(self):
        LOG.debug('Returning netcat socket with command {0}'
                  .format(self.cmd))
        LOG.debug('Returning netcat socket for {0}:{1}'
                  .format(self.host, self.port))
        rootwrap_command = CONF.rootwrap_command if CONF.use_rootwrap else ''
        return NetcatSocket(self.cmd, rootwrap_command)
        return NetcatSocket(self.create_process_func, rootwrap_command)


class NetcatSocket(object):

    def _create_process(self):
        self.process = e_subprocess.Popen(self.cmd,
                                          stdin=e_subprocess.PIPE,
                                          stdout=e_subprocess.PIPE,
                                          stderr=e_subprocess.PIPE)
        self.process = self.create_process_func()

    def __init__(self, cmd, rootwrap_command=None):
        self.cmd = cmd
    def __init__(self, create_process_func, rootwrap_command=None):
        self.create_process_func = create_process_func
        self.rootwrap_command = rootwrap_command
        self._create_process()

@@ -432,27 +534,29 @@ class InstanceInteropHelper(remote.Remote):
        finally:
            _release_remote_semaphore()

    def get_neutron_info(self):
    def get_neutron_info(self, instance=None):
        if not instance:
            instance = self.instance
        neutron_info = h.HashableDict()
        neutron_info['network'] = (
            self.instance.node_group.cluster.neutron_management_network)
            instance.node_group.cluster.neutron_management_network)
        ctx = context.current()
        neutron_info['uri'] = base.url_for(ctx.service_catalog, 'network')
        neutron_info['token'] = ctx.auth_token
        neutron_info['tenant'] = ctx.tenant_name
        neutron_info['host'] = self.instance.management_ip
        neutron_info['host'] = instance.management_ip

        LOG.debug('Returning neutron info: {0}'.format(neutron_info))
        return neutron_info

    def _build_proxy_command(self, command, host=None, port=None, info=None,
                             rootwrap_command=None):
    def _build_proxy_command(self, command, instance=None, port=None,
                             info=None, rootwrap_command=None):
        # Accepted keywords in the proxy command template:
        # {host}, {port}, {tenant_id}, {network_id}, {router_id}
        keywords = {}

        if not info:
            info = self.get_neutron_info()
            info = self.get_neutron_info(instance)
        keywords['tenant_id'] = context.current().tenant_id
        keywords['network_id'] = info['network']

@@ -462,7 +566,7 @@ class InstanceInteropHelper(remote.Remote):
                                       info['token'], info['tenant'])
        keywords['router_id'] = client.get_router()

        keywords['host'] = host
        keywords['host'] = instance.management_ip
        keywords['port'] = port

        try:
@@ -476,6 +580,19 @@ class InstanceInteropHelper(remote.Remote):
        return command

    def _get_conn_params(self):
        host_ng = self.instance.node_group
        cluster = host_ng.cluster
        access_instance = self.instance
        proxy_gateway_node = cluster.get_proxy_gateway_node()

        gateway_host = None
        gateway_image_username = None
        if proxy_gateway_node and not host_ng.is_proxy_gateway:
            access_instance = proxy_gateway_node
            gateway_host = proxy_gateway_node.management_ip
            ng = proxy_gateway_node.node_group
            gateway_image_username = ng.image_username

        proxy_command = None
        if CONF.proxy_command:
            # Build a session through a user-defined socket
@@ -489,13 +606,15 @@ class InstanceInteropHelper(remote.Remote):
        if proxy_command:
            rootwrap = CONF.rootwrap_command if CONF.use_rootwrap else ''
            proxy_command = self._build_proxy_command(
                proxy_command, host=self.instance.management_ip, port=22,
                proxy_command, instance=access_instance, port=22,
                info=None, rootwrap_command=rootwrap)

        return (self.instance.management_ip,
                self.instance.node_group.image_username,
                self.instance.node_group.cluster.management_private_key,
                proxy_command)
                host_ng.image_username,
                cluster.management_private_key,
                proxy_command,
                gateway_host,
                gateway_image_username)

    def _run(self, func, *args, **kwargs):
        proc = procutils.start_subprocess()
@@ -529,6 +648,23 @@ class InstanceInteropHelper(remote.Remote):
    def get_http_client(self, port, info=None):
        self._log_command('Retrieving HTTP session for {0}:{1}'.format(
            self.instance.management_ip, port))

        host_ng = self.instance.node_group
        cluster = host_ng.cluster
        access_instance = self.instance
        access_port = port
        proxy_gateway_node = cluster.get_proxy_gateway_node()

        gateway_host = None
        gateway_username = None
        gateway_private_key = None
        if proxy_gateway_node and not host_ng.is_proxy_gateway:
            access_instance = proxy_gateway_node
            access_port = 22
            gateway_host = proxy_gateway_node.management_ip
            gateway_username = proxy_gateway_node.node_group.image_username
            gateway_private_key = cluster.management_private_key

        proxy_command = None
        if CONF.proxy_command:
            # Build a session through a user-defined socket
@@ -536,7 +672,7 @@ class InstanceInteropHelper(remote.Remote):
        elif info or (CONF.use_namespaces and not CONF.use_floating_ips):
            # need neutron info
            if not info:
                info = self.get_neutron_info()
                info = self.get_neutron_info(access_instance)
            # Build a session through a netcat socket in the Neutron namespace
            proxy_command = (
                'ip netns exec qrouter-{router_id} nc {host} {port}')
@@ -545,11 +681,13 @@ class InstanceInteropHelper(remote.Remote):
        if proxy_command:
            rootwrap = CONF.rootwrap_command if CONF.use_rootwrap else ''
            proxy_command = self._build_proxy_command(
                proxy_command, host=self.instance.management_ip, port=port,
                proxy_command, instance=access_instance, port=access_port,
                info=info, rootwrap_command=rootwrap)

        return _get_http_client(self.instance.management_ip, port,
                                proxy_command)
                                proxy_command, gateway_host,
                                gateway_username,
                                gateway_private_key)

    def close_http_session(self, port):
        global _sessions