zuul/zuul/lib/queue.py
James E. Blair 419a8679bb Merge tenant reconfiguration events
Replace the queue used for management events with one that can
combine similar events.  In this case, make it able to merge
tenant reconfiguration events, so that if multiple patches which
change the config merge between iterations of the event processor,
we only reconfigure the tenant once.

Tenant reconfiguration events are also associated with projects,
so make sure that when we merge them, we combine the list of projects
as well, so they have their cached configuration cleared.

Finally, don't store a reference to the tenant, but rather just the
tenant name, so that if reconfiguration events are queued, we don't
keep extra copies of layouts in RAM.

Change-Id: If1669a0119b52ad0e3b9a4b92ee10d318df2eb18
2017-10-18 15:41:14 -07:00

79 lines
2.3 KiB
Python

# Copyright 2014 OpenStack Foundation
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import threading
class MergedQueue(object):
    """A FIFO queue that coalesces equal items instead of storing duplicates.

    When an item is put that compares equal to one already enqueued, the
    new item is not appended; if the enqueued item has a ``merge``
    method it is called with the new item so the two can be combined.

    Consumers call :meth:`get` to take an item and :meth:`task_done`
    when they have finished with it.  :meth:`join` blocks while any item
    handed out by :meth:`get` is still outstanding.  Items that are still
    sitting in the queue are *not* counted by :meth:`join`.
    """

    def __init__(self):
        self.queue = collections.deque()
        # A single re-entrant lock backs both conditions, so holding
        # either condition protects the deque and the task counter, and
        # the two may be nested in the same thread without deadlocking.
        self.lock = threading.RLock()
        self.condition = threading.Condition(self.lock)
        self.join_condition = threading.Condition(self.lock)
        # Number of items returned by get() and not yet task_done()'d.
        self.tasks = 0

    def qsize(self):
        """Return the number of items currently enqueued."""
        return len(self.queue)

    def empty(self):
        """Return True if no items are currently enqueued."""
        return self.qsize() == 0

    def put(self, item):
        """Enqueue ``item`` unless an equal item is already enqueued.

        Returns the original item if added, or an updated equivalent
        item if already enqueued.
        """
        # Use the condition as a context manager so the lock is always
        # released, even if a comparison or merge() raises.
        with self.condition:
            ret = None
            # Every enqueued entry that compares equal is merged with
            # the new item; the last such entry is the one returned.
            for x in self.queue:
                if item == x:
                    ret = x
                    if hasattr(ret, 'merge'):
                        ret.merge(item)
            if ret is None:
                ret = item
                self.queue.append(item)
                # Wake one consumer blocked in get().
                self.condition.notify()
        return ret

    def get(self):
        """Remove and return the oldest item, blocking until one exists.

        The returned item counts as an outstanding task until
        :meth:`task_done` is called for it.
        """
        with self.condition:
            # Standard condition-variable predicate loop: re-check the
            # queue after every wake-up.
            while not self.queue:
                self.condition.wait()
            ret = self.queue.popleft()
            with self.join_condition:
                self.tasks += 1
            return ret

    def task_done(self):
        """Mark one item previously returned by :meth:`get` as finished."""
        with self.join_condition:
            self.tasks -= 1
            # Wake every thread blocked in join(); each re-checks the
            # counter itself.  notify() would wake only one of them and
            # leave the others blocked after the counter reaches zero.
            self.join_condition.notify_all()

    def join(self):
        """Block until no items handed out by :meth:`get` are outstanding."""
        # The context manager guarantees the lock is released if wait()
        # is interrupted by an exception.
        with self.join_condition:
            while self.tasks:
                self.join_condition.wait()