I am trying to join two tables in a MapReduce job, using one table to fill in a missing field in the other.
Note that I am following the example (Part 1: Joining) from this blog, so you can consult the blog for more background.
Goal: I want to associate a location with each purchase.
The Mapper reads both datasets and distinguishes them by the number of fields in each row. Transaction records have 5 fields, users have only 4.
The mapper does two things:
- For transactions: extract the user_id and the product_id
- For users: extract the user_id and the location
In both cases the mapper outputs three fields: user_id, product_id, location. A field that does not apply to the record is set to "-".
Here are the two tab-separated datasets, which the mapper reads from sys.stdin:
"users.txt" dataset
id  email              language  location
1   [email protected]  EN        US
2   [email protected]  EN        GB
3   [email protected]  FR        FR
"transaction.txt" dataset
tran-id  prod-id  user-id  purch-amt  item-desc
1        1        1        300        a jumper
2        1        2        300        a jumper
3        1        2        300        a jumper
4        2        3        100        a rubber chicken
5        1        3        300        a jumper
The mapper.py code is as follows:
#!/usr/bin/env python
import sys

for line in sys.stdin:
    # Setting some defaults
    user_id = ""
    product_id = "-"
    location = "-"

    line = line.strip()
    splits = line.split("\t")
    if len(splits) == 5:  # Transactions have more columns than users
        user_id = splits[2]
        product_id = splits[1]
    else:
        user_id = splits[0]
        location = splits[3]
    print('%s\t%s\t%s' % (user_id, product_id, location))
The above mapper creates an output like this:
1 1 -
1 - US
2 1 -
2 1 -
2 - GB
3 1 -
3 2 -
3 - FR
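For local testing, the sort step that sits between the mapper and the reducer can be approximated in plain Python. This is only a sketch: the helper name simulate_shuffle is hypothetical, and it assumes lines are compared as plain strings, where "-" sorts before digits. A real Hadoop job or a shell sort may order records differently depending on configuration and locale.

```python
def simulate_shuffle(mapper_lines):
    """Return the mapper lines in plain string order (hypothetical helper)."""
    return sorted(line.rstrip("\n") for line in mapper_lines)


# The mapper output shown above, one record per line.
mapper_output = [
    "1\t1\t-",
    "1\t-\tUS",
    "2\t1\t-",
    "2\t1\t-",
    "2\t-\tGB",
    "3\t1\t-",
    "3\t2\t-",
    "3\t-\tFR",
]

shuffled = simulate_shuffle(mapper_output)
for line in shuffled:
    print(line)
```

Under that assumption, each user's location record ends up ahead of that user's transaction records, which is not the order in the listing above.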
Now, in the reducer, I want to do the following: for each new user, the reducer first remembers that user's location and then adds this location to that user's transactions.
The reducer.py is as follows:
#!/usr/bin/env python
import sys
import string

last_user_id = None
cur_location = "-"

for line in sys.stdin:
    line = line.strip()
    user_id, product_id, location = line.split("\t")

    if not last_user_id or last_user_id != user_id:
        # First record of a new user: remember its location field
        last_user_id = user_id
        cur_location = location
    elif user_id == last_user_id:
        # Later records of the same user: reuse the remembered location
        location = cur_location
        print('%s\t%s' % (product_id, location))
The above reducer produces this output:
- -
1 -
- -
2 -
- -
But the output I am expecting is:
1 FR
1 GB
1 GB
1 US
2 FR
I am sure the problem is in the reducer script, but I can't figure it out.
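For comparison, here is a minimal sketch of a reducer that does not rely on the location record being the first record of each user. It is not the blog's method: the function name join_locations is hypothetical, and it assumes only that all records of a user arrive next to each other and that each line has exactly three tab-separated fields. It buffers a user's product ids until the whole group has been read, so memory use grows with the number of transactions per user.

```python
from itertools import groupby


def join_locations(lines):
    """Yield 'product_id<TAB>location' for every transaction record.

    Assumes records are grouped by user_id, in any order within a group.
    """
    rows = (line.rstrip("\n").split("\t") for line in lines if line.strip())
    for user_id, group in groupby(rows, key=lambda row: row[0]):
        location = "-"      # default when a user has no location record
        product_ids = []    # transactions buffered for this user
        for _, product_id, loc in group:
            if product_id == "-":
                # User record: carries the location
                location = loc
            else:
                # Transaction record: carries the product id
                product_ids.append(product_id)
        for product_id in product_ids:
            yield "%s\t%s" % (product_id, location)
```

As a reducer script it could be driven with `for out in join_locations(sys.stdin): print(out)` after `import sys`. The lines come out in user order, so they would still need a final sort to match the expected listing above.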