Skip to content

Commit

Permalink
Merge pull request #1619 from pedroSG94/feature/decoder-refactor
Browse files Browse the repository at this point in the history
Feature/decoder refactor
  • Loading branch information
pedroSG94 authored Oct 22, 2024
2 parents 2dd78a0 + f73fb36 commit 56bc753
Show file tree
Hide file tree
Showing 15 changed files with 417 additions and 224 deletions.
44 changes: 5 additions & 39 deletions common/src/main/java/com/pedro/common/socket/StreamSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,12 @@

package com.pedro.common.socket

import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.ReadWriteSocket
import io.ktor.network.sockets.isClosed
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.net.InetAddress
import java.net.InetSocketAddress

/**
* Created by pedro on 22/9/24.
*/
abstract class StreamSocket(
private val host: String,
private val port: Int
) {

private var selectorManager = SelectorManager(Dispatchers.IO)
protected var socket: ReadWriteSocket? = null
private var address: InetAddress? = null

abstract suspend fun buildSocketConfigAndConnect(selectorManager: SelectorManager): ReadWriteSocket
abstract suspend fun closeResources()

suspend fun connect() {
selectorManager = SelectorManager(Dispatchers.IO)
val socket = buildSocketConfigAndConnect(selectorManager)
address = InetSocketAddress(host, port).address
this.socket = socket
}

suspend fun close() = withContext(Dispatchers.IO) {
try {
address = null
closeResources()
socket?.close()
selectorManager.close()
} catch (ignored: Exception) {}
}

fun isConnected(): Boolean = socket?.isClosed != true

fun isReachable(): Boolean = address?.isReachable(5000) ?: false
interface StreamSocket {
suspend fun connect()
suspend fun close()
fun isConnected(): Boolean
fun isReachable(): Boolean
}
42 changes: 29 additions & 13 deletions common/src/main/java/com/pedro/common/socket/TcpStreamSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,25 @@ import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.ReadWriteSocket
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.isClosed
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.network.tls.tls
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.ByteWriteChannel
import io.ktor.utils.io.readByte
import io.ktor.utils.io.readFully
import io.ktor.utils.io.readUTF8Line
import io.ktor.utils.io.writeByte
import io.ktor.utils.io.writeFully
import io.ktor.utils.io.writeStringUtf8
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import java.net.ConnectException
import java.security.SecureRandom
import javax.net.ssl.TrustManager
import java.net.InetAddress

/**
* Created by pedro on 22/9/24.
Expand All @@ -46,46 +50,58 @@ class TcpStreamSocket(
private val port: Int,
private val secured: Boolean = false,
private val certificate: TrustManager? = null
): StreamSocket(host, port) {
): StreamSocket {

private val timeout = 5000L
private var input: ByteReadChannel? = null
private var output: ByteWriteChannel? = null
private var selectorManager = SelectorManager(Dispatchers.IO)
private var socket: ReadWriteSocket? = null
private var address: InetAddress? = null

override suspend fun buildSocketConfigAndConnect(selectorManager: SelectorManager): ReadWriteSocket {
val builder = aSocket(selectorManager).tcp()
override suspend fun connect() {
selectorManager = SelectorManager(Dispatchers.IO)
val builder = aSocket(selectorManager).tcp().connect(remoteAddress = InetSocketAddress(host, port))
val socket = if (secured) {
builder.connect(remoteAddress = InetSocketAddress(host, port)).tls(Dispatchers.Default) {
builder.tls(Dispatchers.Default) {
trustManager = certificate
random = SecureRandom()
}
} else {
builder.connect(host, port)
}
} else builder
input = socket.openReadChannel()
output = socket.openWriteChannel(autoFlush = false)
return socket
address = java.net.InetSocketAddress(host, port).address
this.socket = socket
}

override suspend fun closeResources() {
input = null
output = null
override suspend fun close() = withContext(Dispatchers.IO) {
try {
address = null
input = null
output = null
socket?.close()
selectorManager.close()
} catch (ignored: Exception) {}
}

override fun isConnected(): Boolean = socket?.isClosed != true

override fun isReachable(): Boolean = address?.isReachable(5000) ?: false

suspend fun flush() {
output?.flush()
}

suspend fun write(b: Int) = withTimeout(timeout) {
output?.writeByte(b)
output?.writeByte(b.toByte())
}

suspend fun write(b: ByteArray) = withTimeout(timeout) {
output?.writeFully(b)
}

suspend fun write(b: ByteArray, offset: Int, size: Int) = withTimeout(timeout) {
output?.writeFully(b, offset, size)
output?.writeFully(b, offset, offset + size)
}

suspend fun writeUInt16(b: Int) {
Expand Down
46 changes: 32 additions & 14 deletions common/src/main/java/com/pedro/common/socket/UdpStreamSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,49 +22,67 @@ import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.ConnectedDatagramSocket
import io.ktor.network.sockets.Datagram
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.ReadWriteSocket
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.isClosed
import io.ktor.utils.io.core.ByteReadPacket
import io.ktor.utils.io.core.readBytes
import io.ktor.utils.io.core.remaining
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.io.readByteArray
import java.net.ConnectException
import java.net.InetAddress

/**
* Created by pedro on 22/9/24.
*/
class UdpStreamSocket(
host: String,
port: Int,
private val host: String,
private val port: Int,
private val sourcePort: Int? = null,
private val receiveSize: Int? = null,
private val broadcastMode: Boolean = false
): StreamSocket(host, port) {
): StreamSocket {

private val address = InetSocketAddress(host, port)
private val udpSocket by lazy {
socket as ConnectedDatagramSocket
}
private var selectorManager = SelectorManager(Dispatchers.IO)
private var socket: ConnectedDatagramSocket? = null
private var myAddress: InetAddress? = null

override suspend fun buildSocketConfigAndConnect(selectorManager: SelectorManager): ReadWriteSocket {
override suspend fun connect() {
selectorManager = SelectorManager(Dispatchers.IO)
val builder = aSocket(selectorManager).udp()
val localAddress = if (sourcePort == null) null else InetSocketAddress("0.0.0.0", sourcePort)
return builder.connect(
val socket = builder.connect(
remoteAddress = address,
localAddress = localAddress
) {
broadcast = broadcastMode
receiveBufferSize = receiveSize ?: 0
}
myAddress = java.net.InetSocketAddress(host, port).address
this.socket = socket
}

override suspend fun close() = withContext(Dispatchers.IO) {
try {
socket?.close()
selectorManager.close()
} catch (ignored: Exception) {}
}

override suspend fun closeResources() { }
override fun isConnected(): Boolean = socket?.isClosed != true

override fun isReachable(): Boolean = myAddress?.isReachable(5000) ?: false

suspend fun readPacket(): ByteArray {
val packet = udpSocket.receive().packet
val socket = socket ?: throw ConnectException("Read with socket closed, broken pipe")
val packet = socket.receive().packet
val length = packet.remaining.toInt()
return packet.readBytes().sliceArray(0 until length)
return packet.readByteArray().sliceArray(0 until length)
}

suspend fun writePacket(bytes: ByteArray) {
val datagram = Datagram(ByteReadPacket(bytes), address)
udpSocket.send(datagram)
socket?.send(datagram)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
*
* * Copyright (C) 2024 pedroSG94.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.pedro.encoder.input.decoder

import android.content.Context
import android.media.MediaExtractor
import android.media.MediaFormat
import android.net.Uri
import com.pedro.common.frame.MediaFrame
import com.pedro.common.getIntegerSafe
import com.pedro.common.getLongSafe
import com.pedro.common.validMessage
import java.io.FileDescriptor
import java.io.IOException
import java.nio.ByteBuffer

/**
* Created by pedro on 18/10/24.
*/
class AndroidExtractor: Extractor {

private var mediaExtractor = MediaExtractor()
private var sleepTime: Long = 0
private var accumulativeTs: Long = 0
@Volatile
private var lastExtractorTs: Long = 0
private var format: MediaFormat? = null

override fun selectTrack(type: MediaFrame.Type): String {
return when (type) {
MediaFrame.Type.VIDEO -> selectTrack("video/")
MediaFrame.Type.AUDIO -> selectTrack("audio/")
}
}

override fun initialize(path: String) {
try {
reset()
mediaExtractor = MediaExtractor()
mediaExtractor.setDataSource(path)
} catch (e: Exception) {
throw IOException(e.validMessage())
}
}

override fun initialize(context: Context, uri: Uri) {
try {
reset()
mediaExtractor = MediaExtractor()
mediaExtractor.setDataSource(context, uri, null)
} catch (e: Exception) {
throw IOException(e.validMessage())
}
}

override fun initialize(fileDescriptor: FileDescriptor) {
try {
reset()
mediaExtractor = MediaExtractor()
mediaExtractor.setDataSource(fileDescriptor)
} catch (e: Exception) {
throw IOException(e.validMessage())
}
}

override fun readFrame(buffer: ByteBuffer): Int {
return mediaExtractor.readSampleData(buffer, 0)
}

override fun advance(): Boolean {
return mediaExtractor.advance()
}

override fun getTimeStamp(): Long {
return mediaExtractor.sampleTime
}

override fun getSleepTime(ts: Long): Long {
val extractorTs = getTimeStamp()
accumulativeTs += extractorTs - lastExtractorTs
lastExtractorTs = getTimeStamp()
sleepTime = if (accumulativeTs > ts) (accumulativeTs - ts) / 1000 else 0
return sleepTime
}

override fun seekTo(time: Long) {
mediaExtractor.seekTo(time, MediaExtractor.SEEK_TO_PREVIOUS_SYNC)
lastExtractorTs = getTimeStamp()
}

override fun release() {
mediaExtractor.release()
}

override fun getVideoInfo(): VideoInfo {
val format = this.format ?: throw IOException("Extractor track not selected")
val width = format.getIntegerSafe(MediaFormat.KEY_WIDTH) ?: throw IOException("Width info is required")
val height = format.getIntegerSafe(MediaFormat.KEY_HEIGHT) ?: throw IOException("Height info is required")
val duration = format.getLongSafe(MediaFormat.KEY_DURATION) ?: throw IOException("Duration info is required")
val fps = format.getIntegerSafe(MediaFormat.KEY_FRAME_RATE) ?: 30
return VideoInfo(width, height, fps, duration)
}

override fun getAudioInfo(): AudioInfo {
val format = this.format ?: throw IOException("Extractor track not selected")
val sampleRate = format.getIntegerSafe(MediaFormat.KEY_SAMPLE_RATE) ?: throw IOException("Channels info is required")
val channels = format.getIntegerSafe(MediaFormat.KEY_CHANNEL_COUNT) ?: throw IOException("SampleRate info is required")
val duration = format.getLongSafe(MediaFormat.KEY_DURATION) ?: throw IOException("Duration info is required")
return AudioInfo(sampleRate, channels, duration)
}

override fun getFormat(): MediaFormat {
return format ?: throw IOException("Extractor track not selected")
}

private fun selectTrack(type: String): String {
for (i in 0 until mediaExtractor.trackCount) {
val format = mediaExtractor.getTrackFormat(i)
val mime = format.getString(MediaFormat.KEY_MIME) ?: continue
if (mime.startsWith(type, ignoreCase = true)) {
mediaExtractor.selectTrack(i)
this.format = format
return mime
}
}
throw IOException("track not found")
}

private fun reset() {
sleepTime = 0
accumulativeTs = 0
lastExtractorTs = 0
format = null
}
}
Loading

0 comments on commit 56bc753

Please sign in to comment.