DoubleCloud 即将停止服务。在限定时间内享受免费迁移服务,迁移至 ClickHouse。立即联系我们 ->->

博客 / 工程

Kafka、延迟与你:使用 Kafka 和 ClickHouse 最小化延迟

author avatar
Paul Moore
2023年9月13日

介绍

ClickHouse 的许多博客主题都源于社区参与,例如通过我们的公共 Slack 频道。随着我们官方 Kafka 连接器及其在 Confluent Cloud 中的支持的最近发布,我们收到了很多用户询问在处理 Kafka 并将数据传输到 ClickHouse Cloud 时如何衡量和降低延迟的问题。这是一个很好的讨论话题,我们将其整理成一篇博文进行解答 - 让我们开始讨论吧!

Kafka 连接器如何工作?

首先,让我们提醒大家 Kafka 和 ClickHouse 之间的连接是如何工作的(以及我们的连接器在其中扮演什么角色)。数据流示例可能如下所示

kafka_clickhouse_flow.png

从本质上讲,生产者将数据提交到主题 (A),这些数据会被“Kafka Connect”获取,然后通过轮询 (B) 发送到连接器,连接器将记录推送到 ClickHouse (C)。在上图中,A 与 B 分开,B 与 C 分开 - 所有三个都是独立的系统,它们之间存在网络连接。

当然,实现方式可能会有所不同,但我们目前将使用最广泛的设置,以便涵盖大多数情况 - 如果您的设置中没有某个组件,可以忽略该部分。

什么是延迟?

这可能是显而易见的,但值得重申:延迟(在此上下文中)是指数据从 A 点传输到 B 点所需的时间。它是由多种因素造成的,最常见的是:硬件、软件和网络。

硬件因素通常是导致延迟的物理限制 - VM 大小、处理器速度等。我们将这些问题留给读者自行解决,并专注于连接器本身可控制的变量。

软件因素通常是设计限制而不是物理问题 - 线程数、数据结构、配置设置等。它们也特别常见,并且本指南的大部分内容都将重点介绍这些问题。

网络因素通常是节点之间的物理网络、虚拟机之间的虚拟连接、客户端到服务器的距离等。

何时会出现延迟问题?

在我们深入探讨延迟的“如何”之前,有必要简要讨论一下“何时”和“为什么”:“我为什么要关心延迟?何时会出现延迟问题?”

您是您用例的专家,但我们建议您不要对延迟设定不切实际的期望 - 容易过度关注延迟的微小差异,而这些差异不太可能对业务产生重大影响,以至于即使延迟一秒或两秒也会导致警报。请记住,在网络分布式环境中,会存在一定程度的延迟 - 任务更多地是关于在可能的情况下降低/稳定延迟,而不是完全消除它。理想情况下,您能够确定可接受的延迟并确保在定义的范围内保持这种延迟 - 可能通过 SLA。

同样,一个用例可能比另一个用例容忍更多的延迟 - 如果您每周对数据进行一次分析,那么这可能比实时分析允许更多的延迟。即使对于实时分析,“实时”的含义也是您需要决定的 - 同样,您是您用例的专家;请记住,会存在硬件/软件/网络限制。

如何衡量延迟?

为了确定 Kafka 管道中是否存在延迟问题,您需要能够对其进行衡量。

最简单的方法是在 Kafka 管道的各个点将时间戳值附加到消息中。稍后可以比较和可视化这些时间戳,以指示可能导致延迟的步骤。

我们看到的一种非常有效的方法1 是使用单消息转换器

message_transform.png

1 有多种可用选项,例如 JMX,但我们发现 SMT 足够好 - 您的里程可能会有所不同

此转换器采用几个现成的字段(偏移量、分区、时间戳和主题),并将它们添加到对象中,以便连接器传递给 ClickHouse,而无需更改消息生产者。此转换器可以配置为在各个步骤运行,允许用户将延迟归因于处理管道的特定部分。

kafka_data_transformed.png

上述转换器使我们能够识别 Kafka 管道不同阶段的时间。我们还可以记录将消息作为行插入 ClickHouse 的确切时间点。这可以通过使用 DEFAULT 表达式插入时间戳来实现,然后可以将其与转换器添加的早期时间进行比较

CREATE TABLE data_testing
(
  `raw` String,
  `generationTimestamp` DateTime64(3),
  `insertTime` DateTime64(3) DEFAULT now()
)
ENGINE = MergeTree
ORDER BY insertTime

请注意,我们使用“DEFAULT now()”,以便 ClickHouse 生成时间,而不是在消息本身中存在 - 这将为我们提供插入时间。

如果您的消息具有生成时间戳,您还可以跟踪以下时间之间的时间:消息生成 <-> Kafka 插入 <-> ClickHouse 插入。

如何可视化延迟?

可视化延迟的一种常见方法是使用 ClickHouse SQL 控制台(在 ClickHouse Cloud 上)提供的内置表格

sql-ui-kafka_increasing_latency.png

此图表的数据是使用一个相当简单的查询创建的,以计算中位数、第 75 个百分位数和第 99 个百分位数

SELECT formatDateTime(insertTime, '%F %k:%i') as byMinutes,
    median(dateDiff('second', generationTimestamp, insertTime)) AS median,
    median(.75)(dateDiff('second', generationTimestamp, insertTime)) AS p_75,
    median(.95)(dateDiff('second', generationTimestamp, insertTime)) AS p_95
