Skip to content

Commit

Permalink
Fix up graphqlk log provider (#9742)
Browse files Browse the repository at this point in the history
  • Loading branch information
pgebheim authored Nov 17, 2020
2 parents 501a480 + 20649c7 commit d453119
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 29 deletions.
58 changes: 45 additions & 13 deletions packages/augur-sdk/src/graph/GraphQLLogProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Filter, Log } from '@augurproject/types';
import { retry } from 'async';
import * as _ from 'lodash';

const RETRY_TIMES = 3;
const RETRY_TIMES = 7;
const INTERVAL = 1000;

const eventFields = {
Expand All @@ -23,7 +23,7 @@ const eventFields = {
"completeSetsPurchased": ["universe", "market", "account", "numCompleteSets", "timestamp"],
"completeSetsSold": ["universe", "market", "account", "numCompleteSets", "fees", "timestamp"],
"tradingProceedsClaimed": ["universe", "sender", "market", "outcome", "numShares", "numPayoutTokens", "fees", "timestamp"],
"tokensTransferred": ["universe", "token", "from", "to", "value", "tokenType", "market"],
//"tokensTransferred": ["universe", "token", "from", "to", "value", "tokenType", "market"],
"tokensMinted": ["universe", "token", "target", "amount", "tokenType", "market", "totalSupply"],
"tokensBurned": ["universe", "token", "target", "amount", "tokenType", "market", "totalSupply"],
"tokenBalanceChanged": ["universe", "owner", "token", "tokenType", "market", "balance", "outcome"],
Expand Down Expand Up @@ -82,29 +82,29 @@ export function* logQuery(fromBlockNumber: number, toBlockNumber: number, logFie
}
}, {});

while (true) {
while (Object.keys(skipCounts).length > 0) {
const lastLogs = yield buildQuery(fromBlockNumber, toBlockNumber, logFieldDescriptions, skipCounts);
skipCounts = Object.entries(skipCounts).reduce((acc, [key, skipCount]) => {
if(lastLogs[key].length === 0) return acc; // Ignore this entity, no more can be fetched
if(lastLogs[key].length < 1000) return acc; // Ignore this entity, no more can be fetched
acc[key] = skipCount + lastLogs[key].length;
return acc;
}, {});
console.log("Skip counts", skipCounts);

const flattenedLogs = _.flatten(_.values(lastLogs));
allLogs = {
allLogs = [
...allLogs,
...flattenedLogs
}

if(flattenedLogs.length === 0) break;
]
}

return allLogs;
}

function makeSubgraphRequest(subgraphUrl: string, query: string): Promise<LogQueryResponse> {
return makeGraphRequest(subgraphUrl, query) as Promise<LogQueryResponse>
}

function makeGraphRequest(subgraphUrl: string, query: string): Promise<LogQueryResponse> {
function makeGraphRequest(url: string, query: string): Promise<any> {
return new Promise((resolve, reject) => {
retry({
times: RETRY_TIMES,
Expand All @@ -114,7 +114,7 @@ function makeGraphRequest(subgraphUrl: string, query: string): Promise<LogQueryR
errorFilter (err) { return true; }
},
async () => {
const result = await axios.post(subgraphUrl, { query });
const result = await axios.post(url, { query });
if (result.data.errors !== undefined) {
const message = `GraphQL Log Request Got ${result.data.errors.length} Errors: ${JSON.stringify(result.data.errors)}`;
console.warn(message);
Expand All @@ -125,8 +125,9 @@ function makeGraphRequest(subgraphUrl: string, query: string): Promise<LogQueryR
(err, results) => {
if (err) {
reject(err);
} else {
resolve(results);
}
resolve(results);
}
)
});
Expand All @@ -145,10 +146,41 @@ export class GraphQLLogProvider {
const i = logQuery(Number(filter.fromBlock), Number(filter.toBlock), eventFields);
let result = i.next();
while(!result.done) {
const response = await makeGraphRequest(this.subgraphUrl, result.value as string);
const response = await makeSubgraphRequest(this.subgraphUrl, result.value as string);
result = i.next(response);
}

return result.value;
}

async getSyncStatus() {
const response = await makeGraphRequest(
"https://api.thegraph.com/index-node/graphql", `
{
status: indexingStatusForCurrentVersion(subgraphName: "${this.subgraphUrl.replace("https://api.thegraph.com/subgraphs/name/", "")}") {
synced
health
fatalError {
message
}
chains {
chainHeadBlock {
number
}
latestBlock {
number
}
}
}
}
`);
const {synced, health, fatalError, chains} = response.status;
return {
synced,
health,
fatalError,
chainHeadBlockNumber: parseInt(chains[0].chainHeadBlock.number),
latestBlockNumber: parseInt(chains[0].latestBlock.number)
}
}
}
32 changes: 16 additions & 16 deletions packages/augur-sdk/src/state/create-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,28 @@ export async function buildSyncStrategies(client:Augur, db:Promise<DB>, provider
return async () => {
const contractAddresses = client.contractEvents.getAugurContractAddresses();
const uploadBlockNumber = config.uploadBlockNumber;
const currentBlockNumber = await provider.getBlockNumber();
let currentBlockNumber = await provider.getBlockNumber();

let bulkSyncStrategy;
let bulkSyncStrategy = null;

if (config.graph && config.graph.logSubgraphURL) {
const logProvider = new GraphQLLogProvider(config.graph.logSubgraphURL);
bulkSyncStrategy = new BulkSyncStrategy(logProvider.getLogs.bind(logProvider),
contractAddresses, logFilterAggregator.onLogsAdded,
(logs: any[]) => { return logs; }
);
} else {
const status = await logProvider.getSyncStatus();
if(status.health === "healthy") {
currentBlockNumber = status.latestBlockNumber;
bulkSyncStrategy = new BulkSyncStrategy(logProvider.getLogs.bind(logProvider),
contractAddresses, logFilterAggregator.onLogsAdded,
(logs: any[]) => { return logs; }
);
}
}

if (bulkSyncStrategy === null) {
bulkSyncStrategy = new BulkSyncStrategy(provider.getLogs,
contractAddresses, logFilterAggregator.onLogsAdded,
client.contractEvents.parseLogs);
}

const blockAndLogStreamerSyncStrategy = BlockAndLogStreamerSyncStrategy.create(
provider,
contractAddresses,
Expand Down Expand Up @@ -226,22 +233,15 @@ export async function createServer(config: SDKConfiguration, client?: Augur): Pr
export async function startServerFromClient(config: SDKConfiguration, client?: Augur ): Promise<API> {
const { api, sync } = await createServer(config, client);

// TODO should this await?
sync();
/*
controller.run().catch((err) => {
// TODO: PG needs to handle what happens if the server side of the connector dies
console.log('Error starting up Augur syncing services');
});
*/
sync().catch((error) => api.augur.events.emit("error", error))

return api;
}

export async function startServer(config: SDKConfiguration): Promise<API> {
const { api, sync } = await createServer(config, undefined);

sync();
sync().catch((error) => api.augur.events.emit("error", error))

return api;
}
3 changes: 3 additions & 0 deletions packages/augur-ui/src/modules/events/actions/log-handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -918,4 +918,7 @@ const EventHandlers = {
handleInitialReportSubmittedLog
),
[SubscriptionEventName.MarketCreated]: wrapLogHandler(handleMarketCreatedLog),
"error": () => {
console.error("ERROR: ", error);
}
};

0 comments on commit d453119

Please sign in to comment.