I have the following tables stored as separate csv files:
customers (c_id, gender, address, dob)
meals (r_id, c_id, date) (so a customer having a meal at a restaurant)
restaurants (type, r_id)
restaurants has 10,000 rows, meals 1,000,000 and customers 2,000,000.
I need the following MapReduce job: for every restaurant of type 'bistro', show the number of meals male customers had there.
This translates to the following SQL query:
SELECT r.r_id, COUNT(*) AS count_meals
FROM restaurants r
INNER JOIN meals m ON r.r_id = m.r_id
INNER JOIN customers c ON m.c_id = c.c_id
WHERE c.gender = 'MALE' AND r.type = 'bistro'
GROUP BY r.r_id
The filters reduce the table sizes as follows: restaurants to 300 rows, customers to 900,000, while meals stays at 1,000,000.
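As a quick sanity check on those numbers (assuming, purely for illustration, that meals are distributed uniformly over restaurants and customers — the real data need not be uniform), the two selectivities multiply to give the expected number of joined rows:

```python
# Back-of-the-envelope estimate of the join output size, assuming meals
# are spread uniformly over restaurants and customers (illustration only).
n_restaurants, n_bistros = 10_000, 300
n_customers, n_males = 2_000_000, 900_000
n_meals = 1_000_000

# selectivity of each filter: 3% bistros, 45% male customers
expected_rows = n_meals * n_bistros * n_males // (n_restaurants * n_customers)
print(expected_rows)  # 13500
```

So only a small fraction of the 1,000,000 meals should survive both filters.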
The map reduce job is started with the following command:
python3 mrCustomers.py < "data/restaurants.csv" "data/customers.csv" "data/meals.csv" > output.csv
To tell the tables apart while reading, I rely on the number of columns in each entry; my mapper looks like this:
from mrjob.job import MRJob
from mrjob.step import MRStep
from mr3px.csvprotocol import CsvProtocol
import csv

class MRCustomers(MRJob):
    OUTPUT_PROTOCOL = CsvProtocol

    def mapper1(self, _, line):
        # skip each file's header row
        if line.startswith('c_id') or line.startswith('r_id') or line.startswith('type'):
            return
        reader = csv.reader([line])
        columns = next(reader)
        if len(columns) == 4:      # customers: c_id, gender, address, dob
            if columns[1] != 'MALE':
                return
            c_id = columns[0]
            yield c_id, "customer"
        elif len(columns) == 3:    # meals: r_id, c_id, date
            r_id = columns[0]
            c_id = columns[1]
            yield r_id, ("M", c_id)
        else:                      # restaurants: type, r_id
            r_type = columns[0]
            if r_type == 'bistro':
                r_id = columns[1]
                yield r_id, "restaurant"
The problem I have is that the reducer always receives chunks of each table: e.g. in the first round I get 30 customers, 20,000 meals and no restaurants. It is impossible for me to perform a join if I don't get matching sets from each mapper round. Is my mapper logic flawed? And how should I go about writing the reducer?
Also, in theory the reducer should receive all tuples with the same key aggregated together, so yield r_id, "restaurant" and yield r_id, ("M", c_id) should arrive as one group in the reducer, but this doesn't happen.
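For context on what I expected: the shuffle groups records purely by key equality across the whole map output, so records can only meet in one reducer call if they were emitted under exactly the same key. A toy simulation of that grouping (plain Python, not the mrjob runtime; the sample records are made up) makes this visible:

```python
from collections import defaultdict

# Toy model of the shuffle: collect every value emitted under the same key,
# the way a reducer would receive them.
def shuffle(pairs):
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

mapper_output = [
    ("r1", "restaurant"),   # restaurant record, keyed by r_id
    ("r1", ("M", "c7")),    # meal record, also keyed by r_id
    ("c7", "customer"),     # customer record, keyed by c_id
]

groups = shuffle(mapper_output)
print(groups["r1"])  # restaurant and meal share a key, so they co-group
print(groups["c7"])  # customer is keyed differently, so it stays separate
```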
Edit: I've managed to join meals and restaurants on the reducer side in the first job step as follows:
def reducer1(self, key, values):
    joins = [x for x in values]
    if len(joins) > 1:
        if joins[0][0] == "restaurants":
            for tup in joins[1:]:
                c_id = tup[1]
                yield c_id, (tup[0], key)
        elif joins[0][0] == "customer":
            for customer in joins:
                yield key, ("customer", key)
and then in the second job step I regroup the results with an identity mapper and try to join on customers in the reducer:
def mapper2(self, key, value):
    yield key, value

def reducer2(self, key, values):
    joins = [x for x in values]
    if len(joins) > 1:
        if joins[0][0] == "customer":
            for tup in joins[1:]:
                yield tup[1], 1

def reducer3(self, key, values):
    yield None, (key, sum(values))
def steps(self):
    first_step = MRStep(
        mapper=self.mapper1,
        reducer=self.reducer1,
    )
    second_step = MRStep(
        mapper=self.mapper2,
        reducer=self.reducer2,
    )
    third_step = MRStep(
        reducer=self.reducer3,
    )
    return [first_step, second_step, third_step]
Now I have the following problem: the tuples in the output are only a subset of the full result of the SQL query, so the solution is incomplete. I don't understand why the join isn't producing the full result.
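For comparison, here is a minimal plain-Python sketch of the three-step repartition join the steps above are aiming for, run on made-up toy data (the `run_step` helper, table tags, and sample records are all assumptions for illustration, not the mrjob runtime). It checks record tags explicitly instead of assuming the restaurant record arrives first in a group, since MapReduce gives no ordering guarantee among a key's values:

```python
from collections import defaultdict

def run_step(pairs, reducer):
    """Toy shuffle + reduce: group (key, value) pairs by key, feed each group to the reducer."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    out = []
    for k, vals in groups.items():
        out.extend(reducer(k, vals))
    return out

# Toy data: two bistros, two male customers, meals referencing them (c9 is not male).
restaurants = [("r1", "restaurant"), ("r2", "restaurant")]          # keyed by r_id
meals = [("r1", ("M", "c1")), ("r1", ("M", "c2")),
         ("r2", ("M", "c1")), ("r1", ("M", "c9"))]                  # keyed by r_id
customers = [("c1", "customer"), ("c2", "customer")]                # keyed by c_id

# Step 1: join meals with restaurants on r_id, then re-key each meal by c_id.
def reducer1(key, values):
    has_restaurant = any(v == "restaurant" for v in values)
    if has_restaurant:
        for v in values:
            if isinstance(v, tuple):          # a ("M", c_id) meal record
                yield v[1], ("meal", key)     # key becomes c_id

step1 = run_step(restaurants + meals, reducer1)

# Step 2: join with customers on c_id, emit (r_id, 1) per surviving meal.
def reducer2(key, values):
    vals = list(values)
    if any(v == "customer" for v in vals):
        for v in vals:
            if isinstance(v, tuple):          # a ("meal", r_id) record
                yield v[1], 1

step2 = run_step(step1 + customers, reducer2)

# Step 3: sum the counts per restaurant.
def reducer3(key, values):
    yield key, sum(values)

result = dict(run_step(step2, reducer3))
print(result)  # two meals survive at r1, one at r2
```

The same tag-checking idea carries over to the mrjob reducers: testing `joins[0][0]` relies on which record happens to come first in the group, which is not something the framework promises.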