I use bulkLoadHFiles.bulkLoad. I have an org.apache.spark.sql.Dataset with two string columns (key and value), which I transform into a JavaPairRDD<ImmutableBytesWritable, KeyValue>. If the Dataset is not pre-sorted, I get the error "IOException: Added a key not lexically larger than previous ...":
// convert each Row into an (ImmutableBytesWritable, KeyValue) pair
JavaPairRDD<ImmutableBytesWritable, KeyValue> pairsToBulkLoad =
        inputDataset.toJavaRDD().mapToPair(row -> convertToKV(row, "cf", "column"));

// write the pairs as HFiles and bulk-load them into the table
BulkLoadHFiles bulkLoadHFiles = BulkLoadHFiles.create(jobConfiguration);
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
pairsToBulkLoad.saveAsNewAPIHadoopFile(output.toString(), ImmutableBytesWritable.class,
        KeyValue.class, HFileOutputFormat2.class, jobConfiguration);
bulkLoadHFiles.bulkLoad(TableName.valueOf(hbaseFullTableName), output);
public Tuple2<ImmutableBytesWritable, KeyValue> convertToKV(final Row row, final String columnFamily, final String column) {
    final String key = row.getString(0);
    final String value = row.getString(1);
    return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(key)),
            new KeyValue(Bytes.toBytes(key), Bytes.toBytes(columnFamily),
                    Bytes.toBytes(column), Bytes.toBytes(value)));
}
If I submit a pre-sorted Dataset, this code works reliably. But in a production environment an unordered Dataset may arrive here, so I tried inserting pairsToBulkLoad = pairsToBulkLoad.sortByKey(true):
JavaPairRDD<ImmutableBytesWritable, KeyValue> pairsToBulkLoad =
        inputDataset.toJavaRDD().mapToPair(row -> convertToKV(row, "cf", "column"));
// sort the pairs so the HFile writer sees keys in ascending order
pairsToBulkLoad = pairsToBulkLoad.sortByKey(true);

BulkLoadHFiles bulkLoadHFiles = BulkLoadHFiles.create(jobConfiguration);
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
pairsToBulkLoad.saveAsNewAPIHadoopFile(output.toString(), ImmutableBytesWritable.class,
        KeyValue.class, HFileOutputFormat2.class, jobConfiguration);
bulkLoadHFiles.bulkLoad(TableName.valueOf(hbaseFullTableName), output);
In this case, I get another error: "Job aborted due to stage failure: task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable ..."
I don't understand where to look for a solution.
- If I sort the Dataset in Spark before converting it, does that guarantee the ordering of the resulting JavaPairRDD<ImmutableBytesWritable, KeyValue>?
- Or, if the alternative is to sort the JavaPairRDD<ImmutableBytesWritable, KeyValue> itself, why does the sort fail with "object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable)"?
(Java 8, Spark 3, HBase 2.4.2)
I will be grateful for any advice.
I managed to solve this problem myself: the key is to sort the JavaPairRDD<ImmutableBytesWritable, KeyValue> data before writing the HFiles.
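One way to do the sort (a sketch only, reusing the "cf"/"column" names from the question, not necessarily the exact code of the original answer) is to keep the row key as a plain String through sortByKey and build the HBase objects only after the shuffle. The original sortByKey(true) fails because the RangePartitioner samples keys back to the driver with the default Java serializer, and ImmutableBytesWritable does not implement java.io.Serializable:

// Sketch: sort on the String row key (Serializable), then convert to HBase types,
// so Spark never has to serialize ImmutableBytesWritable during the sort.
JavaPairRDD<ImmutableBytesWritable, KeyValue> pairsToBulkLoad =
        inputDataset.toJavaRDD()
                // keep plain Strings through the shuffle
                .mapToPair(row -> new Tuple2<String, String>(row.getString(0), row.getString(1)))
                .sortByKey(true)
                // only now build the ImmutableBytesWritable / KeyValue pair
                .mapToPair(t -> new Tuple2<>(
                        new ImmutableBytesWritable(Bytes.toBytes(t._1())),
                        new KeyValue(Bytes.toBytes(t._1()), Bytes.toBytes("cf"),
                                Bytes.toBytes("column"), Bytes.toBytes(t._2()))));

An alternative that keeps the ImmutableBytesWritable keys is to switch Spark to Kryo serialization (spark.serializer=org.apache.spark.serializer.KryoSerializer) and register ImmutableBytesWritable and KeyValue, since Kryo does not require java.io.Serializable. Note that sorting String keys matches HBase's binary row-key order only for ASCII keys.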