zuul/zuul/merger/server.py
Clark Boylan 0f7982fee0 Clean up stale git index.lock files on merger startup
We've noticed that if zuul executors (and presumably mergers) don't shut
down gracefully, they may leak git index.lock files in the .git dirs
of the merger repos. Since these repos should be dedicated to zuul's use
without outside interference we can reasonably safely remove any present
index.lock files when starting zuul mergers (and executors).

This implementation does an os.walk under the merger repos root looking
for .git dirs and, once it has found them, checks them for any index.lock
files. This happens before starting the gearman worker, which should
avoid any races with these resources.

Change-Id: Ie043453bcdf4500a3718da6f705c882431acafdf
2020-09-17 15:19:16 -07:00

282 lines
9.9 KiB
Python

# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import os
import threading
from abc import ABCMeta
from zuul.lib import commandsocket
from zuul.lib.config import get_default
from zuul.lib.gearworker import ZuulGearWorker
from zuul.merger import merger
from zuul.merger.merger import nullcontext
# Names of the commands the merger accepts; they mirror the keys of
# MergeServer.command_map.
COMMANDS = [
    'stop',
    'pause',
    'unpause',
]
class BaseRepoLocks(metaclass=ABCMeta):
    """Repo-lock provider that performs no locking at all."""

    def getRepoLock(self, connection_name, project_name):
        """Return a no-op context manager for the named repo.

        The connection and project names are accepted so this class is
        interchangeable with lock providers that do lock, but they are
        ignored here.
        """
        no_lock = nullcontext()
        return no_lock
class RepoLocks(BaseRepoLocks):
    """Hand out one ``threading.Lock`` per (connection, project) pair."""

    def __init__(self):
        # Maps "<connection>:<project>" to the lock for that repo.
        self.locks = {}

    def getRepoLock(self, connection_name, project_name):
        """Return the lock for the given repo, creating it on first use.

        Repeated calls with the same names return the same lock object.
        """
        repo_key = '%s:%s' % (connection_name, project_name)
        return self.locks.setdefault(repo_key, threading.Lock())
