Skip to content

Commit

Permalink
Feat: recover server app channle
Browse files Browse the repository at this point in the history
  • Loading branch information
zijiren233 committed Oct 19, 2023
1 parent 1651f7b commit cf2d835
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 211 deletions.
15 changes: 11 additions & 4 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"github.com/zijiren233/gencontainer/rwmap"
"github.com/zijiren233/livelib/cmd/flags"
"github.com/zijiren233/livelib/protocol/hls"
"github.com/zijiren233/livelib/protocol/httpflv"
Expand All @@ -35,7 +36,12 @@ func Server(cmd *cobra.Command, args []string) {
muxer := cmux.New(listener)
httpl := muxer.Match(cmux.HTTP1Fast())
tcp := muxer.Match(cmux.Any())
s := server.NewRtmpServer(server.WithInitHlsPlayer(true), server.WithAutoCreateAppOrChannel(true))
channels := rwmap.RWMap[string, *server.Channel]{}
s := server.NewRtmpServer(func(ReqAppName, ReqChannelName string, IsPublisher bool) (*server.Channel, error) {
c, _ := channels.LoadOrStore(ReqAppName, server.NewChannel(ReqAppName))
c.InitHlsPlayer()
return c, nil
})
go s.Serve(tcp)
if flags.Dev {
gin.SetMode(gin.DebugMode)
Expand All @@ -51,10 +57,11 @@ func Server(cmd *cobra.Command, args []string) {
fileName := channelSplitd[0]
fileExt := path.Ext(channelStr)
channelName := strings.TrimSuffix(fileName, fileExt)
channel, err := s.GetChannelWithApp(appName, channelName)
if err != nil {

channel, ok := channels.Load(appName)
if !ok {
c.AbortWithStatusJSON(http.StatusNotFound, gin.H{
"error": err.Error(),
"error": "app not found",
})
return
}
Expand Down
97 changes: 0 additions & 97 deletions server/app.go

This file was deleted.

13 changes: 9 additions & 4 deletions server/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
type Channel struct {
channelName string
inPublication uint32
players *rwmap.RWMap[av.WriteCloser, *packWriter]
players rwmap.RWMap[av.WriteCloser, *packWriter]

closed uint32
wg sync.WaitGroup
Expand All @@ -25,11 +25,16 @@ type Channel struct {
hlsWriter *hls.Source
}

func newChannel(channelName string) *Channel {
return &Channel{
type ChannelConf func(*Channel)

func NewChannel(channelName string, conf ...ChannelConf) *Channel {
ch := &Channel{
channelName: channelName,
players: &rwmap.RWMap[av.WriteCloser, *packWriter]{},
}
for _, c := range conf {
c(ch)
}
return ch
}

func (c *Channel) InPublication() bool {
Expand Down
117 changes: 11 additions & 106 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,67 +5,35 @@ import (
"net"
"sync/atomic"

"github.com/zijiren233/gencontainer/rwmap"
"github.com/zijiren233/livelib/protocol/rtmp"
"github.com/zijiren233/livelib/protocol/rtmp/core"
)

type Server struct {
apps rwmap.RWMap[string, *App]
connBufferSize int32
parseChannelFunc parseChannelFunc
initHlsPlayer bool
autoCreateAppOrChannel bool
connBufferSize int32
authFunc AuthFunc
}

type parseChannelFunc func(ReqAppName, ReqChannelName string, IsPublisher bool) (TrueAppName string, TrueChannel string, err error)

func DefaultRtmpServer() *Server {
return &Server{
connBufferSize: 4096,
initHlsPlayer: false,
autoCreateAppOrChannel: false,
}
}
type AuthFunc func(ReqAppName, ReqChannelName string, IsPublisher bool) (*Channel, error)

type ServerConf func(*Server)

func WithParseChannelFunc(f parseChannelFunc) ServerConf {
return func(s *Server) {
s.parseChannelFunc = f
}
}

func WithConnBufferSize(bufferSize int32) ServerConf {
return func(s *Server) {
s.connBufferSize = bufferSize
}
}

func WithInitHlsPlayer(init bool) ServerConf {
return func(s *Server) {
s.initHlsPlayer = init
}
}

func WithAutoCreateAppOrChannel(auto bool) ServerConf {
return func(s *Server) {
s.autoCreateAppOrChannel = auto
func NewRtmpServer(authFunc AuthFunc, c ...ServerConf) *Server {
s := &Server{
authFunc: authFunc,
}
}

func NewRtmpServer(c ...ServerConf) *Server {
s := DefaultRtmpServer()
for _, conf := range c {
conf(s)
}
return s
}

func (s *Server) SetParseChannelFunc(f parseChannelFunc) {
s.parseChannelFunc = f
}

func (s *Server) SetConnBufferSize(bufferSize int32) {
atomic.StoreInt32(&s.connBufferSize, bufferSize)
}
Expand All @@ -74,50 +42,8 @@ var (
ErrAppAlreadyExists = errors.New("app already exists")
)

func (s *Server) NewApp(appName string) (*App, error) {
a := NewApp(appName)
_, loaded := s.apps.LoadOrStore(appName, a)
if loaded {
return nil, ErrAppAlreadyExists
}
return a, nil
}

func (s *Server) GetOrNewApp(appName string) *App {
a, _ := s.apps.LoadOrStore(appName, NewApp(appName))
return a
}

var ErrAppNotFount = errors.New("app not found")

func (s *Server) GetApp(appName string) (*App, error) {
a, ok := s.apps.Load(appName)
if !ok {
return nil, ErrAppNotFount
}
return a, nil
}

func (s *Server) DelApp(appName string) error {
a, loaded := s.apps.LoadAndDelete(appName)
if !loaded {
return ErrAppNotFount
}
return a.Close()
}

func (s *Server) GetChannelWithApp(appName, channelName string) (*Channel, error) {
a, err := s.GetApp(appName)
if err != nil {
return nil, err
}
return a.GetChannel(channelName)
}

func (s *Server) GetOrNewChannelWithApp(appName, channelName string) (*Channel, error) {
return s.GetOrNewApp(appName).GetOrNewChannel(channelName)
}

func (s *Server) Serve(l net.Listener) error {
for {
netconn, err := l.Accept()
Expand All @@ -140,37 +66,16 @@ func (s *Server) handleConn(conn *core.Conn) (err error) {
if err = connServer.ReadInitMsg(); err != nil {
return
}

var app, name = connServer.ConnInfo.App, connServer.PublishInfo.Name
if s.parseChannelFunc != nil {
app, name, err = s.parseChannelFunc(connServer.ConnInfo.App, connServer.PublishInfo.Name, connServer.IsPublisher())
if err != nil {
return err
}
}
var channel *Channel
if s.autoCreateAppOrChannel {
channel, err = s.GetOrNewChannelWithApp(app, name)
if err != nil {
return err
}
} else {
app, err := s.GetApp(app)
if err != nil {
return err
}
channel, err = app.GetChannel(name)
if err != nil {
return err
}
if s.authFunc == nil {
panic("rtmp server auth func not implemented")
}
channel, err := s.authFunc(app, name, connServer.IsPublisher())

if connServer.IsPublisher() {
reader := rtmp.NewReader(connServer)
defer reader.Close()
if s.initHlsPlayer {
if err := channel.InitHlsPlayer(); err != nil {
return err
}
}
channel.PushStart(reader)
} else {
writer := rtmp.NewWriter(connServer)
Expand Down

0 comments on commit cf2d835

Please sign in to comment.