Merge "Enable network operations over neutron private nets"

This commit is contained in:
Jenkins 2013-11-01 14:34:14 +00:00 committed by Gerrit Code Review
commit 2478a69475
15 changed files with 465 additions and 78 deletions

View File

@ -64,7 +64,8 @@ On Fedora-based distributions (e.g., Fedora/RHEL/CentOS/Scientific Linux):
5. Look through the savanna.conf and change parameters which default values do not suite you.
Set ``os_auth_host`` to the address of your VM with DevStack.
If you are using Neutron instead of Nova Network add ``use_neutron = True`` to config.
If you are using Neutron instead of Nova Network add ``use_neutron = True`` to config. If the
linux kernel you're utilizing support network namespaces then also specify ``use_namespaces = True``.
.. note::

View File

@ -55,11 +55,13 @@ Neutron and Nova Network support
--------------------------------
OpenStack Cluster may use Nova Network or Neutron as a networking service. Savanna supports both, but when deployed,
a special configuration for networking should be set explicitly. By default Savanna will behave as if Nova Network is used.
If OpenStack Cluster uses Neutron, then ``use_neutron`` option should be set to ``True`` in Savanna configuration file.
If OpenStack Cluster uses Neutron, then ``use_neutron`` option should be set to ``True`` in Savanna configuration file. In
addition, if the OpenStack Cluster supports network namespaces, set the ``use_namespaces`` option to ``True``
.. sourcecode:: cfg
use_neutron=True
use_namespaces=True
Savanna Dashboard should also be configured properly to support Neutron. ``SAVANNA_USE_NEUTRON`` should be set to ``True`` in
OpenStack Dashboard ``local_settings.py`` configuration file.

View File

@ -29,6 +29,9 @@
# Use Neutron or Nova Network (boolean value)
#use_neutron=false
# Use network namespaces for communication (only valid to use in conjunction
# with use_neutron=True)
#use_namespaces=false
# Maximum length of job binary data in kilobytes that may be

View File

@ -34,6 +34,9 @@
# Use Neutron or Nova Network (boolean value)
#use_neutron=false
# Use network namespaces for communication (only valid to use in conjunction
# with use_neutron=True)
#use_namespaces=false
#
# Options defined in savanna.main

View File

@ -13,6 +13,7 @@ python-cinderclient>=1.0.6
python-keystoneclient>=0.4.1
python-novaclient>=2.15.0
python-swiftclient>=1.5
python-neutronclient>=2.3.0,<3
six>=1.4.1
SQLAlchemy>=0.7.8,<=0.7.99
WebOb>=1.2.3,<1.3

View File

@ -52,7 +52,12 @@ networking_opts = [
"dhcp_domain config parameter"),
cfg.BoolOpt('use_neutron',
default=False,
help="Use Neutron or Nova Network")
help="Use Neutron Networking (False indicates the use of Nova "
"networking)"),
cfg.BoolOpt('use_namespaces',
default=False,
help="Use network namespaces for communication (only valid to "
"use in conjunction with use_neutron=True)")
]
@ -91,3 +96,14 @@ def parse_configs(argv=None, conf_files=None):
# TODO(slukjanov): replace RuntimeError with Savanna-specific exception
raise RuntimeError("Option '%s' is required for config group "
"'%s'" % (roe.opt_name, roe.group.name))
validate_configs()
def validate_network_configs():
if CONF.use_namespaces and not CONF.use_neutron:
raise RuntimeError('use_namespaces can not be set to "True" when '
'use_neutron is set to "False"')
def validate_configs():
validate_network_configs()

View File

@ -238,6 +238,8 @@ class AmbariPlugin(p.ProvisioningPluginBase):
client.start_services(cluster.name, cluster_spec,
self.cluster_ambari_mapping[cluster.name])
client.cleanup(self.cluster_ambari_mapping[cluster.name])
def get_title(self):
return 'Hortonworks Data Platform'
@ -284,6 +286,8 @@ class AmbariPlugin(p.ProvisioningPluginBase):
ambari_client.scale_cluster(cluster.name, cluster_spec, servers,
self._get_num_hosts(cluster), ambari_info)
ambari_client.cleanup(ambari_info)
def decommission_nodes(self, cluster, instances):
raise exc.InvalidException('The HDP plugin does not yet support the '
'decommissioning of nodes')

