简介
作为一家开源公司,我们乐于推广其他让我们印象深刻的开源项目,无论是其技术实力、对性能的极致追求,还是我们认为能够真正帮助用户的功能。在发现 UI 库 Perspective 后,我们意识到它满足了所有这些要求,允许用户在 ClickHouse 数据之上构建真正的实时可视化!为了了解该库是否可以轻松与 ClickHouse 集成,我们构建了一个简单的演示应用程序,通过使用 Apache Arrow 将外汇数据流式传输到浏览器,从而提供丰富的可视化功能,所有这些仅用 100 多行代码即可实现!
此示例应该很容易进行调整,并允许用户在数据流式传输到 ClickHouse 时对其进行可视化。请告诉我们您的想法,并向 Perspective 致敬,感谢他们构建了如此酷的库!
如果您想运行 示例 Perspective 应用程序,我们已为您提供了一个 ClickHouse 实例以供使用。或者,您可以 在此处试用托管版本。最后,我们将探讨我们可以以多快的速度流式传输数据,以及当前方法为何不理想,以及一些未来 ClickHouse 功能的想法,这些想法将解决这些缺陷。
什么是 Perspective?
Perspective 库 是一种高性能数据分析和可视化工具,旨在高效处理实时和流式数据集。它提供交互式和可自定义的可视化效果,例如热图、折线图和树状图。与 ClickHouse 一样,Perspective 也注重性能。其核心是用 Rust 和 C++ 编写的,并编译成 WebAssembly。这使其能够在浏览器中处理数百万个数据点并响应连续数据流。
除了简单的渲染之外,Perspective 还提供了快速操作,用于在浏览器或服务器端对数据集进行透视、过滤和聚合,以及使用 ExprTK 执行表达式。虽然这并非为 ClickHouse 中看到的 PB 级规模而设计,但它允许在传递到客户端的行上进行第二级数据转换 - 如果所需数据已可用并且只需要简单的转换即可实现所需的视觉效果,则减少了对进一步查询的需求。
这使其成为 ClickHouse 支持的应用程序的理想选择,在这些应用程序中,实时洞察和流畅的交互至关重要。凭借对 Python 和 JavaScript 的支持,它可以集成到后端分析管道和基于 Web 的界面中。
虽然 Perspective 完美地补充了 ClickHouse 的标准可视化需求,但我们尤其对其处理流式数据的能力感兴趣,通过仅保留最新的 N 行来保持恒定的内存开销。我们很好奇跟踪持续更新的数据集有多容易,仅将新的增量加载到浏览器中,其中仅保留并汇总最新的点子集。
虽然我们专注于 Perspective 的 JavaScript 集成,但用户也可以在 Python 中使用 Perspective,使用 JupyterLab 小部件和客户端库在笔记本中进行交互式数据分析。
ClickHouse 用于流式数据?
虽然 ClickHouse 不是流处理引擎,而是一个 OLAP 数据库,但它具有一些功能,例如 增量物化视图,这允许实现与 Apache Flink 等技术中看到的大部分相同的功能。这些视图是触发器,在插入数据块时执行查询(可以包含聚合),并将结果存储在不同的表中以供以后使用。
虽然许多简单的流处理功能可以复制人们在 Flink 等引擎中执行的更简单的转换和聚合以简化架构,但我们承认这些技术可以协同工作,后者为高级案例提供额外的功能。当用于流处理时,ClickHouse 具有有效存储所有数据的额外优势 - 允许查询历史数据。
在我们的案例中,我们想尝试将 ClickHouse 中的最新行流式传输到 Perspective 以进行渲染。对于我们的示例,我们将粗略地模拟可视化外汇交易到达 ClickHouse 的需求。这可能对交易者最有用,如果需要,行将持久化以供将来进行历史分析。
数据集 - 外汇
在我们的示例中,我们将使用外汇数据集。外汇交易是指不同国家/地区的货币彼此之间的交易,交易者可以从经纪商处以(卖出价
)买入基础货币和报价货币,或者卖出基础货币并以(买入价
)获得回报。数据集跟踪每个货币对随时间的价格变化 - 重要的是它们变化很快!
对于不熟悉外汇交易的人,我们建议阅读 这篇早期文章中的一个简短部分,我们在其中总结了这些概念。
完整的数据集(可在 公共 S3 存储桶中获得)是从 www.histdata.com 下载的,涵盖了 2000 年至 2022 年。它有 115 亿行和 66 个货币对(解压缩后约 600GB)。
虽然简单,但此数据集的模式对于我们的示例来说是理想的。每行代表一个刻度。此处的 timestamp 精确到毫秒,列指示基础货币和报价货币以及卖出价和买入价。
CREATE TABLE forex
(
`datetime` DateTime64(3),
`bid` Decimal(11, 5),
`ask` Decimal(11, 5),
`base` LowCardinality(String),
`quote` LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY (datetime, base, quote)
刻度记录股票或商品价格何时变化预定的金额或分数变化,即,当价格上涨或下跌特定金额或分数变化时发生刻度。外汇中的刻度将在买入价或报价价发生变化时发生。
由于没有可用的外汇流式提要,我们将通过加载一年份的 parquet 格式数据并将其偏移到当前时间来模拟此操作。如果他们希望在本地实例上尝试该应用程序,则可以复制此数据集 - 请参阅 此处。
请注意,刻度数据不代表实际的交易/交换。每秒的交易/交换次数要高得多!它也不会捕获商定的价格或交换货币的数量(在源数据中逻辑上为 0,因此被忽略)。相反,它只是标记价格何时以称为 点差 的单位发生变化。
使用 Arrow 将 Perspective 连接到 ClickHouse
一些样板代码
设置和配置 Perspective 需要导入几个包,并编写少量样板代码。 公开示例 非常出色,但总而言之,我们创建了一个 worker 和一个 table。worker 代表一个 Web Worker 进程,它将繁重的操作(例如更新)从浏览器的主要渲染线程中卸载,从而确保即使在流式传输大型实时数据集时,界面也能保持响应速度。table 代表主要的数据结构,可以通过新数据动态更新。
import perspective from "https://cdn.jsdelivr.net.cn/npm/@finos/[email protected]/dist/cdn/perspective.js";
const forex_worker = await perspective.worker();
// fetch some rows...
const forex_table = await market_worker.table(rows, { limit: 20000 })
我们尽可能简化了示例,通过 CDN 导入包,并避免除 Perspective 之外的任何依赖项。建议将 Perspective 集成到现有应用程序或构建生产应用程序的用户探索 常见 JS 框架和构建工具 的示例。
Perspective 提供了许多部署模型,这些模型决定了数据的加载和绑定方式,每个模型都有其各自的优缺点。对于我们的示例,我们将使用客户端 方法,将数据流式传输到浏览器,几行 Javascript 从 ClickHouse 通过 HTTP 获取数据,并且 WebAssembly 库运行所有计算和 UI 交互。
流式传输最新交易
如以下所示,在创建 table 时,我们限制了保留的行数以限制内存开销。
const forex_table = await market_worker.table(rows, { limit: 20000 });
对于我们的示例,我们将不断向此 table 添加新行,因为新的交易变得可用,依靠 Perspective 仅保留最新的 20k 行。
截至撰写本文时,ClickHouse 不支持 WebSockets 或将更改的行流式传输到客户端的方法。因此,我们使用轮询方法通过 HTTP 获取最新的行。由于 Perspective 偏好 Apache Arrow 格式的数据,因此我们利用 ClickHouse 将数据以这种格式返回的能力,这还有助于最大程度地减少传输的数据量。
外汇价格变化很快,对于交易量最大的货币对,每秒最多可达 35 次。我们希望尽快获取这些数据——理想情况下每 30-50 毫秒获取一次,以确保可视化所有值。因此,我们的查询需要快速执行,每个连接的客户端每秒发出 10 次查询。在所有连接的客户端中,我们预计每秒会发出 100 次查询——这与一些误解相反,ClickHouse 可以轻松处理。
我们的查询仅根据事件的时间戳进行过滤,时间戳是我们的主键中的第一个条目,从而确保过滤得到优化。由于所有客户端都针对大约相同的时间段(即现在)发出请求,并且是单调递增的,因此我们的查询应该是缓存友好的。测试表明,虽然我们的查询即使在完整的 110 亿行数据集上也能在不到 10 毫秒内执行,但到 ClickHouse 实例的 HTTP 往返时间乐观地(同一区域)为 20-30 毫秒。因此,我们只使用从当前时间到上次获取时间的简单滑动窗口。此操作会持续执行,并尽快检索 ClickHouse 可以提供的行。
上图所示的简化图表假设每个查询执行都正好需要 50 毫秒。我们的查询获取所有列,并计算点差(ask
和 bid
之间的差值)。理想情况下,我们还希望显示当前 bid 的变化——这在交易中很有用。为了确保每个货币对的第一个值具有正确的变化值,我们需要确保我们拥有每个货币对在当前窗口之外的最后一个价格。为此,我们查询比返回的数据略多一些,如上图和我们下面的最终查询所示。
SELECT *
FROM
(
SELECT
concat(base, '.', quote) AS base_quote,
datetime AS last_update,
bid,
ask,
ask - bid AS spread,
ask - any(ask) OVER (PARTITION BY base_quote ORDER BY base_quote ASC, datetime ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS chg
FROM forex
WHERE datetime > {prev_lower_bound:DateTime64(3)} AND datetime <= {upper_bound:DateTime64(3)}
ORDER BY
base_quote ASC,
datetime ASC
)
WHERE datetime > {lower_bound:DateTime64(3)} AND datetime <= {upper_bound:DateTime64(3)}
ORDER BY last_update ASC
┌─base_quote─┬─────────────last_update─┬─────bid─┬────────ask─┬──spread─┬──────chg─┐
│ AUD.CAD │ 2024-09-19 13:25:30.840 │ 0.97922 │ 0.97972 │ 0.0005 │ -0.00002 │
│ XAG.USD │ 2024-09-19 13:25:30.840 │ 17.858 │ 17.90299 │ 0.04499 │ 0.00499 │
│ AUD.JPY │ 2024-09-19 13:25:30.840 │ 97.28 │ 97.31 │ 0.03 │ -0.001 │
│ AUD.NZD │ 2024-09-19 13:25:30.840 │ 1.09886 │ 1.09946 │ 0.0006 │ 0.00004 │
...
│ EUR.AUD │ 2024-09-19 13:25:30.840 │ 1.43734 │ 1.43774 │ 0.0004 │ -0.00002 │
└────────────┴─────────────────────────┴─────────┴────────────┴─────────┴──────────┘
25 rows in set. Elapsed: 0.012 sec. Processed 24.57 thousand rows, 638.82 KB (2.11 million rows/s., 54.98 MB/s.)
Peak memory usage: 5.10 MiB.
请注意,我们如何将时间范围过滤器应用于 argMax,因此我们不需要对所有时间 < 下界(而是
下界 - 5 分钟 < 时间 < 下界
)的行执行完整扫描。
我们用于获取下一行的最终函数可以在此处找到。它使用上述查询,以 Arrow 格式请求数据,并将响应读入 Perspective 所需的 ArrayBuffer 中。
我们不使用ClickHouse JS 库,主要目的是为了最大程度地减少依赖项,而且我们的代码非常简单。我们建议复杂的应用程序使用它。
初始应用程序
我们的应用程序(如下所示)调用上述函数来在循环中持续获取行
我们的循环还计算平均获取时间(在最新的 10 个请求中取平均值)。此处的性能将取决于您与 ClickHouse 集群的距离,延迟主要由 HTTP 获取时间决定。如果与 ClickHouse 服务的距离合理,我们可以将其减少到大约 30 毫秒。
虽然我们显示了开箱即用的可视化效果的数据网格,但用户可以轻松修改可视化类型并应用转换。在下面的示例中,我们切换到散点可视化,以绘制 EUR-GBP
货币对的 bid 和 ask 价格。
出于好奇,我们启动了 20 个并发客户端,导致每秒发出近 200 个查询,以测试 CPU 负载。即使在这种负载下,ClickHouse 使用的内核也少于 2 个。
还没有完全流式传输
实际上,在真实场景中,如果由于 ClickHouse 的最终一致性模型而跟踪当前时间,上述操作可能会错过行。即使可以通过仔细配置来缓解此问题,但它也不是最佳选择。行也可能会产生一些插入延迟,因此我们希望将查询从当前时间偏移,而不仅仅是使用 now()
。
我们承认这些不足之处,并且已经开始探索流式查询的概念,它将可靠地仅返回与指定查询匹配的新行。这将消除轮询的需要,客户端只需打开一个带有查询的 HTTP 连接,并在行到达时接收行。
速度测试——使用 Arrow Stream
虽然实时跟踪价格变化可能是最有用的,但我们很好奇 Perspective 实际上可以处理数据的速度有多快。为此,我们希望按升序日期顺序将整个数据集流式传输到 Perspective,同样只保留最后 N 个数据点。在这种情况下,我们的查询变为
SELECT concat(base, '.', quote) AS base_quote,
datetime AS last_update,
CAST(bid, 'Float32') AS bid,
CAST(ask, 'Float32') AS ask,
ask - bid AS spread
FROM forex
ORDER BY datetime ASC
FORMAT ArrowStream
SETTINGS output_format_arrow_compression_method='none'
请注意,我们如何删除了每个货币对的变化计算。这需要一个窗口函数,该函数不会利用optimize_read_in_order 并阻止结果的立即流式传输。
为此,您会注意到我们不仅仅使用了 Arrow 格式。这将要求我们下载整个数据集>60GB 在 ClickHouse 中压缩 并将其转换为 Perspective 的 table。即使以 Arrow 格式,对于浏览器来说,这仍然有点大!
相反,我们利用 ClickHouse 对 Arrow Stream 格式的支持,分块读取数据并将数据传递给 Perspective。虽然以前我们可以在没有依赖项的情况下完成所有工作,但对于此,我们需要Arrow js 库。虽然这使得使用 Arrow 文件变得微不足道,但要支持流式传输,我们需要更多 JS。我们最终用于分批流式传输整个数据集的函数如下所示。
async function get_all_rows(table, lower_bound) {
const view = await table.view({ // Create a view with aggregation to get the maximum datetime value
columns: ["last_update"], // Column you're interested in
aggregates: { last_update: "max" } // Aggregate by the maximum of datetime
});
const response = await fetch(clickhouse_url, {
method: 'POST',
body: `SELECT concat(base, '.', quote) AS base_quote, datetime AS last_update, bid::Float32 as bid, ask::Float32 as ask, ask - bid AS spread
FROM forex WHERE datetime > ${lower_bound}::DateTime64(3) ORDER BY datetime ASC FORMAT ArrowStream SETTINGS output_format_arrow_compression_method='none'`,
headers: { 'Authorization': `Basic ${credentials}` }
});
const reader = await RecordBatchReader.from(response);
await reader.open();
for await (const recordBatch of reader) { // Continuously read from the stream
if (real_time) { // set to false if we want to stop the stream
await view.delete();
return;
}
const batchTable = new Table(recordBatch); // currently required, see https://github.com/finos/perspective/issues/1157
const ipcStream = tableToIPC(batchTable, 'stream');
const bytes = new Uint8Array(ipcStream);
table.update(bytes);
const result = await view.to_columns();
const maxDateTime = result["last_update"][0];
document.getElementById("last_updated").textContent = `Last updated: ${new Date(maxDateTime).toISOString()}`;
total_size += (bytes.length);
document.getElementById("total_download").textContent = `Total Downloaded: ${prettyPrintSize(total_size,2)}`;
}
}
此代码可能比实际效率更高——主要是因为 Perspective 当前需要一个字节数组用于更新调用。这迫使我们将批处理转换为 table 并将其流式传输到数组中。我们还使用 Perspective 视图,从概念上类似于 ClickHouse 中的物化视图——在加载 table 数据时对其执行聚合。在这种情况下,我们使用视图只是计算最大流式传输日期,并在最终 UI 中显示。
通过添加少量代码来在早期的“实时”轮询模式和此“流式传输模式”之间切换,我们得到了最终的应用程序。切换到流式传输模式显示了 Perspective 的性能
货币对的折线图显示了我们如何能够每秒渲染数千个数据点,并且至少有 25 MiB/秒的数据流式传输到浏览器。
您可以在此处试用最终应用程序。
结论
我们使用此博文探讨了一个流行的开源可视化库 Perspective,它对 ClickHouse 具有有用的应用,在性能和处理快速到达的大量数据的能力方面具有协同作用!借助 Apache Arrow,我们只需几行 JavaScript 即可将 ClickHouse 与 Perspective 集成。在此过程中,我们还发现了一些 ClickHouse 处理流式数据的局限性,并重点介绍了我们希望解决这些问题的当前工作。