简介
最近,在帮助一个想要从 ClickHouse 中保存的向量计算质心的用户时,我们意识到同样的解决方案可以用来实现 K 均值聚类。他们希望在可能数十亿个数据点上大规模地解决这个问题,同时确保内存可以得到严格管理。在这篇文章中,我们尝试仅使用 SQL 来实现 K 均值聚类,并展示它可以扩展到数十亿行。
在撰写这篇博文时,我们了解到 Boris Tyshkevich 所做的工作。虽然我们在本文中使用了不同的方法,但我们想感谢 Boris 的工作,以及他比我们更早地想到这个主意!
作为用 ClickHouse SQL 实现 K 均值的一部分,我们在不到 3 分钟的时间内对 1.7 亿个纽约出租车行程进行了聚类。使用相同资源的等效 scikit-learn 操作需要超过 100 分钟,并且需要 90GB 的 RAM。没有内存限制,并且 ClickHouse 自动分配计算,我们证明 ClickHouse 可以加速机器学习工作负载并减少迭代时间。
本文所有代码均可在笔记本中找到 这里.
为什么在 ClickHouse SQL 中使用 K 均值?
使用 ClickHouse SQL 进行 K 均值的关键动机是训练不受内存限制,这使得能够对 PB 数据集进行聚类,这得益于质心的增量计算(使用设置来限制内存开销)。相比之下,使用基于 Python 的方法在服务器之间分配此工作负载将需要额外的框架和复杂性。
此外,我们可以轻松地增加我们聚类中的并行级别以使用 Clickhouse 实例的全部资源。如果需要处理更大的数据集,我们只需扩展数据库服务——这在 ClickHouse Cloud 中是一个简单的操作。
将数据转换为 K 均值是一个简单的 SQL 查询,每秒可以处理数十亿行。有了 ClickHouse 中的质心和点,我们可以仅使用 SQL 计算统计数据,例如模型误差,并可能将我们的聚类用于其他操作,例如用于向量搜索的产品量化。
K 均值回顾
K 均值是一种无监督机器学习算法,用于将数据集划分为 K 个不同的、非重叠的子组(聚类),其中每个数据点都属于与最近平均值(聚类的质心)最接近的聚类。该过程从随机初始化 K 个质心开始,或者基于一些启发式算法。这些质心作为聚类的初始代表。然后,算法在两个主要步骤中迭代,直到收敛:分配和更新。
在分配步骤中,根据每个数据点与质心之间的欧几里得距离(或其他距离度量),将其分配给最接近的聚类。在更新步骤中,质心被重新计算为分配给它们各自聚类的所有点的平均值,这可能会改变它们的位置。
此过程保证收敛,随着点到聚类的分配最终稳定下来,并且在迭代之间不会改变。聚类的数量 K 需要事先指定,并且会严重影响算法的有效性,最佳值取决于数据集和聚类的目标。有关更多详细信息,我们推荐此优秀的概述.
点和质心
我们的用户提出的关键问题是有效地计算质心的能力。假设我们有一个简单的 transactions
表的数据模式,其中每行代表特定客户的银行交易。ClickHouse 中的向量表示为 Array
类型。
CREATE TABLE transactions
(
id UInt32,
vector Array(Float32),
-- e.g.[0.6860357,-1.0086979,0.83166444,-1.0089169,0.22888935]
customer UInt32,
...other columns omitted for brevity
)
ENGINE = MergeTree ORDER BY id
我们的用户想要找到每个客户的质心,实际上是与每个客户相关联的所有交易向量的平均位置。要找到平均向量集,我们可以使用 avgForEach
[1][2] 函数。例如,考虑计算 3 个向量的平均值,每个向量有 4 个元素
WITH vectors AS
(
SELECT c1 AS vector
FROM VALUES([1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12])
)
SELECT avgForEach(vector) AS centroid
FROM vectors
┌─centroid──┐
│ [5,6,7,8] │
└───────────┘
在我们的原始 transactions
表中,因此计算每个客户的平均值变为
SELECT customer, avgForEach(vector) AS centroid FROM transactions GROUP BY customer
虽然简单,但这种方法有一些局限性。首先,对于非常大的数据集,当 vector
包含许多 Float32
点,并且 customer
列具有许多唯一元素(高基数)时,此查询可能非常占用内存。其次,也许与 K 均值更相关的是,如果插入新行,此方法需要我们重新运行查询,这是低效的。我们可以通过物化视图和 AggregatingMergeTree 引擎来解决这些问题。
使用物化视图增量计算质心
物化视图允许我们将计算质心的成本转移到插入时间。与其他数据库不同,ClickHouse 物化视图只是一个触发器,在数据块被插入表时运行查询。此查询的结果将插入到第二个“目标”表中。在我们的例子中,物化视图查询将计算我们的质心,并将结果插入到 centroids
表中。
这里有一些重要的细节
- 我们的查询(计算质心)必须以可以与后续结果集合并的格式生成结果集——因为插入的每个块都会生成一个结果集。我们不是仅仅将平均值发送到
centroids
表(平均值的平均值将是不正确的),而是发送“平均状态”。平均状态表示包含每个向量位置的总和以及计数。这是使用avgForEachState
函数实现的——请注意,我们只是在函数名称后面添加了State
!AggregatingMergeTree 表引擎是存储这些聚合状态所必需的。我们将在下面对此进行更多探讨。 - 整个过程是增量的,
centroids
表包含最终状态,即每个质心对应一行。读者会注意到,接收插入的表具有 Null 表引擎。这会导致插入的行被丢弃,从而节省与每次迭代写入完整数据集相关的 IO。 - 物化视图的查询只在数据块被插入时执行。每个块中的行数可能会有所不同,具体取决于插入方法。我们建议在客户端端制定块时,每块至少包含 1000 行,例如使用 Go 客户端。如果服务器被设置为形成块(例如,通过 HTTP 插入时),大小也可以指定。
- 如果使用
INSERT INTO SELECT
从另一个表或外部源(例如 S3)读取行到 ClickHouse,则可以通过几个关键参数控制块大小,这些参数在 之前的博客 中进行了详细讨论。这些设置(以及 插入线程数)会对使用的内存(更大的块 = 更多的内存)和摄取速度(更大的块 = 更快)产生重大影响。这些设置意味着可以使用内存量 可以进行精细控制,以换取性能。
AggregatingMergeTree
我们的目标表 centroids
使用引擎 AggregatingMergeTree
CREATE TABLE centroids
(
customer UInt32,
vector AggregateFunction(avgForEach, Array(Float32))
)
ENGINE = AggregatingMergeTree ORDER BY customer
我们这里的 vector
列包含由 avgForEachState
函数在上面产生的聚合状态。这些是必须合并以产生最终答案的中间质心。此列需要具有适当的类型 AggregateFunction(avgForEach, Array(Float32))
。
与所有 ClickHouse MergeTree 表一样,AggregatingMergeTree 将数据存储为必须透明合并的部分,以允许更有效的查询。当合并包含我们聚合状态的部分时,必须这样做,以便仅合并与同一客户相关的状态。这实际上是通过使用 ORDER BY
子句按 customer
列对表进行排序来实现的。在查询时,我们还必须确保中间状态被分组和合并。这可以通过确保我们按 customer
列 GROUP BY
并使用 avgForEach
函数的合并等效项 avgForEachMerge
来实现。
SELECT customer, avgForEachMerge(vector) AS centroid
FROM centroids GROUP BY customer
所有聚合函数都有一个等效的状态函数,通过在其名称后附加
State
来获得,它生成一个可以存储的中间表示,然后可以检索并与Merge
等效项合并。有关更多详细信息,我们建议您阅读 这篇博文 以及我们 Mark 的视频。
与我们之前的 GROUP BY
相比,此查询将非常快。计算平均值的大部分工作已转移到插入时间,查询时间合并的少量行留了下来。考虑以下两种方法在 48GiB、12 vCPU 云服务上使用 1 亿条随机交易时的性能。加载数据的步骤 在这里。
对比从 transactions
表计算质心的性能
SELECT customer, avgForEach(vector) AS centroid
FROM transactions GROUP BY customer
ORDER BY customer ASC
LIMIT 1 FORMAT Vertical
10 rows in set. Elapsed: 147.526 sec. Processed 100.00 million rows, 41.20 GB (677.85 thousand rows/s., 279.27 MB/s.)
Row 1:
──────
customer: 1
centroid: [0.49645231463677153,0.5042792240640065,...,0.5017436349466129]
1 row in set. Elapsed: 36.017 sec. Processed 100.00 million rows, 41.20 GB (2.78 million rows/s., 1.14 GB/s.)
Peak memory usage: 437.54 MiB.
与 centroids
表相比,速度快 1700 倍以上
SELECT customer, avgForEachMerge(vector) AS centroid
FROM centroids GROUP BY customer
ORDER BY customer ASC
LIMIT 1
FORMAT Vertical
Row 1:
──────
customer: 1
centroid: [0.49645231463677153,0.5042792240640065,...,0.5017436349466129]
1 row in set. Elapsed: 0.085 sec. Processed 10.00 thousand rows, 16.28 MB (117.15 thousand rows/s., 190.73 MB/s.)
将所有内容整合在一起
有了我们增量计算质心的能力,让我们关注 K 均值聚类。假设我们尝试对表 points
进行聚类,其中每一行都有一个向量表示。在这里,我们将根据相似性进行聚类,而不是像我们对交易所做的那样,仅仅根据客户来确定我们的质心。
一次迭代
我们需要能够在算法的每次迭代后存储当前的质心。现在,假设我们已经确定了 K 的最佳值。我们用于质心的目标表可能如下所示
CREATE TABLE centroids
(
k UInt32,
iteration UInt32,
centroid UInt32,
vector AggregateFunction(avgForEach, Array(Float32))
)
ENGINE = AggregatingMergeTree
ORDER BY (k, iteration, centroid)
k
列的值设置为我们选择的 K 值。我们这里的 centroid
列表示质心编号本身,其值介于 0 和 K-1
之间。与其为算法的每次迭代使用单独的表,不如我们简单地包含一个 iteration
列,并确保我们的排序键是 (k, iteration, centroid)
。ClickHouse 将确保仅针对每个唯一的 K、质心和迭代合并中间状态。这意味着我们的最终行数将很小,从而确保快速查询这些质心。
我们的物化视图用于计算我们的质心,它应该很熟悉,只是需要对 GROUP BY k、centroid 和 iteration
进行一些小的调整
CREATE TABLE temp
(
k UInt32,
iteration UInt32,
centroid UInt32,
vector Array(Float32)
)
ENGINE = Null
CREATE MATERIALIZED VIEW centroids_mv TO centroids
AS SELECT k, iteration, centroid, avgForEachState(vector) AS vector
FROM temp GROUP BY k, centroid, iteration
请注意,我们的查询执行的是插入到 temp
表中的块,而不是我们的数据源表事务,它没有 iteration
或 centroid
列。此临时表将接收我们的插入,并再次使用 Null 表引擎来避免写入数据。有了这些构建块,我们可以可视化算法的一次迭代,假设 K = 5
以上显示了我们如何插入到我们的临时表中,并通过使用 points
表作为我们的源数据执行 INSERT INTO SELECT
来计算我们的质心。此插入有效地表示了算法的一次迭代。这里的 SELECT
查询至关重要,因为它需要指定交易向量及其当前质心和迭代(以及固定的 K 值)。我们如何计算后两者?完整的 INSERT INTO SELECT
如下所示
INSERT INTO temp
WITH
5 as k_val,
-- (1) obtain the max value of iteration - will be the previous iteration
(
SELECT max(iteration)
FROM centroids
-- As later we will reuse this table for all values of K
WHERE k = k_val
) AS c_iteration,
(
-- (3) convert centroids into a array of tuples
-- i.e. [(0, [vector]), (1, [vector]), ... , (k-1, [vector])]
SELECT groupArray((centroid, position))
FROM
(
-- (2) compute the centroids from the previous iteration
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE iteration = c_iteration AND k = k_val
GROUP BY centroid
)
) AS c_centroids
SELECT
k_val AS k,
-- (4) increment the iteration
c_iteration + 1 AS iteration,
-- (5) find the closest centroid for this vector using Euclidean distance
(arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS centroid,
vector AS v
FROM points
首先,在 (1) 中,此查询标识前一次迭代的编号。然后,在 (2) 中的 CTE 中使用它来确定针对此迭代(以及选择的 K)产生的质心,使用前面显示的相同的 avgForEachMerge
查询。这些质心通过 groupArray
查询折叠成包含元组数组的单行,以方便与点进行轻松匹配。在 SELECT
中,我们增加迭代编号 (4) 并计算新的最近质心(使用欧几里得距离 L2Distance
函数),对每个点使用 arrayMap
和 arraySort
函数。
通过将行插入到这里的临时表中,并使用基于前一次迭代的质心,我们可以让物化视图计算新的质心(迭代值 +1)。
初始化质心
以上假设我们有一些针对迭代 1 的初始质心,用于计算成员资格。这要求我们初始化系统。我们可以通过简单地选择和插入 K 个随机点来执行此操作,使用以下查询 (k=5)
INSERT INTO temp WITH
5 as k_val,
vectors AS
(
SELECT vector
FROM points
-- select random points, use k to make pseudo-random
ORDER BY cityHash64(concat(toString(id), toString(k_val))) ASC
LIMIT k_val -- k
)
SELECT
k_val as k,
1 AS iteration,
rowNumberInAllBlocks() AS centroid,
vector
FROM vectors
成功的聚类对质心的初始位置非常敏感;分配不当会导致收敛缓慢或聚类不佳。稍后我们将对此进行讨论。
质心分配和何时停止迭代
以上所有内容都代表一次迭代(和初始化步骤)。在每次迭代后,我们需要根据对聚类是否收敛的经验测量做出是否停止的决定。最简单的方法是当点在迭代之间不再改变质心(以及聚类)时停止。
为了确定哪些点属于哪些质心,我们可以随时使用我们之前在
INSERT INTO SELECT
中的SELECT
。
为了计算在上一次迭代中移动聚类的点数,我们首先计算前两次迭代的质心 (1) 和 (2)。使用这些,我们确定每次迭代中每个点的质心 (3) 和 (4)。如果这些相同 (5),我们返回 0,否则返回 1。这些值的总和 (6) 为我们提供了移动聚类的点数。
WITH 5 as k_val,
(
SELECT max(iteration)
FROM centroids
) AS c_iteration,
(
-- (1) current centroids
SELECT groupArray((centroid, position))
FROM
(
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE iteration = c_iteration AND k = k_val
GROUP BY centroid
)
) AS c_centroids,
(
-- (2) previous centroids
SELECT groupArray((centroid, position))
FROM
(
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE iteration = (c_iteration-1) AND k = k_val
GROUP BY centroid
)
) AS c_p_centroids
-- (6) sum differences
SELECT sum(changed) FROM (
SELECT id,
-- (3) current centroid for point
(arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS cluster,
-- (4) previous centroid for point
(arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_p_centroids))[1]).1 AS cluster_p,
-- (5) difference in allocation
if(cluster = cluster_p, 0, 1) as changed
FROM points
)
测试数据集
以上内容主要是理论上的。让我们看看以上内容是否在真实数据集上有效!为此,我们将使用流行的纽约出租车数据集的 300 万行子集,因为这些聚类可能具有相关性。要从 S3 创建和插入数据
CREATE TABLE trips (
trip_id UInt32,
pickup_datetime DateTime,
dropoff_datetime DateTime,
pickup_longitude Nullable(Float64),
pickup_latitude Nullable(Float64),
dropoff_longitude Nullable(Float64),
dropoff_latitude Nullable(Float64),
passenger_count UInt8,
trip_distance Float32,
fare_amount Float32,
extra Float32,
tip_amount Float32,
tolls_amount Float32,
total_amount Float32,
payment_type Enum('CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4, 'UNK' = 5),
pickup_ntaname LowCardinality(String),
dropoff_ntaname LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY (pickup_datetime, dropoff_datetime);
INSERT INTO trips SELECT trip_id, pickup_datetime, dropoff_datetime, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, passenger_count, trip_distance, fare_amount, extra, tip_amount, tolls_amount, total_amount, payment_type, pickup_ntaname, dropoff_ntaname
FROM gcs('https://storage.googleapis.com/clickhouse-public-datasets/nyc-taxi/trips_{0..2}.gz', 'TabSeparatedWithNames');
特征选择
特征选择对于良好的聚类至关重要,因为它直接影响形成的聚类的质量。我们在这里不会详细介绍我们如何选择特征。对于有兴趣的人,我们将在 笔记本 中包含这些说明。最终,我们得到了以下 points
表
CREATE TABLE points
(
`id` UInt32,
`vector` Array(Float32),
`pickup_hour` UInt8,
`pickup_day_of_week` UInt8,
`pickup_day_of_month` UInt8,
`dropoff_hour` UInt8,
`pickup_longitude` Float64,
`pickup_latitude` Float64,
`dropoff_longitude` Float64,
`dropoff_latitude` Float64,
`passenger_count` UInt8,
`trip_distance` Float32,
`fare_amount` Float32,
`total_amount` Float32
) ENGINE = MergeTree ORDER BY id
要填充此表,我们使用 INSERT INTO SELECT
SQL 查询,它创建特征、缩放特征并过滤任何异常值。请注意,我们的最终列也编码在 vector
列中。
链接的查询是我们首次尝试生成特征。我们预计这里可以做更多工作,这可能会产生比所示结果更好的结果。欢迎您提出建议!
一点 Python
我们已经描述了算法中的迭代如何有效地减少到 INSERT INTO SELECT
,物化视图负责维护质心。这意味着我们需要调用此语句 N 次,直到收敛。
与其等到没有点在质心之间移动的状态,不如我们使用 1000 的阈值,即如果移动聚类的点数少于 1000 个,我们就停止。每隔 5 次迭代执行一次此检查。
给定 ClickHouse 执行的大部分工作,针对特定 K 值执行 K 均值的伪代码变得非常简单。
def kmeans(k, report_every = 5, min_cluster_move = 1000):
startTime = time.time()
# INITIALIZATION QUERY
run_init_query(k)
i = 0
while True:
# ITERATION QUERY
run_iteration_query(k)
# report every N iterations
if (i + 1) % report_every == 0 or i == 0:
num_moved = calculate_points_moved(k)
if num_moved <= min_cluster_move:
break
i += 1
execution_time = (time.time() - startTime))
# COMPUTE d^2 ERROR
d_2_error = compute_d2_error(k)
# return the d^2, execution time and num of required iterations
return d_2_error, execution_time, i+1
此循环的完整代码,包括查询,可以在 笔记本 中找到。
选择 K
到目前为止,我们假设 K 已经确定。有多种技术可以确定 K 的最佳值,最简单的方法是计算每个 K 值的每个点与其各自聚类之间的聚合平方距离 (SSE)。这给了我们一个我们试图最小化的成本指标。方法 compute_d2_error
使用以下 SQL 查询来计算此值(假设 K 的值为 5)
WITH 5 as k_val,
(
SELECT max(iteration)
FROM centroids WHERE k={k}
) AS c_iteration,
(
SELECT groupArray((centroid, position))
FROM
(
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE iteration = c_iteration AND k=k_val
GROUP BY centroid
)
) AS c_centroids
SELECT
sum(pow((arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).2, 2)) AS distance
FROM points
此值保证随着 K 的增加而减小,例如,如果我们将 K 设置为点的数量,每个聚类将有 1 个点,从而使我们的误差为 0。不幸的是,这将不会很好地概括数据!
随着 K 的增加,SSE 通常会减小,因为数据点更靠近它们的聚类质心。目标是找到“肘点”,即 SSE 的下降率急剧变化的点。该点表明增加 K 的收益递减。在肘点处选择 K 可以提供一个模型,该模型可以捕获数据中固有的分组,而不会过度拟合。识别此肘点的一种简单方法是绘制 K 与 SEE 并直观地识别该值。对于我们的纽约出租车数据,我们测量并绘制了 K 值 2 到 20 的 SSE
这里的肘点不如我们想要的那么清晰,但 5 的值似乎是一个合理的候选者。
以上结果基于每个 K 值的单次端到端运行。K 均值可以收敛到局部最小值,无法保证附近的点最终会落在同一个聚类中。建议针对每个 K 值运行多个值,每次使用不同的初始质心,以找到最佳候选者。
结果
如果我们选择 5 作为 K 的值,算法大约需要 30 次迭代和 20 秒才能在一个 12 vCPU ClickHouse 云节点上收敛。这种方法考虑了每次迭代的 300 万行。
k=5
initializing...OK
Iteration 0
Number changed cluster in first iteration: 421206
Iteration 1, 2, 3, 4
Number changed cluster in iteration 5: 87939
Iteration 5, 6, 7, 8, 9
Number changed cluster in iteration 10: 3610
Iteration 10, 11, 12, 13, 14
Number changed cluster in iteration 15: 1335
Iteration 15, 16, 17, 18, 19
Number changed cluster in iteration 20: 1104
Iteration 20, 21, 22, 23, 24
Number changed cluster in iteration 25: 390
stopping as moved less than 1000 clusters in last iteration
Execution time in seconds: 20.79200577735901
D^2 error for 5: 33000373.34968858
为了可视化这些聚类,我们需要降低维度。为此,我们使用主成分分析 (PCA)。我们将 PCA 在 SQL 中的实现留待另一篇博客,只使用 Python 和 10,000 个随机点的样本。我们可以通过检查主成分占了多少方差来评估 PCA 在捕获数据基本属性方面的有效性。82% 低于通常使用的 90% 的阈值,但足以理解我们聚类有效性。
Explained variances of the 3 principal components: 0.824
使用我们的 3 个主成分,我们可以绘制相同的 10,000 个随机点,并根据其聚类为每个点关联一种颜色。
聚类的 PCA 可视化显示了跨 PC1 和 PC3 的一个密集平面,整齐地划分为四个不同的聚类,这表明这些维度内的方差受限。沿着第二主成分 (PC2),可视化变得稀疏,一个聚类(编号 3)偏离了主群体,可能特别有趣。
为了理解我们的聚类,我们需要标签。理想情况下,我们会通过探索每个聚类中每列的分布来生成这些标签,寻找独特的特征和时间/空间模式。我们将尝试使用 SQL 查询简洁地做到这一点,以了解每个聚类中每列的分布。对于要关注的列,我们可以检查 PCA 成分的数值,并识别占主导地位的维度。在笔记本中可以找到用于执行此操作的代码,并识别以下内容:
PCA1:: ['pickup_day_of_month: 0.9999497049810415', 'dropoff_latitude: -0.006371842399701939', 'pickup_hour: 0.004444108327647353', 'dropoff_hour: 0.003868258226185553', …]
PCA 2:: ['total_amount: 0.5489526881298809', 'fare_amount: 0.5463895585884886', 'pickup_longitude: 0.43181504878694826', 'pickup_latitude: -0.3074228612885196', 'dropoff_longitude: 0.2756342866763702', 'dropoff_latitude: -0.19809343490462433', …]
PCA 3:: ['dropoff_hour: -0.6998176337701472', 'pickup_hour: -0.6995098287872831', 'pickup_day_of_week: 0.1134719682173672', 'pickup_longitude: -0.05495391127067617', …]
对于 PCA1,pickup_day_of_month
很重要,这表明关注月份的时间。对于 PC2,维度、接送地点、行程的成本似乎贡献很大。这个成分可能集中在特定类型的行程上。最后,对于 PC3,行程发生的时数似乎最相关。为了了解这些列在时间、日期和价格方面如何根据聚类而有所不同,我们再次可以使用 SQL 查询。
WITH
5 AS k_val,
(
SELECT max(iteration)
FROM centroids
WHERE k = k_val
) AS c_iteration,
(
SELECT groupArray((centroid, position))
FROM
(
SELECT
centroid,
avgForEachMerge(vector) AS position
FROM centroids
WHERE (iteration = c_iteration) AND (k = k_val)
GROUP BY centroid
)
) AS c_centroids
SELECT
(arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS cluster,
floor(avg(pickup_day_of_month)) AS pickup_day_of_month,
round(avg(pickup_hour)) AS avg_pickup_hour,
round(avg(fare_amount)) AS avg_fare_amount,
round(avg(total_amount)) AS avg_total_amount
FROM points
GROUP BY cluster
ORDER BY cluster ASC
┌─cluster─┬─pickup_day_of_month─┬─avg_pickup_hour─┬─avg_fare_amount─┬─avg_total_amount─┐
│ 0 │ 11 │ 14 │ 11 │ 13 │
│ 1 │ 3 │ 14 │ 12 │ 14 │
│ 2 │ 18 │ 13 │ 11 │ 13 │
│ 3 │ 16 │ 14 │ 49 │ 58 │
│ 4 │ 26 │ 14 │ 12 │ 14 │
└─────────┴─────────────────────┴─────────────────┴─────────────────┴──────────────────┘
9 rows in set. Elapsed: 0.625 sec. Processed 2.95 million rows, 195.09 MB (4.72 million rows/s., 312.17 MB/s.)
Peak memory usage: 720.16 MiB.
聚类 3 明显与更昂贵的行程相关。鉴于行程的成本与一个主成分相关联,该成分还将接送地点确定为关键要素,这些地点可能与特定类型的行程相关联。其他聚类需要更深入的分析,但似乎侧重于月度模式。我们可以在地图可视化上绘制仅聚类 3 的接送地点。蓝色和红色点分别代表下图中的接送地点。
仔细观察该图,可以发现该聚类与往返肯尼迪机场的机场行程相关联。
缩放
我们之前的示例只使用了纽约市出租车行程的 300 万行子集。在针对 2009 年所有出租车行程(1.7 亿行)的更大数据集进行测试时,我们可以使用 ClickHouse 服务,在约 3 分钟内完成 k=5 的聚类,使用 60 个内核。
k=5
initializing...OK
…
Iteration 15, 16, 17, 18, 19
Number changed cluster in iteration 20: 288
stopping as moved less than 1000 clusters in last iteration
Execution time in seconds: 178.61135005950928
D^2 error for 5: 1839404623.265372
Completed in 178.61135005950928s and 20 iterations with error 1839404623.265372
这将生成与我们之前较小的子集相似的聚类。在 64 核 m5d.16xlarge
上使用 scikit-learn 运行相同的聚类需要 6132 秒,慢了 34 倍以上!在笔记本结尾处可以找到用于复制此基准测试的步骤,以及使用这些步骤 用于 scikit-learn 的步骤。
潜在的改进和未来工作
聚类对选定的初始点非常敏感。K-Means++ 是对标准 K-Means 聚类的一种改进,它通过引入一种更智能的初始化过程来解决这个问题,该过程旨在分散初始质心,从而降低了初始质心放置不当的可能性,并导致更快的收敛速度以及可能更好的聚类结果。我们将其留作练习,供读者改进。
K-Means 也难以处理分类变量。这可以通过独热编码(在 SQL 中也可能)以及专门的算法(例如为这类数据而设计的KModes 聚类)来部分解决。专门领域的自定义距离函数,而不是欧几里得距离,也很常见,应该可以使用用户定义函数 (UDF) 来实现。
最后,探索其他软聚类算法(如高斯混合模型,适用于正态分布特征)或层次聚类算法(如凝聚层次聚类)可能也很有趣。这些后一种方法也克服了 K-Means 的一个主要限制——需要指定 K。我们很想看到在 ClickHouse SQL 中实现这些算法的尝试!