Enable the ability to run multiple consumer processes at once

Change-Id: Ibed328d0b2bc11206d9975e84ce957305f8b973a

parent 22e18453dd
commit 7c77ad3e44
@@ -35,6 +35,7 @@ buffer_size = 4096
 max_buffer_size = 32768
 # Path in zookeeper for kafka consumer group partitioning algo
 zookeeper_path = /persister_partitions/alarm-state-transitions
+num_processors = 1
 
 [kafka_metrics]
 # Comma separated list of Kafka broker host:port
@@ -52,6 +53,7 @@ buffer_size = 4096
 max_buffer_size = 32768
 # Path in zookeeper for kafka consumer group partitioning algo
 zookeeper_path = /persister_partitions/metrics
+num_processors = 1
 
 [influxdb]
 database_name = mon
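
Raising num_processors fans consumption out over that many OS processes per topic. A hypothetical operator override (values illustrative only; the commit keeps the default at 1):

    [kafka_alarm_history]
    num_processors = 2

    [kafka_metrics]
    num_processors = 4

Because main() loops over each section's num_processors independently, these settings would start 2 + 4 = 6 consumer processes in total.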
@@ -21,8 +21,11 @@
     Start the persister as a stand-alone process by running 'persister.py
     --config-file <config file>'
 """
 
+import multiprocessing
+import os
+import signal
 import sys
 import time
 
 import simport
 from oslo_config import cfg
@@ -49,7 +52,8 @@ kafka_common_opts = [cfg.StrOpt('uri'),
                      cfg.IntOpt('fetch_size_bytes'),
                      cfg.IntOpt('buffer_size'),
                      cfg.IntOpt('max_buffer_size'),
-                     cfg.StrOpt('zookeeper_path')]
+                     cfg.StrOpt('zookeeper_path'),
+                     cfg.IntOpt('num_processors')]
 
 kafka_metrics_opts = kafka_common_opts
 kafka_alarm_history_opts = kafka_common_opts
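
Since kafka_metrics_opts and kafka_alarm_history_opts alias the same kafka_common_opts list, num_processors is registered under both groups and is read per section. A self-contained sketch of that oslo.config pattern (group and option names mirror the diff; the default value and standalone ConfigOpts instance are assumptions for the example):

    from oslo_config import cfg

    CONF = cfg.ConfigOpts()

    # One shared opt list registered under two groups, as in the diff.
    common_opts = [cfg.IntOpt('num_processors', default=1)]

    for group in ('kafka_metrics', 'kafka_alarm_history'):
        CONF.register_group(cfg.OptGroup(name=group))
        CONF.register_opts(common_opts, group)

    CONF([])  # parse with no CLI args; the persister passes --config-file
    print(CONF.kafka_metrics.num_processors,
          CONF.kafka_alarm_history.num_processors)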
@@ -71,6 +75,57 @@ repositories_group = cfg.OptGroup(name='repositories', title='repositories')
 cfg.CONF.register_group(repositories_group)
 cfg.CONF.register_opts(repositories_opts, repositories_group)
 
+processors = []  # global list to facilitate clean signal handling
+exiting = False
+
+
+def clean_exit(signum, frame=None):
+    """Exit all processes, attempting to finish uncommitted active work first.
+
+    Can be called on an os signal or on zookeeper losing its connection.
+    """
+    global exiting
+    if exiting:
+        # Since this is set up as a handler for SIGCHLD, when this kills one
+        # child it gets another signal; the global exiting flag keeps it from
+        # running multiple times.
+        LOG.debug('Exit in progress, clean_exit received additional signal %s'
+                  % signum)
+        return
+
+    LOG.info('Received signal %s, beginning graceful shutdown.' % signum)
+    exiting = True
+    wait_for_exit = False
+
+    for process in processors:
+        try:
+            if process.is_alive():
+                # terminate() sends SIGTERM, which each child attempts to
+                # handle gracefully once notified
+                process.terminate()
+                wait_for_exit = True
+        except Exception:
+            pass
+
+    # Wait a couple of seconds to give the subprocesses a chance to shut
+    # down correctly.
+    if wait_for_exit:
+        time.sleep(2)
+
+    # Kill anything that didn't already die
+    for child in multiprocessing.active_children():
+        LOG.debug('Killing pid %s' % child.pid)
+        try:
+            os.kill(child.pid, signal.SIGKILL)
+        except Exception:
+            pass
+
+    if signum == signal.SIGTERM:
+        sys.exit(0)
+
+    sys.exit(signum)
+
+
+def start_process(repository, kafka_config):
+    LOG.info("start process: {}".format(repository))
+    persister = Persister(kafka_config, cfg.CONF.zookeeper, repository)
+    persister.run()
+
 
 def main():
     log.register_options(cfg.CONF)
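
The terminate-then-kill pattern in clean_exit can be exercised on its own. A minimal POSIX-only sketch (the _worker stub and two-second grace period are illustrative, not part of this commit):

    import multiprocessing
    import os
    import signal
    import time


    def _worker():
        # Stub child: sleeps until the parent terminates it.
        while True:
            time.sleep(1)


    def shutdown(children, grace_seconds=2):
        # Ask politely first: terminate() delivers SIGTERM to each live child.
        for child in children:
            if child.is_alive():
                child.terminate()
        # Give the children a moment to exit cleanly...
        time.sleep(grace_seconds)
        # ...then SIGKILL anything still running.
        for child in multiprocessing.active_children():
            os.kill(child.pid, signal.SIGKILL)


    if __name__ == '__main__':
        procs = [multiprocessing.Process(target=_worker) for _ in range(2)]
        for p in procs:
            p.start()
        shutdown(procs)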
@@ -78,26 +133,24 @@ def main():
     cfg.CONF(sys.argv[1:], project='monasca', prog='persister')
     log.setup(cfg.CONF, "monasca-persister")
 
-    """Start persister.
-
-    Start metric persister and alarm persister in separate threads.
-    """
+    """Start persister."""
 
     metric_repository = simport.load(cfg.CONF.repositories.metrics_driver)
     alarm_state_history_repository = simport.load(
         cfg.CONF.repositories.alarm_state_history_driver)
 
-    metric_persister = Persister(cfg.CONF.kafka_metrics,
-                                 cfg.CONF.zookeeper,
-                                 metric_repository)
+    # Add processors for metrics topic
+    for proc in range(0, cfg.CONF.kafka_metrics.num_processors):
+        processors.append(multiprocessing.Process(
+            target=start_process,
+            args=(metric_repository, cfg.CONF.kafka_metrics)))
 
-    alarm_persister = Persister(cfg.CONF.kafka_alarm_history,
-                                cfg.CONF.zookeeper,
-                                alarm_state_history_repository)
+    # Add processors for alarm history topic
+    for proc in range(0, cfg.CONF.kafka_alarm_history.num_processors):
+        processors.append(multiprocessing.Process(
+            target=start_process,
+            args=(alarm_state_history_repository,
+                  cfg.CONF.kafka_alarm_history)))
 
-    metric_persister.start()
-    alarm_persister.start()
-
-    LOG.info('''
+    # Start
+    try:
+        LOG.info('''
 
  _____
 / \ ____ ____ _____ ______ ____ _____
@@ -113,8 +166,21 @@ def main():
 \/ \/ \/ \/
 
 ''')
+        for process in processors:
+            process.start()
+
+        LOG.info('Monasca Persister has started successfully!')
+
+        # The signal handlers must be added after the processes start,
+        # otherwise they run on all processes.
+        signal.signal(signal.SIGCHLD, clean_exit)
+        signal.signal(signal.SIGINT, clean_exit)
+        signal.signal(signal.SIGTERM, clean_exit)
+
+        while True:
+            time.sleep(10)
+
+    except Exception:
+        LOG.exception('Error! Exiting.')
+        clean_exit(signal.SIGKILL)
 
 if __name__ == "__main__":
     sys.exit(main())
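
The ordering comment above matters because forked children inherit the parent's signal dispositions: installing clean_exit before process.start() would make every child run the parent's shutdown logic as well. A minimal POSIX-only sketch of the ordering (names hypothetical):

    import multiprocessing
    import os
    import signal
    import time


    def worker():
        # Started before the parent installs its handler, so this child keeps
        # the default SIGTERM disposition and simply exits when terminated.
        time.sleep(30)


    def handler(signum, frame):
        print('parent pid %d got signal %d' % (os.getpid(), signum))


    if __name__ == '__main__':
        child = multiprocessing.Process(target=worker)
        child.start()                           # start children first...
        signal.signal(signal.SIGTERM, handler)  # ...then install handlers
        child.terminate()                       # SIGTERM goes only to the child
        child.join()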
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import os
-import threading
 
 from oslo_log import log
 
@@ -22,12 +21,10 @@ from monasca_common.kafka.consumer import KafkaConsumer
 LOG = log.getLogger(__name__)
 
 
-class Persister(threading.Thread):
+class Persister(object):
 
     def __init__(self, kafka_conf, zookeeper_conf, repository):
 
-        super(Persister, self).__init__()
-
         self._data_points = []
 
         self._kafka_topic = kafka_conf.topic
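
With Persister demoted from threading.Thread to a plain object, each worker builds its own instance inside a child process and calls run() directly via start_process, so no consumer state is shared across processes. A stub sketch of that pattern (Worker stands in for Persister; all names hypothetical):

    import multiprocessing
    import os


    class Worker(object):
        """Stub standing in for Persister: a plain object with a run() method."""

        def __init__(self, name):
            self._name = name

        def run(self):
            print('%s consuming in pid %d' % (self._name, os.getpid()))


    def start_process(name):
        # Mirrors start_process() in the diff: the object is built inside the
        # child, so nothing is shared with the parent.
        Worker(name).run()


    if __name__ == '__main__':
        procs = [multiprocessing.Process(target=start_process, args=('w%d' % i,))
                 for i in range(2)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()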