
The majority of relative imports in the nvp plugin will be fixed in blueprint nvp-api-client-loadbalance-request. This patch removes the rest of them and moves the vim tabstop line to the top of the header file to be consistent with the rest of the files in quantum. Fixes bug 1091596 Change-Id: I6c8299ca73ae5df75c87f302680908f863f81f02
278 lines
11 KiB
Python
278 lines
11 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2012 Nicira, Inc.
|
|
# All Rights Reserved
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
# @author: David Lapsley <dlapsley@nicira.com>, Nicira Networks, Inc.
|
|
# @author: Aaron Rosen, Nicira Networks, Inc.
|
|
|
|
|
|
from abc import ABCMeta
|
|
import httplib
|
|
import time
|
|
import logging
|
|
|
|
|
|
from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import (
|
|
_conn_str)
|
|
|
|
|
|
# Configure the root logger at import time and create this module's logger.
logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger(__name__)

# Default parameters.
# NOTE(review): -1 looks like the "timeout disabled" sentinel -- the client
# class in this module compares its generation timeout against -1; confirm.
GENERATION_ID_TIMEOUT = -1
# NOTE(review): presumably the number of pooled connections opened per API
# provider; confirm against the concrete client implementations.
DEFAULT_CONCURRENT_CONNECTIONS = 3
# NOTE(review): presumably seconds, as for httplib's ``timeout``; confirm.
DEFAULT_CONNECT_TIMEOUT = 5
|
|
|
|
|
|
class NvpApiClient(object):
    """An abstract baseclass for all NvpApiClient implementations.

    This defines the interface and property structure for synchronous and
    coroutine-based classes.

    The methods below read several instance attributes that are never
    initialised in this class (``_user``, ``_password``,
    ``_connect_timeout``, ``_concurrent_connections``, ``_nvp_gen_timeout``,
    ``_nvp_config_gen``, ``_nvp_config_gen_ts``, ``_api_providers``,
    ``_api_provider_data``, ``_conn_pool``, ``_next_conn_priority``) and
    call ``self._login(conn, headers)``; presumably concrete subclasses
    provide all of them.
    """

    __metaclass__ = ABCMeta

    # Seconds a pooled connection may stay unused before
    # acquire_connection() replaces it with a fresh one.
    CONN_IDLE_TIMEOUT = 60 * 15

    def _create_connection(self, host, port, is_ssl):
        """Build a new, not yet connected, HTTP(S) connection object.

        :param host: host of the API provider.
        :param port: port of the API provider.
        :param is_ssl: True to build an HTTPSConnection, otherwise a plain
            HTTPConnection is built.
        :returns: the new connection, using self._connect_timeout as its
            timeout.
        """
        if is_ssl:
            return httplib.HTTPSConnection(host, port,
                                           timeout=self._connect_timeout)
        return httplib.HTTPConnection(host, port,
                                      timeout=self._connect_timeout)

    @staticmethod
    def _conn_params(http_conn):
        """Return the (host, port, is_ssl) tuple describing http_conn."""
        is_ssl = isinstance(http_conn, httplib.HTTPSConnection)
        return (http_conn.host, http_conn.port, is_ssl)

    @property
    def user(self):
        """User name stored on this client (read-only)."""
        return self._user

    @property
    def password(self):
        """Password stored on this client (read-only)."""
        return self._password

    @property
    def nvp_config_gen(self):
        """Current generation ID, or None once it has timed out."""
        # If nvp_gen_timeout is not -1 then:
        # Maintain a timestamp along with the generation ID.  Hold onto the
        # ID long enough to be useful and block on sequential requests but
        # not long enough to persist when Onix db is cleared, which resets
        # the generation ID, causing the DAL to block indefinitely with some
        # number that's higher than the cluster's value.
        if self._nvp_gen_timeout != -1:
            ts = self._nvp_config_gen_ts
            if ts is not None:
                if (time.time() - ts) > self._nvp_gen_timeout:
                    return None
        return self._nvp_config_gen

    @nvp_config_gen.setter
    def nvp_config_gen(self, value):
        # The timestamp is refreshed only when the ID actually changes and
        # only while the timeout is enabled (i.e. not -1).
        if self._nvp_config_gen != value:
            if self._nvp_gen_timeout != -1:
                self._nvp_config_gen_ts = time.time()
        self._nvp_config_gen = value

    def auth_cookie(self, conn):
        """Return the auth cookie stored for conn's API provider.

        :param conn: an HTTP(S)Connection or a conn_params tuple.
        :returns: the stored cookie, or None if nothing is stored for the
            provider.
        """
        cookie = None
        data = self._get_provider_data(conn)
        if data:
            cookie = data[1]
        return cookie

    def set_auth_cookie(self, conn, cookie):
        """Store cookie for conn's API provider.

        Nothing happens when no data is stored for that provider.

        :param conn: an HTTP(S)Connection or a conn_params tuple.
        :param cookie: the cookie to store.
        """
        data = self._get_provider_data(conn)
        if data:
            # data[0] is kept untouched; only the cookie slot is replaced.
            self._set_provider_data(conn, (data[0], cookie))

    def acquire_connection(self, auto_login=True, headers=None, rid=-1):
        """Check out an available HTTPConnection instance.

        Blocks until a connection is available.

        :param auto_login: automatically logins before returning conn
        :param headers: header to pass on to login attempt
        :param rid: request id passed in from request eventlet.
        :returns: An available HTTPConnection instance or None if no
            api_providers are configured.
        """
        if not self._api_providers:
            LOG.warning(_("[%d] no API providers currently available."), rid)
            return None
        if self._conn_pool.empty():
            LOG.debug(_("[%d] Waiting to acquire API client connection."), rid)
        priority, conn = self._conn_pool.get()
        now = time.time()
        # A connection that was never used has no last_used attribute and
        # is therefore never considered idle.
        if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT:
            LOG.info(_("[%d] Connection %s idle for %0.2f seconds; "
                       "reconnecting."),
                     rid, _conn_str(conn), now - conn.last_used)
            stale_conn = conn
            conn = self._create_connection(*self._conn_params(stale_conn))
            # The stale connection was owned by the pool only; close it so
            # its socket is released right away.
            stale_conn.close()

        conn.last_used = now
        conn.priority = priority    # stash current priority for release
        qsize = self._conn_pool.qsize()
        LOG.debug(_("[%d] Acquired connection %s. %d connection(s) "
                    "available."), rid, _conn_str(conn), qsize)
        if auto_login and self.auth_cookie(conn) is None:
            self._wait_for_login(conn, headers)
        return conn

    def release_connection(self, http_conn, bad_state=False,
                           service_unavail=False, rid=-1):
        """Mark HTTPConnection instance as available for check-out.

        :param http_conn: An HTTPConnection instance obtained from this
            instance.
        :param bad_state: True if http_conn is known to be in a bad state
            (e.g. connection fault.)
        :param service_unavail: True if http_conn returned 503 response.
        :param rid: request id passed in from request eventlet.
        """
        conn_params = self._conn_params(http_conn)
        if conn_params not in self._api_providers:
            LOG.debug(_("[%d] Released connection '%s' is not an API provider "
                        "for the cluster"), rid, _conn_str(http_conn))
            return
        elif hasattr(http_conn, "no_release"):
            return

        if bad_state:
            # Reconnect to provider.
            LOG.warning(_("[%d] Connection returned in bad state, "
                          "reconnecting to %s"), rid, _conn_str(http_conn))
            http_conn = self._create_connection(*conn_params)
            priority = self._next_conn_priority
            self._next_conn_priority += 1
        elif service_unavail:
            # http_conn returned a service unavailable response, put other
            # connections to the same controller at end of priority queue,
            conns = []
            while not self._conn_pool.empty():
                priority, conn = self._conn_pool.get()
                if self._conn_params(conn) == conn_params:
                    priority = self._next_conn_priority
                    self._next_conn_priority += 1
                conns.append((priority, conn))
            for priority, conn in conns:
                self._conn_pool.put((priority, conn))
            # put http_conn at end of queue also
            priority = self._next_conn_priority
            self._next_conn_priority += 1
        else:
            # Healthy connection: reuse the priority stashed on check-out.
            priority = http_conn.priority

        self._conn_pool.put((priority, http_conn))
        LOG.debug(_("[%d] Released connection %s. %d connection(s) "
                    "available."),
                  rid, _conn_str(http_conn), self._conn_pool.qsize())

    def _wait_for_login(self, conn, headers=None):
        """Block until a login has occurred for the current API provider.

        The first element of the provider's data is used as a semaphore:
        the caller that obtains it without blocking performs the login via
        self._login() and stores the resulting cookie; any other caller
        just waits for the semaphore to be released.

        :param conn: connection whose API provider needs a login.
        :param headers: headers passed through to self._login().
        """
        data = self._get_provider_data(conn)
        if data is None:
            LOG.error(_("Login request for an invalid connection: '%s'"),
                      _conn_str(conn))
            return
        provider_sem = data[0]
        if provider_sem.acquire(blocking=False):
            try:
                cookie = self._login(conn, headers)
                self.set_auth_cookie(conn, cookie)
            finally:
                provider_sem.release()
        else:
            LOG.debug(_("Waiting for auth to complete"))
            # Wait until we can acquire then release
            provider_sem.acquire(blocking=True)
            provider_sem.release()

    def _get_provider_data(self, conn_or_conn_params, default=None):
        """Get data for specified API provider.

        Args:
            conn_or_conn_params: either a HTTP(S)Connection object or the
                resolved conn_params tuple returned by self._conn_params().
            default: value returned when no data is stored for the
                provider
        Returns: Data associated with specified provider
        """
        conn_params = self._normalize_conn_params(conn_or_conn_params)
        return self._api_provider_data.get(conn_params, default)

    def _set_provider_data(self, conn_or_conn_params, data):
        """Set data for specified API provider.

        Args:
            conn_or_conn_params: either a HTTP(S)Connection object or the
                resolved conn_params tuple returned by self._conn_params().
            data: data to associate with API provider; None removes any
                data stored for the provider
        """
        conn_params = self._normalize_conn_params(conn_or_conn_params)
        if data is None:
            # pop() instead of del: a provider that never had data stored
            # must not raise KeyError (update_providers() relies on this).
            self._api_provider_data.pop(conn_params, None)
        else:
            self._api_provider_data[conn_params] = data

    def _normalize_conn_params(self, conn_or_conn_params):
        """Normalize conn_param tuple.

        Args:
            conn_or_conn_params: either a HTTP(S)Connection object or the
                resolved conn_params tuple returned by self._conn_params().

        Returns: Normalized conn_param tuple, i.e. (host, port, is_ssl)
            with a port of None replaced by 443 (ssl) or 80.  A value that
            is neither a tuple nor a connection is logged and returned
            unchanged.
        """
        if (not isinstance(conn_or_conn_params, tuple) and
                not isinstance(conn_or_conn_params, httplib.HTTPConnection)):
            LOG.debug(_("Invalid conn_params value: '%s'"),
                      conn_or_conn_params)
            return conn_or_conn_params
        if isinstance(conn_or_conn_params, httplib.HTTPConnection):
            conn_params = self._conn_params(conn_or_conn_params)
        else:
            conn_params = conn_or_conn_params
        host, port, is_ssl = conn_params
        if port is None:
            port = 443 if is_ssl else 80
        return (host, port, is_ssl)

    def update_providers(self, api_providers):
        """Replace the set of API providers and rebuild the connection pool.

        Pooled connections to providers that are still configured are
        kept, pooled connections to removed providers are closed and
        dropped together with the providers' stored data, and
        self._concurrent_connections new connections are created for every
        added provider.  Nothing happens when the set is unchanged.

        :param api_providers: iterable of (host, port, is_ssl) sequences.
        """
        new_providers = set(tuple(p) for p in api_providers)
        if new_providers == self._api_providers:
            return

        new_conns = []
        while not self._conn_pool.empty():
            priority, conn = self._conn_pool.get_nowait()
            if self._conn_params(conn) in new_providers:
                new_conns.append((priority, conn))
            else:
                # The pool was the only owner of this connection; close it
                # instead of leaking its socket.
                conn.close()

        to_subtract = self._api_providers - new_providers
        for p in to_subtract:
            self._set_provider_data(p, None)
        to_add = new_providers - self._api_providers
        for unused_i in range(self._concurrent_connections):
            for host, port, is_ssl in to_add:
                conn = self._create_connection(host, port, is_ssl)
                new_conns.append((self._next_conn_priority, conn))
                self._next_conn_priority += 1

        for priority, conn in new_conns:
            self._conn_pool.put((priority, conn))
        self._api_providers = new_providers
|