Skip to content

Commit

Permalink
feat: Sql support (#29)
Browse files Browse the repository at this point in the history
Co-authored-by: Tim Holm <[email protected]>
  • Loading branch information
HomelessDinosaur and tjholm authored Jul 10, 2024
1 parent 6529881 commit 0072ec2
Show file tree
Hide file tree
Showing 35 changed files with 1,015 additions and 219 deletions.
1 change: 0 additions & 1 deletion example/services/nitric_example.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import 'package:nitric_sdk/nitric.dart';
import 'package:nitric_sdk/resources.dart';
import 'package:uuid/uuid.dart';

class Profile {
Expand Down
4 changes: 4 additions & 0 deletions lib/src/api/api.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import 'package:grpc/grpc.dart';

export 'bucket.dart';
export 'keyvalue.dart';
export 'secret.dart';
export 'topic.dart';
export 'proto.dart';
export 'queue.dart';

typedef UseClientCallback<T extends Client, Resp> = Future<Resp> Function(T);
47 changes: 30 additions & 17 deletions lib/src/api/bucket.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:async';
import 'dart:convert';

import 'package:nitric_sdk/src/api/api.dart';
import 'package:nitric_sdk/src/context/common.dart';
import 'package:nitric_sdk/src/grpc_helper.dart';
import 'package:nitric_sdk/src/nitric/proto/storage/v1/storage.pbgrpc.dart'
Expand All @@ -10,21 +11,15 @@ import 'package:fixnum/fixnum.dart';
import 'package:nitric_sdk/src/workers/common.dart';

class Bucket {
late $p.StorageClient _storageClient;
late $p.StorageListenerClient? _storageListenerClient;
late final $p.StorageClient? _storageClient;
late final $p.StorageListenerClient? _storageListenerClient;

String name;

Bucket(this.name,
{$p.StorageClient? client,
$p.StorageListenerClient? storageListenerClient}) {
if (client == null) {
_storageClient =
$p.StorageClient(ClientChannelSingleton.instance.clientChannel);
} else {
_storageClient = client;
}

{$p.StorageListenerClient? storageListenerClient,
$p.StorageClient? client}) {
_storageClient = client;
_storageListenerClient = storageListenerClient;
}

Expand All @@ -33,12 +28,27 @@ class Bucket {
return File(this, key);
}

Future<Resp> _useClient<Resp>(
UseClientCallback<$p.StorageClient, Resp> callback) async {
final client = _storageClient ??
$p.StorageClient(ClientChannelSingleton.instance.clientChannel);

var resp = await callback(client);

if (_storageClient == null) {
await ClientChannelSingleton.instance.release();
}

return resp;
}

/// Get a list of references to the files in the bucket. Optionally supply a [prefix] to filter by.
Future<List<File>> files({String prefix = ""}) async {
final request =
$p.StorageListBlobsRequest(bucketName: name, prefix: prefix);

var resp = await _storageClient.listBlobs(request);
var resp =
await _useClient((client) async => await client.listBlobs(request));

return resp.blobs.map((blob) => File(this, blob.key)).toList();
}
Expand Down Expand Up @@ -81,7 +91,7 @@ class File {
key: key,
);

await _bucket._storageClient.delete(req);
await _bucket._useClient((client) async => await client.delete(req));
}

/// Read the file from the bucket.
Expand All @@ -91,7 +101,8 @@ class File {
key: key,
);

var resp = await _bucket._storageClient.read(req);
var resp =
await _bucket._useClient((client) async => await client.read(req));

return utf8.decode(resp.body);
}
Expand All @@ -106,7 +117,7 @@ class File {
body: bytes,
);

await _bucket._storageClient.write(req);
await _bucket._useClient((client) async => await client.write(req));
}

/// Check whether the file exists in the bucket.
Expand All @@ -116,7 +127,8 @@ class File {
key: key,
);

var resp = await _bucket._storageClient.exists(req);
var resp =
await _bucket._useClient((client) async => await client.exists(req));

return resp.exists;
}
Expand Down Expand Up @@ -148,7 +160,8 @@ class File {
expiry: exp,
);

var resp = await _bucket._storageClient.preSignUrl(req);
var resp = await _bucket
._useClient((client) async => await client.preSignUrl(req));

