I am trying to write a PySpark script to generate all the prime numbers <= a given number, for example all the primes up to 1 billion.
I wrote the following code. It works well for small inputs, but as soon as the upper limit reaches 100 million, the script performs poorly.
from pyspark.sql import SparkSession
from math import isqrt
def is_perfect_square(num):
    root = isqrt(num)
    return root * root == num

def sieve_of_eratosthenes_partition(iterator):
    nums = list(iterator)               # materialize the partition so it can be traversed repeatedly
    upper_limit = max(nums)             # upper limit for prime number generation
    prime_flag = [True] * len(nums)     # initialize boolean array for primes
    result = []
    cur_prime = 2
    while cur_prime * cur_prime <= upper_limit:
        if (cur_prime % 2 == 0 and cur_prime != 2) or is_perfect_square(cur_prime):
            cur_prime += 1
        else:
            for i, num in enumerate(nums):
                if num % cur_prime == 0 and num != cur_prime:
                    prime_flag[i] = False
            cur_prime += 1
    for num, is_prime in zip(nums, prime_flag):
        if is_prime and num > 1:
            result.append(num)
    return result
spark = SparkSession.builder.appName("SievePrimesMapPartitions").getOrCreate()
n = 10**7 # End range
numbers = spark.sparkContext.parallelize(range(1, n+1), 1000)
result_rdd = numbers.mapPartitions(sieve_of_eratosthenes_partition)
# result_rdd.map(str).saveAsTextFile("primes")
primes = result_rdd.collect()
print(primes)
print(len(primes))
I am dividing my range (1, 10 million) into 1000 partitions like (1...10,000), (10,001...20,000) and so on. For each partition, I apply the sieve function, which iteratively filters out all the multiples of primes in that range.
I can see the limiting factor in my script. For the partition with the smallest numbers, say (1...10,000), the sieve function only iterates up to 100. However, for the partition with the largest numbers, i.e., the last partition (9,990,000...10,000,000), the sieve function iterates up to sqrt(10 million) ≈ 3,162. In effect, the performance of my script is determined by the time taken to process the partition with the largest numbers.
How can I improve upon this? Is there any other way to partition my dataset? One other thought that came to my mind is to build a sieve up to sqrt(given number), then distribute this sieve to the nodes. On each node, filter out all the multiples of the sieve primes, then combine the results to get the list of primes. Does this look like an improvement?
As you suspected, a good way of scaling up a Sieve of Eratosthenes is to first build a small sieve up to the square root of the given number and then distribute that sieve across your cluster.
Here's how you could implement this. First, build the base sieve up to sqrt(n) on the driver:
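A rough sketch (simple_sieve and base_primes are just illustrative names, and I'm reusing n = 10**7 from your script):

from math import isqrt

def simple_sieve(limit):
    # Classic single-machine Sieve of Eratosthenes up to `limit`
    flags = [True] * (limit + 1)
    flags[0] = flags[1] = False
    for p in range(2, isqrt(limit) + 1):
        if flags[p]:
            for multiple in range(p * p, limit + 1, p):
                flags[multiple] = False
    return [i for i, is_p in enumerate(flags) if is_p]

n = 10**7
base_primes = simple_sieve(isqrt(n))  # primes up to sqrt(n); tiny, so it easily fits on the driver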
Then you broadcast it to your executors:
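Assuming the same SparkSession you already create in your script:

# `spark` is the SparkSession from your original script
broadcast_primes = spark.sparkContext.broadcast(base_primes)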
Now each partition just uses the broadcast list of base primes to filter out their multiples:
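Something like this (again a sketch; filter_with_base_primes is a made-up name):

def filter_with_base_primes(iterator):
    primes = broadcast_primes.value
    for num in iterator:
        if num < 2:
            continue
        is_prime = True
        for p in primes:
            if p * p > num:
                break              # no base prime <= sqrt(num) divides num, so num is prime
            if num % p == 0:
                is_prime = False   # found a prime divisor <= sqrt(num)
                break
        if is_prime:
            yield num

numbers = spark.sparkContext.parallelize(range(2, n + 1), 1000)
result_rdd = numbers.mapPartitions(filter_with_base_primes)

The per-number work is now bounded by the number of base primes (only a few hundred below sqrt(10**7)), so the imbalance between the first and last partition largely disappears.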
Finally, print (or save) your results like you've done, and the primes are at your fingertips :)
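primes = result_rdd.collect()
print(primes)
print(len(primes))

For a really large n (say 10**9) you probably don't want to collect() everything back to the driver; writing the RDD out with saveAsTextFile, as in your commented-out line, or just calling result_rdd.count() is safer.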