I have a class that takes a local file, transforms it, and stores it in GCS:

    import java.nio.channels.Channels
    import java.nio.file.{ Files, Path }
    import java.util.zip.{ GZIPOutputStream, ZipInputStream }

    import com.google.cloud.storage.{ BlobInfo, Storage }
    import com.google.common.io.ByteStreams
    import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
    import org.apache.commons.io.IOUtils
    import resource._

    class GcsService(gcsStorage: Storage) {

      def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
        val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build
        if (destination.unzipGzip) {
          for (input <- managed(new ZipInputStream(Files.newInputStream(localPath)));
               output <- managed(new GZIPOutputStream(Channels.newOutputStream(gcsStorage.writer(blobInfo))))) {
            ByteStreams.copy(input, output)
          }
        } else if (destination.decompressBzip2) {
          for (input <- managed(new BZip2CompressorInputStream(Files.newInputStream(localPath)));
               output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
            ByteStreams.copy(input, output)
          }
        } else {
          for (input <- managed(Files.newInputStream(localPath));
               output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
            IOUtils.copy(input, output)
          }
        }
      }
    }

    case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)

I am trying to remove some code duplication, in particular the creation of the fileInputStream and gcsOutputStream. But I cannot simply extract those variables at the top of the method, because it would create resources outside of the scala-arm managed block:

    import java.io.{ InputStream, OutputStream }
    import java.nio.channels.Channels
    import java.nio.file.{ Files, Path }
    import java.util.zip.{ GZIPOutputStream, ZipInputStream }

    import com.google.cloud.storage.{ BlobInfo, Storage }
    import com.google.common.io.ByteStreams
    import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
    import org.apache.commons.io.IOUtils
    import resource._

    class GcsService(gcsStorage: Storage) {

      def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
        val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build
        // FIXME: creates a resource outside of the ARM block
        val fileInputStream = Files.newInputStream(localPath)
        val gcsOutputStream = Channels.newOutputStream(gcsStorage.writer(blobInfo))
        if (destination.unzipGzip) {
          unzipGzip(fileInputStream, gcsOutputStream)
        } else if (destination.decompressBzip2) {
          decompressBzip2(fileInputStream, gcsOutputStream)
        } else {
          copy(fileInputStream, gcsOutputStream)
        }
      }

      private def unzipGzip(inputStream: InputStream, outputStream: OutputStream): Unit = {
        for (input <- managed(new ZipInputStream(inputStream));
             output <- managed(new GZIPOutputStream(outputStream))) {
          ByteStreams.copy(input, output)
        }
      }

      private def decompressBzip2(inputStream: InputStream, outputStream: OutputStream): Unit = {
        for (input <- managed(new BZip2CompressorInputStream(inputStream));
             output <- managed(outputStream)) {
          ByteStreams.copy(input, output)
        }
      }

      private def copy(inputStream: InputStream, outputStream: OutputStream): Unit = {
        for (input <- managed(inputStream);
             output <- managed(outputStream)) {
          IOUtils.copy(input, output)
        }
      }
    }

    case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)

As you can see, the code is a lot clearer and more testable, but resources are not handled correctly since they are not "managed". As an example, if an exception is thrown when creating gcsOutputStream, fileInputStream won't be closed.
I could probably solve this using Google Guava sources and sinks, but I am wondering if there is a better way of doing this in Scala, without introducing Guava. Ideally using the standard library, or a scala-arm feature, or maybe even in Cats?
- Should I define fileInputStream and gcsOutputStream as functions that take nothing and return the stream? It seems the code will be more verbose with () => InputStream and () => OutputStream everywhere.
- Should I use multiple scala-arm "managed" for-comprehensions (one to define fileInputStream and gcsOutputStream, and another one inside each sub-function)? If I do that, isn't it a problem that the "inner" input stream will be closed twice?
- Is there a clean and "scalaish" approach to doing this that I am not seeing?
You could refactor it like this:
First, declare managed resources:
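A minimal sketch of such declarations, assuming scala-arm's managed (imported via resource._). The object and method names are hypothetical, and a local file stands in for the GCS writer so the snippet stays self-contained:

```scala
import java.io.{ InputStream, OutputStream }
import java.nio.file.{ Files, Path }
import resource._

object ManagedDeclarations {
  // managed takes its argument by name, so no stream is opened here;
  // opening is deferred until the resource is acquired.
  def fileInput(localPath: Path): ManagedResource[InputStream] =
    managed(Files.newInputStream(localPath))

  // Hypothetical stand-in for Channels.newOutputStream(gcsStorage.writer(blobInfo)).
  def fileOutput(target: Path): ManagedResource[OutputStream] =
    managed(Files.newOutputStream(target))
}
```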
This doesn't open the resources; it only declares that you want them to be managed.
Then you could use map to wrap them in the desired decorators (like ZipInputStream):
Of course, ManagedResource[A] is just a value, so you can even pass it to a method as a parameter:
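Both ideas, wrapping with map and passing a ManagedResource to a method, can be sketched together. This is a minimal sketch assuming scala-arm; GZIPInputStream and local files stand in for the question's decorators and the GCS writer, and ManagedPipeline, copy and gunzipTo are hypothetical names:

```scala
import java.io.{ InputStream, OutputStream }
import java.nio.file.{ Files, Path }
import java.util.zip.GZIPInputStream
import resource._

object ManagedPipeline {
  // Both resources are acquired inside the for block and released when it
  // exits, even if copying throws.
  def copy(in: ManagedResource[InputStream], out: ManagedResource[OutputStream]): Unit =
    for (input <- in; output <- out) {
      val buffer = new Array[Byte](8192)
      var read = input.read(buffer)
      while (read != -1) {
        output.write(buffer, 0, read)
        read = input.read(buffer)
      }
    }

  def gunzipTo(source: Path, target: Path): Unit = {
    // Declarations only: nothing is opened yet.
    val fileIn: ManagedResource[InputStream] = managed(Files.newInputStream(source))
    val fileOut: ManagedResource[OutputStream] = managed(Files.newOutputStream(target))
    // map applies the decorator when the resource is acquired; scala-arm
    // still closes the underlying file stream, not the wrapper.
    val decoded: ManagedResource[InputStream] =
      fileIn.map(in => new GZIPInputStream(in): InputStream)
    copy(decoded, fileOut)
  }
}
```

One caveat with this design: because map leaves closing to the underlying resource, an output decorator that writes a trailer on close (GZIPOutputStream, for instance) is probably safer wrapped in its own managed(...) inside the for-comprehension. The underlying stream is then closed twice, which should be harmless, since the java.io.Closeable contract says closing an already closed stream has no effect.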