跳至主要内容

RabbitMQ 引擎

此引擎允许将 ClickHouse 与 RabbitMQ 集成。

RabbitMQ 允许您

  • 发布或订阅数据流。
  • 在数据可用时处理流。

创建表

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1],
name2 [type2],
...
) ENGINE = RabbitMQ SETTINGS
rabbitmq_host_port = 'host:port' [or rabbitmq_address = 'amqp(s)://guest:guest@localhost/vhost'],
rabbitmq_exchange_name = 'exchange_name',
rabbitmq_format = 'data_format'[,]
[rabbitmq_exchange_type = 'exchange_type',]
[rabbitmq_routing_key_list = 'key1,key2,...',]
[rabbitmq_secure = 0,]
[rabbitmq_schema = '',]
[rabbitmq_num_consumers = N,]
[rabbitmq_num_queues = N,]
[rabbitmq_queue_base = 'queue',]
[rabbitmq_deadletter_exchange = 'dl-exchange',]
[rabbitmq_persistent = 0,]
[rabbitmq_skip_broken_messages = N,]
[rabbitmq_max_block_size = N,]
[rabbitmq_flush_interval_ms = N,]
[rabbitmq_queue_settings_list = 'x-dead-letter-exchange=my-dlx,x-max-length=10,x-overflow=reject-publish',]
[rabbitmq_queue_consume = false,]
[rabbitmq_address = '',]
[rabbitmq_vhost = '/',]
[rabbitmq_username = '',]
[rabbitmq_password = '',]
[rabbitmq_commit_on_select = false,]
[rabbitmq_max_rows_per_message = 1,]
[rabbitmq_handle_error_mode = 'default']

必需参数

  • rabbitmq_host_port – 主机:端口(例如,localhost:5672)。
  • rabbitmq_exchange_name – RabbitMQ 交换机名称。
  • rabbitmq_format – 消息格式。使用与 SQL FORMAT 函数相同的表示法,例如 JSONEachRow。有关更多信息,请参见 格式 部分。

可选参数

  • rabbitmq_exchange_type – RabbitMQ 交换机类型:directfanouttopicheadersconsistent_hash。默认值:fanout
  • rabbitmq_routing_key_list – 路由键的逗号分隔列表。
  • rabbitmq_schema – 如果格式需要模式定义,则必须使用此参数。例如,Cap’n Proto 需要模式文件路径和根 schema.capnp:Message 对象的名称。
  • rabbitmq_num_consumers – 每个表的消费者数量。如果一个消费者的吞吐量不足,请指定更多消费者。默认值:1
  • rabbitmq_num_queues – 队列总数。增加此数字可以显着提高性能。默认值:1
  • rabbitmq_queue_base - 为队列名称指定提示。此设置的使用案例如下所述。
  • rabbitmq_deadletter_exchange - 为 死信交换机 指定名称。您可以使用此交换机名称创建另一个表,并在消息重新发布到死信交换机时收集这些消息。默认情况下,未指定死信交换机。
  • rabbitmq_persistent - 如果设置为 1(true),则在插入查询中,传递模式将设置为 2(将消息标记为“持久性”)。默认值:0
  • rabbitmq_skip_broken_messages – RabbitMQ 消息解析器对每个块中与模式不兼容的消息的容忍度。如果 rabbitmq_skip_broken_messages = N,则引擎会跳过无法解析的 N 个 RabbitMQ 消息(一条消息等于一行数据)。默认值:0
  • rabbitmq_max_block_size - 从 RabbitMQ 刷新数据之前收集的行数。默认值:max_insert_block_size
  • rabbitmq_flush_interval_ms - 从 RabbitMQ 刷新数据的超时时间。默认值:stream_flush_interval_ms
  • rabbitmq_queue_settings_list - 允许在创建队列时设置 RabbitMQ 设置。可用设置:x-max-lengthx-max-length-bytesx-message-ttlx-expiresx-priorityx-max-priorityx-overflowx-dead-letter-exchangex-queue-type。队列的 durable 设置会自动启用。
  • rabbitmq_address - 连接地址。使用此设置或 rabbitmq_host_port
  • rabbitmq_vhost - RabbitMQ 虚拟主机。默认值:'\'
  • rabbitmq_queue_consume - 使用用户定义的队列,并且不进行任何 RabbitMQ 设置:声明交换机、队列、绑定。默认值:false
  • rabbitmq_username - RabbitMQ 用户名。
  • rabbitmq_password - RabbitMQ 密码。
  • reject_unhandled_messages - 如果发生错误,则拒绝消息(发送 RabbitMQ 负确认)。如果 rabbitmq_queue_settings_list 中定义了 x-dead-letter-exchange,则会自动启用此设置。
  • rabbitmq_commit_on_select - 在执行 select 查询时提交消息。默认值:false
  • rabbitmq_max_rows_per_message — 基于行的格式在一个 RabbitMQ 消息中写入的最大行数。默认值:1
  • rabbitmq_empty_queue_backoff_start — 如果 rabbitmq 队列为空,则重新安排读取的起始回退点。
  • rabbitmq_empty_queue_backoff_end — 如果 rabbitmq 队列为空,则重新安排读取的结束回退点。
  • rabbitmq_handle_error_mode — 如何处理 RabbitMQ 引擎的错误。可能的值:default(如果我们无法解析消息,则会抛出异常)、stream(异常消息和原始消息将保存在虚拟列 _error_raw_message 中)。
  • SSL 连接

