Here is my code. I am trying to calculate the average value for each group using SPI. However, I cannot add the average values to the result set using tuplestore_putvalues.
/* Magic block that PostgreSQL requires in every dynamically loaded module; it must appear exactly once per shared library. */
PG_MODULE_MAGIC;
/*
 * MyGroup
 *    One aggregation group, identified by the pair
 *    (l_suppkey, l_returnflag_int), together with a growable array of the
 *    quantity values collected for that group.
 */
typedef struct {
/* first component of the group key */
int l_suppkey;
/* second component of the group key */
int l_returnflag_int;
/* palloc'd array of collected quantities; enlarged with repalloc when full */
float4 *quantities;
/* number of entries of quantities[] currently in use */
int count;
/* number of entries quantities[] has room for */
int capacity;
} MyGroup;
/*
 * GroupsContext
 *    Growable array holding every group seen so far.
 */
typedef struct {
/* palloc'd array of groups; may be moved by repalloc when it grows */
MyGroup *groups;
/* number of entries of groups[] currently in use */
int numGroups;
/* number of entries groups[] has room for */
int capacity;
} GroupsContext;
// Utility function declarations
/* Checks that the caller accepts a materialized tuplestore and resets the result fields of its ReturnSetInfo. */
static void prepTuplestoreResult(FunctionCallInfo fcinfo);
/* Returns the group matching the given key pair, appending a new empty group when none exists. */
static MyGroup* findOrCreateGroup(GroupsContext *context, int l_suppkey, int l_returnflag_int);
/* Appends one quantity to the group's array, doubling the array when it is full. */
static void addQuantityToGroup(MyGroup *group, float4 quantity);
/* Averages 1000 elements picked with rand() (with replacement) from quantities[0 .. count-1]. */
static float4 calculateRandomSampleAverage(float4 *quantities, int count);
/*
 * prepTuplestoreResult
 *    Make sure the calling context can accept a materialized result set,
 *    then put the ReturnSetInfo into materialize mode with no result
 *    attached yet.
 *
 * Raises ERRCODE_FEATURE_NOT_SUPPORTED when fcinfo carries no ReturnSetInfo
 * or when the caller does not allow SFRM_Materialize.
 */
static void
prepTuplestoreResult(FunctionCallInfo fcinfo)
{
    ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo;
    bool        canReturnSet = (resultInfo != NULL && IsA(resultInfo, ReturnSetInfo));

    /* The function must have been invoked somewhere a set is acceptable. */
    if (!canReturnSet)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("set-valued function called in context that cannot accept a set")));

    /* ... and that place must be willing to take a whole tuplestore. */
    if ((resultInfo->allowedModes & SFRM_Materialize) == 0)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("materialize mode required, but it is not allowed in this context")));

    /*
     * Begin with an empty result; the caller has to fill in setResult and
     * setDesc itself if it wants to return any rows.
     */
    resultInfo->setDesc = NULL;
    resultInfo->setResult = NULL;

    /* Tell the executor that a tuplestore is coming back. */
    resultInfo->returnMode = SFRM_Materialize;
}
/*
 * findOrCreateGroup
 *    Return the group keyed by (l_suppkey, l_returnflag_int), creating an
 *    empty one at the end of context->groups when no such group exists.
 *
 * context           array of groups to search and, if needed, extend
 * l_suppkey         first component of the group key
 * l_returnflag_int  second component of the group key
 *
 * Lookup is a linear scan over the groups stored so far.
 *
 * A context with capacity 0 (including a zero-initialised one whose groups
 * pointer is NULL) is accepted: the array is then allocated with a default
 * size instead of being "doubled" from zero, which would leave no room and
 * write past the end of the array.
 *
 * The returned pointer points into context->groups.  A later call that has
 * to grow the array may move it, so do not keep the pointer across calls.
 * All memory comes from palloc/repalloc in the current memory context.
 */
