diff --git a/.gitignore b/.gitignore index 252ad80b..86b9ad2d 100644 --- a/.gitignore +++ b/.gitignore @@ -8,11 +8,6 @@ spark-doris-connector/output/ spark-doris-connector/target/ spark-doris-connector/.idea/ -spark-load/spark-dpp/dependency-reduced-pom.xml -spark-load/spark-dpp/output/ -spark-load/spark-dpp/target/ -spark-load/spark-dpp/.idea/ - spark-load/target spark-load/spark-load-core/dependency-reduced-pom.xml spark-load/spark-load-core/output/ diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java index b6264edc..3b6be497 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java @@ -87,7 +87,7 @@ public int read() throws IOException { if (read < 0) { return -1; } else { - return bytes[0] & 0xFF; + return bytes[0] & 0xFF; } } diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala index faa08d7d..06bb56ff 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala @@ -498,12 +498,14 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader val loadResponse: StreamLoadResponse = StreamLoadResponse(code, msg, content) if (loadResponse.code != HttpStatus.SC_OK) { - throw new StreamLoadException(String.format("stream load error, http status:%d, msg:%s", - new Integer(loadResponse.code), loadResponse.msg)) + LOG.error(s"Stream load http status is not OK, status: ${loadResponse.code}, response: $loadResponse") + throw new StreamLoadException(String.format("stream load error, http status:%d, response:%s", + new 
Integer(loadResponse.code), loadResponse)) } else { try { val respContent = MAPPER.readValue(loadResponse.content, classOf[RespContent]) if (!respContent.isSuccess) { + LOG.error(s"Stream load status is not success, status:${respContent.getStatus}, response:$loadResponse") throw new StreamLoadException(String.format("stream load error, load status:%s, response:%s", respContent.getStatus, loadResponse)) } LOG.info("Stream load Response:{}", loadResponse) diff --git a/spark-load/pom.xml b/spark-load/pom.xml index 1816125f..60a41168 100644 --- a/spark-load/pom.xml +++ b/spark-load/pom.xml @@ -30,6 +30,7 @@ spark-load-core spark-load-dpp spark-load-dist + spark-load-common @@ -37,7 +38,6 @@ 1.8 UTF-8 1.0-SNAPSHOT - 1.2-SNAPSHOT 1.13 3.9 3.3.6 @@ -56,49 +56,11 @@ 2.0.7 1.2 1.12.669 + 0.8.13 - - org.apache.doris - fe-common - ${doris.fe.version} - - - org.apache.logging.log4j - log4j-1.2-api - - - org.apache.logging.log4j - log4j-api - - - org.apache.logging.log4j - log4j-core - - - commons-logging - commons-logging - - - org.slf4j - slf4j-api - - - com.fasterxml.jackson.core - jackson-core - - - com.fasterxml.jackson.core - jackson-databind - - - com.fasterxml.jackson.core - jackson-annotations - - - commons-codec @@ -372,7 +334,11 @@ commons-logging ${commons-logging.version} - + + org.roaringbitmap + RoaringBitmap + ${RoaringBitmap.version} + diff --git a/spark-load/spark-load-common/pom.xml b/spark-load/spark-load-common/pom.xml new file mode 100644 index 00000000..cbab0271 --- /dev/null +++ b/spark-load/spark-load-common/pom.xml @@ -0,0 +1,39 @@ + + + 4.0.0 + + org.apache.doris + spark-load + ${revision} + + + spark-load-common + + + 8 + 8 + UTF-8 + + + + + com.fasterxml.jackson.core + jackson-databind + + + com.google.guava + guava + + + org.roaringbitmap + RoaringBitmap + + + commons-codec + commons-codec + + + + \ No newline at end of file diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/DppResult.java 
b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java similarity index 96% rename from spark-load/spark-load-core/src/main/java/org/apache/doris/common/DppResult.java rename to spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java index a27445bb..3daa6541 100644 --- a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/DppResult.java +++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java @@ -21,6 +21,9 @@ import java.io.Serializable; +/** + * Copied from Apache Doris org.apache.doris.sparkdpp.DppResult + */ public class DppResult implements Serializable { @JsonProperty(value = "is_success", required = true) diff --git a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/BitmapValue.java b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/BitmapValue.java index 04bb368f..d6fd410b 100644 --- a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/BitmapValue.java +++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/BitmapValue.java @@ -6,6 +6,9 @@ import java.io.DataOutput; import java.io.IOException; +/** + * Copied from Apache Doris + */ public class BitmapValue { public static final int EMPTY = 0; diff --git a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/Codec.java b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/Codec.java index 2d783a3f..3c57a0f1 100644 --- a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/Codec.java +++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/Codec.java @@ -21,6 +21,9 @@ import java.io.DataOutput; import java.io.IOException; +/** + * Copied from Apache Doris + */ public class Codec { // not support encode negative value now diff --git a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/Hll.java 
b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/Hll.java index 8f8042ee..427543f8 100644 --- a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/Hll.java +++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/Hll.java @@ -9,6 +9,9 @@ import java.util.HashSet; import java.util.Set; +/** + * Copied from Apache Doris + */ public class Hll { public static final byte HLL_DATA_EMPTY = 0; diff --git a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/Roaring64Map.java b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/Roaring64Map.java index 85db5853..67b1e765 100644 --- a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/Roaring64Map.java +++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/io/Roaring64Map.java @@ -27,6 +27,9 @@ import java.util.SortedMap; import java.util.TreeMap; +/** + * Copied from Apache Doris + */ public class Roaring64Map { private static final boolean DEFAULT_ORDER_IS_SIGNED = false; diff --git a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/AutoType.java b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/AutoType.java index eeebc76e..f65a9fdf 100644 --- a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/AutoType.java +++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/AutoType.java @@ -19,25 +19,15 @@ * |Short |short * |Integer |int * |Float |float - |Long |long + |Long |long * |Double |double + *

+ * Copied from Apache Doris */ public class AutoType { private static final Map, Class> PRIMITIVE_TO_WRAPPER = new HashMap(); private static final Map, Class> WRAPPER_TO_PRIMITIVE = new HashMap(); - public static boolean isWrapperOfPrimitiveType(Class type) { - return WRAPPER_TO_PRIMITIVE.containsKey(type); - } - - public static Class getPrimitiveType(Class wrapperType) { - return WRAPPER_TO_PRIMITIVE.get(wrapperType); - } - - public static Class getWrapperType(Class primitiveType) { - return PRIMITIVE_TO_WRAPPER.get(primitiveType); - } - static { WRAPPER_TO_PRIMITIVE.put(Boolean.class, Boolean.TYPE); WRAPPER_TO_PRIMITIVE.put(Character.class, Character.TYPE); @@ -57,4 +47,16 @@ public static Class getWrapperType(Class primitiveType) { PRIMITIVE_TO_WRAPPER.put(Long.TYPE, Long.class); PRIMITIVE_TO_WRAPPER.put(Double.TYPE, Double.class); } + + public static boolean isWrapperOfPrimitiveType(Class type) { + return WRAPPER_TO_PRIMITIVE.containsKey(type); + } + + public static Class getPrimitiveType(Class wrapperType) { + return WRAPPER_TO_PRIMITIVE.get(wrapperType); + } + + public static Class getWrapperType(Class primitiveType) { + return PRIMITIVE_TO_WRAPPER.get(primitiveType); + } } diff --git a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/ConstructorReflection.java b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/ConstructorReflection.java index 2fce0e56..4b437ce4 100644 --- a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/ConstructorReflection.java +++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/ConstructorReflection.java @@ -12,6 +12,8 @@ /** * Modify from mockit.internal.util.ConstructorReflection JMockit v1.13 * Util class to invoke constructor of specified class. + *

+ * Copied from Apache Doris */ public final class ConstructorReflection { @@ -129,7 +131,8 @@ private static Constructor findCompatibleConstructor(Class theClass, C int gap = declaredParamTypes.length - argTypes.length; if (gap == 0 && (ParameterReflection.matchesParameterTypes(declaredParamTypes, argTypes) || ParameterReflection.acceptsArgumentTypes(declaredParamTypes, argTypes)) - && (found == null || ParameterReflection.hasMoreSpecificTypes(declaredParamTypes, foundParameters))) { + && + (found == null || ParameterReflection.hasMoreSpecificTypes(declaredParamTypes, foundParameters))) { found = (Constructor) declaredConstructor; foundParameters = declaredParamTypes; } @@ -143,10 +146,12 @@ private static Constructor findCompatibleConstructor(Class theClass, C // check if this constructor is belong to a inner class // the parameter[0] of inner class's constructor is a instance of outer class if (paramTypes[0] == declaringClass && paramTypes.length > argTypes.length) { - throw new IllegalArgumentException("Invalid instantiation of inner class; use newInnerInstance instead"); + throw new IllegalArgumentException( + "Invalid instantiation of inner class; use newInnerInstance instead"); } else { String argTypesDesc = ParameterReflection.getParameterTypesDescription(argTypes); - throw new IllegalArgumentException("No compatible constructor found: " + theClass.getSimpleName() + argTypesDesc); + throw new IllegalArgumentException( + "No compatible constructor found: " + theClass.getSimpleName() + argTypesDesc); } } } diff --git a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/Deencapsulation.java b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/Deencapsulation.java index 5fb33717..74362e0c 100644 --- a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/Deencapsulation.java +++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/Deencapsulation.java @@ -7,6 +7,8 @@ 
/** * Modify from mockit.internal.util.Deencapsulation JMockit ver1.13 + *

+ * Copied from Apache Doris */ public final class Deencapsulation { private Deencapsulation() { diff --git a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/FieldReflection.java b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/FieldReflection.java index f37aedee..04c6d9cd 100644 --- a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/FieldReflection.java +++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/FieldReflection.java @@ -12,10 +12,11 @@ import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; - /** * Modify from mockit.internal.util.FieldReflection JMockit v1.13 * Util class to set and get the value of specified field. + *

+ * Copied from Apache Doris */ public final class FieldReflection { private FieldReflection() { diff --git a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/GeneratedClasses.java b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/GeneratedClasses.java index 1aae3418..1281f4ed 100644 --- a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/GeneratedClasses.java +++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/GeneratedClasses.java @@ -10,6 +10,8 @@ /** * Modify from mockit.internal.util.GeneratedClasses JMockit v1.13 * Helper class to return type of mocked-object + *

+ * Copied from Apache Doris */ public final class GeneratedClasses { private static final String IMPLCLASS_PREFIX = "$Impl_"; diff --git a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/MethodReflection.java b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/MethodReflection.java index 749e2e7c..293e9816 100644 --- a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/MethodReflection.java +++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/MethodReflection.java @@ -13,6 +13,8 @@ /** * Modify from mockit.internal.util.MethodReflection JMockit v1.13 * Util class to get and invoke method from specified class. + *

+ * Copied from Apache Doris */ public final class MethodReflection { private MethodReflection() { @@ -24,9 +26,11 @@ public static T invoke(Class theClass, Object targetInstance, String meth } boolean staticMethod = targetInstance == null; Class[] argTypes = ParameterReflection.getArgumentTypesFromArgumentValues(methodArgs); - Method method = staticMethod ? findCompatibleStaticMethod(theClass, methodName, argTypes) : findCompatibleMethod(theClass, methodName, argTypes); + Method method = staticMethod ? findCompatibleStaticMethod(theClass, methodName, argTypes) : + findCompatibleMethod(theClass, methodName, argTypes); if (staticMethod && !Modifier.isStatic(method.getModifiers())) { - throw new IllegalArgumentException("Attempted to invoke non-static method without an instance to invoke it on"); + throw new IllegalArgumentException( + "Attempted to invoke non-static method without an instance to invoke it on"); } else { T result = invoke(targetInstance, method, methodArgs); return result; @@ -110,7 +114,7 @@ private static Method findCompatibleMethodInClass(Class theClass, String meth if (gap == 0 && (ParameterReflection.matchesParameterTypes(declaredParamTypes, argTypes) || ParameterReflection.acceptsArgumentTypes(declaredParamTypes, argTypes)) && (foundParamTypes == null - || ParameterReflection.hasMoreSpecificTypes(declaredParamTypes, foundParamTypes))) { + || ParameterReflection.hasMoreSpecificTypes(declaredParamTypes, foundParamTypes))) { found = declaredMethod; foundParamTypes = declaredParamTypes; } @@ -132,7 +136,9 @@ private static Method findCompatibleMethodIfAvailable(Class theClass, String while (true) { Method compatibleMethod = findCompatibleMethodInClass(theClass, methodName, argTypes); - if (compatibleMethod != null && (methodFound == null || ParameterReflection.hasMoreSpecificTypes(compatibleMethod.getParameterTypes(), methodFound.getParameterTypes()))) { + if (compatibleMethod != null && (methodFound == null || + 
ParameterReflection.hasMoreSpecificTypes(compatibleMethod.getParameterTypes(), + methodFound.getParameterTypes()))) { methodFound = compatibleMethod; } @@ -146,7 +152,6 @@ private static Method findCompatibleMethodIfAvailable(Class theClass, String } - // ensure that field is accessible public static void makeAccessible(AccessibleObject classMember) { if (!classMember.isAccessible()) { diff --git a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/ParameterReflection.java b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/ParameterReflection.java index 84a54dfd..6a6efc11 100644 --- a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/ParameterReflection.java +++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/jmockit/ParameterReflection.java @@ -10,6 +10,8 @@ /** * Modify from mockit.internal.util.ParameterReflection JMockit v1.13 * Util class to verify parameter of methods. + *

+ * Copied from Apache Doris */ public final class ParameterReflection { public static final Class[] NO_PARAMETERS = new Class[0]; diff --git a/spark-load/spark-load-common/src/main/java/org/apache/doris/config/EtlJobConfig.java b/spark-load/spark-load-common/src/main/java/org/apache/doris/config/EtlJobConfig.java index 93c4c6c0..99e679f1 100644 --- a/spark-load/spark-load-common/src/main/java/org/apache/doris/config/EtlJobConfig.java +++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/config/EtlJobConfig.java @@ -1,6 +1,5 @@ package org.apache.doris.config; -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; @@ -14,6 +13,9 @@ import java.util.List; import java.util.Map; +/** + * Copied from Apache Doris org.apache.doris.sparkdpp.EtlJobConfig + */ public class EtlJobConfig implements Serializable { // global dict public static final String GLOBAL_DICT_TABLE_NAME = "doris_global_dict_table_%d"; diff --git a/spark-load/spark-load-core/pom.xml b/spark-load/spark-load-core/pom.xml index b1a31e82..2878ad4d 100644 --- a/spark-load/spark-load-core/pom.xml +++ b/spark-load/spark-load-core/pom.xml @@ -36,6 +36,11 @@ + + org.apache.doris + spark-load-common + ${revision} + com.fasterxml.jackson.core jackson-databind @@ -79,10 +84,6 @@ org.apache.hadoop hadoop-client - - org.apache.doris - fe-common - org.apache.logging.log4j log4j-core diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java index a3207636..a8d64dcf 100644 --- a/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java +++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java @@ -114,13 +114,9 @@ private static CommandLineOptions parseArgs(String[] args) { 
System.exit(-1); } - if (cmd.hasOption('c') || cmd.hasOption("config")) { - String configPath = cmd.getOptionValue("config"); - boolean recovery = cmd.hasOption('r') || cmd.hasOption("recovery"); - return new CommandLineOptions(configPath, recovery); - } - - throw new IllegalArgumentException(); + String configPath = cmd.getOptionValue("config"); + boolean recovery = cmd.hasOption('r') || cmd.hasOption("recovery"); + return new CommandLineOptions(configPath, recovery); } @@ -144,6 +140,7 @@ private static void checkConfig(JobConfig jobConfig) { Preconditions.checkArgument(StringUtils.isNoneBlank(jobConfig.getDatabase()), "database is empty"); jobConfig.checkTaskInfo(); jobConfig.checkSparkInfo(); + jobConfig.checkHadoopProperties(); } } diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/meta/LoadMeta.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/meta/LoadMeta.java index 6ac4ff71..fd56cd97 100644 --- a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/meta/LoadMeta.java +++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/meta/LoadMeta.java @@ -18,9 +18,9 @@ package org.apache.doris.common.meta; import org.apache.doris.common.Constants; +import org.apache.doris.config.EtlJobConfig; import org.apache.doris.config.JobConfig; import org.apache.doris.exception.SparkLoadException; -import org.apache.doris.sparkdpp.EtlJobConfig; import com.google.common.annotations.VisibleForTesting; import lombok.Data; diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java index 9ffc4408..2eaaed21 100644 --- a/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java +++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java @@ -20,11 +20,11 @@ import org.apache.doris.SparkLoadRunner; import org.apache.doris.common.Constants; import 
org.apache.doris.common.enums.LoadMode; -import org.apache.doris.sparkdpp.EtlJobConfig; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import lombok.Data; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import java.io.File; diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java index 23a0e829..a2bcf7bc 100644 --- a/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java +++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java @@ -25,9 +25,9 @@ import org.apache.doris.common.enums.JobStatus; import org.apache.doris.common.meta.LoadMeta; import org.apache.doris.common.meta.TableMeta; +import org.apache.doris.config.EtlJobConfig; import org.apache.doris.config.JobConfig; import org.apache.doris.exception.SparkLoadException; -import org.apache.doris.sparkdpp.EtlJobConfig; import org.apache.doris.util.DateUtils; import org.apache.doris.util.FileSystemUtils; import org.apache.doris.util.JsonUtils; diff --git a/spark-load/spark-load-core/src/test/java/org/apache/doris/common/meta/LoadMetaTest.java b/spark-load/spark-load-core/src/test/java/org/apache/doris/common/meta/LoadMetaTest.java index 79c95739..7546583a 100644 --- a/spark-load/spark-load-core/src/test/java/org/apache/doris/common/meta/LoadMetaTest.java +++ b/spark-load/spark-load-core/src/test/java/org/apache/doris/common/meta/LoadMetaTest.java @@ -18,8 +18,8 @@ package org.apache.doris.common.meta; +import org.apache.doris.config.EtlJobConfig; import org.apache.doris.exception.SparkLoadException; -import org.apache.doris.sparkdpp.EtlJobConfig; import org.junit.Assert; import org.junit.Rule; @@ -45,7 +45,7 @@ public void checkMapping() throws SparkLoadException { columns.add(new EtlJobConfig.EtlColumn("c1", "HLL", true, false, "NONE", null, 0, 
10, 0)); columns.add(new EtlJobConfig.EtlColumn("c2", "BITMAP", true, false, "NONE", null, 0, 10, 0)); - EtlJobConfig.EtlIndex etlIndex = new EtlJobConfig.EtlIndex(1, columns, 1, "DUPLICATE", true); + EtlJobConfig.EtlIndex etlIndex = new EtlJobConfig.EtlIndex(1, columns, 1, "DUPLICATE", true, 1); EtlJobConfig.EtlPartition etlPartition = new EtlJobConfig.EtlPartition(1L, Collections.singletonList(0), Collections.singletonList(1), true, 1); EtlJobConfig.EtlPartitionInfo etlPartitionInfo = diff --git a/spark-load/spark-load-dpp/pom.xml b/spark-load/spark-load-dpp/pom.xml index af5ca5c7..67647cff 100644 --- a/spark-load/spark-load-dpp/pom.xml +++ b/spark-load/spark-load-dpp/pom.xml @@ -33,8 +33,9 @@ under the License. - ${project.groupId} - fe-common + org.apache.doris + spark-load-common + ${revision} @@ -46,11 +47,7 @@ under the License. org.apache.commons commons-lang3 - - - - - + diff --git a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java index 84ef9ba8..d639b31f 100644 --- a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java +++ b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java @@ -18,7 +18,7 @@ package org.apache.doris.load.loadv2.dpp; import org.apache.doris.common.SparkDppException; -import org.apache.doris.sparkdpp.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,8 +33,6 @@ // Parser to validate value for different type public abstract class ColumnParser implements Serializable { - protected static final Logger LOG = LoggerFactory.getLogger(ColumnParser.class); - // thread safe formatter public static final DateTimeFormatter DATE_FORMATTER = new DateTimeFormatterBuilder() .appendPattern("uuuu-MM-dd") @@ -42,6 +40,7 @@ public abstract class ColumnParser implements 
Serializable { public static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder() .appendPattern("uuuu-MM-dd HH:mm:ss") .toFormatter(); + protected static final Logger LOG = LoggerFactory.getLogger(ColumnParser.class); public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws SparkDppException { String columnType = etlColumn.columnType; diff --git a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitioner.java b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitioner.java index 05f2bdcc..9fd413db 100644 --- a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitioner.java +++ b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitioner.java @@ -17,7 +17,7 @@ package org.apache.doris.load.loadv2.dpp; -import org.apache.doris.sparkdpp.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig; import org.apache.spark.Partitioner; diff --git a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java index 0c6b6454..bf190408 100644 --- a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java +++ b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java @@ -18,7 +18,7 @@ package org.apache.doris.load.loadv2.dpp; import org.apache.doris.common.SparkDppException; -import org.apache.doris.sparkdpp.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig; import com.google.common.collect.Lists; import org.apache.spark.sql.Row; @@ -231,7 +231,7 @@ public static StructType replaceBinaryColsInSchema(Set binaryColumns, St } public static StructType createDstTableSchema(List columns, - boolean addBucketIdColumn, boolean regardDistinctColumnAsBinary) { + boolean addBucketIdColumn, boolean 
regardDistinctColumnAsBinary) { List fields = new ArrayList<>(); if (addBucketIdColumn) { StructField bucketIdField = DataTypes.createStructField(BUCKET_ID, DataTypes.StringType, true); diff --git a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilder.java b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilder.java index 0b54389a..ca89ab8d 100644 --- a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilder.java +++ b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilder.java @@ -17,7 +17,7 @@ package org.apache.doris.load.loadv2.dpp; -import org.apache.doris.sparkdpp.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig; import java.util.ArrayList; import java.util.Collections; diff --git a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeBuilder.java b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeBuilder.java index acb0d4c9..16ce92b8 100644 --- a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeBuilder.java +++ b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeBuilder.java @@ -17,7 +17,7 @@ package org.apache.doris.load.loadv2.dpp; -import org.apache.doris.sparkdpp.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig; // RollupTreeBuilder is used to get the RollupTree from the TableMeta public abstract interface RollupTreeBuilder { diff --git a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeNode.java b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeNode.java index a95482c2..ec3129f3 100644 --- a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeNode.java +++ 
b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeNode.java @@ -17,7 +17,7 @@ package org.apache.doris.load.loadv2.dpp; -import org.apache.doris.sparkdpp.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig; import java.util.List; diff --git a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java index 08137c70..325b39c8 100644 --- a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java +++ b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java @@ -17,9 +17,9 @@ package org.apache.doris.load.loadv2.dpp; +import org.apache.doris.common.DppResult; import org.apache.doris.common.SparkDppException; -import org.apache.doris.sparkdpp.DppResult; -import org.apache.doris.sparkdpp.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig; import com.google.common.base.Strings; import com.google.common.collect.Maps; @@ -95,6 +95,8 @@ public final class SparkDpp implements java.io.Serializable { private static final String NULL_FLAG = "\\N"; private static final String DPP_RESULT_FILE = "dpp_result.json"; private static final String BITMAP_TYPE = "bitmap"; + Map> tableToBitmapDictColumns = new HashMap<>(); + Map> tableToBinaryBitmapColumns = new HashMap<>(); private SparkSession spark = null; private EtlJobConfig etlJobConfig = null; private LongAccumulator abnormalRowAcc = null; @@ -109,8 +111,6 @@ public final class SparkDpp implements java.io.Serializable { // we need to wrap it so that we can use it in executor. 
private SerializableConfiguration serializableHadoopConf; private DppResult dppResult = new DppResult(); - Map> tableToBitmapDictColumns = new HashMap<>(); - Map> tableToBinaryBitmapColumns = new HashMap<>(); // just for ut public SparkDpp() { @@ -252,6 +252,7 @@ private void writeRepartitionAndSortedRDDToParquet(JavaPairRDD, Obj conf.setBoolean("spark.sql.parquet.int64AsTimestampMillis", false); conf.setBoolean("spark.sql.parquet.int96AsTimestamp", true); conf.setBoolean("spark.sql.parquet.binaryAsString", false); + conf.setBoolean("spark.sql.parquet.fieldId.write.enabled", true); conf.set("spark.sql.parquet.outputTimestampType", "INT96"); ParquetWriteSupport.setSchema(dstSchema, conf); ParquetWriteSupport parquetWriteSupport = new ParquetWriteSupport(); diff --git a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java index 0e140af1..e06dc2df 100644 --- a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java +++ b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java @@ -20,7 +20,7 @@ import org.apache.doris.common.SparkDppException; import org.apache.doris.common.io.BitmapValue; import org.apache.doris.common.io.Hll; -import org.apache.doris.sparkdpp.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; diff --git a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java index a359612e..03300014 100644 --- a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java +++ b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java @@ -18,14 +18,14 @@ package 
org.apache.doris.load.loadv2.etl; import org.apache.doris.common.SparkDppException; +import org.apache.doris.config.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig.EtlColumn; +import org.apache.doris.config.EtlJobConfig.EtlColumnMapping; +import org.apache.doris.config.EtlJobConfig.EtlFileGroup; +import org.apache.doris.config.EtlJobConfig.EtlIndex; +import org.apache.doris.config.EtlJobConfig.EtlTable; import org.apache.doris.load.loadv2.dpp.GlobalDictBuilder; import org.apache.doris.load.loadv2.dpp.SparkDpp; -import org.apache.doris.sparkdpp.EtlJobConfig; -import org.apache.doris.sparkdpp.EtlJobConfig.EtlColumn; -import org.apache.doris.sparkdpp.EtlJobConfig.EtlColumnMapping; -import org.apache.doris.sparkdpp.EtlJobConfig.EtlFileGroup; -import org.apache.doris.sparkdpp.EtlJobConfig.EtlIndex; -import org.apache.doris.sparkdpp.EtlJobConfig.EtlTable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; diff --git a/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/ColumnParserTest.java b/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/ColumnParserTest.java index 9091686c..41de92ae 100644 --- a/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/ColumnParserTest.java +++ b/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/ColumnParserTest.java @@ -17,7 +17,7 @@ package org.apache.doris.load.loadv2.dpp; -import org.apache.doris.sparkdpp.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig; import org.junit.Assert; import org.junit.Test; diff --git a/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitionerTest.java b/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitionerTest.java index 131018ed..0e865ddd 100644 --- a/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitionerTest.java +++ 
b/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitionerTest.java @@ -17,7 +17,7 @@ package org.apache.doris.load.loadv2.dpp; -import org.apache.doris.sparkdpp.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig; import org.junit.Assert; import org.junit.Test; diff --git a/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/DppUtilsTest.java b/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/DppUtilsTest.java index e7cea5d0..30f7a30b 100644 --- a/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/DppUtilsTest.java +++ b/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/DppUtilsTest.java @@ -17,7 +17,7 @@ package org.apache.doris.load.loadv2.dpp; -import org.apache.doris.sparkdpp.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; diff --git a/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilderTest.java b/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilderTest.java index 90c95cf0..33c0dba3 100644 --- a/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilderTest.java +++ b/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilderTest.java @@ -17,7 +17,7 @@ package org.apache.doris.load.loadv2.dpp; -import org.apache.doris.sparkdpp.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig; import org.junit.Assert; import org.junit.Test; @@ -55,24 +55,24 @@ public void testBuild() { baseColumns.add(column3); baseColumns.add(column4); EtlJobConfig.EtlIndex baseIndex = new EtlJobConfig.EtlIndex(10000, - baseColumns, 12345, "DUPLICATE", true); + baseColumns, 12345, "DUPLICATE", true, 1); List roll1Columns = new ArrayList<>(); 
roll1Columns.add(column1); roll1Columns.add(column2); roll1Columns.add(column4); EtlJobConfig.EtlIndex roll1Index = new EtlJobConfig.EtlIndex(10001, - roll1Columns, 12346, "AGGREGATE", false); + roll1Columns, 12346, "AGGREGATE", false, 1); List roll2Columns = new ArrayList<>(); roll2Columns.add(column1); roll2Columns.add(column4); EtlJobConfig.EtlIndex roll2Index = new EtlJobConfig.EtlIndex(10002, - roll2Columns, 12347, "AGGREGATE", false); + roll2Columns, 12347, "AGGREGATE", false, 1); List roll3Columns = new ArrayList<>(); roll3Columns.add(column3); roll3Columns.add(column4); EtlJobConfig.EtlIndex roll3Index = new EtlJobConfig.EtlIndex(10003, - roll3Columns, 12348, "AGGREGATE", false); + roll3Columns, 12348, "AGGREGATE", false, 1); List indexes = new ArrayList<>(); indexes.add(baseIndex); diff --git a/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/SparkDppTest.java b/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/SparkDppTest.java index 7522a69c..31ad8d36 100644 --- a/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/SparkDppTest.java +++ b/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/SparkDppTest.java @@ -17,7 +17,7 @@ package org.apache.doris.load.loadv2.dpp; -import org.apache.doris.sparkdpp.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig; import org.apache.spark.sql.RowFactory; import org.junit.Assert; diff --git a/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java b/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java index 0ea7f660..aa50de59 100644 --- a/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java +++ b/spark-load/spark-load-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java @@ -18,15 +18,15 @@ package org.apache.doris.load.loadv2.etl; import org.apache.doris.common.jmockit.Deencapsulation; 
-import org.apache.doris.sparkdpp.EtlJobConfig; -import org.apache.doris.sparkdpp.EtlJobConfig.EtlColumn; -import org.apache.doris.sparkdpp.EtlJobConfig.EtlColumnMapping; -import org.apache.doris.sparkdpp.EtlJobConfig.EtlFileGroup; -import org.apache.doris.sparkdpp.EtlJobConfig.EtlIndex; -import org.apache.doris.sparkdpp.EtlJobConfig.EtlJobProperty; -import org.apache.doris.sparkdpp.EtlJobConfig.EtlPartition; -import org.apache.doris.sparkdpp.EtlJobConfig.EtlPartitionInfo; -import org.apache.doris.sparkdpp.EtlJobConfig.EtlTable; +import org.apache.doris.config.EtlJobConfig; +import org.apache.doris.config.EtlJobConfig.EtlColumn; +import org.apache.doris.config.EtlJobConfig.EtlColumnMapping; +import org.apache.doris.config.EtlJobConfig.EtlFileGroup; +import org.apache.doris.config.EtlJobConfig.EtlIndex; +import org.apache.doris.config.EtlJobConfig.EtlJobProperty; +import org.apache.doris.config.EtlJobConfig.EtlPartition; +import org.apache.doris.config.EtlJobConfig.EtlPartitionInfo; +import org.apache.doris.config.EtlJobConfig.EtlTable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -68,9 +68,9 @@ public void setUp() { EtlColumn k1 = new EtlColumn("k1", "INT", false, true, "NONE", "0", 0, 0, 0); EtlColumn k2 = new EtlColumn("k2", "VARCHAR", false, true, "NONE", "0", 10, 0, 0); EtlColumn v1 = new EtlColumn("v1", "BIGINT", false, false, "NONE", "0", 0, 0, 0); - EtlIndex index1 = new EtlIndex(index1Id, Lists.newArrayList(k1, k2, v1), 666666, "DUPLICATE", true); + EtlIndex index1 = new EtlIndex(index1Id, Lists.newArrayList(k1, k2, v1), 666666, "DUPLICATE", true, 1); v1 = new EtlColumn("v1", "BIGINT", false, false, "SUM", "0", 0, 0, 0); - EtlIndex index2 = new EtlIndex(index2Id, Lists.newArrayList(k1, v1), 888888, "AGGREGATE", true); + EtlIndex index2 = new EtlIndex(index2Id, Lists.newArrayList(k1, v1), 888888, "AGGREGATE", true,1 ); List indexes = Lists.newArrayList(index1, index2); // partition info List partitions = 
Lists.newArrayList();