pyspark memo
Classic Starting Code
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('conf').setMaster("local[*]")
# a SparkContext is required to use Spark
sc = SparkContext(conf=conf)
data = [1, 2, 3, 4]
rdd = sc.parallelize(data, 4)
# or you can read from a file (HDFS, local file, ...)
file = sc.textFile('README.md', 4)
# nothing is materialized so far (lazy evaluation)
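Computation only happens once an action is called; a quick check of the lazy behavior on the rdd above:
doubled = rdd.map(lambda x: x * 2)  # still nothing executed: map only records the lineage
doubled.count()                     # count() is an action, so the job actually runs here
>>> 4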
Transformations
typical ones
- map
- filter
- distinct
- flatMap
rdd.map(lambda x: x * 2)
rdd.filter(lambda x: x % 2 == 0)
rdd.distinct()
>>> [1, 2, 3, 3] -> [1, 2, 3]
# flatMap flattens the lists returned by the function
rdd.flatMap(lambda x: [x, x+5])
>>> [1, 2, 3] -> [1, 6, 2, 7, 3, 8]
rdd.flatMap(lambda x: list(x))
>>> [{1, 2}, {3}] -> [1, 2, 3]
# these are all transformations, so nothing is materialized yet
key-value transformations
- reduceByKey
- sortByKey
- groupByKey
rdd.reduceByKey(lambda a, b: a + b)
>>> [(1, 2), (3, 4), (3, 6)] -> [(1, 2), (3, 10)]
# sort by keys
rdd.sortByKey()
>>> [(1, 2), (2, 3), (1, 3)] -> [(1, 2), (1, 3), (2, 3)]
rdd.groupByKey()
>>> [(1, 2), (1, 3), (2, 3)] -> [(1, [2, 3]), (2, [3])]
others
- mapValues
- sortBy
- join
# sum the grouped values for each key
rdd.groupByKey().mapValues(sum)
# False for descending, True for ascending
rdd.sortBy(lambda x: x[1], False)
# join matches two pair RDDs on their keys (RDD.join takes no join condition, unlike DataFrames)
rdd.join(other_rdd)
>>> [(1, 'a')], [(1, 'b')] -> [(1, ('a', 'b'))]
Actions
- reduce (the function must be commutative and associative)
- take
- collect
- takeOrdered(n, key=func)
- count
- isEmpty
- treeReduce (reduces the elements of the RDD in a multi-level tree pattern; can be faster than a plain reduce)
rdd = sc.parallelize([1, 2, 3])
rdd.reduce(lambda a, b: a * b)
rdd.take(2)
rdd.collect()
rdd = sc.parallelize([5, 3, 1, 2])
# default is ascending order; a key function can be supplied
rdd.takeOrdered(3, lambda s: -1 * s)
rdd = sc.parallelize([1, 2, 3, 4])
# aggregate(zeroValue, seqOp, combOp): start from zeroValue,
# use seqOp to merge values within a partition and combOp to merge results across partitions
rdd.aggregate(set(), lambda acc, v: acc | {v}, lambda a, b: a | b)
>>> {1, 2, 3, 4}
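count, isEmpty and treeReduce from the list above, shown on the same rdd:
rdd.count()
>>> 4
rdd.isEmpty()
>>> False
# treeReduce merges partial results in a multi-level tree instead of sending everything straight to the driver
rdd.treeReduce(lambda a, b: a + b)
>>> 10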
# foreach is an action, map is a transformation
fruits = sc.parallelize(["pen", "apple"])
# note: the prints happen on the executors (visible directly in local mode, in executor logs on a cluster)
fruits.foreach(lambda x: print("I get a", x))
>>> I get a pen
>>> I get a apple
Accumulator & Broadcast
Accumulator
tasks can only add to an accumulator (write-only); only the driver can read its value
accumulators can be updated in actions or in transformations:
- in actions: Spark applies each task's update to the accumulator exactly once
- in transformations: failed/slow tasks may get rescheduled and re-run, so there is no exactly-once guarantee
accum = sc.accumulator(0)
rdd = sc.parallelize([1, 2, 3, 4])
def f(x):
    global accum
    accum += x  # pay attention to '+=': add is the only allowed operation
rdd.foreach(f)
accum.value
>>> 10
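Inside a transformation the update only happens when an action finally runs, and re-computation can apply it again; a rough sketch of that behavior (count_elem is a made-up helper):
acc = sc.accumulator(0)
def count_elem(x):
    acc.add(1)  # update from inside a transformation (map)
    return x
mapped = rdd.map(count_elem)
acc.value   # still 0 here: the map has not executed yet
mapped.count()
acc.value
>>> 4
# without caching, running another action recomputes the map and the accumulator grows again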
file = sc.textFile(inputFile)
blankLines = sc.accumulator(0)
def extractCallSigns(line):
    global blankLines
    if line == "":
        blankLines += 1
    return line.split(" ")
callSigns = file.flatMap(extractCallSigns)
# blankLines stays 0 until an action forces callSigns to be computed
print('...')
Broadcast
read-only variable shipped to and cached on every executor
signPrefixes = sc.broadcast(loadCallSignTable())
def processSignCount(sign_count):
    # access the broadcast value through .value
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)
countryContactCounts = (contactCounts
                        .map(processSignCount)
                        .reduceByKey(lambda x, y: x + y))
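The snippet above depends on helpers not defined in this memo (loadCallSignTable, lookupCountry, contactCounts); a minimal self-contained sketch of the same broadcast-lookup pattern, with made-up prefixes:
prefix_to_country = sc.broadcast({"K": "US", "VE": "CA"})  # small table shipped once to every executor
contacts = sc.parallelize([("K1ABC", 3), ("VE2XYZ", 5), ("K9DEF", 1)])
def to_country(sign_count):
    sign, count = sign_count
    table = prefix_to_country.value  # read the broadcast value on the executor
    country = next((c for prefix, c in table.items() if sign.startswith(prefix)), "unknown")
    return (country, count)
contacts.map(to_country).reduceByKey(lambda x, y: x + y).collect()
>>> [('US', 4), ('CA', 5)]  # order may vary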