How to read data of a particular snapshot in Apache Iceberg?

542 Views Asked by At

I am doing a POC on Apache Iceberg. While writing I am writing twice using 2 datafiles that create 2 snapshots. In my example, each file contains 4 rows. The last file is marked as current snapshot which is right. While reading the data, I want to read the data of the latest snapshot only. However, the reader prints the data of all the snapshots (8 rows) even though I set the snapshot of TableScan as current snapshot.

Below is the sample code

package org.example2;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.customCatalog.CustomCatalog;

import java.util.UUID;

public class Example2 {

    public static String dataPath = "C:\\data\\iceberg";

    public static void main(String[] args) throws Exception {
        CustomCatalog catalog = new CustomCatalog(dataPath);
        Schema schema = new Schema(
                Types.NestedField.optional(1, "event_id", Types.StringType.get()),
                Types.NestedField.optional(2, "username", Types.StringType.get()),
                Types.NestedField.optional(3, "userid", Types.IntegerType.get()),
                Types.NestedField.optional(4, "api_version", Types.StringType.get()),
                Types.NestedField.optional(5, "command", Types.StringType.get())
        );

        Namespace webapp = Namespace.of("webapp");
        TableIdentifier name = TableIdentifier.of(webapp, "user_events");
        System.out.println(catalog.tableExists(name));
        Table table;
        if (!catalog.tableExists(name)) {
            table = catalog.createTable(name, schema, PartitionSpec.unpartitioned());
        } else {
            table = catalog.loadTable(name);
        }

        for (int i = 0; i < 2; i++) {
            GenericRecord record = GenericRecord.create(schema);
            ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();
            builder.add(record.copy(ImmutableMap
                    .of("event_id", UUID.randomUUID().toString(), "username", "Bruce", "userid", 1, "api_version", "1.0",
                            "command", "grapple")));
            builder.add(record.copy(ImmutableMap
                    .of("event_id", UUID.randomUUID().toString(), "username", "Wayne", "userid", 1, "api_version", "1.0",
                            "command", "glide")));
            builder.add(record.copy(ImmutableMap
                    .of("event_id", UUID.randomUUID().toString(), "username", "Clark", "userid", 1, "api_version", "2.0",
                            "command", "fly")));
            builder.add(record.copy(ImmutableMap
                    .of("event_id", UUID.randomUUID().toString(), "username", "Kent", "userid", 1, "api_version", "1.0",
                            "command", "land")));
            ImmutableList<GenericRecord> records = builder.build();

            String filepath = table.location() + "/" + UUID.randomUUID().toString();
            OutputFile file = table.io().newOutputFile(filepath);
            DataWriter<GenericRecord> dataWriter =
                    Parquet.writeData(file)
                            .schema(schema)
                            .createWriterFunc(GenericParquetWriter::buildWriter)
                            .overwrite()
                            .withSpec(PartitionSpec.unpartitioned())
                            .build();
            try {
                for (GenericRecord record1 : records) {
                    dataWriter.write(record1);
                }
            } finally {
                dataWriter.close();
            }

            DataFile dataFile = dataWriter.toDataFile();
            System.out.println("Writing file " + dataFile.toString());
            table.newAppend().appendFile(dataFile).commit();
            System.out.println("Records written " + records.size());

        }


        table = catalog.loadTable(name);
        System.out.println("Snapshots " + table.snapshots());
        System.out.println("current snapshot " + table.currentSnapshot().snapshotId());


        CloseableIterable<Record> result = IcebergGenerics.read(table).useSnapshot(table.currentSnapshot().snapshotId()).build(); // the useSnapshot method should force to use only the current snapshot??
        for (Record r : result) {
            System.out.println(r);
        }
    }
}

Adding the custom catalog implementation as asked.

package org.customCatalog;

import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.parquet.Preconditions;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Extend BaseMetastoreCatalog to provide default warehouse locations and instantiate CustomTableOperations
 */
public class CustomCatalog extends BaseMetastoreCatalog {

    private String warehousePath = "C:\\data\\iceberg";
    private String name;
    public CustomCatalog() {
        initialize("custom", new HashMap<>());
    }

    public CustomCatalog(String warehousePath) {
        this.warehousePath = warehousePath;
        initialize("custom", new HashMap<>());
    }

    @Override
    protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
        String dbName = tableIdentifier.namespace().level(0);
        String tableName = tableIdentifier.name();
        // instantiate the CustomTableOperations
        return new CustomTableOperations(warehousePath, dbName, tableName);
    }

    @Override
    protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {

        // Can choose to use any other configuration name
        String tableLocation = warehousePath;

        // Can be an s3 or hdfs path
//      if (tableLocation == null) {
//          throw new RuntimeException("custom.iceberg.warehouse.location configuration not set!");
//      }

        return String.format(
                "%s/%s/%s", tableLocation,
                tableIdentifier.namespace().levels()[0],
                tableIdentifier.name());
    }

    @Override
    public List<TableIdentifier> listTables(Namespace namespace) {
        List<TableIdentifier> tableIdentifiers = new ArrayList<>();
        File f = new File(warehousePath + '/' +namespace);
        if(f.exists()) {
            for(String s : f.list()) {
                File f2 = new File(warehousePath + '/' + namespace.levels()[0] +'/'+ s);
                if(f2.exists() && f2.isDirectory()) {
                    tableIdentifiers.add(TableIdentifier.of(namespace, s));
                }
            }
        } else {
            System.out.println("Namespace `"+namespace.levels()[0]+"` doesn't exist.");
        }
        return tableIdentifiers;
    }

    @Override
    public boolean dropTable(TableIdentifier identifier, boolean purge) {
        // will implement later
        String namespace = identifier.namespace().level(0);
        String tableName = identifier.name();

        return true;
    }

    @Override
    public void renameTable(TableIdentifier from, TableIdentifier to) {
        // will implement later
        return;
    }

    // implement this method to read catalog name and properties during initialization
    public void initialize(String name, Map<String, String> properties) {
        this.name = name;
    }

    @Override public String name() {
        return name;
    }
}
0

There are 0 best solutions below