**Overview:** This article walks through sending data from a Kafka topic into a ClickHouse table. We will use the Wikimedia recent changes feed, which provides a stream of events representing edits made across the various Wikimedia properties. The steps include:
- How to set up Kafka on Ubuntu
- Streaming the data into a Kafka topic
- Creating a ClickHouse table that subscribes to that topic
1. Set up Kafka on Ubuntu
- Create an Ubuntu EC2 instance and SSH into it
ssh -i ~/training.pem ubuntu@<ec2-public-dns>
sudo apt update
sudo apt install openjdk-11-jdk
mkdir /home/ubuntu/kafka
cd /home/ubuntu/kafka/
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -zxvf kafka_2.13-3.7.0.tgz
- Start ZooKeeper
cd kafka_2.13-3.7.0
bin/zookeeper-server-start.sh config/zookeeper.properties
- Open a new console and start Kafka
ssh -i ~/training.pem ubuntu@<ec2-public-dns>
cd kafka/kafka_2.13-3.7.0/
bin/kafka-server-start.sh config/server.properties
- Open a third console and create a topic named wikimedia
ssh -i ~/training.pem ubuntu@<ec2-public-dns>
cd kafka/kafka_2.13-3.7.0/
bin/kafka-topics.sh --create --topic wikimedia --bootstrap-server localhost:9092
- You can verify that it was created successfully with:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
2. Stream the Wikimedia feed into Kafka
- We first need a couple of utilities
sudo apt-get install librdkafka-dev libyajl-dev
sudo apt-get install kafkacat
- Send the data to Kafka using a clever curl command that fetches the latest Wikimedia events, strips the "data: " prefix from each server-sent event to leave just the JSON payload, and pipes it into the Kafka topic
curl -N https://stream.wikimedia.org/v2/stream/recentchange | awk '/^data: /{gsub(/^data: /, ""); print}' | kafkacat -P -b localhost:9092 -t wikimedia
- You can "describe" the topic
bin/kafka-topics.sh --describe --topic wikimedia --bootstrap-server localhost:9092
- Let's verify everything is working by consuming a few events
bin/kafka-console-consumer.sh --topic wikimedia --from-beginning --bootstrap-server localhost:9092
- Press Ctrl+C to terminate the previous command.
3. Ingest the data into ClickHouse
- Here is an example of the incoming data:
{
"$schema": "/mediawiki/recentchange/1.0.0",
"meta": {
"uri": "https://www.wikidata.org/wiki/Q45791749",
"request_id": "f64cfb17-04ba-4d09-8935-38ec6f0001c2",
"id": "9d7d2b5a-b79b-45ea-b72c-69c3b69ae931",
"dt": "2024-04-18T13:21:21Z",
"domain": "www.wikidata.org",
"stream": "mediawiki.recentchange",
"topic": "eqiad.mediawiki.recentchange",
"partition": 0,
"offset": 5032636513
},
"id": 2196113017,
"type": "edit",
"namespace": 0,
"title": "Q45791749",
"title_url": "https://www.wikidata.org/wiki/Q45791749",
"comment": "/* wbsetqualifier-add:1| */ [[Property:P1545]]: 20, Modify PubMed ID: 7292984 citation data from NCBI, Europe PMC and CrossRef",
"timestamp": 1713446481,
"user": "Cewbot",
"bot": true,
"notify_url": "https://www.wikidata.org/w/index.php?diff=2131981357&oldid=2131981341&rcid=2196113017",
"minor": false,
"patrolled": true,
"length": {
"old": 75618,
"new": 75896
},
"revision": {
"old": 2131981341,
"new": 2131981357
},
"server_url": "https://www.wikidata.org",
"server_name": "www.wikidata.org",
"server_script_path": "/w",
"wiki": "wikidatawiki",
"parsedcomment": "<span dir=\"auto\"><span class=\"autocomment\">Added qualifier: </span></span> <a href=\"/wiki/Property:P1545\" title=\"series ordinal | position of an item in its parent series (most frequently a 1-based index), generally to be used as a qualifier (different from "rank" defined as a class, and from "ranking" defined as a property for evaluating a quality).\"><span class=\"wb-itemlink\"><span class=\"wb-itemlink-label\" lang=\"en\" dir=\"ltr\">series ordinal</span> <span class=\"wb-itemlink-id\">(P1545)</span></span></a>: 20, Modify PubMed ID: 7292984 citation data from NCBI, Europe PMC and CrossRef"
}
- We will need the Kafka table engine to pull the data out of the Kafka topic
CREATE OR REPLACE TABLE wikiQueue
(
`id` UInt32,
`type` String,
`title` String,
`title_url` String,
`comment` String,
`timestamp` UInt64,
`user` String,
`bot` Bool,
`server_url` String,
`server_name` String,
`wiki` String,
`meta` Tuple(uri String, id String, stream String, topic String, domain String)
)
ENGINE = Kafka(
'ec2.compute.amazonaws.com:9092',
'wikimedia',
'consumer-group-wiki',
'JSONEachRow'
);
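As an aside, the same table can also be declared with the Kafka engine's named settings instead of the positional arguments, which some find more readable. This is only an equivalent sketch of the statement above, not an additional step:
-- Equivalent declaration using the engine's named settings
CREATE OR REPLACE TABLE wikiQueue
(
    `id` UInt32,
    `type` String,
    `title` String,
    `title_url` String,
    `comment` String,
    `timestamp` UInt64,
    `user` String,
    `bot` Bool,
    `server_url` String,
    `server_name` String,
    `wiki` String,
    `meta` Tuple(uri String, id String, stream String, topic String, domain String)
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'ec2.compute.amazonaws.com:9092',
         kafka_topic_list = 'wikimedia',
         kafka_group_name = 'consumer-group-wiki',
         kafka_format = 'JSONEachRow';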
- For some reason the Kafka table engine seems to take the public EC2 URL and swap it for the private DNS name, so I had to add an entry to my local /etc/hosts file
52.14.154.92 ip.us-east-2.compute.internal
- You can read straight from the Kafka table by enabling a single setting. Keep in mind that a direct SELECT consumes the messages it reads for that consumer group, so it is mainly useful for debugging:
SELECT *
FROM wikiQueue
LIMIT 20
FORMAT Vertical
SETTINGS stream_like_engine_allow_direct_select = 1;
- The rows should be nicely parsed according to the columns defined in the wikiQueue table:
id: 2473996741
type: edit
title: File:Père-Lachaise - Division 6 - Cassereau 05.jpg
title_url: https://commons.wikimedia.org/wiki/File:P%C3%A8re-Lachaise_-_Division_6_-_Cassereau_05.jpg
comment: /* wbcreateclaim-create:1| */ [[d:Special:EntityPage/P921]]: [[d:Special:EntityPage/Q112327116]], [[:toollabs:quickstatements/#/batch/228454|batch #228454]]
timestamp: 1713457283
user: Ameisenigel
bot: false
server_url: https://commons.wikimedia.org
server_name: commons.wikimedia.org
wiki: commonswiki
meta: ('https://commons.wikimedia.org/wiki/File:P%C3%A8re-Lachaise_-_Division_6_-_Cassereau_05.jpg','01a832e2-24c5-4ccb-bd93-8e2c0e429418','mediawiki.recentchange','eqiad.mediawiki.recentchange','commons.wikimedia.org')
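If you ever want to inspect messages before settling on a typed schema, one option (a sketch, not part of the original steps; the wikiQueueRaw name and the extra consumer group are made up here) is a Kafka table with a single String column and the JSONAsString format:
-- Hypothetical side table: each Kafka message arrives as one raw JSON string
CREATE TABLE wikiQueueRaw
(
    `message` String
)
ENGINE = Kafka(
    'ec2.compute.amazonaws.com:9092',
    'wikimedia',
    'consumer-group-wiki-raw',
    'JSONAsString'
);

-- Direct selects need the same setting as above, and consume what they read
SELECT message
FROM wikiQueueRaw
LIMIT 5
SETTINGS stream_like_engine_allow_direct_select = 1;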
- We need a MergeTree table to store these incoming events
CREATE TABLE rawEvents (
id UInt64,
type LowCardinality(String),
comment String,
timestamp DateTime64(3, 'UTC'),
title_url String,
topic LowCardinality(String),
user String
)
ENGINE = MergeTree
ORDER BY (type, timestamp);
- Let's define a materialized view that is triggered when inserts happen on the Kafka table and that sends the data to our rawEvents table
CREATE MATERIALIZED VIEW rawEvents_mv TO rawEvents
AS
SELECT
id,
type,
comment,
toDateTime(timestamp) AS timestamp,
title_url,
tupleElement(meta, 'topic') AS topic,
user
FROM wikiQueue
WHERE title_url <> '';
- You should see data arriving in rawEvents right away
SELECT count()
FROM rawEvents;
- Let's look at a few rows
SELECT *
FROM rawEvents
LIMIT 5
FORMAT Vertical
Row 1:
──────
id: 124842852
type: 142
comment: Pere prlpz commented on "Plantilles Enciclopèdia Catalana" (Diria que no cal fer res als articles. Es pot actualitzar els enllaços que es facin servir a les referències (tot i que l'antic encara ha...)
timestamp: 2024-04-18 16:22:29.000
title_url: https://ca.wikipedia.org/wiki/Tema:Wu36d6vfsiuu4jsi
topic: eqiad.mediawiki.recentchange
user: Pere prlpz
Row 2:
──────
id: 2473996748
type: categorize
comment: [[:File:Ruïne van een poortgebouw, RP-T-1976-29-6(R).jpg]] removed from category
timestamp: 2024-04-18 16:21:20.000
title_url: https://commons.wikimedia.org/wiki/Category:Pieter_Moninckx
topic: eqiad.mediawiki.recentchange
user: Warburg1866
Row 3:
──────
id: 311828596
type: categorize
comment: [[:Cujo (película)]] añadida a la categoría
timestamp: 2024-04-18 16:21:21.000
title_url: https://es.wikipedia.org/wiki/Categor%C3%ADa:Pel%C3%ADculas_basadas_en_obras_de_Stephen_King
topic: eqiad.mediawiki.recentchange
user: Beta15
Row 4:
──────
id: 311828597
type: categorize
comment: [[:Cujo (película)]] eliminada de la categoría
timestamp: 2024-04-18 16:21:21.000
title_url: https://es.wikipedia.org/wiki/Categor%C3%ADa:Trabajos_basados_en_obras_de_Stephen_King
topic: eqiad.mediawiki.recentchange
user: Beta15
Row 5:
──────
id: 48494536
type: categorize
comment: [[:braiteremmo]] ajoutée à la catégorie
timestamp: 2024-04-18 16:21:21.000
title_url: https://fr.wiktionary.org/wiki/Cat%C3%A9gorie:Wiktionnaire:Exemples_manquants_en_italien
topic: eqiad.mediawiki.recentchange
user: Àncilu bot
- Let's see what types of events are coming in
SELECT
type,
count()
FROM rawEvents
GROUP BY type
┌─type───────┬─count()─┐
1. │ 142 │ 1 │
2. │ new │ 1003 │
3. │ categorize │ 12228 │
4. │ log │ 1799 │
5. │ edit │ 17142 │
└────────────┴─────────┘
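As an optional aside (not part of the original walkthrough), the rawEvents table also supports ad hoc exploration; for example, a quick sketch listing the most active users seen so far:
-- Most active users by number of recorded events
SELECT
    user,
    count() AS changes
FROM rawEvents
GROUP BY user
ORDER BY changes DESC
LIMIT 10;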
- Let's define a materialized view chained to the current materialized view. We will keep track of some aggregate statistics per minute
CREATE TABLE byMinute
(
`dateTime` DateTime64(3, 'UTC') NOT NULL,
`users` AggregateFunction(uniq, String),
`pages` AggregateFunction(uniq, String),
`updates` AggregateFunction(sum, UInt32)
)
ENGINE = AggregatingMergeTree
ORDER BY dateTime;
CREATE MATERIALIZED VIEW byMinute_mv TO byMinute
AS SELECT
toStartOfMinute(timestamp) AS dateTime,
uniqState(user) AS users,
uniqState(title_url) AS pages,
sumState(toUInt32(1)) AS updates
FROM rawEvents
GROUP BY dateTime;
- We will need the -Merge functions to view the results
SELECT
dateTime AS dateTime,
uniqMerge(users) AS users,
uniqMerge(pages) AS pages,
sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;
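For a quick visual check in the client, the same -Merge query can be combined with ClickHouse's bar() function. A minimal sketch; the 0 to 2000 range is just an assumed per-minute ceiling and may need adjusting for your stream:
SELECT
    dateTime,
    sumMerge(updates) AS updates,
    bar(sumMerge(updates), 0, 2000, 30) AS chart  -- 30-character bar scaled from 0 to 2000 updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;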