OpenStack library for messaging

impl_rabbit.py 54KB

# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import contextlib
import errno
import functools
import itertools
import math
import os
import random
import socket
import ssl
import sys
import threading
import time
import uuid

import kombu
import kombu.connection
import kombu.entity
import kombu.messaging
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import eventletutils
import six
import six.moves
from six.moves.urllib import parse

import oslo_messaging
from oslo_messaging._drivers import amqp as rpc_amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers import pool
from oslo_messaging._i18n import _
from oslo_messaging._i18n import _LE
from oslo_messaging._i18n import _LI
from oslo_messaging._i18n import _LW
from oslo_messaging import _utils
from oslo_messaging import exceptions

# NOTE(sileht): doesn't exist in the py2 socket module
TCP_USER_TIMEOUT = 18
rabbit_opts = [
    cfg.BoolOpt('ssl',
                default=False,
                deprecated_name='rabbit_use_ssl',
                help='Connect over SSL.'),
    cfg.StrOpt('ssl_version',
               default='',
               deprecated_name='kombu_ssl_version',
               help='SSL version to use (valid only if SSL enabled). '
                    'Valid values are TLSv1 and SSLv23. SSLv2, SSLv3, '
                    'TLSv1_1, and TLSv1_2 may be available on some '
                    'distributions.'
               ),
    cfg.StrOpt('ssl_key_file',
               default='',
               deprecated_name='kombu_ssl_keyfile',
               help='SSL key file (valid only if SSL enabled).'),
    cfg.StrOpt('ssl_cert_file',
               default='',
               deprecated_name='kombu_ssl_certfile',
               help='SSL cert file (valid only if SSL enabled).'),
    cfg.StrOpt('ssl_ca_file',
               default='',
               deprecated_name='kombu_ssl_ca_certs',
               help='SSL certification authority file '
                    '(valid only if SSL enabled).'),
    cfg.FloatOpt('kombu_reconnect_delay',
                 default=1.0,
                 deprecated_group='DEFAULT',
                 help='How long to wait before reconnecting in response to an '
                      'AMQP consumer cancel notification.'),
    cfg.StrOpt('kombu_compression',
               help="EXPERIMENTAL: Possible values are: gzip, bz2. If not "
                    "set compression will not be used. This option may not "
                    "be available in future versions."),
    cfg.IntOpt('kombu_missing_consumer_retry_timeout',
               deprecated_name="kombu_reconnect_timeout",
               default=60,
               help='How long to wait a missing client before abandoning to '
                    'send it its replies. This value should not be longer '
                    'than rpc_response_timeout.'),
    cfg.StrOpt('kombu_failover_strategy',
               choices=('round-robin', 'shuffle'),
               default='round-robin',
               help='Determines how the next RabbitMQ node is chosen in case '
                    'the one we are currently connected to becomes '
                    'unavailable. Takes effect only if more than one '
                    'RabbitMQ node is provided in config.'),
    cfg.StrOpt('rabbit_login_method',
               choices=('PLAIN', 'AMQPLAIN', 'RABBIT-CR-DEMO'),
               default='AMQPLAIN',
               deprecated_group='DEFAULT',
               help='The RabbitMQ login method.'),
    cfg.IntOpt('rabbit_retry_interval',
               default=1,
               help='How frequently to retry connecting with RabbitMQ.'),
    cfg.IntOpt('rabbit_retry_backoff',
               default=2,
               deprecated_group='DEFAULT',
               help='How long to backoff for between retries when connecting '
                    'to RabbitMQ.'),
    cfg.IntOpt('rabbit_interval_max',
               default=30,
               help='Maximum interval of RabbitMQ connection retries. '
                    'Default is 30 seconds.'),
    cfg.BoolOpt('rabbit_ha_queues',
                default=False,
                deprecated_group='DEFAULT',
                help='Try to use HA queues in RabbitMQ (x-ha-policy: all). '
                     'If you change this option, you must wipe the RabbitMQ '
                     'database. In RabbitMQ 3.0, queue mirroring is no longer '
                     'controlled by the x-ha-policy argument when declaring a '
                     'queue. If you just want to make sure that all queues (except '
                     'those with auto-generated names) are mirrored across all '
                     'nodes, run: '
                     """\"rabbitmqctl set_policy HA '^(?!amq\.).*' """
                     """'{"ha-mode": "all"}' \""""),
    cfg.IntOpt('rabbit_transient_queues_ttl',
               min=1,
               default=1800,
               help='Positive integer representing duration in seconds for '
                    'queue TTL (x-expires). Queues which are unused for the '
                    'duration of the TTL are automatically deleted. The '
                    'parameter affects only reply and fanout queues.'),
    cfg.IntOpt('rabbit_qos_prefetch_count',
               default=0,
               help='Specifies the number of messages to prefetch. Setting to '
                    'zero allows unlimited messages.'),
    cfg.IntOpt('heartbeat_timeout_threshold',
               default=60,
               help="Number of seconds after which the Rabbit broker is "
                    "considered down if heartbeat's keep-alive fails "
                    "(0 disable the heartbeat). EXPERIMENTAL"),
    cfg.IntOpt('heartbeat_rate',
               default=2,
               help='How often times during the heartbeat_timeout_threshold '
                    'we check the heartbeat.'),
]
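
# For illustration only: these options are registered by RabbitDriver (see the
# bottom of this module) under the [oslo_messaging_rabbit] group, so a service
# configuration file could set them like the following hypothetical example:
#
#     [oslo_messaging_rabbit]
#     rabbit_ha_queues = true
#     rabbit_qos_prefetch_count = 64
#     heartbeat_timeout_threshold = 60
#     heartbeat_rate = 2
#     ssl = true
#     ssl_ca_file = /etc/pki/rabbit-ca.pem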
LOG = logging.getLogger(__name__)


def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
    """Construct the arguments for declaring a queue.

    If the rabbit_ha_queues option is set, we try to declare a mirrored queue
    as described here:

        http://www.rabbitmq.com/ha.html

    Setting x-ha-policy to all means that the queue will be mirrored
    to all nodes in the cluster. In RabbitMQ 3.0, queue mirroring is
    no longer controlled by the x-ha-policy argument when declaring a
    queue. If you just want to make sure that all queues (except those
    with auto-generated names) are mirrored across all nodes, run:

        rabbitmqctl set_policy HA '^(?!amq\.).*' '{"ha-mode": "all"}'

    If the rabbit_queue_ttl option is > 0, then the queue is
    declared with the "Queue TTL" value as described here:

        https://www.rabbitmq.com/ttl.html

    Setting a queue TTL causes the queue to be automatically deleted
    if it is unused for the TTL duration. This is a helpful safeguard
    to prevent queues with zero consumers from growing without bound.
    """
    args = {}

    if rabbit_ha_queues:
        args['x-ha-policy'] = 'all'

    if rabbit_queue_ttl > 0:
        args['x-expires'] = rabbit_queue_ttl * 1000

    return args
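
# For illustration: with rabbit_ha_queues=True and rabbit_queue_ttl=1800, the
# helper above returns
#     {'x-ha-policy': 'all', 'x-expires': 1800000}
# i.e. the TTL configured in seconds is handed to RabbitMQ in milliseconds
# through the 'x-expires' queue argument.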
class RabbitMessage(dict):
    def __init__(self, raw_message):
        super(RabbitMessage, self).__init__(
            rpc_common.deserialize_msg(raw_message.payload))
        LOG.trace('RabbitMessage.Init: message %s', self)
        self._raw_message = raw_message

    def acknowledge(self):
        LOG.trace('RabbitMessage.acknowledge: message %s', self)
        self._raw_message.ack()

    def requeue(self):
        LOG.trace('RabbitMessage.requeue: message %s', self)
        self._raw_message.requeue()


class Consumer(object):
    """Consumer class."""

    def __init__(self, exchange_name, queue_name, routing_key, type, durable,
                 exchange_auto_delete, queue_auto_delete, callback,
                 nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0):
        """Init the Consumer class with the exchange_name, routing_key,
        type, durable auto_delete
        """
        self.queue_name = queue_name
        self.exchange_name = exchange_name
        self.routing_key = routing_key
        self.exchange_auto_delete = exchange_auto_delete
        self.queue_auto_delete = queue_auto_delete
        self.durable = durable
        self.callback = callback
        self.type = type
        self.nowait = nowait
        self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
                                                    rabbit_queue_ttl)
        self.queue = None
        self._declared_on = None
        self.exchange = kombu.entity.Exchange(
            name=exchange_name,
            type=type,
            durable=self.durable,
            auto_delete=self.exchange_auto_delete)

    def declare(self, conn):
        """Re-declare the queue after a rabbit (re)connect."""
        self.queue = kombu.entity.Queue(
            name=self.queue_name,
            channel=conn.channel,
            exchange=self.exchange,
            durable=self.durable,
            auto_delete=self.queue_auto_delete,
            routing_key=self.routing_key,
            queue_arguments=self.queue_arguments)

        try:
            LOG.debug('[%s] Queue.declare: %s',
                      conn.connection_id, self.queue_name)
            self.queue.declare()
        except conn.connection.channel_errors as exc:
            # NOTE(jrosenboom): This exception may be triggered by a race
            # condition. Simply retrying will solve the error most of the time
            # and should work well enough as a workaround until the race
            # condition itself can be fixed.
            # See https://bugs.launchpad.net/neutron/+bug/1318721 for details.
            if exc.code == 404:
                self.queue.declare()
            else:
                raise
        except kombu.exceptions.ConnectionError as exc:
            # NOTE(gsantomaggio): This exception happens when the
            # connection is established, but it fails to create the queue.
            # Add some delay to avoid too many requests to the server.
            # See: https://bugs.launchpad.net/oslo.messaging/+bug/1822778
            # for details.
            if exc.code == 541:
                interval = 2
                info = {'sleep_time': interval,
                        'queue': self.queue_name,
                        'err_str': exc
                        }
                LOG.error(_LE('Internal amqp error (541) '
                              'during queue declare,'
                              'retrying in %(sleep_time)s seconds. '
                              'Queue: [%(queue)s], '
                              'error message: [%(err_str)s]'), info)
                time.sleep(interval)
                self.queue.declare()
            else:
                raise

        self._declared_on = conn.channel

    def consume(self, conn, tag):
        """Actually declare the consumer on the amqp channel. This will
        start the flow of messages from the queue. Using the
        Connection.consume() will process the messages,
        calling the appropriate callback.
        """
        # Ensure we are on the correct channel before consuming
        if conn.channel != self._declared_on:
            self.declare(conn)
        try:
            self.queue.consume(callback=self._callback,
                               consumer_tag=six.text_type(tag),
                               nowait=self.nowait)
        except conn.connection.channel_errors as exc:
            # We retry once because of some races that we can
            # recover from before informing the deployer:
            #   bugs.launchpad.net/oslo.messaging/+bug/1581148
            #   bugs.launchpad.net/oslo.messaging/+bug/1609766
            #   bugs.launchpad.net/neutron/+bug/1318721
            # The 406 error code relates to messages that are double ack'd.
            # On any channel error, RabbitMQ closes the channel, but
            # amqp-lib quietly re-opens it. So we must reset all tags and
            # declare all consumers again.
            conn._new_tags = set(conn._consumers.values())
            if exc.code == 404 or (exc.code == 406 and
                                   exc.method_name == 'Basic.ack'):
                self.declare(conn)
                self.queue.consume(callback=self._callback,
                                   consumer_tag=six.text_type(tag),
                                   nowait=self.nowait)
            else:
                raise

    def cancel(self, tag):
        LOG.trace('ConsumerBase.cancel: canceling %s', tag)
        self.queue.cancel(six.text_type(tag))

    def _callback(self, message):
        """Call callback with deserialized message.

        Messages that are processed and ack'ed.
        """
        m2p = getattr(self.queue.channel, 'message_to_python', None)
        if m2p:
            message = m2p(message)
        try:
            self.callback(RabbitMessage(message))
        except Exception:
            LOG.exception(_LE("Failed to process message"
                              " ... skipping it."))
            message.reject()
class DummyConnectionLock(_utils.DummyLock):
    def heartbeat_acquire(self):
        pass


class ConnectionLock(DummyConnectionLock):
    """Lock object to protect access to the kombu connection

    This is a lock object to protect access to the kombu connection
    object between the heartbeat thread and the driver thread.

    There are two ways to acquire this lock:
    * lock.acquire()
    * lock.heartbeat_acquire()

    In both cases lock.release() releases the lock.

    The goal is that the heartbeat thread always has priority
    for acquiring the lock. This ensures we have no heartbeat
    starvation when the driver sends a lot of messages.

    So when lock.heartbeat_acquire() is called, the next time the lock
    is release()d, the heartbeat caller unconditionally acquires
    the lock, even if someone else asked for the lock before it.
    """

    def __init__(self):
        self._workers_waiting = 0
        self._heartbeat_waiting = False
        self._lock_acquired = None
        self._monitor = threading.Lock()
        self._workers_locks = threading.Condition(self._monitor)
        self._heartbeat_lock = threading.Condition(self._monitor)
        self._get_thread_id = eventletutils.fetch_current_thread_functor()

    def acquire(self):
        with self._monitor:
            while self._lock_acquired:
                self._workers_waiting += 1
                self._workers_locks.wait()
                self._workers_waiting -= 1
            self._lock_acquired = self._get_thread_id()

    def heartbeat_acquire(self):
        # NOTE(sileht): must be called only one time
        with self._monitor:
            while self._lock_acquired is not None:
                self._heartbeat_waiting = True
                self._heartbeat_lock.wait()
                self._heartbeat_waiting = False
            self._lock_acquired = self._get_thread_id()

    def release(self):
        with self._monitor:
            if self._lock_acquired is None:
                raise RuntimeError("We can't release a not acquired lock")
            thread_id = self._get_thread_id()
            if self._lock_acquired != thread_id:
                raise RuntimeError("We can't release lock acquired by another "
                                   "thread/greenthread; %s vs %s" %
                                   (self._lock_acquired, thread_id))
            self._lock_acquired = None
            if self._heartbeat_waiting:
                self._heartbeat_lock.notify()
            elif self._workers_waiting > 0:
                self._workers_locks.notify()

    @contextlib.contextmanager
    def for_heartbeat(self):
        self.heartbeat_acquire()
        try:
            yield
        finally:
            self.release()
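
# Usage sketch (descriptive note only): in the Connection class below, driver
# code paths take this lock with "with self._connection_lock:" while the
# heartbeat thread enters it via "with self._connection_lock.for_heartbeat():",
# which is what gives the heartbeat priority over any waiting worker threads.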
class Connection(object):
    """Connection object."""

    pools = {}

    def __init__(self, conf, url, purpose):
        # NOTE(viktors): Parse config options
        driver_conf = conf.oslo_messaging_rabbit
        self.interval_start = driver_conf.rabbit_retry_interval
        self.interval_stepping = driver_conf.rabbit_retry_backoff
        self.interval_max = driver_conf.rabbit_interval_max
        self.login_method = driver_conf.rabbit_login_method
        self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
        self.rabbit_transient_queues_ttl = \
            driver_conf.rabbit_transient_queues_ttl
        self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
        self.heartbeat_timeout_threshold = \
            driver_conf.heartbeat_timeout_threshold
        self.heartbeat_rate = driver_conf.heartbeat_rate
        self.kombu_reconnect_delay = driver_conf.kombu_reconnect_delay
        self.amqp_durable_queues = driver_conf.amqp_durable_queues
        self.amqp_auto_delete = driver_conf.amqp_auto_delete
        self.ssl = driver_conf.ssl
        self.kombu_missing_consumer_retry_timeout = \
            driver_conf.kombu_missing_consumer_retry_timeout
        self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
        self.kombu_compression = driver_conf.kombu_compression

        if self.ssl:
            self.ssl_version = driver_conf.ssl_version
            self.ssl_key_file = driver_conf.ssl_key_file
            self.ssl_cert_file = driver_conf.ssl_cert_file
            self.ssl_ca_file = driver_conf.ssl_ca_file

        self._url = ''
        if url.hosts:
            if url.transport.startswith('kombu+'):
                LOG.warning(_LW('Selecting the kombu transport through the '
                                'transport url (%s) is a experimental feature '
                                'and this is not yet supported.'),
                            url.transport)
            if len(url.hosts) > 1:
                random.shuffle(url.hosts)
            transformed_urls = [
                self._transform_transport_url(url, host)
                for host in url.hosts]
            self._url = ';'.join(transformed_urls)
        elif url.transport.startswith('kombu+'):
            # NOTE(sileht): url has a + but no hosts
            # (like kombu+memory:///), pass it to kombu as-is
            transport = url.transport.replace('kombu+', '')
            self._url = "%s://" % transport
            if url.virtual_host:
                self._url += url.virtual_host
        elif not url.hosts:
            host = oslo_messaging.transport.TransportHost('')
            self._url = self._transform_transport_url(
                url, host, default_username='guest', default_password='guest',
                default_hostname='localhost')

        self._initial_pid = os.getpid()

        self._consumers = {}
        self._producer = None
        self._new_tags = set()
        self._active_tags = {}
        self._tags = itertools.count(1)

        # Set of exchanges and queues declared on the channel to avoid
        # unnecessary redeclaration. This set is reset each time
        # the connection is reset in Connection._set_current_channel
        self._declared_exchanges = set()
        self._declared_queues = set()

        self._consume_loop_stopped = False
        self.channel = None
        self.purpose = purpose

        # NOTE(sileht): if purpose is PURPOSE_LISTEN
        # we don't need the lock because we don't
        # have a heartbeat thread
        if purpose == rpc_common.PURPOSE_SEND:
            self._connection_lock = ConnectionLock()
        else:
            self._connection_lock = DummyConnectionLock()

        self.connection_id = str(uuid.uuid4())
        self.name = "%s:%d:%s" % (os.path.basename(sys.argv[0]),
                                  os.getpid(),
                                  self.connection_id)
        self.connection = kombu.connection.Connection(
            self._url, ssl=self._fetch_ssl_params(),
            login_method=self.login_method,
            heartbeat=self.heartbeat_timeout_threshold,
            failover_strategy=self.kombu_failover_strategy,
            transport_options={
                'confirm_publish': True,
                'client_properties': {
                    'capabilities': {
                        'authentication_failure_close': True,
                        'connection.blocked': True,
                        'consumer_cancel_notify': True
                    },
                    'connection_name': self.name},
                'on_blocked': self._on_connection_blocked,
                'on_unblocked': self._on_connection_unblocked,
            },
        )

        LOG.debug('[%(connection_id)s] Connecting to AMQP server on'
                  ' %(hostname)s:%(port)s',
                  self._get_connection_info())

        # NOTE(sileht): kombu recommends running heartbeat_check every
        # second, but we use a lock around the kombu connection, and to
        # avoid holding that lock most of the time doing nothing except
        # waiting for the events to drain, we run heartbeat_check and
        # retrieve the server heartbeat packet only twice as often as the
        # minimum required for the heartbeat to work
        # (heartbeat_timeout/heartbeat_rate/2.0, default kombu
        # heartbeat_rate is 2)
        self._heartbeat_wait_timeout = (
            float(self.heartbeat_timeout_threshold) /
            float(self.heartbeat_rate) / 2.0)
        self._heartbeat_support_log_emitted = False

        # NOTE(sileht): just ensure the connection is set up at startup
        with self._connection_lock:
            self.ensure_connection()

        # NOTE(sileht): if purpose is PURPOSE_LISTEN
        # the consume code does the heartbeat stuff
        # we don't need a thread
        self._heartbeat_thread = None
        if purpose == rpc_common.PURPOSE_SEND:
            self._heartbeat_start()

        LOG.debug('[%(connection_id)s] Connected to AMQP server on '
                  '%(hostname)s:%(port)s via [%(transport)s] client with'
                  ' port %(client_port)s.',
                  self._get_connection_info())

        # NOTE(sileht): value chosen according to the best practice from kombu
        # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
        # For heartbeat, we can set a bigger timeout, and check we receive the
        # heartbeat packets regularly
        if self._heartbeat_supported_and_enabled():
            self._poll_timeout = self._heartbeat_wait_timeout
        else:
            self._poll_timeout = 1

        if self._url.startswith('memory://'):
            # Kludge to speed up tests.
            self.connection.transport.polling_interval = 0.0
            # Fixup logging
            self.connection.hostname = "memory_driver"
            self.connection.port = 1234
            self._poll_timeout = 0.05
    # FIXME(markmc): use oslo sslutils when it is available as a library
    _SSL_PROTOCOLS = {
        "tlsv1": ssl.PROTOCOL_TLSv1,
        "sslv23": ssl.PROTOCOL_SSLv23
    }

    _OPTIONAL_PROTOCOLS = {
        'sslv2': 'PROTOCOL_SSLv2',
        'sslv3': 'PROTOCOL_SSLv3',
        'tlsv1_1': 'PROTOCOL_TLSv1_1',
        'tlsv1_2': 'PROTOCOL_TLSv1_2',
    }
    for protocol in _OPTIONAL_PROTOCOLS:
        try:
            _SSL_PROTOCOLS[protocol] = getattr(ssl,
                                               _OPTIONAL_PROTOCOLS[protocol])
        except AttributeError:
            pass

    @classmethod
    def validate_ssl_version(cls, version):
        key = version.lower()
        try:
            return cls._SSL_PROTOCOLS[key]
        except KeyError:
            raise RuntimeError(_("Invalid SSL version : %s") % version)

    def _transform_transport_url(self, url, host, default_username='',
                                 default_password='', default_hostname=''):
        transport = url.transport.replace('kombu+', '')
        transport = transport.replace('rabbit', 'amqp')
        return '%s://%s:%s@%s:%s/%s' % (
            transport,
            parse.quote(host.username or default_username),
            parse.quote(host.password or default_password),
            self._parse_url_hostname(host.hostname) or default_hostname,
            str(host.port or 5672),
            url.virtual_host or '')

    def _parse_url_hostname(self, hostname):
        """Handles hostname returned from urlparse and checks whether it's
        ipaddress. If it's ipaddress it ensures that it has brackets for IPv6.
        """
        return '[%s]' % hostname if ':' in hostname else hostname

    def _fetch_ssl_params(self):
        """Handles fetching what ssl params should be used for the connection
        (if any).
        """
        if self.ssl:
            ssl_params = dict()

            # http://docs.python.org/library/ssl.html - ssl.wrap_socket
            if self.ssl_version:
                ssl_params['ssl_version'] = self.validate_ssl_version(
                    self.ssl_version)
            if self.ssl_key_file:
                ssl_params['keyfile'] = self.ssl_key_file
            if self.ssl_cert_file:
                ssl_params['certfile'] = self.ssl_cert_file
            if self.ssl_ca_file:
                ssl_params['ca_certs'] = self.ssl_ca_file
                # We might want to allow variations in the
                # future with this?
                ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
            return ssl_params or True
        return False

    @staticmethod
    def _on_connection_blocked(reason):
        LOG.error(_LE("The broker has blocked the connection: %s"), reason)

    @staticmethod
    def _on_connection_unblocked():
        LOG.info(_LI("The broker has unblocked the connection"))

    def ensure_connection(self):
        # NOTE(sileht): we reset the channel and ensure
        # the kombu underlying connection works
        def on_error(exc, interval):
            LOG.error("Connection failed: %s (retrying in %s seconds)",
                      str(exc), interval)

        self._set_current_channel(None)
        self.connection.ensure_connection(errback=on_error)
        self._set_current_channel(self.connection.channel())
        self.set_transport_socket_timeout()
    def ensure(self, method, retry=None,
               recoverable_error_callback=None, error_callback=None,
               timeout_is_error=True):
        """Will retry up to retry number of times.
           retry = None or -1 means to retry forever
           retry = 0 means no retry
           retry = N means N retries

        NOTE(sileht): Must be called within the connection lock
        """
        current_pid = os.getpid()
        if self._initial_pid != current_pid:
            LOG.warning(_LW("Process forked after connection established! "
                            "This can result in unpredictable behavior. "
                            "See: https://docs.openstack.org/oslo.messaging/"
                            "latest/reference/transport.html"))
            self._initial_pid = current_pid

        if retry is None or retry < 0:
            retry = float('inf')

        def on_error(exc, interval):
            LOG.debug("[%s] Received recoverable error from kombu:"
                      % self.connection_id,
                      exc_info=True)

            recoverable_error_callback and recoverable_error_callback(exc)

            interval = (self.kombu_reconnect_delay + interval
                        if self.kombu_reconnect_delay > 0
                        else interval)

            info = {'err_str': exc, 'sleep_time': interval}
            info.update(self._get_connection_info(conn_error=True))

            if 'Socket closed' in six.text_type(exc):
                LOG.error(_LE('[%(connection_id)s] AMQP server'
                              ' %(hostname)s:%(port)s closed'
                              ' the connection. Check login credentials:'
                              ' %(err_str)s'), info)
            else:
                LOG.error(_LE('[%(connection_id)s] AMQP server on '
                              '%(hostname)s:%(port)s is unreachable: '
                              '%(err_str)s. Trying again in '
                              '%(sleep_time)d seconds.'), info)

            # XXX(nic): when reconnecting to a RabbitMQ cluster
            # with mirrored queues in use, the attempt to release the
            # connection can hang "indefinitely" somewhere deep down
            # in Kombu. Blocking the thread for a bit prior to
            # release seems to kludge around the problem where it is
            # otherwise reproducible.
            # TODO(sileht): Check if this is useful since we
            # use kombu for HA connection, the interval_step
            # should be sufficient, because the underlying kombu transport
            # connection object is freed.
            if self.kombu_reconnect_delay > 0:
                LOG.trace('Delaying reconnect for %1.1f seconds ...',
                          self.kombu_reconnect_delay)
                time.sleep(self.kombu_reconnect_delay)

        def on_reconnection(new_channel):
            """Callback invoked when kombu reconnects and creates
            a new channel; we use it to reconfigure our consumers.
            """
            self._set_current_channel(new_channel)
            self.set_transport_socket_timeout()

            LOG.info(_LI('[%(connection_id)s] Reconnected to AMQP server on '
                         '%(hostname)s:%(port)s via [%(transport)s] client '
                         'with port %(client_port)s.'),
                     self._get_connection_info())

        def execute_method(channel):
            self._set_current_channel(channel)
            method()

        try:
            autoretry_method = self.connection.autoretry(
                execute_method, channel=self.channel,
                max_retries=retry,
                errback=on_error,
                interval_start=self.interval_start or 1,
                interval_step=self.interval_stepping,
                interval_max=self.interval_max,
                on_revive=on_reconnection)
            ret, channel = autoretry_method()
            self._set_current_channel(channel)
            return ret
        except rpc_amqp.AMQPDestinationNotFound:
            # NOTE(sileht): we must reraise this without
            # triggering error_callback
            raise
        except Exception as exc:
            error_callback and error_callback(exc)
            self._set_current_channel(None)
            # NOTE(sileht): number of retries exceeded and the connection
            # is still broken
            info = {'err_str': exc, 'retry': retry}
            info.update(self.connection.info())
            msg = _('Unable to connect to AMQP server on '
                    '%(hostname)s:%(port)s after %(retry)s '
                    'tries: %(err_str)s') % info
            LOG.error(msg)
            raise exceptions.MessageDeliveryFailure(msg)
    def _set_current_channel(self, new_channel):
        """Change the channel to use.

        NOTE(sileht): Must be called within the connection lock
        """
        if new_channel == self.channel:
            return

        if self.channel is not None:
            self._declared_queues.clear()
            self._declared_exchanges.clear()
            self.connection.maybe_close_channel(self.channel)

        self.channel = new_channel

        if new_channel is not None:
            if self.purpose == rpc_common.PURPOSE_LISTEN:
                self._set_qos(new_channel)
            self._producer = kombu.messaging.Producer(new_channel)
            for consumer in self._consumers:
                consumer.declare(self)

    def _set_qos(self, channel):
        """Set QoS prefetch count on the channel"""
        if self.rabbit_qos_prefetch_count > 0:
            channel.basic_qos(0,
                              self.rabbit_qos_prefetch_count,
                              False)

    def close(self):
        """Close/release this connection."""
        self._heartbeat_stop()
        if self.connection:
            for consumer in six.moves.filter(lambda c: c.type == 'fanout',
                                             self._consumers):
                LOG.debug('[connection close] Deleting fanout '
                          'queue: %s ' % consumer.queue.name)
                consumer.queue.delete()
            self._set_current_channel(None)
            self.connection.release()
            self.connection = None

    def reset(self):
        """Reset a connection so it can be used again."""
        with self._connection_lock:
            try:
                for consumer, tag in self._consumers.items():
                    consumer.cancel(tag=tag)
            except kombu.exceptions.OperationalError:
                self.ensure_connection()
            self._consumers.clear()
            self._active_tags.clear()
            self._new_tags.clear()
            self._tags = itertools.count(1)
    def _heartbeat_supported_and_enabled(self):
        if self.heartbeat_timeout_threshold <= 0:
            return False

        if self.connection.supports_heartbeats:
            return True
        elif not self._heartbeat_support_log_emitted:
            LOG.warning(_LW("Heartbeat support requested but it is not "
                            "supported by the kombu driver or the broker"))
            self._heartbeat_support_log_emitted = True
        return False

    def set_transport_socket_timeout(self, timeout=None):
        # NOTE(sileht): there are some cases where the heartbeat check
        # or producer.send return only when the system socket timeout is
        # reached. kombu doesn't allow us to customise this timeout,
        # so for py-amqp we tweak it ourselves.
        # NOTE(dmitryme): Current approach works with amqp==1.4.9 and
        # kombu==3.0.33. Once the commit below is released, we should
        # try to set the socket timeout in the constructor:
        # https://github.com/celery/py-amqp/pull/64
        heartbeat_timeout = self.heartbeat_timeout_threshold
        if self._heartbeat_supported_and_enabled():
            # NOTE(sileht): we are supposed to send a heartbeat every
            # heartbeat_timeout; there is no need to wait longer, otherwise
            # the broker will disconnect us, so raise the timeout earlier
            # ourselves
            if timeout is None:
                timeout = heartbeat_timeout
            else:
                timeout = min(heartbeat_timeout, timeout)

        try:
            sock = self.channel.connection.sock
        except AttributeError as e:
            # Level is set to debug because otherwise we would spam the logs
            LOG.debug('[%s] Failed to get socket attribute: %s'
                      % (self.connection_id, str(e)))
        else:
            sock.settimeout(timeout)
            # TCP_USER_TIMEOUT is not defined on Windows and Mac OS X
            if sys.platform != 'win32' and sys.platform != 'darwin':
                try:
                    timeout = timeout * 1000 if timeout is not None else 0
                    # NOTE(gdavoian): only integers and strings are allowed
                    # as socket options' values, and TCP_USER_TIMEOUT option
                    # can take only integer values, so we round-up the timeout
                    # to the nearest integer in order to ensure that the
                    # connection is not broken before the expected timeout
                    sock.setsockopt(socket.IPPROTO_TCP,
                                    TCP_USER_TIMEOUT,
                                    int(math.ceil(timeout)))
                except socket.error as error:
                    code = error[0]
                    # TCP_USER_TIMEOUT not defined on kernels <2.6.37
                    if code != errno.ENOPROTOOPT:
                        raise

    @contextlib.contextmanager
    def _transport_socket_timeout(self, timeout):
        self.set_transport_socket_timeout(timeout)
        yield
        self.set_transport_socket_timeout()

    def _heartbeat_check(self):
        # NOTE(sileht): we are supposed to send at least one heartbeat
        # every heartbeat_timeout_threshold, so no need to wait more
        self.connection.heartbeat_check(rate=self.heartbeat_rate)

    def _heartbeat_start(self):
        if self._heartbeat_supported_and_enabled():
            self._heartbeat_exit_event = eventletutils.Event()
            self._heartbeat_thread = threading.Thread(
                target=self._heartbeat_thread_job)
            self._heartbeat_thread.daemon = True
            self._heartbeat_thread.start()
        else:
            self._heartbeat_thread = None

    def _heartbeat_stop(self):
        if self._heartbeat_thread is not None:
            self._heartbeat_exit_event.set()
            self._heartbeat_thread.join()
            self._heartbeat_thread = None
    def _heartbeat_thread_job(self):
        """Thread that maintains inactive connections
        """
        # NOTE(hberaud): Python 2 doesn't define ConnectionRefusedError,
        # so to be able to switch connection destinations on failure with
        # both python2 and python3 we need to adapt which exception we
        # catch for "connection refused".
        try:
            ConnectRefuseError = ConnectionRefusedError
        except NameError:
            ConnectRefuseError = socket.error

        while not self._heartbeat_exit_event.is_set():
            with self._connection_lock.for_heartbeat():
                try:
                    try:
                        self._heartbeat_check()
                        # NOTE(sileht): We need to drain events to receive
                        # heartbeats from the broker, but without holding
                        # the connection for too long. In amqpdriver a
                        # connection is used exclusively for read or for
                        # write, so we have to do this for connections used
                        # for write; drain_events already does it for the
                        # other connections.
                        try:
                            self.connection.drain_events(timeout=0.001)
                        except socket.timeout:
                            pass
                    # NOTE(hberaud): In a clustered rabbitmq when
                    # a node disappears, we get a ConnectionRefusedError
                    # because the socket gets disconnected.
                    # The socket access yields an OSError because the
                    # heartbeat tries to reach an unreachable host
                    # (No route to host). Catch these exceptions to ensure
                    # that we call ensure_connection for switching the
                    # connection destination.
                    except (socket.timeout,
                            ConnectRefuseError,
                            OSError,
                            kombu.exceptions.OperationalError) as exc:
                        LOG.info(_LI("A recoverable connection/channel error "
                                     "occurred, trying to reconnect: %s"), exc)
                        self.ensure_connection()
                except Exception:
                    LOG.warning(_LW("Unexpected error during heartbeat "
                                    "thread processing, retrying..."))
                    LOG.debug('Exception', exc_info=True)

            self._heartbeat_exit_event.wait(
                timeout=self._heartbeat_wait_timeout)
        self._heartbeat_exit_event.clear()
    def declare_consumer(self, consumer):
        """Create a Consumer using the class that was passed in and
        add it to our list of consumers
        """

        def _connect_error(exc):
            log_info = {'topic': consumer.routing_key, 'err_str': exc}
            LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
                          "%(err_str)s"), log_info)

        def _declare_consumer():
            consumer.declare(self)
            tag = self._active_tags.get(consumer.queue_name)
            if tag is None:
                tag = next(self._tags)
                self._active_tags[consumer.queue_name] = tag
                self._new_tags.add(tag)

            self._consumers[consumer] = tag
            return consumer

        with self._connection_lock:
            return self.ensure(_declare_consumer,
                               error_callback=_connect_error)

    def consume(self, timeout=None):
        """Consume from all queues/consumers."""

        timer = rpc_common.DecayingTimer(duration=timeout)
        timer.start()

        def _raise_timeout():
            raise rpc_common.Timeout()

        def _recoverable_error_callback(exc):
            if not isinstance(exc, rpc_common.Timeout):
                self._new_tags = set(self._consumers.values())
            timer.check_return(_raise_timeout)

        def _error_callback(exc):
            _recoverable_error_callback(exc)
            LOG.error(_LE('Failed to consume message from queue: %s'),
                      exc)

        def _consume():
            # NOTE(sileht): in case the acknowledgment or requeue of a
            # message fail, the kombu transport can be disconnected
            # In this case, we must redeclare our consumers, so raise
            # a recoverable error to trigger the reconnection code.
            if not self.connection.connected:
                raise self.connection.recoverable_connection_errors[0]

            while self._new_tags:
                for consumer, tag in self._consumers.items():
                    if tag in self._new_tags:
                        consumer.consume(self, tag=tag)
                        self._new_tags.remove(tag)

            poll_timeout = (self._poll_timeout if timeout is None
                            else min(timeout, self._poll_timeout))
            while True:
                if self._consume_loop_stopped:
                    return

                if self._heartbeat_supported_and_enabled():
                    self._heartbeat_check()

                try:
                    self.connection.drain_events(timeout=poll_timeout)
                    return
                except socket.timeout:
                    poll_timeout = timer.check_return(
                        _raise_timeout, maximum=self._poll_timeout)
                except self.connection.channel_errors as exc:
                    if exc.code == 406 and exc.method_name == 'Basic.ack':
                        # NOTE(gordc): occasionally multiple workers will grab
                        # same message and acknowledge it. if it happens, meh.
                        raise self.connection.recoverable_channel_errors[0]
                    raise

        with self._connection_lock:
            self.ensure(_consume,
                        recoverable_error_callback=_recoverable_error_callback,
                        error_callback=_error_callback)
    def stop_consuming(self):
        self._consume_loop_stopped = True

    def declare_direct_consumer(self, topic, callback):
        """Create a 'direct' queue.

        In nova's use, this is generally a msg_id queue used for
        responses for call/multicall
        """
        # TODO(obondarev): use default exchange since T release
        consumer = Consumer(exchange_name=topic,
                            queue_name=topic,
                            routing_key=topic,
                            type='direct',
                            durable=False,
                            exchange_auto_delete=True,
                            queue_auto_delete=False,
                            callback=callback,
                            rabbit_ha_queues=self.rabbit_ha_queues,
                            rabbit_queue_ttl=self.rabbit_transient_queues_ttl)

        self.declare_consumer(consumer)

    def declare_topic_consumer(self, exchange_name, topic, callback=None,
                               queue_name=None):
        """Create a 'topic' consumer."""
        consumer = Consumer(exchange_name=exchange_name,
                            queue_name=queue_name or topic,
                            routing_key=topic,
                            type='topic',
                            durable=self.amqp_durable_queues,
                            exchange_auto_delete=self.amqp_auto_delete,
                            queue_auto_delete=self.amqp_auto_delete,
                            callback=callback,
                            rabbit_ha_queues=self.rabbit_ha_queues)

        self.declare_consumer(consumer)

    def declare_fanout_consumer(self, topic, callback):
        """Create a 'fanout' consumer."""
        unique = uuid.uuid4().hex
        exchange_name = '%s_fanout' % topic
        queue_name = '%s_fanout_%s' % (topic, unique)

        consumer = Consumer(exchange_name=exchange_name,
                            queue_name=queue_name,
                            routing_key=topic,
                            type='fanout',
                            durable=False,
                            exchange_auto_delete=True,
                            queue_auto_delete=False,
                            callback=callback,
                            rabbit_ha_queues=self.rabbit_ha_queues,
                            rabbit_queue_ttl=self.rabbit_transient_queues_ttl)

        self.declare_consumer(consumer)
    def _ensure_publishing(self, method, exchange, msg, routing_key=None,
                           timeout=None, retry=None):
        """Send to a publisher based on the publisher class."""

        def _error_callback(exc):
            log_info = {'topic': exchange.name, 'err_str': exc}
            LOG.error(_LE("Failed to publish message to topic "
                          "'%(topic)s': %(err_str)s"), log_info)
            LOG.debug('Exception', exc_info=exc)

        method = functools.partial(method, exchange, msg, routing_key, timeout)

        with self._connection_lock:
            self.ensure(method, retry=retry, error_callback=_error_callback)

    def _get_connection_info(self, conn_error=False):
        # Bug #1745166: set 'conn_error' true if this is being called when the
        # connection is in a known error state. Otherwise attempting to access
        # the connection's socket while it is in an error state will cause
        # py-amqp to attempt reconnecting.
        ci = self.connection.info()
        info = dict([(k, ci.get(k)) for k in
                     ['hostname', 'port', 'transport']])
        client_port = None
        if (not conn_error and self.channel and
                hasattr(self.channel.connection, 'sock') and
                self.channel.connection.sock):
            client_port = self.channel.connection.sock.getsockname()[1]
        info.update({'client_port': client_port,
                     'connection_id': self.connection_id})
        return info

    def _publish(self, exchange, msg, routing_key=None, timeout=None):
        """Publish a message."""
        if not (exchange.passive or exchange.name in self._declared_exchanges):
            exchange(self.channel).declare()
            self._declared_exchanges.add(exchange.name)

        log_info = {'msg': msg,
                    'who': exchange or 'default',
                    'key': routing_key}
        LOG.trace('Connection._publish: sending message %(msg)s to'
                  ' %(who)s with routing key %(key)s', log_info)

        # NOTE(sileht): no need to wait longer, the caller expects
        # an answer before the timeout is reached
        with self._transport_socket_timeout(timeout):
            self._producer.publish(msg,
                                   exchange=exchange,
                                   routing_key=routing_key,
                                   expiration=timeout,
                                   compression=self.kombu_compression)

    def _publish_and_creates_default_queue(self, exchange, msg,
                                           routing_key=None, timeout=None):
        """Publisher that declares a default queue.

        When the exchange is missing, instead of silently creating an
        exchange that is not bound to a queue, this publisher creates a
        default queue named with the routing_key.

        This is mainly used so that notifications are not missed while
        nobody consumes them yet. If the future consumer binds the default
        queue, it can retrieve the missing messages.

        _set_current_channel is responsible for cleaning up the cache.
        """
        queue_indentifier = (exchange.name, routing_key)
        # NOTE(sileht): We only do it once per reconnection;
        # Connection._set_current_channel() is responsible for clearing
        # this cache.
        if queue_indentifier not in self._declared_queues:
            queue = kombu.entity.Queue(
                channel=self.channel,
                exchange=exchange,
                durable=exchange.durable,
                auto_delete=exchange.auto_delete,
                name=routing_key,
                routing_key=routing_key,
                queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0))
            log_info = {'key': routing_key, 'exchange': exchange}
            LOG.trace(
                'Connection._publish_and_creates_default_queue: '
                'declare queue %(key)s on %(exchange)s exchange', log_info)
            queue.declare()
            self._declared_queues.add(queue_indentifier)

        self._publish(exchange, msg, routing_key=routing_key, timeout=timeout)

    def _publish_and_raises_on_missing_exchange(self, exchange, msg,
                                                routing_key=None,
                                                timeout=None):
        """Publisher that raises an exception if the exchange is missing."""
        if not exchange.passive:
            raise RuntimeError("_publish_and_retry_on_missing_exchange() must "
                               "be called with an passive exchange.")

        try:
            self._publish(exchange, msg, routing_key=routing_key,
                          timeout=timeout)
            return
        except self.connection.channel_errors as exc:
            if exc.code == 404:
                # NOTE(noelbk/sileht):
                # If rabbit dies, the consumer can be disconnected before the
                # publisher sends, and if the consumer hasn't declared the
                # queue, the publisher will send a message to an exchange
                # that's not bound to a queue, and the message will be lost.
                # So we set passive=True on the publisher exchange, catch
                # the 404 kombu ChannelError and retry until the exchange
                # appears.
                raise rpc_amqp.AMQPDestinationNotFound(
                    "exchange %s doesn't exists" % exchange.name)
            raise

    def direct_send(self, msg_id, msg):
        """Send a 'direct' message."""
        exchange = kombu.entity.Exchange(name='',  # using default exchange
                                         type='direct',
                                         durable=False,
                                         auto_delete=True,
                                         passive=True)
        self._ensure_publishing(self._publish_and_raises_on_missing_exchange,
                                exchange, msg, routing_key=msg_id)

    def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
        """Send a 'topic' message."""
        exchange = kombu.entity.Exchange(
            name=exchange_name,
            type='topic',
            durable=self.amqp_durable_queues,
            auto_delete=self.amqp_auto_delete)

        self._ensure_publishing(self._publish, exchange, msg,
                                routing_key=topic, timeout=timeout,
                                retry=retry)

    def fanout_send(self, topic, msg, retry=None):
        """Send a 'fanout' message."""
        exchange = kombu.entity.Exchange(name='%s_fanout' % topic,
                                         type='fanout',
                                         durable=False,
                                         auto_delete=True)

        self._ensure_publishing(self._publish, exchange, msg, retry=retry)

    def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
        """Send a notify message on a topic."""
        exchange = kombu.entity.Exchange(
            name=exchange_name,
            type='topic',
            durable=self.amqp_durable_queues,
            auto_delete=self.amqp_auto_delete)

        self._ensure_publishing(self._publish_and_creates_default_queue,
                                exchange, msg, routing_key=topic, retry=retry)
class RabbitDriver(amqpdriver.AMQPDriverBase):
    """RabbitMQ Driver

    The ``rabbit`` driver is the default driver used in OpenStack's
    integration tests.

    The driver is aliased as ``kombu`` to support upgrading existing
    installations with older settings.
    """

    def __init__(self, conf, url,
                 default_exchange=None,
                 allowed_remote_exmods=None):
        opt_group = cfg.OptGroup(name='oslo_messaging_rabbit',
                                 title='RabbitMQ driver options')
        conf.register_group(opt_group)
        conf.register_opts(rabbit_opts, group=opt_group)
        conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
        conf.register_opts(base.base_opts, group=opt_group)
        conf = rpc_common.ConfigOptsProxy(conf, url, opt_group.name)

        self.missing_destination_retry_timeout = (
            conf.oslo_messaging_rabbit.kombu_missing_consumer_retry_timeout)

        self.prefetch_size = (
            conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count)

        # the pool configuration properties
        max_size = conf.oslo_messaging_rabbit.rpc_conn_pool_size
        min_size = conf.oslo_messaging_rabbit.conn_pool_min_size
        ttl = conf.oslo_messaging_rabbit.conn_pool_ttl

        connection_pool = pool.ConnectionPool(
            conf, max_size, min_size, ttl,
            url, Connection)

        super(RabbitDriver, self).__init__(
            conf, url,
            connection_pool,
            default_exchange,
            allowed_remote_exmods
        )

    def require_features(self, requeue=True):
        pass
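
For context, a minimal sketch of how a service ends up exercising this driver through the public oslo.messaging API. The broker URL, topic, and method names here are hypothetical examples and are not part of the module above; they assume a reachable RabbitMQ broker.

from oslo_config import cfg
import oslo_messaging

conf = cfg.CONF

# The 'rabbit://' scheme selects RabbitDriver; credentials and host are
# placeholder values.
transport = oslo_messaging.get_transport(
    conf, 'rabbit://guest:guest@localhost:5672/')

# Build an RPC client over the transport; 'test' and 'ping' are made up and
# would need a matching RPC server listening on that topic.
target = oslo_messaging.Target(topic='test')
client = oslo_messaging.RPCClient(transport, target)
# client.call({}, 'ping', data='hello')

transport.cleanup()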