s3 表函数
提供一个类似表的接口,用于在 Amazon S3 和 Google Cloud Storage 中选择/插入文件。此表函数类似于 hdfs 函数,但提供 S3 特有的功能。
如果您的集群中有多个副本,您可以改用 s3Cluster 函数来并行插入。
当将 s3 表函数
与 INSERT INTO...SELECT 一起使用时,数据以流式方式读取和插入。只有少量数据块驻留在内存中,同时数据块会不断地从 S3 读取并推送到目标表中。
语法
s3(url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,structure] [,compression_method],[,headers])
s3(named_collection[, option=value [,..]])
S3 表函数通过使用 GCS XML API 和 HMAC 密钥与 Google Cloud Storage 集成。有关端点和 HMAC 的更多详细信息,请参阅 Google 互操作性文档。
对于 GCS,请在您看到 access_key_id 和 secret_access_key 的地方替换您的 HMAC 密钥和 HMAC 密钥。
参数
s3 表函数支持以下普通参数
- url — 带有文件路径的 Bucket url。在只读模式下支持以下通配符:*、**、?、{abc,def} 和 {N..M},其中 N、M — 数字,'abc'、'def' — 字符串。有关更多信息,请参见此处。
GCS
GCS url 的格式如下,因为 Google XML API 的端点与 JSON API 不同
https://storage.googleapis.com/<bucket>/<folder>/<filename(s)>
- NOSIGN — 如果提供此关键字代替凭据,则所有请求都不会被签名。
- access_key_id 和 secret_access_key — 用于指定与给定端点一起使用的凭据的密钥。可选。
- session_token - 与给定密钥一起使用的会话令牌。传递密钥时可选。
- format — 文件的格式。
- structure — 表的结构。格式
'column1_name column1_type, column2_name column2_type, ...'
。 - compression_method — 参数是可选的。支持的值:
none
、gzip
或gz
、brotli
或br
、xz
或LZMA
、zstd
或zst
。默认情况下,它将通过文件扩展名自动检测压缩方法。 - headers - 参数是可选的。允许在 S3 请求中传递标头。以
headers(key=value)
格式传递,例如headers('x-amz-request-payer' = 'requester')
。
参数也可以使用命名集合传递。在这种情况下,url
、access_key_id
、secret_access_key
、format
、structure
、compression_method
的工作方式相同,并且支持一些额外的参数
filename
— 如果指定,则附加到 url。use_environment_credentials
— 默认启用,允许使用环境变量AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
、AWS_CONTAINER_CREDENTIALS_FULL_URI
、AWS_CONTAINER_AUTHORIZATION_TOKEN
、AWS_EC2_METADATA_DISABLED
传递额外的参数。no_sign_request
— 默认禁用。expiration_window_seconds
— 默认值为 120。
返回值
一个具有指定结构的表,用于在指定文件中读取或写入数据。
示例
从 S3 文件 https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv
的表中选择前 5 行
SELECT *
FROM s3(
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv',
'CSVWithNames'
)
LIMIT 5;
┌───────Date─┬────Open─┬────High─┬─────Low─┬───Close─┬───Volume─┬─OpenInt─┐
│ 1984-09-07 │ 0.42388 │ 0.42902 │ 0.41874 │ 0.42388 │ 23220030 │ 0 │
│ 1984-09-10 │ 0.42388 │ 0.42516 │ 0.41366 │ 0.42134 │ 18022532 │ 0 │
│ 1984-09-11 │ 0.42516 │ 0.43668 │ 0.42516 │ 0.42902 │ 42498199 │ 0 │
│ 1984-09-12 │ 0.42902 │ 0.43157 │ 0.41618 │ 0.41618 │ 37125801 │ 0 │
│ 1984-09-13 │ 0.43927 │ 0.44052 │ 0.43927 │ 0.43927 │ 57822062 │ 0 │
└────────────┴─────────┴─────────┴─────────┴─────────┴──────────┴─────────┘
ClickHouse 使用文件名扩展名来确定数据格式。例如,我们本可以在没有 CSVWithNames
的情况下运行之前的命令
SELECT *
FROM s3(
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv'
)
LIMIT 5;
ClickHouse 也可以确定文件的压缩方法。例如,如果文件使用 .csv.gz
扩展名压缩,ClickHouse 将自动解压缩该文件。
用法
假设我们在 S3 上有几个具有以下 URI 的文件
- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/some_prefix/some_file_1.csv'
- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/some_prefix/some_file_2.csv'
- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/some_prefix/some_file_3.csv'
- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/some_prefix/some_file_4.csv'
- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/another_prefix/some_file_1.csv'
- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/another_prefix/some_file_2.csv'
- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/another_prefix/some_file_3.csv'
- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/another_prefix/some_file_4.csv'
计算文件名以数字 1 到 3 结尾的文件中的行数
SELECT count(*)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/my-test-bucket-768/{some,another}_prefix/some_file_{1..3}.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32')
┌─count()─┐
│ 18 │
└─────────┘
计算这两个目录中所有文件中的总行数
SELECT count(*)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/my-test-bucket-768/{some,another}_prefix/*', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32')
┌─count()─┐
│ 24 │
└─────────┘
如果您的文件列表包含带有前导零的数字范围,请对每个数字分别使用大括号结构或使用 ?
。
计算名为 file-000.csv
、file-001.csv
、...、file-999.csv
的文件中的总行数
SELECT count(*)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/my-test-bucket-768/big_prefix/file-{000..999}.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32');
┌─count()─┐
│ 12 │
└─────────┘
将数据插入文件 test-data.csv.gz
INSERT INTO FUNCTION s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
VALUES ('test-data', 1), ('test-data-2', 2);
从现有表将数据插入文件 test-data.csv.gz
INSERT INTO FUNCTION s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
SELECT name, value FROM existing_table;
Glob ** 可用于递归目录遍历。考虑以下示例,它将递归地从 my-test-bucket-768
目录中获取所有文件
SELECT * FROM s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**', 'CSV', 'name String, value UInt32', 'gzip');
以下代码从 my-test-bucket
目录内的任何文件夹中递归获取所有 test-data.csv.gz
文件的数据
SELECT * FROM s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip');
注意。可以在服务器配置文件中指定自定义 URL 映射器。示例
SELECT * FROM s3('s3://clickhouse-public-datasets/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip');
URL 's3://clickhouse-public-datasets/my-test-bucket-768/**/test-data.csv.gz'
将被替换为 'http://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**/test-data.csv.gz'
自定义映射器可以添加到 config.xml
中
<url_scheme_mappers>
<s3>
<to>https://{bucket}.s3.amazonaws.com</to>
</s3>
<gs>
<to>https://{bucket}.storage.googleapis.com</to>
</gs>
<oss>
<to>https://{bucket}.oss.aliyuncs.com</to>
</oss>
</url_scheme_mappers>
对于生产用例,建议使用命名集合。这是一个示例
CREATE NAMED COLLECTION creds AS
access_key_id = '***',
secret_access_key = '***';
SELECT count(*)
FROM s3(creds, url='https://s3-object-url.csv')
分区写入
如果在将数据插入 S3
表时指定 PARTITION BY
表达式,则会为每个分区值创建一个单独的文件。将数据拆分为单独的文件有助于提高读取操作效率。
示例
- 在键中使用分区 ID 会创建单独的文件
INSERT INTO TABLE FUNCTION
s3('http://bucket.amazonaws.com/my_bucket/file_{_partition_id}.csv', 'CSV', 'a String, b UInt32, c UInt32')
PARTITION BY a VALUES ('x', 2, 3), ('x', 4, 5), ('y', 11, 12), ('y', 13, 14), ('z', 21, 22), ('z', 23, 24);
结果,数据被写入三个文件:file_x.csv
、file_y.csv
和 file_z.csv
。
- 在 bucket 名称中使用分区 ID 会在不同的 bucket 中创建文件
INSERT INTO TABLE FUNCTION
s3('http://bucket.amazonaws.com/my_bucket_{_partition_id}/file.csv', 'CSV', 'a UInt32, b UInt32, c UInt32')
PARTITION BY a VALUES (1, 2, 3), (1, 4, 5), (10, 11, 12), (10, 13, 14), (20, 21, 22), (20, 23, 24);
结果,数据被写入不同 bucket 中的三个文件:my_bucket_1/file.csv
、my_bucket_10/file.csv
和 my_bucket_20/file.csv
。
访问公共 bucket
ClickHouse 尝试从多种不同类型的源获取凭据。有时,当访问某些公共 bucket 时,可能会产生问题,导致客户端返回 403 错误代码。通过使用 NOSIGN
关键字,强制客户端忽略所有凭据,并且不对请求进行签名,可以避免此问题。
SELECT *
FROM s3(
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv',
NOSIGN,
'CSVWithNames'
)
LIMIT 5;
使用 S3 凭据 (ClickHouse Cloud)
对于非公共 bucket,用户可以将 aws_access_key_id
和 aws_secret_access_key
传递给函数。例如
SELECT count() FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/mta/*.tsv', '<KEY>', '<SECRET>','TSVWithNames')
这适用于一次性访问或凭据可以轻松轮换的情况。但是,对于重复访问或凭据敏感的情况,不建议将其作为长期解决方案。在这种情况下,我们建议用户依赖于基于角色的访问。
ClickHouse Cloud 中 S3 的基于角色的访问在此处记录。
配置完成后,可以通过 extra_credentials
参数将 roleARN
传递给 s3 函数。例如
SELECT count() FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/mta/*.tsv','CSVWithNames',extra_credentials(role_arn = 'arn:aws:iam::111111111111:role/ClickHouseAccessRole-001'))
更多示例可以在此处找到
使用存档
假设我们在 S3 上有几个具有以下 URI 的存档文件
- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-10.csv.zip'
- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-11.csv.zip'
- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-12.csv.zip'
可以使用 ::
从这些存档中提取数据。Glob 可以用于 url 部分以及 ::
之后的部分(负责存档内文件的名称)。
SELECT *
FROM s3(
'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-1{0..2}.csv.zip :: *.csv'
);
ClickHouse 支持三种存档格式:ZIP TAR 7Z。虽然可以从任何受支持的存储位置访问 ZIP 和 TAR 存档,但 7Z 存档只能从安装 ClickHouse 的本地文件系统读取。
虚拟列
_path
— 文件的路径。类型:LowCardinality(String)
。在存档的情况下,以以下格式显示路径:"{path_to_archive}::{path_to_file_inside_archive}"
_file
— 文件的名称。类型:LowCardinality(String)
。在存档的情况下,显示存档内文件的名称。_size
— 文件大小(以字节为单位)。类型:Nullable(UInt64)
。如果文件大小未知,则值为NULL
。在存档的情况下,显示存档内文件的未压缩文件大小。_time
— 文件的最后修改时间。类型:Nullable(DateTime)
。如果时间未知,则值为NULL
。
Hive 风格分区
当 use_hive_partitioning
设置为 1 时,ClickHouse 将检测路径中的 Hive 风格分区 (/name=value/
),并允许在查询中使用分区列作为虚拟列。这些虚拟列将具有与分区路径中相同的名称,但以 _
开头。
示例
使用使用 Hive 风格分区创建的虚拟列
SELECT * from s3('s3://data/path/date=*/country=*/code=*/*.parquet') where _date > '2020-01-01' and _country = 'Netherlands' and _code = 42;
访问请求者付费 bucket
要访问请求者付费 bucket,必须在任何请求中传递标头 x-amz-request-payer = requester
。这可以通过将参数 headers('x-amz-request-payer' = 'requester')
传递给 s3 函数来实现。例如
SELECT
count() AS num_rows,
uniqExact(_file) AS num_files
FROM s3('https://coiled-datasets-rp.s3.us-east-1.amazonaws.com/1trc/measurements-100*.parquet', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', headers('x-amz-request-payer' = 'requester'))
┌───num_rows─┬─num_files─┐
│ 1110000000 │ 111 │
└────────────┴───────────┘
1 row in set. Elapsed: 3.089 sec. Processed 1.09 billion rows, 0.00 B (353.55 million rows/s., 0.00 B/s.)
Peak memory usage: 192.27 KiB.
存储设置
- s3_truncate_on_insert - 允许在插入文件之前截断文件。默认禁用。
- s3_create_new_file_on_insert - 如果格式有后缀,则允许在每次插入时创建一个新文件。默认禁用。
- s3_skip_empty_files - 允许在读取时跳过空文件。默认启用。
另请参阅