Cannot return rows with tuplestore_putvalues

24 Views Asked by At

Here is my code. I am trying to calculate the average value for each group using SPI. However, I cannot add the averages to the result set using tuplestore_putvalues.

/* PostgreSQL loadable-module magic block; it must appear exactly once per shared library. */
PG_MODULE_MAGIC;

/*
 * Accumulator for one (l_suppkey, l_returnflag_int) group: the group key plus
 * a growable array holding every quantity value seen for that key.
 */
typedef struct {
    int l_suppkey;          /* first component of the group key */
    int l_returnflag_int;   /* second component of the group key */
    float4 *quantities;     /* palloc'd array of collected quantities */
    int count;              /* number of used entries in quantities */
    int capacity;           /* number of allocated entries in quantities */
} MyGroup;

/*
 * Growable array of all groups discovered so far; groups are located by a
 * linear scan over the first numGroups entries.
 */
typedef struct {
    MyGroup *groups;    /* palloc'd array of group accumulators */
    int numGroups;      /* number of used entries in groups */
    int capacity;       /* number of allocated entries in groups */
} GroupsContext;

// Utility function declarations
// Verifies the caller accepts a materialized result set and selects that return mode.
static void prepTuplestoreResult(FunctionCallInfo fcinfo);
// Returns the group matching the key, appending a new empty group if none exists yet.
static MyGroup* findOrCreateGroup(GroupsContext *context, int l_suppkey, int l_returnflag_int);
// Appends one quantity to a group, growing its array when it is full.
static void addQuantityToGroup(MyGroup *group, float4 quantity);
// Averages 1000 values drawn at random (with replacement) from quantities[0..count-1].
static float4 calculateRandomSampleAverage(float4 *quantities, int count);


/*
 * prepTuplestoreResult
 *
 * Confirm that the calling context can receive a materialized set
 * (a tuplestore) and switch the ReturnSetInfo to that mode.  The result
 * fields are cleared; the caller fills them in to return any rows.
 *
 * Raises ERRCODE_FEATURE_NOT_SUPPORTED when no ReturnSetInfo was supplied
 * or when materialize mode is not among the allowed modes.
 */
static void
prepTuplestoreResult(FunctionCallInfo fcinfo)
{
    ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo;

    /* the caller must have handed us a genuine ReturnSetInfo node */
    if (!(resultInfo != NULL && IsA(resultInfo, ReturnSetInfo)))
    {
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("set-valued function called in context that cannot accept a set")));
    }

    /* ... and it must be willing to take a tuplestore back */
    if ((resultInfo->allowedModes & SFRM_Materialize) == 0)
    {
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("materialize mode required, but it is not allowed in this context")));
    }

    /* start from an empty result; the caller fills these in later */
    resultInfo->setDesc = NULL;
    resultInfo->setResult = NULL;

    /* tell the executor that a tuplestore is coming back */
    resultInfo->returnMode = SFRM_Materialize;
}

/*
 * findOrCreateGroup
 *
 * Look up the group keyed by (l_suppkey, l_returnflag_int) with a linear
 * scan.  When no such group exists, append a fresh, empty one (doubling the
 * groups array first if it is full) and return it.
 *
 * The returned pointer refers into context->groups and is invalidated by a
 * later call that has to grow the array.
 */
static MyGroup* findOrCreateGroup(GroupsContext *context, int l_suppkey, int l_returnflag_int) {
    MyGroup *created;
    int pos = 0;

    /* Existing group?  Hand it back right away. */
    while (pos < context->numGroups) {
        MyGroup *candidate = context->groups + pos;

        if (candidate->l_suppkey == l_suppkey &&
            candidate->l_returnflag_int == l_returnflag_int)
            return candidate;
        pos++;
    }

    /* No room for another group: double the array. */
    if (context->capacity <= context->numGroups) {
        context->capacity = context->capacity * 2;
        context->groups = (MyGroup *) repalloc(context->groups,
                                               context->capacity * sizeof(MyGroup));
    }

    /* Claim the next free slot and initialise it as an empty group. */
    created = context->groups + context->numGroups;
    context->numGroups += 1;

    created->l_suppkey = l_suppkey;
    created->l_returnflag_int = l_returnflag_int;
    created->capacity = 100;
    created->count = 0;
    created->quantities = (float4 *) palloc(100 * sizeof(float4));

    return created;
}


