Cleanup worker and producer service

Change-Id: Ic96c57a0bd0e370e12c8ff3932fd801a93d51fd2
This commit is contained in:
Christian Berendt 2015-03-27 11:59:05 +01:00
parent 6f2dace98b
commit b53b2a5ff5
8 changed files with 131 additions and 69 deletions

3
.gitignore vendored
View File

@ -9,3 +9,6 @@
build
*.log
*.png
AUTHORS
ChangeLog
dist

51
bin/faafo-producer Executable file
View File

@ -0,0 +1,51 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
from oslo_config import cfg
from oslo_log import log
from faafo.openstack.common import service
from faafo.producer import service as producer
from faafo import version
CONF = cfg.CONF
# If ../faafo/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'faafo', '__init__.py')):
sys.path.insert(0, possible_topdir)
if __name__ == '__main__':
log.register_options(CONF)
log.set_defaults()
CONF(project='producer', prog='faafo-producer',
version=version.version_info.version_string())
log.setup(CONF, 'producer',
version=version.version_info.version_string())
srv = producer.ProducerService()
if CONF.one_shot:
srv.periodic_tasks()
else:
launcher = service.launch(srv)
launcher.wait()

50
bin/faafo-worker Executable file
View File

@ -0,0 +1,50 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
from oslo_config import cfg
from oslo_log import log
from faafo.openstack.common import service
from faafo.worker import service as worker
from faafo import version
CONF = cfg.CONF
# If ../faafo/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'faafo', '__init__.py')):
sys.path.insert(0, possible_topdir)
if __name__ == '__main__':
log.register_options(CONF)
log.set_defaults()
CONF(project='worker', prog='faafo-worker',
version=version.version_info.version_string())
log.setup(CONF, 'worker',
version=version.version_info.version_string())
server = worker.get_server()
server.start()
try:
server.wait()
except KeyboardInterrupt:
LOG.info("Caught keyboard interrupt. Exiting.")

View File

View File

