// NOTE(review): SOURCE arrived as a unified diff collapsed onto long lines; below are the
// corrected post-patch member definitions for object HDFSOperatorV2.

  /**
   * Copies the file at `inPath` into a sibling file named
   * `skipLastNLines_<n>_<fileName>`, omitting the last `skipLastNLines` lines.
   *
   * Two sequential passes are made over the source: the first counts the total
   * number of lines, the second copies every line whose 1-based index is
   * <= total - skipLastNLines. Each copied line is written back with a
   * trailing "\n" (the original line terminators are not preserved verbatim).
   *
   * @param inPath         source file path on the configured FileSystem
   * @param skipLastNLines number of trailing lines to drop (0 copies everything)
   * @return the path of the newly written file
   */
  def saveWithoutLastNLines(inPath: String, skipLastNLines: Int): String = {
    val fs = FileSystem.get(hadoopConfiguration)
    val src: Path = new Path(inPath)
    var dos: FSDataOutputStream = null
    var countReader: BufferedReader = null
    var copyReader: BufferedReader = null
    val pathElements = inPath.split(PathFun.pathSeparator)
    val writePathParts = pathElements.take(pathElements.length - 1) :+
      String.format("skipLastNLines_%s_%s", String.valueOf(skipLastNLines), pathElements(pathElements.length - 1))
    val outPath = writePathParts.mkString(PathFun.pathSeparator)

    try {
      // Pass 1: count total lines so we know where the tail to drop begins.
      countReader = new BufferedReader(new InputStreamReader(fs.open(src)))
      var totalLinesCount = 0
      while (countReader.readLine() != null) totalLinesCount += 1

      // Pass 2: copy everything except the trailing skipLastNLines lines.
      dos = fs.create(new Path(new java.io.File(outPath).getPath), true)
      copyReader = new BufferedReader(new InputStreamReader(fs.open(src)))
      var line = copyReader.readLine()
      var count = 1
      while (line != null) {
        if (count <= totalLinesCount - skipLastNLines) {
          dos.writeBytes(line + "\n")
        }
        count += 1
        line = copyReader.readLine()
      }
    } finally {
      // Fixes vs. the patched version: the second reader (br2) was never
      // closed (resource leak), and dos.close() was invoked twice — once
      // inside a pointless catch-and-rethrow and once unconditionally after it.
      if (countReader != null) countReader.close()
      if (copyReader != null) copyReader.close()
      if (dos != null) dos.close()
    }

    outPath
  }

  /**
   * Copies the file at `inPath` into a sibling file named
   * `skipFirstNLines_<n>_<fileName>`, omitting the first `skipFirstNLines`
   * lines. Each copied line is written back with a trailing "\n".
   *
   * @param inPath          source file path on the configured FileSystem
   * @param skipFirstNLines number of leading lines to drop (0 copies everything)
   * @return the path of the newly written file
   */
  def saveWithoutTopNLines(inPath: String, skipFirstNLines: Int): String = {
    val fs = FileSystem.get(hadoopConfiguration)
    val src: Path = new Path(inPath)
    var br: BufferedReader = null
    var dos: FSDataOutputStream = null
    val pathElements = inPath.split(PathFun.pathSeparator)
    val writePathParts = pathElements.take(pathElements.length - 1) :+
      String.format("skipFirstNLines_%s_%s", String.valueOf(skipFirstNLines), pathElements(pathElements.length - 1))
    val outPath = writePathParts.mkString(PathFun.pathSeparator)

    try {
      dos = fs.create(new Path(new java.io.File(outPath).getPath), true)
      br = new BufferedReader(new InputStreamReader(fs.open(src)))
      var line = br.readLine()
      var count = 1
      while (line != null) {
        // 1-based line index: lines strictly after the first skipFirstNLines
        // are kept.
        if (count > skipFirstNLines) {
          dos.writeBytes(line + "\n")
        }
        count += 1
        line = br.readLine()
      }
    } finally {
      // Fix vs. the patched version: dos.close() was called twice (the catch
      // block rethrew, then a second unconditional close followed).
      if (br != null) br.close()
      if (dos != null) dos.close()
    }

    outPath
  }
extends MLSQLBaseFileSource with WowPar def handleDFWithOption(originPath: String, config: DataSourceConfig): String = { val skipFirstNLines = config.config.get("skipFirstNLines") - skipFirstNLines match { + val skipLastNLines = config.config.get("skipLastNLines") + val pathAfterSkipTopLines = skipFirstNLines match { case None => originPath case Some(_) => { var numsToSkip = skipFirstNLines.get.toInt - val header = config.config.get("header") match { - case Some("true") => { - numsToSkip += 1 - true - } - case _ => false - } val path = originPath - val newPath = HDFSOperatorV2.saveWithoutTopNLines(path, numsToSkip, header) + val newPath = HDFSOperatorV2.saveWithoutTopNLines(path, numsToSkip) newPath } } + val res = skipLastNLines match { + case None=> pathAfterSkipTopLines + case Some(_) => { + var numsToSkip = skipLastNLines.get.toInt + val path = pathAfterSkipTopLines + val newPath = HDFSOperatorV2.saveWithoutLastNLines(path, numsToSkip) + newPath + } + } + res } def deleteTmpFiles(config: DataSourceConfig, newPath: String): Unit = {