Merge "Allow nvp_api to load balance requests"

This commit is contained in:
Jenkins 2013-01-09 16:22:00 +00:00 committed by Gerrit Code Review
commit 928d869311
11 changed files with 660 additions and 591 deletions

View File

@ -36,8 +36,8 @@ class NVPApiHelper(client_eventlet.NvpApiClientEventlet):
''' '''
def __init__(self, api_providers, user, password, request_timeout, def __init__(self, api_providers, user, password, request_timeout,
http_timeout, retries, redirects, failover_time, http_timeout, retries, redirects,
concurrent_connections=3): concurrent_connections=3, nvp_gen_timeout=-1):
'''Constructor. '''Constructor.
:param api_providers: a list of tuples in the form: :param api_providers: a list of tuples in the form:
@ -53,12 +53,10 @@ class NVPApiHelper(client_eventlet.NvpApiClientEventlet):
controller in the cluster) controller in the cluster)
:param retries: the number of concurrent connections. :param retries: the number of concurrent connections.
:param redirects: the number of concurrent connections. :param redirects: the number of concurrent connections.
:param failover_time: minimum time between controller failover and new
connections allowed.
''' '''
client_eventlet.NvpApiClientEventlet.__init__( client_eventlet.NvpApiClientEventlet.__init__(
self, api_providers, user, password, concurrent_connections, self, api_providers, user, password, concurrent_connections,
failover_time=failover_time) nvp_gen_timeout)
self._request_timeout = request_timeout self._request_timeout = request_timeout
self._http_timeout = http_timeout self._http_timeout = http_timeout
@ -84,7 +82,7 @@ class NVPApiHelper(client_eventlet.NvpApiClientEventlet):
if password: if password:
self._password = password self._password = password
return client_eventlet.NvpApiClientEventlet.login(self) return client_eventlet.NvpApiClientEventlet._login(self)
def request(self, method, url, body="", content_type="application/json"): def request(self, method, url, body="", content_type="application/json"):
'''Issues request to controller.''' '''Issues request to controller.'''

View File

@ -27,7 +27,6 @@ import webob.exc
# FIXME(salvatore-orlando): get rid of relative imports # FIXME(salvatore-orlando): get rid of relative imports
from common import config from common import config
from quantum.plugins.nicira.nicira_nvp_plugin.api_client import client_eventlet
from nvp_plugin_version import PLUGIN_VERSION from nvp_plugin_version import PLUGIN_VERSION
from quantum.plugins.nicira.nicira_nvp_plugin import nicira_models from quantum.plugins.nicira.nicira_nvp_plugin import nicira_models
@ -157,12 +156,11 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
http_timeout=cluster.http_timeout, http_timeout=cluster.http_timeout,
retries=cluster.retries, retries=cluster.retries,
redirects=cluster.redirects, redirects=cluster.redirects,
failover_time=self.nvp_opts.failover_time, concurrent_connections=self.nvp_opts['concurrent_connections'],
concurrent_connections=self.nvp_opts.concurrent_connections) nvp_gen_timeout=self.nvp_opts['nvp_gen_timeout'])
# TODO(salvatore-orlando): do login at first request, if len(self.clusters) == 0:
# not when plugin is instantiated first_cluster = cluster
cluster.api_client.login()
self.clusters[c_opts['name']] = cluster self.clusters[c_opts['name']] = cluster
def_cluster_name = self.nvp_opts.default_cluster_name def_cluster_name = self.nvp_opts.default_cluster_name

View File

@ -21,8 +21,8 @@ NVP Plugin configuration
bridged transport zone (default 64) bridged transport zone (default 64)
- concurrent_connections: Number of connects to each controller node - concurrent_connections: Number of connects to each controller node
(default 3) (default 3)
- failover_time: Time from when a connection pool is switched to another - nvp_gen_timout: Number of seconds a generation id should be valid for
controller during failures. (default -1 meaning do not time out)
3) NVP cluster 3) NVP cluster
The Quantum NVP plugin allow for configuring multiple clusters. The Quantum NVP plugin allow for configuring multiple clusters.
Each cluster configuration section must be declared in the following way Each cluster configuration section must be declared in the following way

View File

@ -1,4 +1,4 @@
# Copyright 2012 Nicira Networks, Inc. # Copyright 2012 Nicira, Inc.
# All Rights Reserved # All Rights Reserved
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -14,3 +14,5 @@
# under the License. # under the License.
# #
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# @author: Aaron Rosen, Nicira Networks, Inc.

View File

