Add a column to an Apache Beam PCollection in Go


I have written an Apache Beam function in Go that queries a database to create a PCollection and then writes that PCollection to BigQuery. I would like to add a column to the PCollection before writing it to BigQuery.

Part of the complication comes from the fact that I want this function to handle arbitrary types (e.g. different structs) so that it can read from different tables in my DB.

The relevant parts of my code look something like this:

func init() {
    register.DoFn1x2[beam.X, string, beam.X](&addTenantColumnFn{})
}

func copyFromSQLToBQ(s beam.Scope, t tenant, cfg *Config, query, targetTable string, rt reflect.Type) {
    queryResults := databaseio.Query(s, "pgx", dsn, query, rt)

    queryResultsWithTenantIDs := beam.ParDo(s, &addTenantColumnFn{TenantID: t.ID}, queryResults)

    bigqueryio.Write(s, cfg.GCPProjectID, bqTableName(cfg.GCPProjectID, targetTable), queryResultsWithTenantIDs, bigqueryio.WithCreateDisposition(bigquery.CreateIfNeeded))
}

type addTenantColumnFn struct {
    TenantID string
}

func (a *addTenantColumnFn) ProcessElement(x beam.X) (string, beam.X) {
    return a.TenantID, x
}

Unfortunately, when I attempt to run it, the pipeline panics while it is being constructed (at run time, not at compile time):

panic: Method ProcessElement in DoFn github.com/apache/beam/sdks/v2/go/pkg/beam.addFixedKeyFn does not have enough main inputs. 2 main inputs were expected, but only 1 inputs were found.
Full error:
        inserting ParDo in scope root/bigquery.Write
        graph.AsDoFn: for Fn named github.com/apache/beam/sdks/v2/go/pkg/beam.addFixedKeyFn
ProcessElement method has too few main inputs

I also tried implementing the solution I found in this post, changing my ParDo line to the following:

queryResultsWithTenantIDs := beam.ParDo(s, &addTenantColumnFn{TenantID: t.ID}, queryResults, beam.TypeDefinition{Var: beam.XType, T: reflect.TypeOf("")})

This leads to an error as well:

panic:  inserting ParDo in scope root
        creating new DoFn in scope root
        binding fn github.com/apache/beam/sdks/v2/go/pkg/beam/register.registerDoFn1x2StructWrappersAndFuncs[...].func2.1
cannot substitute type X with string, already defined as main.user
