简介
最近,在帮助一位用户计算存储在 ClickHouse 中的向量质心时,我们意识到相同的解决方案可以用于实现 K-Means 聚类。他们希望在可能数十亿个数据点上实现大规模计算,同时确保内存可以严格管理。在这篇文章中,我们将尝试仅使用 SQL 实现 K-Means 聚类,并展示它可以扩展到数十亿行。
在撰写这篇博文时,我们了解到 Boris Tyshkevich 所做的工作。虽然我们在博文中使用不同的方法,但我们想表彰 Boris 的工作,并感谢他比我们更早地提出了这个想法!
作为使用 ClickHouse SQL 实现 K-Means 的一部分,我们在不到 3 分钟内对 1.7 亿个纽约市出租车行程进行了聚类。使用相同资源的等效 scikit-learn 操作需要超过 100 分钟,并且需要 90GB 的 RAM。在没有内存限制的情况下,ClickHouse 自动分配计算,我们展示了 ClickHouse 如何加速机器学习工作负载并缩短迭代时间。
这篇博文的所有代码都可以在 这里 的笔记本中找到。
为什么在 ClickHouse SQL 中使用 K-Means?
使用 ClickHouse SQL 进行 K-Means 的主要动机是,训练不受内存限制,由于质心的增量计算(以及限制内存开销的设置),可以对 PB 级数据集进行聚类。相比之下,使用基于 Python 的方法跨服务器分配此工作负载将需要额外的框架和复杂性。
此外,我们可以轻松地提高 聚类中的并行级别,以充分利用 Clickhouse 实例的全部资源。如果需要处理更大的数据集,我们只需扩展数据库服务——这在 ClickHouse Cloud 中是一个简单的操作。
将数据转换为 K-Means 是一个简单的 SQL 查询,每秒可以处理数十亿行。将质心和点存储在 ClickHouse 中,我们可以仅使用 SQL 计算统计信息,例如模型误差,并且可以将我们的聚类用于其他操作,例如用于向量搜索的产品量化。
K-Means 回顾
K-Means 是一种 无监督机器学习算法,用于将数据集划分为 K 个不同的非重叠子组(聚类),其中每个数据点都属于具有最近均值(聚类质心)的聚类。该过程首先通过随机或基于某种启发式方法初始化 K 个质心。这些质心作为聚类的初始代表。然后,该算法迭代两个主要步骤,直到收敛:分配和更新。
在分配步骤中,根据每个数据点与其质心之间的欧几里德距离(或其他距离度量)将其分配到最近的聚类。在更新步骤中,质心重新计算为分配到其各自聚类中的所有点的平均值,从而可能改变其位置。
该过程保证会收敛,点到聚类的分配最终会稳定下来,并且在迭代之间不会改变。聚类数量 K 需要事先指定,并且会极大地影响算法的有效性,最佳值取决于数据集和聚类的目标。有关更多详细信息,我们建议您阅读这篇 优秀的概述。
点和质心
我们的用户提出的关键问题是有效地计算质心的能力。假设我们有一个针对 transactions
表的简单数据模式,其中每行代表特定客户的银行交易。ClickHouse 中的向量表示为 Array
类型。
CREATE TABLE transactions
(
id UInt32,
vector Array(Float32),
-- e.g.[0.6860357,-1.0086979,0.83166444,-1.0089169,0.22888935]
customer UInt32,
...other columns omitted for brevity
)
ENGINE = MergeTree ORDER BY id
我们的用户想要找到每个客户的质心,实际上是与每个客户关联的所有交易向量的平均位置。为了找到平均向量的集合,我们可以使用 avgForEach
[1][2] 函数。例如,考虑计算 3 个向量的平均值,每个向量具有 4 个元素
WITH vectors AS
(
SELECT c1 AS vector
FROM VALUES([1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12])
)
SELECT avgForEach(vector) AS centroid
FROM vectors
┌─centroid──┐
│ [5,6,7,8] │
└───────────┘
在我们最初的 transactions
表中,计算每个客户的平均值变为
SELECT customer, avgForEach(vector) AS centroid FROM transactions GROUP BY customer
虽然简单,但这种方法有一些局限性。首先,对于非常大的数据集,当 vector
包含许多 Float32
点并且 customer
列具有许多唯一元素(高基数)时,此查询可能非常占用内存。其次,可能与 K-Means 更相关的是,如果插入新行,则此方法需要我们重新运行查询,这效率低下。我们可以通过物化视图和 AggregatingMergeTree 引擎来解决这些问题。
使用物化视图增量计算质心
物化视图允许我们将计算质心的成本转移到插入时间。与其他数据库不同,ClickHouse 物化视图只是一个触发器,它在将数据块插入表时运行查询。此查询的结果将插入到第二个“目标”表中。在本例中,物化视图查询将计算我们的质心,并将结果插入到 centroids
表中。
这里有一些重要的细节
- 计算质心的查询必须以可以与后续结果集合并的格式生成结果集——因为插入的每个数据块都会生成一个结果集。与其仅仅将平均值发送到
centroids
表 (平均值的平均值将不正确),我们将发送 “平均状态”。平均状态表示包含每个向量位置的总和以及计数。这是使用avgForEachState
函数实现的——请注意,我们只是在函数名末尾添加了State
!AggregatingMergeTree 表引擎是存储这些聚合状态所必需的。我们将在下面对此进行更深入的探讨。 - 整个过程是增量的,
centroids
表包含最终状态,即每个质心对应一行。读者会注意到,接收插入的表具有 Null 表引擎。这会导致插入的行被丢弃,从而节省了与每次迭代写入完整数据集相关的 IO 操作。 - 物化视图的查询仅在数据块被插入时执行。每个数据块中的行数可能因插入方法而异。我们建议在客户端侧(例如,使用 Go 客户端)制定数据块时,每个数据块至少包含 1000 行。如果让服务器来生成数据块(例如,通过 HTTP 插入时),则大小也可以 指定。
- 如果使用
INSERT INTO SELECT
从 ClickHouse 读取另一张表或外部数据源(例如 S3)中的行,则可以通过几个关键参数来控制块大小,这些参数在之前的博客中进行了详细讨论。这些设置(以及插入线程数量)会对使用的内存(块越大,内存使用量就越大)和数据加载速度(块越大,速度就越快)产生重大影响。这些设置意味着可以使用内存量进行微调,以换取性能。
AggregatingMergeTree
我们的目标表 centroids
使用引擎AggregatingMergeTree
CREATE TABLE centroids
(
customer UInt32,
vector AggregateFunction(avgForEach, Array(Float32))
)
ENGINE = AggregatingMergeTree ORDER BY customer
此处的 vector
列包含由上面的avgForEachState
函数生成的聚合状态。这些是必须合并以产生最终答案的中间质心。该列需要使用合适的类型AggregateFunction(avgForEach, Array(Float32))
。
与所有 ClickHouse MergeTree 表一样,AggregatingMergeTree 将数据存储为必须透明合并的多个部分,以允许更有效地查询。当合并包含聚合状态的部分时,必须确保只合并属于同一客户的那些状态。这可以通过使用 ORDER BY
子句按 customer
列对表进行排序来有效实现。在查询时,我们还必须确保对中间状态进行分组和合并。这可以通过确保我们按 customer
列进行 GROUP BY
并使用 avgForEach
函数的 Merge 等效项:avgForEachMerge
来实现。
SELECT customer, avgForEachMerge(vector) AS centroid
FROM centroids GROUP BY customer
所有聚合函数都有一个等效的状态函数,可以通过在其名称末尾追加
State
来获得,它会生成一个可以存储的中间表示,然后可以检索并与Merge
等效项合并。有关更多详细信息,我们推荐这篇博客和来自我们Mark 的视频。
与我们之前的 GROUP BY
相比,此查询的速度非常快。计算平均值的大部分工作都已移至插入时间,查询时间合并的行数很少。请考虑在 48GiB、12 个 vCPU 云服务上使用 1 亿个随机交易时的以下两种方法的性能。加载数据的步骤在这里。
对比从 transactions
表计算质心的性能
SELECT customer, avgForEach(vector) AS centroid
FROM transactions GROUP BY customer
ORDER BY customer ASC
LIMIT 1 FORMAT Vertical
10 rows in set. Elapsed: 147.526 sec. Processed 100.00 million rows, 41.20 GB (677.85 thousand rows/s., 279.27 MB/s.)
Row 1:
──────
customer: 1
centroid: [0.49645231463677153,0.5042792240640065,...,0.5017436349466129]
1 row in set. Elapsed: 36.017 sec. Processed 100.00 million rows, 41.20 GB (2.78 million rows/s., 1.14 GB/s.)
Peak memory usage: 437.54 MiB.
与 centroids
表相比,速度快了 1700 倍
SELECT customer, avgForEachMerge(vector) AS centroid
FROM centroids GROUP BY customer
ORDER BY customer ASC
LIMIT 1
FORMAT Vertical
Row 1:
──────
customer: 1
centroid: [0.49645231463677153,0.5042792240640065,...,0.5017436349466129]
1 row in set. Elapsed: 0.085 sec. Processed 10.00 thousand rows, 16.28 MB (117.15 thousand rows/s., 190.73 MB/s.)
总结
有了对质心进行增量计算的能力,让我们关注 K 均值聚类。假设我们正在尝试对 points
表进行聚类,其中每一行都有一个向量表示。在这里,我们将根据相似性进行聚类,而不是像处理交易那样仅根据客户进行聚类。
单次迭代
我们需要能够在算法的每次迭代后存储当前质心。现在,假设我们已经确定了 K 的最优值。我们的质心目标表可能如下所示
CREATE TABLE centroids
(
k UInt32,
iteration UInt32,
centroid UInt32,
vector AggregateFunction(avgForEach, Array(Float32))
)
ENGINE = AggregatingMergeTree
ORDER BY (k, iteration, centroid)
k
列的值设置为我们选择的 K 值。此处的 centroid
列表示质心编号本身,其值介于 0 和 K-1
之间。我们没有为算法的每次迭代使用单独的表,而是简单地包含一个 iteration
列,并确保排序键为 (k, iteration, centroid)
。ClickHouse 将确保仅对每个唯一的 K、质心和迭代进行中间状态的合并。这意味着我们的最终行数将很小,从而确保快速查询这些质心。
我们的物化视图用于计算质心,只需对 GROUP BY k, centroid
和 iteration
进行少量调整即可
CREATE TABLE temp
(
k UInt32,
iteration UInt32,
centroid UInt32,
vector Array(Float32)
)
ENGINE = Null
CREATE MATERIALIZED VIEW centroids_mv TO centroids
AS SELECT k, iteration, centroid, avgForEachState(vector) AS vector
FROM temp GROUP BY k, centroid, iteration
请注意,我们的查询是在插入到 temp
表中的块上执行的,而不是我们的数据源表 transactions
,该表没有 iteration
或 centroid
列。此临时表将接收我们的插入,并再次使用 Null 表引擎以避免写入数据。有了这些构建块,我们可以将算法的单次迭代可视化,假设 K = 5
上面显示了如何将数据插入到我们的临时表中,并通过执行使用 points
表作为源数据的 INSERT INTO SELECT
来计算我们的质心。此插入有效地代表了算法的一次迭代。此处的 SELECT
查询至关重要,因为它需要指定交易向量及其当前质心和迭代(以及 K 的固定值)。我们如何计算这两个值中的后者?完整的 INSERT INTO SELECT
如下所示
INSERT INTO temp
WITH
5 as k_val,
-- (1) obtain the max value of iteration - will be the previous iteration
(
SELECT max(iteration)
FROM centroids
-- As later we will reuse this table for all values of K
WHERE k = k_val
) AS c_iteration,
(
-- (3) convert centroids into a array of tuples
-- i.e. [(0, [vector]), (1, [vector]), ... , (k-1, [vector])]
SELECT groupArray((centroid, position))
FROM
(
-- (2) compute the centroids from the previous iteration
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE iteration = c_iteration AND k = k_val
GROUP BY centroid
)
) AS c_centroids
SELECT
k_val AS k,
-- (4) increment the iteration
c_iteration + 1 AS iteration,
-- (5) find the closest centroid for this vector using Euclidean distance
(arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS centroid,
vector AS v
FROM points
首先,在 (1) 中,此查询标识了前一次迭代的编号。然后在 (2) 中的 CTE 中使用它来确定为此迭代(以及选择的 K)生成的质心,使用前面显示的相同的 avgForEachMerge
查询。这些质心通过 groupArray
查询折叠成包含元组数组的单行,以方便与点进行匹配。在 SELECT
中,我们递增迭代编号 (4) 并计算新的最近质心(使用欧几里得距离L2Distance
函数)使用arrayMap
和arraySort
函数对每个点进行计算。
通过将行插入到此处的 temp 中,并使用基于前一次迭代的质心,我们可以让物化视图计算新的质心(迭代值 +1)。
初始化质心
以上假设我们有一些用于迭代 1 的初始质心,这些质心用于计算成员资格。这要求我们初始化系统。我们可以通过简单地选择并插入 K 个随机点来完成此操作,使用以下查询(k=5)
INSERT INTO temp WITH
5 as k_val,
vectors AS
(
SELECT vector
FROM points
-- select random points, use k to make pseudo-random
ORDER BY cityHash64(concat(toString(id), toString(k_val))) ASC
LIMIT k_val -- k
)
SELECT
k_val as k,
1 AS iteration,
rowNumberInAllBlocks() AS centroid,
vector
FROM vectors
成功的聚类对质心的初始位置非常敏感;分配不当会导致收敛缓慢或聚类效果不佳。我们将在稍后讨论这一点。
质心分配和何时停止迭代
以上所有内容都代表了一次迭代(以及初始化步骤)。每次迭代后,我们需要根据聚类是否收敛的经验测量结果来决定是否停止。最简单的方法是,只要点在迭代之间不再改变质心(以及聚类)就停止。
要确定哪些点属于哪些质心,我们可以在任何时候使用我们之前在
INSERT INTO SELECT
中使用的 SELECT。
要计算上次迭代中移动聚类的点数,我们首先计算前两次迭代 (1) 和 (2) 的质心。使用这些,我们确定每次迭代 (3) 和 (4) 中每个点的质心。如果这些相同 (5),我们返回 0,否则返回 1。这些 (6) 值的总和为我们提供了移动聚类的点数。
WITH 5 as k_val,
(
SELECT max(iteration)
FROM centroids
) AS c_iteration,
(
-- (1) current centroids
SELECT groupArray((centroid, position))
FROM
(
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE iteration = c_iteration AND k = k_val
GROUP BY centroid
)
) AS c_centroids,
(
-- (2) previous centroids
SELECT groupArray((centroid, position))
FROM
(
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE iteration = (c_iteration-1) AND k = k_val
GROUP BY centroid
)
) AS c_p_centroids
-- (6) sum differences
SELECT sum(changed) FROM (
SELECT id,
-- (3) current centroid for point
(arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS cluster,
-- (4) previous centroid for point
(arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_p_centroids))[1]).1 AS cluster_p,
-- (5) difference in allocation
if(cluster = cluster_p, 0, 1) as changed
FROM points
)
测试数据集
以上主要是理论上的。让我们看看以上是否在实际数据集上有效!为此,我们将使用流行的纽约市出租车数据集的 300 万行子集,因为聚类可能与现实相关。要从 S3 创建和插入数据
CREATE TABLE trips (
trip_id UInt32,
pickup_datetime DateTime,
dropoff_datetime DateTime,
pickup_longitude Nullable(Float64),
pickup_latitude Nullable(Float64),
dropoff_longitude Nullable(Float64),
dropoff_latitude Nullable(Float64),
passenger_count UInt8,
trip_distance Float32,
fare_amount Float32,
extra Float32,
tip_amount Float32,
tolls_amount Float32,
total_amount Float32,
payment_type Enum('CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4, 'UNK' = 5),
pickup_ntaname LowCardinality(String),
dropoff_ntaname LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY (pickup_datetime, dropoff_datetime);
INSERT INTO trips SELECT trip_id, pickup_datetime, dropoff_datetime, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, passenger_count, trip_distance, fare_amount, extra, tip_amount, tolls_amount, total_amount, payment_type, pickup_ntaname, dropoff_ntaname
FROM gcs('https://storage.googleapis.com/clickhouse-public-datasets/nyc-taxi/trips_{0..2}.gz', 'TabSeparatedWithNames');
特征选择
特征选择对于良好的聚类至关重要,因为它直接影响形成的聚类的质量。我们在此不会详细介绍如何选择特征。对于有兴趣的人,我们在笔记本中包含了笔记。我们最终得到以下 points
表
CREATE TABLE points
(
`id` UInt32,
`vector` Array(Float32),
`pickup_hour` UInt8,
`pickup_day_of_week` UInt8,
`pickup_day_of_month` UInt8,
`dropoff_hour` UInt8,
`pickup_longitude` Float64,
`pickup_latitude` Float64,
`dropoff_longitude` Float64,
`dropoff_latitude` Float64,
`passenger_count` UInt8,
`trip_distance` Float32,
`fare_amount` Float32,
`total_amount` Float32
) ENGINE = MergeTree ORDER BY id
要填充此表,我们使用 INSERT INTO SELECT
SQL 查询,该查询创建特征、对其进行缩放并过滤任何异常值。请注意,我们的最终列也编码在 vector
列中。
链接的查询是我们生成特征的第一次尝试。我们希望这里可以进行更多工作,这可能会产生比显示结果更好的结果。欢迎提出建议!
一些 Python 代码
我们已经描述了算法中的迭代如何有效地简化为 INSERT INTO SELECT
,物化视图负责维护质心。这意味着我们需要调用此语句 N 次,直到收敛。
我们没有等到没有点在质心之间移动的状态,而是使用 1000 的阈值,即如果移动聚类的点数少于 1000,我们就会停止。每 5 次迭代进行一次检查。
鉴于 ClickHouse 执行了大部分工作,用于执行特定 K 值的 K 均值的伪代码变得非常简单。
def kmeans(k, report_every = 5, min_cluster_move = 1000):
startTime = time.time()
# INITIALIZATION QUERY
run_init_query(k)
i = 0
while True:
# ITERATION QUERY
run_iteration_query(k)
# report every N iterations
if (i + 1) % report_every == 0 or i == 0:
num_moved = calculate_points_moved(k)
if num_moved <= min_cluster_move:
break
i += 1
execution_time = (time.time() - startTime))
# COMPUTE d^2 ERROR
d_2_error = compute_d2_error(k)
# return the d^2, execution time and num of required iterations
return d_2_error, execution_time, i+1
此循环的完整代码(包括查询)可以在笔记本中找到。
选择 K
到目前为止,我们假设已经确定了 K。有几种技术可以确定 K 的最佳值,最简单的方法是计算每个点的每个 K 值与其各自聚类的聚合平方距离 (SSE)。这为我们提供了一个我们旨在最小化的成本指标。compute_d2_error
方法使用以下 SQL 查询来计算此值(假设 K 的值为 5)
WITH 5 as k_val,
(
SELECT max(iteration)
FROM centroids WHERE k={k}
) AS c_iteration,
(
SELECT groupArray((centroid, position))
FROM
(
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE iteration = c_iteration AND k=k_val
GROUP BY centroid
)
) AS c_centroids
SELECT
sum(pow((arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).2, 2)) AS distance
FROM points
随着 K 的增加,此值保证会减小,例如,如果我们将 K 设置为点的数量,则每个聚类将包含 1 个点,因此我们的误差为 0。不幸的是,这不会很好地泛化数据!
随着 K 的增加,SSE 通常会减小,因为数据点更接近其聚类质心。目标是找到“拐点”,即 SSE 的下降率急剧变化的点。这个点表明增加 K 带来的益处正在递减。在拐点处选择 K 可以提供一个模型,该模型可以捕捉数据中固有的分组,而不会过拟合。识别此拐点的一种简单方法是绘制 K 与 SEE 的关系图并直观地确定该值。对于我们的纽约市出租车数据,我们测量并绘制了 K 值为 2 到 20 时的 SSE
此处的拐点不像我们希望的那样清晰,但值为 5 似乎是一个合理的候选值。
以上结果基于对每个 K 值的单次端到端运行。K 均值可能会收敛到局部最小值,无法保证附近的点最终会出现在同一个聚类中。建议对每个 K 值运行多个值,每次使用不同的初始质心,以找到最佳候选值。
结果
如果我们将 5 选择为 K 的值,则算法在大约 30 次迭代和 20 秒内在 12 个 vCPU ClickHouse 云节点上收敛。这种方法在每次迭代中都考虑了所有 300 万行。
k=5
initializing...OK
Iteration 0
Number changed cluster in first iteration: 421206
Iteration 1, 2, 3, 4
Number changed cluster in iteration 5: 87939
Iteration 5, 6, 7, 8, 9
Number changed cluster in iteration 10: 3610
Iteration 10, 11, 12, 13, 14
Number changed cluster in iteration 15: 1335
Iteration 15, 16, 17, 18, 19
Number changed cluster in iteration 20: 1104
Iteration 20, 21, 22, 23, 24
Number changed cluster in iteration 25: 390
stopping as moved less than 1000 clusters in last iteration
Execution time in seconds: 20.79200577735901
D^2 error for 5: 33000373.34968858
要将这些聚类可视化,我们需要降低维数。为此,我们使用主成分分析 (PCA)。我们将 SQL 中 PCA 的实现推迟到另一篇博客,只使用 Python 和 10,000 个随机点的样本。我们可以通过检查主成分占用了多少方差来评估 PCA 在捕捉数据的基本属性方面的有效性。82% 低于通常使用的 90% 阈值,但足以了解我们聚类的有效性
Explained variances of the 3 principal components: 0.824
使用我们的 3 个主成分,我们可以绘制相同的 10,000 个随机点,并根据其聚类将颜色与每个点相关联。
聚类的 PCA 可视化显示了跨 PC1 和 PC3 的密集平面,整齐地划分为四个不同的聚类,表明这些维度内的方差受限。沿着第二个主成分 (PC2),可视化变得更加稀疏,一个聚类(编号 3)与主组不同,可能特别有趣。
为了更好地理解我们的聚类结果,我们需要添加标签。理想情况下,我们可以通过探索每个聚类中每个列的分布来实现,寻找独特的特征和时间/空间模式。我们将尝试使用一个简洁的 SQL 查询来理解每个聚类中每个列的分布,以达到这个目的。为了确定关注的列,我们可以检查 PCA 成分的数值,并识别出主导维度。执行此操作的代码可以在笔记本中找到,并识别出以下内容
PCA1:: ['pickup_day_of_month: 0.9999497049810415', 'dropoff_latitude: -0.006371842399701939', 'pickup_hour: 0.004444108327647353', 'dropoff_hour: 0.003868258226185553', …]
PCA 2:: ['total_amount: 0.5489526881298809', 'fare_amount: 0.5463895585884886', 'pickup_longitude: 0.43181504878694826', 'pickup_latitude: -0.3074228612885196', 'dropoff_longitude: 0.2756342866763702', 'dropoff_latitude: -0.19809343490462433', …]
PCA 3:: ['dropoff_hour: -0.6998176337701472', 'pickup_hour: -0.6995098287872831', 'pickup_day_of_week: 0.1134719682173672', 'pickup_longitude: -0.05495391127067617', …]
对于 PCA1,pickup_day_of_month
很重要,表明重点是月份的时间。对于 PC2,维度,接送地点和行程费用似乎贡献很大。该成分可能专注于特定类型的行程。最后,对于 PC3,行程发生的时段似乎最相关。为了理解这些列在时间、日期和价格方面如何因聚类而异,我们再次可以使用 SQL 查询
WITH
5 AS k_val,
(
SELECT max(iteration)
FROM centroids
WHERE k = k_val
) AS c_iteration,
(
SELECT groupArray((centroid, position))
FROM
(
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE (iteration = c_iteration) AND (k = k_val)
GROUP BY centroid
)
) AS c_centroids
SELECT
(arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS cluster,
floor(avg(pickup_day_of_month)) AS pickup_day_of_month,
round(avg(pickup_hour)) AS avg_pickup_hour,
round(avg(fare_amount)) AS avg_fare_amount,
round(avg(total_amount)) AS avg_total_amount
FROM points
GROUP BY cluster
ORDER BY cluster ASC
┌─cluster─┬─pickup_day_of_month─┬─avg_pickup_hour─┬─avg_fare_amount─┬─avg_total_amount─┐
│ 0 │ 11 │ 14 │ 11 │ 13 │
│ 1 │ 3 │ 14 │ 12 │ 14 │
│ 2 │ 18 │ 13 │ 11 │ 13 │
│ 3 │ 16 │ 14 │ 49 │ 58 │
│ 4 │ 26 │ 14 │ 12 │ 14 │
└─────────┴─────────────────────┴─────────────────┴─────────────────┴──────────────────┘
9 rows in set. Elapsed: 0.625 sec. Processed 2.95 million rows, 195.09 MB (4.72 million rows/s., 312.17 MB/s.)
Peak memory usage: 720.16 MiB.
聚类 3 明显与更昂贵的行程相关。考虑到行程费用与一个主成分相关联,该成分也确定了接送地点为关键,这些行程可能与特定类型的行程相关联。其他聚类需要更深入的分析,但似乎侧重于每月模式。我们可以在地图可视化上绘制仅聚类 3 的接送地点。在下图中,蓝色和红色点分别代表接送地点
仔细观察该图后,发现该聚类与往返 JFK 机场的机场行程相关联。
缩放
我们之前的示例仅使用了纽约市出租车行程 300 万行的子集。在对 2009 年所有出租车行程(1.7 亿行)的更大数据集进行测试时,我们可以使用 ClickHouse 服务在 60 个核心上,在约 3 分钟内完成 k=5 的聚类。
k=5
initializing...OK
…
Iteration 15, 16, 17, 18, 19
Number changed cluster in iteration 20: 288
stopping as moved less than 1000 clusters in last iteration
Execution time in seconds: 178.61135005950928
D^2 error for 5: 1839404623.265372
Completed in 178.61135005950928s and 20 iterations with error 1839404623.265372
这产生了与我们之前较小的子集相似的聚类。使用 scikit-learn 在 64 核 m5d.16xlarge
上运行相同的聚类需要 6132 秒,慢了 34 倍!在笔记本的末尾可以找到重现此基准测试的步骤,以及使用 这些步骤 来使用 scikit-learn。
潜在改进和未来工作
聚类对选择的初始点非常敏感。K-Means++ 是一种比标准 K-Means 聚类更好的方法,它通过引入更智能的初始化过程来解决这个问题,该过程旨在分散初始质心,从而降低初始质心放置不当的可能性,并导致更快的收敛以及潜在的更好的聚类。我们将此作为练习留给读者进行改进。
K-Means 也难以处理分类变量。可以通过独热编码(在 SQL 中也可以实现)以及专门的算法(例如针对此类数据而设计的 KModes 聚类)来部分解决这个问题。特定领域的自定距离函数(而不是仅使用欧几里得距离)也很常见,应该可以使用用户定义函数(UDF)来实现。
最后,探索其他软聚类算法(例如,针对正态分布特征的高斯混合模型)或层次聚类算法(例如,凝聚式聚类)可能也很有趣。这些后一种方法也克服了 K-Means 的主要局限性之一——需要指定 K。我们希望看到在 ClickHouse SQL 中实现这些方法的尝试!