From f5b4291b4690f45143b3b28574968ab878ba752b Mon Sep 17 00:00:00 2001 From: devYuMinKim Date: Tue, 23 Apr 2024 10:25:44 +0900 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat=20:=20Implement=20screen=20sha?= =?UTF-8?q?ring=20functionality=20via=20WebRTC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit introduces the screen sharing feature using the Pion WebRTC library. The implementation includes the setup of ICE servers, session management, and local track handling for broadcasting the screen. The code also integrates with existing server infrastructure, ensuring compatibility with the current signaling system. Related issue: #80 --- controllers/live_class_controller.go | 255 ++++++------ main.go | 354 +++++++++++++++- services/live_class_service.go | 587 ++++++++++++++------------- 3 files changed, 756 insertions(+), 440 deletions(-) diff --git a/controllers/live_class_controller.go b/controllers/live_class_controller.go index 916bb5e..a28c690 100644 --- a/controllers/live_class_controller.go +++ b/controllers/live_class_controller.go @@ -1,129 +1,130 @@ package controllers -import ( - "github.com/YJU-OKURA/project_minori-gin-deployment-repo/services" - "github.com/gin-gonic/gin" - "net/http" - "strconv" -) - -type LiveClassController struct { - Service services.LiveClassService -} - -func NewLiveClassController(service services.LiveClassService) *LiveClassController { - return &LiveClassController{ - Service: service, - } -} - -// CreateRoom godoc -// @Summary 新しいルームを作成します。 -// @Description 新しいルームを作成します。 -// @Tags Live Class -// @Accept json -// @Produce json -// @Param classID path uint true "Class ID" -// @Param userID path uint true "User ID" -// @Success 200 {object} map[string]interface{} "roomID returned on successful creation" -// @Failure 400 {object} map[string]interface{} "Invalid class ID" -// @Failure 401 {object} map[string]interface{} "Unauthorized to create room" -// @Failure 500 {object} map[string]interface{} "Internal server error" -// @Router /live/create-room/{classID}/{userID} [post] -func (c *LiveClassController) CreateRoom(ctx *gin.Context) { - userID, _ := strconv.ParseUint(ctx.Param("userID"), 10, 32) - classID, err := strconv.ParseUint(ctx.Param("classID"), 10, 32) - if err != nil { - ctx.JSON(http.StatusBadRequest, gin.H{"error": "invalid classID"}) - return - } - - roomID, err := c.Service.CreateRoom(uint(classID), uint(userID)) - if err != nil { - ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } - ctx.JSON(http.StatusOK, gin.H{"roomID": roomID}) -} - -// StartScreenShare godoc -// @Summary 画面共有を開始します。 -// @Description 画面共有を開始します。 -// @Tags Live Class -// @Accept json -// @Produce json -// @Param roomID path string true "Room ID" -// @Param userID path uint true "User ID" -// @Success 200 {object} map[string]interface{} "SDP data for the screen share" -// @Failure 400 {object} map[string]interface{} "Invalid room ID" -// @Failure 401 {object} map[string]interface{} "Unauthorized to start screen sharing" -// @Failure 500 {object} map[string]interface{} "Internal server error" -// @Router /live/start-screen-share/{roomID}/{userID} [post] -func (c *LiveClassController) StartScreenShare(ctx *gin.Context) { - roomID := ctx.Param("roomID") - userID := ctx.GetString("userID") - err := c.Service.StartScreenShare(roomID, userID) - if err != nil { - ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } - sdp, err := c.Service.GetScreenShareSDP(roomID) - if err != nil { - ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } - ctx.JSON(http.StatusOK, gin.H{"sdp": sdp}) -} - -// StopScreenShare godoc -// @Summary 画面共有を停止します。 -// @Description 画面共有を停止します。 -// @Tags Live Class -// @Accept json -// @Produce json -// @Param roomID path string true "Room ID" -// @Param userID path uint true "User ID" -// @Success 200 {object} map[string]interface{} "Screen sharing stopped successfully" -// @Failure 400 {object} map[string]interface{} "Invalid room ID" -// @Failure 401 {object} map[string]interface{} "Unauthorized to stop screen sharing" -// @Failure 500 {object} map[string]interface{} "Internal server error" -// @Router /live/stop-screen-share/{roomID}/{userID} [post] -func (c *LiveClassController) StopScreenShare(ctx *gin.Context) { - roomID := ctx.Param("roomID") - adminID := ctx.GetString("adminID") - err := c.Service.StopScreenShare(roomID, adminID) - if err != nil { - ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } - ctx.JSON(http.StatusOK, gin.H{"message": "Screen sharing stopped"}) -} - -// JoinScreenShare godoc -// @Summary 画面共有に参加します。 -// @Description 画面共有に参加します。 -// @Tags Live Class -// @Accept json -// @Produce json -// @Param roomID path string true "Room ID" -// @Param userID path uint true "User ID" -// @Success 200 {object} map[string]interface{} "SDP data for the screen share" -// @Failure 400 {object} map[string]interface{} "Invalid room ID or User ID" -// @Failure 401 {object} map[string]interface{} "Unauthorized to join screen sharing" -// @Failure 500 {object} map[string]interface{} "Internal server error" -// @Router /live/join-screen-share/{roomID}/{userID} [get] -func (c *LiveClassController) JoinScreenShare(ctx *gin.Context) { - roomID := ctx.Param("roomID") - userID, err := strconv.ParseUint(ctx.Param("userID"), 10, 32) - if err != nil { - ctx.JSON(http.StatusBadRequest, gin.H{"error": "invalid userID"}) - return - } - - offer, err := c.Service.JoinScreenShare(roomID, uint(userID)) - if err != nil { - ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } - ctx.JSON(http.StatusOK, gin.H{"offer": offer}) -} +// +//import ( +// "github.com/YJU-OKURA/project_minori-gin-deployment-repo/services" +// "github.com/gin-gonic/gin" +// "net/http" +// "strconv" +//) +// +//type LiveClassController struct { +// Service services.LiveClassService +//} +// +//func NewLiveClassController(service services.LiveClassService) *LiveClassController { +// return &LiveClassController{ +// Service: service, +// } +//} +// +//// CreateRoom godoc +//// @Summary 新しいルームを作成します。 +//// @Description 新しいルームを作成します。 +//// @Tags Live Class +//// @Accept json +//// @Produce json +//// @Param classID path uint true "Class ID" +//// @Param userID path uint true "User ID" +//// @Success 200 {object} map[string]interface{} "roomID returned on successful creation" +//// @Failure 400 {object} map[string]interface{} "Invalid class ID" +//// @Failure 401 {object} map[string]interface{} "Unauthorized to create room" +//// @Failure 500 {object} map[string]interface{} "Internal server error" +//// @Router /live/create-room/{classID}/{userID} [post] +//func (c *LiveClassController) CreateRoom(ctx *gin.Context) { +// userID, _ := strconv.ParseUint(ctx.Param("userID"), 10, 32) +// classID, err := strconv.ParseUint(ctx.Param("classID"), 10, 32) +// if err != nil { +// ctx.JSON(http.StatusBadRequest, gin.H{"error": "invalid classID"}) +// return +// } +// +// roomID, err := c.Service.CreateRoom(uint(classID), uint(userID)) +// if err != nil { +// ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) +// return +// } +// ctx.JSON(http.StatusOK, gin.H{"roomID": roomID}) +//} +// +//// StartScreenShare godoc +//// @Summary 画面共有を開始します。 +//// @Description 画面共有を開始します。 +//// @Tags Live Class +//// @Accept json +//// @Produce json +//// @Param roomID path string true "Room ID" +//// @Param userID path uint true "User ID" +//// @Success 200 {object} map[string]interface{} "SDP data for the screen share" +//// @Failure 400 {object} map[string]interface{} "Invalid room ID" +//// @Failure 401 {object} map[string]interface{} "Unauthorized to start screen sharing" +//// @Failure 500 {object} map[string]interface{} "Internal server error" +//// @Router /live/start-screen-share/{roomID}/{userID} [post] +//func (c *LiveClassController) StartScreenShare(ctx *gin.Context) { +// roomID := ctx.Param("roomID") +// userID := ctx.GetString("userID") +// err := c.Service.StartScreenShare(roomID, userID) +// if err != nil { +// ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) +// return +// } +// sdp, err := c.Service.GetScreenShareSDP(roomID) +// if err != nil { +// ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) +// return +// } +// ctx.JSON(http.StatusOK, gin.H{"sdp": sdp}) +//} +// +//// StopScreenShare godoc +//// @Summary 画面共有を停止します。 +//// @Description 画面共有を停止します。 +//// @Tags Live Class +//// @Accept json +//// @Produce json +//// @Param roomID path string true "Room ID" +//// @Param userID path uint true "User ID" +//// @Success 200 {object} map[string]interface{} "Screen sharing stopped successfully" +//// @Failure 400 {object} map[string]interface{} "Invalid room ID" +//// @Failure 401 {object} map[string]interface{} "Unauthorized to stop screen sharing" +//// @Failure 500 {object} map[string]interface{} "Internal server error" +//// @Router /live/stop-screen-share/{roomID}/{userID} [post] +//func (c *LiveClassController) StopScreenShare(ctx *gin.Context) { +// roomID := ctx.Param("roomID") +// adminID := ctx.GetString("adminID") +// err := c.Service.StopScreenShare(roomID, adminID) +// if err != nil { +// ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) +// return +// } +// ctx.JSON(http.StatusOK, gin.H{"message": "Screen sharing stopped"}) +//} +// +//// JoinScreenShare godoc +//// @Summary 画面共有に参加します。 +//// @Description 画面共有に参加します。 +//// @Tags Live Class +//// @Accept json +//// @Produce json +//// @Param roomID path string true "Room ID" +//// @Param userID path uint true "User ID" +//// @Success 200 {object} map[string]interface{} "SDP data for the screen share" +//// @Failure 400 {object} map[string]interface{} "Invalid room ID or User ID" +//// @Failure 401 {object} map[string]interface{} "Unauthorized to join screen sharing" +//// @Failure 500 {object} map[string]interface{} "Internal server error" +//// @Router /live/join-screen-share/{roomID}/{userID} [get] +//func (c *LiveClassController) JoinScreenShare(ctx *gin.Context) { +// roomID := ctx.Param("roomID") +// userID, err := strconv.ParseUint(ctx.Param("userID"), 10, 32) +// if err != nil { +// ctx.JSON(http.StatusBadRequest, gin.H{"error": "invalid userID"}) +// return +// } +// +// offer, err := c.Service.JoinScreenShare(roomID, uint(userID)) +// if err != nil { +// ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) +// return +// } +// ctx.JSON(http.StatusOK, gin.H{"offer": offer}) +//} diff --git a/main.go b/main.go index de00edf..12e39a2 100644 --- a/main.go +++ b/main.go @@ -1,14 +1,27 @@ package main import ( + "bufio" + "bytes" + "compress/gzip" "context" + "encoding/base64" + "encoding/json" "errors" + "flag" "fmt" "github.com/YJU-OKURA/project_minori-gin-deployment-repo/middlewares" + "github.com/pion/interceptor" + "github.com/pion/interceptor/pkg/intervalpli" + "github.com/pion/webrtc/v4" + "io" "log" "net/http" "os" "os/signal" + "strconv" + "strings" + "syscall" "time" @@ -38,15 +51,306 @@ func main() { db := initializeDatabase() redisClient := initializeRedis() - classUserRepo := repositories.NewClassUserRepository(db) - liveClassService := services.NewLiveClassService(services.NewRoomMap(), classUserRepo) + //classUserRepo := repositories.NewClassUserRepository(db) + //liveClassService := services.NewLiveClassService(services.NewRoomMap(), classUserRepo) jwtService := services.NewJWTService() services.NewRoomManager(redisClient) migrateDatabaseIfNeeded(db) - router := setupRouter(db, liveClassService, jwtService) + //router := setupRouter(db, liveClassService, jwtService) + router := setupRouter(db, jwtService) startServer(router) + + port := flag.Int("port", 8080, "http server port") + flag.Parse() + + sdpChan := HTTPSDPServer(*port) + + // Everything below is the Pion WebRTC API, thanks for using it ❤️. + offer := webrtc.SessionDescription{} + Decode(<-sdpChan, &offer) + fmt.Println("") + + peerConnectionConfig := webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + { + URLs: []string{"stun:stun.l.google.com:19302"}, + }, + }, + } + + m := &webrtc.MediaEngine{} + if err := m.RegisterDefaultCodecs(); err != nil { + panic(err) + } + + // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline. + // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection` + // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry + // for each PeerConnection. + i := &interceptor.Registry{} + + // Use the default set of Interceptors + if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil { + panic(err) + } + + // Register a intervalpli factory + // This interceptor sends a PLI every 3 seconds. A PLI causes a video keyframe to be generated by the sender. + // This makes our video seekable and more error resilent, but at a cost of lower picture quality and higher bitrates + // A real world application should process incoming RTCP packets from viewers and forward them to senders + intervalPliFactory, err := intervalpli.NewReceiverInterceptor() + if err != nil { + panic(err) + } + i.Add(intervalPliFactory) + + // Create a new RTCPeerConnection + peerConnection, err := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i)).NewPeerConnection(peerConnectionConfig) + if err != nil { + panic(err) + } + defer func() { + if cErr := peerConnection.Close(); cErr != nil { + fmt.Printf("cannot close peerConnection: %v\n", cErr) + } + }() + + // Allow us to receive 1 video track + if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil { + panic(err) + } + + localTrackChan := make(chan *webrtc.TrackLocalStaticRTP) + // Set a handler for when a new remote track starts, this just distributes all our packets + // to connected peers + peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { //nolint: revive + // Create a local track, all our SFU clients will be fed via this track + localTrack, newTrackErr := webrtc.NewTrackLocalStaticRTP(remoteTrack.Codec().RTPCodecCapability, "video", "pion") + if newTrackErr != nil { + panic(newTrackErr) + } + localTrackChan <- localTrack + + rtpBuf := make([]byte, 1400) + for { + i, _, readErr := remoteTrack.Read(rtpBuf) + if readErr != nil { + panic(readErr) + } + + // ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet + if _, err = localTrack.Write(rtpBuf[:i]); err != nil && !errors.Is(err, io.ErrClosedPipe) { + panic(err) + } + } + }) + + // Set the remote SessionDescription + err = peerConnection.SetRemoteDescription(offer) + if err != nil { + panic(err) + } + + // Create answer + answer, err := peerConnection.CreateAnswer(nil) + if err != nil { + panic(err) + } + + // Create channel that is blocked until ICE Gathering is complete + gatherComplete := webrtc.GatheringCompletePromise(peerConnection) + + // Sets the LocalDescription, and starts our UDP listeners + err = peerConnection.SetLocalDescription(answer) + if err != nil { + panic(err) + } + + // Block until ICE Gathering is complete, disabling trickle ICE + // we do this because we only can exchange one signaling message + // in a production application you should exchange ICE Candidates via OnICECandidate + <-gatherComplete + + // Get the LocalDescription and take it to base64 so we can paste in browser + fmt.Println(Encode(*peerConnection.LocalDescription())) + + localTrack := <-localTrackChan + for { + fmt.Println("") + fmt.Println("Curl an base64 SDP to start sendonly peer connection") + + recvOnlyOffer := webrtc.SessionDescription{} + Decode(<-sdpChan, &recvOnlyOffer) + + // Create a new PeerConnection + peerConnection, err := webrtc.NewPeerConnection(peerConnectionConfig) + if err != nil { + panic(err) + } + + rtpSender, err := peerConnection.AddTrack(localTrack) + if err != nil { + panic(err) + } + + // Read incoming RTCP packets + // Before these packets are returned they are processed by interceptors. For things + // like NACK this needs to be called. + go func() { + rtcpBuf := make([]byte, 1500) + for { + if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil { + return + } + } + }() + + // Set the remote SessionDescription + err = peerConnection.SetRemoteDescription(recvOnlyOffer) + if err != nil { + panic(err) + } + + // Create answer + answer, err := peerConnection.CreateAnswer(nil) + if err != nil { + panic(err) + } + + // Create channel that is blocked until ICE Gathering is complete + gatherComplete = webrtc.GatheringCompletePromise(peerConnection) + + // Sets the LocalDescription, and starts our UDP listeners + err = peerConnection.SetLocalDescription(answer) + if err != nil { + panic(err) + } + + // Block until ICE Gathering is complete, disabling trickle ICE + // we do this because we only can exchange one signaling message + // in a production application you should exchange ICE Candidates via OnICECandidate + <-gatherComplete + + // Get the LocalDescription and take it to base64 so we can paste in browser + fmt.Println(Encode(*peerConnection.LocalDescription())) + } +} + +// HTTPSDPServer starts a HTTP Server that consumes SDPs +func HTTPSDPServer(port int) chan string { + sdpChan := make(chan string) + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + fmt.Fprintf(w, "done") + sdpChan <- string(body) + }) + + go func() { + // nolint: gosec + err := http.ListenAndServe(":"+strconv.Itoa(port), nil) + if err != nil { + panic(err) + } + }() + + return sdpChan +} + +// Allows compressing offer/answer to bypass terminal input limits. +const compress = false + +// MustReadStdin blocks until input is received from stdin +func MustReadStdin() string { + r := bufio.NewReader(os.Stdin) + + var in string + for { + var err error + in, err = r.ReadString('\n') + if err != io.EOF { + if err != nil { + panic(err) + } + } + in = strings.TrimSpace(in) + if len(in) > 0 { + break + } + } + + fmt.Println("") + + return in +} + +// Encode encodes the input in base64 +// It can optionally zip the input before encoding +func Encode(obj interface{}) string { + b, err := json.Marshal(obj) + if err != nil { + panic(err) + } + + if compress { + b = zip(b) + } + + return base64.StdEncoding.EncodeToString(b) +} + +// Decode decodes the input from base64 +// It can optionally unzip the input after decoding +func Decode(in string, obj interface{}) { + b, err := base64.StdEncoding.DecodeString(in) + if err != nil { + panic(err) + } + + if compress { + b = unzip(b) + } + + err = json.Unmarshal(b, obj) + if err != nil { + panic(err) + } +} + +func zip(in []byte) []byte { + var b bytes.Buffer + gz := gzip.NewWriter(&b) + _, err := gz.Write(in) + if err != nil { + panic(err) + } + err = gz.Flush() + if err != nil { + panic(err) + } + err = gz.Close() + if err != nil { + panic(err) + } + return b.Bytes() +} + +func unzip(in []byte) []byte { + var b bytes.Buffer + _, err := b.Write(in) + if err != nil { + panic(err) + } + r, err := gzip.NewReader(&b) + if err != nil { + panic(err) + } + res, err := io.ReadAll(r) + if err != nil { + panic(err) + } + return res } // configureGinMode Ginのモードを設定する @@ -119,14 +423,17 @@ func migrateDatabaseIfNeeded(db *gorm.DB) { } // setupRouter ルーターをセットアップする -func setupRouter(db *gorm.DB, liveClassService services.LiveClassService, jwtService services.JWTService) *gin.Engine { +// func setupRouter(db *gorm.DB, liveClassService services.LiveClassService, jwtService services.JWTService) *gin.Engine { +func setupRouter(db *gorm.DB, jwtService services.JWTService) *gin.Engine { router := gin.Default() router.Use(globalErrorHandler) router.Use(CORS()) initializeSwagger(router) - userController, classBoardController, classCodeController, classScheduleController, classUserController, attendanceController, classUserService, googleAuthController, createClassController, chatController, liveClassController := initializeControllers(db, redisClient, liveClassService) + //userController, classBoardController, classCodeController, classScheduleController, classUserController, attendanceController, classUserService, googleAuthController, createClassController, chatController, liveClassController := initializeControllers(db, redisClient, liveClassService) + userController, classBoardController, classCodeController, classScheduleController, classUserController, attendanceController, classUserService, googleAuthController, createClassController, chatController := initializeControllers(db, redisClient) - setupRoutes(router, userController, classBoardController, classCodeController, classScheduleController, classUserController, attendanceController, classUserService, googleAuthController, createClassController, chatController, liveClassController, jwtService) + //setupRoutes(router, userController, classBoardController, classCodeController, classScheduleController, classUserController, attendanceController, classUserService, googleAuthController, createClassController, chatController, liveClassController, jwtService) + setupRoutes(router, userController, classBoardController, classCodeController, classScheduleController, classUserController, attendanceController, classUserService, googleAuthController, createClassController, chatController, jwtService) return router } @@ -194,7 +501,8 @@ func startServer(router *gin.Engine) { } // initializeControllers コントローラーを初期化する -func initializeControllers(db *gorm.DB, redisClient *redis.Client, liveClassService services.LiveClassService) (*controllers.UserController, *controllers.ClassBoardController, *controllers.ClassCodeController, *controllers.ClassScheduleController, *controllers.ClassUserController, *controllers.AttendanceController, services.ClassUserService, *controllers.GoogleAuthController, *controllers.ClassController, *controllers.ChatController, *controllers.LiveClassController) { +// func initializeControllers(db *gorm.DB, redisClient *redis.Client, liveClassService services.LiveClassService) (*controllers.UserController, *controllers.ClassBoardController, *controllers.ClassCodeController, *controllers.ClassScheduleController, *controllers.ClassUserController, *controllers.AttendanceController, services.ClassUserService, *controllers.GoogleAuthController, *controllers.ClassController, *controllers.ChatController, *controllers.LiveClassController) { +func initializeControllers(db *gorm.DB, redisClient *redis.Client) (*controllers.UserController, *controllers.ClassBoardController, *controllers.ClassCodeController, *controllers.ClassScheduleController, *controllers.ClassUserController, *controllers.AttendanceController, services.ClassUserService, *controllers.GoogleAuthController, *controllers.ClassController, *controllers.ChatController) { userRepo := repositories.NewUserRepository(db) classRepo := repositories.NewClassRepository(db) classBoardRepo := repositories.NewClassBoardRepository(db) @@ -216,7 +524,7 @@ func initializeControllers(db *gorm.DB, redisClient *redis.Client, liveClassServ jwtService := services.NewJWTService() chatManager := services.NewRoomManager(redisClient) go manageChatRooms(db, chatManager) - liveClassService = services.NewLiveClassService(services.NewRoomMap(), classUserRepo) + //liveClassService = services.NewLiveClassService(services.NewRoomMap(), classUserRepo) uploader := utils.NewAwsUploader() userController := controllers.NewCreateUserController(userService) @@ -228,13 +536,15 @@ func initializeControllers(db *gorm.DB, redisClient *redis.Client, liveClassServ attendanceController := controllers.NewAttendanceController(attendanceService) googleAuthController := controllers.NewGoogleAuthController(googleAuthService, jwtService) chatController := controllers.NewChatController(chatManager, redisClient) - liveClassController := controllers.NewLiveClassController(liveClassService) + //liveClassController := controllers.NewLiveClassController(liveClassService) - return userController, classBoardController, classCodeController, classScheduleController, classUserController, attendanceController, classUserService, googleAuthController, createClassController, chatController, liveClassController + //return userController, classBoardController, classCodeController, classScheduleController, classUserController, attendanceController, classUserService, googleAuthController, createClassController, chatController, liveClassController + return userController, classBoardController, classCodeController, classScheduleController, classUserController, attendanceController, classUserService, googleAuthController, createClassController, chatController } // setupRoutes ルートをセットアップする -func setupRoutes(router *gin.Engine, userController *controllers.UserController, classBoardController *controllers.ClassBoardController, classCodeController *controllers.ClassCodeController, classScheduleController *controllers.ClassScheduleController, classUserController *controllers.ClassUserController, attendanceController *controllers.AttendanceController, classUserService services.ClassUserService, googleAuthController *controllers.GoogleAuthController, createClassController *controllers.ClassController, chatController *controllers.ChatController, liveClassController *controllers.LiveClassController, jwtService services.JWTService) { +// func setupRoutes(router *gin.Engine, userController *controllers.UserController, classBoardController *controllers.ClassBoardController, classCodeController *controllers.ClassCodeController, classScheduleController *controllers.ClassScheduleController, classUserController *controllers.ClassUserController, attendanceController *controllers.AttendanceController, classUserService services.ClassUserService, googleAuthController *controllers.GoogleAuthController, createClassController *controllers.ClassController, chatController *controllers.ChatController, liveClassController *controllers.LiveClassController, jwtService services.JWTService) { +func setupRoutes(router *gin.Engine, userController *controllers.UserController, classBoardController *controllers.ClassBoardController, classCodeController *controllers.ClassCodeController, classScheduleController *controllers.ClassScheduleController, classUserController *controllers.ClassUserController, attendanceController *controllers.AttendanceController, classUserService services.ClassUserService, googleAuthController *controllers.GoogleAuthController, createClassController *controllers.ClassController, chatController *controllers.ChatController, jwtService services.JWTService) { setupUserRoutes(router, userController, jwtService) setupClassBoardRoutes(router, classBoardController, classUserService, jwtService) setupClassCodeRoutes(router, classCodeController, jwtService) @@ -244,7 +554,7 @@ func setupRoutes(router *gin.Engine, userController *controllers.UserController, setupGoogleAuthRoutes(router, googleAuthController) setupCreateClassRoutes(router, createClassController, jwtService) setupChatRoutes(router, chatController, jwtService) - setupLiveClassRoutes(router, liveClassController, jwtService) + //setupLiveClassRoutes(router, liveClassController, jwtService) } // @securityDefinitions.apikey Bearer @@ -468,13 +778,13 @@ func manageChatRooms(db *gorm.DB, chatManager *services.Manager) { // @in header // @name Authorization // @description Type "Bearer" followed by a space and JWT token. -func setupLiveClassRoutes(router *gin.Engine, liveClassController *controllers.LiveClassController, jwtService services.JWTService) { - live := router.Group("/api/gin/live") - live.Use(middlewares.TokenAuthMiddleware(jwtService)) - { - live.POST("create-room/:classID/:userID", liveClassController.CreateRoom) - live.POST("start-screen-share/:roomID/:userID", liveClassController.StartScreenShare) - live.POST("stop-screen-share/:roomID/:userID", liveClassController.StopScreenShare) - live.GET("join-screen-share/:roomID/:userID", liveClassController.JoinScreenShare) - } -} +//func setupLiveClassRoutes(router *gin.Engine, liveClassController *controllers.LiveClassController, jwtService services.JWTService) { +// live := router.Group("/api/gin/live") +// live.Use(middlewares.TokenAuthMiddleware(jwtService)) +// { +// live.POST("create-room/:classID/:userID", liveClassController.CreateRoom) +// live.POST("start-screen-share/:roomID/:userID", liveClassController.StartScreenShare) +// live.POST("stop-screen-share/:roomID/:userID", liveClassController.StopScreenShare) +// live.GET("join-screen-share/:roomID/:userID", liveClassController.JoinScreenShare) +// } +//} diff --git a/services/live_class_service.go b/services/live_class_service.go index ed26c51..b541482 100644 --- a/services/live_class_service.go +++ b/services/live_class_service.go @@ -1,293 +1,298 @@ package services -import ( - "errors" - "fmt" - "github.com/YJU-OKURA/project_minori-gin-deployment-repo/repositories" - "log" - "sync" - "time" - - "github.com/google/uuid" - "github.com/gorilla/websocket" - "github.com/pion/webrtc/v4" -) - -var webrtcConfig = webrtc.Configuration{ - ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}, -} - -type Room struct { - ID string - Members map[string]*websocket.Conn - PeerConnection *webrtc.PeerConnection - IsSharing bool - ClassID uint -} - -type RoomMap struct { - rooms map[string]*Room - mu sync.RWMutex -} - -func NewRoomMap() *RoomMap { - return &RoomMap{ - rooms: make(map[string]*Room), - } -} - -type LiveClassService interface { - CreateRoom(classID uint, adminID uint) (string, error) - StartScreenShare(roomID string, userID string) error - StopScreenShare(roomID string, adminID string) error - GetScreenShareSDP(roomID string) (string, error) - JoinScreenShare(roomID string, userID uint) (string, error) - IsUserInClass(userID uint, classID uint) (bool, error) -} - -type liveClassServiceImpl struct { - roomMap *RoomMap - config webrtc.Configuration - classUserRepo repositories.ClassUserRepository -} - -func NewLiveClassService(roomMap *RoomMap, classUserRepo repositories.ClassUserRepository) LiveClassService { - return &liveClassServiceImpl{ - roomMap: roomMap, - config: webrtcConfig, - classUserRepo: classUserRepo, - } -} - -func (s *liveClassServiceImpl) setupPeerConnection() (*webrtc.PeerConnection, error) { - m := &webrtc.MediaEngine{} - // Register codecs - if err := m.RegisterDefaultCodecs(); err != nil { - return nil, err - } - - // Create a new API with a MediaEngine containing the default codecs - api := webrtc.NewAPI(webrtc.WithMediaEngine(m)) - - // Create a new PeerConnection with the configuration - pc, err := api.NewPeerConnection(s.config) - if err != nil { - return nil, fmt.Errorf("failed to create peer connection: %v", err) - } - - return pc, nil -} - -func (s *liveClassServiceImpl) CreateRoom(classID uint, userID uint) (string, error) { - isAdmin, err := s.classUserRepo.IsAdmin(userID, classID) - if err != nil { - return "", err - } - if !isAdmin { - return "", errors.New("unauthorized: only admins can create rooms") - } - - s.roomMap.mu.Lock() - defer s.roomMap.mu.Unlock() - - roomID := uuid.NewString() - s.roomMap.rooms[roomID] = &Room{ - ID: roomID, - ClassID: classID, - Members: make(map[string]*websocket.Conn), - IsSharing: false, - } - return roomID, nil -} - -func (s *liveClassServiceImpl) StartScreenShare(roomID string, userID string) error { - s.roomMap.mu.Lock() - room, exists := s.roomMap.rooms[roomID] - s.roomMap.mu.Unlock() - - if !exists { - log.Println("Attempt to access non-existent room:", roomID) - return fmt.Errorf("room not found") - } - - pc, err := s.setupPeerConnection() - if err != nil { - log.Println("Error setting up peer connection:", err) - return err - } - - // Setup track - track, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: "video/vp8"}, "video", "pion") - if err != nil { - pc.Close() - log.Println("Failed to create track:", err) - return err - } - - if _, err = pc.AddTrack(track); err != nil { - pc.Close() - log.Println("Failed to add track:", err) - return err - } - - // Ensure peer connection is fully established before continuing - offer, err := pc.CreateOffer(nil) - if err != nil { - pc.Close() - return fmt.Errorf("failed to create offer: %v", err) - } - - if err = pc.SetLocalDescription(offer); err != nil { - pc.Close() - return fmt.Errorf("failed to set local description: %v", err) - } - - // Handling ICE gathering - done := make(chan bool) - go awaitIceGatheringComplete(pc, done) - - if success := <-done; !success { - pc.Close() - return fmt.Errorf("failed to gather ICE candidates") - } - - // Setup connection state monitoring to ensure the connection is ready - connectionReady := make(chan bool) - pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { - if state == webrtc.PeerConnectionStateConnected { - close(connectionReady) - } - }) - - select { - case <-connectionReady: - log.Println("Connection is fully established.") - case <-time.After(30 * time.Second): // Wait for up to 30 seconds - pc.Close() - return fmt.Errorf("peer connection setup timed out") - } - - room.PeerConnection = pc - room.IsSharing = true - log.Println("Started screen sharing in room:", roomID) - return nil -} - -func (s *liveClassServiceImpl) StopScreenShare(roomID string, adminID string) error { - s.roomMap.mu.Lock() - room, exists := s.roomMap.rooms[roomID] - s.roomMap.mu.Unlock() - - if !exists { - return fmt.Errorf("room not found") - } - - if room.PeerConnection != nil { - err := room.PeerConnection.Close() - if err != nil { - return err - } - room.PeerConnection = nil - room.IsSharing = false - } - return nil -} - -func (s *liveClassServiceImpl) GetScreenShareSDP(roomID string) (string, error) { - s.roomMap.mu.RLock() - room, exists := s.roomMap.rooms[roomID] - s.roomMap.mu.RUnlock() - - if !exists || !room.IsSharing || room.PeerConnection == nil || room.PeerConnection.CurrentRemoteDescription() == nil { - log.Println("Attempt to fetch SDP from inactive session:", roomID) - return "", errors.New("screen share not active or connection closed") - } - return room.PeerConnection.LocalDescription().SDP, nil -} - -func (s *liveClassServiceImpl) JoinScreenShare(roomID string, userID uint) (string, error) { - s.roomMap.mu.RLock() - room, exists := s.roomMap.rooms[roomID] - s.roomMap.mu.RUnlock() - - if !exists || !room.IsSharing { - return "", fmt.Errorf("room not found or no active sharing") - } - - // Check if user is in the class associated with the room - classID, err := s.getClassIDFromRoomID(roomID) // Implement this method based on your app's logic - if err != nil { - return "", err - } - - isMember, err := s.classUserRepo.IsMember(userID, classID) - if err != nil || !isMember { - return "", fmt.Errorf("user is not a member of the class") - } - - // Assume a simplified scenario where the viewer also sets up a peer connection - pc, err := s.setupPeerConnection() - if err != nil { - return "", err - } - - // Create an offer to send to the admin - offer, err := pc.CreateOffer(nil) - if err != nil { - pc.Close() - return "", fmt.Errorf("failed to create offer: %v", err) - } - - err = pc.SetLocalDescription(offer) - if err != nil { - pc.Close() - return "", fmt.Errorf("failed to set local description: %v", err) - } - - return offer.SDP, nil -} - -func (s *liveClassServiceImpl) IsUserInClass(userID uint, classID uint) (bool, error) { - // Use the IsAdmin method from the repository to check if the user is a member. - // Assuming IsAdmin method checks for any user role in the class, not just admin. - // You might need to adjust the logic based on your specific role requirements. - return s.classUserRepo.IsAdmin(userID, classID) -} - -func (s *liveClassServiceImpl) getClassIDFromRoomID(roomID string) (uint, error) { - s.roomMap.mu.RLock() - defer s.roomMap.mu.RUnlock() - - room, exists := s.roomMap.rooms[roomID] - if !exists { - return 0, fmt.Errorf("no room found for ID: %s", roomID) - } - return room.ClassID, nil -} - -// awaitIceGatheringComplete waits for ICE candidates to be gathered before continuing. -func awaitIceGatheringComplete(pc *webrtc.PeerConnection, done chan bool) { - gatherComplete := make(chan struct{}) - - // Register the handler for ICE gathering state change - pc.OnICEGatheringStateChange(func(state webrtc.ICEGatheringState) { - log.Printf("ICE Gathering State has changed to %s\n", state.String()) - if state == webrtc.ICEGatheringStateComplete { - close(gatherComplete) - } - }) - - // Check if already complete before the handler - if pc.ICEGatheringState() == webrtc.ICEGatheringStateComplete { - close(gatherComplete) - } - - select { - case <-gatherComplete: - log.Println("ICE gathering complete") - done <- true - case <-time.After(30 * time.Second): // Consider a longer timeout or make it configurable - log.Println("ICE gathering timed out.") - done <- false - } -} +// +//import ( +// "errors" +// "fmt" +// "github.com/YJU-OKURA/project_minori-gin-deployment-repo/repositories" +// "log" +// "sync" +// "time" +// +// "github.com/google/uuid" +// "github.com/gorilla/websocket" +// "github.com/pion/webrtc/v4" +//) +// +//var webrtcConfig = webrtc.Configuration{ +// ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}, +//} +// +//type Room struct { +// ID string +// Members map[string]*websocket.Conn +// PeerConnection *webrtc.PeerConnection +// IsSharing bool +// ClassID uint +//} +// +//type RoomMap struct { +// rooms map[string]*Room +// mu sync.RWMutex +//} +// +//func NewRoomMap() *RoomMap { +// return &RoomMap{ +// rooms: make(map[string]*Room), +// } +//} +// +//type LiveClassService interface { +// CreateRoom(classID uint, adminID uint) (string, error) +// StartScreenShare(roomID string, userID string) error +// StopScreenShare(roomID string, adminID string) error +// GetScreenShareSDP(roomID string) (string, error) +// JoinScreenShare(roomID string, userID uint) (string, error) +// IsUserInClass(userID uint, classID uint) (bool, error) +//} +// +//type liveClassServiceImpl struct { +// roomMap *RoomMap +// config webrtc.Configuration +// classUserRepo repositories.ClassUserRepository +//} +// +//func NewLiveClassService(roomMap *RoomMap, classUserRepo repositories.ClassUserRepository) LiveClassService { +// return &liveClassServiceImpl{ +// roomMap: roomMap, +// config: webrtcConfig, +// classUserRepo: classUserRepo, +// } +//} +// +//func (s *liveClassServiceImpl) setupPeerConnection() (*webrtc.PeerConnection, error) { +// m := &webrtc.MediaEngine{} +// // Register codecs +// if err := m.RegisterDefaultCodecs(); err != nil { +// return nil, err +// } +// +// // Create a new API with a MediaEngine containing the default codecs +// api := webrtc.NewAPI(webrtc.WithMediaEngine(m)) +// +// // Create a new PeerConnection with the configuration +// pc, err := api.NewPeerConnection(s.config) +// if err != nil { +// return nil, fmt.Errorf("failed to create peer connection: %v", err) +// } +// +// return pc, nil +//} +// +//func (s *liveClassServiceImpl) CreateRoom(classID uint, userID uint) (string, error) { +// isAdmin, err := s.classUserRepo.IsAdmin(userID, classID) +// if err != nil { +// return "", err +// } +// if !isAdmin { +// return "", errors.New("unauthorized: only admins can create rooms") +// } +// +// s.roomMap.mu.Lock() +// defer s.roomMap.mu.Unlock() +// +// roomID := uuid.NewString() +// s.roomMap.rooms[roomID] = &Room{ +// ID: roomID, +// ClassID: classID, +// Members: make(map[string]*websocket.Conn), +// IsSharing: false, +// } +// return roomID, nil +//} +// +//func (s *liveClassServiceImpl) StartScreenShare(roomID string, userID string) error { +// s.roomMap.mu.Lock() +// room, exists := s.roomMap.rooms[roomID] +// s.roomMap.mu.Unlock() +// +// if !exists { +// log.Println("Attempt to access non-existent room:", roomID) +// return fmt.Errorf("room not found") +// } +// +// pc, err := s.setupPeerConnection() +// if err != nil { +// log.Println("Error setting up peer connection:", err) +// return err +// } +// +// track, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: "video/vp8"}, "video", "pion") +// if err != nil { +// pc.Close() +// log.Println("Failed to create track:", err) +// return err +// } +// +// if _, err = pc.AddTrack(track); err != nil { +// pc.Close() +// log.Println("Failed to add track:", err) +// return err +// } +// +// offer, err := pc.CreateOffer(nil) +// if err != nil { +// pc.Close() +// log.Println("Failed to create offer:", err) +// return err +// } +// +// err = pc.SetLocalDescription(offer) +// if err != nil { +// pc.Close() +// log.Println("Failed to set local description:", err) +// return err +// } +// +// done := make(chan bool) +// go awaitIceGatheringComplete(pc, done) +// if success := <-done; !success { +// pc.Close() +// log.Println("ICE gathering failed to complete in time.") +// return fmt.Errorf("failed to gather ICE candidates") +// } +// +// connectionReady := make(chan bool) +// pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { +// log.Printf("Peer Connection State has changed to: %s\n", state.String()) +// if state == webrtc.PeerConnectionStateConnected { +// close(connectionReady) +// } else if state == webrtc.PeerConnectionStateFailed { +// log.Println("Peer Connection failed.") +// pc.Close() +// } +// }) +// +// select { +// case <-connectionReady: +// log.Println("Connection is fully established.") +// case <-time.After(60 * time.Second): // Increased timeout +// log.Println("Extending the timeout did not help.") +// pc.Close() +// return fmt.Errorf("peer connection setup timed out after extended period") +// } +// +// room.PeerConnection = pc +// room.IsSharing = true +// log.Println("Started screen sharing in room:", roomID) +// return nil +//} +// +//func (s *liveClassServiceImpl) StopScreenShare(roomID string, adminID string) error { +// s.roomMap.mu.Lock() +// room, exists := s.roomMap.rooms[roomID] +// s.roomMap.mu.Unlock() +// +// if !exists { +// return fmt.Errorf("room not found") +// } +// +// if room.PeerConnection != nil { +// err := room.PeerConnection.Close() +// if err != nil { +// return err +// } +// room.PeerConnection = nil +// room.IsSharing = false +// } +// return nil +//} +// +//func (s *liveClassServiceImpl) GetScreenShareSDP(roomID string) (string, error) { +// s.roomMap.mu.RLock() +// room, exists := s.roomMap.rooms[roomID] +// s.roomMap.mu.RUnlock() +// +// if !exists || !room.IsSharing || room.PeerConnection == nil || room.PeerConnection.CurrentRemoteDescription() == nil { +// log.Println("Attempt to fetch SDP from inactive session:", roomID) +// return "", errors.New("screen share not active or connection closed") +// } +// return room.PeerConnection.LocalDescription().SDP, nil +//} +// +//func (s *liveClassServiceImpl) JoinScreenShare(roomID string, userID uint) (string, error) { +// s.roomMap.mu.RLock() +// room, exists := s.roomMap.rooms[roomID] +// s.roomMap.mu.RUnlock() +// +// if !exists || !room.IsSharing { +// return "", fmt.Errorf("room not found or no active sharing") +// } +// +// // Check if user is in the class associated with the room +// classID, err := s.getClassIDFromRoomID(roomID) // Implement this method based on your app's logic +// if err != nil { +// return "", err +// } +// +// isMember, err := s.classUserRepo.IsMember(userID, classID) +// if err != nil || !isMember { +// return "", fmt.Errorf("user is not a member of the class") +// } +// +// // Assume a simplified scenario where the viewer also sets up a peer connection +// pc, err := s.setupPeerConnection() +// if err != nil { +// return "", err +// } +// +// // Create an offer to send to the admin +// offer, err := pc.CreateOffer(nil) +// if err != nil { +// pc.Close() +// return "", fmt.Errorf("failed to create offer: %v", err) +// } +// +// err = pc.SetLocalDescription(offer) +// if err != nil { +// pc.Close() +// return "", fmt.Errorf("failed to set local description: %v", err) +// } +// +// return offer.SDP, nil +//} +// +//func (s *liveClassServiceImpl) IsUserInClass(userID uint, classID uint) (bool, error) { +// // Use the IsAdmin method from the repository to check if the user is a member. +// // Assuming IsAdmin method checks for any user role in the class, not just admin. +// // You might need to adjust the logic based on your specific role requirements. +// return s.classUserRepo.IsAdmin(userID, classID) +//} +// +//func (s *liveClassServiceImpl) getClassIDFromRoomID(roomID string) (uint, error) { +// s.roomMap.mu.RLock() +// defer s.roomMap.mu.RUnlock() +// +// room, exists := s.roomMap.rooms[roomID] +// if !exists { +// return 0, fmt.Errorf("no room found for ID: %s", roomID) +// } +// return room.ClassID, nil +//} +// +//// awaitIceGatheringComplete waits for ICE candidates to be gathered before continuing. +//func awaitIceGatheringComplete(pc *webrtc.PeerConnection, done chan bool) { +// gatherComplete := make(chan struct{}) +// +// // Register the handler for ICE gathering state change +// pc.OnICEGatheringStateChange(func(state webrtc.ICEGatheringState) { +// log.Printf("ICE Gathering State has changed to %s\n", state.String()) +// if state == webrtc.ICEGatheringStateComplete { +// close(gatherComplete) +// } +// }) +// +// // Check if already complete before the handler +// if pc.ICEGatheringState() == webrtc.ICEGatheringStateComplete { +// close(gatherComplete) +// } +// +// select { +// case <-gatherComplete: +// log.Println("ICE gathering complete") +// done <- true +// case <-time.After(30 * time.Second): // Consider a longer timeout or make it configurable +// log.Println("ICE gathering timed out.") +// done <- false +// } +//}