Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PECO-239] Apache Arrow support #94

Merged
merged 20 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
c469b11
Add apache-arrow dependency; upgrade typescript to make apache-arrow …
kravets-levko Nov 22, 2022
70eaad6
[PECO-239] Add basic Apache Arrow support
kravets-levko Nov 22, 2022
006ff16
Merge branch 'main' into PECO-239-apache-arrow-support
kravets-levko Dec 5, 2022
9fb6865
Expose additional options for Arrow feature
kravets-levko Jan 22, 2023
286a874
Add e2e tests
kravets-levko Jan 22, 2023
e6055a5
Merge branch 'main' into PECO-239-apache-arrow-support
kravets-levko Jan 22, 2023
17cda31
Re-add apache-arrow to fix improperly resolved merge conflict
kravets-levko Jan 22, 2023
ce4d461
Fix existing e2e tests
kravets-levko Jan 22, 2023
70ccc5f
Add unit tests
kravets-levko Jan 23, 2023
ef8d529
Enable Arrow by default; use single option to enable native types; ma…
kravets-levko Feb 1, 2023
05af910
Merge branch 'main' into PECO-239-apache-arrow-support
kravets-levko Feb 1, 2023
c2dab38
Add/update tests
kravets-levko Feb 5, 2023
671c0a0
Merge branch 'main' into PECO-239-apache-arrow-support
kravets-levko Feb 5, 2023
ae5ca1f
Add/update e2e tests
kravets-levko Feb 12, 2023
eeeed8a
Merge branch 'main' into PECO-239-apache-arrow-support
kravets-levko Feb 13, 2023
e2143b2
Don't try to convert date/timestamp values to Date because we don't a…
kravets-levko Feb 28, 2023
d07f098
Merge branch 'main' into PECO-239-apache-arrow-support
kravets-levko Mar 10, 2023
2b8a0d6
All Arrow options should be feature flags (aka global config) instead…
kravets-levko Mar 13, 2023
74746aa
Fix Prettier swear at "satisfies" keyword
kravets-levko Mar 13, 2023
2629375
Fix tests due to changes to Arrow config
kravets-levko Mar 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions lib/DBSQLOperation/SchemaHelper.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { TOperationHandle, TGetResultSetMetadataResp, TSparkRowSetType } from '../../thrift/TCLIService_types';
import { TGetResultSetMetadataResp, TOperationHandle, TSparkRowSetType } from '../../thrift/TCLIService_types';
import HiveDriver from '../hive/HiveDriver';
import StatusFactory from '../factory/StatusFactory';
import IOperationResult from '../result/IOperationResult';
import JsonResult from '../result/JsonResult';
import ArrowResult from '../result/ArrowResult';
import HiveDriverError from '../errors/HiveDriverError';
import { definedOrError } from '../utils';

