# Copyright 2015 Hewlett-Packard Development Company, L.P. # Copyright 2017 IBM Corp. # Copyright 2017 Red Hat, Inc. # Copyright 2023 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import copy import re import re2 import time from zuul.model import Change, TriggerEvent, EventFilter, RefFilter from zuul.model import FalseWithReason from zuul.driver.util import time_to_seconds, to_list EMPTY_GIT_REF = '0' * 40 # git sha of all zeros, used during creates/deletes class PullRequest(Change): def __init__(self, project): super(PullRequest, self).__init__(project) self.pr = None self.updated_at = None self.title = None self.body_text = None self.reviews = [] self.files = [] self.labels = [] self.draft = None self.mergeable = True self.review_decision = None self.required_contexts = set() self.contexts = set() self.branch_protected = False @property def status(self): return ["{}:{}:{}".format(*c) for c in self.contexts] @property def successful_contexts(self) -> set: if not self.contexts: return set() return set( s[1] for s in self.contexts if s[2] == 'success' ) def isUpdateOf(self, other): if (self.project == other.project and hasattr(other, 'number') and self.number == other.number and hasattr(other, 'patchset') and self.patchset != other.patchset and hasattr(other, 'updated_at') and self.updated_at > other.updated_at): return True return False def serialize(self): d = super().serialize() d.update({ "pr": self.pr, "updated_at": self.updated_at, "title": self.title, "body_text": self.body_text, "reviews": list(self.reviews), "labels": self.labels, "draft": self.draft, "mergeable": self.mergeable, "review_decision": self.review_decision, "required_contexts": list(self.required_contexts), "contexts": list(self.contexts), "branch_protected": self.branch_protected, }) return d def deserialize(self, data): super().deserialize(data) self.pr = data.get("pr") self.updated_at = data.get("updated_at") self.title = data.get("title") self.body_text = data.get("body_text") self.reviews = data.get("reviews", []) self.labels = data.get("labels", []) self.draft = data.get("draft") self.mergeable = data.get("mergeable", True) self.review_decision = data.get("review_decision") self.required_contexts = set(data.get("required_contexts", [])) self.contexts = set(tuple(c) for c in data.get("contexts", [])) self.branch_protected = data.get("branch_protected", False) class GithubTriggerEvent(TriggerEvent): def __init__(self): super(GithubTriggerEvent, self).__init__() self.title = None self.label = None self.unlabel = None self.action = None self.delivery = None self.check_run = None self.status = None self.commits = [] self.body_edited = None self.branch_protection_changed = None self.default_branch_changed = None def toDict(self): d = super().toDict() d["title"] = self.title d["label"] = self.label d["unlabel"] = self.unlabel d["action"] = self.action d["delivery"] = self.delivery d["check_run"] = self.check_run d["status"] = self.status d["commits"] = self.commits d["body_edited"] = self.body_edited d["branch_protection_changed"] = self.branch_protection_changed d["default_branch_changed"] = self.default_branch_changed return d def updateFromDict(self, d): super().updateFromDict(d) self.title = d["title"] self.label = d["label"] self.unlabel = d["unlabel"] self.action = d["action"] self.delivery = d["delivery"] self.check_run = d["check_run"] self.status = d["status"] self.commits = d["commits"] self.body_edited = d["body_edited"] self.branch_protection_changed = d.get("branch_protection_changed") self.default_branch_changed = d.get("default_branch_changed") def isBranchProtectionChanged(self): return bool(self.branch_protection_changed) def isDefaultBranchChanged(self): return bool(self.default_branch_changed) def isPatchsetCreated(self): if self.type == 'pull_request': return self.action in ['opened', 'changed'] return False def isMessageChanged(self): return bool(self.body_edited) def isChangeAbandoned(self): if self.type == 'pull_request': return 'closed' == self.action return False def _repr(self): r = [super(GithubTriggerEvent, self)._repr()] if self.action: r.append(self.action) r.append(self.canonical_project_name) if self.change_number: r.append('%s,%s' % (self.change_number, self.patch_number)) if self.delivery: r.append('delivery: %s' % self.delivery) if self.check_run: r.append('check_run: %s' % self.check_run) return ' '.join(r) class GithubEventFilter(EventFilter): def __init__(self, connection_name, trigger, types=[], branches=[], refs=[], comments=[], actions=[], labels=[], unlabels=[], states=[], statuses=[], required_statuses=[], check_runs=[], ignore_deletes=True, require=None, reject=None): EventFilter.__init__(self, connection_name, trigger) # TODO: Backwards compat, remove after 9.x: if required_statuses and require is None: require = {'status': required_statuses} if require: self.require_filter = GithubRefFilter.requiresFromConfig( connection_name, require) else: self.require_filter = None if reject: self.reject_filter = GithubRefFilter.rejectFromConfig( connection_name, reject) else: self.reject_filter = None self._types = [x.pattern for x in types] self._branches = [x.pattern for x in branches] self._refs = [x.pattern for x in refs] self._comments = [x.pattern for x in comments] self.types = types self.branches = branches self.refs = refs self.comments = comments self.actions = actions self.labels = labels self.unlabels = unlabels self.states = states self.statuses = statuses self.check_runs = check_runs self.ignore_deletes = ignore_deletes def __repr__(self): ret = '= t): return False elif k == 'type': if review['type'] != v: return False elif k == 'permission': # If permission is read, we've matched. You must have read # to provide a review. if v != 'read': # Admins have implicit write. if v == 'write': if review['permission'] not in ('write', 'admin'): return False elif v == 'admin': if review['permission'] != 'admin': return False return True def matchesReviews(self, change): if self.required_reviews or self.reject_reviews: if not hasattr(change, 'number'): # not a PR, no reviews return FalseWithReason("Change is not a PR") if self.required_reviews and not change.reviews: # No reviews means no matching of required bits # having reject reviews but no reviews on the change is okay return FalseWithReason("Reviews %s do not match %s" % ( self.required_reviews, change.reviews)) return (self.matchesRequiredReviews(change) and self.matchesNoRejectReviews(change)) def matchesRequiredReviews(self, change): for rreview in self.required_reviews: matches_review = False for review in change.reviews: if self._match_review_required_review(rreview, review): # Consider matched if any review matches matches_review = True break if not matches_review: return FalseWithReason( "Required reviews %s do not match %s" % ( self.required_reviews, change.reviews)) return True def matchesNoRejectReviews(self, change): for rreview in self.reject_reviews: for review in change.reviews: if self._match_review_required_review(rreview, review): # A review matched, we can reject right away return FalseWithReason("Reject reviews %s match %s" % ( self.reject_reviews, change.reviews)) return True def matchesStatuses(self, change): if self.required_statuses or self.reject_statuses: if not hasattr(change, 'number'): # not a PR, no status return FalseWithReason("Can not match statuses without PR") if self.required_statuses and not change.status: return FalseWithReason( "Required statuses %s do not match %s" % ( self.required_statuses, change.status)) required_statuses_results = self.matchesRequiredStatuses(change) if not required_statuses_results: return required_statuses_results return self.matchesNoRejectStatuses(change) def matchesRequiredStatuses(self, change): # statuses are ORed # A PR head can have multiple statuses on it. If the change # statuses and the filter statuses are a null intersection, there # are no matches and we return false if self.required_statuses: for required_status in self.required_statuses: for status in change.status: # TODO: construct as ZuulRegex in initializer when # re2 migration is complete. if re2.fullmatch(required_status, status): return True return FalseWithReason("Required statuses %s do not match %s" % ( self.required_statuses, change.status)) return True def matchesNoRejectStatuses(self, change): # statuses are ANDed # If any of the rejected statusses are present, we return false for rstatus in self.reject_statuses: for status in change.status: # TODO: construct as ZuulRegex in initializer when re2 # migration is complete. if re2.fullmatch(rstatus, status): return FalseWithReason("Reject statuses %s match %s" % ( self.reject_statuses, change.status)) return True def matchesLabels(self, change): if self.required_labels or self.reject_labels: if not hasattr(change, 'number'): # not a PR, no label return FalseWithReason("Can not match labels without PR") if self.required_labels and not change.labels: # No labels means no matching of required bits # having reject labels but no labels on the change is okay return FalseWithReason( "Required labels %s does not match %s" % ( self.required_labels, change.labels)) return (self.matchesRequiredLabels(change) and self.matchesNoRejectLabels(change)) def matchesRequiredLabels(self, change): for label in self.required_labels: if label not in change.labels: return FalseWithReason("Labels %s do not match %s" % ( self.required_labels, change.labels)) return True def matchesNoRejectLabels(self, change): for label in self.reject_labels: if label in change.labels: return FalseWithReason("Reject labels %s match %s" % ( self.reject_labels, change.labels)) return True def matches(self, change): statuses_result = self.matchesStatuses(change) if not statuses_result: return statuses_result reviews_result = self.matchesReviews(change) if not reviews_result: return reviews_result labels_result = self.matchesLabels(change) if not labels_result: return labels_result if self.open is not None: # if a "change" has no number, it's not a change, but a push # and cannot possibly pass this test. if hasattr(change, 'number'): if self.open != change.open: return FalseWithReason( "Change does not match open requirement") else: return FalseWithReason("Change is not a PR") if self.merged is not None: # if a "change" has no number, it's not a change, but a push # and cannot possibly pass this test. if hasattr(change, 'number'): if self.merged != change.is_merged: return FalseWithReason( "Change does not match merged requirement") else: return FalseWithReason("Change is not a PR") if self.current_patchset is not None: # if a "change" has no number, it's not a change, but a push # and cannot possibly pass this test. if hasattr(change, 'number'): if self.current_patchset != change.is_current_patchset: return FalseWithReason( "Change does not match current-patchset requirement") else: return FalseWithReason("Change is not a PR") if self.draft is not None: # if a "change" has no number, it's not a change, but a push # and cannot possibly pass this test. if hasattr(change, 'number'): if self.draft != change.draft: return FalseWithReason( "Change does not match draft requirement") else: return FalseWithReason("Change is not a PR") return True