HDFSとSparkの連携

SparkでHDFS上のデータを分析し結果を再度HDFS上に保存するまでの備忘録です。

環境

HDFSはCDH5.3.2
SparkはHDFSクラスタとは別で用意
StandAloneモードでインストールしました。

サンプルデータ

Generate Test Data - Amazon Elastic MapReduceから作成

こういうデータ(抜粋)

0|Christopher WOOD|1954-10-15|F|NC|christopher.wood.1954@hotmail.com|412-850-6209
1|Scarlett YOUNG|1998-10-24|M|OK|scarlett.young.1998@live.com|151-447-8098
2|Ian ADAMS|1982-02-12|F|CT|ian.adams.1982@hotmail.com|768-213-4991
・
・

サンプルデータをHDFSにUPしておきます。
ファイルパスは/user/hdfs/customers/customersです。

ではSparkいきます。
今回はSpark-shell(scala)で試します。

HDFS上のデータ指定

scala> val test = sc.textFile("hdfs://Namenode:8020/user/hdfs/customers/customers")

"1954”の文字列を含む行だけ取り出す

scala> val lineCount = test.filter(line => line.contains("1954"))

何行あるかカウント

scala> lineCount.count()
res10: Long = 164203

もう少し絞ります。
上記結果の中から"PETERSON”の文字列を含む行だけ取り出す

scala> val peterson = lineCount.filter(line => line.contains("PETERSON"))

何行あるかカウント

scala> peterson.count()
res12: Long = 811

結果をHDFS上に保存

scala> peterson.saveAsTextFile("hdfs://Namenode:8020:8020/user/hdfs/peterson")

結果データ(抜粋)
1954とPETERSONを含む行のみ抽出

5072|Zoe PETERSON|1954-04-23|F|CT|zoe.peterson.1954@yahoo.com|318-623-6684
36486|Layla PETERSON|1954-11-27|F|WA|layla.peterson.1954@outlook.com|710-802-2058
92149|Arianna PETERSON|1954-08-02|F|AR|arianna.peterson.1954@outlook.com|358-917-5173
・
・

参考までにpysparkだと

test = sc.textFile("hdfs://Namenode:8020:8020/user/hdfs/customers/customers")
lineCount = test.filter(lambda line: "1954" in line)
peterson = lineCount.filter(lambda line: "PETERSON" in line)
peterson.saveAsTextFile("hdfs://Namenode:8020:8020/user/hdfs/peterson")

以上、SparkとHDFSの連携でした。
(ノ´▽`)ノ{+++THANK YOU+++}ヽ(´▽`ヽ)

Hadoop徹底入門 第2版

Hadoop徹底入門 第2版