Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple worker support #2036

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ jobs:
matrix:
shardIndex: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
shardTotal: [12]
concurrency:
group: matrix-client-test-${{ matrix.shardIndex }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/init
Expand Down
44 changes: 41 additions & 3 deletions .github/workflows/manual-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,21 @@ jobs:
with:
repository: "boxel-realm-server-${{ inputs.environment }}"
environment: ${{ inputs.environment }}
dockerfile: "packages/realm-server/Dockerfile"
dockerfile: "packages/realm-server/realm-server.Dockerfile"
build-args: |
"realm_server_script=start:${{ inputs.environment }}"

build-worker:
name: Build worker Docker image
uses: cardstack/gh-actions/.github/workflows/docker-ecr.yml@main
secrets: inherit
with:
repository: "boxel-worker-${{ inputs.environment }}"
environment: ${{ inputs.environment }}
dockerfile: "packages/realm-server/worker.Dockerfile"
build-args: |
"worker_script=start:worker-${{ inputs.environment }}"

build-pg-migration:
name: Build pg-migration Docker image
uses: cardstack/gh-actions/.github/workflows/docker-ecr.yml@main
Expand All @@ -103,16 +114,43 @@ jobs:
image: ${{ needs.build-pg-migration.outputs.image }}
wait-for-service-stability: false

# the wait-for-service-stability flag doesn't seem to work in
# aws-actions/amazon-ecs-deploy-task-definition@v2. we keep getting timeouts
# waiting for service stability. So we are manually waiting here.
post-migrate-db:
name: Wait for db-migration
needs: [migrate-db]
runs-on: ubuntu-latest
steps:
- run: sleep 240
- run: sleep 180

deploy-worker:
name: Deploy worker
needs: [build-worker, deploy-host, post-migrate-db]
uses: cardstack/gh-actions/.github/workflows/ecs-deploy.yml@main
secrets: inherit
with:
container-name: "boxel-worker"
environment: ${{ inputs.environment }}
cluster: ${{ inputs.environment }}
service-name: "boxel-worker-${{ inputs.environment }}"
image: ${{ needs.build-worker.outputs.image }}
wait-for-service-stability: false

# the wait-for-service-stability flag doesn't seem to work in
# aws-actions/amazon-ecs-deploy-task-definition@v2. we keep getting timeouts
# waiting for service stability. So we are manually waiting here.
post-deploy-worker:
name: Wait for worker
needs: [deploy-worker]
runs-on: ubuntu-latest
steps:
- run: sleep 180

deploy-realm-server:
name: Deploy realm server
needs: [build-realm-server, deploy-host, post-migrate-db]
needs:
[post-deploy-worker, build-realm-server, deploy-host, post-migrate-db]
uses: cardstack/gh-actions/.github/workflows/ecs-deploy.yml@main
secrets: inherit
with:
Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Live reloads are not available in this mode, however, if you use start the serve

#### Using `start:all`

Instead of running `pnpm start:base`, you can alternatively use `pnpm start:all` which also serves a few other realms on other ports--this is convenient if you wish to switch between the app and the tests without having to restart servers. Here's what is spun up with `start:all`:
Instead of running `pnpm start:base`, you can alternatively use `pnpm start:all` which also serves a few other realms on other ports--this is convenient if you wish to switch between the app and the tests without having to restart servers. Use the environment variable `WORKER_COUNT` to add additional workers. By default there is 1 worker for each realm server. Here's what is spun up with `start:all`:

| Port | Description | Running `start:all` | Running `start:base` |
| ----- | ------------------------------------------------------------- | ------------------- | -------------------- |
Expand All @@ -82,13 +82,17 @@ Instead of running `pnpm start:base`, you can alternatively use `pnpm start:all`
| :4201 | `/seed` seed realm | ✅ | 🚫 |
| :4202 | `/test` host test realm, `/node-test` node test realm | ✅ | 🚫 |
| :4205 | `/test` realm for matrix client tests (playwright controlled) | 🚫 | 🚫 |
| :4210 | Development Worker Manager (spins up 1 worker by default) | ✅ | 🚫 |
| :4211 | Test Worker Manager (spins up 1 worker by default) | ✅ | 🚫 |
| :4212 | Test Worker Manager for matrix client tests (playwright controlled - 1 worker) | ✅ | 🚫 |
| :4213 | Test Worker Manager for matrix client tests - base realm server (playwright controlled - 1 worker) | ✅ | 🚫 |
| :5001 | Mail user interface for viewing emails sent to local SMTP | ✅ | 🚫 |
| :5435 | Postgres DB | ✅ | 🚫 |
| :8008 | Matrix synapse server | ✅ | 🚫 |

#### Using `start:development`

You can also use `start:development` if you want the functionality of `start:all`, but without running the test realms. `start:development` will enable you to open http://localhost:4201 and allow to select between the cards in the /base and /experiments realm.
You can also use `start:development` if you want the functionality of `start:all`, but without running the test realms. `start:development` will enable you to open http://localhost:4201 and allow to select between the cards in the /base and /experiments realm. In order to use `start:development` you must also make sure to run `start:worker-development` in order to start the workers (which are normally started in `start:all`.

### Card Pre-rendering

Expand Down
71 changes: 62 additions & 9 deletions packages/matrix/helpers/isolated-realm-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,36 @@ export async function startServer() {
process.env.MATRIX_URL = 'http://localhost:8008';
process.env.REALM_SERVER_MATRIX_USERNAME = 'realm_server';

let workerManager = spawn(
'ts-node',
[
`--transpileOnly`,
'worker-manager',
`--port=4212`,
`--matrixURL='http://localhost:8008'`,
`--distURL="${process.env.HOST_URL ?? 'http://localhost:4200'}"`,

`--fromUrl='http://localhost:4205/test/'`,
`--toUrl='http://localhost:4205/test/'`,
`--fromUrl='https://cardstack.com/base/'`,
`--toUrl='http://localhost:4201/base/'`,
],
{
cwd: realmServerDir,
stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
},
);
if (workerManager.stdout) {
workerManager.stdout.on('data', (data: Buffer) =>
console.log(`worker: ${data.toString()}`),
);
}
if (workerManager.stderr) {
workerManager.stderr.on('data', (data: Buffer) =>
console.error(`worker: ${data.toString()}`),
);
}

let realmServer = spawn(
'ts-node',
[
Expand All @@ -40,13 +70,14 @@ export async function startServer() {
`--matrixURL='http://localhost:8008'`,
`--realmsRootPath='${dir.name}'`,
`--seedPath='${seedPath}'`,
`--workerManagerPort=4212`,
`--migrateDB`,
`--useRegistrationSecretFunction`,

`--path='${testRealmDir}'`,
`--username='test_realm'`,
`--fromUrl='/test/'`,
`--toUrl='/test/'`,
`--fromUrl='http://localhost:4205/test/'`,
`--toUrl='http://localhost:4205/test/'`,
`--fromUrl='https://cardstack.com/base/'`,
`--toUrl='http://localhost:4201/base/'`,
],
Expand All @@ -55,6 +86,7 @@ export async function startServer() {
stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
},
);
realmServer.unref();
if (realmServer.stdout) {
realmServer.stdout.on('data', (data: Buffer) =>
console.log(`realm server: ${data.toString()}`),
Expand Down Expand Up @@ -91,25 +123,36 @@ export async function startServer() {
);
}

return new IsolatedRealmServer(realmServer, testRealmDir);
return new IsolatedRealmServer(realmServer, workerManager, testRealmDir);
}

export class IsolatedRealmServer {
private stopped: (() => void) | undefined;
private realmServerStopped: (() => void) | undefined;
private workerManagerStopped: (() => void) | undefined;
private sqlResults: ((results: string) => void) | undefined;
private sqlError: ((error: string) => void) | undefined;

constructor(
private realmServerProcess: ReturnType<typeof spawn>,
private workerManagerProcess: ReturnType<typeof spawn>,
readonly realmPath: string, // useful for debugging
) {
workerManagerProcess.on('message', (message) => {
if (message === 'stopped') {
if (!this.workerManagerStopped) {
console.error(`received unprompted worker manager stop`);
return;
}
this.workerManagerStopped();
}
});
realmServerProcess.on('message', (message) => {
if (message === 'stopped') {
if (!this.stopped) {
if (!this.realmServerStopped) {
console.error(`received unprompted server stop`);
return;
}
this.stopped();
this.realmServerStopped();
} else if (
typeof message === 'string' &&
message.startsWith('sql-results:')
Expand Down Expand Up @@ -149,10 +192,20 @@ export class IsolatedRealmServer {
}

async stop() {
let stop = new Promise<void>((r) => (this.stopped = r));
let realmServerStop = new Promise<void>(
(r) => (this.realmServerStopped = r),
);
this.realmServerProcess.send('stop');
await stop;
this.stopped = undefined;
await realmServerStop;
this.realmServerStopped = undefined;
this.realmServerProcess.send('kill');

let workerManagerStop = new Promise<void>(
(r) => (this.workerManagerStopped = r),
);
this.workerManagerProcess.send('stop');
await workerManagerStop;
this.workerManagerStopped = undefined;
this.workerManagerProcess.send('kill');
}
}
103 changes: 54 additions & 49 deletions packages/realm-server/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import { NodeAdapter } from './node-realm';
import yargs from 'yargs';
import { RealmServer } from './server';
import { resolve } from 'path';
import { spawn } from 'child_process';
import { createConnection, type Socket } from 'net';
import { makeFastBootIndexRunner } from './fastboot';
import { shimExternals } from './lib/externals';
import * as Sentry from '@sentry/node';
import { PgAdapter, PgQueuePublisher } from '@cardstack/postgres';
import { MatrixClient } from '@cardstack/runtime-common/matrix-client';
import flattenDeep from 'lodash/flattenDeep';
import 'decorator-transforms/globals';

let log = logger('main');
Expand Down Expand Up @@ -68,6 +67,7 @@ let {
useRegistrationSecretFunction,
seedPath,
migrateDB,
workerManagerPort,
} = yargs(process.argv.slice(2))
.usage('Start realm server')
.options({
Expand Down Expand Up @@ -130,6 +130,11 @@ let {
'The flag should be set when running matrix tests where the synapse instance is torn down and restarted multiple times during the life of the realm server.',
type: 'boolean',
},
workerManagerPort: {
description:
'The port the worker manager is running on. used to wait for the workers to be ready',
type: 'number',
},
})
.parseSync();

Expand Down Expand Up @@ -165,8 +170,8 @@ let virtualNetwork = new VirtualNetwork();
shimExternals(virtualNetwork);

let urlMappings = fromUrls.map((fromUrl, i) => [
new URL(String(fromUrl), `http://localhost:${port}`),
new URL(String(toUrls[i]), `http://localhost:${port}`),
new URL(String(fromUrl)),
new URL(String(toUrls[i])),
]);
for (let [from, to] of urlMappings) {
virtualNetwork.addURLMapping(from, to);
Expand All @@ -185,7 +190,9 @@ let autoMigrate = migrateDB || undefined;
manager.getOptions.bind(manager),
);

await startWorker({ autoMigrate });
if (workerManagerPort != null) {
await waitForWorkerManager(workerManagerPort);
}

for (let [i, path] of paths.entries()) {
let url = hrefs[i][0];
Expand Down Expand Up @@ -324,51 +331,49 @@ let autoMigrate = migrateDB || undefined;
process.exit(-3);
});

async function startWorker(opts?: { autoMigrate?: true }) {
let worker = spawn(
'ts-node',
[
'--transpileOnly',
'worker',
`--port=${port}`,
`--matrixURL='${matrixURL}'`,
`--distURL='${distURL}'`,
...(opts?.autoMigrate ? [`--migrateDB`] : []),
...flattenDeep(
urlMappings.map(([from, to]) => [
`--fromUrl='${from}'`,
`--toUrl='${to}'`,
]),
),
],
{
stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
},
);
let workerReadyDeferred: Deferred<boolean> | undefined;
async function waitForWorkerManager(port: number) {
const workerManager = await new Promise<Socket>((r) => {
let socket = createConnection({ port }, () => {
log.info(`Connected to worker manager on port ${port}`);
r(socket);
});
});

if (worker.stdout) {
worker.stdout.on('data', (data: Buffer) =>
log.info(`worker: ${data.toString()}`),
);
}
if (worker.stderr) {
worker.stderr.on('data', (data: Buffer) =>
console.error(`worker: ${data.toString()}`),
);
}
workerManager.on('data', (data) => {
let res = data.toString();
if (!workerReadyDeferred) {
throw new Error(
`received unsolicited message from worker manager on port ${port}`,
);
}
switch (res) {
case 'ready':
case 'not-ready':
workerReadyDeferred.fulfill(res === 'ready' ? true : false);
break;
default:
workerReadyDeferred.reject(
`unexpected response from worker manager: ${res}`,
);
}
});

let timeout = await Promise.race([
new Promise<void>((r) => {
worker.on('message', (message) => {
if (message === 'ready') {
r();
}
});
}),
new Promise<true>((r) => setTimeout(() => r(true), 30_000)),
]);
if (timeout) {
console.error(`timed-out waiting for worker to start. Stopping server`);
process.exit(-2);
try {
let isReady = false;
let timeout = Date.now() + 30_000;
do {
workerReadyDeferred = new Deferred();
workerManager.write('ready?');
isReady = await workerReadyDeferred.promise;
} while (!isReady && Date.now() < timeout);
if (!isReady) {
throw new Error(
`timed out trying to connect to worker manager on port ${port}`,
);
}
} finally {
workerManager.end();
}
log.info('workers are ready');
}
Loading
Loading