Sergey Vilgelm d6f9fbeb2f Use oslo.log library instead of system logging module
The constants of log levels were added in the 1.8 version of
the oslo.log library. So we can replace most uses of system
logging module with log module from oslo.log.

Change-Id: I36d36c7ed1246de7ba0d0cc8fd1ffcc4dfbcba25
Closes-Bug: 1481370
2015-08-05 13:21:09 +03:00

103 lines
3.2 KiB
Python

# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ssl as ssl_module
from eventlet import patcher
kombu = patcher.import_patched('kombu')
from oslo_serialization import jsonutils
from subscription import Subscription
class MqClient(object):
def __init__(self, login, password, host, port, virtual_host,
ssl=False, ca_certs=None):
ssl_params = None
if ssl is True:
ssl_params = {
'ca_certs': ca_certs,
'cert_reqs': ssl_module.CERT_REQUIRED
}
self._connection = kombu.Connection(
'amqp://{0}:{1}@{2}:{3}/{4}'.format(
login,
password,
host,
port,
virtual_host
), ssl=ssl_params
)
self._channel = None
self._connected = False
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
def connect(self):
self._connection.connect()
self._channel = self._connection.channel()
self._connected = True
def close(self):
self._connection.close()
self._connected = False
def declare(self, queue, exchange='', enable_ha=False, ttl=0):
if not self._connected:
raise RuntimeError('Not connected to RabbitMQ')
queue_arguments = {}
if enable_ha is True:
# To use mirrored queues feature in RabbitMQ 2.x
# we need to declare this policy on the queue itself.
#
# Warning: this option has no effect on RabbitMQ 3.X,
# to enable mirrored queues feature in RabbitMQ 3.X, please
# configure RabbitMQ.
queue_arguments['x-ha-policy'] = 'all'
if ttl > 0:
queue_arguments['x-expires'] = ttl
exchange = kombu.Exchange(exchange, type='direct', durable=True)
queue = kombu.Queue(queue, exchange, queue, durable=True,
queue_arguments=queue_arguments)
bound_queue = queue(self._connection)
bound_queue.declare()
def send(self, message, key, exchange=''):
if not self._connected:
raise RuntimeError('Not connected to RabbitMQ')
producer = kombu.Producer(self._connection)
producer.publish(
exchange=str(exchange),
routing_key=str(key),
body=jsonutils.dumps(message.body),
message_id=str(message.id)
)
def open(self, queue, prefetch_count=1):
if not self._connected:
raise RuntimeError('Not connected to RabbitMQ')
return Subscription(self._connection, queue, prefetch_count)