diff --git a/.github/workflows/Http-Gateway-Release.yml b/.github/workflows/Http-Gateway-Release.yml index 79a15ba..c9394b5 100644 --- a/.github/workflows/Http-Gateway-Release.yml +++ b/.github/workflows/Http-Gateway-Release.yml @@ -1,8 +1,8 @@ name: Http-Gateway-Release on: - push: - branches: - - master + release: + types: + - published jobs: release-image: runs-on: ubuntu-latest @@ -30,4 +30,4 @@ jobs: context: . file: ./Dockerfile push: true - tags: code4demo/nebula-http-gateway-cros:latest \ No newline at end of file + tags: vesoft/nebula-http-gateway:3.2.0 \ No newline at end of file diff --git a/.github/workflows/Http-Gateway-nightly.yml b/.github/workflows/Http-Gateway-nightly.yml new file mode 100644 index 0000000..27fc945 --- /dev/null +++ b/.github/workflows/Http-Gateway-nightly.yml @@ -0,0 +1,33 @@ +name: Http-Gateway-nightly +on: + push: + branches: + - master +jobs: + release-image: + runs-on: ubuntu-latest + steps: + - + name: Checkout Github Action + uses: actions/checkout@master + + - + name: Set up QEMU + uses: docker/setup-qemu-action@v1 + - + name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 + - + name: Login to DockerHub + uses: docker/login-action@v1 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + - + name: Build and push + uses: docker/build-push-action@v2 + with: + context: . + file: ./Dockerfile + push: true + tags: vesoft/nebula-http-gateway:nightly \ No newline at end of file diff --git a/README.md b/README.md index 060708e..d0e3a7d 100644 --- a/README.md +++ b/README.md @@ -136,7 +136,7 @@ response: #### Disconnect API #### ```bash -$ curl -X POST http://127.0.0.1:8080/api/db/disconnect +$ curl -X POST -H "Cookie:common-nsid=bec2e665ba62a13554b617d70de8b9b9" http://127.0.0.1:8080/api/db/disconnect ``` response: diff --git a/ccore/nebula/client_meta.go b/ccore/nebula/client_meta.go index b5f3785..5dc19a2 100644 --- a/ccore/nebula/client_meta.go +++ b/ccore/nebula/client_meta.go @@ -11,6 +11,7 @@ type ( MetaClient interface { Open() error AddHosts(endpoints []string) (types.MetaBaser, error) + AddHostsIntoZone(zone string, endpoints []string, isNew bool) (types.MetaBaser, error) DropHosts(endpoints []string) (types.MetaBaser, error) ListSpaces() (types.Spaces, error) BalanceData(space string) (types.Balancer, error) @@ -80,6 +81,18 @@ func (c *defaultMetaClient) AddHosts(endpoints []string) (resp types.MetaBaser, return } +func (c *defaultMetaClient) AddHostsIntoZone(zone string, endpoints []string, isNew bool) (resp types.MetaBaser, err error) { + retryErr := c.retryDo(func() (types.MetaBaser, error) { + resp, err = c.meta.AddHostsIntoZone(zone, endpoints, isNew) + return resp, err + }) + if retryErr != nil { + return nil, retryErr + } + + return +} + func (c *defaultMetaClient) DropHosts(endpoints []string) (resp types.MetaBaser, err error) { retryErr := c.retryDo(func() (types.MetaBaser, error) { resp, err = c.meta.DropHosts(endpoints) @@ -199,10 +212,10 @@ func (c *defaultMetaClient) openRetry(driver types.Driver) error { _ = c.meta.close() err := c.meta.open(driver) - c.meta.connection.UpdateNextIndex() // update nextIndex every time if err == nil { return nil } + c.meta.connection.UpdateNextIndex() // update nextIndex when connect failed } return nerrors.ErrNoValidMetaEndpoint } diff --git a/ccore/nebula/gateway/dao/dao.go b/ccore/nebula/gateway/dao/dao.go index 2a5301d..5dbdff1 100644 --- a/ccore/nebula/gateway/dao/dao.go +++ b/ccore/nebula/gateway/dao/dao.go @@ -237,7 +237,7 @@ func getMapInfo(valWarp *wrapper.ValueWrapper, _verticesParsedList *list, _edges for _, v := range valueMap { vType := v.GetType() if vType == "vertex" { - var _props map[string]types.Any + _props := make(map[string]types.Any) _props, err = getVertexInfo(&v, _props) if err == nil { *_verticesParsedList = append(*_verticesParsedList, _props) @@ -245,7 +245,7 @@ func getMapInfo(valWarp *wrapper.ValueWrapper, _verticesParsedList *list, _edges return err } } else if vType == "edge" { - var _props map[string]types.Any + _props := make(map[string]types.Any) _props, err = getEdgeInfo(&v, _props) if err == nil { *_edgesParsedList = append(*_edgesParsedList, _props) @@ -253,7 +253,7 @@ func getMapInfo(valWarp *wrapper.ValueWrapper, _verticesParsedList *list, _edges return err } } else if vType == "path" { - var _props map[string]types.Any + _props := make(map[string]types.Any) _props, err = getPathInfo(&v, _props) if err == nil { *_pathsParsedList = append(*_pathsParsedList, _props) @@ -302,8 +302,8 @@ func Disconnect(nsid string) error { } /* - executes the gql based on nsid, - and returns result, the runtime panic error and the result error. +executes the gql based on nsid, +and returns result, the runtime panic error and the result error. */ func Execute(nsid string, gql string, paramList types.ParameterList) (ExecuteResult, interface{}, error) { result := ExecuteResult{ diff --git a/ccore/nebula/gateway/pool/pool.go b/ccore/nebula/gateway/pool/pool.go index 690e699..b66289c 100644 --- a/ccore/nebula/gateway/pool/pool.go +++ b/ccore/nebula/gateway/pool/pool.go @@ -213,8 +213,6 @@ func ListParams(args string, tmpParameter types.ParameterMap, sessionMap types.P func NewClient(address string, port int, username string, password string, opts ...nebula.Option) (*ClientInfo, error) { var err error - clientMux.Lock() - defer clientMux.Unlock() // TODO: it's better to add a schedule to make it instead if currentClientNum > clientRecycleNum { @@ -253,8 +251,11 @@ func NewClient(address string, port int, username string, password string, opts }, timezone: c.GetTimezoneInfo(), } + + clientMux.Lock() clientPool[nsid] = client currentClientNum++ + clientMux.Unlock() // Make a goroutine to deal with concurrent requests from each connection go handleRequest(nsid) @@ -273,6 +274,7 @@ func ClearClients() { } func recycleClients() { + clientMux.Lock() for _, client := range clientPool { now := time.Now().Unix() expireAt := client.updateTime + SessionExpiredDuration @@ -280,6 +282,7 @@ func recycleClients() { client.CloseChannel <- true } } + clientMux.Unlock() } func handleRequest(nsid string) { diff --git a/ccore/nebula/internal/driver/v2_5/meta.go b/ccore/nebula/internal/driver/v2_5/meta.go index ab10f4f..e8130ce 100644 --- a/ccore/nebula/internal/driver/v2_5/meta.go +++ b/ccore/nebula/internal/driver/v2_5/meta.go @@ -46,6 +46,10 @@ func (c *defaultMetaClient) AddHosts(endpoints []string) (types.MetaBaser, error return nil, nerrors.ErrUnsupported } +func (c *defaultMetaClient) AddHostsIntoZone(zone string, endpoints []string, isNew bool) (types.MetaBaser, error) { + return nil, nerrors.ErrUnsupported +} + func (c *defaultMetaClient) DropHosts(endpoints []string) (types.MetaBaser, error) { return nil, nerrors.ErrUnsupported } diff --git a/ccore/nebula/internal/driver/v2_5/wrapper.go b/ccore/nebula/internal/driver/v2_5/wrapper.go index 52caf41..deeb23f 100644 --- a/ccore/nebula/internal/driver/v2_5/wrapper.go +++ b/ccore/nebula/internal/driver/v2_5/wrapper.go @@ -1148,6 +1148,10 @@ func (w spaceWrapper) GetName() string { return string(w.Space.GetName()) } +func (w spaceWrapper) GetId() int32 { + return w.Space.GetId().GetSpaceID() +} + type spacesWrap struct { metaBaserWrap Spaces []types.Space diff --git a/ccore/nebula/internal/driver/v2_6/meta.go b/ccore/nebula/internal/driver/v2_6/meta.go index bd140bc..66f1acb 100644 --- a/ccore/nebula/internal/driver/v2_6/meta.go +++ b/ccore/nebula/internal/driver/v2_6/meta.go @@ -50,6 +50,10 @@ func (c *defaultMetaClient) AddHosts(endpoints []string) (types.MetaBaser, error return nil, nerrors.ErrUnsupported } +func (c *defaultMetaClient) AddHostsIntoZone(zone string, endpoints []string, isNew bool) (types.MetaBaser, error) { + return nil, nerrors.ErrUnsupported +} + func (c *defaultMetaClient) DropHosts(endpoints []string) (types.MetaBaser, error) { return nil, nerrors.ErrUnsupported } diff --git a/ccore/nebula/internal/driver/v2_6/wrapper.go b/ccore/nebula/internal/driver/v2_6/wrapper.go index 9e825cf..c1f4e2a 100644 --- a/ccore/nebula/internal/driver/v2_6/wrapper.go +++ b/ccore/nebula/internal/driver/v2_6/wrapper.go @@ -1248,6 +1248,10 @@ func (w spaceWrapper) GetName() string { return string(w.Space.GetName()) } +func (w spaceWrapper) GetId() int32 { + return w.Space.GetId().GetSpaceID() +} + type spacesWrap struct { metaBaserWrap Spaces []types.Space diff --git a/ccore/nebula/internal/driver/v3_0/meta.go b/ccore/nebula/internal/driver/v3_0/meta.go index f245df8..153430a 100644 --- a/ccore/nebula/internal/driver/v3_0/meta.go +++ b/ccore/nebula/internal/driver/v3_0/meta.go @@ -1,6 +1,7 @@ package v3_0 import ( + "fmt" "net" "strconv" @@ -86,6 +87,47 @@ func (c *defaultMetaClient) AddHosts(endpoints []string) (types.MetaBaser, error }, nil } +func (c *defaultMetaClient) AddHostsIntoZone(zone string, endpoints []string, isNew bool) (types.MetaBaser, error) { + hostsToAdd := make([]*nthrift.HostAddr, 0, len(endpoints)) + for _, ep := range endpoints { + host, portStr, err := net.SplitHostPort(ep) + if err != nil { + return nil, err + } + + port, err := strconv.Atoi(portStr) + if err != nil { + return nil, err + } + + hostsToAdd = append(hostsToAdd, &nthrift.HostAddr{ + Host: host, + Port: nthrift.Port(port), + }) + } + + req := &meta.AddHostsIntoZoneReq{ + Hosts: hostsToAdd, + ZoneName: []byte(zone), + IsNew: isNew, + } + + fmt.Println(req) + + resp, err := c.meta.AddHostsIntoZone(req) + if err != nil { + return nil, err + } + + return metaBaserWrap{ + code: nerrors.ErrorCode(resp.GetCode()), + leader: types.HostAddr{ + Host: resp.GetLeader().GetHost(), + Port: resp.GetLeader().GetPort(), + }, + }, nil +} + func (c *defaultMetaClient) DropHosts(endpoints []string) (types.MetaBaser, error) { hostsToDrop := make([]*nthrift.HostAddr, 0, len(endpoints)) for _, ep := range endpoints { diff --git a/ccore/nebula/internal/driver/v3_0/wrapper.go b/ccore/nebula/internal/driver/v3_0/wrapper.go index c37bd96..0cd35e0 100644 --- a/ccore/nebula/internal/driver/v3_0/wrapper.go +++ b/ccore/nebula/internal/driver/v3_0/wrapper.go @@ -1213,6 +1213,10 @@ func (w spaceWrapper) GetName() string { return string(w.Space.GetName()) } +func (w spaceWrapper) GetId() int32 { + return w.Space.GetId().GetSpaceID() +} + type spacesWrap struct { metaBaserWrap Spaces []types.Space diff --git a/ccore/nebula/options.go b/ccore/nebula/options.go index b1687a8..7a6f8ea 100644 --- a/ccore/nebula/options.go +++ b/ccore/nebula/options.go @@ -55,9 +55,9 @@ func WithLogger(log Logger) Option { func WithTimeout(timeout time.Duration) Option { return func(o *Options) { - WithGraphTimeout(timeout) - WithMetaTimeout(timeout) - WithStorageTimeout(timeout) + WithGraphTimeout(timeout)(o) + WithMetaTimeout(timeout)(o) + WithStorageTimeout(timeout)(o) } } diff --git a/ccore/nebula/types/driver.go b/ccore/nebula/types/driver.go index 2ba0810..25a4ea1 100644 --- a/ccore/nebula/types/driver.go +++ b/ccore/nebula/types/driver.go @@ -37,6 +37,7 @@ type ( Open() error VerifyClientVersion() error AddHosts(endpoints []string) (MetaBaser, error) + AddHostsIntoZone(zone string, endpoints []string, isNew bool) (MetaBaser, error) DropHosts(endpoints []string) (MetaBaser, error) ListSpaces() (Spaces, error) Balance(req BalanceReq) (Balancer, error) @@ -73,6 +74,7 @@ type ( Space interface { GetName() string + GetId() int32 } Spaces interface { diff --git a/go.mod b/go.mod index 1a83441..7ea118b 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/google/go-cmp v0.5.4 // indirect github.com/prometheus/client_golang v1.9.0 // indirect github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 // indirect - github.com/vesoft-inc/nebula-go/v3 v3.2.0 // indirect + github.com/vesoft-inc/nebula-go/v3 v3.4.0 // indirect golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad // indirect golang.org/x/net v0.0.0-20201224014010-6772e930b67b // indirect golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 // indirect diff --git a/go.sum b/go.sum index e36d6ed..e388b05 100644 --- a/go.sum +++ b/go.sum @@ -321,6 +321,8 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220425030225-cdb52399b40a/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s= github.com/vesoft-inc/nebula-go/v3 v3.2.0 h1:f0IW6W5f91O6FyXAi+kFq1D2bNFXtPtkb/H9WywFGmM= github.com/vesoft-inc/nebula-go/v3 v3.2.0/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s= +github.com/vesoft-inc/nebula-go/v3 v3.4.0 h1:7q2DSW4QABwI2oGPSVuC+Ql7kGwj26G/YVPGD7gETys= +github.com/vesoft-inc/nebula-go/v3 v3.4.0/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s= github.com/vesoft-inc/nebula-importer v1.0.1-0.20220719030708-8e376665042e h1:Xj3N5lfKv+mG59Fh2GoWZ/89kWEwQtW/W4EiKkD2yI0= github.com/vesoft-inc/nebula-importer v1.0.1-0.20220719030708-8e376665042e/go.mod h1:8xAQi6KI2qe40Dop/GqDXmBEurt7qGp5Pjd1MESAVNA= github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b/go.mod h1:Q12BUT7DqIlHRmgv3RskH+UCM/4eqVMgI0EMmlSpAXc= diff --git a/service/importer/importer.go b/service/importer/importer.go index df27993..544d864 100644 --- a/service/importer/importer.go +++ b/service/importer/importer.go @@ -3,10 +3,11 @@ package importer import ( "errors" "fmt" - "github.com/vesoft-inc/nebula-importer/pkg/logger" "path/filepath" "time" + "github.com/vesoft-inc/nebula-importer/pkg/logger" + "github.com/astaxie/beego" "github.com/astaxie/beego/logs" "github.com/vesoft-inc/nebula-importer/pkg/config" @@ -135,7 +136,7 @@ func actionQuery(taskID string, result *ActionResult) { } /* - `actionQueryAll` will return all tasks with status Aborted or Processing +`actionQueryAll` will return all tasks with status Aborted or Processing */ func actionQueryAll(result *ActionResult) { taskIDs := GetTaskMgr().GetAllTaskIDs() @@ -159,7 +160,7 @@ func actionStop(taskID string, result *ActionResult) { } /* - `actionStopAll` will stop all tasks with status Processing +`actionStopAll` will stop all tasks with status Processing */ func actionStopAll(result *ActionResult) { taskIDs := GetTaskMgr().GetAllTaskIDs()