Skip to content

Commit

Permalink
ref!: add support for node net sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
coolaj86 committed Aug 18, 2024
1 parent 79c1f84 commit 0f9c499
Showing 1 changed file with 125 additions and 121 deletions.
246 changes: 125 additions & 121 deletions dashp2p.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,88 @@ var DashP2P = ("object" === typeof module && exports) || {};
let Sizes = {};
let Utils = {};

DashP2P.fromWebSocket = function (wsc) {
let p2p = DashP2P.create();

p2p._socket = {
open: async function () {
let promise = new Promise(function (resolve) {
if (wsc.readyState === WebSocket.CONNECTING) {
wsc.addEventListener("open", resolve);
} else {
resolve(null);
}
});
return promise;
},
send: function (bytes) {
wsc.send(bytes);
},
close: function () {
wsc.close();
},
};

wsc.addEventListener("error", function (err) {
// TODO: might be a browser Event, not an Error
p2p._eventStream.emit("error", err);
});

wsc.addEventListener("message", async function (wsevent) {
let ab = await wsevent.data.arrayBuffer();
let bytes = new Uint8Array(ab);
p2p.processBytes(bytes);
});

return p2p;
};

DashP2P.fromNetSocket = function (socket) {
let p2p = DashP2P.create();

p2p._socket = {
send: function (bytes) {
socket.write(bytes);
},
close: function () {
socket.close();
},
open: async function () {
let promise = new Promise(function (resolve) {
if (socket.readyState === "opening") {
socket.on("ready", resolve);
} else {
resolve(null);
}
});
return promise;
},
};

socket.on("error", function (err) {
p2p._eventStream.emit("error", err);
});
socket.on("readable", async function () {
for (;;) {
let buf = socket.read();
if (!buf) {
return;
}
let bytes = new Uint8Array(buf);
p2p.processBytes(bytes);
}
});
socket.on("close", p2p.close);

return p2p;
};

