OpenStack DNS As A Service (Designate)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

rpc.py 8.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. # Copyright 2013 Red Hat, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  4. # not use this file except in compliance with the License. You may obtain
  5. # a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. # License for the specific language governing permissions and limitations
  13. # under the License.
  14. __all__ = [
  15. 'init',
  16. 'cleanup',
  17. 'set_defaults',
  18. 'add_extra_exmods',
  19. 'clear_extra_exmods',
  20. 'get_allowed_exmods',
  21. 'RequestContextSerializer',
  22. 'get_client',
  23. 'get_server',
  24. 'get_notifier',
  25. ]
  26. import functools
  27. from oslo_config import cfg
  28. import oslo_messaging as messaging
  29. from oslo_messaging.rpc import dispatcher as rpc_dispatcher
  30. from oslo_serialization import jsonutils
  31. import threading
  32. import designate.context
  33. import designate.exceptions
  34. from designate import objects
  35. CONF = cfg.CONF
  36. EXPECTED_EXCEPTION = threading.local()
  37. NOTIFICATION_TRANSPORT = None
  38. NOTIFIER = None
  39. TRANSPORT = None
  40. # NOTE: Additional entries to designate.exceptions goes here.
  41. CONF.register_opts([
  42. cfg.ListOpt(
  43. 'allowed_remote_exmods',
  44. default=[],
  45. help="Additional modules that contains allowed RPC exceptions.",
  46. deprecated_name='allowed_rpc_exception_modules')
  47. ])
  48. ALLOWED_EXMODS = [
  49. designate.exceptions.__name__,
  50. 'designate.backend.impl_dynect'
  51. ]
  52. EXTRA_EXMODS = []
  53. def init(conf):
  54. global TRANSPORT, NOTIFIER, NOTIFICATION_TRANSPORT
  55. exmods = get_allowed_exmods()
  56. TRANSPORT = create_transport(get_transport_url())
  57. NOTIFICATION_TRANSPORT = messaging.get_notification_transport(
  58. conf, allowed_remote_exmods=exmods)
  59. serializer = RequestContextSerializer(JsonPayloadSerializer())
  60. NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
  61. serializer=serializer)
  62. def initialized():
  63. return None not in [TRANSPORT, NOTIFIER, NOTIFICATION_TRANSPORT]
  64. def cleanup():
  65. global TRANSPORT, NOTIFIER, NOTIFICATION_TRANSPORT
  66. if TRANSPORT is None:
  67. raise AssertionError("'TRANSPORT' must not be None")
  68. if NOTIFICATION_TRANSPORT is None:
  69. raise AssertionError("'NOTIFICATION_TRANSPORT' must not be None")
  70. if NOTIFIER is None:
  71. raise AssertionError("'NOTIFIER' must not be None")
  72. TRANSPORT.cleanup()
  73. NOTIFICATION_TRANSPORT.cleanup()
  74. TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None
  75. def set_defaults(control_exchange):
  76. messaging.set_transport_defaults(control_exchange)
  77. def add_extra_exmods(*args):
  78. EXTRA_EXMODS.extend(args)
  79. def clear_extra_exmods():
  80. del EXTRA_EXMODS[:]
  81. def get_allowed_exmods():
  82. return ALLOWED_EXMODS + EXTRA_EXMODS + CONF.allowed_remote_exmods
  83. class JsonPayloadSerializer(messaging.NoOpSerializer):
  84. @staticmethod
  85. def serialize_entity(context, entity):
  86. return jsonutils.to_primitive(entity, convert_instances=True)
  87. class DesignateObjectSerializer(messaging.NoOpSerializer):
  88. def _process_iterable(self, context, action_fn, values):
  89. """Process an iterable, taking an action on each value.
  90. :param:context: Request context
  91. :param:action_fn: Action to take on each item in values
  92. :param:values: Iterable container of things to take action on
  93. :returns: A new container of the same type (except set) with
  94. items from values having had action applied.
  95. """
  96. iterable = values.__class__
  97. if iterable == set:
  98. # NOTE: A set can't have an unhashable value inside, such as
  99. # a dict. Convert sets to tuples, which is fine, since we can't
  100. # send them over RPC anyway.
  101. iterable = tuple
  102. return iterable([action_fn(context, value) for value in values])
  103. def serialize_entity(self, context, entity):
  104. if isinstance(entity, (tuple, list, set)):
  105. entity = self._process_iterable(context, self.serialize_entity,
  106. entity)
  107. elif hasattr(entity, 'to_primitive') and callable(entity.to_primitive):
  108. entity = entity.to_primitive()
  109. return jsonutils.to_primitive(entity, convert_instances=True)
  110. def deserialize_entity(self, context, entity):
  111. if isinstance(entity, dict) and 'designate_object.name' in entity:
  112. entity = objects.DesignateObject.from_primitive(entity)
  113. elif isinstance(entity, (tuple, list, set)):
  114. entity = self._process_iterable(context, self.deserialize_entity,
  115. entity)
  116. return entity
  117. class RequestContextSerializer(messaging.Serializer):
  118. def __init__(self, base):
  119. self._base = base
  120. def serialize_entity(self, context, entity):
  121. if not self._base:
  122. return entity
  123. return self._base.serialize_entity(context, entity)
  124. def deserialize_entity(self, context, entity):
  125. if not self._base:
  126. return entity
  127. return self._base.deserialize_entity(context, entity)
  128. def serialize_context(self, context):
  129. return context.to_dict()
  130. def deserialize_context(self, context):
  131. return designate.context.DesignateContext.from_dict(context)
  132. def get_transport_url(url_str=None):
  133. return messaging.TransportURL.parse(CONF, url_str)
  134. def get_client(target, version_cap=None, serializer=None):
  135. if TRANSPORT is None:
  136. raise AssertionError("'TRANSPORT' must not be None")
  137. if serializer is None:
  138. serializer = DesignateObjectSerializer()
  139. serializer = RequestContextSerializer(serializer)
  140. return messaging.RPCClient(
  141. TRANSPORT,
  142. target,
  143. version_cap=version_cap,
  144. serializer=serializer
  145. )
  146. def get_server(target, endpoints, serializer=None):
  147. if TRANSPORT is None:
  148. raise AssertionError("'TRANSPORT' must not be None")
  149. if serializer is None:
  150. serializer = DesignateObjectSerializer()
  151. serializer = RequestContextSerializer(serializer)
  152. access_policy = rpc_dispatcher.DefaultRPCAccessPolicy
  153. return messaging.get_rpc_server(
  154. TRANSPORT,
  155. target,
  156. endpoints,
  157. executor='eventlet',
  158. serializer=serializer,
  159. access_policy=access_policy
  160. )
  161. def get_notification_listener(targets, endpoints, serializer=None, pool=None):
  162. if NOTIFICATION_TRANSPORT is None:
  163. raise AssertionError("'NOTIFICATION_TRANSPORT' must not be None")
  164. if serializer is None:
  165. serializer = JsonPayloadSerializer()
  166. return messaging.get_notification_listener(
  167. NOTIFICATION_TRANSPORT,
  168. targets,
  169. endpoints,
  170. executor='eventlet',
  171. pool=pool,
  172. serializer=serializer
  173. )
  174. def get_notifier(service=None, host=None, publisher_id=None):
  175. if NOTIFIER is None:
  176. raise AssertionError("'NOTIFIER' must not be None")
  177. if not publisher_id:
  178. publisher_id = "%s.%s" % (service, host or CONF.host)
  179. return NOTIFIER.prepare(publisher_id=publisher_id)
  180. def create_transport(url):
  181. exmods = get_allowed_exmods()
  182. return messaging.get_rpc_transport(CONF,
  183. url=url,
  184. allowed_remote_exmods=exmods)
  185. def expected_exceptions():
  186. def outer(f):
  187. @functools.wraps(f)
  188. def exception_wrapper(self, *args, **kwargs):
  189. if not hasattr(EXPECTED_EXCEPTION, 'depth'):
  190. EXPECTED_EXCEPTION.depth = 0
  191. EXPECTED_EXCEPTION.depth += 1
  192. # We only want to wrap the first function wrapped.
  193. if EXPECTED_EXCEPTION.depth > 1:
  194. return f(self, *args, **kwargs)
  195. try:
  196. return f(self, *args, **kwargs)
  197. except designate.exceptions.DesignateException as e:
  198. if e.expected:
  199. raise rpc_dispatcher.ExpectedException()
  200. raise
  201. finally:
  202. EXPECTED_EXCEPTION.depth = 0
  203. return exception_wrapper
  204. return outer