static MyGroup *
findOrCreateGroup(GroupsContext *context, int l_suppkey, int l_returnflag_int)
{
    enum
    {
        DEFAULT_GROUP_CAPACITY = 16,        /* used when capacity is 0 */
        INITIAL_QUANTITY_CAPACITY = 100     /* per-group starting room */
    };
    MyGroup    *newGroup;
    int         i;

    /* Existing group? */
    for (i = 0; i < context->numGroups; ++i)
    {
        MyGroup    *candidate = &context->groups[i];

        if (candidate->l_suppkey == l_suppkey &&
            candidate->l_returnflag_int == l_returnflag_int)
            return candidate;
    }

    /* No room left for a new entry: enlarge the array first. */
    if (context->numGroups >= context->capacity)
    {
        int         newCapacity;

        /* Doubling zero stays zero, so fall back to a fixed starting size. */
        if (context->capacity > 0)
            newCapacity = context->capacity * 2;
        else
            newCapacity = DEFAULT_GROUP_CAPACITY;

        /* repalloc() must not be handed a NULL pointer. */
        if (context->groups == NULL)
            context->groups = (MyGroup *) palloc(sizeof(MyGroup) * newCapacity);
        else
            context->groups = (MyGroup *) repalloc(context->groups,
                                                   sizeof(MyGroup) * newCapacity);

        /* Record the new size only once the allocation has succeeded. */
        context->capacity = newCapacity;
    }

    newGroup = &context->groups[context->numGroups++];
    newGroup->l_suppkey = l_suppkey;
    newGroup->l_returnflag_int = l_returnflag_int;
    newGroup->quantities = (float4 *) palloc(sizeof(float4) * INITIAL_QUANTITY_CAPACITY);
    newGroup->count = 0;
    newGroup->capacity = INITIAL_QUANTITY_CAPACITY;

    return newGroup;
}
/*
 * addQuantityToGroup
 *    Store one more quantity at the end of group->quantities.
 *
 * When the array is already full its capacity is doubled and the storage is
 * enlarged with repalloc before the value is written.
 */
static void
addQuantityToGroup(MyGroup *group, float4 quantity)
{
    const int   slot = group->count;

    if (slot >= group->capacity)
    {
        const int   grown = group->capacity * 2;

        group->capacity = grown;
        group->quantities = (float4 *) repalloc(group->quantities,
                                                sizeof(float4) * grown);
    }

    group->quantities[slot] = quantity;
    group->count = slot + 1;
}
/*
 * calculateRandomSampleAverage
 *    Draw 1000 elements from quantities[0 .. count-1] with replacement,
 *    using rand(), and return their mean.
 *
 * quantities  values to sample from
 * count       number of valid entries in quantities
 *
 * Returns 0 when there is nothing to sample (quantities is NULL or
 * count <= 0); without that guard "rand() % count" would divide by zero.
 * The caller is responsible for seeding rand().
 */
