RabbitMQ 引擎
此引擎允许将 ClickHouse 与 RabbitMQ 集成。
RabbitMQ
允许您
- 发布或订阅数据流。
- 在数据流可用时处理它们。
创建表
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1],
name2 [type2],
...
) ENGINE = RabbitMQ SETTINGS
rabbitmq_host_port = 'host:port' [or rabbitmq_address = 'amqp(s)://guest:guest@localhost/vhost'],
rabbitmq_exchange_name = 'exchange_name',
rabbitmq_format = 'data_format'[,]
[rabbitmq_exchange_type = 'exchange_type',]
[rabbitmq_routing_key_list = 'key1,key2,...',]
[rabbitmq_secure = 0,]
[rabbitmq_schema = '',]
[rabbitmq_num_consumers = N,]
[rabbitmq_num_queues = N,]
[rabbitmq_queue_base = 'queue',]
[rabbitmq_deadletter_exchange = 'dl-exchange',]
[rabbitmq_persistent = 0,]
[rabbitmq_skip_broken_messages = N,]
[rabbitmq_max_block_size = N,]
[rabbitmq_flush_interval_ms = N,]
[rabbitmq_queue_settings_list = 'x-dead-letter-exchange=my-dlx,x-max-length=10,x-overflow=reject-publish',]
[rabbitmq_queue_consume = false,]
[rabbitmq_address = '',]
[rabbitmq_vhost = '/',]
[rabbitmq_username = '',]
[rabbitmq_password = '',]
[rabbitmq_commit_on_select = false,]
[rabbitmq_max_rows_per_message = 1,]
[rabbitmq_handle_error_mode = 'default']
必需参数
rabbitmq_host_port
– 主机:端口(例如,localhost:5672
)。rabbitmq_exchange_name
– RabbitMQ 交换机名称。rabbitmq_format
– 消息格式。使用与 SQLFORMAT
函数相同的表示法,例如JSONEachRow
。有关更多信息,请参阅格式部分。
可选参数
-
rabbitmq_exchange_type
– RabbitMQ 交换机的类型:direct
、fanout
、topic
、headers
、consistent_hash
。默认值:fanout
。 -
rabbitmq_routing_key_list
– 以逗号分隔的路由键列表。 -
rabbitmq_schema
– 如果格式需要模式定义,则必须使用的参数。例如,Cap’n Proto 需要模式文件的路径和根schema.capnp:Message
对象的名称。 -
rabbitmq_num_consumers
– 每个表的消费者数量。如果一个消费者的吞吐量不足,请指定更多消费者。默认值:1
-
rabbitmq_num_queues
– 队列总数。增加此数字可以显著提高性能。默认值:1
。 -
rabbitmq_queue_base
- 指定队列名称的提示。此设置的用例在下面描述。 -
rabbitmq_deadletter_exchange
- 指定 死信交换机 的名称。您可以创建另一个具有此交换机名称的表,并在消息重新发布到死信交换机时收集消息。默认情况下,未指定死信交换机。 -
rabbitmq_persistent
- 如果设置为 1(true),则在插入查询中,传递模式将设置为 2(将消息标记为“持久”)。默认值:0
。 -
rabbitmq_skip_broken_messages
– RabbitMQ 消息解析器对每个块中与模式不兼容的消息的容忍度。如果rabbitmq_skip_broken_messages = N
,则引擎跳过 N 条无法解析的 RabbitMQ 消息(一条消息等于一行数据)。默认值:0
。 -
rabbitmq_max_block_size
- 从 RabbitMQ 刷新数据之前收集的行数。默认值:max_insert_block_size。 -
rabbitmq_flush_interval_ms
- 从 RabbitMQ 刷新数据的超时时间。默认值:stream_flush_interval_ms。 -
rabbitmq_queue_settings_list
- 允许在创建队列时设置 RabbitMQ 设置。可用设置:x-max-length
、x-max-length-bytes
、x-message-ttl
、x-expires
、x-priority
、x-max-priority
、x-overflow
、x-dead-letter-exchange
、x-queue-type
。队列自动启用durable
设置。 -
rabbitmq_address
- 连接地址。使用此设置或rabbitmq_host_port
。 -
rabbitmq_vhost
- RabbitMQ vhost。默认值:'\'
。 -
rabbitmq_queue_consume
- 使用用户定义的队列,并且不进行任何 RabbitMQ 设置:声明交换机、队列、绑定。默认值:false
。 -
rabbitmq_username
- RabbitMQ 用户名。 -
rabbitmq_password
- RabbitMQ 密码。 -
reject_unhandled_messages
- 在发生错误时拒绝消息(发送 RabbitMQ 否定确认)。如果在rabbitmq_queue_settings_list
中定义了x-dead-letter-exchange
,则会自动启用此设置。 -
rabbitmq_commit_on_select
- 在进行 select 查询时提交消息。默认值:false
。 -
rabbitmq_max_rows_per_message
— 基于行的格式在一个 RabbitMQ 消息中写入的最大行数。默认值:1
。 -
rabbitmq_empty_queue_backoff_start
— 如果 rabbitmq 队列为空,则重新安排读取的起始退避点。 -
rabbitmq_empty_queue_backoff_end
— 如果 rabbitmq 队列为空,则重新安排读取的结束退避点。 -
rabbitmq_handle_error_mode
— 如何处理 RabbitMQ 引擎的错误。可能的值:default(如果我们无法解析消息,则会抛出异常)、stream(异常消息和原始消息将保存在虚拟列_error
和_raw_message
中)。- SSL 连接
在连接地址中使用 rabbitmq_secure = 1
或 amqps
:rabbitmq_address = 'amqps://guest:guest@localhost/vhost'
。所用库的默认行为是不检查创建的 TLS 连接是否足够安全。无论证书是否已过期、自签名、缺失或无效:都允许连接。将来可能会实现更严格的证书检查。
格式设置可以与 rabbitmq 相关的设置一起添加。
示例
CREATE TABLE queue (
key UInt64,
value UInt64,
date DateTime
) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
rabbitmq_exchange_name = 'exchange1',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = 5,
date_time_input_format = 'best_effort';
应使用 ClickHouse 配置文件添加 RabbitMQ 服务器配置。
必需配置
<rabbitmq>
<username>root</username>
<password>clickhouse</password>
</rabbitmq>
附加配置
<rabbitmq>
<vhost>clickhouse</vhost>
</rabbitmq>
描述
SELECT
对于读取消息不是特别有用(调试除外),因为每条消息只能读取一次。使用 物化视图 创建实时线程更实用。为此
- 使用引擎创建 RabbitMQ 消费者,并将其视为数据流。
- 创建一个具有所需结构的表。
- 创建一个物化视图,将来自引擎的数据转换为数据并将其放入先前创建的表中。
当 MATERIALIZED VIEW
加入引擎时,它开始在后台收集数据。这允许您不断地从 RabbitMQ 接收消息,并使用 SELECT
将它们转换为所需的格式。一个 RabbitMQ 表可以有任意多个物化视图。
数据可以基于 rabbitmq_exchange_type
和指定的 rabbitmq_routing_key_list
进行通道化。每个表最多可以有一个交换机。一个交换机可以在多个表之间共享 - 它允许路由到多个表同时进行。
交换机类型选项
direct
- 路由基于键的精确匹配。示例表键列表:key1,key2,key3,key4,key5
,消息键可以等于它们中的任何一个。fanout
- 路由到所有表(交换机名称相同),而与键无关。topic
- 路由基于带有圆点分隔键的模式。示例:*.logs
、records.*.*.2020
、*.2018,*.2019,*.2020
。headers
- 路由基于key=value
匹配,并带有设置x-match=all
或x-match=any
。示例表键列表:x-match=all,format=logs,type=report,year=2020
。consistent_hash
- 数据在所有绑定的表之间均匀分布(交换机名称相同)。请注意,必须使用 RabbitMQ 插件启用此交换机类型:rabbitmq-plugins enable rabbitmq_consistent_hash_exchange
。
设置 rabbitmq_queue_base
可用于以下情况
- 让不同的表共享队列,以便可以为相同的队列注册多个消费者,从而获得更好的性能。如果使用
rabbitmq_num_consumers
和/或rabbitmq_num_queues
设置,则在这些参数相同时实现队列的精确匹配。 - 能够在并非所有消息都成功消费时恢复从某些持久队列的读取。要从一个特定队列恢复消费,请在
rabbitmq_queue_base
设置中设置其名称,并且不要指定rabbitmq_num_consumers
和rabbitmq_num_queues
(默认为 1)。要从为特定表声明的所有队列恢复消费,只需指定相同的设置:rabbitmq_queue_base
、rabbitmq_num_consumers
、rabbitmq_num_queues
。默认情况下,队列名称对于表是唯一的。 - 重用队列,因为它们被声明为持久的且未自动删除。(可以通过任何 RabbitMQ CLI 工具删除。)
为了提高性能,接收到的消息被分组到大小为 max_insert_block_size 的块中。如果在 stream_flush_interval_ms 毫秒内未形成块,则无论块的完整性如何,数据都将被刷新到表中。
如果 rabbitmq_num_consumers
和/或 rabbitmq_num_queues
设置与 rabbitmq_exchange_type
一起指定,则
- 必须启用
rabbitmq-consistent-hash-exchange
插件。 - 必须指定已发布消息的
message_id
属性(每条消息/批次唯一)。
对于插入查询,有消息元数据,为每条发布的消息添加:messageID
和 republished
标志(如果发布多次则为 true) - 可以通过消息头访问。
不要将同一个表用于插入和物化视图。
示例
CREATE TABLE queue (
key UInt64,
value UInt64
) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
rabbitmq_exchange_name = 'exchange1',
rabbitmq_exchange_type = 'headers',
rabbitmq_routing_key_list = 'format=logs,type=report,year=2020',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = 5;
CREATE TABLE daily (key UInt64, value UInt64)
ENGINE = MergeTree() ORDER BY key;
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT key, value FROM queue;
SELECT key, value FROM daily ORDER BY key;
虚拟列
_exchange_name
- RabbitMQ 交换机名称。数据类型:String
。_channel_id
- ChannelID,在其上声明了接收消息的消费者。数据类型:String
。_delivery_tag
- 接收到的消息的 DeliveryTag。作用域为每个通道。数据类型:UInt64
。_redelivered
- 消息的redelivered
标志。数据类型:UInt8
。_message_id
- 接收到的消息的 messageID;如果设置了,则为非空,当消息发布时。数据类型:String
。_timestamp
- 接收到的消息的时间戳;如果设置了,则为非空,当消息发布时。数据类型:UInt64
。
当 kafka_handle_error_mode='stream'
时的附加虚拟列
_raw_message
- 无法成功解析的原始消息。数据类型:Nullable(String)
。_error
- 解析失败期间发生的异常消息。数据类型:Nullable(String)
。
注意:_raw_message
和 _error
虚拟列仅在解析期间发生异常时填充,当消息成功解析时,它们始终为 NULL
。
注意事项
即使您可以在表定义中指定 默认列表达式(例如 DEFAULT
、MATERIALIZED
、ALIAS
),这些表达式也会被忽略。相反,列将填充其类型各自的默认值。
数据格式支持
RabbitMQ 引擎支持 ClickHouse 中支持的所有格式。一个 RabbitMQ 消息中的行数取决于格式是基于行还是基于块
- 对于基于行的格式,一个 RabbitMQ 消息中的行数可以通过设置
rabbitmq_max_rows_per_message
来控制。 - 对于基于块的格式,我们无法将块分成更小的部分,但是一个块中的行数可以通过常规设置 max_block_size 来控制。