# Copyright (C) 2015 Cisco Systems, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import threading from oslo_messaging._drivers import base from oslo_messaging._drivers import common as driver_common from oslo_messaging._drivers import pool as driver_pool from oslo_messaging._i18n import _LE from oslo_messaging._i18n import _LW from oslo_serialization import jsonutils import kafka from kafka.common import KafkaError from oslo_config import cfg from oslo_log import log as logging LOG = logging.getLogger(__name__) PURPOSE_SEND = 'send' PURPOSE_LISTEN = 'listen' kafka_opts = [ cfg.StrOpt('kafka_default_host', default='localhost', help='Default Kafka broker Host'), cfg.PortOpt('kafka_default_port', default=9092, help='Default Kafka broker Port'), cfg.IntOpt('kafka_max_fetch_bytes', default=1024 * 1024, help='Max fetch bytes of Kafka consumer'), cfg.IntOpt('kafka_consumer_timeout', default=1.0, help='Default timeout(s) for Kafka consumers'), cfg.IntOpt('pool_size', default=10, help='Pool Size for Kafka Consumers'), ] CONF = cfg.CONF def pack_context_with_message(ctxt, msg): """Pack context into msg.""" if isinstance(ctxt, dict): context_d = ctxt else: context_d = ctxt.to_dict() return {'message': msg, 'context': context_d} def target_to_topic(target, priority=None): """Convert target into topic string :param target: Message destination target :type target: oslo_messaging.Target :param priority: Notification priority :type priority: string """ if not priority: return target.topic return target.topic + '.' + priority class Connection(object): def __init__(self, conf, url, purpose): driver_conf = conf.oslo_messaging_kafka self.conf = conf self.kafka_client = None self.producer = None self.consumer = None self.fetch_messages_max_bytes = driver_conf.kafka_max_fetch_bytes self.consumer_timeout = float(driver_conf.kafka_consumer_timeout) self.url = url self._parse_url() # TODO(Support for manual/auto_commit functionality) # When auto_commit is False, consumer can manually notify # the completion of the subscription. # Currently we don't support for non auto commit option self.auto_commit = True self._consume_loop_stopped = False def _parse_url(self): driver_conf = self.conf.oslo_messaging_kafka try: self.host = self.url.hosts[0].hostname except (NameError, IndexError): self.host = driver_conf.kafka_default_host try: self.port = self.url.hosts[0].port except (NameError, IndexError): self.port = driver_conf.kafka_default_port if self.host is None: self.host = driver_conf.kafka_default_host if self.port is None: self.port = driver_conf.kafka_default_port def notify_send(self, topic, ctxt, msg, retry): """Send messages to Kafka broker. :param topic: String of the topic :param ctxt: context for the messages :param msg: messages for publishing :param retry: the number of retry """ message = pack_context_with_message(ctxt, msg) self._ensure_connection() self._send_and_retry(message, topic, retry) def _send_and_retry(self, message, topic, retry): current_retry = 0 if not isinstance(message, str): message = jsonutils.dumps(message) while message is not None: try: self._send(message, topic) message = None except Exception: LOG.warning(_LW("Failed to publish a message of topic %s"), topic) current_retry += 1 if retry is not None and current_retry >= retry: LOG.exception(_LE("Failed to retry to send data " "with max retry times")) message = None def _send(self, message, topic): self.producer.send_messages(topic, message) def consume(self, timeout=None): """Receive up to 'max_fetch_messages' messages. :param timeout: poll timeout in seconds """ duration = (self.consumer_timeout if timeout is None else timeout) timer = driver_common.DecayingTimer(duration=duration) timer.start() def _raise_timeout(): LOG.debug('Timed out waiting for Kafka response') raise driver_common.Timeout() poll_timeout = (self.consumer_timeout if timeout is None else min(timeout, self.consumer_timeout)) while True: if self._consume_loop_stopped: return try: next_timeout = poll_timeout * 1000.0 # TODO(use configure() method instead) # Currently KafkaConsumer does not support for # the case of updating only fetch_max_wait_ms parameter self.consumer._config['fetch_max_wait_ms'] = next_timeout messages = list(self.consumer.fetch_messages()) except Exception as e: LOG.exception(_LE("Failed to consume messages: %s"), e) messages = None if not messages: poll_timeout = timer.check_return( _raise_timeout, maximum=self.consumer_timeout) continue return messages def stop_consuming(self): self._consume_loop_stopped = True def reset(self): """Reset a connection so it can be used again.""" if self.kafka_client: self.kafka_client.close() self.kafka_client = None if self.producer: self.producer.stop() self.producer = None self.consumer = None def close(self): if self.kafka_client: self.kafka_client.close() self.kafka_client = None if self.producer: self.producer.stop() self.consumer = None def commit(self): """Commit is used by subscribers belonging to the same group. After subscribing messages, commit is called to prevent the other subscribers which belong to the same group from re-subscribing the same messages. Currently self.auto_commit option is always True, so we don't need to call this function. """ self.consumer.commit() def _ensure_connection(self): if self.kafka_client: return try: self.kafka_client = kafka.KafkaClient( "%s:%s" % (self.host, str(self.port))) self.producer = kafka.SimpleProducer(self.kafka_client) except KafkaError as e: LOG.exception(_LE("Kafka Connection is not available: %s"), e) self.kafka_client = None def declare_topic_consumer(self, topics, group=None): self._ensure_connection() for topic in topics: self.kafka_client.ensure_topic_exists(topic) self.consumer = kafka.KafkaConsumer( *topics, group_id=group, bootstrap_servers=["%s:%s" % (self.host, str(self.port))], fetch_message_max_bytes=self.fetch_messages_max_bytes) class OsloKafkaMessage(base.RpcIncomingMessage): def __init__(self, ctxt, message): super(OsloKafkaMessage, self).__init__(ctxt, message) def requeue(self): LOG.warning(_LW("requeue is not supported")) def reply(self, reply=None, failure=None, log_failure=True): LOG.warning(_LW("reply is not supported")) class KafkaListener(base.Listener): def __init__(self, conn): super(KafkaListener, self).__init__() self._stopped = threading.Event() self.conn = conn self.incoming_queue = [] @base.batch_poll_helper def poll(self, timeout=None): while not self._stopped.is_set(): if self.incoming_queue: return self.incoming_queue.pop(0) try: messages = self.conn.consume(timeout=timeout) for msg in messages: message = msg.value LOG.debug('poll got message : %s', message) message = jsonutils.loads(message) self.incoming_queue.append(OsloKafkaMessage( ctxt=message['context'], message=message['message'])) except driver_common.Timeout: return None def stop(self): self._stopped.set() self.conn.stop_consuming() def cleanup(self): self.conn.close() def commit(self): # TODO(Support for manually/auto commit functionality) # It's better to allow users to commit manually and support for # self.auto_commit = False option. For now, this commit function # is meaningless since user couldn't call this function and # auto_commit option is always True. self.conn.commit() class KafkaDriver(base.BaseDriver): """Note: Current implementation of this driver is experimental. We will have functional and/or integrated testing enabled for this driver. """ def __init__(self, conf, url, default_exchange=None, allowed_remote_exmods=None): opt_group = cfg.OptGroup(name='oslo_messaging_kafka', title='Kafka driver options') conf.register_group(opt_group) conf.register_opts(kafka_opts, group=opt_group) super(KafkaDriver, self).__init__( conf, url, default_exchange, allowed_remote_exmods) self.connection_pool = driver_pool.ConnectionPool( self.conf, self.conf.oslo_messaging_kafka.pool_size, self._url, Connection) self.listeners = [] def cleanup(self): for c in self.listeners: c.close() self.listeners = [] def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, retry=None): raise NotImplementedError( 'The RPC implementation for Kafka is not implemented') def send_notification(self, target, ctxt, message, version, retry=None): """Send notification to Kafka brokers :param target: Message destination target :type target: oslo_messaging.Target :param ctxt: Message context :type ctxt: dict :param message: Message payload to pass :type message: dict :param version: Messaging API version (currently not used) :type version: str :param retry: an optional default kafka consumer retries configuration None means to retry forever 0 means no retry N means N retries :type retry: int """ with self._get_connection(purpose=PURPOSE_SEND) as conn: conn.notify_send(target_to_topic(target), ctxt, message, retry) def listen(self, target): raise NotImplementedError( 'The RPC implementation for Kafka is not implemented') def listen_for_notifications(self, targets_and_priorities, pool=None): """Listen to a specified list of targets on Kafka brokers :param targets_and_priorities: List of pairs (target, priority) priority is not used for kafka driver target.exchange_target.topic is used as a kafka topic :type targets_and_priorities: list :param pool: consumer group of Kafka consumers :type pool: string """ conn = self._get_connection(purpose=PURPOSE_LISTEN) topics = set() for target, priority in targets_and_priorities: topics.add(target_to_topic(target, priority)) conn.declare_topic_consumer(topics, pool) listener = KafkaListener(conn) return listener def _get_connection(self, purpose): return driver_common.ConnectionContext(self.connection_pool, purpose)