Parallel FP Growth in Spark

172 Views Asked by At

I am trying to understand the "add" and "extract" methods of the FPTree class: (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala).

  1. What is the purpose of 'summaries' variable?
  2. where is the Group list? I assume it is the following, am I correct:
val numParts = if (numPartitions > 0) numPartitions else data.partitions.length
val partitioner = new HashPartitioner(numParts)
  1. What will 'summaries contain for 3 transactions of {a,b,c} , {a,b} , {b,c} where all are frequent?
def add(t: Iterable[T], count: Long = 1L): FPTree[T] = {
  require(count > 0)
  var curr = root
  curr.count += count
  t.foreach { item =>
    val summary = summaries.getOrElseUpdate(item, new Summary)
    summary.count += count
    val child = curr.children.getOrElseUpdate(item, {
      val newNode = new Node(curr)
      newNode.item = item
      summary.nodes += newNode
      newNode
    })
    child.count += count
    curr = child
  }
  this
}

def extract(
    minCount: Long,
    validateSuffix: T => Boolean = _ => true): Iterator[(List[T], Long)] = {
  summaries.iterator.flatMap { case (item, summary) =>
    if (validateSuffix(item) && summary.count >= minCount) {
      Iterator.single((item :: Nil, summary.count)) ++
        project(item).extract(minCount).map { case (t, c) =>
          (item :: t, c)
        }
    } else {
      Iterator.empty
    }
  }
}
1

There are 1 best solutions below

0
1LeveL1 On

After a bit experiments, it is pretty straight forward:

1+2) The partition is indeed the Group representative. It is also how the conditional transactions calculated:

  private def genCondTransactions[Item: ClassTag](
      transaction: Array[Item],
      itemToRank: Map[Item, Int],
      partitioner: Partitioner): mutable.Map[Int, Array[Int]] = {
    val output = mutable.Map.empty[Int, Array[Int]]
    // Filter the basket by frequent items pattern and sort their ranks.
    val filtered = transaction.flatMap(itemToRank.get)
    ju.Arrays.sort(filtered)
    val n = filtered.length
    var i = n - 1
    while (i >= 0) {
      val item = filtered(i)
      val part = partitioner.getPartition(item)
      if (!output.contains(part)) {
        output(part) = filtered.slice(0, i + 1)
      }
      i -= 1
    }
    output
  }
  1. The summaries is just a helper to save the count of items in transaction The extract/project will generate the FIS by using up/down recursion and dependent FP-Trees (project), while checking summaries if traversal that path is needed. summaries of node 'a' will have {b:2,c:1} and children of node 'a' are 'b' and 'c'.