跳至主要内容

S3Queue 表引擎

此引擎提供了与 Amazon S3 生态系统的集成并允许流式导入。此引擎类似于 KafkaRabbitMQ 引擎,但提供 S3 特定的功能。

创建表

CREATE TABLE s3_queue_engine_table (name String, value UInt32)
ENGINE = S3Queue(path, [NOSIGN, | aws_access_key_id, aws_secret_access_key,] format, [compression])
[SETTINGS]
[mode = '',]
[after_processing = 'keep',]
[keeper_path = '',]
[loading_retries = 0,]
[processing_threads_num = 1,]
[enable_logging_to_s3queue_log = 0,]
[polling_min_timeout_ms = 1000,]
[polling_max_timeout_ms = 10000,]
[polling_backoff_ms = 0,]
[tracked_file_ttl_sec = 0,]
[tracked_files_limit = 1000,]
[cleanup_interval_min_ms = 10000,]
[cleanup_interval_max_ms = 30000,]
危险

24.7 之前,需要为除 modeafter_processingkeeper_path 之外的所有设置使用 s3queue_ 前缀。

引擎参数

S3Queue 参数与 S3 表引擎支持的相同。请参阅 此处的参数部分。

示例

CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
mode = 'unordered';

使用命名集合

<clickhouse>
<named_collections>
<s3queue_conf>
<url>'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*</url>
<access_key_id>test<access_key_id>
<secret_access_key>test</secret_access_key>
</s3queue_conf>
</named_collections>
</clickhouse>
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue(s3queue_conf, format = 'CSV', compression_method = 'gzip')
SETTINGS
mode = 'ordered';

设置

mode

可能的值

  • unordered - 在无序模式下,已处理的所有文件的集合通过 ZooKeeper 中的持久节点进行跟踪。
  • ordered - 在有序模式下,文件按字典顺序处理。这意味着,如果某个时候处理了名为 'BBB' 的文件,并且后来将名为 'AA' 的文件添加到存储桶中,则该文件将被忽略。只有成功消费的文件的最大名称(按字典顺序),以及在加载尝试失败后将被重试的文件的名称存储在 ZooKeeper 中。

默认值:24.6 之前版本的 ordered。从 24.6 开始没有默认值,该设置需要手动指定。对于在早期版本上创建的表,默认值将保留为 Ordered 以保持兼容性。

after_processing

在成功处理后删除或保留文件。可能的值

  • keep。
  • delete。

默认值:keep

keeper_path

ZooKeeper 中的路径可以作为表引擎设置指定,或者可以从全局配置提供的路径和表 UUID 形成默认路径。可能的值

  • 字符串。

默认值:/

s3queue_loading_retries

最多重试指定次数的文件加载。默认情况下,没有重试。可能的值

  • 正整数。

默认值:0

s3queue_processing_threads_num

执行处理的线程数。仅适用于 Unordered 模式。

默认值:1

s3queue_enable_logging_to_s3queue_log

启用日志记录到 system.s3queue_log

默认值:0

s3queue_polling_min_timeout_ms

下一次轮询之前的最小超时时间(以毫秒为单位)。

可能的值

  • 正整数。

默认值:1000

s3queue_polling_max_timeout_ms

下一次轮询之前的最大超时时间(以毫秒为单位)。

可能的值

  • 正整数。

默认值:10000

s3queue_polling_backoff_ms

轮询回退(以毫秒为单位)。

可能的值

  • 正整数。

默认值:0

s3queue_tracked_files_limit

允许在使用 'unordered' 模式时限制 Zookeeper 节点的数量,对 'ordered' 模式没有作用。如果限制达到,最旧的已处理文件将从 ZooKeeper 节点中删除并重新处理。

可能的值

  • 正整数。

默认值:1000

s3queue_tracked_file_ttl_sec

在 ZooKeeper 节点中存储已处理文件的最大秒数(默认情况下永远存储)'unordered' 模式,对 'ordered' 模式没有作用。在指定的秒数后,文件将重新导入。

可能的值

  • 正整数。

默认值:0

s3queue_cleanup_interval_min_ms

对于 'Ordered' 模式。定义负责维护跟踪文件 TTL 和最大跟踪文件集的后台任务的重新调度间隔的最小边界。

默认值:10000

s3queue_cleanup_interval_max_ms

对于 'Ordered' 模式。定义负责维护跟踪文件 TTL 和最大跟踪文件集的后台任务的重新调度间隔的最大边界。

默认值:30000

s3queue_buckets

对于 'Ordered' 模式。从 24.6 开始可用。如果 S3Queue 表有多个副本,每个副本都使用 keeper 中相同的元数据目录,那么 s3queue_buckets 的值至少需要等于副本的数量。如果还使用 s3queue_processing_threads 设置,则有意义地进一步增加 s3queue_buckets 设置的值,因为它定义了 S3Queue 处理的实际并行性。

与 S3 相关的设置

引擎支持所有与 s3 相关的设置。有关 S3 设置的更多信息,请参阅 此处

S3Queue 有序模式

S3Queue 处理模式允许在 ZooKeeper 中存储更少的元数据,但有一个限制,即按时间顺序添加的文件需要具有字母数字上更大的名称。

