Skip to content

Commit

Permalink
add useClient callback
Browse files Browse the repository at this point in the history
  • Loading branch information
HomelessDinosaur committed Jul 10, 2024
1 parent 471b59a commit e95e539
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 81 deletions.
4 changes: 4 additions & 0 deletions lib/src/api/api.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import 'package:grpc/grpc.dart';

export 'bucket.dart';
export 'keyvalue.dart';
export 'secret.dart';
export 'topic.dart';
export 'proto.dart';
export 'queue.dart';

typedef UseClientCallback<T extends Client, Resp> = Future<Resp> Function(T);
45 changes: 21 additions & 24 deletions lib/src/api/bucket.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:async';
import 'dart:convert';

import 'package:nitric_sdk/src/api/api.dart';
import 'package:nitric_sdk/src/context/common.dart';
import 'package:nitric_sdk/src/grpc_helper.dart';
import 'package:nitric_sdk/src/nitric/proto/storage/v1/storage.pbgrpc.dart'
Expand All @@ -27,24 +28,27 @@ class Bucket {
return File(this, key);
}

$p.StorageClient _getClient() {
if (_storageClient != null) {
return _storageClient;
Future<Resp> _useClient<Resp>(
UseClientCallback<$p.StorageClient, Resp> callback) async {
final client = _storageClient ??
$p.StorageClient(ClientChannelSingleton.instance.clientChannel);

var resp = await callback(client);

if (_storageClient == null) {
await ClientChannelSingleton.instance.release();
}

return $p.StorageClient(ClientChannelSingleton.instance.clientChannel);
return resp;
}

/// Get a list of references to the files in the bucket. Optionally supply a [prefix] to filter by.
Future<List<File>> files({String prefix = ""}) async {
final storageClient = _getClient();

final request =
$p.StorageListBlobsRequest(bucketName: name, prefix: prefix);

var resp = await storageClient.listBlobs(request);

await ClientChannelSingleton.instance.release();
var resp =
await _useClient((client) async => await client.listBlobs(request));

return resp.blobs.map((blob) => File(this, blob.key)).toList();
}
Expand Down Expand Up @@ -87,9 +91,7 @@ class File {
key: key,
);

await _bucket._getClient().delete(req);

await ClientChannelSingleton.instance.release();
await _bucket._useClient((client) async => await client.delete(req));
}

/// Read the file from the bucket.
Expand All @@ -99,9 +101,8 @@ class File {
key: key,
);

var resp = await _bucket._getClient().read(req);

await ClientChannelSingleton.instance.release();
var resp =
await _bucket._useClient((client) async => await client.read(req));

return utf8.decode(resp.body);
}
Expand All @@ -116,9 +117,7 @@ class File {
body: bytes,
);

await _bucket._getClient().write(req);

await ClientChannelSingleton.instance.release();
await _bucket._useClient((client) async => await client.write(req));
}

/// Check whether the file exists in the bucket.
Expand All @@ -128,9 +127,8 @@ class File {
key: key,
);

var resp = await _bucket._getClient().exists(req);

await ClientChannelSingleton.instance.release();
var resp =
await _bucket._useClient((client) async => await client.exists(req));

return resp.exists;
}
Expand Down Expand Up @@ -162,9 +160,8 @@ class File {
expiry: exp,
);

var resp = await _bucket._getClient().preSignUrl(req);

await ClientChannelSingleton.instance.release();
var resp = await _bucket
._useClient((client) async => await client.preSignUrl(req));

return resp.url;
}
Expand Down
32 changes: 15 additions & 17 deletions lib/src/api/keyvalue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,26 @@ class KeyValueStore {
_keyValueClient = client;
}

