diff --git a/build.js b/build.js index 72d5aa6..2b85eb6 100644 --- a/build.js +++ b/build.js @@ -1,22 +1,26 @@ -var $tw = require("tiddlywiki").TiddlyWiki(); -var path = require("path"); -var fs = require("fs"); - -// Pass the command line arguments to the boot kernel -$tw.boot.argv = ["."]; -// we only need the first phase of startup -$tw.boot.initStartup({}); -// get the bundle output path -var output = path.resolve("plugins/sse/plugin.info"); -// create the directory tree -if(!fs.existsSync(path.dirname(output))) { - fs.mkdirSync(path.dirname(output),{recursive: true}); -} -// load the plugin from the dist folder -var plugin = $tw.loadPluginFolder(path.join(__dirname,"dist")); -// put the plugin tiddlers back into the tiddlers field -$tw.utils.extend(plugin,JSON.parse(plugin.text)); -// remove the text field -delete plugin.text; -// write the bundled plugin to file -fs.writeFileSync(output,JSON.stringify(plugin,null,"\t")); +var $tw = require("tiddlywiki").TiddlyWiki(); +var path = require("path"); +var fs = require("fs"); + +// Pass the command line arguments to the boot kernel +$tw.boot.argv = ["."]; +// we only need the first phase of startup +$tw.boot.initStartup({}); +// get the bundle output path +var output = [path.resolve("plugins/sse/plugin.info"), path.resolve("plugins/sse/plugin.json")]; + +// load the plugin from the dist folder +var plugin = $tw.loadPluginFolder(path.join(__dirname, "dist")); +// put the plugin tiddlers back into the tiddlers field +$tw.utils.extend(plugin, JSON.parse(plugin.text)); +// remove the text field +delete plugin.text; +// write the bundled plugin to file +output.forEach(output => { + // create the directory tree + if (!fs.existsSync(path.dirname(output))) { + fs.mkdirSync(path.dirname(output), { recursive: true }); + } + fs.writeFileSync(output, JSON.stringify(plugin, null, "\t")); +}) + diff --git a/dist/plugin.info b/dist/plugin.info index 12ce5cb..75fe2b1 100644 --- a/dist/plugin.info +++ b/dist/plugin.info @@ -1,9 +1,9 @@ -{ - "title": "$:/plugins/twcloud/tiddlyweb-sse", - "name": "TiddlyWeb SSE", - "description": "Sync changes immediately from the browser to TW5 (node.js) using SSE", - "list": "readme", - "plugin-priority": 10, - "license": "MIT - Copyright Arlen Beiler 2021", - "version": "1.0.1" -} +{ + "title": "$:/plugins/twcloud/tiddlyweb-sse", + "name": "TiddlyWeb SSE", + "description": "Sync changes immediately from the browser to TW5 (node.js) using SSE", + "list": "readme", + "plugin-priority": 10, + "license": "MIT - Copyright Arlen Beiler 2021", + "version": "1.0.2" +} diff --git a/dist/server-sent-events.js.meta b/dist/server-sent-events.js.meta index 392542f..e6f43ff 100644 --- a/dist/server-sent-events.js.meta +++ b/dist/server-sent-events.js.meta @@ -1,3 +1,3 @@ -title: $:/core/modules/server/server-sent-events.js -type: application/javascript -module-type: library +title: $:/core/modules/server/server-sent-events.js +type: application/javascript +module-type: library diff --git a/dist/sse-client.js b/dist/sse-client.js index bd41261..3061f59 100644 --- a/dist/sse-client.js +++ b/dist/sse-client.js @@ -1,81 +1,81 @@ -/*\ -title: $:/plugins/twcloud/tiddlyweb-sse/sse-client.js -type: application/javascript -module-type: startup - -Miscellaneous startup logic for both the client and server. - -\*/ -(function() { - -/*jslint node: true, browser: true */ -/*global $tw: false */ -"use strict"; -var checks = [ - "$:/status/IsLoggedIn", - "$:/status/UserName", - "$:/status/IsAnonymous", - "$:/status/IsReadOnly" -]; -// Export name and synchronous status -exports.name = "tiddlyweb-sse-hook"; -exports.after = ["startup"]; -exports.platforms = ["browser"]; -exports.synchronous = true; -exports.startup = function() { - var source = null; - if(!$tw.syncer || !$tw.syncer.syncadaptor || $tw.syncer.syncadaptor.name !== "tiddlyweb") {return;} - $tw.wiki.addEventListener("change",function(changes) { - if(checks.filter(e => changes[e]).length === 0) {return;} - // check if we have a previous one and close it if we do - if(source && source.readyState !== source.CLOSED) {source.close();} - // Get the mount point in case a path prefix is used - var host = $tw.syncer.syncadaptor.getHost(); - // Make sure it ends with a slash (it usually does) - if(host[host.length - 1] !== "/") {host += "/";} - // get the endpoint - var endpoint = host + "events/plugins/twcloud/tiddlyweb-sse/wiki-change"; - // set the syncer poll to one hour - $tw.syncer.pollTimerInterval = 1000 * 60 * 60; - // Setup the event listener - source = exports.setupSSE(endpoint,$tw.syncer); - }); -} - -function debounce(interval,callback) { - var timeout = null; - return function() { - clearTimeout(timeout); - timeout = setTimeout(callback,interval); - }; -} - -exports.setupSSE = function setupSSE(endpoint,syncer,refresh) { - if(window.EventSource) { - var source = new EventSource(endpoint,{withCredentials: true}); - var debouncedSync = debounce(syncer.throttleInterval,syncer.syncFromServer.bind(syncer)); - source.addEventListener("change",debouncedSync); - source.onerror = function() { - // return if we're reconnecting because that's handled automatically - if(source.readyState === source.CONNECTING) {return;} - // wait for the errorRetryInterval - setTimeout(function() { - //call this function to set everything up again - exports.setupSSE(endpoint,syncer,true); - },syncer.errorRetryInterval); - }; - source.onopen = function() { - // only run this on first open, not on auto reconnect - source.onopen = function() {}; - // once we've properly opened, disable polling - syncer.wiki.addTiddler({title: syncer.titleSyncDisablePolling,text: "yes"}); - //sync from server manually here to make sure we stay up to date - if(refresh) {syncer.syncFromServer();} - } - return source; - } else { - return null; - } -} - -})(); +/*\ +title: $:/plugins/twcloud/tiddlyweb-sse/sse-client.js +type: application/javascript +module-type: startup + +Miscellaneous startup logic for both the client and server. + +\*/ +(function() { + +/*jslint node: true, browser: true */ +/*global $tw: false */ +"use strict"; +var checks = [ + "$:/status/IsLoggedIn", + "$:/status/UserName", + "$:/status/IsAnonymous", + "$:/status/IsReadOnly" +]; +// Export name and synchronous status +exports.name = "tiddlyweb-sse-hook"; +exports.after = ["startup"]; +exports.platforms = ["browser"]; +exports.synchronous = true; +exports.startup = function() { + var source = null; + if(!$tw.syncer || !$tw.syncer.syncadaptor || $tw.syncer.syncadaptor.name !== "tiddlyweb") {return;} + $tw.wiki.addEventListener("change",function(changes) { + if(checks.filter(e => changes[e]).length === 0) {return;} + // check if we have a previous one and close it if we do + if(source && source.readyState !== source.CLOSED) {source.close();} + // Get the mount point in case a path prefix is used + var host = $tw.syncer.syncadaptor.getHost(); + // Make sure it ends with a slash (it usually does) + if(host[host.length - 1] !== "/") {host += "/";} + // get the endpoint + var endpoint = host + "events/plugins/twcloud/tiddlyweb-sse/wiki-change"; + // set the syncer poll to one hour + $tw.syncer.pollTimerInterval = 1000 * 60 * 60; + // Setup the event listener + source = exports.setupSSE(endpoint,$tw.syncer); + }); +} + +function debounce(interval,callback) { + var timeout = null; + return function() { + clearTimeout(timeout); + timeout = setTimeout(callback,interval); + }; +} + +exports.setupSSE = function setupSSE(endpoint,syncer,refresh) { + if(window.EventSource) { + var source = new EventSource(endpoint,{withCredentials: true}); + var debouncedSync = debounce(syncer.throttleInterval,syncer.syncFromServer.bind(syncer)); + source.addEventListener("change",debouncedSync); + source.onerror = function() { + // return if we're reconnecting because that's handled automatically + if(source.readyState === source.CONNECTING) {return;} + // wait for the errorRetryInterval + setTimeout(function() { + //call this function to set everything up again + exports.setupSSE(endpoint,syncer,true); + },syncer.errorRetryInterval); + }; + source.onopen = function() { + // only run this on first open, not on auto reconnect + source.onopen = function() {}; + // once we've properly opened, disable polling + syncer.wiki.addTiddler({title: syncer.titleSyncDisablePolling,text: "yes"}); + //sync from server manually here to make sure we stay up to date + if(refresh) {syncer.syncFromServer();} + } + return source; + } else { + return null; + } +} + +})(); diff --git a/dist/sse-server.js b/dist/sse-server.js index 2595549..4280a19 100644 --- a/dist/sse-server.js +++ b/dist/sse-server.js @@ -1,77 +1,77 @@ -/*\ -title: $:/plugins/twcloud/tiddlyweb-sse/sse-server.js -type: application/javascript -module-type: route - -GET /events/plugins/twcloud/tiddlyweb/(channel) - -\*/ -(function() { - -/*jslint node: true, browser: true */ -/*global $tw: false */ -"use strict"; - -var wikis = []; - -// Import the Journal class -var Journal = require("$:/core/modules/server/server-sent-events.js").Journal; - -/* -Setup up the array for this wiki and add the change listener -*/ -function setupWiki(wiki) { - function filter(conn) { - return conn.state.wiki === wiki; - } - // Listen to change events for this wiki - wiki.addEventListener("change",function(changes) { - var jsonChanges = JSON.stringify(changes); - eventServer.emitEvent("wiki-change","change",jsonChanges,filter); - }); - wikis.push(wiki); -} - -/* -Setup this particular wiki if we haven't seen it before -*/ -function ensureChannelSetup(channel,wiki) { - // setup wikis for the wiki-change channel - if(channel === "wiki-change" && wikis.indexOf(wiki) === -1) { setupWiki(wiki); } -} -/** @type {import('../server-sent-events').Journal} */ -var eventServer = new Journal(); - -// this filter is called for the emitter route, which recieves -// messages from clients and forwards them to all listeners. It -// does not affect messages sent directly by the server. -// We don't use it in tiddlyweb so just set it to false -eventServer.emitterFilter = function(sender) { - // do not allow clients to broadcast - // they can't anyway unless a route is specified - return function() { return false; }; -} - -if(!$tw.wiki.getTiddler("$:/plugins/tiddlywiki/tiddlyweb")) { - $tw.utils.warning("Warning: Plugin \"twcloud/tiddlyweb-sse\" specified but \"tiddlywiki/tiddlyweb\" is missing"); -} - -// Export the route definition for this server sent events handler. -// We don't need an emitter route, otherwise we could put the common -// instance in a library tiddler export and require it in both files. - -module.exports = eventServer.handlerExports( - "plugins/twcloud/tiddlyweb-sse", - function(request,response,state) { - if(state.params[0] !== "wiki-change") { - response.writeHead(404); - response.end(); - return; - } - // remove the socket timeout - request.setTimeout(0); - ensureChannelSetup(state.params[0],state.wiki); - eventServer.handler(request,response,state); - } -); +/*\ +title: $:/plugins/twcloud/tiddlyweb-sse/sse-server.js +type: application/javascript +module-type: route + +GET /events/plugins/twcloud/tiddlyweb/(channel) + +\*/ +(function() { + +/*jslint node: true, browser: true */ +/*global $tw: false */ +"use strict"; + +var wikis = []; + +// Import the Journal class +var Journal = require("$:/core/modules/server/server-sent-events.js").Journal; + +/* +Setup up the array for this wiki and add the change listener +*/ +function setupWiki(wiki) { + function filter(conn) { + return conn.state.wiki === wiki; + } + // Listen to change events for this wiki + wiki.addEventListener("change",function(changes) { + var jsonChanges = JSON.stringify(changes); + eventServer.emitEvent("wiki-change","change",jsonChanges,filter); + }); + wikis.push(wiki); +} + +/* +Setup this particular wiki if we haven't seen it before +*/ +function ensureChannelSetup(channel,wiki) { + // setup wikis for the wiki-change channel + if(channel === "wiki-change" && wikis.indexOf(wiki) === -1) { setupWiki(wiki); } +} +/** @type {import('../server-sent-events').Journal} */ +var eventServer = new Journal(); + +// this filter is called for the emitter route, which recieves +// messages from clients and forwards them to all listeners. It +// does not affect messages sent directly by the server. +// We don't use it in tiddlyweb so just set it to false +eventServer.emitterFilter = function(sender) { + // do not allow clients to broadcast + // they can't anyway unless a route is specified + return function() { return false; }; +} + +if(!$tw.wiki.getTiddler("$:/plugins/tiddlywiki/tiddlyweb")) { + $tw.utils.warning("Warning: Plugin \"twcloud/tiddlyweb-sse\" specified but \"tiddlywiki/tiddlyweb\" is missing"); +} + +// Export the route definition for this server sent events handler. +// We don't need an emitter route, otherwise we could put the common +// instance in a library tiddler export and require it in both files. + +module.exports = eventServer.handlerExports( + "plugins/twcloud/tiddlyweb-sse", + function(request,response,state) { + if(state.params[0] !== "wiki-change") { + response.writeHead(404); + response.end(); + return; + } + // remove the socket timeout + request.setTimeout(0); + ensureChannelSetup(state.params[0],state.wiki); + eventServer.handler(request,response,state); + } +); })(); \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index 936326e..595eead 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,23 +1,63 @@ -{ - "name": "tiddlyweb-sse", - "version": "1.0.0", - "lockfileVersion": 1, - "requires": true, - "dependencies": { - "@types/node": { - "version": "16.3.2", - "resolved": "https://registry.npmjs.org/@types/node/-/node-16.3.2.tgz", - "integrity": "sha512-jJs9ErFLP403I+hMLGnqDRWT0RYKSvArxuBVh2veudHV7ifEC1WAmjJADacZ7mRbA2nWgHtn8xyECMAot0SkAw==" - }, - "tiddlywiki": { - "version": "5.1.23", - "resolved": "https://registry.npmjs.org/tiddlywiki/-/tiddlywiki-5.1.23.tgz", - "integrity": "sha512-U5P6CdVncHqfoMRAYGj8NxqbP2JJICdLre+jAlXzV8nhllRL9ZFIE0y80wASXc6xip6++TA67Pam7+FJ73A1Vw==" - }, - "typescript": { - "version": "4.3.5", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.3.5.tgz", - "integrity": "sha512-DqQgihaQ9cUrskJo9kIyW/+g0Vxsk8cDtZ52a3NGh0YNTfpUSArXSohyUGnvbPazEPLu398C0UxmKSOrPumUzA==" - } - } -} +{ + "name": "tiddlyweb-sse", + "version": "1.0.2", + "lockfileVersion": 2, + "requires": true, + "packages": { + "": { + "name": "tiddlyweb-sse", + "version": "1.0.2", + "license": "MIT", + "dependencies": { + "@types/node": "^16.3.2", + "tiddlywiki": "^5.2.7", + "typescript": "^4.9.5" + } + }, + "node_modules/@types/node": { + "version": "16.3.2", + "resolved": "https://registry.npmjs.org/@types/node/-/node-16.3.2.tgz", + "integrity": "sha512-jJs9ErFLP403I+hMLGnqDRWT0RYKSvArxuBVh2veudHV7ifEC1WAmjJADacZ7mRbA2nWgHtn8xyECMAot0SkAw==" + }, + "node_modules/tiddlywiki": { + "version": "5.2.7", + "resolved": "https://registry.npmjs.org/tiddlywiki/-/tiddlywiki-5.2.7.tgz", + "integrity": "sha512-7Hi8IF/6iIdTWpX6tJ955isy6n0JShCh1fu27SX9jSa0JuP3JUg40Hgb9g/iNV5oK5dxstFZImjAeVaXOPSdiw==", + "bin": { + "tiddlywiki": "tiddlywiki.js" + }, + "engines": { + "node": ">=0.8.2" + } + }, + "node_modules/typescript": { + "version": "4.9.5", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", + "integrity": "sha512-1FXk9E2Hm+QzZQ7z+McJiHL4NW1F2EzMu9Nq9i3zAaGqibafqYwCVU6WyWAuyQRRzOlxou8xZSyXLEN8oKj24g==", + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=4.2.0" + } + } + }, + "dependencies": { + "@types/node": { + "version": "16.3.2", + "resolved": "https://registry.npmjs.org/@types/node/-/node-16.3.2.tgz", + "integrity": "sha512-jJs9ErFLP403I+hMLGnqDRWT0RYKSvArxuBVh2veudHV7ifEC1WAmjJADacZ7mRbA2nWgHtn8xyECMAot0SkAw==" + }, + "tiddlywiki": { + "version": "5.2.7", + "resolved": "https://registry.npmjs.org/tiddlywiki/-/tiddlywiki-5.2.7.tgz", + "integrity": "sha512-7Hi8IF/6iIdTWpX6tJ955isy6n0JShCh1fu27SX9jSa0JuP3JUg40Hgb9g/iNV5oK5dxstFZImjAeVaXOPSdiw==" + }, + "typescript": { + "version": "4.9.5", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", + "integrity": "sha512-1FXk9E2Hm+QzZQ7z+McJiHL4NW1F2EzMu9Nq9i3zAaGqibafqYwCVU6WyWAuyQRRzOlxou8xZSyXLEN8oKj24g==" + } + } +} diff --git a/package.json b/package.json index 7c3c653..36db639 100644 --- a/package.json +++ b/package.json @@ -1,25 +1,25 @@ -{ - "name": "tiddlyweb-sse", - "version": "1.0.0", - "description": "This plugin runs in both the browser and server to sync changes immediately instead of waiting for polling. It adds a route to the server which sends server-sent events to the client, and loads an `EventSource` in the client to call `$tw.syncer.syncFromServer()`.", - "main": "server-sent-events.js", - "scripts": { - "build": "node build.js", - "serve": "tiddlywiki +plugins/tiddlywiki/tiddlyweb +plugins/tiddlywiki/filesystem ++dist . --listen" - }, - "repository": { - "type": "git", - "url": "git+https://github.com/twcloud/tiddlyweb-sse.git" - }, - "author": "", - "license": "MIT", - "bugs": { - "url": "https://github.com/twcloud/tiddlyweb-sse/issues" - }, - "homepage": "https://github.com/twcloud/tiddlyweb-sse#readme", - "dependencies": { - "@types/node": "^16.3.2", - "tiddlywiki": "^5.1.23", - "typescript": "^4.3.5" - } -} +{ + "name": "tiddlyweb-sse", + "version": "1.0.2", + "description": "This plugin runs in both the browser and server to sync changes immediately instead of waiting for polling. It adds a route to the server which sends server-sent events to the client, and loads an `EventSource` in the client to call `$tw.syncer.syncFromServer()`.", + "main": "server-sent-events.js", + "scripts": { + "build": "node build.js", + "serve": "tiddlywiki +plugins/tiddlywiki/tiddlyweb +plugins/tiddlywiki/filesystem ++dist . --listen" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/twcloud/tiddlyweb-sse.git" + }, + "author": "", + "license": "MIT", + "bugs": { + "url": "https://github.com/twcloud/tiddlyweb-sse/issues" + }, + "homepage": "https://github.com/twcloud/tiddlyweb-sse#readme", + "dependencies": { + "@types/node": "^16.3.2", + "tiddlywiki": "^5.2.7", + "typescript": "^4.9.5" + } +} diff --git a/server-sent-events.ts b/server-sent-events.ts index 4f75982..e02ea2e 100644 --- a/server-sent-events.ts +++ b/server-sent-events.ts @@ -1,242 +1,242 @@ -/*jslint node: true, browser: true */ -/*global $tw: false */ - -import { IncomingMessage, ServerResponse } from "http"; -import { UrlWithStringQuery } from "url"; -export type ServerHandlerType = ( - request: IncomingMessage, - response: ServerResponse, - state: HandlerState -) => void; -export interface RouteDef { - bodyFormat: F; - method: string; - path: RegExp; - handler: ServerHandlerType; -} -export interface HandlerState { - url: UrlWithStringQuery; - data: T extends "string" ? string : T extends "buffer" ? Buffer : undefined; - params: RegExpExecArray; -} -declare const $tw: any; -export class Journal { - - constructor(private JOURNALAGE = 5 * 60 * 1000) { - - } - - connections: Record = {}; - entryIDs: Record = {}; - records: Record = {}; - responseHeaders: Record = {}; - cleanJournal(ts: number, channel: string) { - let maxage = ts - this.JOURNALAGE; - while (this.records[channel][0].Timestamp < maxage) { - this.records[channel].shift(); - } - } - - initJournal(key: string) { - if (!this.connections[key]) { this.connections[key] = []; } - if (!this.records[key]) { this.records[key] = [new JournalRecord("", "", 0, Date.now())]; } - if (!this.entryIDs[key]) { this.entryIDs[key] = 1; } - } - - handleConnection(conn: SSEClient) { - let channel = conn.channel; - this.initJournal(channel); - - if (conn.request.headers["last-event-id"]) { - let id = conn.request.headers["last-event-id"]; - let found = false; - // find the specified event ID the client last recieved and return everything since then - for (let i = 0; i < this.records[channel].length; i++) { - let tag = this.records[channel][i].EventIDString; - // return all records after the record that was found - if (found) { conn.writeJournalRecord(this.records[channel][i]); } - // we don't need to send the tag here because it is a reconnect and already has it - else if (tag === id) { found = true; conn.start(200, this.responseHeaders); } - } - // If not found return 409 Conflict since that event id is not found - // this way the client can retry manually if needed. - if (found == false) { conn.start(409); conn.end(); return; } - } else { - let index = this.records[channel].length - 1; - let latest = index > -1 ? this.records[channel][index] : null; - conn.start(200, this.responseHeaders, latest?.EventIDString); - } - this.connections[channel].push(conn); - conn.onended = this.handleConnectionEnded.bind(this, conn); - } - handleConnectionEnded(conn: SSEClient) { - this.connections[conn.channel].splice(this.connections[conn.channel].indexOf(conn), 1); - } - handler(request: IncomingMessage, response: ServerResponse, state: HandlerState<"stream">) { - if (true/* this.isEventStreamRequest(request) */) { - this.handleConnection(new SSEClient(request, response, state)); - } else { - response.writeHead(406, "Not Acceptable"); - response.end(); - } - } - handlerExports(prefix: string, handler: Journal["handler"] = this.handler.bind(this)): RouteDef<"stream"> { - return { - method: "GET", - path: new RegExp("^/events/" + prefix + "/([^/]+)$"), - bodyFormat: "stream", - handler - }; - } - - isEventStreamRequest(request: IncomingMessage) { - return request.headers.accept && - request.headers.accept.match(/^text\/event-stream/); - } - - emitEvent(channel: string, type: string, msg: string, filter: (conn: AnyClient) => boolean = (a) => true) { - let ts = Date.now(); - let id = this.entryIDs[channel]++; - let data: JournalRecord = new JournalRecord(type, msg, id, ts); - var success = new Array(this.connections[channel].length); - this.connections[channel].forEach((conn, i) => { - success[i] = !filter(conn) || conn.writeJournalRecord(data); - }); - for (let i = success.length - 1; i > -1; i--) { - if (!success[i]) { - this.connections[channel].splice(i, 1); - } - } - - this.records[channel].push(data); - this.cleanJournal(data.Timestamp, channel); - return data; - } - repeater(request: IncomingMessage, response: ServerResponse, state: HandlerState<"string">) { - const conn = { request, response, state }; - const channel = state.params[0]; - this.initJournal(channel); - let event = state.params[1]; - let data = this.emitEvent(channel, event, state.data, this.repeaterFilter(conn)); - response.writeHead(200); - response.write(data.EventIDString); - response.end(); - } - repeaterFilter = (conn: AnyClient<"string">) => (conn: AnyClient<"stream">) => true; - repeaterExports(method: string, prefix: string, handler = this.repeater.bind(this)): RouteDef<"string"> { - return { - bodyFormat: "string", method, handler, - path: new RegExp("^/events/" + prefix + "/([^/]+)/([^/]+)$") - }; - } -} -export class JournalRecord { - - public get EventIDString() { - return this.Timestamp.toString() + this.EntryID.toString() - } - - constructor( - public Type: string, - public Data: string, - public EntryID: number, - public Timestamp: number - ) { } - - -} - -export type SSEHandlerType = (state: SSEClient) => void; - -export interface AnyClient { - request: IncomingMessage; - response: ServerResponse; - state: HandlerState; -} - -export class SSEClient implements AnyClient<"stream"> { - static get retryInterval(): number { - return $tw.Syncer.prototype.errorRetryInterval; - } - get channel() { - return this.state.params[0]; - } - constructor( - public request: IncomingMessage, - public response: ServerResponse, - public state: HandlerState<"stream"> - ) { - response.on("error", this.onerror.bind(this)); - response.on("close", this.onclose.bind(this)); - } - onerror(err: any) { - console.log("response error", err.message); - this.response.destroy(); - this.onended && this.onended(); - } - onclose() { - this.end(); - this.onended && this.onended(); - } - onended?: () => void; - start( - statusCode: number = 200, - headers: Record = {}, - eventID: string = "" - ) { - if (this.ended()) { return false; } - - this.response.writeHead(statusCode, $tw.utils.extend({ - "Content-Type": "text/event-stream", - "Cache-Control": "no-cache", - 'Connection': 'keep-alive', - }, headers)); - - // write the retry interval and event id immediately - this.write("", "", eventID); - - // setTimeout(() => { this.end(); }, 10000); - - return true; - } - - writeJournalRecord(data: JournalRecord) { - return this.write(data.Type, data.Data, data.EventIDString); - } - - write(event: string, data: string, eventID: string) { - if (this.ended()) { return false; } - - if (typeof event !== "string" || event.indexOf("\n") !== -1) { - throw new Error("Type must be a single-line string"); - } - if (typeof data !== "string") { - throw new Error("Data must be a string"); - } - - this.response.write( - (event ? "event: " + event + "\n" : "") + - (data ? data.split('\n').map(e => "data: " + e + "\n").join('') : "") + - (eventID ? "id: " + eventID + "\n" : "") + - ("retry: " + SSEClient.retryInterval.toString() + "\n") + - "\n", "utf8"); - - return true; - } - - end() { - if (this.ended()) { return false; } - this.response.end(); - return true; - } - - ended() { - var res = false; - if (typeof this.response.writableEnded === "boolean") { - res = res || this.response.writableEnded; - } else if (typeof this.response.finished === "boolean") { - res = res || this.response.finished; - } - return res; - } +/*jslint node: true, browser: true */ +/*global $tw: false */ + +import { IncomingMessage, ServerResponse } from "http"; +import { UrlWithStringQuery } from "url"; +export type ServerHandlerType = ( + request: IncomingMessage, + response: ServerResponse, + state: HandlerState +) => void; +export interface RouteDef { + bodyFormat: F; + method: string; + path: RegExp; + handler: ServerHandlerType; +} +export interface HandlerState { + url: UrlWithStringQuery; + data: T extends "string" ? string : T extends "buffer" ? Buffer : undefined; + params: RegExpExecArray; +} +declare const $tw: any; +export class Journal { + + constructor(private JOURNALAGE = 5 * 60 * 1000) { + + } + + connections: Record = {}; + entryIDs: Record = {}; + records: Record = {}; + responseHeaders: Record = {}; + cleanJournal(ts: number, channel: string) { + let maxage = ts - this.JOURNALAGE; + while (this.records[channel][0].Timestamp < maxage) { + this.records[channel].shift(); + } + } + + initJournal(key: string) { + if (!this.connections[key]) { this.connections[key] = []; } + if (!this.records[key]) { this.records[key] = [new JournalRecord("", "", 0, Date.now())]; } + if (!this.entryIDs[key]) { this.entryIDs[key] = 1; } + } + + handleConnection(conn: SSEClient) { + let channel = conn.channel; + this.initJournal(channel); + + if (conn.request.headers["last-event-id"]) { + let id = conn.request.headers["last-event-id"]; + let found = false; + // find the specified event ID the client last recieved and return everything since then + for (let i = 0; i < this.records[channel].length; i++) { + let tag = this.records[channel][i].EventIDString; + // return all records after the record that was found + if (found) { conn.writeJournalRecord(this.records[channel][i]); } + // we don't need to send the tag here because it is a reconnect and already has it + else if (tag === id) { found = true; conn.start(200, this.responseHeaders); } + } + // If not found return 409 Conflict since that event id is not found + // this way the client can retry manually if needed. + if (found == false) { conn.start(409); conn.end(); return; } + } else { + let index = this.records[channel].length - 1; + let latest = index > -1 ? this.records[channel][index] : null; + conn.start(200, this.responseHeaders, latest?.EventIDString); + } + this.connections[channel].push(conn); + conn.onended = this.handleConnectionEnded.bind(this, conn); + } + handleConnectionEnded(conn: SSEClient) { + this.connections[conn.channel].splice(this.connections[conn.channel].indexOf(conn), 1); + } + handler(request: IncomingMessage, response: ServerResponse, state: HandlerState<"stream">) { + if (true/* this.isEventStreamRequest(request) */) { + this.handleConnection(new SSEClient(request, response, state)); + } else { + response.writeHead(406, "Not Acceptable"); + response.end(); + } + } + handlerExports(prefix: string, handler: Journal["handler"] = this.handler.bind(this)): RouteDef<"stream"> { + return { + method: "GET", + path: new RegExp("^/events/" + prefix + "/([^/]+)$"), + bodyFormat: "stream", + handler + }; + } + + isEventStreamRequest(request: IncomingMessage) { + return request.headers.accept && + request.headers.accept.match(/^text\/event-stream/); + } + + emitEvent(channel: string, type: string, msg: string, filter: (conn: AnyClient) => boolean = (a) => true) { + let ts = Date.now(); + let id = this.entryIDs[channel]++; + let data: JournalRecord = new JournalRecord(type, msg, id, ts); + var success = new Array(this.connections[channel].length); + this.connections[channel].forEach((conn, i) => { + success[i] = !filter(conn) || conn.writeJournalRecord(data); + }); + for (let i = success.length - 1; i > -1; i--) { + if (!success[i]) { + this.connections[channel].splice(i, 1); + } + } + + this.records[channel].push(data); + this.cleanJournal(data.Timestamp, channel); + return data; + } + repeater(request: IncomingMessage, response: ServerResponse, state: HandlerState<"string">) { + const conn = { request, response, state }; + const channel = state.params[0]; + this.initJournal(channel); + let event = state.params[1]; + let data = this.emitEvent(channel, event, state.data, this.repeaterFilter(conn)); + response.writeHead(200); + response.write(data.EventIDString); + response.end(); + } + repeaterFilter = (conn: AnyClient<"string">) => (conn: AnyClient<"stream">) => true; + repeaterExports(method: string, prefix: string, handler = this.repeater.bind(this)): RouteDef<"string"> { + return { + bodyFormat: "string", method, handler, + path: new RegExp("^/events/" + prefix + "/([^/]+)/([^/]+)$") + }; + } +} +export class JournalRecord { + + public get EventIDString() { + return this.Timestamp.toString() + this.EntryID.toString() + } + + constructor( + public Type: string, + public Data: string, + public EntryID: number, + public Timestamp: number + ) { } + + +} + +export type SSEHandlerType = (state: SSEClient) => void; + +export interface AnyClient { + request: IncomingMessage; + response: ServerResponse; + state: HandlerState; +} + +export class SSEClient implements AnyClient<"stream"> { + static get retryInterval(): number { + return $tw.Syncer.prototype.errorRetryInterval; + } + get channel() { + return this.state.params[0]; + } + constructor( + public request: IncomingMessage, + public response: ServerResponse, + public state: HandlerState<"stream"> + ) { + response.on("error", this.onerror.bind(this)); + response.on("close", this.onclose.bind(this)); + } + onerror(err: any) { + console.log("response error", err.message); + this.response.destroy(); + this.onended && this.onended(); + } + onclose() { + this.end(); + this.onended && this.onended(); + } + onended?: () => void; + start( + statusCode: number = 200, + headers: Record = {}, + eventID: string = "" + ) { + if (this.ended()) { return false; } + + this.response.writeHead(statusCode, $tw.utils.extend({ + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + 'Connection': 'keep-alive', + }, headers)); + + // write the retry interval and event id immediately + this.write("", "", eventID); + + // setTimeout(() => { this.end(); }, 10000); + + return true; + } + + writeJournalRecord(data: JournalRecord) { + return this.write(data.Type, data.Data, data.EventIDString); + } + + write(event: string, data: string, eventID: string) { + if (this.ended()) { return false; } + + if (typeof event !== "string" || event.indexOf("\n") !== -1) { + throw new Error("Type must be a single-line string"); + } + if (typeof data !== "string") { + throw new Error("Data must be a string"); + } + + this.response.write( + (event ? "event: " + event + "\n" : "") + + (data ? data.split('\n').map(e => "data: " + e + "\n").join('') : "") + + (eventID ? "id: " + eventID + "\n" : "") + + ("retry: " + SSEClient.retryInterval.toString() + "\n") + + "\n", "utf8"); + + return true; + } + + end() { + if (this.ended()) { return false; } + this.response.end(); + return true; + } + + ended() { + var res = false; + if (typeof this.response.writableEnded === "boolean") { + res = res || this.response.writableEnded; + } else if (typeof this.response.finished === "boolean") { + res = res || this.response.finished; + } + return res; + } } \ No newline at end of file diff --git a/tiddlywiki.info b/tiddlywiki.info index 087ee90..eeeab4c 100644 --- a/tiddlywiki.info +++ b/tiddlywiki.info @@ -1,10 +1,10 @@ -{ - "description": "Basic client-server edition", - "plugins": [ - "tiddlywiki/tiddlyweb" - ], - "themes": [ - "tiddlywiki/vanilla", - "tiddlywiki/snowwhite" - ] +{ + "description": "Basic client-server edition", + "plugins": [ + "tiddlywiki/tiddlyweb" + ], + "themes": [ + "tiddlywiki/vanilla", + "tiddlywiki/snowwhite" + ] } \ No newline at end of file diff --git a/tsconfig.json b/tsconfig.json index 4dd7b8c..a69eb96 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,16 +1,16 @@ -{ - "compilerOptions": { - "target": "es5", - "module": "commonjs", - "strict": true, - "noImplicitAny": false, - "esModuleInterop": false, - "outDir": "dist", - "lib": [ - "es5" - ] - }, - "files": [ - "./server-sent-events.ts" - ] +{ + "compilerOptions": { + "target": "es5", + "module": "commonjs", + "strict": true, + "noImplicitAny": false, + "esModuleInterop": false, + "outDir": "dist", + "lib": [ + "es5" + ] + }, + "files": [ + "./server-sent-events.ts" + ] } \ No newline at end of file