Skip to content

Commit

Permalink
Merge remote-tracking branch 'vesoft/master'
Browse files Browse the repository at this point in the history
# Conflicts:
#	.github/workflows/Http-Gateway-Release.yml
  • Loading branch information
lixuanxian committed Mar 22, 2024
2 parents 4b5b51b + c5d7870 commit 4edaf3b
Show file tree
Hide file tree
Showing 17 changed files with 136 additions and 20 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/Http-Gateway-Release.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
name: Http-Gateway-Release
on:
push:
branches:
- master
release:
types:
- published
jobs:
release-image:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -30,4 +30,4 @@ jobs:
context: .
file: ./Dockerfile
push: true
tags: code4demo/nebula-http-gateway-cros:latest
tags: vesoft/nebula-http-gateway:3.2.0
33 changes: 33 additions & 0 deletions .github/workflows/Http-Gateway-nightly.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
name: Http-Gateway-nightly
on:
push:
branches:
- master
jobs:
release-image:
runs-on: ubuntu-latest
steps:
-
name: Checkout Github Action
uses: actions/checkout@master

-
name: Set up QEMU
uses: docker/setup-qemu-action@v1
-
name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
-
name: Login to DockerHub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
-
name: Build and push
uses: docker/build-push-action@v2
with:
context: .
file: ./Dockerfile
push: true
tags: vesoft/nebula-http-gateway:nightly
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ response:
#### Disconnect API ####

```bash
$ curl -X POST http://127.0.0.1:8080/api/db/disconnect
$ curl -X POST -H "Cookie:common-nsid=bec2e665ba62a13554b617d70de8b9b9" http://127.0.0.1:8080/api/db/disconnect
```

