Skip to content

Commit

Permalink
Flush the insert to have immediate effect (#1365)
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek authored Jan 10, 2025
1 parent 0bc357a commit 24bf92b
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,30 @@ read advised zilla:flush ${pgsql:flushEx()
.build()
.build()}

write zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
write "FLUSH;"
[0x00]

write flush

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("FLUSH")
.build()
.build()}

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}

connect "zilla://streams/app1"
option zilla:window 8192
option zilla:transmission "duplex"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,29 @@ write advise zilla:flush ${pgsql:flushEx()
.build()
.build()}

read zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
read "FLUSH;"
[0x00]

write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("FLUSH")
.build()
.build()}

write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}


accepted


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW;

public class RisingwaveCreateZfunctionMacro
public class RisingwaveCreateZfunctionMacro extends RisingwaveMacroBase
{
private static final String ZFUNCTION_NAME = "zfunctions";

Expand All @@ -35,6 +35,8 @@ public RisingwaveCreateZfunctionMacro(
CreateZfunction command,
RisingwaveMacroHandler handler)
{
super(sql, handler);

this.systemSchema = systemSchema;
this.user = user;
this.sql = sql;
Expand Down Expand Up @@ -67,6 +69,43 @@ public void onStarted(
handler.doExecuteSystemClient(traceId, authorization, sqlQuery);
}

@Override
public RisingwaveMacroState onReady(
long traceId,
long authorization,
PgsqlFlushExFW flushEx)
{
FlushState state = new FlushState();
state.onStarted(traceId, authorization);

return state;
}

@Override
public RisingwaveMacroState onError(
long traceId,
long authorization,
PgsqlFlushExFW flushEx)
{
handler.doFlushProxy(traceId, authorization, flushEx);

return errorState();
}
}

private final class FlushState implements RisingwaveMacroState
{
private final String sqlFormat = """
FLUSH;\u0000""";

@Override
public void onStarted(
long traceId,
long authorization)
{
handler.doExecuteSystemClient(traceId, authorization, sqlFormat);
}

@Override
public RisingwaveMacroState onCompletion(
long traceId,
Expand Down

0 comments on commit 24bf92b

Please sign in to comment.