I am doing a POC on Apache Iceberg. I write to the table twice, committing two data files, which creates two snapshots; each file contains 4 rows. The last snapshot is correctly marked as the current one. When reading, I want only the data of the latest snapshot, but the reader prints the data of all snapshots (8 rows), even though I set the scan's snapshot to the current snapshot ID.
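For reference, this is how I checked the snapshots after the two commits (a minimal sketch; `Snapshot.summary()` is a `Map<String, String>`, and the `added-records`/`total-records` keys are what Iceberg writes for append snapshots, as far as I can tell):

// Needs import org.apache.iceberg.Snapshot; run after the two commits below.
for (Snapshot snapshot : table.snapshots()) {
    System.out.println("Snapshot " + snapshot.snapshotId()
        + " added-records=" + snapshot.summary().get("added-records")
        + " total-records=" + snapshot.summary().get("total-records"));
}

If I read the summaries correctly, each snapshot shows added-records=4 and the current one shows total-records=8, which lines up with the 8 rows the reader prints.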
Below is the sample code:
package org.example2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.customCatalog.CustomCatalog;
import java.util.UUID;
public class Example2 {
    public static String dataPath = "C:\\data\\iceberg";

    public static void main(String[] args) throws Exception {
        CustomCatalog catalog = new CustomCatalog(dataPath);
        Schema schema = new Schema(
            Types.NestedField.optional(1, "event_id", Types.StringType.get()),
            Types.NestedField.optional(2, "username", Types.StringType.get()),
            Types.NestedField.optional(3, "userid", Types.IntegerType.get()),
            Types.NestedField.optional(4, "api_version", Types.StringType.get()),
            Types.NestedField.optional(5, "command", Types.StringType.get())
        );
        Namespace webapp = Namespace.of("webapp");
        TableIdentifier name = TableIdentifier.of(webapp, "user_events");
        System.out.println(catalog.tableExists(name));
        Table table;
        if (!catalog.tableExists(name)) {
            table = catalog.createTable(name, schema, PartitionSpec.unpartitioned());
        } else {
            table = catalog.loadTable(name);
        }
        // Write twice: each iteration writes one Parquet data file with 4 rows
        // and commits it as a separate append snapshot.
        for (int i = 0; i < 2; i++) {
            GenericRecord record = GenericRecord.create(schema);
            ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();
            builder.add(record.copy(ImmutableMap
                .of("event_id", UUID.randomUUID().toString(), "username", "Bruce", "userid", 1, "api_version", "1.0",
                    "command", "grapple")));
            builder.add(record.copy(ImmutableMap
                .of("event_id", UUID.randomUUID().toString(), "username", "Wayne", "userid", 1, "api_version", "1.0",
                    "command", "glide")));
            builder.add(record.copy(ImmutableMap
                .of("event_id", UUID.randomUUID().toString(), "username", "Clark", "userid", 1, "api_version", "2.0",
                    "command", "fly")));
            builder.add(record.copy(ImmutableMap
                .of("event_id", UUID.randomUUID().toString(), "username", "Kent", "userid", 1, "api_version", "1.0",
                    "command", "land")));
            ImmutableList<GenericRecord> records = builder.build();
            String filepath = table.location() + "/" + UUID.randomUUID().toString();
            OutputFile file = table.io().newOutputFile(filepath);
            DataWriter<GenericRecord> dataWriter =
                Parquet.writeData(file)
                    .schema(schema)
                    .createWriterFunc(GenericParquetWriter::buildWriter)
                    .overwrite()
                    .withSpec(PartitionSpec.unpartitioned())
                    .build();
            try {
                for (GenericRecord rec : records) {
                    dataWriter.write(rec);
                }
            } finally {
                dataWriter.close();
            }
            DataFile dataFile = dataWriter.toDataFile();
            System.out.println("Writing file " + dataFile);
            table.newAppend().appendFile(dataFile).commit();
            System.out.println("Records written " + records.size());
        }
        table = catalog.loadTable(name);
        System.out.println("Snapshots " + table.snapshots());
        System.out.println("current snapshot " + table.currentSnapshot().snapshotId());
        // Shouldn't useSnapshot force the scan to read only the current snapshot?
        CloseableIterable<Record> result = IcebergGenerics.read(table)
            .useSnapshot(table.currentSnapshot().snapshotId())
            .build();
        for (Record r : result) {
            System.out.println(r);
        }
    }
}
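What I expected is effectively an incremental read of just the rows appended by the latest snapshot. A minimal sketch of that, assuming the ScanBuilder returned by IcebergGenerics.read also exposes appendsBetween (it appears to delegate to TableScan.appendsBetween, but I have not verified this on every Iceberg version):

// Read only the records appended by the current snapshot, i.e. since its parent.
// parentId() returns null for the very first snapshot, so guard against that.
// Needs import org.apache.iceberg.Snapshot.
Snapshot current = table.currentSnapshot();
Long parentId = current.parentId();
CloseableIterable<Record> latestOnly = parentId == null
    ? IcebergGenerics.read(table).build()  // only one snapshot exists; everything is "latest"
    : IcebergGenerics.read(table).appendsBetween(parentId, current.snapshotId()).build();
for (Record r : latestOnly) {
    System.out.println(r);
}

Is something like this the intended way to get only the latest snapshot's rows, or should useSnapshot already behave that way?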
Adding the custom catalog implementation, as asked:
package org.customCatalog;

import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Extends BaseMetastoreCatalog to provide default warehouse locations and instantiate CustomTableOperations.
 */
public class CustomCatalog extends BaseMetastoreCatalog {

    private String warehousePath = "C:\\data\\iceberg";
    private String name;

    public CustomCatalog() {
        initialize("custom", new HashMap<>());
    }

    public CustomCatalog(String warehousePath) {
        this.warehousePath = warehousePath;
        initialize("custom", new HashMap<>());
    }

    @Override
    protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
        String dbName = tableIdentifier.namespace().level(0);
        String tableName = tableIdentifier.name();
        // instantiate the CustomTableOperations
        return new CustomTableOperations(warehousePath, dbName, tableName);
    }

    @Override
    protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
        // Could be read from any other configuration; can be an S3 or HDFS path
        String tableLocation = warehousePath;
        return String.format(
            "%s/%s/%s", tableLocation,
            tableIdentifier.namespace().levels()[0],
            tableIdentifier.name());
    }

    @Override
    public List<TableIdentifier> listTables(Namespace namespace) {
        List<TableIdentifier> tableIdentifiers = new ArrayList<>();
        File namespaceDir = new File(warehousePath + '/' + namespace.levels()[0]);
        if (namespaceDir.exists()) {
            // every directory under the namespace directory is treated as a table
            for (String s : namespaceDir.list()) {
                File tableDir = new File(namespaceDir, s);
                if (tableDir.isDirectory()) {
                    tableIdentifiers.add(TableIdentifier.of(namespace, s));
                }
            }
        } else {
            System.out.println("Namespace `" + namespace.levels()[0] + "` doesn't exist.");
        }
        return tableIdentifiers;
    }

    @Override
    public boolean dropTable(TableIdentifier identifier, boolean purge) {
        // will implement later
        return true;
    }

    @Override
    public void renameTable(TableIdentifier from, TableIdentifier to) {
        // will implement later
    }

    // Reads the catalog name and properties during initialization
    @Override
    public void initialize(String name, Map<String, String> properties) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }
}