Skip to content

Commit

Permalink
Scala3 ftp support (#170)
Browse files Browse the repository at this point in the history
* support scala3 in ftp connector

* Update BaseSpec.scala
  • Loading branch information
pjfanning committed Aug 17, 2023
1 parent c9d353f commit 5cda19e
Show file tree
Hide file tree
Showing 11 changed files with 31 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private[ftp] trait FtpBrowserGraphStage[FtpClient, S <: RemoteFileSettings]

def emitTraversedDirectories: Boolean = false

def createLogic(inheritedAttributes: Attributes) = {
def createLogic(inheritedAttributes: Attributes): FtpGraphStageLogic[FtpFile, FtpClient, S] = {
val logic = new FtpGraphStageLogic[FtpFile, FtpClient, S](shape, ftpLike, connectionSettings, ftpClient) {

private[this] var buffer: Seq[FtpFile] = Seq.empty[FtpFile]
Expand Down Expand Up @@ -84,9 +84,9 @@ private[ftp] trait FtpBrowserGraphStage[FtpClient, S <: RemoteFileSettings]

private[this] def getFilesFromPath(basePath: String) =
if (basePath.isEmpty)
ftpLike.listFiles(handler.get)
graphStageFtpLike.listFiles(handler.get)
else
ftpLike.listFiles(basePath, handler.get)
graphStageFtpLike.listFiles(basePath, handler.get)

} // end of stage logic

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private[ftp] trait FtpDirectoryOperationsGraphStage[FtpClient, S <: RemoteFileSe
out,
new OutHandler {
override def onPull(): Unit = {
push(out, ftpLike.mkdir(basePath, directoryName, handler.get))
push(out, graphStageFtpLike.mkdir(basePath, directoryName, handler.get))
complete(out)
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ import scala.util.control.NonFatal
@InternalApi
private[ftp] abstract class FtpGraphStageLogic[T, FtpClient, S <: RemoteFileSettings](
val shape: Shape,
val ftpLike: FtpLike[FtpClient, S],
val graphStageFtpLike: FtpLike[FtpClient, S],
val connectionSettings: S,
val ftpClient: () => FtpClient) extends GraphStageLogic(shape) {

protected[this] implicit val client = ftpClient()
protected[this] var handler: Option[ftpLike.Handler] = Option.empty[ftpLike.Handler]
protected[this] implicit val client: FtpClient = ftpClient()
protected[this] var handler: Option[graphStageFtpLike.Handler] = Option.empty[graphStageFtpLike.Handler]
protected[this] var failed = false

override def preStart(): Unit = {
super.preStart()
try {
val tryConnect = ftpLike.connect(connectionSettings)
val tryConnect = graphStageFtpLike.connect(connectionSettings)
if (tryConnect.isSuccess) {
handler = tryConnect.toOption
} else
Expand Down Expand Up @@ -76,7 +76,7 @@ private[ftp] abstract class FtpGraphStageLogic[T, FtpClient, S <: RemoteFileSett
protected[this] def doPreStart(): Unit

protected[this] def disconnect(): Unit =
handler.foreach(ftpLike.disconnect)
handler.foreach(graphStageFtpLike.disconnect)

protected[this] def matSuccess(): Boolean

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private[ftp] trait FtpIOSourceStage[FtpClient, S <: RemoteFileSettings]
isOpt.foreach { os =>
try {
os.close()
ftpLike match {
graphStageFtpLike match {
case cfo: CommonFtpOperations =>
if (!cfo.completePendingCommand(handler.get.asInstanceOf[cfo.Handler]))
throw new IOException("File transfer failed.")
Expand All @@ -128,13 +128,13 @@ private[ftp] trait FtpIOSourceStage[FtpClient, S <: RemoteFileSettings]
}

protected[this] def doPreStart(): Unit =
isOpt = ftpLike match {
isOpt = graphStageFtpLike match {
case ur: UnconfirmedReads =>
withUnconfirmedReads(ur)
case ro: RetrieveOffset =>
Some(ro.retrieveFileInputStream(path, handler.get.asInstanceOf[ro.Handler], offset).get)
case _ =>
Some(ftpLike.retrieveFileInputStream(path, handler.get).get)
Some(graphStageFtpLike.retrieveFileInputStream(path, handler.get).get)
}

private def withUnconfirmedReads(
Expand Down Expand Up @@ -229,7 +229,7 @@ private[ftp] trait FtpIOSinkStage[FtpClient, S <: RemoteFileSettings]
osOpt.foreach { os =>
try {
os.close()
ftpLike match {
graphStageFtpLike match {
case cfo: CommonFtpOperations =>
if (!cfo.completePendingCommand(handler.get.asInstanceOf[cfo.Handler]))
throw new IOException("File transfer failed.")
Expand All @@ -251,7 +251,7 @@ private[ftp] trait FtpIOSinkStage[FtpClient, S <: RemoteFileSettings]
}

protected[this] def doPreStart(): Unit = {
osOpt = Some(ftpLike.storeFileOutputStream(path, handler.get, append).get)
osOpt = Some(graphStageFtpLike.storeFileOutputStream(path, handler.get, append).get)
pull(in)
}

Expand Down Expand Up @@ -301,7 +301,7 @@ private[ftp] trait FtpMoveSink[FtpClient, S <: RemoteFileSettings]
override def onPush(): Unit = {
try {
val sourcePath = grab(in)
ftpLike.move(sourcePath.path, destinationPath(sourcePath), handler.get)
graphStageFtpLike.move(sourcePath.path, destinationPath(sourcePath), handler.get)
numberOfMovedFiles = numberOfMovedFiles + 1
pull(in)
} catch {
Expand Down Expand Up @@ -356,7 +356,7 @@ private[ftp] trait FtpRemoveSink[FtpClient, S <: RemoteFileSettings]
new InHandler {
override def onPush(): Unit = {
try {
ftpLike.remove(grab(in).path, handler.get)
graphStageFtpLike.remove(grab(in).path, handler.get)
numberOfRemovedFiles = numberOfRemovedFiles + 1
pull(in)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected[ftp] trait FtpLike[FtpClient, S <: RemoteFileSettings] {
* INTERNAL API
*/
@InternalApi
protected[ftp] trait RetrieveOffset { _: FtpLike[_, _] =>
protected[ftp] trait RetrieveOffset { self: FtpLike[_, _] =>

def retrieveFileInputStream(name: String, handler: Handler, offset: Long): Try[InputStream]

Expand All @@ -64,7 +64,7 @@ protected[ftp] trait RetrieveOffset { _: FtpLike[_, _] =>
* INTERNAL API
*/
@InternalApi
protected[ftp] trait UnconfirmedReads { _: FtpLike[_, _] =>
protected[ftp] trait UnconfirmedReads { self: FtpLike[_, _] =>

def retrieveFileInputStream(name: String, handler: Handler, offset: Long, maxUnconfirmedReads: Int): Try[InputStream]

Expand All @@ -76,8 +76,11 @@ protected[ftp] trait UnconfirmedReads { _: FtpLike[_, _] =>
@InternalApi
object FtpLike {
// type class instances
implicit val ftpLikeInstance = new FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations
implicit val ftpsLikeInstance = new FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with FtpsOperations
implicit val sFtpLikeInstance =
implicit val ftpLikeInstance: FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations =
new FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations
implicit val ftpsLikeInstance: FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with FtpsOperations =
new FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with FtpsOperations
implicit val sFtpLikeInstance
: FtpLike[SSHClient, SftpSettings] with RetrieveOffset with SftpOperations with UnconfirmedReads =
new FtpLike[SSHClient, SftpSettings] with RetrieveOffset with SftpOperations with UnconfirmedReads
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.util.Try
* INTERNAL API
*/
@InternalApi
private[ftp] trait FtpOperations extends CommonFtpOperations { _: FtpLike[FTPClient, FtpSettings] =>
private[ftp] trait FtpOperations extends CommonFtpOperations { self: FtpLike[FTPClient, FtpSettings] =>

def connect(connectionSettings: FtpSettings)(implicit ftpClient: FTPClient): Try[Handler] = Try {
connectionSettings.proxy.foreach(ftpClient.setProxy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.util.Try
*/
@InternalApi
private[ftp] trait FtpsOperations extends CommonFtpOperations {
_: FtpLike[FTPSClient, FtpsSettings] =>
self: FtpLike[FTPSClient, FtpsSettings] =>

def connect(connectionSettings: FtpsSettings)(implicit ftpClient: FTPSClient): Try[Handler] =
Try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import scala.util.{ Failure, Try }
* INTERNAL API
*/
@InternalApi
private[ftp] trait SftpOperations { _: FtpLike[SSHClient, SftpSettings] =>
private[ftp] trait SftpOperations { self: FtpLike[SSHClient, SftpSettings] =>

type Handler = SFTPClient

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import net.schmizz.sshj.SSHClient
import org.apache.commons.net.ftp.{ FTPClient, FTPSClient }

@DoNotInherit
sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { _: FtpSourceFactory[FtpClient, S] =>
sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { self: FtpSourceFactory[FtpClient, S] =>

/**
* Java API: creates a [[pekko.stream.javadsl.Source Source]] of [[FtpFile]]s from the remote user `root` directory.
Expand Down Expand Up @@ -573,6 +573,6 @@ object Sftp extends SftpApi {
*/
def create(customSshClient: SSHClient): SftpApi =
new SftpApi {
override val sshClient: SSHClient = customSshClient
override def sshClient(): SSHClient = customSshClient
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.commons.net.ftp.{ FTPClient, FTPSClient }
import scala.concurrent.Future

@DoNotInherit
sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { _: FtpSourceFactory[FtpClient, S] =>
sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { self: FtpSourceFactory[FtpClient, S] =>

/**
* Scala API: creates a [[pekko.stream.scaladsl.Source Source]] of [[FtpFile]]s from the remote user `root` directory.
Expand Down Expand Up @@ -398,6 +398,6 @@ object Sftp extends SftpApi {
*/
def apply(customSshClient: SSHClient): SftpApi =
new SftpApi {
override val sshClient: SSHClient = customSshClient
override def sshClient(): SSHClient = customSshClient
}
}
1 change: 0 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ object Dependencies {
))

val Ftp = Seq(
crossScalaVersions -= Scala3,
libraryDependencies ++= Seq(
"commons-net" % "commons-net" % "3.8.0",
"com.hierynomus" % "sshj" % "0.33.0"))
Expand Down

0 comments on commit 5cda19e

Please sign in to comment.