Question 1
1 / 1 pts
Find all distinct countries.
Hint: use select(), distinct()
Your Answer:
a = df.select('Country').distinct()
a.show(50, False)
Question 2
1 / 1 pts
Find the Name and Price of sales records in Brazil.
Hint: use filter().
Your Answer:
b = df.filter(df['Country'] == 'Brazil')
b.select('Name', 'Price').show()
Question 3
1 / 1 pts
For each country, find the total Price.
Hint: Use groupBy()
Your Answer:
df.groupBy('Country').sum('Price').show(50)
Question 4
2 / 2 pts
List countries by their total Price in descending order.
Hint: Use orderBy()
Your Answer:
t = df.groupBy('Country').sum('Price')
t.orderBy('sum(Price)', ascending=False).show(50)
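Note: relying on the auto-generated column name 'sum(Price)' works but is brittle. A minimal alternative sketch, assuming the same df, aliases the aggregate with agg() so the sort key has a stable name (TotalPrice is a hypothetical name, not part of the question):

from pyspark.sql.functions import sum as sum_

# Alias the aggregate so orderBy() can refer to a stable column name
totals = df.groupBy('Country').agg(sum_('Price').alias('TotalPrice'))
totals.orderBy('TotalPrice', ascending=False).show(50)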
Question 5
2 / 2 pts
Now load a second table, countries:
http://www.cse.ust.hk/msbd5003/data/countries.csv
df2 = spark.read.csv('countries.csv', header=True, inferSchema=True)
Redo Question 3, but replace the country names by their IDs.
Hint: Use join()
Your Answer:
t = df.join(df2, df.Country == df2.Country)
c = t.groupBy('ID').sum('Price')
c.select('ID', 'sum(Price)').show(50)
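Note: joining with df.Country == df2.Country keeps two Country columns in the result, which can trigger ambiguous-column errors in later selections. A minimal alternative sketch, assuming both tables name the key column Country, joins on the column name so only one copy is kept:

# Joining on the column name keeps a single Country column in the output
t = df.join(df2, on='Country')
t.groupBy('ID').sum('Price').show(50)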
Question 6
1 / 3 pts
Rewrite the PageRank example using the DataFrame API. Here is a skeleton of the code; your job is to fill in the missing part. The data files can be downloaded at:
https://www.cse.ust.hk/msbd5003/data/pagerank_data.txt
https://www.cse.ust.hk/msbd5003/data/dblp.in
from pyspark.sql.functions import *

numOfIterations = 10
lines = spark.read.text('pagerank_data.txt')
# You can also test your program on the following larger data set:
# lines = spark.read.text('dblp.in')
a = lines.select(split(lines[0], ' '))
links = a.select(a[0][0].alias('src'), a[0][1].alias('dst'))
outdegrees = links.groupBy('src').count()
ranks = outdegrees.select('src', lit(1).alias('rank'))
for iteration in range(numOfIterations):
    # FILL IN THIS PART
ranks.orderBy(desc('rank')).show()
Note: There is a bug in the current Spark SQL implementation: the groupBy() method (followed by some aggregation) sometimes fails to group all rows with the same key. A temporary workaround is the following:
Suppose you want to do
df.groupBy('A').sum('B')
If it fails to produce the desired result, try
df.withColumnRenamed('A', 'A1').groupBy('A1').sum('B')
We have reported this bug to the Spark developers and the issue is currently under investigation:
https://issues.apache.org/jira/browse/SPARK-20169
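For example, applied to Question 3 the workaround would look like the following sketch (Country1 is just a throwaway name; the rename does not change the grouping semantics):

# Rename the grouping column before aggregating, per the workaround above
df.withColumnRenamed('Country', 'Country1').groupBy('Country1').sum('Price').show(50)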
Your Answer:
from pyspark.sql.functions import *

numOfIterations = 10
lines = spark.read.text('pagerank_data.txt')
a = lines.select(split(lines[0], ' '))
links = a.select(a[0][0].alias('src'), a[0][1].alias('dst'))
outdegrees = links.groupBy('src').count()
ranks = outdegrees.select('src', lit(1).alias('rank'))
# Join outdegrees and ranks on the src column before entering the loop;
# X = rank/count = 1/outdegree stays fixed, so rank*X is each page's per-edge contribution.
joined_df = outdegrees.join(ranks, on='src')
joined_df = joined_df.withColumn('X', col('rank') / col('count'))
for iteration in range(numOfIterations):
    joined_df = joined_df.withColumn('Result', col('rank') * col('X'))
    ranks = links.join(joined_df, on='src', how='left').select('dst', 'Result')
    ranks = ranks.withColumnRenamed('dst', 'PageName').groupBy('PageName').sum('Result')
    joined_df = ranks.join(joined_df, joined_df.src == ranks.PageName).drop(ranks.PageName)
    joined_df = joined_df.drop('rank').withColumnRenamed('sum(Result)', 'rank')
    ranks = ranks.withColumnRenamed('sum(Result)', 'rank')
ranks.orderBy(desc('rank')).show()
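Note: the answer above never applies a damping factor, so each iteration only redistributes raw rank mass. A minimal corrected sketch of the loop, modeled on the classic Spark PageRank example (the 0.15/0.85 constants are an assumption about what the exercise expected, not taken from the question):

from pyspark.sql.functions import *

numOfIterations = 10
lines = spark.read.text('pagerank_data.txt')
a = lines.select(split(lines[0], ' '))
links = a.select(a[0][0].alias('src'), a[0][1].alias('dst'))
outdegrees = links.groupBy('src').count()
ranks = outdegrees.select('src', lit(1.0).alias('rank'))

for iteration in range(numOfIterations):
    # Each page sends rank/outdegree along every outgoing edge
    contribs = links.join(ranks, on='src') \
                    .join(outdegrees, on='src') \
                    .select('dst', (col('rank') / col('count')).alias('contrib'))
    # Sum the contributions arriving at each page, then apply damping
    ranks = contribs.groupBy('dst').agg(sum('contrib').alias('total')) \
                    .select(col('dst').alias('src'),
                            (lit(0.15) + lit(0.85) * col('total')).alias('rank'))

ranks.orderBy(desc('rank')).show()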