Skip to content

Commit

Permalink
Restore support for Disruptor 3.x
Browse files Browse the repository at this point in the history
  • Loading branch information
ppkarwasz committed Dec 19, 2023
1 parent ee58635 commit 463e259
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceReportingEventHandler;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
Expand All @@ -40,6 +41,7 @@
import org.apache.logging.log4j.core.util.Log4jThreadFactory;
import org.apache.logging.log4j.core.util.Throwables;
import org.apache.logging.log4j.message.ReusableMessage;
import org.apache.logging.log4j.util.LoaderUtil;

/**
* Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX Disruptor library.
Expand Down Expand Up @@ -97,7 +99,9 @@ private static class Log4jEventWrapperHandler implements EventHandler<Log4jEvent
private Sequence sequenceCallback;
private int counter;

@Override
/*
* Overrides a method from Disruptor 4.x. Do not remove.
*/
public void setSequenceCallback(final Sequence sequenceCallback) {
this.sequenceCallback = sequenceCallback;
}
Expand All @@ -124,6 +128,12 @@ private void notifyIntermediateProgress(final long sequence) {
}
}

/**
* A version of Log4jEventWrapperHandler for LMAX Disruptor 3.x.
*/
private static final class Log4jEventWrapperHandler3 extends Log4jEventWrapperHandler
implements SequenceReportingEventHandler<Log4jEventWrapper> {}

/**
* Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
* RingBuffer.
Expand Down Expand Up @@ -155,6 +165,16 @@ private void notifyIntermediateProgress(final long sequence) {
ringBufferElement.loggerConfig = loggerConfig;
};

private Log4jEventWrapperHandler createEventHandler() {
try {
return LoaderUtil.newInstanceOf(
"org.apache.logging.log4j.core.async.AsyncLoggerConfigDisruptor$Log4jEventWrapperHandler3");
} catch (final ReflectiveOperationException | LinkageError e) {
LOGGER.debug("LMAX Disruptor 3.x is missing, trying version 4.x.", e);
}
return new Log4jEventWrapperHandler();
}

private int ringBufferSize;
private AsyncQueueFullPolicy asyncQueueFullPolicy;
private Boolean mutable = Boolean.FALSE;
Expand Down Expand Up @@ -220,7 +240,7 @@ public Thread newThread(final Runnable r) {
final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler();
disruptor.setDefaultExceptionHandler(errorHandler);

final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
final Log4jEventWrapperHandler[] handlers = {createEventHandler()};
disruptor.handleEventsWith(handlers);

LOGGER.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public Thread newThread(final Runnable r) {
final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
disruptor.setDefaultExceptionHandler(errorHandler);

final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
final RingBufferLogEventHandler4[] handlers = {RingBufferLogEventHandler4.create()};
disruptor.handleEventsWith(handlers);

LOGGER.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ static WaitStrategy createDefaultWaitStrategy(final String propertyName) {
LOGGER.trace(
"DefaultAsyncWaitStrategyFactory creating TimeoutBlockingWaitStrategy(timeout={}, unit=MILLIS)",
timeoutMillis);
try {
// Check for the v 4.x version of the strategy, the version in 3.x is not garbage-free.
if (DisruptorUtil.DISRUPTOR_MAJOR_VERSION == 4) {
return (WaitStrategy) Class.forName("com.lmax.disruptor.TimeoutBlockingWaitStrategy")
.getConstructor(long.class, TimeUnit.class)
.newInstance(timeoutMillis, TimeUnit.MILLISECONDS);
}
} catch (final ReflectiveOperationException | LinkageError e) {
LOGGER.debug(
"DefaultAsyncWaitStrategyFactory failed to load 'com.lmax.disruptor.TimeoutBlockingWaitStrategy', using '{}' instead.",
TimeoutBlockingWaitStrategy.class.getName());
}
// Use our version
return new TimeoutBlockingWaitStrategy(timeoutMillis, TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ final class DisruptorUtil {
static final boolean ASYNC_CONFIG_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL = PropertiesUtil.getProperties()
.getBooleanProperty("AsyncLoggerConfig.SynchronizeEnqueueWhenQueueFull", true);

static final int DISRUPTOR_MAJOR_VERSION =
LoaderUtil.isClassAvailable("com.lmax.disruptor.SequenceReportingEventHandler") ? 3 : 4;

private DisruptorUtil() {}

static WaitStrategy createWaitStrategy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,68 +16,21 @@
*/
package org.apache.logging.log4j.core.async;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.SequenceReportingEventHandler;

