Prefer assert or more-specific error to IllegalState / IllegalArgument
This commit is contained in:
@@ -131,10 +131,9 @@ class KafkaClient(object):
|
|||||||
return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out()
|
return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out()
|
||||||
|
|
||||||
def _initiate_connect(self, node_id):
|
def _initiate_connect(self, node_id):
|
||||||
"""Initiate a connection to the given node"""
|
"""Initiate a connection to the given node (must be in metadata)"""
|
||||||
broker = self.cluster.broker_metadata(node_id)
|
broker = self.cluster.broker_metadata(node_id)
|
||||||
if not broker:
|
assert broker, 'Broker id %s not in current metadata' % node_id
|
||||||
raise Errors.IllegalArgumentError('Broker %s not found in current cluster metadata', node_id)
|
|
||||||
|
|
||||||
if node_id not in self._conns:
|
if node_id not in self._conns:
|
||||||
log.debug("Initiating connection to node %s at %s:%s",
|
log.debug("Initiating connection to node %s at %s:%s",
|
||||||
@@ -144,8 +143,7 @@ class KafkaClient(object):
|
|||||||
return self._finish_connect(node_id)
|
return self._finish_connect(node_id)
|
||||||
|
|
||||||
def _finish_connect(self, node_id):
|
def _finish_connect(self, node_id):
|
||||||
if node_id not in self._conns:
|
assert node_id in self._conns, '%s is not in current conns' % node_id
|
||||||
raise Errors.IllegalArgumentError('Node %s not found in connections', node_id)
|
|
||||||
state = self._conns[node_id].connect()
|
state = self._conns[node_id].connect()
|
||||||
if state is ConnectionStates.CONNECTING:
|
if state is ConnectionStates.CONNECTING:
|
||||||
self._connecting.add(node_id)
|
self._connecting.add(node_id)
|
||||||
@@ -242,13 +240,15 @@ class KafkaClient(object):
|
|||||||
request (Struct): request object (not-encoded)
|
request (Struct): request object (not-encoded)
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
IllegalStateError: if node_id is not ready
|
NodeNotReadyError: if node_id is not ready
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Future: resolves to Response struct
|
Future: resolves to Response struct
|
||||||
"""
|
"""
|
||||||
if not self._can_send_request(node_id):
|
if not self._can_send_request(node_id):
|
||||||
raise Errors.IllegalStateError("Attempt to send a request to node %s which is not ready." % node_id)
|
raise Errors.NodeNotReadyError("Attempt to send a request to node"
|
||||||
|
" which is not ready (node id %s)."
|
||||||
|
% node_id)
|
||||||
|
|
||||||
# Every request gets a response, except one special case:
|
# Every request gets a response, except one special case:
|
||||||
expect_response = True
|
expect_response = True
|
||||||
|
@@ -190,9 +190,7 @@ class BrokerConnection(object):
|
|||||||
|
|
||||||
Return response if available
|
Return response if available
|
||||||
"""
|
"""
|
||||||
if self._processing:
|
assert not self._processing, 'Recursion not supported'
|
||||||
raise Errors.IllegalStateError('Recursive connection processing'
|
|
||||||
' not supported')
|
|
||||||
if not self.connected():
|
if not self.connected():
|
||||||
log.warning('%s cannot recv: socket not connected', self)
|
log.warning('%s cannot recv: socket not connected', self)
|
||||||
# If requests are pending, we should close the socket and
|
# If requests are pending, we should close the socket and
|
||||||
@@ -272,11 +270,8 @@ class BrokerConnection(object):
|
|||||||
return response
|
return response
|
||||||
|
|
||||||
def _process_response(self, read_buffer):
|
def _process_response(self, read_buffer):
|
||||||
if self._processing:
|
assert not self._processing, 'Recursion not supported'
|
||||||
raise Errors.IllegalStateError('Recursive connection processing'
|
self._processing = True
|
||||||
' not supported')
|
|
||||||
else:
|
|
||||||
self._processing = True
|
|
||||||
ifr = self.in_flight_requests.popleft()
|
ifr = self.in_flight_requests.popleft()
|
||||||
|
|
||||||
# verify send/recv correlation ids match
|
# verify send/recv correlation ids match
|
||||||
|
@@ -371,23 +371,19 @@ class Fetcher(object):
|
|||||||
response (OffsetResponse): response from the server
|
response (OffsetResponse): response from the server
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
IllegalStateError: if response does not match partition
|
AssertionError: if response does not match partition
|
||||||
"""
|
"""
|
||||||
topic, partition_info = response.topics[0]
|
topic, partition_info = response.topics[0]
|
||||||
if len(response.topics) != 1 or len(partition_info) != 1:
|
assert len(response.topics) == 1 and len(partition_info) == 1, (
|
||||||
raise Errors.IllegalStateError("OffsetResponse should only be for"
|
'OffsetResponse should only be for a single topic-partition')
|
||||||
" a single topic-partition")
|
|
||||||
|
|
||||||
part, error_code, offsets = partition_info[0]
|
part, error_code, offsets = partition_info[0]
|
||||||
if topic != partition.topic or part != partition.partition:
|
assert topic == partition.topic and part == partition.partition, (
|
||||||
raise Errors.IllegalStateError("OffsetResponse partition does not"
|
'OffsetResponse partition does not match OffsetRequest partition')
|
||||||
" match OffsetRequest partition")
|
|
||||||
|
|
||||||
error_type = Errors.for_code(error_code)
|
error_type = Errors.for_code(error_code)
|
||||||
if error_type is Errors.NoError:
|
if error_type is Errors.NoError:
|
||||||
if len(offsets) != 1:
|
assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
|
||||||
raise Errors.IllegalStateError("OffsetResponse should only"
|
|
||||||
" return a single offset")
|
|
||||||
offset = offsets[0]
|
offset = offsets[0]
|
||||||
log.debug("Fetched offset %d for partition %s", offset, partition)
|
log.debug("Fetched offset %d for partition %s", offset, partition)
|
||||||
future.success(offset)
|
future.success(offset)
|
||||||
@@ -519,9 +515,7 @@ class Fetcher(object):
|
|||||||
elif error_type is Errors.UnknownError:
|
elif error_type is Errors.UnknownError:
|
||||||
log.warn("Unknown error fetching data for topic-partition %s", tp)
|
log.warn("Unknown error fetching data for topic-partition %s", tp)
|
||||||
else:
|
else:
|
||||||
raise Errors.IllegalStateError("Unexpected error code %s"
|
raise error_type('Unexpected error while fetching data')
|
||||||
" while fetching data"
|
|
||||||
% error_code)
|
|
||||||
|
|
||||||
"""TOOD - metrics
|
"""TOOD - metrics
|
||||||
self.sensors.bytesFetched.record(totalBytes)
|
self.sensors.bytesFetched.record(totalBytes)
|
||||||
|
@@ -345,8 +345,7 @@ class KafkaConsumer(object):
|
|||||||
dict: topic to deque of records since the last fetch for the
|
dict: topic to deque of records since the last fetch for the
|
||||||
subscribed list of topics and partitions
|
subscribed list of topics and partitions
|
||||||
"""
|
"""
|
||||||
if timeout_ms < 0:
|
assert timeout_ms >= 0, 'Timeout must not be negative'
|
||||||
raise Errors.IllegalArgumentError("Timeout must not be negative")
|
|
||||||
|
|
||||||
# poll for new data until the timeout expires
|
# poll for new data until the timeout expires
|
||||||
start = time.time()
|
start = time.time()
|
||||||
@@ -408,8 +407,8 @@ class KafkaConsumer(object):
|
|||||||
Arguments:
|
Arguments:
|
||||||
partition (TopicPartition): partition to check
|
partition (TopicPartition): partition to check
|
||||||
"""
|
"""
|
||||||
if not self._subscription.is_assigned(partition):
|
assert self._subscription.is_assigned(partition)
|
||||||
raise Errors.IllegalStateError("You can only check the position for partitions assigned to this consumer.")
|
|
||||||
offset = self._subscription.assignment[partition].consumed
|
offset = self._subscription.assignment[partition].consumed
|
||||||
if offset is None:
|
if offset is None:
|
||||||
self._update_fetch_positions(partition)
|
self._update_fetch_positions(partition)
|
||||||
@@ -454,8 +453,7 @@ class KafkaConsumer(object):
|
|||||||
partition (TopicPartition): partition for seek operation
|
partition (TopicPartition): partition for seek operation
|
||||||
offset (int): message offset in partition
|
offset (int): message offset in partition
|
||||||
"""
|
"""
|
||||||
if offset < 0:
|
assert offset >= 0
|
||||||
raise Errors.IllegalStateError("seek offset must not be a negative number")
|
|
||||||
log.debug("Seeking to offset %s for partition %s", offset, partition)
|
log.debug("Seeking to offset %s for partition %s", offset, partition)
|
||||||
self._subscription.assignment[partition].seek(offset)
|
self._subscription.assignment[partition].seek(offset)
|
||||||
|
|
||||||
|
@@ -103,8 +103,7 @@ class SubscriptionState(object):
|
|||||||
"""
|
"""
|
||||||
if self._user_assignment or (topics and pattern):
|
if self._user_assignment or (topics and pattern):
|
||||||
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
|
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
|
||||||
if not (topics or pattern):
|
assert topics or pattern, 'Must provide topics or pattern'
|
||||||
raise IllegalStateError('Must provide topics or a pattern')
|
|
||||||
|
|
||||||
if pattern:
|
if pattern:
|
||||||
log.info('Subscribing to pattern: /%s/', pattern)
|
log.info('Subscribing to pattern: /%s/', pattern)
|
||||||
@@ -341,8 +340,7 @@ class TopicPartitionState(object):
|
|||||||
self._fetched = None # current fetch position
|
self._fetched = None # current fetch position
|
||||||
|
|
||||||
def _set_fetched(self, offset):
|
def _set_fetched(self, offset):
|
||||||
if not self.has_valid_position:
|
assert self.has_valid_position, 'Valid consumed/fetch position required'
|
||||||
raise IllegalStateError("Cannot update fetch position without valid consumed/fetched positions")
|
|
||||||
self._fetched = offset
|
self._fetched = offset
|
||||||
|
|
||||||
def _get_fetched(self):
|
def _get_fetched(self):
|
||||||
@@ -351,8 +349,7 @@ class TopicPartitionState(object):
|
|||||||
fetched = property(_get_fetched, _set_fetched, None, "current fetch position")
|
fetched = property(_get_fetched, _set_fetched, None, "current fetch position")
|
||||||
|
|
||||||
def _set_consumed(self, offset):
|
def _set_consumed(self, offset):
|
||||||
if not self.has_valid_position:
|
assert self.has_valid_position, 'Valid consumed/fetch position required'
|
||||||
raise IllegalStateError("Cannot update consumed position without valid consumed/fetched positions")
|
|
||||||
self._consumed = offset
|
self._consumed = offset
|
||||||
|
|
||||||
def _get_consumed(self):
|
def _get_consumed(self):
|
||||||
|
@@ -72,10 +72,6 @@ class AbstractCoordinator(object):
|
|||||||
retry_backoff_ms (int): Milliseconds to backoff when retrying on
|
retry_backoff_ms (int): Milliseconds to backoff when retrying on
|
||||||
errors. Default: 100.
|
errors. Default: 100.
|
||||||
"""
|
"""
|
||||||
if not client:
|
|
||||||
raise Errors.IllegalStateError('a client is required to use'
|
|
||||||
' Group Coordinator')
|
|
||||||
|
|
||||||
self.config = copy.copy(self.DEFAULT_CONFIG)
|
self.config = copy.copy(self.DEFAULT_CONFIG)
|
||||||
for key in self.config:
|
for key in self.config:
|
||||||
if key in configs:
|
if key in configs:
|
||||||
|
@@ -99,8 +99,7 @@ class ConsumerCoordinator(AbstractCoordinator):
|
|||||||
self._subscription = subscription
|
self._subscription = subscription
|
||||||
self._partitions_per_topic = {}
|
self._partitions_per_topic = {}
|
||||||
self._auto_commit_task = None
|
self._auto_commit_task = None
|
||||||
if not self.config['assignors']:
|
assert self.config['assignors'], 'Coordinator require assignors'
|
||||||
raise Errors.IllegalStateError('Coordinator requires assignors')
|
|
||||||
|
|
||||||
self._cluster.request_update()
|
self._cluster.request_update()
|
||||||
self._cluster.add_listener(self._handle_metadata_update)
|
self._cluster.add_listener(self._handle_metadata_update)
|
||||||
@@ -168,10 +167,7 @@ class ConsumerCoordinator(AbstractCoordinator):
|
|||||||
def _on_join_complete(self, generation, member_id, protocol,
|
def _on_join_complete(self, generation, member_id, protocol,
|
||||||
member_assignment_bytes):
|
member_assignment_bytes):
|
||||||
assignor = self._lookup_assignor(protocol)
|
assignor = self._lookup_assignor(protocol)
|
||||||
if not assignor:
|
assert assignor, 'invalid assignment protocol: %s' % protocol
|
||||||
raise Errors.IllegalStateError("Coordinator selected invalid"
|
|
||||||
" assignment protocol: %s"
|
|
||||||
% protocol)
|
|
||||||
|
|
||||||
assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)
|
assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)
|
||||||
|
|
||||||
@@ -202,10 +198,7 @@ class ConsumerCoordinator(AbstractCoordinator):
|
|||||||
|
|
||||||
def _perform_assignment(self, leader_id, assignment_strategy, members):
|
def _perform_assignment(self, leader_id, assignment_strategy, members):
|
||||||
assignor = self._lookup_assignor(assignment_strategy)
|
assignor = self._lookup_assignor(assignment_strategy)
|
||||||
if not assignor:
|
assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy
|
||||||
raise Errors.IllegalStateError("Coordinator selected invalid"
|
|
||||||
" assignment protocol: %s"
|
|
||||||
% assignment_strategy)
|
|
||||||
member_metadata = {}
|
member_metadata = {}
|
||||||
all_subscribed_topics = set()
|
all_subscribed_topics = set()
|
||||||
for member_id, metadata_bytes in members:
|
for member_id, metadata_bytes in members:
|
||||||
@@ -581,10 +574,8 @@ class AutoCommitTask(object):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def _reschedule(self, at):
|
def _reschedule(self, at):
|
||||||
if self._enabled:
|
assert self._enabled, 'AutoCommitTask not enabled'
|
||||||
self._client.schedule(self, at)
|
self._client.schedule(self, at)
|
||||||
else:
|
|
||||||
raise Errors.IllegalStateError('AutoCommitTask not enabled')
|
|
||||||
|
|
||||||
def __call__(self):
|
def __call__(self):
|
||||||
if not self._enabled:
|
if not self._enabled:
|
||||||
|
@@ -16,9 +16,9 @@ class Heartbeat(object):
|
|||||||
if key in configs:
|
if key in configs:
|
||||||
self.config[key] = configs[key]
|
self.config[key] = configs[key]
|
||||||
|
|
||||||
if self.config['heartbeat_interval_ms'] > self.config['session_timeout_ms']:
|
assert (self.config['heartbeat_interval_ms']
|
||||||
raise Errors.IllegalArgumentError("Heartbeat interval must be set"
|
<= self.config['session_timeout_ms'],
|
||||||
" lower than the session timeout")
|
'Heartbeat interval must be lower than the session timeout')
|
||||||
|
|
||||||
self.interval = self.config['heartbeat_interval_ms'] / 1000.0
|
self.interval = self.config['heartbeat_interval_ms'] / 1000.0
|
||||||
self.timeout = self.config['session_timeout_ms'] / 1000.0
|
self.timeout = self.config['session_timeout_ms'] / 1000.0
|
||||||
|
@@ -27,10 +27,7 @@ class Future(object):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def success(self, value):
|
def success(self, value):
|
||||||
if self.is_done:
|
assert not self.is_done, 'Future is already complete'
|
||||||
raise Errors.IllegalStateError('Invalid attempt to complete a'
|
|
||||||
' request future which is already'
|
|
||||||
' complete')
|
|
||||||
self.value = value
|
self.value = value
|
||||||
self.is_done = True
|
self.is_done = True
|
||||||
for f in self._callbacks:
|
for f in self._callbacks:
|
||||||
@@ -41,11 +38,10 @@ class Future(object):
|
|||||||
return self
|
return self
|
||||||
|
|
||||||
def failure(self, e):
|
def failure(self, e):
|
||||||
if self.is_done:
|
assert not self.is_done, 'Future is already complete'
|
||||||
raise Errors.IllegalStateError('Invalid attempt to complete a'
|
|
||||||
' request future which is already'
|
|
||||||
' complete')
|
|
||||||
self.exception = e if type(e) is not type else e()
|
self.exception = e if type(e) is not type else e()
|
||||||
|
assert isinstance(self.exception, BaseException), (
|
||||||
|
'future failed without an exception')
|
||||||
self.is_done = True
|
self.is_done = True
|
||||||
for f in self._errbacks:
|
for f in self._errbacks:
|
||||||
try:
|
try:
|
||||||
|
Reference in New Issue
Block a user