static float4
calculateRandomSampleAverage(float4 *quantities, int count)
{
    const int   sampleSize = 1000;
    double      sum = 0.0;      /* double keeps rounding error out of the running total */
    int         i;

    if (quantities == NULL || count <= 0)
        return 0;

    for (i = 0; i < sampleSize; ++i)
        sum += quantities[rand() % count];

    return (float4) (sum / sampleSize);
}
PG_FUNCTION_INFO_V1(spi_bootstrap2);
Datum spi_bootstrap2(PG_FUNCTION_ARGS) {
int ret;
int i;
Tuplestorestate *tupstore;
TupleDesc tupdesc;
//MemoryContext oldcontext;
// Connect to SPI
if (SPI_connect() != SPI_OK_CONNECT) {
ereport(ERROR, (errmsg("SPI_connect failed")));
}
// Prepare and execute the SQL query
char sql[1024];
char* sampleSize = text_to_cstring(PG_GETARG_TEXT_PP(0));
char* tablename = text_to_cstring(PG_GETARG_TEXT_PP(1));
char* otherAttribue = text_to_cstring(PG_GETARG_TEXT_PP(2));
char* groupby = text_to_cstring(PG_GETARG_TEXT_PP(3));
prepTuplestoreResult(fcinfo);
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
//oldcontext = MemoryContextSwitchTo(CurrentMemoryContext);
snprintf(sql, sizeof(sql), "select * from reservoir_sampler_tpch(%s,'%s','%s','%s');",sampleSize,tablename,otherAttribue,groupby);
//elog(INFO, "SPI query -- %s", sql);
ret = SPI_execute(sql, true, 0);
if (ret != SPI_OK_SELECT) {
SPI_finish();
ereport(ERROR, (errmsg("SPI_execute failed")));
}
// Prepare for tuplestore use
tupdesc = CreateTemplateTupleDesc(3, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "l_suppkey", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "l_returnflag_int", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "avg_l_quantity", FLOAT4OID, -1, 0);
//TupleDescInitEntry(tupdesc, (AttrNumber) 3, "avg_l_quantity", INT4OID, -1, 0);
tupdesc = BlessTupleDesc(tupdesc);
tupstore = tuplestore_begin_heap(true, false, work_mem);
//MemoryContextSwitchTo(oldcontext); //problem
// Initialize GroupsContext
GroupsContext groupsContext;
groupsContext.groups = (MyGroup *)palloc(sizeof(MyGroup) * 30000); // problem 1Initial capacity
groupsContext.numGroups = 0;
groupsContext.capacity = 30000;
// Process SPI results
for (i = 0; i < SPI_processed; i++) {
//HeapTuple tuple = SPI_tuptable->vals[i];
//TupleDesc tupdesc = SPI_tuptable->tupdesc;
//elog(INFO, "SPI current id is -- %d", i);
int attnum1 = SPI_fnumber(SPI_tuptable->tupdesc, "l_suppkey");
int attnum2 = SPI_fnumber(SPI_tuptable->tupdesc, "l_returnflag_int");
int attnum3 = SPI_fnumber(SPI_tuptable->tupdesc, "l_quantity");
char* value1 = SPI_getvalue((SPI_tuptable->vals)[i], SPI_tuptable->tupdesc, attnum1);
char* value2 = SPI_getvalue((SPI_tuptable->vals)[i], SPI_tuptable->tupdesc, attnum2);
char* value3 = SPI_getvalue((SPI_tuptable->vals)[i], SPI_tuptable->tupdesc, attnum3);
//elog(INFO, "SPI l_suppkey -- %d", attnum1);
//elog(INFO, "SPI l_returnflag_int -- %d", attnum2);
//elog(INFO, "SPI quantity -- %d", attnum3);
//elog(INFO, "SPI l_suppkey -- %s", value1);
//elog(INFO, "SPI l_returnflag_int -- %s", value2);
//elog(INFO, "SPI quantity -- %s", value3);
int l_suppkey = atoi(value1);
int l_returnflag_int = atoi(value2);
int quantity = atoi(value3);
//int l_suppkey = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 1, NULL));
//int l_returnflag_int = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 2, NULL));
//int quantity = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 3, NULL));
//elog(INFO, "SPI l_suppkey -- %d", l_suppkey);
//elog(INFO, "SPI l_returnflag_int -- %d", l_returnflag_int);
//elog(INFO, "SPI quantity -- %d", quantity);
MyGroup *group = findOrCreateGroup(&groupsContext, l_suppkey, l_returnflag_int);
addQuantityToGroup(group, quantity);
//elog(INFO, "group l_suppkey is %d", group->l_suppkey);
//elog(INFO, "group l_returnflag_int is %d", group->l_returnflag_int);
}
elog(INFO, "Finish adding");
// Process each group: calculate random sample average and store results
srand(time(NULL)); // Initialize random seed
int j;
for (j = 0; j < groupsContext.numGroups; j++) {
elog(INFO, "SPI j is -- %d", j);
MyGroup *group = &groupsContext.groups[j];
float4 avg_l_quantity = calculateRandomSampleAverage(group->quantities, group->count);
Datum values[3];
bool nulls[3] = {false, false, false};
values[0] = Int32GetDatum(group->l_suppkey);
values[1] = Int32GetDatum(group->l_returnflag_int);
values[2] = Float4GetDatum(avg_l_quantity);
//values[0] = group->l_suppkey;
//values[1] = group->l_returnflag_int;
//values[2] = avg_quantity;
elog(INFO, "l_suppkey is %d",values[0]);
elog(INFO, "l_returnflag_int is %d",values[1]);
elog(INFO, "avg_l_quantity is %f",avg_l_quantity);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
tuplestore_donestoring(tupstore);
// Cleanup
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;
rsinfo->returnMode = SFRM_Materialize;
SPI_finish();
PG_RETURN_NULL();
}
The code produces the correct values up to the call to tuplestore_putvalues(tupstore, tupdesc, values, nulls); so I wonder whether something is wrong with how I store the results.