feat: use d1 for tile lookups
zackpollard committed Feb 19, 2025
1 parent 568084b commit 048100f
Showing 12 changed files with 264 additions and 428 deletions.
38 changes: 1 addition & 37 deletions .github/workflows/tiles-worker.yml
@@ -159,44 +159,8 @@ jobs:
           TF_VAR_tiles_build_dir: "${{ github.workspace }}/dist"
         run: op run --env-file=".env" -- terragrunt run-all plan -no-color 2>&1 | tee "${{github.workspace}}/plan_output.txt" && exit ${PIPESTATUS[0]};
 
-  kv-warming:
-    needs: [build, test]
-    name: KV Warming
-    runs-on: mich
-    if: github.event_name == 'workflow_dispatch' || github.ref == 'refs/heads/main'
-    defaults:
-      run:
-        working-directory: ./tiles
-    steps:
-      - name: Checkout code
-        uses: actions/checkout@v4
-
-      - name: Setup Node
-        uses: actions/setup-node@v4
-        with:
-          node-version-file: './tiles/.nvmrc'
-
-      - name: Run npm install
-        run: npm ci
-
-      - name: Get tiles.json
-        run: echo "TILES_JSON=$(jq -c . < ${{ github.workspace }}/deployment/modules/cloudflare/tiles-worker/tiles.tfvars.json)" >> $GITHUB_ENV
-
-      - name: Run kv warming
-        env:
-          S3_ACCESS_KEY: ${{ secrets.CLOUDFLARE_TILES_R2_KV_TOKEN_ID }}
-          S3_SECRET_KEY: ${{ secrets.CLOUDFLARE_TILES_R2_KV_TOKEN_HASHED_VALUE }}
-          S3_ENDPOINT: https://${{ secrets.CLOUDFLARE_ACCOUNT_ID }}.r2.cloudflarestorage.com
-          KV_API_KEY: ${{ secrets.CLOUDFLARE_TILES_R2_KV_TOKEN_VALUE }}
-          CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }}
-          # Figure out how to extract this from terraform at some point or get it into github vars
-          KV_NAMESPACE_ID: 5a4b82694e8b490db8b8904cdaea4f00
-          BUCKET_KEY: tiles-weur
-          DEPLOYMENT_KEY: ${{ fromJson(env.TILES_JSON).pmtiles_deployment_key }}
-        run: npm run kv:warm
-
   deploy-terragrunt:
-    needs: [build, test, kv-warming]
+    needs: [build, test]
     name: Deploy Terragrunt
     runs-on: ubuntu-latest
     if: github.event_name == 'workflow_dispatch' || github.ref == 'refs/heads/main'
5 changes: 5 additions & 0 deletions deployment/modules/cloudflare/tiles-worker/workers.tf
@@ -36,6 +36,11 @@ resource "cloudflare_workers_script" "tiles" {
     namespace_id = data.terraform_remote_state.tiles_state.outputs.kv_namespace_id
   }
 
+  d1_database_binding {
+    database_id = "17968519-6e8a-440e-920b-2118b8e5376e"
+    name        = "D1_TILE_LOOKUP"
+  }
+
   compatibility_date  = "2024-07-29"
   compatibility_flags = ["nodejs_compat"]
 }
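The new binding surfaces in the worker as env.D1_TILE_LOOKUP (used in tiles/src/index.ts below). The worker's Env type is not part of this diff, but a minimal sketch of its shape, with binding names assumed from their usage elsewhere in the commit, would look like:

// Sketch only: the actual Env type is not shown in this commit.
interface Env {
  D1_TILE_LOOKUP: D1Database; // added by the d1_database_binding above
  KV: KVNamespace;            // existing kv_namespace_binding, no longer used for lookups
  BUCKET_APAC: R2Bucket;
  BUCKET_EEUR: R2Bucket;
  // ...remaining regional R2 buckets elided
}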
154 changes: 154 additions & 0 deletions tiles/src/d1-warmer.ts
@@ -0,0 +1,154 @@
import { S3Client } from '@aws-sdk/client-s3';
import { gunzipSync } from 'fflate';
import { appendFileSync, writeFileSync } from 'fs';
import pLimit from 'p-limit';
import { AsyncFn, IMetricsRepository, IStorageRepository, Operation } from './interface';
import { DirectoryString, PMTilesService } from './pmtiles/pmtiles.service';
import { Compression, Directory, Header } from './pmtiles/types';
import { deserializeIndex } from './pmtiles/utils';
import { CloudflareD1Repository, MemCacheRepository, S3StorageRepository } from './repository';

class FakeMetricsRepository implements IMetricsRepository {
  constructor() {}

  monitorAsyncFunction<T extends AsyncFn>(
    operation: Operation,
    call: T,
  ): (...args: Parameters<T>) => Promise<Awaited<ReturnType<T>>> {
    return call;
  }
  push(): void {}
}

export function decompress(buf: ArrayBuffer, compression: Compression): ArrayBuffer {
  if (compression !== Compression.Gzip) {
    throw new Error('Compression method not supported');
  }
  const result = gunzipSync(new Uint8Array(buf));
  return result;
}

const getDirectory = async (length: number, offset: number, source: IStorageRepository, header: Header) => {
  const resp = await source.getRange({ offset, length });
  const data = decompress(resp, header.internalCompression);
  const entries = deserializeIndex(await new Response(data).arrayBuffer());
  if (entries.length === 0) {
    throw new Error('Empty directory is invalid');
  }
  const directory: Directory = { offsetStart: entries[0].offset, tileIdStart: entries[0].tileId, entries };
  return directory;
};

const handler = async () => {
  const {
    S3_ACCESS_KEY,
    S3_SECRET_KEY,
    S3_ENDPOINT,
    KV_API_KEY,
    CLOUDFLARE_ACCOUNT_ID,
    KV_NAMESPACE_ID,
    BUCKET_KEY,
    DEPLOYMENT_KEY,
  } = process.env;
  if (
    !S3_ACCESS_KEY ||
    !S3_SECRET_KEY ||
    !S3_ENDPOINT ||
    !KV_API_KEY ||
    !CLOUDFLARE_ACCOUNT_ID ||
    !KV_NAMESPACE_ID ||
    !BUCKET_KEY ||
    !DEPLOYMENT_KEY
  ) {
    throw new Error('Missing environment variables');
  }

  console.log('Starting S3');
  const client = new S3Client({
    region: 'auto',
    endpoint: S3_ENDPOINT,
    credentials: {
      accessKeyId: S3_ACCESS_KEY,
      secretAccessKey: S3_SECRET_KEY,
    },
  });

  const storageRepository = new S3StorageRepository(client, BUCKET_KEY, DEPLOYMENT_KEY);
  const memCacheRepository = new MemCacheRepository(new Map());
  const metricsRepository = new FakeMetricsRepository();
  const pmTilesService = await PMTilesService.init(
    storageRepository,
    memCacheRepository,
    metricsRepository,
    null as unknown as CloudflareD1Repository,
  );

  const [header, root] = await pmTilesService.getHeaderAndRootFromSource();
  let countR2 = 0;
  let total = 0;
  const promises: Promise<void>[] = [];
  const limit = pLimit(10);

  let entryCount = 0;

  const createTableStatement = `CREATE TABLE IF NOT EXISTS cache_entries (
    startTileId INTEGER NOT NULL PRIMARY KEY,
    entry TEXT NOT NULL
  ) STRICT;`;

  writeFileSync('cache_entries.sql', createTableStatement);

  for (const entry of root.entries) {
    const call = async () => {
      const directory = await getDirectory(
        entry.length,
        entry.offset + header.leafDirectoryOffset,
        storageRepository,
        header,
      );

      entryCount += directory.entries.length;

      console.log('Entry Progress: ' + entryCount);

      const totalChunks = Math.ceil(directory.entries.length / 50);
      for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
        const chunkEntries = directory.entries.slice(chunkIndex * 50, (chunkIndex + 1) * 50);
        const directoryChunk = {
          offsetStart: chunkEntries[0].offset,
          tileIdStart: chunkEntries[0].tileId,
          entries: chunkEntries,
        };

        const startTileId = chunkEntries[0].tileId;

        const stream = DirectoryString.fromDirectory(directoryChunk);
        const entryValue = await stream.toString();

        const insertStatement = `\nINSERT INTO cache_entries (startTileId, entry) VALUES (${startTileId}, '${entryValue}');`;
        appendFileSync(`cache_entries.${Math.floor(countR2 / 50)}.sql`, insertStatement);
        entryCount++;
      }

      countR2++;
      console.log('R2 Progress: ' + countR2 + '/' + total);
    };
    promises.push(limit(call));
    total++;
  }

  await Promise.all(promises);
};

process.on('uncaughtException', (e) => {
  console.error('UNCAUGHT EXCEPTION');
  console.error('stack', e);
  process.exit(1);
});

handler()
  .then(() => console.log('Done'))
  .catch((e) => {
    console.error('Error', e);
    throw e;
  });
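The warmer splits each leaf directory into 50-entry chunks and emits one INSERT per chunk, keyed by the chunk's first tile ID; the generated cache_entries.*.sql files can then be imported into D1, e.g. with wrangler d1 execute. The worker-side read path is not shown in this diff, but a lookup over this schema would fetch the chunk whose startTileId is the largest value not exceeding the requested tile ID. A hypothetical sketch:

// Hypothetical lookup against the cache_entries table created above; the real
// read path lives in PMTilesService/CloudflareD1Repository, outside this diff.
async function findDirectoryChunk(db: D1Database, tileId: number): Promise<string | undefined> {
  const row = await db
    .prepare('SELECT entry FROM cache_entries WHERE startTileId <= ?1 ORDER BY startTileId DESC LIMIT 1')
    .bind(tileId)
    .first<{ entry: string }>();
  // The serialized chunk would then be deserialized and scanned for the exact tile entry.
  return row?.entry;
}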
6 changes: 3 additions & 3 deletions tiles/src/index.ts
@@ -2,8 +2,8 @@ import { preferredBuckets, R2BucketRegion } from './buckets';
 import { IMetricsRepository } from './interface';
 import { PMTilesService } from './pmtiles/pmtiles.service';
 import {
+  CloudflareD1Repository,
   CloudflareDeferredRepository,
-  CloudflareKVRepository,
   CloudflareMetricsRepository,
   HeaderMetricsProvider,
   InfluxMetricsProvider,
@@ -156,7 +156,7 @@
   }
 
   const memCacheRepository = new MemCacheRepository(globalThis.memCache);
-  const kvRepository = new CloudflareKVRepository(env.KV);
+  const d1Repository = new CloudflareD1Repository(env.D1_TILE_LOOKUP);
   const bucketMap: Record<R2BucketRegion, R2Bucket> = {
     apac: env.BUCKET_APAC,
     eeur: env.BUCKET_EEUR,
@@ -176,8 +176,8 @@
   const pmTilesService = await metrics.monitorAsyncFunction({ name: 'pmtiles_init' }, PMTilesService.init)(
     storageRepository,
     memCacheRepository,
-    kvRepository,
     metrics,
+    d1Repository,
   );
 
   const respHeaders = new Headers();
5 changes: 2 additions & 3 deletions tiles/src/interface.ts
@@ -1,8 +1,7 @@
 import { Metric } from './repository';
 
-export interface IKeyValueRepository {
-  get(key: string): Promise<string | undefined>;
-  getAsStream(key: string): Promise<ReadableStream | undefined>;
+export interface IDatabaseRepository {
+  query(query: string, ...values: unknown[]): Promise<D1Result<Record<string, unknown>>>;
 }
 
 export interface IMemCacheRepository {
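The CloudflareD1Repository that implements this interface lives in tiles/src/repository.ts, which is not included in this diff. A minimal sketch of what satisfying IDatabaseRepository over a D1 binding might look like:

import { IDatabaseRepository } from './interface';

// Assumed shape only; the committed CloudflareD1Repository is not shown here.
export class CloudflareD1Repository implements IDatabaseRepository {
  constructor(private readonly db: D1Database) {}

  query(query: string, ...values: unknown[]): Promise<D1Result<Record<string, unknown>>> {
    // D1 binds positional parameters (?1..?n) from the spread values.
    return this.db.prepare(query).bind(...values).all();
  }
}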