/**
* This event handler gets passed messages from the RingBuffer as they become
* available. Processing of these messages is done in a separate thread,
* controlled by the {@code Executor} passed to the {@code Disruptor}
* constructor.
*/
public class RingBufferLogEventHandler implements EventHandler<RingBufferLogEvent> {

private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
private Sequence sequenceCallback;
private int counter;
private long threadId = -1;

@Override
public void setSequenceCallback(final Sequence sequenceCallback) {
this.sequenceCallback = sequenceCallback;
}

@Override
public void onEvent(final RingBufferLogEvent event, final long sequence, final boolean endOfBatch)
throws Exception {
try {
// RingBufferLogEvents are populated by an EventTranslator. If an exception is thrown during event
// translation, the event may not be fully populated, but Disruptor requires that the associated sequence
// still be published since a slot has already been claimed in the ring buffer. Ignore any such unpopulated
// events. The exception that occurred during translation will have already been propagated.
if (event.isPopulated()) {
event.execute(endOfBatch);
}
} finally {
event.clear();
// notify the BatchEventProcessor that the sequence has progressed.
// Without this callback the sequence would not be progressed
// until the batch has completely finished.
notifyCallback(sequence);
}
}

private void notifyCallback(final long sequence) {
if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
sequenceCallback.set(sequence);
counter = 0;
}
}
public class RingBufferLogEventHandler extends RingBufferLogEventHandler4
implements SequenceReportingEventHandler<RingBufferLogEvent>, LifecycleAware {

/**
* Returns the thread ID of the background consumer thread, or {@code -1} if the background thread has not started
* yet.
* @return the thread ID of the background consumer thread, or {@code -1}
* @deprecated Use the {@link RingBufferLogEventHandler4#create()} factory method instead.
*/
public long getThreadId() {
return threadId;
}

@Override
public void onStart() {
threadId = Thread.currentThread().getId();
}

@Override
public void onShutdown() {}
@Deprecated
public RingBufferLogEventHandler() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.logging.log4j.core.async;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.Sequence;
import org.apache.logging.log4j.status.StatusLogger;
import org.apache.logging.log4j.util.LoaderUtil;

/**
* This event handler gets passed messages from the RingBuffer as they become
* available. Processing of these messages is done in a separate thread,
* controlled by the {@code Executor} passed to the {@code Disruptor}
* constructor.
*/
class RingBufferLogEventHandler4 implements EventHandler<RingBufferLogEvent> {

private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
private Sequence sequenceCallback;
private int counter;
private long threadId = -1;

/**
* Returns the appropriate {@link EventHandler} for the version of LMAX Disruptor used.
*/
public static RingBufferLogEventHandler4 create() {
try {
return LoaderUtil.newInstanceOf("org.apache.logging.log4j.core.async.RingBufferLogEventHandler");
} catch (final ReflectiveOperationException | LinkageError e) {
StatusLogger.getLogger().debug("LMAX Disruptor 3.x is missing, trying version 4.x.", e);
}
return new RingBufferLogEventHandler4();
}

/*
* Overrides a method from Disruptor 4.x. Do not remove.
*/
public void setSequenceCallback(final Sequence sequenceCallback) {
this.sequenceCallback = sequenceCallback;
}

@Override
public void onEvent(final RingBufferLogEvent event, final long sequence, final boolean endOfBatch)
throws Exception {
try {
// RingBufferLogEvents are populated by an EventTranslator. If an exception is thrown during event
// translation, the event may not be fully populated, but Disruptor requires that the associated sequence
// still be published since a slot has already been claimed in the ring buffer. Ignore any such unpopulated
// events. The exception that occurred during translation will have already been propagated.
if (event.isPopulated()) {
event.execute(endOfBatch);
}
} finally {
event.clear();
// notify the BatchEventProcessor that the sequence has progressed.
// Without this callback the sequence would not be progressed
// until the batch has completely finished.
notifyCallback(sequence);
}
}

private void notifyCallback(final long sequence) {
if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
sequenceCallback.set(sequence);
counter = 0;
}
}

/**
* Returns the thread ID of the background consumer thread, or {@code -1} if the background thread has not started
* yet.
*
* @return the thread ID of the background consumer thread, or {@code -1}
*/
public long getThreadId() {
return threadId;
}

/*
* Overrides a method from Disruptor 4.x. Do not remove.
*/
public void onStart() {
threadId = Thread.currentThread().getId();
}

/*
* Overrides a method from Disruptor 4.x. Do not remove.
*/
public void onShutdown() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* Provides Asynchronous Logger classes and interfaces for low-latency logging.
*/
@Export
@Version("2.21.0")
@Version("2.23.0")
package org.apache.logging.log4j.core.async;

import org.osgi.annotation.bundle.Export;
Expand Down
2 changes: 1 addition & 1 deletion log4j-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
<commons-logging.version>1.3.0</commons-logging.version>
<!-- `com.conversantmedia:disruptor` version 1.2.16 requires Java 9: -->
<conversant.disruptor.version>1.2.15</conversant.disruptor.version>
<disruptor.version>4.0.0</disruptor.version>
<disruptor.version>3.4.4</disruptor.version>
<elasticsearch-java.version>8.11.2</elasticsearch-java.version>
<embedded-ldap.version>0.9.0</embedded-ldap.version>
<felix.version>7.0.5</felix.version>
Expand Down

0 comments on commit 463e259

Please sign in to comment.