跳到主要内容
跳到主要内容

AvroConfluent

InputOutputAlias

描述

AvroConfluent 支持解码通常与 KafkaConfluent Schema Registry 一起使用的单对象 Avro 消息。每个 Avro 消息都嵌入了一个模式 ID,可以在 Schema Registry 的帮助下解析为实际模式。模式在解析后会被缓存。

数据类型匹配

下表显示了 Apache Avro 格式支持的所有数据类型,以及它们在 INSERTSELECT 查询中对应的 ClickHouse 数据类型

Avro 数据类型 INSERTClickHouse 数据类型Avro 数据类型 SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytesstring *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord

* bytes 是默认值,由设置 output_format_avro_string_column_pattern 控制

** Variant 类型 隐式接受 null 作为字段值,因此例如 Avro union(T1, T2, null) 将被转换为 Variant(T1, T2)。因此,当从 ClickHouse 生成 Avro 时,我们必须始终在 Avro union 类型集中包含 null 类型,因为我们不知道在模式推断期间是否有任何值实际上是 null

*** Avro 逻辑类型

不支持的 Avro 逻辑数据类型

  • time-millis
  • time-micros
  • duration

示例用法

要快速验证模式解析,您可以使用带有 clickhouse-localkafkacat

$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c

要将 AvroConfluentKafka 一起使用

CREATE TABLE topic1_stream
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent';

-- for debug purposes you can set format_avro_schema_registry_url in a session.
-- this way cannot be used in production
SET format_avro_schema_registry_url = 'http://schema-registry';

SELECT * FROM topic1_stream;

格式设置

Schema Registry URL 通过 format_avro_schema_registry_url 配置。

注意

format_avro_schema_registry_url 设置需要在 users.xml 中配置,以便在重启后保持其值。您也可以使用 Kafka 表引擎的 format_avro_schema_registry_url 设置。

Setting描述Default
input_format_avro_allow_missing_fields对于 Avro/AvroConfluent 格式:当在模式中找不到字段时,使用默认值而不是错误0
input_format_avro_null_as_default对于 Avro/AvroConfluent 格式:在 null 和非 Nullable 列的情况下插入默认值0
format_avro_schema_registry_url对于 AvroConfluent 格式:Confluent Schema Registry URL。