Joining Two Tables in Python MapReduce


I am trying to join two tables in a MapReduce job. One table (users) supplies the location that I want to add to each row of the other table (transactions).

I am following the example (Part 1: Joining) from this blog, so you can visit the blog for more detail.

Goal: I want to associate a location to each purchase.

The mapper reads both datasets and distinguishes them by the number of fields in each row: transaction records have 5 fields, while user records have only 4.

The mapper does two things:

  • For transactions - Extract the user_id and product_id
  • For users - Extract the user_id and the location

In both cases the mapper outputs three fields: user_id, product_id, location. A field that a record does not have is filled with "-".

Here are the two datasets which I will read using sys.stdin:

"users.txt" dataset
id   email        language location
1   [email protected]    EN  US
2   [email protected]   EN  GB
3   [email protected]   FR  FR

"transaction.txt" dataset    
tran-id prod-id user-id purch-amt item-desc
1          1      1          300    a jumper
2          1      2          300    a jumper
3          1      2          300    a jumper
4          2      3          100    a rubber chicken
5          1      3          300    a jumper

The mapper.py code is as follows:

#!/usr/bin/env python

import sys

for line in sys.stdin:
    # Setting some defaults
    user_id = ""
    product_id = "-"
    location = "-"

    line = line.strip()
    splits = line.split("\t")
    if len(splits) == 5:  # Transactions have more columns than users
        user_id = splits[2]
        product_id = splits[1]
    else:
        user_id = splits[0]
        location = splits[3]
    print('%s\t%s\t%s' % (user_id, product_id, location))

The above mapper creates an output like this:

1       1       -
1       -       US
2       1       -
2       1       -
2       -       GB
3       1       -
3       2       -
3       -       FR
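
Within each user_id, the listing above places the transaction rows ahead of the user row. For comparison, here is a small sketch of how the same rows would be ordered if whole lines were sorted by code point, where "-" sorts before any digit. The `rows` list and the `sort_rows` helper are illustrative names, and whether the job's sort step really compares whole lines this way is an assumption, not something the post shows.

```python
# Rows as the mapper would emit them for the sample data, before any sorting.
rows = [
    "1\t-\tUS",
    "2\t-\tGB",
    "3\t-\tFR",
    "1\t1\t-",
    "2\t1\t-",
    "2\t1\t-",
    "3\t2\t-",
    "3\t1\t-",
]


def sort_rows(lines):
    """Sort whole lines by code point, so "-" (0x2D) precedes the digits."""
    return sorted(lines)


for row in sort_rows(rows):
    print(row)
```

Under this ordering each user's row with product_id "-" comes before that user's transactions, which is the opposite of the listing above.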

Now, in the reducer, I want to do the following: for each new user, the reducer first remembers that user's location and then adds this location to that user's transactions.

The reducer.py is as follows:

#!/usr/bin/env python
import sys

last_user_id = None
cur_location = "-"

for line in sys.stdin:
    line = line.strip()
    user_id, product_id, location = line.split("\t")

    if not last_user_id or last_user_id != user_id:
        # First row seen for this user: remember its location field.
        # Nothing is printed for this row.
        last_user_id = user_id
        cur_location = location
    elif user_id == last_user_id:
        # Later rows for the same user: print with the remembered location.
        location = cur_location
        print('%s\t%s' % (product_id, location))

The above reducer produces this output:

-       -
1       -
-       -
2       -
-       -

But the output I am expecting is:

1   FR
1   GB
1   GB
1   US
2   FR

I am sure the problem is in the reducer script, but I can't figure it out.
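
One possible cause, offered as a sketch rather than a confirmed diagnosis: the reducer takes the location from the first row it sees for each user, but in the mapper output shown above the first row for every user is a transaction whose location is "-". A reducer that buffers a user's product ids until that user's rows end does not depend on which row comes first. The function name `join_locations` and the `sample` list are illustrative, and the sketch assumes that rows for the same user_id arrive contiguously and that one user's transactions fit in memory.

```python
def join_locations(lines):
    """Yield "product_id<TAB>location" for every transaction row.

    Assumes rows for one user_id are contiguous, in any internal order.
    """
    current_user = None
    location = "-"
    products = []  # product ids buffered for the current user

    for line in lines:
        line = line.strip()
        if not line:
            continue
        user_id, product_id, row_location = line.split("\t")

        if user_id != current_user:
            # New user: emit what was buffered for the previous one.
            for buffered in products:
                yield "%s\t%s" % (buffered, location)
            current_user, location, products = user_id, "-", []

        if product_id == "-":
            location = row_location      # user row carries the location
        else:
            products.append(product_id)  # transaction row

    # Emit the rows buffered for the last user.
    for buffered in products:
        yield "%s\t%s" % (buffered, location)


# Same rows, in the same order, as the mapper output listed in the question.
sample = [
    "1\t1\t-", "1\t-\tUS",
    "2\t1\t-", "2\t1\t-", "2\t-\tGB",
    "3\t1\t-", "3\t2\t-", "3\t-\tFR",
]

for out in join_locations(sample):
    print(out)
```

In a streaming job the same function could be fed `sys.stdin` instead of `sample`. The lines it yields match the expected output once they are sorted.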
