Skip to content

Commit

Permalink
Merge pull request #4 from duart38/async_handler
Browse files Browse the repository at this point in the history
Async handler support
  • Loading branch information
duart38 authored Sep 11, 2022
2 parents 2682042 + 7b5cb65 commit 772a48b
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 75 deletions.
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
3. Allows you to Thread already existing functions
5. Allows module imports inside the worker

## Example
## Examples
> See examples folder for more examples
```typescript
let thread = new Thread<number>((e: MessageEvent)=>{
console.log('Worker: Message received from main script');
Expand Down Expand Up @@ -44,6 +46,22 @@ new Thread(someFunction, "module"); // thread an already existing function
new Thread(someFunction, "module", ['import Something from "../some.bundle.js";']); // thread with custom importing
```

**Async support**
```TypeScript
const thread = new Thread<string, number>(async (_) => {
console.log("Worker: Message received from main script");
// Some async logic...
await new Promise((ir) => setTimeout(ir, 2000));
return "DONE";
}, "module");

thread.onMessage((e) => {
console.log(`recived back from thread: ${e}`);
});

thread.postMessage(0);
```

## API

### Standard API
Expand Down
53 changes: 30 additions & 23 deletions Thread.bundle.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
// deno-fmt-ignore-file
// deno-lint-ignore-file
// This code was bundled using `deno bundle` and it's not recommended to edit it manually

