PySpark HBase and Spark Streaming: Save RDDs to HBase (Clyde DCruz, 2015-10-11) <div dir="ltr" style="text-align: left;" trbidi="on">
<span style="font-family: "verdana" , sans-serif;">If you are even remotely associated with Big Data analytics, you will have heard of <a href="http://spark.apache.org/">Apache Spark</a> and why everyone is so excited about it.</span><br />
<span style="font-family: "verdana" , sans-serif;"><br />
This post is a simple code example of using <a href="http://spark.apache.org/docs/latest/api/python/">Spark's Python API</a>, i.e. PySpark, to push data to an <a href="http://hbase.apache.org/">HBase</a> table.</span><br />
<div class="separator" style="clear: both; text-align: center;">
<br /></div>
<div class="separator" style="clear: both; text-align: center;">
<span style="font-family: "verdana" , sans-serif;"><br /></span></div>
<div class="separator" style="clear: both; text-align: left;">
<span style="font-family: "verdana" , sans-serif;">Let's look at loading a stream of JSON objects such as these into HBase.</span></div>
<div class="separator" style="clear: both; text-align: left;">
<span style="font-family: "trebuchet ms" , sans-serif;"><br /></span></div>
<pre style="background: #f0f0f0; border: 1px dashed #CCCCCC; color: black; font-family: arial; font-size: 12px; height: auto; line-height: 20px; overflow: auto; padding: 0px; text-align: left; width: 99%;"><code style="color: black; word-wrap: normal;"> {"id":1,"name":"April","color":"orange","description":"mini tiger"}
{"id":2,"name":"Fluffy","color":"white","description":"White fur ball"}
{"id":3,"name":"Moony","color":"grey","description":"Monochrome kitty"}
</code></pre>
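Each of these lines arrives on the stream as a plain string. Before wiring anything into Spark, a quick check with Python's standard <code>json</code> module (no Spark needed) confirms the fields we will later use for the row key and the cell value:

```python
import json

# A sample line, exactly as it would arrive on the socket stream
line = '{"id":1,"name":"April","color":"orange","description":"mini tiger"}'

record = json.loads(line)   # parse the JSON string into a dict
rowkey = str(record["id"])  # HBase row keys are strings/bytes, so cast the id

print(rowkey)           # prints 1
print(record["name"])   # prints April
```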
<div class="separator" style="clear: both; text-align: left;">
<span style="font-family: "trebuchet ms" , sans-serif;"><br /></span></div>
<div class="separator" style="clear: both; text-align: left;">
<span style="font-family: "verdana" , sans-serif;">We can get the individual lines from the stream as is done in the <a href="https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/network_wordcount.py" target="_blank">Network Wordcount example</a>.</span></div>
<div class="separator" style="clear: both; text-align: left;">
<br /></div>
<pre style="background: #f0f0f0; border: 1px dashed #CCCCCC; color: black; font-family: arial; font-size: 12px; height: auto; line-height: 20px; overflow: auto; padding: 0px; text-align: left; width: 99%;"><code style="color: black; word-wrap: normal;"> ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
</code></pre>
<div class="separator" style="clear: both; text-align: left;">
<span style="font-family: "trebuchet ms" , sans-serif;"><br /></span></div>
<div class="separator" style="clear: both; text-align: left;">
<span style="font-family: "verdana" , sans-serif;">We then transform and save the contents of each RDD in the stream to HBase:</span></div>
<div class="separator" style="clear: both; text-align: left;">
<span style="font-family: "trebuchet ms" , sans-serif;"><br /></span></div>
<pre style="background: #f0f0f0; border: 1px dashed #CCCCCC; color: black; font-family: arial; font-size: 12px; height: auto; line-height: 20px; overflow: auto; padding: 0px; text-align: left; width: 99%;"><code style="color: black; word-wrap: normal;"> lines.foreachRDD(SaveRecord)
</code></pre>
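<code>foreachRDD</code> hands each micro-batch of the stream to the callback, one batch at a time. The control flow is conceptually just a loop over batches; here is a minimal Spark-free sketch of that idea (the <code>batches</code> list and <code>save_batch</code> function are made up for illustration, standing in for the DStream and <code>SaveRecord</code>):

```python
# Hypothetical stand-in for the DStream: each inner list is one micro-batch of lines
batches = [
    ['{"id":1,"name":"April"}', '{"id":2,"name":"Fluffy"}'],
    ['{"id":3,"name":"Moony"}'],
]

saved = []

def save_batch(batch):
    # In the real app this role is played by SaveRecord(rdd), which writes to HBase
    saved.extend(batch)

# lines.foreachRDD(SaveRecord) amounts to invoking the callback once per micro-batch
for batch in batches:
    save_batch(batch)

print(len(saved))  # prints 3
```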
<div class="separator" style="clear: both; text-align: left;">
<span style="font-family: "trebuchet ms" , sans-serif;"><br /></span></div>
<div class="separator" style="clear: both; text-align: left;">
<span style="font-family: "verdana" , sans-serif;">Now, to save an RDD to HBase:</span></div>
<div>
<span style="font-family: "verdana" , sans-serif;"><br /><b>Transform the RDD </b></span><br />
<span style="font-family: "verdana" , sans-serif;">We need to transform the RDD into a (key, value) pair with the following contents:</span><br />
<span style="background-color: white; font-family: "verdana" , sans-serif;"> <b> ( rowkey , [ row key , column family , column name , value ] )</b></span><br />
<span style="background-color: white; font-family: "trebuchet ms" , sans-serif;"><br /></span>
<br />
<pre style="background: #f0f0f0; border: 1px dashed #CCCCCC; color: black; font-family: arial; font-size: 12px; height: auto; line-height: 20px; overflow: auto; padding: 0px; text-align: left; width: 99%;"><code style="color: black; word-wrap: normal;"> datamap = rdd.map(lambda x: (str(json.loads(x)["id"]), [str(json.loads(x)["id"]), "cfamily", "cats_json", x]))
</code></pre>
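Note that the lambda above parses each line twice. The same transformation can be written to parse only once, and it can be exercised against an ordinary Python list standing in for the RDD (no Spark required):

```python
import json

def to_hbase_tuple(x):
    # Parse once, then build ( rowkey, [ row key, column family, column name, value ] )
    rowkey = str(json.loads(x)["id"])
    return (rowkey, [rowkey, "cfamily", "cats_json", x])

lines = [
    '{"id":1,"name":"April","color":"orange","description":"mini tiger"}',
    '{"id":2,"name":"Fluffy","color":"white","description":"White fur ball"}',
]

# rdd.map(to_hbase_tuple) would produce exactly these pairs
datamap = [to_hbase_tuple(x) for x in lines]

print(datamap[0][0])     # prints 1
print(datamap[1][1][1])  # prints cfamily
```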
<b><span style="font-family: "trebuchet ms" , sans-serif;"><br /></span></b>
<b><span style="font-family: "verdana" , sans-serif;">Save to HBase</span></b></div>
<span style="font-family: "verdana" , sans-serif;">We can make use of the </span><span style="background-color: #eeeeee;"><span style="font-family: "courier new" , "courier" , monospace;">RDD.saveAsNewAPIHadoopDataset</span></span><span style="font-family: "verdana" , sans-serif;"> function, as used in this </span><a href="https://github.com/apache/spark/blob/master/examples/src/main/python/hbase_outputformat.py" style="font-family: Verdana, sans-serif;" target="_blank">PySpark HBase example</a><span style="font-family: "verdana" , sans-serif;">, to save the RDD to HBase.</span><br />
<span style="font-family: "trebuchet ms" , sans-serif;"><br /></span>
<br />
<pre style="background: #f0f0f0; border: 1px dashed #CCCCCC; color: black; font-family: arial; font-size: 12px; height: auto; line-height: 20px; overflow: auto; padding: 0px; text-align: left; width: 99%;"><code style="color: black; word-wrap: normal;"> datamap.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
</code></pre>
<span style="font-family: "verdana" , sans-serif;"><br />
The complete code of the Spark Streaming app will look like:</span><br />
<span style="font-family: "trebuchet ms" , sans-serif;"><br /></span>
<br />
<pre style="background: #f0f0f0; border: 1px dashed #CCCCCC; color: black; font-family: arial; font-size: 12px; height: auto; line-height: 20px; overflow: auto; padding: 0px; text-align: left; width: 99%;"><code style="color: black; word-wrap: normal;">import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext


def SaveRecord(rdd):
    host = 'sparkmaster.example.com'
    table = 'cats'
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    datamap = rdd.map(lambda x: (str(json.loads(x)["id"]),
                                 [str(json.loads(x)["id"]), "cfamily", "cats_json", x]))
    datamap.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: StreamCatsToHBase.py &lt;hostname&gt; &lt;port&gt;")
        exit(-1)

    sc = SparkContext(appName="StreamCatsToHBase")
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    lines.foreachRDD(SaveRecord)

    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate
</code></pre>
<span style="font-family: "trebuchet ms" , sans-serif;"><br /></span>
<span style="font-family: "verdana" , sans-serif;">To run this successfully, we first need to create an HBase table:</span><br />
<span style="font-family: "trebuchet ms" , sans-serif;"><br /></span>
<br />
<div>
<pre style="background: #f0f0f0; border: 1px dashed #CCCCCC; color: black; font-family: arial; font-size: 12px; height: auto; line-height: 20px; overflow: auto; padding: 0px; text-align: left; width: 99%;"><code style="color: black; word-wrap: normal;"> $ hbase shell
hbase(main):003:0> create 'cats','cfamily'
0 row(s) in 1.2270 seconds
</code></pre>
<span style="font-family: "trebuchet ms" , sans-serif;"><br /></span>
<span style="font-family: "verdana" , sans-serif;">Start a local data server using <a href="http://netcat.sourceforge.net/" target="_blank">netcat</a> and simply paste the JSON data into the netcat terminal.</span><br />
<span style="font-family: "verdana" , sans-serif;"><br /></span>
<br />
<pre style="background: #f0f0f0; border: 1px dashed #CCCCCC; color: black; font-family: arial; font-size: 12px; height: auto; line-height: 20px; overflow: auto; padding: 0px; text-align: left; width: 99%;"><code style="color: black; word-wrap: normal;"> $ nc -lk 9999
</code></pre>
<span style="font-family: "trebuchet ms" , sans-serif;"><br /></span>
<span style="font-family: "verdana" , sans-serif;">Submit the app to Spark using </span><span style="font-family: "courier new" , "courier" , monospace;">spark-submit</span><span style="font-family: "verdana" , sans-serif;">.</span><br />
<span style="font-family: "trebuchet ms" , sans-serif;"><br /></span>
<br />
<pre style="background: #f0f0f0; border: 1px dashed #CCCCCC; color: black; font-family: arial; font-size: 12px; height: auto; line-height: 20px; overflow: auto; padding: 0px; text-align: left; width: 99%;"><code style="color: black; word-wrap: normal;"> bin/spark-submit --jars lib/spark-examples-1.4.0-hadoop2.6.0.jar /tmp/StreamCatsToHbase.py localhost 9999
</code></pre>
<span style="font-family: "trebuchet ms" , sans-serif;"><br /></span>
<span style="font-family: "verdana" , sans-serif;">Et voilà! We can then see the cats data inserted into HBase.</span><br />
<span style="font-family: "verdana" , sans-serif;"><br /></span></div>
<pre style="background: #f0f0f0; border: 1px dashed #CCCCCC; color: black; font-family: arial; font-size: 12px; height: auto; line-height: 20px; overflow: auto; padding: 0px; text-align: left; width: 99%;"><code style="color: black; word-wrap: normal;"> hbase(main):002:0> scan 'cats'
ROW COLUMN+CELL
1 column=cfamily:cats_json, timestamp=1444469220927,
value={"id":1,"name":"April","color":"orange","description":"mini tiger"}
2 column=cfamily:cats_json, timestamp=1444469220927,
value={"id":2,"name":"Fluffy","color":"white","description":"White fur ball"}
3 column=cfamily:cats_json, timestamp=1444469220927,
value={"id":3,"name":"Moony","color":"grey","description":"Monochrome kitty"}
3 row(s) in 0.0110 seconds
</code></pre>
<br />
<br /></div>