基本程式設計
- 如果是用python撰寫spark的程式,必須import以下的spark classes,且指定設定檔。
- appName是程式顯示在cluster UI上的名稱
- master指Spark, Mesos or YARN cluster URL, 或是在local模式
- 實際使用時,我們會使用spark-submit執行程式,但是在測試時,使用local模式較為方便。
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
Pyspark交互模式
- 如果使用pyspark進入交互界面時,SparkConext已經用變數sc預設載入,且會讓自行指定的Conext失效。
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData.reduce(lambda a, b: a + b)
- spark可簡單的讀取外部檔案,支援多種格式如local file system, HDFS, Cassandra, HBase, Amazon S3等。
- 一但讀取後,則可使用dataset的方式處理資料。
- 如果使用local file system的方式讀取資料時,則driver與worder node必須在相同的路徑存放此檔案,可使用拷貝或mount的方式解決。
- 可使用路徑、壓縮檔或是wildcard的方式指定檔案。
- textFile可加上第二個參數來指定partion個數(預設為one partition for one block)。
distFile = sc.textFile("data.txt")
Writable RDD key-value對應python格式
RDD writable type |
python type |
Text |
unicode str |
IntWritable |
int |
FloatWritable |
float |
DoubleWritable |
float |
BooleanWritable |
bool |
BytesWritable |
bytearray |
NullWritable |
None |
MapWritable |
dict |
Basic program
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
Spark中使用自訂的函數
"""MyScript.py"""
if __name__ == "__main__":
def myFunc(s):
words = s.split(" ")
return len(words)
sc = SparkContext(...)
sc.textFile("file.txt").map(myFunc)