Skip to content

Commit

Permalink
Convert worker-manager TCP server to an HTTP server that exposes queu…
Browse files Browse the repository at this point in the history
…e depth
  • Loading branch information
lukemelia committed Jan 21, 2025
1 parent a246bac commit 84cc7b2
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 101 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ Instead of running `pnpm start:base`, you can alternatively use `pnpm start:all`
| :4201 | `/seed` seed realm || 🚫 |
| :4202 | `/test` host test realm, `/node-test` node test realm || 🚫 |
| :4205 | `/test` realm for matrix client tests (playwright controlled) | 🚫 | 🚫 |
| :4210 | Development Worker Manager (spins up 1 worker by default) || 🚫 |
| :4211 | Test Worker Manager (spins up 1 worker by default) || 🚫 |
| :4212 | Test Worker Manager for matrix client tests (playwright controlled - 1 worker) || 🚫 |
| :4213 | Test Worker Manager for matrix client tests - base realm server (playwright controlled - 1 worker) || 🚫 |
| :4210 | Worker Manager (spins up 1 worker by default in development) || 🚫 |
| :4211 | Worker Manager (spins up 1 worker by default) || 🚫 |
| :4212 | Worker Manager for matrix client tests (playwright controlled - 1 worker) | | 🚫 |
| :4213 | Worker Manager for matrix client tests - base realm server (playwright controlled - 1 worker) || 🚫 |
| :5001 | Mail user interface for viewing emails sent to local SMTP || 🚫 |
| :5435 | Postgres DB || 🚫 |
| :8008 | Matrix synapse server || 🚫 |
Expand Down Expand Up @@ -223,7 +223,7 @@ There is a ember-freestyle component explorer available to assist with developme

1. `cd packages/boxel-ui/test-app`
2. `pnpm start`
3. Visit http://localhost:4210/ in your browser
3. Visit http://localhost:4220/ in your browser

## Boxel Motion Demo App

Expand Down
2 changes: 1 addition & 1 deletion packages/boxel-ui/test-app/.ember-cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ module.exports = {
Setting `disableAnalytics` to true will prevent any data from being sent.
*/
port: 4210,
port: 4220,
testPort: 7356,
disableAnalytics: false,
};
54 changes: 12 additions & 42 deletions packages/realm-server/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { NodeAdapter } from './node-realm';
import yargs from 'yargs';
import { RealmServer } from './server';
import { resolve } from 'path';
import { createConnection, type Socket } from 'net';
import { makeFastBootIndexRunner } from './fastboot';
import { shimExternals } from './lib/externals';
import * as Sentry from '@sentry/node';
Expand Down Expand Up @@ -331,49 +330,20 @@ let autoMigrate = migrateDB || undefined;
process.exit(-3);
});

