I want to modify the innerjoin-example of the advanced tutorial such that it makes sparse matrix multiplication with mapreduce possible (described by Ullman). Therefore I need a second map-reduce step summing values of equal position in the result matrix.
Unfortunately I don't manage to get the output of the first reduce-function of the class CsvInnerJoin into the map function of SumJob.
import sys
sys.path.append("/home/damian/disco/lib/")
from disco.core import Job, result_iterator
from disco.worker.classic.func import chain_reader
import csv, sys
if __name__ == '__main__':
input_filename = "input.csv"
output_filename = "output.csv"
if len(sys.argv) > 1:
input_filename = sys.argv[1]
if len(sys.argv) > 2:
output_filename = sys.argv[2]
from CsvInnerJoiner import CsvInnerJoiner
from SumJob import SumJob
job = CsvInnerJoiner().run(input=[input_filename])
job = SumJob().run() (******************)
with open(output_filename, 'w') as fp:
writer = csv.writer(fp)
for url_key, descriptors in result_iterator(job.wait(show=True)):
writer.writerow([url_key] + descriptors)
CsvInnerJoiner.py is this file:
import sys
sys.path.append("/home/damian/disco/lib/")
from disco.core import Job, result_iterator
from disco.worker.classic.func import chain_reader
import csv, sys
class CsvInnerJoiner(Job):
partitions = 2
sort = True
def map(self, row, params):
yield row[0], row[1:]
@staticmethod
def map_reader(fd, size, url, params):
reader = csv.reader(fd, delimiter=',')
for row in reader:
yield row
#@staticmethod
def reduce(self, rows_iter, out, params):
from disco.util import kvgroup
from itertools import chain
#for url_key, descriptors in kvgroup(sorted(rows_iter)):
for url_key, descriptors in kvgroup(rows_iter):
merged_descriptors = list(chain.from_iterable(descriptors))
print url_key,"_______",merged_descriptors
if len(merged_descriptors) > 3:
Alist = merged_descriptors[:merged_descriptors.index("B")]
Blist = merged_descriptors[merged_descriptors.index("B"):]
Alistlength = len(Alist)/3
Blistlength = len(Blist)/3
for i in range(Alistlength):
for j in range(Blistlength):
container = int(Alist[3*i+2])*int(Blist[3*j+2])
yield [Alist[3*i+1],Blist[3*j+1]],container
#out.add(Alist[3*i+1],[Blist[3*j+1],container])
SumJob.py is this:
import sys
sys.path.append("/home/damian/disco/lib/")
from disco.core import Job, result_iterator
from disco.worker.classic.func import chain_reader
import csv, sys
class SumJob(Job):
map_reader = staticmethod(chain_reader)
@staticmethod
def map(self,key_value, params):
print "KEY::::::",str(key_value[0])
print "VAL::::::",str(key_value[1])
yield key_value[0], key_value[1]
@staticmethod
def reduce(self,key_value,out, params):
Summe = sum(key_value[1])
out.add(key_value[0],Summe)
The problem is that I do not know how to change the (**) line such that the second output of the first reduce step is taken as input by the second map-function.
Thank you so much for your help! Damian
You can use the output of a map/reduce stage as the input of the another (The return from
job.wait()).I'm not an expert by that chunk of code works for me (I implement the
pagerankalgorithm which has many stages and several iterations).