system-config/docker/matrix-eavesdrop/src/eavesdrop/bot.py

181 lines
6.2 KiB
Python

#!/usr/bin/python3
# Copyright (C) 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import time
import json
import os
import sys
import getpass
import socket
import yaml
import logging
import datetime
# Configure the root logger when the module is imported so that records at
# INFO level and above (including those from the 'bot' logger) are emitted.
logging.basicConfig(level=logging.INFO)
from nio import (
AsyncClient, AsyncClientConfig, LoginResponse,
RoomMessageText, RoomMessageNotice
)
from nio.store.database import DefaultStore
class Bot:
    """Matrix bot that joins the configured rooms and logs what is said.

    The configuration is read from the YAML file named by the
    ``MATRIX_CONFIG_FILE`` environment variable (default
    ``/config/config.yaml``).  The file must contain a ``rooms`` list
    (entries with ``id`` and ``path`` keys) and a ``config`` mapping
    providing ``homeserver``, ``user_id``, ``password``, ``data_dir``
    and ``log_dir``.

    Text messages and notices received in a configured room are appended
    to ``<room path>/<room name>.<YYYY-MM-DD>.log``.
    """

    def __init__(self):
        """Load the configuration and derive the paths the bot uses."""
        self.log = logging.getLogger('bot')
        self.config_path = os.environ.get("MATRIX_CONFIG_FILE",
                                          "/config/config.yaml")
        self.load_config()
        self.cred_path = os.path.join(
            self.config['data_dir'], 'credentials.json')
        self.device_name = socket.gethostname()
        # Canonical room id -> room entry from the config file; filled in
        # by join_rooms() and consulted by the event callbacks.
        self.room_map = {}

    async def login(self):
        """Create ``self.client`` and authenticate it.

        If credentials were saved by a previous run they are restored;
        otherwise a password login is performed and the resulting device
        id and access token are saved for next time.

        Raises:
            Exception: if the password login does not return a
                ``LoginResponse``.
        """
        config = AsyncClientConfig(
            store=DefaultStore,
            store_sync_tokens=True)
        creds = self.load_creds()
        if creds:
            self.log.info("Restoring previous session")
            self.client = AsyncClient(self.config['homeserver'],
                                      store_path=self.config['data_dir'],
                                      config=config)
            self.client.restore_login(
                user_id=self.config['user_id'],
                device_id=creds["device_id"],
                access_token=creds["access_token"],
            )
        else:
            self.log.info("Creating new session")
            self.client = AsyncClient(self.config['homeserver'],
                                      self.config['user_id'],
                                      store_path=self.config['data_dir'],
                                      config=config)
            resp = await self.client.login(
                self.config['password'], device_name=self.device_name)
            if isinstance(resp, LoginResponse):
                self.save_creds(resp.device_id, resp.access_token)
            else:
                self.log.error(resp)
                raise Exception("Error logging in")
        # Load the sync tokens
        self.client.load_store()

    def load_config(self):
        """Read the YAML config file into ``self.rooms`` and ``self.config``."""
        with open(self.config_path, encoding='utf-8') as f:
            data = yaml.safe_load(f)
        self.rooms = data['rooms']
        self.config = data['config']

    def save_creds(self, device_id, token):
        """Write the device id and access token to the credentials file."""
        data = {
            'device_id': device_id,
            'access_token': token,
        }
        with open(self.cred_path, 'w') as f:
            json.dump(data, f)

    def load_creds(self):
        """Return the saved credentials dict, or None if there is no file."""
        try:
            with open(self.cred_path) as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def get_room_path(self, room):
        """Return the log directory for a room entry from the config.

        A ``path`` that does not start with ``/`` is taken relative to
        the configured ``log_dir``; any other path is returned unchanged.
        """
        room_path = room['path']
        if not room_path.startswith('/'):
            room_path = os.path.join(self.config['log_dir'], room_path)
        return room_path

    async def join_rooms(self):
        """Join every configured room and leave any room not configured.

        Populates ``self.room_map`` and creates each room's log directory.

        Raises:
            RuntimeError: if listing the joined rooms or joining a room
                yields a response without the expected data (i.e. an
                error response from the client).
        """
        resp = await self.client.joined_rooms()
        joined = getattr(resp, 'rooms', None)
        if joined is None:
            self.log.error(resp)
            raise RuntimeError("Error listing joined rooms")
        old = set(joined)
        new = set()
        for room in self.rooms:
            self.log.info("Join room %s", room['id'])
            resp = await self.client.join(room['id'])
            room_id = getattr(resp, 'room_id', None)
            if room_id is None:
                # Fail loudly rather than continue: a room that could not
                # be joined would be missing from `new` and might then be
                # left by the clean-up loop below.
                self.log.error(resp)
                raise RuntimeError("Error joining room %s" % room['id'])
            new.add(room_id)
            # Store the canonical room id, since the one in the config
            # file may be an alias
            self.room_map[room_id] = room
            os.makedirs(self.get_room_path(room), exist_ok=True)
        for room_id in old - new:
            # Entries of `old` are plain room id strings, not config dicts.
            self.log.info("Leave room %s", room_id)
            await self.client.room_leave(room_id)

    def _log_event(self, room, event, open_mark, close_mark):
        """Append one event to the daily log file of its room.

        Events from rooms that are not in ``self.room_map`` are ignored.
        The sender is wrapped in ``open_mark``/``close_mark`` so that
        different event kinds can be told apart in the log.
        """
        config_room = self.room_map.get(room.room_id)
        if not config_room:
            return
        room_name = config_room['id'].split(':')[0]
        # server_timestamp is divided by 1000 to get seconds; the date and
        # time written to the log are always expressed in UTC.
        ts = datetime.datetime.fromtimestamp(
            event.server_timestamp / 1000.0, tz=datetime.timezone.utc)
        event_date = ts.date().isoformat()
        # Whole seconds only; the fractional part is truncated.
        event_time = ts.time().isoformat(timespec='seconds')
        logpath = os.path.join(self.get_room_path(config_room),
                               f'{room_name}.{event_date}.log')
        line = (f'{event_date}T{event_time} '
                f'{open_mark}{event.sender}{close_mark} {event.body}\n')
        self.log.info('Logging %s %s', room.room_id, line[:-1])
        # Message bodies are arbitrary Unicode, so do not depend on the
        # locale's default encoding.
        with open(logpath, 'a', encoding='utf-8') as f:
            f.write(line)

    async def message_callback(self, room, event):
        """Log a text message as ``<sender> body``."""
        self._log_event(room, event, '<', '>')

    async def notice_callback(self, room, event):
        """Log a notice as ``-sender- body``."""
        self._log_event(room, event, '-', '-')

    async def run(self):
        """Log in, join the rooms and process events until sync stops.

        The client is closed when syncing ends, whether normally or
        through an exception.
        """
        await self.login()
        await self.join_rooms()
        self.client.add_event_callback(self.message_callback, RoomMessageText)
        self.client.add_event_callback(self.notice_callback, RoomMessageNotice)
        try:
            await self.client.sync_forever(timeout=30000, full_state=True)
        finally:
            await self.client.close()
async def _main():
    """Run the bot forever, recreating it after any failure.

    Each iteration builds a fresh Bot and runs it.  Any exception,
    including one raised while constructing the Bot, is logged and
    followed by a 10 second pause before the next attempt.
    """
    # Same logger the Bot uses; obtained here so that a failure inside
    # Bot() itself can still be reported (no `bot` instance exists then).
    log = logging.getLogger('bot')
    while True:
        try:
            bot = Bot()
            await bot.run()
        except Exception:
            log.exception("Error:")
            # Back off without blocking the event loop.
            await asyncio.sleep(10)
def main():
    """Synchronous entry point: run the bot's retry loop to completion."""
    # asyncio.run() creates and closes its own event loop; calling
    # get_event_loop() with no running loop is deprecated.
    asyncio.run(_main())