-
-
Notifications
You must be signed in to change notification settings - Fork 95
/
Copy pathjupyter-zmq-channel.el
253 lines (219 loc) · 10.2 KB
/
jupyter-zmq-channel.el
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
;;; jupyter-zmq-channel.el --- A Jupyter channel implementation using ZMQ sockets -*- lexical-binding: t -*-
;; Copyright (C) 2019-2024 Nathaniel Nicandro
;; Author: Nathaniel Nicandro <[email protected]>
;; Created: 27 Jun 2019
;; This program is free software; you can redistribute it and/or
;; modify it under the terms of the GNU General Public License as
;; published by the Free Software Foundation; either version 3, or (at
;; your option) any later version.
;; This program is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;; General Public License for more details.
;; You should have received a copy of the GNU General Public License
;; along with GNU Emacs; see the file COPYING. If not, write to the
;; Free Software Foundation, Inc., 59 Temple Place - Suite 330,
;; Boston, MA 02111-1307, USA.
;;; Commentary:
;; Implements synchronous channel types using ZMQ sockets. Each channel is
;; essentially a wrapper around a `zmq-socket' constrained to a socket type by
;; the type of the channel and with an associated `zmq-IDENTITY' obtained from
;; the `jupyter-session' that must be associated with the channel. A heartbeat
;; channel is distinct from the other channels in that it is implemented using
;; a timer which periodically pings the kernel depending on how its configured.
;; In order for communication to occur on the other channels, one of
;; `jupyter-send' or `jupyter-recv' must be called after starting the channel
;; with `jupyter-start'.
;;; Code:
(require 'jupyter-messages)
(require 'zmq)
(eval-and-compile (zmq-load))
(require 'jupyter-channel)
(eval-when-compile (require 'subr-x))
(declare-function jupyter-ioloop-poller-remove "jupyter-ioloop")
(declare-function jupyter-ioloop-poller-add "jupyter-ioloop")
(defconst jupyter-socket-types
(list :hb zmq-REQ
:shell zmq-DEALER
:iopub zmq-SUB
:stdin zmq-DEALER
:control zmq-DEALER)
"The socket types for the various channels used by `jupyter'.")
(cl-deftype zmq-socket () '(satisfies zmq-socket-p))
(defclass jupyter-zmq-channel (jupyter-channel)
((socket
:type (or null zmq-socket)
:initform nil
:documentation "The socket used for communicating with the kernel.")))
(defun jupyter-connect-endpoint (type endpoint &optional identity)
"Create socket with TYPE and connect to ENDPOINT.
If IDENTITY is non-nil, it will be set as the ROUTING-ID of the
socket. Return the created socket."
(let ((sock (zmq-socket (zmq-current-context) type)))
(prog1 sock
(zmq-socket-set sock zmq-LINGER 1000)
(when identity
(zmq-socket-set sock zmq-ROUTING-ID identity))
(zmq-connect sock endpoint))))
(defun jupyter-connect-channel (ctype endpoint &optional identity)
"Create a socket based on a Jupyter channel type.
CTYPE is one of the symbols `:hb', `:stdin', `:shell',
`:control', or `:iopub' and represents the type of channel to
connect to ENDPOINT. If IDENTITY is non-nil, it will be set as
the ROUTING-ID of the socket. Return the created socket."
(let ((sock-type (plist-get jupyter-socket-types ctype)))
(unless sock-type
(error "Invalid channel type (%s)" ctype))
(jupyter-connect-endpoint sock-type endpoint identity)))
(cl-defmethod jupyter-start ((channel jupyter-zmq-channel)
&key (identity (jupyter-session-id
(oref channel session))))
(unless (jupyter-alive-p channel)
(let ((socket (jupyter-connect-channel
(oref channel type) (oref channel endpoint) identity)))
(oset channel socket socket)
(cl-case (oref channel type)
(:iopub
(zmq-socket-set socket zmq-SUBSCRIBE ""))))
(when (and (functionp 'jupyter-ioloop-environment-p)
(jupyter-ioloop-environment-p))
(jupyter-ioloop-poller-add (oref channel socket) zmq-POLLIN))))
(cl-defmethod jupyter-stop ((channel jupyter-zmq-channel))
(when (jupyter-alive-p channel)
(when (and (functionp 'jupyter-ioloop-environment-p)
(jupyter-ioloop-environment-p))
(jupyter-ioloop-poller-remove (oref channel socket)))
(with-slots (socket) channel
(zmq-disconnect socket (zmq-socket-get socket zmq-LAST-ENDPOINT)))
(oset channel socket nil)))
(cl-defmethod jupyter-alive-p ((channel jupyter-zmq-channel))
(not (null (oref channel socket))))
(cl-defmethod jupyter-send ((channel jupyter-zmq-channel) type message &optional msg-id)
"Send a message on a ZMQ based Jupyter channel.
CHANNEL is the channel to send MESSAGE on. TYPE is a Jupyter
message type, like :kernel-info-request. Return the message ID
of the sent message."
(cl-destructuring-bind (id . msg)
(jupyter-encode-message (oref channel session) type
:msg-id msg-id
:content message)
(prog1 id
(zmq-send-multipart (oref channel socket) msg))))
(cl-defmethod jupyter-recv ((channel jupyter-zmq-channel) &optional dont-wait)
"Receive a message on CHANNEL.
Return a cons cell (IDENTS . MSG) where IDENTS are the ZMQ
message identities, as a list, and MSG is the received message.
If DONT-WAIT is non-nil, return immediately without waiting for a
message if one isn't already available."
(condition-case nil
(let ((session (oref channel session))
(msg (zmq-recv-multipart (oref channel socket)
(and dont-wait zmq-DONTWAIT))))
(when msg
(cl-destructuring-bind (idents . parts)
(jupyter--split-identities msg)
(cons idents (jupyter-decode-message session parts)))))
(zmq-EAGAIN nil)))
;;; Heartbeat channel
(defvar jupyter-hb-max-failures 3
"Number of heartbeat failures until the kernel is considered unreachable.
A ping is sent to the kernel on a heartbeat channel and waits
until `time-to-dead' seconds to see if the kernel sent a ping
back. If the kernel doesn't send a ping back after
`jupyter-hb-max-failures', the callback associated with the
heartbeat channel is called. See `jupyter-hb-on-kernel-dead'.")
(defclass jupyter-hb-channel (jupyter-zmq-channel)
((type
:type keyword
:initform :hb
:documentation "The type of this channel is `:hb'.")
(time-to-dead
:type number
:initform 10
:documentation "The time in seconds to wait for a response
from the kernel until the connection is assumed to be dead. Note
that this slot only takes effect when starting the channel.")
(dead-cb
:type function
:initform #'ignore
:documentation "A callback function that takes 0 arguments
and is called when the kernel has not responded for
\(* `jupyter-hb-max-failures' `time-to-dead'\) seconds.")
(beating
:type (or boolean symbol)
:initform t
:documentation "A flag variable indicating that the heartbeat
channel is communicating with the kernel.")
(paused
:type boolean
:initform t
:documentation "A flag variable indicating that the heartbeat
channel is paused and not communicating with the kernel. To
pause the heartbeat channel use `jupyter-hb-pause', to unpause
use `jupyter-hb-unpause'."))
:documentation "A base class for heartbeat channels.")
(cl-defmethod jupyter-alive-p ((channel jupyter-hb-channel))
"Return non-nil if CHANNEL is alive."
(zmq-socket-p (oref channel socket)))
(defun jupyter-hb--pingable-p (channel)
(and (not (oref channel paused))
(jupyter-alive-p channel)))
(cl-defmethod jupyter-hb-beating-p ((channel jupyter-hb-channel))
"Return non-nil if CHANNEL is reachable."
(and (jupyter-hb--pingable-p channel)
(oref channel beating)))
(cl-defmethod jupyter-hb-pause ((channel jupyter-hb-channel))
"Pause checking for heartbeat events on CHANNEL."
(oset channel paused t))
(cl-defmethod jupyter-hb-unpause ((channel jupyter-hb-channel))
"Un-pause checking for heatbeat events on CHANNEL."
(when (oref channel paused)
(if (jupyter-alive-p channel)
;; Consume a pending message from the kernel if there is one. We send a
;; ping and then schedule a timer which fires TIME-TO-DEAD seconds
;; later to receive the ping back from the kernel and start the process
;; all over again. If the channel is paused before TIME-TO-DEAD
;; seconds, there may still be a ping from the kernel waiting.
(ignore-errors (zmq-recv (oref channel socket) zmq-DONTWAIT))
(jupyter-start channel))
(oset channel paused nil)
(jupyter-hb--send-ping channel)))
(cl-defgeneric jupyter-hb-on-kernel-dead (channel fun)
(declare (indent 1)))
(cl-defmethod jupyter-hb-on-kernel-dead ((channel jupyter-hb-channel) fun)
"When the kernel connected to CHANNEL dies, call FUN.
A kernel is considered dead when CHANNEL does not receive a
response after \(* `jupyter-hb-max-failures' `time-to-dead'\)
seconds has elapsed without the kernel sending a ping back."
(oset channel dead-cb fun))
(defun jupyter-hb--send-ping (channel &optional failed-count)
(when (jupyter-hb--pingable-p channel)
(condition-case nil
(progn
(zmq-send (oref channel socket) "ping")
(run-with-timer
(oref channel time-to-dead) nil
(lambda ()
(when-let* ((sock (and (jupyter-hb--pingable-p channel)
(oref channel socket))))
(oset channel beating
(condition-case nil
(and (zmq-recv sock zmq-DONTWAIT) t)
((zmq-EINTR zmq-EAGAIN) nil)))
(if (oref channel beating)
(jupyter-hb--send-ping channel)
;; Reset the socket
(jupyter-stop channel)
(jupyter-start channel)
(or failed-count (setq failed-count 0))
(if (< failed-count jupyter-hb-max-failures)
(jupyter-hb--send-ping channel (1+ failed-count))
(oset channel paused t)
(when (functionp (oref channel dead-cb))
(funcall (oref channel dead-cb)))))))))
;; FIXME: Should be a part of `jupyter-hb--pingable-p'
(zmq-ENOTSOCK
(jupyter-hb-pause channel)
(oset channel socket nil)))))
(provide 'jupyter-zmq-channel)
;;; jupyter-zmq-channel.el ends here