/*
 * addQuantityToGroup
 *
 * Append one quantity to the group's array, doubling the array's capacity
 * beforehand when every allocated slot is already in use.
 */
static void addQuantityToGroup(MyGroup *group, float4 quantity) {
    int slot;

    if (group->capacity <= group->count) {
        /* full: grow to twice the current size */
        group->capacity = group->capacity * 2;
        group->quantities = (float4 *) repalloc(group->quantities,
                                                group->capacity * sizeof(float4));
    }

    slot = group->count;
    group->quantities[slot] = quantity;
    group->count = slot + 1;
}


/*
 * calculateRandomSampleAverage
 *
 * Draw 1000 values at random, with replacement, from quantities[0..count-1]
 * using rand() and return their arithmetic mean.
 *
 * quantities: array of values to sample from.
 * count:      number of valid entries in quantities.
 *
 * Returns 0 when there is nothing to sample (NULL array or count <= 0);
 * without that guard `rand() % count` would divide by zero.
 */
static float4 calculateRandomSampleAverage(float4 *quantities, int count) {
    const int sampleSize = 1000;
    double sum = 0.0;   /* accumulate in double so 1000 float additions do not lose precision */
    int i;

    if (quantities == NULL || count <= 0) {
        return 0;
    }

    for (i = 0; i < sampleSize; ++i) {
        int idx = rand() % count;
        sum += quantities[idx];
    }
    return (float4) (sum / sampleSize);
}



PG_FUNCTION_INFO_V1(spi_bootstrap2);

