Skip to content

Commit

Permalink
implement least active load balance (#602)
Browse files Browse the repository at this point in the history
feat: add least active load balance
  • Loading branch information
oldmee authored Feb 3, 2024
1 parent 9709b35 commit a875204
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 1 deletion.
15 changes: 14 additions & 1 deletion pkg/remoting/getty/getty_remoting.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
getty "github.com/apache/dubbo-getty"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/rpc"
"github.com/seata/seata-go/pkg/util/log"
)

Expand Down Expand Up @@ -61,14 +62,26 @@ func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callba
if s == nil {
s = sessionManager.selectSession(msg)
}
return g.sendAsync(s, msg, callback)
rpc.BeginCount(s.RemoteAddr())
result, err := g.sendAsync(s, msg, callback)
rpc.EndCount(s.RemoteAddr())
if err != nil {
log.Errorf("send message: %#v, session: %s", msg, s.Stat())
return nil, err
}
return result, err
}

func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
if s == nil {
s = sessionManager.selectSession(msg)
}
rpc.BeginCount(s.RemoteAddr())
_, err := g.sendAsync(s, msg, callback)
rpc.EndCount(s.RemoteAddr())
if err != nil {
log.Errorf("send message: %#v, session: %s", msg, s.Stat())
}
return err
}

Expand Down
65 changes: 65 additions & 0 deletions pkg/remoting/loadbalance/least_active_loadbalance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package loadbalance

import (
"math/rand"
"sync"
"time"

"github.com/seata/seata-go/pkg/remoting/rpc"

getty "github.com/apache/dubbo-getty"
)

func LeastActiveLoadBalance(sessions *sync.Map, xid string) getty.Session {
var session getty.Session
var leastActive int32 = -1
leastCount := 0
var leastIndexes []getty.Session
sessions.Range(func(key, value interface{}) bool {
session = key.(getty.Session)
if session.IsClosed() {
sessions.Delete(session)
} else {
active := rpc.GetStatus(session.RemoteAddr()).GetActive()
if leastActive == -1 || active < leastActive {
leastActive = active
leastCount = 1
if len(leastIndexes) > 0 {
leastIndexes = leastIndexes[:0]
}
leastIndexes = append(leastIndexes, session)
} else if active == leastActive {
leastIndexes = append(leastIndexes, session)
leastCount++
}
}
return true
})

if leastCount == 0 {
return nil
}

if leastCount == 1 {
return leastIndexes[0]
} else {
return leastIndexes[rand.New(rand.NewSource(time.Now().UnixNano())).Intn(leastCount)]
}
}
63 changes: 63 additions & 0 deletions pkg/remoting/loadbalance/least_active_loadbalance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package loadbalance

import (
"fmt"
"strconv"
"sync"
"testing"

"github.com/golang/mock/gomock"
"github.com/seata/seata-go/pkg/remoting/mock"
"github.com/seata/seata-go/pkg/remoting/rpc"
"github.com/stretchr/testify/assert"
)

func TestLeastActiveLoadBalance(t *testing.T) {
ctrl := gomock.NewController(t)
sessions := &sync.Map{}

for i := 1; i <= 3; i++ {
session := mock.NewMockTestSession(ctrl)
session.EXPECT().IsClosed().Return(false).AnyTimes()
addr := "127.0.0." + strconv.Itoa(i) + ":8000"
session.EXPECT().RemoteAddr().AnyTimes().DoAndReturn(func() string {
return addr
})
sessions.Store(session, fmt.Sprintf("session-%d", i))
rpc.BeginCount(addr)
}

session := mock.NewMockTestSession(ctrl)
session.EXPECT().IsClosed().Return(true).AnyTimes()
addr := "127.0.0.5:8000"
session.EXPECT().RemoteAddr().AnyTimes().DoAndReturn(func() string {
return addr
})
sessions.Store(session, "session-5")
rpc.BeginCount(addr)

countTwo := "127.0.0.1:8000"
rpc.BeginCount(countTwo)

result := LeastActiveLoadBalance(sessions, "test_xid")
assert.False(t, result.RemoteAddr() == countTwo)
assert.False(t, result.RemoteAddr() == addr)
assert.False(t, result.IsClosed())
}
2 changes: 2 additions & 0 deletions pkg/remoting/loadbalance/loadbalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func Select(loadBalanceType string, sessions *sync.Map, xid string) getty.Sessio
return RandomLoadBalance(sessions, xid)
case xidLoadBalance:
return XidLoadBalance(sessions, xid)
case leastActiveLoadBalance:
return LeastActiveLoadBalance(sessions, xid)
case roundRobinLoadBalance:
return RoundRobinLoadBalance(sessions, xid)
default:
Expand Down
64 changes: 64 additions & 0 deletions pkg/remoting/rpc/rpc_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rpc

import (
"sync"
"sync/atomic"
)

var serviceStatusMap sync.Map

type Status struct {
Active int32
Total int32
}

// RemoveStatus remove the RpcStatus of this service
func RemoveStatus(service string) {
serviceStatusMap.Delete(service)
}

// BeginCount begin count
func BeginCount(service string) {
status := GetStatus(service)
atomic.AddInt32(&status.Active, 1)
}

// EndCount end count
func EndCount(service string) {
status := GetStatus(service)
atomic.AddInt32(&status.Active, -1)
atomic.AddInt32(&status.Total, 1)
}

// GetStatus get status
func GetStatus(service string) *Status {
a, _ := serviceStatusMap.LoadOrStore(service, new(Status))
return a.(*Status)
}

// GetActive get active.
func (s *Status) GetActive() int32 {
return s.Active
}

// GetTotal get total.
func (s *Status) GetTotal() int32 {
return s.Total
}
50 changes: 50 additions & 0 deletions pkg/remoting/rpc/rpc_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rpc

import (
"testing"

"github.com/stretchr/testify/assert"
)

var service = "127.0.0.1:8000"

func TestStatus(t *testing.T) {
rpcStatus1 := GetStatus(service)
assert.NotNil(t, rpcStatus1)
rpcStatus2 := GetStatus(service)
assert.Equal(t, rpcStatus1, rpcStatus2)
}

func TestRemoveStatus(t *testing.T) {
old := GetStatus(service)
RemoveStatus(service)
assert.Equal(t, GetStatus(service), old)
}

func TestBeginCount(t *testing.T) {
BeginCount(service)
assert.Equal(t, GetStatus(service).GetActive(), int32(1))
}

func TestEndCount(t *testing.T) {
EndCount(service)
assert.Equal(t, GetStatus(service).GetActive(), int32(0))
assert.Equal(t, GetStatus(service).GetTotal(), int32(1))
}

0 comments on commit a875204

Please sign in to comment.