Skip to content

Commit

Permalink
test exclusive indexer with distributed lock
Browse files Browse the repository at this point in the history
  • Loading branch information
boudra committed Apr 18, 2024
1 parent 322155f commit 26e47c2
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 140 deletions.
15 changes: 8 additions & 7 deletions .github/workflows/deploy-branch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,29 @@ jobs:

- name: Set variables for production
run: |
echo "DEPLOYMENT_ENVIRONMENT=production" >> $GITHUB_ENV
echo "DEPLOYMENT_URL=https://indexer-v2.fly.dev" >> $GITHUB_ENV
echo "DEPLOYMENT_ENVIRONMENT=staging" >> $GITHUB_ENV
echo "GIT_SHA_SHORT=$(git rev-parse --short "$GITHUB_SHA")" >> $GITHUB_ENV
echo "FLY_APP_NAME=indexer-staging" >> $GITHUB_ENV
if: ${{ github.ref == 'refs/heads/main' }}

- name: Build and test
run: |
flyctl -c fly.${{ env.DEPLOYMENT_ENVIRONMENT }}.toml deploy --remote-only --build-only
flyctl -c fly.${{ env.DEPLOYMENT_ENVIRONMENT }}.toml deploy --remote-only --build-only --push --image-label deployment-$GIT_SHA_SHORT
env:
FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}

- name: Deploy Indexer
run: |
flyctl -c fly.${{ env.DEPLOYMENT_ENVIRONMENT }}.toml deploy --process-groups=indexer --remote-only --wait-timeout=7200 --env BUILD_TAG=`git rev-parse --short HEAD`
flyctl -c fly.${{ env.DEPLOYMENT_ENVIRONMENT }}.toml deploy --process-groups=indexer --wait-timeout=7200 --env BUILD_TAG=`git rev-parse --short HEAD` --image registry.fly.io/$FLY_APP_NAME:deployment-$GIT_SHA_SHORT
env:
FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}

- name: Deploy HTTP
- name: Deploy Web
run: |
flyctl -c fly.${{ env.DEPLOYMENT_ENVIRONMENT }}.toml deploy --process-groups=web --remote-only --wait-timeout=7200 --env BUILD_TAG=`git rev-parse --short HEAD`
flyctl -c fly.${{ env.DEPLOYMENT_ENVIRONMENT }}.toml deploy --process-groups=web --wait-timeout=7200 --env BUILD_TAG=`git rev-parse --short HEAD` --image registry.fly.io/$FLY_APP_NAME:deployment-$GIT_SHA_SHORT
env:
FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}

- name: Smoke test
run: |
curl --silent --show-error --fail-with-body ${{ env.DEPLOYMENT_URL }}/api/v1/status
curl --silent --show-error --fail-with-body https://${{ env.FLY_APP_NAME }}.fly.dev/api/v1/status
9 changes: 2 additions & 7 deletions fly.production.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
# fly.toml app configuration file generated for indexer-v2 on 2024-03-26T14:50:51+07:00
#
# See https://fly.io/docs/reference/configuration/ for information about how to use this file.
#

app = 'indexer-v2'
primary_region = 'den'
kill_signal = 'SIGINT'
Expand All @@ -19,7 +14,7 @@ kill_timeout = '5s'
[env]
DEPLOYMENT_ENVIRONMENT = 'production'
ENABLE_RESOURCE_MONITOR = 'true'
INDEXED_CHAINS = 'all'
INDEXED_CHAINS = "mainnet,optimism,fantom,pgn-testnet,pgn-mainnet,arbitrum,polygon,sepolia,avalanche,avalanche-fuji,scroll,scroll-sepolia,base,zksync-era-mainnet"
LOG_LEVEL = 'debug'
NODE_OPTIONS = '--max-old-space-size=4096'
PORT = '8080'
Expand Down Expand Up @@ -56,7 +51,7 @@ kill_timeout = '5s'
port = 8080
timeout = "10s"
type = "http"
processes = ['web', 'indexer']
processes = ['web', 'indexer_green']

