system-config/docker/accessbot/accessbot.py
Jeremy Stanley 38ec695e77 Use yaml.safe_load() in accessbot
Newer versions of PyYAML have removed the ability to rely on
yaml.load() without passing an explicit Loader class. Instead,
switch to its safe_load() function which has an implicit one. The
same thing was done to our check_irc_access.py script in
I72776774eabd4e397ee5e8bdd58b3632be8fd3a1 but we never updated
accessbot similarly.

Change-Id: I32370e35c908acc3b75326a8ac5cadc25ef0760f
2021-12-30 12:42:26 +00:00

245 lines
9.3 KiB
Python
Executable File

#! /usr/bin/env python3
# Copyright 2011, 2013-2014 OpenStack Foundation
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import argparse
import irc.client
import logging
import ssl
import sys
import time
import yaml
# Configure the root logger once at import time: everything down to DEBUG
# is emitted, each record prefixed with a timestamp, level and logger name.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s [%(levelname)s] %(name)s - %(message)s',
)
class SetAccess(irc.client.SimpleIRCClient):
    """IRC client that reconciles channel access lists and mode locks.

    After identifying to NickServ, the client walks the configured channels
    one at a time.  For each channel it asks ChanServ for the current mode
    lock and access list, compares them against the configuration, and then
    sends (or, in noop mode, only logs) the ChanServ commands needed to make
    them match.  When no channels remain it quits the connection, and the
    disconnect handler exits the process.

    The whole flow is driven by incoming private notices: every notice from
    ChanServ/NickServ is fed to :meth:`advance`, which performs the next
    step of the state machine.
    """

    log = logging.getLogger("setaccess")

    def __init__(self, config, noop, nick, password, server, port):
        """Store settings and open the connection to the IRC server.

        :param config: parsed channel configuration mapping; this class
            reads its ``channels``, ``global`` and ``access`` keys and,
            optionally, ``op_channels``, ``alumni`` and ``mode``.
        :param noop: when true, pending changes are logged, not sent.
        :param nick: nickname to connect with.
        :param password: password sent to NickServ when asked to identify.
        :param server: IRC server host name.
        :param port: IRC server port (anything ``int()`` accepts); port
            6697 selects a TLS connection.
        """
        irc.client.SimpleIRCClient.__init__(self)
        self.config = config
        self.nick = nick
        self.password = password
        self.server = server
        self.port = int(port)
        self.noop = noop
        # Names of channels still waiting to be processed.
        self.channels = [x['name'] for x in self.config['channels']]
        # Per-channel scratch state, reset whenever a new channel starts.
        self.current_channel = None
        self.current_list = []
        self.current_mode = ''
        # ChanServ commands still to be sent for the channel just examined.
        self.changes = []
        self.identified = False
        if self.port == 6697:
            # ssl.wrap_socket() was deprecated in Python 3.7 and removed in
            # 3.12, and it never verified the server certificate.  A default
            # SSLContext verifies both the certificate chain and the host
            # name, which requires passing server_hostname when wrapping.
            context = ssl.create_default_context()

            def wrap(sock):
                return context.wrap_socket(sock,
                                           server_hostname=self.server)

            factory = irc.connection.Factory(wrapper=wrap)
            self.connect(self.server, self.port, self.nick,
                         connect_factory=factory)
        else:
            self.connect(self.server, self.port, self.nick)

    def on_disconnect(self, connection, event):
        """Exit the process with status 0 once the server disconnects us."""
        sys.exit(0)

    def on_privnotice(self, c, e):
        """Handle a private notice: identify to NickServ or advance.

        :param c: the connection the notice arrived on.
        :param e: the event; ``e.source`` is the sender mask and
            ``e.arguments[0]`` is the notice text.
        """
        nick = e.source.split('!')[0]
        msg = e.arguments[0]
        if nick == 'NickServ' and not self.identified:
            if msg.startswith('authenticate yourself to services'):
                self.log.debug("Identifying to nickserv")
                # TODO (fungi): We should protect against sending our
                # password to a false NickServ, perhaps with
                # https://www.oftc.net/NickServ/CertFP/ or eventually
                # SASL once the ircd implements that
                c.privmsg("nickserv", "identify %s " % self.password)
                return
            elif msg.startswith('You are successfully identified'):
                self.identified = True
                # Prejoin and set ourselves as op in these channels,
                # to facilitate +f forwarding.
                for channel in self.config.get('op_channels', []):
                    c.join("#%s" % channel)
                    c.privmsg("chanserv", "op #%s" % channel)
                self.advance()
                return
            else:
                # Any other NickServ chatter before we are identified is
                # irrelevant to the state machine.
                return
        if nick not in ('ChanServ', 'NickServ'):
            self.log.debug("Ignoring message from non-ChanServ "
                           "user %s" % nick)
            return
        # NOTE(review): this attribute is only ever assigned here and is
        # not read elsewhere in this class; kept for compatibility.
        self.failed = False
        self.advance(msg)

    def _get_access_list(self, channel_name):
        """Compute the desired ACL, alumni list and mode for a channel.

        Entries from the ``global`` configuration section are applied
        first and the channel's own entries second, so a channel setting
        overrides a global one for the same nick or for ``mode``.

        :param channel_name: name of the channel as it appears in the
            ``channels`` configuration list.
        :returns: a ``(acl, alumni, mode)`` tuple where ``acl`` maps nick
            to access level, ``alumni`` is a list of nicks and ``mode`` is
            the mode string (empty when none is configured).
        :raises Exception: if ``channel_name`` is not configured.
        """
        ret = {}
        alumni = []
        mode = ''
        channel = None
        # No early break: if a name is listed twice, the last entry wins.
        for c in self.config['channels']:
            if c['name'] == channel_name:
                channel = c
        if channel is None:
            raise Exception("Unknown channel %s" % (channel_name,))
        for key, value in (list(self.config['global'].items()) +
                           list(channel.items())):
            if key == 'alumni':
                alumni += value
                continue
            if key == 'mode':
                mode = value
                continue
            # If we get this far, we assume the key is an access
            # level matching an entry in the access list
            level = self.config['access'].get(key)
            if level is None:
                # Skip if this doesn't match a defined access level
                continue
            for nick in value:
                ret[nick] = level
        return ret, alumni, mode

    def _get_access_change(self, current, target):
        """Return ``target`` if it differs from ``current``, else ``None``.

        An empty ``target`` is returned as-is and is therefore falsy, so
        callers testing the result for truth treat it as "no change".
        """
        if current != target:
            return target

    def _get_access_changes(self):
        """Build the ChanServ commands needed for the current channel.

        Compares the access list and mode lock collected for
        ``self.current_channel`` with the configured target.

        :returns: list of ChanServ command strings (access deletions for
            alumni, access additions/updates, and at most one mlock set).
        """
        target, alumni, mode = self._get_access_list(
            self.current_channel)
        self.log.debug("Target #%s ACL: %s" % (self.current_channel, target))
        all_nicks = set()
        global_alumni = self.config.get('alumni', {})
        global_mode = self.config.get('mode', '')
        current = {}
        changes = []
        for nick, level, msg in self.current_list:
            if nick in global_alumni or nick in alumni:
                self.log.debug("%s is an alumni; removing access", nick)
                changes.append('access #%s del %s' % (self.current_channel,
                                                      nick))
                continue
            all_nicks.add(nick)
            current[nick] = level
        all_nicks.update(target)
        for nick in all_nicks:
            change = self._get_access_change(current.get(nick, ''),
                                             target.get(nick, ''))
            if change:
                changes.append('access #%s add %s %s' % (self.current_channel,
                                                         nick, change))
        # Set the mode if what we want differs from what's already there.
        # Channel mode overrides global mode.
        if not mode and global_mode:
            mode = global_mode
        if not mode:
            mode = '+'
        # Compare as sorted character lists so flag order is irrelevant.
        if sorted(mode) != sorted(self.current_mode):
            self.log.debug("Current mode for #%s is %s, replacing with %s" % (
                self.current_channel, self.current_mode, mode))
            changes.append('set #%s mlock %s' % (self.current_channel, mode))
        return changes

    def advance(self, msg=None):
        """Perform the next step of the state machine.

        In order of priority this (1) sends one pending change (or logs
        and discards all of them in noop mode), (2) starts querying the
        next channel, or quits when none are left, or (3) parses ``msg``
        as part of the response to the outstanding queries.

        :param msg: text of the service notice that triggered this step,
            or ``None`` when called without one.
        """
        # Some service responses include a number of embedded 0x02 bytes
        if msg:
            msg = msg.replace('\x02', '')
        if self.changes:
            if self.noop:
                for change in self.changes:
                    self.log.info('NOOP: ' + change)
                self.changes = []
                # Fall through so the next channel is started right away.
            else:
                # One change per incoming notice; the reply to this command
                # is what triggers the next call.
                change = self.changes.pop()
                self.log.info(change)
                self.connection.privmsg('chanserv', change)
                time.sleep(1)
                return
        if not self.current_channel:
            if not self.channels:
                self.connection.quit()
                return
            self.current_channel = self.channels.pop()
            # Clear the mode string before we request it, so if we get
            # no response we won't have the modes from an earlier channel
            self.current_mode = ''
            # Sending a set mlock with no value prompts the service to
            # respond with the current mlock value so we can compare
            # against it later
            self.connection.privmsg('chanserv', 'set #%s mlock' %
                                    self.current_channel)
            # Clear the access list before we request it, so if we get
            # no response we won't have the list from an earlier channel
            self.current_list = []
            self.connection.privmsg('chanserv', 'access #%s list' %
                                    self.current_channel)
            time.sleep(1)
            return
        # Nothing to parse (no notice text, or an empty one).
        if not msg:
            return
        # We tokenize every server message, and perform some rough
        # heuristics in order to determine what kind of response we're
        # dealing with and whether it's something we know how to parse
        parts = msg.split()
        # If the third word looks like an access level, assume this is
        # an access list entry and that the second word is a
        # corresponding nick.  Shorter messages cannot be list entries.
        if len(parts) > 2 and parts[2] in ('MASTER', 'CHANOP', 'MEMBER'):
            self.current_list.append((parts[1], parts[2], msg))
        # If the message starts with "MLOCK is SET to" then assume the
        # fifth word is the channel's mode string
        elif msg.startswith('MLOCK is SET to'):
            # Leave the mode empty if the service reported no value.
            if len(parts) > 4:
                self.current_mode = parts[4]
        # If the message starts with "End of" then assume this marks
        # the end of an access list
        elif msg.startswith('End of'):
            self.changes = self._get_access_changes()
            self.current_channel = None
            self.advance()
