Creating Pyspark RDD using random key from tuple

I'm studying Apache Spark and found something interesting. When I create a new RDD of key-value pairs, where the key is chosen randomly from a tuple, the result of reduceByKey is not correct.

from pyspark.sql import SparkSession
import random

spark: SparkSession = SparkSession.builder.master("local[1]").appName("SparkNew").getOrCreate()
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = spark.sparkContext.parallelize(data)
indexes = ('a', 'b', 'c')
# map each element to a (key, 1) pair, with the key drawn at random from indexes
rdd2 = rdd.map(lambda x: (indexes[random.randint(0, 2)], 1))
rdd2.take(10)

After creating rdd2, for example, I get this:

[('c', 1),
 ('a', 1),
 ('b', 1),
 ('a', 1),
 ('a', 1),
 ('c', 1),
 ('a', 1),
 ('a', 1),
 ('c', 1),
 ('a', 1)]

And after reduceByKey I get this:

[('c', 5), ('a', 2), ('b', 3)] 

This is obviously not correct. Does anyone know why it's happening? Is it because of randint? But why? Thanks for helping!

There are 2 best solutions below

user238607

If you are looking for a way to generate random integers within a given range, you can use the uniform random function F.rand() and then scale it to the range, as shown below.

from pyspark import SparkContext, SQLContext
import pyspark.sql.functions as F


sc = SparkContext('local')
sqlContext = SQLContext(sc)

# a single-column DataFrame with ids 0..99999
data1 = [[x] for x in range(100000)]

df1Columns = ["id"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

df1.show(n=5, truncate=False)

start = 0
end = 2

# F.rand() is uniform on [0, 1); scale it and floor to get integers in [start, end]
df2 = df1.withColumn("randInt", F.floor(start + F.rand() * ((end + 1) - start)))

df2.show(n=5, truncate=False)

print("cumulative count")
result = df2.select("randInt").groupBy("randInt").agg(F.count("randInt"))
result.show(n=10, truncate=False)

Output:

+---+
|id |
+---+
|0  |
|1  |
|2  |
|3  |
|4  |
+---+
only showing top 5 rows

+---+-------+
|id |randInt|
+---+-------+
|0  |1      |
|1  |1      |
|2  |1      |
|3  |0      |
|4  |1      |
+---+-------+
only showing top 5 rows

cumulative count
+-------+--------------+
|randInt|count(randInt)|
+-------+--------------+
|0      |33440         |
|1      |33143         |
|2      |33417         |
+-------+--------------+
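
If you need the generated column to be reproducible across runs, F.rand also accepts an optional seed; a minimal sketch (the seed value here is arbitrary):

df2 = df1.withColumn("randInt", F.floor(start + F.rand(seed=42) * ((end + 1) - start)))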

Doof

You are not using the right add operation for reduceByKey! Simply using the add operator as-is won't give you the desired result; let me explain.

First, you need to understand how Spark works: Spark doesn't execute any transformation until an action is called.

Let's understand this in your code.

In the code below, you first import the libraries and initialise Spark, then create an RDD with the data. Then you create the index tuple and map it onto your data. This is the tricky part, so let's recall how rdd.map works: it returns a new RDD by applying a function to each element of the RDD. To add up the values for each key, use this function:

lambda x, y: x + y

from pyspark.sql import SparkSession
import random
spark:SparkSession = SparkSession.builder.master("local[1]").appName("SparkNew").getOrCreate()
data = [1,2,3,4,5,6,7,8,9,10]
rdd = spark.sparkContext.parallelize(data)
indexes = ('a', 'b', 'c')
rdd2 = rdd.map(lambda x:(indexes[(random.randint(0,2))], 1))

In order to use reduceByKey to add the values for each key, use the command below:

rdd2.reduceByKey(lambda x, y: x + y).collect()

# This gives the desired output, for example:
[('b', 3), ('a', 5), ('c', 2)]
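
One more note, tying the laziness point above back to the random keys: rdd2 is only a recipe, so the map with random.randint is re-executed for every action. That means take(10) and reduceByKey(...).collect() each draw a fresh set of random keys, which is why the aggregated counts don't match the sample you printed. A minimal sketch of one way to pin the keys down (assuming the data fits in memory; with local[1] everything here sits in a single partition, so take(10) materialises and caches it all):

# cache() marks rdd2 to be kept once it has been computed,
# so later actions reuse the same random keys instead of re-drawing them
rdd2 = rdd.map(lambda x: (indexes[random.randint(0, 2)], 1)).cache()
rdd2.take(10)
rdd2.reduceByKey(lambda x, y: x + y).collect()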