Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Frank-III committed Aug 1, 2024
1 parent 461e6c3 commit d03e9c9
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 131 deletions.
36 changes: 13 additions & 23 deletions dev/Todos.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { Rx } from "../src"
import * as HttpClient from "@effect/platform/HttpClient"
import * as HttpRequest from "@effect/platform/HttpClientRequest"
import * as HttpResponse from "@effect/platform/HttpClientResponse"
import * as Schema from "@effect/schema/Schema"
import { Effect, Layer, Option, Stream } from "effect"
import { Rx } from '../src'
import * as HttpClient from '@effect/platform/HttpClient'
import * as HttpRequest from '@effect/platform/HttpClientRequest'
import * as HttpResponse from '@effect/platform/HttpClientResponse'
import * as Schema from '@effect/schema/Schema'
import { Effect, Layer, Option, Stream } from 'effect'

export class Todo extends Schema.Class<Todo>("Todo")({
export class Todo extends Schema.Class<Todo>('Todo')({
id: Schema.Number,
title: Schema.String,
completed: Schema.Boolean,
Expand All @@ -17,13 +17,11 @@ export class Todo extends Schema.Class<Todo>("Todo")({
const make = Effect.gen(function* () {
const defaultClient = yield* HttpClient.HttpClient
const client = defaultClient.pipe(
HttpClient.mapRequest(
HttpRequest.prependUrl("https://jsonplaceholder.typicode.com"),
),
HttpClient.mapRequest(HttpRequest.prependUrl('https://jsonplaceholder.typicode.com')),
HttpClient.filterStatusOk,
)

const getTodos = HttpRequest.get("/todos")
const getTodos = HttpRequest.get('/todos')
const stream = (perPage: number) =>
Stream.paginateChunkEffect(1, page =>
getTodos.pipe(
Expand All @@ -35,24 +33,16 @@ const make = Effect.gen(function* () {
HttpResponse.schemaBodyJsonScoped(Todo.chunk),
Effect.map(chunk => [
chunk,
Option.some(page + 1).pipe(
Option.filter(() => chunk.length === perPage),
),
Option.some(page + 1).pipe(Option.filter(() => chunk.length === perPage)),
]),
),
)
const effect = getTodos.pipe(
client,
HttpResponse.schemaBodyJsonScoped(Todo.array),
)
const effect = getTodos.pipe(client, HttpResponse.schemaBodyJsonScoped(Todo.array))

return { stream, effect } as const
})

export class Todos extends Effect.Tag("Todos")<
Todos,
Effect.Effect.Success<typeof make>
>() {
export class Todos extends Effect.Tag('Todos')<Todos, Effect.Effect.Success<typeof make>>() {
static Live = Layer.effect(Todos, make).pipe(Layer.provide(HttpClient.layer))
}

Expand All @@ -74,5 +64,5 @@ export const effect = todosRuntime.rx(Todos.effect)

export const streamIsDone = Rx.make(get => {
const r = get(stream)
return r._tag === "Success" && r.value.done
return r._tag === 'Success' && r.value.done
})
25 changes: 11 additions & 14 deletions dev/worker/client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import * as Worker from "@effect/platform/Worker"
import * as BrowserWorker from "@effect/platform-browser/BrowserWorker"
import { GetId, InitialMessage, Requests } from "./schema"
import { Array, Context, Effect, Layer } from "effect"
import TestWorker from "./worker?worker"
import { Rx } from "../../src"
import * as Worker from '@effect/platform/Worker'
import * as BrowserWorker from '@effect/platform-browser/BrowserWorker'
import { GetId, InitialMessage, Requests } from './schema'
import { Array, Context, Effect, Layer } from 'effect'
import TestWorker from './worker?worker'
import { Rx } from '../../src'

const makePool = Worker.makePoolSerialized<Requests>({
initialMessage: () => new InitialMessage(),
Expand All @@ -12,12 +12,9 @@ const makePool = Worker.makePoolSerialized<Requests>({
timeToLive: 20000,
concurrency: 5,
targetUtilization: 0.8,
}).pipe(Effect.tap(pool => pool.executeEffect(new GetId({ id: "1" }))))
}).pipe(Effect.tap(pool => pool.executeEffect(new GetId({ id: '1' }))))

export class Pool extends Context.Tag("app/Pool")<
Pool,
Effect.Effect.Success<typeof makePool>
>() {
export class Pool extends Context.Tag('app/Pool')<Pool, Effect.Effect.Success<typeof makePool>>() {
static Live = Layer.scoped(this, makePool).pipe(
Layer.provide(BrowserWorker.layer(() => new TestWorker())),
)
Expand All @@ -28,7 +25,7 @@ export class Pool extends Context.Tag("app/Pool")<
const runtime = Rx.runtime(Pool.Live)

export const getIdRx = runtime.fn((id: string) => {
console.log("getIdRx", id)
console.log('getIdRx', id)
return Pool.pipe(
Effect.flatMap(pool =>
Effect.forEach(
Expand All @@ -37,11 +34,11 @@ export const getIdRx = runtime.fn((id: string) => {
pool.executeEffect(new GetId({ id: id.toString() })).pipe(
Effect.tap(Effect.log),
Effect.annotateLogs({
rx: "getIdRx",
rx: 'getIdRx',
id,
}),
),
{ concurrency: "unbounded" },
{ concurrency: 'unbounded' },
),
),
)
Expand Down
29 changes: 11 additions & 18 deletions dev/worker/schema.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
import * as Schema from '@effect/schema/Schema'

import * as Schema from "@effect/schema/Schema"
export class InitialMessage extends Schema.TaggedRequest<InitialMessage>()('InitialMessage', {
failure: Schema.Never,
success: Schema.Void,
payload: {},
}) {}

export class InitialMessage extends Schema.TaggedRequest<InitialMessage>()(
"InitialMessage",
{
failure: Schema.Never,
success: Schema.Void,
payload: {}
}
) {}

export class GetId extends Schema.TaggedRequest<GetId>()(
"GetId",
{
failure: Schema.Never,
success: Schema.String,
payload: { id: Schema.String },
}
) {}
export class GetId extends Schema.TaggedRequest<GetId>()('GetId', {
failure: Schema.Never,
success: Schema.String,
payload: { id: Schema.String },
}) {}

export const Requests = Schema.Union(InitialMessage, GetId)
export type Requests = Schema.Schema.Type<typeof Requests>
10 changes: 5 additions & 5 deletions dev/worker/worker.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/* eslint-disable require-yield */
import * as Runner from "@effect/platform/WorkerRunner"
import * as BrowserRunner from "@effect/platform-browser/BrowserWorkerRunner"
import { Requests } from "./schema"
import { Effect, Layer } from "effect"
import * as Runner from '@effect/platform/WorkerRunner'
import * as BrowserRunner from '@effect/platform-browser/BrowserWorkerRunner'
import { Requests } from './schema'
import { Effect, Layer } from 'effect'

Runner.layerSerialized(Requests, {
InitialMessage: () =>
Effect.gen(function* () {
console.log("Hello from worker")
console.log('Hello from worker')
}),
GetId: ({ id }) => Effect.succeed(id).pipe(Effect.delay(1000)),
}).pipe(Layer.provide(BrowserRunner.layer), Layer.launch, Effect.runPromise)
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
"lint": "concurrently pnpm:lint:*",
"lint:code": "eslint --ignore-path .gitignore --max-warnings 0 src/**/*.{js,ts,tsx,jsx}",
"lint:types": "tsc --noEmit",
"update-deps": "pnpm up -Li"
"update-deps": "pnpm up -Li",
"postinstall": "pnpm build"
},
"peerDependencies": {
"solid-js": "^1.6.0"
Expand Down
151 changes: 81 additions & 70 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,91 +1,101 @@
import { createContext, useContext, createSignal, onCleanup, createEffect, Accessor, createResource, Resource, createMemo } from 'solid-js';
import * as Registry from "@effect-rx/rx/Registry";
import * as Rx from "@effect-rx/rx/Rx";
import type * as RxRef from "@effect-rx/rx/RxRef";
import { globalValue } from "effect/GlobalValue"
import * as Result from '@effect-rx/rx/Result';
import {
createContext,
useContext,
createSignal,
onCleanup,
createEffect,
Accessor,
createResource,
Resource,
createMemo,
} from 'solid-js'
import * as Registry from '@effect-rx/rx/Registry'
import * as Rx from '@effect-rx/rx/Rx'
import type * as RxRef from '@effect-rx/rx/RxRef'
import { globalValue } from 'effect/GlobalValue'
import * as Result from '@effect-rx/rx/Result'

// Re-exporting for easy access
export * as Registry from "@effect-rx/rx/Registry";
export * as Result from "@effect-rx/rx/Result";
export * as Rx from "@effect-rx/rx/Rx";
export * as RxRef from "@effect-rx/rx/RxRef";
import * as Scheduler from "scheduler"
export * as Registry from '@effect-rx/rx/Registry'
export * as Result from '@effect-rx/rx/Result'
export * as Rx from '@effect-rx/rx/Rx'
export * as RxRef from '@effect-rx/rx/RxRef'
import * as Scheduler from 'scheduler'

// Context for Registry
export const RegistryContext = createContext<Registry.Registry>();
export const RegistryContext = createContext<Registry.Registry>()

function scheduleTask(f: () => void): void {
Scheduler.unstable_scheduleCallback(Scheduler.unstable_LowPriority, f)
}
// Default registry using a global value as fallback
export const defaultRegistry: Registry.Registry = globalValue(
"@effect-rx/solid/defaultRegistry",
() => Registry.make({
scheduleTask,
defaultIdleTTL: 400
})
);
'@effect-rx/solid/defaultRegistry',
() =>
Registry.make({
scheduleTask,
defaultIdleTTL: 400,
}),
)

// Function to inject the registry, providing a default if not present in context
export const injectRegistry = (): Registry.Registry => {
const registry = useContext(RegistryContext);
const registry = useContext(RegistryContext)
if (!registry) {
throw new Error("No registry found");
throw new Error('No registry found')
}
return registry;
};

return registry
}

// Hook to use an Rx.Writable, similar to useRx in Vue
export const useRx = <R, W>(rx: Rx.Writable<R, W>): [Accessor<R>, (newValue: W) => void] => {
const registry = injectRegistry();
const [value, setValue] = createSignal<R>(registry.get(rx));
const registry = injectRegistry()
const [value, setValue] = createSignal<R>(registry.get(rx))

createEffect(() => {
const cancel = registry.subscribe(rx, setValue as (newValue: R) => void);
onCleanup(cancel);
});
const cancel = registry.subscribe(rx, setValue as (newValue: R) => void)
onCleanup(cancel)
})

const set = (newValue: W) => registry.set(rx, newValue);
const set = (newValue: W) => registry.set(rx, newValue)

return [value, set];
};
return [value, set]
}

export const useRxValue = <A>(rx: Rx.Rx<A>): Accessor<A> => {
const registry = injectRegistry();
const [value, setValue] = createSignal<A>(registry.get(rx));
const registry = injectRegistry()
const [value, setValue] = createSignal<A>(registry.get(rx))

createEffect(() => {
const cancel = registry.subscribe(rx, setValue as (newValue: A) => void);
onCleanup(cancel);
});
const cancel = registry.subscribe(rx, setValue as (newValue: A) => void)
onCleanup(cancel)
})

return value;
};
return value
}

// Hook to set values on an Rx.Writable
export const useRxSet = <R, W>(rx: Rx.Writable<R, W>): (newValue: W) => void => {
const registry = injectRegistry();
export const useRxSet = <R, W>(rx: Rx.Writable<R, W>): ((newValue: W) => void) => {
const registry = injectRegistry()
createEffect(() => {
const cancel = registry.mount(rx);
onCleanup(cancel);
});
const cancel = registry.mount(rx)
onCleanup(cancel)
})

return (newValue: W) => registry.set(rx, newValue);
};
return (newValue: W) => registry.set(rx, newValue)
}

// Hook to use an RxRef
export const useRxRef = <A>(rxRef: RxRef.ReadonlyRef<A>): Accessor<A> => {
const [value, setValue] = createSignal<A>(rxRef.value);
const [value, setValue] = createSignal<A>(rxRef.value)

createEffect(() => {
const cancel = rxRef.subscribe(setValue as (newValue: A) => void);
onCleanup(cancel);
});
const cancel = rxRef.subscribe(setValue as (newValue: A) => void)
onCleanup(cancel)
})

return value;
};
return value
}

// Suspense implementation using createResource
// type SuspenseResult<A, E> = Result.Success<A, E> | Result.Failure<A, E>;
Expand Down Expand Up @@ -186,29 +196,30 @@ export const useRxRef = <A>(rxRef: RxRef.ReadonlyRef<A>): Accessor<A> => {
// return () => result().result as Result.Success<A, E> | Result.Failure<A, E>
// }

export const useRxSuspense = <A, E>(
rx: Rx.Rx<Result.Result<A, E>>,
suspendOnWaiting?: boolean
) => {
const registry = injectRegistry();
const [state, {mutate}] = createResource(() => {
return new Promise<Result.Result<A,E>>((resolve) => {
const unsubscribe = registry.subscribe(rx, (result) => {
if (result._tag !== "Initial" && (!suspendOnWaiting || !result.waiting)) {
resolve(result);
unsubscribe();
}
}, { immediate: true });
});
});
export const useRxSuspense = <A, E>(rx: Rx.Rx<Result.Result<A, E>>, suspendOnWaiting?: boolean) => {
const registry = injectRegistry()
const [state, { mutate }] = createResource(() => {
return new Promise<Result.Result<A, E>>(resolve => {
const unsubscribe = registry.subscribe(
rx,
result => {
if (result._tag !== 'Initial' && (!suspendOnWaiting || !result.waiting)) {
resolve(result)
unsubscribe()
}
},
{ immediate: true },
)
})
})

createEffect(() => {
const cancel = registry.subscribe(rx, mutate);
onCleanup(cancel);
const cancel = registry.subscribe(rx, mutate)
onCleanup(cancel)
})

return state;
};
return state
}

// function createSuspenseResource<A, E>(
// registry: Registry.Registry,
Expand Down

0 comments on commit d03e9c9

Please sign in to comment.