-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathUMQTT.Connection.pas
132 lines (104 loc) · 3.45 KB
/
UMQTT.Connection.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
unit UMQTT.Connection;
interface
uses
System.SysUtils,
OverbyteIcsWSocket;
type
// Contract for the transport connection; TMQTTConnection in this unit is the
// implementation shown here.
IMQTTConnection = interface
// Supplies the remote host, the port and whether an SSL socket should be used.
procedure SetConnectionData(const pHost: string; pPort: Integer; pSsl: Boolean);
// Registers the callbacks for connect result, disconnect and incoming data
// (data is passed as a pointer plus a byte count).
procedure RegisterCallback(pConnected: TProc<Boolean>; pDisconnected: TProc; pDataReceived: TProc<Pointer, Integer>);
// Sends pLen bytes starting at pData.
procedure SendData(const pData: Pointer; pLen: Integer);
// Opens the connection using the data given to SetConnectionData.
procedure Connect;
// Closes the connection.
procedure Disconnect;
// Returns the local port of the connection as an integer.
function LocalPort: Integer;
end;
// IMQTTConnection implementation on top of an ICS TWSocket / TSslWSocket.
TMQTTConnection = class(TInterfacedObject, IMQTTConnection)
private
// Socket created by SetConnectionData (TSslWSocket when pSsl is True) and
// freed in Destroy; nil until SetConnectionData has been called.
fSocket: TWSocket;
// Callbacks stored by RegisterCallback.
fOnConnected: TProc<Boolean>;
// NOTE(review): fOnDisconnected is stored but nothing in this unit invokes it.
fOnDisconnected: TProc;
fOnDataReceived: TProc<Pointer, Integer>;
// Socket event handlers; SetConnectionData wires them to the socket.
procedure HandleSocketSslHandshakeDone(Sender: TObject; ErrCode: Word; PeerCert: TX509Base; var Disconnect: Boolean);
procedure HandleSocketConnected(Sender: TObject; ErrCode: Word);
procedure HandleSocketDataReceived(Sender: TObject; ErrCode: Word);
public
// Frees the socket.
destructor Destroy; override;
// IMQTTConnection
procedure SetConnectionData(const pHost: string; pPort: Integer; pSsl: Boolean);
procedure RegisterCallback(pConnected: TProc<Boolean>; pDisconnected: TProc; pDataReceived: TProc<Pointer, Integer>);
procedure SendData(const pData: Pointer; pLen: Integer);
procedure Connect;
procedure Disconnect;
function LocalPort: Integer;
end;
implementation
{$REGION 'TMQTTConnection'}
// Releases the socket owned by this connection, then runs the inherited
// destructor. Free is nil-safe, so this also works when SetConnectionData
// was never called.
destructor TMQTTConnection.Destroy;
begin
  fSocket.Free;
  inherited Destroy;
end;
// Handler assigned to the socket's OnSslHandshakeDone event in
// SetConnectionData. Intentionally empty at present: ErrCode and PeerCert are
// not inspected and Disconnect is left as passed in.
// NOTE(review): a failed handshake is therefore not reported to the
// registered callbacks - confirm this is intended.
procedure TMQTTConnection.HandleSocketSslHandshakeDone(Sender: TObject;
ErrCode: Word; PeerCert: TX509Base; var Disconnect: Boolean);
begin
end;
// Handler for the socket's OnSessionConnected event.
//
// Reports the connect result to the registered fOnConnected callback
// (True when ErrCode = 0). For SSL sockets it then enables SSL and starts the
// handshake, but only when the TCP connect actually succeeded.
//
// Sender  - the socket raising the event (unused; fSocket is used instead).
// ErrCode - 0 on success, otherwise the connect error reported by the socket.
//
// NOTE(review): the callback fires before the TLS handshake is started;
// confirm callers do not send data from it on SSL connections.
procedure TMQTTConnection.HandleSocketConnected(Sender: TObject; ErrCode: Word);
var
  vSslSocket: TSslWSocket;