FROM default.kafka_data
WHERE insertTime > now() - interval 60 MINUTE
GROUP BY byMinutes
ORDER BY byMinutes ASCENDING

InsertTime 由默认列(前面使用“now()”提到)提供,而 generationTimestamp 是我们在生成数据时设置的列 - 您也可以轻松地使用类似 kafkaField 的内容(前面提到的是由转换器创建的)。

注意:示例图表中的数据显示延迟时间缓慢增加,仅看到这一点就足以表明可能存在问题 - 可视化的力量!

如何处理延迟?

我们将介绍一些解决延迟的方法,但请记住,此列表并不详尽 - 您可能会发现其他方法更有效。如果您有,请告诉我们 - 信息越多,我们就能更好地帮助其他人!

延迟可能发生在系统的多个部分。我们在下面解决每个阶段的潜在原因。

Kafka 和 Kafka Connect 之间的延迟

获取和批处理大小

记录是由 Kafka Connect 独立于连接器获取的,您可以通过设置**consumer.fetch.min.bytes** 和 **consumer.fetch.max.bytes** 来调整此行为。正如您所料,这里需要权衡 - 理想情况下,您获取的记录数量多于批处理大小,以便始终有足够的记录来填充一个批处理。您甚至可以通过设置 **consumer.fetch.max.wait.ms** 来调整获取频率 - 虽然这可能需要更长的时间,但如果发现您生成消息的频率并不高,那么获取更有可能达到最大大小(从而优化需要建立的连接数量)。

类似地,在另一端,连接器接收的批处理由轮询控制(默认值为 500)。

consumer.max.poll.records=[NUMBER OF RECORDS]
consumer.max.partition.fetch.bytes=[NUMBER OF RECORDS * RECORD SIZE IN BYTES]

不同的 Kafka Connect 主机将有不同的调整处理方式 - 例如,请参阅 Confluent 的文档,了解其首选方法。

分区数量

主题分区的数量会影响插入量和批处理大小(这可能会增加网络通信量,正如您所料)。一百条消息分散在一百个分区中,其影响将大于单个分区,尤其是在运行的任务/工作程序数量的基础上。由于上述轮询和获取过程是在每个任务中执行的(见下文),因此更多的分区可能会增加填充批处理的时间(如果遵循以下建议,每个分区一个任务) - 可能会增加延迟。因此,我们通常建议使用较少的分区(在合理的范围内),而不是更多的分区,但没有关于此大小的固定规则。

我们通常建议一次以至少 1,000 行的相当大的批次插入数据,理想情况下在 10,000 到 100,000 行之间(有关更多信息,请参阅我们的 最佳实践)。

任务数量

在 Kafka Connect 中,任务是实际复制数据的 Worker - 我们看到建议您至少拥有与分区一样多的任务,以确保每个“通道”都能正常流动。

如果您想更深入地了解任务,请查看 关于此主题的 Confluent 文档

太多部分

我们遇到过“太多部分”错误 - ClickHouse 针对大量数据(正确批处理)进行了优化,而不是针对频繁的小型插入(经典的“健谈”与“批量”网络争论)。插入是在每个任务的基础上处理的 - 每个任务负责将该任务的数据插入 ClickHouse - 因此,更多的分区/任务并不总是更好。

Kafka Connect 和 ClickHouse Cloud 之间的延迟

实例休眠

当服务不受查询影响时,ClickHouse Cloud 会将实例置于休眠状态(以节省您的费用)。这意味着,当插入和查询活动之间存在间隙时,实例可能会暂停。如果随后有大量新数据涌入,则当实例重新启动时,可能会造成延迟。

如果这种延迟不可接受,用户可以 调整服务的空闲超时以适应工作负载 - 增加它,以便较短时间的空闲不会导致空闲。

实例大小

更多数据将需要更多容量 - 虽然 ClickHouse 非常高效,但这句格言仍然适用。ClickHouse Cloud 通常会为您处理扩展 - 请参阅 我们的文档,了解更多详细信息。

异步与同步插入

默认情况下,我们的连接器通过同步插入发送数据 - 在继续之前等待确认。从数据丢失的角度来看,这通常更安全,但它确实会引入一定程度的延迟,因为每次插入都需要确认成功。

另一方面,可以将异步插入配置为仅提交数据并继续,而无需等待 - 潜在地提高吞吐量。如果潜在的消息丢失不是问题(例如,您正在进行用户分析,需要更通用的数据量而不是特定的条目),这可能是一种更快的方法。请注意,此方法意味着 更低的持久性保证,最多提供至少一次语义。

有关配置此内容的更多详细信息,请参阅我们的 文档

结论

在这篇文章中,我们描述了延迟,这可能是 Kafka 和 ClickHouse 架构中的一个问题。除了识别延迟的原因和减少延迟的方法外,我们首先建议了一种简单的方法来测量和可视化它。

我们希望这能帮助您踏上 ClickHouse 之旅 - 更多内容即将推出!

立即开始使用 ClickHouse Cloud 并获得 300 美元的信用额度。在 30 天试用期结束时,继续使用按需付费计划,或 联系我们 了解有关我们基于容量的折扣的更多信息。访问我们的 定价页面 以获取详细信息。

分享此帖子

订阅我们的新闻稿

随时了解功能发布、产品路线图、支持和云产品!
正在加载表单...
关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image