- Use
#seen NICK
to see if a user has checked in to a particular location.
- Use
#in LOCATION
to check in somewhere, and
#out
to check out.
+ Use
+seen NICK
to see if a user has checked in to a particular location.
+ Use
+in LOCATION
to check in somewhere, and
+out
to check out.
Presence-tracking commands can also be sent privately to the bot.
(more help)
diff --git a/ptgbot/bot.py b/ptgbot/bot.py
index 4f4063f..7d7c731 100644
--- a/ptgbot/bot.py
+++ b/ptgbot/bot.py
@@ -27,6 +27,8 @@ import time
import textwrap
import ptgbot.db
+from ptgbot.usercommands import process_user_command
+
try:
import daemon.pidlockfile as pid_file_module
@@ -106,163 +108,20 @@ class PTGBot(SASL, SSL, irc.bot.SingleServerIRCBot):
"cap not enabled")
return
nick = e.source.split('!')[0]
- msg = e.arguments[0][1:]
- words = msg.split()
+ args = e.arguments[0][1:]
+ words = args.split()
if len(words) < 1:
self.log.debug("Ignoring privmsg with no content")
return
cmd = words[0].lower()
- words.pop(0)
- if cmd.startswith('#'):
+ if cmd.startswith('#') or cmd.startswith('+'):
cmd = cmd[1:]
- if cmd == 'in':
- self.check_in(nick, nick, words)
- elif cmd == 'out':
- self.check_out(nick, nick, words)
- elif cmd == 'seen':
- self.last_seen(nick, nick, words)
- elif cmd == 'subscribe':
- self.subscribe(nick, nick, msg.lstrip('#' + cmd).strip())
- elif cmd == 'unsubscribe':
- self.unsubscribe(nick, nick)
- else:
- self.send_priv_or_pub(
- nick, None, "Recognised commands: in, out, seen, subscribe")
-
- # Checks location against known tracks. If prefixed with # then
- # insists it must match a known track. If not #-prefixed but
- # matches a known track then the # prefix is added. Returns the
- # normalized location to check into, or None if a valid one was
- # not established. When matching/matched against a known track,
- # it will be lower-cased. This assumes that all registered tracks
- # are lower-case.
- def normalize_location(self, reply_to, nick, location):
- tracks = self.data.list_tracks()
-
- if location.startswith('#'):
- track = location[1:].lower()
- if track in tracks:
- return location.lower()
- else:
- self.send_priv_or_pub(
- reply_to, nick, "Unrecognised track #%s" % track)
- return None
- else:
- if location.lower() in tracks:
- return '#' + location.lower()
- else:
- # Free-form location
- return location
-
- def check_in(self, reply_to, nick, words):
- if len(words) == 0:
- self.send_priv_or_pub(
- reply_to, nick,
- "The 'in' command should be followed by a location.")
- return
-
- location = " ".join(words)
- location = self.normalize_location(reply_to, nick, location)
- if location is None:
- return
-
- self.data.check_in(nick, location)
- self.send_priv_or_pub(
- reply_to, nick,
- "OK, checked into %s - thanks for the update!" % location)
-
- def check_out(self, reply_to, nick, words):
- if len(words) > 0:
- self.send_priv_or_pub(
- reply_to, nick,
- "The 'out' command does not accept any extra parameters.")
- return
-
- last_check_in = self.data.get_last_check_in(nick)
- if last_check_in['location'] is None:
- self.send_priv_or_pub(
- reply_to, nick, "You weren't checked in anywhere yet!")
- return
-
- if last_check_in['out'] is not None:
- self.send_priv_or_pub(
- reply_to, nick,
- "You already checked out of %s at %s!" %
- (last_check_in['location'], last_check_in['out']))
- return
-
- location = self.data.check_out(nick)
- self.send_priv_or_pub(
- reply_to, nick,
- "OK, checked out of %s - thanks for the update!" % location)
-
- def last_seen(self, reply_to, nick, words):
- if len(words) != 1:
- self.send_priv_or_pub(
- reply_to, nick,
- "The 'seen' command needs a single nick argument.")
- return
-
- seen_nick = words[0]
- last_check_in = self.data.get_last_check_in(seen_nick)
-
- if last_check_in['location'] is None:
- self.send_priv_or_pub(
- reply_to, nick,
- "%s never checked in anywhere" % seen_nick)
- elif last_check_in['out'] is None:
- self.send_priv_or_pub(
- reply_to, nick,
- "%s was last seen in %s at %s" %
- (last_check_in['nick'], last_check_in['location'],
- last_check_in['in']))
- else:
- self.send_priv_or_pub(
- reply_to, nick,
- "%s checked out of %s at %s" %
- (last_check_in['nick'], last_check_in['location'],
- last_check_in['out']))
-
- def subscribe(self, reply_to, nick, new_re):
- existing_re = self.data.get_subscription(nick)
- if new_re == "":
- if existing_re is None:
- self.send_priv_or_pub(
- reply_to, nick,
- "You don't have a subscription regex set yet"
- )
- else:
- self.send_priv_or_pub(
- reply_to, nick,
- "Your current subscription regex is: " + existing_re)
- else:
- try:
- re.compile(new_re)
- except Exception as e:
- self.send_priv_or_pub(reply_to, nick, "Invalid regex: %s" % e)
- else:
- self.data.set_subscription(nick, new_re)
- self.send_priv_or_pub(
- reply_to, nick,
- "Subscription set to " + new_re +
- (" (was %s)" % existing_re if existing_re else "")
- )
-
- def unsubscribe(self, reply_to, nick):
- existing_re = self.data.get_subscription(nick)
- if existing_re is None:
- self.send_priv_or_pub(
- reply_to, nick,
- "You don't have a subscription regex set yet"
- )
- else:
- self.data.set_subscription(nick, None)
- self.send_priv_or_pub(
- reply_to, nick,
- "Cancelled subscription %s" % existing_re
- )
+ msg = process_user_command(self.data, nick, cmd, words[1:])
+ if msg:
+ self.send(nick, msg)
+ return
def is_chanop(self, nick, chan):
return self.channels[chan].is_oper(nick)
@@ -281,25 +140,22 @@ class PTGBot(SASL, SSL, irc.bot.SingleServerIRCBot):
msg = e.arguments[0][1:]
chan = e.target
+ if msg.startswith('+'):
+ words = msg.split()
+ cmd = words[0].lower()[1:]
+ ret = process_user_command(self.data, nick, cmd, words[1:])
+ if ret:
+ self.send(chan, "%s: %s" % (nick, ret))
+
if msg.startswith('#'):
words = msg.split()
cmd = words[0].lower()
- if cmd == '#in':
- self.check_in(chan, nick, words[1:])
- return
- elif cmd == '#out':
- self.check_out(chan, nick, words[1:])
- return
- elif cmd == '#seen':
- self.last_seen(chan, nick, words[1:])
- return
-
- elif cmd == '#subscribe':
- self.subscribe(chan, nick, msg.lstrip('#' + cmd).strip())
- return
- elif cmd == '#unsubscribe':
- self.unsubscribe(chan, nick)
+ if cmd in ['#in', '#out', '#seen', '#subscribe', '#unsubscribe']:
+ cmd = cmd[1:]
+ ret = process_user_command(self.data, nick, cmd, words[1:])
+ if ret:
+ self.send(chan, "%s: %s" % (nick, ret))
return
if (self.data.is_voice_required() and
@@ -493,12 +349,6 @@ class PTGBot(SASL, SSL, irc.bot.SingleServerIRCBot):
# Fortunately this is the behaviour we want.
self.send(nick, message)
- def send_priv_or_pub(self, target, nick, msg):
- if target.startswith('#') and nick is not None:
- self.send(target, "%s: %s" % (nick, msg))
- else:
- self.send(target, msg)
-
def send(self, channel, msg):
# 400 chars is an estimate of a safe line length (which can vary)
chunks = textwrap.wrap(msg, 400)
diff --git a/ptgbot/tests/test_message_process.py b/ptgbot/tests/test_message_process.py
index a8ba5dc..e09eb6f 100644
--- a/ptgbot/tests/test_message_process.py
+++ b/ptgbot/tests/test_message_process.py
@@ -271,7 +271,8 @@ class TestProcessMessage(testtools.TestCase):
'seen': "The 'seen' command needs a single nick argument.",
'seen foo bar': "The 'seen' command needs a single nick argument.",
'subscribe ***': "Invalid regex: nothing to repeat at position 0",
- 'foo': "Recognised commands: in, out, seen, subscribe",
+ 'foo': "Unknown user command. "
+ "Should be: in, out, seen, or subscribe",
}
original_db_data = copy.deepcopy(self.db.data)
with mock.patch.object(
@@ -291,7 +292,7 @@ class TestProcessMessage(testtools.TestCase):
mock_send.reset_mock()
def test_user_command_in_pubmsg(self):
- commands = ['#seen dahu']
+ commands = ['#seen dahu', '+seen dahu']
for command in commands:
msg = Event('',
'johndoe!~johndoe@openstack/member/johndoe',
diff --git a/ptgbot/usercommands.py b/ptgbot/usercommands.py
new file mode 100644
index 0000000..f51777a
--- /dev/null
+++ b/ptgbot/usercommands.py
@@ -0,0 +1,118 @@
+# Copyright 2011, 2013, 2020 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Checks location against known tracks. If prefixed with # then
+# insists it must match a known track. If not #-prefixed but
+# matches a known track then the # prefix is added. Returns the
+# normalized location to check into, or None if a valid one was
+# not established. When matching/matched against a known track,
+# it will be lower-cased. This assumes that all registered tracks
+# are lower-case.
+
+import re
+
+
+def normalize_location(tracks, location):
+ if location.startswith('#'):
+ track = location[1:].lower()
+ if track in tracks:
+ return location.lower()
+ else:
+ raise ValueError(track)
+ else:
+ if location.lower() in tracks:
+ return '#' + location.lower()
+ else:
+ # Free-form location
+ return location
+
+
+def process_user_command(db, nick, cmd, params):
+ if cmd == 'in':
+ if len(params) == 0:
+ return "The 'in' command should be followed by a location."
+
+ location = " ".join(params)
+ try:
+ location = normalize_location(db.list_tracks(), location)
+ except ValueError as e:
+ return "Unrecognised track #%s" % e
+
+ db.check_in(nick, location)
+ return "OK, checked into %s - thanks for the update!" % location
+
+ elif cmd == 'out':
+ if len(params) > 0:
+ return "The 'out' command does not accept any extra parameters."
+
+ last_check_in = db.get_last_check_in(nick)
+ if last_check_in['location'] is None:
+ return "You weren't checked in anywhere yet!"
+
+ if last_check_in['out'] is not None:
+ return ("You already checked out of %s at %s!" %
+ (last_check_in['location'], last_check_in['out']))
+
+ location = db.check_out(nick)
+ return "OK, checked out of %s - thanks for the update!" % location
+
+ elif cmd == 'seen':
+ if len(params) != 1:
+ return "The 'seen' command needs a single nick argument."
+
+ seen_nick = params[0]
+ last_check_in = db.get_last_check_in(seen_nick)
+
+ if last_check_in['location'] is None:
+ return "%s never checked in anywhere" % seen_nick
+ elif last_check_in['out'] is None:
+ return ("%s was last seen in %s at %s" % (
+ last_check_in['nick'],
+ last_check_in['location'],
+ last_check_in['in']))
+ else:
+ return ("%s checked out of %s at %s" % (
+ last_check_in['nick'],
+ last_check_in['location'],
+ last_check_in['out']))
+
+ elif cmd == 'subscribe':
+ new_re = str.join(' ', params)
+ existing_re = db.get_subscription(nick)
+ if new_re == "":
+ if existing_re is None:
+ return "You don't have a subscription regex set yet"
+ else:
+ return "Your current subscription regex is: " + existing_re
+ else:
+ try:
+ re.compile(new_re)
+ except Exception as e:
+ return "Invalid regex: %s" % e
+ else:
+ db.set_subscription(nick, new_re)
+ return ("Subscription set to " + new_re +
+ (" (was %s)" % existing_re if existing_re else ""))
+
+ elif cmd == 'unsubscribe':
+ existing_re = db.get_subscription(nick)
+ if existing_re is None:
+ return "You don't have a subscription regex set yet"
+ else:
+ db.set_subscription(nick, None)
+ return "Cancelled subscription %s" % existing_re
+
+ else:
+ return "Unknown user command. Should be: in, out, seen, or subscribe"