Sunday, October 11, 2015

PySpark HBase and Spark Streaming: Save RDDs to HBase

If you are even remotely associated with Big Data Analytics, you will have heard of Apache Spark and why every one is really excited about it.

This post is basically a simple code example of using the Spark's Python API i.e PySpark to push data to an HBase table.



Lets look at the case of loading a stream of the JSON objects such as these into HBase.

 {"id":1,"name":"April","color":"orange","description":"mini tiger"}  
 {"id":2,"name":"Fluffy","color":"white","description":"White fur ball"}  
 {"id":3,"name":"Moony","color":"grey","description":"Monochrome kitty"}  


We can get the individual lines from the stream as done in the Network Wordcount example

 ssc = StreamingContext(sc, 1)  
 lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  

And then transform and save contents of each RDD in the stream to HBase

 lines.foreachRDD(SaveRecord)  

Now to save an RDD to hbase :

Transform the RDD 

We neet to transform the RDD into a (key,value) pair having the following contents:
         ( rowkey , [ row key , column family , column name , value ] )


 datamap = rdd.map(lambda x: (str(json.loads(x)["id"]),[str(json.loads(x)["id"]),"cfamily","cats_json",x]))  

Save to HBase
We can make use of the RDD.saveAsNewAPIHadoopDataset function as used in this example: PySpark Hbase example to save the RDD to HBase


 datamap.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)  


The complete code of the Spark Streaming app will look like:



1:  import sys  
2:  import json  
3:  from pyspark import SparkContext  
4:  from pyspark.streaming import StreamingContext  
5:    
6:    
7:  def SaveRecord(rdd):  
8:    host = 'sparkmaster.example.com'  
9:    table = 'cats'  
10:    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"  
11:    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"  
12:    conf = {"hbase.zookeeper.quorum": host,  
13:        "hbase.mapred.outputtable": table,  
14:        "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",  
15:        "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",  
16:        "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}  
17:    datamap = rdd.map(lambda x: (str(json.loads(x)["id"]),[str(json.loads(x)["id"]),"cfamily","cats_json",x]))  
18:    datamap.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)  
19:    
20:  if __name__ == "__main__":  
21:    if len(sys.argv) != 3:  
22:      print("Usage: StreamCatsToHBase.py <hostname> <port>")  
23:      exit(-1)  
24:    
25:    sc = SparkContext(appName="StreamCatsToHBase")  
26:    ssc = StreamingContext(sc, 1)  
27:    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  
28:    lines.foreachRDD(SaveRecord)  
29:    
30:    ssc.start()       # Start the computation  
31:    ssc.awaitTermination() # Wait for the computation to terminate  

To run this successfully we need to first create an HBase table



 $ hbase shell  
 hbase(main):003:0> create 'cats','cfamily'  
   
 0 row(s) in 1.2270 seconds  

Start a local data server using netcat  and simply paste the JSON data in the netcat terminal.


 $ nc -lk 9999  

Submit the app to Spark using spark-submit.


 bin/spark-submit --jars lib/spark-examples-1.4.0-hadoop2.6.0.jar /tmp/StreamCatsToHbase.py localhost 9999  

Et voila ! We can then see the cats data inserted in HBase.

 hbase(main):002:0> scan 'cats'  
 ROW  COLUMN+CELL  
  1  column=cfamily:cats_json, timestamp=1444469220927,   
    value={"id":1,"name":"April","color":"orange","description":"mini tiger"}  
  2  column=cfamily:cats_json, timestamp=1444469220927,   
    value={"id":2,"name":"Fluffy","color":"white","description":"White fur ball"}  
  3  column=cfamily:cats_json, timestamp=1444469220927,   
    value={"id":3,"name":"Moony","color":"grey","description":"Monochrome kitty"}  
 3 row(s) in 0.0110 seconds