return resp.url;
}
Expand Down
33 changes: 22 additions & 11 deletions lib/src/api/keyvalue.dart
Original file line number Diff line number Diff line change
@@ -1,29 +1,40 @@
import 'dart:async';

import 'package:nitric_sdk/src/api/api.dart';
import 'package:nitric_sdk/src/grpc_helper.dart';
import 'package:nitric_sdk/src/nitric/proto/kvstore/v1/kvstore.pbgrpc.dart'
as $p;

/// A Key Value Store.
class KeyValueStore {
late $p.KvStoreClient _keyValueClient;
late final $p.KvStoreClient? _keyValueClient;

final String name;

KeyValueStore(this.name, {$p.KvStoreClient? client}) {
if (client == null) {
_keyValueClient =
$p.KvStoreClient(ClientChannelSingleton.instance.clientChannel);
} else {
_keyValueClient = client;
_keyValueClient = client;
}

Future<Resp> _useClient<Resp>(
UseClientCallback<$p.KvStoreClient, Resp> callback) async {
final client = _keyValueClient ??
$p.KvStoreClient(ClientChannelSingleton.instance.clientChannel);

var resp = callback(client);

if (_keyValueClient == null) {
await ClientChannelSingleton.instance.release();
}

return resp;
}

/// Get a reference to a [key] in the store.
Future<Map<String, dynamic>> get(String key) async {
var req =
$p.KvStoreGetValueRequest(ref: $p.ValueRef(key: key, store: name));

var resp = await _keyValueClient.getValue(req);
var resp = await _useClient((client) async => await client.getValue(req));

return Proto.mapFromStruct(resp.value.content);
}
Expand All @@ -35,23 +46,23 @@ class KeyValueStore {
var req = $p.KvStoreSetValueRequest(
ref: $p.ValueRef(key: key, store: name), content: content);

await _keyValueClient.setValue(req);
await _useClient((client) async => await client.setValue(req));
}

/// Delete a [key] from the store.
Future<void> delete(String key) async {
var req =
$p.KvStoreDeleteKeyRequest(ref: $p.ValueRef(key: key, store: name));

await _keyValueClient.deleteKey(req);
await _useClient((client) async => await client.deleteKey(req));
}

/// Get a stream of key values that match the [prefix].
Stream<String> keys({String prefix = ""}) {
Future<Stream<String>> keys({String prefix = ""}) async {
var req =
$p.KvStoreScanKeysRequest(store: $p.Store(name: name), prefix: prefix);

var resp = _keyValueClient.scanKeys(req);
var resp = await _useClient((client) async => client.scanKeys(req));

return resp.map((event) => event.key);
}
Expand Down
27 changes: 18 additions & 9 deletions lib/src/api/queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,28 @@ import 'package:nitric_sdk/src/grpc_helper.dart';
import 'package:nitric_sdk/src/nitric/proto/queues/v1/queues.pbgrpc.dart' as $p;

class Queue {
late $p.QueuesClient _queuesClient;
late final $p.QueuesClient? _queuesClient;

/// The name of the queue.
String name;

/// Construct a new queue.
Queue(this.name, {$p.QueuesClient? client}) {
if (client == null) {
_queuesClient =
$p.QueuesClient(ClientChannelSingleton.instance.clientChannel);
} else {
_queuesClient = client;
_queuesClient = client;
}

Future<Resp> _useClient<Resp>(
UseClientCallback<$p.QueuesClient, Resp> callback) async {
final client = _queuesClient ??
$p.QueuesClient(ClientChannelSingleton.instance.clientChannel);

var resp = callback(client);

if (_queuesClient == null) {
await ClientChannelSingleton.instance.release();
}

return resp;
}

/// Enqueue a list of [messages] to the queue.
Expand All @@ -31,7 +40,7 @@ class Queue {
queueName: name,
);

var resp = await _queuesClient.enqueue(req);
var resp = await _useClient((client) async => await client.enqueue(req));

return resp.failedMessages.map((fm) => FailedMessage(fm)).toList();
}
Expand All @@ -40,7 +49,7 @@ class Queue {
Future<List<DequeuedMessage>> dequeue({int depth = 1}) async {
var req = $p.QueueDequeueRequest(queueName: name, depth: depth);

var resp = await _queuesClient.dequeue(req);
var resp = await _useClient((client) async => await client.dequeue(req));

return resp.messages.map((m) => DequeuedMessage(this, m)).toList();
}
Expand All @@ -64,7 +73,7 @@ class DequeuedMessage {
var req =
$p.QueueCompleteRequest(leaseId: _leaseId, queueName: _queue.name);

await _queue._queuesClient.complete(req);
await _queue._useClient((client) async => await client.complete(req));
}
}

Expand Down
28 changes: 20 additions & 8 deletions lib/src/api/secret.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,30 @@ import 'package:nitric_sdk/src/grpc_helper.dart';
import 'package:nitric_sdk/src/nitric/proto/secrets/v1/secrets.pbgrpc.dart'
as $p;

import 'api.dart';

/// References an encrypted secret stored in a secret manager.
class Secret {
/// The name of the secret
final String name;
late final $p.SecretManagerClient _secretClient;
late final $p.SecretManagerClient? _secretClient;

Secret(this.name, {$p.SecretManagerClient? client}) {
if (client == null) {
_secretClient =
$p.SecretManagerClient(ClientChannelSingleton.instance.clientChannel);
} else {
_secretClient = client;
_secretClient = client;
}

Future<Resp> _useClient<Resp>(
UseClientCallback<$p.SecretManagerClient, Resp> callback) async {
final client = _secretClient ??
$p.SecretManagerClient(ClientChannelSingleton.instance.clientChannel);

var resp = callback(client);

if (_secretClient == null) {
await ClientChannelSingleton.instance.release();
}

return resp;
}

/// Get a reference to a specific [version] of this secret.
Expand All @@ -32,7 +43,7 @@ class Secret {
/// Put a new [value] to the secret, creating a new secret version and returning it.
Future<SecretVersion> put(String value) async {
var req = $p.SecretPutRequest(secret: _toWire(), value: utf8.encode(value));
var resp = await _secretClient.put(req);
var resp = await _useClient((client) async => client.put(req));

return SecretVersion._fromWire(this, resp.secretVersion);
}
Expand Down Expand Up @@ -60,7 +71,8 @@ class SecretVersion {
/// Access the value of this secret version.
Future<SecretValue> access() async {
var req = $p.SecretAccessRequest(secretVersion: _toWire());
var resp = await _secret._secretClient.access(req);
var resp =
await _secret._useClient((client) async => await client.access(req));

return SecretValue(version, utf8.decode(resp.value));
}
Expand Down
58 changes: 58 additions & 0 deletions lib/src/api/sql.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import 'package:nitric_sdk/nitric.dart';
import 'package:nitric_sdk/src/grpc_helper.dart';
import 'package:nitric_sdk/src/nitric/proto/resources/v1/resources.pb.dart';
import 'package:nitric_sdk/src/nitric/proto/resources/v1/resources.pbgrpc.dart'
as $rp;
import 'package:nitric_sdk/src/nitric/proto/sql/v1/sql.pbgrpc.dart' as $p;

import 'api.dart';

/// A Topic for publishing events to subscribers of this topic.
class SqlDatabase extends Resource {
/// The name of the topic
late final String? migrations;
late final $p.SqlClient? _sqlClient;

SqlDatabase(String name,
{this.migrations,
$p.SqlClient? client,
$rp.ResourcesClient? resourcesClient})
: super(name, resourcesClient) {
_sqlClient = client;
}

Future<Resp> _useClient<Resp>(
UseClientCallback<$p.SqlClient, Resp> callback) async {
final client = _sqlClient ??
$p.SqlClient(ClientChannelSingleton.instance.clientChannel);

var resp = callback(client);

if (_sqlClient == null) {
await ClientChannelSingleton.instance.release();
}

return resp;
}

/// Returns a connection endpoint to connect to the SQL database
Future<String> connectionString() async {
final req = $p.SqlConnectionStringRequest(databaseName: name);

final resp =
await _useClient((client) async => await client.connectionString(req));

return resp.connectionString;
}

@override
$rp.ResourceDeclareRequest asRequest() {
var resource =
$rp.ResourceIdentifier(name: name, type: $rp.ResourceType.SqlDatabase);

return $rp.ResourceDeclareRequest(
id: resource,
sqlDatabase: $rp.SqlDatabaseResource(
migrations: SqlDatabaseMigrations(migrationsPath: migrations)));
}
}
Loading

0 comments on commit 0072ec2

Please sign in to comment.