Skip to content

Commit

Permalink
feat: Move transactions into the database.
Browse files Browse the repository at this point in the history
This required plumbing Database everywhere.
I also wrapped PostgreSQLConnection in Database
to avoid the direct dependency.
  • Loading branch information
eseidel committed Aug 11, 2023
1 parent c3f24dd commit 8d8a782
Show file tree
Hide file tree
Showing 43 changed files with 707 additions and 288 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -631,4 +631,7 @@ Invalid argument (marketSymbol): ESEIDEL-6F is not at X1-YA22-87615D, X1-YA22-92
#7 main.<anonymous closure> (file:///root/space_traders/packages/cli/bin/cli.dart:107:7)
<asynchronous suspension>
#8 main (file:///root/space_traders/packages/cli/bin/cli.dart:105:3)
<asynchronous suspension>
<asynchronous suspension>

## Use best-place-to-buy logic for ships:
[WARN] 🛸#1 Can not buy SHIP_ORE_HOUND at X1-YA22-18767C, credits 318,996c < 1.05 * price = 1,334,568c. Disabling Behavior.buyShip for 10m.
19 changes: 10 additions & 9 deletions packages/cli/bin/cli.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import 'package:cli/net/auth.dart';
import 'package:cli/net/counts.dart';
import 'package:cli/net/queue.dart';
import 'package:cli/printing.dart';
import 'package:db/db.dart';
import 'package:file/local.dart';
import 'package:postgres/postgres.dart';
import 'package:scoped/scoped.dart';