let workerReadyDeferred: Deferred<boolean> | undefined;
async function waitForWorkerManager(port: number) {
const workerManager = await new Promise<Socket>((r) => {
let socket = createConnection({ port }, () => {
log.info(`Connected to worker manager on port ${port}`);
r(socket);
});
});

workerManager.on('data', (data) => {
let res = data.toString();
if (!workerReadyDeferred) {
throw new Error(
`received unsolicited message from worker manager on port ${port}`,
);
let isReady = false;
let timeout = Date.now() + 30_000;
do {
let response = await fetch(`http://localhost:${port}/`);
if (response.ok) {
let json = await response.json();
isReady = json.ready;
}
switch (res) {
case 'ready':
case 'not-ready':
workerReadyDeferred.fulfill(res === 'ready' ? true : false);
break;
default:
workerReadyDeferred.reject(
`unexpected response from worker manager: ${res}`,
);
}
});

try {
let isReady = false;
let timeout = Date.now() + 30_000;
do {
workerReadyDeferred = new Deferred();
workerManager.write('ready?');
isReady = await workerReadyDeferred.promise;
} while (!isReady && Date.now() < timeout);
if (!isReady) {
throw new Error(
`timed out trying to connect to worker manager on port ${port}`,
);
}
} finally {
workerManager.end();
} while (!isReady && Date.now() < timeout);
if (!isReady) {
throw new Error(
`timed out trying to waiting for worker manager to be ready on port ${port}`,
);
}
log.info('workers are ready');
}
1 change: 1 addition & 0 deletions packages/realm-server/scripts/start-worker-production.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
NODE_NO_WARNINGS=1 \
ts-node \
--transpileOnly worker-manager \
--port=4210 \
--allPriorityCount="${WORKER_ALL_PRIORITY_COUNT:-1}" \
--highPriorityCount="${WORKER_HIGH_PRIORITY_COUNT:-0}" \
--matrixURL='https://matrix.boxel.ai' \
Expand Down
1 change: 1 addition & 0 deletions packages/realm-server/scripts/start-worker-staging.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
NODE_NO_WARNINGS=1 \
ts-node \
--transpileOnly worker-manager \
--port=4210 \
--allPriorityCount="${WORKER_ALL_PRIORITY_COUNT:-1}" \
--highPriorityCount="${WORKER_HIGH_PRIORITY_COUNT:-0}" \
--matrixURL='https://matrix-staging.stack.cards' \
Expand Down
142 changes: 89 additions & 53 deletions packages/realm-server/worker-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@ import {
logger,
userInitiatedPriority,
systemInitiatedPriority,
query,
} from '@cardstack/runtime-common';
import yargs from 'yargs';
import * as Sentry from '@sentry/node';
import { createServer } from 'net';
import flattenDeep from 'lodash/flattenDeep';
import { spawn } from 'child_process';
import pluralize from 'pluralize';
import Koa from 'koa';
import Router from '@koa/router';
import { ecsMetadata, fullRequestURL, livenessCheck } from './middleware';
import { PgAdapter } from '@cardstack/postgres';

let log = logger('worker');
let log = logger('worker-manager');

const REALM_SECRET_SEED = process.env.REALM_SECRET_SEED;
if (!REALM_SECRET_SEED) {
Expand All @@ -34,8 +38,10 @@ let {
.usage('Start worker manager')
.options({
port: {
description: 'TCP port for worker to communicate readiness (for tests)',
description:
'HTTP port for worker manager to communicate readiness and status',
type: 'number',
demandOption: true,
},
highPriorityCount: {
description:
Expand Down Expand Up @@ -75,63 +81,93 @@ let isExiting = false;
process.on('SIGINT', () => (isExiting = true));
process.on('SIGTERM', () => (isExiting = true));

if (port != null) {
// in tests we start a simple TCP server to communicate to the realm when
// the worker is ready to start processing jobs
let server = createServer((socket) => {
log.info(`realm connected to worker manager`);
socket.on('data', (data) => {
if (data.toString() === 'ready?') {
socket.write(isReady ? 'ready' : 'not-ready');
}
});
socket.on('close', (hadError) => {
log.info(`realm has disconnected${hadError ? ' due to an error' : ''}.`);
});
socket.on('error', (err: any) => {
console.error(`realm disconnected from worker manager: ${err.message}`);
});
});
server.unref();
let dbAdapter = new PgAdapter({});

server.listen(port, () => {
log.info(`worker manager listening for realm on port ${port}`);
});
let webServer = new Koa<Koa.DefaultState, Koa.Context>();
let router = new Router();
router.head('/', livenessCheck);
router.get('/', async (ctxt: Koa.Context, _next: Koa.Next) => {
let result = {
ready: isReady,
} as Record<string, boolean | number>;
if (isReady) {
let [{ queue_depth }] = (await query(dbAdapter, [
`SELECT COUNT(*) as queue_depth FROM jobs WHERE status='unfulfilled'`,
])) as {
queue_depth: string;
}[];
result = {
...result,
highPriorityWorkers: highPriorityCount,
allPriorityWorkers: allPriorityCount,
queueDepth: parseInt(queue_depth, 10),
};
}
ctxt.set('Content-Type', 'application/json');
ctxt.body = JSON.stringify(result);
ctxt.status = isReady ? 200 : 503;
});

webServer
.use(router.routes())
.use((ctxt: Koa.Context, next: Koa.Next) => {
log.info(
`<-- ${ctxt.method} ${ctxt.req.headers.accept} ${
fullRequestURL(ctxt).href
}`,
);

const shutdown = () => {
log.info(`Shutting down server for worker manager...`);
server.close((err) => {
if (err) {
log.error(`Error while closing the server for worker manager:`, err);
process.exit(1);
}
log.info(`Server closed for worker manager.`);
process.exit(0);
ctxt.res.on('finish', () => {
log.info(
`--> ${ctxt.method} ${ctxt.req.headers.accept} ${
fullRequestURL(ctxt).href
}: ${ctxt.status}`,
);
log.debug(JSON.stringify(ctxt.req.headers));
});
};
return next();
})
.use(ecsMetadata);

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
process.on('uncaughtException', (err) => {
log.error(`Uncaught exception in worker manager:`, err);
shutdown();
});
webServer.on('error', (err: any) => {
console.error(`worker manager HTTP server error: ${err.message}`);
});

process.on('message', (message) => {
if (message === 'stop') {
console.log(`stopping realm server on port ${port}...`);
server.close(() => {
console.log(`worker manager on port ${port} has stopped`);
if (process.send) {
process.send('stopped');
}
});
} else if (message === 'kill') {
console.log(`Ending worker manager process for ${port}...`);
process.exit(0);
let webServerInstance = webServer.listen(port);
log.info(`worker manager HTTP listening on port ${port}`);

const shutdown = (onShutdown?: () => void) => {
log.info(`Shutting down server for worker manager...`);
webServerInstance.closeAllConnections();
webServerInstance.close((err?: Error) => {
if (err) {
log.error(`Error while closing the server for worker manager HTTP:`, err);
process.exit(1);
}
dbAdapter.close(); // warning this is async
log.info(`worker manager HTTP on port ${port} has stopped.`);
onShutdown?.();
process.exit(0);
});
}
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
process.on('uncaughtException', (err) => {
log.error(`Uncaught exception in worker manager:`, err);
shutdown();
});

process.on('message', (message) => {
if (message === 'stop') {
shutdown(() => {
process.send?.('stopped');
});
} else if (message === 'kill') {
console.log(`Ending worker manager process for ${port}...`);
process.exit(0);
}
});

(async () => {
log.info(
Expand Down

0 comments on commit 84cc7b2

Please sign in to comment.