If you are even remotely associated with Big Data analytics, you will have heard of Apache Spark and why everyone is so excited about it.
This post is a simple code example of using Spark's Python API, i.e. PySpark, to push data to an HBase table.
Let's look at the case of loading a stream of JSON objects such as these into HBase:
{"id":1,"name":"April","color":"orange","description":"mini tiger"}
{"id":2,"name":"Fluffy","color":"white","description":"White fur ball"}
{"id":3,"name":"Moony","color":"grey","description":"Monochrome kitty"}
We can get the individual lines from the stream as done in the Network WordCount example:
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
and then transform and save the contents of each RDD in the stream to HBase:
lines.foreachRDD(SaveRecord)
Now, to save an RDD to HBase:
Transform the RDD
We need to transform the RDD into a (key, value) pair with the following contents:
(rowkey, [row key, column family, column name, value])
datamap = rdd.map(lambda x: (str(json.loads(x)["id"]),[str(json.loads(x)["id"]),"cfamily","cats_json",x]))
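The lambda above parses each line twice. An equivalent named helper (the name `to_hbase_pair` is my own, not from the original example) parses the JSON only once and makes the expected shape explicit:

```python
import json

def to_hbase_pair(line, family="cfamily", column="cats_json"):
    # Build the (rowkey, [rowkey, family, column, value]) pair that
    # the StringListToPutConverter expects, parsing the JSON once.
    rowkey = str(json.loads(line)["id"])
    return (rowkey, [rowkey, family, column, line])

line = '{"id":1,"name":"April","color":"orange","description":"mini tiger"}'
print(to_hbase_pair(line)[0])  # rowkey: 1
```

With this helper, the map step becomes `rdd.map(to_hbase_pair)`.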
Save to HBase
We can make use of the RDD.saveAsNewAPIHadoopDataset function, as used in the PySpark HBase example, to save the RDD to HBase:
datamap.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
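One assumption worth flagging: every line arriving on the socket must be valid JSON carrying an `id` field, or `json.loads` in the map step will raise and fail the whole batch. A minimal guard (the `parse_or_none` name is mine, not part of the original example) could filter malformed lines before the transform:

```python
import json

def parse_or_none(line):
    # Return the parsed record, or None for malformed input
    # (the streaming app below assumes every line is valid JSON).
    try:
        record = json.loads(line)
        return record if "id" in record else None
    except ValueError:
        return None

print(parse_or_none('{"id":1,"name":"April"}'))  # {'id': 1, 'name': 'April'}
print(parse_or_none('not json'))                 # None
```

In the app, this would be used as `rdd.filter(lambda x: parse_or_none(x) is not None)` ahead of the map.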
The complete code of the Spark Streaming app will look like:
import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext


def SaveRecord(rdd):
    host = 'sparkmaster.example.com'
    table = 'cats'
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    # Transform each JSON line into (rowkey, [rowkey, family, column, value])
    datamap = rdd.map(lambda x: (str(json.loads(x)["id"]),
                                 [str(json.loads(x)["id"]), "cfamily", "cats_json", x]))
    datamap.saveAsNewAPIHadoopDataset(conf=conf,
                                      keyConverter=keyConv,
                                      valueConverter=valueConv)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: StreamCatsToHBase.py <hostname> <port>")
        exit(-1)

    sc = SparkContext(appName="StreamCatsToHBase")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    lines.foreachRDD(SaveRecord)

    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate
To run this successfully, we first need to create an HBase table:
$ hbase shell
hbase(main):003:0> create 'cats','cfamily'
0 row(s) in 1.2270 seconds
Then start a local data server using netcat and simply paste the JSON data into the netcat terminal:
$ nc -lk 9999
Finally, submit the app to Spark using spark-submit:
bin/spark-submit --jars lib/spark-examples-1.4.0-hadoop2.6.0.jar /tmp/StreamCatsToHbase.py localhost 9999
Et voilà! We can then see the cats data inserted into HBase:
hbase(main):002:0> scan 'cats'
ROW COLUMN+CELL
1 column=cfamily:cats_json, timestamp=1444469220927,
value={"id":1,"name":"April","color":"orange","description":"mini tiger"}
2 column=cfamily:cats_json, timestamp=1444469220927,
value={"id":2,"name":"Fluffy","color":"white","description":"White fur ball"}
3 column=cfamily:cats_json, timestamp=1444469220927,
value={"id":3,"name":"Moony","color":"grey","description":"Monochrome kitty"}
3 row(s) in 0.0110 seconds