388e52ce23
This fixes the issue of deleting temporary snapshots created during the consistency group creation process. These temporary snapshots may not be deleted if the system is under load and the temporary snapshot remains in a "busy" state after the consistency group creation process is otherwise complete. This change also reduces lines of code by implementing a manager for the creation of FixedIntervalLoopingCall instances in the ONTAP drivers. This looping call manager also provides the ability to start all registered looping calls after the driver has been properly initialized. The looping call manager also makes it easy to ensure that FixedIntervalLoopingCall instances are not instantiated in Unit Tests. Closes-Bug: #1596679 Change-Id: I13096a8c94a32e68814f81900032dbcc6a4a9806
44 lines
1.4 KiB
Python
44 lines
1.4 KiB
Python
# Copyright (c) 2016 Chuck Fouts. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
"""
|
|
Collects and starts tasks created from oslo_service.loopingcall.
|
|
"""
|
|
|
|
|
|
from collections import namedtuple
|
|
from oslo_service import loopingcall
|
|
|
|
LoopingTask = namedtuple('LoopingTask',
|
|
['looping_call', 'interval', 'initial_delay'])
|
|
|
|
# Time intervals in seconds
|
|
ONE_MINUTE = 60
|
|
TEN_MINUTES = 600
|
|
ONE_HOUR = 3600
|
|
|
|
|
|
class LoopingCalls(object):
|
|
|
|
def __init__(self):
|
|
self.tasks = []
|
|
|
|
def add_task(self, call_function, interval, initial_delay=0):
|
|
looping_call = loopingcall.FixedIntervalLoopingCall(call_function)
|
|
task = LoopingTask(looping_call, interval, initial_delay)
|
|
self.tasks.append(task)
|
|
|
|
def start_tasks(self):
|
|
for task in self.tasks:
|
|
task.looping_call.start(task.interval, task.initial_delay)
|