diff --git a/streamingpro-core/src/main/java/tech/mlsql/tool/HDFSOperatorV2.scala b/streamingpro-core/src/main/java/tech/mlsql/tool/HDFSOperatorV2.scala
index f4479a8fc..847da81c1 100644
--- a/streamingpro-core/src/main/java/tech/mlsql/tool/HDFSOperatorV2.scala
+++ b/streamingpro-core/src/main/java/tech/mlsql/tool/HDFSOperatorV2.scala
@@ -266,7 +266,7 @@ object HDFSOperatorV2 {
     }
   }
 
-  def saveWithoutTopNLines(inPath: String, skipFirstNLines: Int, header: Boolean): String = {
+  def saveWithoutTopNLines(inPath: String, skipFirstNLines: Int): String = {
     val fs = FileSystem.get(hadoopConfiguration)
     val src: Path = new Path(inPath)
     var br: BufferedReader = null
@@ -275,39 +275,33 @@ object HDFSOperatorV2 {
     var line: String = null
     var dos: FSDataOutputStream = null
     val pathElements = inPath.split(PathFun.pathSeparator)
     val writePathParts = pathElements.take(pathElements.length - 1) :+ String.format("skipFirstNLines_%s_%s", String.valueOf(skipFirstNLines), pathElements(pathElements.length - 1))
     val outPath = writePathParts.mkString(PathFun.pathSeparator)
-    if (!fileExists(outPath)) {
-      try {
-        dos = fs.create(new Path(new java.io.File(outPath).getPath), true)
-        br = new BufferedReader(new InputStreamReader(fs.open(src)))
-        line = br.readLine()
-
-        var count = 1
-        while (line != null) {
-          if (header && count == 1) {
-            dos.writeBytes(line + "\n")
-            line = br.readLine()
-          }
-
-          if (count >= skipFirstNLines) {
-            dos.writeBytes(line + "\n")
-          }
-          count += 1
-          line = br.readLine()
-        }
-      } finally {
-        if (br != null) br.close()
-        if (null != dos) {
-          try {
-            dos.close()
-          } catch {
-            case ex: Exception =>
-              println("close exception")
-          }
-        }
-      }
-    }
-    outPath
+    try {
+      dos = fs.create(new Path(new java.io.File(outPath).getPath), true)
+      br = new BufferedReader(new InputStreamReader(fs.open(src)))
+      line = br.readLine()
+
+      var count = 1
+      while (line != null) {
+        if (count > skipFirstNLines) {
+          dos.writeBytes(line + "\n")
+        }
+        count += 1
+        line = br.readLine()
+      }
+    } finally {
+      if (br != null) br.close()
+      if (null != dos) {
+        try {
+          dos.close()
+        } catch {
+          case ex: Exception =>
+            println("close exception")
+        }
+        dos.close()
+      }
+    }
+    outPath
   }
 }
\ No newline at end of file
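
The behavioural change above is easy to miss in the indentation churn: the old loop wrote the header row separately when `header` was true and kept lines once `count >= skipFirstNLines`, while the new loop simply drops the first `skipFirstNLines` lines (`count > skipFirstNLines`) and no longer knows about headers. A minimal, self-contained Scala sketch of the new rule, applied to an in-memory sequence instead of an HDFS stream (the object and method names here are illustrative only, not part of the patch):

```scala
// Illustrative only: the same "skip the first N lines" rule as the new
// saveWithoutTopNLines body, without the HDFS plumbing.
object SkipLinesSketch {
  def withoutTopNLines(lines: Seq[String], skipFirstNLines: Int): Seq[String] =
    lines.zipWithIndex.collect {
      // count in the patched loop is 1-based, so a line survives when count > skipFirstNLines
      case (line, idx) if idx + 1 > skipFirstNLines => line
    }

  def main(args: Array[String]): Unit = {
    val lines = Seq("name,age", "alice,30", "bob,25")
    // skipFirstNLines = 1 also drops the header row, which is why the caller
    // no longer bumps the skip count by one when header=true.
    println(withoutTopNLines(lines, 1)) // List(alice,30, bob,25)
  }
}
```
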
diff --git a/streamingpro-mlsql/libs/aws-glue-datacatalog-spark-client-3.4.0-SNAPSHOT.jar b/streamingpro-mlsql/libs/aws-glue-datacatalog-spark-client-3.4.0-SNAPSHOT.jar
new file mode 100644
index 000000000..6c16c5eb4
Binary files /dev/null and b/streamingpro-mlsql/libs/aws-glue-datacatalog-spark-client-3.4.0-SNAPSHOT.jar differ
diff --git a/streamingpro-mlsql/libs/hive-common-2.3.7.jar b/streamingpro-mlsql/libs/hive-common-2.3.7.jar
new file mode 100644
index 000000000..fba35d83b
Binary files /dev/null and b/streamingpro-mlsql/libs/hive-common-2.3.7.jar differ
diff --git a/streamingpro-mlsql/libs/hive-exec-2.3.7-core.jar b/streamingpro-mlsql/libs/hive-exec-2.3.7-core.jar
new file mode 100644
index 000000000..38df23294
Binary files /dev/null and b/streamingpro-mlsql/libs/hive-exec-2.3.7-core.jar differ
diff --git a/streamingpro-mlsql/libs/hive-exec-2.3.7.jar b/streamingpro-mlsql/libs/hive-exec-2.3.7.jar
new file mode 100644
index 000000000..3cbf5caf8
Binary files /dev/null and b/streamingpro-mlsql/libs/hive-exec-2.3.7.jar differ
diff --git a/streamingpro-mlsql/src/main/java/streaming/core/datasource/impl/MLSQLCSV.scala b/streamingpro-mlsql/src/main/java/streaming/core/datasource/impl/MLSQLCSV.scala
index a31ff2edd..4d9d726c0 100644
--- a/streamingpro-mlsql/src/main/java/streaming/core/datasource/impl/MLSQLCSV.scala
+++ b/streamingpro-mlsql/src/main/java/streaming/core/datasource/impl/MLSQLCSV.scala
@@ -20,15 +20,8 @@ class MLSQLCSV(override val uid: String) extends MLSQLBaseFileSource with WowPar
       case None => originPath
       case Some(_) => {
         var numsToSkip = skipFirstNLines.get.toInt
-        val header = config.config.get("header") match {
-          case Some("true") => {
-            numsToSkip += 1
-            true
-          }
-          case _ => false
-        }
         val path = originPath
-        val newPath = HDFSOperatorV2.saveWithoutTopNLines(path, numsToSkip, header)
+        val newPath = HDFSOperatorV2.saveWithoutTopNLines(path, numsToSkip)
         newPath
       }
     }
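
On the caller side, `MLSQLCSV` now passes the user-supplied skip count straight through instead of bumping it by one when `header=true`; presumably the header row, when present, is left for the CSV reader's own `header` option to consume rather than being trimmed out of the rewritten file. A hypothetical, simplified stand-in for the rewritten branch (the helper object, method name, and signature below are illustrative, not the actual code in MLSQLCSV):

```scala
import tech.mlsql.tool.HDFSOperatorV2

// Hypothetical helper mirroring the rewritten branch: resolve the path to load,
// rewriting the file only when skipFirstNLines is set.
object CsvPathResolver {
  def resolve(originPath: String, skipFirstNLines: Option[String]): String =
    skipFirstNLines match {
      case None => originPath
      case Some(n) =>
        // Writes a copy of originPath with the first n lines removed and returns
        // the path of that copy; header handling stays with the CSV reader.
        HDFSOperatorV2.saveWithoutTopNLines(originPath, n.toInt)
    }
}
```
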