Added support for reading messages from RabbitMQ asynchronously
This commit is contained in:
		@@ -21,6 +21,7 @@ import sys
 | 
			
		||||
 | 
			
		||||
# If ../portas/__init__.py exists, add ../ to Python search path, so that
 | 
			
		||||
# it will override what happens to be installed in /usr/(local/)lib/python...
 | 
			
		||||
from portas.common.service import TaskResultHandlerService
 | 
			
		||||
 | 
			
		||||
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
 | 
			
		||||
                                                os.pardir,
 | 
			
		||||
@@ -40,11 +41,15 @@ if __name__ == '__main__':
 | 
			
		||||
        config.parse_args()
 | 
			
		||||
        log.setup('portas')
 | 
			
		||||
 | 
			
		||||
        launcher = service.ServiceLauncher()
 | 
			
		||||
 | 
			
		||||
        api_service = wsgi.Service(config.load_paste_app(),
 | 
			
		||||
                                   port=config.CONF.bind_port,
 | 
			
		||||
                                   host=config.CONF.bind_host)
 | 
			
		||||
        launcher = service.Launcher()
 | 
			
		||||
        launcher.run_service(api_service)
 | 
			
		||||
 | 
			
		||||
        launcher.launch_service(api_service)
 | 
			
		||||
        launcher.launch_service(TaskResultHandlerService())
 | 
			
		||||
        launcher.wait()
 | 
			
		||||
    except RuntimeError, e:
 | 
			
		||||
        sys.stderr.write("ERROR: %s\n" % e)
 | 
			
		||||
        sys.exit(1)
 | 
			
		||||
 
 | 
			
		||||
@@ -16,4 +16,19 @@ bind_port = 8082
 | 
			
		||||
log_file = /tmp/portas-api.log
 | 
			
		||||
 | 
			
		||||
#A valid SQLAlchemy connection string for the metadata database
 | 
			
		||||
sql_connection = sqlite:///portas.sqlite
 | 
			
		||||
sql_connection = sqlite:///portas.sqlite
 | 
			
		||||
 | 
			
		||||
[reports]
 | 
			
		||||
results_exchange = task-results
 | 
			
		||||
results_queue = task-results
 | 
			
		||||
reports_exchange = task-reports
 | 
			
		||||
reports_queue = task-reports
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
[rabbitmq]
 | 
			
		||||
host = localhost
 | 
			
		||||
port = 5672
 | 
			
		||||
use_ssl = false
 | 
			
		||||
userid = guest
 | 
			
		||||
password = guest
 | 
			
		||||
virtual_host = /
 | 
			
		||||
@@ -41,9 +41,28 @@ bind_opts = [
 | 
			
		||||
    cfg.IntOpt('bind_port'),
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
reports_opts = [
 | 
			
		||||
    cfg.StrOpt('results_exchange', default='task-results'),
 | 
			
		||||
    cfg.StrOpt('results_queue', default='task-results'),
 | 
			
		||||
    cfg.StrOpt('reports_exchange', default='task-reports'),
 | 
			
		||||
    cfg.StrOpt('reports_queue', default='task-reports')
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
rabbit_opts = [
 | 
			
		||||
    cfg.StrOpt('host', default='localhost'),
 | 
			
		||||
    cfg.IntOpt('port', default=5672),
 | 
			
		||||
    cfg.BoolOpt('use_ssl', default=False),
 | 
			
		||||
    cfg.StrOpt('userid', default='guest'),
 | 
			
		||||
    cfg.StrOpt('password', default='guest'),
 | 
			
		||||
    cfg.StrOpt('virtual_host', default='/'),
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
CONF = cfg.CONF
 | 
			
		||||
CONF.register_opts(paste_deploy_opts, group='paste_deploy')
 | 
			
		||||
CONF.register_opts(bind_opts)
 | 
			
		||||
CONF.register_opts(reports_opts, group='reports')
 | 
			
		||||
CONF.register_opts(rabbit_opts, group='rabbitmq')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
CONF.import_opt('verbose', 'portas.openstack.common.log')
 | 
			
		||||
CONF.import_opt('debug', 'portas.openstack.common.log')
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										52
									
								
								portas/portas/common/service.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										52
									
								
								portas/portas/common/service.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,52 @@
 | 
			
		||||
from eventlet import patcher
 | 
			
		||||
 | 
			
		||||
amqp = patcher.import_patched('amqplib.client_0_8')
 | 
			
		||||
 | 
			
		||||
from portas.openstack.common import service
 | 
			
		||||
from portas.openstack.common import log as logging
 | 
			
		||||
from portas.common import config
 | 
			
		||||
 | 
			
		||||
conf = config.CONF.reports
 | 
			
		||||
rabbitmq = config.CONF.rabbitmq
 | 
			
		||||
log = logging.getLogger(__name__)
 | 
			
		||||
channel = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TaskResultHandlerService(service.Service):
 | 
			
		||||
    def __init__(self, threads=1000):
 | 
			
		||||
        super(TaskResultHandlerService, self).__init__(threads)
 | 
			
		||||
 | 
			
		||||
    def start(self):
 | 
			
		||||
        super(TaskResultHandlerService, self).start()
 | 
			
		||||
        self.tg.add_thread(self._handle_results)
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        super(TaskResultHandlerService, self).stop()
 | 
			
		||||
 | 
			
		||||
    def _handle_results(self):
 | 
			
		||||
        connection = amqp.Connection(rabbitmq.host, virtual_host=rabbitmq.virtual_host,
 | 
			
		||||
                                     userid=rabbitmq.userid, password=rabbitmq.password,
 | 
			
		||||
                                     ssl=rabbitmq.use_ssl, insist=True)
 | 
			
		||||
        ch = connection.channel()
 | 
			
		||||
 | 
			
		||||
        def bind(exchange, queue):
 | 
			
		||||
            ch.exchange_declare(exchange, 'direct')
 | 
			
		||||
            ch.queue_declare(queue)
 | 
			
		||||
            ch.queue_bind(queue, exchange, queue)
 | 
			
		||||
 | 
			
		||||
        bind(conf.results_exchange, conf.results_queue)
 | 
			
		||||
        bind(conf.reports_exchange, conf.reports_queue)
 | 
			
		||||
 | 
			
		||||
        ch.basic_consume('task-results', callback=handle_result)
 | 
			
		||||
        ch.basic_consume('task-reports', callback=handle_report)
 | 
			
		||||
        while ch.callbacks:
 | 
			
		||||
            ch.wait()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def handle_report(msg):
 | 
			
		||||
    msg.channel.basic_ack(msg.delivery_tag)
 | 
			
		||||
    log.debug(_('Got report message from orchestration engine:\n{0}'.format(msg.body)))
 | 
			
		||||
 | 
			
		||||
def handle_result(msg):
 | 
			
		||||
    msg.channel.basic_ack(msg.delivery_tag)
 | 
			
		||||
    log.debug(_('Got result message from orchestration engine:\n{0}'.format(msg.body)))
 | 
			
		||||
@@ -13,6 +13,7 @@ httplib2
 | 
			
		||||
kombu
 | 
			
		||||
pycrypto>=2.1.0alpha1
 | 
			
		||||
iso8601>=0.1.4
 | 
			
		||||
amqplib
 | 
			
		||||
 | 
			
		||||
# Note you will need gcc buildtools installed and must
 | 
			
		||||
# have installed libxml headers for lxml to be successfully
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user