Can anyone explain the output of an Apache Beam streaming pipeline with a fixed window of 30 seconds?

I have written a pipeline that reads streaming data from Pub/Sub, aggregates the results, and writes them to BigQuery. The pipeline is working fine and writing aggregated results to the BigQuery table. But I am not able to understand how it calculates the result.

Below is my Apache Beam code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.transforms.sql import SqlTransform

table_spec1 = bigquery.TableReference(
    projectId=<PROJECT_ID>,
    datasetId='training',
    tableId='dflow_agg')


SCHEMA = {
    "fields": [
        {"name": "Name", "type": "STRING"},
        {"name": "avg_sal", "type": "FLOAT64"},
        {"name": "window_start", "type": "STRING", "mode": "NULLABLE"},
        {"name": "window_end", "type": "STRING", "mode": "NULLABLE"},
    ]
}

pipeline_options = PipelineOptions(streaming=True)

class ProcessWords(beam.DoFn):
    def process(self, ele):
        # Each message body is the string form of a tuple, e.g. "('B', 'Stream2', 23)";
        # eval() reconstructs the (name, stream, salary) tuple.
        # (ast.literal_eval would be a safer choice for untrusted input.)
        yield eval(ele)


def run():
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "Read from Pub/Sub subscription" >> beam.io.ReadFromPubSub(
                subscription="projects/<PROJECT_ID>/subscriptions/Test-sub")
            | "Decode bytes" >> beam.Map(lambda element: element.decode("utf-8"))
            | "Parse tuple" >> beam.ParDo(ProcessWords())
            | "Create beam Row" >> beam.Map(
                lambda x: beam.Row(Name=str(x[0]), Stream=str(x[1]), Salary=int(x[2])))
            | "Window" >> beam.WindowInto(beam.window.FixedWindows(30))
            | "Average per Name" >> SqlTransform(
                """
                SELECT
                    Name,
                    AVG(Salary) AS avg_sal
                FROM PCOLLECTION
                GROUP BY Name
                """)
            | "Assemble dictionary" >> beam.Map(
                lambda row, window=beam.DoFn.WindowParam: {
                    "Name": row.Name,
                    "avg_sal": row.avg_sal,
                    "window_start": window.start.to_rfc3339(),
                    "window_end": window.end.to_rfc3339(),
                })
            | "Write to BigQuery" >> beam.io.WriteToBigQuery(
                table=table_spec1,  # the TableReference already names the dataset
                schema=SCHEMA,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )


if __name__ == '__main__':
    run()
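For reference, my understanding is that FixedWindows(30) assigns each element to a window based only on its event timestamp (with ReadFromPubSub this defaults to the Pub/Sub publish time), and that the boundaries are aligned to multiples of 30 seconds since the epoch rather than to the first message. Below is a minimal sketch of that assignment logic; the timestamps are made-up values for illustration, not taken from my actual run:

# Sketch: how a 30-second fixed window is derived from an event timestamp.
# Boundaries are aligned to the epoch, so two messages published only
# 5 seconds apart can still land in different windows.
def fixed_window_bounds(event_ts, size=30):
    start = event_ts - (event_ts % size)  # floor to the nearest 30 s boundary
    return start, start + size

print(fixed_window_bounds(1700000038))  # (1700000010, 1700000040)
print(fixed_window_bounds(1700000043))  # (1700000040, 1700000070) -- 5 s later, next window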

I have a Python script that publishes a message to the Pub/Sub topic every 5 seconds. Below is my publisher code.

import time
import random
from google.cloud import pubsub_v1

project_id = "<PROJECT_ID>"
topic_name = "Test"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

names = ['A', 'B', 'C']
salaries = [10, 12, 11, 23, 77, 54, 23, 21, 4, 9, 5, 22, 19]
streams = ["Stream1", "Stream2", "Stream3"]

for _ in range(101):
    # Publish the string form of a random (name, stream, salary) tuple.
    data = '{}'.format((random.choice(names), random.choice(streams), random.choice(salaries)))
    # Data must be a bytestring
    data = data.encode("utf-8")
    future = publisher.publish(topic_path, data)
    print(data)
    future.result()
    time.sleep(5)

I have printed the messages as they were published to the topic; below are the first 20.

b"('B', 'Stream2', 23)"
b"('A', 'Stream3', 77)"
b"('C', 'Stream2', 10)"
b"('B', 'Stream2', 10)"
b"('B', 'Stream3', 10)"
b"('B', 'Stream2', 19)"
b"('C', 'Stream1', 11)"
b"('C', 'Stream2', 22)"
b"('A', 'Stream2', 12)"
b"('B', 'Stream1', 11)"
b"('A', 'Stream2', 23)"
b"('C', 'Stream3', 23)"
b"('A', 'Stream2', 4)"
b"('C', 'Stream2', 22)"
b"('B', 'Stream2', 4)"
b"('C', 'Stream2', 10)"
b"('B', 'Stream3', 11)"
b"('C', 'Stream2', 10)"
b"('C', 'Stream2', 22)"
b"('A', 'Stream2', 19)"

The BigQuery results, ordered by window start time, look like this:

SELECT * FROM training.dflow_agg ORDER BY window_start

[Screenshot: BigQuery table data]

Now, I want to understand how the average is calculated.

Case 1: If the window size is 30 seconds, then I expect the first window to contain these elements:

b"('B', 'Stream2', 23)"
b"('A', 'Stream3', 77)"
b"('C', 'Stream2', 10)"
b"('B', 'Stream2', 10)"
b"('B', 'Stream3', 10)"
b"('B', 'Stream2', 19)"

So the average of A will be 77. This is correct, as there is only one A, and the average of a single element is the element itself.

The average of B should be (23 + 10 + 10 + 19) / 4 = 62 / 4 = 15.5 (divide by 4 because there are four B's in the window).
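
To sanity-check my arithmetic, here is a small standalone computation over the six elements I assume the first window holds (assumed contents taken from the list above, not verified against the actual window):

from collections import defaultdict

# Assumed contents of the first 30-second window (from the list above).
window = [('B', 23), ('A', 77), ('C', 10), ('B', 10), ('B', 10), ('B', 19)]

salaries = defaultdict(list)
for name, sal in window:
    salaries[name].append(sal)

for name, vals in sorted(salaries.items()):
    print(name, sum(vals) / len(vals))
# A 77.0
# B 15.5   -> (23 + 10 + 10 + 19) / 4
# C 10.0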

Why is C not in the same window (the same window_start and window_end) as A and B? C was also published within the same 30-second span (see the Pub/Sub output above).

Can anyone explain this output to me?

Thanks in advance.

The above is everything I have tried.
