diff --git a/.gitignore b/.gitignore index 21f4d96b..6249bf6a 100755 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,5 @@ dist cover/ .coverage .testrepository/ +.venv +.stestr \ No newline at end of file diff --git a/monasca_common/kafka_lib/__init__.py b/monasca_common/kafka_lib/__init__.py index 8dd0a282..d249c8fd 100644 --- a/monasca_common/kafka_lib/__init__.py +++ b/monasca_common/kafka_lib/__init__.py @@ -1,8 +1,21 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + # __title__ = 'kafka' from .version import __version__ __author__ = 'David Arthur' __license__ = 'Apache License 2.0' __copyright__ = 'Copyright 2015, David Arthur under Apache License, v2.0' +__version__ = __version__ # from monasca_common.kafka_lib.client import KafkaClient # from monasca_common.kafka_lib.conn import KafkaConnection @@ -10,7 +23,8 @@ __copyright__ = 'Copyright 2015, David Arthur under Apache License, v2.0' # create_message, create_gzip_message, create_snappy_message # ) # from monasca_common.kafka_lib.producer import SimpleProducer, KeyedProducer -# from monasca_common.kafka_lib.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner +# from monasca_common.kafka_lib.partitioner import RoundRobinPartitioner, +# HashedPartitioner, Murmur2Partitioner # from monasca_common.kafka_lib.consumer import SimpleConsumer, MultiProcessConsumer, KafkaConsumer # # __all__ = [ diff --git a/monasca_common/kafka_lib/client.py b/monasca_common/kafka_lib/client.py index f179ba2f..e27bddbe 100644 --- a/monasca_common/kafka_lib/client.py +++ b/monasca_common/kafka_lib/client.py @@ -1,3 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + import collections import copy import functools @@ -7,12 +19,13 @@ import time import monasca_common.kafka_lib.common as kafka_common from monasca_common.kafka_lib.common import (TopicAndPartition, BrokerMetadata, - ConnectionError, FailedPayloadsError, - KafkaTimeoutError, KafkaUnavailableError, - LeaderNotAvailableError, UnknownTopicOrPartitionError, - NotLeaderForPartitionError, ReplicaNotAvailableError) + ConnectionError, FailedPayloadsError, + KafkaTimeoutError, KafkaUnavailableError, + LeaderNotAvailableError, UnknownTopicOrPartitionError, + NotLeaderForPartitionError, ReplicaNotAvailableError) -from monasca_common.kafka_lib.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS +from monasca_common.kafka_lib.conn import (collect_hosts, KafkaConnection, + DEFAULT_SOCKET_TIMEOUT_SECONDS) from monasca_common.kafka_lib.protocol import KafkaProtocol from monasca_common.kafka_lib.util import kafka_bytestring @@ -44,7 +57,6 @@ class KafkaClient(object): self.load_metadata_for_topics() # bootstrap with all metadata - ################## # Private API # ################## @@ -121,7 +133,7 @@ class KafkaClient(object): def _next_id(self): """Generate a new correlation id""" # modulo to keep w/i int32 - self.correlation_id = (self.correlation_id + 1) % 2**31 + self.correlation_id = (self.correlation_id + 1) % 2 ** 31 return self.correlation_id def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn): @@ -421,10 +433,7 @@ class KafkaClient(object): def has_metadata_for_topic(self, topic): topic = kafka_bytestring(topic) - return ( - topic in self.topic_partitions - and len(self.topic_partitions[topic]) > 0 - ) + return topic in self.topic_partitions and len(self.topic_partitions[topic]) > 0 def get_partition_ids_for_topic(self, topic): topic = kafka_bytestring(topic) @@ -437,7 +446,7 @@ class KafkaClient(object): def topics(self): return list(self.topic_partitions.keys()) - def ensure_topic_exists(self, topic, timeout = 30): + def ensure_topic_exists(self, topic, timeout=30): start_time = time.time() while not self.has_metadata_for_topic(topic): @@ -536,7 +545,8 @@ class KafkaClient(object): # this error code is provided for admin purposes only # we never talk to replicas, only the leader except ReplicaNotAvailableError: - log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition) + log.debug('Some (non-leader) replicas not available for topic %s partition %d', + topic, partition) # If Known Broker, topic_partition -> BrokerMetadata if leader in self.brokers: @@ -623,8 +633,8 @@ class KafkaClient(object): """ encoder = functools.partial(KafkaProtocol.encode_fetch_request, - max_wait_time=max_wait_time, - min_bytes=min_bytes) + max_wait_time=max_wait_time, + min_bytes=min_bytes) resps = self._send_broker_aware_request( payloads, encoder, @@ -645,8 +655,7 @@ class KafkaClient(object): def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None): - encoder = functools.partial(KafkaProtocol.encode_offset_commit_request, - group=group) + encoder = functools.partial(KafkaProtocol.encode_offset_commit_request, group=group) decoder = KafkaProtocol.decode_offset_commit_response resps = self._send_broker_aware_request(payloads, encoder, decoder) @@ -656,8 +665,7 @@ class KafkaClient(object): def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None): - encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request, - group=group) + encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request, group=group) decoder = KafkaProtocol.decode_offset_fetch_response resps = self._send_broker_aware_request(payloads, encoder, decoder) @@ -665,10 +673,10 @@ class KafkaClient(object): if not fail_on_error or not self._raise_on_response_error(resp)] def send_offset_fetch_request_kafka(self, group, payloads=[], - fail_on_error=True, callback=None): + fail_on_error=True, callback=None): encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request, - group=group, from_kafka=True) + group=group, from_kafka=True) decoder = KafkaProtocol.decode_offset_fetch_response resps = self._send_consumer_aware_request(group, payloads, encoder, decoder) diff --git a/monasca_common/kafka_lib/codec.py b/monasca_common/kafka_lib/codec.py index a9373c72..211e1562 100644 --- a/monasca_common/kafka_lib/codec.py +++ b/monasca_common/kafka_lib/codec.py @@ -1,3 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + import gzip from io import BytesIO import struct @@ -81,7 +93,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024): if xerial_compatible: def _chunker(): for i in xrange(0, len(payload), xerial_blocksize): - yield payload[i:i+xerial_blocksize] + yield payload[i:i + xerial_blocksize] out = BytesIO() diff --git a/monasca_common/kafka_lib/common.py b/monasca_common/kafka_lib/common.py index a7d81644..dc46e8e4 100644 --- a/monasca_common/kafka_lib/common.py +++ b/monasca_common/kafka_lib/common.py @@ -1,87 +1,81 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from collections import namedtuple import inspect import sys -from collections import namedtuple ############### # Structs # ############### # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI -MetadataRequest = namedtuple("MetadataRequest", - ["topics"]) +MetadataRequest = namedtuple("MetadataRequest", ["topics"]) -MetadataResponse = namedtuple("MetadataResponse", - ["brokers", "topics"]) +MetadataResponse = namedtuple("MetadataResponse", ["brokers", "topics"]) # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest -ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest", - ["groups"]) +ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest", ["groups"]) ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse", - ["error", "nodeId", "host", "port"]) + ["error", "nodeId", "host", "port"]) # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI -ProduceRequest = namedtuple("ProduceRequest", - ["topic", "partition", "messages"]) +ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"]) -ProduceResponse = namedtuple("ProduceResponse", - ["topic", "partition", "error", "offset"]) +ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"]) # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI -FetchRequest = namedtuple("FetchRequest", - ["topic", "partition", "offset", "max_bytes"]) +FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"]) FetchResponse = namedtuple("FetchResponse", - ["topic", "partition", "error", "highwaterMark", "messages"]) + ["topic", "partition", "error", "highwaterMark", "messages"]) # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI -OffsetRequest = namedtuple("OffsetRequest", - ["topic", "partition", "time", "max_offsets"]) +OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "max_offsets"]) -OffsetResponse = namedtuple("OffsetResponse", - ["topic", "partition", "error", "offsets"]) +OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"]) # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI OffsetCommitRequest = namedtuple("OffsetCommitRequest", - ["topic", "partition", "offset", "metadata"]) + ["topic", "partition", "offset", "metadata"]) -OffsetCommitResponse = namedtuple("OffsetCommitResponse", - ["topic", "partition", "error"]) +OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"]) -OffsetFetchRequest = namedtuple("OffsetFetchRequest", - ["topic", "partition"]) +OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"]) OffsetFetchResponse = namedtuple("OffsetFetchResponse", - ["topic", "partition", "offset", "metadata", "error"]) - + ["topic", "partition", "offset", "metadata", "error"]) # Other useful structs -BrokerMetadata = namedtuple("BrokerMetadata", - ["nodeId", "host", "port"]) +BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"]) -TopicMetadata = namedtuple("TopicMetadata", - ["topic", "error", "partitions"]) +TopicMetadata = namedtuple("TopicMetadata", ["topic", "error", "partitions"]) PartitionMetadata = namedtuple("PartitionMetadata", - ["topic", "partition", "leader", "replicas", "isr", "error"]) + ["topic", "partition", "leader", "replicas", "isr", "error"]) -OffsetAndMessage = namedtuple("OffsetAndMessage", - ["offset", "message"]) +OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"]) -Message = namedtuple("Message", - ["magic", "attributes", "key", "value"]) +Message = namedtuple("Message", ["magic", "attributes", "key", "value"]) -TopicAndPartition = namedtuple("TopicAndPartition", - ["topic", "partition"]) +TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"]) -KafkaMessage = namedtuple("KafkaMessage", - ["topic", "partition", "offset", "key", "value"]) +KafkaMessage = namedtuple("KafkaMessage", ["topic", "partition", "offset", "key", "value"]) # Define retry policy for async producer # Limit value: int >= 0, 0 means no retries -RetryOptions = namedtuple("RetryOptions", - ["limit", "backoff_ms", "retry_on_timeouts"]) +RetryOptions = namedtuple("RetryOptions", ["limit", "backoff_ms", "retry_on_timeouts"]) ################# @@ -240,7 +234,8 @@ class AsyncProducerQueueFull(KafkaError): def _iter_broker_errors(): for name, obj in inspect.getmembers(sys.modules[__name__]): - if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError: + if inspect.isclass(obj) and issubclass(obj, + BrokerResponseError) and obj != BrokerResponseError: yield obj diff --git a/monasca_common/kafka_lib/conn.py b/monasca_common/kafka_lib/conn.py index b520b0fa..0f39db2e 100644 --- a/monasca_common/kafka_lib/conn.py +++ b/monasca_common/kafka_lib/conn.py @@ -1,3 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + import copy import logging from random import shuffle diff --git a/monasca_common/kafka_lib/consumer/__init__.py b/monasca_common/kafka_lib/consumer/__init__.py index 935f56e1..e7a322af 100644 --- a/monasca_common/kafka_lib/consumer/__init__.py +++ b/monasca_common/kafka_lib/consumer/__init__.py @@ -1,6 +1,6 @@ -from .simple import SimpleConsumer -from .multiprocess import MultiProcessConsumer from .kafka import KafkaConsumer +from .multiprocess import MultiProcessConsumer +from .simple import SimpleConsumer __all__ = [ 'SimpleConsumer', 'MultiProcessConsumer', 'KafkaConsumer' diff --git a/monasca_common/kafka_lib/consumer/base.py b/monasca_common/kafka_lib/consumer/base.py index d14a0f8d..06098c59 100644 --- a/monasca_common/kafka_lib/consumer/base.py +++ b/monasca_common/kafka_lib/consumer/base.py @@ -1,3 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + from __future__ import absolute_import import atexit @@ -5,7 +17,6 @@ import logging import numbers from threading import Lock -import monasca_common.kafka_lib.common as kafka_common from monasca_common.kafka_lib.common import ( OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, UnknownTopicOrPartitionError, check_error, KafkaError @@ -31,6 +42,7 @@ FULL_QUEUE_WAIT_TIME_SECONDS = 0.1 MAX_BACKOFF_SECONDS = 60 + class Consumer(object): """ Base class to be used by other consumers. Not to be used directly @@ -148,7 +160,7 @@ class Consumer(object): partitions = list(self.offsets.keys()) log.debug('Committing new offsets for %s, partitions %s', - self.topic, partitions) + self.topic, partitions) for partition in partitions: offset = self.offsets[partition] log.debug('Commit offset %d in SimpleConsumer: ' @@ -189,7 +201,7 @@ class Consumer(object): # py3 supports unregistering if hasattr(atexit, 'unregister'): - atexit.unregister(self._cleanup_func) # pylint: disable=no-member + atexit.unregister(self._cleanup_func) # pylint: disable=no-member # py2 requires removing from private attribute... else: diff --git a/monasca_common/kafka_lib/consumer/kafka.py b/monasca_common/kafka_lib/consumer/kafka.py index e32e7abc..4cb1364a 100644 --- a/monasca_common/kafka_lib/consumer/kafka.py +++ b/monasca_common/kafka_lib/consumer/kafka.py @@ -1,3 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + from __future__ import absolute_import from collections import namedtuple @@ -51,6 +63,7 @@ DEPRECATED_CONFIG_KEYS = { 'metadata_broker_list': 'bootstrap_servers', } + class KafkaConsumer(object): """A simpler kafka consumer""" DEFAULT_CONFIG = deepcopy(DEFAULT_CONSUMER_CONFIG) @@ -258,7 +271,8 @@ class KafkaConsumer(object): # or (2) auto reset else: - self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition) + self._offsets.fetch[topic_partition] = \ + self._reset_partition_offset(topic_partition) # highwater marks (received from server on fetch response) # and task_done (set locally by user) @@ -665,7 +679,7 @@ class KafkaConsumer(object): # Otherwise we should re-raise the upstream exception # b/c it typically includes additional data about # the request that triggered it, and we do not want to drop that - raise # pylint: disable-msg=E0704 + raise # pylint: disable-msg=E0704 (offset, ) = self.get_partition_offsets(topic, partition, request_time_ms, max_num_offsets=1) @@ -682,7 +696,8 @@ class KafkaConsumer(object): def _check_consumer_timeout(self): if self._consumer_timeout and time.time() > self._consumer_timeout: - raise ConsumerTimeout('Consumer timed out after %d ms' % + self._config['consumer_timeout_ms']) + raise ConsumerTimeout( + 'Consumer timed out after %d ms' % + self._config['consumer_timeout_ms']) # # Autocommit private methods @@ -703,7 +718,8 @@ class KafkaConsumer(object): self._uncommitted_message_count = 0 self._next_commit_time = None if self._does_auto_commit_ms(): - self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0) + self._next_commit_time = time.time() + ( + self._config['auto_commit_interval_ms'] / 1000.0) def _incr_auto_commit_message_count(self, n=1): self._uncommitted_message_count += n diff --git a/monasca_common/kafka_lib/consumer/multiprocess.py b/monasca_common/kafka_lib/consumer/multiprocess.py index 18a50144..8e22f61e 100644 --- a/monasca_common/kafka_lib/consumer/multiprocess.py +++ b/monasca_common/kafka_lib/consumer/multiprocess.py @@ -1,12 +1,24 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + from __future__ import absolute_import from collections import namedtuple import logging from multiprocessing import Process, Manager as MPManager try: - import queue # python 3 + import queue # python 3 except ImportError: - import Queue as queue # python 2 + import Queue as queue # python 2 import time from ..common import KafkaError @@ -72,7 +84,8 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options): queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS) break except queue.Full: - if events.exit.is_set(): break + if events.exit.is_set(): + break count += 1 @@ -93,9 +106,10 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options): except KafkaError as e: # Retry with exponential backoff - log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval)) + log.error( + "Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval)) time.sleep(interval) - interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS + interval = interval * 2 if interval * 2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS class MultiProcessConsumer(Consumer): @@ -150,9 +164,9 @@ class MultiProcessConsumer(Consumer): manager = MPManager() self.queue = manager.Queue(1024) # Child consumers dump messages into this self.events = Events( - start = manager.Event(), # Indicates the consumers to start fetch - exit = manager.Event(), # Requests the consumers to shutdown - pause = manager.Event()) # Requests the consumers to pause fetch + start=manager.Event(), # Indicates the consumers to start fetch + exit=manager.Event(), # Requests the consumers to shutdown + pause=manager.Event()) # Requests the consumers to pause fetch self.size = manager.Value('i', 0) # Indicator of number of messages to fetch # dict.keys() returns a view in py3 + it's not a thread-safe operation diff --git a/monasca_common/kafka_lib/consumer/simple.py b/monasca_common/kafka_lib/consumer/simple.py index 7c632464..cf1dbbd8 100644 --- a/monasca_common/kafka_lib/consumer/simple.py +++ b/monasca_common/kafka_lib/consumer/simple.py @@ -1,14 +1,26 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + from __future__ import absolute_import try: from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611 except ImportError: - from itertools import izip_longest as izip_longest, repeat # python 2 + from itertools import izip_longest as izip_longest, repeat # python 2 import logging try: - import queue # python 3 + import queue # python 3 except ImportError: - import Queue as queue # python 2 + import Queue as queue # python 2 import sys import time @@ -166,7 +178,7 @@ class SimpleConsumer(Consumer): # Otherwise we should re-raise the upstream exception # b/c it typically includes additional data about # the request that triggered it, and we do not want to drop that - raise # pylint: disable-msg=E0704 + raise # pylint: disable-msg=E0704 # send_offset_request log.info('Resetting topic-partition offset to %s for %s:%d', @@ -198,7 +210,7 @@ class SimpleConsumer(Consumer): If partition is None, would modify all partitions. """ - if whence is None: # set an absolute offset + if whence is None: # set an absolute offset if partition is None: for tmp_partition in self.offsets: self.offsets[tmp_partition] = offset @@ -308,6 +320,7 @@ class SimpleConsumer(Consumer): def _get_message(self, block=True, timeout=0.1, get_partition_info=None, update_offset=True): + """ If no messages can be fetched, returns None. If get_partition_info is None, it defaults to self.partition_info @@ -365,8 +378,7 @@ class SimpleConsumer(Consumer): def _fetch(self): # Create fetch request payloads for all the partitions - partitions = dict((p, self.buffer_size) - for p in self.fetch_offsets.keys()) + partitions = dict((p, self.buffer_size) for p in self.fetch_offsets.keys()) while partitions: requests = [] for partition, buffer_size in six.iteritems(partitions): @@ -416,8 +428,9 @@ class SimpleConsumer(Consumer): try: for message in resp.messages: if message.offset < self.fetch_offsets[partition]: - log.debug('Skipping message %s because its offset is less than the consumer offset', - message) + log.debug( + 'Skipping message %s because its offset is less' + ' than the consumer offset', message) continue # Put the message in our queue self.queue.put((partition, message)) diff --git a/monasca_common/kafka_lib/context.py b/monasca_common/kafka_lib/context.py index ba59a389..991ea2c4 100644 --- a/monasca_common/kafka_lib/context.py +++ b/monasca_common/kafka_lib/context.py @@ -1,3 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + """ Context manager to commit/rollback consumer offsets. """ diff --git a/monasca_common/kafka_lib/partitioner/base.py b/monasca_common/kafka_lib/partitioner/base.py index 857f634d..50447252 100644 --- a/monasca_common/kafka_lib/partitioner/base.py +++ b/monasca_common/kafka_lib/partitioner/base.py @@ -1,3 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + class Partitioner(object): """ diff --git a/monasca_common/kafka_lib/partitioner/hashed.py b/monasca_common/kafka_lib/partitioner/hashed.py index d5d6d27c..2c04ad86 100644 --- a/monasca_common/kafka_lib/partitioner/hashed.py +++ b/monasca_common/kafka_lib/partitioner/hashed.py @@ -1,3 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + import six from .base import Partitioner @@ -70,13 +82,13 @@ def murmur2(key): for i in range(length4): i4 = i * 4 k = ((data[i4 + 0] & 0xff) + - ((data[i4 + 1] & 0xff) << 8) + - ((data[i4 + 2] & 0xff) << 16) + - ((data[i4 + 3] & 0xff) << 24)) + ((data[i4 + 1] & 0xff) << 8) + + ((data[i4 + 2] & 0xff) << 16) + + ((data[i4 + 3] & 0xff) << 24)) k &= 0xffffffff k *= m k &= 0xffffffff - k ^= (k % 0x100000000) >> r # k ^= k >>> r + k ^= (k % 0x100000000) >> r # k ^= k >>> r k &= 0xffffffff k *= m k &= 0xffffffff @@ -100,11 +112,11 @@ def murmur2(key): h *= m h &= 0xffffffff - h ^= (h % 0x100000000) >> 13 # h >>> 13; + h ^= (h % 0x100000000) >> 13 # h >>> 13; h &= 0xffffffff h *= m h &= 0xffffffff - h ^= (h % 0x100000000) >> 15 # h >>> 15; + h ^= (h % 0x100000000) >> 15 # h >>> 15; h &= 0xffffffff return h diff --git a/monasca_common/kafka_lib/partitioner/roundrobin.py b/monasca_common/kafka_lib/partitioner/roundrobin.py index 6439e532..f8d43ba0 100644 --- a/monasca_common/kafka_lib/partitioner/roundrobin.py +++ b/monasca_common/kafka_lib/partitioner/roundrobin.py @@ -1,7 +1,20 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + from itertools import cycle from .base import Partitioner + class RoundRobinPartitioner(Partitioner): """ Implements a round robin partitioner which sends data to partitions diff --git a/monasca_common/kafka_lib/producer/__init__.py b/monasca_common/kafka_lib/producer/__init__.py index bc0e7c61..ea6fa6aa 100644 --- a/monasca_common/kafka_lib/producer/__init__.py +++ b/monasca_common/kafka_lib/producer/__init__.py @@ -1,5 +1,5 @@ -from .simple import SimpleProducer from .keyed import KeyedProducer +from .simple import SimpleProducer __all__ = [ 'SimpleProducer', 'KeyedProducer' diff --git a/monasca_common/kafka_lib/producer/base.py b/monasca_common/kafka_lib/producer/base.py index 38394de2..b0803f74 100644 --- a/monasca_common/kafka_lib/producer/base.py +++ b/monasca_common/kafka_lib/producer/base.py @@ -1,3 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + from __future__ import absolute_import import atexit @@ -83,7 +95,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, try: client.reinit() except Exception as e: - log.warn('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms) + log.warn( + 'Async producer failed to connect to brokers; backoff for %s(ms) before retrying', + retry_options.backoff_ms) time.sleep(float(retry_options.backoff_ms) / 1000) else: break @@ -148,7 +162,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, } def _handle_error(error_cls, request): - if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)): + if (issubclass(error_cls, RETRY_ERROR_TYPES) or + (retry_options.retry_on_timeouts and + issubclass(error_cls, RequestTimedOutError))): reqs_to_retry.append(request) if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES): retry_state['do_backoff'] |= True @@ -179,8 +195,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, 'to %s:%d with msgs %s', error_cls.__name__, (i + 1), len(requests), orig_req.topic, orig_req.partition, - orig_req.messages if log_messages_on_error - else hash(orig_req.messages)) + orig_req.messages if log_messages_on_error else hash(orig_req.messages)) if not reqs_to_retry: request_tries = {} @@ -203,17 +218,15 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, request_tries = dict( (key, count + 1) for (key, count) in request_tries.items() - if key in reqs_to_retry - and (retry_options.limit is None - or (count < retry_options.limit)) + if key in reqs_to_retry and (retry_options.limit is None or + (count < retry_options.limit)) ) # Log messages we are going to retry for orig_req in request_tries.keys(): log.info('Retrying ProduceRequest to %s:%d with msgs %s', orig_req.topic, orig_req.partition, - orig_req.messages if log_messages_on_error - else hash(orig_req.messages)) + orig_req.messages if log_messages_on_error else hash(orig_req.messages)) if request_tries or not queue.empty(): log.error('Stopped producer with {0} unsent messages' @@ -282,7 +295,7 @@ class Producer(object): codec_compresslevel=None, sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT, async=False, - batch_send=False, # deprecated, use async + batch_send=False, # deprecated, use async batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, async_retry_limit=ASYNC_RETRY_LIMIT, @@ -403,7 +416,8 @@ class Producer(object): 'Current queue size %d.' % self.queue.qsize()) resp = [] else: - messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel) + messages = create_message_set( + [(m, key) for m in msg], self.codec, key, self.codec_compresslevel) req = ProduceRequest(topic, partition, messages) try: resp = self.client.send_produce_request( @@ -441,7 +455,7 @@ class Producer(object): # py3 supports unregistering if hasattr(atexit, 'unregister'): - atexit.unregister(self._cleanup_func) # pylint: disable=no-member + atexit.unregister(self._cleanup_func) # pylint: disable=no-member # py2 requires removing from private attribute... else: diff --git a/monasca_common/kafka_lib/producer/keyed.py b/monasca_common/kafka_lib/producer/keyed.py index a5a26c95..78da89c9 100644 --- a/monasca_common/kafka_lib/producer/keyed.py +++ b/monasca_common/kafka_lib/producer/keyed.py @@ -1,12 +1,23 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + from __future__ import absolute_import import logging import warnings -from .base import Producer from ..partitioner import HashedPartitioner from ..util import kafka_bytestring - +from .base import Producer log = logging.getLogger(__name__) @@ -32,7 +43,8 @@ class KeyedProducer(Producer): if not self.client.has_metadata_for_topic(topic): self.client.load_metadata_for_topics(topic) - self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic)) + self.partitioners[topic] = self.partitioner_class( + self.client.get_partition_ids_for_topic(topic)) partitioner = self.partitioners[topic] return partitioner.partition(key) @@ -44,7 +56,8 @@ class KeyedProducer(Producer): # DEPRECATED def send(self, topic, key, msg): - warnings.warn("KeyedProducer.send is deprecated in favor of send_messages", DeprecationWarning) + warnings.warn("KeyedProducer.send is deprecated in favor of send_messages", + DeprecationWarning) return self.send_messages(topic, key, msg) def __repr__(self): diff --git a/monasca_common/kafka_lib/producer/simple.py b/monasca_common/kafka_lib/producer/simple.py index 13e60d98..f13bad5d 100644 --- a/monasca_common/kafka_lib/producer/simple.py +++ b/monasca_common/kafka_lib/producer/simple.py @@ -1,3 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + from __future__ import absolute_import from itertools import cycle @@ -40,7 +52,7 @@ class SimpleProducer(Producer): # Randomize the initial partition that is returned if self.random_start: num_partitions = len(self.client.get_partition_ids_for_topic(topic)) - for _ in xrange(random.randint(0, num_partitions-1)): + for _ in xrange(random.randint(0, num_partitions - 1)): next(self.partition_cycles[topic]) return next(self.partition_cycles[topic]) diff --git a/monasca_common/kafka_lib/protocol.py b/monasca_common/kafka_lib/protocol.py index 51b586af..49ca1c4f 100644 --- a/monasca_common/kafka_lib/protocol.py +++ b/monasca_common/kafka_lib/protocol.py @@ -1,15 +1,25 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + import logging import struct -import six - from six.moves import xrange from monasca_common.kafka_lib.codec import ( gzip_encode, gzip_decode, snappy_encode, snappy_decode ) from monasca_common.kafka_lib.common import ( - Message, OffsetAndMessage, TopicAndPartition, + Message, OffsetAndMessage, BrokerMetadata, TopicMetadata, PartitionMetadata, MetadataResponse, ProduceResponse, FetchResponse, OffsetResponse, OffsetCommitResponse, OffsetFetchResponse, diff --git a/monasca_common/kafka_lib/util.py b/monasca_common/kafka_lib/util.py index 058daf20..0b324b1e 100644 --- a/monasca_common/kafka_lib/util.py +++ b/monasca_common/kafka_lib/util.py @@ -1,3 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + import binascii import collections import struct @@ -83,8 +95,7 @@ def group_by_topic_and_partition(tuples): out = collections.defaultdict(dict) for t in tuples: assert t.topic not in out or t.partition not in out[t.topic], \ - 'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__, - t.topic, t.partition) + 'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__, t.topic, t.partition) out[t.topic][t.partition] = t return out diff --git a/monasca_common/monasca_query_language/aql_parser.py b/monasca_common/monasca_query_language/aql_parser.py index da741881..19e0ed70 100644 --- a/monasca_common/monasca_query_language/aql_parser.py +++ b/monasca_common/monasca_query_language/aql_parser.py @@ -13,10 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime import six -import sys -import time import pyparsing @@ -56,8 +53,7 @@ dimension = dimension_name + dim_comparison_op + dimension_value dimension.setParseAction(query_structures.Dimension) dimension_list = pyparsing.Group((LBRACE + pyparsing.Optional( - pyparsing.delimitedList(dimension)) + - RBRACE)) + pyparsing.delimitedList(dimension)) + RBRACE)) metric = (metric_name + pyparsing.Optional(dimension_list) | pyparsing.Optional(metric_name) + dimension_list) diff --git a/monasca_common/repositories/constants.py b/monasca_common/repositories/constants.py index b165e8c0..64999070 100644 --- a/monasca_common/repositories/constants.py +++ b/monasca_common/repositories/constants.py @@ -11,4 +11,5 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + PAGE_LIMIT = 50 diff --git a/monasca_common/repositories/mysql/mysql_repository.py b/monasca_common/repositories/mysql/mysql_repository.py index eddb6f0d..c66b4777 100755 --- a/monasca_common/repositories/mysql/mysql_repository.py +++ b/monasca_common/repositories/mysql/mysql_repository.py @@ -11,6 +11,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + import logging from oslo_config import cfg diff --git a/monasca_common/tests/policy/test_policy.py b/monasca_common/tests/policy/test_policy.py index 8dc6c8c0..50981af6 100644 --- a/monasca_common/tests/policy/test_policy.py +++ b/monasca_common/tests/policy/test_policy.py @@ -57,7 +57,6 @@ class PolicyFileTestCase(base.BaseTestCase): self.context, action, self.target) - class PolicyTestCase(base.BaseTestCase): def setUp(self): super(PolicyTestCase, self).setUp() @@ -73,7 +72,7 @@ class PolicyTestCase(base.BaseTestCase): os_policy.RuleDefault("example:uppercase_admin", "role:ADMIN or role:sysadmin"), os_policy.RuleDefault("example:get_http", - "http://www.example.com"), + "http://www.example.com"), os_policy.RuleDefault("example:my_file", "role:compute_admin or " "project_id:%(project_id)s"), diff --git a/monasca_common/tests/test_monasca_query_language.py b/monasca_common/tests/test_monasca_query_language.py index c2df3442..703e439e 100644 --- a/monasca_common/tests/test_monasca_query_language.py +++ b/monasca_common/tests/test_monasca_query_language.py @@ -18,7 +18,6 @@ import pyparsing from monasca_common.monasca_query_language import aql_parser from monasca_common.monasca_query_language import exceptions -from monasca_common.monasca_query_language import query_structures class TestMonascaQueryLanguage(base.BaseTestCase): @@ -84,8 +83,8 @@ class TestMonascaQueryLanguage(base.BaseTestCase): "source metric_one targets metric_two excluding metric_three group by hostname", "source metric_one targets metric_two group by hostname", "source metric_one group by hostname", - "source {__severity__=HIGH} targets {__severity__=LOW} excluding " - "{__alarmName__=alarm_one} group by __alarmName__" + "source {__severity__=HIGH} targets {__severity__=LOW} excluding" + " {__alarmName__=alarm_one} group by __alarmName__" ] negative_expressions = [ "targets metric_two source_metric_one" diff --git a/monasca_common/tests/test_simport.py b/monasca_common/tests/test_simport.py index dd05219f..6c664458 100755 --- a/monasca_common/tests/test_simport.py +++ b/monasca_common/tests/test_simport.py @@ -89,10 +89,12 @@ class TestSimport(base.BaseTestCase): # 'full_path/monasca-common/monasca_common/tests/external/externalmodule.py'> # # while Python 3: -# +# # , that's why we need to provide different module names for simport in Python 2 and 3 # + if six.PY2: class TestSimportPY2(base.BaseTestCase): @@ -107,7 +109,8 @@ if six.PY2: def test_good_load_external(self): - method = simport.load(PWD + "/external|monasca_common.tests.external.externalmodule:Blah.method_b") + method = simport.load( + PWD + "/external|monasca_common.tests.external.externalmodule:Blah.method_b") self.assertTrue('monasca_common.tests.external.externalmodule' in sys.modules) old = sys.modules['monasca_common.tests.external.externalmodule'] @@ -119,7 +122,8 @@ if six.PY2: self.assertEqual(method, external.externalmodule.Blah.method_b) def test_import_class(self): - klass = simport.load(PWD + "/external|monasca_common.tests.external.externalmodule:Blah") + klass = simport.load( + PWD + "/external|monasca_common.tests.external.externalmodule:Blah") import external.externalmodule self.assertEqual(klass, external.externalmodule.Blah) diff --git a/monasca_common/tests/validation/test_metric_validation.py b/monasca_common/tests/validation/test_metric_validation.py index ff24f876..81e0976e 100644 --- a/monasca_common/tests/validation/test_metric_validation.py +++ b/monasca_common/tests/validation/test_metric_validation.py @@ -14,8 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from oslotest import base import codecs +from oslotest import base import six from monasca_common.validation import metrics as metric_validator @@ -35,6 +35,7 @@ def _hex_to_unicode(hex_raw): hex_str = hex_str_raw.decode('utf-8', 'replace') return hex_str + # NOTE(trebskit) => http://www.cl.cam.ac.uk/~mgk25/ucs/examples/UTF-8-test.txt UNICODE_MESSAGES = [ # 1 correct UTF-8 text @@ -207,7 +208,7 @@ class TestMetricValidation(base.BaseTestCase): def test_invalid_dimension_key_length(self): metric = {"name": "test_metric_name", - "dimensions": {'A'*256: 'B', 'B': 'C', 'D': 'E'}, + "dimensions": {'A' * 256: 'B', 'B': 'C', 'D': 'E'}, "timestamp": 1405630174123, "value": 5} self.assertRaisesRegex( @@ -217,7 +218,7 @@ class TestMetricValidation(base.BaseTestCase): def test_invalid_dimension_value_length(self): metric = {"name": "test_metric_name", - "dimensions": {'A': 'B', 'B': 'C'*256, 'D': 'E'}, + "dimensions": {'A': 'B', 'B': 'C' * 256, 'D': 'E'}, "timestamp": 1405630174123, "value": 5} self.assertRaisesRegex( @@ -324,7 +325,7 @@ class TestMetricValidation(base.BaseTestCase): def test_invalid_dimension_value_chars(self): for c in invalid_dimension_chars: metric = {"name": "test_name", - "dimensions": {'test-key': 'test{}value'.format(c)}, + "dimensions": {'test-key': 'test{}value'.format(c)}, "timestamp": 1405630174123, "value": 5} self.assertRaisesRegex( @@ -378,7 +379,8 @@ class TestMetricValidation(base.BaseTestCase): def test_invalid_too_large_value_meta(self): value_meta_value = "" num_value_meta = 10 - for i in six.moves.range(0, int(metric_validator.VALUE_META_VALUE_MAX_LENGTH / num_value_meta)): + for i in six.moves.range( + 0, int(metric_validator.VALUE_META_VALUE_MAX_LENGTH / num_value_meta)): value_meta_value = '{}{}'.format(value_meta_value, '1') value_meta = {} for i in six.moves.range(0, num_value_meta): diff --git a/tox.ini b/tox.ini index 9de29fb6..1b156db6 100644 --- a/tox.ini +++ b/tox.ini @@ -74,14 +74,14 @@ commands = bindep test [flake8] max-complexity = 50 -max-line-length = 120 +max-line-length = 100 builtins = _ exclude=.venv,.git,.tox,dist,*egg,build show-source = True # note: Due to the need to fork kafka-python, many pep8 violations occure. # All of the below ignores are caused by the forked kafka-python library # so when monasca migrates to pykafka, the below line can be removed. -ignore = E121,E126,E127,E128,E131,E221,E226,E241,E251,E261,E302,E303,E501,E701,F401,H101,H102,H301,H304,H306,H404,H405 +ignore = H101,H301,H404,H405 [testenv:lower-constraints] basepython = python3