-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgsm_module.py
140 lines (100 loc) · 3.58 KB
/
gsm_module.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python3
import serial
from datetime import datetime
import config
import db
from models import MessageToSend, ReceivedMessage
port = serial.Serial(config.serial_port, 9600, timeout=4)
if (not port.isOpen()):
port.open()
def send_at_command(command, append_eol=True):
command = command + "\r\n" if append_eol else command
port.write(command.encode())
return list(map(lambda elem: elem.decode("utf-8"), port.readlines()))
def init(pin=None):
while True:
result = send_at_command("ATI")
if len(result) > 0 and result[-1] == "OK\r\n":
break
if (not enter_pin(pin)):
raise Error("PIN authentification has failed!")
# switch to text mode so commands look nicer
send_at_command("AT+CMGF=1")
# store received sms on sim card
# i.e. disable cnmi notifications and set storage
# of newly arrived messages to gsm module memory
send_at_command("AT+CNMI=0,0,0,0,0")
send_at_command("AT+CPMS=\"ME\",\"ME\",\"ME\"")
print("GSM module initialized!")
def enter_pin(pin=None):
pin_status = send_at_command("AT+CPIN?")[2]
if pin_status == "+CPIN:READY\r\n":
return True
elif pin_status == "+CPIN:SIM PIN\r\n":
auth_result = send_at_command("AT+CPIN=\"" + pin + "\"")
return auth_result[2] == "OK\r\n"
else:
return False
def send_sms_message(phone_number, text):
assert phone_number.startswith("+421")
command_sequence = [
"AT+CMGF=1",
"AT+CMGS=" + phone_number,
text
]
for command in command_sequence:
send_at_command(command)
result = send_at_command(chr(26), False)
print(result)
def get_sms_messages(category="ALL"):
assert category in [
"ALL", "REC READ", "REC UNREAD", "STO UNSENT", "STO SENT"
]
result = []
response_raw = send_at_command("AT+CMGL=" + category)
print(response_raw)
sms_list_raw = response_raw[2:-2]
# the odd elements are sms metadata, the even ones are sms texts
sms_pairs = zip(sms_list_raw[0::2], sms_list_raw[1::2])
for sms_meta, sms_text in sms_pairs:
result.append(parse_sms(sms_meta, sms_text))
print(result)
return result
def delete_all_sms_messages():
sms_messages_to_delete = get_sms_messages("ALL")
for sms_message in sms_messages_to_delete:
delete_sms_message(sms_message["index"])
def delete_sms_message(index):
return send_at_command("AT+CMGD=" + str(index))
def parse_sms(sms_meta, sms_text):
sms_meta = sms_meta.split(',')
return {
'index': int(sms_meta[0].split(': ')[1]),
'category': sms_meta[1].split("\"")[1],
'sender': sms_meta[2].split("\"")[1],
'date': sms_meta[4].split("\"")[1],
'text': sms_text
}
if __name__ == '__main__':
init(pin=config.sim_card_pin)
while (True):
# check received messages
for sms_message in get_sms_messages():
print('new message arrived')
msg = ReceivedMessage(
phone_from=sms_message['sender'],
msg_body=sms_message['text'],
)
db.session.add(msg)
db.session.commit()
delete_sms_message(sms_message["index"])
# send messages waiting to be sent
messages_to_send = (
db.query(MessageToSend)
.filter(MessageToSend.sent_at.is_(None))
.all()
)
for message in messages_to_send:
send_sms_message(message.phone_to, message.msg_body)
message.sent_at = datetime.utcnow()
db.session.commit()