def main():
    """Parse the command line, load both config files and run the bot.

    ``-c`` names the INI file whose ``[ircbot]`` section supplies ``nick``,
    ``pass``, ``server`` and ``port``; ``-l`` names the YAML channel
    configuration; ``--noop`` makes the bot log changes instead of sending
    them.  Exits with a usage error if ``-c`` is missing or unreadable.
    """
    parser = argparse.ArgumentParser(description='IRC channel access check')
    # Required: without it ConfigParser.read() would be handed None and
    # fail with an opaque TypeError.
    parser.add_argument('-c', dest='config', nargs=1, required=True,
                        help='specify the config file')
    parser.add_argument('-l', dest='channels',
                        default='/etc/irc/channels.yaml',
                        help='path to the channel config')
    parser.add_argument('--noop', dest='noop',
                        action='store_true',
                        help="Don't make any changes")
    args = parser.parse_args()

    config = configparser.ConfigParser()
    # read() silently skips files it cannot open and returns the list of
    # files it did parse, so an empty result means nothing was loaded.
    if not config.read(args.config):
        parser.error('unable to read config file %s' % args.config[0])

    # Close the channel file deterministically instead of leaking the
    # handle until garbage collection.
    with open(args.channels) as channel_file:
        channels = yaml.safe_load(channel_file)

    a = SetAccess(channels, args.noop,
                  config.get('ircbot', 'nick'),
                  config.get('ircbot', 'pass'),
                  config.get('ircbot', 'server'),
                  config.get('ircbot', 'port'))
    a.start()


if __name__ == "__main__":
    main()