Skip to content

Commit

Permalink
Fix header issues when skipFirstNLines is set
Browse files Browse the repository at this point in the history
  • Loading branch information
ckeys committed Aug 10, 2022
1 parent 4defad3 commit 56e7081
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ object HDFSOperatorV2 {
}
}

def saveWithoutTopNLines(inPath: String, skipFirstNLines: Int, header: Boolean): String = {
def saveWithoutTopNLines(inPath: String, skipFirstNLines: Int): String = {
val fs = FileSystem.get(hadoopConfiguration)
val src: Path = new Path(inPath)
var br: BufferedReader = null
Expand All @@ -275,39 +275,33 @@ object HDFSOperatorV2 {
val pathElements = inPath.split(PathFun.pathSeparator)
val writePathParts = pathElements.take(pathElements.length - 1) :+ String.format("skipFirstNLines_%s_%s", String.valueOf(skipFirstNLines), pathElements(pathElements.length - 1))
val outPath = writePathParts.mkString(PathFun.pathSeparator)
if (!fileExists(outPath)) {
try {
dos = fs.create(new Path(new java.io.File(outPath).getPath), true)
br = new BufferedReader(new InputStreamReader(fs.open(src)))
line = br.readLine()

var count = 1
while (line != null) {

if (header && count == 1) {
dos.writeBytes(line + "\n")
line = br.readLine()
}
try {
dos = fs.create(new Path(new java.io.File(outPath).getPath), true)
br = new BufferedReader(new InputStreamReader(fs.open(src)))
line = br.readLine()

if (count >= skipFirstNLines) {
dos.writeBytes(line + "\n")
}
count += 1
line = br.readLine()
var count = 1
while (line != null) {
if (count > skipFirstNLines) {
dos.writeBytes(line + "\n")
}
} finally {
if (br != null) br.close()
if (null != dos) {
try {
dos.close()
} catch {
case ex: Exception =>
println("close exception")
}
count += 1
line = br.readLine()
}
} finally {
if (br != null) br.close()
if (null != dos) {
try {
dos.close()
} catch {
case ex: Exception =>
println("close exception")
}
dos.close()
}
}

outPath
}
}
Binary file not shown.
Binary file added streamingpro-mlsql/libs/hive-common-2.3.7.jar
Binary file not shown.
Binary file added streamingpro-mlsql/libs/hive-exec-2.3.7-core.jar
Binary file not shown.
Binary file added streamingpro-mlsql/libs/hive-exec-2.3.7.jar
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,8 @@ class MLSQLCSV(override val uid: String) extends MLSQLBaseFileSource with WowPar
case None => originPath
case Some(_) => {
var numsToSkip = skipFirstNLines.get.toInt
val header = config.config.get("header") match {
case Some("true") => {
numsToSkip += 1
true
}
case _ => false
}
val path = originPath
val newPath = HDFSOperatorV2.saveWithoutTopNLines(path, numsToSkip, header)
val newPath = HDFSOperatorV2.saveWithoutTopNLines(path, numsToSkip)
newPath
}
}
Expand Down

0 comments on commit 56e7081

Please sign in to comment.