[[vm]]
memory = '4gb'
Expand Down
54 changes: 54 additions & 0 deletions fly.staging.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
app = "indexer-staging"
primary_region = "den"
kill_signal = "SIGINT"
kill_timeout = 5

[experimental]
auto_rollback = true

[env]
PORT = "8080"
DEPLOYMENT_ENVIRONMENT = "staging"
LOG_LEVEL = "debug"
STORAGE_DIR = "/mnt/indexer"
INDEXED_CHAINS = "mainnet,optimism,fantom,pgn-testnet,pgn-mainnet,arbitrum,polygon,sepolia,avalanche,avalanche-fuji,scroll,scroll-sepolia,base,zksync-era-mainnet,sei-devnet"
ENABLE_RESOURCE_MONITOR = "true"
NODE_OPTIONS="--max-old-space-size=7168"

[processes]
web = "npm start -- --indexer --http"

[mounts]
source="indexer_staging"
destination="/mnt/indexer"

[[services]]
protocol = "tcp"
internal_port = 8080
processes = ["web"]

[[services.ports]]
port = 80
handlers = ["http"]
force_https = true
[[services.ports]]
port = 443
handlers = ["tls", "http"]

[services.concurrency]
type = "connections"
hard_limit = 25
soft_limit = 20

[[services.tcp_checks]]
interval = "30s"
timeout = "10s"
grace_period = "1m"

[[services.http_checks]]
interval = "60s"
grace_period = "1m"
timeout = "5s"
method = "get"
path = "/api/v1/status"
protocol = "http"
2 changes: 1 addition & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import os from "node:os";
type ChainId = number;
type CoingeckoSupportedChainId = 1 | 10 | 250 | 42161 | 43114 | 713715;

const CHAIN_DATA_VERSION = "60";
const CHAIN_DATA_VERSION = "62";

export type Token = {
code: string;
Expand Down
36 changes: 36 additions & 0 deletions src/database/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,35 @@ const UPDATE_STATS_EVERY_MS = 60_000;

export class Database {
#db: KyselyDb;
#connectionPool: Pool;
#roundMatchTokenCache = new LRUCache<string, Address>({ max: 500 });
#donationQueue: NewDonation[] = [];
#donationBatchTimeout: ReturnType<typeof setTimeout> | null = null;
#statsTimeout: ReturnType<typeof setTimeout> | null = null;
#logger: Logger;

readonly databaseSchemaName: string;

constructor(options: {
statsUpdaterEnabled: boolean;
logger: Logger;
connectionPool: Pool;
schemaName: string;
}) {
const dialect = new PostgresDialect({
pool: options.connectionPool,
});

this.#connectionPool = options.connectionPool;

this.#db = new Kysely<Tables>({
dialect,
plugins: [new CamelCasePlugin()],
});

this.#db = this.#db.withSchema(options.schemaName);

this.#logger = options.logger;
this.databaseSchemaName = options.schemaName;

this.scheduleDonationQueueFlush();
Expand All @@ -78,6 +84,36 @@ export class Database {
}
}

async acquireWriteLock() {
const client = await this.#connectionPool.connect();
// generate lock id based on schema
const lockId = this.databaseSchemaName.split("").reduce((acc, char) => {
return acc + char.charCodeAt(0);
}, 0);

try {
const result = await client.query(
`SELECT pg_try_advisory_lock(${lockId}) as lock`
);

// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
if (result.rows[0].lock === true) {
return {
release: async () => {
await client.query(`SELECT pg_advisory_unlock(${lockId})`);
client.release();
},
client,
};
}
} catch (error) {
this.#logger.error({ error }, "Failed to acquire write lock");
client.release();
}

return null;
}

private scheduleStatsUpdate() {
if (this.#statsTimeout !== null) {
clearTimeout(this.#statsTimeout);
Expand Down
Loading

0 comments on commit 26e47c2

Please sign in to comment.