class BaseMergeServer(metaclass=ABCMeta):
    """Shared implementation of the gearman merger job handlers.

    Registers the ``merger:merge``, ``merger:cat``, ``merger:refstate`` and
    ``merger:fileschanges`` gearman functions and services them with a
    ``merger.Merger`` rooted at the component's ``git_dir``. Subclasses
    choose the repo locking strategy through ``_repo_locks_class``.
    """

    log = logging.getLogger("zuul.BaseMergeServer")
    # No-op locking by default; subclasses that need real per-repo locks
    # (the executor, per the comment in __init__) override this.
    _repo_locks_class = BaseRepoLocks

    def __init__(self, config, component, connections=None):
        """
        :param config: Zuul configuration, read through ``get_default``.
        :param component: Config section name (e.g. ``'merger'``); selects
            the ``git_dir`` option and its default
            ``/var/lib/zuul/<component>-git``.
        :param connections: Connection mapping handed to the Merger
            (presumably keyed by connection name); defaults to ``{}``.
        """
        self.connections = connections or {}
        self.merge_email = get_default(config, 'merger', 'git_user_email',
                                       'zuul.merger.default@example.com')
        self.merge_name = get_default(config, 'merger', 'git_user_name',
                                      'Zuul Merger Default')
        self.merge_speed_limit = get_default(
            config, 'merger', 'git_http_low_speed_limit', '1000')
        self.merge_speed_time = get_default(
            config, 'merger', 'git_http_low_speed_time', '30')
        self.git_timeout = get_default(config, 'merger', 'git_timeout', 300)
        self.merge_root = get_default(config, component, 'git_dir',
                                      '/var/lib/zuul/{}-git'.format(component))
        # This merger and its git repos are used to maintain
        # up-to-date copies of all the repos that are used by jobs, as
        # well as to support the merger:cat function to supply
        # configuration information to Zuul when it starts.
        self.merger = self._getMerger(self.merge_root, None)
        self.config = config
        # Repo locking is needed on the executor
        self.repo_locks = self._repo_locks_class()
        self.merger_jobs = {
            'merger:merge': self.merge,
            'merger:cat': self.cat,
            'merger:refstate': self.refstate,
            'merger:fileschanges': self.fileschanges,
        }
        self.merger_gearworker = ZuulGearWorker(
            'Zuul Merger',
            'zuul.BaseMergeServer',
            'merger-gearman-worker',
            self.config,
            self.merger_jobs)

    def _getMerger(self, root, cache_root, logger=None):
        """Build a ``merger.Merger`` on ``root`` using this server's git
        identity, HTTP speed limits and timeout."""
        return merger.Merger(
            root, self.connections, self.merge_email, self.merge_name,
            self.merge_speed_limit, self.merge_speed_time, cache_root, logger,
            execution_context=True, git_timeout=self.git_timeout)

    def _repoLock(self, connection_name, project_name):
        # The merger does not need locking so return a null lock.
        return nullcontext()

    def _update(self, connection_name, project_name, zuul_event_id=None):
        """Update the local copy of a project's repo via the Merger."""
        self.merger.updateRepo(connection_name, project_name,
                               zuul_event_id=zuul_event_id)

    def start(self):
        """Remove stale git locks, then start the gearman worker.

        The cleanup runs before the worker starts, so no job can be using
        the repos while their lock files are deleted.
        """
        self.log.debug('Starting merger worker')
        self._removeStaleGitLocks()
        self.merger_gearworker.start()

    def _removeStaleGitLocks(self):
        """Delete leftover ``index.lock`` files in repos under merge_root.

        An unclean shutdown can leave ``.git/index.lock`` behind, which may
        make later merges in that repo fail. Only a lock sitting directly
        inside a ``.git`` directory is removed. Working-tree files that
        happen to be named ``index.lock`` are left untouched. Removal is
        best-effort: failures are logged and startup continues.
        """
        self.log.debug('Cleaning any stale git index.lock files')
        for dirpath, dirnames, filenames in os.walk(self.merge_root):
            if os.path.basename(dirpath) == '.git':
                # Inside a git dir: check for the lock, recurse no further.
                dirnames.clear()
                if 'index.lock' in filenames:
                    fp = os.path.join(dirpath, 'index.lock')
                    try:
                        os.unlink(fp)
                    except OSError:
                        self.log.exception(
                            'Unable to remove stale git lock: '
                            '%s this may result in failed merges' % fp)
                    else:
                        self.log.debug('Removed stale git lock: %s' % fp)
            elif '.git' in dirnames:
                # A repo checkout: descend only into its .git dir and skip
                # the working tree. Mutate in place so os.walk honours it.
                dirnames[:] = ['.git']

    def stop(self):
        """Stop the gearman worker."""
        self.log.debug('Stopping merger worker')
        self.merger_gearworker.stop()

    def join(self):
        """Wait for the gearman worker to finish."""
        self.merger_gearworker.join()

    def pause(self):
        """Stop accepting jobs by unregistering the gearman functions."""
        self.log.debug('Pausing merger worker')
        self.merger_gearworker.unregister()

    def unpause(self):
        """Resume accepting jobs by re-registering the gearman functions."""
        self.log.debug('Resuming merger worker')
        self.merger_gearworker.register()

    def cat(self, job):
        """Handle ``merger:cat``: return file contents from a branch.

        Reads ``connection``, ``project``, ``branch``, ``files`` and the
        optional ``dirs`` from the JSON job arguments. Replies with
        ``{"updated": True, "files": ...}`` on success, or
        ``{"updated": False}`` if updating or reading the repo fails.
        """
        self.log.debug("Got cat job: %s" % job.unique)
        args = json.loads(job.arguments)
        connection_name = args['connection']
        project_name = args['project']
        lock = self.repo_locks.getRepoLock(connection_name, project_name)
        try:
            # Update exactly once, inside the try, so a failed fetch is
            # reported as updated=False rather than propagating out of
            # this handler.
            self._update(connection_name, project_name)
            with lock:
                files = self.merger.getFiles(connection_name, project_name,
                                             args['branch'], args['files'],
                                             args.get('dirs'))
        except Exception:
            self.log.exception("Unable to process cat job %s" % job.unique)
            result = dict(updated=False)
        else:
            result = dict(updated=True, files=files)
        job.sendWorkComplete(json.dumps(result))

    def merge(self, job):
        """Handle ``merger:merge``: merge a list of items.

        Replies with ``merged`` plus ``commit``, ``files`` and
        ``repo_state`` (all None when the merge produced no result) and
        echoes ``zuul_event_id``.
        """
        self.log.debug("Got merge job: %s" % job.unique)
        args = json.loads(job.arguments)
        zuul_event_id = args.get('zuul_event_id')
        ret = self.merger.mergeChanges(
            args['items'], args.get('files'),
            args.get('dirs', []),
            args.get('repo_state'),
            branches=args.get('branches'),
            repo_locks=self.repo_locks,
            zuul_event_id=zuul_event_id)
        result = dict(merged=(ret is not None))
        if ret is None:
            result['commit'] = result['files'] = result['repo_state'] = None
        else:
            # The last two values are not part of the reply.
            (result['commit'], result['files'], result['repo_state'],
             _recent, _orig_commit) = ret
        result['zuul_event_id'] = zuul_event_id
        job.sendWorkComplete(json.dumps(result))

    def refstate(self, job):
        """Handle ``merger:refstate``: report repo state for items.

        Replies with ``updated``, ``repo_state`` and ``item_in_branches``
        and echoes ``zuul_event_id``.
        """
        self.log.debug("Got refstate job: %s" % job.unique)
        args = json.loads(job.arguments)
        zuul_event_id = args.get('zuul_event_id')
        success, repo_state, item_in_branches = \
            self.merger.getRepoState(
                args['items'], branches=args.get('branches'),
                repo_locks=self.repo_locks)
        result = dict(updated=success,
                      repo_state=repo_state,
                      item_in_branches=item_in_branches)
        result['zuul_event_id'] = zuul_event_id
        job.sendWorkComplete(json.dumps(result))

    def fileschanges(self, job):
        """Handle ``merger:fileschanges``: list files changed on a branch.

        Reads ``connection``, ``project``, ``branch`` and ``tosha`` from
        the JSON job arguments. Replies with ``{"updated": True,
        "files": ...}`` on success, or ``{"updated": False}`` on failure,
        and always echoes ``zuul_event_id``.
        """
        self.log.debug("Got fileschanges job: %s" % job.unique)
        args = json.loads(job.arguments)
        zuul_event_id = args.get('zuul_event_id')
        connection_name = args['connection']
        project_name = args['project']
        lock = self.repo_locks.getRepoLock(connection_name, project_name)
        try:
            # Single update inside the try; see cat() for the rationale.
            self._update(connection_name, project_name,
                         zuul_event_id=zuul_event_id)
            with lock:
                files = self.merger.getFilesChanges(
                    connection_name, project_name,
                    args['branch'], args['tosha'],
                    zuul_event_id=zuul_event_id)
        except Exception:
            self.log.exception(
                "Unable to process fileschanges job %s" % job.unique)
            result = dict(updated=False)
        else:
            result = dict(updated=True, files=files)
        result['zuul_event_id'] = zuul_event_id
        job.sendWorkComplete(json.dumps(result))
