Working with JSON in ClickHouse

This guide provides common patterns for working with JSON data replicated from MongoDB to ClickHouse via ClickPipes.

Suppose we created a collection t1 in MongoDB to track customer orders:

db.t1.insertOne({
  "order_id": "ORD-001234",
  "customer_id": 98765,
  "status": "completed",
  "total_amount": 299.97,
  "order_date": new Date(),
  "shipping": {
    "method": "express",
    "city": "Seattle",
    "cost": 19.99
  },
  "items": [
    {
      "category": "electronics",
      "price": 149.99
    },
    {
      "category": "accessories",
      "price": 24.99
    }
  ]
})

The MongoDB CDC connector replicates MongoDB documents to ClickHouse using the native JSON data type. The replicated table t1 in ClickHouse will contain the following row:

Row 1:
──────
_id:                "68a4df4b9fe6c73b541703b0"
_full_document:     {"_id":"68a4df4b9fe6c73b541703b0","customer_id":98765,"items":[{"category":"electronics","price":149.99},{"category":"accessories","price":24.99}],"order_date":"2025-08-19T20:32:11.705Z","order_id":"ORD-001234","shipping":{"city":"Seattle","cost":19.99,"method":"express"},"status":"completed","total_amount":299.97}
_peerdb_synced_at:  2025-08-19 20:50:42.005000000
_peerdb_is_deleted: 0
_peerdb_version:    0

Table schema

The replicated tables use this standard schema:

┌─name───────────────┬─type──────────┐
│ _id                │ String        │
│ _full_document     │ JSON          │
│ _peerdb_synced_at  │ DateTime64(9) │
│ _peerdb_version    │ Int64         │
│ _peerdb_is_deleted │ Int8          │
└────────────────────┴───────────────┘
  • _id: Primary key from MongoDB
  • _full_document: MongoDB document replicated as JSON data type
  • _peerdb_synced_at: Records when the row was last synced
  • _peerdb_version: Tracks the version of the row; incremented when the row is updated or deleted
  • _peerdb_is_deleted: Marks whether the row is deleted

ReplacingMergeTree table engine

ClickPipes maps MongoDB collections into ClickHouse using the ReplacingMergeTree table engine family. With this engine, updates, replaces, and deletes are all modeled as versioned inserts: each change is written as a new row with a higher version (_peerdb_version) for the same primary key (_id), which keeps ingestion efficient.
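
For instance, you can inspect how changes to a single document accumulate as versioned rows (a sketch reusing the _id from the example row above):

SELECT _id, _peerdb_version, _peerdb_is_deleted, _peerdb_synced_at
FROM t1
WHERE _id = '68a4df4b9fe6c73b541703b0'
ORDER BY _peerdb_version;

Each update or delete from MongoDB appears as an additional row with a higher _peerdb_version until the background merges collapse them.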

ReplacingMergeTree deduplicates rows asynchronously in the background, so older versions of a row may still be present at query time. To guarantee that a query returns only the latest version of each row, use the FINAL modifier. For example:

SELECT * FROM t1 FINAL;

Handling deletes

Deletes from MongoDB are propagated as new rows marked as deleted using the _peerdb_is_deleted column. You typically want to filter these out in your queries:

SELECT * FROM t1 FINAL WHERE _peerdb_is_deleted = 0;

You can also create a row-level policy to automatically filter out deleted rows instead of specifying the filter in each query:

CREATE ROW POLICY policy_name ON t1
FOR SELECT USING _peerdb_is_deleted = 0;
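
To list or remove row policies later, the standard row policy statements apply (shown here with the placeholder name used above):

SHOW ROW POLICIES ON t1;
DROP ROW POLICY policy_name ON t1;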

Querying JSON data

You can directly query JSON fields using dot syntax:

SELECT
    _full_document.order_id,
    _full_document.shipping.method
FROM t1;
┌─_full_document.order_id─┬─_full_document.shipping.method─┐
│ ORD-001234              │ express                        │
└─────────────────────────┴────────────────────────────────┘

When querying a nested object (rather than a leaf field) with dot syntax, prefix the sub-object path with the ^ operator to return it as JSON:

SELECT _full_document.^shipping AS shipping_info FROM t1;
┌─shipping_info──────────────────────────────────────┐
│ {"city":"Seattle","cost":19.99,"method":"express"} │
└────────────────────────────────────────────────────┘

Dynamic type

In ClickHouse, each field in a JSON column has the Dynamic type. Dynamic allows ClickHouse to store values of any type without knowing the type in advance. You can verify this with the toTypeName function:

SELECT toTypeName(_full_document.customer_id) AS type FROM t1;
┌─type────┐
│ Dynamic │
└─────────┘

To examine the underlying data type of a field, use the dynamicType function. Note that the same field name can hold different data types in different rows:

SELECT dynamicType(_full_document.customer_id) AS type FROM t1;
┌─type──┐
│ Int64 │
└───────┘
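
Since the same field can hold different types in different rows, a quick way to survey them is to group by dynamicType (a sketch; on the single example row it returns just one type):

SELECT
    dynamicType(_full_document.customer_id) AS type,
    count() AS cnt
FROM t1
GROUP BY type;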

Regular functions work on Dynamic values just as they do on ordinary columns:

Example 1: Date parsing

SELECT parseDateTimeBestEffortOrNull(_full_document.order_date) AS order_date FROM t1;
┌─order_date──────────┐
│ 2025-08-19 20:32:11 │
└─────────────────────┘

Example 2: Conditional logic

SELECT multiIf(
    _full_document.total_amount < 100, 'less_than_100',
    _full_document.total_amount < 1000, 'less_than_1000',
    '1000+') AS spendings
FROM t1;
┌─spendings──────┐
│ less_than_1000 │
└────────────────┘

Example 3: Array operations

SELECT length(_full_document.items) AS item_count FROM t1;
┌─item_count─┐
│          2 │
└────────────┘
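
The items field holds an array of objects, so you can also unnest it with arrayJoin after casting the Dynamic value to its underlying array type (a sketch; confirm the type with dynamicType(_full_document.items) first):

SELECT
    _full_document.order_id AS order_id,
    arrayJoin(CAST(_full_document.items, 'Array(JSON)')) AS item
FROM t1;

Each resulting row contains one item as a JSON value, whose fields can then be read with the same dot syntax and cast before aggregation.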

Field casting

Aggregation functions in ClickHouse don't work on Dynamic values directly. For example, if you apply the sum function to a Dynamic field, you get the following error:

SELECT sum(_full_document.shipping.cost) AS shipping_cost FROM t1;
-- DB::Exception: Illegal type Dynamic of argument for aggregate function sum. (ILLEGAL_TYPE_OF_ARGUMENT)

To use aggregation functions, cast the field to the appropriate type with the CAST function or :: syntax:

SELECT sum(_full_document.shipping.cost::Float32) AS shipping_cost FROM t1;
┌─shipping_cost─┐
│         19.99 │
└───────────────┘
Note

Casting from dynamic type to the underlying data type (determined by dynamicType) is very performant, as ClickHouse already stores the value in its underlying type internally.

Flattening JSON

Normal view

You can create normal views on top of the JSON table to encapsulate the flattening, casting, and transformation logic, so that you can query the data much like a relational table. Normal views are lightweight: they store only the query itself, not the underlying data. For example:

CREATE VIEW v1 AS
SELECT
    CAST(_full_document._id, 'String') AS object_id,
    CAST(_full_document.order_id, 'String') AS order_id,
    CAST(_full_document.customer_id, 'Int64') AS customer_id,
    CAST(_full_document.status, 'String') AS status,
    CAST(_full_document.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(_full_document.order_date, 3), 'DATETIME(3)') AS order_date,
    _full_document.^shipping AS shipping_info,
    _full_document.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;

This view will have the following schema:

┌─name────────────┬─type───────────┐
│ object_id       │ String         │
│ order_id        │ String         │
│ customer_id     │ Int64          │
│ status          │ String         │
│ total_amount    │ Decimal(18, 2) │
│ order_date      │ DateTime64(3)  │
│ shipping_info   │ JSON           │
│ items           │ Dynamic        │
└─────────────────┴────────────────┘

You can now query the view similar to how you would query a flattened table:

SELECT
    customer_id,
    sum(total_amount)
FROM v1
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;

Refreshable materialized view

You can also create refreshable materialized views, which let you schedule a query that deduplicates rows and stores the results in a flattened destination table. On each scheduled refresh, the destination table is replaced with the latest query results.

The key advantage of this method is that the query using the FINAL keyword runs only once during the refresh, eliminating the need for subsequent queries on the destination table to use FINAL.

However, a drawback is that the data in the destination table is only as up-to-date as the most recent refresh. For many use cases, refresh intervals ranging from several minutes to a few hours provide a good balance between data freshness and query performance.

CREATE TABLE flattened_t1 (
    `_id` String,
    `order_id` String,
    `customer_id` Int64,
    `status` String,
    `total_amount` Decimal(18, 2),
    `order_date` DateTime64(3),
    `shipping_info` JSON,
    `items` Dynamic
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;

CREATE MATERIALIZED VIEW mv1 REFRESH EVERY 1 HOUR TO flattened_t1 AS
SELECT 
    CAST(_full_document._id, 'String') AS _id,
    CAST(_full_document.order_id, 'String') AS order_id,
    CAST(_full_document.customer_id, 'Int64') AS customer_id,
    CAST(_full_document.status, 'String') AS status,
    CAST(_full_document.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(_full_document.order_date, 3), 'DATETIME(3)') AS order_date,
    _full_document.^shipping AS shipping_info,
    _full_document.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;

You can now query the table flattened_t1 directly without the FINAL modifier:

SELECT
    customer_id,
    sum(total_amount)
FROM flattened_t1
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;
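
If you don't want to wait for the next scheduled refresh, for example right after creating the materialized view, you can trigger one manually and check its status through the system table (a sketch; SYSTEM REFRESH VIEW and system.view_refreshes are available in recent ClickHouse releases):

-- Trigger an immediate refresh instead of waiting for the schedule
SYSTEM REFRESH VIEW mv1;

-- Inspect refresh status, timings, and any errors
SELECT * FROM system.view_refreshes;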