class Thread {
worker;
imports;
blob;
blobURL = "";
stopped = false;
constructor(operation, type1, imports){
constructor(operation, type, imports){
imports?.forEach((v)=>{
if (v.endsWith(".ts'") || v.endsWith('.ts"')) {
throw new Error("Threaded imports do no support typescript files");
}
});
this.imports = imports || [];
this.blob = this.populateFile(operation);
this.blob.then(async (b)=>console.log(await b.text())
);
this.worker = this.makeWorker(type1);
this.worker = this.makeWorker(type);
}
async makeWorker(type) {
this.blobURL = URL.createObjectURL(await this.blob);
Expand All @@ -23,39 +25,45 @@ class Thread {
});
}
async populateFile(code) {
let imported = this.imports?.flatMap(async (val)=>(await this.copyDep(val)).join("\n")
);
const imported = this.imports?.flatMap(async (val)=>(await this.copyDep(val)).join("\n"));
return new Blob([
`\n ${(await Promise.all(imported)).join("\n")}\n \n var global = {};\n var userCode = ${code.toString()}\n \n onmessage = function(e) {\n postMessage(userCode(e, global));\n }\n \n `
`
${(await Promise.all(imported)).join("\n")}
var global = {};
var userCode = ${code.toString()}
onmessage = async function(e) {
postMessage(await userCode(e, global));
}
`
]);
}
async copyDep(str) {
var importPathRegex = /('|"|`)(.+\.js)(\1)/ig;
var importInsRegex = /(import( |))({.+}|.+)(from( |))/ig;
var matchedPath = importPathRegex.exec(str) || "";
var file = false;
var fqfn = "";
const importPathRegex = /('|"|`)(.+\.js)(\1)/ig;
const importInsRegex = /(import( |))({.+}|.+)(from( |))/ig;
const matchedPath = importPathRegex.exec(str) || "";
let file = false;
let fqfn = "";
if (!matchedPath[0].includes("http://") && !matchedPath[0].includes("https://")) {
file = true;
fqfn = matchedPath[0].replaceAll(/('|"|`)/ig, "");
}
var matchedIns = importInsRegex.exec(str) || "";
const matchedIns = importInsRegex.exec(str) || "";
if (!matchedIns) {
throw new Error("The import instruction seems to be unreadable try formatting it, for example: \n" + "import { something } from './somet.js' \n ");
}
if (file) {
let x = await import(fqfn);
return Object.keys(x).map((v)=>x[v].toString()
);
const x = await import(fqfn);
return Object.keys(x).map((v)=>x[v].toString());
} else {
let x = await import(matchedPath[0].replaceAll(/'|"/g, ""));
return Object.keys(x).map((v)=>x[v].toString()
);
const x1 = await import(matchedPath[0].replaceAll(/'|"/g, ""));
return Object.keys(x1).map((v)=>x1[v].toString());
}
}
postMessage(msg) {
this.worker.then((w)=>w.postMessage(msg)
);
this.worker.then((w)=>w.postMessage(msg));
return this;
}
async stop() {
Expand All @@ -67,8 +75,7 @@ class Thread {
URL.revokeObjectURL(this.blobURL);
}
onMessage(callback) {
this.worker.then((w)=>w.onmessage = (e)=>callback(e.data)
);
this.worker.then((w)=>w.onmessage = (e)=>callback(e.data));
return this;
}
}
Expand Down
37 changes: 21 additions & 16 deletions Thread.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
export default class Thread<T> {
/**
* > Type T -> return type
*
* > Type K -> data type of MessageEvent
*/
export default class Thread<T = unknown, K = unknown> {
public worker: Promise<Worker>;
private imports: Array<string>;
private blob: Promise<Blob>;
private blobURL: string = "";
private blobURL= "";
/**
* Tells if the worker has been stopped
*/
Expand All @@ -13,7 +18,7 @@ export default class Thread<T> {
* @param imports Modules to import in the worker. only JS files allowed (over the net import allowed)
*/
constructor(
operation: (e: MessageEvent, globalObject?:{}) => T,
operation: (e: MessageEvent<K>, globalObject?: Record<string, unknown>) => T | Promise<T>,
type?: "classic" | "module",
imports?: Array<string>,
) {
Expand All @@ -24,7 +29,6 @@ export default class Thread<T> {
});
this.imports = imports || [];
this.blob = this.populateFile(operation);
this.blob.then(async (b)=>console.log(await b.text()));
this.worker = this.makeWorker(type);
}

Expand All @@ -38,16 +42,17 @@ export default class Thread<T> {
);
}

// deno-lint-ignore ban-types
private async populateFile(code: Function) {
let imported = this.imports?.flatMap(async (val) => (await this.copyDep(val)).join("\n"));
const imported = this.imports?.flatMap(async (val) => (await this.copyDep(val)).join("\n"));
return new Blob([`
${(await Promise.all(imported)).join("\n")}
var global = {};
var userCode = ${code.toString()}
onmessage = function(e) {
postMessage(userCode(e, global));
onmessage = async function(e) {
postMessage(await userCode(e, global));
}
`]);
Expand All @@ -58,11 +63,11 @@ export default class Thread<T> {
* @param str the import line (eg: import {som} from "lorem/ipsum.js";)
*/
private async copyDep(str: string) {
var importPathRegex = /('|"|`)(.+\.js)(\1)/ig; // for the path string ("lorem/ipsum.js")
var importInsRegex = /(import( |))({.+}|.+)(from( |))/ig; // for the instruction before the path (import {som} from)
var matchedPath = importPathRegex.exec(str) || "";
var file = false;
var fqfn = "";
const importPathRegex = /('|"|`)(.+\.js)(\1)/ig; // for the path string ("lorem/ipsum.js")
const importInsRegex = /(import( |))({.+}|.+)(from( |))/ig; // for the instruction before the path (import {som} from)
const matchedPath = importPathRegex.exec(str) || "";
let file = false;
let fqfn = "";

if (
!matchedPath[0].includes("http://") &&
Expand All @@ -71,7 +76,7 @@ export default class Thread<T> {
file = true;
fqfn = matchedPath[0].replaceAll(/('|"|`)/ig, "");
}
var matchedIns = importInsRegex.exec(str) || ""; // matchedIns[0] > import {sss} from
const matchedIns = importInsRegex.exec(str) || ""; // matchedIns[0] > import {sss} from

if (!matchedIns) {
throw new Error(
Expand All @@ -82,10 +87,10 @@ export default class Thread<T> {


if (file) {
let x = await import(fqfn); //Deno.realPathSync(fqfn)
const x = await import(fqfn); //Deno.realPathSync(fqfn)
return Object.keys(x).map((v)=>x[v].toString())
} else {
let x = await import(matchedPath[0].replaceAll(/'|"/g,""));
const x = await import(matchedPath[0].replaceAll(/'|"/g,""));
return Object.keys(x).map((v)=>x[v].toString())
}
}
Expand All @@ -94,7 +99,7 @@ export default class Thread<T> {
* Sends data to the Thread
* @param msg
*/
public postMessage(msg: any): this {
public postMessage(msg: K): this {
this.worker.then(w=>w.postMessage(msg));
return this;
}
Expand Down
4 changes: 2 additions & 2 deletions egg.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
"stable": true,
"homepage": "https://github.com/duart38/Thread",
"entry": "./Thread.ts",
"version": "3.0.0",
"releaseType": "patch",
"version": "4.0.0",
"releaseType": "major",
"files": [
"./LICENSE",
"./README.md",
Expand Down
7 changes: 3 additions & 4 deletions examples/example_allot_of_threads.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import Thread from "../Thread.ts";

let count = 13;
const count = 13;

function postMessage(e: any) {}
function postMessage(_e: unknown) {}

function tester() {
let i = 0;
setInterval(() => {
postMessage(0);
}, 500);
Expand All @@ -14,6 +13,6 @@ function tester() {
}

for (let i = 0; i < count; i++) {
new Thread(tester, "module").onMessage((d) => console.log(`thread -> ${i}`))
new Thread(tester, "module").onMessage((_) => console.log(`thread -> ${i}`))
.postMessage(0);
}
26 changes: 26 additions & 0 deletions examples/example_async_support.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import Thread from "../Thread.ts";

/**
* Thanks to @praswicaksono for the suggestion
* -> https://github.com/praswicaksono
* -> https://github.com/duart38/Thread/issues/3
*/

const thread = new Thread<number, number[]>(async (e) => {
console.log("Worker: Message received from main script");
const result = e.data[0] * e.data[1];
await new Promise((resolve) => setTimeout(resolve, 5 * 1000))
if (isNaN(result)) {
return 0;
} else {
console.log("Worker: Posting message back to main script");
return (result);
}
}, "module");

thread.onMessage((e) => {
console.log(`recived back from thread: ${e}`);
});

thread.postMessage([10, 12]);
thread.postMessage([10, 10]);
14 changes: 7 additions & 7 deletions examples/example_calculateprimes.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import Thread from "../Thread.ts";

let count = 2; // number of threads to spawn
const count = 2; // number of threads to spawn

function postMessage(e: any) {} // stops the compiler from complaining that the method is not available.. this gets pasted in the worker
function postMessage(_e: unknown) {} // stops the compiler from complaining that the method is not available.. this gets pasted in the worker

function tester() {
function calculatePrimes() {
const iterations = 50;
const multiplier = 100000000000;
var primes = [];
for (var i = 0; i < iterations; i++) {
var candidate = i * (multiplier * Math.random());
var isPrime = true;
for (var c = 2; c <= Math.sqrt(candidate); ++c) {
const primes = [];
for (let i = 0; i < iterations; i++) {
const candidate = i * (multiplier * Math.random());
let isPrime = true;
for (let c = 2; c <= Math.sqrt(candidate); ++c) {
if (candidate % c === 0) {
// not prime
isPrime = false;
Expand Down
4 changes: 2 additions & 2 deletions examples/example_deno_worker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import Thread from "../Thread.ts";
import Observe from "https://raw.githubusercontent.com/duart38/Observe/master/Observe.ts";

let tr = new Thread(
const tr = new Thread<string, string>(
(e) => {
let t = new Observe(e.data); // observable values
const t = new Observe(e.data); // observable values
return t.getValue();
},
"module",
Expand Down
4 changes: 2 additions & 2 deletions examples/example_importing.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import Thread from "../Thread.ts";
import { CallMe } from "../test_import.js";

let tr = new Thread(
(e) => {
const tr = new Thread(
(_e) => {
CallMe();
return "pong";
},
Expand Down
2 changes: 1 addition & 1 deletion examples/example_simple.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Thread from "../Thread.ts";

let thread = new Thread<number>((e: MessageEvent) => {
const thread = new Thread<number, number[]>((e) => {
console.log("Worker: Message received from main script");
const result = e.data[0] * e.data[1];
if (isNaN(result)) {
Expand Down
Loading

0 comments on commit 772a48b

Please sign in to comment.