class MergeServer(BaseMergeServer):
    """Standalone merger daemon: the gearman merger plus a command socket.

    Command names read from the socket are dispatched, on a daemon thread,
    to ``stop``, ``pause`` or ``unpause``.
    """

    log = logging.getLogger("zuul.MergeServer")

    def __init__(self, config, connections=None):
        super().__init__(config, 'merger', connections)
        self.command_map = {
            'stop': self.stop,
            'pause': self.pause,
            'unpause': self.unpause,
        }
        socket_path = get_default(
            self.config, 'merger', 'command_socket',
            '/var/lib/zuul/merger.socket')
        self.command_socket = commandsocket.CommandSocket(socket_path)
        self._command_running = False

    def start(self):
        """Start the merger worker, then the command socket and its
        dispatcher thread."""
        super().start()
        self._command_running = True
        self.log.debug("Starting command processor")
        self.command_socket.start()
        dispatcher = threading.Thread(target=self.runCommand,
                                      name='command')
        dispatcher.daemon = True
        self.command_thread = dispatcher
        dispatcher.start()

    def stop(self):
        """Stop the merger worker, then end command dispatching."""
        self.log.debug("Stopping")
        super().stop()
        self._command_running = False
        self.command_socket.stop()
        self.log.debug("Stopped")

    def join(self):
        super().join()

    def pause(self):
        self.log.debug('Pausing')
        super().pause()

    def unpause(self):
        self.log.debug('Resuming')
        super().unpause()

    def runCommand(self):
        """Dispatch socket commands until ``_command_running`` is cleared.

        The value ``_stop`` is skipped rather than dispatched. Presumably it
        is the wake-up sentinel the command socket emits on shutdown, but
        that is not verified here (see zuul.lib.commandsocket). Errors,
        including unknown command names, are logged and the loop goes on.
        """
        while self._command_running:
            try:
                name = self.command_socket.get().decode('utf8')
                if name == '_stop':
                    continue
                self.command_map[name]()
            except Exception:
                self.log.exception("Exception while processing command")