
How to ingest data from Kafka into ClickHouse


**Overview:** This article walks through sending data from a Kafka topic to a ClickHouse table. We will use the Wikimedia recent changes feed, which provides a stream of events representing changes made across the various Wikimedia properties. The steps include:

  1. How to set up Kafka on Ubuntu
  2. Stream data into a Kafka topic
  3. Create a ClickHouse table that subscribes to the topic

1. Set up Kafka on Ubuntu

  1. Create an Ubuntu EC2 instance and connect to it over SSH:
ssh -i ~/training.pem [email protected]
  2. Install Kafka (based on the instructions here: https://www.linode.com/docs/guides/how-to-install-apache-kafka-on-ubuntu/):
sudo apt update
sudo apt install openjdk-11-jdk

mkdir /home/ubuntu/kafka
cd /home/ubuntu/kafka/

wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz

tar -zxvf kafka_2.13-3.7.0.tgz
  3. Start ZooKeeper:
cd kafka_2.13-3.7.0
bin/zookeeper-server-start.sh config/zookeeper.properties
  4. Open a new console and start Kafka:
ssh -i ~/training.pem [email protected]
cd kafka/kafka_2.13-3.7.0/
bin/kafka-server-start.sh config/server.properties
  5. Open a third console and create a topic named wikimedia:
ssh -i ~/training.pem [email protected]
cd kafka/kafka_2.13-3.7.0/

bin/kafka-topics.sh --create --topic wikimedia --bootstrap-server localhost:9092
  6. You can verify that the topic was created successfully with:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

2. Ingest the Wikimedia stream into Kafka

  1. We first need a couple of utilities:
sudo apt-get install librdkafka-dev libyajl-dev
sudo apt-get install kafkacat
  2. Send data to Kafka with a clever curl command that fetches the latest Wikimedia events, pulls the JSON payload out of each server-sent event, and produces it to the Kafka topic:
curl -N https://stream.wikimedia.org/v2/stream/recentchange  | awk '/^data: /{gsub(/^data: /, ""); print}' | kafkacat -P -b localhost:9092 -t wikimedia
  3. You can "describe" the topic:
bin/kafka-topics.sh --describe --topic wikimedia --bootstrap-server localhost:9092
  4. Let's verify that everything is working by consuming a few events:
bin/kafka-console-consumer.sh --topic wikimedia --from-beginning --bootstrap-server localhost:9092
  5. Press Ctrl+C to terminate the previous command.

3. Ingest the data into ClickHouse

  1. Here is what the incoming data looks like:
{
    "$schema": "/mediawiki/recentchange/1.0.0",
    "meta": {
        "uri": "https://www.wikidata.org/wiki/Q45791749",
        "request_id": "f64cfb17-04ba-4d09-8935-38ec6f0001c2",
        "id": "9d7d2b5a-b79b-45ea-b72c-69c3b69ae931",
        "dt": "2024-04-18T13:21:21Z",
        "domain": "www.wikidata.org",
        "stream": "mediawiki.recentchange",
        "topic": "eqiad.mediawiki.recentchange",
        "partition": 0,
        "offset": 5032636513
    },
    "id": 2196113017,
    "type": "edit",
    "namespace": 0,
    "title": "Q45791749",
    "title_url": "https://www.wikidata.org/wiki/Q45791749",
    "comment": "/* wbsetqualifier-add:1| */ [[Property:P1545]]: 20, Modify PubMed ID: 7292984 citation data from NCBI, Europe PMC and CrossRef",
    "timestamp": 1713446481,
    "user": "Cewbot",
    "bot": true,
    "notify_url": "https://www.wikidata.org/w/index.php?diff=2131981357&oldid=2131981341&rcid=2196113017",
    "minor": false,
    "patrolled": true,
    "length": {
        "old": 75618,
        "new": 75896
    },
    "revision": {
        "old": 2131981341,
        "new": 2131981357
    },
    "server_url": "https://www.wikidata.org",
    "server_name": "www.wikidata.org",
    "server_script_path": "/w",
    "wiki": "wikidatawiki",
    "parsedcomment": "<span dir=\"auto\"><span class=\"autocomment\">Added qualifier: </span></span> <a href=\"/wiki/Property:P1545\" title=\"series ordinal | position of an item in its parent series (most frequently a 1-based index), generally to be used as a qualifier (different from &quot;rank&quot; defined as a class, and from &quot;ranking&quot; defined as a property for evaluating a quality).\"><span class=\"wb-itemlink\"><span class=\"wb-itemlink-label\" lang=\"en\" dir=\"ltr\">series ordinal</span> <span class=\"wb-itemlink-id\">(P1545)</span></span></a>: 20, Modify PubMed ID: 7292984 citation data from NCBI, Europe PMC and CrossRef"
}
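
If you want to check how ClickHouse will parse a document in JSONEachRow before wiring anything up, recent ClickHouse versions let you paste a sample into the format table function and rely on schema inference. A minimal sketch with a trimmed-down document (the shortened JSON here is illustrative, not a full event):

SELECT *
FROM format(JSONEachRow, '{"id": 2196113017, "type": "edit", "title": "Q45791749", "bot": true}');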
  2. We will need the Kafka table engine to pull the data out of the Kafka topic:
CREATE OR REPLACE TABLE wikiQueue
(
    `id` UInt32,
    `type` String,
    `title` String,
    `title_url` String,
    `comment` String,
    `timestamp` UInt64,
    `user` String,
    `bot` Bool,
    `server_url` String,
    `server_name` String,
    `wiki` String,
    `meta` Tuple(uri String, id String, stream String, topic String, domain String)
)
ENGINE = Kafka(
    'ec2.compute.amazonaws.com:9092',
    'wikimedia',
    'consumer-group-wiki',
    'JSONEachRow'
);
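
The Kafka engine's four positional arguments are the broker list, the topic, the consumer group name, and the message format. If you prefer self-documenting DDL, the same table can be declared with the engine's named settings; a sketch equivalent to the definition above:

CREATE OR REPLACE TABLE wikiQueue
(
    `id` UInt32,
    `type` String,
    `title` String,
    `title_url` String,
    `comment` String,
    `timestamp` UInt64,
    `user` String,
    `bot` Bool,
    `server_url` String,
    `server_name` String,
    `wiki` String,
    `meta` Tuple(uri String, id String, stream String, topic String, domain String)
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'ec2.compute.amazonaws.com:9092',
         kafka_topic_list = 'wikimedia',
         kafka_group_name = 'consumer-group-wiki',
         kafka_format = 'JSONEachRow';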
  3. For some reason, the Kafka table engine seems to take the public EC2 URL and convert it to the private DNS name, so I had to add that mapping to my local /etc/hosts file:
52.14.154.92  ip.us-east-2.compute.internal
  4. You can read the data directly from the Kafka table; you just need to enable a setting:
SELECT *
FROM wikiQueue
LIMIT 20
FORMAT Vertical
SETTINGS stream_like_engine_allow_direct_select = 1;

The rows should be parsed nicely, based on the columns defined in the wikiQueue table:

id:          2473996741
type:        edit
title:       File:Père-Lachaise - Division 6 - Cassereau 05.jpg
title_url:   https://commons.wikimedia.org/wiki/File:P%C3%A8re-Lachaise_-_Division_6_-_Cassereau_05.jpg
comment:     /* wbcreateclaim-create:1| */ [[d:Special:EntityPage/P921]]: [[d:Special:EntityPage/Q112327116]], [[:toollabs:quickstatements/#/batch/228454|batch #228454]]
timestamp:   1713457283
user:        Ameisenigel
bot:         false
server_url:  https://commons.wikimedia.org
server_name: commons.wikimedia.org
wiki:        commonswiki
meta:        ('https://commons.wikimedia.org/wiki/File:P%C3%A8re-Lachaise_-_Division_6_-_Cassereau_05.jpg','01a832e2-24c5-4ccb-bd93-8e2c0e429418','mediawiki.recentchange','eqiad.mediawiki.recentchange','commons.wikimedia.org')
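
Because meta is declared as a named Tuple, its fields can also be pulled out individually with tupleElement. A minimal sketch (it needs the same direct-select setting; note that direct selects consume messages and advance the consumer group's offsets, so rows read this way will not also reach the materialized view we create below):

SELECT
    tupleElement(meta, 'topic') AS topic,
    tupleElement(meta, 'domain') AS domain
FROM wikiQueue
LIMIT 5
SETTINGS stream_like_engine_allow_direct_select = 1;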
  5. We need a MergeTree table to store these incoming events:
CREATE TABLE rawEvents (
    id UInt64,
    type LowCardinality(String),
    comment String,
    timestamp DateTime64(3, 'UTC'),
    title_url String,
    topic LowCardinality(String),
    user String
)
ENGINE = MergeTree
ORDER BY (type, timestamp);
  6. Let's define a materialized view that is triggered when an insert occurs on the Kafka table and that sends the data to our rawEvents table:
CREATE MATERIALIZED VIEW rawEvents_mv TO rawEvents
AS SELECT
    id,
    type,
    comment,
    toDateTime(timestamp) AS timestamp,
    title_url,
    tupleElement(meta, 'topic') AS topic,
    user
FROM wikiQueue
WHERE title_url <> '';
  7. You should see data arriving in rawEvents immediately:
SELECT count()
FROM rawEvents;
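
If the count is not growing, it can help to inspect the consumer's state. As an assumption about your deployment, sufficiently recent ClickHouse releases (23.8+) expose this in the system.kafka_consumers table:

SELECT consumer_id, num_messages_read, last_poll_time
FROM system.kafka_consumers
WHERE table = 'wikiQueue'
FORMAT Vertical;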
  8. Let's look at a few of the rows:
SELECT *
FROM rawEvents
LIMIT 5
FORMAT Vertical;

Row 1:
──────
id:         124842852
type:       142
comment:    Pere prlpz commented on "Plantilles Enciclopèdia Catalana" (Diria que no cal fer res als articles. Es pot actualitzar els enllaços que es facin servir a les referències (tot i que l'antic encara ha...)
timestamp:  2024-04-18 16:22:29.000
title_url:  https://ca.wikipedia.org/wiki/Tema:Wu36d6vfsiuu4jsi
topic:      eqiad.mediawiki.recentchange
user:       Pere prlpz

Row 2:
──────
id:         2473996748
type:       categorize
comment:    [[:File:Ruïne van een poortgebouw, RP-T-1976-29-6(R).jpg]] removed from category
timestamp:  2024-04-18 16:21:20.000
title_url:  https://commons.wikimedia.org/wiki/Category:Pieter_Moninckx
topic:      eqiad.mediawiki.recentchange
user:       Warburg1866

Row 3:
──────
id:         311828596
type:       categorize
comment:    [[:Cujo (película)]] añadida a la categoría
timestamp:  2024-04-18 16:21:21.000
title_url:  https://es.wikipedia.org/wiki/Categor%C3%ADa:Pel%C3%ADculas_basadas_en_obras_de_Stephen_King
topic:      eqiad.mediawiki.recentchange
user:       Beta15

Row 4:
──────
id:         311828597
type:       categorize
comment:    [[:Cujo (película)]] eliminada de la categoría
timestamp:  2024-04-18 16:21:21.000
title_url:  https://es.wikipedia.org/wiki/Categor%C3%ADa:Trabajos_basados_en_obras_de_Stephen_King
topic:      eqiad.mediawiki.recentchange
user:       Beta15

Row 5:
──────
id:         48494536
type:       categorize
comment:    [[:braiteremmo]] ajoutée à la catégorie
timestamp:  2024-04-18 16:21:21.000
title_url:  https://fr.wiktionary.org/wiki/Cat%C3%A9gorie:Wiktionnaire:Exemples_manquants_en_italien
topic:      eqiad.mediawiki.recentchange
user:       Àncilu bot
  9. Let's see what types of events are coming in:
SELECT
    type,
    count()
FROM rawEvents
GROUP BY type;

   ┌─type───────┬─count()─┐
1. │ 142        │       1 │
2. │ new        │    1003 │
3. │ categorize │   12228 │
4. │ log        │    1799 │
5. │ edit       │   17142 │
   └────────────┴─────────┘
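
Now that events are landing in a regular MergeTree table, ad-hoc exploration is cheap. For example, an illustrative query for the most active editors:

SELECT user, count() AS edits
FROM rawEvents
WHERE type = 'edit'
GROUP BY user
ORDER BY edits DESC
LIMIT 10;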

  10. Let's define a materialized view chained to our existing materialized view. We will keep track of some aggregated statistics per minute:

CREATE TABLE byMinute
(
    `dateTime` DateTime64(3, 'UTC') NOT NULL,
    `users` AggregateFunction(uniq, String),
    `pages` AggregateFunction(uniq, String),
    `updates` AggregateFunction(sum, UInt32)
)
ENGINE = AggregatingMergeTree
ORDER BY dateTime;

CREATE MATERIALIZED VIEW byMinute_mv TO byMinute
AS SELECT
    toStartOfMinute(timestamp) AS dateTime,
    uniqState(user) AS users,
    uniqState(title_url) AS pages,
    sumState(toUInt32(1)) AS updates
FROM rawEvents
GROUP BY dateTime;
  11. We will need the -Merge functions to view those results:
SELECT
    dateTime AS dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;
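
Because byMinute stores aggregate function states rather than finished values, the same states can be re-aggregated into coarser buckets later without rereading rawEvents. A sketch that rolls the per-minute states up into hourly totals:

SELECT
    toStartOfHour(dateTime) AS hour,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byMinute
GROUP BY hour
ORDER BY hour DESC
LIMIT 24;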