
How to ingest data from Kafka into ClickHouse

6 minute read
Learn how to ingest data from a Kafka topic into ClickHouse using the Kafka table engine, a materialized view, and a MergeTree table.

Overview

This article walks through the process of sending data from a Kafka topic to a ClickHouse table. We will use the Wiki recent changes feed, which provides a stream of events representing changes made to various Wikimedia properties. The steps include:

  1. How to set up Kafka on Ubuntu
  2. Ingesting a stream of data into a Kafka topic
  3. Creating a ClickHouse table that subscribes to the topic

1. Setting up Kafka on Ubuntu

  1. Create an Ubuntu ec2 instance and SSH on to it:
ssh -i ~/training.pem ubuntu@ec2.compute.amazonaws.com
  2. Install Kafka (based on the instructions at https://www.linode.com/docs/guides/how-to-install-apache-kafka-on-ubuntu/):
sudo apt update
sudo apt install openjdk-11-jdk

mkdir /home/ubuntu/kafka
cd /home/ubuntu/kafka/

wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz

tar -zxvf kafka_2.13-3.7.0.tgz
  3. Start ZooKeeper:
cd kafka_2.13-3.7.0
bin/zookeeper-server-start.sh config/zookeeper.properties
  4. Open a new console and start Kafka:
ssh -i ~/training.pem ubuntu@ec2.compute.amazonaws.com
cd kafka/kafka_2.13-3.7.0/
bin/kafka-server-start.sh config/server.properties
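
If you want to confirm the broker is reachable before continuing, the Kafka distribution ships a small utility that asks a broker for its supported API versions; shown here only as a quick liveness check:

# Prints the broker's supported API versions if it is up and reachable
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092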
  5. Open a third console and create a topic named wikimedia:
ssh -i ~/training.pem ubuntu@ec2.compute.amazonaws.com
cd kafka/kafka_2.13-3.7.0/

bin/kafka-topics.sh --create --topic wikimedia --bootstrap-server localhost:9092
  6. You can verify the topic was created successfully with:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
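
Before moving on, you can optionally smoke-test the topic with the console producer and consumer that ship with Kafka (a quick sanity check; run from the kafka_2.13-3.7.0 directory):

# Type a line or two, then press Ctrl+C to stop producing
bin/kafka-console-producer.sh --topic wikimedia --bootstrap-server localhost:9092

# Read the messages back, then press Ctrl+C to stop consuming
bin/kafka-console-consumer.sh --topic wikimedia --from-beginning --bootstrap-server localhost:9092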

2. Ingest the Wikimedia stream into Kafka

  1. We first need some utilities (note that on newer Ubuntu releases, kafkacat may be a transitional package for its successor, kcat):
sudo apt-get install librdkafka-dev libyajl-dev
sudo apt-get install kafkacat
  2. The data is sent to Kafka using a clever curl command that grabs the latest Wikimedia events, parses out the JSON data, and sends it to the Kafka topic:
curl -N https://stream.wikimedia.org/v2/stream/recentchange  | awk '/^data: /{gsub(/^data: /, ""); print}' | kafkacat -P -b localhost:9092 -t wikimedia
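
While the curl pipeline runs, kafkacat can also be used from another console to peek at the messages being produced; -C puts it in consumer mode and -c exits after a fixed number of messages:

# Consume 5 messages from the wikimedia topic, then exit
kafkacat -C -b localhost:9092 -t wikimedia -c 5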
  3. You can "describe" the topic:
bin/kafka-topics.sh --describe --topic wikimedia --bootstrap-server localhost:9092
  4. Let's verify everything is working by consuming some events:
bin/kafka-console-consumer.sh --topic wikimedia --from-beginning --bootstrap-server localhost:9092
  5. Press Ctrl+C to kill the previous command.

3. Ingest the data into ClickHouse

  1. Here is what the incoming data looks like:
{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "meta": {
    "uri": "https://www.wikidata.org/wiki/Q45791749",
    "request_id": "f64cfb17-04ba-4d09-8935-38ec6f0001c2",
    "id": "9d7d2b5a-b79b-45ea-b72c-69c3b69ae931",
    "dt": "2024-04-18T13:21:21Z",
    "domain": "www.wikidata.org",
    "stream": "mediawiki.recentchange",
    "topic": "eqiad.mediawiki.recentchange",
    "partition": 0,
    "offset": 5032636513
  },
  "id": 2196113017,
  "type": "edit",
  "namespace": 0,
  "title": "Q45791749",
  "title_url": "https://www.wikidata.org/wiki/Q45791749",
  "comment": "/* wbsetqualifier-add:1| */ [[Property:P1545]]: 20, Modify PubMed ID: 7292984 citation data from NCBI, Europe PMC and CrossRef",
  "timestamp": 1713446481,
  "user": "Cewbot",
  "bot": true,
  "notify_url": "https://www.wikidata.org/w/index.php?diff=2131981357&oldid=2131981341&rcid=2196113017",
  "minor": false,
  "patrolled": true,
  "length": {
    "old": 75618,
    "new": 75896
  },
  "revision": {
    "old": 2131981341,
    "new": 2131981357
  },
  "server_url": "https://www.wikidata.org",
  "server_name": "www.wikidata.org",
  "server_script_path": "/w",
  "wiki": "wikidatawiki",
  "parsedcomment": "<span dir=\"auto\"><span class=\"autocomment\">Added qualifier: </span></span> <a href=\"/wiki/Property:P1545\" title=\"series ordinal | position of an item in its parent series (most frequently a 1-based index), generally to be used as a qualifier (different from &quot;rank&quot; defined as a class, and from &quot;ranking&quot; defined as a property for evaluating a quality).\"><span class=\"wb-itemlink\"><span class=\"wb-itemlink-label\" lang=\"en\" dir=\"ltr\">series ordinal</span> <span class=\"wb-itemlink-id\">(P1545)</span></span></a>: 20, Modify PubMed ID: 7292984 citation data from NCBI, Europe PMC and CrossRef"
}
  2. We will need the Kafka table engine to pull the data out of the Kafka topic:
CREATE OR REPLACE TABLE wikiQueue
(
    `id` UInt32,
    `type` String,
    `title` String,
    `title_url` String,
    `comment` String,
    `timestamp` UInt64,
    `user` String,
    `bot` Bool,
    `server_url` String,
    `server_name` String,
    `wiki` String,
    `meta` Tuple(uri String, id String, stream String, topic String, domain String)
)
ENGINE = Kafka(
    'ec2.compute.amazonaws.com:9092',
    'wikimedia',
    'consumer-group-wiki',
    'JSONEachRow'
);
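
The positional engine arguments are the broker list, topic list, consumer group, and message format; the same table could equivalently be declared with the named SETTINGS syntax, which is easier to read back later (a sketch of the same definition):

CREATE OR REPLACE TABLE wikiQueue
(
    `id` UInt32,
    `type` String,
    `title` String,
    `title_url` String,
    `comment` String,
    `timestamp` UInt64,
    `user` String,
    `bot` Bool,
    `server_url` String,
    `server_name` String,
    `wiki` String,
    `meta` Tuple(uri String, id String, stream String, topic String, domain String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'ec2.compute.amazonaws.com:9092',
    kafka_topic_list = 'wikimedia',
    kafka_group_name = 'consumer-group-wiki',
    kafka_format = 'JSONEachRow';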
  3. For some reason the Kafka table engine seems to take the public ec2 URL and convert it into the private DNS name, so I had to add an entry to my local /etc/hosts file:
52.14.154.92  ip.us-east-2.compute.internal
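
The underlying cause is that a Kafka broker hands clients whatever hostname is configured in its advertised.listeners setting, which on ec2 defaults to the instance's internal DNS name. Instead of the /etc/hosts workaround, you can have the broker advertise its public hostname by editing config/server.properties on the Kafka machine and restarting it (substitute your own instance's public hostname):

# config/server.properties -- advertise the public hostname to clients
advertised.listeners=PLAINTEXT://ec2.compute.amazonaws.com:9092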
  4. You can read from the Kafka table directly; you just need to enable a setting:
SELECT *
FROM wikiQueue
LIMIT 20
FORMAT Vertical
SETTINGS stream_like_engine_allow_direct_select = 1;

The rows come back nicely parsed, matching the columns defined in the wikiQueue table:

id:          2473996741
type:        edit
title:       File:Père-Lachaise - Division 6 - Cassereau 05.jpg
title_url:   https://commons.wikimedia.org/wiki/File:P%C3%A8re-Lachaise_-_Division_6_-_Cassereau_05.jpg
comment:     /* wbcreateclaim-create:1| */ [[d:Special:EntityPage/P921]]: [[d:Special:EntityPage/Q112327116]], [[:toollabs:quickstatements/#/batch/228454|batch #228454]]
timestamp:   1713457283
user:        Ameisenigel
bot:         false
server_url:  https://commons.wikimedia.org
server_name: commons.wikimedia.org
wiki:        commonswiki
meta:        ('https://commons.wikimedia.org/wiki/File:P%C3%A8re-Lachaise_-_Division_6_-_Cassereau_05.jpg','01a832e2-24c5-4ccb-bd93-8e2c0e429418','mediawiki.recentchange','eqiad.mediawiki.recentchange','commons.wikimedia.org')
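
The Kafka table engine also exposes virtual columns such as _topic, _partition, and _offset that describe where each message came from, which can be handy when debugging (again with the direct-select setting enabled):

-- Inspect the Kafka coordinates alongside the parsed fields
SELECT _topic, _partition, _offset, id, type
FROM wikiQueue
LIMIT 5
SETTINGS stream_like_engine_allow_direct_select = 1;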
  5. We need a MergeTree table to store these incoming events:
CREATE TABLE rawEvents
(
    id UInt64,
    type LowCardinality(String),
    comment String,
    timestamp DateTime64(3, 'UTC'),
    title_url String,
    topic LowCardinality(String),
    user String
)
ENGINE = MergeTree
ORDER BY (type, timestamp);
  6. Let's define a materialized view that is triggered when an insert occurs on the Kafka table and sends the data to our rawEvents table:
CREATE MATERIALIZED VIEW rawEvents_mv TO rawEvents
AS
SELECT
    id,
    type,
    comment,
    toDateTime(timestamp) AS timestamp,
    title_url,
    tupleElement(meta, 'topic') AS topic,
    user
FROM wikiQueue
WHERE title_url <> '';
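
Note that consumption from Kafka begins as soon as a materialized view is attached to the Kafka table. If you later need to modify the pipeline, you can pause and resume consumption by detaching and re-attaching the Kafka table:

-- Pause consumption while changing the pipeline
DETACH TABLE wikiQueue;

-- Resume consumption
ATTACH TABLE wikiQueue;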
  7. You should start seeing data arrive in rawEvents almost immediately:
SELECT count()
FROM rawEvents;
  8. Let's view a few of the rows:
SELECT *
FROM rawEvents
LIMIT 5
FORMAT Vertical;

Row 1:
──────
id:         124842852
type:       142
comment:    Pere prlpz commented on "Plantilles Enciclopèdia Catalana" (Diria que no cal fer res als articles. Es pot actualitzar els enllaços que es facin servir a les referències (tot i que l'antic encara ha...)
timestamp:  2024-04-18 16:22:29.000
title_url:  https://ca.wikipedia.org/wiki/Tema:Wu36d6vfsiuu4jsi
topic:      eqiad.mediawiki.recentchange
user:       Pere prlpz

Row 2:
──────
id:         2473996748
type:       categorize
comment:    [[:File:Ruïne van een poortgebouw, RP-T-1976-29-6(R).jpg]] removed from category
timestamp:  2024-04-18 16:21:20.000
title_url:  https://commons.wikimedia.org/wiki/Category:Pieter_Moninckx
topic:      eqiad.mediawiki.recentchange
user:       Warburg1866

Row 3:
──────
id:         311828596
type:       categorize
comment:    [[:Cujo (película)]] añadida a la categoría
timestamp:  2024-04-18 16:21:21.000
title_url:  https://es.wikipedia.org/wiki/Categor%C3%ADa:Pel%C3%ADculas_basadas_en_obras_de_Stephen_King
topic:      eqiad.mediawiki.recentchange
user:       Beta15

Row 4:
──────
id:         311828597
type:       categorize
comment:    [[:Cujo (película)]] eliminada de la categoría
timestamp:  2024-04-18 16:21:21.000
title_url:  https://es.wikipedia.org/wiki/Categor%C3%ADa:Trabajos_basados_en_obras_de_Stephen_King
topic:      eqiad.mediawiki.recentchange
user:       Beta15

Row 5:
──────
id:         48494536
type:       categorize
comment:    [[:braiteremmo]] ajoutée à la catégorie
timestamp:  2024-04-18 16:21:21.000
title_url:  https://fr.wiktionary.org/wiki/Cat%C3%A9gorie:Wiktionnaire:Exemples_manquants_en_italien
topic:      eqiad.mediawiki.recentchange
user:       Àncilu bot
  9. Let's see what types of events are coming in:
SELECT
    type,
    count()
FROM rawEvents
GROUP BY type;

   ┌─type───────┬─count()─┐
1. │ 142        │       1 │
2. │ new        │    1003 │
3. │ categorize │   12228 │
4. │ log        │    1799 │
5. │ edit       │   17142 │
   └────────────┴─────────┘

  10. Let's define a materialized view that is chained to our current materialized view. Because byMinute_mv (below) reads from rawEvents, it is triggered each time rawEvents_mv writes a block of rows into rawEvents. We will keep track of some aggregated statistics per minute:

CREATE TABLE byMinute
(
    `dateTime` DateTime64(3, 'UTC') NOT NULL,
    `users` AggregateFunction(uniq, String),
    `pages` AggregateFunction(uniq, String),
    `updates` AggregateFunction(sum, UInt32)
)
ENGINE = AggregatingMergeTree
ORDER BY dateTime;

CREATE MATERIALIZED VIEW byMinute_mv TO byMinute
AS SELECT
    toStartOfMinute(timestamp) AS dateTime,
    uniqState(user) AS users,
    uniqState(title_url) AS pages,
    sumState(toUInt32(1)) AS updates
FROM rawEvents
GROUP BY dateTime;
  11. We need the corresponding -Merge functions to view the results:
SELECT
    dateTime AS dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;
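
If you run that query often, the -Merge logic can be wrapped in an ordinary view (an optional convenience; the name byMinute_v is invented here):

CREATE VIEW byMinute_v AS
SELECT
    dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime;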