S3Queue 表引擎
此引擎提供与 Amazon S3 生态系统的集成,并允许流式导入。此引擎类似于 Kafka、RabbitMQ 引擎,但提供了 S3 特有的功能。
创建表
CREATE TABLE s3_queue_engine_table (name String, value UInt32)
ENGINE = S3Queue(path, [NOSIGN, | aws_access_key_id, aws_secret_access_key,] format, [compression], [headers])
[SETTINGS]
[mode = '',]
[after_processing = 'keep',]
[keeper_path = '',]
[loading_retries = 0,]
[processing_threads_num = 1,]
[enable_logging_to_s3queue_log = 0,]
[polling_min_timeout_ms = 1000,]
[polling_max_timeout_ms = 10000,]
[polling_backoff_ms = 0,]
[tracked_file_ttl_sec = 0,]
[tracked_files_limit = 1000,]
[cleanup_interval_min_ms = 10000,]
[cleanup_interval_max_ms = 30000,]
在 24.7
之前的版本中,除了 mode
、after_processing
和 keeper_path
之外,所有设置都需要使用 s3queue_
前缀。
引擎参数
S3Queue
参数与 S3
表引擎支持的参数相同。请参阅此处的参数部分。
示例
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
mode = 'unordered';
使用命名集合
<clickhouse>
<named_collections>
<s3queue_conf>
<url>'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*</url>
<access_key_id>test<access_key_id>
<secret_access_key>test</secret_access_key>
</s3queue_conf>
</named_collections>
</clickhouse>
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue(s3queue_conf, format = 'CSV', compression_method = 'gzip')
SETTINGS
mode = 'ordered';
设置
要获取为表配置的设置列表,请使用 system.s3_queue_settings
表。从 24.10
版本开始可用。
mode
可能的值
- unordered — 在无序模式下,所有已处理文件的集合通过 ZooKeeper 中的持久节点进行跟踪。
- ordered — 在有序模式下,文件按字典顺序处理。这意味着如果在某个时候处理了名为 'BBB' 的文件,之后又向存储桶添加了名为 'AA' 的文件,则该文件将被忽略。只有成功消费的文件的最大名称(按字典顺序)以及在加载尝试失败后将重试的文件名称存储在 ZooKeeper 中。
默认值:在 24.6 之前的版本中为 ordered
。从 24.6 开始,没有默认值,该设置变为必须手动指定。对于在早期版本上创建的表,默认值将保持 Ordered
以实现兼容性。
after_processing
成功处理后删除或保留文件。可能的值
- keep。
- delete。
默认值:keep
。
keeper_path
ZooKeeper 中的路径可以指定为表引擎设置,也可以从全局配置提供的路径和表 UUID 形成默认路径。可能的值
- 字符串。
默认值:/
。
s3queue_loading_retries
重试文件加载最多指定的次数。默认情况下,没有重试。可能的值
- 正整数。
默认值:0
。
s3queue_processing_threads_num
执行处理的线程数。仅适用于 Unordered
模式。
默认值:1
。
s3queue_enable_logging_to_s3queue_log
启用日志记录到 system.s3queue_log
。
默认值:0
。
s3queue_polling_min_timeout_ms
指定 ClickHouse 在进行下一次轮询尝试之前等待的最短时间,以毫秒为单位。
可能的值
- 正整数。
默认值:1000
。
s3queue_polling_max_timeout_ms
定义 ClickHouse 在启动下一次轮询尝试之前等待的最长时间,以毫秒为单位。
可能的值
- 正整数。
默认值:10000
。
s3queue_polling_backoff_ms
确定在未找到新文件时添加到先前轮询间隔的额外等待时间。下一次轮询发生在先前间隔和此退避值之和,或最大间隔之间,以较低者为准。
可能的值
- 正整数。
默认值:0
。
s3queue_tracked_files_limit
如果使用“unordered”模式,则允许限制 Zookeeper 节点的数量,对于“ordered”模式不起作用。如果达到限制,则最早处理的文件将从 ZooKeeper 节点中删除并再次处理。
可能的值
- 正整数。
默认值:1000
。
s3queue_tracked_file_ttl_sec
将已处理文件存储在 ZooKeeper 节点中的最大秒数(默认情况下永久存储),用于“unordered”模式,对于“ordered”模式不起作用。超过指定的秒数后,文件将重新导入。
可能的值
- 正整数。
默认值:0
。
s3queue_cleanup_interval_min_ms
对于“Ordered”模式。定义负责维护跟踪文件 TTL 和最大跟踪文件集的后台任务的重新调度间隔的最小边界。
默认值:10000
。
s3queue_cleanup_interval_max_ms
对于“Ordered”模式。定义负责维护跟踪文件 TTL 和最大跟踪文件集的后台任务的重新调度间隔的最大边界。
默认值:30000
。
s3queue_buckets
对于“Ordered”模式。自 24.6
版本起可用。如果 S3Queue 表有多个副本,每个副本都在 Keeper 中使用相同的元数据目录,则 s3queue_buckets
的值需要至少等于副本数。如果也使用了 s3queue_processing_threads
设置,则进一步增加 s3queue_buckets
设置是有意义的,因为它定义了 S3Queue
处理的实际并行度。
S3 相关设置
引擎支持所有 s3 相关设置。有关 S3 设置的更多信息,请参阅此处。
S3 基于角色的访问
s3Queue 表引擎支持基于角色的访问。有关配置角色以访问您的存储桶的步骤,请参阅此处的文档。
配置角色后,可以通过 extra_credentials
参数传递 roleARN
,如下所示
CREATE TABLE s3_table
(
ts DateTime,
value UInt64
)
ENGINE = S3Queue(
'https://<your_bucket>/*.csv',
extra_credentials(role_arn = 'arn:aws:iam::111111111111:role/<your_role>')
,'CSV')
SETTINGS
...
S3Queue 有序模式
S3Queue
处理模式允许在 ZooKeeper 中存储更少的元数据,但有一个限制,即按时间稍后添加的文件需要具有字母数字顺序更大的名称。
S3Queue
ordered
模式以及 unordered
模式都支持 (s3queue_)processing_threads_num
设置(s3queue_
前缀是可选的),该设置允许控制将在服务器本地执行 S3
文件处理的线程数。此外,ordered
模式还引入了另一个名为 (s3queue_)buckets
的设置,这意味着“逻辑线程”。这意味着在分布式场景中,当有多台服务器具有 S3Queue
表副本时,此设置定义了处理单元的数量。例如,每个 S3Queue
副本上的每个处理线程都将尝试锁定某个 bucket
进行处理,每个 bucket
都通过文件名的哈希值归因于某些文件。因此,在分布式场景中,强烈建议将 (s3queue_)buckets
设置为至少等于副本数或更大。桶的数量大于副本的数量是可以的。最理想的情况是 (s3queue_)buckets
设置等于 number_of_replicas
和 (s3queue_)processing_threads_num
的乘积。不建议在 24.6
之前的版本中使用 (s3queue_)processing_threads_num
设置。(s3queue_)buckets
设置从 24.6
版本开始可用。
描述
SELECT
对于流式导入来说不是特别有用(调试除外),因为每个文件只能导入一次。使用物化视图创建实时线程更为实用。为此:
- 使用引擎创建一个表,用于从 S3 中的指定路径消费,并将其视为数据流。
- 创建一个具有所需结构的表。
- 创建一个物化视图,将来自引擎的数据转换为数据并将其放入先前创建的表中。
当 MATERIALIZED VIEW
加入引擎时,它开始在后台收集数据。
示例
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
mode = 'unordered';
CREATE TABLE stats (name String, value UInt32)
ENGINE = MergeTree() ORDER BY name;
CREATE MATERIALIZED VIEW consumer TO stats
AS SELECT name, value FROM s3queue_engine_table;
SELECT * FROM stats ORDER BY name;
虚拟列
_path
— 文件的路径。_file
— 文件的名称。
有关虚拟列的更多信息,请参阅此处。
路径中的通配符
path
参数可以使用类似 bash 的通配符指定多个文件。为了被处理,文件应该存在并匹配整个路径模式。文件列表在 SELECT
期间确定(而不是在 CREATE
时刻)。
*
— 替换除/
之外的任意数量的任意字符,包括空字符串。**
— 替换包括/
在内的任意数量的任意字符,包括空字符串。?
— 替换任何单个字符。{some_string,another_string,yet_another_one}
— 替换字符串'some_string'、'another_string'、'yet_another_one'
中的任何一个。{N..M}
— 替换从 N 到 M(包括边界)范围内的任何数字。N 和 M 可以有前导零,例如000..078
。
带有 {}
的构造类似于 remote 表函数。
限制
- 重复行可能是以下原因造成的:
-
在文件处理过程中解析时发生异常,并且通过
s3queue_loading_retries
启用了重试; -
S3Queue
配置在指向同一 zookeeper 路径的多台服务器上,并且 keeper 会话在服务器设法提交已处理的文件之前过期,这可能导致另一台服务器接管文件的处理,而该文件可能已被第一台服务器部分或完全处理; -
服务器异常终止。
S3Queue
配置在指向同一 zookeeper 路径的多台服务器上,并且使用了Ordered
模式,则s3queue_loading_retries
将不起作用。这将很快修复。
内省
对于内省,请使用 system.s3queue
无状态表和 system.s3queue_log
持久表。
system.s3queue
。此表不是持久的,并显示S3Queue
的内存状态:当前正在处理哪些文件,哪些文件已处理或失败。
┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE system.s3queue
(
`database` String,
`table` String,
`file_name` String,
`rows_processed` UInt64,
`status` String,
`processing_start_time` Nullable(DateTime),
`processing_end_time` Nullable(DateTime),
`ProfileEvents` Map(String, UInt64)
`exception` String
)
ENGINE = SystemS3Queue
COMMENT 'Contains in-memory state of S3Queue metadata and currently processed rows per file.' │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
示例
SELECT *
FROM system.s3queue
Row 1:
──────
zookeeper_path: /clickhouse/s3queue/25ea5621-ae8c-40c7-96d0-cec959c5ab88/3b3f66a1-9866-4c2e-ba78-b6bfa154207e
file_name: wikistat/original/pageviews-20150501-030000.gz
rows_processed: 5068534
status: Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time: 2023-10-13 13:10:31
ProfileEvents: {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5068534,'SelectedBytes':198132283,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':2480,'S3QueueSetFileProcessedMicroseconds':9985,'S3QueuePullMicroseconds':273776,'LogTest':17}
exception:
system.s3queue_log
。持久表。具有与system.s3queue
相同的信息,但针对processed
和failed
文件。
该表具有以下结构
SHOW CREATE TABLE system.s3queue_log
Query id: 0ad619c3-0f2a-4ee4-8b40-c73d86e04314
┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE system.s3queue_log
(
`event_date` Date,
`event_time` DateTime,
`table_uuid` String,
`file_name` String,
`rows_processed` UInt64,
`status` Enum8('Processed' = 0, 'Failed' = 1),
`processing_start_time` Nullable(DateTime),
`processing_end_time` Nullable(DateTime),
`ProfileEvents` Map(String, UInt64),
`exception` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
SETTINGS index_granularity = 8192 │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
为了使用 system.s3queue_log
,请在服务器配置文件中定义其配置
<s3queue_log>
<database>system</database>
<table>s3queue_log</table>
</s3queue_log>
示例
SELECT *
FROM system.s3queue_log
Row 1:
──────
event_date: 2023-10-13
event_time: 2023-10-13 13:10:12
table_uuid:
file_name: wikistat/original/pageviews-20150501-020000.gz
rows_processed: 5112621
status: Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time: 2023-10-13 13:10:12
ProfileEvents: {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5112621,'SelectedBytes':198577687,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':1934,'S3QueueSetFileProcessedMicroseconds':17063,'S3QueuePullMicroseconds':5841972,'LogTest':17}
exception: