mongodb: replace custom retry code by retrying
Change-Id: I01936838cd32173c70a3052f6156bae829015ff4
This commit is contained in:
parent
e5db4b75f1
commit
bed80141c1
@ -1,8 +1,6 @@
|
|||||||
#
|
#
|
||||||
# Copyright Ericsson AB 2013. All rights reserved
|
# Copyright Ericsson AB 2013. All rights reserved
|
||||||
#
|
# Copyright 2015 Red Hat, Inc
|
||||||
# Authors: Ildiko Vancsa <ildiko.vancsa@ericsson.com>
|
|
||||||
# Balazs Gibizer <balazs.gibizer@ericsson.com>
|
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
@ -18,13 +16,13 @@
|
|||||||
"""Common functions for MongoDB and DB2 backends
|
"""Common functions for MongoDB and DB2 backends
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import time
|
|
||||||
import weakref
|
import weakref
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from oslo_utils import netutils
|
from oslo_utils import netutils
|
||||||
import pymongo
|
import pymongo
|
||||||
|
import retrying
|
||||||
|
|
||||||
from aodh.i18n import _
|
from aodh.i18n import _
|
||||||
|
|
||||||
@ -221,37 +219,6 @@ class QueryTransformer(object):
|
|||||||
return self._handle_simple_op(operator_node, nodes)
|
return self._handle_simple_op(operator_node, nodes)
|
||||||
|
|
||||||
|
|
||||||
def safe_mongo_call(call):
|
|
||||||
def closure(*args, **kwargs):
|
|
||||||
max_retries = cfg.CONF.database.max_retries
|
|
||||||
retry_interval = cfg.CONF.database.retry_interval
|
|
||||||
attempts = 0
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
return call(*args, **kwargs)
|
|
||||||
except pymongo.errors.AutoReconnect as err:
|
|
||||||
if 0 <= max_retries <= attempts:
|
|
||||||
LOG.error(_('Unable to reconnect to the primary mongodb '
|
|
||||||
'after %(retries)d retries. Giving up.') %
|
|
||||||
{'retries': max_retries})
|
|
||||||
raise
|
|
||||||
LOG.warn(_('Unable to reconnect to the primary mongodb: '
|
|
||||||
'%(errmsg)s. Trying again in %(retry_interval)d '
|
|
||||||
'seconds.') %
|
|
||||||
{'errmsg': err, 'retry_interval': retry_interval})
|
|
||||||
attempts += 1
|
|
||||||
time.sleep(retry_interval)
|
|
||||||
return closure
|
|
||||||
|
|
||||||
|
|
||||||
class MongoConn(object):
|
|
||||||
def __init__(self, method):
|
|
||||||
self.method = method
|
|
||||||
|
|
||||||
@safe_mongo_call
|
|
||||||
def __call__(self, *args, **kwargs):
|
|
||||||
return self.method(*args, **kwargs)
|
|
||||||
|
|
||||||
MONGO_METHODS = set([typ for typ in dir(pymongo.collection.Collection)
|
MONGO_METHODS = set([typ for typ in dir(pymongo.collection.Collection)
|
||||||
if not typ.startswith('_')])
|
if not typ.startswith('_')])
|
||||||
MONGO_METHODS.update(set([typ for typ in dir(pymongo.MongoClient)
|
MONGO_METHODS.update(set([typ for typ in dir(pymongo.MongoClient)
|
||||||
@ -260,6 +227,15 @@ MONGO_METHODS.update(set([typ for typ in dir(pymongo)
|
|||||||
if not typ.startswith('_')]))
|
if not typ.startswith('_')]))
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_mongo_call(max_retries, retry_interval):
|
||||||
|
return retrying.retry(
|
||||||
|
retry_on_exception=lambda e: isinstance(
|
||||||
|
e, pymongo.errors.AutoReconnect),
|
||||||
|
wait_fixed=retry_interval * 1000,
|
||||||
|
stop_max_attempt_number=max_retries if max_retries >= 0 else None
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class MongoProxy(object):
|
class MongoProxy(object):
|
||||||
def __init__(self, conn):
|
def __init__(self, conn):
|
||||||
self.conn = conn
|
self.conn = conn
|
||||||
@ -281,13 +257,16 @@ class MongoProxy(object):
|
|||||||
"""Wrap MongoDB connection.
|
"""Wrap MongoDB connection.
|
||||||
|
|
||||||
If item is the name of an executable method, for example find or
|
If item is the name of an executable method, for example find or
|
||||||
insert, wrap this method in the MongoConn.
|
insert, wrap this method to retry.
|
||||||
Else wrap getting attribute with MongoProxy.
|
Else wrap getting attribute with MongoProxy.
|
||||||
"""
|
"""
|
||||||
if item in ('name', 'database'):
|
if item in ('name', 'database'):
|
||||||
return getattr(self.conn, item)
|
return getattr(self.conn, item)
|
||||||
if item in MONGO_METHODS:
|
if item in MONGO_METHODS:
|
||||||
return MongoConn(getattr(self.conn, item))
|
return _safe_mongo_call(
|
||||||
|
cfg.CONF.database.max_retries,
|
||||||
|
cfg.CONF.database.retry_interval,
|
||||||
|
)(getattr(self.conn, item))
|
||||||
return MongoProxy(getattr(self.conn, item))
|
return MongoProxy(getattr(self.conn, item))
|
||||||
|
|
||||||
def __call__(self, *args, **kwargs):
|
def __call__(self, *args, **kwargs):
|
||||||
@ -297,12 +276,15 @@ class MongoProxy(object):
|
|||||||
class CursorProxy(pymongo.cursor.Cursor):
|
class CursorProxy(pymongo.cursor.Cursor):
|
||||||
def __init__(self, cursor):
|
def __init__(self, cursor):
|
||||||
self.cursor = cursor
|
self.cursor = cursor
|
||||||
|
self.next = _safe_mongo_call(
|
||||||
|
cfg.CONF.database.max_retries,
|
||||||
|
cfg.CONF.database.retry_interval,
|
||||||
|
)(self._next)
|
||||||
|
|
||||||
def __getitem__(self, item):
|
def __getitem__(self, item):
|
||||||
return self.cursor[item]
|
return self.cursor[item]
|
||||||
|
|
||||||
@safe_mongo_call
|
def _next(self):
|
||||||
def next(self):
|
|
||||||
"""Wrap Cursor next method.
|
"""Wrap Cursor next method.
|
||||||
|
|
||||||
This method will be executed before each Cursor next method call.
|
This method will be executed before each Cursor next method call.
|
||||||
|
Loading…
Reference in New Issue
Block a user