Expand Down Expand Up @@ -40,12 +41,13 @@ export default class SchemaHelper {

async getResultHandler(): Promise<IOperationResult> {
const metadata = await this.fetchMetadata();
const schema = definedOrError(metadata.schema);
const resultFormat = definedOrError(metadata.resultFormat);

switch (resultFormat) {
case TSparkRowSetType.COLUMN_BASED_SET:
return new JsonResult(schema);
return new JsonResult(metadata.schema);
case TSparkRowSetType.ARROW_BASED_SET:
return new ArrowResult(metadata.schema, metadata.arrowSchema);
default:
throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[resultFormat]}`);
}
Expand Down
34 changes: 33 additions & 1 deletion lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { stringify, NIL, parse } from 'uuid';
import { TSessionHandle, TStatus, TOperationHandle, TSparkDirectResults } from '../thrift/TCLIService_types';
import {
TSessionHandle,
TStatus,
TOperationHandle,
TSparkDirectResults,
TSparkArrowTypes,
} from '../thrift/TCLIService_types';
import HiveDriver from './hive/HiveDriver';
import { Int64 } from './hive/Types';
import IDBSQLSession, {
Expand All @@ -21,6 +27,7 @@ import StatusFactory from './factory/StatusFactory';
import InfoValue from './dto/InfoValue';
import { definedOrError } from './utils';
import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
import globalConfig from './globalConfig';

const defaultMaxRows = 100000;

Expand All @@ -42,6 +49,30 @@ function getDirectResultsOptions(maxRows: number | null = defaultMaxRows) {
};
}

function getArrowOptions(): {
canReadArrowResult: boolean;
useArrowNativeTypes?: TSparkArrowTypes;
andrefurlan-db marked this conversation as resolved.
Show resolved Hide resolved
} {
const { arrowEnabled = true, useArrowNativeTypes = true } = globalConfig;

if (!arrowEnabled) {
return {
canReadArrowResult: false,
};
}

return {
canReadArrowResult: true,
useArrowNativeTypes: {
timestampAsArrow: useArrowNativeTypes,
decimalAsArrow: useArrowNativeTypes,
complexTypesAsArrow: useArrowNativeTypes,
// TODO: currently unsupported by `apache-arrow` (see https://github.com/streamlit/streamlit/issues/4489)
intervalTypesAsArrow: false,
},
};
}

export default class DBSQLSession implements IDBSQLSession {
private driver: HiveDriver;

Expand Down Expand Up @@ -101,6 +132,7 @@ export default class DBSQLSession implements IDBSQLSession {
queryTimeout: options.queryTimeout,
runAsync: options.runAsync || false,
...getDirectResultsOptions(options.maxRows),
...getArrowOptions(),
})
.then((response) => this.createOperation(response));
}
Expand Down
9 changes: 9 additions & 0 deletions lib/globalConfig.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
interface GlobalConfig {
arrowEnabled?: boolean;
useArrowNativeTypes?: boolean;
}

export default {
arrowEnabled: true,
andrefurlan-db marked this conversation as resolved.
Show resolved Hide resolved
useArrowNativeTypes: true,
} satisfies GlobalConfig;
137 changes: 137 additions & 0 deletions lib/result/ArrowResult.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import { Buffer } from 'buffer';
import {
tableFromIPC,
Schema,
Field,
TypeMap,
DataType,
Type,
StructRow,
MapRow,
Vector,
util as arrowUtils,
} from 'apache-arrow';
import { TRowSet, TTableSchema, TColumnDesc } from '../../thrift/TCLIService_types';
import IOperationResult from './IOperationResult';
import { getSchemaColumns, convertThriftValue } from './utils';

const { isArrowBigNumSymbol, bignumToBigInt } = arrowUtils;

type ArrowSchema = Schema<TypeMap>;
type ArrowSchemaField = Field<DataType<Type, TypeMap>>;

export default class ArrowResult implements IOperationResult {
private readonly schema: Array<TColumnDesc>;

private readonly arrowSchema?: Buffer;

constructor(schema?: TTableSchema, arrowSchema?: Buffer) {
this.schema = getSchemaColumns(schema);
this.arrowSchema = arrowSchema;
}

getValue(data?: Array<TRowSet>) {
if (this.schema.length === 0 || !this.arrowSchema || !data) {
return [];
}

const batches = this.getBatches(data);
if (batches.length === 0) {
return [];
}

const table = tableFromIPC<TypeMap>([this.arrowSchema, ...batches]);
return this.getRows(table.schema, table.toArray());
}

private getBatches(data: Array<TRowSet>): Array<Buffer> {
const result: Array<Buffer> = [];

data.forEach((rowSet) => {
rowSet.arrowBatches?.forEach((arrowBatch) => {
if (arrowBatch.batch) {
result.push(arrowBatch.batch);
}
});
});

return result;
}

private getRows(schema: ArrowSchema, rows: Array<StructRow | MapRow>): Array<any> {
return rows.map((row) => {
// First, convert native Arrow values to corresponding plain JS objects
const record = this.convertArrowTypes(row, undefined, schema.fields);
// Second, cast all the values to original Thrift types
return this.convertThriftTypes(record);
});
}

private convertArrowTypes(value: any, valueType: DataType | undefined, fields: Array<ArrowSchemaField> = []): any {
const fieldsMap: Record<string, ArrowSchemaField> = {};
for (const field of fields) {
fieldsMap[field.name] = field;
}

// Convert structures to plain JS object and process all its fields recursively
if (value instanceof StructRow) {
const result = value.toJSON();
for (const key of Object.keys(result)) {
const field: ArrowSchemaField | undefined = fieldsMap[key];
result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
}
return result;
}
if (value instanceof MapRow) {
const result = value.toJSON();
// Map type consists of its key and value types. We need only value type here, key will be cast to string anyway

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you sure? I thought map[any]any was the type definition. So you could have a struct as key, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is for compatibility with column results - maps are json-serialized as plain objects which could have only string keys. we could try to use JS built-in Map class to represent maps, but for column results it probably will have some limitations

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you run any tests with a map using complex data type as key?

Copy link
Contributor Author

@kravets-levko kravets-levko Mar 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mentioned it in Readme in tests/fixtures folder. In short: complex types used as map keys produce malformed json (similar to date/time values), so they could be only handled with Arrow + native types enabled. And even in this case, since JS objects cannot have complex type keys, they will remain stringified. So it will look something like this:

{ 
  '{"s": "hello"}': ..., 
  '{"s": "world"}': ..., 
}

const field = fieldsMap.entries?.type.children.find((item) => item.name === 'value');
for (const key of Object.keys(result)) {
result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
}
return result;
}

// Convert lists to JS array and process items recursively
if (value instanceof Vector) {
const result = value.toJSON();
// Array type contains the only child which defines a type of each array's element
const field = fieldsMap.element;
return result.map((item) => this.convertArrowTypes(item, field?.type, field?.type.children || []));
}

if (DataType.isTimestamp(valueType)) {
andrefurlan-db marked this conversation as resolved.
Show resolved Hide resolved
return new Date(value);
}

// Convert big number values to BigInt
// Decimals are also represented as big numbers in Arrow, so additionally process them (convert to float)
if (value instanceof Object && value[isArrowBigNumSymbol]) {
const result = bignumToBigInt(value);
if (DataType.isDecimal(valueType)) {
return Number(result) / 10 ** valueType.scale;
andrefurlan-db marked this conversation as resolved.
Show resolved Hide resolved
}
return result;
}

// Convert binary data to Buffer
if (value instanceof Uint8Array) {
return Buffer.from(value);
}

// Return other values as is
return typeof value === 'bigint' ? Number(value) : value;
}

private convertThriftTypes(record: Record<string, any>): any {
const result: Record<string, any> = {};

this.schema.forEach((column) => {
const typeDescriptor = column.typeDesc.types[0]?.primitiveEntry;
const field = column.columnName;
result[field] = convertThriftValue(typeDescriptor, record[field]);
});

return result;
}
}
82 changes: 8 additions & 74 deletions lib/result/JsonResult.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,28 @@
import { ColumnCode, ColumnType } from '../hive/Types';
import {
TTypeId,
TRowSet,
TTableSchema,
TColumn,
TColumnDesc,
TPrimitiveTypeEntry,
} from '../../thrift/TCLIService_types';
import { ColumnCode } from '../hive/Types';
import { TRowSet, TTableSchema, TColumn, TColumnDesc } from '../../thrift/TCLIService_types';
import IOperationResult from './IOperationResult';
import { getSchemaColumns, convertThriftValue } from './utils';

export default class JsonResult implements IOperationResult {
private readonly schema?: TTableSchema;
private readonly schema: Array<TColumnDesc>;

constructor(schema?: TTableSchema) {
this.schema = schema;
this.schema = getSchemaColumns(schema);
}

getValue(data?: Array<TRowSet>): Array<object> {
if (!data) {
if (this.schema.length === 0 || !data) {
return [];
}

const descriptors = this.getSchemaColumns();

return data.reduce((result: Array<any>, rowSet: TRowSet) => {
const columns = rowSet.columns || [];
const rows = this.getRows(columns, descriptors);
const rows = this.getRows(columns, this.schema);

return result.concat(rows);
}, []);
}

private getSchemaColumns(): Array<TColumnDesc> {
if (!this.schema) {
return [];
}

return [...this.schema.columns].sort((c1, c2) => c1.position - c2.position);
}

private getRows(columns: Array<TColumn>, descriptors: Array<TColumnDesc>): Array<any> {
return descriptors.reduce(
(rows, descriptor) =>
Expand Down Expand Up @@ -69,67 +53,17 @@ export default class JsonResult implements IOperationResult {
if (columnValue.nulls && this.isNull(columnValue.nulls, i)) {
return null;
}
return this.convertData(typeDescriptor, value);
return convertThriftValue(typeDescriptor, value);
});
}

private convertData(typeDescriptor: TPrimitiveTypeEntry | undefined, value: ColumnType): any {
if (!typeDescriptor) {
return value;
}

switch (typeDescriptor.type) {
case TTypeId.TIMESTAMP_TYPE:
case TTypeId.DATE_TYPE:
case TTypeId.UNION_TYPE:
case TTypeId.USER_DEFINED_TYPE:
return String(value);
case TTypeId.DECIMAL_TYPE:
return Number(value);
case TTypeId.STRUCT_TYPE:
case TTypeId.MAP_TYPE:
return this.toJSON(value, {});
case TTypeId.ARRAY_TYPE:
return this.toJSON(value, []);
case TTypeId.BIGINT_TYPE:
return this.convertBigInt(value);
case TTypeId.NULL_TYPE:
case TTypeId.BINARY_TYPE:
case TTypeId.INTERVAL_YEAR_MONTH_TYPE:
case TTypeId.INTERVAL_DAY_TIME_TYPE:
case TTypeId.FLOAT_TYPE:
case TTypeId.DOUBLE_TYPE:
case TTypeId.INT_TYPE:
case TTypeId.SMALLINT_TYPE:
case TTypeId.TINYINT_TYPE:
case TTypeId.BOOLEAN_TYPE:
case TTypeId.STRING_TYPE:
case TTypeId.CHAR_TYPE:
case TTypeId.VARCHAR_TYPE:
default:
return value;
}
}

private isNull(nulls: Buffer, i: number): boolean {
const byte = nulls[Math.floor(i / 8)];
const ofs = 2 ** (i % 8);

return (byte & ofs) !== 0;
}

private toJSON(value: any, defaultValue: any): any {
try {
return JSON.parse(value);
} catch (e) {
return defaultValue;
}
}

private convertBigInt(value: any): any {
return value.toNumber();
}

private getColumnValue(column: TColumn) {
return (
column[ColumnCode.binaryVal] ||
Expand Down
Loading