How can I compose resources in Scala while still closing them correctly with scala-arm?

333 Views Asked by At

I have a class that takes a local file, transforms it, and stores it in GCS:

import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }

import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._


class GcsService(gcsStorage: Storage) {

  def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
    val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build

    if (destination.unzipGzip) {
      for (input ← managed(new ZipInputStream(Files.newInputStream(localPath)));
           output ← managed(new GZIPOutputStream(Channels.newOutputStream(gcsStorage.writer(blobInfo))))) {
        ByteStreams.copy(input, output)
      }
    } else if (destination.decompressBzip2) {
      for (input <- managed(new BZip2CompressorInputStream(Files.newInputStream(localPath)));
           output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
        ByteStreams.copy(input, output)
      }
    } else {
      for (input <- managed(Files.newInputStream(localPath));
           output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
        IOUtils.copy(input, output)
      }
    }
  }

}

case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)

I am trying to remove some code duplication, in particular the creation of the fileInputStream and gcsOutputStream. But I cannot simply extract those variables at the top of the method, because it would create resources outside of the scala-arm managed block:

import java.io.{ InputStream, OutputStream }
import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }

import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._


class GcsService(gcsStorage: Storage) {

  def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
    val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build

    // FIXME: creates a resource outside of the ARM block
    val fileInputStream = Files.newInputStream(localPath)
    val gcsOutputStream = Channels.newOutputStream(gcsStorage.writer(blobInfo))

    if (destination.unzipGzip) {
      unzipGzip(fileInputStream, gcsOutputStream)
    } else if (destination.decompressBzip2) {
      decompressBzip2(fileInputStream, gcsOutputStream)
    } else {
      copy(fileInputStream, gcsOutputStream)
    }
  }

  private def unzipGzip(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input ← managed(new ZipInputStream(inputStream));
         output ← managed(new GZIPOutputStream(outputStream))) {
      ByteStreams.copy(input, output)
    }
  }

  private def decompressBzip2(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input <- managed(new BZip2CompressorInputStream(inputStream));
         output <- managed(outputStream)) {
      ByteStreams.copy(input, output)
    }
  }

  private def copy(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input <- managed(inputStream);
         output <- managed(outputStream)) {
      IOUtils.copy(input, output)
    }
  }
}

case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)

As you can see, the code is a lot clearer and more testable, but resources are not handled correctly since they are not "managed". As an example, if an exception is thrown when creating gcsOutputStream, fileInputStream won't be closed.

I could probably solve this using Google Guava sources and sinks, but I am wondering if there is a better way of doing this in Scala, without introducing Guava. Ideally using the standard library, or a scala-arm feature, or maybe even in Cats?

  • Should I define fileInputStream and gcsOutputStream as functions that take nothing and return the stream? It seems the code will be more verbose with () => InputStream and () => OutputStream everywhere?
  • Should I use multiple scala-arm "managed" for comprehensions (one to define fileInputStream and gcsOutputStream, and another one inside each sub-function)? If I do that, isn't it a problem that the "inner" inputstream will be closed twice?
  • Is there a clean and "scalaish" approach to doing this that I am not seeing?
1

There are 1 best solutions below

3
Krzysztof Atłasik On

You could refactor it like this:

First, declare managed resources:

val fileInputStream: ManagedResource[InputStream] = managed(Files.newInputStream(localPath))
val gcsOutputStream: ManagedResource[OutputStream] = managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))

It doesn't open these resources, it's just declaration, that you want these resources to be managed.

Then you could use map to wrap them in desired decorators (like ZipInputStream):

if (destination.unzipGzip) {
  for (input ← fileInputStream.map(s => new ZipInputStream(s));
       output ← gcsOutputStream.map(s => new GZIPOutputStream(s))) {
    ByteStreams.copy(input, output)
  }
} else if (destination.decompressBzip2) {
  for (input <- fileInputStream.map(s => new BZip2CompressorInputStream(s));
       output <- gcsOutputStream) {
    ByteStreams.copy(input, output)
  }
} else {
  for (input <- fileInputStream;
       output <- gcsOutputStream) {
    IOUtils.copy(input, output)
  }
}

Of course ManagedResource[A] is just value, so you can even pass it to a method as parameter:

private def unzipGzip(inputStream: Managed[InputStream], outputStream: Managed[OutputStream]): Unit = {
  for (input ← inputStream.map(s => new ZipInputStream(s));
       output ← outputStream.map(s => new GZIPOutputStream(s))) {
    ByteStreams.copy(input, output)
  }
}