Pivot a windowed table in ksql

I am trying to pivot a table in ksql, but I could not find any documentation on the topic.

Currently, I create this table:

CREATE OR REPLACE TABLE WINDOWED_PARAMS WITH (
  kafka_topic='device-parameter-windowed',
  value_format='protobuf'
)
AS SELECT
  deviceId,
  parameterId,
  AS_VALUE(deviceId) did,
  AS_VALUE(parameterId) pid,
  MIN(value) AS min,
  MAX(value) AS max,
  AVG(value) AS avg,
  HISTOGRAM(state) AS hist,
  FROM_UNIXTIME(WINDOWSTART) AS wstart,
  FROM_UNIXTIME(WINDOWEND) AS wend,
  FROM_UNIXTIME(MAX(ROWTIME)) AS wemit
FROM MQTT_REKEYED
WINDOW TUMBLING (SIZE 60 SECONDS, RETENTION 1 DAYS)
GROUP BY deviceId, parameterId
EMIT CHANGES;

As an example, I exported a chunk of the data and loaded it into a pandas DataFrame:

import pandas

df = pandas.read_parquet(data)  # data: path to the exported Parquet file
print(df.head(3))

The data looks like this:

                DID PID    MIN    MAX        AVG                                               HIST                    WSTART                      WEND                            WEMIT
0  mqttx_cdd7d7fe_2   6   0.00  96.17  53.316667  [{'key': 'REPLACE', 'value': 1}, {'key': 'OK',... 2023-12-16 01:02:00+00:00 2023-12-16 01:03:00+00:00 2023-12-16 01:02:28.576000+00:00
1  mqttx_cdd7d7fe_3   1  21.82  58.52  38.246667                        [{'key': 'OK', 'value': 3}] 2023-12-16 01:02:00+00:00 2023-12-16 01:03:00+00:00 2023-12-16 01:02:28.577000+00:00
2  mqttx_cdd7d7fe_1   3  24.82  52.32  42.066667                        [{'key': 'OK', 'value': 3}] 2023-12-16 01:02:00+00:00 2023-12-16 01:03:00+00:00 2023-12-16 01:02:29.375000+00:00

Now I can pivot the DataFrame:

df = df.pivot_table(index=["WSTART", "WEND", "DID"], columns=["PID"], values=["AVG"])
print(df.head(9))

The result looks like this. Rows are grouped by time window, and each row holds all the parameters for one device.

PID                                                                           1          2          3          4          5          6          7          8
WSTART                    WEND                      DID                                                                                                     
2023-12-16 01:02:00+00:00 2023-12-16 01:03:00+00:00 mqttx_cdd7d7fe_1  45.887101  35.694159  35.597056  39.086250  40.344830  23.980508  52.540083  36.363794
                                                    mqttx_cdd7d7fe_2  45.829092  58.929444  48.002167  44.444980  49.314799  50.818738  23.622372  45.628124
                                                    mqttx_cdd7d7fe_3  33.965083  55.536380  42.390214  27.456479  56.258364  49.469824  56.277753  74.851500
2023-12-16 01:03:00+00:00 2023-12-16 01:04:00+00:00 mqttx_cdd7d7fe_1  40.987696  56.532549  33.920771  22.169299  38.082033  46.716898  65.882501  18.361611
                                                    mqttx_cdd7d7fe_2  20.690082  50.694250  53.984836  48.592554  59.548939  35.242335  71.439792  39.998781
                                                    mqttx_cdd7d7fe_3  20.425159  16.145967  14.912235  23.548500  34.238898  39.995365  45.996767  61.638200
2023-12-16 01:04:00+00:00 2023-12-16 01:05:00+00:00 mqttx_cdd7d7fe_1  54.938980  46.150033  42.245640  48.946073  62.512500  49.871040  46.898542  24.046995
                                                    mqttx_cdd7d7fe_2  21.143902  58.522889  42.975378  64.022450  36.715628  35.660963  60.447972  69.555625
                                                    mqttx_cdd7d7fe_3  51.007068  37.408933  56.265423  51.058452  47.630560  46.783079  42.090840  57.331373
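To make the target shape reproducible without my export, here is a minimal, self-contained sketch of the same reshape. The device names (`dev_1`, `dev_2`) and all values are made up for illustration. It also passes `values="AVG"` as a string rather than a list, which is a small deviation from the call above that yields flat column labels instead of a two-level column index.

```python
import pandas as pd

# Hypothetical rows in the same "long" shape as the windowed table:
# one row per (window, device, parameter). All values are made up.
rows = [
    {"WSTART": "2023-12-16 01:02:00", "WEND": "2023-12-16 01:03:00", "DID": "dev_1", "PID": 1, "AVG": 10.0},
    {"WSTART": "2023-12-16 01:02:00", "WEND": "2023-12-16 01:03:00", "DID": "dev_1", "PID": 2, "AVG": 20.0},
    {"WSTART": "2023-12-16 01:02:00", "WEND": "2023-12-16 01:03:00", "DID": "dev_2", "PID": 1, "AVG": 30.0},
    {"WSTART": "2023-12-16 01:02:00", "WEND": "2023-12-16 01:03:00", "DID": "dev_2", "PID": 2, "AVG": 40.0},
]
long_df = pd.DataFrame(rows)

# One row per (window, device); one column per parameter ID.
# pivot_table aggregates duplicates with the mean by default.
wide_df = long_df.pivot_table(
    index=["WSTART", "WEND", "DID"],
    columns="PID",
    values="AVG",
)
print(wide_df)
```

The wide frame has one row per window and device and one column per `PID`, which is the layout I would like the ksql table to have.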

I want to do the pivoting in ksql instead of pandas. Can this be done, and if so, how?
