Skip to content

Commit

Permalink
Use DefaultUncaughtExceptionHandler to log the uncached exception
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Jan 16, 2024
1 parent 0b873e6 commit 0a90348
Show file tree
Hide file tree
Showing 23 changed files with 159 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.dolphinscheduler.alert;

import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.alert.service.ListenerEventPostService;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;

import javax.annotation.PreDestroy;
Expand Down Expand Up @@ -54,6 +56,8 @@ public class AlertServer {
private AlertRegistryClient alertRegistryClient;

public static void main(String[] args) {
AlertServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
Thread.currentThread().setName(Constants.THREAD_NAME_ALERT_SERVER);
new SpringApplicationBuilder(AlertServer.class).run(args);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ public void registerPendingAlertGauge(final Supplier<Number> supplier) {
.register(Metrics.globalRegistry);
}

public static void registerUncachedException(final Supplier<Number> supplier) {
Gauge.builder("ds.master.uncached.exception", supplier)
.description("number of uncached exception")
.register(Metrics.globalRegistry);
}

public void incAlertSuccessCount() {
alertSuccessCounter.increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.dolphinscheduler.api;

import org.apache.dolphinscheduler.api.metrics.ApiServerMetrics;
import org.apache.dolphinscheduler.common.enums.PluginType;
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
Expand Down Expand Up @@ -51,6 +53,8 @@ public class ApiApplicationServer {
private PluginDao pluginDao;

public static void main(String[] args) {
ApiServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
SpringApplication.run(ApiApplicationServer.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.dolphinscheduler.api.metrics;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import lombok.experimental.UtilityClass;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;

Expand Down Expand Up @@ -120,4 +122,10 @@ public void cleanUpApiResponseTimeMetricsByUserId(final int userId) {
"ds.api.response.time",
"user.id", String.valueOf(userId)));
}

public static void registerUncachedException(final Supplier<Number> supplier) {
Gauge.builder("ds.master.uncached.exception", supplier)
.description("number of uncached exception")
.register(Metrics.globalRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ public abstract class BaseDaemonThread extends Thread {
protected BaseDaemonThread(Runnable runnable) {
super(runnable);
this.setDaemon(true);
this.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
}

protected BaseDaemonThread(String threadName) {
super();
this.setName(threadName);
this.setDaemon(true);
this.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.thread;

import java.util.concurrent.atomic.LongAdder;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DefaultUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {

private static final DefaultUncaughtExceptionHandler INSTANCE = new DefaultUncaughtExceptionHandler();

private static final LongAdder uncaughtExceptionCount = new LongAdder();

private DefaultUncaughtExceptionHandler() {
}

public static DefaultUncaughtExceptionHandler getInstance() {
return INSTANCE;
}

public static long getUncaughtExceptionCount() {
return uncaughtExceptionCount.longValue();
}

@Override
public void uncaughtException(Thread t, Throwable e) {
uncaughtExceptionCount.add(1);
log.error("Caught an exception in {}.", t, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,20 @@
@Slf4j
public class ThreadUtils {

/**
* Wrapper over newDaemonFixedThreadExecutor.
*
* @param threadName threadName
* @param threadsNum threadsNum
* @return ExecutorService
*/
public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String threadName, int threadsNum) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, threadFactory);
return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, newDaemonThreadFactory(threadName));
}

public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(String threadName) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadName)
return Executors.newSingleThreadScheduledExecutor(newDaemonThreadFactory(threadName));
}

public static ThreadFactory newDaemonThreadFactory(String threadName) {
return new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(threadName)
.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance())
.build();
return Executors.newSingleThreadScheduledExecutor(threadFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.thread;

import java.util.concurrent.ThreadPoolExecutor;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class ThreadUtilsTest {

@Test
void newDaemonFixedThreadExecutor() throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = ThreadUtils.newDaemonFixedThreadExecutor("DemonThread", 1);
threadPoolExecutor.execute(() -> {
throw new IllegalArgumentException("I am an exception");
});
Thread.sleep(1_000);
Assertions.assertEquals(1, DefaultUncaughtExceptionHandler.getUncaughtExceptionCount());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private void processReceived(final Transporter transporter) {
future.release();
if (future.getInvokeCallback() != null) {
future.removeFuture();
this.callbackExecutor.submit(future::executeInvokeCallback);
this.callbackExecutor.execute(future::executeInvokeCallback);
} else {
future.putResponse(deserialize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.extract.base;

import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
Expand All @@ -30,7 +31,6 @@
import org.apache.dolphinscheduler.extract.base.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.base.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;

import java.net.InetSocketAddress;
Expand All @@ -40,6 +40,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -80,25 +81,24 @@ public class NettyRemotingClient implements AutoCloseable {

public NettyRemotingClient(final NettyClientConfig clientConfig) {
this.clientConfig = clientConfig;
ThreadFactory nettyClientThreadFactory = ThreadUtils.newDaemonThreadFactory("NettyClientThread-");
if (Epoll.isAvailable()) {
this.workerGroup =
new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
} else {
this.workerGroup =
new NioEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
}
this.callbackExecutor = new ThreadPoolExecutor(
Constants.CPUS,
Constants.CPUS,
1,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("CallbackExecutor"),
ThreadUtils.newDaemonThreadFactory("NettyClientCallbackThread-"),
new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);

this.responseFutureExecutor =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));
this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.newDaemonThreadFactory("NettyClientResponseFutureThread-"));

this.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.extract.base;

import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
Expand All @@ -27,15 +28,11 @@
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.extern.slf4j.Slf4j;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
Expand All @@ -55,8 +52,8 @@ public class NettyRemotingServer {

private final ServerBootstrap serverBootstrap = new ServerBootstrap();

private final ExecutorService defaultExecutor =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
private final ExecutorService defaultExecutor = ThreadUtils
.newDaemonFixedThreadExecutor("NettyRemotingServerThread", Runtime.getRuntime().availableProcessors() * 2);

private final EventLoopGroup bossGroup;

Expand All @@ -68,16 +65,12 @@ public class NettyRemotingServer {

private final AtomicBoolean isStarted = new AtomicBoolean(false);

private static final String NETTY_BIND_FAILURE_MSG = "NettyRemotingServer bind %s fail";

public NettyRemotingServer(final NettyServerConfig serverConfig) {
this.serverConfig = serverConfig;
ThreadFactory bossThreadFactory =
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(serverConfig.getServerName() + "BossThread_%s")
.build();
ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() + "BossThread_%s");
ThreadFactory workerThreadFactory =
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat(serverConfig.getServerName() + "WorkerThread_%s").build();
ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() + "WorkerThread_%s");
if (Epoll.isAvailable()) {
this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private void processReceived(final Channel channel, final Transporter transporte
channel.writeAndFlush(response);
return;
}
nettyRemotingServer.getDefaultExecutor().submit(() -> {
nettyRemotingServer.getDefaultExecutor().execute(() -> {
StandardRpcResponse iRpcResponse;
try {
StandardRpcRequest standardRpcRequest =
Expand Down
Loading

0 comments on commit 0a90348

Please sign in to comment.