If you are even remotely associated with Big Data Analytics, you will have heard of Apache Spark and why everyone is so excited about it.
This post is a simple code example of using Spark's Python API, PySpark, to push data to an HBase table.
Let's look at the case of loading a stream of JSON objects such as these into HBase:
 {"id":1,"name":"April","color":"orange","description":"mini tiger"}  
 {"id":2,"name":"Fluffy","color":"white","description":"White fur ball"}  
 {"id":3,"name":"Moony","color":"grey","description":"Monochrome kitty"}  
We can get the individual lines from the stream, as done in the Network WordCount example:
 ssc = StreamingContext(sc, 1)  
 lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  
And then transform and save the contents of each RDD in the stream to HBase (SaveRecord is defined in the complete listing below):
 lines.foreachRDD(SaveRecord)  
Now, to save an RDD to HBase:
Transform the RDD
We need to transform the RDD into (key, value) pairs with the following contents:
(row key, [row key, column family, column name, value])
 datamap = rdd.map(lambda x: (str(json.loads(x)["id"]),[str(json.loads(x)["id"]),"cfamily","cats_json",x]))  
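Note that the lambda above parses each JSON record twice. A minimal variant (my sketch, not from the original post) that parses each record only once:

 def to_put_tuple(x):
   # Hypothetical helper (my naming): parse once, use the "id" field as the row key.
   rowkey = str(json.loads(x)["id"])
   return (rowkey, [rowkey, "cfamily", "cats_json", x])

 datamap = rdd.map(to_put_tuple)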
Save to HBase
We can make use of the RDD.saveAsNewAPIHadoopDataset function, as used in the PySpark HBase example, to save the RDD to HBase:
 datamap.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)  
The complete code of the Spark Streaming app looks like this:
 import sys
 import json
 from pyspark import SparkContext
 from pyspark.streaming import StreamingContext


 def SaveRecord(rdd):
   # Write the records of one micro-batch to the 'cats' table.
   host = 'sparkmaster.example.com'
   table = 'cats'
   # Converter classes from the Spark examples jar: they turn the Python key
   # into an ImmutableBytesWritable and the value list into an HBase Put.
   keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
   valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
   conf = {"hbase.zookeeper.quorum": host,
       "hbase.mapred.outputtable": table,
       "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
       "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
       "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
   # (row key, [row key, column family, column name, value])
   datamap = rdd.map(lambda x: (str(json.loads(x)["id"]),[str(json.loads(x)["id"]),"cfamily","cats_json",x]))
   datamap.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)

 if __name__ == "__main__":
   if len(sys.argv) != 3:
     print("Usage: StreamCatsToHBase.py <hostname> <port>")
     exit(-1)

   sc = SparkContext(appName="StreamCatsToHBase")
   ssc = StreamingContext(sc, 1)  # 1-second batch interval
   lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
   lines.foreachRDD(SaveRecord)

   ssc.start()       # Start the computation
   ssc.awaitTermination() # Wait for the computation to terminate
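One small refinement worth considering (a sketch of mine, not from the original post): each micro-batch fires a Hadoop write job even when it contains no records, so guarding SaveRecord with RDD.isEmpty(), available in PySpark 1.3+, skips the empty batches:

 def SaveRecord(rdd):
   # Skip empty micro-batches before doing the transform-and-save shown above.
   if rdd.isEmpty():
     return
   # ... same transform and saveAsNewAPIHadoopDataset calls as above ...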
To run this successfully, we first need to create an HBase table:
 $ hbase shell  
 hbase(main):003:0> create 'cats','cfamily'  
   
 0 row(s) in 1.2270 seconds  
Start a local data server using netcat and simply paste the JSON data into the netcat terminal.
 $ nc -lk 9999  
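Alternatively, if the three JSON records above are saved to a file, say a hypothetical cats.json, netcat can serve them without manual pasting:

 $ nc -lk 9999 < cats.json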
Submit the app to Spark using spark-submit, passing the Spark examples jar that contains the converter classes:
 bin/spark-submit --jars lib/spark-examples-1.4.0-hadoop2.6.0.jar /tmp/StreamCatsToHbase.py localhost 9999  
Et voilà! We can then see the cats data inserted in HBase:
 hbase(main):002:0> scan 'cats'  
 ROW  COLUMN+CELL  
  1  column=cfamily:cats_json, timestamp=1444469220927,   
    value={"id":1,"name":"April","color":"orange","description":"mini tiger"}  
  2  column=cfamily:cats_json, timestamp=1444469220927,   
    value={"id":2,"name":"Fluffy","color":"white","description":"White fur ball"}  
  3  column=cfamily:cats_json, timestamp=1444469220927,   
    value={"id":3,"name":"Moony","color":"grey","description":"Monochrome kitty"}  
 3 row(s) in 0.0110 seconds  
 