S3Queueordered 模式以及 unordered 模式支持 (s3queue_)processing_threads_num 设置(s3queue_ 前缀是可选的),它允许控制在服务器上本地执行 S3 文件处理的线程数。此外,ordered 模式还引入了另一个名为 (s3queue_)buckets 的设置,它表示“逻辑线程”。这意味着在分布式场景中,当有多台服务器具有 S3Queue 表副本时,此设置定义了处理单元的数量。例如,每个 S3Queue 副本上的每个处理线程都会尝试锁定某个 bucket 进行处理,每个 bucket 会通过文件名哈希分配给特定文件。因此,在分布式场景中,强烈建议将 (s3queue_)buckets 设置的值至少等于副本的数量或更大。将桶的数量设置得大于副本的数量是可以的。最优场景是将 (s3queue_)buckets 设置的值等于 副本数量(s3queue_)processing_threads_num 的乘积。在 24.6 版本之前,不建议使用 (s3queue_)processing_threads_num 设置。(s3queue_)buckets 设置从 24.6 版本开始可用。

描述

SELECT 对流式导入并不特别有用(调试除外),因为每个文件只能导入一次。使用 物化视图 创建实时线程更实用。为此

  1. 使用该引擎创建一个表,用于从 S3 中指定的路径进行消费,并将其视为数据流。
  2. 创建一个具有所需结构的表。
  3. 创建一个将数据从引擎转换为先前创建的表的物化视图。

MATERIALIZED VIEW 加入引擎时,它会开始在后台收集数据。

示例

  CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
mode = 'unordered';

CREATE TABLE stats (name String, value UInt32)
ENGINE = MergeTree() ORDER BY name;

CREATE MATERIALIZED VIEW consumer TO stats
AS SELECT name, value FROM s3queue_engine_table;

SELECT * FROM stats ORDER BY name;

虚拟列

  • _path - 文件的路径。
  • _file - 文件的名称。

有关虚拟列的更多信息,请参阅 此处

路径中的通配符

path 参数可以使用类似 bash 的通配符指定多个文件。要处理的文件应该存在并与整个路径模式匹配。文件列表在 SELECT 期间确定(而不是在 CREATE 时)。

  • * - 替换除 / 之外的任何数量的任何字符,包括空字符串。
  • ** - 替换任何数量的任何字符,包括 /,包括空字符串。
  • ? - 替换任何单个字符。
  • {some_string,another_string,yet_another_one} - 替换任何字符串 'some_string', 'another_string', 'yet_another_one'
  • {N..M} - 替换从 N 到 M 的任何数字,包括两个边界。N 和 M 可以有前导零,例如 000..078

使用 {} 的构造与 远程 表函数类似。

限制

  1. 重复行可能是由于
  • 解析文件处理过程中间发生异常,并且通过 s3queue_loading_retries 启用了重试;

  • S3Queue 在指向 Zookeeper 中同一路径的多个服务器上配置,并且 Keeper 会话在某台服务器成功提交已处理文件之前过期,这会导致另一台服务器接管文件的处理,而该文件可能已被第一台服务器部分或全部处理;

  • 服务器异常终止。

  1. S3Queue 在指向 Zookeeper 中同一路径的多个服务器上配置,并且使用了 Ordered 模式,那么 s3queue_loading_retries 将不起作用。这个问题很快就会得到解决。

自省

为了自省,请使用 system.s3queue 无状态表和 system.s3queue_log 持久表。

  1. system.s3queue。此表不持久,并显示 S3Queue 的内存状态:当前正在处理哪些文件,哪些文件已处理或失败。
┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
CREATE TABLE system.s3queue
(
`database` String,
`table` String,
`file_name` String,
`rows_processed` UInt64,
`status` String,
`processing_start_time` Nullable(DateTime),
`processing_end_time` Nullable(DateTime),
`ProfileEvents` Map(String, UInt64)
`exception` String
)
ENGINE = SystemS3Queue
COMMENT 'Contains in-memory state of S3Queue metadata and currently processed rows per file.'
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

示例


SELECT *
FROM system.s3queue

Row 1:
──────
zookeeper_path: /clickhouse/s3queue/25ea5621-ae8c-40c7-96d0-cec959c5ab88/3b3f66a1-9866-4c2e-ba78-b6bfa154207e
file_name: wikistat/original/pageviews-20150501-030000.gz
rows_processed: 5068534
status: Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time: 2023-10-13 13:10:31
ProfileEvents: {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5068534,'SelectedBytes':198132283,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':2480,'S3QueueSetFileProcessedMicroseconds':9985,'S3QueuePullMicroseconds':273776,'LogTest':17}
exception:
  1. system.s3queue_log。持久表。具有与 system.s3queue 相同的信息,但针对 processedfailed 文件。

该表具有以下结构

SHOW CREATE TABLE system.s3queue_log

Query id: 0ad619c3-0f2a-4ee4-8b40-c73d86e04314

┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
CREATE TABLE system.s3queue_log
(
`event_date` Date,
`event_time` DateTime,
`table_uuid` String,
`file_name` String,
`rows_processed` UInt64,
`status` Enum8('Processed' = 0, 'Failed' = 1),
`processing_start_time` Nullable(DateTime),
`processing_end_time` Nullable(DateTime),
`ProfileEvents` Map(String, UInt64),
`exception` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
SETTINGS index_granularity = 8192
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

为了使用 system.s3queue_log,请在服务器配置文件中定义其配置

    <s3queue_log>
<database>system</database>
<table>s3queue_log</table>
</s3queue_log>

示例

SELECT *
FROM system.s3queue_log

Row 1:
──────
event_date: 2023-10-13
event_time: 2023-10-13 13:10:12
table_uuid:
file_name: wikistat/original/pageviews-20150501-020000.gz
rows_processed: 5112621
status: Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time: 2023-10-13 13:10:12
ProfileEvents: {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5112621,'SelectedBytes':198577687,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':1934,'S3QueueSetFileProcessedMicroseconds':17063,'S3QueuePullMicroseconds':5841972,'LogTest':17}
exception: