Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend application configuration #32

Merged
merged 77 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
1ec22c8
upgrade modules
Pavel7824 May 12, 2023
2da588f
update conf
LdDl May 12, 2023
335e960
Merge pull request #1 from LopanovCo/dev
LdDl May 12, 2023
dd57ee3
enum supported types
LdDl May 12, 2023
1e81872
Merge pull request #2 from LopanovCo/stream-types
LdDl May 12, 2023
345972d
app -> streams methods
LdDl May 15, 2023
8975468
early return on client add
LdDl May 15, 2023
9fd2283
metods applications -> storage
Pavel7824 May 15, 2023
2fa7efa
Merge pull request #4 from LopanovCo/dev
LdDl May 15, 2023
c9aaf05
Merge branch 'master' into skip-empty
LdDl May 15, 2023
e8033f6
migrate cast + exists with type
LdDl May 15, 2023
abea578
Merge pull request #3 from LopanovCo/skip-empty
LdDl May 15, 2023
b7548ec
add save mp4
Pavel7824 May 15, 2023
c0a71bc
add save mp4
Pavel7824 May 15, 2023
9e85316
add save mp4
Pavel7824 May 15, 2023
5b9ea16
fix
Pavel7824 May 15, 2023
cdd5486
Merge pull request #5 from LopanovCo/mp4
LdDl May 16, 2023
14c6f9f
conf for archive
LdDl May 17, 2023
bb9316e
add archive
Pavel7824 May 17, 2023
30ab25a
Merge pull request #7 from LopanovCo/archive
LdDl May 17, 2023
6af9b4e
Merge branch 'master' into master
LdDl Sep 3, 2024
7288849
Merge branch 'master' into master
LdDl Sep 3, 2024
bc57ec3
Merge branch 'master' into master
LdDl Sep 3, 2024
2064fea
update config
LdDl Sep 4, 2024
dc8d109
update
LdDl Sep 4, 2024
57394e1
add dep
LdDl Sep 4, 2024
00318aa
log main file
LdDl Sep 4, 2024
8f5c838
ioutil deprecated
LdDl Sep 4, 2024
cb185a7
log configuration
LdDl Sep 4, 2024
fa2eb4d
lower case
LdDl Sep 4, 2024
ae236c3
rearrange errors
LdDl Sep 4, 2024
2831f6f
lower case storage
LdDl Sep 4, 2024
db3a81f
gin.releasemode out of goroutine
LdDl Sep 4, 2024
bd0832f
log streams
LdDl Sep 4, 2024
e979e06
log video server
LdDl Sep 4, 2024
f1a43e8
log API server
LdDl Sep 4, 2024
ed81ae0
log hls
LdDl Sep 4, 2024
ce4e4aa
log mp4
LdDl Sep 4, 2024
22048bd
Merge pull request #8 from LopanovCo/log
LdDl Sep 4, 2024
83c5483
Merge branch 'master' into master
LdDl Sep 4, 2024
392826a
remove panic
LdDl Sep 4, 2024
6e7e49c
spot mutex
LdDl Sep 4, 2024
dacc999
remove lock on top
LdDl Sep 4, 2024
c8265a4
spot another mutex error
LdDl Sep 4, 2024
326b741
spot error for 100% working video
LdDl Sep 4, 2024
cc22bac
fix mutex for archive
LdDl Sep 5, 2024
ab5c0e6
arrange locks
LdDl Sep 5, 2024
cac88a0
fix codec acquire
LdDl Sep 5, 2024
2e0559b
update hls.js
LdDl Sep 5, 2024
6798f0e
seprate utils
LdDl Sep 5, 2024
e91dd1a
reduce log info
LdDl Sep 5, 2024
fb02c3f
minor
LdDl Sep 5, 2024
5627a4c
handle bad packet write
LdDl Sep 5, 2024
b64830e
more logging on MSE
LdDl Sep 5, 2024
26e9f40
Merge pull request #9 from LopanovCo/debug
LdDl Sep 5, 2024
f963e3b
Merge branch 'debug'
LdDl Sep 5, 2024
88b5265
control timeout var
LdDl Sep 5, 2024
67a044c
log scope ws
LdDl Sep 5, 2024
5a8eac0
add verbose level
LdDl Sep 5, 2024
bd9ee89
parse verbose
LdDl Sep 5, 2024
9d9e8f0
conditional verbose on stream internal
LdDl Sep 5, 2024
da76c17
minor log
LdDl Sep 5, 2024
1a56b0c
Merge pull request #10 from LopanovCo/debug
LdDl Sep 5, 2024
2b01458
lock/unlock in api
LdDl Sep 6, 2024
695278a
add archiveEnabled
LdDl Sep 6, 2024
a95d18a
arrange
LdDl Sep 6, 2024
e5ceae8
Merge pull request #11 from LopanovCo/archive
LdDl Sep 6, 2024
d756b23
err on hls/mp4 cast
LdDl Sep 6, 2024
143e107
another arch enabled
LdDl Sep 6, 2024
a0c440c
arrange config a bit
LdDl Sep 6, 2024
be112de
Merge pull request #12 from LopanovCo/mutexes
LdDl Sep 6, 2024
474b2a9
trim
LdDl Sep 6, 2024
48b46e9
merge
LdDl Sep 6, 2024
4a2cf17
add log for http api
LdDl Sep 6, 2024
ec20044
Merge pull request #13 from LopanovCo/log
LdDl Sep 6, 2024
a7f668a
Archive layout name + Extend first segment duration (#14)
LdDl Sep 6, 2024
ddb273f
Minio S3 integration (#15)
LdDl Sep 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
cmd/video_server/video_server
cmd/video_server/video_server.exe
cmd/video_server/hls/*
cmd/video_server/mp4/*
example_client/mse_example/node_modules
example_client/mse_example/.nuxt
example_client/mse_example/package-lock.json
Expand All @@ -13,4 +14,4 @@ build.sh
cmd/video_server/linux-video_server.zip
cmd/video_server/windows-video_server.zip
cmd/video_server/conf_token.json
video-server-ui
video-server-ui
64 changes: 64 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,70 @@ App running at:
```
Paste link to the browser and check if video loaded successfully.

## Archive

You can configure application to write MP4 chunks of custom duration (but not less than first keyframe duration) to the filesystem or [S3 MinIO](https://min.io/)

- For storing archive to the filesystem. Point default directory for storing MP4 files and duration:
```json
"archive": {
"enabled": true,
"directory": "./mp4",
"ms_per_file": 30000
}
```
For each stream configuration you can override default directory and duration. Field "type" should have value "filesystem":
```json
{
///
// Some other single stream props...
///
"archive": {
"enabled": true,
"ms_per_file": 20000,
"type": "filesystem",
"directory": "custom_folder"
}
}
```
- For storing archive to the S3 MinIO:
Modify configuration file to have both filesystem and minio configuration (filesystem will be picked for storing temporary files before moving it to the MinIO), e.g.:
```json
"archive": {
"directory": "./mp4",
"ms_per_file": 30000,
"minio_settings": {
"host": "localhost",
"port": 29199,
"user": "minio_secret_login",
"password": "minio_secret_password",
"default_bucket": "archive_bucket",
"default_path": "/var/archive_data"
}
}
```
For each stream configuration you can override default directory for temporary files, MinIO bucket and path in it and chunk duration. Field "type" should have value "minio":
```json
{
///
// Some other single stream props...
///
"archive": {
"enabled": true,
"ms_per_file": 20000,
"type": "filesystem",
"directory": "custom_folder",
"type": "minio",
"minio_bucket": "vod-bucket",
"minio_path": "/var/archive_data_custom"
}
}
```

- If you want disable archive for specified stream, just set value of the field `enabled` to `false` in streams array. For disabling archive at all you can do the same but in the main configuration (where default values are set)

- To install MinIO (in case if you want to store archive in S3) you can use [./docker-compose.yaml](docker-compose file) or [./scripts/minio-ansible.yml](Ansible script) for example of deployment workflows

## Dependencies
GIN web-framework - [https://github.com/gin-gonic/gin](https://github.com/gin-gonic/gin). License is [MIT](https://github.com/gin-gonic/gin/blob/master/LICENSE)

Expand Down
197 changes: 112 additions & 85 deletions application.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package videoserver

import (
"log"
"strings"
"fmt"

"github.com/LdDl/video-server/configuration"
"github.com/LdDl/video-server/storage"
"github.com/gin-contrib/cors"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pkg/errors"

"github.com/deepch/vdk/av"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)

// Application is a configuration parameters for application
Expand All @@ -18,14 +22,16 @@ type Application struct {
Streams StreamsStorage `json:"streams"`
HLS HLSInfo `json:"hls"`
CorsConfig *cors.Config `json:"-"`
minioClient *minio.Client
}

// APIConfiguration is just copy of configuration.APIConfiguration but with some not exported fields
type APIConfiguration struct {
Enabled bool `json:"-"`
Host string `json:"host"`
Port int32 `json:"port"`
Mode string `json:"-"`
Enabled bool `json:"-"`
Host string `json:"host"`
Port int32 `json:"port"`
Mode string `json:"-"`
Verbose VerboseLevel `json:"-"`
}

// VideoConfiguration is just copy of configuration.VideoConfiguration but with some not exported fields
Expand Down Expand Up @@ -58,6 +64,7 @@ func NewApplication(cfg *configuration.Configuration) (*Application, error) {
Host: cfg.APICfg.Host,
Port: cfg.APICfg.Port,
Mode: cfg.APICfg.Mode,
Verbose: NewVerboseLevelFrom(cfg.APICfg.Verbose),
},
VideoServerCfg: VideoConfiguration{
Host: cfg.VideoServerCfg.Host,
Expand All @@ -74,19 +81,76 @@ func NewApplication(cfg *configuration.Configuration) (*Application, error) {
if cfg.CorsConfig.Enabled {
tmp.setCors(cfg.CorsConfig)
}
minioEnabled := false
for _, rtspStream := range cfg.RTSPStreams {
validUUID, err := uuid.Parse(rtspStream.GUID)
if err != nil {
log.Printf("Not valid UUID: %s\n", rtspStream.GUID)
log.Error().Err(err).Str("scope", "configuration").Str("stream_id", rtspStream.GUID).Msg("Not valid UUID")
continue
}
tmp.Streams.Streams[validUUID] = NewStreamConfiguration(rtspStream.URL, rtspStream.StreamTypes)
verbose := strings.ToLower(rtspStream.Verbose)
if verbose == "v" {
tmp.Streams.Streams[validUUID].verbose = true
} else if verbose == "vvv" {
tmp.Streams.Streams[validUUID].verbose = true
tmp.Streams.Streams[validUUID].verboseDetailed = true
outputTypes := make([]StreamType, 0, len(rtspStream.OutputTypes))
for _, v := range rtspStream.OutputTypes {
typ, ok := streamTypeExists(v)
if !ok {
return nil, errors.Wrapf(ErrStreamTypeNotExists, "Type: '%s'", v)
}
if _, ok := supportedOutputStreamTypes[typ]; !ok {
return nil, errors.Wrapf(ErrStreamTypeNotSupported, "Type: '%s'", v)
}
outputTypes = append(outputTypes, typ)
}

tmp.Streams.Streams[validUUID] = NewStreamConfiguration(rtspStream.URL, outputTypes)
tmp.Streams.Streams[validUUID].verboseLevel = NewVerboseLevelFrom(rtspStream.Verbose)
if rtspStream.Archive.Enabled && cfg.ArchiveCfg.Enabled {
if rtspStream.Archive.MsPerSegment == 0 {
return nil, fmt.Errorf("bad ms per segment archive stream")
}
storageType := storage.NewStorageTypeFrom(rtspStream.Archive.TypeArchive)
var archiveStorage streamArhive
switch storageType {
case storage.STORAGE_FILESYSTEM:
fsStorage, err := storage.NewFileSystemProvider(rtspStream.Archive.Directory)
if err != nil {
return nil, errors.Wrap(err, "Can't create filesystem provider")
}
archiveStorage = streamArhive{
store: fsStorage,
dir: rtspStream.Archive.Directory,
bucket: rtspStream.Archive.Directory,
bucketPath: rtspStream.Archive.Directory,
msPerSegment: rtspStream.Archive.MsPerSegment,
}
case storage.STORAGE_MINIO:
if !minioEnabled {
client, err := minio.New(fmt.Sprintf("%s:%d", cfg.ArchiveCfg.Minio.Host, cfg.ArchiveCfg.Minio.Port), &minio.Options{
Creds: credentials.NewStaticV4(cfg.ArchiveCfg.Minio.User, cfg.ArchiveCfg.Minio.Password, ""),
Secure: false,
})
if err != nil {
return nil, errors.Wrap(err, "Can't connect to MinIO instance")
}
tmp.minioClient = client
minioEnabled = true
}
minioStorage, err := storage.NewMinioProvider(tmp.minioClient, rtspStream.Archive.MinioBucket, rtspStream.Archive.MinioPath)
if err != nil {
return nil, errors.Wrap(err, "Can't create MinIO provider")
}
archiveStorage = streamArhive{
store: minioStorage,
dir: rtspStream.Archive.Directory,
bucket: rtspStream.Archive.MinioBucket,
bucketPath: rtspStream.Archive.MinioPath,
msPerSegment: rtspStream.Archive.MsPerSegment,
}
default:
return nil, fmt.Errorf("unsupported archive type")
}
err = tmp.SetStreamArchive(validUUID, &archiveStorage)
if err != nil {
return nil, errors.Wrap(err, "can't set archive for given stream")
}
}
}
return &tmp, nil
Expand All @@ -106,105 +170,68 @@ func (app *Application) setCors(cfg configuration.CORSConfiguration) {
app.CorsConfig.AllowCredentials = cfg.AllowCredentials
}

func (app *Application) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled bool) error {
app.Streams.Lock()
defer app.Streams.Unlock()
curStream, ok := app.Streams.Streams[streamID]
if !ok {
return ErrStreamNotFound
}
if hlsEnabled {
curStream.hlsChanel <- pck
}
for _, v := range curStream.Clients {
if len(v.c) < cap(v.c) {
v.c <- pck
}
}
return nil
func (app *Application) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled, archiveEnabled bool) error {
return app.Streams.cast(streamID, pck, hlsEnabled, archiveEnabled)
}
func (app *Application) exists(streamID uuid.UUID) bool {
app.Streams.Lock()
defer app.Streams.Unlock()
_, ok := app.Streams.Streams[streamID]
return ok

func (app *Application) streamExists(streamID uuid.UUID) bool {
return app.Streams.streamExists(streamID)
}

func (app *Application) existsWithType(streamID uuid.UUID, streamType string) bool {
app.Streams.Lock()
defer app.Streams.Unlock()
stream, ok := app.Streams.Streams[streamID]
if !ok {
return false
}
supportedTypes := stream.SupportedStreamTypes
typeEnabled := typeExists(streamType, supportedTypes)
return ok && typeEnabled
func (app *Application) existsWithType(streamID uuid.UUID, streamType StreamType) bool {
return app.Streams.existsWithType(streamID, streamType)
}

func (app *Application) addCodec(streamID uuid.UUID, codecs []av.CodecData) {
app.Streams.Lock()
defer app.Streams.Unlock()
app.Streams.Streams[streamID].Codecs = codecs
app.Streams.addCodec(streamID, codecs)
}

func (app *Application) getCodec(streamID uuid.UUID) ([]av.CodecData, error) {
app.Streams.Lock()
defer app.Streams.Unlock()
curStream, ok := app.Streams.Streams[streamID]
if !ok {
return nil, ErrStreamNotFound
}
return curStream.Codecs, nil
return app.Streams.getCodec(streamID)
}

func (app *Application) updateStreamStatus(streamID uuid.UUID, status bool) error {
return app.Streams.updateStreamStatus(streamID, status)
}

func (app *Application) addClient(streamID uuid.UUID) (uuid.UUID, chan av.Packet, error) {
return app.Streams.addClient(streamID)
}

func (app *Application) clientDelete(streamID, clientID uuid.UUID) {
app.Streams.deleteClient(streamID, clientID)
}

func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan bool) error {
app.Streams.Lock()
defer app.Streams.Unlock()
t, ok := app.Streams.Streams[streamID]
stream, ok := app.Streams.Streams[streamID]
if !ok {
return ErrStreamNotFound
}
t.Status = status
app.Streams.Streams[streamID] = t
go app.startHls(streamID, stream.hlsChanel, stopCast)
return nil
}

func (app *Application) clientAdd(streamID uuid.UUID) (uuid.UUID, chan av.Packet, error) {
func (app *Application) startMP4Cast(streamID uuid.UUID, stopCast chan bool) error {
app.Streams.Lock()
defer app.Streams.Unlock()
clientID, err := uuid.NewUUID()
if err != nil {
return uuid.UUID{}, nil, err
}
ch := make(chan av.Packet, 100)
curStream, ok := app.Streams.Streams[streamID]
stream, ok := app.Streams.Streams[streamID]
if !ok {
return uuid.UUID{}, nil, ErrStreamNotFound
return ErrStreamNotFound
}
curStream.Clients[clientID] = viewer{c: ch}
return clientID, ch, nil
go app.startMP4(streamID, stream.mp4Chanel, stopCast)
return nil
}

func (app *Application) clientDelete(streamID, clientID uuid.UUID) {
defer app.Streams.Unlock()
app.Streams.Lock()
delete(app.Streams.Streams[streamID].Clients, clientID)
func (app *Application) getStreamsIDs() []uuid.UUID {
return app.Streams.getKeys()
}

func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan bool) {
defer app.Streams.Unlock()
app.Streams.Lock()
go app.startHls(streamID, app.Streams.Streams[streamID].hlsChanel, stopCast)
func (app *Application) SetStreamArchive(streamID uuid.UUID, archiveStorage *streamArhive) error {
return app.Streams.setArchiveStream(streamID, archiveStorage)
}

func (app *Application) getStreamsIDs() []uuid.UUID {
return app.Streams.getKeys()
// defer app.Streams.Unlock()
// app.Streams.Lock()
// res := make([]uuid.UUID, 0, len(app.Streams.Streams))
// for k := range app.Streams.Streams {
// res = append(res, k)
// }
// return res
func (app *Application) getStreamArchive(streamID uuid.UUID) *streamArhive {
return app.Streams.getArchiveStream(streamID)
}
Loading