Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #6869]Fix Beeline SQL querying cannot be stopped immediately by pressing Ctrl-C. #6879

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ class BeeLineOpts implements Completer {

private String delimiter = DEFAULT_DELIMITER;

private boolean cancelImmediate = false;

@Retention(RetentionPolicy.RUNTIME)
public @interface Ignore {
// marker annotations for functions that Reflector should ignore / pretend it does not exist
Expand Down Expand Up @@ -648,4 +650,12 @@ public static Env getEnv() {
public static void setEnv(Env envToUse) {
env = envToUse;
}

public boolean getCancelImmediate() {
return cancelImmediate;
}

public void setCancelImmediate(boolean cancelImmediate) {
this.cancelImmediate = cancelImmediate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,35 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.kyuubi.jdbc.hive.KyuubiStatement;

/** Rows implementation which buffers all rows */
class BufferedRows extends Rows {
private final List<Row> list;
private final Iterator<Row> iterator;
private int columnCount;
private int maxColumnWidth;
private boolean cancelImmediate;

BufferedRows(BeeLine beeLine, ResultSet rs) throws SQLException {
this(beeLine, rs, Optional.<Integer>absent());
}

BufferedRows(BeeLine beeLine, ResultSet rs, Optional<Integer> limit) throws SQLException {
super(beeLine, rs);
cancelImmediate = beeLine.getOpts().getCancelImmediate();
list = new ArrayList<Row>();
columnCount = rsMeta.getColumnCount();
list.add(new Row(columnCount));

int numRowsBuffered = 0;
int maxRowsBuffered = limit.or(Integer.MAX_VALUE);

while (numRowsBuffered++ < maxRowsBuffered && rs.next()) {
while (numRowsBuffered++ < maxRowsBuffered
&& !((rs.getStatement() instanceof KyuubiStatement)
&& ((KyuubiStatement) rs.getStatement()).getIsCancelled()
&& cancelImmediate)
&& rs.next()) {
this.list.add(new Row(columnCount, rs));
}

Expand Down
6 changes: 6 additions & 0 deletions kyuubi-hive-beeline/src/main/resources/BeeLine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ Options:\n\
\ Only applicable if --outputformat=table.\n\n\
\ --incrementalBufferRows=NUMROWS The number of rows to buffer when printing rows on stdout,\n\
\ defaults to 1000; only applicable if --incremental=true\n\
\ --cancelImmediate=[true|false] Defaults to false.When set to false, canceling by Ctrl-C will wait \n\
\ until buffer rows is full.There is two sisititon to set it true; \n\
\ Sitisiton 1: incrementalBufferRows is too large but you want to cancel job \n\
\ immediate.\n\
\ Sitisiton 2: Runing flink sql streaming (you do not want to wait \n\
\ until `kyuubi.session.engine.flink.max.rows` is reached.).\n\
\ and --outputformat=table.\n\n\
\ --truncateTable=[true|false] Truncate table column when it exceeds length.\n\
\ --delimiterForDSV=DELIMITER Specify the delimiter for delimiter-separated values output format (default: |).\n\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import org.apache.kyuubi.jdbc.hive.KyuubiStatement;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
Expand Down Expand Up @@ -52,7 +53,7 @@ public class TestBufferedRows {

@Test
public void testNormalizeWidths() throws SQLException {
setupMockData();
setupMockData(false);

BufferedRows bfRows = new BufferedRows(mockBeeline, mockResultSet);
bfRows.normalizeWidths();
Expand All @@ -64,14 +65,27 @@ public void testNormalizeWidths() throws SQLException {
}
}

private void setupMockData() throws SQLException {
@Test
public void testCancel() throws SQLException {
setupMockData(true);

BufferedRows bfRows = new BufferedRows(mockBeeline, mockResultSet);
// skip scheam row
bfRows.next();
while (bfRows.hasNext()) {
throw new SQLException("Should not fetch any data when cancel is called");
}
}

private void setupMockData(boolean cancelStatement) throws SQLException {
// Mock BeeLine
mockBeeline = mock(BeeLine.class);
// Mock BeeLineOpts
mockBeeLineOpts = mock(BeeLineOpts.class);
when(mockBeeLineOpts.getMaxColumnWidth()).thenReturn(BeeLineOpts.DEFAULT_MAX_COLUMN_WIDTH);
when(mockBeeLineOpts.getNumberFormat()).thenReturn("default");
when(mockBeeLineOpts.getNullString()).thenReturn("NULL");
when(mockBeeLineOpts.getCancelImmediate()).thenReturn(cancelStatement);
when(mockBeeline.getOpts()).thenReturn(mockBeeLineOpts);

// MockResultSet
Expand All @@ -83,6 +97,10 @@ private void setupMockData() throws SQLException {
when(mockResultSetMetaData.getColumnLabel(2)).thenReturn("Value");
when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData);

KyuubiStatement mockKyuubiStatement = mock(KyuubiStatement.class);
when(mockKyuubiStatement.getIsCancelled()).thenReturn(cancelStatement);
when(mockResultSet.getStatement()).thenReturn(mockKyuubiStatement);

mockRow = new MockRow();
// returns true as long as there is more data in mockResultData array
when(mockResultSet.next())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -861,4 +861,8 @@ private void closeResultSet() throws SQLException {
resultSet = null;
}
}

public boolean getIsCancelled() {
return isCancelled;
}
}