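My undergraduate C teacher once told us that universities abroad don't spend class time teaching a language, but picking up Apache Spark from zero by grinding through the API docs still took real effort.
These notes record the basic RDD and DataFrame usage from Lab 1~2, along with the pitfalls I ran into.
Everything below was done in PySpark.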
drinks = [("HoneyLemon", 1.5), ("Mojito", 6), ("TehC", 2)]
rdd = sc.parallelize(drinks)
def f(x): print(x)
rdd.foreach(f)  # foreach() is an action and returns None, so there is nothing to assign
flatMap() is a transformation that applies a function to every element, flattens the results, and returns a new RDD. On a DataFrame the same idea applies to array/map columns.
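To make the flattening concrete, here is a plain-Python model of the difference between map() and flatMap(), assuming the description above refers to flatMap(). It does not use Spark at all; `split_pair` is a hypothetical helper I made up for illustration, and on a real RDD the equivalent calls would be `rdd.map(split_pair).collect()` and `rdd.flatMap(split_pair).collect()`.

```python
from itertools import chain

drinks = [("HoneyLemon", 1.5), ("Mojito", 6), ("TehC", 2)]

def split_pair(pair):
    # Hypothetical helper: turn one (name, price) pair into a two-item list.
    name, price = pair
    return [name, price]

# Model of map(): exactly one output element per input element,
# so the result is a list of small lists.
mapped = [split_pair(p) for p in drinks]

# Model of flatMap(): the per-element lists are concatenated into one flat list.
flat_mapped = list(chain.from_iterable(split_pair(p) for p in drinks))

print(mapped)
print(flat_mapped)
```

The nested result from map() keeps the structure of each element, while the flatMap() model loses it, which is why flatMap() is the usual choice for things like splitting lines into words.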
words_filter = rdd.filter(lambda x: 'Mojito' in x)
filtered = words_filter.collect()
print("Filtered RDD -> %s" % (filtered)) # Filtered RDD -> [('Mojito', 6)]
A pair RDD is simply an RDD whose elements are key-value pairs, i.e. 2-tuples.
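As a rough illustration of what "key-value pair" means in practice, the sketch below models three pair-RDD methods (`keys()`, `values()` and `mapValues()`) with plain Python lists. It is only a model under the assumption that each element is a `(key, value)` tuple; no Spark is involved, and the doubling function is just an example I picked.

```python
drinks = [("HoneyLemon", 1.5), ("Mojito", 6), ("TehC", 2)]

# Model of rdd.keys().collect(): the first item of every pair.
keys = [k for k, _ in drinks]

# Model of rdd.values().collect(): the second item of every pair.
values = [v for _, v in drinks]

# Model of rdd.mapValues(lambda price: price * 2).collect():
# the function only touches the value, the key is passed through unchanged.
doubled = [(k, v * 2) for k, v in drinks]

print(keys)
print(values)
print(doubled)
```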
rdd = df.rdd
This gives you an RDD of Row objects.
If you want plain tuples or lists instead, map over it:
rdd = df.rdd.map(tuple)
rdd = df.rdd.map(list)
Pair RDDs come with some functions of their own.
# -*- coding:utf-8 -*-
from pyspark import SparkContext
sc = SparkContext("local", "Scratch")
#SparkContext is the entry point to any spark functionality.
#When we run any Spark application, a driver program starts,
#which has the main function and your SparkContext gets initiated here.
#The driver program then runs the operations inside the executors on worker nodes.
import numpy as np
import pandas as pd
# Before importing other Python packages, decide which Python interpreter PySpark should use
# and make sure the environment variables are set in ./conf/spark-env.sh
# (if it doesn't exist, you can use spark-env.sh.template as a base).
def count(rdd):
    # count() returns the number of elements in the RDD.
    rdd_cnt = rdd.count()
    print("Number of elements in RDD: %i" % (rdd_cnt))

def collect(rdd):
    # collect() returns all the elements in the RDD as a list on the driver.
    rdd_col = rdd.collect()
    print("Elements in RDD: %s" % (rdd_col))
def reduce(rdd):
    # reduce() doesn’t return a new iterable but uses the function
    # called to reduce the iterable to a single value.
    concat = rdd.reduce(lambda x, y: x + ', ' + y)
    print("List of all companies : %s" % (concat))
def filter(rdd):
    # filter() returns only those elements which meet the condition of the function inside.
    rdd_filter = rdd.filter(lambda x: 'Go' in x)
    filtered = rdd_filter.collect()
    print("Filtered RDD: %s" % (filtered))
def foreach(rdd):
    # foreach() is like a for loop that runs a function on every element in the RDD.
    # It is an action and returns None, so there is nothing to assign.
    def func(x): print(x)
    rdd.foreach(func)
def map(rdd):
    # map() returns a new RDD by applying a function to each element in the current RDD.
    rdd_map = rdd.map(lambda x: (x, 1))
    mapping = rdd_map.collect()
    print("Key value pair: %s" % (mapping))
def join(rdd1, rdd2):
    # join() returns an RDD of (key, (value1, value2)) pairs
    # for every key that appears in both RDDs.
    joined = rdd1.join(rdd2)
    final = joined.collect()
    print("Joined RDD: %s" % (final))
    return joined
def groupbykey(rdd):
    # groupByKey() is a transformation that groups the values for each key
    # in the RDD into a single sequence.
    group_rdd = rdd.groupByKey()
    for element in group_rdd.map(lambda x: (x[0], list(x[1]))).collect():
        print(element)
def reducebykey(rdd):
    # reduceByKey() is a transformation that merges the values of each key
    # using an associative reduce function. It is a wide transformation,
    # so it shuffles data across RDD partitions.
    rdd2 = rdd.reduceByKey(lambda a, b: a + b)
    for element in rdd2.collect():
        print(element)
if __name__ == '__main__':
    # define rdd
    rdd = sc.parallelize (
    # action
    # count(rdd)
    # collect(rdd)
    # foreach(rdd)
    # reduce(rdd)
    # define rdd1 rdd2, which are used for join
    rdd1 = sc.parallelize([("Amazon", 1), ("IBM", 4)])
    rdd2 = sc.parallelize([("Amazon", 2), ("IBM", 5)])
    rdd_joined = join(rdd1, rdd2)
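
The script never actually calls the groupByKey and reduceByKey helpers, so here is a small plain-Python model of what those two transformations, plus join, compute. It is only a sketch of the semantics, not how Spark implements them: there are no partitions and no shuffle, and the function names (`model_group_by_key`, `model_reduce_by_key`, `model_join`) and the `sales` sample are made up for illustration.

```python
from collections import defaultdict

def model_group_by_key(pairs):
    # Model of rdd.groupByKey() with the values turned into lists.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

def model_reduce_by_key(pairs, func):
    # Model of rdd.reduceByKey(func): fold all values of a key with func.
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

def model_join(left, right):
    # Model of rdd1.join(rdd2): an inner join that emits
    # (key, (left_value, right_value)) for every matching combination.
    right_groups = model_group_by_key(right)
    return [(k, (lv, rv)) for k, lv in left for rv in right_groups.get(k, [])]

# Made-up sample data reusing the keys from the join example.
sales = [("Amazon", 1), ("IBM", 4), ("Amazon", 2), ("IBM", 5)]

grouped = model_group_by_key(sales)
summed = model_reduce_by_key(sales, lambda a, b: a + b)
joined = model_join([("Amazon", 1), ("IBM", 4)], [("Amazon", 2), ("IBM", 5)])

print(grouped)
print(summed)
print(joined)
```

One difference worth keeping in mind: in real Spark the order of the collected results is not guaranteed, whereas this model simply follows the input order.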
