Change back from oslo.messaging to kombu

Use work queues instead of RPC casts to improve the API performance.

Change-Id: I6141dc0b8be38541314df3ae770f8651faa40ecd
Christian Berendt 2015-04-02 17:29:44 +02:00
parent 47f2fbab8e
commit 8df64be42c
21 changed files with 78 additions and 2075 deletions
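
For context, a minimal, self-contained sketch of the kombu work-queue pattern this commit adopts: the API publishes a task and returns immediately, and whichever worker is idle consumes it. The exchange and queue declarations mirror the new faafo/queues.py below; the broker URL and the example payload are illustrative assumptions, and the real services read the URL from CONF.transport_url.

import kombu
from kombu.mixins import ConsumerMixin
from kombu.pools import producers

# Same declarations as the new faafo/queues.py.
task_exchange = kombu.Exchange('tasks', type='direct')
task_queue = kombu.Queue('normal', task_exchange, routing_key='normal')

# Illustrative broker URL; faafo itself reads CONF.transport_url.
connection = kombu.Connection('amqp://guest:guest@localhost:5672//')

def publish_task(task):
    # Producer side (API): fire-and-forget publish, no RPC round trip.
    with producers[connection].acquire(block=True) as producer:
        producer.publish(task, serializer='json',
                         exchange=task_exchange,
                         declare=[task_exchange],
                         routing_key='normal')

class Worker(ConsumerMixin):
    # Consumer side: workers compete for messages on the 'normal' queue.
    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=task_queue, accept=['json'],
                         callbacks=[self.process])]

    def process(self, task, message):
        print('processing %s' % task)  # the real worker renders a fractal here
        message.ack()                  # ack so the broker can discard the task

if __name__ == '__main__':
    publish_task({'uuid': 'example'})
    Worker(connection).run()  # blocks, consuming tasks until interrupted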

bin/faafo-producer Deleted file

@@ -1,52 +0,0 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
from oslo_config import cfg
from oslo_log import log
from faafo.openstack.common import service
from faafo.producer import service as producer
from faafo import version
CONF = cfg.CONF
# If ../faafo/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'faafo', '__init__.py')):
sys.path.insert(0, possible_topdir)
if __name__ == '__main__':
log.register_options(CONF)
log.set_defaults()
CONF(project='producer', prog='faafo-producer',
default_config_files=['/etc/faafo/faafo.conf'],
version=version.version_info.version_string())
log.setup(CONF, 'producer',
version=version.version_info.version_string())
srv = producer.ProducerService()
if CONF.one_shot:
srv.periodic_tasks()
else:
launcher = service.launch(srv)
launcher.wait()

bin/faafo-worker Executable file → Normal file

@@ -15,13 +15,14 @@
import os
import sys
import kombu
from oslo_config import cfg
from oslo_log import log
from faafo.openstack.common import service
from faafo.worker import service as worker
from faafo import version
LOG = log.getLogger('faafo.worker')
CONF = cfg.CONF
# If ../faafo/__init__.py exists, add ../ to Python search path, so that
@@ -43,9 +44,9 @@ if __name__ == '__main__':
log.setup(CONF, 'worker',
version=version.version_info.version_string())
server = worker.get_server()
server.start()
connection = kombu.Connection(CONF.transport_url)
server = worker.Worker(connection)
try:
server.wait()
server.run()
except KeyboardInterrupt:
LOG.info("Caught keyboard interrupt. Exiting.")

@@ -26,7 +26,7 @@ if [[ -e /etc/os-release ]]; then
RUN_WORKER=0
URL_DATABASE='sqlite:////tmp/sqlite.db'
URL_ENDPOINT='http://127.0.0.1'
URL_MESSAGING='rabbit://guest:guest@localhost:5672/'
URL_MESSAGING='amqp://guest:guest@localhost:5672/'
while getopts e:m:d:i:r: FLAG; do
case $FLAG in

etc/faafo.conf

@@ -13,6 +13,13 @@
# Database connection URL. (string value)
database_url = sqlite:////tmp/sqlite.db
#
# From faafo.queues
#
# AMQP connection URL. (string value)
transport_url = amqp://guest:guest@localhost:5672//
#
# From faafo.worker
#
@@ -105,327 +112,3 @@ verbose = true
# The format for an instance UUID that is passed with the log message.
# (string value)
#instance_uuid_format = "[instance: %(uuid)s] "
#
# From oslo.messaging
#
# ZeroMQ bind address. Should be a wildcard (*), an ethernet
# interface, or IP. The "host" option should point or resolve to this
# address. (string value)
#rpc_zmq_bind_address = *
# MatchMaker driver. (string value)
#rpc_zmq_matchmaker = oslo_messaging._drivers.matchmaker.MatchMakerLocalhost
# ZeroMQ receiver listening port. (integer value)
#rpc_zmq_port = 9501
# Number of ZeroMQ contexts, defaults to 1. (integer value)
#rpc_zmq_contexts = 1
# Maximum number of ingress messages to locally buffer per topic.
# Default is unlimited. (integer value)
#rpc_zmq_topic_backlog = <None>
# Directory for holding IPC sockets. (string value)
#rpc_zmq_ipc_dir = /var/run/openstack
# Name of this node. Must be a valid hostname, FQDN, or IP address.
# Must match "host" option, if running Nova. (string value)
#rpc_zmq_host = localhost
# Seconds to wait before a cast expires (TTL). Only supported by
# impl_zmq. (integer value)
#rpc_cast_timeout = 30
# Heartbeat frequency. (integer value)
#matchmaker_heartbeat_freq = 300
# Heartbeat time-to-live. (integer value)
#matchmaker_heartbeat_ttl = 600
# Size of RPC thread pool. (integer value)
#rpc_thread_pool_size = 64
# Driver or drivers to handle sending notifications. (multi valued)
#notification_driver =
# AMQP topic used for OpenStack notifications. (list value)
# Deprecated group/name - [rpc_notifier2]/topics
#notification_topics = notifications
# Seconds to wait for a response from a call. (integer value)
#rpc_response_timeout = 60
# A URL representing the messaging driver to use and its full
# configuration. If not set, we fall back to the rpc_backend option
# and driver specific configuration. (string value)
transport_url = rabbit://guest:guest@localhost:5672/
# The messaging driver to use, defaults to rabbit. Other drivers
# include qpid and zmq. (string value)
#rpc_backend = rabbit
# The default exchange under which topics are scoped. May be
# overridden by an exchange name specified in the transport_url
# option. (string value)
#control_exchange = openstack
[matchmaker_redis]
#
# From oslo.messaging
#
# Host to locate redis. (string value)
#host = 127.0.0.1
# Use this port to connect to redis host. (integer value)
#port = 6379
# Password for Redis server (optional). (string value)
#password = <None>
[matchmaker_ring]
#
# From oslo.messaging
#
# Matchmaker ring file (JSON). (string value)
# Deprecated group/name - [DEFAULT]/matchmaker_ringfile
#ringfile = /etc/oslo/matchmaker_ring.json
[oslo_messaging_amqp]
#
# From oslo.messaging
#
# address prefix used when sending to a specific server (string value)
# Deprecated group/name - [amqp1]/server_request_prefix
#server_request_prefix = exclusive
# address prefix used when broadcasting to all servers (string value)
# Deprecated group/name - [amqp1]/broadcast_prefix
#broadcast_prefix = broadcast
# address prefix when sending to any server in group (string value)
# Deprecated group/name - [amqp1]/group_request_prefix
#group_request_prefix = unicast
# Name for the AMQP container (string value)
# Deprecated group/name - [amqp1]/container_name
#container_name = <None>
# Timeout for inactive connections (in seconds) (integer value)
# Deprecated group/name - [amqp1]/idle_timeout
#idle_timeout = 0
# Debug: dump AMQP frames to stdout (boolean value)
# Deprecated group/name - [amqp1]/trace
#trace = false
# CA certificate PEM file for verifying server certificate (string
# value)
# Deprecated group/name - [amqp1]/ssl_ca_file
#ssl_ca_file =
# Identifying certificate PEM file to present to clients (string
# value)
# Deprecated group/name - [amqp1]/ssl_cert_file
#ssl_cert_file =
# Private key PEM file used to sign cert_file certificate (string
# value)
# Deprecated group/name - [amqp1]/ssl_key_file
#ssl_key_file =
# Password for decrypting ssl_key_file (if encrypted) (string value)
# Deprecated group/name - [amqp1]/ssl_key_password
#ssl_key_password = <None>
# Accept clients using either SSL or plain TCP (boolean value)
# Deprecated group/name - [amqp1]/allow_insecure_clients
#allow_insecure_clients = false
[oslo_messaging_qpid]
#
# From oslo.messaging
#
# Use durable queues in AMQP. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_durable_queues
#amqp_durable_queues = false
# Auto-delete queues in AMQP. (boolean value)
# Deprecated group/name - [DEFAULT]/amqp_auto_delete
#amqp_auto_delete = false
# Size of RPC connection pool. (integer value)
# Deprecated group/name - [DEFAULT]/rpc_conn_pool_size
#rpc_conn_pool_size = 30
# Qpid broker hostname. (string value)
# Deprecated group/name - [DEFAULT]/qpid_hostname
#qpid_hostname = localhost
# Qpid broker port. (integer value)
# Deprecated group/name - [DEFAULT]/qpid_port
#qpid_port = 5672
# Qpid HA cluster host:port pairs. (list value)
# Deprecated group/name - [DEFAULT]/qpid_hosts
#qpid_hosts = $qpid_hostname:$qpid_port
# Username for Qpid connection. (string value)
# Deprecated group/name - [DEFAULT]/qpid_username
#qpid_username =
# Password for Qpid connection. (string value)
# Deprecated group/name - [DEFAULT]/qpid_password
#qpid_password =
# Space separated list of SASL mechanisms to use for auth. (string
# value)
# Deprecated group/name - [DEFAULT]/qpid_sasl_mechanisms
#qpid_sasl_mechanisms =
# Seconds between connection keepalive heartbeats. (integer value)
# Deprecated group/name - [DEFAULT]/qpid_heartbeat
#qpid_heartbeat = 60
# Transport to use, either 'tcp' or 'ssl'. (string value)
# Deprecated group/name - [DEFAULT]/qpid_protocol
#qpid_protocol = tcp
# Whether to disable the Nagle algorithm. (boolean value)
# Deprecated group/name - [DEFAULT]/qpid_tcp_nodelay
#qpid_tcp_nodelay = true
# The number of prefetched messages held by receiver. (integer value)
# Deprecated group/name - [DEFAULT]/qpid_receiver_capacity
#qpid_receiver_capacity = 1
# The qpid topology version to use. Version 1 is what was originally
# used by impl_qpid. Version 2 includes some backwards-incompatible
# changes that allow broker federation to work. Users should update
# to version 2 when they are able to take everything down, as it
# requires a clean break. (integer value)
# Deprecated group/name - [DEFAULT]/qpid_topology_version
#qpid_topology_version = 1
[oslo_messaging_rabbit]
#
# From oslo.messaging
#
# Use durable queues in AMQP. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_durable_queues
#amqp_durable_queues = false
# Auto-delete queues in AMQP. (boolean value)
# Deprecated group/name - [DEFAULT]/amqp_auto_delete
#amqp_auto_delete = false
# Size of RPC connection pool. (integer value)
# Deprecated group/name - [DEFAULT]/rpc_conn_pool_size
#rpc_conn_pool_size = 30
# SSL version to use (valid only if SSL enabled). Valid values are
# TLSv1 and SSLv23. SSLv2, SSLv3, TLSv1_1, and TLSv1_2 may be
# available on some distributions. (string value)
# Deprecated group/name - [DEFAULT]/kombu_ssl_version
#kombu_ssl_version =
# SSL key file (valid only if SSL enabled). (string value)
# Deprecated group/name - [DEFAULT]/kombu_ssl_keyfile
#kombu_ssl_keyfile =
# SSL cert file (valid only if SSL enabled). (string value)
# Deprecated group/name - [DEFAULT]/kombu_ssl_certfile
#kombu_ssl_certfile =
# SSL certification authority file (valid only if SSL enabled).
# (string value)
# Deprecated group/name - [DEFAULT]/kombu_ssl_ca_certs
#kombu_ssl_ca_certs =
# How long to wait before reconnecting in response to an AMQP consumer
# cancel notification. (floating point value)
# Deprecated group/name - [DEFAULT]/kombu_reconnect_delay
#kombu_reconnect_delay = 1.0
# The RabbitMQ broker address where a single node is used. (string
# value)
# Deprecated group/name - [DEFAULT]/rabbit_host
#rabbit_host = localhost
# The RabbitMQ broker port where a single node is used. (integer
# value)
# Deprecated group/name - [DEFAULT]/rabbit_port
#rabbit_port = 5672
# RabbitMQ HA cluster host:port pairs. (list value)
# Deprecated group/name - [DEFAULT]/rabbit_hosts
#rabbit_hosts = $rabbit_host:$rabbit_port
# Connect over SSL for RabbitMQ. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_use_ssl
#rabbit_use_ssl = false
# The RabbitMQ userid. (string value)
# Deprecated group/name - [DEFAULT]/rabbit_userid
#rabbit_userid = guest
# The RabbitMQ password. (string value)
# Deprecated group/name - [DEFAULT]/rabbit_password
#rabbit_password = guest
# The RabbitMQ login method. (string value)
# Deprecated group/name - [DEFAULT]/rabbit_login_method
#rabbit_login_method = AMQPLAIN
# The RabbitMQ virtual host. (string value)
# Deprecated group/name - [DEFAULT]/rabbit_virtual_host
#rabbit_virtual_host = /
# How frequently to retry connecting with RabbitMQ. (integer value)
#rabbit_retry_interval = 1
# How long to backoff for between retries when connecting to RabbitMQ.
# (integer value)
# Deprecated group/name - [DEFAULT]/rabbit_retry_backoff
#rabbit_retry_backoff = 2
# Maximum number of RabbitMQ connection retries. Default is 0
# (infinite retry count). (integer value)
# Deprecated group/name - [DEFAULT]/rabbit_max_retries
#rabbit_max_retries = 0
# Use HA queues in RabbitMQ (x-ha-policy: all). If you change this
# option, you must wipe the RabbitMQ database. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_ha_queues
#rabbit_ha_queues = false
# Number of seconds after which the Rabbit broker is considered down
# if the heartbeat keep-alive fails (0 disables the heartbeat). (integer
# value)
#heartbeat_timeout_threshold = 60
# How many times during the heartbeat_timeout_threshold we check the
# heartbeat. (integer value)
#heartbeat_rate = 2
# Deprecated, use rpc_backend=kombu+memory or rpc_backend=fake
# (boolean value)
# Deprecated group/name - [DEFAULT]/fake_rabbit
#fake_rabbit = false

etc/faafo.conf.sample

@@ -13,6 +13,13 @@
# Database connection URL. (string value)
#database_url = sqlite:////tmp/sqlite.db
#
# From faafo.queues
#
# AMQP connection URL. (string value)
#transport_url = amqp://guest:guest@localhost:5672//
#
# From faafo.worker
#
@@ -105,327 +112,3 @@
# The format for an instance UUID that is passed with the log message.
# (string value)
#instance_uuid_format = "[instance: %(uuid)s] "
#
# From oslo.messaging
#
# ZeroMQ bind address. Should be a wildcard (*), an ethernet
# interface, or IP. The "host" option should point or resolve to this
# address. (string value)
#rpc_zmq_bind_address = *
# MatchMaker driver. (string value)
#rpc_zmq_matchmaker = oslo_messaging._drivers.matchmaker.MatchMakerLocalhost
# ZeroMQ receiver listening port. (integer value)
#rpc_zmq_port = 9501
# Number of ZeroMQ contexts, defaults to 1. (integer value)
#rpc_zmq_contexts = 1
# Maximum number of ingress messages to locally buffer per topic.
# Default is unlimited. (integer value)
#rpc_zmq_topic_backlog = <None>
# Directory for holding IPC sockets. (string value)
#rpc_zmq_ipc_dir = /var/run/openstack
# Name of this node. Must be a valid hostname, FQDN, or IP address.
# Must match "host" option, if running Nova. (string value)
#rpc_zmq_host = localhost
# Seconds to wait before a cast expires (TTL). Only supported by
# impl_zmq. (integer value)
#rpc_cast_timeout = 30
# Heartbeat frequency. (integer value)
#matchmaker_heartbeat_freq = 300
# Heartbeat time-to-live. (integer value)
#matchmaker_heartbeat_ttl = 600
# Size of RPC thread pool. (integer value)
#rpc_thread_pool_size = 64
# Driver or drivers to handle sending notifications. (multi valued)
#notification_driver =
# AMQP topic used for OpenStack notifications. (list value)
# Deprecated group/name - [rpc_notifier2]/topics
#notification_topics = notifications
# Seconds to wait for a response from a call. (integer value)
#rpc_response_timeout = 60
# A URL representing the messaging driver to use and its full
# configuration. If not set, we fall back to the rpc_backend option
# and driver specific configuration. (string value)
#transport_url = <None>
# The messaging driver to use, defaults to rabbit. Other drivers
# include qpid and zmq. (string value)
#rpc_backend = rabbit
# The default exchange under which topics are scoped. May be
# overridden by an exchange name specified in the transport_url
# option. (string value)
#control_exchange = openstack
[matchmaker_redis]
#
# From oslo.messaging
#
# Host to locate redis. (string value)
#host = 127.0.0.1
# Use this port to connect to redis host. (integer value)
#port = 6379
# Password for Redis server (optional). (string value)
#password = <None>
[matchmaker_ring]
#
# From oslo.messaging
#
# Matchmaker ring file (JSON). (string value)
# Deprecated group/name - [DEFAULT]/matchmaker_ringfile
#ringfile = /etc/oslo/matchmaker_ring.json
[oslo_messaging_amqp]
#
# From oslo.messaging
#
# address prefix used when sending to a specific server (string value)
# Deprecated group/name - [amqp1]/server_request_prefix
#server_request_prefix = exclusive
# address prefix used when broadcasting to all servers (string value)
# Deprecated group/name - [amqp1]/broadcast_prefix
#broadcast_prefix = broadcast
# address prefix when sending to any server in group (string value)
# Deprecated group/name - [amqp1]/group_request_prefix
#group_request_prefix = unicast
# Name for the AMQP container (string value)
# Deprecated group/name - [amqp1]/container_name
#container_name = <None>
# Timeout for inactive connections (in seconds) (integer value)
# Deprecated group/name - [amqp1]/idle_timeout
#idle_timeout = 0
# Debug: dump AMQP frames to stdout (boolean value)
# Deprecated group/name - [amqp1]/trace
#trace = false
# CA certificate PEM file for verifying server certificate (string
# value)
# Deprecated group/name - [amqp1]/ssl_ca_file
#ssl_ca_file =
# Identifying certificate PEM file to present to clients (string
# value)
# Deprecated group/name - [amqp1]/ssl_cert_file
#ssl_cert_file =
# Private key PEM file used to sign cert_file certificate (string
# value)
# Deprecated group/name - [amqp1]/ssl_key_file
#ssl_key_file =
# Password for decrypting ssl_key_file (if encrypted) (string value)
# Deprecated group/name - [amqp1]/ssl_key_password
#ssl_key_password = <None>
# Accept clients using either SSL or plain TCP (boolean value)
# Deprecated group/name - [amqp1]/allow_insecure_clients
#allow_insecure_clients = false
[oslo_messaging_qpid]
#
# From oslo.messaging
#
# Use durable queues in AMQP. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_durable_queues
#amqp_durable_queues = false
# Auto-delete queues in AMQP. (boolean value)
# Deprecated group/name - [DEFAULT]/amqp_auto_delete
#amqp_auto_delete = false
# Size of RPC connection pool. (integer value)
# Deprecated group/name - [DEFAULT]/rpc_conn_pool_size
#rpc_conn_pool_size = 30
# Qpid broker hostname. (string value)
# Deprecated group/name - [DEFAULT]/qpid_hostname
#qpid_hostname = localhost
# Qpid broker port. (integer value)
# Deprecated group/name - [DEFAULT]/qpid_port
#qpid_port = 5672
# Qpid HA cluster host:port pairs. (list value)
# Deprecated group/name - [DEFAULT]/qpid_hosts
#qpid_hosts = $qpid_hostname:$qpid_port
# Username for Qpid connection. (string value)
# Deprecated group/name - [DEFAULT]/qpid_username
#qpid_username =
# Password for Qpid connection. (string value)
# Deprecated group/name - [DEFAULT]/qpid_password
#qpid_password =
# Space separated list of SASL mechanisms to use for auth. (string
# value)
# Deprecated group/name - [DEFAULT]/qpid_sasl_mechanisms
#qpid_sasl_mechanisms =
# Seconds between connection keepalive heartbeats. (integer value)
# Deprecated group/name - [DEFAULT]/qpid_heartbeat
#qpid_heartbeat = 60
# Transport to use, either 'tcp' or 'ssl'. (string value)
# Deprecated group/name - [DEFAULT]/qpid_protocol
#qpid_protocol = tcp
# Whether to disable the Nagle algorithm. (boolean value)
# Deprecated group/name - [DEFAULT]/qpid_tcp_nodelay
#qpid_tcp_nodelay = true
# The number of prefetched messages held by receiver. (integer value)
# Deprecated group/name - [DEFAULT]/qpid_receiver_capacity
#qpid_receiver_capacity = 1
# The qpid topology version to use. Version 1 is what was originally
# used by impl_qpid. Version 2 includes some backwards-incompatible
# changes that allow broker federation to work. Users should update
# to version 2 when they are able to take everything down, as it
# requires a clean break. (integer value)
# Deprecated group/name - [DEFAULT]/qpid_topology_version
#qpid_topology_version = 1
[oslo_messaging_rabbit]
#
# From oslo.messaging
#
# Use durable queues in AMQP. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_durable_queues
#amqp_durable_queues = false
# Auto-delete queues in AMQP. (boolean value)
# Deprecated group/name - [DEFAULT]/amqp_auto_delete
#amqp_auto_delete = false
# Size of RPC connection pool. (integer value)
# Deprecated group/name - [DEFAULT]/rpc_conn_pool_size
#rpc_conn_pool_size = 30
# SSL version to use (valid only if SSL enabled). Valid values are
# TLSv1 and SSLv23. SSLv2, SSLv3, TLSv1_1, and TLSv1_2 may be
# available on some distributions. (string value)
# Deprecated group/name - [DEFAULT]/kombu_ssl_version
#kombu_ssl_version =
# SSL key file (valid only if SSL enabled). (string value)
# Deprecated group/name - [DEFAULT]/kombu_ssl_keyfile
#kombu_ssl_keyfile =
# SSL cert file (valid only if SSL enabled). (string value)
# Deprecated group/name - [DEFAULT]/kombu_ssl_certfile
#kombu_ssl_certfile =
# SSL certification authority file (valid only if SSL enabled).
# (string value)
# Deprecated group/name - [DEFAULT]/kombu_ssl_ca_certs
#kombu_ssl_ca_certs =
# How long to wait before reconnecting in response to an AMQP consumer
# cancel notification. (floating point value)
# Deprecated group/name - [DEFAULT]/kombu_reconnect_delay
#kombu_reconnect_delay = 1.0
# The RabbitMQ broker address where a single node is used. (string
# value)
# Deprecated group/name - [DEFAULT]/rabbit_host
#rabbit_host = localhost
# The RabbitMQ broker port where a single node is used. (integer
# value)
# Deprecated group/name - [DEFAULT]/rabbit_port
#rabbit_port = 5672
# RabbitMQ HA cluster host:port pairs. (list value)
# Deprecated group/name - [DEFAULT]/rabbit_hosts
#rabbit_hosts = $rabbit_host:$rabbit_port
# Connect over SSL for RabbitMQ. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_use_ssl
#rabbit_use_ssl = false
# The RabbitMQ userid. (string value)
# Deprecated group/name - [DEFAULT]/rabbit_userid
#rabbit_userid = guest
# The RabbitMQ password. (string value)
# Deprecated group/name - [DEFAULT]/rabbit_password
#rabbit_password = guest
# The RabbitMQ login method. (string value)
# Deprecated group/name - [DEFAULT]/rabbit_login_method
#rabbit_login_method = AMQPLAIN
# The RabbitMQ virtual host. (string value)
# Deprecated group/name - [DEFAULT]/rabbit_virtual_host
#rabbit_virtual_host = /
# How frequently to retry connecting with RabbitMQ. (integer value)
#rabbit_retry_interval = 1
# How long to backoff for between retries when connecting to RabbitMQ.
# (integer value)
# Deprecated group/name - [DEFAULT]/rabbit_retry_backoff
#rabbit_retry_backoff = 2
# Maximum number of RabbitMQ connection retries. Default is 0
# (infinite retry count). (integer value)
# Deprecated group/name - [DEFAULT]/rabbit_max_retries
#rabbit_max_retries = 0
# Use HA queues in RabbitMQ (x-ha-policy: all). If you change this
# option, you must wipe the RabbitMQ database. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_ha_queues
#rabbit_ha_queues = false
# Number of seconds after which the Rabbit broker is considered down
# if the heartbeat keep-alive fails (0 disables the heartbeat). (integer
# value)
#heartbeat_timeout_threshold = 60
# How many times during the heartbeat_timeout_threshold we check the
# heartbeat. (integer value)
#heartbeat_rate = 2
# Deprecated, use rpc_backend=kombu+memory or rpc_backend=fake
# (boolean value)
# Deprecated group/name - [DEFAULT]/fake_rabbit
#fake_rabbit = false

@@ -2,5 +2,5 @@
output_file = etc/faafo.conf.sample
namespace = faafo.worker
namespace = faafo.api
namespace = oslo.messaging
namespace = oslo.log
namespace = faafo.queues

faafo/api/service.py

@@ -19,11 +19,13 @@ import flask
import flask.ext.restless
import flask.ext.sqlalchemy
from flask_bootstrap import Bootstrap
from kombu import Connection
from kombu.pools import producers
from oslo_config import cfg
from oslo_log import log
import oslo_messaging as messaging
from PIL import Image
from faafo import queues
from faafo import version
LOG = log.getLogger('faafo.api')
@@ -87,10 +89,7 @@ class Fractal(db.Model):
db.create_all()
manager = flask.ext.restless.APIManager(app, flask_sqlalchemy_db=db)
transport = messaging.get_transport(CONF)
target = messaging.Target(topic='tasks')
client = messaging.RPCClient(transport, target)
connection = Connection(CONF.transport_url)
@app.route('/', methods=['GET'])
@@ -123,7 +122,12 @@ def get_fractal(fractalid):
def generate_fractal(**kwargs):
client.cast({}, 'process', task=kwargs['result'])
with producers[connection].acquire(block=True) as producer:
producer.publish(kwargs['result'],
serializer='json',
exchange=queues.task_exchange,
declare=[queues.task_exchange],
routing_key='normal')
def main():
@@ -132,6 +136,3 @@ def main():
exclude_columns=['image'],
url_prefix='/v1')
app.run(host=CONF.listen_address, port=CONF.bind_port)
if __name__ == '__main__':
main()

faafo/openstack/common/_i18n.py Deleted file

@@ -1,45 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""oslo.i18n integration module.
See http://docs.openstack.org/developer/oslo.i18n/usage.html
"""
try:
import oslo_i18n
# NOTE(dhellmann): This reference to o-s-l-o will be replaced by the
# application name when this module is synced into the separate
# repository. It is OK to have more than one translation function
# using the same domain, since there will still only be one message
# catalog.
_translators = oslo_i18n.TranslatorFactory(domain='faafo')
# The primary translation function using the well-known name "_"
_ = _translators.primary
# Translators for log levels.
#
# The abbreviated names are meant to reflect the usual use of a short
# name like '_'. The "L" is for "log" and the other letter comes from
# the level.
_LI = _translators.log_info
_LW = _translators.log_warning
_LE = _translators.log_error
_LC = _translators.log_critical
except ImportError:
# NOTE(dims): Support for cases where a project wants to use
# code from oslo-incubator, but is not ready to be internationalized
# (like tempest)
_ = _LI = _LW = _LE = _LC = lambda x: x

faafo/openstack/common/eventlet_backdoor.py Deleted file

@@ -1,151 +0,0 @@
# Copyright (c) 2012 OpenStack Foundation.
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import copy
import errno
import gc
import logging
import os
import pprint
import socket
import sys
import traceback
import eventlet.backdoor
import greenlet
from oslo_config import cfg
from faafo.openstack.common._i18n import _LI
help_for_backdoor_port = (
"Acceptable values are 0, <port>, and <start>:<end>, where 0 results "
"in listening on a random tcp port number; <port> results in listening "
"on the specified port number (and not enabling backdoor if that port "
"is in use); and <start>:<end> results in listening on the smallest "
"unused port number within the specified range of port numbers. The "
"chosen port is displayed in the service's log file.")
eventlet_backdoor_opts = [
cfg.StrOpt('backdoor_port',
help="Enable eventlet backdoor. %s" % help_for_backdoor_port)
]
CONF = cfg.CONF
CONF.register_opts(eventlet_backdoor_opts)
LOG = logging.getLogger(__name__)
def list_opts():
"""Entry point for oslo-config-generator.
"""
return [(None, copy.deepcopy(eventlet_backdoor_opts))]
class EventletBackdoorConfigValueError(Exception):
def __init__(self, port_range, help_msg, ex):
msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. '
'%(help)s' %
{'range': port_range, 'ex': ex, 'help': help_msg})
super(EventletBackdoorConfigValueError, self).__init__(msg)
self.port_range = port_range
def _dont_use_this():
print("Don't use this, just disconnect instead")
def _find_objects(t):
return [o for o in gc.get_objects() if isinstance(o, t)]
def _print_greenthreads():
for i, gt in enumerate(_find_objects(greenlet.greenlet)):
print(i, gt)
traceback.print_stack(gt.gr_frame)
print()
def _print_nativethreads():
for threadId, stack in sys._current_frames().items():
print(threadId)
traceback.print_stack(stack)
print()
def _parse_port_range(port_range):
if ':' not in port_range:
start, end = port_range, port_range
else:
start, end = port_range.split(':', 1)
try:
start, end = int(start), int(end)
if end < start:
raise ValueError
return start, end
except ValueError as ex:
raise EventletBackdoorConfigValueError(port_range, ex,
help_for_backdoor_port)
def _listen(host, start_port, end_port, listen_func):
try_port = start_port
while True:
try:
return listen_func((host, try_port))
except socket.error as exc:
if (exc.errno != errno.EADDRINUSE or
try_port >= end_port):
raise
try_port += 1
def initialize_if_enabled():
backdoor_locals = {
'exit': _dont_use_this, # So we don't exit the entire process
'quit': _dont_use_this, # So we don't exit the entire process
'fo': _find_objects,
'pgt': _print_greenthreads,
'pnt': _print_nativethreads,
}
if CONF.backdoor_port is None:
return None
start_port, end_port = _parse_port_range(str(CONF.backdoor_port))
# NOTE(johannes): The standard sys.displayhook will print the value of
# the last expression and set it to __builtin__._, which overwrites
# the __builtin__._ that gettext sets. Let's switch to using pprint
# since it won't interact poorly with gettext, and it's easier to
# read the output too.
def displayhook(val):
if val is not None:
pprint.pprint(val)
sys.displayhook = displayhook
sock = _listen('localhost', start_port, end_port, eventlet.listen)
# In the case of backdoor port being zero, a port number is assigned by
# listen(). In any case, pull the port number out here.
port = sock.getsockname()[1]
LOG.info(
_LI('Eventlet backdoor listening on %(port)s for process %(pid)d') %
{'port': port, 'pid': os.getpid()}
)
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
locals=backdoor_locals)
return port

faafo/openstack/common/loopingcall.py Deleted file

@@ -1,147 +0,0 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import sys
import time
from eventlet import event
from eventlet import greenthread
from faafo.openstack.common._i18n import _LE, _LW
LOG = logging.getLogger(__name__)
# NOTE(zyluo): This lambda function was declared to avoid mocking collisions
# with time.time() called in the standard logging module
# during unittests.
_ts = lambda: time.time()
class LoopingCallDone(Exception):
"""Exception to break out and stop a LoopingCallBase.
The poll-function passed to LoopingCallBase can raise this exception to
break out of the loop normally. This is somewhat analogous to
StopIteration.
An optional return-value can be included as the argument to the exception;
this return-value will be returned by LoopingCallBase.wait()
"""
def __init__(self, retvalue=True):
""":param retvalue: Value that LoopingCallBase.wait() should return."""
self.retvalue = retvalue
class LoopingCallBase(object):
def __init__(self, f=None, *args, **kw):
self.args = args
self.kw = kw
self.f = f
self._running = False
self.done = None
def stop(self):
self._running = False
def wait(self):
return self.done.wait()
class FixedIntervalLoopingCall(LoopingCallBase):
"""A fixed interval looping call."""
def start(self, interval, initial_delay=None):
self._running = True
done = event.Event()
def _inner():
if initial_delay:
greenthread.sleep(initial_delay)
try:
while self._running:
start = _ts()
self.f(*self.args, **self.kw)
end = _ts()
if not self._running:
break
delay = end - start - interval
if delay > 0:
LOG.warn(_LW('task %(func_name)r run outlasted '
'interval by %(delay).2f sec'),
{'func_name': self.f, 'delay': delay})
greenthread.sleep(-delay if delay < 0 else 0)
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_LE('in fixed duration looping call'))
done.send_exception(*sys.exc_info())
return
else:
done.send(True)
self.done = done
greenthread.spawn_n(_inner)
return self.done
class DynamicLoopingCall(LoopingCallBase):
"""A looping call which sleeps until the next known event.
The function called should return how long to sleep for before being
called again.
"""
def start(self, initial_delay=None, periodic_interval_max=None):
self._running = True
done = event.Event()
def _inner():
if initial_delay:
greenthread.sleep(initial_delay)
try:
while self._running:
idle = self.f(*self.args, **self.kw)
if not self._running:
break
if periodic_interval_max is not None:
idle = min(idle, periodic_interval_max)
LOG.debug('Dynamic looping call %(func_name)r sleeping '
'for %(idle).02f seconds',
{'func_name': self.f, 'idle': idle})
greenthread.sleep(idle)
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_LE('in dynamic looping call'))
done.send_exception(*sys.exc_info())
return
else:
done.send(True)
self.done = done
greenthread.spawn(_inner)
return self.done

faafo/openstack/common/periodic_task.py Deleted file

@@ -1,232 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import logging
import random
import time
from oslo_config import cfg
import six
from faafo.openstack.common._i18n import _, _LE, _LI
periodic_opts = [
cfg.BoolOpt('run_external_periodic_tasks',
default=True,
help='Some periodic tasks can be run in a separate process. '
'Should we run them here?'),
]
CONF = cfg.CONF
CONF.register_opts(periodic_opts)
LOG = logging.getLogger(__name__)
DEFAULT_INTERVAL = 60.0
def list_opts():
"""Entry point for oslo-config-generator."""
return [(None, copy.deepcopy(periodic_opts))]
class InvalidPeriodicTaskArg(Exception):
message = _("Unexpected argument for periodic task creation: %(arg)s.")
def periodic_task(*args, **kwargs):
"""Decorator to indicate that a method is a periodic task.
This decorator can be used in two ways:
1. Without arguments '@periodic_task', this will be run on the default
interval of 60 seconds.
2. With arguments:
@periodic_task(spacing=N [, run_immediately=[True|False]]
[, name=[None|"string"])
this will be run on approximately every N seconds. If this number is
negative the periodic task will be disabled. If the run_immediately
argument is provided and has a value of 'True', the first run of the
task will be shortly after task scheduler starts. If
run_immediately is omitted or set to 'False', the first time the
task runs will be approximately N seconds after the task scheduler
starts. If name is not provided, __name__ of function is used.
"""
def decorator(f):
# Test for old style invocation
if 'ticks_between_runs' in kwargs:
raise InvalidPeriodicTaskArg(arg='ticks_between_runs')
# Control if run at all
f._periodic_task = True
f._periodic_external_ok = kwargs.pop('external_process_ok', False)
if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
f._periodic_enabled = False
else:
f._periodic_enabled = kwargs.pop('enabled', True)
f._periodic_name = kwargs.pop('name', f.__name__)
# Control frequency
f._periodic_spacing = kwargs.pop('spacing', 0)
f._periodic_immediate = kwargs.pop('run_immediately', False)
if f._periodic_immediate:
f._periodic_last_run = None
else:
f._periodic_last_run = time.time()
return f
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
# and without parenthesis.
#
# In the 'with-parenthesis' case (with kwargs present), this function needs
# to return a decorator function since the interpreter will invoke it like:
#
# periodic_task(*args, **kwargs)(f)
#
# In the 'without-parenthesis' case, the original function will be passed
# in as the first argument, like:
#
# periodic_task(f)
if kwargs:
return decorator
else:
return decorator(args[0])
class _PeriodicTasksMeta(type):
def _add_periodic_task(cls, task):
"""Add a periodic task to the list of periodic tasks.
The task should already be decorated by @periodic_task.
:return: whether task was actually enabled
"""
name = task._periodic_name
if task._periodic_spacing < 0:
LOG.info(_LI('Skipping periodic task %(task)s because '
'its interval is negative'),
{'task': name})
return False
if not task._periodic_enabled:
LOG.info(_LI('Skipping periodic task %(task)s because '
'it is disabled'),
{'task': name})
return False
# A periodic spacing of zero indicates that this task should
# be run on the default interval to avoid running too
# frequently.
if task._periodic_spacing == 0:
task._periodic_spacing = DEFAULT_INTERVAL
cls._periodic_tasks.append((name, task))
cls._periodic_spacing[name] = task._periodic_spacing
return True
def __init__(cls, names, bases, dict_):
"""Metaclass that allows us to collect decorated periodic tasks."""
super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
# NOTE(sirp): if the attribute is not present then we must be the base
# class, so, go ahead an initialize it. If the attribute is present,
# then we're a subclass so make a copy of it so we don't step on our
# parent's toes.
try:
cls._periodic_tasks = cls._periodic_tasks[:]
except AttributeError:
cls._periodic_tasks = []
try:
cls._periodic_spacing = cls._periodic_spacing.copy()
except AttributeError:
cls._periodic_spacing = {}
for value in cls.__dict__.values():
if getattr(value, '_periodic_task', False):
cls._add_periodic_task(value)
def _nearest_boundary(last_run, spacing):
"""Find nearest boundary which is in the past, which is a multiple of the
spacing with the last run as an offset.
Eg if last run was 10 and spacing was 7, the new last run could be: 17, 24,
31, 38...
0% to 5% of the spacing value will be added to this value to ensure tasks
do not synchronize. This jitter is rounded to the nearest second, this
means that spacings smaller than 20 seconds will not have jitter.
"""
current_time = time.time()
if last_run is None:
return current_time
delta = current_time - last_run
offset = delta % spacing
# Add up to 5% jitter
jitter = int(spacing * (random.random() / 20))
return current_time - offset + jitter
@six.add_metaclass(_PeriodicTasksMeta)
class PeriodicTasks(object):
def __init__(self):
super(PeriodicTasks, self).__init__()
self._periodic_last_run = {}
for name, task in self._periodic_tasks:
self._periodic_last_run[name] = task._periodic_last_run
def add_periodic_task(self, task):
"""Add a periodic task to the list of periodic tasks.
The task should already be decorated by @periodic_task.
"""
if self.__class__._add_periodic_task(task):
self._periodic_last_run[task._periodic_name] = (
task._periodic_last_run)
def run_periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
idle_for = DEFAULT_INTERVAL
for task_name, task in self._periodic_tasks:
full_task_name = '.'.join([self.__class__.__name__, task_name])
spacing = self._periodic_spacing[task_name]
last_run = self._periodic_last_run[task_name]
# Check if due, if not skip
idle_for = min(idle_for, spacing)
if last_run is not None:
delta = last_run + spacing - time.time()
if delta > 0:
idle_for = min(idle_for, delta)
continue
LOG.debug("Running periodic task %(full_task_name)s",
{"full_task_name": full_task_name})
self._periodic_last_run[task_name] = _nearest_boundary(
last_run, spacing)
try:
task(self, context)
except Exception as e:
if raise_on_error:
raise
LOG.exception(_LE("Error during %(full_task_name)s: %(e)s"),
{"full_task_name": full_task_name, "e": e})
time.sleep(0)
return idle_for

faafo/openstack/common/service.py Deleted file

@@ -1,503 +0,0 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Generic Node base class for all workers that run on hosts."""
import errno
import logging
import os
import random
import signal
import sys
import time
try:
# Importing just the symbol here because the io module does not
# exist in Python 2.6.
from io import UnsupportedOperation # noqa
except ImportError:
# Python 2.6
UnsupportedOperation = None
import eventlet
from eventlet import event
from oslo_config import cfg
from faafo.openstack.common import eventlet_backdoor
from faafo.openstack.common._i18n import _LE, _LI, _LW
from faafo.openstack.common import systemd
from faafo.openstack.common import threadgroup
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def _sighup_supported():
return hasattr(signal, 'SIGHUP')
def _is_daemon():
# The process group for a foreground process will match the
# process group of the controlling terminal. If those values do
# not match, or ioctl() fails on the stdout file handle, we assume
# the process is running in the background as a daemon.
# http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
try:
is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
except OSError as err:
if err.errno == errno.ENOTTY:
# Assume we are a daemon because there is no terminal.
is_daemon = True
else:
raise
except UnsupportedOperation:
# Could not get the fileno for stdout, so we must be a daemon.
is_daemon = True
return is_daemon
def _is_sighup_and_daemon(signo):
if not (_sighup_supported() and signo == signal.SIGHUP):
# Avoid checking if we are a daemon, because the signal isn't
# SIGHUP.
return False
return _is_daemon()
def _signo_to_signame(signo):
signals = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}
if _sighup_supported():
signals[signal.SIGHUP] = 'SIGHUP'
return signals[signo]
def _set_signals_handler(handler):
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
if _sighup_supported():
signal.signal(signal.SIGHUP, handler)
class Launcher(object):
"""Launch one or more services and wait for them to complete."""
def __init__(self):
"""Initialize the service launcher.
:returns: None
"""
self.services = Services()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
def launch_service(self, service):
"""Load and start the given service.
:param service: The service you would like to start.
:returns: None
"""
service.backdoor_port = self.backdoor_port
self.services.add(service)
def stop(self):
"""Stop all services which are currently running.
:returns: None
"""
self.services.stop()
def wait(self):
"""Waits until all services have been stopped, and then returns.
:returns: None
"""
self.services.wait()
def restart(self):
"""Reload config files and restart service.
:returns: None
"""
cfg.CONF.reload_config_files()
self.services.restart()
class SignalExit(SystemExit):
def __init__(self, signo, exccode=1):
super(SignalExit, self).__init__(exccode)
self.signo = signo
class ServiceLauncher(Launcher):
def _handle_signal(self, signo, frame):
# Allow the process to be killed again and die from natural causes
_set_signals_handler(signal.SIG_DFL)
raise SignalExit(signo)
def handle_signal(self):
_set_signals_handler(self._handle_signal)
def _wait_for_exit_or_signal(self, ready_callback=None):
status = None
signo = 0
LOG.debug('Full set of CONF:')
CONF.log_opt_values(LOG, logging.DEBUG)
try:
if ready_callback:
ready_callback()
super(ServiceLauncher, self).wait()
except SignalExit as exc:
signame = _signo_to_signame(exc.signo)
LOG.info(_LI('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
finally:
self.stop()
return status, signo
def wait(self, ready_callback=None):
systemd.notify_once()
while True:
self.handle_signal()
status, signo = self._wait_for_exit_or_signal(ready_callback)
if not _is_sighup_and_daemon(signo):
return status
self.restart()
class ServiceWrapper(object):
def __init__(self, service, workers):
self.service = service
self.workers = workers
self.children = set()
self.forktimes = []
class ProcessLauncher(object):
_signal_handlers_set = set()
@classmethod
def _handle_class_signals(cls, *args, **kwargs):
for handler in cls._signal_handlers_set:
handler(*args, **kwargs)
def __init__(self):
"""Constructor."""
self.children = {}
self.sigcaught = None
self.running = True
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
self.handle_signal()
def handle_signal(self):
self._signal_handlers_set.add(self._handle_signal)
_set_signals_handler(self._handle_class_signals)
def _handle_signal(self, signo, frame):
self.sigcaught = signo
self.running = False
# Allow the process to be killed again and die from natural causes
_set_signals_handler(signal.SIG_DFL)
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
# dies unexpectedly
self.readpipe.read()
LOG.info(_LI('Parent process has died unexpectedly, exiting'))
sys.exit(1)
def _child_process_handle_signal(self):
# Setup child signal handlers differently
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM)
def _sighup(*args):
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signal.SIGHUP)
signal.signal(signal.SIGTERM, _sigterm)
if _sighup_supported():
signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
def _child_wait_for_exit_or_signal(self, launcher):
status = 0
signo = 0
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
try:
launcher.wait()
except SignalExit as exc:
signame = _signo_to_signame(exc.signo)
LOG.info(_LI('Child caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_LE('Unhandled exception'))
status = 2
finally:
launcher.stop()
return status, signo
def _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
# Close write to ensure only parent has it open
os.close(self.writepipe)
# Create greenthread to watch for parent to close pipe
eventlet.spawn_n(self._pipe_watcher)
# Reseed random number generator
random.seed()
launcher = Launcher()
launcher.launch_service(service)
return launcher
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
# Limit ourselves to one process a second (over the period of
# number of workers * 1 second). This will allow workers to
# start up quickly but ensure we don't fork off children that
# die instantly too quickly.
if time.time() - wrap.forktimes[0] < wrap.workers:
LOG.info(_LI('Forking too fast, sleeping'))
time.sleep(1)
wrap.forktimes.pop(0)
wrap.forktimes.append(time.time())
pid = os.fork()
if pid == 0:
launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(launcher)
if not _is_sighup_and_daemon(signo):
break
launcher.restart()
os._exit(status)
LOG.info(_LI('Started child %d'), pid)
wrap.children.add(pid)
self.children[pid] = wrap
return pid
def launch_service(self, service, workers=1):
wrap = ServiceWrapper(service, workers)
LOG.info(_LI('Starting %d workers'), wrap.workers)
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
def _wait_child(self):
try:
# Block while any of child processes have exited
pid, status = os.waitpid(0, 0)
if not pid:
return None
except OSError as exc:
if exc.errno not in (errno.EINTR, errno.ECHILD):
raise
return None
if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status)
LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'),
dict(pid=pid, sig=sig))
else:
code = os.WEXITSTATUS(status)
LOG.info(_LI('Child %(pid)s exited with status %(code)d'),
dict(pid=pid, code=code))
if pid not in self.children:
LOG.warning(_LW('pid %d not in child list'), pid)
return None
wrap = self.children.pop(pid)
wrap.children.remove(pid)
return wrap
def _respawn_children(self):
while self.running:
wrap = self._wait_child()
if not wrap:
continue
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
systemd.notify_once()
LOG.debug('Full set of CONF:')
CONF.log_opt_values(LOG, logging.DEBUG)
try:
while True:
self.handle_signal()
self._respawn_children()
# No signal means that stop was called. Don't clean up here.
if not self.sigcaught:
return
signame = _signo_to_signame(self.sigcaught)
LOG.info(_LI('Caught %s, stopping children'), signame)
if not _is_sighup_and_daemon(self.sigcaught):
break
for pid in self.children:
os.kill(pid, signal.SIGHUP)
self.running = True
self.sigcaught = None
except eventlet.greenlet.GreenletExit:
LOG.info(_LI("Wait called after thread killed. Cleaning up."))
self.stop()
def stop(self):
"""Terminate child processes and wait on each."""
self.running = False
for pid in self.children:
try:
os.kill(pid, signal.SIGTERM)
except OSError as exc:
if exc.errno != errno.ESRCH:
raise
# Wait for children to die
if self.children:
LOG.info(_LI('Waiting on %d children to exit'), len(self.children))
while self.children:
self._wait_child()
class Service(object):
"""Service object for binaries running on hosts."""
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
# signal that the service is done shutting itself down:
self._done = event.Event()
def reset(self):
# NOTE(Fengqian): docs for Event.reset() recommend against using it
self._done = event.Event()
def start(self):
pass
def stop(self, graceful=False):
self.tg.stop(graceful)
self.tg.wait()
# Signal that service cleanup is done:
if not self._done.ready():
self._done.send()
def wait(self):
self._done.wait()
class Services(object):
def __init__(self):
self.services = []
self.tg = threadgroup.ThreadGroup()
self.done = event.Event()
def add(self, service):
self.services.append(service)
self.tg.add_thread(self.run_service, service, self.done)
def stop(self):
# wait for graceful shutdown of services:
for service in self.services:
service.stop()
service.wait()
# Each service has performed cleanup, now signal that the run_service
# wrapper threads can now die:
if not self.done.ready():
self.done.send()
# reap threads:
self.tg.stop()
def wait(self):
self.tg.wait()
def restart(self):
self.stop()
self.done = event.Event()
for restart_service in self.services:
restart_service.reset()
self.tg.add_thread(self.run_service, restart_service, self.done)
@staticmethod
def run_service(service, done):
"""Service start wrapper.
:param service: service to run
:param done: event to wait on until a shutdown is triggered
:returns: None
"""
service.start()
done.wait()
def launch(service, workers=1):
if workers is None or workers == 1:
launcher = ServiceLauncher()
launcher.launch_service(service)
else:
launcher = ProcessLauncher()
launcher.launch_service(service, workers=workers)
return launcher

faafo/openstack/common/systemd.py Deleted file

@@ -1,105 +0,0 @@
# Copyright 2012-2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Helper module for systemd service readiness notification.
"""
import logging
import os
import socket
import sys
LOG = logging.getLogger(__name__)
def _abstractify(socket_name):
if socket_name.startswith('@'):
# abstract namespace socket
socket_name = '\0%s' % socket_name[1:]
return socket_name
def _sd_notify(unset_env, msg):
notify_socket = os.getenv('NOTIFY_SOCKET')
if notify_socket:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
sock.connect(_abstractify(notify_socket))
sock.sendall(msg)
if unset_env:
del os.environ['NOTIFY_SOCKET']
except EnvironmentError:
LOG.debug("Systemd notification failed", exc_info=True)
finally:
sock.close()
def notify():
"""Send notification to Systemd that service is ready.
For details see
http://www.freedesktop.org/software/systemd/man/sd_notify.html
"""
_sd_notify(False, 'READY=1')
def notify_once():
"""Send notification once to Systemd that service is ready.
Systemd sets NOTIFY_SOCKET environment variable with the name of the
socket listening for notifications from services.
This method removes the NOTIFY_SOCKET environment variable to ensure
notification is sent only once.
"""
_sd_notify(True, 'READY=1')
def onready(notify_socket, timeout):
"""Wait for systemd style notification on the socket.
:param notify_socket: local socket address
:type notify_socket: string
:param timeout: socket timeout
:type timeout: float
:returns: 0 service ready
1 service not ready
2 timeout occurred
"""
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.settimeout(timeout)
sock.bind(_abstractify(notify_socket))
try:
msg = sock.recv(512)
except socket.timeout:
return 2
finally:
sock.close()
if 'READY=1' in msg:
return 0
else:
return 1
if __name__ == '__main__':
# simple CLI for testing
if len(sys.argv) == 1:
notify()
elif len(sys.argv) >= 2:
timeout = float(sys.argv[1])
notify_socket = os.getenv('NOTIFY_SOCKET')
if notify_socket:
retval = onready(notify_socket, timeout)
sys.exit(retval)

faafo/openstack/common/threadgroup.py Deleted file

@@ -1,149 +0,0 @@
# Copyright 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import eventlet
from eventlet import greenpool
from faafo.openstack.common import loopingcall
LOG = logging.getLogger(__name__)
def _thread_done(gt, *args, **kwargs):
"""Callback function to be passed to GreenThread.link() when we spawn()
Calls the :class:`ThreadGroup` to notify if.
"""
kwargs['group'].thread_done(kwargs['thread'])
class Thread(object):
"""Wrapper around a greenthread, that holds a reference to the
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
it has done so it can be removed from the threads list.
"""
def __init__(self, thread, group):
self.thread = thread
self.thread.link(_thread_done, group=group, thread=self)
def stop(self):
self.thread.kill()
def wait(self):
return self.thread.wait()
def link(self, func, *args, **kwargs):
self.thread.link(func, *args, **kwargs)
class ThreadGroup(object):
"""The point of the ThreadGroup class is to:
* keep track of timers and greenthreads (making it easier to stop them
when need be).
* provide an easy API to add timers.
"""
def __init__(self, thread_pool_size=10):
self.pool = greenpool.GreenPool(thread_pool_size)
self.threads = []
self.timers = []
def add_dynamic_timer(self, callback, initial_delay=None,
periodic_interval_max=None, *args, **kwargs):
timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
timer.start(initial_delay=initial_delay,
periodic_interval_max=periodic_interval_max)
self.timers.append(timer)
def add_timer(self, interval, callback, initial_delay=None,
*args, **kwargs):
pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
pulse.start(interval=interval,
initial_delay=initial_delay)
self.timers.append(pulse)
def add_thread(self, callback, *args, **kwargs):
gt = self.pool.spawn(callback, *args, **kwargs)
th = Thread(gt, self)
self.threads.append(th)
return th
def thread_done(self, thread):
self.threads.remove(thread)
def _stop_threads(self):
current = threading.current_thread()
# Iterate over a copy of self.threads so thread_done doesn't
# modify the list while we're iterating
for x in self.threads[:]:
if x is current:
# don't kill the current thread.
continue
try:
x.stop()
except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)
def stop_timers(self):
for x in self.timers:
try:
x.stop()
except Exception as ex:
LOG.exception(ex)
self.timers = []
def stop(self, graceful=False):
"""stop function has the option of graceful=True/False.
* In case of graceful=True, wait for all threads to be finished.
Never kill threads.
* In case of graceful=False, kill threads immediately.
"""
self.stop_timers()
if graceful:
# In case of graceful=True, wait for all threads to be
# finished, never kill threads
self.wait()
else:
# In case of graceful=False(Default), kill threads
# immediately
self._stop_threads()
def wait(self):
for x in self.timers:
try:
x.wait()
except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)
current = threading.current_thread()
# Iterate over a copy of self.threads so thread_done doesn't
# modify the list while we're iterating
for x in self.threads[:]:
if x is current:
continue
try:
x.wait()
except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)

faafo/queues.py Normal file

@@ -0,0 +1,32 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import kombu
from oslo_config import cfg
task_exchange = kombu.Exchange('tasks', type='direct')
task_queue = kombu.Queue('normal', task_exchange, routing_key='normal')
queues_opts = [
cfg.StrOpt('transport-url',
default='amqp://guest:guest@localhost:5672//',
help='AMQP connection URL.')
]
cfg.CONF.register_opts(queues_opts)
def list_opts():
"""Entry point for oslo-config-generator."""
return [(None, copy.deepcopy(queues_opts))]
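
A note on reading the option above: oslo.config exposes dashed option names as underscored attributes, so 'transport-url' is accessed as CONF.transport_url (exactly what the updated bin/faafo-worker does). A minimal sketch, assuming a default-only setup with no config files:

from oslo_config import cfg

import kombu

from faafo import queues  # noqa: the import registers transport-url on cfg.CONF

cfg.CONF([])  # initialize with no CLI arguments so the default URL applies
# Dashes in option names become underscores on the CONF attribute.
connection = kombu.Connection(cfg.CONF.transport_url)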

faafo/worker/service.py

@@ -14,9 +14,6 @@
# based on http://code.activestate.com/recipes/577120-julia-fractals/
import eventlet
eventlet.monkey_patch()
import base64
import copy
import hashlib
@@ -24,15 +21,15 @@ import json
import os
from PIL import Image
import random
import socket
import tempfile
import time
from kombu.mixins import ConsumerMixin
from oslo_config import cfg
from oslo_log import log
import oslo_messaging as messaging
import requests
from faafo import queues
LOG = log.getLogger('faafo.worker')
CONF = cfg.CONF
@@ -102,9 +99,17 @@ class JuliaSet(object):
return (c, z)
class WorkerEndpoint(object):
class Worker(ConsumerMixin):
def process(self, ctxt, task):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(queues=queues.task_queue,
accept=['json'],
callbacks=[self.process])]
def process(self, task, message):
LOG.info("processing task %s" % task['uuid'])
LOG.debug(task)
start_time = time.time()
@@ -145,16 +150,5 @@ class WorkerEndpoint(object):
(CONF.endpoint_url, str(task['uuid'])),
json.dumps(result), headers=headers)
message.ack()
return result
def get_server():
transport = messaging.get_transport(CONF)
target = messaging.Target(topic='tasks', server=socket.gethostname())
endpoints = [
WorkerEndpoint()
]
server = messaging.get_rpc_server(transport, target, endpoints,
executor='eventlet')
return server

openstack-common.conf Deleted file

@@ -1,8 +0,0 @@
[DEFAULT]
# The list of modules to copy from oslo-incubator.git
module=periodic_task
module=service
# The base module to hold the copy of openstack.common
base=faafo

requirements.txt

@@ -10,5 +10,5 @@ flask-restless
flask-sqlalchemy
oslo.config>=1.9.3,<1.10.0 # Apache-2.0
oslo.log>=1.0.0,<1.1.0 # Apache-2.0
oslo.messaging>=1.8.0,<1.9.0 # Apache-2.0
PrettyTable>=0.7,<0.8
kombu>=2.5.0

setup.cfg

@@ -38,6 +38,7 @@ console_scripts =
oslo.config.opts =
faafo.api = faafo.api.service:list_opts
faafo.worker = faafo.worker.service:list_opts
faafo.queues = faafo.queues:list_opts
[build_sphinx]
source-dir = doc/source