response:
Expand Down
15 changes: 14 additions & 1 deletion ccore/nebula/client_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type (
MetaClient interface {
Open() error
AddHosts(endpoints []string) (types.MetaBaser, error)
AddHostsIntoZone(zone string, endpoints []string, isNew bool) (types.MetaBaser, error)
DropHosts(endpoints []string) (types.MetaBaser, error)
ListSpaces() (types.Spaces, error)
BalanceData(space string) (types.Balancer, error)
Expand Down Expand Up @@ -80,6 +81,18 @@ func (c *defaultMetaClient) AddHosts(endpoints []string) (resp types.MetaBaser,
return
}

func (c *defaultMetaClient) AddHostsIntoZone(zone string, endpoints []string, isNew bool) (resp types.MetaBaser, err error) {
retryErr := c.retryDo(func() (types.MetaBaser, error) {
resp, err = c.meta.AddHostsIntoZone(zone, endpoints, isNew)
return resp, err
})
if retryErr != nil {
return nil, retryErr
}

return
}

func (c *defaultMetaClient) DropHosts(endpoints []string) (resp types.MetaBaser, err error) {
retryErr := c.retryDo(func() (types.MetaBaser, error) {
resp, err = c.meta.DropHosts(endpoints)
Expand Down Expand Up @@ -199,10 +212,10 @@ func (c *defaultMetaClient) openRetry(driver types.Driver) error {
_ = c.meta.close()

err := c.meta.open(driver)
c.meta.connection.UpdateNextIndex() // update nextIndex every time
if err == nil {
return nil
}
c.meta.connection.UpdateNextIndex() // update nextIndex when connect failed
}
return nerrors.ErrNoValidMetaEndpoint
}
10 changes: 5 additions & 5 deletions ccore/nebula/gateway/dao/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,23 +237,23 @@ func getMapInfo(valWarp *wrapper.ValueWrapper, _verticesParsedList *list, _edges
for _, v := range valueMap {
vType := v.GetType()
if vType == "vertex" {
var _props map[string]types.Any
_props := make(map[string]types.Any)
_props, err = getVertexInfo(&v, _props)
if err == nil {
*_verticesParsedList = append(*_verticesParsedList, _props)
} else {
return err
}
} else if vType == "edge" {
var _props map[string]types.Any
_props := make(map[string]types.Any)
_props, err = getEdgeInfo(&v, _props)
if err == nil {
*_edgesParsedList = append(*_edgesParsedList, _props)
} else {
return err
}
} else if vType == "path" {
var _props map[string]types.Any
_props := make(map[string]types.Any)
_props, err = getPathInfo(&v, _props)
if err == nil {
*_pathsParsedList = append(*_pathsParsedList, _props)
Expand Down Expand Up @@ -302,8 +302,8 @@ func Disconnect(nsid string) error {
}

/*
executes the gql based on nsid,
and returns result, the runtime panic error and the result error.
executes the gql based on nsid,
and returns result, the runtime panic error and the result error.
*/
func Execute(nsid string, gql string, paramList types.ParameterList) (ExecuteResult, interface{}, error) {
result := ExecuteResult{
Expand Down
7 changes: 5 additions & 2 deletions ccore/nebula/gateway/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,6 @@ func ListParams(args string, tmpParameter types.ParameterMap, sessionMap types.P

func NewClient(address string, port int, username string, password string, opts ...nebula.Option) (*ClientInfo, error) {
var err error
clientMux.Lock()
defer clientMux.Unlock()

// TODO: it's better to add a schedule to make it instead
if currentClientNum > clientRecycleNum {
Expand Down Expand Up @@ -253,8 +251,11 @@ func NewClient(address string, port int, username string, password string, opts
},
timezone: c.GetTimezoneInfo(),
}

clientMux.Lock()
clientPool[nsid] = client
currentClientNum++
clientMux.Unlock()

// Make a goroutine to deal with concurrent requests from each connection
go handleRequest(nsid)
Expand All @@ -273,13 +274,15 @@ func ClearClients() {
}

func recycleClients() {
clientMux.Lock()
for _, client := range clientPool {
now := time.Now().Unix()
expireAt := client.updateTime + SessionExpiredDuration
if now > expireAt {
client.CloseChannel <- true
}
}
clientMux.Unlock()
}

func handleRequest(nsid string) {
Expand Down
4 changes: 4 additions & 0 deletions ccore/nebula/internal/driver/v2_5/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (c *defaultMetaClient) AddHosts(endpoints []string) (types.MetaBaser, error
return nil, nerrors.ErrUnsupported
}

func (c *defaultMetaClient) AddHostsIntoZone(zone string, endpoints []string, isNew bool) (types.MetaBaser, error) {
return nil, nerrors.ErrUnsupported
}

func (c *defaultMetaClient) DropHosts(endpoints []string) (types.MetaBaser, error) {
return nil, nerrors.ErrUnsupported
}
Expand Down
4 changes: 4 additions & 0 deletions ccore/nebula/internal/driver/v2_5/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,10 @@ func (w spaceWrapper) GetName() string {
return string(w.Space.GetName())
}

func (w spaceWrapper) GetId() int32 {
return w.Space.GetId().GetSpaceID()
}

type spacesWrap struct {
metaBaserWrap
Spaces []types.Space
Expand Down
4 changes: 4 additions & 0 deletions ccore/nebula/internal/driver/v2_6/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (c *defaultMetaClient) AddHosts(endpoints []string) (types.MetaBaser, error
return nil, nerrors.ErrUnsupported
}

func (c *defaultMetaClient) AddHostsIntoZone(zone string, endpoints []string, isNew bool) (types.MetaBaser, error) {
return nil, nerrors.ErrUnsupported
}

func (c *defaultMetaClient) DropHosts(endpoints []string) (types.MetaBaser, error) {
return nil, nerrors.ErrUnsupported
}
Expand Down
4 changes: 4 additions & 0 deletions ccore/nebula/internal/driver/v2_6/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,10 @@ func (w spaceWrapper) GetName() string {
return string(w.Space.GetName())
}

func (w spaceWrapper) GetId() int32 {
return w.Space.GetId().GetSpaceID()
}

type spacesWrap struct {
metaBaserWrap
Spaces []types.Space
Expand Down
42 changes: 42 additions & 0 deletions ccore/nebula/internal/driver/v3_0/meta.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v3_0

import (
"fmt"
"net"
"strconv"

Expand Down Expand Up @@ -86,6 +87,47 @@ func (c *defaultMetaClient) AddHosts(endpoints []string) (types.MetaBaser, error
}, nil
}

func (c *defaultMetaClient) AddHostsIntoZone(zone string, endpoints []string, isNew bool) (types.MetaBaser, error) {
hostsToAdd := make([]*nthrift.HostAddr, 0, len(endpoints))
for _, ep := range endpoints {
host, portStr, err := net.SplitHostPort(ep)
if err != nil {
return nil, err
}

port, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}

hostsToAdd = append(hostsToAdd, &nthrift.HostAddr{
Host: host,
Port: nthrift.Port(port),
})
}

req := &meta.AddHostsIntoZoneReq{
Hosts: hostsToAdd,
ZoneName: []byte(zone),
IsNew: isNew,
}

fmt.Println(req)

resp, err := c.meta.AddHostsIntoZone(req)
if err != nil {
return nil, err
}

return metaBaserWrap{
code: nerrors.ErrorCode(resp.GetCode()),
leader: types.HostAddr{
Host: resp.GetLeader().GetHost(),
Port: resp.GetLeader().GetPort(),
},
}, nil
}

func (c *defaultMetaClient) DropHosts(endpoints []string) (types.MetaBaser, error) {
hostsToDrop := make([]*nthrift.HostAddr, 0, len(endpoints))
for _, ep := range endpoints {
Expand Down
4 changes: 4 additions & 0 deletions ccore/nebula/internal/driver/v3_0/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,10 @@ func (w spaceWrapper) GetName() string {
return string(w.Space.GetName())
}

func (w spaceWrapper) GetId() int32 {
return w.Space.GetId().GetSpaceID()
}

type spacesWrap struct {
metaBaserWrap
Spaces []types.Space
Expand Down
6 changes: 3 additions & 3 deletions ccore/nebula/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ func WithLogger(log Logger) Option {

func WithTimeout(timeout time.Duration) Option {
return func(o *Options) {
WithGraphTimeout(timeout)
WithMetaTimeout(timeout)
WithStorageTimeout(timeout)
WithGraphTimeout(timeout)(o)
WithMetaTimeout(timeout)(o)
WithStorageTimeout(timeout)(o)
}
}

Expand Down
2 changes: 2 additions & 0 deletions ccore/nebula/types/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type (
Open() error
VerifyClientVersion() error
AddHosts(endpoints []string) (MetaBaser, error)
AddHostsIntoZone(zone string, endpoints []string, isNew bool) (MetaBaser, error)
DropHosts(endpoints []string) (MetaBaser, error)
ListSpaces() (Spaces, error)
Balance(req BalanceReq) (Balancer, error)
Expand Down Expand Up @@ -73,6 +74,7 @@ type (

Space interface {
GetName() string
GetId() int32
}

Spaces interface {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/google/go-cmp v0.5.4 // indirect
github.com/prometheus/client_golang v1.9.0 // indirect
github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 // indirect
github.com/vesoft-inc/nebula-go/v3 v3.2.0 // indirect
github.com/vesoft-inc/nebula-go/v3 v3.4.0 // indirect
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad // indirect
golang.org/x/net v0.0.0-20201224014010-6772e930b67b // indirect
golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220425030225-cdb52399b40a/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s=
github.com/vesoft-inc/nebula-go/v3 v3.2.0 h1:f0IW6W5f91O6FyXAi+kFq1D2bNFXtPtkb/H9WywFGmM=
github.com/vesoft-inc/nebula-go/v3 v3.2.0/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s=
github.com/vesoft-inc/nebula-go/v3 v3.4.0 h1:7q2DSW4QABwI2oGPSVuC+Ql7kGwj26G/YVPGD7gETys=
github.com/vesoft-inc/nebula-go/v3 v3.4.0/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s=
github.com/vesoft-inc/nebula-importer v1.0.1-0.20220719030708-8e376665042e h1:Xj3N5lfKv+mG59Fh2GoWZ/89kWEwQtW/W4EiKkD2yI0=
github.com/vesoft-inc/nebula-importer v1.0.1-0.20220719030708-8e376665042e/go.mod h1:8xAQi6KI2qe40Dop/GqDXmBEurt7qGp5Pjd1MESAVNA=
github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b/go.mod h1:Q12BUT7DqIlHRmgv3RskH+UCM/4eqVMgI0EMmlSpAXc=
Expand Down
7 changes: 4 additions & 3 deletions service/importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package importer
import (
"errors"
"fmt"
"github.com/vesoft-inc/nebula-importer/pkg/logger"
"path/filepath"
"time"

"github.com/vesoft-inc/nebula-importer/pkg/logger"

"github.com/astaxie/beego"
"github.com/astaxie/beego/logs"
"github.com/vesoft-inc/nebula-importer/pkg/config"
Expand Down Expand Up @@ -135,7 +136,7 @@ func actionQuery(taskID string, result *ActionResult) {
}

/*
`actionQueryAll` will return all tasks with status Aborted or Processing
`actionQueryAll` will return all tasks with status Aborted or Processing
*/
func actionQueryAll(result *ActionResult) {
taskIDs := GetTaskMgr().GetAllTaskIDs()
Expand All @@ -159,7 +160,7 @@ func actionStop(taskID string, result *ActionResult) {
}

/*
`actionStopAll` will stop all tasks with status Processing
`actionStopAll` will stop all tasks with status Processing
*/
func actionStopAll(result *ActionResult) {
taskIDs := GetTaskMgr().GetAllTaskIDs()
Expand Down

0 comments on commit 4edaf3b

Please sign in to comment.