Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for pushing down sort order. Closes #447 #448

Merged
merged 7 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## v1.11.0 [tbd]
Add support for running plugins in-process. ([#383](https://github.com/turbot/steampipe-postgres-fdw/issues/383))
Fixes issue where the install script fails if pg_config in not in users path. ([#404](https://github.com/turbot/steampipe-postgres-fdw/issues/404))
* Update Steampipe timing output to show all scans for all connections. ([#439](https://github.com/turbot/steampipe-postgres-fdw/issues/439))
* Add support for running plugins in-process. ([#383](https://github.com/turbot/steampipe-postgres-fdw/issues/383))
* Fixes issue where the install script fails if pg_config in not in users path. ([#404](https://github.com/turbot/steampipe-postgres-fdw/issues/404))

## v1.10.0 [2024-03-04]
_Whats new_
Expand Down
84 changes: 83 additions & 1 deletion fdw.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ package main
#cgo linux LDFLAGS: -Wl,-unresolved-symbols=ignore-all
#cgo darwin LDFLAGS: -Wl,-undefined,dynamic_lookup
#include "fdw_helpers.h"

#include "utils/rel.h"
#include "nodes/pg_list.h"
#include "utils/timestamp.h"

static Name deserializeDeparsedSortListCell(ListCell *lc);

*/
import "C"

Expand All @@ -18,6 +22,7 @@ import (
"unsafe"

"github.com/hashicorp/go-hclog"
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"github.com/turbot/steampipe-plugin-sdk/v5/logging"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr"
"github.com/turbot/steampipe-postgres-fdw/hub"
Expand Down Expand Up @@ -65,6 +70,63 @@ func init() {

}

//export goLog
func goLog(msg *C.char) {
log.Println("[WARN] " + C.GoString(msg))
}

// Given a list of FdwDeparsedSortGroup and a FdwPlanState,
// construct a list FdwDeparsedSortGroup that can be pushed down
//
//export goFdwCanSort
func goFdwCanSort(deparsed *C.List, planstate *C.FdwPlanState) *C.List {
log.Println("[WARN] goFdwCanSort deparsed", deparsed)
// This will be the list of FdwDeparsedSortGroup items that can be pushed down
var pushDownList *C.List = nil

// Iterate over the deparsed list
if deparsed == nil {
return pushDownList
}

// Convert the sortable fields into a lookup
sortableFields := getSortableFields(planstate.foreigntableid)
if len(sortableFields) == 0 {
return pushDownList
}

for it := C.list_head(deparsed); it != nil; it = C.lnext(deparsed, it) {
deparsedSortGroup := C.cellGetFdwDeparsedSortGroup(it)
columnName := C.GoString(C.nameStr(deparsedSortGroup.attname))

supportedOrder := sortableFields[columnName]
requiredOrder := proto.SortOrder_Asc
if deparsedSortGroup.reversed {
requiredOrder = proto.SortOrder_Desc
}
log.Println("[INFO] goFdwCanSort column", columnName, "supportedOrder", supportedOrder, "requiredOrder", requiredOrder)

if supportedOrder == requiredOrder || supportedOrder == proto.SortOrder_All {
log.Printf("[INFO] goFdwCanSort column %s can be pushed down", columnName)
// add deparsedSortGroup to pushDownList
pushDownList = C.lappend(pushDownList, unsafe.Pointer(deparsedSortGroup))
} else {
log.Printf("[INFO] goFdwCanSort column %s CANNOT be pushed down", columnName)
}
}

return pushDownList
}

func getSortableFields(foreigntableid C.Oid) map[string]proto.SortOrder {
opts := GetFTableOptions(types.Oid(foreigntableid))
connection := GetSchemaNameFromForeignTableId(types.Oid(foreigntableid))

tableName := opts["table"]
pluginHub := hub.GetHub()
return pluginHub.GetSortableFields(tableName, connection)
}

//export goFdwGetRelSize
func goFdwGetRelSize(state *C.FdwPlanState, root *C.PlannerInfo, rows *C.double, width *C.int, baserel *C.RelOptInfo) {
logging.ClearProfileData()
Expand Down Expand Up @@ -242,8 +304,9 @@ func goFdwBeginForeignScan(node *C.ForeignScanState, eflags C.int) {
}
// if we are NOT explaining, create an iterator to scan for us
if !explain {
var sortOrder = getSortColumns(execState)
ts := int64(C.GetSQLCurrentTimestamp(0))
iter, err := pluginHub.GetIterator(columns, quals, unhandledRestrictions, int64(execState.limit), opts, ts)
iter, err := pluginHub.GetIterator(columns, quals, unhandledRestrictions, int64(execState.limit), sortOrder, ts, opts)
if err != nil {
log.Printf("[WARN] pluginHub.GetIterator FAILED: %s", err)
FdwError(err)
Expand All @@ -258,6 +321,25 @@ func goFdwBeginForeignScan(node *C.ForeignScanState, eflags C.int) {
logging.LogTime("[fdw] BeginForeignScan end")
}

func getSortColumns(state *C.FdwExecState) []*proto.SortColumn {
sortGroups := state.pathkeys
var res []*proto.SortColumn
for it := C.list_head(sortGroups); it != nil; it = C.lnext(sortGroups, it) {
deparsedSortGroup := C.cellGetFdwDeparsedSortGroup(it)
columnName := C.GoString(C.nameStr(deparsedSortGroup.attname))
requiredOrder := proto.SortOrder_Asc
if deparsedSortGroup.reversed {
requiredOrder = proto.SortOrder_Desc
}

res = append(res, &proto.SortColumn{
Column: columnName,
Order: requiredOrder,
})
}
return res
}

//export goFdwIterateForeignScan
func goFdwIterateForeignScan(node *C.ForeignScanState) *C.TupleTableSlot {
defer func() {
Expand Down
2 changes: 2 additions & 0 deletions fdw/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ typedef struct FdwDeparsedSortGroup
PathKey *key;
} FdwDeparsedSortGroup;

static inline FdwDeparsedSortGroup *cellGetFdwDeparsedSortGroup(ListCell *n) { return (FdwDeparsedSortGroup *)n->ptr_value; }

// datum.c
char *datumString(Datum datum, ConversionInfo *cinfo);
int64 datumInt16(Datum datum, ConversionInfo *cinfo);
Expand Down
1 change: 1 addition & 0 deletions fdw/fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ static void fdwGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid forei
NULL,
NULL));


/* Handle sort pushdown */
if (root->query_pathkeys)
{
Expand Down
2 changes: 2 additions & 0 deletions fdw/fdw_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ static inline BooleanTest *cellGetBooleanTest(ListCell *n) { return (BooleanTest
static inline BoolExpr *cellGetBoolExpr(ListCell *n) { return (BoolExpr *)n->ptr_value; }

static inline RestrictInfo *cellGetRestrictInfo(ListCell *n) { return (RestrictInfo *)n->ptr_value; }
static inline char *nameStr(Name n) { return NameStr(*n); }


// logging
char *tagTypeToString(NodeTag type);
9 changes: 4 additions & 5 deletions fdw/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,11 @@ void computeDeparsedSortGroup(List *deparsed, FdwPlanState *planstate,
Assert(*deparsed_pathkeys == NIL);

/* Don't ask FDW if nothing to sort */
if (deparsed == NIL)
if (deparsed == NIL){
return;
}

// TODO - Fdw doesn't support this yet
// sortable_fields = canSort(planstate, deparsed);
sortable_fields = NIL;
sortable_fields = goFdwCanSort(deparsed,planstate);

/* Don't go further if FDW can't enforce any sort */
if (sortable_fields == NIL)
Expand Down Expand Up @@ -546,7 +545,7 @@ findPaths(PlannerInfo *root, RelOptInfo *baserel, List *possiblePaths,

/*
* Deparse a list of PathKey and return a list of FdwDeparsedSortGroup.
* This function will return data iif all the PathKey belong to the current
* This function will return data if all the PathKey belong to the current
* foreign table.
*/
List *
Expand Down
16 changes: 8 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ require (
github.com/hashicorp/go-hclog v1.6.3
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/turbot/go-kit v0.10.0-rc.0
//main (join_timing)
github.com/turbot/steampipe v1.7.0-rc.0.0.20240417091217-07782a2b1346
github.com/turbot/steampipe-plugin-sdk/v5 v5.10.0
go.opentelemetry.io/otel v1.24.0
//main
github.com/turbot/steampipe v1.7.0-rc.0.0.20240417105518-bd915693c2ea
github.com/turbot/steampipe-plugin-sdk/v5 v5.11.0-alpha.0
go.opentelemetry.io/otel v1.25.0
google.golang.org/protobuf v1.33.0
)

require (
github.com/Masterminds/semver/v3 v3.2.1
go.opentelemetry.io/otel/metric v1.24.0
go.opentelemetry.io/otel/metric v1.25.0
)

require (
Expand Down Expand Up @@ -168,9 +168,9 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.25.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.25.0 // indirect
go.opentelemetry.io/otel/trace v1.25.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
Expand Down
28 changes: 14 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,10 @@ github.com/turbot/go-kit v0.10.0-rc.0 h1:kd+jp2ibbIV33Hc8SsMAN410Dl9Pz6SJ40axbKU
github.com/turbot/go-kit v0.10.0-rc.0/go.mod h1:fFQqR59I5z5JeeBLfK1PjSifn4Oprs3NiQx0CxeSJxs=
github.com/turbot/pipe-fittings v1.1.1 h1:W1F/O3tWaR2W9HTnFskJS5sLHpZXwOTbhTtDorIw744=
github.com/turbot/pipe-fittings v1.1.1/go.mod h1:Cgy232VEhVjtDibJS8v5Zf4lKQnfZJOBtsuUNZ7MzTc=
github.com/turbot/steampipe v1.7.0-rc.0.0.20240417091217-07782a2b1346 h1:MxpsEaeo+9BlEbkC14hIFexI2eo7lY++xhEQpqLz4SI=
github.com/turbot/steampipe v1.7.0-rc.0.0.20240417091217-07782a2b1346/go.mod h1:If2VvE6PYUVhP6HDenst19E//vyniNLCxafj8OHFH5E=
github.com/turbot/steampipe-plugin-sdk/v5 v5.10.0 h1:2g2rPRzY3N5+94yjUW2jnbFm9DGGsUx3d77tjqgDT4M=
github.com/turbot/steampipe-plugin-sdk/v5 v5.10.0/go.mod h1:DJ9gPbPzmCe4M2sp+KzCmOhFuucl5/6hXnXvFvS/9nQ=
github.com/turbot/steampipe v1.7.0-rc.0.0.20240417105518-bd915693c2ea h1:wb2sAxZ+haYdUSM5wJuYRGPMr0wC2vu1voHyH8CfLHo=
github.com/turbot/steampipe v1.7.0-rc.0.0.20240417105518-bd915693c2ea/go.mod h1:If2VvE6PYUVhP6HDenst19E//vyniNLCxafj8OHFH5E=
github.com/turbot/steampipe-plugin-sdk/v5 v5.11.0-alpha.0 h1:lABwRQmtTFgsHxzzeA5PSEmtu3JVqfSo2JBZsQ6NLGw=
github.com/turbot/steampipe-plugin-sdk/v5 v5.11.0-alpha.0/go.mod h1:a6f869uShOJDiU4p6fHnSGE9xVTJF1ZyCGf6k0CR31Q=
github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7 h1:qDMxFVd8Zo0rIhnEBdCIbR+T6WgjwkxpFZMN8zZmmjg=
github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7/go.mod h1:5hzpfalEjfcJWp9yq75/EZoEu2Mzm34eJAPm3HOW2tw=
github.com/ulikunitz/xz v0.5.10 h1:t92gobL9l3HE202wg3rlk19F6X+JOxl9BBrCCMYEYd8=
Expand Down Expand Up @@ -746,22 +746,22 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.4
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0/go.mod h1:r9vWsPS/3AQItv3OSlEJ/E4mbrhUbbw18meOjArPtKQ=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 h1:sv9kVfal0MK0wBMCOGr+HeJm9v803BkJxGrk2au7j08=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0/go.mod h1:SK2UL73Zy1quvRPonmOmRDiWk1KBV3LyIeeIxcEApWw=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k=
go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.24.0 h1:f2jriWfOdldanBwS9jNBdeOKAQN7b4ugAMaNu1/1k9g=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.24.0/go.mod h1:B+bcQI1yTY+N0vqMpoZbEN7+XU4tNM0DmUiOwebFJWI=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 h1:t6wl9SPayj+c7lEIFgm4ooDBZVb01IhLB4InpomhRw8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0/go.mod h1:iSDOcsnSA5INXzZtwaBPrKp/lWu/V14Dd+llD0oI2EA=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 h1:Mw5xcxMwlqoJd97vwPxA8isEaIoxsta9/Q51+TTJLGE=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0/go.mod h1:CQNu9bj7o7mC6U7+CA/schKEYakYXWr79ucDHTMGhCM=
go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8=
go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0=
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
go.opentelemetry.io/otel/metric v1.25.0 h1:LUKbS7ArpFL/I2jJHdJcqMGxkRdxpPHE0VU/D4NuEwA=
go.opentelemetry.io/otel/metric v1.25.0/go.mod h1:rkDLUSd2lC5lq2dFNrX9LGAbINP5B7WBkC78RXCpH5s=
go.opentelemetry.io/otel/sdk v1.25.0 h1:PDryEJPC8YJZQSyLY5eqLeafHtG+X7FWnf3aXMtxbqo=
go.opentelemetry.io/otel/sdk v1.25.0/go.mod h1:oFgzCM2zdsxKzz6zwpTZYLLQsFwc+K0daArPdIhuxkw=
go.opentelemetry.io/otel/sdk/metric v1.25.0 h1:7CiHOy08LbrxMAp4vWpbiPcklunUshVpAvGBrdDRlGw=
go.opentelemetry.io/otel/sdk/metric v1.25.0/go.mod h1:LzwoKptdbBBdYfvtGCzGwk6GWMA3aUzBOwtQpR6Nz7o=
go.opentelemetry.io/otel/trace v1.25.0 h1:tqukZGLwQYRIFtSQM2u2+yfMVTgGVeqRLPUYx1Dq6RM=
go.opentelemetry.io/otel/trace v1.25.0/go.mod h1:hCCs70XM/ljO+BeQkyFnbK28SBIJ/Emuha+ccrCRT7I=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v1.1.0 h1:2Di21piLrCqJ3U3eXGCTPHE9R8Nh+0uglSnOyxikMeI=
go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7ej/RNTae6MdY=
Expand Down
3 changes: 2 additions & 1 deletion hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Hub interface {
GetConnectionConfigByName(string) *proto.ConnectionConfig
LoadConnectionConfig() (bool, error)
GetSchema(remoteSchema string, localSchema string) (*proto.Schema, error)
GetIterator(columns []string, quals *proto.Quals, unhandledRestrictions int, limit int64, opts types.Options, queryTimestamp int64) (Iterator, error)
GetIterator(columns []string, quals *proto.Quals, unhandledRestrictions int, limit int64, sortOrder []*proto.SortColumn, queryTimestamp int64, opts types.Options) (Iterator, error)
GetRelSize(columns []string, quals []*proto.Qual, opts types.Options) (types.RelSize, error)
GetPathKeys(opts types.Options) ([]types.PathKey, error)
Explain(columns []string, quals []*proto.Qual, sortKeys []string, verbose bool, opts types.Options) ([]string, error)
Expand All @@ -30,4 +30,5 @@ type Hub interface {
ValidateCacheCommand(command string) error
cacheTTL(name string) time.Duration
cacheEnabled(name string) bool
GetSortableFields(table, connection string) map[string]proto.SortOrder
}
5 changes: 5 additions & 0 deletions hub/hub_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,3 +543,8 @@ func (h *hubBase) cacheTTL(connectionName string) time.Duration {
log.Printf("[INFO] cacheTTL 5")
return ttl
}

// GetSortableFields
func (h *hubBase) GetSortableFields(tableName, connectionName string) map[string]proto.SortOrder {
return nil
}
12 changes: 6 additions & 6 deletions hub/hub_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (l *HubLocal) GetSchema(_, connectionName string) (*proto.Schema, error) {
return res.GetSchema(), nil
}

func (l *HubLocal) GetIterator(columns []string, quals *proto.Quals, unhandledRestrictions int, limit int64, opts types.Options, queryTimestamp int64) (Iterator, error) {
func (l *HubLocal) GetIterator(columns []string, quals *proto.Quals, unhandledRestrictions int, limit int64, sortOrder []*proto.SortColumn, queryTimestamp int64, opts types.Options) (Iterator, error) {
logging.LogTime("GetIterator start")
qualMap, err := buildQualMap(quals)
connectionName := opts["connection"]
Expand All @@ -121,7 +121,7 @@ func (l *HubLocal) GetIterator(columns []string, quals *proto.Quals, unhandledRe

// create a span for this scan
scanTraceCtx := l.traceContextForScan(table, columns, limit, qualMap, connectionName)
iterator, err := l.startScanForConnection(connectionName, table, qualMap, unhandledRestrictions, columns, limit, scanTraceCtx)
iterator, err := l.startScanForConnection(connectionName, table, qualMap, unhandledRestrictions, columns, limit, sortOrder, queryTimestamp, scanTraceCtx)

if err != nil {
log.Printf("[TRACE] RemoteHub GetIterator() failed :( %s", err)
Expand All @@ -144,8 +144,8 @@ func (l *HubLocal) GetPathKeys(opts types.Options) ([]types.PathKey, error) {

}

func (h *HubLocal) GetConnectionConfigByName(name string) *proto.ConnectionConfig {
return h.connections[name]
func (l *HubLocal) GetConnectionConfigByName(name string) *proto.ConnectionConfig {
return l.connections[name]
}

func (l *HubLocal) ProcessImportForeignSchemaOptions(opts types.Options, connection string) error {
Expand All @@ -165,7 +165,7 @@ func (l *HubLocal) ProcessImportForeignSchemaOptions(opts types.Options, connect
}

// startScanForConnection starts a scan for a single connection, using a scanIterator or a legacyScanIterator
func (l *HubLocal) startScanForConnection(connectionName string, table string, qualMap map[string]*proto.Quals, unhandledRestrictions int, columns []string, limit int64, scanTraceCtx *telemetry.TraceCtx) (_ Iterator, err error) {
func (l *HubLocal) startScanForConnection(connectionName string, table string, qualMap map[string]*proto.Quals, unhandledRestrictions int, columns []string, limit int64, sortOrder []*proto.SortColumn, queryTimestamp int64, scanTraceCtx *telemetry.TraceCtx) (_ Iterator, err error) {
defer func() {
if err != nil {
// close the span in case of errir
Expand Down Expand Up @@ -197,7 +197,7 @@ func (l *HubLocal) startScanForConnection(connectionName string, table string, q
}

log.Printf("[TRACE] startScanForConnection creating a new scan iterator")
iterator := newScanIteratorLocal(l, connectionName, table, l.pluginName, connectionLimitMap, qualMap, columns, limit, scanTraceCtx)
iterator := newScanIteratorLocal(l, connectionName, table, l.pluginName, connectionLimitMap, qualMap, columns, limit, sortOrder, queryTimestamp, scanTraceCtx)
return iterator, nil
}

Expand Down
Loading
Loading