begin
  if Assigned(fOnConnected) then
    fOnConnected(ErrCode = 0);

  // A failed connect leaves no session to negotiate TLS on.
  if ErrCode <> 0 then
    Exit;

  if fSocket is TSslWSocket then
  begin
    vSslSocket := TSslWSocket(fSocket);
    // Reuse the context from a previous connect instead of creating a new
    // one (owned by the socket) on every reconnect.
    if not Assigned(vSslSocket.SslContext) then
      vSslSocket.SslContext := TSslContext.Create(fSocket);
    vSslSocket.SslEnable := True;
    vSslSocket.StartSslHandshake;
  end;
end;
// Handler for the socket's OnDataAvailable event.
//
// Reads the pending bytes from the socket as an AnsiString and forwards them
// to the registered fOnDataReceived callback as (pointer, length). The
// pointer refers to a local buffer and is only valid during the callback.
//
// Sender  - the socket raising the event (unused; fSocket is used instead).
// ErrCode - error code supplied by the socket (not inspected here).
procedure TMQTTConnection.HandleSocketDataReceived(Sender: TObject;
  ErrCode: Word);
var
  vBuffer: AnsiString;
begin
  vBuffer := fSocket.ReceiveStrA;

  // Nothing was read: indexing vBuffer[1] on an empty string is out of range
  // (ERangeError with range checking on, nil pointer otherwise).
  if Length(vBuffer) = 0 then
    Exit;

  if Assigned(fOnDataReceived) then
    fOnDataReceived(@vBuffer[1], Length(vBuffer));
end;
// Returns the port string reported by the socket's GetXPort, converted to an
// integer. The conversion raises if the socket does not report a numeric
// value.
function TMQTTConnection.LocalPort: Integer;
var
  vPortText: string;
begin
  vPortText := fSocket.GetXPort;
  Result := vPortText.ToInteger;
end;
// Creates and configures the socket used by this connection.
//
// pHost - remote address assigned to the socket's Addr property.
// pPort - remote port; stored on the socket as a string.
// pSsl  - True creates a TSslWSocket, False a plain TWSocket.
//
// May be called more than once: the socket from a previous call is freed
// first so it is not leaked when fSocket is replaced.
procedure TMQTTConnection.SetConnectionData(const pHost: string; pPort: Integer; pSsl: Boolean);
begin
  // Free is nil-safe; clear the field so a failing constructor below cannot
  // leave a dangling reference for Destroy to free a second time.
  fSocket.Free;
  fSocket := nil;

  if pSsl then
    fSocket := TSslWSocket.Create(nil)
  else
    fSocket := TWSocket.Create(nil);

  fSocket.Addr := pHost;
  fSocket.Port := IntToStr(pPort);
  fSocket.Proto := 'tcp';

  // Route the socket events to this object's handlers.
  fSocket.OnSslHandshakeDone := HandleSocketSslHandshakeDone;
  fSocket.OnSessionConnected := HandleSocketConnected;
  fSocket.OnDataAvailable := HandleSocketDataReceived;
end;
// Stores the three notification callbacks, replacing any set earlier.
//
// pConnected    - receives True/False for the connect result.
// pDisconnected - stored for disconnect notification.
// pDataReceived - receives a pointer to the received bytes and their count.
procedure TMQTTConnection.RegisterCallback(pConnected: TProc<Boolean>; pDisconnected: TProc; pDataReceived: TProc<Pointer, Integer>);
begin
  Self.fOnDataReceived := pDataReceived;
  Self.fOnDisconnected := pDisconnected;
  Self.fOnConnected := pConnected;
end;
// Hands pLen bytes starting at pData to the socket's Send method.
// The value returned by Send is not inspected.
procedure TMQTTConnection.SendData(const pData: Pointer; pLen: Integer);
begin
  Self.fSocket.Send(pData, pLen);
end;
// Starts connecting the socket configured by SetConnectionData.
// The outcome is reported through HandleSocketConnected.
procedure TMQTTConnection.Connect;
begin
  Self.fSocket.Connect;
end;
// Closes the underlying socket.
procedure TMQTTConnection.Disconnect;
begin
  Self.fSocket.Close;
end;
{$ENDREGION}
end.