Skip to content

Commit

Permalink
feat: add historical data switch
Browse files Browse the repository at this point in the history
  • Loading branch information
pedronauck committed Dec 28, 2024
1 parent 36f5ee5 commit afa9c37
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 78 deletions.
4 changes: 2 additions & 2 deletions packages/fuel-streams/examples/stream/blocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ async function main() {
const stream = await BlocksStream.init(client);
const subscription = await stream.subscribe(BlocksSubject.build());

for await (const block of subscription) {
console.log(chalk.blue(`Received block: ${block}`));
for await (const data of subscription) {
console.log(chalk.blue(`Received block: ${data}`));
}

await stream.flushAwait();
Expand Down
10 changes: 5 additions & 5 deletions packages/fuel-streams/src/client-opts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ export class ClientOpts {
}

connectOpts() {
const username = 'default_user';
const password = '';
const username = 'admin';
const password = 'admin';
const authenticator = connector(username, password);
return {
servers: this.url,
Expand All @@ -66,9 +66,9 @@ export class ClientOpts {
}

get url() {
const subdomain =
this.network === Network.mainnet ? 'stream' : 'stream-testnet';
return `wss://${subdomain}.fuel.network:8443`;
// const subdomain =
// this.network === Network.mainnet ? "stream" : "stream-testnet";
return 'ws://k8s-fuelstre-fuelstre-d1a733c184-c10339d402ab28fd.elb.us-east-1.amazonaws.com:8443';
}
}

Expand Down
5 changes: 3 additions & 2 deletions packages/fuel-streams/src/nats-client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { JetStreamClient, JetStreamManager } from '@nats-io/jetstream';
import { jetstream } from '@nats-io/jetstream';
import { type KvOptions, Kvm } from '@nats-io/kv';
import type { NatsConnection, Status } from '@nats-io/nats-core';
import { wsconnect } from '@nats-io/transport-node';
Expand Down Expand Up @@ -84,10 +85,10 @@ export class Client {
}

console.info(`Successfully connected to ${nc.getServer()} !`);

const js = jetstream(nc, { domain: 'CORE' });
this.natsConnection = nc;
this.opts = opts;
this.kvm = new Kvm(nc);
this.kvm = new Kvm(js);
this.jetStreamManager = await this.kvm.js.jetstreamManager();
this.jetStream = this.jetStreamManager.jetstream();
} catch (error) {
Expand Down
108 changes: 57 additions & 51 deletions packages/fuel-streams/src/parsers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import {
type RawUtxo,
type RawVariableOutput,
type Receipt,
ReceiptType,
// ReceiptType,
type Transaction,
type Utxo,
} from './types';
Expand Down Expand Up @@ -181,43 +181,47 @@ export class OutputParser implements StreamParser<Output, RawOutput> {

export class LogWithoutDataParser {
parse(data: RawLogWithoutData): Log {
const transformations = {
type: () => ReceiptType.Log,
ra: toBN,
rb: toBN,
rc: toBN,
rd: toBN,
pc: toBN,
is: toBN,
};
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
return data as any;
// const transformations = {
// type: () => ReceiptType.Log,
// ra: toBN,
// rb: toBN,
// rc: toBN,
// rd: toBN,
// pc: toBN,
// is: toBN,
// };

return evolve(transformations, {
...data,
val0: toBN(data.ra),
val1: toBN(data.rb),
val2: toBN(data.rc),
val3: toBN(data.rd),
}) as Log;
// return evolve(transformations, {
// ...data,
// val0: toBN(data.ra),
// val1: toBN(data.rb),
// val2: toBN(data.rc),
// val3: toBN(data.rd),
// }) as Log;
}
}

export class LogWithDataParser {
parse(data: RawLogWithData): Log {
const transformations = {
type: () => ReceiptType.LogData,
ra: toBN,
rb: toBN,
ptr: toBN,
len: toBN,
pc: toBN,
is: toBN,
};
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
return data as any;
// const transformations = {
// type: () => ReceiptType.LogData,
// ra: toBN,
// rb: toBN,
// ptr: toBN,
// len: toBN,
// pc: toBN,
// is: toBN,
// };

return evolve(transformations, {
...data,
val0: toBN(data.ra),
val1: toBN(data.rb),
}) as Log;
// return evolve(transformations, {
// ...data,
// val0: toBN(data.ra),
// val1: toBN(data.rb),
// }) as Log;
}
}

Expand All @@ -237,27 +241,29 @@ export class LogParser implements StreamParser<Log, RawLog> {

export class ReceiptParser implements StreamParser<Receipt, RawReceipt> {
parse(data: RawReceipt): Receipt {
const transformations = {
type: (v: RawReceipt['type']) => ReceiptType[v],
amount: toBN,
gas: toBN,
gasUsed: toBN,
is: toBN,
len: toBN,
param1: toBN,
param2: toBN,
pc: toBN,
ptr: toBN,
ra: toBN,
rb: toBN,
rc: toBN,
rd: toBN,
reason: toBN,
result: toBN,
val: toBN,
};
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
return data as any;
// const transformations = {
// type: (v: RawReceipt['type']) => ReceiptType[v],
// amount: toBN,
// gas: toBN,
// gasUsed: toBN,
// is: toBN,
// len: toBN,
// param1: toBN,
// param2: toBN,
// pc: toBN,
// ptr: toBN,
// ra: toBN,
// rb: toBN,
// rc: toBN,
// rd: toBN,
// reason: toBN,
// result: toBN,
// val: toBN,
// };

return evolve(transformations, data) as Receipt;
// return evolve({}, data) as Receipt;
}
}

Expand Down
28 changes: 16 additions & 12 deletions packages/fuel-streams/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export type StreamData<T, R> = {

export interface SubscribeConsumerConfig<C extends Array<unknown>> {
filterSubjects: C;
deliverPolicy: DeliverPolicy;
}

export type StreamIterator<T extends StreamData<unknown, unknown>> =
Expand Down Expand Up @@ -85,31 +86,34 @@ export class Stream<T extends GenericRecord, R extends GenericRecord> {
}

// biome-ignore lint/suspicious/noExplicitAny: <explanation>
async subscribe<S extends SubjectBase<any>>(subject: S) {
return this.subscribeConsumer({
filterSubjects: [subject],
});
async subscribe<S extends SubjectBase<any>>(
subject: S,
deliverPolicy: DeliverPolicy = DeliverPolicy.New,
) {
return this.subscribeConsumer({ filterSubjects: [subject], deliverPolicy });
}

async subscribeWithString(subject: string) {
return this.subscribeConsumer({
filterSubjects: [subject],
});
async subscribeWithString(
subject: string,
deliverPolicy: DeliverPolicy = DeliverPolicy.New,
) {
return this.subscribeConsumer({ filterSubjects: [subject], deliverPolicy });
}

// biome-ignore lint/suspicious/noExplicitAny: <explanation>
async subscribeConsumer<C extends Array<string | SubjectBase<any>>>(
userConfig: SubscribeConsumerConfig<C>,
): Promise<StreamIterator<StreamData<T, R>>> {
const consumer = await this.createConsumer({
ack_policy: AckPolicy.None,
deliver_policy: DeliverPolicy.New,
const opts = {
deliver_policy: userConfig.deliverPolicy,
filter_subjects: userConfig.filterSubjects?.map((subject) => {
return typeof subject === 'object' && 'parse' in subject
? subject.parse()
: subject;
}),
});
} as Partial<ConsumerConfig>;
console.log('opts', opts);
const consumer = await this.createConsumer(opts);

const parser = this.parser;
const iterator = await consumer.consume();
Expand Down
2 changes: 2 additions & 0 deletions packages/simple-app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
"dependencies": {
"@fuels/streams": "workspace:*",
"@microlink/react-json-view": "1.23.4",
"@nats-io/jetstream": "3.0.0-32",
"@radix-ui/react-scroll-area": "1.2.1",
"@radix-ui/react-select": "2.1.2",
"@radix-ui/react-slot": "1.1.0",
"@radix-ui/react-switch": "1.1.2",
"@radix-ui/react-tabs": "1.1.1",
"@radix-ui/react-tooltip": "1.1.4",
"@statelyai/inspect": "0.4.0",
Expand Down
22 changes: 21 additions & 1 deletion packages/simple-app/src/components/stream-form.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import {
SelectTrigger,
SelectValue,
} from '@/components/ui/select';
import { Switch } from '@/components/ui/switch';
import { useDynamicForm } from '@/lib/form';
import { useStreamData } from '@/lib/stream/use-stream-data';
import { DeliverPolicy } from '@fuels/streams';
import { Play, Square } from 'lucide-react';
import v from 'voca';

Expand All @@ -31,7 +33,8 @@ export function StreamForm() {
subject,
} = useDynamicForm();

const { start, stop, isSubscribing } = useStreamData();
const { start, stop, isSubscribing, changeDeliveryPolicy, deliverPolicy } =
useStreamData();

function handleSubmit() {
if (!selectedModule || !subject) return;
Expand All @@ -40,6 +43,23 @@ export function StreamForm() {

return (
<div className="space-y-4" aria-label="Stream Configuration Form">
<div className="flex items-center gap-4">
<Switch
className="mt-1/5"
id="historical-data"
checked={deliverPolicy === DeliverPolicy.All}
onCheckedChange={(checked) =>
changeDeliveryPolicy(
checked ? DeliverPolicy.All : DeliverPolicy.New,
)
}
aria-label="Toggle historical data"
/>
<label htmlFor="historical-data" className="text-sm font-medium">
Enable historical data
</label>
</div>

<div className={variantOptions.length > 0 ? 'flex gap-4' : ''}>
<div className={variantOptions.length > 0 ? 'w-1/2' : 'w-full'}>
<label
Expand Down
27 changes: 27 additions & 0 deletions packages/simple-app/src/components/ui/switch.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import * as SwitchPrimitives from '@radix-ui/react-switch';
import * as React from 'react';

import { cn } from '@/lib/utils';

const Switch = React.forwardRef<
React.ElementRef<typeof SwitchPrimitives.Root>,
React.ComponentPropsWithoutRef<typeof SwitchPrimitives.Root>
>(({ className, ...props }, ref) => (
<SwitchPrimitives.Root
className={cn(
'peer inline-flex h-5 w-9 shrink-0 cursor-pointer items-center rounded-full border-2 border-transparent shadow-sm transition-colors focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-ring focus-visible:ring-offset-2 focus-visible:ring-offset-background disabled:cursor-not-allowed disabled:opacity-50 data-[state=checked]:bg-primary data-[state=unchecked]:bg-input',
className,
)}
{...props}
ref={ref}
>
<SwitchPrimitives.Thumb
className={cn(
'pointer-events-none block h-4 w-4 rounded-full bg-background shadow-lg ring-0 transition-transform data-[state=checked]:translate-x-4 data-[state=unchecked]:translate-x-0',
)}
/>
</SwitchPrimitives.Root>
));
Switch.displayName = SwitchPrimitives.Root.displayName;

export { Switch };
Loading

0 comments on commit afa9c37

Please sign in to comment.