oslo.messaging/oslo_messaging/_drivers/pika_driver/pika_connection.py

543 lines
19 KiB
Python

# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import logging
import os
import threading
import futurist
from pika.adapters import select_connection
from pika import exceptions as pika_exceptions
from pika import spec as pika_spec
from oslo_utils import eventletutils
current_thread = eventletutils.fetch_current_thread_functor()
LOG = logging.getLogger(__name__)
class ThreadSafePikaConnection(object):
def __init__(self, parameters=None,
_impl_class=select_connection.SelectConnection):
self.params = parameters
self._connection_lock = threading.Lock()
self._evt_closed = threading.Event()
self._task_queue = collections.deque()
self._pending_connection_futures = set()
create_connection_future = self._register_pending_future()
def on_open_error(conn, err):
create_connection_future.set_exception(
pika_exceptions.AMQPConnectionError(err)
)
self._impl = _impl_class(
parameters=parameters,
on_open_callback=create_connection_future.set_result,
on_open_error_callback=on_open_error,
on_close_callback=self._on_connection_close,
stop_ioloop_on_close=False,
)
self._interrupt_pipein, self._interrupt_pipeout = os.pipe()
self._impl.ioloop.add_handler(self._interrupt_pipein,
self._impl.ioloop.read_interrupt,
select_connection.READ)
self._thread = threading.Thread(target=self._process_io)
self._thread.daemon = True
self._thread_id = None
self._thread.start()
create_connection_future.result()
def _check_called_not_from_event_loop(self):
if current_thread() == self._thread_id:
raise RuntimeError("This call is not allowed from ioloop thread")
def _execute_task(self, func, *args, **kwargs):
if current_thread() == self._thread_id:
return func(*args, **kwargs)
future = futurist.Future()
self._task_queue.append((func, args, kwargs, future))
if self._evt_closed.is_set():
self._notify_all_futures_connection_close()
elif self._interrupt_pipeout is not None:
os.write(self._interrupt_pipeout, b'X')
return future.result()
def _register_pending_future(self):
future = futurist.Future()
self._pending_connection_futures.add(future)
def on_done_callback(fut):
try:
self._pending_connection_futures.remove(fut)
except KeyError:
pass
future.add_done_callback(on_done_callback)
if self._evt_closed.is_set():
self._notify_all_futures_connection_close()
return future
def _notify_all_futures_connection_close(self):
while self._task_queue:
try:
method_res_future = self._task_queue.pop()[3]
except KeyError:
break
else:
method_res_future.set_exception(
pika_exceptions.ConnectionClosed()
)
while self._pending_connection_futures:
try:
pending_connection_future = (
self._pending_connection_futures.pop()
)
except KeyError:
break
else:
pending_connection_future.set_exception(
pika_exceptions.ConnectionClosed()
)
def _on_connection_close(self, conn, reply_code, reply_text):
self._evt_closed.set()
self._notify_all_futures_connection_close()
if self._interrupt_pipeout:
os.close(self._interrupt_pipeout)
os.close(self._interrupt_pipein)
def add_on_close_callback(self, callback):
return self._execute_task(self._impl.add_on_close_callback, callback)
def _do_process_io(self):
while self._task_queue:
func, args, kwargs, future = self._task_queue.pop()
try:
res = func(*args, **kwargs)
except BaseException as e:
LOG.exception(e)
future.set_exception(e)
else:
future.set_result(res)
self._impl.ioloop.poll()
self._impl.ioloop.process_timeouts()
def _process_io(self):
self._thread_id = current_thread()
while not self._evt_closed.is_set():
try:
self._do_process_io()
except BaseException:
LOG.exception("Error during processing connection's IO")
def close(self, *args, **kwargs):
self._check_called_not_from_event_loop()
res = self._execute_task(self._impl.close, *args, **kwargs)
self._evt_closed.wait()
self._thread.join()
return res
def channel(self, channel_number=None):
self._check_called_not_from_event_loop()
channel_opened_future = self._register_pending_future()
impl_channel = self._execute_task(
self._impl.channel,
on_open_callback=channel_opened_future.set_result,
channel_number=channel_number
)
# Create our proxy channel
channel = ThreadSafePikaChannel(impl_channel, self)
# Link implementation channel with our proxy channel
impl_channel._set_cookie(channel)
channel_opened_future.result()
return channel
def add_timeout(self, timeout, callback):
return self._execute_task(self._impl.add_timeout, timeout, callback)
def remove_timeout(self, timeout_id):
return self._execute_task(self._impl.remove_timeout, timeout_id)
@property
def is_closed(self):
return self._impl.is_closed
@property
def is_closing(self):
return self._impl.is_closing
@property
def is_open(self):
return self._impl.is_open
class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
def __init__(self, channel_impl, connection):
self._impl = channel_impl
self._connection = connection
self._delivery_confirmation = False
self._message_returned = False
self._current_future = None
self._evt_closed = threading.Event()
self.add_on_close_callback(self._on_channel_close)
def _execute_task(self, func, *args, **kwargs):
return self._connection._execute_task(func, *args, **kwargs)
def _on_channel_close(self, channel, reply_code, reply_text):
self._evt_closed.set()
if self._current_future:
self._current_future.set_exception(
pika_exceptions.ChannelClosed(reply_code, reply_text))
def _on_message_confirmation(self, frame):
self._current_future.set_result(frame)
def add_on_close_callback(self, callback):
self._execute_task(self._impl.add_on_close_callback, callback)
def add_on_cancel_callback(self, callback):
self._execute_task(self._impl.add_on_cancel_callback, callback)
def __int__(self):
return self.channel_number
@property
def channel_number(self):
return self._impl.channel_number
@property
def is_closed(self):
return self._impl.is_closed
@property
def is_closing(self):
return self._impl.is_closing
@property
def is_open(self):
return self._impl.is_open
def close(self, reply_code=0, reply_text="Normal Shutdown"):
self._impl.close(reply_code=reply_code, reply_text=reply_text)
self._evt_closed.wait()
def _check_called_not_from_event_loop(self):
self._connection._check_called_not_from_event_loop()
def flow(self, active):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(
self._impl.flow, callback=self._current_future.set_result,
active=active
)
return self._current_future.result()
def basic_consume(self, # pylint: disable=R0913
consumer_callback,
queue,
no_ack=False,
exclusive=False,
consumer_tag=None,
arguments=None):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(
self._impl.add_callback, self._current_future.set_result,
replies=[pika_spec.Basic.ConsumeOk], one_shot=True
)
self._impl.add_callback(self._current_future.set_result,
replies=[pika_spec.Basic.ConsumeOk],
one_shot=True)
tag = self._execute_task(
self._impl.basic_consume,
consumer_callback=consumer_callback,
queue=queue,
no_ack=no_ack,
exclusive=exclusive,
consumer_tag=consumer_tag,
arguments=arguments
)
self._current_future.result()
return tag
def basic_cancel(self, consumer_tag):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(
self._impl.basic_cancel,
callback=self._current_future.set_result,
consumer_tag=consumer_tag,
nowait=False)
self._current_future.result()
def basic_ack(self, delivery_tag=0, multiple=False):
return self._execute_task(
self._impl.basic_ack, delivery_tag=delivery_tag, multiple=multiple)
def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
return self._execute_task(
self._impl.basic_nack, delivery_tag=delivery_tag,
multiple=multiple, requeue=requeue
)
def publish(self, exchange, routing_key, body, # pylint: disable=R0913
properties=None, mandatory=False, immediate=False):
if self._delivery_confirmation:
self._check_called_not_from_event_loop()
# In publisher-acknowledgments mode
self._message_returned = False
self._current_future = futurist.Future()
self._execute_task(self._impl.basic_publish,
exchange=exchange,
routing_key=routing_key,
body=body,
properties=properties,
mandatory=mandatory,
immediate=immediate)
conf_method = self._current_future.result().method
if isinstance(conf_method, pika_spec.Basic.Nack):
raise pika_exceptions.NackError((None,))
else:
assert isinstance(conf_method, pika_spec.Basic.Ack), (
conf_method)
if self._message_returned:
raise pika_exceptions.UnroutableError((None,))
else:
# In non-publisher-acknowledgments mode
self._execute_task(self._impl.basic_publish,
exchange=exchange,
routing_key=routing_key,
body=body,
properties=properties,
mandatory=mandatory,
immediate=immediate)
def basic_qos(self, prefetch_size=0, prefetch_count=0, all_channels=False):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.basic_qos,
callback=self._current_future.set_result,
prefetch_size=prefetch_size,
prefetch_count=prefetch_count,
all_channels=all_channels)
self._current_future.result()
def basic_recover(self, requeue=False):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(
self._impl.basic_recover,
callback=lambda: self._current_future.set_result(None),
requeue=requeue
)
self._current_future.result()
def basic_reject(self, delivery_tag=None, requeue=True):
self._execute_task(self._impl.basic_reject,
delivery_tag=delivery_tag,
requeue=requeue)
def _on_message_returned(self, *args, **kwargs):
self._message_returned = True
def confirm_delivery(self):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.add_callback,
callback=self._current_future.set_result,
replies=[pika_spec.Confirm.SelectOk],
one_shot=True)
self._execute_task(self._impl.confirm_delivery,
callback=self._on_message_confirmation,
nowait=False)
self._current_future.result()
self._delivery_confirmation = True
self._execute_task(self._impl.add_on_return_callback,
self._on_message_returned)
def exchange_declare(self, exchange=None, # pylint: disable=R0913
exchange_type='direct', passive=False, durable=False,
auto_delete=False, internal=False,
arguments=None, **kwargs):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.exchange_declare,
callback=self._current_future.set_result,
exchange=exchange,
exchange_type=exchange_type,
passive=passive,
durable=durable,
auto_delete=auto_delete,
internal=internal,
nowait=False,
arguments=arguments,
type=kwargs["type"] if kwargs else None)
return self._current_future.result()
def exchange_delete(self, exchange=None, if_unused=False):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.exchange_delete,
callback=self._current_future.set_result,
exchange=exchange,
if_unused=if_unused,
nowait=False)
return self._current_future.result()
def exchange_bind(self, destination=None, source=None, routing_key='',
arguments=None):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.exchange_bind,
callback=self._current_future.set_result,
destination=destination,
source=source,
routing_key=routing_key,
nowait=False,
arguments=arguments)
return self._current_future.result()
def exchange_unbind(self, destination=None, source=None, routing_key='',
arguments=None):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.exchange_unbind,
callback=self._current_future.set_result,
destination=destination,
source=source,
routing_key=routing_key,
nowait=False,
arguments=arguments)
return self._current_future.result()
def queue_declare(self, queue='', passive=False, durable=False,
exclusive=False, auto_delete=False,
arguments=None):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.queue_declare,
callback=self._current_future.set_result,
queue=queue,
passive=passive,
durable=durable,
exclusive=exclusive,
auto_delete=auto_delete,
nowait=False,
arguments=arguments)
return self._current_future.result()
def queue_delete(self, queue='', if_unused=False, if_empty=False):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.queue_delete,
callback=self._current_future.set_result,
queue=queue,
if_unused=if_unused,
if_empty=if_empty,
nowait=False)
return self._current_future.result()
def queue_purge(self, queue=''):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.queue_purge,
callback=self._current_future.set_result,
queue=queue,
nowait=False)
return self._current_future.result()
def queue_bind(self, queue, exchange, routing_key=None,
arguments=None):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.queue_bind,
callback=self._current_future.set_result,
queue=queue,
exchange=exchange,
routing_key=routing_key,
nowait=False,
arguments=arguments)
return self._current_future.result()
def queue_unbind(self, queue='', exchange=None, routing_key=None,
arguments=None):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.queue_unbind,
callback=self._current_future.set_result,
queue=queue,
exchange=exchange,
routing_key=routing_key,
arguments=arguments)
return self._current_future.result()