@ -23,13 +23,10 @@ import requests
from faafo.openstack.common import periodic_task
from faafo.openstack.common import service
from faafo import version
LOG = log.getLogger('faafo.producer')
cli_opts = [
]
CONF = cfg.CONF
producer_opts = [
cfg.BoolOpt("one-shot", default=False,
@ -72,36 +69,36 @@ producer_opts = [
help='API connection URL')
]
cfg.CONF.register_opts(producer_opts)
CONF.register_opts(producer_opts)
class ProducerService(service.Service, periodic_task.PeriodicTasks):
def __init__(self):
super(ProducerService, self).__init__()
self._periodic_last_run = {}
transport = messaging.get_transport(cfg.CONF)
transport = messaging.get_transport(CONF)
target = messaging.Target(topic='tasks')
self._client = messaging.RPCClient(transport, target)
@periodic_task.periodic_task(spacing=cfg.CONF.interval,
@periodic_task.periodic_task(spacing=CONF.interval,
run_immediately=False)
def generate_task(self, ctxt):
ctxt = {}
random.seed()
number = random.randint(cfg.CONF.min_tasks, cfg.CONF.max_tasks)
number = random.randint(CONF.min_tasks, CONF.max_tasks)
LOG.info("generating %d task(s)" % number)
for i in xrange(0, number):
task = self.get_random_task()
# NOTE(berendt): only necessary when using requests < 2.4.2
headers = {'Content-type': 'application/json',
'Accept': 'text/plain'}
requests.post("%s/api/fractal" % cfg.CONF.endpoint_url,
requests.post("%s/api/fractal" % CONF.endpoint_url,
json.dumps(task), headers=headers)
LOG.info("generated task: %s" % task)
result = self._client.call(ctxt, 'process', task=task)
LOG.info("task %s processed: %s" % (task['uuid'], result))
requests.put("%s/api/fractal/%s" %
(cfg.CONF.endpoint_url, str(task['uuid'])),
(CONF.endpoint_url, str(task['uuid'])),
json.dumps(result), headers=headers)
self.add_periodic_task(generate_task)
@ -115,14 +112,14 @@ class ProducerService(service.Service, periodic_task.PeriodicTasks):
def get_random_task():
random.seed()
width = random.randint(cfg.CONF.min_width, cfg.CONF.max_width)
height = random.randint(cfg.CONF.min_height, cfg.CONF.max_height)
iterations = random.randint(cfg.CONF.min_iterations,
cfg.CONF.max_iterations)
xa = random.uniform(cfg.CONF.min_xa, cfg.CONF.max_xa)
xb = random.uniform(cfg.CONF.min_xb, cfg.CONF.max_xb)
ya = random.uniform(cfg.CONF.min_ya, cfg.CONF.max_ya)
yb = random.uniform(cfg.CONF.min_yb, cfg.CONF.max_yb)
width = random.randint(CONF.min_width, CONF.max_width)
height = random.randint(CONF.min_height, CONF.max_height)
iterations = random.randint(CONF.min_iterations,
CONF.max_iterations)
xa = random.uniform(CONF.min_xa, CONF.max_xa)
xb = random.uniform(CONF.min_xb, CONF.max_xb)
ya = random.uniform(CONF.min_ya, CONF.max_ya)
yb = random.uniform(CONF.min_yb, CONF.max_yb)
task = {
'uuid': str(uuid.uuid4()),
@ -136,25 +133,3 @@ class ProducerService(service.Service, periodic_task.PeriodicTasks):
}
return task
def main():
log.register_options(cfg.CONF)
log.set_defaults()
cfg.CONF(project='producer', prog='faafo-producer',
version=version.version_info.version_string())
log.setup(cfg.CONF, 'producer',
version=version.version_info.version_string())
srv = ProducerService()
if cfg.CONF.one_shot:
srv.periodic_tasks()
else:
launcher = service.launch(srv)
launcher.wait()
if __name__ == '__main__':
main()

0
faafo/worker/__init__.py Normal file
View File

View File

@ -30,10 +30,10 @@ from oslo_config import cfg
from oslo_log import log
import oslo_messaging as messaging
from faafo import version
LOG = log.getLogger('faafo.worker')
CONF = cfg.CONF
glance_store.register_opts(CONF)
class JuliaSet(object):
@ -108,7 +108,7 @@ class WorkerEndpoint(object):
(task['uuid'], filename))
with open(filename, 'rb') as fp:
size = os.fstat(fp.fileno()).st_size
glance_store.add_to_backend(cfg.CONF, task['uuid'], fp, size)
glance_store.add_to_backend(CONF, task['uuid'], fp, size)
checksum = hashlib.sha256(open(filename, 'rb').read()).hexdigest()
LOG.debug("checksum for task %s: %s" % (task['uuid'], checksum))
os.remove(filename)
@ -122,19 +122,8 @@ class WorkerEndpoint(object):
return result
def main():
log.register_options(cfg.CONF)
log.set_defaults()
glance_store.register_opts(cfg.CONF)
cfg.CONF(project='worker', prog='faafo-worker',
version=version.version_info.version_string())
log.setup(cfg.CONF, 'worker',
version=version.version_info.version_string())
transport = messaging.get_transport(cfg.CONF)
def get_server():
transport = messaging.get_transport(CONF)
target = messaging.Target(topic='tasks', server=socket.gethostname())
endpoints = [
WorkerEndpoint()
@ -142,15 +131,7 @@ def main():
server = messaging.get_rpc_server(transport, target, endpoints,
executor='eventlet')
glance_store.create_stores(cfg.CONF)
glance_store.create_stores(CONF)
glance_store.verify_default_store()
server.start()
try:
server.wait()
except KeyboardInterrupt:
LOG.info("Caught keyboard interrupt. Exiting.")
if __name__ == '__main__':
main()
return server

View File

@ -18,7 +18,11 @@ classifier =
Programming Language :: Python :: 2.6
[files]
packages = faafo
packages =
faafo
scripts =
bin/faafo-producer
bin/faafo-worker
[global]
setup-hooks =
@ -26,8 +30,6 @@ setup-hooks =
[entry_points]
console_scripts =
faafo-producer = faafo.producer:main
faafo-worker = faafo.worker:main
faafo-api = faafo.api:main
[build_sphinx]