From 0d951f311fc4c46252e355533a45006ca84b00f6 Mon Sep 17 00:00:00 2001 From: Chen768959 <934103231@qq.com> Date: Fri, 25 Dec 2020 19:01:31 +0800 Subject: [PATCH 1/6] Enhance taildir source's efficiencies Increase the ability to monitor subdirectories If you need to monitor subdirectories, adding the following parameters into the taildir source configuration file: filegroupsIncludeChild filegroupsIncludeChild. The usages of the two parameters are similar to that of filegroups parameter, but they can monitor both existing and newly genereted subdirectories at the same time This enhancement have no effect to the original taildir source's efficiencies --- .../taildir/ReliableTaildirEventReader.java | 33 +- .../flume/source/taildir/TailMatcher.java | 15 + .../taildir/TaildirIncludeChildMatcher.java | 351 ++++++++++++++++++ .../flume/source/taildir/TaildirMatcher.java | 12 +- .../flume/source/taildir/TaildirSource.java | 24 +- .../TaildirSourceConfigurationConstants.java | 4 + 6 files changed, 424 insertions(+), 15 deletions(-) create mode 100644 flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java create mode 100644 flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java index ae9583620a..8d03e0171b 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -49,7 +49,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader { private static final Logger logger = LoggerFactory.getLogger(ReliableTaildirEventReader.class); - private final List taildirCache; + private final List taildirCache; private final Table headerTable; private TailFile currentFile = null; @@ -64,12 +64,14 @@ public class ReliableTaildirEventReader implements ReliableEventReader { /** * Create a ReliableTaildirEventReader to watch the given directory. */ - private ReliableTaildirEventReader(Map filePaths, + private ReliableTaildirEventReader(Map filePaths, Map filePathsIncludeChild, Table headerTable, String positionFilePath, boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching, boolean annotateFileName, String fileNameHeader) throws IOException { // Sanity checks - Preconditions.checkNotNull(filePaths); + if (filePaths == null && filePathsIncludeChild == null) { + throw new NullPointerException(); + } Preconditions.checkNotNull(positionFilePath); if (logger.isDebugEnabled()) { @@ -77,10 +79,18 @@ private ReliableTaildirEventReader(Map filePaths, new Object[] { ReliableTaildirEventReader.class.getSimpleName(), filePaths }); } - List taildirCache = Lists.newArrayList(); - for (Entry e : filePaths.entrySet()) { - taildirCache.add(new TaildirMatcher(e.getKey(), e.getValue(), cachePatternMatching)); + List taildirCache = Lists.newArrayList(); + if (filePaths!=null){ + for (Entry e : filePaths.entrySet()) { + taildirCache.add(new TaildirMatcher(e.getKey(), e.getValue(), cachePatternMatching)); + } + } + if (filePathsIncludeChild!=null){ + for (Map.Entry e : filePathsIncludeChild.entrySet()) { + taildirCache.add(new TaildirIncludeChildMatcher(e.getKey(), e.getValue(), cachePatternMatching)); + } } + logger.info("taildirCache: " + taildirCache.toString()); logger.info("headerTable: " + headerTable.toString()); @@ -239,7 +249,7 @@ public List updateTailFiles(boolean skipToEnd) throws IOException { updateTime = System.currentTimeMillis(); List updatedInodes = Lists.newArrayList(); - for (TaildirMatcher taildir : taildirCache) { + for (TailMatcher taildir : taildirCache) { Map headers = headerTable.row(taildir.getFileGroup()); for (File f : taildir.getMatchingFiles()) { @@ -247,6 +257,7 @@ public List updateTailFiles(boolean skipToEnd) throws IOException { try { inode = getInode(f); } catch (NoSuchFileException e) { + taildir.deleteFileCache(f); logger.info("File has been deleted in the meantime: " + e.getMessage()); continue; } @@ -299,6 +310,7 @@ private TailFile openFile(File file, Map headers, long inode, lo */ public static class Builder { private Map filePaths; + private Map filePathsIncludeChild; private Table headerTable; private String positionFilePath; private boolean skipToEnd; @@ -314,6 +326,11 @@ public Builder filePaths(Map filePaths) { return this; } + public Builder filePathsIncludeChild(Map filePathsIncludeChild) { + this.filePathsIncludeChild = filePathsIncludeChild; + return this; + } + public Builder headerTable(Table headerTable) { this.headerTable = headerTable; return this; @@ -350,7 +367,7 @@ public Builder fileNameHeader(String fileNameHeader) { } public ReliableTaildirEventReader build() throws IOException { - return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd, + return new ReliableTaildirEventReader(filePaths, filePathsIncludeChild, headerTable, positionFilePath, skipToEnd, addByteOffset, cachePatternMatching, annotateFileName, fileNameHeader); } diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java new file mode 100644 index 0000000000..26cfb51002 --- /dev/null +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java @@ -0,0 +1,15 @@ +package org.apache.flume.source.taildir; + +import java.io.File; +import java.util.List; + +/** + * Identifies and caches the files matched + */ +public interface TailMatcher { + List getMatchingFiles(); + + String getFileGroup(); + + void deleteFileCache(File f); +} diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java new file mode 100644 index 0000000000..ced8a74bb8 --- /dev/null +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java @@ -0,0 +1,351 @@ +package org.apache.flume.source.taildir; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.lang.StringUtils; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.*; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +/** + * Identifies and caches the files matched by single file pattern for {@code TAILDIR} source. + *

+ * file patterns apply to the fileNames and files in subdirectories, + * implementation checks the parent directory and subdirectories for modification (additional or removed files + * update modification time of dir) + * If no modification happened to the dir that means the underlying files could only be + * written to but no need to rerun the pattern matching on fileNames. + * If a directory has modified, can only re-match the directory, no need to match other directories + *

+ * This implementation provides lazy caching or no caching. Instances of this class keep the + * result file list from the last successful execution of {@linkplain #getMatchingFiles()} + * function invocation, and may serve the content without hitting the FileSystem for performance + * optimization. + *

+ * IMPORTANT: It is assumed that the hosting system provides at least second granularity + * for both {@code System.currentTimeMillis()} and {@code File.lastModified()}. Also + * that system clock is used for file system timestamps. If it is not the case then configure it + * as uncached. Class is solely for package only usage. Member functions are not thread safe. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class TaildirIncludeChildMatcher implements TailMatcher { + private static final Logger logger = LoggerFactory.getLogger(TaildirIncludeChildMatcher.class); + private static final FileSystem FS = FileSystems.getDefault(); + + // flag from configuration to switch off caching completely + private final boolean cachePatternMatching; + // id from configuration + private final String fileGroup; + // plain string of the desired files from configuration + private final String filePattern; + + // directory monitored for changes + private Set parentDirList = Sets.newLinkedHashSet(); + + // Key is file path + // Value is a two tuple, contains the lastSeenParentDirMTime and lastCheckedTime of the file + private Map lastTimeMap = Maps.newHashMap(); + + // cached content, files which matched the pattern within the parent directory + private Set lastMatchedFiles = Sets.newHashSet(); + + // Array version cache of lastMatchedFilesSet, use this cache when the files has not been changed + private List lastMatchedFilesCache = new ArrayList<>(); + + // file regex + private final String regex; + + + TaildirIncludeChildMatcher(String fileGroup, String filePattern, boolean cachePatternMatching) { + // store whatever came from configuration + this.fileGroup = fileGroup; + this.filePattern = filePattern; + this.cachePatternMatching = cachePatternMatching; + + // Path to the root directory to be monitored + // The end of the path in the configuration file can be filled with the file regular that needs to be matched + // Note that if "/" is not written at the end of the path, the end field will be treated as a regular + String filePatternParent = ""; + + if (filePattern.charAt(filePattern.length() - 1) == '/') { + filePatternParent = filePattern; + regex = ""; + } else { + String[] res = filePattern.split("\\/"); + List list = new ArrayList<>(Arrays.asList(res)); + regex = list.remove(list.size() - 1); + filePatternParent = StringUtils.join(list, "/"); + } + + File f = new File(filePatternParent); + + // Scan from the top directory + // Scan out all subdirectories and put them into cache + getFileGroupChild(f, this.parentDirList); + + Preconditions.checkState(f.exists(), + "Directory does not exist: " + f.getAbsolutePath()); + } + + /** + * Lists those files within the parentDir and subdirectory that match regex pattern passed in during object + * instantiation. Designed for frequent periodic invocation + * {@link org.apache.flume.source.PollableSourceRunner}. + *

+ * Based on the modification of the parentDirList(parentDir and its subdirectories) this function may trigger cache recalculation by + * calling {@linkplain #updateMatchingFilesNoCache(File, List)} or + * return the value stored in {@linkplain #lastMatchedFilesCache}. + * Parentdir is allowed to be a symbolic link. + *

+ * Files returned by this call are weakly consistent (see {@link DirectoryStream}). + * It does not freeze the directory while iterating, + * so it may (or may not) reflect updates to the directory that occur during the call, + * In which case next call + * will return those files (as mtime is increasing it won't hit cache but trigger recalculation). + * It is guaranteed that invocation reflects every change which was observable at the time of + * invocation. + *

+ * Matching file list recalculation is triggered when caching was turned off or + * if mtime is greater than the previously seen mtime + * (including the case of cache hasn't been calculated before). + * Additionally if a constantly updated directory was configured as parentDir and its subdirectories + * then multiple changes to the parentDirList may happen + * within the same second so in such case (assuming at least second granularity of reported mtime) + * it is impossible to tell whether a change of the dir happened before the check or after + * (unless the check happened after that second). + * Having said that implementation also stores system time of the previous invocation and previous + * invocation has to happen strictly after the current mtime to avoid further cache refresh + * (because then it is guaranteed that previous invocation resulted in valid cache content). + * If system clock hasn't passed the second of + * the current mtime then logic expects more changes as well + * (since it cannot be sure that there won't be any further changes still in that second + * and it would like to avoid data loss in first place) + * hence it recalculates matching files. If system clock finally + * passed actual mtime then a subsequent invocation guarantees that it picked up every + * change from the passed second so + * any further invocations can be served from cache associated with that second + * (given mtime is not updated again). + *

+ * Only the changed directories and new subdirectories will be rescanned each time + * + * @return List of files matching the pattern sorted by last modification time. No recursion. + * No directories. If nothing matches then returns an empty list. If I/O issue occurred then + * returns the list collected to the point when exception was thrown. + */ + public List getMatchingFiles() { + boolean lastMatchedFilesHasChange = false; + + long now = TimeUnit.SECONDS.toMillis( + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())); + + List nonExistDirList = new ArrayList<>(); + + List newDirList = new ArrayList<>(); + + // Traverse all monitored directories + // if any directory changes are found, recalculate the file list of the directory + for (File dir : this.parentDirList) { + long currentParentDirMTime = dir.lastModified(); + + if (currentParentDirMTime == 0) { + this.lastTimeMap.remove(dir.getPath()); + nonExistDirList.add(dir); + continue; + } + + LastTimeTuple lastTimeTuple = lastTimeMap.get(dir.getPath()); + if (lastTimeTuple == null) { + lastTimeTuple = new LastTimeTuple(); + lastTimeMap.put(dir.getPath(), lastTimeTuple); + } + + Long lastSeenParentDirMTime = lastTimeTuple.getLastSeenParentDirMTime(); + Long lastCheckedTime = lastTimeTuple.getLastCheckedTime(); + + // calculate matched files if + // - we don't want to use cache (recalculate every time) OR + // - directory was clearly updated after the last check OR + // - last mtime change wasn't already checked for sure + // (system clock hasn't passed that second yet) + if (!cachePatternMatching || + lastSeenParentDirMTime < currentParentDirMTime || + lastCheckedTime < currentParentDirMTime) { + lastMatchedFilesHasChange = true; + + updateMatchingFilesNoCache(dir, newDirList); + lastTimeTuple.setLastSeenParentDirMTime(currentParentDirMTime); + lastTimeTuple.setLastCheckedTime(now); + } + } + + if (!nonExistDirList.isEmpty()) { + this.parentDirList.removeAll(nonExistDirList); + } + + if (!newDirList.isEmpty()) { + this.parentDirList.addAll(newDirList); + } + + if (lastMatchedFilesHasChange) { + this.lastMatchedFilesCache = sortByLastModifiedTime(new ArrayList<>(this.lastMatchedFiles)); + } + + return this.lastMatchedFilesCache; + } + + /** + * Provides the actual files within the parentDir which + * files are matching the regex pattern. Each invocation uses {@link Pattern} + * to identify matching files. + * + * Files returned by this call are weakly consistent (new files will be set {@linkplain #lastMatchedFilesCache}). + * It does not freeze the directory while iterating, so it may (or may not) reflect updates + * to the directory that occur during the call. In which case next call will return those files. + * + * New dir will be stored in the parameters newDirList + * New file will be stored in the {@linkplain #lastMatchedFiles} + * + * @param dir Directory to be scanned + * @param newDirList Used to store the new directory + */ + private void updateMatchingFilesNoCache(File dir, List newDirList) { + try (DirectoryStream stream = Files.newDirectoryStream(dir.toPath())) { + if (stream != null) { + for (Path child : stream) { + if (Files.isDirectory(child)) { + File newDir = child.toFile(); + if (!this.parentDirList.contains(newDir)) { + newDirList.add(newDir); + updateMatchingFilesNoCache(newDir, newDirList); + } + } else { + if (child.toString().matches(regex) || regex == "") { + this.lastMatchedFiles.add(child.toFile()); + } + + } + } + } + + } catch (IOException e) { + logger.error("I/O exception occurred while listing parent directory. " + + "Files already matched will be returned. " + dir.toPath(), e); + } + } + + /** + * Scan all subdirectories of the specified directory and cache + * + * @param fileGroup Directory to be scanned + * @param fileGroupList Store the parent directory and its subdirectories + * @return void + */ + private static void getFileGroupChild(File fileGroup, Set fileGroupList) { + fileGroupList.add(fileGroup); + + if (fileGroup.listFiles() != null) { + for (File child : fileGroup.listFiles()) { + if (Files.isDirectory(child.toPath())) { + getFileGroupChild(child, fileGroupList); + } + } + } + + } + + /** + * Utility function to sort matched files based on last modification time. + * Sorting itself use only a snapshot of last modification times captured before the sorting + * to keep the number of stat system calls to the required minimum. + * + * @param files list of files in any order + * @return sorted list + */ + private static List sortByLastModifiedTime(List files) { + final HashMap lastModificationTimes = new HashMap(files.size()); + for (File f : files) { + lastModificationTimes.put(f, f.lastModified()); + } + Collections.sort(files, new Comparator() { + @Override + public int compare(File o1, File o2) { + return lastModificationTimes.get(o1).compareTo(lastModificationTimes.get(o2)); + } + }); + + return files; + } + + @Override + public String toString() { + return "{" + + "filegroup='" + fileGroup + '\'' + + ", filePattern='" + filePattern + '\'' + + ", cached=" + cachePatternMatching + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TaildirIncludeChildMatcher that = (TaildirIncludeChildMatcher) o; + + return fileGroup.equals(that.fileGroup); + + } + + @Override + public int hashCode() { + return fileGroup.hashCode(); + } + + public String getFileGroup() { + return fileGroup; + } + + @Override + public void deleteFileCache(File f) { + this.lastMatchedFiles.remove(f); + } + + private class LastTimeTuple { + // system time in milliseconds, stores the last modification time of the + // parent directory seen by the last check, rounded to seconds + // initial value is used in first check only when it will be replaced instantly + // (system time is positive) + private long lastSeenParentDirMTime = -1; + + // system time in milliseconds, time of the last check, rounded to seconds + // initial value is used in first check only when it will be replaced instantly + // (system time is positive) + private long lastCheckedTime = -1; + + public long getLastCheckedTime() { + return lastCheckedTime; + } + + public long getLastSeenParentDirMTime() { + return lastSeenParentDirMTime; + } + + public void setLastCheckedTime(long lastCheckedTime) { + this.lastCheckedTime = lastCheckedTime; + } + + public void setLastSeenParentDirMTime(long lastSeenParentDirMTime) { + this.lastSeenParentDirMTime = lastSeenParentDirMTime; + } + } +} diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java index ad9f720170..a0392262e4 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java @@ -65,7 +65,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class TaildirMatcher { +public class TaildirMatcher implements TailMatcher { private static final Logger logger = LoggerFactory.getLogger(TaildirMatcher.class); private static final FileSystem FS = FileSystems.getDefault(); @@ -180,7 +180,7 @@ public boolean accept(Path entry) throws IOException { * * @see #getMatchingFilesNoCache() */ - List getMatchingFiles() { + public List getMatchingFiles() { long now = TimeUnit.SECONDS.toMillis( TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())); long currentParentDirMTime = parentDir.lastModified(); @@ -283,4 +283,12 @@ public String getFileGroup() { return fileGroup; } + // This method is used to delete the cache of nonexistent files. + // This matcher object is updated all caches will be automatically refreshed, + // so there is no need to delete them + @Override + public void deleteFileCache(File f) { + + } + } diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java index 9ecccd7487..e1d33df643 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java @@ -63,6 +63,7 @@ public class TaildirSource extends AbstractSource implements private static final Logger logger = LoggerFactory.getLogger(TaildirSource.class); private Map filePaths; + private Map filePathsIncludeChild; private Table headerTable; private int batchSize; private String positionFilePath; @@ -95,6 +96,7 @@ public synchronized void start() { try { reader = new ReliableTaildirEventReader.Builder() .filePaths(filePaths) + .filePathsIncludeChild(filePathsIncludeChild) .headerTable(headerTable) .positionFilePath(positionFilePath) .skipToEnd(skipToEnd) @@ -154,12 +156,24 @@ public String toString() { @Override public synchronized void configure(Context context) { String fileGroups = context.getString(FILE_GROUPS); - Preconditions.checkState(fileGroups != null, "Missing param: " + FILE_GROUPS); + String fileGroupsIncludeChild = context.getString(FILE_GROUPS_INCLUDE_CHILD); + Preconditions.checkState(fileGroups != null || fileGroupsIncludeChild !=null, "Missing param: " + FILE_GROUPS); + + Map filePathsMap = context.getSubProperties(FILE_GROUPS_PREFIX); + if (!filePathsMap.isEmpty()) { + filePaths = selectByKeys(filePathsMap, + fileGroups.split("\\s+")); + Preconditions.checkState(!filePaths.isEmpty(), + "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'"); + } - filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX), - fileGroups.split("\\s+")); - Preconditions.checkState(!filePaths.isEmpty(), - "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'"); + Map filePathsIncludeChildMap = context.getSubProperties(FILE_GROUPS_INCLUDE_CHILD_PREFIX); + if (!filePathsIncludeChildMap.isEmpty()){ + filePathsIncludeChild = selectByKeys(filePathsIncludeChildMap, + fileGroupsIncludeChild.split("\\s+")); + Preconditions.checkState(!filePathsIncludeChild.isEmpty(), + "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_INCLUDE_CHILD_PREFIX + "'"); + } String homePath = System.getProperty("user.home").replace('\\', '/'); positionFilePath = context.getString(POSITION_FILE, homePath + DEFAULT_POSITION_FILE); diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java index c614e26a5d..4294089cbe 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java @@ -22,6 +22,10 @@ public class TaildirSourceConfigurationConstants { public static final String FILE_GROUPS = "filegroups"; public static final String FILE_GROUPS_PREFIX = FILE_GROUPS + "."; + /** Mapping for tailing file groups.(subdirectories are also monitored) */ + public static final String FILE_GROUPS_INCLUDE_CHILD = "filegroupsIncludeChild"; + public static final String FILE_GROUPS_INCLUDE_CHILD_PREFIX = FILE_GROUPS_INCLUDE_CHILD + "."; + /** Mapping for putting headers to events grouped by file groups. */ public static final String HEADERS_PREFIX = "headers."; From c04f8c1b38658ab3cd599c853188e49c16b15f8d Mon Sep 17 00:00:00 2001 From: Chen768959 <934103231@qq.com> Date: Tue, 29 Dec 2020 21:03:12 +0800 Subject: [PATCH 2/6] Add license headers --- .../flume/source/taildir/TailMatcher.java | 19 +++++++++++++++++++ .../taildir/TaildirIncludeChildMatcher.java | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java index 26cfb51002..9b59e7c9cf 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.flume.source.taildir; import java.io.File; diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java index ced8a74bb8..f7978980d1 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.flume.source.taildir; import com.google.common.base.Preconditions; From fec42107cee4405a480ed81dd3a3aee53537f4c5 Mon Sep 17 00:00:00 2001 From: Chen768959 <934103231@qq.com> Date: Wed, 30 Dec 2020 15:50:20 +0800 Subject: [PATCH 3/6] Compliance checkstyle --- .../taildir/ReliableTaildirEventReader.java | 15 +- .../flume/source/taildir/TailMatcher.java | 6 +- .../taildir/TaildirIncludeChildMatcher.java | 596 +++++++++--------- .../flume/source/taildir/TaildirSource.java | 11 +- 4 files changed, 323 insertions(+), 305 deletions(-) diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java index 8d03e0171b..c335b98f26 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -64,7 +64,8 @@ public class ReliableTaildirEventReader implements ReliableEventReader { /** * Create a ReliableTaildirEventReader to watch the given directory. */ - private ReliableTaildirEventReader(Map filePaths, Map filePathsIncludeChild, + private ReliableTaildirEventReader(Map filePaths, + Map filePathsIncludeChild, Table headerTable, String positionFilePath, boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching, boolean annotateFileName, String fileNameHeader) throws IOException { @@ -76,18 +77,19 @@ private ReliableTaildirEventReader(Map filePaths, Map taildirCache = Lists.newArrayList(); - if (filePaths!=null){ + if (filePaths != null) { for (Entry e : filePaths.entrySet()) { taildirCache.add(new TaildirMatcher(e.getKey(), e.getValue(), cachePatternMatching)); } } - if (filePathsIncludeChild!=null){ + if (filePathsIncludeChild != null) { for (Map.Entry e : filePathsIncludeChild.entrySet()) { - taildirCache.add(new TaildirIncludeChildMatcher(e.getKey(), e.getValue(), cachePatternMatching)); + taildirCache.add( + new TaildirIncludeChildMatcher(e.getKey(), e.getValue(), cachePatternMatching)); } } @@ -367,7 +369,8 @@ public Builder fileNameHeader(String fileNameHeader) { } public ReliableTaildirEventReader build() throws IOException { - return new ReliableTaildirEventReader(filePaths, filePathsIncludeChild, headerTable, positionFilePath, skipToEnd, + return new ReliableTaildirEventReader(filePaths, filePathsIncludeChild, headerTable, + positionFilePath, skipToEnd, addByteOffset, cachePatternMatching, annotateFileName, fileNameHeader); } diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java index 9b59e7c9cf..7bb4d4d6d3 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java @@ -26,9 +26,9 @@ * Identifies and caches the files matched */ public interface TailMatcher { - List getMatchingFiles(); + List getMatchingFiles(); - String getFileGroup(); + String getFileGroup(); - void deleteFileCache(File f); + void deleteFileCache(File f); } diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java index f7978980d1..0e6d80b6c7 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java @@ -30,8 +30,19 @@ import java.io.File; import java.io.IOException; -import java.nio.file.*; -import java.util.*; +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.DirectoryStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.Collections; +import java.util.Arrays; +import java.util.Comparator; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -39,8 +50,8 @@ * Identifies and caches the files matched by single file pattern for {@code TAILDIR} source. *

* file patterns apply to the fileNames and files in subdirectories, - * implementation checks the parent directory and subdirectories for modification (additional or removed files - * update modification time of dir) + * implementation checks the parent directory and subdirectories for modification + * (additional or removed files update modification time of dir) * If no modification happened to the dir that means the underlying files could only be * written to but no need to rerun the pattern matching on fileNames. * If a directory has modified, can only re-match the directory, no need to match other directories @@ -58,313 +69,314 @@ @InterfaceAudience.Private @InterfaceStability.Evolving public class TaildirIncludeChildMatcher implements TailMatcher { - private static final Logger logger = LoggerFactory.getLogger(TaildirIncludeChildMatcher.class); - private static final FileSystem FS = FileSystems.getDefault(); - - // flag from configuration to switch off caching completely - private final boolean cachePatternMatching; - // id from configuration - private final String fileGroup; - // plain string of the desired files from configuration - private final String filePattern; - - // directory monitored for changes - private Set parentDirList = Sets.newLinkedHashSet(); - - // Key is file path - // Value is a two tuple, contains the lastSeenParentDirMTime and lastCheckedTime of the file - private Map lastTimeMap = Maps.newHashMap(); - - // cached content, files which matched the pattern within the parent directory - private Set lastMatchedFiles = Sets.newHashSet(); - - // Array version cache of lastMatchedFilesSet, use this cache when the files has not been changed - private List lastMatchedFilesCache = new ArrayList<>(); - - // file regex - private final String regex; - - - TaildirIncludeChildMatcher(String fileGroup, String filePattern, boolean cachePatternMatching) { - // store whatever came from configuration - this.fileGroup = fileGroup; - this.filePattern = filePattern; - this.cachePatternMatching = cachePatternMatching; - - // Path to the root directory to be monitored - // The end of the path in the configuration file can be filled with the file regular that needs to be matched - // Note that if "/" is not written at the end of the path, the end field will be treated as a regular - String filePatternParent = ""; - - if (filePattern.charAt(filePattern.length() - 1) == '/') { - filePatternParent = filePattern; - regex = ""; - } else { - String[] res = filePattern.split("\\/"); - List list = new ArrayList<>(Arrays.asList(res)); - regex = list.remove(list.size() - 1); - filePatternParent = StringUtils.join(list, "/"); - } - - File f = new File(filePatternParent); - - // Scan from the top directory - // Scan out all subdirectories and put them into cache - getFileGroupChild(f, this.parentDirList); - - Preconditions.checkState(f.exists(), - "Directory does not exist: " + f.getAbsolutePath()); + private static final Logger logger = LoggerFactory.getLogger(TaildirIncludeChildMatcher.class); + private static final FileSystem FS = FileSystems.getDefault(); + + // flag from configuration to switch off caching completely + private final boolean cachePatternMatching; + // id from configuration + private final String fileGroup; + // plain string of the desired files from configuration + private final String filePattern; + + // directory monitored for changes + private Set parentDirList = Sets.newLinkedHashSet(); + + // Key is file path + // Value is a two tuple, contains the lastSeenParentDirMTime and lastCheckedTime of the file + private Map lastTimeMap = Maps.newHashMap(); + + // cached content, files which matched the pattern within the parent directory + private Set lastMatchedFiles = Sets.newHashSet(); + + // Array version cache of lastMatchedFilesSet, use this cache when the files has not been changed + private List lastMatchedFilesCache = new ArrayList<>(); + + // file regex + private final String regex; + + TaildirIncludeChildMatcher(String fileGroup, String filePattern, boolean cachePatternMatching) { + // store whatever came from configuration + this.fileGroup = fileGroup; + this.filePattern = filePattern; + this.cachePatternMatching = cachePatternMatching; + + // Path to the root directory to be monitored + // The end of the path in the configuration file can be filled + // with the file regular that needs to be matched + // Note that if "/" is not written at the end of the path, + // the end field will be treated as a regular + String filePatternParent = ""; + + if (filePattern.charAt(filePattern.length() - 1) == '/') { + filePatternParent = filePattern; + regex = ""; + } else { + String[] res = filePattern.split("\\/"); + List list = new ArrayList<>(Arrays.asList(res)); + regex = list.remove(list.size() - 1); + filePatternParent = StringUtils.join(list, "/"); } - /** - * Lists those files within the parentDir and subdirectory that match regex pattern passed in during object - * instantiation. Designed for frequent periodic invocation - * {@link org.apache.flume.source.PollableSourceRunner}. - *

- * Based on the modification of the parentDirList(parentDir and its subdirectories) this function may trigger cache recalculation by - * calling {@linkplain #updateMatchingFilesNoCache(File, List)} or - * return the value stored in {@linkplain #lastMatchedFilesCache}. - * Parentdir is allowed to be a symbolic link. - *

- * Files returned by this call are weakly consistent (see {@link DirectoryStream}). - * It does not freeze the directory while iterating, - * so it may (or may not) reflect updates to the directory that occur during the call, - * In which case next call - * will return those files (as mtime is increasing it won't hit cache but trigger recalculation). - * It is guaranteed that invocation reflects every change which was observable at the time of - * invocation. - *

- * Matching file list recalculation is triggered when caching was turned off or - * if mtime is greater than the previously seen mtime - * (including the case of cache hasn't been calculated before). - * Additionally if a constantly updated directory was configured as parentDir and its subdirectories - * then multiple changes to the parentDirList may happen - * within the same second so in such case (assuming at least second granularity of reported mtime) - * it is impossible to tell whether a change of the dir happened before the check or after - * (unless the check happened after that second). - * Having said that implementation also stores system time of the previous invocation and previous - * invocation has to happen strictly after the current mtime to avoid further cache refresh - * (because then it is guaranteed that previous invocation resulted in valid cache content). - * If system clock hasn't passed the second of - * the current mtime then logic expects more changes as well - * (since it cannot be sure that there won't be any further changes still in that second - * and it would like to avoid data loss in first place) - * hence it recalculates matching files. If system clock finally - * passed actual mtime then a subsequent invocation guarantees that it picked up every - * change from the passed second so - * any further invocations can be served from cache associated with that second - * (given mtime is not updated again). - *

- * Only the changed directories and new subdirectories will be rescanned each time - * - * @return List of files matching the pattern sorted by last modification time. No recursion. - * No directories. If nothing matches then returns an empty list. If I/O issue occurred then - * returns the list collected to the point when exception was thrown. - */ - public List getMatchingFiles() { - boolean lastMatchedFilesHasChange = false; - - long now = TimeUnit.SECONDS.toMillis( - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())); - - List nonExistDirList = new ArrayList<>(); - - List newDirList = new ArrayList<>(); - - // Traverse all monitored directories - // if any directory changes are found, recalculate the file list of the directory - for (File dir : this.parentDirList) { - long currentParentDirMTime = dir.lastModified(); - - if (currentParentDirMTime == 0) { - this.lastTimeMap.remove(dir.getPath()); - nonExistDirList.add(dir); - continue; - } - - LastTimeTuple lastTimeTuple = lastTimeMap.get(dir.getPath()); - if (lastTimeTuple == null) { - lastTimeTuple = new LastTimeTuple(); - lastTimeMap.put(dir.getPath(), lastTimeTuple); - } - - Long lastSeenParentDirMTime = lastTimeTuple.getLastSeenParentDirMTime(); - Long lastCheckedTime = lastTimeTuple.getLastCheckedTime(); - - // calculate matched files if - // - we don't want to use cache (recalculate every time) OR - // - directory was clearly updated after the last check OR - // - last mtime change wasn't already checked for sure - // (system clock hasn't passed that second yet) - if (!cachePatternMatching || - lastSeenParentDirMTime < currentParentDirMTime || - lastCheckedTime < currentParentDirMTime) { - lastMatchedFilesHasChange = true; - - updateMatchingFilesNoCache(dir, newDirList); - lastTimeTuple.setLastSeenParentDirMTime(currentParentDirMTime); - lastTimeTuple.setLastCheckedTime(now); - } - } - - if (!nonExistDirList.isEmpty()) { - this.parentDirList.removeAll(nonExistDirList); - } - - if (!newDirList.isEmpty()) { - this.parentDirList.addAll(newDirList); - } - - if (lastMatchedFilesHasChange) { - this.lastMatchedFilesCache = sortByLastModifiedTime(new ArrayList<>(this.lastMatchedFiles)); - } + File f = new File(filePatternParent); + + // Scan from the top directory + // Scan out all subdirectories and put them into cache + getFileGroupChild(f, this.parentDirList); + + Preconditions.checkState(f.exists(), + "Directory does not exist: " + f.getAbsolutePath()); + } + + /** + * Lists those files within the parentDir + * and subdirectory that match regex pattern passed in during object + * instantiation. Designed for frequent periodic invocation + * {@link org.apache.flume.source.PollableSourceRunner}. + *

+ * Based on the modification of the parentDirList(parentDir and its subdirectories) + * this function may trigger cache recalculation by + * calling {@linkplain #updateMatchingFilesNoCache(File, List)} or + * return the value stored in {@linkplain #lastMatchedFilesCache}. + * Parentdir is allowed to be a symbolic link. + *

+ * Files returned by this call are weakly consistent (see {@link DirectoryStream}). + * It does not freeze the directory while iterating, + * so it may (or may not) reflect updates to the directory that occur during the call, + * In which case next call + * will return those files (as mtime is increasing it won't hit cache but trigger recalculation). + * It is guaranteed that invocation reflects every change which was observable at the time of + * invocation. + *

+ * Matching file list recalculation is triggered when caching was turned off or + * if mtime is greater than the previously seen mtime + * (including the case of cache hasn't been calculated before). + * Additionally if a constantly updated directory was configured + * as parentDir and its subdirectories + * then multiple changes to the parentDirList may happen + * within the same second so in such case (assuming at least second granularity of reported mtime) + * it is impossible to tell whether a change of the dir happened before the check or after + * (unless the check happened after that second). + * Having said that implementation also stores system time of the previous invocation and previous + * invocation has to happen strictly after the current mtime to avoid further cache refresh + * (because then it is guaranteed that previous invocation resulted in valid cache content). + * If system clock hasn't passed the second of + * the current mtime then logic expects more changes as well + * (since it cannot be sure that there won't be any further changes still in that second + * and it would like to avoid data loss in first place) + * hence it recalculates matching files. If system clock finally + * passed actual mtime then a subsequent invocation guarantees that it picked up every + * change from the passed second so + * any further invocations can be served from cache associated with that second + * (given mtime is not updated again). + *

+ * Only the changed directories and new subdirectories will be rescanned each time + * + * @return List of files matching the pattern sorted by last modification time. No recursion. + * No directories. If nothing matches then returns an empty list. If I/O issue occurred then + * returns the list collected to the point when exception was thrown. + */ + public List getMatchingFiles() { + boolean lastMatchedFilesHasChange = false; + + long now = TimeUnit.SECONDS.toMillis( + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())); + + List nonExistDirList = new ArrayList<>(); + + List newDirList = new ArrayList<>(); + + // Traverse all monitored directories + // if any directory changes are found, recalculate the file list of the directory + for (File dir : this.parentDirList) { + long currentParentDirMTime = dir.lastModified(); + + if (currentParentDirMTime == 0) { + this.lastTimeMap.remove(dir.getPath()); + nonExistDirList.add(dir); + continue; + } + + LastTimeTuple lastTimeTuple = lastTimeMap.get(dir.getPath()); + if (lastTimeTuple == null) { + lastTimeTuple = new LastTimeTuple(); + lastTimeMap.put(dir.getPath(), lastTimeTuple); + } + + Long lastSeenParentDirMTime = lastTimeTuple.getLastSeenParentDirMTime(); + Long lastCheckedTime = lastTimeTuple.getLastCheckedTime(); + + // calculate matched files if + // - we don't want to use cache (recalculate every time) OR + // - directory was clearly updated after the last check OR + // - last mtime change wasn't already checked for sure + // (system clock hasn't passed that second yet) + if (!cachePatternMatching || + lastSeenParentDirMTime < currentParentDirMTime || + lastCheckedTime < currentParentDirMTime) { + lastMatchedFilesHasChange = true; + + updateMatchingFilesNoCache(dir, newDirList); + lastTimeTuple.setLastSeenParentDirMTime(currentParentDirMTime); + lastTimeTuple.setLastCheckedTime(now); + } + } - return this.lastMatchedFilesCache; + if (!nonExistDirList.isEmpty()) { + this.parentDirList.removeAll(nonExistDirList); } - /** - * Provides the actual files within the parentDir which - * files are matching the regex pattern. Each invocation uses {@link Pattern} - * to identify matching files. - * - * Files returned by this call are weakly consistent (new files will be set {@linkplain #lastMatchedFilesCache}). - * It does not freeze the directory while iterating, so it may (or may not) reflect updates - * to the directory that occur during the call. In which case next call will return those files. - * - * New dir will be stored in the parameters newDirList - * New file will be stored in the {@linkplain #lastMatchedFiles} - * - * @param dir Directory to be scanned - * @param newDirList Used to store the new directory - */ - private void updateMatchingFilesNoCache(File dir, List newDirList) { - try (DirectoryStream stream = Files.newDirectoryStream(dir.toPath())) { - if (stream != null) { - for (Path child : stream) { - if (Files.isDirectory(child)) { - File newDir = child.toFile(); - if (!this.parentDirList.contains(newDir)) { - newDirList.add(newDir); - updateMatchingFilesNoCache(newDir, newDirList); - } - } else { - if (child.toString().matches(regex) || regex == "") { - this.lastMatchedFiles.add(child.toFile()); - } - - } - } - } + if (!newDirList.isEmpty()) { + this.parentDirList.addAll(newDirList); + } - } catch (IOException e) { - logger.error("I/O exception occurred while listing parent directory. " + - "Files already matched will be returned. " + dir.toPath(), e); - } + if (lastMatchedFilesHasChange) { + this.lastMatchedFilesCache = sortByLastModifiedTime(new ArrayList<>(this.lastMatchedFiles)); } - /** - * Scan all subdirectories of the specified directory and cache - * - * @param fileGroup Directory to be scanned - * @param fileGroupList Store the parent directory and its subdirectories - * @return void - */ - private static void getFileGroupChild(File fileGroup, Set fileGroupList) { - fileGroupList.add(fileGroup); - - if (fileGroup.listFiles() != null) { - for (File child : fileGroup.listFiles()) { - if (Files.isDirectory(child.toPath())) { - getFileGroupChild(child, fileGroupList); - } + return this.lastMatchedFilesCache; + } + + /** + * Provides the actual files within the parentDir which + * files are matching the regex pattern. Each invocation uses {@link Pattern} + * to identify matching files. + *

+ * Files returned by this call are weakly consistent + * (new files will be set {@linkplain #lastMatchedFilesCache}). + * It does not freeze the directory while iterating, so it may (or may not) reflect updates + * to the directory that occur during the call. In which case next call will return those files. + *

+ * New dir will be stored in the parameters newDirList + * New file will be stored in the {@linkplain #lastMatchedFiles} + * + * @param dir Directory to be scanned + * @param newDirList Used to store the new directory + */ + private void updateMatchingFilesNoCache(File dir, List newDirList) { + try (DirectoryStream stream = Files.newDirectoryStream(dir.toPath())) { + if (stream != null) { + for (Path child : stream) { + if (Files.isDirectory(child)) { + File newDir = child.toFile(); + if (!this.parentDirList.contains(newDir)) { + newDirList.add(newDir); + updateMatchingFilesNoCache(newDir, newDirList); + } + } else { + if (child.toString().matches(regex) || regex == "") { + this.lastMatchedFiles.add(child.toFile()); } + } } - + } + } catch (IOException e) { + logger.error("I/O exception occurred while listing parent directory. " + + "Files already matched will be returned. " + dir.toPath(), e); } - - /** - * Utility function to sort matched files based on last modification time. - * Sorting itself use only a snapshot of last modification times captured before the sorting - * to keep the number of stat system calls to the required minimum. - * - * @param files list of files in any order - * @return sorted list - */ - private static List sortByLastModifiedTime(List files) { - final HashMap lastModificationTimes = new HashMap(files.size()); - for (File f : files) { - lastModificationTimes.put(f, f.lastModified()); + } + + /** + * Scan all subdirectories of the specified directory and cache + * + * @param fileGroup Directory to be scanned + * @param fileGroupList Store the parent directory and its subdirectories + * @return void + */ + private static void getFileGroupChild(File fileGroup, Set fileGroupList) { + fileGroupList.add(fileGroup); + + if (fileGroup.listFiles() != null) { + for (File child : fileGroup.listFiles()) { + if (Files.isDirectory(child.toPath())) { + getFileGroupChild(child, fileGroupList); } - Collections.sort(files, new Comparator() { - @Override - public int compare(File o1, File o2) { - return lastModificationTimes.get(o1).compareTo(lastModificationTimes.get(o2)); - } - }); - - return files; - } - - @Override - public String toString() { - return "{" + - "filegroup='" + fileGroup + '\'' + - ", filePattern='" + filePattern + '\'' + - ", cached=" + cachePatternMatching + - '}'; + } } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TaildirIncludeChildMatcher that = (TaildirIncludeChildMatcher) o; - - return fileGroup.equals(that.fileGroup); - + } + + /** + * Utility function to sort matched files based on last modification time. + * Sorting itself use only a snapshot of last modification times captured before the sorting + * to keep the number of stat system calls to the required minimum. + * + * @param files list of files in any order + * @return sorted list + */ + private static List sortByLastModifiedTime(List files) { + final HashMap lastModificationTimes = new HashMap(files.size()); + for (File f : files) { + lastModificationTimes.put(f, f.lastModified()); } - - @Override - public int hashCode() { - return fileGroup.hashCode(); + Collections.sort(files, new Comparator() { + @Override + public int compare(File o1, File o2) { + return lastModificationTimes.get(o1).compareTo(lastModificationTimes.get(o2)); + } + }); + + return files; + } + + @Override + public String toString() { + return "{" + + "filegroup='" + fileGroup + '\'' + + ", filePattern='" + filePattern + '\'' + + ", cached=" + cachePatternMatching + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TaildirIncludeChildMatcher that = (TaildirIncludeChildMatcher) o; + + return fileGroup.equals(that.fileGroup); + } + + @Override + public int hashCode() { + return fileGroup.hashCode(); + } + + public String getFileGroup() { + return fileGroup; + } + + @Override + public void deleteFileCache(File f) { + this.lastMatchedFiles.remove(f); + } + + private class LastTimeTuple { + // system time in milliseconds, stores the last modification time of the + // parent directory seen by the last check, rounded to seconds + // initial value is used in first check only when it will be replaced instantly + // (system time is positive) + private long lastSeenParentDirMTime = -1; + + // system time in milliseconds, time of the last check, rounded to seconds + // initial value is used in first check only when it will be replaced instantly + // (system time is positive) + private long lastCheckedTime = -1; + + public long getLastCheckedTime() { + return lastCheckedTime; } - public String getFileGroup() { - return fileGroup; + public long getLastSeenParentDirMTime() { + return lastSeenParentDirMTime; } - @Override - public void deleteFileCache(File f) { - this.lastMatchedFiles.remove(f); + public void setLastCheckedTime(long lastCheckedTime) { + this.lastCheckedTime = lastCheckedTime; } - private class LastTimeTuple { - // system time in milliseconds, stores the last modification time of the - // parent directory seen by the last check, rounded to seconds - // initial value is used in first check only when it will be replaced instantly - // (system time is positive) - private long lastSeenParentDirMTime = -1; - - // system time in milliseconds, time of the last check, rounded to seconds - // initial value is used in first check only when it will be replaced instantly - // (system time is positive) - private long lastCheckedTime = -1; - - public long getLastCheckedTime() { - return lastCheckedTime; - } - - public long getLastSeenParentDirMTime() { - return lastSeenParentDirMTime; - } - - public void setLastCheckedTime(long lastCheckedTime) { - this.lastCheckedTime = lastCheckedTime; - } - - public void setLastSeenParentDirMTime(long lastSeenParentDirMTime) { - this.lastSeenParentDirMTime = lastSeenParentDirMTime; - } + public void setLastSeenParentDirMTime(long lastSeenParentDirMTime) { + this.lastSeenParentDirMTime = lastSeenParentDirMTime; } + } } diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java index e1d33df643..cedacbba98 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java @@ -157,7 +157,8 @@ public String toString() { public synchronized void configure(Context context) { String fileGroups = context.getString(FILE_GROUPS); String fileGroupsIncludeChild = context.getString(FILE_GROUPS_INCLUDE_CHILD); - Preconditions.checkState(fileGroups != null || fileGroupsIncludeChild !=null, "Missing param: " + FILE_GROUPS); + Preconditions.checkState(fileGroups != null || + fileGroupsIncludeChild != null, "Missing param: " + FILE_GROUPS); Map filePathsMap = context.getSubProperties(FILE_GROUPS_PREFIX); if (!filePathsMap.isEmpty()) { @@ -167,12 +168,14 @@ public synchronized void configure(Context context) { "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'"); } - Map filePathsIncludeChildMap = context.getSubProperties(FILE_GROUPS_INCLUDE_CHILD_PREFIX); - if (!filePathsIncludeChildMap.isEmpty()){ + Map filePathsIncludeChildMap = + context.getSubProperties(FILE_GROUPS_INCLUDE_CHILD_PREFIX); + if (!filePathsIncludeChildMap.isEmpty()) { filePathsIncludeChild = selectByKeys(filePathsIncludeChildMap, fileGroupsIncludeChild.split("\\s+")); Preconditions.checkState(!filePathsIncludeChild.isEmpty(), - "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_INCLUDE_CHILD_PREFIX + "'"); + "Mapping for tailing files is empty or invalid: '" + + FILE_GROUPS_INCLUDE_CHILD_PREFIX + "'"); } String homePath = System.getProperty("user.home").replace('\\', '/'); From 50f2934bd8a9aea0db910a063b31010c4aacbe3c Mon Sep 17 00:00:00 2001 From: Chen768959 <934103231@qq.com> Date: Thu, 31 Dec 2020 16:39:39 +0800 Subject: [PATCH 4/6] Fix spotbugs bugs --- .../taildir/ReliableTaildirEventReader.java | 2 +- .../apache/flume/source/taildir/TailMatcher.java | 2 +- .../taildir/TaildirIncludeChildMatcher.java | 15 +++++++++------ .../flume/source/taildir/TaildirMatcher.java | 4 +++- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java index c335b98f26..264a0c6c0f 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -199,7 +199,7 @@ public List readEvents(int numEvents, boolean backoffWithoutNL) throws IOException { if (!committed) { if (currentFile == null) { - throw new IllegalStateException("current file does not exist. " + currentFile.getPath()); + throw new IllegalStateException("current file does not exist. "); } logger.info("Last read was never committed - resetting position"); long lastPos = currentFile.getPos(); diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java index 7bb4d4d6d3..2ab74f81a3 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailMatcher.java @@ -30,5 +30,5 @@ public interface TailMatcher { String getFileGroup(); - void deleteFileCache(File f); + void deleteFileCache(File file); } diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java index 0e6d80b6c7..b87d7a3837 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java @@ -106,7 +106,7 @@ public class TaildirIncludeChildMatcher implements TailMatcher { // with the file regular that needs to be matched // Note that if "/" is not written at the end of the path, // the end field will be treated as a regular - String filePatternParent = ""; + String filePatternParent; if (filePattern.charAt(filePattern.length() - 1) == '/') { filePatternParent = filePattern; @@ -176,6 +176,7 @@ public class TaildirIncludeChildMatcher implements TailMatcher { * No directories. If nothing matches then returns an empty list. If I/O issue occurred then * returns the list collected to the point when exception was thrown. */ + @Override public List getMatchingFiles() { boolean lastMatchedFilesHasChange = false; @@ -286,8 +287,9 @@ private void updateMatchingFilesNoCache(File dir, List newDirList) { private static void getFileGroupChild(File fileGroup, Set fileGroupList) { fileGroupList.add(fileGroup); - if (fileGroup.listFiles() != null) { - for (File child : fileGroup.listFiles()) { + File[] listFiles = fileGroup.listFiles(); + if (listFiles != null) { + for (File child : listFiles) { if (Files.isDirectory(child.toPath())) { getFileGroupChild(child, fileGroupList); } @@ -342,16 +344,17 @@ public int hashCode() { return fileGroup.hashCode(); } + @Override public String getFileGroup() { return fileGroup; } @Override - public void deleteFileCache(File f) { - this.lastMatchedFiles.remove(f); + public void deleteFileCache(File file) { + this.lastMatchedFiles.remove(file); } - private class LastTimeTuple { + private static class LastTimeTuple { // system time in milliseconds, stores the last modification time of the // parent directory seen by the last check, rounded to seconds // initial value is used in first check only when it will be replaced instantly diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java index a0392262e4..f8ac8a8c99 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java @@ -180,6 +180,7 @@ public boolean accept(Path entry) throws IOException { * * @see #getMatchingFilesNoCache() */ + @Override public List getMatchingFiles() { long now = TimeUnit.SECONDS.toMillis( TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())); @@ -279,6 +280,7 @@ public int hashCode() { return fileGroup.hashCode(); } + @Override public String getFileGroup() { return fileGroup; } @@ -287,7 +289,7 @@ public String getFileGroup() { // This matcher object is updated all caches will be automatically refreshed, // so there is no need to delete them @Override - public void deleteFileCache(File f) { + public void deleteFileCache(File file) { } From 0ba25977493b96d41ca6590862f8e770d16bb45c Mon Sep 17 00:00:00 2001 From: Chen768959 <934103231@qq.com> Date: Tue, 5 Jan 2021 20:48:18 +0800 Subject: [PATCH 5/6] Fix pmd bugs --- .../apache/flume/source/taildir/TaildirIncludeChildMatcher.java | 1 - .../java/org/apache/flume/source/taildir/TaildirMatcher.java | 1 - 2 files changed, 2 deletions(-) diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java index b87d7a3837..02bf952d1d 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java @@ -70,7 +70,6 @@ @InterfaceStability.Evolving public class TaildirIncludeChildMatcher implements TailMatcher { private static final Logger logger = LoggerFactory.getLogger(TaildirIncludeChildMatcher.class); - private static final FileSystem FS = FileSystems.getDefault(); // flag from configuration to switch off caching completely private final boolean cachePatternMatching; diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java index f8ac8a8c99..d7820dbd3e 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java @@ -185,7 +185,6 @@ public List getMatchingFiles() { long now = TimeUnit.SECONDS.toMillis( TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())); long currentParentDirMTime = parentDir.lastModified(); - List result; // calculate matched files if // - we don't want to use cache (recalculate every time) OR From 7960f40ec13087d23c11e27f6f8e81d3764624bf Mon Sep 17 00:00:00 2001 From: Chen768959 <934103231@qq.com> Date: Tue, 5 Jan 2021 21:08:29 +0800 Subject: [PATCH 6/6] Fix unusedImports --- .../apache/flume/source/taildir/TaildirIncludeChildMatcher.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java index 02bf952d1d..92578f55f4 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirIncludeChildMatcher.java @@ -30,8 +30,6 @@ import java.io.File; import java.io.IOException; -import java.nio.file.FileSystem; -import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.DirectoryStream;