简介
最近,在帮助一位想要从 ClickHouse 中保存的向量计算质心的用户时,我们意识到相同的解决方案可以用于实现 K-Means 聚类。他们希望在可能数十亿的数据点上大规模解决此问题,同时确保可以严格管理内存。在这篇文章中,我们尝试仅使用 SQL 实现 K-means 聚类,并表明它可以扩展到数十亿行。
在撰写这篇博客的过程中,我们了解到 Boris Tyshkevich 所做的工作。虽然我们在这篇博客中使用了不同的方法,但我们要感谢 Boris 的工作以及他在我们之前就有了这个想法!
作为使用 ClickHouse SQL 实现 K-Means 的一部分,我们在 3 分钟内聚类了 1.7 亿次纽约市出租车行程。使用相同资源的等效 scikit-learn 操作需要 100 多分钟,并且需要 90GB 的 RAM。在没有内存限制且 ClickHouse 自动分配计算的情况下,我们表明 ClickHouse 可以加速机器学习工作负载并缩短迭代时间。
这篇博客文章的所有代码都可以在这里的 notebook 中找到。
为什么在 ClickHouse SQL 中使用 K-Means?
使用 ClickHouse SQL 进行 K-Means 的主要动机是训练不受内存限制,这使得聚类 PB 级数据集成为可能,这要归功于质心的增量计算(具有限制内存开销的设置)。相比之下,使用基于 Python 的方法跨服务器分发此工作负载将需要额外的框架和复杂性。
此外,我们可以轻松增加聚类中的并行级别,以充分利用 Clickhouse 实例的资源。如果我们需要处理更大的数据集,我们只需扩展数据库服务 - 这是 ClickHouse Cloud 中的一个简单操作。
转换 K-Means 数据是一个简单的 SQL 查询,每秒可以处理数十亿行。由于质心和点都保存在 ClickHouse 中,我们可以仅使用 SQL 计算模型误差等统计信息,并可能将我们的集群用于其他操作,例如向量搜索的产品量化。
K-Means 回顾
K-Means 是一种无监督机器学习算法,用于将数据集划分为 K 个不同的、非重叠的子组(集群),其中每个数据点都属于与其最近均值(集群的质心)的集群。该过程首先随机或基于某些启发式方法初始化 K 个质心。这些质心充当集群的初始代表。然后,该算法迭代执行两个主要步骤,直到收敛:分配和更新。
在分配步骤中,每个数据点都根据欧几里得距离(或其他距离度量)分配到最近的集群,该距离是数据点与质心之间的距离。在更新步骤中,质心被重新计算为分配给各自集群的所有点的均值,从而可能改变它们的位置。
此过程保证收敛,点的集群分配最终稳定,并且在迭代之间不会改变。集群的数量 K 需要预先指定,并且严重影响算法的有效性,最佳值取决于数据集和聚类的目标。有关更多详细信息,我们推荐这篇优秀概述。
点和质心
我们的用户提出的关键问题是有效计算质心的能力。假设我们有一个简单的transactions
表的数据模式,其中每一行代表特定客户的银行交易。ClickHouse 中的向量表示为Array
类型。
CREATE TABLE transactions
(
id UInt32,
vector Array(Float32),
-- e.g.[0.6860357,-1.0086979,0.83166444,-1.0089169,0.22888935]
customer UInt32,
...other columns omitted for brevity
)
ENGINE = MergeTree ORDER BY id
我们的用户想要找到每个客户的质心,实际上是与每个客户关联的所有交易向量的位置平均值。为了找到平均向量集,我们可以使用avgForEach
[1][2]函数。例如,考虑计算 3 个向量的平均值的示例,每个向量包含 4 个元素
WITH vectors AS
(
SELECT c1 AS vector
FROM VALUES([1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12])
)
SELECT avgForEach(vector) AS centroid
FROM vectors
┌─centroid──┐
│ [5,6,7,8] │
└───────────┘
在我们原始的transactions
表中,计算每个客户的平均值因此变为
SELECT customer, avgForEach(vector) AS centroid FROM transactions GROUP BY customer
虽然很简单,但这种方法有一些限制。首先,对于非常大的数据集,当vector
包含许多Float32
点,并且customer
列具有许多唯一元素(高基数)时,此查询可能会非常消耗内存。其次,并且可能与 K-Means 更相关的是,如果插入新行,此方法需要我们重新运行查询,这是低效的。我们可以通过物化视图和 AggregatingMergeTree 引擎来解决这些问题。
使用物化视图增量计算质心
物化视图允许我们将计算质心的成本转移到插入时间。与其他数据库不同,ClickHouse 物化视图只是一个触发器,它在数据块插入到表中时对数据块运行查询。此查询的结果将插入到第二个“目标”表。在我们的例子中,物化视图查询将计算我们的质心,并将结果插入到表centroids
。
这里有一些重要的细节
- 我们的查询(计算质心)必须以可以与后续结果集合并的格式生成结果集 - 因为每个插入的块都会生成一个结果集。与其仅将平均值发送到我们的
centroids
表(平均值的平均值将是不正确的),我们发送“平均状态”。平均状态表示包含每个向量位置的总和,以及计数。这是使用avgForEachState
函数实现的 - 请注意我们是如何将State
附加到我们的函数名称的!AggregatingMergeTree 表引擎是存储这些聚合状态所必需的。我们将在下面进一步探讨这一点。 - 整个过程是增量的,
centroids
表包含最终状态,即每个质心一行。读者会注意到,接收插入的表具有 Null 表引擎。这会导致插入的行被丢弃,从而节省了与每次迭代时写入完整数据集相关的 IO。 - 我们的物化视图的查询仅在块插入时执行。每个块中的行数可能因插入方法而异。如果客户端制定块,例如使用 Go 客户端,我们建议每个块至少包含 1000 行。如果服务器负责形成块(例如,通过 HTTP 插入时),则还可以指定大小。
- 如果使用
INSERT INTO SELECT
,其中 ClickHouse 从另一个表或外部源(例如 S3)读取行,则块大小可以由之前的博客中详细讨论的几个关键参数控制。这些设置(以及插入线程数)对内存使用量(较大的块 = 更多内存)和摄取速度(较大的块 = 更快)都有显着影响。这些设置意味着内存使用量可以被精细控制以换取性能。
AggregatingMergeTree
我们的目标表centroids
使用引擎AggregatingMergeTree
CREATE TABLE centroids
(
customer UInt32,
vector AggregateFunction(avgForEach, Array(Float32))
)
ENGINE = AggregatingMergeTree ORDER BY customer
我们的vector
列在此处包含由上面的avgForEachState
函数生成的聚合状态。这些是必须合并以产生最终答案的中间质心。此列需要是适当的类型AggregateFunction(avgForEach, Array(Float32))
。
像所有 ClickHouse MergeTree 表一样,AggregatingMergeTree 将数据存储为必须透明合并的部分,以提高查询效率。当合并包含聚合状态的部分时,必须这样做,以便仅合并与同一客户相关的状态。这通过使用ORDER BY
子句按customer
列对表进行排序来有效实现。在查询时,我们还必须确保对中间状态进行分组和合并。这可以通过确保我们按列customer
GROUP BY
并使用avgForEach
函数的 Merge 等效项来实现:avgForEachMerge。
SELECT customer, avgForEachMerge(vector) AS centroid
FROM centroids GROUP BY customer
所有聚合函数都有等效的状态函数,通过将
State
附加到其名称获得,它生成一个可以存储的中间表示,然后可以使用Merge
等效项检索和合并。有关更多详细信息,我们推荐这篇博客和我们自己的 Mark的视频。
与我们之前的GROUP BY
相比,此查询将非常快。计算平均值的大部分工作已转移到插入时间,只有少量行留给查询时间合并。考虑在 48GiB、12 vCPU 云服务上使用 1 亿个随机交易的以下两种方法的性能。加载数据的步骤在此。
对比从transactions
表计算质心的性能
SELECT customer, avgForEach(vector) AS centroid
FROM transactions GROUP BY customer
ORDER BY customer ASC
LIMIT 1 FORMAT Vertical
10 rows in set. Elapsed: 147.526 sec. Processed 100.00 million rows, 41.20 GB (677.85 thousand rows/s., 279.27 MB/s.)
Row 1:
──────
customer: 1
centroid: [0.49645231463677153,0.5042792240640065,...,0.5017436349466129]
1 row in set. Elapsed: 36.017 sec. Processed 100.00 million rows, 41.20 GB (2.78 million rows/s., 1.14 GB/s.)
Peak memory usage: 437.54 MiB.
与centroids
表的性能相比,后者速度快 1700 多倍
SELECT customer, avgForEachMerge(vector) AS centroid
FROM centroids GROUP BY customer
ORDER BY customer ASC
LIMIT 1
FORMAT Vertical
Row 1:
──────
customer: 1
centroid: [0.49645231463677153,0.5042792240640065,...,0.5017436349466129]
1 row in set. Elapsed: 0.085 sec. Processed 10.00 thousand rows, 16.28 MB (117.15 thousand rows/s., 190.73 MB/s.)
整合在一起
凭借我们增量计算质心的能力,让我们专注于 K-Means 聚类。假设我们正在尝试聚类表points
,其中每一行都有向量表示。在这里,我们将基于相似性进行聚类,而不仅仅是像我们对交易那样基于客户来确定质心。
单次迭代
我们需要能够在算法的每次迭代后存储当前的质心。现在,假设我们已经确定了 K 的最佳值。我们的质心的目标表可能如下所示
CREATE TABLE centroids
(
k UInt32,
iteration UInt32,
centroid UInt32,
vector AggregateFunction(avgForEach, Array(Float32))
)
ENGINE = AggregatingMergeTree
ORDER BY (k, iteration, centroid)
k
列的值设置为我们选择的 K 值。我们的centroid
列在此处表示质心编号本身,其值介于 0 和K-1
之间。我们没有为算法的每次迭代使用单独的表,而是简单地包含一个iteration
列,并确保我们的排序键是(k, iteration, centroid)
。ClickHouse 将确保仅针对每个唯一的 K、质心和迭代合并中间状态。这意味着我们的最终行数将很小,从而确保快速查询这些质心。
我们用于计算质心的物化视图应该是熟悉的,只需稍作调整即可GROUP BY k、质心和迭代
CREATE TABLE temp
(
k UInt32,
iteration UInt32,
centroid UInt32,
vector Array(Float32)
)
ENGINE = Null
CREATE MATERIALIZED VIEW centroids_mv TO centroids
AS SELECT k, iteration, centroid, avgForEachState(vector) AS vector
FROM temp GROUP BY k, centroid, iteration
请注意,我们的查询在插入到temp
表(而不是我们的数据源表 transactions)的块上执行,该表没有iteration
或centroid
列。此临时表将接收我们的插入,并再次使用 Null 表引擎以避免写入数据。有了这些构建块,我们可以可视化算法的单次迭代,假设K = 5
上面显示了我们如何插入到我们的临时表中,从而通过对points
表执行INSERT INTO SELECT
作为我们的源数据来计算我们的质心。此插入有效地表示算法的一次迭代。这里的SELECT
查询至关重要,因为它需要指定交易向量及其当前质心和迭代(以及 K 的固定值)。我们如何计算后两者?完整的INSERT INTO SELECT
如下所示
INSERT INTO temp
WITH
5 as k_val,
-- (1) obtain the max value of iteration - will be the previous iteration
(
SELECT max(iteration)
FROM centroids
-- As later we will reuse this table for all values of K
WHERE k = k_val
) AS c_iteration,
(
-- (3) convert centroids into a array of tuples
-- i.e. [(0, [vector]), (1, [vector]), ... , (k-1, [vector])]
SELECT groupArray((centroid, position))
FROM
(
-- (2) compute the centroids from the previous iteration
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE iteration = c_iteration AND k = k_val
GROUP BY centroid
)
) AS c_centroids
SELECT
k_val AS k,
-- (4) increment the iteration
c_iteration + 1 AS iteration,
-- (5) find the closest centroid for this vector using Euclidean distance
(arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS centroid,
vector AS v
FROM points
首先,在 (1) 处,此查询标识上一次迭代的编号。然后在 (2) 处的 CTE 中使用它来确定此迭代(和选定的 K)生成的质心,使用之前显示的相同avgForEachMerge
查询。这些质心通过groupArray
查询折叠成包含元组数组的单行,以便于与点匹配。在SELECT
中,我们递增迭代次数 (4) 并计算新的最近质心(使用欧几里得距离L2Distance
函数),方法是为每个点使用arrayMap
和arraySort
函数。
通过将行插入到此处的临时表,质心基于上一次迭代,我们可以让物化视图计算新的质心(迭代值为 +1)。
初始化质心
以上假设我们有一些迭代 1 的初始质心,用于计算成员资格。这需要我们初始化系统。我们可以通过使用以下查询(k=5)简单地选择和插入 K 个随机点来做到这一点
INSERT INTO temp WITH
5 as k_val,
vectors AS
(
SELECT vector
FROM points
-- select random points, use k to make pseudo-random
ORDER BY cityHash64(concat(toString(id), toString(k_val))) ASC
LIMIT k_val -- k
)
SELECT
k_val as k,
1 AS iteration,
rowNumberInAllBlocks() AS centroid,
vector
FROM vectors
成功的聚类对质心的初始放置非常敏感;不良的分配会导致收敛缓慢或聚类效果不佳。我们稍后将对此进行讨论。
质心分配以及何时停止迭代
以上所有内容都代表一次迭代(和初始化步骤)。在每次迭代之后,我们需要根据聚类是否已收敛的经验测量来决定是否停止。最简单的方法是当点在迭代之间不再更改质心(和集群)时停止。
为了确定哪些点属于哪些质心,我们可以随时使用上面
INSERT INTO SELECT
中的 SELECT。
为了计算上次迭代中移动集群的点数,我们首先计算前两次迭代的质心 (1) 和 (2)。使用这些质心,我们确定每次迭代中每个点的质心 (3) 和 (4)。如果这些质心相同 (5),我们返回 0,否则返回 1。这些 (6) 值的总和为我们提供了移动集群的点数。
WITH 5 as k_val,
(
SELECT max(iteration)
FROM centroids
) AS c_iteration,
(
-- (1) current centroids
SELECT groupArray((centroid, position))
FROM
(
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE iteration = c_iteration AND k = k_val
GROUP BY centroid
)
) AS c_centroids,
(
-- (2) previous centroids
SELECT groupArray((centroid, position))
FROM
(
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE iteration = (c_iteration-1) AND k = k_val
GROUP BY centroid
)
) AS c_p_centroids
-- (6) sum differences
SELECT sum(changed) FROM (
SELECT id,
-- (3) current centroid for point
(arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS cluster,
-- (4) previous centroid for point
(arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_p_centroids))[1]).1 AS cluster_p,
-- (5) difference in allocation
if(cluster = cluster_p, 0, 1) as changed
FROM points
)
测试数据集
以上大部分是理论性的。让我们看看以上方法是否真的适用于真实数据集!为此,我们将使用流行的纽约市出租车数据集的 300 万行子集,因为集群有望具有相关性。从 S3 创建和插入数据的步骤
CREATE TABLE trips (
trip_id UInt32,
pickup_datetime DateTime,
dropoff_datetime DateTime,
pickup_longitude Nullable(Float64),
pickup_latitude Nullable(Float64),
dropoff_longitude Nullable(Float64),
dropoff_latitude Nullable(Float64),
passenger_count UInt8,
trip_distance Float32,
fare_amount Float32,
extra Float32,
tip_amount Float32,
tolls_amount Float32,
total_amount Float32,
payment_type Enum('CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4, 'UNK' = 5),
pickup_ntaname LowCardinality(String),
dropoff_ntaname LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY (pickup_datetime, dropoff_datetime);
INSERT INTO trips SELECT trip_id, pickup_datetime, dropoff_datetime, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, passenger_count, trip_distance, fare_amount, extra, tip_amount, tolls_amount, total_amount, payment_type, pickup_ntaname, dropoff_ntaname
FROM gcs('https://storage.googleapis.com/clickhouse-public-datasets/nyc-taxi/trips_{0..2}.gz', 'TabSeparatedWithNames');
特征选择
特征选择对于良好的聚类至关重要,因为它直接影响形成的集群的质量。我们不会在此处详细介绍我们如何选择特征。对于感兴趣的人,我们将注释包含在notebook中。我们最终得到以下points
表
CREATE TABLE points
(
`id` UInt32,
`vector` Array(Float32),
`pickup_hour` UInt8,
`pickup_day_of_week` UInt8,
`pickup_day_of_month` UInt8,
`dropoff_hour` UInt8,
`pickup_longitude` Float64,
`pickup_latitude` Float64,
`dropoff_longitude` Float64,
`dropoff_latitude` Float64,
`passenger_count` UInt8,
`trip_distance` Float32,
`fare_amount` Float32,
`total_amount` Float32
) ENGINE = MergeTree ORDER BY id
为了填充此表,我们使用INSERT INTO SELECT
SQL 查询,它创建特征,缩放特征并过滤掉任何异常值。请注意,我们的最终列也编码在vector
列中。
链接的查询是我们首次尝试生成特征。我们希望这里可以进行更多工作,这可能会产生比所示更好的结果。欢迎提出建议!
一点 Python
我们已经描述了算法中的迭代如何有效地简化为INSERT INTO SELECT
,物化视图处理质心的维护。这意味着我们需要调用此语句 N 次,直到发生收敛。
我们没有等待达到点在质心之间不再移动的状态,而是使用了 1000 的阈值,即如果移动集群的点少于 1000 个,我们就停止。每 5 次迭代进行一次此检查。
对于特定的 K 值执行 K-Means 的伪代码变得非常简单,因为大部分工作由 ClickHouse 执行。
def kmeans(k, report_every = 5, min_cluster_move = 1000):
startTime = time.time()
# INITIALIZATION QUERY
run_init_query(k)
i = 0
while True:
# ITERATION QUERY
run_iteration_query(k)
# report every N iterations
if (i + 1) % report_every == 0 or i == 0:
num_moved = calculate_points_moved(k)
if num_moved <= min_cluster_move:
break
i += 1
execution_time = (time.time() - startTime))
# COMPUTE d^2 ERROR
d_2_error = compute_d2_error(k)
# return the d^2, execution time and num of required iterations
return d_2_error, execution_time, i+1
此循环的完整代码(包括查询)可以在notebook中找到。
选择 K
到目前为止,我们假设 K 已经被确定。有几种技术可以确定 K 的最佳值,其中最简单的方法是计算每个 K 值的每个点与其各自集群之间的聚合平方距离 (SSE)。这为我们提供了一个我们旨在最小化的成本指标。compute_d2_error
方法使用以下 SQL 查询计算此值(假设 K 值为 5)
WITH 5 as k_val,
(
SELECT max(iteration)
FROM centroids WHERE k={k}
) AS c_iteration,
(
SELECT groupArray((centroid, position))
FROM
(
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE iteration = c_iteration AND k=k_val
GROUP BY centroid
)
) AS c_centroids
SELECT
sum(pow((arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).2, 2)) AS distance
FROM points
当我们增加 K 时,此值保证会减小,例如,如果我们将 K 设置为点数,则每个集群将有 1 个点,从而使我们的误差为 0。不幸的是,这不会很好地概括数据!
随着 K 的增加,SSE 通常会减小,因为数据点更接近其集群质心。目标是找到“肘点”,其中 SSE 的减少率急剧变化。此点表示增加 K 的好处的回报递减。在肘点选择 K 可以提供一个模型,该模型捕获数据中的固有分组,而不会过度拟合。识别此肘点的简单方法是绘制 K 与 SEE 的关系图,并直观地识别该值。对于我们的纽约市出租车数据,我们测量并绘制 K 值 2 到 20 的 SSE
这里的肘点不像我们希望的那样清晰,但值 5 似乎是一个合理的候选值。
以上结果基于每个 K 值的单次端到端运行。K-Means 可能会收敛到局部最小值,并且不能保证附近的点最终会进入同一个集群。建议为每个 K 值运行多个值,每次都使用不同的初始质心,以找到最佳候选值。
结果
如果我们选择 5 作为我们的 K 值,则该算法大约需要 30 次迭代和 20 秒才能在 12 vCPU ClickHouse Cloud 节点上收敛。此方法考虑每次迭代的所有 300 万行。
k=5
initializing...OK
Iteration 0
Number changed cluster in first iteration: 421206
Iteration 1, 2, 3, 4
Number changed cluster in iteration 5: 87939
Iteration 5, 6, 7, 8, 9
Number changed cluster in iteration 10: 3610
Iteration 10, 11, 12, 13, 14
Number changed cluster in iteration 15: 1335
Iteration 15, 16, 17, 18, 19
Number changed cluster in iteration 20: 1104
Iteration 20, 21, 22, 23, 24
Number changed cluster in iteration 25: 390
stopping as moved less than 1000 clusters in last iteration
Execution time in seconds: 20.79200577735901
D^2 error for 5: 33000373.34968858
为了可视化这些集群,我们需要降低维度。为此,我们使用主成分分析 (PCA)。我们将 PCA 在 SQL 中的实现推迟到另一篇博客,而只使用 Python 和 10,000 个随机点的样本。我们可以通过检查主成分解释了多少方差来评估 PCA 在捕获数据基本属性方面的有效性。82% 低于通常使用的 90% 阈值,但足以理解我们聚类的有效性
Explained variances of the 3 principal components: 0.824
使用我们的 3 个主成分,我们可以绘制相同的随机 10,000 个点,并根据每个点的集群为其关联颜色。
集群的 PCA 可视化显示了 PC1 和 PC3 上的一个密集平面,整齐地分为四个不同的集群,表明这些维度内的方差受到约束。沿着第二个主成分 (PC2),可视化变得更加稀疏,集群(编号 3)偏离了主组,可能特别有趣。
为了理解我们的集群,我们需要标签。理想情况下,我们将通过探索每个集群中每一列的分布来生成这些标签,寻找独特的特征和时间/空间模式。我们将尝试使用 SQL 查询简洁地做到这一点,以了解每个集群中每一列的分布。对于要关注的列,我们可以检查 PCA 成分的值并识别占主导地位的维度。执行此操作的代码可以在 notebook 中找到,并标识以下内容
PCA1:: ['pickup_day_of_month: 0.9999497049810415', 'dropoff_latitude: -0.006371842399701939', 'pickup_hour: 0.004444108327647353', 'dropoff_hour: 0.003868258226185553', …]
PCA 2:: ['total_amount: 0.5489526881298809', 'fare_amount: 0.5463895585884886', 'pickup_longitude: 0.43181504878694826', 'pickup_latitude: -0.3074228612885196', 'dropoff_longitude: 0.2756342866763702', 'dropoff_latitude: -0.19809343490462433', …]
PCA 3:: ['dropoff_hour: -0.6998176337701472', 'pickup_hour: -0.6995098287872831', 'pickup_day_of_week: 0.1134719682173672', 'pickup_longitude: -0.05495391127067617', …]
对于 PCA1,pickup_day_of_month
很重要,这表明重点关注月份中的时间。对于 PC2 维度,上车和下车地点以及乘车费用似乎贡献很大。此组件可能侧重于特定的行程类型。最后,对于 PC3,行程发生的小时似乎是最相关的。为了了解这些列在每个集群中在时间、日期和价格方面的差异,我们可以再次仅使用 SQL 查询
WITH
5 AS k_val,
(
SELECT max(iteration)
FROM centroids
WHERE k = k_val
) AS c_iteration,
(
SELECT groupArray((centroid, position))
FROM
(
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE (iteration = c_iteration) AND (k = k_val)
GROUP BY centroid
)
) AS c_centroids
SELECT
(arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS cluster,
floor(avg(pickup_day_of_month)) AS pickup_day_of_month,
round(avg(pickup_hour)) AS avg_pickup_hour,
round(avg(fare_amount)) AS avg_fare_amount,
round(avg(total_amount)) AS avg_total_amount
FROM points
GROUP BY cluster
ORDER BY cluster ASC
┌─cluster─┬─pickup_day_of_month─┬─avg_pickup_hour─┬─avg_fare_amount─┬─avg_total_amount─┐
│ 0 │ 11 │ 14 │ 11 │ 13 │
│ 1 │ 3 │ 14 │ 12 │ 14 │
│ 2 │ 18 │ 13 │ 11 │ 13 │
│ 3 │ 16 │ 14 │ 49 │ 58 │
│ 4 │ 26 │ 14 │ 12 │ 14 │
└─────────┴─────────────────────┴─────────────────┴─────────────────┴──────────────────┘
9 rows in set. Elapsed: 0.625 sec. Processed 2.95 million rows, 195.09 MB (4.72 million rows/s., 312.17 MB/s.)
Peak memory usage: 720.16 MiB.
集群 3 显然与更昂贵的行程相关联。鉴于行程费用与主成分相关联,该主成分也将上车和下车地点识别为关键,因此这些可能与特定的行程类型相关联。其他集群需要更深入的分析,但似乎侧重于每月模式。我们可以将仅集群 3 的上车和下车地点绘制在地图可视化上。蓝色和红色点分别代表以下图中的上车和下车地点
仔细检查该图,此集群与往返 JFK 的机场行程相关联。
扩展
我们之前的示例仅使用了纽约市出租车行程的 300 万行子集。在更大的数据集上测试 2009 年的所有出租车行程(1.7 亿行),我们可以在大约 3 分钟内完成 k=5 的聚类,使用的 ClickHouse 服务有 60 个内核。
k=5
initializing...OK
…
Iteration 15, 16, 17, 18, 19
Number changed cluster in iteration 20: 288
stopping as moved less than 1000 clusters in last iteration
Execution time in seconds: 178.61135005950928
D^2 error for 5: 1839404623.265372
Completed in 178.61135005950928s and 20 iterations with error 1839404623.265372
这产生了与我们之前较小子集相似的集群。在 64 核m5d.16xlarge
上使用 scikit-learn 运行相同的聚类需要 6132 秒,速度慢了 34 倍以上!重现此基准测试的步骤可以在 notebook 的末尾找到,并使用这些步骤进行 scikit-learn。
潜在的改进和未来的工作
聚类对选择的初始点非常敏感。K-Means++ 是对标准 K-Means 聚类的改进,它通过引入更智能的初始化过程来解决此问题,该过程旨在分散初始质心,减少初始质心放置不佳的可能性,并导致更快的收敛以及可能更好的聚类。我们将此作为练习留给读者进行改进。
K-Means 也难以处理分类变量。这可以部分通过单热编码(在 SQL 中也是可能的)以及专用于此类数据的算法(例如KModes 聚类)来处理。特定领域的自定义距离函数(而不仅仅是欧几里得距离)也很常见,应该可以使用用户定义函数 (UDF) 来实现。
最后,探索其他软聚类算法(例如用于正态分布特征的高斯混合模型)或分层聚类算法(例如凝聚聚类)也可能很有趣。后一种方法也克服了 K-Means 的主要限制之一 - 需要指定 K。我们很乐意看到尝试在 ClickHouse SQL 中实现这些!