diff --git a/config.js b/config.js index 531cae0..01a779e 100644 --- a/config.js +++ b/config.js @@ -2,6 +2,7 @@ module.exports = { extensions: { // clojurescript: require('./ext/clojurescript'), + clojure: require('./build/ext/clojure'), clojurescript: require('./build/ext/clojurescript'), coffee: require('./build/ext/coffee'), babel: require('./build/ext/babel') diff --git a/ext/clojure.js b/ext/clojure.js new file mode 100644 index 0000000..589a0da --- /dev/null +++ b/ext/clojure.js @@ -0,0 +1,171 @@ + +const net = require('net') +const fs = require('fs') +const path = require('path') +const bencode = require('bencode') +const {EventEmitter} = require('events') + +function bocket(sock) { + const em = new EventEmitter() + let buffer = new Buffer('') + sock.on('connect', () => em.emit('connect')) + sock.on('data', data => { + buffer = Buffer.concat([buffer, data]) + while (buffer.length) { + let res + try { + res = bencode.decode(buffer, 'utf8') + } catch (err) { + return + } + let used = bencode.encode(res, 'utf8').length + buffer = buffer.slice(used) + em.emit('data', res) + } + }) + sock.on('error', err => em.emit('error', err)) + return { + on(val, fn) { + return em.on(val, fn) + }, + off(val, fn) { + return em.removeListener(val, fn) + }, + close() { + sock.close() + }, + send(val) { + return sock.write(bencode.encode(val)) + }, + _sock: sock + } +} + +class Session extends EventEmitter { + constructor(port, session) { + super() + this.bock = bocket(net.connect({port})) + this.waiting = [] + this.queue = [] + this.session = session || null + this.bock.on('data', this.onData.bind(this)) + this.bock.on('connect', () => { + if (!this.session) { + this.send({op: 'clone'}, data => { + this.session = data[0]['new-session'] + console.log(data) + this.emit('connect') + }) + } else { + this.emit('connect') + } + }) + } + + send(data, done) { + if (this.session) { + data.session = this.session + } + this.waiting.push(done) + this.bock.send(data) + } + + eval(val, done) { + this.send({op: 'eval', code: val}, done) + } + + op(name, args, done) { + if (arguments.length === 2) { + done = args + args = {} + } + args = args || {} + args.op = name + this.send(args, done) + } + + onData(data) { + if (!this.waiting.length) { + return console.error('Data, came, but no one waited...', data) + } + if (this.waiting[0].partial) { + this.waiting[0].partial(data) + } + this.queue.push(data) + if (data.status && data.status[0] === 'done'){ + if (this.waiting[0].final) { + this.waiting.shift().final(this.queue) + } else { + this.waiting.shift()(this.queue) + } + this.queue = [] + } + } + + e(val) { + this.eval(val, data => data.map(d => console.log(JSON.stringify(d, null, 2)))) + } +} + +export default function extension(ctx, args, done) { + let port = args && parseInt(args[0]) + if (!port) { + return done(new Error("Port is required")) + } + let s + try { + s = new Session(port) + } catch (e) { + return done(e) + } + s.on('connect', () => { + s.eval(`(do (use 'clj-info) (use 'clojure.repl) (use 'complete.core))`, data => { + done(null, {block:{clojure:{ + execute(ctx, args, code, out, done) { + s.eval(`(do ${code})`, { + partial: data => { + if (data.out) { + out.stream('stdout', data.out) + } else if (data.value) { + out.output({'text/plain': data.value}) + } else if (data.err) { + // out.error('Runtime error', 'err', data.err) + } + }, + final: data => { + if (data[0].status && data[0].status[0] === 'eval-error') { + return done(new Error("Eval failed: " + data[1].err)) + } + done(null) + } + }) + }, + complete(ctx, code, pos, done) { + const line = code.slice(0, pos).split('\n').pop() + const chunks = line.split(/[\s()\[{}\]]/g) + const last = chunks[chunks.length - 1] + // console.log('Completing:', JSON.stringify([code, pos], null, 2), last, last.length) + s.eval(`(map symbol (completions "${last}"))`, data => { + if (data[0].value) { + const res = { + matches: data[0].value.slice(1, -1).split(' '), + status: 'ok', + cursor_start: pos - last.length, + cursor_end: pos, + } + done(null, res) + } else { + console.log('Matches failed!') + data.forEach(d => console.log(JSON.stringify(d, null, 2))) + done(null, { + status: 'err' + }) + } + }) + } + }}}) + }) + }) +} + + diff --git a/lib/context.js b/lib/context.js index 35aa82f..d6fd761 100644 --- a/lib/context.js +++ b/lib/context.js @@ -5,12 +5,24 @@ import config from '../config' import getCompletions from './complete' import loadExt from './load-ext' import async from 'async' +import {spawn} from 'child_process' +import fs from 'fs' +import path from 'path' export default class Context { constructor(wired) { this.wired = wired this.ctx = { - require, + require(name) { + const full = path.resolve(path.join('node_modules', name)) + if (fs.existsSync(full)) { + return require(full) + } + return require(name) + }, + Buffer, + setTimeout, + setInterval, itreed: this, process, } @@ -35,11 +47,31 @@ export default class Context { lineMagic(line, out, done) { const parts = line.trim().split(/\s+/g) if (!this.magics.line[parts[0]]) { - return done(new Error(`Unknown magic: ${parts[0]}`)) + return done(new Error(`Unknown line magic: ${parts[0]}`)) } this.magics.line[parts[0]](this, out, parts.slice(1), done) } + magicComplete(line, code, pos, done) { + const parts = line.trim().split(/\s+/g) + if (!this.magics.block[parts[0]]) { + return false + } + const magic = this.magics.block[parts[0]] + const args = parts.slice(1) + if (!magic.complete) return false + var d = require('domain').create() + d.on('error', err => { + console.log('async error', err) + console.log('async error', err.stack) + done(err) + }) + d.run(() => { + magic.complete(this, code, pos, done) + }) + return true + } + blockMagic(line, block, out, done) { const parts = line.trim().split(/\s+/g) if (!this.magics.block[parts[0]]) { @@ -49,6 +81,8 @@ export default class Context { const args = parts.slice(1) if ('function' === typeof magic) { return magic(this, args, block, out, done) + } else if (magic.execute) { + return magic.execute(this, args, block, out, done) } else if (magic.transform) { return magic.transform(this, args, block, out, (err, code) => { if (err) return done(err) @@ -78,16 +112,42 @@ export default class Context { rawEvaluate(code, out, done) { let result - try { - result = this.rawRun(code) - } catch (error) { - return done(error) - } - // TODO allow custom formatters - return done(null, result !== undefined ? format(result) : null) + var d = require('domain').create() + d.on('error', err => { + console.log('async error', err) + out.error('Async Error', err.message, err.stack) + }) + d.run(() => { + try { + result = this.rawRun(code) + } catch (error) { + return done(error) + } + // TODO allow custom formatters + return done(null, result !== undefined ? format(result) : null) + }) } - complete(code, pos, done) { + complete(code, pos, variant, done) { + let off = 0 + if (code.match(/^%%/)) { + const lines = code.split('\n') + const first = lines.shift() + variant = first.slice(2) + code = lines.join('\n') + off = first.length + 1 + } + if (variant) { + if (this.magicComplete(variant, code, pos - off, (err, res) => { + if (res) { + res.cursor_start += off + res.cursor_end += off + } + done(err, res) + })) { + return + } + } // TODO allow extension to the completion logic? e.g. for clojurescript try { return getCompletions(this.magics, this.ctx.getGlobal(), code, pos, done) @@ -118,10 +178,48 @@ export default class Context { } } + shell(code, out, done) { + const proc = spawn('sh', ['-x', '-c', code]) + proc.stdout.on('data', data => out.stream('stdout', data.toString())) + proc.stderr.on('data', data => out.stream('stderr', data.toString())) + proc.on('close', code => { + done(code !== 0 ? new Error(`Exit code: ${code}`) : null) + }) + } + + cd(code, out, done) { + code = code.replace(/~/g, process.env.HOME) + try { + process.chdir(code) + } catch (e) { + return done(new Error(`No such dir: ${code}`)) + } + out.stream('stdout', 'cd to ' + code) + done() + } + + checkSpecial(code, out, done) { + if (code.match(/^%%/)) { + code = code.split('\n').slice(1).join('\n') + } + + if (code[0] === '!' && code.indexOf('\n') === -1) { + this.shell(code.slice(1), out, done) + return true + } + + if (code.indexOf('cd ') === 0 && code.indexOf('\n') === -1) { + this.cd(code.slice('cd '.length), out, done) + return true + } + } + execute(code, out, done) { if (this.dead) throw new Error('Kernel has been shutdown. Cannot execute') this.outToContext(out) + if (this.checkSpecial(code, out, done)) return + if (code.match(/^%%/)) { const lines = code.split('\n') const first = lines.shift() diff --git a/lib/kernel.js b/lib/kernel.js index e4d8ecb..43ffb00 100644 --- a/lib/kernel.js +++ b/lib/kernel.js @@ -10,11 +10,21 @@ function get_uuid() { const DELIM = '' +const knownSigs = { + 'hmac-sha256': 'sha256' +} + + export default class Kernel { constructor(config) { this.config = config this.ctx = new Context() this.count = 0 + this.key = config.key + this.sig = knownSigs[config.signature_scheme] + if (!this.sig) { + throw new Error(`Using a signatute scheme I don't know about: ${connection.signature_scheme}`) + } } init(done) { @@ -40,8 +50,21 @@ export default class Kernel { } } - onShell(uuid, delim, hmac, header, parent_header, metadata, content, blob) { - header = JSON.parse(header.toString()) + // uuid, delim, hmac, header, parent_header, metadata, content + onShell() { + const strs = [].map.call(arguments, a => a.toString()) + let i + for (i=0; i { + this.ctx.complete(content.code, content.cursor_pos, metadata.variant, (error, completions) => { if (error) { console.log(error) return this.send('shell', uuid, 'complete_reply', { @@ -103,6 +126,14 @@ export default class Kernel { }, header) } + sendError(header, name, message, stack) { + this.ioPub('error', { + ename: name, + evalue: message, + trackeback: [stack], + }, header) + } + executeRequest(uuid, header, metadata, content) { this.ioPub('status', {execution_state: 'busy'}, header) this.count += 1 @@ -113,6 +144,7 @@ export default class Kernel { this.ctx.execute(content.code, { stream: this.sendStream.bind(this, header), output: this.sendOutput.bind(this, header), + error: this.sendError.bind(this, header), }, (error, result) => { if (error) { this.ioPub('error', { @@ -162,24 +194,27 @@ export default class Kernel { }, parent, metadata, content) } + hash(string) { + const hmac = crypto.createHmac(this.sig, this.key) + hmac.update(string) + const res = hmac.digest('hex') + return res + } + _send(sock, id, header, parent, metadata, content) { const toHash = [ json(header), json(parent), json(metadata || {}), json(content)] - const hmac = crypto.createHmac('sha256', this.config.key) - hmac.update(toHash.join('')) - const data = [id, DELIM, hmac.digest('hex')].concat(toHash) + const hmac = this.hash(toHash.join('')) + const data = [id, DELIM, hmac].concat(toHash) this.sockets[sock].send(data) } } function fixUnicode(val) { return val.replace('\ufdd0', '\\ufdd0') - // console.log(val) - // console.log(JSON.stringify(val)) - // return decodeURIComponent(escape(val)).replace('\ufdd0', '\\ufdd0') } function json(data) { diff --git a/lib/load-ext.js b/lib/load-ext.js index 2575c00..f1f3a55 100644 --- a/lib/load-ext.js +++ b/lib/load-ext.js @@ -12,15 +12,22 @@ export default function loadExt(ctx, out, args, done) { if (!config.extensions[name]) { throw new Error(`Extension ${name} not available`) } - config.extensions[name](ctx, args, (err, magics) => { - if (err || !magics) return done(err) - for (let name in magics.block) { - ctx.magics.block[name] = magics.block[name] - } - for (let name in magics.line) { - ctx.magics.line[name] = magics.line[name] - } - done() + var d = require('domain').create() + d.on('error', err => { + console.log('async error', err) + done(err) + }) + d.run(() => { + config.extensions[name](ctx, args, (err, magics) => { + if (err || !magics) return done(err) + for (let name in magics.block) { + ctx.magics.block[name] = magics.block[name] + } + for (let name in magics.line) { + ctx.magics.line[name] = magics.line[name] + } + done() + }) }) } diff --git a/lib/run.js b/lib/run.js index 824ba3b..5e1fddf 100644 --- a/lib/run.js +++ b/lib/run.js @@ -1,4 +1,5 @@ +import path from 'path' import Kernel from './kernel' if (process.argv.length != 3) { @@ -6,13 +7,14 @@ if (process.argv.length != 3) { process.exit() } -const config = require(process.argv[2]) +const config = require(path.resolve(process.argv[2])) const kernel = new Kernel(config) kernel.init(err => { if (err) { console.error("Failed to init") + console.log(err) process.exit() } }) diff --git a/package.json b/package.json index f07a48b..5bce643 100644 --- a/package.json +++ b/package.json @@ -27,6 +27,7 @@ "async": "^0.9.0", "babel": "^5.1.11", "coffee-script": "^1.9.2", + "bencode": "^0.7.0", "contextify": "^0.1.13", "superagent": "^1.2.0", "zmq": "^2.11.0" diff --git a/test/Makefile b/test/Makefile new file mode 100644 index 0000000..6f4b895 --- /dev/null +++ b/test/Makefile @@ -0,0 +1,6 @@ + +run: + babel-node ./run.js connection.json config.json + +kernel: + babel-node ../lib/run.js connection.json diff --git a/test/Readme.md b/test/Readme.md new file mode 100644 index 0000000..f81c527 --- /dev/null +++ b/test/Readme.md @@ -0,0 +1,37 @@ + +# Testing an IPython Client + +connection.json should be the standard ipython kernel connection file format + +``` +{ + "control_port": 50160, + "shell_port": 57503, + "transport": "tcp", + "signature_scheme": "hmac-sha256", + "stdin_port": 52597, + "hb_port": 42540, + "ip": "127.0.0.1", + "iopub_port": 40885, + "key": "a0436f6c-1916-498b-8eb9-e81ab9368e84" +} +``` + +`config.json` should contain: + +``` +{ + "execute": { + "{test name}": { + "payload": execute_request payload, + "output": what will be sent to display_data + OR + "error": {ename, evalue} expected + }, + }, + "complete": { + "{code}": [] // list of completions (order invariant) + } +} +``` + diff --git a/test/config.json b/test/config.json new file mode 100644 index 0000000..c82538e --- /dev/null +++ b/test/config.json @@ -0,0 +1,21 @@ +{ + "execute": { + "Adding": { + "content": {"code": "2+2"}, + "events": [{ + "type": "execute_result", + "content": {"data": {"text/plain": "4"}} + }, { + "type": "status", + "content": {"execution_state": "busy"} + }, { + "type": "status", + "content": {"execution_state": "idle"} + }] + } + }, + "complete": { + "pr": ["process"], + "/": ["/etc", "/home"] + } +} diff --git a/test/connection.json b/test/connection.json new file mode 100644 index 0000000..b8486b5 --- /dev/null +++ b/test/connection.json @@ -0,0 +1,12 @@ +{ + "control_port": 50160, + "shell_port": 57503, + "transport": "tcp", + "signature_scheme": "hmac-sha256", + "stdin_port": 52597, + "hb_port": 42540, + "ip": "127.0.0.1", + "iopub_port": 40885, + "key": "a0436f6c-1916-498b-8eb9-e81ab9368e84" +} + diff --git a/test/get-kernel-info.js b/test/get-kernel-info.js new file mode 100644 index 0000000..5e6449d --- /dev/null +++ b/test/get-kernel-info.js @@ -0,0 +1,16 @@ + +export default function getKernelInfo(server, next) { + server.ping({ + sock: 'shell', + send: 'kernel_info_request', + expect: 'kernel_info_reply', + }, (err, payload) => { + next(null, { + type: 'kernel_info_request', + passed: !err && payload.content && payload.content.language, + info: !err && payload.content, + time: Date.now() + }) + }) +} + diff --git a/test/heartbeat.js b/test/heartbeat.js new file mode 100644 index 0000000..3f9cc1f --- /dev/null +++ b/test/heartbeat.js @@ -0,0 +1,35 @@ + +export default function testHeartbeat(sock, errors) { + let sent = null + const m = data => { + data = data.toString() + if (data !== sent) { + errors.push({ + type: 'bad heartbeat', + passed: data === sent, + sent: sent, + got: data, + time: Date.now() + }) + } + sent = null + } + sock.on('message', m) + const hb = setInterval(() => { + if (sent !== null) { + errors.push({ + type: 'missed heartbeat', + passed: false, + sent: sent, + got: 'no response', + time: Date.now() + }) + } + sock.send(sent = Math.random().toString(0xf).slice(10, 40)) + }, 200) + return () => { + sock.removeListener('message', m) + clearInterval(hb) + } +} + diff --git a/test/kernel.js b/test/kernel.js new file mode 100644 index 0000000..0a7dcc2 --- /dev/null +++ b/test/kernel.js @@ -0,0 +1,114 @@ + +import setupSockets from './sockets' +import crypto from 'crypto' + +import Server from './server' +import getKernelInfo from './get-kernel-info' +import testHeartbeat from './heartbeat' +import async from 'async' + +function isSubset(one, larger) { + if (one === larger) return true + if (!one || !larger) return false + if (typeof one !== typeof larger) return false + if ('object' !== typeof one) return false + if (one.constructor.name !== larger.constructor.name) return false + if (Array.isArray(one)) { + return !one.some(n => !larger.some(l => isSubset(n, l))) + } + for (let name in one) { + if (!isSubset(one[name], larger[name])) return false + } + return true +} + +export default function (connection, config) { + let errors = [] + setupSockets(connection, (err, sockets) => { + console.log('Sockets connected: testing') + const stopHeartbeat = testHeartbeat(sockets.heartbeat, errors) + + const server = new Server(connection, config, sockets) + + server.on('error', err => errors.push(err)) + + let tasks = [] + + // setTimeout is so the heartbeats can go + tasks.push(next => setTimeout(() => getKernelInfo(server, next), 500)) + tasks.push(next => server.ping({ + sock: 'shell', + send: 'history_request', + expect: 'history_reply', + }, (err, payload) => { + next(null, { + type: 'history_request', + passed: !err && payload && payload.content && payload.content.history && Array.isArray(payload.content.history) + }) + })) + + tasks = tasks.concat(Object.keys(config.execute).map( + name => next => server.ping({ + sock: 'shell', + send: 'execute_request', + ios: ['display_data', 'execute_result', 'execute_input', 'error', 'status'], + content: config.execute[name].content, + expect: 'execute_reply', + }, (err, payload, events) => { + const execute = config.execute[name] + let missing = [] + if (!events) missing = execute.events + else { + execute.events.forEach(ev => { + const failed = !events.some(ev2 => + ev2.header.msg_type === ev.type && + isSubset(ev.content, ev2.content) + ) + if (failed) missing.push(ev) + }) + } + next(null, { + type: 'execute', + passed: !missing.length, + events, + missing, + }) + }) + )) + + tasks = tasks.concat(Object.keys(config.complete).map( + code => next => server.ping({ + sock:'shell', + send: 'complete_request', + expect: 'complete_reply', + content: {code: code, cursor_pos: code.length} + }, (err, payload) => { + let missing = config.complete[code].filter(name => payload.content.matches.indexOf(name) === -1) + next(null, { + type: 'complete', + passed: !missing.length, + matches: payload.content.matches, + missing, + }) + }) + )) + + async.series(tasks, (err, tests) => { + if (err) console.log('Error from async', err) + stopHeartbeat() + console.log(JSON.stringify(tests.filter(t => !t.passed), null, 2)) + const failed = tests.filter(t => !t.passed) + console.log() + console.log() + if (!failed.length) { + console.log(`>>>>>>> ALL ${tests.length} PASSED <<<<<<<<`) + } else { + console.log('!!!!!!!!!', failed.length + '/' + tests.length + ' failures !!!!!!!') + } + console.log(errors.length, 'errors', errors) + console.log() + process.exit() + }) + }) +} + diff --git a/test/kernel.log b/test/kernel.log new file mode 100644 index 0000000..e69de29 diff --git a/test/run.js b/test/run.js new file mode 100644 index 0000000..4a2a0ea --- /dev/null +++ b/test/run.js @@ -0,0 +1,9 @@ + +import testKernel from './kernel' +import path from 'path' + +const connection = require(path.resolve(process.argv[2])) +const config = require(path.resolve(process.argv[3])) + +testKernel(connection, config) + diff --git a/test/server.js b/test/server.js new file mode 100644 index 0000000..f1a9193 --- /dev/null +++ b/test/server.js @@ -0,0 +1,152 @@ + +import {EventEmitter} from 'events' +import crypto from 'crypto' + +function fixUnicode(val) { + return val.replace('\ufdd0', '\\ufdd0') +} + +const DELIM = '' + +function json(data) { + return fixUnicode(JSON.stringify(data) || '{}') +} + +function get_uuid() { + return Math.random().toString(0x0f).slice(10, 100) +} + +const knownSigs = { + 'hmac-sha256': 'sha256' +} + +export default class Server extends EventEmitter { + constructor(connection, config, sockets) { + super() + this.sockets = sockets + this.key = connection.key + this.sig = knownSigs[connection.signature_scheme] + if (!this.sig) { + throw new Error(`Using a signatute scheme I don't know about: ${connection.signature_scheme}`) + } + for (let name in sockets) { + if (name === 'heartbeat') continue + sockets[name].on('message', this.recv.bind(this, name)) + } + } + + ping(config, done) { + const key = config.sock + ':' + config.expect + let tout = setTimeout(() => { + console.log('TIMEOUT waiting for', config.expect) + tout = null + done(new Error('Timeout')) + }, config.timeout || 1000) + + const header = this.send(config.sock, '', config.send, config.content, {}, {}) + + const events = [] + config.ios && config.ios.forEach(ev => this.on('iopub:' + ev, event)) + function event(payload) { + if (payload.parent_header.msg_id !== header.msg_id) { + return // not answering me + } + events.push(payload) + } + + this.on(key, got) + function got(payload) { + if (!tout) return console.log('Received after timeout', config.expect) + // console.log(payload, header) + if (payload.parent_header.msg_id !== header.msg_id) { + return // not answering me + } + clearTimeout(tout) + this.off(key, got) + setTimeout(() => { + config.ios && config.ios.forEach(ev => this.off('iopub:' + ev, event)) + done(null, payload, events) + }, 100) + } + + } + + off(evt, hl) { + return this.removeListener(evt, hl) + } + + // sock, delim, hmac, header, parent_header, metadata, content + recv(sock) { + const args = [].map.call(arguments, m => m.toString()).slice(1) + if (sock === 'iopub') { + args.shift() + } + const [delim, hmac, header, parent_header, metadata, content] = args + const toHash = [header, parent_header, metadata, content].join('') + const payload = {header, parent_header, metadata, content} + for (let name in payload) { + const val = payload[name] + let parsed + try { + parsed = JSON.parse(val) + } catch (err) { + this.emit('error', { + type: 'format', + message: `${name} is supposed to be JSON, but was ${val}`, + }) + continue + } + payload[name] = parsed + if (!parsed || 'object' !== typeof parsed || Array.isArray(parsed)) { + this.emit('error', { + type: 'format', + message: `${name} is supposed to be a JSON serialized dictionary, but was ${val}` + }) + } + } + payload.delim = delim + payload.hmac = hmac + const hash = this.hash(toHash) + if (hash !== hmac.toString()) { + this.emit('error', { + type: 'security', + message: "Hashes didn't match", + data: [hash, hmac.toString()], + }) + } + // console.log(sock, header, payload.header.msg_type) + this.emit(sock, payload) + this.emit(sock + ':' + payload.header.msg_type, payload) + } + + send(sock, id, msg_type, content, parent, metadata) { + const repid = get_uuid() + const header = { + msg_id: repid, + username: parent.username, + msg_type: msg_type, + session: parent.session, + version: '5.0', + } + this._send(sock, id, header, parent, metadata, content) + return header + } + + hash(string) { + const hmac = crypto.createHmac(this.sig, this.key) + hmac.update(string) + return hmac.digest('hex') + } + + _send(sock, id, header, parent, metadata, content) { + const toHash = [ + json(header), + json(parent), + json(metadata || {}), + json(content)] + const hmac = this.hash(toHash.join('')) + const data = [DELIM, hmac].concat(toHash) + this.sockets[sock].send(data) + } +} + diff --git a/test/sockets.js b/test/sockets.js new file mode 100644 index 0000000..335c4e2 --- /dev/null +++ b/test/sockets.js @@ -0,0 +1,44 @@ + +import async from 'async' +import zmq from 'zmq' + +export default function setupSockets(config, done) { + const sockets = { + control: { + port: config.control_port, + type: 'dealer', + }, + shell: { + port: config.shell_port, + type: 'dealer', + }, + stdin: { + port: config.stdin_port, + type: 'dealer', + }, + iopub: { + port: config.iopub_port, + type: 'sub', + }, + heartbeat: { + port: config.hb_port, + type: 'req', + } + } + const tasks = {} + for (let name in sockets) { + tasks[name] = setupSocket.bind(null, sockets[name], config) + } + async.parallel(tasks, done) +} + +function setupSocket(config, general, done) { + const sock = zmq.socket(config.type); + const addr = general.transport + '://' + general.ip + ':' + config.port + sock.connect(addr) + if (config.type === 'sub') { + sock.subscribe('') + } + done(null, sock) +} + diff --git a/test/tbind.js b/test/tbind.js new file mode 100644 index 0000000..80531fc --- /dev/null +++ b/test/tbind.js @@ -0,0 +1,24 @@ + +import zmq from 'zmq' + +const r = zmq.socket('dealer') +const p = zmq.socket('router') + +const uri = 'tcp://127.0.0.1:55432' + +p.bind('tcp://*:55432', err => { + console.log('FaILE', err) + if (err) { + process.exit() + } + r.connect(uri) + p.on('message', function () { + console.log([].map.call(arguments, a => a.toString())) + }) + r.send(['hi', 'ho', 'cherry']) + setTimeout(() => { + process.exit() + }, 200) + +}) +