View File

@ -30,7 +30,6 @@ class ClusterSpec():
self.services = []
self.configurations = {}
self.node_groups = {}
self.servers = None
self.version = version
self.user_input_handlers = {}
@ -44,7 +43,6 @@ class ClusterSpec():
if scaled_groups is None:
scaled_groups = {}
self._determine_deployed_services(cluster)
self.servers = self._get_servers_from_savanna_cluster(cluster)
self._process_node_groups(cluster=cluster)
for ng_id in scaled_groups:
@ -114,13 +112,6 @@ class ClusterSpec():
return components
def _get_servers_from_savanna_cluster(self, cluster):
servers = []
for node_group in cluster.node_groups:
servers += node_group.instances
return servers
def _parse_services(self, template_json):
for s in template_json['services']:
name = s['name']
@ -184,7 +175,8 @@ class ClusterSpec():
for instance in ng.instances:
node_group.instances.add(Instance(instance.fqdn,
instance.management_ip,
instance.internal_ip))
instance.internal_ip,
instance.remote))
self.node_groups[node_group.name] = node_group
def _determine_deployed_services(self, cluster):
@ -251,10 +243,11 @@ class User():
class Instance():
def __init__(self, fqdn, management_ip, internal_ip):
def __init__(self, fqdn, management_ip, internal_ip, remote):
self.fqdn = fqdn
self.management_ip = management_ip
self.internal_ip = internal_ip
self.remote = remote
def __hash__(self):
return hash(self.fqdn)

View File

