基本程式設計

  • 如果是用python撰寫spark的程式,必須import以下的spark classes,且指定設定檔。
    • appName是程式顯示在cluster UI上的名稱
    • master指Spark, Mesos or YARN cluster URL, 或是在local模式
    • 實際使用時,我們會使用spark-submit執行程式,但是在測試時,使用local模式較為方便。
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

Pyspark交互模式

  • 如果使用pyspark進入交互界面時,SparkConext已經用變數sc預設載入,且會讓自行指定的Conext失效。
# 在pyspark模式下
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData.reduce(lambda a, b: a + b) # 15
  • spark可簡單的讀取外部檔案,支援多種格式如local file system, HDFS, Cassandra, HBase, Amazon S3等。
    • 一但讀取後,則可使用dataset的方式處理資料。
    • 如果使用local file system的方式讀取資料時,則driver與worder node必須在相同的路徑存放此檔案,可使用拷貝或mount的方式解決。
    • 可使用路徑、壓縮檔或是wildcard的方式指定檔案。
    • textFile可加上第二個參數來指定partion個數(預設為one partition for one block)。
distFile = sc.textFile("data.txt")
# 以下三種均為合法的指定檔案方式
# sc.textFile("/my/directory")
# sc.textFile("/my/directory/*.txt")
# sc.textFile("/my/directory/*.gz")

Writable RDD key-value對應python格式

  • 注意array沒有在列表中。
RDD writable type python type
Text unicode str
IntWritable int
FloatWritable float
DoubleWritable float
BooleanWritable bool
BytesWritable bytearray
NullWritable None
MapWritable dict

Basic program

# 讀取外部檔案,檔案並沒有真正讀到記憶體中,而只有lines變數指向檔案
lines = sc.textFile("data.txt")

# 定義map transformation,此時還沒有真的計算
lineLengths = lines.map(lambda s: len(s))

# 如果希望在action後能重複使用lineLengths的RDD,可加上persist
# lineLengths.persist()

# reduce action,此時spark將計算分成許多tasks到不同機器,
# 每台機器執行自已的map與local reduction,再傳回至master
totalLength = lineLengths.reduce(lambda a, b: a + b)

Spark中使用自訂的函數

"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

results matching ""

    No results matching ""