In this post we'll see how to read data from one HBase table and write it to another HBase table using Spark RDDs.
To read from the source HBase table, set its name (SOURCE_TABLE) on the HBase configuration object under the TableInputFormat.INPUT_TABLE key, then create the RDD by calling newAPIHadoopRDD.
To write to the destination HBase table (DESTINATION_TABLE), register it on the Job configuration object. The output sink is decided by the class passed to conf.setOutputFormatClass, here TableOutputFormat. The full snippet below walks through both steps.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// Configure the HBase connection and register the source table for the scan.
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.setLong("hbase.rpc.timeout", 600000);
hbaseConf.setLong("hbase.client.scanner.caching", 1000);
hbaseConf.set(TableInputFormat.INPUT_TABLE, SOURCE_TABLE);

// Register the destination table and the output format on the Job configuration.
Job conf = Job.getInstance(hbaseConf, "HBase Reader and Writer");
conf.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, DESTINATION_TABLE);
conf.setOutputFormatClass(TableOutputFormat.class);

SparkConf sparkConf = new SparkConf();
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// Read the source table as an RDD of (row key, Result) pairs.
JavaPairRDD<ImmutableBytesWritable, Result> inputRDD =
    jsc.newAPIHadoopRDD(hbaseConf, TableInputFormat.class,
        ImmutableBytesWritable.class, Result.class);

// Transform each source row into a Put for the destination table.
JavaPairRDD<ImmutableBytesWritable, Put> outputRDD = inputRDD.mapToPair(
    new PairFunction<Tuple2<ImmutableBytesWritable, Result>, ImmutableBytesWritable, Put>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<ImmutableBytesWritable, Put> call(
                Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
            Result result = tuple._2;
            String keyRow = Bytes.toString(result.getRow());
            String valueFromHbase = Bytes.toString(result.getValue(
                Bytes.toBytes(SOURCE_TABLE_COLUMN_FAMILY),
                Bytes.toBytes(SOURCE_TABLE_COLUMN_QUALIFIER)));

            // Perform your business operations

            Put put = new Put(Bytes.toBytes(keyRow));
            put.addColumn(Bytes.toBytes(DESTINATION_TABLE_COLUMN_FAMILY),
                Bytes.toBytes(DESTINATION_TABLE_COLUMN_QUALIFIER),
                Bytes.toBytes(valueFromHbase));
            return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
        }
    });

// Write the transformed pairs to the destination table.
outputRDD.saveAsNewAPIHadoopDataset(conf.getConfiguration());
jsc.close();
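The table and column names above are plain String constants. As a point of reference, they might be declared as follows; the values here are placeholders for illustration, not names required by the HBase or Spark APIs:

// Hypothetical placeholder values; substitute the actual table names,
// column families, and qualifiers from your cluster.
static final String SOURCE_TABLE = "source_table";
static final String SOURCE_TABLE_COLUMN_FAMILY = "cf";
static final String SOURCE_TABLE_COLUMN_QUALIFIER = "col";
static final String DESTINATION_TABLE = "destination_table";
static final String DESTINATION_TABLE_COLUMN_FAMILY = "cf";
static final String DESTINATION_TABLE_COLUMN_QUALIFIER = "col";

Note that saveAsNewAPIHadoopDataset is an action: nothing is read or written until it runs, since the mapToPair transformation is only recorded lazily until then.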