NATS 引擎
此引擎允许将 ClickHouse 与 NATS 集成。
NATS
允许您
- 发布或订阅消息主题。
- 处理新消息,这些消息在可用时会立即提供。
创建表格
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = NATS SETTINGS
nats_url = 'host:port',
nats_subjects = 'subject1,subject2,...',
nats_format = 'data_format'[,]
[nats_schema = '',]
[nats_num_consumers = N,]
[nats_queue_group = 'group_name',]
[nats_secure = false,]
[nats_max_reconnect = N,]
[nats_reconnect_wait = N,]
[nats_server_list = 'host1:port1,host2:port2,...',]
[nats_skip_broken_messages = N,]
[nats_max_block_size = N,]
[nats_flush_interval_ms = N,]
[nats_username = 'user',]
[nats_password = 'password',]
[nats_token = 'clickhouse',]
[nats_credential_file = '/var/nats_credentials',]
[nats_startup_connect_tries = '5']
[nats_max_rows_per_message = 1,]
[nats_handle_error_mode = 'default']
所需参数
nats_url
– 主机:端口(例如,localhost:5672
)。nats_subjects
– NATS 表要订阅/发布的主题列表。支持通配符主题,如foo.*.bar
或baz.>
nats_format
– 消息格式。使用与 SQLFORMAT
函数相同的表示法,例如JSONEachRow
。有关更多信息,请参阅 格式 部分。
可选参数
nats_schema
– 如果格式需要模式定义,则必须使用此参数。例如,Cap’n Proto 需要模式文件路径和根schema.capnp:Message
对象的名称。nats_num_consumers
– 每个表的消费者数量。默认值:1
。如果一个消费者的吞吐量不足,请指定更多消费者。nats_queue_group
– NATS 订阅者的队列组名称。默认值为表名。nats_max_reconnect
– 每次尝试连接到 NATS 的最大重连尝试次数。默认值:5
。nats_reconnect_wait
– 每次重连尝试之间休眠的时间量(以毫秒为单位)。默认值:5000
。nats_server_list
- 用于连接的服务器列表。可以指定以连接到 NATS 集群。nats_skip_broken_messages
- NATS 消息解析器对每个块的与模式不兼容的消息的容忍度。默认值:0
。如果nats_skip_broken_messages = N
,则引擎会跳过N 个无法解析的 NATS 消息(一条消息等于一行数据)。nats_max_block_size
- 从 NATS 刷新的数据通过轮询(poll)收集的行数。默认值:max_insert_block_size。nats_flush_interval_ms
- 从 NATS 读取数据的刷新超时时间。默认值:stream_flush_interval_ms。nats_username
- NATS 用户名。nats_password
- NATS 密码。nats_token
- NATS 身份验证令牌。nats_credential_file
- NATS 凭据文件的路径。nats_startup_connect_tries
- 启动时连接的尝试次数。默认值:5
。nats_max_rows_per_message
— 基于行的格式中,写入一条 NATS 消息的最大行数。(默认值:1
)。nats_handle_error_mode
— 如何处理 NATS 引擎的错误。可能的取值:default(如果我们无法解析消息,则会抛出异常)、stream(异常消息和原始消息将保存在虚拟列_error
和_raw_message
中)。
SSL 连接
对于安全连接,请使用 nats_secure = 1
。所用库的默认行为是不检查创建的 TLS 连接是否足够安全。证书是否过期、自签名、丢失或无效:连接只是被允许。将来可能实现更严格的证书检查。
写入 NATS 表
如果表仅从一个主题读取,则任何插入都会发布到同一个主题。但是,如果表从多个主题读取,则需要指定要发布到的主题。这就是为什么在将数据插入到具有多个主题的表时,需要设置 stream_like_engine_insert_queue
的原因。您可以选择表读取的主题之一,并在那里发布您的数据。例如
CREATE TABLE queue (
key UInt64,
value UInt64
) ENGINE = NATS
SETTINGS nats_url = 'localhost:4444',
nats_subjects = 'subject1,subject2',
nats_format = 'JSONEachRow';
INSERT INTO queue
SETTINGS stream_like_engine_insert_queue = 'subject2'
VALUES (1, 1);
格式设置也可以与 nats 相关的设置一起添加。
示例
CREATE TABLE queue (
key UInt64,
value UInt64,
date DateTime
) ENGINE = NATS
SETTINGS nats_url = 'localhost:4444',
nats_subjects = 'subject1',
nats_format = 'JSONEachRow',
date_time_input_format = 'best_effort';
NATS 服务器配置可以使用 ClickHouse 配置文件添加。更具体地说,您可以为 NATS 引擎添加 Redis 密码
<nats>
<user>click</user>
<password>house</password>
<token>clickhouse</token>
</nats>
描述
SELECT
对于读取消息(除了调试外)并不是特别有用,因为每条消息只能读取一次。使用 物化视图 创建实时线程更实用。为此
- 使用此引擎创建 NATS 消费者并将其视为数据流。
- 创建一个具有所需结构的表。
- 创建一个物化视图,它将数据从引擎转换为先前创建的表。
当 MATERIALIZED VIEW
连接到引擎时,它会开始在后台收集数据。这使您可以持续地从 NATS 接收消息,并使用 SELECT
将它们转换为所需的格式。一个 NATS 表可以拥有任意多的物化视图,它们不会直接从表中读取数据,而是接收新记录(以块为单位),这样您就可以写入多个具有不同详细程度的表(通过分组 - 聚合和不分组)。
示例
CREATE TABLE queue (
key UInt64,
value UInt64
) ENGINE = NATS
SETTINGS nats_url = 'localhost:4444',
nats_subjects = 'subject1',
nats_format = 'JSONEachRow',
date_time_input_format = 'best_effort';
CREATE TABLE daily (key UInt64, value UInt64)
ENGINE = MergeTree() ORDER BY key;
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT key, value FROM queue;
SELECT key, value FROM daily ORDER BY key;
要停止接收流数据或更改转换逻辑,请分离物化视图
DETACH TABLE consumer;
ATTACH TABLE consumer;
如果要使用 ALTER
更改目标表,建议您禁用物化视图,以避免目标表与视图中的数据之间出现差异。
虚拟列
_subject
- NATS 消息主题。数据类型:String
。
当 nats_handle_error_mode='stream'
时,额外的虚拟列
_raw_message
- 无法成功解析的原始消息。数据类型:Nullable(String)
。_error
- 发生在解析失败过程中的异常消息。数据类型:Nullable(String)
。
注意:_raw_message
和 _error
虚拟列仅在解析期间出现异常时才填充,在成功解析消息时始终为 NULL
。
数据格式支持
NATS 引擎支持 ClickHouse 中支持的所有 格式。一条 NATS 消息中的行数取决于格式是基于行的还是基于块的
- 对于基于行的格式,可以通过设置
nats_max_rows_per_message
来控制一条 NATS 消息中的行数。 - 对于基于块的格式,我们无法将块划分为更小的部分,但可以通过一般设置 max_block_size 来控制一个块中的行数。