Refact Orchestrator
Add check_time_state in utils since both Orchestrator and Worker has used. Get wait_time and period from cfg and put them in first place. Change-Id: Id04ab21dc68a2b4d0ced28354e7ddfb32160b850
This commit is contained in:
parent
e8dc5ce4aa
commit
17986301ae
@ -55,6 +55,9 @@ CONF.register_opts(orchestrator_opts, group='orchestrator')
|
||||
FETCHERS_NAMESPACE = 'cloudkitty.tenant.fetchers'
|
||||
PROCESSORS_NAMESPACE = 'cloudkitty.rating.processors'
|
||||
|
||||
PERIOD = CONF.collect.period
|
||||
WAIT_TIME = CONF.collect.wait_periods * CONF.collect.period
|
||||
|
||||
|
||||
class RatingEndpoint(object):
|
||||
target = oslo_messaging.Target(namespace='rating',
|
||||
@ -154,13 +157,10 @@ class Worker(BaseWorker):
|
||||
self._collector = collector
|
||||
self._storage = storage
|
||||
|
||||
self._period = CONF.collect.period
|
||||
self._wait_time = CONF.collect.wait_periods * self._period
|
||||
|
||||
super(Worker, self).__init__(tenant_id)
|
||||
|
||||
def _collect(self, service, start_timestamp):
|
||||
next_timestamp = start_timestamp + self._period
|
||||
next_timestamp = start_timestamp + PERIOD
|
||||
raw_data = self._collector.retrieve(service,
|
||||
start_timestamp,
|
||||
next_timestamp,
|
||||
@ -172,15 +172,7 @@ class Worker(BaseWorker):
|
||||
|
||||
def check_state(self):
|
||||
timestamp = self._storage.get_state(self._tenant_id)
|
||||
if not timestamp:
|
||||
month_start = ck_utils.get_month_start()
|
||||
return ck_utils.dt2ts(month_start)
|
||||
|
||||
now = ck_utils.utcnow_ts()
|
||||
next_timestamp = timestamp + self._period
|
||||
if next_timestamp + self._wait_time < now:
|
||||
return next_timestamp
|
||||
return 0
|
||||
return ck_utils.check_time_state(timestamp, PERIOD, WAIT_TIME)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
@ -202,7 +194,7 @@ class Worker(BaseWorker):
|
||||
raise collector.NoDataCollected('', service)
|
||||
except collector.NoDataCollected:
|
||||
begin = timestamp
|
||||
end = begin + self._period
|
||||
end = begin + PERIOD
|
||||
for processor in self._processors:
|
||||
processor.obj.nodata(begin, end)
|
||||
self._storage.nodata(begin, end, self._tenant_id)
|
||||
@ -260,16 +252,7 @@ class Orchestrator(object):
|
||||
|
||||
def _check_state(self, tenant_id):
|
||||
timestamp = self.storage.get_state(tenant_id)
|
||||
if not timestamp:
|
||||
month_start = ck_utils.get_month_start()
|
||||
return ck_utils.dt2ts(month_start)
|
||||
|
||||
now = ck_utils.utcnow_ts()
|
||||
next_timestamp = timestamp + CONF.collect.period
|
||||
wait_time = CONF.collect.wait_periods * CONF.collect.period
|
||||
if next_timestamp + wait_time < now:
|
||||
return next_timestamp
|
||||
return 0
|
||||
return ck_utils.check_time_state(timestamp, PERIOD, WAIT_TIME)
|
||||
|
||||
def process_messages(self):
|
||||
# TODO(sheeprine): Code kept to handle threading and asynchronous
|
||||
@ -299,7 +282,7 @@ class Orchestrator(object):
|
||||
# being processed
|
||||
eventlet.sleep(1)
|
||||
# FIXME(sheeprine): We may cause a drift here
|
||||
eventlet.sleep(CONF.collect.period)
|
||||
eventlet.sleep(PERIOD)
|
||||
|
||||
def terminate(self):
|
||||
self.coord.stop()
|
||||
|
@ -179,3 +179,15 @@ def refresh_stevedore(namespace=None):
|
||||
del cache[namespace]
|
||||
else:
|
||||
cache.clear()
|
||||
|
||||
|
||||
def check_time_state(timestamp=None, period=0, wait_time=0):
|
||||
if not timestamp:
|
||||
month_start = get_month_start()
|
||||
return dt2ts(month_start)
|
||||
|
||||
now = utcnow_ts()
|
||||
next_timestamp = timestamp + period
|
||||
if next_timestamp + wait_time < now:
|
||||
return next_timestamp
|
||||
return 0
|
||||
|
Loading…
Reference in New Issue
Block a user