Some error handling improvements
This commit is contained in:
parent
20f19d1c70
commit
ee0e546150
@ -13,10 +13,14 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from .exceptions import MessagingException
|
|
||||||
from .exceptions import MessagingTimeout
|
|
||||||
from .rpc.client import RPCClient
|
from .rpc.client import RPCClient
|
||||||
from .rpc.server import BlockingRPCServer
|
from .rpc.server import BlockingRPCServer
|
||||||
from .serializer import Serializer
|
from .serializer import Serializer
|
||||||
from .target import Target
|
from .target import Target
|
||||||
from .transport import get_transport
|
from .transport import get_transport
|
||||||
|
|
||||||
|
from .exceptions import MessagingException
|
||||||
|
from .exceptions import MessagingTimeout
|
||||||
|
|
||||||
|
from .transport import DriverLoadFailure
|
||||||
|
from .transport import InvalidTransportURL
|
||||||
|
@ -20,7 +20,7 @@ import Queue
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from openstack.common.gettextutils import _
|
from openstack.common.gettextutils import _
|
||||||
from openstack.common.messaging import exceptions
|
from openstack.common import messaging
|
||||||
from openstack.common.messaging._drivers import base
|
from openstack.common.messaging._drivers import base
|
||||||
from openstack.common.messaging import _utils as utils
|
from openstack.common.messaging import _utils as utils
|
||||||
|
|
||||||
@ -119,7 +119,7 @@ class FakeDriver(base.BaseDriver):
|
|||||||
break
|
break
|
||||||
|
|
||||||
if timeout and (time.time() - start_time > timeout):
|
if timeout and (time.time() - start_time > timeout):
|
||||||
raise exceptions.MessagingTimeout(
|
raise messaging.MessagingTimeout(
|
||||||
_('No listeners found for topic %s') % target.topic)
|
_('No listeners found for topic %s') % target.topic)
|
||||||
|
|
||||||
time.sleep(.05)
|
time.sleep(.05)
|
||||||
|
@ -13,9 +13,16 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
|
||||||
class MessagingException(Exception):
|
class MessagingException(Exception):
|
||||||
pass
|
"""Base class for exceptions."""
|
||||||
|
|
||||||
|
def __init__(self, msg=None):
|
||||||
|
self.msg = msg
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.msg
|
||||||
|
|
||||||
|
|
||||||
class MessagingTimeout(MessagingException):
|
class MessagingTimeout(MessagingException):
|
||||||
pass
|
"""Raised if message sending times out."""
|
||||||
|
@ -21,6 +21,9 @@ import urlparse
|
|||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
from stevedore import driver
|
from stevedore import driver
|
||||||
|
|
||||||
|
from openstack.common.messaging import exceptions
|
||||||
|
|
||||||
|
|
||||||
_transport_opts = [
|
_transport_opts = [
|
||||||
cfg.StrOpt('transport_url',
|
cfg.StrOpt('transport_url',
|
||||||
default=None,
|
default=None,
|
||||||
@ -75,6 +78,24 @@ class Transport(object):
|
|||||||
return self._driver.listen(target)
|
return self._driver.listen(target)
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidTransportURL(exceptions.MessagingException):
|
||||||
|
"""Raised if transport URL is invalid."""
|
||||||
|
|
||||||
|
def __init__(self, url, msg):
|
||||||
|
super(InvalidTransportURL, self).__init__(msg)
|
||||||
|
self.url = url
|
||||||
|
|
||||||
|
|
||||||
|
class DriverLoadFailure(exceptions.MessagingException):
|
||||||
|
"""Raised if a transport driver can't be loaded."""
|
||||||
|
|
||||||
|
def __init__(self, driver, ex):
|
||||||
|
msg = 'Failed to load transport driver "%s": %s' % (driver, ex)
|
||||||
|
super(DriverLoadFailure, self).__init__(msg)
|
||||||
|
self.driver = driver
|
||||||
|
self.ex = ex
|
||||||
|
|
||||||
|
|
||||||
def get_transport(conf, url=None):
|
def get_transport(conf, url=None):
|
||||||
"""A factory method for Transport objects.
|
"""A factory method for Transport objects.
|
||||||
|
|
||||||
@ -102,6 +123,8 @@ def get_transport(conf, url=None):
|
|||||||
url = url or conf.transport_url
|
url = url or conf.transport_url
|
||||||
if url is not None:
|
if url is not None:
|
||||||
rpc_backend = urlparse.urlparse(url).scheme
|
rpc_backend = urlparse.urlparse(url).scheme
|
||||||
|
if not rpc_backend:
|
||||||
|
raise InvalidTransportURL(url, 'No scheme specified in "%s"' % url)
|
||||||
else:
|
else:
|
||||||
rpc_backend = conf.rpc_backend
|
rpc_backend = conf.rpc_backend
|
||||||
|
|
||||||
@ -109,9 +132,13 @@ def get_transport(conf, url=None):
|
|||||||
if url is not None:
|
if url is not None:
|
||||||
kwargs['url'] = url
|
kwargs['url'] = url
|
||||||
|
|
||||||
mgr = driver.DriverManager('openstack.common.messaging.drivers',
|
try:
|
||||||
rpc_backend,
|
mgr = driver.DriverManager('openstack.common.messaging.drivers',
|
||||||
invoke_on_load=True,
|
rpc_backend,
|
||||||
invoke_args=[conf],
|
invoke_on_load=True,
|
||||||
invoke_kwds=kwargs)
|
invoke_args=[conf],
|
||||||
|
invoke_kwds=kwargs)
|
||||||
|
except RuntimeError as ex:
|
||||||
|
raise DriverLoadFailure(rpc_backend, ex)
|
||||||
|
|
||||||
return Transport(mgr.driver)
|
return Transport(mgr.driver)
|
||||||
|
Loading…
Reference in New Issue
Block a user