Introduction of oslo.messaging
* replaces kombu with oslo.messaging * deprecates and removes the faafo-tracker service (merged into the faafo-producer service) * moves all CLI options into configuration files * updates log example outputs in the documentation * updates the workflow diagram and removes the workflow text description Change-Id: I4b79ed28a8f5b1fe1b33dfdbfe9cc8c958f80b1b
This commit is contained in:
parent
32ca4b408d
commit
ec790a3bed
4
ansible/files/api.conf
Normal file
4
ansible/files/api.conf
Normal file
@ -0,0 +1,4 @@
|
||||
[DEFAULT]
|
||||
|
||||
database_url = mysql://faafo:secretsecret@127.0.0.1:3306/faafo
|
||||
verbose = True
|
5
ansible/files/producer.conf
Normal file
5
ansible/files/producer.conf
Normal file
@ -0,0 +1,5 @@
|
||||
[DEFAULT]
|
||||
|
||||
endpoint_url = http://127.0.0.1:5000
|
||||
transport_url = rabbit://faafo:secretsecret@localhost:5672/
|
||||
verbose = True
|
@ -1,5 +1,3 @@
|
||||
#!/bin/sh
|
||||
|
||||
faafo-api \
|
||||
--database-url mysql://faafo:secretsecret@127.0.0.1:3306/faafo \
|
||||
--debug --verbose
|
||||
faafo-api --config-file api.conf
|
||||
|
@ -1,6 +1,3 @@
|
||||
#!/bin/sh
|
||||
|
||||
faafo-producer \
|
||||
--amqp-url amqp://faafo:secretsecret@127.0.0.1:5672/ \
|
||||
--api-url http://127.0.0.1:5000 \
|
||||
--debug --verbose
|
||||
faafo-producer --config-file producer.conf
|
||||
|
@ -1,6 +0,0 @@
|
||||
#!/bin/sh
|
||||
|
||||
faafo-tracker \
|
||||
--amqp-url amqp://faafo:secretsecret@127.0.0.1:5672/ \
|
||||
--api-url http://127.0.0.1:5000 \
|
||||
--debug --verbose
|
@ -1,6 +1,3 @@
|
||||
#!/bin/sh
|
||||
|
||||
faafo-worker \
|
||||
--amqp-url amqp://faafo:secretsecret@127.0.0.1:5672/ \
|
||||
--target /home/vagrant \
|
||||
--debug --verbose
|
||||
faafo-worker --config-file worker.conf
|
||||
|
5
ansible/files/worker.conf
Normal file
5
ansible/files/worker.conf
Normal file
@ -0,0 +1,5 @@
|
||||
[DEFAULT]
|
||||
|
||||
filesystem_store_datadir = /home/vagrant
|
||||
transport_url = rabbit://faafo:secretsecret@localhost:5672/
|
||||
verbose = True
|
@ -20,7 +20,11 @@
|
||||
args:
|
||||
chdir: /vagrant
|
||||
|
||||
- copy: src=files/run_api.sh dest=/home/vagrant/run_api.sh mode=0755 owner=vagrant group=vagrant
|
||||
- copy: src=files/run_producer.sh dest=/home/vagrant/run_producer.sh mode=0755 owner=vagrant group=vagrant
|
||||
- copy: src=files/run_tracker.sh dest=/home/vagrant/run_tracker.sh mode=0755 owner=vagrant group=vagrant
|
||||
- copy: src=files/run_worker.sh dest=/home/vagrant/run_worker.sh mode=0755 owner=vagrant group=vagrant
|
||||
- copy: src=files/{{ item }} dest=/home/vagrant/{{ item }} mode=0755 owner=vagrant group=vagrant
|
||||
with_items:
|
||||
- api.conf
|
||||
- producer.conf
|
||||
- run_api.sh
|
||||
- run_producer.sh
|
||||
- run_worker.sh
|
||||
- worker.conf
|
||||
|
@ -27,12 +27,11 @@ Now it is possible to login with SSH.
|
||||
|
||||
$ vagrant ssh
|
||||
|
||||
Open a new screen or tmux session. Aftwards run the api, worker, producer, and
|
||||
tracker services in the foreground, each service in a separate window.
|
||||
Open a new screen or tmux session. Aftwards run the api, worker, and producer
|
||||
services in the foreground, each service in a separate window.
|
||||
|
||||
* :code:`sh run_api.sh`
|
||||
* :code:`sh run_producer.sh`
|
||||
* :code:`sh run_tracker.sh`
|
||||
* :code:`sh run_worker.sh`
|
||||
|
||||
RabbitMQ server
|
||||
@ -60,12 +59,11 @@ the application itself.
|
||||
$ pip install -r requirements.txt
|
||||
$ python setup.py install
|
||||
|
||||
Open a new screen or tmux session. Aftwards run the api, worker, producer, and
|
||||
tracker services in the foreground, each service in a separate window.
|
||||
Open a new screen or tmux session. Aftwards run the api, worker, and producer
|
||||
services in the foreground, each service in a separate window.
|
||||
|
||||
.. code::
|
||||
|
||||
$ source .venv/bin/activate; faafo-api
|
||||
$ source .venv/bin/activate; faafo-worker
|
||||
$ source .venv/bin/activate; faafo-tracker
|
||||
$ source .venv/bin/activate; faafo-producer
|
||||
|
@ -4,12 +4,12 @@ digraph {
|
||||
Database -> API [color=red];
|
||||
API -> Webinterface [color=red];
|
||||
Producer -> API [color=orange];
|
||||
Producer -> API [color=green];
|
||||
Producer -> "Queue Service" [color=orange];
|
||||
Tracker -> API [color=green];
|
||||
"Queue Service" -> Tracker [color=green];
|
||||
"Queue Service" -> Worker [color=orange];
|
||||
Worker -> "Image File" [color=blue];
|
||||
Worker -> "Queue Service" [color=green];
|
||||
"Queue Service" -> Producer [color=green];
|
||||
"Image File" -> "Storage Backend" [color=blue];
|
||||
"Storage Backend" -> Webinterface [color=red];
|
||||
}
|
||||
|
Binary file not shown.
Before Width: | Height: | Size: 39 KiB After Width: | Height: | Size: 35 KiB |
@ -10,42 +10,28 @@ Example image
|
||||
Example outputs
|
||||
---------------
|
||||
|
||||
API Service
|
||||
~~~~~~~~~~~
|
||||
|
||||
FIXME(berendt): add output of the API service
|
||||
|
||||
Producer service
|
||||
~~~~~~~~~~~~~~~~
|
||||
|
||||
FIXME(berendt): update output (introduction of oslo.logging)
|
||||
|
||||
.. code::
|
||||
|
||||
2015-02-12 22:21:42,870 generating 2 task(s)
|
||||
2015-02-12 22:21:42,876 generated task: {'width': 728, 'yb': 2.6351683415972076, 'uuid': UUID('66d5f67e-d26d-42fb-9d88-3c3830b4187a'), 'iterations': 395, 'xb': 1.6486035545865234, 'xa': -1.2576814065507933, 'ya': -2.8587178863035616, 'height': 876}
|
||||
2015-02-12 22:21:42,897 generated task: {'width': 944, 'yb': 2.981696583462036, 'uuid': UUID('6f873111-8bc2-4d73-9a36-ed49915699c8'), 'iterations': 201, 'xb': 3.530775320058914, 'xa': -3.3511031734533794, 'ya': -0.921920674639712, 'height': 962}
|
||||
2015-02-12 22:21:42,927 sleeping for 2.680171 seconds
|
||||
|
||||
Tracker service
|
||||
~~~~~~~~~~~~~~~
|
||||
|
||||
FIXME(berendt): update output (introduction of oslo.logging)
|
||||
|
||||
.. code::
|
||||
|
||||
2015-02-12 22:20:26,630 processing result be42a131-e4aa-4db5-80d1-1956784f4b81
|
||||
2015-02-12 22:20:26,630 elapsed time 5.749099 seconds
|
||||
2015-02-12 22:20:26,631 checksum 7ba5bf955a94f1aa02e5f442869b8db88a5915b7c2fb91ffba74708b8d799c2a
|
||||
2015-03-25 23:01:29.308 22526 INFO faafo.producer [-] generating 1 task(s)
|
||||
2015-03-25 23:01:29.344 22526 INFO faafo.producer [-] generated task: {'width': 510, 'yb': 2.478654026560605, 'uuid': '212e8c23-e67f-4bd3-86e1-5a5e811ee2f4', 'iterations': 281, 'xb': 1.1428457603077387, 'xa': -3.3528957195683087, 'ya': -2.1341119130263717, 'height': 278}
|
||||
2015-03-25 23:01:30.295 22526 INFO faafo.producer [-] task 212e8c23-e67f-4bd3-86e1-5a5e811ee2f4 processed: {u'duration': 0.8725259304046631, u'checksum': u'b22d975c4f9dc77df5db96ce6264a4990d865dd8f800aba2ac03a065c2f09b1e', u'uuid': u'212e8c23-e67f-4bd3-86e1-5a5e811ee2f4'}
|
||||
|
||||
Worker service
|
||||
~~~~~~~~~~~~~~
|
||||
|
||||
FIXME(berendt): update output (introduction of oslo.logging)
|
||||
|
||||
.. code::
|
||||
|
||||
2015-02-12 22:20:59,258 processing task 20a00e9e-baec-4045-bc57-2cb9d8d1aa61
|
||||
2015-02-12 22:21:01,506 task 20a00e9e-baec-4045-bc57-2cb9d8d1aa61 processed in 2.246601 seconds
|
||||
2015-02-12 22:21:01,553 saved result of task 20a00e9e-baec-4045-bc57-2cb9d8d1aa61 to file /home/vagrant/20a00e9e-baec-4045-bc57-2cb9d8d1aa61.png
|
||||
2015-02-12 22:21:01,554 pushed result: {'duration': 2.246600866317749, 'checksum': 'faa0f00a72fac53e02c3eb392c5da8365139e509899e269227e5c27047af6c1f', 'uuid': UUID('20a00e9e-baec-4045-bc57-2cb9d8d1aa61')}
|
||||
2015-03-25 23:01:29.378 22518 INFO faafo.worker [-] processing task 212e8c23-e67f-4bd3-86e1-5a5e811ee2f4
|
||||
2015-03-25 23:01:30.251 22518 INFO faafo.worker [-] task 212e8c23-e67f-4bd3-86e1-5a5e811ee2f4 processed in 0.872526 seconds
|
||||
2015-03-25 23:01:30.268 22518 INFO faafo.worker [-] saved result of task 212e8c23-e67f-4bd3-86e1-5a5e811ee2f4 to file /home/vagrant/212e8c23-e67f-4bd3-86e1-5a5e811ee2f4.png
|
||||
|
||||
|
||||
API Service
|
||||
~~~~~~~~~~~
|
||||
.. code::
|
||||
|
||||
2015-03-25 23:01:29.342 22511 INFO werkzeug [-] 127.0.0.1 - - [25/Mar/2015 23:01:29] "POST /api/fractal HTTP/1.1" 201 -
|
||||
2015-03-25 23:01:30.317 22511 INFO werkzeug [-] 127.0.0.1 - - [25/Mar/2015 23:01:30] "PUT /api/fractal/212e8c23-e67f-4bd3-86e1-5a5e811ee2f4 HTTP/1.1" 200 -
|
||||
|
@ -2,17 +2,3 @@ Workflow
|
||||
--------
|
||||
|
||||
.. image:: images/diagram.png
|
||||
|
||||
FIXME(berendt): Add new API service and webinterface to the workflow description.
|
||||
|
||||
* The producer generates a random number of tasks with random parameters and a UUID as identifier.
|
||||
* The producer pushes the generated tasks into the exchange :code:`tasks`.
|
||||
* The producer inserts a new record for each task into the database (including all parameters and the UUID).
|
||||
* The producer sleeps for a random number of seconds and will generate more tasks after awakening.
|
||||
* All messages in the :code:`tasks` exchange will be routed into the :code:`tasks` queue.
|
||||
* The worker waits for new messages in the :code:`tasks` queue.
|
||||
* After receiving a message the worker generates an image based on the received parameters and writes the result into a local file (identified by the UUID).
|
||||
* After writing an image the worker pushes the result (the checksum of the generated image and the duration identified by the UUID) into the exchange :code:`results`.
|
||||
* All messages in the :code:`results` exchange will be routed into the :code:`results` queue.
|
||||
* The tracker waits for new messages in the :code:`results` queue.
|
||||
* After receiving a message the tracker updates the duration and checksum value of the corresponding database record (identified by the UUID).
|
||||
|
1
etc/faafo/producer.conf
Normal file
1
etc/faafo/producer.conf
Normal file
@ -0,0 +1 @@
|
||||
TODO(berendt): generate example configuration file with http://docs.openstack.org/developer/oslo.config/generator.html
|
1
etc/faafo/worker.conf
Normal file
1
etc/faafo/worker.conf
Normal file
@ -0,0 +1 @@
|
||||
TODO(berendt): generate example configuration file with http://docs.openstack.org/developer/oslo.config/generator.html
|
@ -18,9 +18,9 @@ from oslo_log import log
|
||||
|
||||
from faafo import version
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
LOG = log.getLogger('faafo.api')
|
||||
|
||||
cli_opts = [
|
||||
api_opts = [
|
||||
cfg.StrOpt('listen-address',
|
||||
default='0.0.0.0',
|
||||
help='Listen address.'),
|
||||
@ -32,7 +32,7 @@ cli_opts = [
|
||||
help='Database connection URL.')
|
||||
]
|
||||
|
||||
cfg.CONF.register_cli_opts(cli_opts)
|
||||
cfg.CONF.register_opts(api_opts)
|
||||
|
||||
log.register_options(cfg.CONF)
|
||||
log.set_defaults()
|
||||
@ -43,7 +43,7 @@ cfg.CONF(project='api', prog='faafo-api',
|
||||
log.setup(cfg.CONF, 'api',
|
||||
version=version.version_info.version_string())
|
||||
|
||||
app = flask.Flask(__name__)
|
||||
app = flask.Flask('faafo.api')
|
||||
app.config['DEBUG'] = cfg.CONF.debug
|
||||
app.config['SQLALCHEMY_DATABASE_URI'] = cfg.CONF.database_url
|
||||
db = flask.ext.sqlalchemy.SQLAlchemy(app)
|
||||
|
@ -16,27 +16,19 @@ import json
|
||||
import random
|
||||
import uuid
|
||||
|
||||
import kombu
|
||||
from kombu.pools import producers
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import oslo_messaging as messaging
|
||||
import requests
|
||||
|
||||
from faafo.openstack.common import periodic_task
|
||||
from faafo.openstack.common import service
|
||||
from faafo import queues
|
||||
from faafo import version
|
||||
|
||||
|
||||
LOG = log.getLogger('faafo.producer')
|
||||
|
||||
cli_opts = [
|
||||
cfg.StrOpt('amqp-url',
|
||||
default='amqp://faafo:secretsecret@localhost:5672/',
|
||||
help='AMQP connection URL'),
|
||||
cfg.StrOpt('api-url',
|
||||
default='http://localhost:5000',
|
||||
help='API connection URL')
|
||||
]
|
||||
|
||||
producer_opts = [
|
||||
@ -74,22 +66,27 @@ producer_opts = [
|
||||
help="The minimum number of generated tasks."),
|
||||
cfg.IntOpt("max-tasks", default=10,
|
||||
help="The maximum number of generated tasks."),
|
||||
cfg.IntOpt("interval", default=10, help="Interval in seconds.")
|
||||
cfg.IntOpt("interval", default=10, help="Interval in seconds."),
|
||||
cfg.StrOpt('endpoint-url',
|
||||
default='http://localhost:5000',
|
||||
help='API connection URL')
|
||||
]
|
||||
|
||||
cfg.CONF.register_cli_opts(cli_opts)
|
||||
cfg.CONF.register_cli_opts(producer_opts)
|
||||
cfg.CONF.register_opts(producer_opts)
|
||||
|
||||
|
||||
class ProducerService(service.Service, periodic_task.PeriodicTasks):
|
||||
def __init__(self):
|
||||
super(ProducerService, self).__init__()
|
||||
self.messaging = kombu.Connection(cfg.CONF.amqp_url)
|
||||
self._periodic_last_run = {}
|
||||
transport = messaging.get_transport(cfg.CONF)
|
||||
target = messaging.Target(topic='tasks')
|
||||
self._client = messaging.RPCClient(transport, target)
|
||||
|
||||
@periodic_task.periodic_task(spacing=cfg.CONF.interval,
|
||||
run_immediately=True)
|
||||
def generate_task(self, context):
|
||||
run_immediately=False)
|
||||
def generate_task(self, ctxt):
|
||||
ctxt = {}
|
||||
random.seed()
|
||||
number = random.randint(cfg.CONF.min_tasks, cfg.CONF.max_tasks)
|
||||
LOG.info("generating %d task(s)" % number)
|
||||
@ -98,16 +95,14 @@ class ProducerService(service.Service, periodic_task.PeriodicTasks):
|
||||
# NOTE(berendt): only necessary when using requests < 2.4.2
|
||||
headers = {'Content-type': 'application/json',
|
||||
'Accept': 'text/plain'}
|
||||
requests.post("%s/api/fractal" % cfg.CONF.api_url,
|
||||
requests.post("%s/api/fractal" % cfg.CONF.endpoint_url,
|
||||
json.dumps(task), headers=headers)
|
||||
LOG.info("generated task: %s" % task)
|
||||
with producers[self.messaging].acquire(block=True) as producer:
|
||||
producer.publish(
|
||||
task,
|
||||
serializer='pickle',
|
||||
exchange=queues.task_exchange,
|
||||
declare=[queues.task_exchange],
|
||||
routing_key='tasks')
|
||||
result = self._client.call(ctxt, 'process', task=task)
|
||||
LOG.info("task %s processed: %s" % (task['uuid'], result))
|
||||
requests.put("%s/api/fractal/%s" %
|
||||
(cfg.CONF.endpoint_url, str(task['uuid'])),
|
||||
json.dumps(result), headers=headers)
|
||||
|
||||
self.add_periodic_task(generate_task)
|
||||
self.tg.add_dynamic_timer(self.periodic_tasks)
|
||||
|
@ -1,20 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from kombu import Exchange
|
||||
from kombu import Queue
|
||||
|
||||
task_exchange = Exchange('tasks', type='direct')
|
||||
task_queues = [Queue('tasks', task_exchange, routing_key='tasks')]
|
||||
|
||||
result_exchange = Exchange('results', type='direct')
|
||||
result_queues = [Queue('results', result_exchange, routing_key='results')]
|
@ -1,98 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# based on http://code.activestate.com/recipes/577120-julia-fractals/
|
||||
|
||||
import json
|
||||
import sys
|
||||
|
||||
import daemon
|
||||
import kombu
|
||||
from kombu.mixins import ConsumerMixin
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import requests
|
||||
|
||||
from faafo import queues
|
||||
from faafo import version
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
cli_opts = [
|
||||
cfg.BoolOpt('daemonize',
|
||||
default=False,
|
||||
help='Run in background.'),
|
||||
cfg.StrOpt('amqp-url',
|
||||
default='amqp://faafo:secretsecret@localhost:5672/',
|
||||
help='AMQP connection URL'),
|
||||
cfg.StrOpt('api-url',
|
||||
default='http://localhost:5000',
|
||||
help='API connection URL')
|
||||
]
|
||||
|
||||
cfg.CONF.register_cli_opts(cli_opts)
|
||||
|
||||
|
||||
class Tracker(ConsumerMixin):
|
||||
|
||||
def __init__(self, amqp_url, api_url):
|
||||
self.connection = kombu.Connection(amqp_url)
|
||||
self.api_url = api_url
|
||||
|
||||
def get_consumers(self, Consumer, channel):
|
||||
return [Consumer(queues=queues.result_queues,
|
||||
accept=['pickle', 'json'],
|
||||
callbacks=[self.process_result])]
|
||||
|
||||
def process_result(self, body, message):
|
||||
LOG.info("processing result %s" % body['uuid'])
|
||||
LOG.info("elapsed time %f seconds" % body['duration'])
|
||||
LOG.info("checksum %s" % body['checksum'])
|
||||
result = {
|
||||
'duration': float(body['duration']),
|
||||
'checksum': str(body['checksum'])
|
||||
}
|
||||
# NOTE(berendt): only necessary when using requests < 2.4.2
|
||||
headers = {'Content-type': 'application/json',
|
||||
'Accept': 'text/plain'}
|
||||
requests.put("%s/api/fractal/%s" %
|
||||
(self.api_url, str(body['uuid'])),
|
||||
json.dumps(result), headers=headers)
|
||||
message.ack()
|
||||
|
||||
|
||||
def main():
|
||||
log.register_options(cfg.CONF)
|
||||
log.set_defaults()
|
||||
|
||||
cfg.CONF(project='tracker', prog='faafo-tracker',
|
||||
version=version.version_info.version_string())
|
||||
|
||||
log.setup(cfg.CONF, 'tracker',
|
||||
version=version.version_info.version_string())
|
||||
|
||||
tracker = Tracker(cfg.CONF.amqp_url, cfg.CONF.api_url)
|
||||
|
||||
if cfg.CONF.daemonize:
|
||||
with daemon.DaemonContext():
|
||||
tracker.run()
|
||||
else:
|
||||
try:
|
||||
tracker.run()
|
||||
except Exception as e:
|
||||
sys.exit("ERROR: %s" % e)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -14,39 +14,33 @@
|
||||
|
||||
# based on http://code.activestate.com/recipes/577120-julia-fractals/
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
from PIL import Image
|
||||
import random
|
||||
import sys
|
||||
import socket
|
||||
import time
|
||||
|
||||
import daemon
|
||||
import kombu
|
||||
from kombu.mixins import ConsumerMixin
|
||||
from kombu.pools import producers
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import oslo_messaging as messaging
|
||||
|
||||
from faafo import queues
|
||||
from faafo import version
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
LOG = log.getLogger('faafo.worker')
|
||||
|
||||
cli_opts = [
|
||||
cfg.BoolOpt('daemonize',
|
||||
default=False,
|
||||
help='Run in background.'),
|
||||
cfg.StrOpt('target',
|
||||
worker_opts = [
|
||||
cfg.StrOpt('filesystem_store_datadir',
|
||||
default='/tmp',
|
||||
help='Target directory for fractal image files.'),
|
||||
cfg.StrOpt('amqp-url',
|
||||
default='amqp://faafo:secretsecret@localhost:5672/',
|
||||
help='AMQP connection URL')
|
||||
help='Directory that the filesystem backend store writes '
|
||||
'fractal image files to.'),
|
||||
]
|
||||
|
||||
cfg.CONF.register_cli_opts(cli_opts)
|
||||
cfg.CONF.register_opts(worker_opts)
|
||||
|
||||
|
||||
class JuliaSet(object):
|
||||
@ -97,49 +91,34 @@ class JuliaSet(object):
|
||||
return (c, z)
|
||||
|
||||
|
||||
class Worker(ConsumerMixin):
|
||||
class WorkerEndpoint(object):
|
||||
|
||||
def __init__(self, url, target):
|
||||
self.connection = kombu.Connection(url)
|
||||
self.target = target
|
||||
|
||||
def get_consumers(self, Consumer, channel):
|
||||
return [Consumer(queues=queues.task_queues,
|
||||
accept=['pickle', 'json'],
|
||||
callbacks=[self.process_task])]
|
||||
|
||||
def process_task(self, body, message):
|
||||
LOG.info("processing task %s" % body['uuid'])
|
||||
LOG.debug(body)
|
||||
def process(self, ctxt, task):
|
||||
LOG.info("processing task %s" % task['uuid'])
|
||||
LOG.debug(task)
|
||||
start_time = time.time()
|
||||
juliaset = JuliaSet(body['width'],
|
||||
body['height'],
|
||||
body['xa'],
|
||||
body['xb'],
|
||||
body['ya'],
|
||||
body['yb'],
|
||||
body['iterations'])
|
||||
filename = os.path.join(self.target, "%s.png" % body['uuid'])
|
||||
juliaset = JuliaSet(task['width'],
|
||||
task['height'],
|
||||
task['xa'],
|
||||
task['xb'],
|
||||
task['ya'],
|
||||
task['yb'],
|
||||
task['iterations'])
|
||||
filename = os.path.join(cfg.CONF.filesystem_store_datadir,
|
||||
"%s.png" % task['uuid'])
|
||||
elapsed_time = time.time() - start_time
|
||||
LOG.info("task %s processed in %f seconds" %
|
||||
(body['uuid'], elapsed_time))
|
||||
(task['uuid'], elapsed_time))
|
||||
juliaset.save(filename)
|
||||
LOG.info("saved result of task %s to file %s" %
|
||||
(body['uuid'], filename))
|
||||
(task['uuid'], filename))
|
||||
checksum = hashlib.sha256(open(filename, 'rb').read()).hexdigest()
|
||||
result = {
|
||||
'uuid': body['uuid'],
|
||||
'uuid': task['uuid'],
|
||||
'duration': elapsed_time,
|
||||
'checksum': checksum
|
||||
}
|
||||
LOG.info("pushed result: %s" % result)
|
||||
with producers[self.connection].acquire(block=True) as producer:
|
||||
producer.publish(result, serializer='pickle',
|
||||
exchange=queues.result_exchange,
|
||||
declare=[queues.result_exchange],
|
||||
routing_key='results')
|
||||
|
||||
message.ack()
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
@ -152,16 +131,18 @@ def main():
|
||||
log.setup(cfg.CONF, 'worker',
|
||||
version=version.version_info.version_string())
|
||||
|
||||
worker = Worker(cfg.CONF.amqp_url, cfg.CONF.target)
|
||||
|
||||
if cfg.CONF.daemonize:
|
||||
with daemon.DaemonContext():
|
||||
worker.run()
|
||||
else:
|
||||
try:
|
||||
worker.run()
|
||||
except Exception as e:
|
||||
sys.exit("ERROR: %s" % e)
|
||||
transport = messaging.get_transport(cfg.CONF)
|
||||
target = messaging.Target(topic='tasks', server=socket.gethostname())
|
||||
endpoints = [
|
||||
WorkerEndpoint()
|
||||
]
|
||||
server = messaging.get_rpc_server(transport, target, endpoints,
|
||||
executor='eventlet')
|
||||
server.start()
|
||||
try:
|
||||
server.wait()
|
||||
except KeyboardInterrupt:
|
||||
LOG.info("Caught keyboard interrupt. Exiting.")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -1,14 +1,12 @@
|
||||
pbr>=0.6,!=0.7,<1.0
|
||||
anyjson>=0.3.3
|
||||
amqp
|
||||
eventlet>=0.16.1,!=0.17.0
|
||||
kombu>=2.5.0
|
||||
PyMySQL>=0.6.2 # MIT License
|
||||
Pillow==2.4.0 # MIT
|
||||
python-daemon
|
||||
requests>=2.2.0,!=2.4.0
|
||||
Flask>=0.10,<1.0
|
||||
flask-sqlalchemy
|
||||
flask-restless
|
||||
oslo.config>=1.9.3,<1.10.0 # Apache-2.0
|
||||
oslo.log>=1.0.0,<1.1.0 # Apache-2.0
|
||||
oslo.messaging>=1.8.0,<1.9.0 # Apache-2.0
|
||||
|
Loading…
Reference in New Issue
Block a user