-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathUMQTT.Protocol.pas
302 lines (235 loc) · 7.51 KB
/
UMQTT.Protocol.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
unit UMQTT.Protocol;
interface
uses
System.Classes,
System.SysUtils,
UMQTT.Protocol.Types;
type
TSendDataEvent = procedure(pData: Pointer; pLen: Integer) of object;
TPublishEvent = procedure(const pTopic: string; const pMessage: string) of object;
TMQTTProtocol = class
private
fBuffer: TBytes;
fOnConnack: TNotifyEvent;
fOnSendData: TSendDataEvent;
fOnPublish: TPublishEvent;
fOnPingresp: TNotifyEvent;
procedure TriggerSendData(const pData: TBytes);
procedure ProcessPingresp;
procedure ProcessPublish(pData: Pointer; pLen: Integer);
procedure ProcessSuback(pData: Pointer; pLen: Integer);
procedure ProcessPuback(pData: Pointer; pLen: Integer);
procedure ProcessConnack(pData: Pointer; pLen: Integer);
public
procedure ParseReceivedData(pData: Pointer; pLen: Integer);
procedure CommandConnect(const pClientId: string);
procedure CommandPingReq;
procedure CommandPublish(const pTopic, pMessage: string);
procedure CommandSubscribe(const pTopic: string);
property OnConnack: TNotifyEvent read fOnConnack write fOnConnack;
property OnSendData: TSendDataEvent read fOnSendData write fOnSendData;
property OnPublish: TPublishEvent read fOnPublish write fOnPublish;
property OnPingresp: TNotifyEvent read fOnPingresp write fOnPingresp;
end;
implementation
uses
Winapi.Windows,
Winapi.Winsock;
{$REGION 'TMQTTProtocol'}
procedure TMQTTProtocol.TriggerSendData(const pData: TBytes);
begin
if Assigned(fOnSendData) then
fOnSendData(@pData[0], Length(pData));
end;
procedure TMQTTProtocol.ProcessPingresp;
begin
if Assigned(fOnPingresp) then
fOnPingresp(Self);
end;
procedure TMQTTProtocol.ProcessPublish(pData: Pointer; pLen: Integer);
var
LMSB, LLSB: Byte;
Bytes: TBytes;
vTotal, vTopicLen: Integer;
vTopic, vMessage: string;
begin
vTotal := 0;
Bytes := Copy(TBytes(pData), 0, pLen);
LMSB := Bytes[0];
LLSB := Bytes[1];
Inc(vTotal, 2);
vTopicLen := LMSB * $100 + LLSB;
Inc(vTotal, vTopicLen);
vTopic := TEncoding.UTF8.GetString(Bytes, 2, vTopicLen);
// Packet identifier
if (Bytes[vTotal] = 0) and (Bytes[vTotal + 1] = 10) then
Inc(vTotal, 2);
// Properties
if (Bytes[vTotal] = 0) then
Inc(vTotal);
vMessage := TEncoding.UTF8.GetString(Bytes, vTotal, pLen - vTotal);
if Assigned(fOnPublish) then
fOnPublish(vTopic, vMessage);
OutputDebugString(PWideChar(vTopic + '/' + vMessage));
end;
procedure TMQTTProtocol.ProcessSuback(pData: Pointer; pLen: Integer);
begin
end;
procedure TMQTTProtocol.ProcessPuback(pData: Pointer; pLen: Integer);
begin
end;
procedure TMQTTProtocol.ProcessConnack(pData: Pointer; pLen: Integer);
var
vConnectAckFlags: Byte;
vConnectReasonCode: Byte;
begin
if pLen < 2 then
Exit;
// vConnectAckFlags := TBytes(pData)[0];
vConnectReasonCode := TBytes(pData)[1];
if TConnackReasonCodes(vConnectReasonCode) <> caSuccess then
Exit;
if Assigned(fOnConnack) then
fOnConnack(Self);
end;
procedure TMQTTProtocol.ParseReceivedData(pData: Pointer; pLen: Integer);
var
vTotal, vPacketSize: Integer;
vIdentifier: Byte;
begin
fBuffer := fBuffer + Copy(TBytes(pData), 0, pLen);
vTotal := 0;
while Length(fBuffer) > 0 do
begin
if Length(fBuffer) < vTotal + SizeOf(TMQTTFixedHeader) then
Break;
vIdentifier := fBuffer[vTotal] shr 4;
vPacketSize := fBuffer[vTotal + 1];
if Length(fBuffer) < vTotal + 2 + vPacketSize then
Break;
case TControlPacket(vIdentifier) of
cpConnack: ProcessConnack(@Copy(fBuffer, vTotal + 2, vPacketSize)[0], vPacketSize);
cpPuback: ProcessPuback(@Copy(fBuffer, vTotal + 2, vPacketSize)[0], vPacketSize);
cpSuback: ProcessSuback(@Copy(fBuffer, vTotal + 2, vPacketSize)[0], vPacketSize);
cpPublish: ProcessPublish(@Copy(fBuffer, vTotal + 2, vPacketSize)[0], vPacketSize);
cpPingresp: ProcessPingresp;
end;
// Increment total read
Inc(vTotal, SizeOf(TMQTTFixedHeader) + vPacketSize);
end;
Delete(fBuffer, 0, vTotal);
end;
procedure TMQTTProtocol.CommandConnect(const pClientId: string);
var
vPacket: TMQTTPacket;
begin
vPacket.BeginUpdate;
try
vPacket.FixedHeader.ControlPacketType(cpConnect);
// Protocol name and version
vPacket.VariableHeader.AddStrLength(Length(c_MQTTName));
vPacket.VariableHeader.AppendData(c_MQTTName);
vPacket.VariableHeader.AppendData([c_MQTTVersion]);
// Connect flags
// User Name | Flag | Password | Flag | Will Retain | Will QoS | Will Flag | Clean Start | Reserved
vPacket.VariableHeader.AppendData([0]);
// Keep alive
vPacket.VariableHeader.AppendData([0, 0]);
// Properties
vPacket.VariableHeader.AppendData([0]);
// Payload
vPacket.Payload.AddStrLength(Length(pClientId));
vPacket.Payload.AppendData(TEncoding.UTF8.GetBytes(pClientId));
finally
vPacket.EndUpdate;
end;
TriggerSendData(vPacket.ToBytes);
end;
procedure TMQTTProtocol.CommandPingReq;
var
vPacket: TMQTTPacket;
begin
vPacket.BeginUpdate;
try
vPacket.FixedHeader.ControlPacketType(cpPingreq);
finally
vPacket.EndUpdate;
end;
TriggerSendData(vPacket.ToBytes);
end;
procedure TMQTTProtocol.CommandPublish(const pTopic, pMessage: string);
var
vPacket: TMQTTPacket;
begin
vPacket.BeginUpdate;
try
vPacket.FixedHeader.ControlPacketType(cpPublish);
// Topic
vPacket.VariableHeader.AddStrLength(Length(pTopic));
vPacket.VariableHeader.AppendData(TEncoding.UTF8.GetBytes(pTopic));
// Packet identifier
// if (QOS = 1) or (QOS = 2) then
// vPacket.VariableHeader.AppendData();
// vPacket.VariableHeader.AppendData([0]);
vPacket.VariableHeader.AppendData([0, 10]);
// Property length
vPacket.VariableHeader.AppendData([0]);
// Payload
vPacket.Payload.AppendData(TEncoding.UTF8.GetBytes(pMessage));
finally
vPacket.EndUpdate;
end;
TriggerSendData(vPacket.ToBytes)
end;
procedure TMQTTProtocol.CommandSubscribe(const pTopic: string);
var
vPacket: TMQTTPacket;
begin
vPacket.BeginUpdate;
try
vPacket.FixedHeader.ControlPacketType(cpSubscribe);
// Packet identifier
vPacket.VariableHeader.AppendData([0, 10, 0]);
// Payload
vPacket.Payload.AddStrLength(Length(pTopic));
vPacket.Payload.AppendData(TEncoding.UTF8.GetBytes(pTopic));
// Subscription options
vPacket.Payload.AppendData([0]);
finally
vPacket.EndUpdate;
end;
TriggerSendData(vPacket.ToBytes);
end;
{$ENDREGION}
{$REGION 'Tests'}
procedure TestFixedHeader;
var
vHeader: TMQTTFixedHeader;
vProtocol: TMQTTProtocol;
begin
vHeader.ControlPacketType(cpConnect);
if not vHeader.ControlAndFlags = 16 then
raise Exception.Create('Invalid byte 1 value');
vHeader.ControlPacketType(cpConnack);
if not vHeader.ControlAndFlags = 32 then
raise Exception.Create('Invalid byte 1 value');
vHeader.ControlPacketType(cpSubscribe);
if not vHeader.ControlAndFlags = 130 then
raise Exception.Create('Invalid byte 1 value');
// var vIdentifier := fBuffer[0] shr 4;
// case TControlPacket(vIdentifier) of
// cpConnack:
// begin
// end;
// end;
vProtocol := TMQTTProtocol.Create;
try
vProtocol.CommandConnect('CID1883');
finally
vProtocol.Free;
end;
end;
{$ENDREGION}
initialization
TestFixedHeader;
end.