Using Swarms of ClickHouse® Servers for High-Performance Reads of Parquet

In his novel “The Invincible,” Stanislaw Lem imagined a swarm of bee-sized, trivially simple robots that could self-organize into structures powerful enough to defeat the most advanced machines humans can build. A similar idea is gaining popularity in data processing as well. It is tempting to replace vast, monolithic database systems with swarms of compute instances that are provisioned on demand, do a job, and disappear when not needed.
Imagine you’ve got a lot of data in Parquet files stored on S3, and you need to query that data fast. What are the tools you can use for that? Probably ClickHouse® is not the first one that pops up in your mind, but maybe you are in for a pleasant surprise. While originally designed as a database for locally stored data, ClickHouse has evolved into a universal compute engine that can work nicely with external data like Parquet on S3.
A few months ago we discussed how to read external Parquet data from ClickHouse. In the follow-up article we introduced some performance optimizations in ClickHouse, as well as a working prototype of a metadata cache, which sped up Parquet reads by a factor of 10. In this article, we will take the next step and show how to further improve Parquet query performance using ClickHouse clustering capabilities.
Reintroducing the Problem
We will keep using the AWS public blockchain dataset that is located in the following S3 bucket:
- s3://aws-public-blockchain/v1.0/btc/
The bucket contains multiple Parquet files with the following path pattern:
- blocks/date={YYYY-MM-DD}/{id}.snappy.parquet
- transactions/date={YYYY-MM-DD}/{id}.snappy.parquet
As of November 2024, the dataset contains more than 1.1B rows in over 6000 Parquet files. The compressed data size is 1.5TB.
In order to query Parquet files we use the convenient s3() table function:
SELECT uniq(_path), count()
FROM s3('s3://aws-public-blockchain/v1.0/btc/transactions/**.parquet', NOSIGN)
┌─uniq(_path)─┬────count()─┐
│        6035 │ 1108445678 │
└─────────────┴────────────┘
1 row in set. Elapsed: 27.524 sec
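Before writing analytic queries, it can be useful to check which columns are available. One way (a quick sanity check, not part of the benchmark) is to describe the table function directly; ClickHouse infers the schema from the Parquet metadata of the matching files:
DESCRIBE TABLE s3('s3://aws-public-blockchain/v1.0/btc/transactions/**.parquet', NOSIGN)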
We will also run simple analytic queries on top of the Parquet data. In particular, we will test whether WHERE conditions are handled efficiently by ClickHouse:
SELECT date, sum(output_count)
FROM s3('s3://aws-public-blockchain/v1.0/btc/transactions/**.parquet', NOSIGN)
WHERE date>='2024-01-01'
GROUP BY date ORDER BY date
304 rows in set. Elapsed: 30 sec.
The query takes 30 seconds. It is slow because ClickHouse needs to check every Parquet file. Multiple optimizations are possible to skip objects early and improve query time. We have already discussed how adding a metadata cache can help in our Building a Better ClickHouse for Parquet in Practice article. Since then, a new feature has appeared in ClickHouse that helps with this particular query (though it may not help in the general case).
That feature is hive-style partitioning. In order to use it, the object paths in the bucket should contain a ‘name=value’ pattern; for the blockchain data we have 'date={YYYY-MM-DD}'. The feature is then enabled by adding the use_hive_partitioning setting to the query as follows:
SELECT date, sum(output_count)
FROM s3( 's3://aws-public-blockchain/v1.0/btc/transactions/date=*/*.parquet', NOSIGN)
WHERE date>='2024-01-01'
GROUP BY date ORDER BY date
SETTINGS use_hive_partitioning=1
304 rows in set. Elapsed: 4.829 sec.
This query completes in under 5 seconds on a single node! Of course, this trick only works for the date column encoded in the object paths; filters on other columns do not benefit.
Note: Hive-style partitioning was first added to ClickHouse in version 24.8 and is fully functional in 24.9.
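A quick way to convince yourself that partition pruning actually happens is to count how many objects contribute to the result. This is just a sanity check on top of the query above; it should report only files under the 2024 partitions and, more importantly, finish quickly because non-matching partitions are never opened:
SELECT uniq(_path) AS matching_files
FROM s3('s3://aws-public-blockchain/v1.0/btc/transactions/date=*/*.parquet', NOSIGN)
WHERE date >= '2024-01-01'
SETTINGS use_hive_partitioning = 1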
Now let’s see if we can get even better performance with a ClickHouse cluster.
Distributed Access to S3 Buckets
For this experiment, we launched a four-node cluster using the same somewhat dated m6g.8xlarge AWS instance type as for the single node. It is not a swarm of cheap compute nodes yet, but it allows us to compare single-node and multi-node performance properly. Note that we do not need to create any tables or schemas; the nodes are totally stateless, and even cheap spot instances can be used. The state lives in the Parquet files on S3.
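Throughout this article the cluster is referred to as parquet-cluster; that name comes from the remote_servers section of the server configuration, so yours may differ. A quick way to check that all four nodes are visible is to look at the system.clusters table:
SELECT cluster, shard_num, replica_num, host_name
FROM system.clusters
WHERE cluster = 'parquet-cluster'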
ClickHouse clustering capabilities are designed for setups where the data is stored in MergeTree tables and queried via the Distributed table engine. With Parquet data stored on S3, we cannot use them fully yet. In order to distribute reads from S3 across cluster nodes there is the s3Cluster() table function. See our article Tips for High-Performance ClickHouse® Clusters with S3 Object Storage for more detail. Unfortunately, as we will demonstrate below, while the s3Cluster() table function is great for brute-force scans and data imports, it is not very efficient for queries. In particular, it cannot apply filters effectively.
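As an aside, here is the use case where s3Cluster() does shine: bulk-loading Parquet data into a MergeTree table, with the S3 reads spread across all cluster nodes. The table and its two-column schema below are purely illustrative and not part of this benchmark:
-- Illustrative target table; the real dataset has many more columns
CREATE TABLE btc_transactions
(
    date Date,
    output_count UInt64
)
ENGINE = MergeTree
ORDER BY date;

-- The initiator lists the S3 objects and hands them out to the nodes,
-- so the Parquet files are read by the whole cluster in parallel
INSERT INTO btc_transactions
SELECT date, output_count
FROM s3Cluster('parquet-cluster', 's3://aws-public-blockchain/v1.0/btc/transactions/**.parquet', NOSIGN);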
So let’s see the baseline performance first, and then see if it can be improved.
Reading Parquet in a Cluster
Let’s start with a simple s3Cluster() query:
SELECT date, sum(output_count)
FROM s3Cluster('parquet-cluster', 's3://aws-public-blockchain/v1.0/btc/transactions/**.parquet', NOSIGN)
WHERE date>='2024-01-01'
GROUP BY date ORDER BY date
304 rows in set. Elapsed: 28 sec.
As you can see, it is almost as slow as on a single node; the difference is insignificant.
As we showed above, hive-style partitioning has a huge effect on a single node. However, if we try it with s3Cluster(), we fall back to worst-case performance!
SELECT date, sum(output_count)
FROM s3Cluster('parquet-cluster', 's3://aws-public-blockchain/v1.0/btc/transactions/date=*/*.parquet', NOSIGN)
WHERE date>='2024-01-01'
GROUP BY date ORDER BY date
SETTINGS use_hive_partitioning=1
304 rows in set. Elapsed: 26.128 sec.
Unfortunately, this is true for all s3Cluster() queries with WHERE conditions. It is so bad that single-node queries may even run faster in some cases!
In order to understand why this happens, let’s recall how s3Cluster() works. First, the query initiator node retrieves the list of objects to be processed from the S3 bucket; then it distributes the objects to the cluster nodes in chunks, so every shard reads its own set of objects, and only then does it apply any filters. Obviously, all WHERE conditions are applied too late to be useful.
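One way to see this behavior for yourself is to check the query log on every node after running the s3Cluster() query above. The sketch below is only an illustration; the initial_query_id placeholder must be replaced with the id of your own query (it is printed by clickhouse-client and recorded in system.query_log on the initiator):
SELECT hostName() AS host, sum(read_rows) AS rows_read
FROM clusterAllReplicas('parquet-cluster', system.query_log)
WHERE type = 'QueryFinish'
  AND initial_query_id = '<query id of the s3Cluster query>'
GROUP BY host
ORDER BY host
If the explanation above holds, each node should report a large number of rows read, even though the WHERE condition matches only a small fraction of them.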
We have to find a different way to push conditions down to the distributed S3 query properly.
Pushing Query Conditions to S3 Cluster Query
In order to execute a distributed query over S3 efficiently, there are two possible approaches:
- Apply filters on a query initiator node when retrieving a list of objects BEFORE sending those to cluster nodes. This is probably an easy fix for ClickHouse.
- Send filters to nodes, and let them apply those when reading objects. This is how a distributed table works.
We cannot really test the first approach without making code changes to ClickHouse itself, but we can use a trick to emulate the second one. Let’s divide all bucket objects into groups, one per cluster node, as follows:
On the first shard:
CREATE OR REPLACE VIEW s3_public_blockchain_shard
AS SELECT date, * EXCEPT(date) FROM s3('s3://aws-public-blockchain/v1.0/btc/transactions/date=*-*-{01,05,09,13,17,21,25,29}/*.parquet', NOSIGN)
On the second shard:
CREATE OR REPLACE VIEW s3_public_blockchain_shard
AS SELECT date, * EXCEPT(date) FROM s3('s3://aws-public-blockchain/v1.0/btc/transactions/date=*-*-{02,06,10,14,18,22,26,30}/*.parquet', NOSIGN)
On the third:
CREATE OR REPLACE VIEW s3_public_blockchain_shard
AS SELECT date, * EXCEPT(date) FROM s3('s3://aws-public-blockchain/v1.0/btc/transactions/date=*-*-{03,07,11,15,19,23,27,31}/*.parquet', NOSIGN)
And finally on the last one:
CREATE OR REPLACE VIEW s3_public_blockchain_shard
AS SELECT date, * EXCEPT(date) FROM s3('s3://aws-public-blockchain/v1.0/btc/transactions/date=*-*-{04,08,12,16,20,24,28}/*.parquet', NOSIGN)
As you can see, the main trick is to distribute objects evenly across shards in advance. Fortunately, we have some a priori knowledge of the objects’ naming convention and can split the data between shards using a day-of-month pattern in the date.
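Before running distributed queries over these views, it is worth checking that the split is reasonably even. A simple (if brute-force) sanity check, assuming the views live in the default database on every node, is to let each shard count its own rows:
SELECT hostName() AS host, count() AS row_count
FROM cluster('parquet-cluster', default.s3_public_blockchain_shard)
GROUP BY host
ORDER BY host
Since the last shard gets only seven day-of-month values while the others get eight, the counts will not be perfectly equal, but they should be in the same ballpark.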
Note that we had to extract the date column explicitly in the view; otherwise ClickHouse would fail to query it via a distributed table, throwing an error about a missing column. This looks like a ClickHouse bug:
Received exception from server (version 24.9.2):
Code: 47. DB::Exception: Received from parquet-cluster.demo.altinity.cloud:9440. DB::Exception: Received from chi-parquet-cluster-parquet-cluster-0-0:9000. DB::Exception: Unknown expression identifier `date` in scope SELECT date FROM (SELECT * FROM s3('s3://aws-public-blockchain/v1.0/btc/transactions/date=*-*-{01,05,09,13,17,21,25,29}/*.parquet', 'NOSIGN')). (UNKNOWN_IDENTIFIER)
Now we can use the cluster() table function, which builds a distributed query on the fly:
SELECT date, sum(output_count)
FROM cluster('parquet-cluster', default.s3_public_blockchain_shard)
WHERE date>='2024-01-01'
GROUP BY date ORDER BY date
SETTINGS allow_experimental_analyzer=0;
304 rows in set. Elapsed: 9.9 sec.
It is much better than the 28 seconds of s3Cluster()! What if we enable hive-style partitioning?
SELECT date, sum(output_count)
FROM cluster('parquet-cluster', default.s3_public_blockchain_shard)
WHERE date>='2024-01-01'
GROUP BY date ORDER BY date
SETTINGS use_hive_partitioning=1, allow_experimental_analyzer=0;
304 rows in set. Elapsed: 2.6 sec.
And here we are back in business with an under-3-second query time! With this trick we have boosted distributed query performance by a factor of 10!
Note that the analyzer needs to be turned OFF. Unfortunately, even in ClickHouse 24.9 it sometimes does not work correctly. In this case, if you turn it on, performance degrades dramatically, since predicates are applied to the view and not to the underlying S3 query. For example, the previous query would take 18 seconds instead of 2.6! There are multiple related issues in the ClickHouse GitHub repo, e.g. #66878, #68030, #67668.
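If you are not sure what defaults your server uses, you can check both settings for the current session before running the query (depending on the ClickHouse version, the analyzer setting may also be listed under its newer name, enable_analyzer):
SELECT name, value, changed
FROM system.settings
WHERE name IN ('allow_experimental_analyzer', 'enable_analyzer', 'use_hive_partitioning')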
Here is a summary chart with all the timings for those who prefer a picture over a lot of words:
Future Work
The s3Cluster() table function is not suitable for efficient queries. It is designed to parallelize reads, but it lacks the useful query-time features that have been built around its single-node counterpart. In particular, s3Cluster() does not apply filtering efficiently at all.
A better implementation would apply filtering earlier in the pipeline, probably using a metadata cache to avoid excessive list operations, and would also push filters down to cluster nodes. It should perform as much work as possible on the nodes. In fact, it should work very similarly to the Distributed table engine, but over S3 data. In that case we could expect linear scalability with the number of cluster nodes.
In general, ClickHouse lacks usability when querying objects from S3 in cluster mode. Switching between non-cluster and cluster functions is inconvenient. The development team has already planned several improvements to make this better (for example, #65024 and #70174), but it may require more thought.
Conclusion
As the demand for high-performance analytics on remote data increases, developers look for new tools or new uses for existing ones. Being the best open source analytics database, ClickHouse cannot stand aside, and it does not. A lot of improvements to Parquet query performance have recently been implemented, many of them contributed by Altinity engineers (the latest example is bloom filter support), as well as by friends throughout the ClickHouse community. But improving single-node performance is not sufficient. As datasets grow, more compute power will be needed to query them fast.
This is where ClickHouse has a lot of potential to grow. It delivers outstanding query performance in large-scale distributed systems, but lacks the same set of features when data is stored remotely in open formats like Parquet. The dream of swarms of ClickHouse nodes is still just a dream.
But hold on. Many engineers are working on this problem. Keep reading our blog to learn how ClickHouse will advance.
Originally published at https://altinity.com on November 4, 2024.