diff --git a/doc/source/drivers/gerrit.rst b/doc/source/drivers/gerrit.rst
index c57c2b8e1c..2b9a243a1c 100644
--- a/doc/source/drivers/gerrit.rst
+++ b/doc/source/drivers/gerrit.rst
@@ -30,7 +30,8 @@ Zuul interacts with Gerrit in up to three ways:
 * Reporting results
 
 Trigger events arrive over an event stream, either SSH (via the
-``gerrit stream-events`` command) or a pub-sub protocol such as Kafka.
+``gerrit stream-events`` command) or other protocols such as Kafka or
+AWS Kinesis.
 
 Fetching source code may happen over SSH or HTTP.
 
@@ -247,6 +248,53 @@ are some implications for event delivery:
 
       Path to TLS CA certificate to use when connecting to a Kafka broker.
 
+AWS Kinesis Event Support
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Zuul includes support for Gerrit's `events-aws-kinesis` plugin. This
+may be used as an alternative to SSH for receiving trigger events.
+
+Kinesis does provide event delivery guarantees, so unlike SSH, if all
+Zuul schedulers are unable to communicate with Gerrit or AWS, they
+will eventually receive queued events on reconnection.
+
+All Zuul schedulers will attempt to connect to AWS Kinesis, but only
+one scheduler will process a given Kinesis shard at a time. There are
+some implications for event delivery:
+
+* All events will be delivered to Zuul at least once. In the case of
+  a disrupted connection, Zuul may receive duplicate events.
+
+* If a connection is disrupted longer than the Kinesis retention
+  period for a shard, Zuul may skip to the latest event, ignoring all
+  previous events.
+
+* Because shard processing happens in parallel, events may not arrive
+  in order.
+
+* If a long period with no events elapses and a connection is
+  disrupted, it may take Zuul some time to catch up to the latest
+  events.
+
+.. attr:: <gerrit connection>
+   :noindex:
+
+   .. attr:: aws_kinesis_region
+      :required:
+
+      The AWS region name in which the Kinesis stream is located.
+
+   .. attr:: aws_kinesis_stream
+      :default: gerrit
+
+      The AWS Kinesis stream name.
+
+   .. attr:: aws_kinesis_access_key
+
+      The AWS access key to use.
+
+   .. attr:: aws_kinesis_secret_key
+
+      The AWS secret key to use.
 
 Trigger Configuration
 ---------------------
diff --git a/releasenotes/notes/aws-kinesis-61a83290275e06d3.yaml b/releasenotes/notes/aws-kinesis-61a83290275e06d3.yaml
new file mode 100644
index 0000000000..f356a1b4b9
--- /dev/null
+++ b/releasenotes/notes/aws-kinesis-61a83290275e06d3.yaml
@@ -0,0 +1,4 @@
+---
+features:
+  - |
+    Support for using AWS Kinesis as an event source has been added to the Gerrit driver.
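The documentation above maps each ``aws_kinesis_*`` connection option onto a
boto3 Kinesis client argument. A minimal sketch of that mapping (illustrative
only; the region, credentials, and stream name are placeholders that mirror
the documented defaults):

    # Illustrative sketch, not part of this change.
    import boto3

    client = boto3.client(
        'kinesis',
        region_name='us-west-2',        # aws_kinesis_region
        aws_access_key_id='AK...',      # aws_kinesis_access_key (optional)
        aws_secret_access_key='...',    # aws_kinesis_secret_key
    )
    # 'gerrit' is the documented default for aws_kinesis_stream.
    desc = client.describe_stream(StreamName='gerrit')['StreamDescription']
    print(desc['StreamStatus'], len(desc['Shards']), 'shards')

This is the same client construction ``GerritAWSKinesisEventListener``
performs below: the access key pair is optional, and when it is omitted the
client falls back to the ambient AWS credential chain.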
diff --git a/requirements.txt b/requirements.txt
index 918a68dd7d..d5c7362387 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -41,3 +41,4 @@ opentelemetry-sdk
 opentelemetry-exporter-otlp-proto-grpc
 opentelemetry-exporter-otlp-proto-http
 confluent-kafka
+boto3
diff --git a/test-requirements.txt b/test-requirements.txt
index 6ea23dc281..7ca1af3901 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -7,3 +7,4 @@ psycopg2-binary
 beautifulsoup4
 graphene
 requests_mock
+moto
diff --git a/tests/fixtures/zuul-gerrit-awskinesis.conf b/tests/fixtures/zuul-gerrit-awskinesis.conf
new file mode 100644
index 0000000000..d7c0142143
--- /dev/null
+++ b/tests/fixtures/zuul-gerrit-awskinesis.conf
@@ -0,0 +1,39 @@
+[statsd]
+# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
+# see: https://github.com/jsocol/pystatsd/issues/61
+server=127.0.0.1
+
+[scheduler]
+tenant_config=main.yaml
+
+[merger]
+git_dir=/tmp/zuul-test/merger-git
+git_user_email=zuul@example.com
+git_user_name=zuul
+
+[web]
+root=http://zuul.example.com
+
+[executor]
+git_dir=/tmp/zuul-test/executor-git
+load_multiplier=100
+
+[connection gerrit]
+driver=gerrit
+server=review.example.com
+user=jenkins
+sshkey=fake_id_rsa_path
+password=badpassword
+aws_kinesis_region=us-west-2
+aws_kinesis_access_key=AK000000000000000000
+aws_kinesis_secret_key=0123456789abcdef0123456789abcdef0123456789abcdef
+
+[connection smtp]
+driver=smtp
+server=localhost
+port=25
+default_from=zuul@example.com
+default_to=you@example.com
+
+[database]
+dburi=$MYSQL_FIXTURE_DBURI$
diff --git a/tests/unit/test_gerrit_awskinesis.py b/tests/unit/test_gerrit_awskinesis.py
new file mode 100644
index 0000000000..6e9319bd58
--- /dev/null
+++ b/tests/unit/test_gerrit_awskinesis.py
@@ -0,0 +1,187 @@
+# Copyright 2023 Acme Gating, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import os
+import time
+
+import boto3
+from moto import mock_kinesis
+
+import tests.base
+from tests.base import (
+    ZuulTestCase,
+    iterate_timeout,
+    simple_layout,
+)
+
+
+FIXTURE_DIR = os.path.join(tests.base.FIXTURE_DIR, 'gerrit')
+
+
+def serialize(event):
+    return json.dumps(event).encode('utf8')
+
+
+class TestGerritEventSourceAWSKinesis(ZuulTestCase):
+    config_file = 'zuul-gerrit-awskinesis.conf'
+    mock_kinesis = mock_kinesis()
+
+    def setUp(self):
+        self.mock_kinesis.start()
+
+        self.kinesis_client = boto3.client('kinesis', region_name='us-west-2')
+        self.kinesis_client.create_stream(
+            StreamName='gerrit',
+            ShardCount=4,
+            StreamModeDetails={
+                'StreamMode': 'ON_DEMAND'
+            }
+        )
+        super().setUp()
+
+    def tearDown(self):
+        self.mock_kinesis.stop()
+        super().tearDown()
+
+    @simple_layout('layouts/simple.yaml')
+    def test_kinesis(self):
+        listener = self.fake_gerrit.event_thread
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+
+        self.kinesis_client.put_record(
+            StreamName='gerrit',
+            Data=serialize(A.getPatchsetCreatedEvent(1)),
+            PartitionKey='whatever',
+        )
+
+        for _ in iterate_timeout(60, 'wait for event'):
+            if listener._event_count == 1:
+                break
+            time.sleep(0.2)
+        self.waitUntilSettled()
+
+        self.assertHistory([
+            dict(name='check-job', result='SUCCESS', changes='1,1')
+        ])
+        self.assertEqual(A.reported, 1, "A should be reported")
+
+        # Stop the listener
+        listener.stop()
+        listener.join()
+
+        # Add new gerrit events while we are "offline"
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        self.kinesis_client.put_record(
+            StreamName='gerrit',
+            Data=serialize(B.getPatchsetCreatedEvent(1)),
+            PartitionKey='whatever',
+        )
+
+        # Restart the listener
+        listener.init()
+        listener.start()
+
+        for _ in iterate_timeout(60, 'wait for caught up'):
+            if all(listener._caught_up.values()):
+                break
+            time.sleep(0.2)
+        self.waitUntilSettled()
+
+        # Make sure we don't reprocess old events (change A), but do
+        # see new events (change B)
+        self.assertHistory([
+            dict(name='check-job', result='SUCCESS', changes='1,1'),
+            dict(name='check-job', result='SUCCESS', changes='2,1'),
+        ])
+        self.assertEqual(A.reported, 1, "A should be reported")
+        self.assertEqual(B.reported, 1, "B should be reported")
+
+    @simple_layout('layouts/simple.yaml')
+    def test_kinesis_bad_checkpoint(self):
+        listener = self.fake_gerrit.event_thread
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+
+        self.kinesis_client.put_record(
+            StreamName='gerrit',
+            Data=serialize(A.getPatchsetCreatedEvent(1)),
+            PartitionKey='whatever',
+        )
+
+        for _ in iterate_timeout(60, 'wait for event'):
+            if listener._event_count == 1:
+                break
+            time.sleep(0.2)
+        self.waitUntilSettled()
+
+        self.assertHistory([
+            dict(name='check-job', result='SUCCESS', changes='1,1')
+        ])
+        self.assertEqual(A.reported, 1, "A should be reported")
+
+        # Stop the listener
+        listener.stop()
+        listener.join()
+
+        # Corrupt the checkpoint
+        for cp in listener.checkpoints.values():
+            cp.set("nope")
+
+        # Add new gerrit events while we are "offline"
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        self.kinesis_client.put_record(
+            StreamName='gerrit',
+            Data=serialize(B.getPatchsetCreatedEvent(1)),
+            PartitionKey='whatever',
+        )
+
+        # Restart the listener
+        listener.init()
+        listener.start()
+
+        for _ in iterate_timeout(60, 'wait for caught up'):
+            if all(listener._caught_up.values()):
+                break
+            time.sleep(0.2)
+        self.waitUntilSettled()
+
+        # Make sure we don't reprocess old events (change A),
+        # and also that we missed change B because of the corruption
+        self.assertHistory([
+            dict(name='check-job', result='SUCCESS', changes='1,1'),
+        ])
+        self.assertEqual(A.reported, 1, "A should be reported")
+        self.assertEqual(B.reported, 0, "B should not be reported")
+
+        # Poke B again to make sure we get new events
+        self.kinesis_client.put_record(
+            StreamName='gerrit',
+            Data=serialize(B.getPatchsetCreatedEvent(1)),
+            PartitionKey='whatever',
+        )
+
+        for _ in iterate_timeout(60, 'wait for event'):
+            if listener._event_count == 2:
+                break
+            time.sleep(0.2)
+        self.waitUntilSettled()
+
+        self.assertHistory([
+            dict(name='check-job', result='SUCCESS', changes='1,1'),
+            dict(name='check-job', result='SUCCESS', changes='2,1'),
+        ])
+        self.assertEqual(A.reported, 1, "A should be reported")
+        self.assertEqual(B.reported, 1, "B should be reported")
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index e3cd07f49f..b881eccb10 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -44,6 +44,9 @@ from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
 from zuul.driver.gerrit.gerriteventssh import GerritSSHEventListener
 from zuul.driver.gerrit.gerriteventchecks import GerritChecksPoller
 from zuul.driver.gerrit.gerriteventkafka import GerritKafkaEventListener
+from zuul.driver.gerrit.gerriteventawskinesis import (
+    GerritAWSKinesisEventListener,
+)
 from zuul.driver.git.gitwatcher import GitWatcher
 from zuul.lib import tracing
 from zuul.lib.logutil import get_annotated_logger
@@ -412,6 +415,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
     EVENT_SOURCE_NONE = 'none'
     EVENT_SOURCE_STREAM_EVENTS = 'stream-events'
     EVENT_SOURCE_KAFKA = 'kafka'
+    EVENT_SOURCE_KINESIS = 'kinesis'
 
     def __init__(self, driver, connection_name, connection_config):
         super(GerritConnection, self).__init__(driver, connection_name,
@@ -441,6 +445,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
             self.event_source = self.EVENT_SOURCE_STREAM_EVENTS
         if self.connection_config.get('kafka_bootstrap_servers', None):
             self.event_source = self.EVENT_SOURCE_KAFKA
+        elif self.connection_config.get('aws_kinesis_region', None):
+            self.event_source = self.EVENT_SOURCE_KINESIS
 
         # Thread for whatever event source we use
         self.event_thread = None
@@ -1638,6 +1644,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
             self.startSSHListener()
         elif self.event_source == self.EVENT_SOURCE_KAFKA:
             self.startKafkaListener()
+        elif self.event_source == self.EVENT_SOURCE_KINESIS:
+            self.startAWSKinesisListener()
         else:
             self.log.warning("No gerrit event source configured")
         self.startRefWatcherThread()
@@ -1654,6 +1662,11 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
         self.event_thread = GerritKafkaEventListener(
             self, self.connection_config)
 
+    def startAWSKinesisListener(self):
+        self.log.info("Starting AWS Kinesis consumer")
+        self.event_thread = GerritAWSKinesisEventListener(
+            self, self.connection_config)
+
     def startPollerThread(self):
         if self.session is not None:
             self.poller_thread = self._poller_class(self)
diff --git a/zuul/driver/gerrit/gerriteventawskinesis.py b/zuul/driver/gerrit/gerriteventawskinesis.py
new file mode 100644
index 0000000000..c9ddbd078a
--- /dev/null
+++ b/zuul/driver/gerrit/gerriteventawskinesis.py
@@ -0,0 +1,192 @@
+# Copyright 2023 Acme Gating, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import boto3
+import logging
+import pprint
+import threading
+import time
+
+from zuul.zk.event_queues import EventReceiverElection, EventCheckpoint
+
+# Kinesis sort of looks like Kafka, but has some important differences:
+
+# We poll over HTTP (of course).
+
+# We get a shard iterator for every shard in the Kinesis stream. That
+# keeps our position in the shard as we iterate over the chunks of
+# data that AWS returns. Each time we poll, we get a new shard
+# iterator. However, the iterator is only valid for 5 minutes, so we
+# can't use it for checkpointing.
+
+# The docs recommend running a thread for each shard and continuously
+# polling with a 1 second delay. We're going to run an election for
+# each shard and let the schedulers fight it out.
+
+# We can use the sequence number to checkpoint, so if we record that
+# in ZK, when Zuul recovers it can start at the last sequence number.
+# However, that may have an indeterminate, potentially large, series
+# of empty Kinesis data chunks between it and any newer records.
+
+
+class GerritAWSKinesisEventListener:
+    log = logging.getLogger("zuul.GerritConnection.awskinesis")
+
+    def __init__(self, gerrit_connection, connection_config):
+        self.gerrit_connection = gerrit_connection
+        region = connection_config.get('aws_kinesis_region')
+        access_key = connection_config.get('aws_kinesis_access_key')
+        secret_key = connection_config.get('aws_kinesis_secret_key')
+        self.stream = connection_config.get('aws_kinesis_stream', 'gerrit')
+        args = dict(
+            region_name=region,
+        )
+        if access_key:
+            args['aws_access_key_id'] = access_key
+            args['aws_secret_access_key'] = secret_key
+        self.client = boto3.client('kinesis', **args)
+        self._event_count = 0  # Only for unit tests
+        self._caught_up = {}  # Only for unit tests
+        self.init()
+
+    def init(self):
+        # This is in a separate method so the unit tests can restart
+        # the listener.
+        gerrit_connection = self.gerrit_connection
+        stream_info = self.client.describe_stream(StreamName=self.stream)
+        self.shard_ids = []
+        self.elections = {}
+        self.checkpoints = {}
+        self._threads = []
+        for shard in stream_info['StreamDescription']['Shards']:
+            sid = shard['ShardId']
+            self.shard_ids.append(sid)
+            self.elections[sid] = EventReceiverElection(
+                gerrit_connection.sched.zk_client,
+                gerrit_connection.connection_name,
+                f"aws_kinesis_{sid}")
+            self.checkpoints[sid] = EventCheckpoint(
+                gerrit_connection.sched.zk_client,
+                gerrit_connection.connection_name,
+                f"aws_kinesis_{sid}")
+            self._threads.append(threading.Thread(
+                target=self.run, args=(sid,)))
+            self._caught_up[sid] = False
+        self._stop_event = threading.Event()
+        self._stopped = False
+
+    def start(self):
+        for thread in self._threads:
+            thread.start()
+
+    def stop(self):
+        self.log.debug("Stopping AWS Kinesis listener")
+        self._stopped = True
+        self._stop_event.set()
+
+    def join(self):
+        for thread in self._threads:
+            thread.join()
+
+    def _run(self, shard_id):
+        self.log.info("Starting shard consumer for shard %s",
+                      shard_id)
+
+        # Were we caught up in the last iteration of the loop?
+        last_caught_up = False
+        checkpoint = self.checkpoints[shard_id]
+        last_seen_sequence_no = checkpoint.get()
+
+        # Arguments to get a shard iterator
+        args = dict(
+            StreamName=self.stream,
+            ShardId=shard_id,
+        )
+
+        # Determine what kind of iterator to get based on whether we
+        # have checkpoint data.
+        if last_seen_sequence_no is None:
+            args['ShardIteratorType'] = 'LATEST'
+            self.log.debug("Shard %s starting with latest event", shard_id)
+        else:
+            args['ShardIteratorType'] = 'AFTER_SEQUENCE_NUMBER'
+            args['StartingSequenceNumber'] = last_seen_sequence_no
+            self.log.debug("Shard %s starting after sequence number %s",
+                           shard_id, last_seen_sequence_no)
+
+        try:
+            response = self.client.get_shard_iterator(**args)
+        except Exception:
+            self.log.exception("Error obtaining shard %s iterator, "
+                               "retrying from latest",
+                               shard_id)
+            # Retry from latest (ignoring our checkpoint; it may be
+            # too old, or the user deleted the data)
+            args['ShardIteratorType'] = 'LATEST'
+            args.pop('StartingSequenceNumber', None)
+
+            # If it fails again only asking for latest, it's fatal
+            response = self.client.get_shard_iterator(**args)
+
+        shard_iterator = response['ShardIterator']
+
+        while not self._stopped:
+            # The most recently read sequence number in this batch
+            sequence_no = None
+            response = self.client.get_records(
+                ShardIterator=shard_iterator,
+            )
+            shard_iterator = response['NextShardIterator']
+            time_behind = response['MillisBehindLatest']
+            if time_behind:
+                self.log.debug(
+                    "Shard %s received %s records and is %sms behind",
+                    shard_id, len(response['Records']), time_behind)
+                # We're behind, so poll a little more frequently to
+                # catch up faster
+                delay = 0.5
+                last_caught_up = False
+            else:
+                if not last_caught_up:
+                    # Only emit this log once each time we catch up.
+                    self.log.debug(
+                        "Shard %s received %s records and is caught up",
+                        shard_id, len(response['Records']))
+                self._caught_up[shard_id] = True
+                last_caught_up = True
+                delay = 1.0
+
+            for record in response['Records']:
+                sequence_no = record['SequenceNumber']
+                data = json.loads(record['Data'].decode('utf8'))
+                self.log.info("Received data from kinesis:\n%s" %
+                              pprint.pformat(data))
+                self.gerrit_connection.addEvent(data)
+                self._event_count += 1
+
+            if sequence_no is not None:
+                self.log.debug("Shard %s setting sequence number %s",
+                               shard_id, sequence_no)
+                checkpoint.set(sequence_no)
+            time.sleep(delay)
+
+    def run(self, shard_id):
+        while not self._stopped:
+            try:
+                self.elections[shard_id].run(self._run, shard_id)
+            except Exception:
+                self.log.exception(
+                    "Exception in AWS Kinesis consumer shard %s with %s:",
+                    shard_id, self.gerrit_connection.connection_name)
+                self._stop_event.wait(5)
diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py
index ce38cf23e3..9572e2b274 100644
--- a/zuul/zk/event_queues.py
+++ b/zuul/zk/event_queues.py
@@ -953,3 +953,52 @@ class NodepoolEventElection(SessionAwareElection):
     def __init__(self, client):
         self.election_root = "/zuul/nodepool/election"
         super().__init__(client.client, self.election_root)
+
+
+class EventCheckpoint(ZooKeeperSimpleBase):
+    """Store checkpoint data for drivers that need it."""
+
+    log = logging.getLogger("zuul.zk.event_queues.EventCheckpoint")
+
+    def __init__(self, client, connection_name, receiver_name):
+        super().__init__(client)
+        self.root = "/".join(
+            (CONNECTION_ROOT, connection_name, f"checkpoint-{receiver_name}")
+        )
+        self.stat = None
+
+    def get(self):
+        """Return the most recently stored checkpoint value or None"""
+        try:
+            data, stat = self.kazoo_client.get(self.root)
+        except NoNodeError:
+            self.stat = None
+            return None
+
+        try:
+            data = json.loads(data.decode("utf-8"))
+        except Exception:
+            self.stat = None
+            return None
+
+        self.stat = stat
+        return data['checkpoint']
+
+    def set(self, checkpoint):
+        """Set the checkpoint value
+
+        If it has been updated since this object last read the value,
+        Kazoo will raise an exception.
+
+        """
+
+        data = {'checkpoint': checkpoint}
+        data = json.dumps(data, sort_keys=True).encode("utf-8")
+        version = -1
+        if self.stat:
+            version = self.stat.version
+        try:
+            self.stat = self.kazoo_client.set(self.root, data, version)
+        except NoNodeError:
+            path, self.stat = self.kazoo_client.create(self.root, data,
+                                                       include_data=True)
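For context, a minimal sketch of the checkpoint round-trip the Kinesis
listener performs with this class (illustrative only; ``zk_client`` stands in
for ``gerrit_connection.sched.zk_client``, and the shard and sequence values
are placeholders):

    # Illustrative only, not part of this change.
    checkpoint = EventCheckpoint(zk_client, 'gerrit',
                                 'aws_kinesis_shardId-000000000000')

    last = checkpoint.get()   # None on first start, so the listener asks
                              # for a LATEST shard iterator; otherwise it
                              # resumes AFTER_SEQUENCE_NUMBER.

    # ... consume a batch of records, tracking the last SequenceNumber ...
    sequence_no = '...'       # placeholder for the batch's last sequence
    checkpoint.set(sequence_no)

Because set() passes the znode version recorded by the preceding get(), kazoo
raises BadVersionError if another scheduler has updated the checkpoint in the
meantime; this compare-and-set keeps a scheduler that lost the shard election
from clobbering a newer checkpoint.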