DashP2P.create = function () {
const HEADER_SIZE = Sizes.HEADER;

let p2p = {};

// state
p2p.state = "header";
/** @type {Array<Uint8Array>} */
p2p.chunks = [];
Expand All @@ -65,15 +143,29 @@ var DashP2P = ("object" === typeof module && exports) || {};
p2p.header = null;
/** @type {Uint8Array?} */
p2p.payload = null;
let explicitEvents = ["version", "verack", "ping", "pong"];

// event-y stuff
let explicitEvents = ["version", "verack", "ping"];
p2p._eventStream = Utils.EventStream.create(explicitEvents);
p2p._evstream = p2p.createSubscriber(explicitEvents);
p2p.createSubscriber = p2p._eventStream.createSubscriber;

p2p._wsc = null;
// socket-y stuff
p2p._socket = {
open: async function () {}, // wait until peer is ready
send: function (bytes) {}, // send to peer
close: function () {}, // disconnect from peer
};
p2p.send = function (bytes) {
throw new Error("no socket has been initialized");
p2p._socket.send(bytes);
};
p2p.close = function () {
throw new Error("no socket has been initialized");
try {
p2p._conn.close();
} catch (e) {
console.error("error closing websocket:", e);
}
p2p._close(true);
};
p2p._close = function (bytes) {
try {
Expand All @@ -83,80 +175,45 @@ var DashP2P = ("object" === typeof module && exports) || {};
}
};

p2p.createSubscriber = p2p._eventStream.createSubscriber;

p2p.initWebSocket = async function (
wsc,
{ network, hostname, port, start_height },
) {
p2p._wsc = wsc;

p2p.send = function (bytes) {
return wsc.send(bytes);
};

p2p.close = function () {
try {
wsc.close();
} catch (e) {
console.error("error closing websocket:", e);
}
p2p._close(true);
};

wsc.addEventListener("message", async function (wsevent) {
let ab = await wsevent.data.arrayBuffer();
let bytes = new Uint8Array(ab);
console.log(
`%c ws.onmessage => p2p.processBytes(bytes) [${bytes.length}]`,
`color: #bada55`,
);
p2p.processBytes(bytes);
});

wsc.addEventListener("open", async function () {
{
let versionBytes = DashP2P.packers.version({
network: network,
addr_recv_ip: hostname,
addr_recv_port: port,
start_height: start_height,
});
console.log("DEBUG wsc.send(versionBytes)");
wsc.send(versionBytes);
}

{
let verackBytes = DashP2P.packers.verack({ network: network });
console.log("DEBUG wsc.send(verackBytes)");
wsc.send(verackBytes);
}
});

wsc.addEventListener("close", p2p.close);

let evstream = p2p.createSubscriber(["version", "verack", "ping"]);
p2p.init = async function ({ network, hostname, port, start_height }) {
console.log("%c subscribed", "color: red");

void (await evstream.once("version"));
console.log("%c[[version]] PROCESSED", "color: red");
// void (await evstream.once('verack'));
// console.log('%c[[verack]] PROCESSED', 'color: red');
await p2p._socket.open();
{
let versionBytes = DashP2P.packers.version({
network: network,
addr_recv_ip: hostname,
addr_recv_port: port,
start_height: start_height,
});
console.log("DEBUG p2p.send(versionBytes)");
p2p._socket.send(versionBytes);
}

(async function () {
{
let verackBytes = DashP2P.packers.verack({ network: network });
console.log("DEBUG p2p.send(verackBytes)");
p2p._socket.send(verackBytes);
}

async function goPong() {
for (;;) {
let msg = await evstream.once("ping");
let msg = await p2p._evstream.once("ping");
console.log("%c received ping", "color: red");
let pongBytes = DashP2P.packers.pong({
network: network,
nonce: msg.payload,
});
console.log("%c[[PING]] wsc.send(pongBytes)", "color: blue;");
wsc.send(pongBytes);
console.log("%c[[PING]] p2p.send(pongBytes)", "color: blue;");
p2p._socket.send(pongBytes);
}
})().catch(DashP2P.createCatchClose(["ping"]));
}
goPong().catch(DashP2P.createCatchClose(["ping"]));

return;
void (await p2p._evstream.once("version"));
console.log("%c[[version]] PROCESSED", "color: red");
// void (await p2p._evstream.once('verack'));
// console.log('%c[[verack]] PROCESSED', 'color: red');
};

/** @param {Uint8Array?} */
Expand Down Expand Up @@ -880,7 +937,7 @@ var DashP2P = ("object" === typeof module && exports) || {};

/**
* @param {Array<String>} events - ex: ['*', 'error'] for default events, or list by name
* @param {Function} eventLoopFn - called in a loop until evstream.close()
* @param {Function?} [eventLoopFn] - called in a loop until evstream.close()
*/
stream.createSubscriber = function (events, eventLoopFn) {
let conn = Utils.EventStream.createSubscriber(stream, events);
Expand Down Expand Up @@ -1038,59 +1095,6 @@ var DashP2P = ("object" === typeof module && exports) || {};
return p;
};

// /** @param {String} events */
// Utils.createPromiseGenerator = function (events) {
// let g = {};

// g.events = events;

// // g._settled = true;
// g._promise = Promise.resolve(); // for type hint
// g._results = [];

// g.resolve = function (result) {};
// g.reject = function (err) {};

// // g.init = async function () {
// // if (!g._settled) {
// // console.warn('g.init() called again before previous call was settled');
// // return await g._promise;
// // }
// // g._settled = false;
// g._promise = new Promise(function (_resolve, _reject) {
// g.resolve = _resolve;
// g.reject = _reject;
// // g.resolve = function (result) {
// // if (g._settled) {
// // g._results.push(result);
// // return;
// // }
// // g._settled = true;
// // _resolve(result);
// // };
// // g.reject = function (error) {
// // if (g._settled) {
// // g._results.push(error);
// // return;
// // }
// // g._settled = true;
// // _reject(error);
// // };
// });
// // if (g._results.length) {
// // let result = g._results.shift();
// // if (result instanceof Error) {
// // g.reject(result);
// // } else {
// // g.resolve(result);
// // }
// // }
// // return await g._promise;
// // };

// return g;
// };

/**
* @param {Array<Uint8Array>} byteArrays
* @param {Number?} [len]
Expand Down

0 comments on commit 0f9c499

Please sign in to comment.