Skip to content

Commit

Permalink
fix(内嵌流媒体): 修复内嵌流媒体启动多个服务问题 (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
liu4410 authored Oct 29, 2024
1 parent 241fffd commit 00dbb89
Showing 1 changed file with 74 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

import java.io.*;
import java.io.File;
import java.io.FileWriter;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -45,9 +46,9 @@ public class ProcessZLMediaRuntime implements ZLMediaRuntime {
private Process process;

private final Sinks.Many<String> output = Sinks
.many()
.unicast()
.onBackpressureBuffer();
.many()
.unicast()
.onBackpressureBuffer();

private final Disposable.Composite disposable = Disposables.composite();

Expand Down Expand Up @@ -88,33 +89,33 @@ public ProcessZLMediaRuntime(String processFile,
this.configs.putAll(configs.createConfigs());
String secure = configs.getSecret();
this.operations = new RestfulZLMediaOperations(
builder
.clone()
.baseUrl("http://127.0.0.1:" + configs.getPorts().getHttp())
.filter((request, exchange) -> exchange.exchange(
ClientRequest
.from(request)
.url(UriComponentsBuilder
.fromUri(request.url())
.queryParam("secret", secure)
.build()
.toUri())
.build()
))
.build(),
configs,
mapper);
builder
.clone()
.baseUrl("http://127.0.0.1:" + configs.getPorts().getHttp())
.filter((request, exchange) -> exchange.exchange(
ClientRequest
.from(request)
.url(UriComponentsBuilder
.fromUri(request.url())
.queryParam("secret", secure)
.build()
.toUri())
.build()
))
.build(),
configs,
mapper);
}

@Override
@SneakyThrows
public Mono<Void> start() {
return Mono
//启动
.fromRunnable(this::start0)
.subscribeOn(Schedulers.boundedElastic())
//等待
.then(startAwait.asMono());
//启动
.fromRunnable(this::start0)
.subscribeOn(Schedulers.boundedElastic())
//等待
.then(startAwait.asMono());
}


Expand All @@ -132,9 +133,11 @@ protected long getPid() {
}

@SneakyThrows
protected void start0() {
protected synchronized void start0() {
File file = new File(processFile);

if (isDisposed() || process != null) {
return;
}
storeInit(this.configs, new File(new File(processFile).getParent(), "config.ini"));

Path pidFile = Paths.get(processFile + ".pid");
Expand All @@ -143,20 +146,20 @@ protected void start0() {
String pid = new String(Files.readAllBytes(pidFile));
log.warn("zlmedia process already exists, kill it:{}", pid);
Runtime
.getRuntime()
.exec(new String[]{"kill", pid})
.waitFor();
.getRuntime()
.exec(new String[]{"kill", pid})
.waitFor();
} catch (Throwable e) {
log.warn("kill zlmedia process error", e);
}
}

process = new ProcessBuilder()
.command(file.getAbsolutePath())
.directory(file.getParentFile())
.redirectErrorStream(true)
.inheritIO()
.start();
.command(file.getAbsolutePath())
.directory(file.getParentFile())
.redirectErrorStream(true)
.inheritIO()
.start();
long pid = getPid();

if (pid > 0) {
Expand All @@ -166,8 +169,8 @@ protected void start0() {
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
pidFile
.toFile()
.deleteOnExit();
.toFile()
.deleteOnExit();

disposable.add(() -> {
boolean ignore = pidFile.toFile().delete();
Expand All @@ -176,34 +179,37 @@ protected void start0() {

//监听进程退出
disposable
.add(
Mono
.<DataBuffer>fromCallable(() -> {
try {
processExit(process.waitFor());
} catch (InterruptedException ignore) {
processExit(-1);
}
return null;
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe()
);
.add(
Mono
.<DataBuffer>fromCallable(() -> {
try {
processExit(process.waitFor());
} catch (InterruptedException ignore) {
processExit(-1);
}
return null;
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe()
);

//定时检查是否启动成功
disposable.add(
Flux.interval(Duration.ofSeconds(2), Duration.ofSeconds(1))
.onBackpressureDrop()
.concatMap(ignore -> operations
.opsForState()
.isAlive())
.filter(Boolean::booleanValue)
.take(1)
.subscribe(ignore -> {
restartCount = 0;
startAwait.tryEmitEmpty();
})
Flux.interval(Duration.ofSeconds(2), Duration.ofSeconds(1))
.onBackpressureDrop()
.concatMap(ignore -> operations
.opsForState()
.isAlive())
.filter(Boolean::booleanValue)
.take(1)
.subscribe(ignore -> {
restartCount = 0;
startAwait.tryEmitEmpty();
})
);
if (isDisposed()) {
process.destroy();
}
}

private void handleOutput(String line) {
Expand All @@ -226,13 +232,13 @@ protected void processExit(int code) {
}
restartCount++;
Schedulers
.boundedElastic()
.schedule(() -> {
if (disposable.isDisposed()) {
return;
}
start0();
}, 2, TimeUnit.SECONDS);
.boundedElastic()
.schedule(() -> {
if (disposable.isDisposed()) {
return;
}
start0();
}, 2, TimeUnit.SECONDS);
// disposable.dispose();
}

Expand Down

0 comments on commit 00dbb89

Please sign in to comment.