@ -15,31 +15,34 @@
import json
import logging
import pkg_resources as pkg
import requests
from oslo.config import cfg
from savanna import context
from savanna.plugins.general import exceptions as ex
from savanna.plugins.hdp import clusterspec as cs
from savanna.plugins.hdp import configprovider as cfg
from savanna.plugins.hdp import configprovider as cfgprov
from savanna.plugins.hdp.versions import abstractversionhandler as avm
from savanna import version
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class VersionHandler(avm.AbstractVersionHandler):
config_provider = None
version = None
client = None
def _set_version(self, version):
self.version = version
def _get_config_provider(self):
if self.config_provider is None:
self.config_provider = cfg.ConfigurationProvider(
self.config_provider = cfgprov.ConfigurationProvider(
json.load(pkg.resource_stream(version.version_info.package,
'plugins/hdp/versions/1_3_2/resources/'
'ambari-config-resource.json')))
@ -50,7 +53,10 @@ class VersionHandler(avm.AbstractVersionHandler):
return self.version
def get_ambari_client(self):
return AmbariClient(self)
if not self.client:
self.client = AmbariClient(self)
return self.client
def get_config_items(self):
return self._get_config_provider().get_config_items()
@ -93,15 +99,38 @@ class VersionHandler(avm.AbstractVersionHandler):
class AmbariClient():
def __init__(self, handler):
# add an argument for neutron discovery
self.handler = handler
def _get_http_session(self, host, port):
return host.remote.get_http_client(port)
def _post(self, url, ambari_info, data=None):
session = self._get_http_session(ambari_info.host, ambari_info.port)
return session.post(url, data=data,
auth=(ambari_info.user, ambari_info.password))
def _delete(self, url, ambari_info):
session = self._get_http_session(ambari_info.host, ambari_info.port)
return session.delete(url,
auth=(ambari_info.user, ambari_info.password))
def _put(self, url, ambari_info, data=None):
session = self._get_http_session(ambari_info.host, ambari_info.port)
auth = (ambari_info.user, ambari_info.password)
return session.put(url, data=data, auth=auth)
def _get(self, url, ambari_info):
session = self._get_http_session(ambari_info.host, ambari_info.port)
return session.get(url, auth=(ambari_info.user, ambari_info.password))
def _add_cluster(self, ambari_info, name):
add_cluster_url = 'http://{0}/api/v1/clusters/{1}'.format(
ambari_info.get_address(), name)
result = requests.post(add_cluster_url,
data='{"Clusters": {"version" : "HDP-1.3.2"}}',
auth=(ambari_info.user, ambari_info.password))
result = self._post(add_cluster_url, ambari_info,
data='{"Clusters": {"version" : "HDP-1.3.2"}}')
if result.status_code != 201:
LOG.error('Create cluster command failed. %s' % result.text)
@ -115,8 +144,7 @@ class AmbariClient():
'Clusters/desired_configs'.format(
ambari_info.get_address(), name)
result = requests.get(existing_config_url, auth=(
ambari_info.user, ambari_info.password))
result = self._get(existing_config_url, ambari_info)
json_result = json.loads(result.text)
existing_configs = json_result['Clusters']['desired_configs']
@ -151,8 +179,7 @@ class AmbariClient():
config_body['tag'] = 'v%s' % version
config_body['properties'] = \
cluster_spec.configurations[config_name]
result = requests.put(config_url, data=json.dumps(body), auth=(
ambari_info.user, ambari_info.password))
result = self._put(config_url, ambari_info, data=json.dumps(body))
if result.status_code != 200:
LOG.error(
'Set configuration command failed. {0}'.format(
@ -166,9 +193,9 @@ class AmbariClient():
add_service_url = 'http://{0}/api/v1/clusters/{1}/services/{2}'
for service in services:
if service.deployed and service.name != 'AMBARI':
result = requests.post(add_service_url.format(
result = self._post(add_service_url.format(
ambari_info.get_address(), name, service.name),
auth=(ambari_info.user, ambari_info.password))
ambari_info)
if result.status_code not in [201, 409]:
LOG.error(
'Create service command failed. {0}'.format(
@ -182,10 +209,10 @@ class AmbariClient():
for service in cluster_spec.services:
if service.deployed and service.name != 'AMBARI':
for component in service.components:
result = requests.post(add_component_url.format(
result = self._post(add_component_url.format(
ambari_info.get_address(), name, service.name,
component.name),
auth=(ambari_info.user, ambari_info.password))
ambari_info)
if result.status_code not in [201, 409]:
LOG.error(
'Create component command failed. {0}'.format(
@ -202,9 +229,9 @@ class AmbariClient():
'/hosts/{2}/host_components/{3}'
for host in servers:
hostname = host.instance.fqdn.lower()
result = requests.post(
result = self._post(
add_host_url.format(ambari_info.get_address(), name, hostname),
auth=(ambari_info.user, ambari_info.password))
ambari_info)
if result.status_code != 201:
LOG.error(
'Create host command failed. {0}'.format(result.text))
@ -217,9 +244,9 @@ class AmbariClient():
for component in node_group.components:
# don't add any AMBARI components
if component.find('AMBARI') != 0:
result = requests.post(add_host_component_url.format(
result = self._post(add_host_component_url.format(
ambari_info.get_address(), name, hostname, component),
auth=(ambari_info.user, ambari_info.password))
ambari_info)
if result.status_code != 201:
LOG.error(
'Create host_component command failed. %s' %
@ -237,15 +264,14 @@ class AmbariClient():
body = '{"RequestInfo" : { "context" : "Install all services" },'\
'"Body" : {"ServiceInfo": {"state" : "INSTALLED"}}}'
result = requests.put(install_url, data=body, auth=(
ambari_info.user, ambari_info.password))
result = self._put(install_url, ambari_info, data=body)
if result.status_code == 202:
json_result = json.loads(result.text)
request_id = json_result['Requests']['id']
success = self._wait_for_async_request(self._get_async_request_uri(
ambari_info, cluster_name, request_id),
auth=(ambari_info.user, ambari_info.password))
ambari_info)
if success:
LOG.info("Install of Hadoop stack successful.")
self._finalize_ambari_state(ambari_info)
@ -265,10 +291,10 @@ class AmbariClient():
ambari_info.get_address(), cluster_name,
request_id)
def _wait_for_async_request(self, request_url, auth):
def _wait_for_async_request(self, request_url, ambari_info):
started = False
while not started:
result = requests.get(request_url, auth=auth)
result = self._get(request_url, ambari_info)
LOG.debug(
'async request ' + request_url + ' response:\n' + result.text)
json_result = json.loads(result.text)
@ -293,8 +319,7 @@ class AmbariClient():
# resource doesn't comply with Ambari API standards
persist_data = '{ "CLUSTER_CURRENT_STATUS":' \
'"{\\"clusterState\\":\\"CLUSTER_STARTED_5\\"}" }'
result = requests.post(persist_state_uri, data=persist_data,
auth=(ambari_info.user, ambari_info.password))
result = self._post(persist_state_uri, ambari_info, data=persist_data)
if result.status_code != 201 and result.status_code != 202:
LOG.warning('Finalizing of Ambari cluster state failed. {0}'.
@ -311,17 +336,15 @@ class AmbariClient():
body = '{"RequestInfo" : { "context" : "Start all services" },'\
'"Body" : {"ServiceInfo": {"state" : "STARTED"}}}'
auth = (ambari_info.user, ambari_info.password)
self._fire_service_start_notifications(
cluster_name, cluster_spec, ambari_info)
result = requests.put(start_url, data=body, auth=auth)
result = self._put(start_url, ambari_info, data=body)
if result.status_code == 202:
json_result = json.loads(result.text)
request_id = json_result['Requests']['id']
success = self._wait_for_async_request(
self._get_async_request_uri(ambari_info, cluster_name,
request_id),
auth=auth)
request_id), ambari_info)
if success:
LOG.info(
"Successfully started Hadoop cluster '{0}'.".format(
@ -338,20 +361,16 @@ class AmbariClient():
raise ex.HadoopProvisionError(
'Start of Hadoop services failed.')
def _get_rest_request(self):
return requests
def _exec_ambari_command(self, auth, body, cmd_uri):
def _exec_ambari_command(self, ambari_info, body, cmd_uri):
LOG.debug('PUT URI: {0}'.format(cmd_uri))
result = requests.put(cmd_uri, data=body,
auth=auth)
result = self._put(cmd_uri, ambari_info, data=body)
if result.status_code == 202:
LOG.debug(
'PUT response: {0}'.format(result.text))
json_result = json.loads(result.text)
href = json_result['href'] + '/tasks?fields=Tasks/status'
success = self._wait_for_async_request(href, auth)
success = self._wait_for_async_request(href, ambari_info)
if success:
LOG.info(
"Successfully changed state of Hadoop components ")
@ -393,7 +412,7 @@ class AmbariClient():
'HostRoles/host_name.in({2})'.format(
ambari_info.get_address(), cluster_name,
self._get_host_list(servers))
self._exec_ambari_command(auth, body, install_uri)
self._exec_ambari_command(ambari_info, body, install_uri)
def _start_components(self, ambari_info, auth, cluster_name, servers,
cluster_spec):
@ -404,7 +423,7 @@ class AmbariClient():
'HostRoles/host_name.in({2})'.format(
ambari_info.get_address(), cluster_name,
self._get_host_list(servers))
result = requests.get(installed_uri, auth=auth)
result = self._get(installed_uri, ambari_info)
if result.status_code == 200:
LOG.debug(
'GET response: {0}'.format(result.text))
@ -425,10 +444,10 @@ class AmbariClient():
'1}/host_components?HostRoles/state=INSTALLED&'\
'HostRoles/host_name.in({2})'\
'&HostRoles/component_name.in({3})'.format(
ambari_info.get_address(), cluster_name,
self._get_host_list(servers),
",".join(inclusion_list))
self._exec_ambari_command(auth, body, start_uri)
ambari_info.get_address(), cluster_name,
self._get_host_list(servers),
",".join(inclusion_list))
self._exec_ambari_command(ambari_info, body, start_uri)
else:
raise ex.HadoopProvisionError(
'Unable to determine installed service '
@ -447,8 +466,7 @@ class AmbariClient():
while result is None or len(json_result['items']) < num_hosts:
context.sleep(5)
try:
result = requests.get(url, auth=(ambari_info.user,
ambari_info.password))
result = self._get(url, ambari_info)
json_result = json.loads(result.text)
LOG.info('Registered Hosts: {0} of {1}'.format(
@ -467,9 +485,7 @@ class AmbariClient():
update_body = '{{"Users":{{"roles":"admin,user","password":"{0}",' \
'"old_password":"{1}"}} }}'.format(password, old_pwd)
request = self._get_rest_request()
result = request.put(user_url, data=update_body, auth=(
ambari_info.user, ambari_info.password))
result = self._put(user_url, ambari_info, data=update_body)
if result.status_code != 200:
raise ex.HadoopProvisionError('Unable to update Ambari admin user'
@ -483,9 +499,7 @@ class AmbariClient():
create_body = '{{"Users":{{"password":"{0}","roles":"{1}"}} }}'. \
format(user.password, '%s' % ','.join(map(str, user.groups)))
request = self._get_rest_request()
result = request.post(user_url, data=create_body, auth=(
ambari_info.user, ambari_info.password))
result = self._post(user_url, ambari_info, data=create_body)
if result.status_code != 201:
raise ex.HadoopProvisionError(
@ -495,9 +509,7 @@ class AmbariClient():
user_url = 'http://{0}/api/v1/users/{1}'.format(
ambari_info.get_address(), user_name)
request = self._get_rest_request()
result = request.delete(user_url, auth=(
ambari_info.user, ambari_info.password))
result = self._delete(user_url, ambari_info)
if result.status_code != 200:
raise ex.HadoopProvisionError(
@ -531,13 +543,15 @@ class AmbariClient():
self._install_services(name, ambari_info)
self.handler.install_swift_integration(servers)
def cleanup(self, ambari_info):
ambari_info.host.remote.close_http_sessions()
def _get_services_in_state(self, cluster_name, ambari_info, state):
services_url = 'http://{0}/api/v1/clusters/{1}/services?' \
'ServiceInfo/state.in({2})'.format(
ambari_info.get_address(), cluster_name, state)
result = requests.get(services_url, auth=(
ambari_info.user, ambari_info.password))
result = self._get(services_url, ambari_info)
json_result = json.loads(result.text)
services = []

View File

@ -29,6 +29,7 @@ class TestServer:
self.public_ip = public_ip
self.internal_ip = private_ip
self.node_group = None
self.remote = None
def get_instance_info(*args, **kwargs):

View File

@ -26,7 +26,7 @@ from savanna import version
GET_REST_REQ = "savanna.plugins.hdp.versions.1_3_2.versionhandler." \
"AmbariClient._get_rest_request"
"AmbariClient._get_http_session"
def create_cluster_template(ctx, dct):
@ -274,7 +274,7 @@ class AmbariPluginTest(unittest2.TestCase):
self.assertEqual('admin', ambari_info.user)
self.assertEqual('admin', ambari_info.password)
def _get_test_request(self):
def _get_test_request(self, host, port):
request = base.TestRequest()
self.requests.append(request)
return request

View File

@ -0,0 +1,68 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from savanna.utils.openstack import neutron as neutron_client
import unittest2
class NeutronTest(unittest2.TestCase):
@mock.patch("neutronclient.neutron.client.Client")
def test_get_router(self, patched):
patched.side_effect = _test_get_neutron_client
neutron = neutron_client.Client('33b47310-b7a8-4559-bf95-45ba669a448e',
None, None, None)
self.assertEqual('6c4d4e32-3667-4cd4-84ea-4cc1e98d18be',
neutron.get_router())
def _test_get_neutron_client(api_version, *args, **kwargs):
return TestNeutron()
class TestNeutron():
def list_routers(self):
return {"routers": [{"status": "ACTIVE", "external_gateway_info": {
"network_id": "61f95d3f-495e-4409-8c29-0b806283c81e"},
"name": "router1", "admin_state_up": True,
"tenant_id": "903809ded3434f8d89948ee71ca9f5bb",
"routes": [],
"id": "6c4d4e32-3667-4cd4-84ea-4cc1e98d18be"}]}
def list_ports(self, device_id=None):
return {"ports": [
{"status": "ACTIVE", "name": "", "admin_state_up": True,
"network_id": "33b47310-b7a8-4559-bf95-45ba669a448e",
"tenant_id": "903809ded3434f8d89948ee71ca9f5bb",
"binding:vif_type": "ovs", "device_owner": "compute:None",
"binding:capabilities": {"port_filter": True},
"mac_address": "fa:16:3e:69:25:1c", "fixed_ips": [
{"subnet_id": "bfa9d0a1-9efb-4bff-bd2b-c103c053560f",
"ip_address": "10.0.0.8"}],
"id": "0f3df685-bc55-4314-9b76-835e1767b78f",
"security_groups": ["f9fee2a2-bb0b-44e4-8092-93a43dc45cda"],
"device_id": "c2129c18-6707-4f07-94cf-00b2fef8eea7"},
{"status": "ACTIVE", "name": "", "admin_state_up": True,
"network_id": "33b47310-b7a8-4559-bf95-45ba669a448e",
"tenant_id": "903809ded3434f8d89948ee71ca9f5bb",
"binding:vif_type": "ovs",
"device_owner": "network:router_interface",
"binding:capabilities": {"port_filter": True},
"mac_address": "fa:16:3e:c5:b0:cb", "fixed_ips": [
{"subnet_id": "bfa9d0a1-9efb-4bff-bd2b-c103c053560f",
"ip_address": "10.0.0.1"}],
"id": "27193ae1-142a-436c-ab41-c77b1df032a1",
"security_groups": [],
"device_id": "6c4d4e32-3667-4cd4-84ea-4cc1e98d18be"}]}

View File

@ -0,0 +1,190 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import requests
from requests import adapters
from shlex import split as shlsplit
from eventlet.green.subprocess import Popen, PIPE
from neutronclient.neutron import client as neutron_cli
from savanna.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class Client():
neutron = None
adapters = {}
routers = {}
def __init__(self, network, uri, token, tenant_name):
self.neutron = neutron_cli.Client('2.0',
endpoint_url=uri,
token=token,
tenant_name=tenant_name)
self.network = network
def get_router(self):
matching_router = Client.routers.get(self.network, None)
if matching_router:
LOG.debug('Returning cached qrouter')
return matching_router['id']
routers = self.neutron.list_routers()['routers']
for router in routers:
device_id = router['id']
ports = self.neutron.list_ports(device_id=device_id)['ports']
port = next((port for port in ports
if port['network_id'] == self.network), None)
if port:
matching_router = router
Client.routers[self.network] = matching_router
break
if not matching_router:
raise RuntimeError('Neutron router not found corresponding to '
'network {0}'.format(self.network))
return matching_router['id']
def get_http_session(self, host, port=None):
session = requests.Session()
adapters = self._get_adapters(host, port=port)
for adapter in adapters:
session.mount('http://{0}:{1}'.format(host, adapter.port), adapter)
return session
def _get_adapters(self, host, port=None):
LOG.debug('Retrieving neutron adapters for {0}:{1}'.format(host, port))
adapters = []
if not port:
# returning all registered adapters for given host
adapters = [adapter for adapter in self.adapters
if adapter.host == host]
else:
# need to retrieve or create specific adapter
adapter = self.adapters.get((host, port), None)
if not adapter:
LOG.debug('Creating neutron adapter for {0}:{1}'
.format(host, port))
qrouter = self.get_router()
adapter = \
NeutronHttpAdapter(qrouter, host, port)
self.adapters[(host, port)] = adapter
adapters = [adapter]
return adapters
class NeutronHttpAdapter(adapters.HTTPAdapter):
port = None
host = None
def __init__(self, qrouter, host, port):
super(NeutronHttpAdapter, self).__init__()
command = 'ip netns exec qrouter-{0} nc {1} {2}'.format(qrouter,
host, port)
LOG.debug('Neutron adapter created with cmd {0}'.format(command))
self.cmd = shlsplit(command)
self.port = port
self.host = host
def get_connection(self, url, proxies=None):
pool_conn = \
super(NeutronHttpAdapter, self).get_connection(url, proxies)
if hasattr(pool_conn, '_get_conn'):
http_conn = pool_conn._get_conn()
if http_conn.sock is None:
if hasattr(http_conn, 'connect'):
sock = self._connect()
LOG.debug('HTTP connecction {0} getting new '
'netcat socket {1}'.format(http_conn, sock))
http_conn.sock = sock
else:
if hasattr(http_conn.sock, 'is_netcat_socket'):
LOG.debug('pooled http connection has existing '
'netcat socket. resetting pipe...')
http_conn.sock.reset()
pool_conn._put_conn(http_conn)
return pool_conn
def close(self):
LOG.debug('Closing neutron adapter for {0}:{1}'
.format(self.host, self.port))
super(NeutronHttpAdapter, self).close()
def _connect(self):
LOG.debug('returning netcat socket with command {0}'
.format(self.cmd))
return NetcatSocket(self.cmd)
class NetcatSocket:
def _create_process(self):
self.process = Popen(self.cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
def __init__(self, cmd):
self.cmd = cmd
self._create_process()
def send(self, content):
try:
self.process.stdin.write(content)
except IOError, e:
raise RuntimeError(' '.join(self.cmd), e.strerror)
return len(content)
def sendall(self, content):
return self.send(content)
def makefile(self, mode, *arg):
if mode.startswith('r'):
return self.process.stdout
if mode.startswith('w'):
return self.process.stdin
raise RuntimeError("Unknown mode", mode)
def recv(self, size):
try:
return os.read(self.process.stdout.fileno(), size)
except IOError, e:
raise RuntimeError(' '.join(self.cmd), e.strerror)
def _terminate(self):
self.process.terminate()
def close(self):
LOG.debug('Socket close called')
self._terminate()
def settimeout(self, timeout):
pass
def fileno(self):
return self.process.stdin.fileno()
def is_netcat_socket(self):
return True
def reset(self):
self._terminate()
self._create_process()

View File

@ -31,25 +31,28 @@ and eventlet together. The private high-level module methods are
implementations which are run in a separate process.
"""
import logging
import time
from eventlet import semaphore
from eventlet import timeout as e_timeout
import logging
from oslo.config import cfg
import paramiko
import requests
from savanna import context
from savanna import exceptions as ex
from savanna.openstack.common import excutils
from savanna.utils import crypto
from savanna.utils.openstack import base
from savanna.utils.openstack import neutron
from savanna.utils.openstack import nova
from savanna.utils import procutils
import time
LOG = logging.getLogger(__name__)
remote_opts = [
cfg.IntOpt('global_remote_threshold', default=100,
help='Maximum number of remote operations that will '
@ -65,16 +68,32 @@ CONF = cfg.CONF
CONF.register_opts(remote_opts)
_ssh = None
_sessions = {}
def _connect(host, username, private_key):
def _get_proxy(neutron_info):
client = neutron.Client(neutron_info['network'], neutron_info['uri'],
neutron_info['token'], neutron_info['tenant'])
qrouter = client.get_router()
proxy = paramiko.ProxyCommand('ip netns exec qrouter-{0} nc {1} 22'
.format(qrouter, neutron_info['host']))
return proxy
def _connect(host, username, private_key, neutron_info=None):
global _ssh
LOG.debug('Creating SSH connection')
proxy = None
if type(private_key) in [str, unicode]:
private_key = crypto.to_paramiko_private_key(private_key)
_ssh = paramiko.SSHClient()
_ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
_ssh.connect(host, username=username, pkey=private_key)
if neutron_info:
LOG.debug('creating proxy using info: {0}'.format(neutron_info))
proxy = _get_proxy(neutron_info)
_ssh.connect(host, username=username, pkey=private_key, sock=proxy)
def _cleanup():
@ -114,6 +133,38 @@ def _execute_command(cmd, get_stderr=False, raise_when_error=True):
return ret_code, stdout
def _get_http_client(host, port, neutron_info):
global _sessions
_http_session = _sessions.get((host, port), None)
LOG.debug('cached HTTP session for {0}:{1} is {2}'.format(host, port,
_http_session))
if not _http_session:
if neutron_info:
neutron_client = neutron.Client(neutron_info['network'],
neutron_info['uri'],
neutron_info['token'],
neutron_info['tenant'])
# can return a new session here because it actually uses
# the same adapter (and same connection pools) for a given
# host and port tuple
_http_session = neutron_client.get_http_session(host, port=port)
LOG.debug('created neutron based HTTP session for {0}:{1}'
.format(host, port))
else:
# need to cache the session for the non-neutron or neutron
# floating ip cases so that a new session with a new HTTPAdapter
# and associated pools is not recreated for each HTTP invocation
_http_session = requests.Session()
LOG.debug('created standard HTTP session for {0}:{1}'
.format(host, port))
LOG.debug('caching session {0} for {1}:{2}'
.format(_http_session, host, port))
_sessions[(host, port)] = _http_session
return _http_session
def _write_file(sftp, remote_file, data):
fl = sftp.file(remote_file, 'w')
fl.write(data)
@ -157,7 +208,9 @@ def _execute_on_vm_interactive(cmd, matcher):
buf = ''
channel = _ssh.invoke_shell()
LOG.debug('channel is {0}'.format(channel))
try:
LOG.debug('sending cmd {0}'.format(cmd))
channel.send(cmd + '\n')
while not matcher.is_eof(buf):
buf += channel.recv(4096)
@ -166,6 +219,7 @@ def _execute_on_vm_interactive(cmd, matcher):
channel.send(response + '\n')
buf = ''
finally:
LOG.debug('closing channel')
channel.close()
@ -209,9 +263,25 @@ class InstanceInteropHelper(object):
finally:
_release_remote_semaphore()
def _get_neutron_info(self):
neutron_info = HashableDict()
neutron_info['network'] = \
self.instance.node_group.cluster.neutron_management_network
ctx = context.current()
neutron_info['uri'] = base.url_for(ctx.service_catalog, 'network')
neutron_info['token'] = ctx.token
neutron_info['tenant'] = ctx.tenant_name
neutron_info['host'] = self.instance.management_ip
LOG.debug('Returning neutron info: {0}'.format(neutron_info))
return neutron_info
def _get_conn_params(self):
info = None
if CONF.use_namespaces and not CONF.use_floating_ips:
info = self._get_neutron_info()
return (self.instance.management_ip, self.username,
self.instance.node_group.cluster.management_private_key)
self.instance.node_group.cluster.management_private_key, info)
def _run(self, func, *args, **kwargs):
proc = procutils.start_subprocess()
@ -242,6 +312,21 @@ class InstanceInteropHelper(object):
finally:
_release_remote_semaphore()
def get_http_client(self, port):
self._log_command('Retrieving http session for {0}:{1}'
.format(self.instance.management_ip, port))
info = None
if CONF.use_namespaces and not CONF.use_floating_ips:
info = self._get_neutron_info()
return _get_http_client(self.instance.management_ip, port, info)
def close_http_sessions(self):
global _sessions
LOG.debug('closing host related http sessions')
for session in _sessions.values():
session.close()
def execute_command(self, cmd, get_stderr=False, raise_when_error=True,
timeout=300):
"""Execute specified command remotely using existing ssh connection.
@ -323,3 +408,8 @@ class BulkInstanceInteropHelper(InstanceInteropHelper):
def _run_s(self, func, timeout, *args, **kwargs):
return self._run_with_log(func, timeout, *args, **kwargs)
class HashableDict(dict):
def __hash__(self):
return hash((frozenset(self), frozenset(self.itervalues())))

View File

@ -4,6 +4,7 @@ DEFAULT.log_exchange
DEFAULT.job_binary_max_KB
DEFAULT.use_floating_ips
DEFAULT.use_neutron
DEFAULT.use_namespaces
DEFAULT.os_auth_protocol
DEFAULT.os_auth_host