Merge "Big Switch: Lock consistency table for REST calls"

This commit is contained in:
Jenkins
2014-06-29 03:26:26 +00:00
committed by Gerrit Code Review
4 changed files with 97 additions and 33 deletions

View File

@@ -14,20 +14,22 @@
 # under the License.
 
 import sqlalchemy as sa
 
+from neutron.common import exceptions
 from neutron.db import api as db
 from neutron.db import model_base
 from neutron.openstack.common import log as logging
 
 LOG = logging.getLogger(__name__)
 
-'''
-A simple table to store the latest consistency hash
-received from a server in case neutron gets restarted.
-'''
+
+class MultipleReadForUpdateCalls(exceptions.NeutronException):
+    message = _("Only one read_for_update call may be made at a time.")
+
+
 class ConsistencyHash(model_base.BASEV2):
     '''
+    A simple table to store the latest consistency hash
+    received from a server.
     For now we only support one global state so the
     hash_id will always be '1'
     '''
@@ -37,20 +39,44 @@ class ConsistencyHash(model_base.BASEV2):
     hash = sa.Column(sa.String(255), nullable=False)
 
-def get_consistency_hash(hash_id='1'):
-    session = db.get_session()
-    with session.begin(subtransactions=True):
-        query = session.query(ConsistencyHash)
-        res = query.filter_by(hash_id=hash_id).first()
-    if not res:
-        return False
-    return res.hash
+class HashHandler(object):
+    '''
+    A wrapper object to keep track of the session and hold the SQL
+    lock between the read and the update to prevent other servers
+    from reading the hash during a transaction.
+    '''
+    def __init__(self, context=None, hash_id='1'):
+        self.hash_id = hash_id
+        self.session = db.get_session() if not context else context.session
+        self.hash_db_obj = None
+        self.transaction = None
+
+    def read_for_update(self):
+        if self.transaction:
+            raise MultipleReadForUpdateCalls()
+        self.transaction = self.session.begin(subtransactions=True)
+        # Lock for update here to prevent another server from reading the hash
+        # while this one is in the middle of a transaction.
+        # This may not lock the SQL table in MySQL Galera deployments
+        # but that's okay because the worst case is a double-sync
+        res = (self.session.query(ConsistencyHash).
+               filter_by(hash_id=self.hash_id).
+               with_lockmode('update').first())
+        if not res:
+            return ''
+        self.hash_db_obj = res
+        return res.hash
 
-def put_consistency_hash(hash, hash_id='1'):
-    session = db.get_session()
-    with session.begin(subtransactions=True):
-        conhash = ConsistencyHash(hash_id=hash_id, hash=hash)
-        session.merge(conhash)
+    def put_hash(self, hash):
+        hash = hash or ''
+        if not self.transaction:
+            self.transaction = self.session.begin(subtransactions=True)
+        if self.hash_db_obj is not None:
+            self.hash_db_obj.hash = hash
+        else:
+            conhash = ConsistencyHash(hash_id=self.hash_id, hash=hash)
+            self.session.merge(conhash)
+        self.transaction.commit()
+        self.transaction = None
         LOG.debug(_("Consistency hash for group %(hash_id)s updated "
-                    "to %(hash)s"), {'hash_id': hash_id, 'hash': hash})
+                    "to %(hash)s"), {'hash_id': self.hash_id, 'hash': hash})

View File

@@ -44,6 +44,7 @@ on port-attach) on an additional PUT to do a bulk dump of all persistent data.
 """
 
 import copy
+import functools
 import httplib
 import re
@@ -442,6 +443,14 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
         raise exceptions.PortNotFound(port_id=port_id)
 
 
+def put_context_in_serverpool(f):
+    @functools.wraps(f)
+    def wrapper(self, context, *args, **kwargs):
+        self.servers.set_context(context)
+        return f(self, context, *args, **kwargs)
+    return wrapper
+
+
 class NeutronRestProxyV2(NeutronRestProxyV2Base,
                          addr_pair_db.AllowedAddressPairsMixin,
                          extradhcpopt_db.ExtraDhcpOptMixin,
@@ -508,6 +517,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
         # Consume from all consumers in threads
         self.conn.consume_in_threads()
 
+    @put_context_in_serverpool
     def create_network(self, context, network):
         """Create a network.
@@ -551,6 +561,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
         # return created network
         return new_net
 
+    @put_context_in_serverpool
     def update_network(self, context, net_id, network):
         """Updates the properties of a particular Virtual Network.
@@ -590,6 +601,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
 
     # NOTE(kevinbenton): workaround for eventlet/mysql deadlock
     @utils.synchronized('bsn-port-barrier')
+    @put_context_in_serverpool
     def delete_network(self, context, net_id):
         """Delete a network.
         :param context: neutron api request context
@@ -612,6 +624,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
             self._send_delete_network(orig_net, context)
         return ret_val
 
+    @put_context_in_serverpool
     def create_port(self, context, port):
         """Create a port, which is a connection point of a device
         (e.g., a VM NIC) to attach to a L2 Neutron network.
@@ -702,6 +715,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
             self._extend_port_dict_binding(context, port)
         return [self._fields(port, fields) for port in ports]
 
+    @put_context_in_serverpool
     def update_port(self, context, port_id, port):
         """Update values of a port.
@@ -778,6 +792,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
 
     # NOTE(kevinbenton): workaround for eventlet/mysql deadlock
    @utils.synchronized('bsn-port-barrier')
+    @put_context_in_serverpool
     def delete_port(self, context, port_id, l3_port_check=True):
         """Delete a port.
         :param context: neutron api request context
@@ -803,6 +818,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
             self._delete_port(context, port_id)
             self.servers.rest_delete_port(tenid, port['network_id'], port_id)
 
+    @put_context_in_serverpool
     def create_subnet(self, context, subnet):
         LOG.debug(_("NeutronRestProxyV2: create_subnet() called"))
@@ -819,6 +835,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
             self._send_update_network(orig_net, context)
         return new_subnet
 
+    @put_context_in_serverpool
     def update_subnet(self, context, id, subnet):
         LOG.debug(_("NeutronRestProxyV2: update_subnet() called"))
@@ -837,6 +854,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
 
     # NOTE(kevinbenton): workaround for eventlet/mysql deadlock
     @utils.synchronized('bsn-port-barrier')
+    @put_context_in_serverpool
     def delete_subnet(self, context, id):
         LOG.debug(_("NeutronRestProxyV2: delete_subnet() called"))
         orig_subnet = super(NeutronRestProxyV2, self).get_subnet(context, id)
@@ -875,6 +893,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
             return tenantset
         return defaultset
 
+    @put_context_in_serverpool
     def create_router(self, context, router):
         LOG.debug(_("NeutronRestProxyV2: create_router() called"))
@@ -896,6 +915,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
         # return created router
         return new_router
 
+    @put_context_in_serverpool
     def update_router(self, context, router_id, router):
         LOG.debug(_("NeutronRestProxyV2.update_router() called"))
@@ -919,6 +939,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
 
     # NOTE(kevinbenton): workaround for eventlet/mysql deadlock.
     # delete_router ends up calling _delete_port instead of delete_port.
     @utils.synchronized('bsn-port-barrier')
+    @put_context_in_serverpool
     def delete_router(self, context, router_id):
         LOG.debug(_("NeutronRestProxyV2: delete_router() called"))
@@ -1009,6 +1030,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
                                    interface_id)
         return del_ret
 
+    @put_context_in_serverpool
     def create_floatingip(self, context, floatingip):
         LOG.debug(_("NeutronRestProxyV2: create_floatingip() called"))
@@ -1032,6 +1054,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
         # return created floating IP
         return new_fl_ip
 
+    @put_context_in_serverpool
     def update_floatingip(self, context, id, floatingip):
         LOG.debug(_("NeutronRestProxyV2: update_floatingip() called"))
@@ -1048,6 +1071,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
             self._send_floatingip_update(context)
         return new_fl_ip
 
+    @put_context_in_serverpool
     def delete_floatingip(self, context, id):
         LOG.debug(_("NeutronRestProxyV2: delete_floatingip() called"))
@@ -1072,6 +1096,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
     # overriding method from l3_db as original method calls
     # self.delete_floatingip() which in turn calls self.delete_port() which
     # is locked with 'bsn-port-barrier'
+    @put_context_in_serverpool
     def delete_disassociated_floatingips(self, context, network_id):
         query = self._model_query(context, l3_db.FloatingIP)
         query = query.filter_by(floating_network_id=network_id,
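
The decorator applied throughout this file is small enough to demonstrate
standalone. A hedged sketch (FakePool and FakePlugin are illustrative
stand-ins, not plugin classes) shows the pattern and why functools.wraps is
used — it preserves the wrapped method's name and docstring for introspection:

    import functools

    def put_context_in_serverpool(f):
        @functools.wraps(f)  # keep f.__name__ / __doc__ on the wrapper
        def wrapper(self, context, *args, **kwargs):
            self.servers.set_context(context)  # stash before any REST work
            return f(self, context, *args, **kwargs)
        return wrapper

    class FakePool(object):
        def set_context(self, context):
            self.context = context

    class FakePlugin(object):
        servers = FakePool()

        @put_context_in_serverpool
        def create_network(self, context, network):
            """Create a network."""
            return network

    plugin = FakePlugin()
    plugin.create_network('request-context', {'name': 'net1'})
    assert plugin.servers.context == 'request-context'
    assert plugin.create_network.__name__ == 'create_network'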

View File

@@ -37,6 +37,7 @@ import socket
 import ssl
 
 import eventlet
+import eventlet.corolocal
 from oslo.config import cfg
 
 from neutron.common import exceptions
@@ -120,7 +121,7 @@ class ServerProxy(object):
         return self.capabilities
 
     def rest_call(self, action, resource, data='', headers={}, timeout=False,
-                  reconnect=False):
+                  reconnect=False, hash_handler=None):
         uri = self.base_uri + resource
         body = json.dumps(data)
         if not headers:
@@ -130,7 +131,12 @@ class ServerProxy(object):
         headers['NeutronProxy-Agent'] = self.name
         headers['Instance-ID'] = self.neutron_id
         headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
-        headers[HASH_MATCH_HEADER] = self.mypool.consistency_hash or ''
+        if hash_handler:
+            # this will be excluded on calls that don't need hashes
+            # (e.g. topology sync, capability checks)
+            headers[HASH_MATCH_HEADER] = hash_handler.read_for_update()
+        else:
+            hash_handler = cdb.HashHandler()
         if 'keep-alive' in self.capabilities:
             headers['Connection'] = 'keep-alive'
         else:
@@ -177,9 +183,7 @@ class ServerProxy(object):
         try:
             self.currentconn.request(action, uri, body, headers)
             response = self.currentconn.getresponse()
-            newhash = response.getheader(HASH_MATCH_HEADER)
-            if newhash:
-                self._put_consistency_hash(newhash)
+            hash_handler.put_hash(response.getheader(HASH_MATCH_HEADER))
             respstr = response.read()
             respdata = respstr
             if response.status in self.success_codes:
@@ -215,10 +219,6 @@ class ServerProxy(object):
                        'data': ret[3]})
         return ret
 
-    def _put_consistency_hash(self, newhash):
-        self.mypool.consistency_hash = newhash
-        cdb.put_consistency_hash(newhash)
-
 
 class ServerPool(object):
@@ -234,6 +234,7 @@ class ServerPool(object):
         self.neutron_id = cfg.CONF.RESTPROXY.neutron_id
         self.base_uri = base_uri
         self.name = name
+        self.contexts = {}
         self.timeout = cfg.CONF.RESTPROXY.server_timeout
         self.always_reconnect = not cfg.CONF.RESTPROXY.cache_connections
         default_port = 8000
@@ -245,10 +246,6 @@ class ServerPool(object):
         self.get_topo_function = None
         self.get_topo_function_args = {}
 
-        # Hash to send to backend with request as expected previous
-        # state to verify consistency.
-        self.consistency_hash = cdb.get_consistency_hash()
-
         if not servers:
             raise cfg.Error(_('Servers not defined. Aborting server manager.'))
         servers = [s if len(s.rsplit(':', 1)) == 2
@@ -267,6 +264,19 @@ class ServerPool(object):
             cfg.CONF.RESTPROXY.consistency_interval)
         LOG.debug(_("ServerPool: initialization done"))
 
+    def set_context(self, context):
+        # this context needs to be local to the greenthread
+        # so concurrent requests don't use the wrong context
+        self.contexts[eventlet.corolocal.get_ident()] = context
+
+    def pop_context(self):
+        # Don't store these contexts after use. They should only
+        # last for one request.
+        try:
+            return self.contexts.pop(eventlet.corolocal.get_ident())
+        except KeyError:
+            return None
+
     def get_capabilities(self):
         # lookup on first try
         try:
@@ -393,12 +403,14 @@ class ServerPool(object):
     @utils.synchronized('bsn-rest-call')
     def rest_call(self, action, resource, data, headers, ignore_codes,
                   timeout=False):
+        hash_handler = cdb.HashHandler(context=self.pop_context())
         good_first = sorted(self.servers, key=lambda x: x.failed)
         first_response = None
         for active_server in good_first:
             ret = active_server.rest_call(action, resource, data, headers,
                                           timeout,
-                                          reconnect=self.always_reconnect)
+                                          reconnect=self.always_reconnect,
+                                          hash_handler=hash_handler)
             # If inconsistent, do a full synchronization
             if ret[0] == httplib.CONFLICT:
                 if not self.get_topo_function:

View File

@@ -361,7 +361,8 @@ class ServerManagerTests(test_rp.BigSwitchProxyPluginV2TestCase):
             # making a call should trigger a conflict sync
             pl.servers.rest_call('GET', '/', '', None, [])
             srestmock.assert_has_calls([
-                mock.call('GET', '/', '', None, False, reconnect=True),
+                mock.call('GET', '/', '', None, False, reconnect=True,
+                          hash_handler=mock.ANY),
                 mock.call('PUT', '/topology',
                           {'routers': [], 'networks': []},
                           timeout=None)