From 24bf92baa31b01f98cd108e7f2c61684bf7d77e7 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 9 Jan 2025 16:02:02 -0800 Subject: [PATCH] Flush the insert to have immediate effect (#1365) --- .../effective/create.zfunction/client.rpt | 24 +++++++++++ .../effective/create.zfunction/server.rpt | 23 +++++++++++ .../macro/RisingwaveCreateZfunctionMacro.java | 41 ++++++++++++++++++- 3 files changed, 87 insertions(+), 1 deletion(-) diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zfunction/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zfunction/client.rpt index 0aa6276c81..cff99ae1a0 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zfunction/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zfunction/client.rpt @@ -75,6 +75,30 @@ read advised zilla:flush ${pgsql:flushEx() .build() .build()} +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "FLUSH;" + [0x00] + +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("FLUSH") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + connect "zilla://streams/app1" option zilla:window 8192 option zilla:transmission "duplex" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zfunction/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zfunction/server.rpt index 96690328fd..74534c1d7b 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zfunction/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zfunction/server.rpt @@ -75,6 +75,29 @@ write advise zilla:flush ${pgsql:flushEx() .build() .build()} +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "FLUSH;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("FLUSH") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + + accepted diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZfunctionMacro.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZfunctionMacro.java index 5e9b629a64..b1871904e7 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZfunctionMacro.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZfunctionMacro.java @@ -18,7 +18,7 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW; -public class RisingwaveCreateZfunctionMacro +public class RisingwaveCreateZfunctionMacro extends RisingwaveMacroBase { private static final String ZFUNCTION_NAME = "zfunctions"; @@ -35,6 +35,8 @@ public RisingwaveCreateZfunctionMacro( CreateZfunction command, RisingwaveMacroHandler handler) { + super(sql, handler); + this.systemSchema = systemSchema; this.user = user; this.sql = sql; @@ -67,6 +69,43 @@ public void onStarted( handler.doExecuteSystemClient(traceId, authorization, sqlQuery); } + @Override + public RisingwaveMacroState onReady( + long traceId, + long authorization, + PgsqlFlushExFW flushEx) + { + FlushState state = new FlushState(); + state.onStarted(traceId, authorization); + + return state; + } + + @Override + public RisingwaveMacroState onError( + long traceId, + long authorization, + PgsqlFlushExFW flushEx) + { + handler.doFlushProxy(traceId, authorization, flushEx); + + return errorState(); + } + } + + private final class FlushState implements RisingwaveMacroState + { + private final String sqlFormat = """ + FLUSH;\u0000"""; + + @Override + public void onStarted( + long traceId, + long authorization) + { + handler.doExecuteSystemClient(traceId, authorization, sqlFormat); + } + @Override public RisingwaveMacroState onCompletion( long traceId,