Merge "Fix CloudKitty log formatting"

This commit is contained in:
Zuul
2026-03-02 11:10:40 +00:00
committed by Gerrit Code Review
8 changed files with 41 additions and 47 deletions

View File

@@ -338,8 +338,8 @@ class GnocchiCollector(collector.BaseCollector):
if isinstance(e, gexceptions.BadRequest):
if 'Metrics not found' not in e.message["cause"]:
raise
LOG.warning('[{scope}] Skipping this metric for the '
'current cycle.', scope=project_id)
LOG.warning('[%s] Skipping this metric for the '
'current cycle.', project_id)
return []
def get_aggregation_api_arguments(self, end, metric_name, project_id,
@@ -481,8 +481,8 @@ class GnocchiCollector(collector.BaseCollector):
met, d, resources_info)
except AssociatedResourceNotFound as e:
LOG.warning(
'[{}] An error occured during data collection '
'between {} and {}: {}',
'[%s] An error occured during data collection '
'between %s and %s: %s',
project_id, start, end, e
)
continue

View File

@@ -212,14 +212,14 @@ class ScopeEndpoint(object):
self._coord.start(start_heart=True)
def reset_state(self, ctxt, res_data):
LOG.info('Received state reset command. {}', res_data)
LOG.info('Received state reset command. %s', res_data)
random.shuffle(res_data['scopes'])
for scope in res_data['scopes']:
lock_name, lock = get_lock(self._coord, scope['scope_id'])
LOG.debug('[ScopeEndpoint] Trying to acquire lock "{}" ...',
LOG.debug('[ScopeEndpoint] Trying to acquire lock "%s" ...',
lock_name)
if lock.acquire(blocking=True):
LOG.debug('[ScopeEndpoint] Acquired lock "{}".', lock_name)
LOG.debug('[ScopeEndpoint] Acquired lock "%s".', lock_name)
last_processed_timestamp = tzutils.dt_from_iso(
res_data['last_processed_timestamp'])
try:
@@ -235,7 +235,7 @@ class ScopeEndpoint(object):
)
finally:
lock.release()
LOG.debug('[ScopeEndpoint] Released lock "{}" .',
LOG.debug('[ScopeEndpoint] Released lock "%s" .',
lock_name)
@@ -351,15 +351,14 @@ class Worker(BaseWorker):
try:
return self._collect(metric, timestamp)
except collector.NoDataCollected:
LOG.info('{prefix}No data collected '
'for metric {metric} at timestamp {ts}',
prefix=self._log_prefix, metric=metric, ts=timestamp)
LOG.info('%sNo data collected '
'for metric %s at timestamp %s',
self._log_prefix, metric, timestamp)
return metric, None
except Exception as e:
LOG.exception('{prefix}Error while collecting metric {metric} '
'at timestamp {ts}: {e}. Exiting.',
prefix=self._log_prefix, metric=metric,
ts=timestamp, e=e)
LOG.exception('%sError while collecting metric %s '
'at timestamp %s: %s. Exiting.',
self._log_prefix, metric, timestamp, e)
# FIXME(peschk_l): here we just exit, and the
# collection will be retried during the next collect
# cycle. In the future, we should implement a retrying
@@ -389,13 +388,13 @@ class Worker(BaseWorker):
futs = [tpool.submit(_get_result, metric)
for metric in metrics]
LOG.debug('{}Collecting [{}] metrics.',
LOG.debug('%sCollecting [%s] metrics.',
self._log_prefix, metrics)
results = [r.result() for r in waiters.wait_for_all(futs).done]
LOG.debug(
"{}Collecting {} metrics took {}s total, with {}s average",
"%sCollecting %s metrics took %ss total, with %ss average",
self._log_prefix,
tpool.statistics.executed,
tpool.statistics.runtime,
@@ -704,14 +703,14 @@ class CloudKittyProcessor(cotyledon.Service):
pass
def run(self):
LOG.debug('Started worker {}.', self._worker_id)
LOG.debug('Started worker %s.', self._worker_id)
while True:
self.internal_run()
def terminate(self):
LOG.debug('Terminating worker {}.', self._worker_id)
LOG.debug('Terminating worker %s.', self._worker_id)
self.coord.stop()
LOG.debug('Terminated worker {}.', self._worker_id)
LOG.debug('Terminated worker %s.', self._worker_id)
def internal_run(self):
self.load_scopes_to_process()
@@ -719,17 +718,15 @@ class CloudKittyProcessor(cotyledon.Service):
lock_name, lock = get_lock(
self.coord, self.generate_lock_base_name(tenant_id))
LOG.debug('[Worker: {w}] Trying to acquire lock "{lock_name}" for '
'scope ID {scope_id}.',
w=self._worker_id, lock_name=lock_name,
scope_id=tenant_id)
LOG.debug('[Worker: %s] Trying to acquire lock "%s" for '
'scope ID %s.',
self._worker_id, lock_name, tenant_id)
lock_acquired = lock.acquire(blocking=False)
if lock_acquired:
LOG.debug('[Worker: {w}] Acquired lock "{lock_name}" for '
'scope ID {scope_id}.',
w=self._worker_id, lock_name=lock_name,
scope_id=tenant_id)
LOG.debug('[Worker: %s] Acquired lock "%s" for '
'scope ID %s.',
self._worker_id, lock_name, tenant_id)
_success = True
try:
@@ -779,8 +776,8 @@ class CloudKittyProcessor(cotyledon.Service):
self.tenants = self.fetcher.get_tenants()
random.shuffle(self.tenants)
LOG.info('[Worker: {w}] Tenants loaded for fetcher {f}',
w=self._worker_id, f=self.fetcher.name)
LOG.info('[Worker: %s] Tenants loaded for fetcher %s',
self._worker_id, self.fetcher.name)
class CloudKittyReprocessor(CloudKittyProcessor):

View File

@@ -105,7 +105,7 @@ class ElasticsearchStorage(v2_storage.BaseStorage):
if r.status_code != 200:
raise exceptions.IndexDoesNotExist(
CONF.storage_elasticsearch.index_name)
LOG.info('Creating mapping "_doc" on index {}...',
LOG.info('Creating mapping "_doc" on index %s...',
CONF.storage_elasticsearch.index_name)
self._conn.put_mapping(CLOUDKITTY_INDEX_MAPPING)
LOG.info('Mapping created.')

View File

@@ -219,13 +219,13 @@ class ElasticsearchClient(object):
resp = self._req(
self._sess.delete, url, json.dumps(body), None, deserialize=False)
body = resp.json()
LOG.debug('Freed {} scrolls contexts', body['num_freed'])
LOG.debug('Freed %s scrolls contexts', body['num_freed'])
return body
def close_scrolls(self):
"""Closes all scroll contexts opened by this client."""
ids = list(self._scroll_ids)
LOG.debug('Closing {} scroll contexts: {}', len(ids), ids)
LOG.debug('Closing %s scroll contexts: %s', len(ids), ids)
self.close_scroll({'scroll_id': ids})
self._scroll_ids = set()
@@ -267,7 +267,7 @@ class ElasticsearchClient(object):
:param terms: list of documents to index
:type terms: collections.abc.Iterable
"""
LOG.debug("Indexing {} documents", len(terms))
LOG.debug("Indexing %s documents", len(terms))
if elasticsearch.CONF.storage_elasticsearch.use_datastream:
return self.bulk_with_instruction({"create": {}}, terms)
else:

View File

@@ -143,7 +143,7 @@ class InfluxClient(object):
total_points = len(self._points)
if len(self._points) < 1:
return
LOG.debug('Pushing {} points to InfluxDB', total_points)
LOG.debug('Pushing %s points to InfluxDB', total_points)
self._conn.write_points(self._points,
retention_policy=self._retention_policy)
self._points = []
@@ -555,7 +555,7 @@ class InfluxClientV2(InfluxClient):
total_points = len(self._points)
if len(self._points) < 1:
return
LOG.debug('Pushing {} points to InfluxDB', total_points)
LOG.debug('Pushing %s points to InfluxDB', total_points)
self.write_points(self._points,
retention_policy=self._retention_policy)
self._points = []
@@ -755,7 +755,7 @@ class InfluxStorage(v2_storage.BaseStorage):
policy = CONF.storage_influxdb.retention_policy
database = CONF.storage_influxdb.database
if not self._conn.retention_policy_exists(database, policy):
LOG.error('Archive policy "{}" does not exist in database "{}"',
LOG.error('Archive policy "%s" does not exist in database "%s"',
policy, database)
def push(self, dataframes, scope_id=None):

View File

@@ -104,7 +104,7 @@ class OpenSearchStorage(v2_storage.BaseStorage):
if r.status_code != 200:
raise exceptions.IndexDoesNotExist(
CONF.storage_opensearch.index_name)
LOG.info('Creating mapping "_doc" on index {}...',
LOG.info('Creating mapping "_doc" on index %s...',
CONF.storage_opensearch.index_name)
self._conn.post_mapping(CLOUDKITTY_INDEX_MAPPING)
LOG.info('Mapping created.')

View File

@@ -217,13 +217,13 @@ class OpenSearchClient(object):
resp = self._req(
self._sess.delete, url, json.dumps(body), None, deserialize=False)
body = resp.json()
LOG.debug('Freed {} scrolls contexts', body['num_freed'])
LOG.debug('Freed %s scrolls contexts', body['num_freed'])
return body
def close_scrolls(self):
"""Closes all scroll contexts opened by this client."""
ids = list(self._scroll_ids)
LOG.debug('Closing {} scroll contexts: {}', len(ids), ids)
LOG.debug('Closing %s scroll contexts: %s', len(ids), ids)
self.close_scroll({'scroll_id': ids})
self._scroll_ids = set()
@@ -260,7 +260,7 @@ class OpenSearchClient(object):
:param terms: list of documents to index
:type terms: collections.abc.Iterable
"""
LOG.debug("Indexing {} documents", len(terms))
LOG.debug("Indexing %s documents", len(terms))
if opensearch.CONF.storage_opensearch.use_datastream:
return self.bulk_with_instruction({"create": {}}, terms)
else:

View File

@@ -139,12 +139,9 @@ class StateManager(object):
r.scope_key = scope_key
r.collector = collector
r.fetcher = fetcher
LOG.info('Updating identifier "{i}" with scope_key "{sk}", '
'collector "{c}" and fetcher "{f}"',
i=identifier,
sk=scope_key,
c=collector,
f=fetcher)
LOG.info('Updating identifier "%s" with scope_key "%s", '
'collector "%s" and fetcher "%s"',
identifier, scope_key, collector, fetcher)
session.commit()
return r