system-config/docker/accessbot/accessbot.py
Jeremy Stanley 258e8e8585 Retool accessbot for OFTC
A number of changes are needed to fit accessbot to OFTC's RBAC-style
permissions model and services syntax expectations. Most
importantly, access list entries now use role names for graduated
access tiers (member, chanop, master) rather than fine-grained
option flags. In order to avoid future confusion, switch variable
names and configuration keys to reflect that these are access levels
rather than masks.

While we're at it, skip setting the channel mlock if the result
would be a no-op, so that we don't unnecessarily spam the ircd with
pointless writes.

Also add a bunch of inline comments so I can more easily remember
the subtle nuances I spent a lot of time figuring out.

Change-Id: Id11598fc42672359e1abef7b70cc23100b16ab12
Depends-on: https://review.opendev.org/792843
2021-05-28 02:06:34 +00:00

245 lines
9.3 KiB
Python
Executable File

#! /usr/bin/env python3
# Copyright 2011, 2013-2014 OpenStack Foundation
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import argparse
import irc.client
import logging
import ssl
import sys
import time
import yaml
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(name)s - %(message)s',
level=logging.DEBUG)
class SetAccess(irc.client.SimpleIRCClient):
log = logging.getLogger("setaccess")
def __init__(self, config, noop, nick, password, server, port):
irc.client.SimpleIRCClient.__init__(self)
self.config = config
self.nick = nick
self.password = password
self.server = server
self.port = int(port)
self.noop = noop
self.channels = [x['name'] for x in self.config['channels']]
self.current_channel = None
self.current_list = []
self.current_mode = ''
self.changes = []
self.identified = False
if self.port == 6697:
factory = irc.connection.Factory(wrapper=ssl.wrap_socket)
self.connect(self.server, self.port, self.nick,
connect_factory=factory)
else:
self.connect(self.server, self.port, self.nick)
def on_disconnect(self, connection, event):
sys.exit(0)
def on_privnotice(self, c, e):
nick = e.source.split('!')[0]
msg = e.arguments[0]
if nick == 'NickServ' and not self.identified:
if msg.startswith('authenticate yourself to services'):
self.log.debug("Identifying to nickserv")
# TODO (fungi): We should protect against sending our
# password to a false NickServ, perhaps with
# https://www.oftc.net/NickServ/CertFP/ or eventually
# SASL once the ircd implements that
c.privmsg("nickserv", "identify %s " % self.password)
return
elif msg.startswith('You are successfully identified'):
self.identified = True
# Prejoin and set ourselves as op in these channels,
# to facilitate +f forwarding.
for channel in self.config.get('op_channels', []):
c.join("#%s" % channel)
c.privmsg("chanserv", "op #%s" % channel)
self.advance()
return
else:
return
if nick not in ('ChanServ', 'NickServ'):
self.log.debug("Ignoring message from non-ChanServ "
"user %s" % nick)
return
self.failed = False
self.advance(msg)
def _get_access_list(self, channel_name):
ret = {}
alumni = []
mode = ''
level = ''
channel = None
for c in self.config['channels']:
if c['name'] == channel_name:
channel = c
if channel is None:
raise Exception("Unknown channel %s" % (channel_name,))
for key, value in (list(self.config['global'].items()) +
list(channel.items())):
if key == 'alumni':
alumni += value
continue
if key == 'mode':
mode = value
continue
# If we get this far, we assume the key is an access
# level matching an entry in the access list
level = self.config['access'].get(key)
if level is None:
# Skip if this doesn't match a defined access level
continue
for nick in value:
ret[nick] = level
return ret, alumni, mode
def _get_access_change(self, current, target):
if current != target:
return target
def _get_access_changes(self):
target, alumni, mode = self._get_access_list(
self.current_channel)
self.log.debug("Target #%s ACL: %s" % (self.current_channel, target))
all_nicks = set()
global_alumni = self.config.get('alumni', {})
global_mode = self.config.get('mode', '')
current = {}
changes = []
for nick, level, msg in self.current_list:
if nick in global_alumni or nick in alumni :
self.log.debug("%s is an alumni; removing access", nick)
changes.append('access #%s del %s' % (self.current_channel, nick))
continue
all_nicks.add(nick)
current[nick] = level
for nick in target.keys():
all_nicks.add(nick)
for nick in all_nicks:
change = self._get_access_change(current.get(nick, ''),
target.get(nick, ''))
if change:
changes.append('access #%s add %s %s' % (self.current_channel,
nick, change))
# Set the mode if what we want differs from what's already there.
# Channel mode overrides global mode.
if not mode and global_mode:
mode = global_mode
if not mode:
mode = '+'
if sorted(mode) != sorted(self.current_mode):
self.log.debug("Current mode for #%s is %s, replacing with %s" % (
self.current_channel, self.current_mode, mode))
changes.append('set #%s mlock %s' % (self.current_channel, mode))
return changes
def advance(self, msg=None):
# Some service responses include a number of embedded 0x02 bytes
if msg:
msg = msg.replace('\x02', '')
if self.changes:
if self.noop:
for change in self.changes:
self.log.info('NOOP: ' + change)
self.changes = []
else:
change = self.changes.pop()
self.log.info(change)
self.connection.privmsg('chanserv', change)
time.sleep(1)
return
if not self.current_channel:
if not self.channels:
self.connection.quit()
return
self.current_channel = self.channels.pop()
# Clear the mode string before we request it, so if we get
# no response we won't have the modes from an earlier channel
self.current_mode = ''
# Sending a set mlock with no value prompts the service to
# respond with the current mlock value so we can compare
# against it later
self.connection.privmsg('chanserv', 'set #%s mlock' %
self.current_channel)
# Clear the access list before we request it, so if we get
# no response we won't have the list from an earlier channel
self.current_list = []
self.connection.privmsg('chanserv', 'access #%s list' %
self.current_channel)
time.sleep(1)
return
# We tokenize every server message, and perform some rough
# heuristics in order to determine what kind of response we're
# dealing with and whether it's something we know how to parse
parts = msg.split()
# If the third word look like an access level, assume this is
# an access list entry and that the second word is a
# corresponding nick
if parts[2] in ('MASTER', 'CHANOP', 'MEMBER'):
self.current_list.append((parts[1], parts[2], msg))
# If the message starts with "MLOCK is SET to" then assume the
# fifth word is the channel's mode string
elif msg.startswith('MLOCK is SET to'):
self.current_mode = parts[4]
# If the message starts with "End of" then assume this marks
# the end of an access list
elif msg.startswith('End of'):
self.changes = self._get_access_changes()
self.current_channel = None
self.advance()
return
def main():
parser = argparse.ArgumentParser(description='IRC channel access check')
parser.add_argument('-c', dest='config', nargs=1,
help='specify the config file')
parser.add_argument('-l', dest='channels',
default='/etc/irc/channels.yaml',
help='path to the channel config')
parser.add_argument('--noop', dest='noop',
action='store_true',
help="Don't make any changes")
args = parser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
channels = yaml.load(open(args.channels))
a = SetAccess(channels, args.noop,
config.get('ircbot', 'nick'),
config.get('ircbot', 'pass'),
config.get('ircbot', 'server'),
config.get('ircbot', 'port'))
a.start()
if __name__ == "__main__":
main()