Skip to content

Commit

Permalink
Merge pull request #4 from LdDl/racefix
Browse files Browse the repository at this point in the history
Racefix
  • Loading branch information
LdDl authored Oct 1, 2020
2 parents d0dd09f + 310ba81 commit 1f18893
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 41 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ Paste link to the browser and check if video loaded successfully.
## Dependencies
GIN web-framework - [https://github.com/gin-gonic/gin](https://github.com/gin-gonic/gin). License is [MIT](https://github.com/gin-gonic/gin/blob/master/LICENSE)

Media library - [http://github.com/deepch/vdk](https://github.com/deepch/vdk). License is [MIT](https://github.com/deepch/vdk/blob/master/LICENSE). We are using fork actually - https://github.com/morozka/vdk
Media library - [github.com/morozka/vdk](https://github.com/morozka/vdk). License is [MIT](https://github.com/morozka/vdk/blob/master/LICENSE)


UUID generation and parsing - [https://github.com/google/uuid](https://github.com/google/uuid). License is [BSD 3-Clause](https://github.com/google/uuid/blob/master/LICENSE)

Expand Down
89 changes: 56 additions & 33 deletions configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,30 @@ import (
"github.com/morozka/vdk/av"
)

// StreamsMap map of *StreamConfiguration with mutex
type StreamsMap struct {
sync.Mutex
Streams map[uuid.UUID]*StreamConfiguration
}

func (sm StreamsMap) getKeys() []uuid.UUID {
sm.Lock()
defer sm.Unlock()
keys := make([]uuid.UUID, 0, len(sm.Streams))
for k := range sm.Streams {
keys = append(keys, k)
}
return keys
}

// AppConfiguration Configuration parameters for application
type AppConfiguration struct {
mutex sync.Mutex
Server ServerConfiguration `json:"server"`
Streams map[uuid.UUID]*StreamConfiguration `json:"streams"`

HlsMsPerSegment int64 `json:"hlsMsPerSegment"`
HlsDirectory string `json:"hlsDirectory"`
HlsWindowSize uint `json:"hlsWindowSize"`
HlsCapacity uint `json:"hlsWindowCapacity"`
Server ServerConfiguration `json:"server"`
Streams StreamsMap `json:"streams"`
HlsMsPerSegment int64 `json:"hlsMsPerSegment"`
HlsDirectory string `json:"hlsDirectory"`
HlsWindowSize uint `json:"hlsWindowSize"`
HlsCapacity uint `json:"hlsWindowCapacity"`
}

// ServerConfiguration Configuration parameters for server
Expand Down Expand Up @@ -92,7 +106,7 @@ func NewAppConfiguration(fname string) (*AppConfiguration, error) {
jsonConf.HlsWindowSize = jsonConf.HlsCapacity
}

tmp.Streams = make(map[uuid.UUID]*StreamConfiguration)
tmp.Streams = StreamsMap{Streams: make(map[uuid.UUID]*StreamConfiguration)}
tmp.Server = jsonConf.Server
tmp.HlsMsPerSegment = jsonConf.HlsMsPerSegment
tmp.HlsDirectory = jsonConf.HlsDirectory
Expand All @@ -104,7 +118,7 @@ func NewAppConfiguration(fname string) (*AppConfiguration, error) {
log.Printf("Not valid UUID: %s\n", jsonConf.Streams[i].GUID)
continue
}
tmp.Streams[validUUID] = &StreamConfiguration{
tmp.Streams.Streams[validUUID] = &StreamConfiguration{
URL: jsonConf.Streams[i].URL,
Clients: make(map[uuid.UUID]viewer),
HlsChanel: make(chan av.Packet, 100),
Expand All @@ -114,67 +128,76 @@ func NewAppConfiguration(fname string) (*AppConfiguration, error) {
}

func (element *AppConfiguration) cast(id uuid.UUID, pck av.Packet) {
element.Streams[id].HlsChanel <- pck
for _, v := range element.Streams[id].Clients {
element.Streams.Lock()
defer element.Streams.Unlock()
curStream, _ := element.Streams.Streams[id]
curStream.HlsChanel <- pck
for _, v := range curStream.Clients {
if len(v.c) < cap(v.c) {
v.c <- pck
}
}
}

func (element *AppConfiguration) ext(streamID uuid.UUID) bool {
_, ok := element.Streams[streamID]
element.Streams.Lock()
defer element.Streams.Unlock()
_, ok := element.Streams.Streams[streamID]
return ok
}

func (element *AppConfiguration) codecAdd(streamID uuid.UUID, codecs []av.CodecData) {
defer element.mutex.Unlock()
element.mutex.Lock()
t := element.Streams[streamID]
t.Codecs = codecs
element.Streams[streamID] = t
element.Streams.Lock()
defer element.Streams.Unlock()
element.Streams.Streams[streamID].Codecs = codecs
}

func (element *AppConfiguration) codecGet(streamID uuid.UUID) []av.CodecData {
return element.Streams[streamID].Codecs
element.Streams.Lock()
defer element.Streams.Unlock()
curStream, _ := element.Streams.Streams[streamID]
return curStream.Codecs
}

func (element *AppConfiguration) updateStatus(streamID uuid.UUID, status bool) {
defer element.mutex.Unlock()
element.mutex.Lock()
t := element.Streams[streamID]
element.Streams.Lock()
defer element.Streams.Unlock()
t, _ := element.Streams.Streams[streamID]
t.Status = status
element.Streams[streamID] = t
element.Streams.Streams[streamID] = t
}

func (element *AppConfiguration) clientAdd(streamID uuid.UUID) (uuid.UUID, chan av.Packet, error) {
defer element.mutex.Unlock()
element.mutex.Lock()
element.Streams.Lock()
defer element.Streams.Unlock()
clientID, err := uuid.NewUUID()
if err != nil {
return uuid.UUID{}, nil, err
}
ch := make(chan av.Packet, 100)
element.Streams[streamID].Clients[clientID] = viewer{c: ch}
curStream, _ := element.Streams.Streams[streamID]
curStream.Clients[clientID] = viewer{c: ch}
return clientID, ch, nil
}

func (element *AppConfiguration) clientDelete(streamID, clientID uuid.UUID) {
defer element.mutex.Unlock()
element.mutex.Lock()
delete(element.Streams[streamID].Clients, clientID)
defer element.Streams.Unlock()
element.Streams.Lock()
delete(element.Streams.Streams[streamID].Clients, clientID)
}

func (element *AppConfiguration) startHlsCast(streamID uuid.UUID, stopCast chan bool) {
defer element.mutex.Unlock()
element.mutex.Lock()
go element.startHls(streamID, element.Streams[streamID].HlsChanel, stopCast)
defer element.Streams.Unlock()
element.Streams.Lock()
go element.startHls(streamID, element.Streams.Streams[streamID].HlsChanel, stopCast)
}

func (element *AppConfiguration) list() (uuid.UUID, []uuid.UUID) {
defer element.Streams.Unlock()
element.Streams.Lock()
res := []uuid.UUID{}
first := uuid.UUID{}
for k := range element.Streams {
for k := range element.Streams.Streams {
if first.String() == "" {
first = k
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/LdDl/video-server
go 1.14

require (
github.com/deepch/vdk v0.0.0-20200811012133-292592fcb5e6
github.com/morozka/vdk v0.0.0
github.com/gin-contrib/cors v1.3.1
github.com/gin-contrib/pprof v1.3.0
github.com/gin-gonic/gin v1.6.3
Expand Down
11 changes: 7 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deepch/vdk v0.0.0-20200703105741-130629d758b1 h1:MrpuwTr3W4F6FQUZCnAX19lAWKh8M+3rXNGkd4bm7Vs=
github.com/deepch/vdk v0.0.0-20200703105741-130629d758b1/go.mod h1:7H7GjjI4rfnSeijznawyOce8q2esWSr29CN3TqBh0Mc=
github.com/deepch/vdk v0.0.0-20200811012133-292592fcb5e6 h1:GOqfl5Mk8puBu0dAijgOylAGLQfYAi1+s1V9oHJBVbk=
github.com/deepch/vdk v0.0.0-20200811012133-292592fcb5e6/go.mod h1:REkQY9owyTu6mlvoHzUGFz1Y/C/XMYA/EqM3ezHZRBc=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gin-contrib/cors v1.3.1 h1:doAsuITavI4IOcd0Y19U4B+O0dNWihRyX//nn4sEmgA=
github.com/gin-contrib/cors v1.3.1/go.mod h1:jjEJ4268OPZUcU7k9Pm653S7lXUGcqMADzFA61xsmDk=
Expand Down Expand Up @@ -53,6 +49,13 @@ github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHX
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/morozka/vdk v0.0.0-20200703105741-130629d758b1 h1:MrpuwTr3W4F6FQUZCnAX19lAWKh8M+3rXNGkd4bm7Vs=
github.com/morozka/vdk v0.0.0-20200703105741-130629d758b1/go.mod h1:7H7GjjI4rfnSeijznawyOce8q2esWSr29CN3TqBh0Mc=
github.com/morozka/vdk v0.0.0-20200811012133-292592fcb5e6 h1:GOqfl5Mk8puBu0dAijgOylAGLQfYAi1+s1V9oHJBVbk=
github.com/morozka/vdk v0.0.0-20200811012133-292592fcb5e6/go.mod h1:REkQY9owyTu6mlvoHzUGFz1Y/C/XMYA/EqM3ezHZRBc=
github.com/morozka/vdk v0.0.0-20200904010955-fccce247d096 h1:G6vS2gyvKyYl1bMIUfmJhk3UPCQwRBSilnrpzyK2ZN0=
github.com/morozka/vdk v0.0.0 h1:pRCLahoaYW8OCyavzCdB/3c/DONVg1PEvknlsYrRwl8=
github.com/morozka/vdk v0.0.0/go.mod h1:zR4ofHV1yxXtkdrKRXyEOHyaY66G4JBoOSpHZ6Q+XoU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
Expand Down
1 change: 1 addition & 0 deletions http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"time"


"github.com/gin-contrib/cors"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
Expand Down
7 changes: 5 additions & 2 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (

// StartStreams Start video streams
func StartStreams(cfg *AppConfiguration) {
for k, v := range cfg.Streams {
for _, k := range cfg.Streams.getKeys() {
cfg.Streams.Lock()
url := cfg.Streams.Streams[k].URL
cfg.Streams.Unlock()
go func(name uuid.UUID, url string) {
for {
log.Printf("Stream must be establishment for '%s' by connecting to %s\n", name, url)
Expand Down Expand Up @@ -46,6 +49,6 @@ func StartStreams(cfg *AppConfiguration) {
log.Printf("Stream must be re-establishment for '%s' by connecting to %s in next 5 seconds\n", name, url)
time.Sleep(5 * time.Second)
}
}(k, v.URL)
}(k, url)
}
}

0 comments on commit 1f18893

Please sign in to comment.