Skip to content

Commit

Permalink
[Java] Fix memory management issues in Flight example (#219)
Browse files Browse the repository at this point in the history
Fixes #218.
  • Loading branch information
lidavidm authored Jun 2, 2022
1 parent f8e0a56 commit b15ff49
Showing 1 changed file with 38 additions and 19 deletions.
57 changes: 38 additions & 19 deletions java/source/flight.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Flight Client and Server
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
Expand All @@ -71,7 +72,7 @@ Flight Client and Server
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class Dataset {
class Dataset implements AutoCloseable {
private final List<ArrowRecordBatch> batches;
private final Schema schema;
private final long rows;
Expand All @@ -89,11 +90,15 @@ Flight Client and Server
public long getRows() {
return rows;
}
@Override
public void close() throws Exception {
AutoCloseables.close(batches);
}
}
class CookbookProducer extends NoOpFlightProducer {
class CookbookProducer extends NoOpFlightProducer implements AutoCloseable {
private final BufferAllocator allocator;
private final Location location;
private final ConcurrentHashMap<FlightDescriptor, Dataset> datasets;
private final ConcurrentMap<FlightDescriptor, Dataset> datasets;
public CookbookProducer(BufferAllocator allocator, Location location) {
this.allocator = allocator;
this.location = location;
Expand All @@ -107,10 +112,9 @@ Flight Client and Server
VectorUnloader unloader;
while (flightStream.next()) {
unloader = new VectorUnloader(flightStream.getRoot());
try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
batches.add(arb);
rows += flightStream.getRoot().getRowCount();
}
final ArrowRecordBatch arb = unloader.getRecordBatch();
batches.add(arb);
rows += flightStream.getRoot().getRowCount();
}
Dataset dataset = new Dataset(batches, flightStream.getSchema(), rows);
datasets.put(flightStream.getDescriptor(), dataset);
Expand All @@ -125,13 +129,13 @@ Flight Client and Server
Dataset dataset = this.datasets.get(flightDescriptor);
if (dataset == null) {
throw CallStatus.NOT_FOUND.withDescription("Unknown descriptor").toRuntimeException();
} else {
VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(
this.datasets.get(flightDescriptor).getSchema(), allocator);
listener.start(vectorSchemaRoot);
}
try (VectorSchemaRoot root = VectorSchemaRoot.create(
this.datasets.get(flightDescriptor).getSchema(), allocator)) {
VectorLoader loader = new VectorLoader(root);
listener.start(root);
for (ArrowRecordBatch arrowRecordBatch : this.datasets.get(flightDescriptor).getBatches()) {
VectorLoader loader = new VectorLoader(vectorSchemaRoot);
loader.load(arrowRecordBatch.cloneWithTransfer(allocator));
loader.load(arrowRecordBatch);
listener.putNext();
}
listener.completed();
Expand All @@ -143,8 +147,17 @@ Flight Client and Server
FlightDescriptor flightDescriptor = FlightDescriptor.path(
new String(action.getBody(), StandardCharsets.UTF_8));
switch (action.getType()) {
case "DELETE":
if (datasets.remove(flightDescriptor) != null) {
case "DELETE": {
Dataset removed = datasets.remove(flightDescriptor);
if (removed != null) {
try {
removed.close();
} catch (Exception e) {
listener.onError(CallStatus.INTERNAL
.withDescription(e.toString())
.toRuntimeException());
return;
}
Result result = new Result("Delete completed".getBytes(StandardCharsets.UTF_8));
listener.onNext(result);
} else {
Expand All @@ -153,6 +166,7 @@ Flight Client and Server
listener.onNext(result);
}
listener.onCompleted();
}
}
}

Expand All @@ -174,17 +188,22 @@ Flight Client and Server
datasets.forEach((k, v) -> { listener.onNext(getFlightInfo(null, k)); });
listener.onCompleted();
}

@Override
public void close() throws Exception {
AutoCloseables.close(datasets.values());
}
}
Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
try (BufferAllocator allocator = new RootAllocator()){
// Server
try(FlightServer flightServer = FlightServer.builder(allocator, location,
new CookbookProducer(allocator, location)).build()) {
try(final CookbookProducer producer = new CookbookProducer(allocator, location);
final FlightServer flightServer = FlightServer.builder(allocator, location, producer).build()) {
try {
flightServer.start();
System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
} catch (IOException e) {
System.exit(1);
throw new RuntimeException(e);
}

// Client
Expand Down Expand Up @@ -534,4 +553,4 @@ Stop Flight Server
C8: Server shut down successfully
_`Arrow Flight RPC`: https://arrow.apache.org/docs/format/Flight.html
_`Arrow Flight RPC`: https://arrow.apache.org/docs/format/Flight.html

0 comments on commit b15ff49

Please sign in to comment.