Skip to content

Commit

Permalink
feat: share RateLimitStatPrinter with old code
Browse files Browse the repository at this point in the history
  • Loading branch information
eseidel committed Aug 10, 2023
1 parent 042295a commit 0149c3f
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 14 deletions.
26 changes: 25 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -607,4 +607,28 @@ e.g. did the price change since when the trade was scoped vs. when it was execut
"waypoints": 66798
}
}
```
```

## Confused

🛸#54 🛫 to X1-UC71-90215B JUMP_GATE (39s) spent 49 fuel
🛸#6F ✈️ to X1-YA22-87615D, -1m left
Unhandled exception:
Invalid argument (marketSymbol): ESEIDEL-6F is not at X1-YA22-87615D, X1-YA22-92610F.: Instance of 'WaypointSymbol'
#0 recordMarketDataIfNeededAndLog (package:cli/cache/market_prices.dart:468:5)
#1 visitLocalMarket (package:cli/behavior/explorer.dart:65:24)
<asynchronous suspension>
#2 advanceTrader (package:cli/behavior/trader.dart:643:25)
<asynchronous suspension>
#3 advanceShipBehavior (package:cli/behavior/advance.dart:83:23)
<asynchronous suspension>
#4 advanceShips (package:cli/logic.dart:51:25)
<asynchronous suspension>
#5 logic (package:cli/logic.dart:150:7)
<asynchronous suspension>
#6 cliMain (file:///root/space_traders/packages/cli/bin/cli.dart:101:3)
<asynchronous suspension>
#7 main.<anonymous closure> (file:///root/space_traders/packages/cli/bin/cli.dart:107:7)
<asynchronous suspension>
#8 main (file:///root/space_traders/packages/cli/bin/cli.dart:105:3)
<asynchronous suspension>
42 changes: 29 additions & 13 deletions packages/cli/bin/network_execute.dart
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import 'package:cli/cli.dart';
import 'package:cli/net/queue.dart';
import 'package:cli/net/rate_limit.dart';
import 'package:cli/printing.dart';
import 'package:http/http.dart';
import 'package:postgres/postgres.dart';

class NetExecutor {
NetExecutor(PostgreSQLConnection connection, {this.maxRequestsPerSecond = 3})
: queue = NetQueue(connection, QueueRole.responder);
NetExecutor(
PostgreSQLConnection connection, {
this.targetRequestsPerSecond = 3,
}) : queue = NetQueue(connection, QueueRole.responder);

final int maxRequestsPerSecond;
final int targetRequestsPerSecond;
final Client _client = Client();
final NetQueue queue;

Expand All @@ -20,11 +23,12 @@ class NetExecutor {
return DateTime.parse(resetString);
}

Future<Response> sendRequest(QueuedRequest request) async {
final method = request.method;
final uri = Uri.parse(request.url);
final body = request.body;
final headers = request.headers;
Future<Response> _dispatchRequest(
String method,
Uri uri, {
Map<String, String>? headers,
String? body,
}) {
switch (method) {
case 'POST':
return _client.post(uri, headers: headers, body: body);
Expand All @@ -42,6 +46,16 @@ class NetExecutor {
throw Exception('Unknown method: $method');
}

Future<Response> sendRequest(QueuedRequest request) async {
final method = request.method;
final uri = Uri.parse(request.url);
final body = request.body;
final headers = request.headers;
final response =
await _dispatchRequest(method, uri, headers: headers, body: body);
return response;
}

String _removeExpectedPrefix(String url) {
const expectedPrefix = 'https://api.spacetraders.io/v2';
if (!url.startsWith(expectedPrefix)) {
Expand All @@ -52,10 +66,12 @@ class NetExecutor {

Future<void> run() async {
logger.info('Servicing network requests...');
final minWaitTime = const Duration(seconds: 1) ~/ maxRequestsPerSecond;
final minWaitTime = const Duration(seconds: 1) ~/ targetRequestsPerSecond;
final stats = RateLimitStatPrinter();
DateTime? nextRequestTime;
var backoffSeconds = 1;
while (true) {
stats.printIfNeeded();
final request = await queue.nextRequest();
if (request == null) {
await queue.waitForRequest();
Expand All @@ -72,16 +88,16 @@ class NetExecutor {
final path = _removeExpectedPrefix(request.request.url);
nextRequestTime = DateTime.timestamp().add(minWaitTime);
final response = await sendRequest(request.request);
final after = DateTime.timestamp();
final duration = after.difference(before);
logger.info(
stats.record(response);
final duration = DateTime.timestamp().difference(before);
logger.detail(
'${approximateDuration(duration)} ${response.statusCode} '
'${request.request.method.padRight(5)} $path',
);
if (response.statusCode == 429) {
final resetTime = _parseResetTime(response);
if (resetTime != null) {
logger.info('Rate limited, waiting until $resetTime');
logger.warn('Rate limited, waiting until $resetTime');
final duration = resetTime.difference(DateTime.timestamp());
await Future<void>.delayed(duration);
} else {
Expand Down
53 changes: 53 additions & 0 deletions packages/cli/lib/net/rate_limit.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,60 @@ import 'dart:convert';
import 'package:cli/api.dart';
import 'package:cli/logger.dart';
import 'package:cli/net/counts.dart';
import 'package:cli/printing.dart';
import 'package:clock/clock.dart';
import 'package:http/http.dart';
import 'package:meta/meta.dart';

/// Prints stats about rate limiting.
class RateLimitStatPrinter {
/// Total number of requests since last reset.
int _total = 0;

/// Number of successful requests since last reset.
int _successes = 0;

/// Number of rate limited requests since last reset.
int _rateLimits = 0;

DateTime _lastPrintTime = DateTime.timestamp();

void _printStatsIfNonZero(Duration duration) {
if (_total > 0) {
logger.info(
'$_successes ($_rateLimits) in '
'${approximateDuration(duration)} total: $_total',
);
}
}

void _reset() {
_total = 0;
_successes = 0;
_rateLimits = 0;
}

/// Print the stats if it's been at least a minute since the last print.
void printIfNeeded() {
final sinceLastPrint = DateTime.timestamp().difference(_lastPrintTime);
if (sinceLastPrint >= const Duration(minutes: 1)) {
_printStatsIfNonZero(sinceLastPrint);
_reset();
_lastPrintTime = DateTime.timestamp();
}
}

/// Record a response.
void record(Response response) {
_total++;
if (response.statusCode == 429) {
_rateLimits++;
} else {
_successes++;
}
}
}

/// A rate limiter that can be used to rate limit requests to the api.
class RateLimiter {
/// Construct a rate limiter.
Expand Down Expand Up @@ -132,6 +182,7 @@ class RateLimitedApiClient extends CountingApiClient {
_mockInvokeAPI = mockInvokeApi;

final RateLimiter _rateLimiter;
final RateLimitStatPrinter _stats = RateLimitStatPrinter();
final InvokeApi? _mockInvokeAPI;

/// Called by other code for printing.
Expand Down Expand Up @@ -218,6 +269,7 @@ class RateLimitedApiClient extends CountingApiClient {

final response = await handleUnexpectedRateLimit(
() async {
_stats.printIfNeeded();
// Wait for our turn (even when retrying)
await _rateLimiter.waitForRateLimit();
// Could include retry count here.
Expand All @@ -232,6 +284,7 @@ class RateLimitedApiClient extends CountingApiClient {
formParams,
contentType,
);
_stats.record(response);
return response;
},
);
Expand Down

0 comments on commit 0149c3f

Please sign in to comment.