引言
ClickHouse 博客上的许多主题都是由社区通过我们的 公共 Slack 频道等媒介互动而发起的。随着我们最近宣布 官方 Kafka Connector 及其在 Confluent Cloud 中的支持,我们收到了一些人的询问,询问在处理 Kafka 并将数据传输到 ClickHouse Cloud 时如何衡量和缓解延迟。这是一个值得讨论的好话题,我们在一篇博文中整理了我们的回应 - 让我们来讨论一下!
Kafka Connector 是如何工作的?
首先,让我们提醒大家 Kafka 和 ClickHouse 之间的连接是如何工作的(以及我们的连接器在其中扮演的角色)。一个示例数据流可能如下所示
本质上,生产者将数据提交到主题 (A),然后“Kafka Connect”通过轮询 (B) 获取并发送这些主题,再由连接器将记录推送到 ClickHouse (C)。在上面的图表中,A 与 B 分离,B 与 C 分离 - 这三个都是不同的系统,它们之间有网络连接。
当然,实现方式可能有所不同,但我们现在将使用最广泛的设置,以便涵盖大多数情况 - 如果您的设置中没有提及某个组件,请随意忽略该部分。
什么是延迟?
这一切可能显而易见,但有必要重复一遍:延迟(在此上下文中)是数据从 A 点传输到 B 点所需的时间。它由多种因素引起,最常见的因素是:硬件、软件和网络。
硬件因素通常是导致延迟的物理限制 - VM 大小、处理器速度等。我们将其留给读者自行解决,并专注于连接器本身可控制的变量。
软件因素通常更多的是设计限制而不是物理问题 - 线程数、数据结构、配置设置等。它们也很常见,也是本指南的大部分内容将重点关注的内容。
网络因素通常是节点之间的物理网络、虚拟机之间的虚拟连接、客户端到服务器的距离等。
何时延迟会成为问题?
在我们深入探讨延迟的方法之前,简要讨论一下何时和为什么很重要:“我为什么要关心延迟?延迟何时会成为问题?”
您是您的用例专家,但我们要提醒您,不要对延迟设置不切实际的期望 - 很容易过度关注延迟的微小差异,这些差异不太可能对业务产生重大影响,以至于即使是一两秒的偏差也会导致警报。请记住,在网络分布式环境中,总会存在一定程度的延迟 - 任务更多的是在可能和必要时缓解/稳定延迟,而不是完全消除延迟。理想情况下,您能够确定什么是可接受的延迟,并确保将其维持在定义的范围内 - 可能使用 SLA。
同样,一个用例可能比另一个用例容忍更高的延迟 - 如果您每周对数据进行一次分析,那么它可能比实时分析允许更高的延迟。即使对于实时分析,什么是“实时”也是您必须决定的事情 - 同样,您是您的用例专家;只需记住,将存在硬件/软件/网络限制。
如何衡量延迟?
为了确定您的 Kafka 管道中是否存在延迟问题,您需要能够衡量它。
最简单的方法是在 Kafka 管道的各个点将时间戳值附加到消息中。这些值稍后可以进行比较和可视化,以指示可能导致延迟的步骤。
我们看到实现此目的1的一种非常有效的方法是使用 单消息转换器
1 有多种选项可用,例如 JMX,但我们发现 SMT 足够好 - YMMV
此转换采用多个开箱即用的字段(Offset、Partition、Timestamp 和 Topic)并将它们添加到对象中,以便连接器传递到 ClickHouse,而无需更改消息生产者。可以将此转换配置为在各个步骤运行,从而允许用户将延迟归因于处理管道的特定部分。
上面的转换使我们能够识别 Kafka 管道不同阶段的计时。我们还可以记录消息插入 ClickHouse 作为行的确切时间点。这可以使用 DEFAULT 表达式通过插入时间戳来实现,然后可以将其与转换添加的早期计时进行比较
CREATE TABLE data_testing
(
`raw` String,
`generationTimestamp` DateTime64(3),
`insertTime` DateTime64(3) DEFAULT now()
)
ENGINE = MergeTree
ORDER BY insertTime
请注意,我们正在使用“DEFAULT now()”,以便时间由 ClickHouse 生成,而不是消息本身中存在的时间 - 这将为我们提供插入时间。
如果您的消息具有生成时间戳,您还可以跟踪以下时间间隔:消息生成 <-> Kafka 插入 <-> ClickHouse 插入。
如何可视化延迟?
可视化延迟的一种常用方法是使用 ClickHouse SQL 控制台(在 ClickHouse Cloud 上)提供的内置表
此图表的数据是使用相当简单的查询创建的,用于计算中位数、第 75 百分位数和第 99 百分位数
SELECT formatDateTime(insertTime, '%F %k:%i') as byMinutes,
median(dateDiff('second', generationTimestamp, insertTime)) AS median,
median(.75)(dateDiff('second', generationTimestamp, insertTime)) AS p_75,
median(.95)(dateDiff('second', generationTimestamp, insertTime)) AS p_95
FROM default.kafka_data
WHERE insertTime > now() - interval 60 MINUTE
GROUP BY byMinutes
ORDER BY byMinutes ASCENDING
InsertTime 由默认列提供(前面提到使用“now()”),而 generationTimestamp
是我们在生成数据时设置的列 - 您也可以轻松地使用诸如 kafkaField
之类的东西(前面提到的是由转换创建的)。
注意:示例图表中的数据显示延迟时间缓慢增加,仅看到这一点就足以表明可能存在问题 - 可视化的力量!
如何处理延迟?
我们将介绍几种解决延迟的方法,但请记住,此列表并非详尽无遗 - 您可能会发现其他方法效果更好。如果您找到了,请告诉我们 - 信息越多,我们就越能帮助其他人!
延迟可能发生在系统的多个部分。我们在下面介绍了每个阶段的潜在原因。
Kafka 和 Kafka Connect 之间的延迟
获取和批处理大小
记录由 Kafka Connect 独立于连接器获取,您可以通过设置 consumer.fetch.min.bytes 和 consumer.fetch.max.bytes 来调整此设置。您可以想象,这里存在一个平衡 - 理想情况下,您获取的记录多于批处理大小,以便您始终有足够的记录来填充一个批处理。您甚至可以通过设置 consumer.fetch.max.wait.ms 来调整获取发生的频率 - 这可能需要更长的时间,但如果您发现您不经常生成消息,则获取更有可能达到最大大小(从而优化需要建立的连接数)。
同样,在另一侧,连接器接收的批处理由轮询控制(默认值为 500)
consumer.max.poll.records=[NUMBER OF RECORDS]
consumer.max.partition.fetch.bytes=[NUMBER OF RECORDS * RECORD SIZE IN BYTES]
不同的 Kafka Connect 主机将有不同的处理调整方式 - 例如,有关其首选方法,请参阅 Confluent 的文档。
分区数
主题分区的数量会影响插入量和批处理大小(您可以想象,这会增加网络聊天)。一百条消息分布在一百个分区中,将比单个分区产生更大的影响,尤其是基于运行的任务/工作进程数量。由于上述轮询和获取过程是按任务执行的(见下文),因此更多分区可能会增加填充批处理的时间(如果遵循下面每个分区一个任务的建议) - 可能会增加延迟。因此,我们通常建议倾向于较少的分区(在合理范围内)而不是更多,但没有针对此大小调整的固定规则。
我们通常建议一次插入至少 1,000 行的大批数据,理想情况下在 10,000 到 100,000 行之间(有关更多信息,请参阅我们的 最佳实践)。
任务数
在 Kafka Connect 中,任务是实际复制数据的工作进程 - 我们看到建议您至少拥有与分区一样多的任务,以确保每个“通道”正常流动。
如果您想更深入地了解任务,请查看关于该主题的 Confluent 文档。
零件过多
我们遇到的一些问题是“零件过多”错误 - ClickHouse 针对大数据量(正确批处理)进行了优化,而不是频繁的小型插入(经典的“聊天”与“批处理”网络辩论)。插入是按任务处理的 - 每个任务负责将该任务的数据插入到 ClickHouse 中 - 因此,更多的分区/任务并不总是更好。
Kafka Connect 和 ClickHouse Cloud 之间的延迟
实例休眠
当服务不接受查询时,ClickHouse Cloud 会将实例置于休眠状态(以节省您的积分)。这意味着当插入和查询活动中存在间隙时,实例可能会暂停。如果出现这种情况,反过来,如果涌入新数据,则可能会在实例重新启动时造成延迟。
如果这种延迟是不可接受的,用户可以 调整服务的空闲超时 以适应工作负载 - 增加超时时间,以便较短的非活动期间不会导致空闲。
实例大小调整
更多的数据将需要更多的容量 - 尽管 ClickHouse 非常高效,但这一普遍格言仍然适用。ClickHouse Cloud 通常会为您处理扩展 - 有关更多详细信息,请参阅我们的 文档。
异步与同步插入
默认情况下,我们的连接器通过同步插入发送数据 - 等待确认后再继续。从数据丢失的角度来看,这通常更安全,但这确实会引入一定程度的延迟,因为每次插入都需要成功确认。
另一方面,异步插入可以配置为仅提交数据并继续,而无需等待 - 可能会提高吞吐量。如果潜在的消息丢失不是问题(例如,您正在进行消费者分析,并且需要更一般的数据量而不是特定条目),这可能是一种更快的方法。请注意,此方法意味着 较低的持久性保证,最多只能提供至少一次语义。
有关配置此功能的更多详细信息,请参阅我们的 文档。
结论
在本文中,我们描述了延迟,这可能是 Kafka 和 ClickHouse 架构中的一个问题。除了确定延迟的原因和最小化延迟的方法外,我们首先建议了一种衡量和可视化延迟的简单方法。
我们希望这能帮助您踏上 ClickHouse 之旅 - 更多精彩内容即将推出!