@ -1,25 +1,40 @@
# Copyright 2009-2012 Nicira Networks, Inc. # Copyright 2012 Nicira, Inc.
# All Rights Reserved # All Rights Reserved
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
# #
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
# #
# Author: David Lapsley <dlapsley@nicira.com>, Nicira Networks, Inc. # @author: David Lapsley <dlapsley@nicira.com>, Nicira Networks, Inc.
# @author: Aaron Rosen, Nicira Networks, Inc.
from abc import ABCMeta from abc import ABCMeta
from abc import abstractmethod import httplib
from abc import abstractproperty import time
import logging
from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import (
_conn_str)
logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger(__name__)
#Default parameters.
GENERATION_ID_TIMEOUT = -1
DEFAULT_CONCURRENT_CONNECTIONS = 3
DEFAULT_CONNECT_TIMEOUT = 5
class NvpApiClient(object): class NvpApiClient(object):
@ -33,38 +48,230 @@ class NvpApiClient(object):
CONN_IDLE_TIMEOUT = 60 * 15 CONN_IDLE_TIMEOUT = 60 * 15
@abstractmethod def _create_connection(self, host, port, is_ssl):
def update_providers(self, api_providers): if is_ssl:
pass return httplib.HTTPSConnection(host, port,
timeout=self._connect_timeout)
return httplib.HTTPConnection(host, port,
timeout=self._connect_timeout)
@abstractproperty @staticmethod
def _conn_params(http_conn):
is_ssl = isinstance(http_conn, httplib.HTTPSConnection)
return (http_conn.host, http_conn.port, is_ssl)
@property
def user(self): def user(self):
pass return self._user
@abstractproperty @property
def password(self): def password(self):
pass return self._password
@abstractproperty @property
def auth_cookie(self): def nvp_config_gen(self):
pass # If nvp_gen_timeout is not -1 then:
# Maintain a timestamp along with the generation ID. Hold onto the
# ID long enough to be useful and block on sequential requests but
# not long enough to persist when Onix db is cleared, which resets
# the generation ID, causing the DAL to block indefinitely with some
# number that's higher than the cluster's value.
if self._nvp_gen_timeout != -1:
ts = self._nvp_config_gen_ts
if ts is not None:
if (time.time() - ts) > self._nvp_gen_timeout:
return None
return self._nvp_config_gen
@abstractmethod @nvp_config_gen.setter
def acquire_connection(self): def nvp_config_gen(self, value):
pass if self._nvp_config_gen != value:
if self._nvp_gen_timeout != -1:
self._nvp_config_gen_ts = time.time()
self._nvp_config_gen = value
@abstractmethod def auth_cookie(self, conn):
def release_connection(self, http_conn, bad_state=False): cookie = None
pass data = self._get_provider_data(conn)
if data:
cookie = data[1]
return cookie
@abstractproperty def set_auth_cookie(self, conn, cookie):
def need_login(self): data = self._get_provider_data(conn)
pass if data:
self._set_provider_data(conn, (data[0], cookie))
@abstractmethod def acquire_connection(self, auto_login=True, headers=None, rid=-1):
def wait_for_login(self): '''Check out an available HTTPConnection instance.
pass
@abstractmethod Blocks until a connection is available.
def login(self): :auto_login: automatically logins before returning conn
pass :headers: header to pass on to login attempt
:param rid: request id passed in from request eventlet.
:returns: An available HTTPConnection instance or None if no
api_providers are configured.
'''
if not self._api_providers:
LOG.warn(_("[%d] no API providers currently available."), rid)
return None
if self._conn_pool.empty():
LOG.debug(_("[%d] Waiting to acquire API client connection."), rid)
priority, conn = self._conn_pool.get()
now = time.time()
if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT:
LOG.info(_("[%d] Connection %s idle for %0.2f seconds; "
"reconnecting."),
rid, _conn_str(conn), now - conn.last_used)
conn = self._create_connection(*self._conn_params(conn))
conn.last_used = now
conn.priority = priority # stash current priority for release
qsize = self._conn_pool.qsize()
LOG.debug(_("[%d] Acquired connection %s. %d connection(s) "
"available."), rid, _conn_str(conn), qsize)
if auto_login and self.auth_cookie(conn) is None:
self._wait_for_login(conn, headers)
return conn
def release_connection(self, http_conn, bad_state=False,
service_unavail=False, rid=-1):
'''Mark HTTPConnection instance as available for check-out.
:param http_conn: An HTTPConnection instance obtained from this
instance.
:param bad_state: True if http_conn is known to be in a bad state
(e.g. connection fault.)
:service_unavail: True if http_conn returned 503 response.
:param rid: request id passed in from request eventlet.
'''
conn_params = self._conn_params(http_conn)
if self._conn_params(http_conn) not in self._api_providers:
LOG.debug(_("[%d] Released connection '%s' is not an API provider "
"for the cluster"), rid, _conn_str(http_conn))
return
elif hasattr(http_conn, "no_release"):
return
if bad_state:
# Reconnect to provider.
LOG.warn(_("[%d] Connection returned in bad state, reconnecting "
"to %s"), rid, _conn_str(http_conn))
http_conn = self._create_connection(*self._conn_params(http_conn))
priority = self._next_conn_priority
self._next_conn_priority += 1
elif service_unavail:
# http_conn returned a service unaviable response, put other
# connections to the same controller at end of priority queue,
conns = []
while not self._conn_pool.empty():
priority, conn = self._conn_pool.get()
if self._conn_params(conn) == conn_params:
priority = self._next_conn_priority
self._next_conn_priority += 1
conns.append((priority, conn))
for priority, conn in conns:
self._conn_pool.put((priority, conn))
# put http_conn at end of queue also
priority = self._next_conn_priority
self._next_conn_priority += 1
else:
priority = http_conn.priority
self._conn_pool.put((priority, http_conn))
LOG.debug(_("[%d] Released connection %s. %d connection(s) "
"available."),
rid, _conn_str(http_conn), self._conn_pool.qsize())
def _wait_for_login(self, conn, headers=None):
'''Block until a login has occurred for the current API provider.'''
data = self._get_provider_data(conn)
if data is None:
LOG.error(_("Login request for an invalid connection: '%s'"),
_conn_str(conn))
return
provider_sem = data[0]
if provider_sem.acquire(blocking=False):
try:
cookie = self._login(conn, headers)
self.set_auth_cookie(conn, cookie)
finally:
provider_sem.release()
else:
LOG.debug(_("Waiting for auth to complete"))
# Wait until we can aquire then release
provider_sem.acquire(blocking=True)
provider_sem.release()
def _get_provider_data(self, conn_or_conn_params, default=None):
"""Get data for specified API provider.
Args:
conn_or_conn_params: either a HTTP(S)Connection object or the
resolved conn_params tuple returned by self._conn_params().
default: conn_params if ones passed aren't known
Returns: Data associated with specified provider
"""
conn_params = self._normalize_conn_params(conn_or_conn_params)
return self._api_provider_data.get(conn_params, default)
def _set_provider_data(self, conn_or_conn_params, data):
"""Set data for specified API provider.
Args:
conn_or_conn_params: either a HTTP(S)Connection object or the
resolved conn_params tuple returned by self._conn_params().
data: data to associate with API provider
"""
conn_params = self._normalize_conn_params(conn_or_conn_params)
if data is None:
del self._api_provider_data[conn_params]
else:
self._api_provider_data[conn_params] = data
def _normalize_conn_params(self, conn_or_conn_params):
"""Normalize conn_param tuple.
Args:
conn_or_conn_params: either a HTTP(S)Connection object or the
resolved conn_params tuple returned by self._conn_params().
Returns: Normalized conn_param tuple
"""
if (not isinstance(conn_or_conn_params, tuple) and
not isinstance(conn_or_conn_params, httplib.HTTPConnection)):
LOG.debug(_("Invalid conn_params value: '%s'"),
str(conn_or_conn_params))
return conn_or_conn_params
if isinstance(conn_or_conn_params, httplib.HTTPConnection):
conn_params = self._conn_params(conn_or_conn_params)
else:
conn_params = conn_or_conn_params
host, port, is_ssl = conn_params
if port is None:
port = 443 if is_ssl else 80
return (host, port, is_ssl)
def update_providers(self, api_providers):
new_providers = set([tuple(p) for p in api_providers])
if new_providers != self._api_providers:
new_conns = []
while not self._conn_pool.empty():
priority, conn = self._conn_pool.get_nowait()
if self._conn_params(conn) in new_providers:
new_conns.append((priority, conn))
to_subtract = self._api_providers - new_providers
for p in to_subtract:
self._set_provider_data(p, None)
to_add = new_providers - self._api_providers
for unused_i in range(self._concurrent_connections):
for host, port, is_ssl in to_add:
conn = self._create_connection(host, port, is_ssl)
new_conns.append((self._next_conn_priority, conn))
self._next_conn_priority += 1
for priority, conn in new_conns:
self._conn_pool.put((priority, conn))
self._api_providers = new_providers

View File