使用 rabbitmq_secure = 1 或连接地址中的 amqpsrabbitmq_address = 'amqps://guest:guest@localhost/vhost'。所用库的默认行为是不检查创建的 TLS 连接是否足够安全。无论证书是否过期、自签名、丢失或无效:连接都简单地被允许。将来可能会实现对证书的更严格检查。

此外,可以与 rabbitmq 相关的设置一起添加格式设置。

示例

  CREATE TABLE queue (
key UInt64,
value UInt64,
date DateTime
) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
rabbitmq_exchange_name = 'exchange1',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = 5,
date_time_input_format = 'best_effort';

RabbitMQ 服务器配置应使用 ClickHouse 配置文件添加。

必需配置

 <rabbitmq>
<username>root</username>
<password>clickhouse</password>
</rabbitmq>

附加配置

 <rabbitmq>
<vhost>clickhouse</vhost>
</rabbitmq>

描述

SELECT 对读取消息(调试除外)不是特别有用,因为每条消息只能读取一次。使用 物化视图 创建实时线程更实用。为此

  1. 使用该引擎创建 RabbitMQ 消费者并将其视为数据流。
  2. 使用所需的结构创建表。
  3. 创建一个物化视图,将数据从引擎转换为先前创建的表。

MATERIALIZED VIEW 加入引擎时,它会开始在后台收集数据。这允许您持续接收来自 RabbitMQ 的消息,并使用 SELECT 将其转换为所需的格式。一个 RabbitMQ 表可以拥有任意数量的物化视图。

数据可以根据 rabbitmq_exchange_type 和指定的 rabbitmq_routing_key_list 进行通道化。每个表最多只能有一个交换机。一个交换机可以在多个表之间共享 - 它允许同时路由到多个表。

交换机类型选项

  • direct - 路由基于键的精确匹配。例如表键列表:key1,key2,key3,key4,key5,消息键可以等于其中的任何一个。
  • fanout - 无论键如何,都路由到所有表(交换机名称相同的表)。
  • topic - 路由基于具有点分隔键的模式。例如:*.logsrecords.*.*.2020*.2018,*.2019,*.2020
  • headers - 路由基于具有设置 x-match=allx-match=anykey=value 匹配。例如表键列表:x-match=all,format=logs,type=report,year=2020
  • consistent_hash - 数据在所有绑定表(交换机名称相同的表)之间均匀分布。请注意,此交换机类型必须使用 RabbitMQ 插件启用:rabbitmq-plugins enable rabbitmq_consistent_hash_exchange

