Skip to content

Commit

Permalink
Set the query result as not suspended to avoid infinite streams - fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Feb 2, 2021
1 parent 35eaf04 commit 84b1f1d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/jdbcclient/impl/ConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private <R> void handle(JDBCQueryAction<?, R> action, QueryResultHandler<R> hand
ar.result()
.handle(handler);

promise.complete(true);
promise.complete(false);
} else {
promise.fail(ar.cause());
}
Expand Down
25 changes: 25 additions & 0 deletions src/test/java/io/vertx/jdbcclient/ClientTest.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package io.vertx.jdbcclient;

import io.vertx.core.Future;
import io.vertx.ext.unit.TestContext;
import io.vertx.sqlclient.*;
import org.junit.Test;

import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ClientTest extends ClientTestBase {
Expand Down Expand Up @@ -97,4 +100,26 @@ private void testTransaction(TestContext testCtx, boolean commit) throws Excepti
}));
}));
}

@Test
public void testStream(TestContext should) {
client.<List<Row>>withTransaction(tx ->
tx.prepare("SELECT CURRENT_DATE AS today, CURRENT_TIME AS now FROM (VALUES(0))")
.map(pS -> pS.createStream(200))
.flatMap(stream -> Future
.future(promise -> {
List<Row> rows = new ArrayList<>();
stream.exceptionHandler(promise::fail);
stream.endHandler(v -> promise.complete(rows));
stream.handler(row -> {
should.assertEquals(0, rows.size());
rows.add(row);
});
})
)
)
.onComplete(should.asyncAssertSuccess(rows -> {
should.assertEquals(1, rows.size());
}));
}
}

0 comments on commit 84b1f1d

Please sign in to comment.