Merge "Delay string interpolations at logging calls"
This commit is contained in:
@@ -212,7 +212,7 @@ class ReprocessSchedulerPostApi(base.BaseResource):
|
||||
LOG.debug(
|
||||
"No need to validate possible collision of reprocessing "
|
||||
"for scope [%s] because it does not have active "
|
||||
"reprocessing schedules." % scope_to_reprocess)
|
||||
"reprocessing schedules.", scope_to_reprocess)
|
||||
continue
|
||||
|
||||
for schedule in all_reprocessing_schedules:
|
||||
|
||||
@@ -339,7 +339,7 @@ class GnocchiCollector(collector.BaseCollector):
|
||||
if 'Metrics not found' not in e.message["cause"]:
|
||||
raise
|
||||
LOG.warning('[{scope}] Skipping this metric for the '
|
||||
'current cycle.'.format(scope=project_id))
|
||||
'current cycle.', scope=project_id)
|
||||
return []
|
||||
|
||||
def get_aggregation_api_arguments(self, end, metric_name, project_id,
|
||||
@@ -482,8 +482,8 @@ class GnocchiCollector(collector.BaseCollector):
|
||||
except AssociatedResourceNotFound as e:
|
||||
LOG.warning(
|
||||
'[{}] An error occured during data collection '
|
||||
'between {} and {}: {}'.format(
|
||||
project_id, start, end, e),
|
||||
'between {} and {}: {}',
|
||||
project_id, start, end, e
|
||||
)
|
||||
continue
|
||||
point = self._create_data_point(met, qty, 0, groupby,
|
||||
|
||||
@@ -197,21 +197,14 @@ class ScopeEndpoint(object):
|
||||
self._coord.start(start_heart=True)
|
||||
|
||||
def reset_state(self, ctxt, res_data):
|
||||
LOG.info('Received state reset command. {}'.format(res_data))
|
||||
LOG.info('Received state reset command. {}', res_data)
|
||||
random.shuffle(res_data['scopes'])
|
||||
for scope in res_data['scopes']:
|
||||
lock_name, lock = get_lock(self._coord, scope['scope_id'])
|
||||
LOG.debug(
|
||||
'[ScopeEndpoint] Trying to acquire lock "{}" ...'.format(
|
||||
lock_name,
|
||||
)
|
||||
)
|
||||
LOG.debug('[ScopeEndpoint] Trying to acquire lock "{}" ...',
|
||||
lock_name)
|
||||
if lock.acquire(blocking=True):
|
||||
LOG.debug(
|
||||
'[ScopeEndpoint] Acquired lock "{}".'.format(
|
||||
lock_name,
|
||||
)
|
||||
)
|
||||
LOG.debug('[ScopeEndpoint] Acquired lock "{}".', lock_name)
|
||||
last_processed_timestamp = tzutils.dt_from_iso(
|
||||
res_data['last_processed_timestamp'])
|
||||
try:
|
||||
@@ -227,11 +220,8 @@ class ScopeEndpoint(object):
|
||||
)
|
||||
finally:
|
||||
lock.release()
|
||||
LOG.debug(
|
||||
'[ScopeEndpoint] Released lock "{}" .'.format(
|
||||
lock_name,
|
||||
)
|
||||
)
|
||||
LOG.debug('[ScopeEndpoint] Released lock "{}" .',
|
||||
lock_name)
|
||||
|
||||
|
||||
class BaseWorker(object):
|
||||
@@ -326,16 +316,15 @@ class Worker(BaseWorker):
|
||||
try:
|
||||
return self._collect(metric, timestamp)
|
||||
except collector.NoDataCollected:
|
||||
LOG.info(
|
||||
self._log_prefix + 'No data collected '
|
||||
'for metric {metric} at timestamp {ts}'.format(
|
||||
metric=metric, ts=timestamp))
|
||||
LOG.info('{prefix}No data collected '
|
||||
'for metric {metric} at timestamp {ts}',
|
||||
prefix=self._log_prefix, metric=metric, ts=timestamp)
|
||||
return metric, None
|
||||
except Exception as e:
|
||||
LOG.exception(
|
||||
self._log_prefix + 'Error while collecting'
|
||||
' metric {metric} at timestamp {ts}: {e}. Exiting.'.format(
|
||||
metric=metric, ts=timestamp, e=e))
|
||||
LOG.exception('{prefix}Error while collecting metric {metric} '
|
||||
'at timestamp {ts}: {e}. Exiting.',
|
||||
prefix=self._log_prefix, metric=metric,
|
||||
ts=timestamp, e=e)
|
||||
# FIXME(peschk_l): here we just exit, and the
|
||||
# collection will be retried during the next collect
|
||||
# cycle. In the future, we should implement a retrying
|
||||
@@ -365,17 +354,17 @@ class Worker(BaseWorker):
|
||||
futs = [tpool.submit(_get_result, metric)
|
||||
for metric in metrics]
|
||||
|
||||
LOG.debug(self._log_prefix +
|
||||
'Collecting [{}] metrics.'.format(metrics))
|
||||
LOG.debug('{}Collecting [{}] metrics.',
|
||||
self._log_prefix, metrics)
|
||||
|
||||
results = [r.result() for r in waiters.wait_for_all(futs).done]
|
||||
|
||||
log_message = self._log_prefix + \
|
||||
"Collecting {} metrics took {}s total, with {}s average"
|
||||
|
||||
LOG.debug(log_message.format(tpool.statistics.executed,
|
||||
tpool.statistics.runtime,
|
||||
tpool.statistics.average_runtime))
|
||||
LOG.debug(
|
||||
"{}Collecting {} metrics took {}s total, with {}s average",
|
||||
self._log_prefix,
|
||||
tpool.statistics.executed,
|
||||
tpool.statistics.runtime,
|
||||
tpool.statistics.average_runtime)
|
||||
|
||||
except ZeroDivisionError as zeroDivisionError:
|
||||
LOG.debug("Ignoring ZeroDivisionError for metrics [%s]: [%s].",
|
||||
@@ -623,14 +612,14 @@ class CloudKittyProcessor(cotyledon.Service):
|
||||
pass
|
||||
|
||||
def run(self):
|
||||
LOG.debug('Started worker {}.'.format(self._worker_id))
|
||||
LOG.debug('Started worker {}.', self._worker_id)
|
||||
while True:
|
||||
self.internal_run()
|
||||
|
||||
def terminate(self):
|
||||
LOG.debug('Terminating worker {}.'.format(self._worker_id))
|
||||
LOG.debug('Terminating worker {}.', self._worker_id)
|
||||
self.coord.stop()
|
||||
LOG.debug('Terminated worker {}.'.format(self._worker_id))
|
||||
LOG.debug('Terminated worker {}.', self._worker_id)
|
||||
|
||||
def internal_run(self):
|
||||
self.load_scopes_to_process()
|
||||
@@ -639,16 +628,16 @@ class CloudKittyProcessor(cotyledon.Service):
|
||||
self.coord, self.generate_lock_base_name(tenant_id))
|
||||
|
||||
LOG.debug('[Worker: {w}] Trying to acquire lock "{lock_name}" for '
|
||||
'scope ID {scope_id}.'.format(w=self._worker_id,
|
||||
lock_name=lock_name,
|
||||
scope_id=tenant_id))
|
||||
'scope ID {scope_id}.',
|
||||
w=self._worker_id, lock_name=lock_name,
|
||||
scope_id=tenant_id)
|
||||
|
||||
lock_acquired = lock.acquire(blocking=False)
|
||||
if lock_acquired:
|
||||
LOG.debug('[Worker: {w}] Acquired lock "{lock_name}" for '
|
||||
'scope ID {scope_id}.'.format(w=self._worker_id,
|
||||
lock_name=lock_name,
|
||||
scope_id=tenant_id))
|
||||
'scope ID {scope_id}.',
|
||||
w=self._worker_id, lock_name=lock_name,
|
||||
scope_id=tenant_id)
|
||||
|
||||
_success = True
|
||||
try:
|
||||
@@ -698,8 +687,8 @@ class CloudKittyProcessor(cotyledon.Service):
|
||||
self.tenants = self.fetcher.get_tenants()
|
||||
random.shuffle(self.tenants)
|
||||
|
||||
LOG.info('[Worker: {w}] Tenants loaded for fetcher {f}'.format(
|
||||
w=self._worker_id, f=self.fetcher.name))
|
||||
LOG.info('[Worker: {w}] Tenants loaded for fetcher {f}',
|
||||
w=self._worker_id, f=self.fetcher.name)
|
||||
|
||||
|
||||
class CloudKittyReprocessor(CloudKittyProcessor):
|
||||
|
||||
@@ -105,8 +105,8 @@ class ElasticsearchStorage(v2_storage.BaseStorage):
|
||||
if r.status_code != 200:
|
||||
raise exceptions.IndexDoesNotExist(
|
||||
CONF.storage_elasticsearch.index_name)
|
||||
LOG.info('Creating mapping "_doc" on index {}...'.format(
|
||||
CONF.storage_elasticsearch.index_name))
|
||||
LOG.info('Creating mapping "_doc" on index {}...',
|
||||
CONF.storage_elasticsearch.index_name)
|
||||
self._conn.put_mapping(CLOUDKITTY_INDEX_MAPPING)
|
||||
LOG.info('Mapping created.')
|
||||
|
||||
|
||||
@@ -219,13 +219,13 @@ class ElasticsearchClient(object):
|
||||
resp = self._req(
|
||||
self._sess.delete, url, json.dumps(body), None, deserialize=False)
|
||||
body = resp.json()
|
||||
LOG.debug('Freed {} scrolls contexts'.format(body['num_freed']))
|
||||
LOG.debug('Freed {} scrolls contexts', body['num_freed'])
|
||||
return body
|
||||
|
||||
def close_scrolls(self):
|
||||
"""Closes all scroll contexts opened by this client."""
|
||||
ids = list(self._scroll_ids)
|
||||
LOG.debug('Closing {} scroll contexts: {}'.format(len(ids), ids))
|
||||
LOG.debug('Closing {} scroll contexts: {}', len(ids), ids)
|
||||
self.close_scroll({'scroll_id': ids})
|
||||
self._scroll_ids = set()
|
||||
|
||||
@@ -267,7 +267,7 @@ class ElasticsearchClient(object):
|
||||
:param terms: list of documents to index
|
||||
:type terms: collections.abc.Iterable
|
||||
"""
|
||||
LOG.debug("Indexing {} documents".format(len(terms)))
|
||||
LOG.debug("Indexing {} documents", len(terms))
|
||||
if elasticsearch.CONF.storage_elasticsearch.use_datastream:
|
||||
return self.bulk_with_instruction({"create": {}}, terms)
|
||||
else:
|
||||
|
||||
@@ -143,7 +143,7 @@ class InfluxClient(object):
|
||||
total_points = len(self._points)
|
||||
if len(self._points) < 1:
|
||||
return
|
||||
LOG.debug('Pushing {} points to InfluxDB'.format(total_points))
|
||||
LOG.debug('Pushing {} points to InfluxDB', total_points)
|
||||
self._conn.write_points(self._points,
|
||||
retention_policy=self._retention_policy)
|
||||
self._points = []
|
||||
@@ -555,7 +555,7 @@ class InfluxClientV2(InfluxClient):
|
||||
total_points = len(self._points)
|
||||
if len(self._points) < 1:
|
||||
return
|
||||
LOG.debug('Pushing {} points to InfluxDB'.format(total_points))
|
||||
LOG.debug('Pushing {} points to InfluxDB', total_points)
|
||||
self.write_points(self._points,
|
||||
retention_policy=self._retention_policy)
|
||||
self._points = []
|
||||
@@ -755,10 +755,8 @@ class InfluxStorage(v2_storage.BaseStorage):
|
||||
policy = CONF.storage_influxdb.retention_policy
|
||||
database = CONF.storage_influxdb.database
|
||||
if not self._conn.retention_policy_exists(database, policy):
|
||||
LOG.error(
|
||||
'Archive policy "{}" does not exist in database "{}"'.format(
|
||||
policy, database)
|
||||
)
|
||||
LOG.error('Archive policy "{}" does not exist in database "{}"',
|
||||
policy, database)
|
||||
|
||||
def push(self, dataframes, scope_id=None):
|
||||
|
||||
|
||||
@@ -104,8 +104,8 @@ class OpenSearchStorage(v2_storage.BaseStorage):
|
||||
if r.status_code != 200:
|
||||
raise exceptions.IndexDoesNotExist(
|
||||
CONF.storage_opensearch.index_name)
|
||||
LOG.info('Creating mapping "_doc" on index {}...'.format(
|
||||
CONF.storage_opensearch.index_name))
|
||||
LOG.info('Creating mapping "_doc" on index {}...',
|
||||
CONF.storage_opensearch.index_name)
|
||||
self._conn.post_mapping(CLOUDKITTY_INDEX_MAPPING)
|
||||
LOG.info('Mapping created.')
|
||||
|
||||
|
||||
@@ -217,13 +217,13 @@ class OpenSearchClient(object):
|
||||
resp = self._req(
|
||||
self._sess.delete, url, json.dumps(body), None, deserialize=False)
|
||||
body = resp.json()
|
||||
LOG.debug('Freed {} scrolls contexts'.format(body['num_freed']))
|
||||
LOG.debug('Freed {} scrolls contexts', body['num_freed'])
|
||||
return body
|
||||
|
||||
def close_scrolls(self):
|
||||
"""Closes all scroll contexts opened by this client."""
|
||||
ids = list(self._scroll_ids)
|
||||
LOG.debug('Closing {} scroll contexts: {}'.format(len(ids), ids))
|
||||
LOG.debug('Closing {} scroll contexts: {}', len(ids), ids)
|
||||
self.close_scroll({'scroll_id': ids})
|
||||
self._scroll_ids = set()
|
||||
|
||||
@@ -260,7 +260,7 @@ class OpenSearchClient(object):
|
||||
:param terms: list of documents to index
|
||||
:type terms: collections.abc.Iterable
|
||||
"""
|
||||
LOG.debug("Indexing {} documents".format(len(terms)))
|
||||
LOG.debug("Indexing {} documents", len(terms))
|
||||
if opensearch.CONF.storage_opensearch.use_datastream:
|
||||
return self.bulk_with_instruction({"create": {}}, terms)
|
||||
else:
|
||||
|
||||
@@ -140,11 +140,11 @@ class StateManager(object):
|
||||
r.collector = collector
|
||||
r.fetcher = fetcher
|
||||
LOG.info('Updating identifier "{i}" with scope_key "{sk}", '
|
||||
'collector "{c}" and fetcher "{f}"'.format(
|
||||
i=identifier,
|
||||
sk=scope_key,
|
||||
c=collector,
|
||||
f=fetcher))
|
||||
'collector "{c}" and fetcher "{f}"',
|
||||
i=identifier,
|
||||
sk=scope_key,
|
||||
c=collector,
|
||||
f=fetcher)
|
||||
session.commit()
|
||||
return r
|
||||
|
||||
|
||||
3
tox.ini
3
tox.ini
@@ -83,7 +83,8 @@ exclude = .git,.venv,.tox,dist,doc,*egg,build,.ropeproject,releasenotes
|
||||
# [H203]: Use assertIs(Not)None to check for None
|
||||
# [H204]: Use assert(Not)Equal to check for equality
|
||||
# [H205]: Use assert(Greater|Less)(Equal) for comparison
|
||||
enable-extensions=H203,H204,H205
|
||||
# [H904]: Delay string interpolations at logging calls
|
||||
enable-extensions=H203,H204,H205,H904
|
||||
|
||||
|
||||
[doc8]
|
||||
|
||||
Reference in New Issue
Block a user