Calculate the median of a list of values in parallel using Hadoop MapReduce


I'm new to Hadoop mrjob. I have a text file in which each line contains "id groupId value". I am trying to calculate the median of all values in the file using Hadoop MapReduce, but I'm stuck on computing only the overall median. What I get instead is a median value for each id, like:

"123213"        5.0
"123218"        2
"231532"        1
"234634"        7
"234654"        2
"345345"        9
"345445"        4.5
"345645"        2
"346324"        2
"436324"        6
"436456"        2
"674576"        10
"781623"        1.5

The output should be like "median value of all values is: ####". I was influenced by this article: https://computehustle.com/2019/09/02/getting-started-with-mapreduce-in-python/. My Python file median-mrjob.py:

from mrjob.job import MRJob
from mrjob.step import MRStep

class MRMedian(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_stats, combiner=self.reducer_count_stats),
            MRStep(reducer=self.reducer_sort_by_values),
            MRStep(reducer=self.reducer_retrieve_median)
        ]

    def mapper_get_stats(self, _, line):
        line_arr = line.split(" ")
        values = int(float(line_arr[-1]))
        id = line_arr[0]
        yield id, values

    def reducer_count_stats(self, key, values):
        yield str(sum(values)).zfill(2), key

    def reducer_sort_by_values(self, values, ids):
        for id in ids:
            yield id, values

    def reducer_retrieve_median(self, id, values):
        valList=[]
        median = 0
        for val in values:
            valList.append(int(val))
        N = len(valList)
        #find the median
        if N % 2 == 0:
            #if N is even
            m1 = N / 2
            m2 = (N / 2) + 1
            #Convert to integer, match position
            m1 = int(m1) - 1
            m2 = int(m2) - 1
            median = (valList[m1] + valList[m2]) / 2 
        else:
            m = (N + 1) / 2
            # Convert to integer, match position
            m = int(m) - 1
            median = valList[m]
        yield (id, median)

if __name__ == '__main__':
    MRMedian.run()

My original text file is about 1 million and 1 billion lines of data, but I have created a test file with arbitrary data, named input.txt:

781623 2 2.3243
781623 1 1.1243
234654 1 2.122
123218 8 2.1245
436456 22 2.26346
436324 3 6.6667
346324 8 2.123
674576 1 10.1232
345345 1 9.56135
345645 7 2.1231
345445 10 6.1232
231532 1 1.1232
234634 6 7.124
345445 6 3.654376
123213 18 8.123
123213 2 2.1232

What I care about are the values, keeping in mind that there might be duplicates. I run the job from the terminal with python median-mrjob.py input.txt

Update: The point of the assignment is not to use any libraries, so I need to sort the list manually (or at least part of it, as I understand it) and calculate the median manually (hard-coded). Otherwise the goal of using MapReduce disappears. Using PySpark is not allowed in this assignment. Check this link for more inspiration: Computing median in map reduce

There is 1 answer below.

Answer from OneCricketeer:

The output should be like "median value of all values is: ####"

Then you need to force all data to one reducer first (effectively defeating the purpose of using MapReduce).

You'd do that by not using the ID as the key, and discarding it instead:

def mapper_get_stats(self, _, line):
    line_arr = line.split()
    if line_arr:  # prevent empty lines
        value = float(line_arr[-1])
        yield None, value

After that, sort and find the median (I fixed your parameter order)

def reducer_retrieve_median(self, key, values):
    import statistics
    yield None, f"median value of all values is: {statistics.median(values)}"  # automatically sorts the data
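
If, per your update, library helpers like statistics are also off-limits, the same reducer can do the sort and middle-element selection explicitly. A rough sketch, assuming all values fit in the single reducer's memory (swap sorted() for a hand-written sort if built-ins are not allowed either):

def reducer_retrieve_median(self, key, values):
    # Materialize the stream of values and sort it ourselves
    val_list = sorted(values)
    n = len(val_list)
    if n % 2 == 1:
        # Odd count: take the middle element
        median = val_list[n // 2]
    else:
        # Even count: average the two middle elements
        median = (val_list[n // 2 - 1] + val_list[n // 2]) / 2
    yield None, f"median value of all values is: {median}"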

So, only two steps

class MRMedian(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_stats),
            MRStep(reducer=self.reducer_retrieve_median)
        ]

For the given file, you should see

null    "median value of all values is: 2.2938799999999997"
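
You can sanity-check that number outside Hadoop with plain Python against the same input.txt (a quick local sketch):

import statistics

# Read the last column of every non-empty line and compute the median locally
with open("input.txt") as f:
    values = [float(line.split()[-1]) for line in f if line.strip()]

print(statistics.median(values))  # 2.2938799999999997 for the sample file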

original text file is about 1 million and 1 billion lines of data

Not that it matters, but which is it?

You should upload the file to HDFS first; then you can use better tools than mrjob for this, such as Hive or Pig.