-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpythabot.py
146 lines (123 loc) · 5.44 KB
/
pythabot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import datetime
import socket
import sys
import time
from logger import get_logger
# Module-level logger used by every Pythabot method in this file.
log = get_logger('pythabot')
class Pythabot:
    """Small IRC bot that routes channel/private messages to registered handlers.

    ``config`` is subscripted like a mapping; the keys read by this class are
    ``host``, ``port``, ``pass``, ``nick``, ``ident``, ``realname``, ``chans``,
    ``ownermask``, ``quitmsg`` and the optional ``nickserv_pass``.

    ``registry`` must expose ``commands`` (mapping of lower-cased command name
    to a command object), ``parsers`` (iterable of parser objects) and
    ``periodic_tasks`` (mapping of task name to a dict with ``last_run``,
    ``run_every``, ``command`` and ``chans``).
    """

    def __init__(self, config, registry):
        """Store the configuration/registry and create an unconnected socket."""
        self.config = config
        self.registry = registry
        self.buffer = ""  # text received but not yet terminated by "\n"
        self.debounce = False  # True once the channels have been joined
        self.debounce2 = False  # unused in this class; kept for compatibility
        self.sock = socket.socket()

    def connect(self):
        """Connect to the configured server and register NICK/USER.

        On a socket error the bot logs the failure and exits through
        :meth:`quit` (which raises ``SystemExit``).
        """
        try:
            self.sock.connect((self.config["host"], self.config["port"]))
            log.info("Connected to %s", self.config["host"])
            # Truthiness instead of len(): also tolerates a ``None`` password.
            if self.config["pass"]:
                self.sendraw(f'PASS {self.config["pass"]}')
            else:
                log.info("Account identification bypassed.")
            self.sendraw(f"NICK {self.config['nick']}")
            self.sendraw(f'USER {self.config["ident"]} {self.config["host"]} bla :{self.config["realname"]}')
            log.info("Identified as %s", self.config["nick"])
        except socket.error:
            self.quit(f'Could not connect to port {self.config["port"]}, on {self.config["host"]}.')

    def run_periodic_commands(self):
        """Run every periodic task whose ``run_every`` interval has elapsed.

        A task with a falsy ``last_run`` runs immediately. Each message yielded
        by the task is sent to the task's own ``chans`` or, when that is empty,
        to the configured channels.
        """
        for name, attrs in self.registry.periodic_tasks.items():
            if attrs['last_run'] and attrs['last_run'] + attrs['run_every'] >= time.time():
                continue
            log.info('Running periodic task: %s', name)
            self.registry.periodic_tasks[name]['last_run'] = time.time()
            for msg in attrs['command'](self.config).run():
                chans_to_msg = attrs['chans'] or self.config['chans']
                for chan in chans_to_msg:
                    self.privmsg(chan, msg)

    def initparse(self, line):
        """Turn a space-split PRIVMSG line into a message dict and dispatch it.

        ``line`` looks like:
        [':techboy6601!~IceChat77@unaffiliated/techboy6601','PRIVMSG','#botters-test',':yo','wuts','up']

        The resulting dict (keys ``sender``, ``ident``, ``mask``, ``chan``,
        ``msg``, ``command``, ``args``, ``bot``) is handed to :meth:`parse`.
        """
        senderline = line[0]
        # Drop the ":" that introduces the trailing parameter.
        msg = " ".join(line[3:])[1:]
        command, _, args = msg.partition(" ")

        # Split "nick!user@host" by its separators rather than by find()
        # offsets: find() returns -1 for a missing "!" or "~", which used to
        # truncate the sender or produce a garbage ident.
        prefix = senderline[1:] if senderline.startswith(":") else senderline
        sender, _, userhost = prefix.partition("!")
        user, at_sign, host = userhost.partition("@")
        ident = user[1:] if user.startswith("~") else user

        parsedline = {
            "sender": sender,
            "ident": ident,
            # Without an "@" there is no host part; fall back to the whole
            # prefix so the mask is never an empty string.
            "mask": host if at_sign else prefix,
            # A message addressed to the bot itself is answered privately.
            "chan": line[2] if line[2] != self.config['nick'] else sender,
            "msg": msg,
            "command": command,
            "args": args,
            "bot": self,
        }
        self.parse(parsedline)

    def parse(self, msg):
        """Run the matching command (if any) and every parser on ``msg``.

        A command runs when its ``permission`` is ``"all"``, or ``"owner"`` and
        the sender's mask equals ``config["ownermask"]``. Replies go to the
        sender when the command is ``private_only``, otherwise to the channel.
        Output from non-multiline parsers is joined with `` | `` into a single
        message; multiline parsers send each line separately.
        """
        cmd = msg["command"].lower()
        command = self.registry.commands.get(cmd)
        if command:
            log.info('got command %s %s', cmd, msg)
            if command.permission == "all" or (command.permission == "owner" and msg["mask"] == self.config["ownermask"]):
                reply = command.run(msg)
                send_to = msg['chan'] if not command.private_only else msg['sender']
                for line in reply:
                    self.privmsg(send_to, line)

        parsed_things = []
        for parser in self.registry.parsers:
            for line in parser.parse(msg):
                if parser.multiline:
                    self.privmsg(msg['chan'], line)
                else:
                    parsed_things.append(line)
        if parsed_things:
            self.privmsg(msg['chan'], ' | '.join(parsed_things))

    def listen(self):
        """Process server traffic forever, reconnecting after socket errors.

        After a socket error (including the server closing the connection) the
        bot waits 30 seconds, replaces the dead socket, reconnects and resumes
        listening. A failed reconnect exits through :meth:`connect`/:meth:`quit`.
        """
        while True:
            try:
                self._receive_loop()
            except socket.error:
                log.error("Socket error. Reconnecting in 30s")
                time.sleep(30)
                self._reset_connection()
                self.connect()

    def _receive_loop(self):
        """Read from the socket and handle complete lines until an error occurs."""
        while True:
            data = self.sock.recv(1024)
            if not data:
                # recv() returns b"" when the peer closed the connection;
                # without this check the loop would spin forever.
                raise ConnectionError("Connection closed by the server")
            # errors="replace": IRC traffic is not guaranteed to be UTF-8 and a
            # UnicodeDecodeError here would otherwise kill the bot.
            self.buffer += data.decode('utf-8', errors='replace')
            log.debug(self.buffer.strip())
            if not self.debounce and ("MOTD" in self.buffer or 'End of message of the day' in self.buffer):
                self._identify_and_join()
            # Everything after the last "\n" is an incomplete line; keep it.
            temp = self.buffer.split("\n")
            self.buffer = temp.pop()
            for raw_line in temp:
                self._handle_line(raw_line)
            # NOTE: periodic tasks are only checked after data arrives, since
            # recv() blocks.
            self.run_periodic_commands()

    def _identify_and_join(self):
        """Identify with NickServ (when configured) and join the channels once."""
        if 'nickserv_pass' in self.config:
            # Sent with sendraw() rather than privmsg() so the password is not
            # written to the log.
            self.sendraw(f'PRIVMSG NickServ :identify {self.config["nickserv_pass"]}')
            # Pause before joining so the identify request can be processed.
            time.sleep(10)
        for chan in self.config["chans"]:
            self.sendraw(f"JOIN {chan}")
            log.info("Joined %s", chan)
        self.debounce = True

    def _handle_line(self, raw_line):
        """React to one complete server line (433, PING or PRIVMSG)."""
        line = raw_line.rstrip().split(" ")
        if len(line) < 2:
            # Blank or one-token lines carry nothing to act on and used to
            # raise IndexError below.
            return
        if line[1] == "433":
            self.quit(f"Username {self.config['nick']} is already in use! Aborting.")
        if line[0] == "PING":
            self.sendraw(f"PONG {line[1]}")
            log.debug(f"PONG {line[1]} {datetime.datetime.now()}")
        if line[1] == "PRIVMSG" and len(line) > 2:
            self.initparse(line)

    def _reset_connection(self):
        """Replace the socket and clear per-connection state before reconnecting."""
        try:
            self.sock.close()
        except socket.error:
            log.debug("Ignoring error while closing the old socket")
        # A socket that failed or was closed cannot be connected again.
        self.sock = socket.socket()
        self.buffer = ""
        # Allow the channels to be rejoined on the new connection.
        self.debounce = False

    def sendraw(self, msg):
        """Send ``msg`` to the server as one raw IRC line (CRLF is appended)."""
        msg += "\r\n"
        # sendall(): send() may transmit only part of the data.
        self.sock.sendall(msg.encode('utf-8'))

    def privmsg(self, send_to, msg):
        """Send ``msg`` to the channel or nick ``send_to``, then pause 0.5 s."""
        # A CR or LF inside the text would end the IRC line early and let the
        # remainder be interpreted as a separate raw command.
        msg = msg.replace("\r", " ").replace("\n", " ")
        log.info('PRIVMSG: %s', msg.encode('utf-8'))
        fullmsg = f'PRIVMSG {send_to} :{msg}\r\n'.encode('utf-8')
        self.sock.sendall(fullmsg)
        time.sleep(.5)

    def quit(self, errmsg):
        """Log ``errmsg``, send QUIT if possible, close the socket and exit.

        Raises:
            SystemExit: always, with exit status 1.
        """
        log.error("%s", errmsg)
        try:
            self.sendraw(f"QUIT :{self.config['quitmsg']}")
        except socket.error:
            # The socket may never have connected (quit() is called from
            # connect()'s error handler); exiting still has to happen.
            log.debug("Could not send QUIT; the socket is not usable")
        finally:
            self.sock.close()
        sys.exit(1)