140 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			140 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 | 
						|
 | 
						|
# Copyright 2010 United States Government as represented by the
 | 
						|
# Administrator of the National Aeronautics and Space Administration.
 | 
						|
# All Rights Reserved.
 | 
						|
#
 | 
						|
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
						|
#    not use this file except in compliance with the License. You may obtain
 | 
						|
#    a copy of the License at
 | 
						|
#
 | 
						|
#         http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
#    Unless required by applicable law or agreed to in writing, software
 | 
						|
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
						|
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
						|
#    License for the specific language governing permissions and limitations
 | 
						|
#    under the License.
 | 
						|
 | 
						|
"""Based a bit on the carrot.backeds.queue backend... but a lot better."""
 | 
						|
 | 
						|
import Queue as queue
 | 
						|
 | 
						|
from carrot.backends import base
 | 
						|
from eventlet import greenthread
 | 
						|
 | 
						|
from nova import log as logging
 | 
						|
 | 
						|
 | 
						|
LOG = logging.getLogger("nova.fakerabbit")
 | 
						|
 | 
						|
 | 
						|
EXCHANGES = {}
 | 
						|
QUEUES = {}
 | 
						|
 | 
						|
 | 
						|
class Message(base.BaseMessage):
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
class Exchange(object):
 | 
						|
    def __init__(self, name, exchange_type):
 | 
						|
        self.name = name
 | 
						|
        self.exchange_type = exchange_type
 | 
						|
        self._queue = queue.Queue()
 | 
						|
        self._routes = {}
 | 
						|
 | 
						|
    def publish(self, message, routing_key=None):
 | 
						|
        nm = self.name
 | 
						|
        LOG.debug(_('(%(nm)s) publish (key: %(routing_key)s)'
 | 
						|
                ' %(message)s') % locals())
 | 
						|
        routing_key = routing_key.split('.')[0]
 | 
						|
        if routing_key in self._routes:
 | 
						|
            for f in self._routes[routing_key]:
 | 
						|
                LOG.debug(_('Publishing to route %s'), f)
 | 
						|
                f(message, routing_key=routing_key)
 | 
						|
 | 
						|
    def bind(self, callback, routing_key):
 | 
						|
        self._routes.setdefault(routing_key, [])
 | 
						|
        self._routes[routing_key].append(callback)
 | 
						|
 | 
						|
 | 
						|
class Queue(object):
 | 
						|
    def __init__(self, name):
 | 
						|
        self.name = name
 | 
						|
        self._queue = queue.Queue()
 | 
						|
 | 
						|
    def __repr__(self):
 | 
						|
        return '<Queue: %s>' % self.name
 | 
						|
 | 
						|
    def push(self, message, routing_key=None):
 | 
						|
        self._queue.put(message)
 | 
						|
 | 
						|
    def size(self):
 | 
						|
        return self._queue.qsize()
 | 
						|
 | 
						|
    def pop(self):
 | 
						|
        return self._queue.get()
 | 
						|
 | 
						|
 | 
						|
class Backend(base.BaseBackend):
 | 
						|
    def queue_declare(self, queue, **kwargs):
 | 
						|
        global QUEUES
 | 
						|
        if queue not in QUEUES:
 | 
						|
            LOG.debug(_('Declaring queue %s'), queue)
 | 
						|
            QUEUES[queue] = Queue(queue)
 | 
						|
 | 
						|
    def exchange_declare(self, exchange, type, *args, **kwargs):
 | 
						|
        global EXCHANGES
 | 
						|
        if exchange not in EXCHANGES:
 | 
						|
            LOG.debug(_('Declaring exchange %s'), exchange)
 | 
						|
            EXCHANGES[exchange] = Exchange(exchange, type)
 | 
						|
 | 
						|
    def queue_bind(self, queue, exchange, routing_key, **kwargs):
 | 
						|
        global EXCHANGES
 | 
						|
        global QUEUES
 | 
						|
        LOG.debug(_('Binding %(queue)s to %(exchange)s with'
 | 
						|
                ' key %(routing_key)s') % locals())
 | 
						|
        EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
 | 
						|
 | 
						|
    def declare_consumer(self, queue, callback, *args, **kwargs):
 | 
						|
        self.current_queue = queue
 | 
						|
        self.current_callback = callback
 | 
						|
 | 
						|
    def consume(self, limit=None):
 | 
						|
        while True:
 | 
						|
            item = self.get(self.current_queue)
 | 
						|
            if item:
 | 
						|
                self.current_callback(item)
 | 
						|
                raise StopIteration()
 | 
						|
            greenthread.sleep(0)
 | 
						|
 | 
						|
    def get(self, queue, no_ack=False):
 | 
						|
        global QUEUES
 | 
						|
        if not queue in QUEUES or not QUEUES[queue].size():
 | 
						|
            return None
 | 
						|
        (message_data, content_type, content_encoding) = QUEUES[queue].pop()
 | 
						|
        message = Message(backend=self, body=message_data,
 | 
						|
                          content_type=content_type,
 | 
						|
                          content_encoding=content_encoding)
 | 
						|
        message.result = True
 | 
						|
        LOG.debug(_('Getting from %(queue)s: %(message)s') % locals())
 | 
						|
        return message
 | 
						|
 | 
						|
    def prepare_message(self, message_data, delivery_mode,
 | 
						|
                        content_type, content_encoding, **kwargs):
 | 
						|
        """Prepare message for sending."""
 | 
						|
        return (message_data, content_type, content_encoding)
 | 
						|
 | 
						|
    def publish(self, message, exchange, routing_key, **kwargs):
 | 
						|
        global EXCHANGES
 | 
						|
        if exchange in EXCHANGES:
 | 
						|
            EXCHANGES[exchange].publish(message, routing_key=routing_key)
 | 
						|
 | 
						|
 | 
						|
def reset_all():
 | 
						|
    global EXCHANGES
 | 
						|
    global QUEUES
 | 
						|
    EXCHANGES = {}
 | 
						|
    QUEUES = {}
 |