
Parallel Replicas

Beta feature.

Introduction

ClickHouse processes queries extremely quickly, but how are these queries distributed and parallelized across multiple servers?

In this guide, we will first discuss how ClickHouse distributes a query across multiple shards via distributed tables, and then how a query can leverage multiple replicas for its execution.

Sharded architecture

In a shared-nothing architecture, clusters are commonly split into multiple shards, with each shard containing a subset of the overall data. A distributed table sits on top of these shards, providing a unified view of the complete data.

Reads can be sent to a local table, in which case query execution occurs only on the specified shard, or they can be sent to the distributed table, in which case each shard executes the given query. The server where the distributed table was queried aggregates the data and responds to the client:

The figure above visualizes what happens when a client queries a distributed table:

  1. The select query is sent to a distributed table on an arbitrary node (via a round-robin strategy, or after being routed to a specific server by a load balancer). This node now acts as the coordinator.

  2. The node locates each shard that needs to execute the query using the information specified in the distributed table, and the query is sent to each shard.

  3. Each shard reads, filters, and aggregates the data locally and then sends back a mergeable state to the coordinator.

  4. The coordinating node merges the data and then sends the response back to the client.

When we add replicas into the mix, the process is fairly similar, with the only difference being that only a single replica from each shard will execute the query. This means that more queries can be processed in parallel.
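
Purely for illustration, here is a minimal sketch of such a sharded and replicated setup; the cluster name my_cluster, the table names, and the columns are hypothetical:

CREATE TABLE events_local ON CLUSTER my_cluster
(
    event_date Date,
    user_id UInt64,
    value Float64
)
ENGINE = ReplicatedMergeTree
ORDER BY (event_date, user_id);

-- The distributed table stores no data itself; reads against it are fanned out
-- to the shards, where one replica per shard executes the query.
CREATE TABLE events_distributed ON CLUSTER my_cluster AS events_local
ENGINE = Distributed(my_cluster, default, events_local, rand());

SELECT count() FROM events_distributed;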

Non-sharded architecture

ClickHouse Cloud has a very different architecture from the one presented above (see "ClickHouse Cloud Architecture" for more details). With the separation of compute and storage, and with a virtually infinite amount of storage, the need for shards becomes less important.

The figure below shows the ClickHouse Cloud architecture:

This architecture allows us to add and remove replicas nearly instantaneously, ensuring very high cluster scalability. The ClickHouse Keeper cluster (shown on the right) ensures that we have a single source of truth for the metadata. Replicas can fetch the metadata from the ClickHouse Keeper cluster and all maintain the same data. The data itself is stored in object storage, and the SSD cache allows us to speed up queries.

But how can we now distribute query execution across multiple servers? In a sharded architecture, this was fairly obvious, given that each shard could execute a query on a subset of the data. How does it work when there is no sharding?

Introducing parallel replicas

To parallelize query execution across multiple servers, we first need to be able to designate one of our servers as the coordinator. The coordinator creates the list of tasks that need to be executed, ensures that they are all executed, aggregates the results, and returns them to the client. As in most distributed systems, this is the role of the node that receives the initial query. We also need to define the unit of work. In a sharded architecture, the unit of work is the shard, a subset of the data. With parallel replicas, we use a small portion of the table, called granules, as the unit of work.
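
From the client's perspective, opting in is just a matter of settings. A minimal sketch, assuming a ClickHouse Cloud service (where the cluster name is default) and a hypothetical table events; the settings themselves are described in more detail at the end of this guide:

SELECT count()
FROM events
SETTINGS
    enable_parallel_replicas = 1,              -- opt in to parallel replicas
    cluster_for_parallel_replicas = 'default', -- replicas of this cluster participate
    max_parallel_replicas = 3;                 -- cap the number of replicas used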

Now, let's see how it works in practice with the help of the figure below:

With parallel replicas:

  1. The query from the client is sent to one node after going through a load balancer. This node becomes the coordinator for this query.

  2. The node analyzes the index of each part and selects the right parts and granules to process.

  3. The coordinator splits the workload into a set of granules that can be assigned to different replicas.

  4. Each set of granules gets processed by the corresponding replicas and a mergeable state is sent to the coordinator when they are finished.

  5. Finally, the coordinator merges all the results from the replicas and then returns the response to the client.

The steps above outline how parallel replicas work in theory. However, in practice, there are a lot of factors that can prevent such logic from working perfectly:

  1. Some replicas may be unavailable.

  2. Replication in ClickHouse is asynchronous, so some replicas might not have the same parts at a given point in time.

  3. Tail latency between replicas needs to be handled somehow.

  4. The filesystem cache varies from replica to replica based on the activity on each replica, meaning that random task assignment might lead to less optimal performance given the cache locality.

We explore how these factors are overcome in the following sections.

Announcements

To address (1) and (2) from the list above, we introduced the concept of an announcement. Let's visualize how it works using the figure below:

  1. The query from the client is sent to one node after going through a load balancer. The node becomes the coordinator for this query.

  2. The coordinating node sends a request to get announcements from all the replicas in the cluster. Replicas may have slightly different views of the current set of parts for a table. As a result, we need to collect this information to avoid incorrect scheduling decisions.

  3. The coordinating node then uses the announcements to define a set of granules that can be assigned to the different replicas. Here, for example, we can see that no granules from part 3 have been assigned to replica 2, because this replica did not provide this part in its announcement. Also note that no tasks were assigned to replica 3, because the replica did not provide an announcement.

  4. After each replica has processed the query on its subset of granules and sent the mergeable state back to the coordinator, the coordinator merges the results and sends the response to the client.

Dynamic coordination

To address the issue of tail latency, we added dynamic coordination. This means that the granules are not all sent to a replica in one request; instead, each replica can request a new task (a set of granules to be processed) from the coordinator. The coordinator gives the replica a set of granules based on the announcement received.

Let's assume that we are at the stage in the process where all replicas have sent an announcement with all parts.

The figure below visualizes how dynamic coordination works:

  1. Replicas let the coordinator node know that they are able to process tasks; they can also specify how much work they can process.

  2. The coordinator assigns tasks to the replicas.

  3. Replicas 1 and 2 finish their tasks very quickly. They request another task from the coordinator node.

  4. The coordinator assigns new tasks to replicas 1 and 2.

  5. All the replicas are now done processing their tasks. They request more tasks.

  6. The coordinator, using the announcements, checks which tasks remain to be processed, but there are no remaining tasks.

  7. The coordinator tells the replicas that everything has been processed. It will now merge all the mergeable states and respond to the query.

Managing cache locality

The last remaining potential issue is how we handle cache locality. If the query is executed multiple times, how can we ensure the same task gets routed to the same replica? In the previous example, we had the following tasks assigned:

|        | Replica 1  | Replica 2  | Replica 3 |
|--------|------------|------------|-----------|
| Part 1 | g1, g6, g7 | g2, g4, g5 | g3        |
| Part 2 | g1         | g2, g4, g5 | g3        |
| Part 3 | g1, g6     | g2, g4, g5 | g3        |

To ensure that the same tasks are assigned to the same replicas and can benefit from the cache, two things happen: a hash of the part + set of granules (a task) is computed, and a modulo of the number of replicas is applied for the task assignment.
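
As a rough illustration of the principle (not the internal implementation), the assignment can be thought of as a hash of the task identifier taken modulo the number of replicas; the part name, granule range, and replica count below are made up:

-- Illustration only: the same task always maps to the same replica index.
SELECT cityHash64('all_0_2_1', '(0, 128)') % 3 AS assigned_replica;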

On paper this sounds good, but in reality, a sudden load on one replica, or network degradation, can introduce tail latency if the same replica is consistently used for executing certain tasks. If max_parallel_replicas is less than the number of replicas, then random replicas are picked for query execution.

Task stealing

If some replica processes tasks more slowly than others, other replicas will try to 'steal' tasks that in principle belong to that replica by hash, in order to reduce the tail latency.

Limitations

This feature has known limitations, of which the major ones are documented in this section.

Note

If you find an issue which is not one of the limitations given below and suspect parallel replicas to be the cause, please open an issue on GitHub using the label comp-parallel-replicas.

| Limitation | Description |
|---|---|
| Complex queries | Currently, parallel replicas work fairly well for simple queries. Complexity layers like CTEs, subqueries, JOINs, non-flat queries, etc. can have a negative impact on query performance. |
| Small queries | If you are executing a query that does not process a lot of rows, executing it on multiple replicas might not yield better performance, since the network time needed for coordination between replicas can add extra cycles to the query execution. You can limit these issues by using the setting parallel_replicas_min_number_of_rows_per_replica. |
| FINAL | Parallel replicas are disabled with FINAL. |
| Projections | Projections are not used together with parallel replicas. |
| High cardinality data and complex aggregation | High-cardinality aggregations that need to send a lot of data can significantly slow down your queries. |
| Compatibility with the new analyzer | The new analyzer might significantly slow down or speed up query execution in specific scenarios. |
The following settings are related to parallel replicas:

| Setting | Description |
|---|---|
| enable_parallel_replicas | 0: disabled. 1: enabled. 2: force the use of parallel replicas; an exception is thrown if they are not used. |
| cluster_for_parallel_replicas | The cluster name to use for parallel replication; if you are using ClickHouse Cloud, use default. |
| max_parallel_replicas | Maximum number of replicas to use for query execution on multiple replicas. If a number lower than the number of replicas in the cluster is specified, nodes are selected randomly. This value can also be overcommitted to account for horizontal scaling. |
| parallel_replicas_min_number_of_rows_per_replica | Helps limit the number of replicas used based on the number of rows that need to be processed. The number of replicas used is defined by: estimated rows to read / min_number_of_rows_per_replica. |
| allow_experimental_analyzer | 0: use the old analyzer. 1: use the new analyzer. |

The behavior of parallel replicas might change based on the analyzer used.
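As a hedged sketch of how these settings can be combined on a single query (session_events is the table used in the examples below; the numbers are illustrative):

SELECT count()
FROM session_events
SETTINGS
    enable_parallel_replicas = 1,
    cluster_for_parallel_replicas = 'default',
    max_parallel_replicas = 100,                                 -- can be overcommitted; capped at the real replica count
    parallel_replicas_min_number_of_rows_per_replica = 1000000;  -- skip extra replicas for small scans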

Investigating issues with parallel replicas

You can check which settings were used for each query in the system.query_log table. You can also look at the system.events table to see all the events that have occurred on the server, and you can use the clusterAllReplicas table function to query these system tables on all the replicas (if you are a Cloud user, use default).

SELECT
   hostname(),
   *
FROM clusterAllReplicas('default', system.events)
WHERE event ILIKE '%ParallelReplicas%'
Response
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleRequestMicroseconds      │   438 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   558 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadUnassignedMarks            │   240 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadAssignedForStealingMarks   │     4 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingByHashMicroseconds     │     5 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasProcessingPartsMicroseconds    │     5 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     3 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasAvailableCount                 │     6 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleRequestMicroseconds      │   698 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   644 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadUnassignedMarks            │   190 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadAssignedForStealingMarks   │    54 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingByHashMicroseconds     │     8 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasProcessingPartsMicroseconds    │     4 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasAvailableCount                 │     6 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleRequestMicroseconds      │   620 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   656 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadUnassignedMarks            │     1 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadAssignedForStealingMarks   │     1 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingByHashMicroseconds     │     4 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasProcessingPartsMicroseconds    │     3 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     1 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasAvailableCount                 │    12 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleRequestMicroseconds      │   696 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   717 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadUnassignedMarks            │     2 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadAssignedForStealingMarks   │     2 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingByHashMicroseconds     │    10 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasProcessingPartsMicroseconds    │     6 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasAvailableCount                 │    12 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
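
The system.query_log table mentioned above records the settings applied to each query. As a hedged sketch, reusing the query_id from the system.text_log example below and reading from the Settings map column:

SELECT
    hostname(),
    query_id,
    Settings['max_parallel_replicas'] AS max_parallel_replicas,
    Settings['allow_experimental_parallel_reading_from_replicas'] AS parallel_replicas_mode
FROM clusterAllReplicas('default', system.query_log)
WHERE query_id = 'ad40c712-d25d-45c4-b1a1-a28ba8d4019c'
  AND type = 'QueryFinish'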

The system.text_log table also contains information about the execution of queries using parallel replicas:

SELECT message
FROM clusterAllReplicas('default', system.text_log)
WHERE query_id = 'ad40c712-d25d-45c4-b1a1-a28ba8d4019c'
ORDER BY event_time_microseconds ASC
Response
┌─message────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ (from 54.218.178.249:59198) SELECT * FROM session_events WHERE type='type2' LIMIT 10 SETTINGS allow_experimental_parallel_reading_from_replicas=2; (stage: Complete)                                                                                       │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage Complete │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage WithMergeableState to stage Complete │
│ The number of replicas requested (100) is bigger than the real number available in the cluster (6). Will use the latter number to execute the query.                                                                                                       │
│ Initial request from replica 4: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 4 replica
                                                                                                   │
│ Reading state is fully initialized: part all_0_2_1 with ranges [(0, 182)] in replicas [4]; part all_3_3_0 with ranges [(0, 62)] in replicas [4]                                                                                                            │
│ Sent initial requests: 1 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 2: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 2 replica
                                                                                                   │
│ Sent initial requests: 2 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 1 parts: [part all_0_2_1 with ranges [(128, 182)]]. Finish: false; mine_marks=0, stolen_by_hash=54, stolen_rest=0                                                                                                       │
│ Initial request from replica 1: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 1 replica
                                                                                                   │
│ Sent initial requests: 3 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 2 parts: [part all_0_2_1 with ranges [(0, 128)], part all_3_3_0 with ranges [(0, 62)]]. Finish: false; mine_marks=0, stolen_by_hash=0, stolen_rest=190                                                                  │
│ Initial request from replica 0: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 0 replica
                                                                                                   │
│ Sent initial requests: 4 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 5: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 5 replica
                                                                                                   │
│ Sent initial requests: 5 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 2, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 2 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Initial request from replica 3: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 3 replica
                                                                                                   │
│ Sent initial requests: 6 Replicas count: 6                                                                                                                                                                                                                 │
│ Total rows to read: 2000000                                                                                                                                                                                                                                │
│ Handling request from replica 5, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 5 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 0, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 0 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 1, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 1 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 3, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 3 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ (c-crimson-vd-86-server-rdhnsx3-0.c-crimson-vd-86-server-headless.ns-crimson-vd-86.svc.cluster.local:9000) Cancelling query because enough data has been read                                                                                              │
│ Read 81920 rows, 5.16 MiB in 0.013166 sec., 6222087.194288318 rows/sec., 391.63 MiB/sec.                                                                                                                                                                   │
│ Coordination done: Statistics: replica 0 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 1 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 2 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 3 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 4 - {requests: 3 marks: 244 assigned_to_me: 0 stolen_by_hash: 54 stolen_unassigned: 190}; replica 5 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0} │
│ Peak memory usage (for query): 1.81 MiB.                                                                                                                                                                                                                   │
│ Processed in 0.024095586 sec.                                                                                                                                                                                                                              │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

Finally, you can also use EXPLAIN PIPELINE. It highlights how ClickHouse will execute a query and what resources will be used for the execution. Let's take the following query as an example:

SELECT count(), uniq(pageId), min(timestamp), max(timestamp)
FROM session_events
WHERE type='type3'
GROUP BY toYear(timestamp)
LIMIT 10

Let's have a look at the query pipeline without parallel replicas:

EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId), min(timestamp), max(timestamp)
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=0 
FORMAT TSV;

And now with parallel replicas:

EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId), min(timestamp), max(timestamp)
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=2 
FORMAT TSV;