Is it a good idea to write big data through Trino?

My team and I are building a new big data engine that reads large amounts of data from Kafka (and other sources), performs some operations on the data, and finally writes it to S3-compatible storage (e.g. MinIO) in Iceberg format, all of this in Apache Spark. From there the data can be queried by external services, e.g. using Trino.

My colleague came up with the idea to also WRITE all data through Trino (Spark -> Trino API -> storage), because then, if we ever had to switch e.g. from Iceberg to Hudi (due to functionality or licensing problems), we could do that easily (all you would have to do is change the Trino connector, I guess).

I'm a bit concerned/skeptical because it seems to me that Trino is advertised more as a "reading" solution (for quick data extraction queries) and not really for transforming/writing data. My two major concerns are:

  1. There are not many resources/examples online for this kind of architecture; it feels like everyone combines Spark + Iceberg directly (these tools almost seem designed to work DIRECTLY with each other), which might significantly extend our development time (which is very limited).
  2. Performance, which is a very important factor in big data solutions. It is hard to predict how much our performance will suffer after putting Trino on the "write side".

Do you guys see any real benefits of putting Trino on the "write side" of the solution?

Would you go for something like this in your own architecture or stick with standard, direct approach between Spark and Iceberg?

There are 4 answers below.

Answer by Himadri Pal

As you already know, both Trino and Spark are query engines.

From a Spark job, you can only write to Trino using the Spark JDBC data source. As with many other databases, there is a limit on the number of allowed JDBC connections. Writing a heavy DataFrame over JDBC may need a lot of optimization and tuning, both on the Spark job side and on the Trino engine side.
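
A minimal sketch of that path, assuming a Trino coordinator reachable at trino-coordinator:8080 with an Iceberg catalog behind it; the host, catalog, schema, table and user names are placeholders, and the Trino JDBC driver jar has to be on Spark's classpath:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("write-through-trino-jdbc").getOrCreate()

df = spark.read.parquet("s3a://landing/events/")  # hypothetical source data

(
    df.write
    .format("jdbc")
    .option("url", "jdbc:trino://trino-coordinator:8080/iceberg/analytics")
    .option("driver", "io.trino.jdbc.TrinoDriver")
    .option("dbtable", "events")
    .option("user", "spark-ingest")
    # each write task opens its own JDBC connection, so keep parallelism bounded
    .option("numPartitions", "8")
    # do not rely on JDBC transaction isolation for this write
    .option("isolationLevel", "NONE")
    .mode("append")
    .save()
)
```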

Using Spark-provided connectors, or the Spark engine itself, would in most cases not hit these limits (only the limits of the underlying storage/databases). Writing with the Spark engine also gives the developer the flexibility to play around with partitions, executor cores and instances to get the desired performance.

From a standardization point of view, one might argue that using Trino as the SQL interface to all the underlying databases is beneficial, since the code does not need to change when the underlying storage/database changes. However, the same can be said for Spark SQL and/or the DataFrame/Dataset Spark APIs.

Changing from the existing storage to Hudi or Cassandra (another database example) only requires a set of database-specific properties to be specified, either as options or in the Spark configuration, as sketched below.
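
A hedged sketch of that point, assuming made-up catalog, table, key and path names: the transformation code stays the same and only the format string plus its format-specific options change.

```python
from pyspark.sql import DataFrame

def write_events(df: DataFrame, target_format: str) -> None:
    """Write the same DataFrame to different table formats by swapping options."""
    if target_format == "iceberg":
        # assumes an Iceberg catalog named "lake" is configured on the session
        df.writeTo("lake.analytics.events").append()
    elif target_format == "hudi":
        (
            df.write.format("hudi")
            .option("hoodie.table.name", "events")
            .option("hoodie.datasource.write.recordkey.field", "event_id")
            .option("hoodie.datasource.write.precombine.field", "event_ts")
            .mode("append")
            .save("s3a://warehouse/analytics/events")
        )
    else:
        raise ValueError(f"unsupported format: {target_format}")
```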

Answer by Pavel Orekhov

I think it's a bad idea. I've never heard of this as a pattern, and I can imagine the code would also look complicated. You should use Spark to write into Iceberg tables, and Trino should be given to data analysts to query those Iceberg tables. Also, Spark supports more Iceberg features than Trino: for example, there are more metadata tables available when using Spark, more maintenance actions, etc. Trino is mostly good for ad-hoc sub-second queries, which Spark is bad at.

Spark also has optimizations for writing DataFrames into Iceberg: it has different write distribution modes and lets you easily control the size of the files that get written.
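
For example, here is a minimal sketch of two of the Iceberg table properties behind those knobs, assuming a configured catalog named lake and a made-up table name:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Distribute rows by partition value before writing and aim for ~512 MB data files.
spark.sql("""
    ALTER TABLE lake.analytics.events SET TBLPROPERTIES (
        'write.distribution-mode' = 'hash',
        'write.target-file-size-bytes' = '536870912'
    )
""")
```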

If you're reading from Kafka, you will probably also have streaming workloads, and Trino does not support streaming. I don't think you can create a streaming DataFrame and use the JDBC sink to write it into Trino. As a workaround, it may be possible to use foreachBatch and write each micro-batch as a non-streaming DataFrame into Trino, but you will lose exactly-once guarantees.
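
A sketch of that workaround, reusing the hypothetical connection details from above; the Kafka and checkpoint locations are placeholders, and the spark-sql-kafka package is assumed to be available:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-to-trino-foreachBatch").getOrCreate()

def write_batch_to_trino(batch_df, batch_id):
    # Each micro-batch is written as a plain, non-streaming DataFrame over JDBC;
    # a retried batch can be written twice, hence at-least-once semantics.
    (
        batch_df.write
        .format("jdbc")
        .option("url", "jdbc:trino://trino-coordinator:8080/iceberg/analytics")
        .option("driver", "io.trino.jdbc.TrinoDriver")
        .option("dbtable", "events")
        .option("user", "spark-ingest")
        .mode("append")
        .save()
    )

stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "events")
    .load()
    .selectExpr("CAST(key AS STRING) AS event_id", "CAST(value AS STRING) AS payload")
)

query = (
    stream.writeStream
    .foreachBatch(write_batch_to_trino)
    .option("checkpointLocation", "s3a://checkpoints/events-to-trino")
    .start()
)
query.awaitTermination()
```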

You can also structure your code so that Spark's DataFrame transformation logic is separated from the writer logic: create a Hudi writer and an Iceberg writer that you can switch between. It's all a matter of how you write your code.
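
One possible shape for that separation; all names here are illustrative only:

```python
from abc import ABC, abstractmethod
from pyspark.sql import DataFrame, SparkSession

class TableWriter(ABC):
    @abstractmethod
    def write(self, df: DataFrame) -> None: ...

class IcebergWriter(TableWriter):
    def __init__(self, table: str):
        self.table = table

    def write(self, df: DataFrame) -> None:
        df.writeTo(self.table).append()

class HudiWriter(TableWriter):
    def __init__(self, path: str, options: dict):
        self.path = path
        self.options = options

    def write(self, df: DataFrame) -> None:
        df.write.format("hudi").options(**self.options).mode("append").save(self.path)

def run_pipeline(spark: SparkSession, writer: TableWriter) -> None:
    raw = spark.read.parquet("s3a://landing/events/")   # hypothetical source
    transformed = raw.dropDuplicates(["event_id"])      # format-agnostic logic
    writer.write(transformed)                           # only this part is format-specific
```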

As for changing table formats, I don't think you should move from Iceberg to Hudi or Delta Lake. Iceberg is the native table format for Trino and the one the Trino community prioritizes; Hudi and Delta Lake are second-class citizens for Trino. Since you clearly plan to use Trino, you should stick with Iceberg. You can open the docs for the Hudi Trino connector and you'll see that barely any features are supported: https://trino.io/docs/current/connector/hudi.html Compare that to Iceberg: https://trino.io/docs/current/connector/iceberg.html

Btw, regarding Hudi: you should never choose that table format, because it does not support ACID. They have been claiming for years that their table format supports ACID, but that isn't the case, and you can read about it here: https://tabular.io/blog/iceberg-hudi-acid-guarantees/

The consequence of Hudi not supporting ACID is that you will get data duplication or data loss when using it.

The Delta Lake connector seems to be supported a bit better than the Hudi one, but it is still not up to par feature-wise with Iceberg. Furthermore, with the open source version of Delta Lake you will find inconveniences/shortcomings compared to Iceberg.

You cannot safely perform concurrent transactions in open source Delta Lake, because there is no locking mechanism to avoid race conditions between parallel transactions. I heard that if you use AWS there is a DynamoDB-based locking mechanism, but if you're not on AWS, there is nothing. Also, we don't know how that locking mechanism works; it may simply fail one of two concurrent transactions instead of deconflicting them as Iceberg does with its REST catalog. So we can basically say that open source Delta Lake does not support ACID, because two concurrent transactions can lead to data loss.

Another shortcoming of Delta Lake (and Hudi) is that they don't have partition evolution, which I think is a really cool feature, or hidden partitioning (you can read about both in the Iceberg docs).

Iceberg also lets you choose between different file formats: Avro, Parquet and ORC, whereas in Delta Lake you can only use Parquet. Hudi also supports ORC and Parquet, but not Avro, from what I remember.
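
In Iceberg the file format is just a table property; a quick sketch with a hypothetical table name:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 'write.format.default' accepts parquet (the default), orc or avro.
spark.sql("ALTER TABLE lake.analytics.events SET TBLPROPERTIES ('write.format.default' = 'orc')")
```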

Also, when I tested these table formats several months ago with regard to schema evolution, I had a struct<field1: array<struct<field1: Int, field2: Int, ...>>> and wanted to add another field to the inner struct, and only Iceberg supported that. Not sure how it is now.
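
For reference, a sketch of how that kind of nested addition can look in Iceberg via Spark SQL, assuming the outer column is called payload and the Iceberg Spark SQL extensions are enabled; the dotted field path below is my understanding of Iceberg's DDL for nested fields, not something from the original post:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Add field3 to the struct elements of the array payload.field1.
spark.sql("ALTER TABLE lake.analytics.events ADD COLUMN payload.field1.element.field3 int")
```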

I've played around with all three of these table formats: Hudi, Iceberg and Delta Lake. According to my testing, Iceberg is the best table format if you use Spark+Trino. Delta Lake is probably great if you use Databricks, because they have proprietary enhancements; I heard it's really fast on Databricks, that Unity Catalog allows you to safely perform parallel transactions, etc.

As for Hudi, besides the very annoying fact that it doesn't support ACID, I honestly do not like that table format at all. It is overloaded with features and very complicated to use; there are a million settings you have to configure for each specific use case. If you want to write a non-partitioned table you specify one bunch of settings, and if you want to write a partitioned table you have to specify another bunch. Look at this code example: https://hudi.apache.org/docs/writing_data#insert-overwrite-table

You will see that you have to set a bunch of non-intuitive options just to overwrite a table. Iceberg and Delta Lake are orders of magnitude simpler to use.

In fact, here's the list of all Hudi settings; there are a million of them: https://hudi.apache.org/docs/configurations

Compare that to the list of Iceberg settings: https://iceberg.apache.org/docs/latest/configuration/

The architecture of Hudi is also very complicated, and if you read the design doc you will be left with a lot of unanswered questions, whereas Delta Lake and Iceberg follow a very simple architecture, which is why they are more stable and less error-prone.

Hudi is always the last table format to support the latest version of Spark; you can see that in this table: https://hudi.apache.org/docs/quick-start-guide/#spark-3-support-matrix

Both Iceberg and Delta Lake support Spark 3.5.0, while Hudi only recently added support for Spark 3.4.0...

You will also find quite a few data loss issues in Hudi's GitHub.

When I was checking out Hudi, what appealed to me was that you can configure table services that run inside your Spark jobs to expire snapshots and compact small files; in Iceberg you have to run separate maintenance jobs to do that. Also, in Iceberg you have to pause a streaming job that is not append-only if you want to compact its table, otherwise there will be a lot of commit retries, because the compaction job and the streaming job modify the same set of files and step on each other's toes. But all these benefits of Hudi were dwarfed once I saw how unnecessarily complicated the table format was. Following the code examples from their docs I was able to set up a streaming job, but I couldn't set up the asynchronous table service for compacting small files; there were cryptic errors that even the people in Hudi's Slack channel were unable to help me with. In Iceberg and Delta Lake, thanks to their simplicity, things just work.
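
For context, those separate Iceberg maintenance jobs are usually just Spark jobs calling Iceberg's stored procedures; a minimal sketch with a hypothetical catalog and table:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Compact small data files into larger ones.
spark.sql("CALL lake.system.rewrite_data_files(table => 'analytics.events')")

# Expire old snapshots so their files can eventually be cleaned up.
spark.sql("""
    CALL lake.system.expire_snapshots(
        table => 'analytics.events',
        older_than => TIMESTAMP '2024-01-01 00:00:00'
    )
""")
```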

Also, in Hudi, if you have two Spark jobs writing to the same table at the same time, it leads to data loss. I've seen several issues about this, for example: https://github.com/apache/hudi/issues/9674.

I think you are on the right track with your current architecture: keep using Iceberg, write with Spark, read with Trino. Btw, for Iceberg (if you use open source Spark+Trino) you should use the Nessie or REST catalog; the other catalogs suck (they are buggy) and are not supported as well as Nessie/REST (they have fewer features). If you want a free REST catalog, you may want to check out Gravitino: https://github.com/datastrato/gravitino If you want Nessie, check out their GitHub: https://github.com/projectnessie/nessie

Iceberg has its quirks too, though. For example, if you DROP a column named "col" in Iceberg and then add back a column with the same name "col", you don't get back the data that used to be there; this is SQL-compliant behavior, which Hudi doesn't support and which in Delta Lake is an opt-in feature, from what I remember. There's a gotcha with this: if you DROP a column by accident, you can't recover its data anymore through standard SQL; you have to re-register the older metadata.json file of the table that still has that older column with the correct column id.
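
A hedged sketch of that re-registration using Iceberg's register_table procedure; the catalog, table name and metadata file path below are placeholders I made up:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Register the older metadata.json (which still knows the dropped column's id)
# as a separate table so its data can be read back.
spark.sql("""
    CALL lake.system.register_table(
        table => 'analytics.events_recovered',
        metadata_file => 's3a://warehouse/analytics/events/metadata/00042-old.metadata.json'
    )
""")
```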

Another interesting thing I learned about Iceberg is that a table's schema is not tied to snapshots: if you roll back to one of the previous snapshots, you don't get the old schema back, whereas Delta Lake will roll back the schema as well. This behavior is a double-edged sword. At first glance it may seem unintuitive, but imagine a situation where you evolved your schema in your Flink/Spark writers and then decided to roll back to an older snapshot: in Delta Lake the Flink/Spark writers will break, because the schema was rolled back, whereas Iceberg will keep working in this case.
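
For completeness, the rollback itself is an Iceberg stored procedure; a small sketch with a made-up table name and snapshot id:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Roll the table's data state back to an earlier snapshot; the current schema
# is kept, which is exactly the behavior described above.
spark.sql("CALL lake.system.rollback_to_snapshot('analytics.events', 5781947118336215154)")
```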

Bottom line: on Databricks definitely use Delta Lake; if you use Spark+Trino, definitely use Iceberg; and as for Hudi, use it at your own risk, I wouldn't recommend it.

Answer by Atif

Apart from the points mentioned above, which already suggest that writing data to Iceberg tables through Trino is a bad idea, another drawback is that Trino currently only supports merge-on-read. This means that if you plan to do UPDATE/DELETE/MERGE on your data, reads will be slower, as the copy-on-write option is currently not supported by Trino.

Answer by Timo

If you already use Spark for Kafka and transformations, then I would suggest using it also for writing the Iceberg tables.

That said, Trino is perfectly capable of writing to and upserting Iceberg tables. But you can't really do it via the Trino REST API, i.e. by running queries like INSERT INTO mytable VALUES (all data here), as that is slow and does not scale well.

However, you could upload a batch of data to S3, register it as a Hive table, and merge the data into Iceberg from there, i.e. INSERT INTO iceberg.myschema.mytable SELECT * FROM hive.myschema.mybatch. Upserts via MERGE INTO work fine too. Of course, attention must be paid to good partitioning strategies and search conditions so the workers can parallelize the work reasonably.
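
A hedged sketch of that batch pattern using the trino Python client; the connection details and column names are made up, and the staged files are assumed to already be exposed as hive.myschema.mybatch:

```python
import trino

conn = trino.dbapi.connect(host="trino-coordinator", port=8080, user="ingest")
cur = conn.cursor()

# Merge the staged Hive batch into the Iceberg table (upsert on a hypothetical key).
cur.execute("""
    MERGE INTO iceberg.myschema.mytable AS t
    USING hive.myschema.mybatch AS s
      ON t.event_id = s.event_id
    WHEN MATCHED THEN UPDATE SET payload = s.payload
    WHEN NOT MATCHED THEN INSERT (event_id, payload) VALUES (s.event_id, s.payload)
""")
cur.fetchall()  # drain the result to make sure the statement completes
```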

I don't know how the performance compares to Spark, or how it scales with Spark and Trino cluster sizes respectively.