void printRequestStats(RequestCounts requestCounts) {
Expand Down Expand Up @@ -46,16 +46,17 @@ bool Function(Ship ship)? _shipFilterFromArgs(Agent agent, List<String> only) {

Api getApi(
String token,
PostgreSQLConnection? connection,
) {
Database db, {
required bool useOutOfProcessNetwork,
}) {
// TODO(eseidel): This is wrong. This needs to check that
// that there is a network executor running, not just that we have
// a connection to the database.
if (connection == null) {
if (!useOutOfProcessNetwork) {
return apiFromAuthToken(token, ClientType.localLimits);
}
final api = apiFromAuthToken(token, ClientType.unlimited);
final queuedClient = QueuedClient(connection)..getPriority = () => 0;
final queuedClient = QueuedClient(db)..getPriority = () => 0;
api.apiClient.client = queuedClient;
return api;
}
Expand Down Expand Up @@ -88,9 +89,9 @@ Future<void> cliMain(List<String> args) async {
final token =
await loadAuthTokenOrRegister(fs, callsign: callsign, email: email);

final useOutOfProcessNetwork = !(results['local'] as bool);
final dbConnection = useOutOfProcessNetwork ? await defaultDatabase() : null;
final api = getApi(token, dbConnection);
final db = await defaultDatabase();
final api =
getApi(token, db, useOutOfProcessNetwork: !(results['local'] as bool));

final caches = await Caches.load(fs, api);
logger.info(
Expand Down Expand Up @@ -125,7 +126,7 @@ Future<void> cliMain(List<String> args) async {
// But that's also OK since that's nonsentical.
final shipFilter =
_shipFilterFromArgs(agent, results['only'] as List<String>);
await logic(api, centralCommand, caches, shipFilter: shipFilter);
await logic(api, db, centralCommand, caches, shipFilter: shipFilter);
}

Future<void> main(List<String> args) async {
Expand Down
20 changes: 12 additions & 8 deletions packages/cli/bin/earning_per_ship.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import 'package:cli/cache/ship_cache.dart';
import 'package:cli/cache/transactions.dart';
import 'package:cli/cli.dart';
import 'package:cli/printing.dart';
import 'package:db/db.dart';
import 'package:db/transaction.dart';
import 'package:intl/intl.dart';

class TransactionSummary {
Expand Down Expand Up @@ -60,8 +62,9 @@ Future<void> command(FileSystem fs, List<String> args) async {
lookbackMinutesString != null ? int.parse(lookbackMinutesString) : 180;
final lookback = Duration(minutes: lookbackMinutes);

final allTransactions = loadAllTransactions(fs).toList();
final shipSymbols = allTransactions.map((e) => e.shipSymbol).toSet();
final db = await defaultDatabase();

final shipSymbols = (await uniqueShipSymbols(db)).map(ShipSymbol.fromString);
final behaviorCache = BehaviorCache.load(fs);

final longestHexNumber = shipSymbols.fold(
Expand All @@ -88,12 +91,13 @@ Future<void> command(FileSystem fs, List<String> args) async {
final behaviorCreditPerSecondTotals = <String, double>{};

final startTime = DateTime.timestamp().subtract(lookback);
final transactions = allTransactions.where(
(t) =>
t.timestamp.isAfter(startTime) &&
(t.accounting == AccountingType.goods ||
t.accounting == AccountingType.fuel),
);
final transactions = (await transactionsAfter(db, startTime))
.map(Transaction.fromRecord)
.where(
(t) =>
t.accounting == AccountingType.goods ||
t.accounting == AccountingType.fuel,
);

for (final shipSymbol in shipSymbols) {
final ship = shipCache.ship(shipSymbol);
Expand Down
14 changes: 7 additions & 7 deletions packages/cli/bin/earning_rate.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'package:cli/cache/transactions.dart';
import 'package:cli/cli.dart';
import 'package:cli/printing.dart';
import 'package:db/db.dart';
import 'package:db/transaction.dart';

DateTime snapToHour(DateTime time) {
return DateTime.utc(time.year, time.month, time.day, time.hour);
Expand All @@ -11,13 +12,12 @@ int hoursAgo(DateTime time) {
}

Future<void> command(FileSystem fs, List<String> args) async {
final db = await defaultDatabase();
// Credits per hour.
final transactions = TransactionLog.load(fs);

final oldest = transactions.entries.first;
final firstTransactionHour = snapToHour(oldest.timestamp);
final oneDayAgoAsHour =
snapToHour(DateTime.now().subtract(const Duration(hours: 24)));
final transactions = await transactionsAfter(db, oneDayAgoAsHour);
final firstTransactionHour = snapToHour(transactions.first.timestamp);

const timeWidth = 5;
const creditsWidth = 15;
Expand All @@ -28,7 +28,7 @@ Future<void> command(FileSystem fs, List<String> args) async {
: firstTransactionHour;
var latestCredits = 0;
var lastPeriodCredits = 0;
for (final transaction in transactions.entries) {
for (final transaction in transactions) {
latestCredits = transaction.agentCredits;
if (transaction.timestamp.isAfter(nextPrintDate)) {
final diff = latestCredits - lastPeriodCredits;
Expand All @@ -43,7 +43,7 @@ Future<void> command(FileSystem fs, List<String> args) async {
nextPrintDate = nextPrintDate.add(const Duration(hours: 1));
}
}
final last = transactions.entries.lastOrNull;
final last = transactions.lastOrNull;
if (last != null) {
final sinceLast =
approximateDuration(DateTime.now().difference(last.timestamp))
Expand Down
7 changes: 5 additions & 2 deletions packages/cli/bin/list_repeated_transactions.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import 'package:cli/cache/caches.dart';
import 'package:cli/cli.dart';
import 'package:db/db.dart';
import 'package:db/transaction.dart';

void printDiffs(List<int> data) {
final diffs = <int>[];
Expand All @@ -10,12 +12,13 @@ void printDiffs(List<int> data) {
}

Future<void> command(FileSystem fs, List<String> args) async {
final db = await defaultDatabase();
final transactions = (await allTransactions(db)).map(Transaction.fromRecord);
// final marketPrices = MarketPrices.load(fs);
final allTransactions = loadAllTransactions(fs);
// Walk through all transactions, finding repeats.
final transactionSets = <List<Transaction>>[];
var repeats = <Transaction>[];
for (final transaction in allTransactions) {
for (final transaction in transactions) {
// If the transaction has the same market and tradeSymbol as the previous
// one, then it's a repeat, and should be collected together into a group
// of repeats.
Expand Down
11 changes: 6 additions & 5 deletions packages/cli/bin/list_transactions.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import 'package:cli/api.dart';
import 'package:cli/cache/transactions.dart';
import 'package:cli/cli.dart';
import 'package:db/db.dart';
import 'package:db/transaction.dart';

String describeTransaction(Transaction t) {
return '${t.timestamp} ${t.tradeSymbol} ${t.quantity} ${t.tradeType} '
Expand All @@ -14,12 +16,11 @@ Future<void> command(FileSystem fs, List<String> args) async {
final lookbackMinutes = (args.length > 1) ? int.parse(args[1]) : 180;
final lookback = Duration(minutes: lookbackMinutes);

final transactionLog = TransactionLog.load(fs);

final db = await defaultDatabase();
final startTime = DateTime.timestamp().subtract(lookback);
final transactions = transactionLog.where(
(t) => t.timestamp.isAfter(startTime) && t.shipSymbol == shipSymbol,
);
final transactions = (await transactionsAfter(db, startTime))
.map(Transaction.fromRecord)
.where((t) => t.shipSymbol == shipSymbol);
for (final transaction in transactions) {
logger.info(describeTransaction(transaction));
}
Expand Down
3 changes: 2 additions & 1 deletion packages/cli/bin/network_db.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import 'package:cli/cli.dart';
import 'package:cli/net/auth.dart';
import 'package:cli/net/queries.dart';
import 'package:cli/net/queue.dart';
import 'package:db/db.dart';

Future<void> command(FileSystem fs, List<String> args) async {
final connection = await defaultDatabase();
final api = defaultApi(fs, ClientType.unlimited);
final queuedClient = QueuedClient(connection!)..getPriority = () => 0;
final queuedClient = QueuedClient(connection)..getPriority = () => 0;
api.apiClient.client = queuedClient;
final agent = await getMyAgent(api);
logger.info('Got agent: $agent');
Expand Down
8 changes: 4 additions & 4 deletions packages/cli/bin/network_execute.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import 'package:cli/cli.dart';
import 'package:cli/net/queue.dart';
import 'package:cli/net/rate_limit.dart';
import 'package:cli/printing.dart';
import 'package:db/db.dart';
import 'package:http/http.dart';
import 'package:postgres/postgres.dart';

class NetExecutor {
NetExecutor(
PostgreSQLConnection connection, {
Database db, {
this.targetRequestsPerSecond = 3,
}) : queue = NetQueue(connection, QueueRole.responder);
}) : queue = NetQueue(db, QueueRole.responder);

final int targetRequestsPerSecond;
final Client _client = Client();
Expand Down Expand Up @@ -123,7 +123,7 @@ class NetExecutor {

Future<void> command(FileSystem fs, List<String> args) async {
final connection = await defaultDatabase();
await NetExecutor(connection!).run();
await NetExecutor(connection).run();
}

void main(List<String> args) async {
Expand Down
7 changes: 4 additions & 3 deletions packages/cli/bin/network_fill.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import 'package:cli/cli.dart';
import 'package:cli/net/queue.dart';
import 'package:db/db.dart';

Future<void> command(FileSystem fs, List<String> args) async {
final connection = await defaultDatabase();
final queue = NetQueue(connection!, QueueRole.requestor);
final db = await defaultDatabase();
final queue = NetQueue(db, QueueRole.requestor);

final requests = [
(3, 'https://api.spacetraders.io/v2/my/3'),
Expand All @@ -22,7 +23,7 @@ Future<void> command(FileSystem fs, List<String> args) async {
logger.info('Got response: ${response.statusCode} ${response.body}');
}

await connection.close();
await db.close();
logger.info('Done!');
}

Expand Down
7 changes: 5 additions & 2 deletions packages/cli/bin/recent_deals.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import 'package:cli/cache/caches.dart';
import 'package:cli/cli.dart';
import 'package:cli/printing.dart';
import 'package:collection/collection.dart';
import 'package:db/db.dart';
import 'package:db/transaction.dart';

void main(List<String> args) async {
await runOffline(args, command);
Expand Down Expand Up @@ -99,12 +101,13 @@ bool Function(Transaction t) filterFromArgs(List<String> args) {
}

Future<void> command(FileSystem fs, List<String> args) async {
final db = await defaultDatabase();
final filter = filterFromArgs(args);
final transactionLog = TransactionLog.load(fs);
final deals = <SyntheticDeal>[];
final openDeals = <ShipSymbol, List<Transaction>>{};
final ignoredTransactions = <Transaction>[];
final supportedTypes = {AccountingType.fuel, AccountingType.goods};
final transactions = (await allTransactions(db)).map(Transaction.fromRecord);

void recordDeal(List<Transaction> openDeal) {
final deal = SyntheticDeal(openDeal);
Expand All @@ -115,7 +118,7 @@ Future<void> command(FileSystem fs, List<String> args) async {
}
}

for (final transaction in transactionLog.entries) {
for (final transaction in transactions) {
if (!filter(transaction)) {
continue;
}
Expand Down
5 changes: 5 additions & 0 deletions packages/cli/lib/behavior/advance.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import 'package:cli/behavior/trader.dart';
import 'package:cli/cache/caches.dart';
import 'package:cli/logger.dart';
import 'package:cli/nav/navigation.dart';
import 'package:db/db.dart';

Future<DateTime?> _advanceIdle(
Api api,
Database db,
CentralCommand centralCommand,
Caches caches,
BehaviorState state,
Expand All @@ -28,6 +30,7 @@ Future<DateTime?> _advanceIdle(

Future<DateTime?> Function(
Api api,
Database db,
CentralCommand centralCommand,
Caches caches,
BehaviorState state,
Expand Down Expand Up @@ -59,6 +62,7 @@ Future<DateTime?> Function(
/// or null if can be advanced immediately.
Future<DateTime?> advanceShipBehavior(
Api api,
Database db,
CentralCommand centralCommand,
Caches caches,
Ship ship, {
Expand All @@ -82,6 +86,7 @@ Future<DateTime?> advanceShipBehavior(
try {
final waitUntil = await behaviorFunction(
api,
db,
centralCommand,
caches,
state,
Expand Down
8 changes: 5 additions & 3 deletions packages/cli/lib/behavior/buy_ship.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import 'package:cli/nav/route.dart';
import 'package:cli/net/actions.dart';
import 'package:cli/net/queries.dart';
import 'package:cli/printing.dart';
import 'package:db/db.dart';

/// Job to buy a ship.
// class ShipBuyJob {
Expand Down Expand Up @@ -126,10 +127,10 @@ CostedTrip? findBestShipyardToBuy(
/// to try and share code.
Future<PurchaseShip201ResponseData> doBuyShipJob(
Api api,
Database db,
ShipCache shipCache,
ShipyardPrices shipyardPrices,
AgentCache agentCache,
TransactionLog transactionLog,
Ship ship,
WaypointSymbol shipyardSymbol,
ShipType shipType, {
Expand Down Expand Up @@ -178,9 +179,9 @@ Future<PurchaseShip201ResponseData> doBuyShipJob(
// Do we need to catch exceptions about insufficient credits?
final result = await purchaseShipAndLog(
api,
db,
shipCache,
agentCache,
transactionLog,
ship,
shipyardSymbol,
shipType,
Expand All @@ -191,6 +192,7 @@ Future<PurchaseShip201ResponseData> doBuyShipJob(
/// Apply the buy ship behavior.
Future<DateTime?> advanceBuyShip(
Api api,
Database db,
CentralCommand centralCommand,
Caches caches,
BehaviorState state,
Expand Down Expand Up @@ -308,9 +310,9 @@ Future<DateTime?> advanceBuyShip(
// Do we need to catch exceptions about insufficient credits?
final result = await purchaseShipAndLog(
api,
db,
caches.ships,
caches.agent,
caches.transactions,
ship,
shipyard.waypointSymbol,
shipType,
Expand Down
Loading

0 comments on commit 8d8a782

Please sign in to comment.