Working on Hbase table using Spark

In this post we'll see how to read data from a Hbase table and write to another Hbase table using Spark RDDs.

To read the data from Hbase table set the 'SOURCE_TABLE' in Hbase configuration object. To create the RDD call the method newAPIHadoopRDD.

For writing to a DESTINATION_TABLE Hbase table first register it to the Job conf object. Based on the class specified to this method conf.setOutputFormatClass destination source is decided.



1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
Configuration hbaseConf = HBaseConfiguration.create();

hbaseConf.setLong("hbase.rpc.timeout", 600000);

hbaseConf.setLong("hbase.client.scanner.caching", 1000);

hbaseConf.set(TableInputFormat.INPUT_TABLE, SOURCE_TABLE);

Job conf = Job.getInstance(hbaseConf, "Hbase Reader and Writer");

conf.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, DESTINATION_TABLE);

conf.setOutputFormatClass(TableOutputFormat.class);

SparkConf sparkConf = new SparkConf();

JavaSparkContext jsc = new JavaSparkContext(sparkConf);

JavaPairRDD < ImmutableBytesWritable, Result > inputRDD = jsc.newAPIHadoopRDD(hbaseConf, TableInputFormat.class,

 ImmutableBytesWritable.class, Result.class);

JavaPairRDD < ImmutableBytesWritable, Put > outputRDD = inputRDD.mapToPair(

  new PairFunction < Tuple2 < ImmutableBytesWritable, Result > , ImmutableBytesWritable, Put > () {

   private static final long serialVersionUID = 1 L;

   @Override

   public Tuple2 < ImmutableBytesWritable, Put > call(Tuple2 < ImmutableBytesWritable, Result > tuple) throws Exception {

     Result result = tuple._2;

     String keyRow = Bytes.toString(result.getRow());

     String valueFromHbase = Bytes.toString(result.getValue(Bytes.toBytes(SOURCE_TABLE_COLUMN_FAMILY), Bytes.toBytes(SOURCE_TABLE_COLUMN_QUALIFIER)));

     // Perform your business operations

     Put put = new Put(Bytes.toBytes(keyRow));

     put.addColumn(Bytes.toBytes(DESTINATION_TABLE_COLUMN_FAMILY), Bytes.toBytes(DESTINATION_TABLE_COLUMN_QUALIFIER), Bytes.toBytes(valueFromHbase);

      return new Tuple2 < ImmutableBytesWritable, Put > (new ImmutableBytesWritable(), put);
     }

     outputRDD.saveAsNewAPIHadoopDataset(conf.getConfiguration());

     jsc.close();

No comments:

Post a Comment