I am trying to pivot a table in ksql, but I could not find any documentation on the topic.
Currently, I create this table:
CREATE OR REPLACE TABLE WINDOWED_PARAMS WITH (
    kafka_topic='device-parameter-windowed',
    value_format='protobuf'
)
AS SELECT
    deviceId,
    parameterId,
    AS_VALUE(deviceId) did,
    AS_VALUE(parameterId) pid,
    MIN(value) AS min,
    MAX(value) AS max,
    AVG(value) AS avg,
    HISTOGRAM(state) AS hist,
    FROM_UNIXTIME(WINDOWSTART) AS wstart,
    FROM_UNIXTIME(WINDOWEND) AS wend,
    FROM_UNIXTIME(MAX(ROWTIME)) AS wemit
FROM MQTT_REKEYED
WINDOW TUMBLING (SIZE 60 SECONDS, RETENTION 1 DAYS)
GROUP BY deviceId, parameterId
EMIT CHANGES;
As an example, I exported a chunk of the data and loaded it into a pandas DataFrame:
import pandas
df = pandas.read_parquet(data)  # data: the exported Parquet chunk
print(df.head(3))
The data looks like this:
   DID               PID  MIN    MAX    AVG        HIST                                               WSTART                     WEND                       WEMIT
0  mqttx_cdd7d7fe_2  6    0.00   96.17  53.316667  [{'key': 'REPLACE', 'value': 1}, {'key': 'OK',...  2023-12-16 01:02:00+00:00  2023-12-16 01:03:00+00:00  2023-12-16 01:02:28.576000+00:00
1  mqttx_cdd7d7fe_3  1    21.82  58.52  38.246667  [{'key': 'OK', 'value': 3}]                        2023-12-16 01:02:00+00:00  2023-12-16 01:03:00+00:00  2023-12-16 01:02:28.577000+00:00
2  mqttx_cdd7d7fe_1  3    24.82  52.32  42.066667  [{'key': 'OK', 'value': 3}]                        2023-12-16 01:02:00+00:00  2023-12-16 01:03:00+00:00  2023-12-16 01:02:29.375000+00:00
Now I can pivot the DataFrame:
df = df.pivot_table(index=["WSTART", "WEND", "DID"], columns=["PID"], values=["AVG"])
print(df.head(9))
The result looks like this. Rows are grouped by time window, and each row holds the averages of all parameters for one device:
PID                                                                     1          2          3          4          5          6          7          8
WSTART                     WEND                       DID
2023-12-16 01:02:00+00:00  2023-12-16 01:03:00+00:00  mqttx_cdd7d7fe_1  45.887101  35.694159  35.597056  39.086250  40.344830  23.980508  52.540083  36.363794
                                                      mqttx_cdd7d7fe_2  45.829092  58.929444  48.002167  44.444980  49.314799  50.818738  23.622372  45.628124
                                                      mqttx_cdd7d7fe_3  33.965083  55.536380  42.390214  27.456479  56.258364  49.469824  56.277753  74.851500
2023-12-16 01:03:00+00:00  2023-12-16 01:04:00+00:00  mqttx_cdd7d7fe_1  40.987696  56.532549  33.920771  22.169299  38.082033  46.716898  65.882501  18.361611
                                                      mqttx_cdd7d7fe_2  20.690082  50.694250  53.984836  48.592554  59.548939  35.242335  71.439792  39.998781
                                                      mqttx_cdd7d7fe_3  20.425159  16.145967  14.912235  23.548500  34.238898  39.995365  45.996767  61.638200
2023-12-16 01:04:00+00:00  2023-12-16 01:05:00+00:00  mqttx_cdd7d7fe_1  54.938980  46.150033  42.245640  48.946073  62.512500  49.871040  46.898542  24.046995
                                                      mqttx_cdd7d7fe_2  21.143902  58.522889  42.975378  64.022450  36.715628  35.660963  60.447972  69.555625
                                                      mqttx_cdd7d7fe_3  51.007068  37.408933  56.265423  51.058452  47.630560  46.783079  42.090840  57.331373
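To make the target shape explicit without depending on pandas, here is a minimal sketch of the same reshaping in plain Python. The rows and device names are made up for illustration (they are not from my export), and `pivot` is just a hypothetical helper name.

```python
from collections import defaultdict

# Hypothetical rows shaped like the table output: (WSTART, WEND, DID, PID, AVG).
rows = [
    ("01:02", "01:03", "dev_1", 1, 45.9),
    ("01:02", "01:03", "dev_1", 2, 35.7),
    ("01:02", "01:03", "dev_2", 1, 45.8),
    ("01:03", "01:04", "dev_1", 1, 41.0),
]


def pivot(rows):
    """Turn one row per (window, device, parameter) into one row per (window, device)."""
    wide = defaultdict(dict)
    for wstart, wend, did, pid, avg in rows:
        # The parameter id becomes a column; its average becomes the cell value.
        wide[(wstart, wend, did)][pid] = avg
    return dict(wide)


pivoted = pivot(rows)
for key, params in sorted(pivoted.items()):
    print(key, params)
```

Each key is one (window, device) pair and each value maps PID to AVG, which is the one-row-per-device layout I am after.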
I want to do the pivoting in ksql instead of pandas. Can this be done, and if so, how?