@ -1,53 +1,44 @@
# Copyright 2009-2012 Nicira Networks, Inc. # Copyright 2012 Nicira, Inc.
# All Rights Reserved # All Rights Reserved
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
# #
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
# #
# @author: Aaron Rosen, Nicira Networks, Inc.
import client
import eventlet import eventlet
import httplib
import logging import logging
import request_eventlet
import time import time
from common import _conn_str from quantum.plugins.nicira.nicira_nvp_plugin.api_client import client
from quantum.plugins.nicira.nicira_nvp_plugin.api_client import (
request_eventlet)
eventlet.monkey_patch() eventlet.monkey_patch()
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
# Default parameters.
DEFAULT_FAILOVER_TIME = 5
DEFAULT_CONCURRENT_CONNECTIONS = 3
DEFAULT_CONNECT_TIMEOUT = 5
GENERATION_ID_TIMEOUT = -1 # if set to -1 then disabled
class NvpApiClientEventlet(client.NvpApiClient):
class NvpApiClientEventlet(object):
'''Eventlet-based implementation of NvpApiClient ABC.''' '''Eventlet-based implementation of NvpApiClient ABC.'''
CONN_IDLE_TIMEOUT = 60 * 15
def __init__(self, api_providers, user, password, def __init__(self, api_providers, user, password,
concurrent_connections=DEFAULT_CONCURRENT_CONNECTIONS, concurrent_connections=client.DEFAULT_CONCURRENT_CONNECTIONS,
use_https=True, nvp_gen_timeout=client.GENERATION_ID_TIMEOUT, use_https=True,
connect_timeout=DEFAULT_CONNECT_TIMEOUT, connect_timeout=client.DEFAULT_CONNECT_TIMEOUT):
failover_time=DEFAULT_FAILOVER_TIME,
nvp_gen_timeout=GENERATION_ID_TIMEOUT):
'''Constructor '''Constructor
:param api_providers: a list of tuples of the form: (host, port, :param api_providers: a list of tuples of the form: (host, port,
@ -57,242 +48,112 @@ class NvpApiClientEventlet(object):
:param concurrent_connections: total number of concurrent connections. :param concurrent_connections: total number of concurrent connections.
:param use_https: whether or not to use https for requests. :param use_https: whether or not to use https for requests.
:param connect_timeout: connection timeout in seconds. :param connect_timeout: connection timeout in seconds.
:param failover_time: time from when a connection pool is switched to
the next connection released via acquire_connection().
:param nvp_gen_timeout controls how long the generation id is kept :param nvp_gen_timeout controls how long the generation id is kept
if set to -1 the generation id is never timed out if set to -1 the generation id is never timed out
''' '''
if not api_providers: if not api_providers:
api_providers = [] api_providers = []
self._api_providers = set([tuple(p) for p in api_providers]) self._api_providers = set([tuple(p) for p in api_providers])
self._api_provider_data = {} # tuple(semaphore, nvp_session_cookie)
for p in self._api_providers:
self._set_provider_data(p, (eventlet.semaphore.Semaphore(1), None))
self._user = user self._user = user
self._password = password self._password = password
self._concurrent_connections = concurrent_connections self._concurrent_connections = concurrent_connections
self._use_https = use_https self._use_https = use_https
self._connect_timeout = connect_timeout self._connect_timeout = connect_timeout
self._failover_time = failover_time
self._nvp_config_gen = None self._nvp_config_gen = None
self._nvp_config_gen_ts = None self._nvp_config_gen_ts = None
self._nvp_gen_timeout = nvp_gen_timeout self._nvp_gen_timeout = nvp_gen_timeout
# Connection pool is a list of queues. # Connection pool is a list of queues.
self._conn_pool = list() self._conn_pool = eventlet.queue.PriorityQueue()
conn_pool_idx = 0 self._next_conn_priority = 1
for host, port, is_ssl in api_providers: for host, port, is_ssl in api_providers:
provider_conn_pool = eventlet.queue.Queue(
maxsize=concurrent_connections)
for i in range(concurrent_connections): for i in range(concurrent_connections):
# All connections in a provider_conn_poool have the
# same priority (they connect to the same server).
conn = self._create_connection(host, port, is_ssl) conn = self._create_connection(host, port, is_ssl)
conn.idx = conn_pool_idx self._conn_pool.put((self._next_conn_priority, conn))
provider_conn_pool.put(conn) self._next_conn_priority += 1
self._conn_pool.append(provider_conn_pool) def acquire_redirect_connection(self, conn_params, auto_login=True,
conn_pool_idx += 1 headers=None):
"""Check out or create connection to redirected NVP API server.
self._active_conn_pool_idx = 0 Args:
conn_params: tuple specifying target of redirect, see
self._conn_params()
auto_login: returned connection should have valid session cookie
headers: headers to pass on if auto_login
self._cookie = None Returns: An available HTTPConnection instance corresponding to the
self._need_login = True specified conn_params. If a connection did not previously
self._doing_login_sem = eventlet.semaphore.Semaphore(1) exist, new connections are created with the highest prioity
in the connection pool and one of these new connections
returned.
"""
result_conn = None
data = self._get_provider_data(conn_params)
if data:
# redirect target already exists in provider data and connections
# to the provider have been added to the connection pool. Try to
# obtain a connection from the pool, note that it's possible that
# all connection to the provider are currently in use.
conns = []
while not self._conn_pool.empty():
priority, conn = self._conn_pool.get_nowait()
if not result_conn and self._conn_params(conn) == conn_params:
conn.priority = priority
result_conn = conn
else:
conns.append((priority, conn))
for priority, conn in conns:
self._conn_pool.put((priority, conn))
# hack: if no free connections available, create new connection
# and stash "no_release" attribute (so that we only exceed
# self._concurrent_connections temporarily)
if not result_conn:
conn = self._create_connection(*conn_params)
conn.priority = 0 # redirect connections ahve highest priority
conn.no_release = True
result_conn = conn
else:
#redirect target not already known, setup provider lists
self._api_providers.update([conn_params])
self._set_provider_data(conn_params,
(eventlet.semaphore.Semaphore(1), None))
# redirects occur during cluster upgrades, i.e. results to old
# redirects to new, so give redirect targets highest priority
priority = 0
for i in range(self._concurrent_connections):
conn = self._create_connection(*conn_params)
conn.priority = priority
if i == self._concurrent_connections - 1:
break
self._conn_pool.put((priority, conn))
result_conn = conn
if result_conn:
result_conn.last_used = time.time()
if auto_login and self.auth_cookie(conn) is None:
self._wait_for_login(result_conn, headers)
return result_conn
def _create_connection(self, host, port, is_ssl): def _login(self, conn=None, headers=None):
if is_ssl:
return httplib.HTTPSConnection(host, port,
timeout=self._connect_timeout)
return httplib.HTTPConnection(host, port,
timeout=self._connect_timeout)
@staticmethod
def _conn_params(http_conn):
is_ssl = isinstance(http_conn, httplib.HTTPSConnection)
return (http_conn.host, http_conn.port, is_ssl)
def update_providers(self, api_providers):
raise Exception(_('update_providers() not implemented.'))
@property
def user(self):
return self._user
@property
def password(self):
return self._password
@property
def nvp_config_gen(self):
# If nvp_gen_timeout is not -1 then:
# Maintain a timestamp along with the generation ID. Hold onto the
# ID long enough to be useful and block on sequential requests but
# not long enough to persist when Onix db is cleared, which resets
# the generation ID, causing the DAL to block indefinitely with some
# number that's higher than the cluster's value.
if self._nvp_gen_timeout != -1:
ts = self._nvp_config_gen_ts
if ts is not None:
if (time.time() - ts) > self._nvp_gen_timeout:
return None
return self._nvp_config_gen
@nvp_config_gen.setter
def nvp_config_gen(self, value):
if self._nvp_config_gen != value:
if self._nvp_gen_timeout != -1:
self._nvp_config_gen_ts = time.time()
self._nvp_config_gen = value
@property
def auth_cookie(self):
return self._cookie
def acquire_connection(self, rid=-1):
'''Check out an available HTTPConnection instance.
Blocks until a connection is available.
:param rid: request id passed in from request eventlet.
:returns: An available HTTPConnection instance or None if no
api_providers are configured.
'''
if not self._api_providers:
LOG.warn(_("[%d] no API providers currently available."), rid)
return None
# The sleep time is to give controllers time to become consistent after
# there has been a change in the controller used as the api_provider.
now = time.time()
if now < getattr(self, '_issue_conn_barrier', now):
LOG.warn(_("[%d] Waiting for failover timer to expire."), rid)
time.sleep(self._issue_conn_barrier - now)
# Print out a warning if all connections are in use.
if self._conn_pool[self._active_conn_pool_idx].empty():
LOG.debug(_("[%d] Waiting to acquire client connection."), rid)
# Try to acquire a connection (block in get() until connection
# available or timeout occurs).
active_conn_pool_idx = self._active_conn_pool_idx
conn = self._conn_pool[active_conn_pool_idx].get()
if active_conn_pool_idx != self._active_conn_pool_idx:
# active_conn_pool became inactive while we were waiting.
# Put connection back on old pool and try again.
LOG.warn(_("[%(rid)d] Active pool expired while waiting for "
"connection: %(conn)s"),
{'rid': rid, 'conn': _conn_str(conn)})
self._conn_pool[active_conn_pool_idx].put(conn)
return self.acquire_connection(rid=rid)
# Check if the connection has been idle too long.
now = time.time()
if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT:
LOG.info(_("[%(rid)d] Connection %(conn)s idle for %(sec)0.2f "
"seconds; reconnecting."),
{'rid': rid, 'conn': _conn_str(conn),
'sec': now - conn.last_used})
conn = self._create_connection(*self._conn_params(conn))
# Stash conn pool so conn knows where to go when it releases.
conn.idx = self._active_conn_pool_idx
conn.last_used = now
qsize = self._conn_pool[self._active_conn_pool_idx].qsize()
LOG.debug(_("[%(rid)d] Acquired connection %(conn)s. %(qsize)d "
"connection(s) available."),
{'rid': rid, 'conn': _conn_str(conn), 'qsize': qsize})
return conn
def release_connection(self, http_conn, bad_state=False, rid=-1):
'''Mark HTTPConnection instance as available for check-out.
:param http_conn: An HTTPConnection instance obtained from this
instance.
:param bad_state: True if http_conn is known to be in a bad state
(e.g. connection fault.)
:param rid: request id passed in from request eventlet.
'''
if self._conn_params(http_conn) not in self._api_providers:
LOG.warn(_("[%(rid)d] Released connection '%(conn)s' is not an "
"API provider for the cluster"),
{'rid': rid, 'conn': _conn_str(http_conn)})
return
# Retrieve "home" connection pool.
conn_pool_idx = http_conn.idx
conn_pool = self._conn_pool[conn_pool_idx]
if bad_state:
# Reconnect to provider.
LOG.warn(_("[%(rid)d] Connection returned in bad state, "
"reconnecting to %(conn)s"),
{'rid': rid, 'conn': _conn_str(http_conn)})
http_conn = self._create_connection(*self._conn_params(http_conn))
http_conn.idx = conn_pool_idx
if self._active_conn_pool_idx == http_conn.idx:
# This pool is no longer in a good state. Switch to next pool.
self._active_conn_pool_idx += 1
self._active_conn_pool_idx %= len(self._conn_pool)
LOG.warn(_("[%(rid)d] Switched active_conn_pool from "
"%(idx)d to %(pool_idx)d."),
{'rid': rid, 'idx': http_conn.idx,
'pool_idx': self._active_conn_pool_idx})
# No connections to the new provider allowed until after this
# timer has expired (allow time for synchronization).
self._issue_conn_barrier = time.time() + self._failover_time
conn_pool.put(http_conn)
LOG.debug(_("[%(rid)d] Released connection %(conn)s. "
"%(qsize)d connection(s) available."),
{'rid': rid, 'conn': _conn_str(http_conn),
'qsize': conn_pool.qsize()})
@property
def need_login(self):
return self._need_login
@need_login.setter
def need_login(self, val=True):
self._need_login = val
def wait_for_login(self):
'''Block until a login has occurred for the current API provider.'''
if self._need_login:
if self._doing_login_sem.acquire(blocking=False):
self.login()
self._doing_login_sem.release()
else:
LOG.debug(_("Waiting for auth to complete"))
self._doing_login_sem.acquire()
self._doing_login_sem.release()
return self._cookie
def login(self):
'''Issue login request and update authentication cookie.''' '''Issue login request and update authentication cookie.'''
cookie = None
g = request_eventlet.NvpLoginRequestEventlet( g = request_eventlet.NvpLoginRequestEventlet(
self, self._user, self._password) self, self._user, self._password, conn, headers)
g.start() g.start()
ret = g.join() ret = g.join()
if ret: if ret:
if isinstance(ret, Exception): if isinstance(ret, Exception):
LOG.error(_('NvpApiClient: login error "%s"'), ret) LOG.error(_('NvpApiClient: login error "%s"'), ret)
raise ret raise ret
self._cookie = None
cookie = ret.getheader("Set-Cookie") cookie = ret.getheader("Set-Cookie")
if cookie: if cookie:
LOG.debug(_("Saving new authentication cookie '%s'"), cookie) LOG.debug(_("Saving new authentication cookie '%s'"), cookie)
self._cookie = cookie
self._need_login = False
# TODO: or ret is an error.
if not ret:
return None
return self._cookie
return cookie
# Register as subclass. # Register as subclass.
client.NvpApiClient.register(NvpApiClientEventlet) client.NvpApiClient.register(NvpApiClientEventlet)

View File

@ -1,23 +1,24 @@
# Copyright 2009-2012 Nicira Networks, Inc. # Copyright 2012 Nicira, Inc.
# All Rights Reserved # All Rights Reserved
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
# #
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
# #
# @author: Aaron Rosen, Nicira Networks, Inc.
import httplib import httplib
import mock
def _conn_str(conn): def _conn_str(conn):
@ -25,8 +26,6 @@ def _conn_str(conn):
proto = "https://" proto = "https://"
elif isinstance(conn, httplib.HTTPConnection): elif isinstance(conn, httplib.HTTPConnection):
proto = "http://" proto = "http://"
elif isinstance(conn, mock.Mock):
proto = "http://"
else: else:
raise TypeError(_('_conn_str() invalid connection type: %s') % raise TypeError(_('_conn_str() invalid connection type: %s') %
type(conn)) type(conn))

View File

@ -1,27 +1,49 @@
# Copyright 2009-2012 Nicira Networks, Inc. # Copyright 2012 Nicira, Inc.
# All Rights Reserved # All Rights Reserved
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
# #
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
# #
# @author: Aaron Rosen, Nicira Networks, Inc.
from abc import ABCMeta from abc import ABCMeta
from abc import abstractmethod from abc import abstractmethod
from abc import abstractproperty import copy
import httplib
import logging
import time
import urlparse
from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import (
_conn_str)
logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger(__name__)
# Default parameters.
DEFAULT_REQUEST_TIMEOUT = 30
DEFAULT_HTTP_TIMEOUT = 10
DEFAULT_RETRIES = 2
DEFAULT_REDIRECTS = 2
DEFAULT_API_REQUEST_POOL_SIZE = 1000
DEFAULT_MAXIMUM_REQUEST_ID = 4294967295
DOWNLOAD_TIMEOUT = 180 # The UI code has a coorespoind 190 sec timeout
# for downloads, see: django/nvp_console/views.py
class NvpApiRequest: class NvpApiRequest(object):
'''An abstract baseclass for all ApiRequest implementations. '''An abstract baseclass for all ApiRequest implementations.
This defines the interface and property structure for both eventlet and This defines the interface and property structure for both eventlet and
@ -30,6 +52,22 @@ class NvpApiRequest:
__metaclass__ = ABCMeta __metaclass__ = ABCMeta
# List of allowed status codes.
ALLOWED_STATUS_CODES = [
httplib.OK,
httplib.CREATED,
httplib.NO_CONTENT,
httplib.MOVED_PERMANENTLY,
httplib.TEMPORARY_REDIRECT,
httplib.BAD_REQUEST,
httplib.UNAUTHORIZED,
httplib.FORBIDDEN,
httplib.NOT_FOUND,
httplib.CONFLICT,
httplib.INTERNAL_SERVER_ERROR,
httplib.SERVICE_UNAVAILABLE
]
@abstractmethod @abstractmethod
def start(self): def start(self):
pass pass
@ -42,6 +80,200 @@ class NvpApiRequest:
def copy(self): def copy(self):
pass pass
@abstractproperty def _issue_request(self):
'''Issue a request to a provider.'''
conn = (self._client_conn or
self._api_client.acquire_connection(True,
copy.copy(self._headers),
rid=self._rid()))
if conn is None:
error = Exception("No API connections available")
self._request_error = error
return error
url = self._url
LOG.debug(_("[%d] Issuing - request '%s'"),
self._rid(), self._request_str(conn, url))
issued_time = time.time()
is_conn_error = False
is_conn_service_unavail = False
try:
redirects = 0
while (redirects <= self._redirects):
# Update connection with user specified request timeout,
# the connect timeout is usually smaller so we only set
# the request timeout after a connection is established
if conn.sock is None:
conn.connect()
conn.sock.settimeout(self._http_timeout)
elif conn.sock.gettimeout() != self._http_timeout:
conn.sock.settimeout(self._http_timeout)
headers = copy.copy(self._headers)
cookie = self._api_client.auth_cookie(conn)
if cookie:
headers["Cookie"] = cookie
gen = self._api_client.nvp_config_gen
if gen:
headers["X-Nvp-Wait-For-Config-Generation"] = gen
LOG.debug(_("Setting %s request header: '%s'"),
'X-Nvp-Wait-For-Config-Generation', gen)
try:
conn.request(self._method, url, self._body, headers)
except Exception as e:
LOG.warn(_("[%d] Exception issuing request: '%s'"),
self._rid(), e)
raise e
response = conn.getresponse()
response.body = response.read()
response.headers = response.getheaders()
LOG.debug(_("[%d] Completed request '%s': %s (%0.2f seconds)"),
self._rid(), self._request_str(conn, url),
response.status, time.time() - issued_time)
new_gen = response.getheader('X-Nvp-Config-Generation', None)
if new_gen:
LOG.debug(_("Reading '%s' response header: '%s'"),
'X-Nvp-config-Generation', new_gen)
if (self._api_client.nvp_config_gen is None or
self._api_client.nvp_config_gen < int(new_gen)):
self._api_client.nvp_config_gen = int(new_gen)
if response.status == httplib.UNAUTHORIZED:
if cookie is None and self._url != "/ws.v1/login":
# The connection still has no valid cookie despite
# attemps to authenticate and the request has failed
# with unauthorized status code. If this isn't a
# a request to authenticate, we should abort the
# request since there is no point in retrying.
self._abort = True
else:
# If request is unauthorized, clear the session cookie
# for the current provider so that subsequent requests
# to the same provider triggers re-authentication.
self._api_client.set_auth_cookie(conn, None)
self._api_client.set_auth_cookie(conn, None)
elif response.status == httplib.SERVICE_UNAVAILABLE:
is_conn_service_unavail = True
if response.status not in [httplib.MOVED_PERMANENTLY,
httplib.TEMPORARY_REDIRECT]:
break
elif redirects >= self._redirects:
LOG.info(_("[%d] Maximum redirects exceeded, aborting "
"request"), self._rid())
break
redirects += 1
conn, url = self._redirect_params(conn, response.headers,
self._client_conn is None)
if url is None:
response.status = httplib.INTERNAL_SERVER_ERROR
break
LOG.info(_("[%d] Redirecting request to: '%s'"),
self._rid(), self._request_str(conn, url))
# If we receive any of these responses, then
# our server did not process our request and may be in an
# errored state. Raise an exception, which will cause the
# the conn to be released with is_conn_error == True
# which puts the conn on the back of the client's priority
# queue.
if response.status >= 500:
LOG.warn(_("[%d] Request '%s %s' received: %s"),
self._rid(), self._method, self._url,
response.status)
raise Exception('Server error return: %s' %
response.status)
return response
except Exception as e:
if isinstance(e, httplib.BadStatusLine):
msg = "Invalid server response"
else:
msg = unicode(e)
LOG.warn(_("[%d] Failed request '%s': '%s' (%0.2f seconds)"),
self._rid(), self._request_str(conn, url), msg,
time.time() - issued_time)
self._request_error = e
is_conn_error = True
return e
finally:
# Make sure we release the original connection provided by the
# acquire_connection() call above.
if self._client_conn is None:
self._api_client.release_connection(conn, is_conn_error,
is_conn_service_unavail,
rid=self._rid())
def _redirect_params(self, conn, headers, allow_release_conn=False):
"""Process redirect response, create new connection if necessary.
Args:
conn: connection that returned the redirect response
headers: response headers of the redirect response
allow_release_conn: if redirecting to a different server,
release existing connection back to connection pool.
Returns: Return tuple(conn, url) where conn is a connection object
to the redirect target and url is the path of the API request
"""
url = None
for name, value in headers:
if name.lower() == "location":
url = value
break
if not url:
LOG.warn(_("[%d] Received redirect status without location header"
" field"), self._rid())
return (conn, None)
# Accept location with the following format:
# 1. /path, redirect to same node
# 2. scheme://hostname:[port]/path where scheme is https or http
# Reject others
# 3. e.g. relative paths, unsupported scheme, unspecified host
result = urlparse.urlparse(url)
if not result.scheme and not result.hostname and result.path:
if result.path[0] == "/":
if result.query:
url = "%s?%s" % (result.path, result.query)
else:
url = result.path
return (conn, url) # case 1
else:
LOG.warn(_("[%d] Received invalid redirect location: '%s'"),
self._rid(), url)
return (conn, None) # case 3
elif result.scheme not in ["http", "https"] or not result.hostname:
LOG.warn(_("[%d] Received malformed redirect location: %s"),
self._rid(), url)
return (conn, None) # case 3
# case 2, redirect location includes a scheme
# so setup a new connection and authenticate
if allow_release_conn:
self._api_client.release_connection(conn)
conn_params = (result.hostname, result.port, result.scheme == "https")
conn = self._api_client.acquire_redirect_connection(conn_params, True,
self._headers)
if result.query:
url = "%s?%s" % (result.path, result.query)
else:
url = result.path
return (conn, url)
def _rid(self):
'''Return current request id.'''
return self._request_id
@property
def request_error(self): def request_error(self):
pass '''Return any errors associated with this instance.'''
return self._request_error
def _request_str(self, conn, url):
'''Return string representation of connection.'''
return "%s %s/%s" % (self._method, _conn_str(conn), url)

View File

@ -1,75 +1,46 @@
# Copyright 2009-2012 Nicira Networks, Inc. # Copyright 2012 Nicira, Inc.
# All Rights Reserved # All Rights Reserved
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
# #
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
# #
# @author: Aaron Rosen, Nicira Networks, Inc.
import copy
import eventlet import eventlet
import httplib import httplib
import json import json
import logging import logging
import request
import time
import urllib import urllib
import urlparse
import client_eventlet from quantum.plugins.nicira.nicira_nvp_plugin.api_client import request
from common import _conn_str
from eventlet import timeout
eventlet.monkey_patch() eventlet.monkey_patch()
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
lg = logging.getLogger("nvp_api_request") LOG = logging.getLogger(__name__)
USER_AGENT = "NVP eventlet client/1.0" USER_AGENT = "NVP eventlet client/1.0"
# Default parameters.
DEFAULT_REQUEST_TIMEOUT = 30
DEFAULT_HTTP_TIMEOUT = 10
DEFAULT_RETRIES = 2
DEFAULT_REDIRECTS = 2
DEFAULT_API_REQUEST_POOL_SIZE = 1000
DEFAULT_MAXIMUM_REQUEST_ID = 4294967295
class NvpApiRequestEventlet(request.NvpApiRequest):
class NvpApiRequestEventlet:
'''Eventlet-based ApiRequest class. '''Eventlet-based ApiRequest class.
This class will form the basis for eventlet-based ApiRequest classes This class will form the basis for eventlet-based ApiRequest classes
(e.g. those used by the Quantum NVP Plugin). (e.g. those used by the Quantum NVP Plugin).
''' '''
# List of allowed status codes.
ALLOWED_STATUS_CODES = [
httplib.OK,
httplib.CREATED,
httplib.NO_CONTENT,
httplib.MOVED_PERMANENTLY,
httplib.TEMPORARY_REDIRECT,
httplib.BAD_REQUEST,
httplib.UNAUTHORIZED,
httplib.FORBIDDEN,
httplib.NOT_FOUND,
httplib.CONFLICT,
httplib.INTERNAL_SERVER_ERROR,
httplib.SERVICE_UNAVAILABLE
]
# Maximum number of green threads present in the system at one time. # Maximum number of green threads present in the system at one time.
API_REQUEST_POOL_SIZE = DEFAULT_API_REQUEST_POOL_SIZE API_REQUEST_POOL_SIZE = request.DEFAULT_API_REQUEST_POOL_SIZE
# Pool of green threads. One green thread is allocated per incoming # Pool of green threads. One green thread is allocated per incoming
# request. Incoming requests will block when the pool is empty. # request. Incoming requests will block when the pool is empty.
@ -77,18 +48,18 @@ class NvpApiRequestEventlet:
# A unique id is assigned to each incoming request. When the current # A unique id is assigned to each incoming request. When the current
# request id reaches MAXIMUM_REQUEST_ID it wraps around back to 0. # request id reaches MAXIMUM_REQUEST_ID it wraps around back to 0.
MAXIMUM_REQUEST_ID = DEFAULT_MAXIMUM_REQUEST_ID MAXIMUM_REQUEST_ID = request.DEFAULT_MAXIMUM_REQUEST_ID
# The request id for the next incoming request. # The request id for the next incoming request.
CURRENT_REQUEST_ID = 0 CURRENT_REQUEST_ID = 0
def __init__(self, nvp_api_client, url, method="GET", body=None, def __init__(self, nvp_api_client, url, method="GET", body=None,
headers=None, headers=None,
request_timeout=DEFAULT_REQUEST_TIMEOUT, request_timeout=request.DEFAULT_REQUEST_TIMEOUT,
retries=DEFAULT_RETRIES, retries=request.DEFAULT_RETRIES,
auto_login=True, auto_login=True,
redirects=DEFAULT_REDIRECTS, redirects=request.DEFAULT_REDIRECTS,
http_timeout=DEFAULT_HTTP_TIMEOUT): http_timeout=request.DEFAULT_HTTP_TIMEOUT, client_conn=None):
'''Constructor.''' '''Constructor.'''
self._api_client = nvp_api_client self._api_client = nvp_api_client
self._url = url self._url = url
@ -100,6 +71,8 @@ class NvpApiRequestEventlet:
self._auto_login = auto_login self._auto_login = auto_login
self._redirects = redirects self._redirects = redirects
self._http_timeout = http_timeout self._http_timeout = http_timeout
self._client_conn = client_conn
self._abort = False
self._request_error = None self._request_error = None
@ -126,10 +99,6 @@ class NvpApiRequestEventlet:
'''Spawn a new green thread with the supplied function and args.''' '''Spawn a new green thread with the supplied function and args.'''
return self.__class__._spawn(func, *args, **kwargs) return self.__class__._spawn(func, *args, **kwargs)
def _rid(self):
'''Return current request id.'''
return self._request_id
@classmethod @classmethod
def joinall(cls): def joinall(cls):
'''Wait for all outstanding requests to complete.''' '''Wait for all outstanding requests to complete.'''
@ -152,197 +121,19 @@ class NvpApiRequestEventlet:
self._headers, self._request_timeout, self._retries, self._headers, self._request_timeout, self._retries,
self._auto_login, self._redirects, self._http_timeout) self._auto_login, self._redirects, self._http_timeout)
@property
def request_error(self):
'''Return any errors associated with this instance.'''
return self._request_error
def _run(self): def _run(self):
'''Method executed within green thread.''' '''Method executed within green thread.'''
if self._request_timeout: if self._request_timeout:
# No timeout exception escapes the with block. # No timeout exception escapes the with block.
with timeout.Timeout(self._request_timeout, False): with eventlet.timeout.Timeout(self._request_timeout, False):
return self._handle_request() return self._handle_request()
lg.info(_('[%d] Request timeout.'), self._rid()) LOG.info(_('[%d] Request timeout.'), self._rid())
self._request_error = Exception(_('Request timeout')) self._request_error = Exception(_('Request timeout'))
return None return None
else: else:
return self._handle_request() return self._handle_request()
def _request_str(self, conn, url):
'''Return string representation of connection.'''
return "%s %s/%s" % (self._method, _conn_str(conn), url)
def _issue_request(self):
'''Issue a request to a provider.'''
conn = self._api_client.acquire_connection(rid=self._rid())
if conn is None:
error = Exception(_("No API connections available"))
self._request_error = error
return error
# Preserve the acquired connection as conn may be over-written by
# redirects below.
acquired_conn = conn
url = self._url
lg.debug(_("[%(rid)d] Issuing - request '%(req)s'"),
{'rid': self._rid(),
'req': self._request_str(conn, url)})
issued_time = time.time()
is_conn_error = False
try:
redirects = 0
while (redirects <= self._redirects):
# Update connection with user specified request timeout,
# the connect timeout is usually smaller so we only set
# the request timeout after a connection is established
if conn.sock is None:
conn.connect()
conn.sock.settimeout(self._http_timeout)
elif conn.sock.gettimeout() != self._http_timeout:
conn.sock.settimeout(self._http_timeout)
headers = copy.copy(self._headers)
gen = self._api_client.nvp_config_gen
if gen:
headers["X-Nvp-Wait-For-Config-Generation"] = gen
lg.debug(_("Setting %(header)s request header: %(gen)s"),
{'header': 'X-Nvp-Wait-For-Config-Generation',
'gen': gen})
try:
conn.request(self._method, url, self._body, headers)
except Exception as e:
lg.warn(_('[%(rid)d] Exception issuing request: %(e)s'),
{'rid': self._rid(), 'e': e})
raise e
response = conn.getresponse()
response.body = response.read()
response.headers = response.getheaders()
lg.debug(_("[%(rid)d] Completed request '%(req)s': %(status)s "
"(%(time)0.2f seconds)"),
{'rid': self._rid(),
'req': self._request_str(conn, url),
'status': response.status,
'time': time.time() - issued_time})
new_gen = response.getheader('X-Nvp-Config-Generation', None)
if new_gen:
lg.debug(_("Reading %(header)s response header: %(gen)s"),
{'header': 'X-Nvp-config-Generation',
'gen': new_gen})
if (self._api_client.nvp_config_gen is None or
self._api_client.nvp_config_gen < int(new_gen)):
self._api_client.nvp_config_gen = int(new_gen)
if response.status not in [httplib.MOVED_PERMANENTLY,
httplib.TEMPORARY_REDIRECT]:
break
elif redirects >= self._redirects:
lg.info(_("[%d] Maximum redirects exceeded, aborting "
"request"), self._rid())
break
redirects += 1
# In the following call, conn is replaced by the connection
# specified in the redirect response from the server.
conn, url = self._redirect_params(conn, response.headers)
if url is None:
response.status = httplib.INTERNAL_SERVER_ERROR
break
lg.info(_("[%(rid)d] Redirecting request to: %(req)s"),
{'rid': self._rid(),
'req': self._request_str(conn, url)})
# FIX for #9415. If we receive any of these responses, then
# our server did not process our request and may be in an
# errored state. Raise an exception, which will cause the
# the conn to be released with is_conn_error == True
# which puts the conn on the back of the client's priority
# queue.
if response.status >= 500:
lg.warn(_("[%(rid)d] Request '%(method)s %(url)s' "
"received: %(status)s"),
{'rid': self._rid(), 'method': self._method,
'url': self._url,
'status': response.status})
raise Exception(_('Server error return: %s') %
response.status)
return response
except Exception as e:
if isinstance(e, httplib.BadStatusLine):
msg = _("Invalid server response")
else:
msg = unicode(e)
lg.warn(_("[%(rid)d] Failed request '%(req)s': %(msg)s "
"(%(time)0.2f seconds)"),
{'rid': self._rid(), 'req': self._request_str(conn, url),
'msg': msg,
'time': time.time() - issued_time})
self._request_error = e
is_conn_error = True
return e
finally:
# Make sure we release the original connection provided by the
# acquire_connection() call above.
self._api_client.release_connection(acquired_conn, is_conn_error,
rid=self._rid())
def _redirect_params(self, conn, headers):
'''Process redirect params from a server response.'''
url = None
for name, value in headers:
if name.lower() == "location":
url = value
break
if not url:
lg.warn(_("[%d] Received redirect status without location header "
"field"), self._rid())
return (conn, None)
# Accept location with the following format:
# 1. /path, redirect to same node
# 2. scheme://hostname:[port]/path where scheme is https or http
# Reject others
# 3. e.g. relative paths, unsupported scheme, unspecified host
result = urlparse.urlparse(url)
if not result.scheme and not result.hostname and result.path:
if result.path[0] == "/":
if result.query:
url = "%s?%s" % (result.path, result.query)
else:
url = result.path
return (conn, url) # case 1
else:
lg.warn(_("[%(rid)d] Received invalid redirect location: "
"%(url)s"),
{'rid': self._rid(), 'url': url})
return (conn, None) # case 3
elif result.scheme not in ["http", "https"] or not result.hostname:
lg.warn(_("[%(rid)d] Received malformed redirect location: "
"%(url)s"),
{'rid': self._rid(), 'url': url})
return (conn, None) # case 3
# case 2, redirect location includes a scheme
# so setup a new connection and authenticate
use_https = result.scheme == "https"
api_providers = [(result.hostname, result.port, use_https)]
api_client = client_eventlet.NvpApiClientEventlet(
api_providers, self._api_client.user, self._api_client.password,
use_https=use_https)
api_client.wait_for_login()
if api_client.auth_cookie:
self._headers["Cookie"] = api_client.auth_cookie
else:
self._headers["Cookie"] = ""
conn = api_client.acquire_connection(rid=self._rid())
if result.query:
url = "%s?%s" % (result.path, result.query)
else:
url = result.path
return (conn, url)
def _handle_request(self): def _handle_request(self):
'''First level request handling.''' '''First level request handling.'''
attempt = 0 attempt = 0
@ -350,46 +141,41 @@ class NvpApiRequestEventlet:
while response is None and attempt <= self._retries: while response is None and attempt <= self._retries:
attempt += 1 attempt += 1
if self._auto_login and self._api_client.need_login:
self._api_client.wait_for_login()
if self._api_client.auth_cookie:
self._headers["Cookie"] = self._api_client.auth_cookie
req = self.spawn(self._issue_request).wait() req = self.spawn(self._issue_request).wait()
# automatically raises any exceptions returned. # automatically raises any exceptions returned.
if isinstance(req, httplib.HTTPResponse): if isinstance(req, httplib.HTTPResponse):
if (req.status == httplib.UNAUTHORIZED if attempt <= self._retries and not self._abort:
if (req.status == httplib.UNAUTHORIZED
or req.status == httplib.FORBIDDEN): or req.status == httplib.FORBIDDEN):
self._api_client.need_login = True
if attempt <= self._retries:
continue continue
# else fall through to return the error code # else fall through to return the error code
lg.debug(_("[%(rid)d] Completed request '%(method)s %(url)s'" LOG.debug(_("[%(rid)d] Completed request '%(method)s %(url)s'"
": %(status)s"), ": %(status)s"),
{'rid': self._rid(), 'method': self._method, {'rid': self._rid(), 'method': self._method,
'url': self._url, 'status': req.status}) 'url': self._url, 'status': req.status})
self._request_error = None self._request_error = None
response = req response = req
else: else:
lg.info(_('[%(rid)d] Error while handling request: %(req)s'), LOG.info(_('[%(rid)d] Error while handling request: %(req)s'),
{'rid': self._rid(), 'req': req}) {'rid': self._rid(), 'req': req})
self._request_error = req self._request_error = req
response = None response = None
return response return response
class NvpLoginRequestEventlet(NvpApiRequestEventlet): class NvpLoginRequestEventlet(NvpApiRequestEventlet):
'''Process a login request.''' '''Process a login request.'''
def __init__(self, nvp_client, user, password): def __init__(self, nvp_client, user, password, client_conn=None,
headers = {"Content-Type": "application/x-www-form-urlencoded"} headers=None):
if headers is None:
headers = {}
headers.update({"Content-Type": "application/x-www-form-urlencoded"})
body = urllib.urlencode({"username": user, "password": password}) body = urllib.urlencode({"username": user, "password": password})
NvpApiRequestEventlet.__init__( NvpApiRequestEventlet.__init__(
self, nvp_client, "/ws.v1/login", "POST", body, headers, self, nvp_client, "/ws.v1/login", "POST", body, headers,
auto_login=False) auto_login=False, client_conn=client_conn)
def session_cookie(self): def session_cookie(self):
if self.successful(): if self.successful():
@ -398,7 +184,7 @@ class NvpLoginRequestEventlet(NvpApiRequestEventlet):
class NvpGetApiProvidersRequestEventlet(NvpApiRequestEventlet): class NvpGetApiProvidersRequestEventlet(NvpApiRequestEventlet):
'''Get a list of API providers.''' '''Gej a list of API providers.'''
def __init__(self, nvp_client): def __init__(self, nvp_client):
url = "/ws.v1/control-cluster/node?fields=roles" url = "/ws.v1/control-cluster/node?fields=roles"
@ -427,8 +213,8 @@ class NvpGetApiProvidersRequestEventlet(NvpApiRequestEventlet):
ret.append(_provider_from_listen_addr(addr)) ret.append(_provider_from_listen_addr(addr))
return ret return ret
except Exception as e: except Exception as e:
lg.warn(_("[%(rid)d] Failed to parse API provider: %(e)s"), LOG.warn(_("[%(rid)d] Failed to parse API provider: %(e)s"),
{'rid': self._rid(), 'e': e}) {'rid': self._rid(), 'e': e})
# intentionally fall through # intentionally fall through
return None return None
@ -438,12 +224,11 @@ class NvpGenericRequestEventlet(NvpApiRequestEventlet):
def __init__(self, nvp_client, method, url, body, content_type, def __init__(self, nvp_client, method, url, body, content_type,
auto_login=False, auto_login=False,
request_timeout=DEFAULT_REQUEST_TIMEOUT, request_timeout=request.DEFAULT_REQUEST_TIMEOUT,
http_timeout=DEFAULT_HTTP_TIMEOUT, http_timeout=request.DEFAULT_HTTP_TIMEOUT,
retries=DEFAULT_RETRIES, retries=request.DEFAULT_RETRIES,
redirects=DEFAULT_REDIRECTS): redirects=request.DEFAULT_REDIRECTS):
headers = {"Content-Type": content_type} headers = {"Content-Type": content_type}
NvpApiRequestEventlet.__init__( NvpApiRequestEventlet.__init__(
self, nvp_client, url, method, body, headers, self, nvp_client, url, method, body, headers,
request_timeout=request_timeout, retries=retries, request_timeout=request_timeout, retries=retries,

View File

@ -21,7 +21,7 @@ nvp_opts = [
cfg.IntOpt('max_lp_per_bridged_ls', default=64), cfg.IntOpt('max_lp_per_bridged_ls', default=64),
cfg.IntOpt('max_lp_per_overlay_ls', default=256), cfg.IntOpt('max_lp_per_overlay_ls', default=256),
cfg.IntOpt('concurrent_connections', default=5), cfg.IntOpt('concurrent_connections', default=5),
cfg.IntOpt('failover_time', default=240), cfg.IntOpt('nvp_gen_timeout', default=-1),
cfg.StrOpt('default_cluster_name') cfg.StrOpt('default_cluster_name')
] ]

View File

@ -134,7 +134,6 @@ class NvpApiRequestEventletTest(unittest.TestCase):
myconn.__str__.return_value = 'myconn string' myconn.__str__.return_value = 'myconn string'
req = self.req req = self.req
req._request_timeout = REQUEST_TIMEOUT = 1
req._redirect_params = Mock() req._redirect_params = Mock()
req._redirect_params.return_value = (myconn, 'url') req._redirect_params.return_value = (myconn, 'url')
req._request_str = Mock() req._request_str = Mock()
@ -214,60 +213,48 @@ class NvpApiRequestEventletTest(unittest.TestCase):
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.' with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
'client_eventlet.NvpApiClientEventlet') as mock: 'client_eventlet.NvpApiClientEventlet') as mock:
api_client = mock.return_value api_client = mock.return_value
api_client.wait_for_login.return_value = None self.req._api_client = api_client
api_client.auth_cookie = 'mycookie'
api_client.acquire_connection.return_value = True
myconn = Mock() myconn = Mock()
(conn, retval) = self.req._redirect_params( (conn, retval) = self.req._redirect_params(
myconn, [('location', 'https://host:1/path')]) myconn, [('location', 'https://host:1/path')])
self.assertTrue(retval is not None) self.assertTrue(retval is not None)
self.assertTrue(api_client.wait_for_login.called) self.assertTrue(api_client.acquire_redirect_connection.called)
self.assertTrue(api_client.acquire_connection.called)
def test_redirect_params_setup_htttps_and_query(self): def test_redirect_params_setup_htttps_and_query(self):
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.' with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
'client_eventlet.NvpApiClientEventlet') as mock: 'client_eventlet.NvpApiClientEventlet') as mock:
api_client = mock.return_value api_client = mock.return_value
api_client.wait_for_login.return_value = None self.req._api_client = api_client
api_client.auth_cookie = 'mycookie'
api_client.acquire_connection.return_value = True
myconn = Mock() myconn = Mock()
(conn, retval) = self.req._redirect_params(myconn, [ (conn, retval) = self.req._redirect_params(myconn, [
('location', 'https://host:1/path?q=1')]) ('location', 'https://host:1/path?q=1')])
self.assertTrue(retval is not None) self.assertTrue(retval is not None)
self.assertTrue(api_client.wait_for_login.called) self.assertTrue(api_client.acquire_redirect_connection.called)
self.assertTrue(api_client.acquire_connection.called)
def test_redirect_params_setup_https_connection_no_cookie(self): def test_redirect_params_setup_https_connection_no_cookie(self):
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.' with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
'client_eventlet.NvpApiClientEventlet') as mock: 'client_eventlet.NvpApiClientEventlet') as mock:
api_client = mock.return_value api_client = mock.return_value
api_client.wait_for_login.return_value = None self.req._api_client = api_client
api_client.auth_cookie = None
api_client.acquire_connection.return_value = True
myconn = Mock() myconn = Mock()
(conn, retval) = self.req._redirect_params(myconn, [ (conn, retval) = self.req._redirect_params(myconn, [
('location', 'https://host:1/path')]) ('location', 'https://host:1/path')])
self.assertTrue(retval is not None) self.assertTrue(retval is not None)
self.assertTrue(api_client.wait_for_login.called) self.assertTrue(api_client.acquire_redirect_connection.called)
self.assertTrue(api_client.acquire_connection.called)
def test_redirect_params_setup_https_and_query_no_cookie(self): def test_redirect_params_setup_https_and_query_no_cookie(self):
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.' with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
'client_eventlet.NvpApiClientEventlet') as mock: 'client_eventlet.NvpApiClientEventlet') as mock:
api_client = mock.return_value api_client = mock.return_value
api_client.wait_for_login.return_value = None self.req._api_client = api_client
api_client.auth_cookie = None
api_client.acquire_connection.return_value = True
myconn = Mock() myconn = Mock()
(conn, retval) = self.req._redirect_params( (conn, retval) = self.req._redirect_params(
myconn, [('location', 'https://host:1/path?q=1')]) myconn, [('location', 'https://host:1/path?q=1')])
self.assertTrue(retval is not None) self.assertTrue(retval is not None)
self.assertTrue(api_client.wait_for_login.called) self.assertTrue(api_client.acquire_redirect_connection.called)
self.assertTrue(api_client.acquire_connection.called)
def test_redirect_params_path_only_with_query(self): def test_redirect_params_path_only_with_query(self):
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.' with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'