Datum spi_bootstrap2(PG_FUNCTION_ARGS) {
    int ret;
    int i;
    Tuplestorestate *tupstore;
    TupleDesc tupdesc;
    //MemoryContext oldcontext;

    // Connect to SPI
    if (SPI_connect() != SPI_OK_CONNECT) {
        ereport(ERROR, (errmsg("SPI_connect failed")));
    }

    // Prepare and execute the SQL query
    char sql[1024];
    char* sampleSize = text_to_cstring(PG_GETARG_TEXT_PP(0));
    char* tablename = text_to_cstring(PG_GETARG_TEXT_PP(1));
    char* otherAttribue = text_to_cstring(PG_GETARG_TEXT_PP(2));
    char* groupby = text_to_cstring(PG_GETARG_TEXT_PP(3));
    prepTuplestoreResult(fcinfo);
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    //oldcontext = MemoryContextSwitchTo(CurrentMemoryContext);

    snprintf(sql, sizeof(sql), "select * from reservoir_sampler_tpch(%s,'%s','%s','%s');",sampleSize,tablename,otherAttribue,groupby);
    //elog(INFO, "SPI query -- %s", sql);
    ret = SPI_execute(sql, true, 0);
    if (ret != SPI_OK_SELECT) {
        SPI_finish();
        ereport(ERROR, (errmsg("SPI_execute failed")));
    }

    // Prepare for tuplestore use
    tupdesc = CreateTemplateTupleDesc(3, false);
    TupleDescInitEntry(tupdesc, (AttrNumber) 1, "l_suppkey", INT4OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber) 2, "l_returnflag_int", INT4OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber) 3, "avg_l_quantity", FLOAT4OID, -1, 0);
    //TupleDescInitEntry(tupdesc, (AttrNumber) 3, "avg_l_quantity", INT4OID, -1, 0);
    tupdesc = BlessTupleDesc(tupdesc);
    
    tupstore = tuplestore_begin_heap(true, false, work_mem);
    //MemoryContextSwitchTo(oldcontext); //problem 

    
    

    // Initialize GroupsContext
    GroupsContext groupsContext;
    groupsContext.groups = (MyGroup *)palloc(sizeof(MyGroup) * 30000); // problem 1Initial capacity
    groupsContext.numGroups = 0;
    groupsContext.capacity = 30000;

    // Process SPI results
   
    for (i = 0; i < SPI_processed; i++) {
        //HeapTuple tuple = SPI_tuptable->vals[i];
        //TupleDesc tupdesc = SPI_tuptable->tupdesc;
        //elog(INFO, "SPI current id is -- %d", i);

        int attnum1 = SPI_fnumber(SPI_tuptable->tupdesc, "l_suppkey");
        int attnum2 = SPI_fnumber(SPI_tuptable->tupdesc, "l_returnflag_int");
        int attnum3 = SPI_fnumber(SPI_tuptable->tupdesc, "l_quantity");
        char* value1 = SPI_getvalue((SPI_tuptable->vals)[i], SPI_tuptable->tupdesc, attnum1);
        char* value2 = SPI_getvalue((SPI_tuptable->vals)[i], SPI_tuptable->tupdesc, attnum2);
        char* value3 = SPI_getvalue((SPI_tuptable->vals)[i], SPI_tuptable->tupdesc, attnum3);
        //elog(INFO, "SPI l_suppkey -- %d", attnum1);
        //elog(INFO, "SPI l_returnflag_int -- %d", attnum2);
        //elog(INFO, "SPI quantity -- %d", attnum3);
        //elog(INFO, "SPI l_suppkey -- %s", value1);
        //elog(INFO, "SPI l_returnflag_int -- %s", value2);
        //elog(INFO, "SPI quantity -- %s", value3);
        int l_suppkey = atoi(value1);
        int l_returnflag_int = atoi(value2);
        int quantity = atoi(value3);

        //int l_suppkey = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 1, NULL));
        //int l_returnflag_int = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 2, NULL));
        //int quantity = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 3, NULL));
        //elog(INFO, "SPI l_suppkey -- %d", l_suppkey);
        //elog(INFO, "SPI l_returnflag_int -- %d", l_returnflag_int);
        //elog(INFO, "SPI quantity -- %d", quantity);
      
        MyGroup *group = findOrCreateGroup(&groupsContext, l_suppkey, l_returnflag_int);
        addQuantityToGroup(group, quantity);

        //elog(INFO, "group l_suppkey is %d", group->l_suppkey);
        //elog(INFO, "group l_returnflag_int is %d", group->l_returnflag_int); 
    }
    elog(INFO, "Finish adding");
    // Process each group: calculate random sample average and store results
    srand(time(NULL)); // Initialize random seed
    int j;
    for (j = 0; j < groupsContext.numGroups; j++) {
        elog(INFO, "SPI j is -- %d", j);
       
        MyGroup *group = &groupsContext.groups[j];
        
        float4 avg_l_quantity = calculateRandomSampleAverage(group->quantities, group->count);

        Datum values[3];
        bool nulls[3] = {false, false, false};

        values[0] = Int32GetDatum(group->l_suppkey);
        values[1] = Int32GetDatum(group->l_returnflag_int);
        values[2] = Float4GetDatum(avg_l_quantity);
        //values[0] = group->l_suppkey;
        //values[1] = group->l_returnflag_int;
        //values[2] = avg_quantity;
        elog(INFO, "l_suppkey is %d",values[0]);
        elog(INFO, "l_returnflag_int is %d",values[1]);
        elog(INFO, "avg_l_quantity is %f",avg_l_quantity);
        
        

        tuplestore_putvalues(tupstore, tupdesc, values, nulls);
        
    }
    

    tuplestore_donestoring(tupstore);
    // Cleanup

    rsinfo->setResult = tupstore;
    rsinfo->setDesc = tupdesc;
    rsinfo->returnMode = SFRM_Materialize;
    
    SPI_finish();

    PG_RETURN_NULL();
}

I get the correct results up to the call to tuplestore_putvalues(tupstore, tupdesc, values, nulls);, so I wonder whether something is wrong with how I use it.

0

There are 0 best solutions below