AvroConfluent
Input | Output | Alias |
---|---|---|
✔ | ✗ |
描述
AvroConfluent 支持解码通常与 Kafka 和 Confluent Schema Registry 一起使用的单对象 Avro 消息。每个 Avro 消息都嵌入了一个模式 ID,可以在 Schema Registry 的帮助下解析为实际模式。模式在解析后会被缓存。
数据类型匹配
下表显示了 Apache Avro 格式支持的所有数据类型,以及它们在 INSERT
和 SELECT
查询中对应的 ClickHouse 数据类型。
Avro 数据类型 INSERT | ClickHouse 数据类型 | Avro 数据类型 SELECT |
---|---|---|
boolean , int , long , float , double | Int(8\16\32), UInt(8\16\32) | int |
boolean , int , long , float , double | Int64, UInt64 | long |
boolean , int , long , float , double | Float32 | float |
boolean , int , long , float , double | Float64 | double |
bytes , string , fixed , enum | String | bytes 或 string * |
bytes , string , fixed | FixedString(N) | fixed(N) |
enum | Enum(8\16) | enum |
array(T) | Array(T) | array(T) |
map(V, K) | Map(V, K) | map(string, K) |
union(null, T) , union(T, null) | Nullable(T) | union(null, T) |
union(T1, T2, …) ** | Variant(T1, T2, …) | union(T1, T2, …) ** |
null | Nullable(Nothing) | null |
int (date) *** | Date, Date32 | int (date) *** |
long (timestamp-millis) *** | DateTime64(3) | long (timestamp-millis) *** |
long (timestamp-micros) *** | DateTime64(6) | long (timestamp-micros) *** |
bytes (decimal) *** | DateTime64(N) | bytes (decimal) *** |
int | IPv4 | int |
fixed(16) | IPv6 | fixed(16) |
bytes (decimal) *** | Decimal(P, S) | bytes (decimal) *** |
string (uuid) *** | UUID | string (uuid) *** |
fixed(16) | Int128/UInt128 | fixed(16) |
fixed(32) | Int256/UInt256 | fixed(32) |
record | Tuple | record |
* bytes
是默认值,由设置 output_format_avro_string_column_pattern
控制
** Variant 类型 隐式接受 null
作为字段值,因此例如 Avro union(T1, T2, null)
将被转换为 Variant(T1, T2)
。因此,当从 ClickHouse 生成 Avro 时,我们必须始终在 Avro union
类型集中包含 null
类型,因为我们不知道在模式推断期间是否有任何值实际上是 null
。
*** Avro 逻辑类型
不支持的 Avro 逻辑数据类型
time-millis
time-micros
duration
示例用法
要快速验证模式解析,您可以使用带有 clickhouse-local 的 kafkacat
$ kafkacat -b kafka-broker -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String" -q 'select * from table'
1 a
2 b
3 c
要将 AvroConfluent
与 Kafka 一起使用
CREATE TABLE topic1_stream
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent';
-- for debug purposes you can set format_avro_schema_registry_url in a session.
-- this way cannot be used in production
SET format_avro_schema_registry_url = 'http://schema-registry';
SELECT * FROM topic1_stream;
格式设置
Schema Registry URL 通过 format_avro_schema_registry_url
配置。
注意
format_avro_schema_registry_url
设置需要在 users.xml
中配置,以便在重启后保持其值。您也可以使用 Kafka
表引擎的 format_avro_schema_registry_url
设置。
Setting | 描述 | Default |
---|---|---|
input_format_avro_allow_missing_fields | 对于 Avro/AvroConfluent 格式:当在模式中找不到字段时,使用默认值而不是错误 | 0 |
input_format_avro_null_as_default | 对于 Avro/AvroConfluent 格式:在 null 和非 Nullable 列的情况下插入默认值 | 0 |
format_avro_schema_registry_url | 对于 AvroConfluent 格式:Confluent Schema Registry URL。 |