跳到主要内容
跳到主要内容

NATS 引擎

此引擎允许将 ClickHouse 与 NATS 集成。

NATS 允许您

  • 发布或订阅消息主题。
  • 在新消息可用时处理它们。

创建表

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = NATS SETTINGS
nats_url = 'host:port',
nats_subjects = 'subject1,subject2,...',
nats_format = 'data_format'[,]
[nats_schema = '',]
[nats_num_consumers = N,]
[nats_queue_group = 'group_name',]
[nats_secure = false,]
[nats_max_reconnect = N,]
[nats_reconnect_wait = N,]
[nats_server_list = 'host1:port1,host2:port2,...',]
[nats_skip_broken_messages = N,]
[nats_max_block_size = N,]
[nats_flush_interval_ms = N,]
[nats_username = 'user',]
[nats_password = 'password',]
[nats_token = 'clickhouse',]
[nats_credential_file = '/var/nats_credentials',]
[nats_startup_connect_tries = '5']
[nats_max_rows_per_message = 1,]
[nats_handle_error_mode = 'default']

必需参数

  • nats_url – 主机:端口(例如,localhost:5672)。
  • nats_subjects – NATS 表订阅/发布的主题列表。支持通配符主题,如 foo.*.barbaz.>
  • nats_format – 消息格式。使用与 SQL FORMAT 函数相同的表示法,例如 JSONEachRow。有关更多信息,请参阅格式部分。

可选参数

  • nats_schema – 如果格式需要模式定义,则必须使用的参数。例如,Cap’n Proto 需要模式文件的路径和根 schema.capnp:Message 对象的名称。
  • nats_num_consumers – 每个表的消费者数量。默认值:1。如果一个消费者的吞吐量不足,请指定更多消费者。
  • nats_queue_group – NATS 订阅者的队列组名称。默认为表名。
  • nats_max_reconnect – 每次尝试连接到 NATS 的最大重连尝试次数。默认值:5
  • nats_reconnect_wait – 每次重连尝试之间休眠的时间量(毫秒)。默认值:5000
  • nats_server_list - 连接的服务器列表。可以指定连接到 NATS 集群。
  • nats_skip_broken_messages - NATS 消息解析器对每个块中与模式不兼容的消息的容忍度。默认值:0。如果 nats_skip_broken_messages = N,则引擎跳过 N 个无法解析的 NATS 消息(一条消息等于一行数据)。
  • nats_max_block_size - 轮询收集的行数,用于从 NATS 刷新数据。默认值:max_insert_block_size
  • nats_flush_interval_ms - 从 NATS 读取的数据刷新的超时时间。默认值:stream_flush_interval_ms
  • nats_username - NATS 用户名。
  • nats_password - NATS 密码。
  • nats_token - NATS 身份验证令牌。
  • nats_credential_file - NATS 凭据文件的路径。
  • nats_startup_connect_tries - 启动时的连接尝试次数。默认值:5
  • nats_max_rows_per_message — 对于基于行的格式,在一条 NATS 消息中写入的最大行数。(默认值:1)。
  • nats_handle_error_mode — 如何处理 NATS 引擎的错误。可能的值:default(如果解析消息失败,将抛出异常),stream(异常消息和原始消息将保存在虚拟列 _error_raw_message 中)。

SSL 连接

对于安全连接,请使用 nats_secure = 1。所用库的默认行为是不检查创建的 TLS 连接是否足够安全。无论证书是否过期、自签名、丢失或无效:连接都被简单地允许。将来可能会实现更严格的证书检查。

写入 NATS 表

如果表仅从一个主题读取数据,则任何插入都将发布到同一主题。但是,如果表从多个主题读取数据,我们需要指定要发布到哪个主题。这就是为什么每当插入具有多个主题的表时,需要设置 stream_like_engine_insert_queue。您可以选择表从中读取的主题之一并在那里发布您的数据。例如

  CREATE TABLE queue (
key UInt64,
value UInt64
) ENGINE = NATS
SETTINGS nats_url = 'localhost:4444',
nats_subjects = 'subject1,subject2',
nats_format = 'JSONEachRow';

INSERT INTO queue
SETTINGS stream_like_engine_insert_queue = 'subject2'
VALUES (1, 1);

格式设置也可以与 nats 相关设置一起添加。

示例

  CREATE TABLE queue (
key UInt64,
value UInt64,
date DateTime
) ENGINE = NATS
SETTINGS nats_url = 'localhost:4444',
nats_subjects = 'subject1',
nats_format = 'JSONEachRow',
date_time_input_format = 'best_effort';

可以使用 ClickHouse 配置文件添加 NATS 服务器配置。更具体地说,您可以为 NATS 引擎添加 Redis 密码

<nats>
<user>click</user>
<password>house</password>
<token>clickhouse</token>
</nats>

描述

SELECT 对于读取消息(调试除外)不是特别有用,因为每条消息只能读取一次。使用物化视图创建实时线程更实用。为此

  1. 使用引擎创建 NATS 消费者,并将其视为数据流。
  2. 创建具有所需结构的表。
  3. 创建一个物化视图,将来自引擎的数据转换为数据并放入先前创建的表中。

MATERIALIZED VIEW 加入引擎时,它开始在后台收集数据。这允许您持续从 NATS 接收消息,并使用 SELECT 将它们转换为所需的格式。一个 NATS 表可以拥有任意数量的物化视图,它们不直接从表中读取数据,而是接收新记录(以块为单位),这样您就可以将数据写入具有不同详细级别的多个表(带分组 - 聚合和不带分组)。

示例

  CREATE TABLE queue (
key UInt64,
value UInt64
) ENGINE = NATS
SETTINGS nats_url = 'localhost:4444',
nats_subjects = 'subject1',
nats_format = 'JSONEachRow',
date_time_input_format = 'best_effort';

CREATE TABLE daily (key UInt64, value UInt64)
ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT key, value FROM queue;

SELECT key, value FROM daily ORDER BY key;

要停止接收流数据或更改转换逻辑,请分离物化视图

  DETACH TABLE consumer;
ATTACH TABLE consumer;

如果您想通过使用 ALTER 更改目标表,我们建议禁用物化视图,以避免目标表和视图中的数据之间存在差异。

虚拟列

  • _subject - NATS 消息主题。数据类型:String

nats_handle_error_mode='stream' 时的附加虚拟列

  • _raw_message - 无法成功解析的原始消息。数据类型:Nullable(String)
  • _error - 解析失败期间发生的异常消息。数据类型:Nullable(String)

注意:_raw_message_error 虚拟列仅在解析期间发生异常时填充,当消息成功解析时,它们始终为 NULL

数据格式支持

NATS 引擎支持 ClickHouse 中支持的所有格式。一条 NATS 消息中的行数取决于格式是基于行还是基于块

  • 对于基于行的格式,一条 NATS 消息中的行数可以通过设置 nats_max_rows_per_message 来控制。
  • 对于基于块的格式,我们无法将块分成更小的部分,但一个块中的行数可以通过常规设置 max_block_size 来控制。