$p.KvStoreClient _getClient() {
if (_keyValueClient != null) {
return _keyValueClient;
Future<Resp> _useClient<Resp>(
UseClientCallback<$p.KvStoreClient, Resp> callback) async {
final client = _keyValueClient ??
$p.KvStoreClient(ClientChannelSingleton.instance.clientChannel);

var resp = callback(client);

if (_keyValueClient == null) {
await ClientChannelSingleton.instance.release();
}

return $p.KvStoreClient(ClientChannelSingleton.instance.clientChannel);
return resp;
}

/// Get a reference to a [key] in the store.
Future<Map<String, dynamic>> get(String key) async {
var req =
$p.KvStoreGetValueRequest(ref: $p.ValueRef(key: key, store: name));

var resp = await _getClient().getValue(req);

await ClientChannelSingleton.instance.release();
var resp = await _useClient((client) async => await client.getValue(req));

return Proto.mapFromStruct(resp.value.content);
}
Expand All @@ -42,29 +46,23 @@ class KeyValueStore {
var req = $p.KvStoreSetValueRequest(
ref: $p.ValueRef(key: key, store: name), content: content);

await _getClient().setValue(req);

await ClientChannelSingleton.instance.release();
await _useClient((client) async => await client.setValue(req));
}

/// Delete a [key] from the store.
Future<void> delete(String key) async {
var req =
$p.KvStoreDeleteKeyRequest(ref: $p.ValueRef(key: key, store: name));

await _getClient().deleteKey(req);

await ClientChannelSingleton.instance.release();
await _useClient((client) async => await client.deleteKey(req));
}

/// Get a stream of key values that match the [prefix].
Stream<String> keys({String prefix = ""}) {
Future<Stream<String>> keys({String prefix = ""}) async {
var req =
$p.KvStoreScanKeysRequest(store: $p.Store(name: name), prefix: prefix);

var resp = _getClient().scanKeys(req);

unawaited(ClientChannelSingleton.instance.release());
var resp = await _useClient((client) async => client.scanKeys(req));

return resp.map((event) => event.key);
}
Expand Down
26 changes: 13 additions & 13 deletions lib/src/api/queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ class Queue {
_queuesClient = client;
}

$p.QueuesClient _getClient() {
if (_queuesClient != null) {
return _queuesClient;
Future<Resp> _useClient<Resp>(
UseClientCallback<$p.QueuesClient, Resp> callback) async {
final client = _queuesClient ??
$p.QueuesClient(ClientChannelSingleton.instance.clientChannel);

var resp = callback(client);

if (_queuesClient == null) {
await ClientChannelSingleton.instance.release();
}

return $p.QueuesClient(ClientChannelSingleton.instance.clientChannel);
return resp;
}

/// Enqueue a list of [messages] to the queue.
Expand All @@ -34,9 +40,7 @@ class Queue {
queueName: name,
);

var resp = await _getClient().enqueue(req);

await ClientChannelSingleton.instance.release();
var resp = await _useClient((client) async => await client.enqueue(req));

return resp.failedMessages.map((fm) => FailedMessage(fm)).toList();
}
Expand All @@ -45,9 +49,7 @@ class Queue {
Future<List<DequeuedMessage>> dequeue({int depth = 1}) async {
var req = $p.QueueDequeueRequest(queueName: name, depth: depth);

var resp = await _getClient().dequeue(req);

await ClientChannelSingleton.instance.release();
var resp = await _useClient((client) async => await client.dequeue(req));

return resp.messages.map((m) => DequeuedMessage(this, m)).toList();
}
Expand All @@ -71,9 +73,7 @@ class DequeuedMessage {
var req =
$p.QueueCompleteRequest(leaseId: _leaseId, queueName: _queue.name);

await _queue._getClient().complete(req);

await ClientChannelSingleton.instance.release();
await _queue._useClient((client) async => await client.complete(req));
}
}

Expand Down
26 changes: 15 additions & 11 deletions lib/src/api/secret.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import 'package:nitric_sdk/src/grpc_helper.dart';
import 'package:nitric_sdk/src/nitric/proto/secrets/v1/secrets.pbgrpc.dart'
as $p;

import 'api.dart';

/// References an encrypted secret stored in a secret manager.
class Secret {
/// The name of the secret
Expand All @@ -14,13 +16,18 @@ class Secret {
_secretClient = client;
}

$p.SecretManagerClient _getClient() {
if (_secretClient != null) {
return _secretClient;
Future<Resp> _useClient<Resp>(
UseClientCallback<$p.SecretManagerClient, Resp> callback) async {
final client = _secretClient ??
$p.SecretManagerClient(ClientChannelSingleton.instance.clientChannel);

var resp = callback(client);

if (_secretClient == null) {
await ClientChannelSingleton.instance.release();
}

return $p.SecretManagerClient(
ClientChannelSingleton.instance.clientChannel);
return resp;
}

/// Get a reference to a specific [version] of this secret.
Expand All @@ -36,9 +43,7 @@ class Secret {
/// Put a new [value] to the secret, creating a new secret version and returning it.
Future<SecretVersion> put(String value) async {
var req = $p.SecretPutRequest(secret: _toWire(), value: utf8.encode(value));
var resp = await _getClient().put(req);

await ClientChannelSingleton.instance.release();
var resp = await _useClient((client) async => client.put(req));

return SecretVersion._fromWire(this, resp.secretVersion);
}
Expand Down Expand Up @@ -66,9 +71,8 @@ class SecretVersion {
/// Access the value of this secret version.
Future<SecretValue> access() async {
var req = $p.SecretAccessRequest(secretVersion: _toWire());
var resp = await _secret._getClient().access(req);

await ClientChannelSingleton.instance.release();
var resp =
await _secret._useClient((client) async => await client.access(req));

return SecretValue(version, utf8.decode(resp.value));
}
Expand Down
22 changes: 15 additions & 7 deletions lib/src/api/sql.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import 'package:nitric_sdk/src/nitric/proto/resources/v1/resources.pbgrpc.dart'
as $rp;
import 'package:nitric_sdk/src/nitric/proto/sql/v1/sql.pbgrpc.dart' as $p;

import 'api.dart';

/// A Topic for publishing events to subscribers of this topic.
class SqlDatabase extends Resource {
/// The name of the topic
Expand All @@ -19,20 +21,26 @@ class SqlDatabase extends Resource {
_sqlClient = client;
}

$p.SqlClient _getClient() {
if (_sqlClient != null) {
return _sqlClient;
Future<Resp> _useClient<Resp>(
UseClientCallback<$p.SqlClient, Resp> callback) async {
final client = _sqlClient ??
$p.SqlClient(ClientChannelSingleton.instance.clientChannel);

var resp = callback(client);

if (_sqlClient == null) {
await ClientChannelSingleton.instance.release();
}

return $p.SqlClient(ClientChannelSingleton.instance.clientChannel);
return resp;
}

/// Returns a connection endpoint to connect to the SQL database
Future<String> connectionString() async {
final resp = await _getClient()
.connectionString($p.SqlConnectionStringRequest(databaseName: name));
final req = $p.SqlConnectionStringRequest(databaseName: name);

await ClientChannelSingleton.instance.release();
final resp =
await _useClient((client) async => await client.connectionString(req));

return resp.connectionString;
}
Expand Down
18 changes: 11 additions & 7 deletions lib/src/api/topic.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@ class Topic {
_topicsClient = client;
}

$p.TopicsClient _getClient() {
if (_topicsClient != null) {
return _topicsClient;
Future<Resp> _useClient<Resp>(
UseClientCallback<$p.TopicsClient, Resp> callback) async {
final client = _topicsClient ??
$p.TopicsClient(ClientChannelSingleton.instance.clientChannel);

var resp = callback(client);

if (_topicsClient == null) {
await ClientChannelSingleton.instance.release();
}

return $p.TopicsClient(ClientChannelSingleton.instance.clientChannel);
return resp;
}

/// Publish a [message] to the topic. Optional [delay] (in seconds) can be set to delay the message publish time.
Expand All @@ -35,8 +41,6 @@ class Topic {
delay: $d.Duration(seconds: Int64(delay)),
);

await _getClient().publish(req);

await ClientChannelSingleton.instance.release();
await _useClient((client) async => await client.publish(req));
}
}
4 changes: 2 additions & 2 deletions test/src/api/keyvalue_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void main() {

var kvStore = KeyValueStore("keyvalueName", client: keyValueClient);

var keys = kvStore.keys();
var keys = await kvStore.keys();

verify(() => keyValueClient.scanKeys(req)).called(1);

Expand All @@ -117,7 +117,7 @@ void main() {

var kvStore = KeyValueStore("keyvalueName", client: keyValueClient);

var keys = kvStore.keys(prefix: "f");
var keys = await kvStore.keys(prefix: "f");

verify(() => keyValueClient.scanKeys(req)).called(1);

Expand Down

0 comments on commit e95e539

Please sign in to comment.