Merge "Add gcloud pubsub support to Gerrit driver"
This commit is contained in:
commit
bd11c4ff79
@ -30,8 +30,8 @@ Zuul interacts with Gerrit in up to three ways:
|
||||
* Reporting results
|
||||
|
||||
Trigger events arrive over an event stream, either SSH (via the
|
||||
``gerrit stream-events`` command) or other protocols such as Kafka, or
|
||||
AWS Kinesis.
|
||||
``gerrit stream-events`` command) or other protocols such as Kafka,
|
||||
AWS Kinesis, or Google Cloud Pub/Sub.
|
||||
|
||||
Fetching source code may happen over SSH or HTTP.
|
||||
|
||||
@ -311,6 +311,53 @@ some implications for event delivery:
|
||||
|
||||
The AWS secret key to use.
|
||||
|
||||
Google Cloud Pub/Sub Event Support
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Zuul includes support for Gerrit's `events-gcloud-pubsub` plugin. This may be
|
||||
used as an alternative to SSH for receiving trigger events.
|
||||
|
||||
Google Cloud Pub/Sub does provide event delivery guarantees, so unlike
|
||||
SSH, if all Zuul schedulers are unable to communicate with Gerrit or
|
||||
Google Cloud Pub/Sub, they will eventually receive queued events on
|
||||
reconnection.
|
||||
|
||||
All Zuul schedulers will attempt to connect to Google Cloud Pub/Sub.
|
||||
There are some implications for event delivery:
|
||||
|
||||
* All events will be delivered to Zuul at least once. In the case of
|
||||
a disrupted connection, Zuul may receive duplicate events.
|
||||
|
||||
* Because the `events-gcloud-pubsub` plugin does not at the time of
|
||||
this writing specify that messages are ordered, events may be
|
||||
received by Zuul out of order. Since this behavior is under the
|
||||
control of the Gerrit plugin, it may change in the future.
|
||||
|
||||
.. attr:: <gerrit gcloud pubsub connection>
|
||||
|
||||
.. attr:: gcloud_pubsub_project
|
||||
:required:
|
||||
|
||||
The Google Cloud project name to use.
|
||||
|
||||
.. attr:: gcloud_pubsub_topic
|
||||
:default: gerrit
|
||||
|
||||
The Google Cloud Pub/Sub topic to which Zuul should subscribe.
|
||||
|
||||
.. attr:: gcloud_pubsub_subscription_id
|
||||
:default: zuul
|
||||
|
||||
The ID of the Google Cloud Pub/Sub subscription to use. If the
|
||||
subscription does not exist, it will be created.
|
||||
|
||||
.. attr:: gcloud_pubsub_private_key
|
||||
|
||||
Path to a file containing the JSON encoded key of a service
|
||||
account. If not provided, then Google Cloud local auth is used.
|
||||
If Zuul is not running in the same Google Cloud project as
|
||||
Gerrit, this is required.
|
||||
|
||||
Trigger Configuration
|
||||
---------------------
|
||||
|
||||
|
5
releasenotes/notes/pubsub-d275d8f6dd0f7d6a.yaml
Normal file
5
releasenotes/notes/pubsub-d275d8f6dd0f7d6a.yaml
Normal file
@ -0,0 +1,5 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Support for using Google Cloud Pub/Sub as an event source has been
|
||||
added to the Gerrit driver.
|
@ -42,3 +42,4 @@ opentelemetry-exporter-otlp-proto-grpc
|
||||
opentelemetry-exporter-otlp-proto-http
|
||||
confluent-kafka
|
||||
boto3
|
||||
google-cloud-pubsub
|
||||
|
37
tests/fixtures/zuul-gerrit-gcloud.conf
vendored
Normal file
37
tests/fixtures/zuul-gerrit-gcloud.conf
vendored
Normal file
@ -0,0 +1,37 @@
|
||||
[statsd]
|
||||
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
|
||||
# see: https://github.com/jsocol/pystatsd/issues/61
|
||||
server=127.0.0.1
|
||||
|
||||
[scheduler]
|
||||
tenant_config=main.yaml
|
||||
|
||||
[merger]
|
||||
git_dir=/tmp/zuul-test/merger-git
|
||||
git_user_email=zuul@example.com
|
||||
git_user_name=zuul
|
||||
|
||||
[web]
|
||||
root=http://zuul.example.com
|
||||
|
||||
[executor]
|
||||
git_dir=/tmp/zuul-test/executor-git
|
||||
load_multiplier=100
|
||||
|
||||
[connection gerrit]
|
||||
driver=gerrit
|
||||
server=review.example.com
|
||||
user=jenkins
|
||||
sshkey=fake_id_rsa_path
|
||||
password=badpassword
|
||||
gcloud_pubsub_project=testproject
|
||||
|
||||
[connection smtp]
|
||||
driver=smtp
|
||||
server=localhost
|
||||
port=25
|
||||
default_from=zuul@example.com
|
||||
default_to=you@example.com
|
||||
|
||||
[database]
|
||||
dburi=$MYSQL_FIXTURE_DBURI$
|
154
tests/unit/test_gerrit_gcloud_pubsub.py
Normal file
154
tests/unit/test_gerrit_gcloud_pubsub.py
Normal file
@ -0,0 +1,154 @@
|
||||
# Copyright 2023 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import fixtures
|
||||
import json
|
||||
import os
|
||||
import queue
|
||||
import time
|
||||
import threading
|
||||
|
||||
import google.api_core.exceptions
|
||||
|
||||
import tests.base
|
||||
from tests.base import (
|
||||
ZuulTestCase,
|
||||
iterate_timeout,
|
||||
simple_layout,
|
||||
)
|
||||
|
||||
|
||||
FIXTURE_DIR = os.path.join(tests.base.FIXTURE_DIR, 'gerrit')
|
||||
|
||||
|
||||
class FakePubsubMessage:
|
||||
def __init__(self, data):
|
||||
self.data = data
|
||||
self._acked = False
|
||||
|
||||
def ack(self):
|
||||
self._acked = True
|
||||
|
||||
|
||||
class FakePubsubFuture:
|
||||
def __init__(self):
|
||||
self.event = threading.Event()
|
||||
self._error = None
|
||||
|
||||
def result(self):
|
||||
self.event.wait()
|
||||
if self._error:
|
||||
raise self._error
|
||||
|
||||
def cancel(self, error=None):
|
||||
self._error = error
|
||||
self.event.set()
|
||||
|
||||
|
||||
class FakePubsubSubscriber:
|
||||
def __init__(self, credentials=None):
|
||||
self.credentials = credentials
|
||||
self._queue = queue.Queue()
|
||||
self._closed = False
|
||||
self._entered = False
|
||||
|
||||
def put(self, data):
|
||||
self._queue.put(data)
|
||||
|
||||
def create_subscription(self, name=None, topic=None):
|
||||
if not self._entered:
|
||||
raise Exception("Attempt to use subscriber "
|
||||
"outside of context manager")
|
||||
self._name = name
|
||||
self._topic = topic
|
||||
raise google.api_core.exceptions.AlreadyExists("Exists")
|
||||
|
||||
def subscribe(self, name, callback):
|
||||
if self._closed:
|
||||
raise Exception("Attempt to use closed subscriber")
|
||||
if not self._entered:
|
||||
raise Exception("Attempt to use subscriber "
|
||||
"outside of context manager")
|
||||
assert self._name == name
|
||||
self._callback = callback
|
||||
self._future = FakePubsubFuture()
|
||||
self._thread = threading.Thread(target=self._run)
|
||||
self._thread.start()
|
||||
return self._future
|
||||
|
||||
def _run(self):
|
||||
while not self._closed:
|
||||
data = self._queue.get()
|
||||
self._queue.task_done()
|
||||
if data is None:
|
||||
continue
|
||||
msg = FakePubsubMessage(data)
|
||||
self._callback(msg)
|
||||
|
||||
def __enter__(self):
|
||||
self._entered = True
|
||||
return self
|
||||
|
||||
def __exit__(self, *args, **kw):
|
||||
self._closed = True
|
||||
self._queue.put(None)
|
||||
|
||||
|
||||
def serialize(event):
|
||||
return json.dumps(event)
|
||||
|
||||
|
||||
class TestGerritEventSourceGcloudPubsub(ZuulTestCase):
|
||||
config_file = 'zuul-gerrit-gcloud.conf'
|
||||
|
||||
def setUp(self):
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'zuul.driver.gerrit.gerriteventgcloudpubsub.'
|
||||
'pubsub_v1.SubscriberClient',
|
||||
FakePubsubSubscriber))
|
||||
super().setUp()
|
||||
|
||||
@simple_layout('layouts/simple.yaml')
|
||||
def test_gcloud_pubsub(self):
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
subscriber = self.fake_gerrit.event_thread.subscriber
|
||||
self.fake_gerrit.event_thread.RECONNECTION_DELAY = 1
|
||||
|
||||
# Assert we passed the required config entries
|
||||
self.assertEqual('projects/testproject/subscriptions/zuul',
|
||||
subscriber._name)
|
||||
self.assertEqual('projects/testproject/topics/gerrit',
|
||||
subscriber._topic)
|
||||
|
||||
# Exercise reconnection
|
||||
err = Exception("Test error")
|
||||
subscriber._future.cancel(err)
|
||||
|
||||
for _ in iterate_timeout(60, 'wait for reconnect'):
|
||||
if subscriber is not self.fake_gerrit.event_thread.subscriber:
|
||||
break
|
||||
time.sleep(0.2)
|
||||
|
||||
subscriber = self.fake_gerrit.event_thread.subscriber
|
||||
self.additional_event_queues.append(subscriber._queue)
|
||||
|
||||
subscriber.put(serialize(A.getPatchsetCreatedEvent(1)))
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertHistory([
|
||||
dict(name='check-job', result='SUCCESS', changes='1,1')
|
||||
])
|
||||
self.assertEqual(A.reported, 1, "A should be reported")
|
||||
|
||||
self.assertTrue(subscriber._queue.empty())
|
@ -48,6 +48,9 @@ from zuul.driver.gerrit.gerriteventkafka import GerritKafkaEventListener
|
||||
from zuul.driver.gerrit.gerriteventawskinesis import (
|
||||
GerritAWSKinesisEventListener,
|
||||
)
|
||||
from zuul.driver.gerrit.gerriteventgcloudpubsub import (
|
||||
GerritGcloudPubsubEventListener,
|
||||
)
|
||||
from zuul.driver.git.gitwatcher import GitWatcher
|
||||
from zuul.lib import tracing
|
||||
from zuul.lib.logutil import get_annotated_logger
|
||||
@ -433,6 +436,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
EVENT_SOURCE_STREAM_EVENTS = 'stream-events'
|
||||
EVENT_SOURCE_KAFKA = 'kafka'
|
||||
EVENT_SOURCE_KINESIS = 'kinesis'
|
||||
EVENT_SOURCE_GCLOUD_PUBSUB = 'gcloudpubsub'
|
||||
|
||||
def __init__(self, driver, connection_name, connection_config):
|
||||
super(GerritConnection, self).__init__(driver, connection_name,
|
||||
@ -468,6 +472,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
self.event_source = self.EVENT_SOURCE_KAFKA
|
||||
elif self.connection_config.get('aws_kinesis_region', None):
|
||||
self.event_source = self.EVENT_SOURCE_KINESIS
|
||||
elif self.connection_config.get('gcloud_pubsub_project', None):
|
||||
self.event_source = self.EVENT_SOURCE_GCLOUD_PUBSUB
|
||||
|
||||
# Thread for whatever event source we use
|
||||
self.event_thread = None
|
||||
@ -1694,6 +1700,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
self.startKafkaListener()
|
||||
elif self.event_source == self.EVENT_SOURCE_KINESIS:
|
||||
self.startAWSKinesisListener()
|
||||
elif self.event_source == self.EVENT_SOURCE_GCLOUD_PUBSUB:
|
||||
self.startGcloudPubsubListener()
|
||||
else:
|
||||
self.log.warning("No gerrit event source configured")
|
||||
self.startRefWatcherThread()
|
||||
@ -1715,6 +1723,11 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
self.event_thread = GerritAWSKinesisEventListener(
|
||||
self, self.connection_config)
|
||||
|
||||
def startGcloudPubsubListener(self):
|
||||
self.log.info("Starting gcloud pubsub consumer")
|
||||
self.event_thread = GerritGcloudPubsubEventListener(
|
||||
self, self.connection_config)
|
||||
|
||||
def startPollerThread(self):
|
||||
if self.session is not None:
|
||||
self.poller_thread = self._poller_class(self)
|
||||
|
95
zuul/driver/gerrit/gerriteventgcloudpubsub.py
Normal file
95
zuul/driver/gerrit/gerriteventgcloudpubsub.py
Normal file
@ -0,0 +1,95 @@
|
||||
# Copyright 2023 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import google.api_core.exceptions
|
||||
from google.oauth2 import service_account
|
||||
from google.cloud import pubsub_v1
|
||||
import logging
|
||||
import pprint
|
||||
import threading
|
||||
|
||||
|
||||
class GerritGcloudPubsubEventListener:
|
||||
log = logging.getLogger("zuul.GerritConnection.gcloudpubsub")
|
||||
RECONNECTION_DELAY = 5
|
||||
|
||||
def __init__(self, gerrit_connection, connection_config):
|
||||
self.gerrit_connection = gerrit_connection
|
||||
project = connection_config.get('gcloud_pubsub_project')
|
||||
topic = connection_config.get('gcloud_pubsub_topic', 'gerrit')
|
||||
sub = connection_config.get('gcloud_pubsub_subscription_id', 'zuul')
|
||||
key = connection_config.get('gcloud_pubsub_private_key')
|
||||
self.kwargs = {}
|
||||
if key:
|
||||
with open(key) as keyfile:
|
||||
info = json.load(keyfile)
|
||||
credentials = service_account.Credentials.\
|
||||
from_service_account_info(info)
|
||||
self.kwargs['credentials'] = credentials
|
||||
self.topic_name = f'projects/{project}/topics/{topic}'
|
||||
self.subscription_name = f'projects/{project}/subscriptions/{sub}'
|
||||
self._stop_event = threading.Event()
|
||||
self._stopped = False
|
||||
self._thread = None
|
||||
self._future = None
|
||||
|
||||
def start(self):
|
||||
self._thread = threading.Thread(target=self.run)
|
||||
self._thread.start()
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping gcloud pubsub listener")
|
||||
self._stopped = True
|
||||
self._stop_event.set()
|
||||
try:
|
||||
if self._future:
|
||||
self._future.cancel()
|
||||
except Exception:
|
||||
self.log.exception("Error canceling future:")
|
||||
|
||||
def join(self):
|
||||
if self._thread:
|
||||
self._thread.join()
|
||||
|
||||
def callback(self, message):
|
||||
data = json.loads(message.data)
|
||||
self.log.info("Received data from gcloud: \n%s" %
|
||||
pprint.pformat(data))
|
||||
self.gerrit_connection.addEvent(data)
|
||||
message.ack()
|
||||
|
||||
def _run(self):
|
||||
subscriber = pubsub_v1.SubscriberClient(**self.kwargs)
|
||||
# So the unit tests can access it
|
||||
self.subscriber = subscriber
|
||||
with subscriber as client:
|
||||
try:
|
||||
client.create_subscription(name=self.subscription_name,
|
||||
topic=self.topic_name)
|
||||
except google.api_core.exceptions.AlreadyExists:
|
||||
pass
|
||||
self._future = client.subscribe(
|
||||
self.subscription_name, self.callback)
|
||||
self._future.result()
|
||||
|
||||
def run(self):
|
||||
while not self._stopped:
|
||||
try:
|
||||
self._run()
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Exception in gcloud pubsub consumer with %s:",
|
||||
self.gerrit_connection.connection_name)
|
||||
self._stop_event.wait(self.RECONNECTION_DELAY)
|
Loading…
Reference in New Issue
Block a user