Stop storing dependent items on buildsets
Items in a pipeline's queue form a tree (yes, pipeline queues are actually trees), and when items are dequeued, the links between them are updated, so that the item now at the root no longer points to the item that was previously ahead of it. Normally that would allow reclamation of all of the memory associated with the item. However, item buildsets have a list of the dependent items that were ahead of it at the time the buildset configuration was set. At this point, these are only used to provide meta-information to the job about the changes ahead. Rather than storing references to the items, only store rendered dictionaries with info about the changes. That will allow python to reclaim the memory (including dynamic layouts, and eventually the changes themselves) used by items as soon as they are dequeued. Change-Id: I75509a7e9dfeae2337f9f743bec4fcb02ebb903c
This commit is contained in:
parent
9f0f4408d5
commit
9e5b8116b8
|
@ -133,7 +133,7 @@ class ExecutorClient(object):
|
|||
self.gearman.shutdown()
|
||||
self.log.debug("Stopped")
|
||||
|
||||
def execute(self, job, item, pipeline, dependent_items=[],
|
||||
def execute(self, job, item, pipeline, dependent_changes=[],
|
||||
merger_items=[]):
|
||||
tenant = pipeline.layout.tenant
|
||||
uuid = str(uuid4().hex)
|
||||
|
@ -143,11 +143,7 @@ class ExecutorClient(object):
|
|||
job, uuid,
|
||||
item.current_build_set.getJobNodeSet(job.name),
|
||||
item.change,
|
||||
[x.change for x in dependent_items]))
|
||||
|
||||
dependent_items = dependent_items[:]
|
||||
dependent_items.reverse()
|
||||
all_items = dependent_items + [item]
|
||||
dependent_changes))
|
||||
|
||||
# TODOv3(jeblair): This ansible vars data structure will
|
||||
# replace the environment variables below.
|
||||
|
@ -188,25 +184,7 @@ class ExecutorClient(object):
|
|||
zuul_params['newrev'] = item.change.newrev
|
||||
zuul_params['projects'] = [] # Set below
|
||||
zuul_params['_projects'] = {} # transitional to convert to dict
|
||||
zuul_params['items'] = []
|
||||
for i in all_items:
|
||||
d = dict()
|
||||
d['project'] = dict(
|
||||
name=i.change.project.name,
|
||||
short_name=i.change.project.name.split('/')[-1],
|
||||
canonical_hostname=i.change.project.canonical_hostname,
|
||||
canonical_name=i.change.project.canonical_name,
|
||||
src_dir=os.path.join('src', i.change.project.canonical_name),
|
||||
)
|
||||
if hasattr(i.change, 'number'):
|
||||
d['change'] = str(i.change.number)
|
||||
if hasattr(i.change, 'url'):
|
||||
d['change_url'] = i.change.url
|
||||
if hasattr(i.change, 'patchset'):
|
||||
d['patchset'] = str(i.change.patchset)
|
||||
if hasattr(i.change, 'branch'):
|
||||
d['branch'] = i.change.branch
|
||||
zuul_params['items'].append(d)
|
||||
zuul_params['items'] = dependent_changes
|
||||
|
||||
params = dict()
|
||||
params['job'] = job.name
|
||||
|
@ -264,12 +242,15 @@ class ExecutorClient(object):
|
|||
job_project.override_branch))
|
||||
projects.add(project)
|
||||
required_projects.add(project)
|
||||
for i in all_items:
|
||||
if i.change.project not in projects:
|
||||
project = i.change.project
|
||||
for change in dependent_changes:
|
||||
# We have to find the project this way because it may not
|
||||
# be registered in the tenant (ie, a foreign project).
|
||||
source = self.sched.connections.getSourceByHostname(
|
||||
change['project']['canonical_hostname'])
|
||||
project = source.getProject(change['project']['name'])
|
||||
if project not in projects:
|
||||
params['projects'].append(make_project_dict(project))
|
||||
projects.add(project)
|
||||
|
||||
for p in projects:
|
||||
zuul_params['_projects'][p.canonical_name] = (dict(
|
||||
name=p.name,
|
||||
|
|
|
@ -161,3 +161,10 @@ class ConnectionRegistry(object):
|
|||
def getTrigger(self, connection_name, config=None):
|
||||
connection = self.connections[connection_name]
|
||||
return connection.driver.getTrigger(connection, config)
|
||||
|
||||
def getSourceByHostname(self, canonical_hostname):
|
||||
for connection in self.connections.values():
|
||||
if hasattr(connection, 'canonical_hostname'):
|
||||
if connection.canonical_hostname == canonical_hostname:
|
||||
return self.getSource(connection.connection_name)
|
||||
return None
|
||||
|
|
|
@ -358,10 +358,10 @@ class PipelineManager(object):
|
|||
try:
|
||||
nodeset = item.current_build_set.getJobNodeSet(job.name)
|
||||
self.sched.nodepool.useNodeSet(nodeset)
|
||||
build = self.sched.executor.execute(job, item,
|
||||
self.pipeline,
|
||||
build_set.dependent_items,
|
||||
build_set.merger_items)
|
||||
build = self.sched.executor.execute(
|
||||
job, item, self.pipeline,
|
||||
build_set.dependent_changes,
|
||||
build_set.merger_items)
|
||||
self.log.debug("Adding build %s of job %s to item %s" %
|
||||
(build, job, item))
|
||||
item.addBuild(build)
|
||||
|
|
|
@ -1265,7 +1265,7 @@ class BuildSet(object):
|
|||
self.result = None
|
||||
self.uuid = None
|
||||
self.commit = None
|
||||
self.dependent_items = None
|
||||
self.dependent_changes = None
|
||||
self.merger_items = None
|
||||
self.unable_to_merge = False
|
||||
self.config_error = None # None or an error message string.
|
||||
|
@ -1294,18 +1294,16 @@ class BuildSet(object):
|
|||
# The change isn't enqueued until after it's created
|
||||
# so we don't know what the other changes ahead will be
|
||||
# until jobs start.
|
||||
if self.dependent_items is None:
|
||||
items = []
|
||||
if not self.uuid:
|
||||
self.uuid = uuid4().hex
|
||||
if self.dependent_changes is None:
|
||||
items = [self.item]
|
||||
next_item = self.item.item_ahead
|
||||
while next_item:
|
||||
items.append(next_item)
|
||||
next_item = next_item.item_ahead
|
||||
self.dependent_items = items
|
||||
if not self.uuid:
|
||||
self.uuid = uuid4().hex
|
||||
if self.merger_items is None:
|
||||
items = [self.item] + self.dependent_items
|
||||
items.reverse()
|
||||
self.dependent_changes = [i.change.toDict() for i in items]
|
||||
self.merger_items = [i.makeMergerItem() for i in items]
|
||||
|
||||
def getStateName(self, state_num):
|
||||
|
@ -1970,6 +1968,18 @@ class Ref(object):
|
|||
oldrev=self.oldrev,
|
||||
newrev=self.newrev)
|
||||
|
||||
def toDict(self):
|
||||
# Render to a dict to use in passing json to the executor
|
||||
d = dict()
|
||||
d['project'] = dict(
|
||||
name=self.project.name,
|
||||
short_name=self.project.name.split('/')[-1],
|
||||
canonical_hostname=self.project.canonical_hostname,
|
||||
canonical_name=self.project.canonical_name,
|
||||
src_dir=os.path.join('src', self.project.canonical_name),
|
||||
)
|
||||
return d
|
||||
|
||||
|
||||
class Branch(Ref):
|
||||
"""An existing branch state for a Project."""
|
||||
|
@ -1977,6 +1987,12 @@ class Branch(Ref):
|
|||
super(Branch, self).__init__(project)
|
||||
self.branch = None
|
||||
|
||||
def toDict(self):
|
||||
# Render to a dict to use in passing json to the executor
|
||||
d = super(Branch, self).toDict()
|
||||
d['branch'] = self.branch
|
||||
return d
|
||||
|
||||
|
||||
class Tag(Ref):
|
||||
"""An existing tag state for a Project."""
|
||||
|
@ -2039,6 +2055,14 @@ class Change(Branch):
|
|||
number=self.number,
|
||||
patchset=self.patchset)
|
||||
|
||||
def toDict(self):
|
||||
# Render to a dict to use in passing json to the executor
|
||||
d = super(Change, self).toDict()
|
||||
d['change'] = str(self.number)
|
||||
d['change_url'] = self.url
|
||||
d['patchset'] = str(self.patchset)
|
||||
return d
|
||||
|
||||
|
||||
class TriggerEvent(object):
|
||||
"""Incoming event from an external system."""
|
||||
|
|
Loading…
Reference in New Issue