rabbitmq_queue_base 设置可用于以下情况

  • 允许不同的表共享队列,以便可以为同一队列注册多个消费者,从而提高性能。如果使用 rabbitmq_num_consumers 和/或 rabbitmq_num_queues 设置,则在这些参数相同的情况下,可以实现队列的精确匹配。
  • 能够在并非所有消息都成功使用时从某些持久队列恢复读取。要从一个特定队列恢复使用 - 在 rabbitmq_queue_base 设置中设置其名称,并且不要指定 rabbitmq_num_consumersrabbitmq_num_queues(默认为 1)。要从为特定表声明的所有队列恢复使用 - 只需指定相同的设置:rabbitmq_queue_baserabbitmq_num_consumersrabbitmq_num_queues。默认情况下,队列名称对于表是唯一的。
  • 重用队列,因为它们是持久化的并且不会自动删除。(可以通过任何 RabbitMQ CLI 工具删除。)

为了提高性能,接收到的消息将分组为大小为 max_insert_block_size 的块。如果在 stream_flush_interval_ms 毫秒内未形成块,则无论块是否完整,数据都将刷新到表中。

如果 rabbitmq_num_consumers 和/或 rabbitmq_num_queues 设置与 rabbitmq_exchange_type 一起指定,则

  • 必须启用 rabbitmq-consistent-hash-exchange 插件。

  • 发布的消息必须指定message_id属性(每条消息/批次必须唯一)。

对于插入查询,存在消息元数据,每个发布的消息都会添加:messageIDrepublished标志(如果发布多次则为true) - 可以通过消息头访问。

不要对同一张表进行插入和物化视图操作。

示例

  CREATE TABLE queue (
key UInt64,
value UInt64
) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
rabbitmq_exchange_name = 'exchange1',
rabbitmq_exchange_type = 'headers',
rabbitmq_routing_key_list = 'format=logs,type=report,year=2020',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = 5;

CREATE TABLE daily (key UInt64, value UInt64)
ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT key, value FROM queue;

SELECT key, value FROM daily ORDER BY key;

虚拟列

  • _exchange_name - RabbitMQ 交换机名称。数据类型:String
  • _channel_id - ChannelID,接收消息的消费者声明所在的通道。数据类型:String
  • _delivery_tag - 接收消息的 DeliveryTag。作用域为每个通道。数据类型:UInt64
  • _redelivered - 消息的redelivered标志。数据类型:UInt8
  • _message_id - 接收消息的 messageID;如果在发布消息时设置,则不为空。数据类型:String
  • _timestamp - 接收消息的时间戳;如果在发布消息时设置,则不为空。数据类型:UInt64

kafka_handle_error_mode='stream'时,额外的虚拟列

  • _raw_message - 无法成功解析的原始消息。数据类型:Nullable(String)
  • _error - 解析失败期间发生的异常消息。数据类型:Nullable(String)

注意:只有在解析期间发生异常的情况下,才会填充_raw_message_error虚拟列,当消息成功解析时,它们始终为NULL

注意事项

即使您可以在表定义中指定默认列表达式(例如DEFAULTMATERIALIZEDALIAS),这些表达式也会被忽略。相反,列将使用其类型的相应默认值填充。

数据格式支持

RabbitMQ 引擎支持 ClickHouse 支持的所有格式。一条 RabbitMQ 消息中的行数取决于格式是基于行的还是基于块的。

  • 对于基于行的格式,可以通过设置rabbitmq_max_rows_per_message来控制一条 RabbitMQ 消息中的行数。
  • 对于基于块的格式,我们无法将块分成更小的部分,但可以通过通用设置max_block_size来控制一个块中的行数。