博客 / 工程

如何仅使用 ClickHouse SQL 扩展 K-Means 聚类

author avatar
Dale McDiarmid
4 月 11, 2024 - 13 分钟阅读

简介

最近,在帮助一位想要从 ClickHouse 中保存的向量计算质心的用户时,我们意识到相同的解决方案可以用于实现 K-Means 聚类。他们希望在可能数十亿的数据点上大规模解决此问题,同时确保可以严格管理内存。在这篇文章中,我们尝试仅使用 SQL 实现 K-means 聚类,并表明它可以扩展到数十亿行。

在撰写这篇博客的过程中,我们了解到 Boris Tyshkevich 所做的工作。虽然我们在这篇博客中使用了不同的方法,但我们要感谢 Boris 的工作以及他在我们之前就有了这个想法!

作为使用 ClickHouse SQL 实现 K-Means 的一部分,我们在 3 分钟内聚类了 1.7 亿次纽约市出租车行程。使用相同资源的等效 scikit-learn 操作需要 100 多分钟,并且需要 90GB 的 RAM。在没有内存限制且 ClickHouse 自动分配计算的情况下,我们表明 ClickHouse 可以加速机器学习工作负载并缩短迭代时间。

kmeans_cluster_1.png

这篇博客文章的所有代码都可以在这里的 notebook 中找到。

为什么在 ClickHouse SQL 中使用 K-Means?

使用 ClickHouse SQL 进行 K-Means 的主要动机是训练不受内存限制,这使得聚类 PB 级数据集成为可能,这要归功于质心的增量计算(具有限制内存开销的设置)。相比之下,使用基于 Python 的方法跨服务器分发此工作负载将需要额外的框架和复杂性。

此外,我们可以轻松增加聚类中的并行级别,以充分利用 Clickhouse 实例的资源。如果我们需要处理更大的数据集,我们只需扩展数据库服务 - 这是 ClickHouse Cloud 中的一个简单操作。

转换 K-Means 数据是一个简单的 SQL 查询,每秒可以处理数十亿行。由于质心和点都保存在 ClickHouse 中,我们可以仅使用 SQL 计算模型误差等统计信息,并可能将我们的集群用于其他操作,例如向量搜索的产品量化。

K-Means 回顾

K-Means 是一种无监督机器学习算法,用于将数据集划分为 K 个不同的、非重叠的子组(集群),其中每个数据点都属于与其最近均值(集群的质心)的集群。该过程首先随机或基于某些启发式方法初始化 K 个质心。这些质心充当集群的初始代表。然后,该算法迭代执行两个主要步骤,直到收敛:分配和更新。

在分配步骤中,每个数据点都根据欧几里得距离(或其他距离度量)分配到最近的集群,该距离是数据点与质心之间的距离。在更新步骤中,质心被重新计算为分配给各自集群的所有点的均值,从而可能改变它们的位置。

此过程保证收敛,点的集群分配最终稳定,并且在迭代之间不会改变。集群的数量 K 需要预先指定,并且严重影响算法的有效性,最佳值取决于数据集和聚类的目标。有关更多详细信息,我们推荐这篇优秀概述

点和质心

我们的用户提出的关键问题是有效计算质心的能力。假设我们有一个简单的transactions表的数据模式,其中每一行代表特定客户的银行交易。ClickHouse 中的向量表示为Array类型。

CREATE TABLE transactions
(
  id UInt32,
  vector Array(Float32), 
  -- e.g.[0.6860357,-1.0086979,0.83166444,-1.0089169,0.22888935]
  customer UInt32,
  ...other columns omitted for brevity
)
ENGINE = MergeTree ORDER BY id

我们的用户想要找到每个客户的质心,实际上是与每个客户关联的所有交易向量的位置平均值。为了找到平均向量集,我们可以使用avgForEach[1][2]函数。例如,考虑计算 3 个向量的平均值的示例,每个向量包含 4 个元素

WITH vectors AS
   (
       SELECT c1 AS vector
       FROM VALUES([1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12])
   )
SELECT avgForEach(vector) AS centroid
FROM vectors

┌─centroid──┐
│ [5,6,7,8] │
└───────────┘

在我们原始的transactions表中,计算每个客户的平均值因此变为

SELECT customer, avgForEach(vector) AS centroid FROM transactions GROUP BY customer

虽然很简单,但这种方法有一些限制。首先,对于非常大的数据集,当vector包含许多Float32点,并且customer列具有许多唯一元素(高基数)时,此查询可能会非常消耗内存。其次,并且可能与 K-Means 更相关的是,如果插入新行,此方法需要我们重新运行查询,这是低效的。我们可以通过物化视图和 AggregatingMergeTree 引擎来解决这些问题。

使用物化视图增量计算质心

物化视图允许我们将计算质心的成本转移到插入时间。与其他数据库不同,ClickHouse 物化视图只是一个触发器,它在数据块插入到表中时对数据块运行查询。此查询的结果将插入到第二个“目标”表。在我们的例子中,物化视图查询将计算我们的质心,并将结果插入到表centroids

Incremental computing centroids with MV.png

这里有一些重要的细节

  • 我们的查询(计算质心)必须以可以与后续结果集合并的格式生成结果集 - 因为每个插入的块都会生成一个结果集。与其仅将平均值发送到我们的centroids表(平均值的平均值将是不正确的),我们发送“平均状态”。平均状态表示包含每个向量位置的总和,以及计数。这是使用avgForEachState函数实现的 - 请注意我们是如何将State附加到我们的函数名称的!AggregatingMergeTree 表引擎是存储这些聚合状态所必需的。我们将在下面进一步探讨这一点。
  • 整个过程是增量的,centroids表包含最终状态,即每个质心一行。读者会注意到,接收插入的表具有 Null 表引擎。这会导致插入的行被丢弃,从而节省了与每次迭代时写入完整数据集相关的 IO。
  • 我们的物化视图的查询仅在块插入时执行。每个块中的行数可能因插入方法而异。如果客户端制定块,例如使用 Go 客户端,我们建议每个块至少包含 1000 行。如果服务器负责形成块(例如,通过 HTTP 插入时),则还可以指定大小
  • 如果使用INSERT INTO SELECT,其中 ClickHouse 从另一个表或外部源(例如 S3)读取行,则块大小可以由之前的博客中详细讨论的几个关键参数控制。这些设置(以及插入线程数)对内存使用量(较大的块 = 更多内存)和摄取速度(较大的块 = 更快)都有显着影响。这些设置意味着内存使用量可以被精细控制以换取性能。

AggregatingMergeTree

我们的目标表centroids使用引擎AggregatingMergeTree

CREATE TABLE centroids
(
   customer UInt32,
   vector AggregateFunction(avgForEach, Array(Float32))
)
ENGINE = AggregatingMergeTree  ORDER BY customer

我们的vector列在此处包含由上面的avgForEachState函数生成的聚合状态。这些是必须合并以产生最终答案的中间质心。此列需要是适当的类型AggregateFunction(avgForEach, Array(Float32))

像所有 ClickHouse MergeTree 表一样,AggregatingMergeTree 将数据存储为必须透明合并的部分,以提高查询效率。当合并包含聚合状态的部分时,必须这样做,以便仅合并与同一客户相关的状态。这通过使用ORDER BY子句按customer列对表进行排序来有效实现。在查询时,我们还必须确保对中间状态进行分组和合并。这可以通过确保我们按列customerGROUP BY并使用avgForEach函数的 Merge 等效项来实现:avgForEachMerge。

SELECT customer, avgForEachMerge(vector) AS centroid
FROM centroids GROUP BY customer

所有聚合函数都有等效的状态函数,通过将State附加到其名称获得,它生成一个可以存储的中间表示,然后可以使用Merge等效项检索和合并。有关更多详细信息,我们推荐这篇博客和我们自己的 Mark的视频。

与我们之前的GROUP BY相比,此查询将非常快。计算平均值的大部分工作已转移到插入时间,只有少量行留给查询时间合并。考虑在 48GiB、12 vCPU 云服务上使用 1 亿个随机交易的以下两种方法的性能。加载数据的步骤在此

对比从transactions表计算质心的性能

SELECT customer, avgForEach(vector) AS centroid
FROM transactions GROUP BY customer
ORDER BY customer ASC
LIMIT 1 FORMAT Vertical

10 rows in set. Elapsed: 147.526 sec. Processed 100.00 million rows, 41.20 GB (677.85 thousand rows/s., 279.27 MB/s.)

Row 1:
──────
customer: 1
centroid: [0.49645231463677153,0.5042792240640065,...,0.5017436349466129]

1 row in set. Elapsed: 36.017 sec. Processed 100.00 million rows, 41.20 GB (2.78 million rows/s., 1.14 GB/s.)
Peak memory usage: 437.54 MiB.

centroids表的性能相比,后者速度快 1700 多倍

SELECT customer, avgForEachMerge(vector) AS centroid
FROM centroids GROUP BY customer
ORDER BY customer ASC
LIMIT 1
FORMAT Vertical

Row 1:
──────
customer: 1
centroid: [0.49645231463677153,0.5042792240640065,...,0.5017436349466129]

1 row in set. Elapsed: 0.085 sec. Processed 10.00 thousand rows, 16.28 MB (117.15 thousand rows/s., 190.73 MB/s.)

整合在一起

凭借我们增量计算质心的能力,让我们专注于 K-Means 聚类。假设我们正在尝试聚类表points,其中每一行都有向量表示。在这里,我们将基于相似性进行聚类,而不仅仅是像我们对交易那样基于客户来确定质心。

单次迭代

我们需要能够在算法的每次迭代后存储当前的质心。现在,假设我们已经确定了 K 的最佳值。我们的质心的目标表可能如下所示

CREATE TABLE centroids
(
  k UInt32,
  iteration UInt32,
  centroid UInt32,
  vector AggregateFunction(avgForEach, Array(Float32))
)
ENGINE = AggregatingMergeTree 
ORDER BY (k, iteration, centroid)

k列的值设置为我们选择的 K 值。我们的centroid列在此处表示质心编号本身,其值介于 0 和K-1之间。我们没有为算法的每次迭代使用单独的表,而是简单地包含一个iteration列,并确保我们的排序键是(k, iteration, centroid)。ClickHouse 将确保仅针对每个唯一的 K、质心和迭代合并中间状态。这意味着我们的最终行数将很小,从而确保快速查询这些质心。

我们用于计算质心的物化视图应该是熟悉的,只需稍作调整即可GROUP BY k、质心和迭代

CREATE TABLE temp
(
   k UInt32,
   iteration UInt32,
   centroid UInt32,
   vector Array(Float32)
)
ENGINE = Null

CREATE MATERIALIZED VIEW centroids_mv TO centroids
AS SELECT k, iteration, centroid, avgForEachState(vector) AS vector
FROM temp GROUP BY k, centroid, iteration

请注意,我们的查询在插入到temp表(而不是我们的数据源表 transactions)的块上执行,该表没有iterationcentroid列。此临时表将接收我们的插入,并再次使用 Null 表引擎以避免写入数据。有了这些构建块,我们可以可视化算法的单次迭代,假设K = 5

kmeans_clickhouse.png

上面显示了我们如何插入到我们的临时表中,从而通过对points表执行INSERT INTO SELECT作为我们的源数据来计算我们的质心。此插入有效地表示算法的一次迭代。这里的SELECT查询至关重要,因为它需要指定交易向量及其当前质心和迭代(以及 K 的固定值)。我们如何计算后两者?完整的INSERT INTO SELECT如下所示

INSERT INTO temp 
WITH
  5 as k_val,
  -- (1) obtain the max value of iteration - will be the previous iteration
  (
      SELECT max(iteration)
      FROM centroids 
      -- As later we will reuse this table for all values of K
      WHERE k = k_val
  ) AS c_iteration,
  (
      -- (3) convert centroids into a array of tuples 
      -- i.e. [(0, [vector]), (1, [vector]), ... , (k-1, [vector])]
      SELECT groupArray((centroid, position))
      FROM
      (
         -- (2) compute the centroids from the previous iteration
          SELECT
              centroid,
              avgForEachMerge(vector) AS position
          FROM centroids
          WHERE iteration = c_iteration AND k = k_val
          GROUP BY centroid
      )
  ) AS c_centroids
SELECT
  k_val AS k,
  -- (4) increment the iteration
  c_iteration + 1 AS iteration,
  -- (5) find the closest centroid for this vector using Euclidean distance
  (arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS centroid,
  vector AS v
FROM points

首先,在 (1) 处,此查询标识上一次迭代的编号。然后在 (2) 处的 CTE 中使用它来确定此迭代(和选定的 K)生成的质心,使用之前显示的相同avgForEachMerge查询。这些质心通过groupArray查询折叠成包含元组数组的单行,以便于与点匹配。在SELECT中,我们递增迭代次数 (4) 并计算新的最近质心(使用欧几里得距离L2Distance函数),方法是为每个点使用arrayMaparraySort函数。

通过将行插入到此处的临时表,质心基于上一次迭代,我们可以让物化视图计算新的质心(迭代值为 +1)。

初始化质心

以上假设我们有一些迭代 1 的初始质心,用于计算成员资格。这需要我们初始化系统。我们可以通过使用以下查询(k=5)简单地选择和插入 K 个随机点来做到这一点

INSERT INTO temp WITH 
  5 as k_val,
  vectors AS
  (
      SELECT vector
      FROM points
      -- select random points, use k to make pseudo-random
      ORDER BY cityHash64(concat(toString(id), toString(k_val))) ASC
      LIMIT k_val -- k
  )
SELECT
  k_val as k,
  1 AS iteration,
  rowNumberInAllBlocks() AS centroid,
  vector
FROM vectors

成功的聚类对质心的初始放置非常敏感;不良的分配会导致收敛缓慢或聚类效果不佳。我们稍后将对此进行讨论。

质心分配以及何时停止迭代

以上所有内容都代表一次迭代(和初始化步骤)。在每次迭代之后,我们需要根据聚类是否已收敛的经验测量来决定是否停止。最简单的方法是当点在迭代之间不再更改质心(和集群)时停止。

为了确定哪些点属于哪些质心,我们可以随时使用上面INSERT INTO SELECT中的 SELECT。

为了计算上次迭代中移动集群的点数,我们首先计算前两次迭代的质心 (1) 和 (2)。使用这些质心,我们确定每次迭代中每个点的质心 (3) 和 (4)。如果这些质心相同 (5),我们返回 0,否则返回 1。这些 (6) 值的总和为我们提供了移动集群的点数。

WITH 5 as k_val,
(
      SELECT max(iteration)
      FROM centroids
) AS c_iteration,
(
  -- (1) current centroids
  SELECT groupArray((centroid, position))
  FROM
  (
      SELECT
          centroid,
          avgForEachMerge(vector) AS position
      FROM centroids
      WHERE iteration = c_iteration AND k = k_val
      GROUP BY centroid
  )
) AS c_centroids,
(
  -- (2) previous centroids
  SELECT groupArray((centroid, position))
  FROM
  (
      SELECT
          centroid,
          avgForEachMerge(vector) AS position
      FROM centroids
      WHERE iteration = (c_iteration-1) AND k = k_val
      GROUP BY centroid
  )
) AS c_p_centroids
-- (6) sum differences
SELECT sum(changed) FROM (
  SELECT id,
  -- (3) current centroid for point
  (arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS cluster,
  -- (4) previous centroid for point
  (arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_p_centroids))[1]).1 AS cluster_p,
  -- (5) difference in allocation
  if(cluster = cluster_p, 0, 1) as changed
  FROM points
)

测试数据集

以上大部分是理论性的。让我们看看以上方法是否真的适用于真实数据集!为此,我们将使用流行的纽约市出租车数据集的 300 万行子集,因为集群有望具有相关性。从 S3 创建和插入数据的步骤

CREATE TABLE trips (
  trip_id         	UInt32,
  pickup_datetime 	DateTime,
  dropoff_datetime	DateTime,
  pickup_longitude	Nullable(Float64),
  pickup_latitude 	Nullable(Float64),
  dropoff_longitude   Nullable(Float64),
  dropoff_latitude	Nullable(Float64),
  passenger_count 	UInt8,
  trip_distance   	Float32,
  fare_amount     	Float32,
  extra           	Float32,
  tip_amount      	Float32,
  tolls_amount    	Float32,
  total_amount    	Float32,
  payment_type    	Enum('CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4, 'UNK' = 5),
  pickup_ntaname  	LowCardinality(String),
  dropoff_ntaname 	LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY (pickup_datetime, dropoff_datetime);

INSERT INTO trips SELECT trip_id, pickup_datetime, dropoff_datetime, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, passenger_count, trip_distance, fare_amount, extra, tip_amount, tolls_amount, total_amount, payment_type, pickup_ntaname, dropoff_ntaname
FROM gcs('https://storage.googleapis.com/clickhouse-public-datasets/nyc-taxi/trips_{0..2}.gz', 'TabSeparatedWithNames');

特征选择

特征选择对于良好的聚类至关重要,因为它直接影响形成的集群的质量。我们不会在此处详细介绍我们如何选择特征。对于感兴趣的人,我们将注释包含在notebook中。我们最终得到以下points

CREATE TABLE points
(
   `id` UInt32,
   `vector` Array(Float32),
   `pickup_hour` UInt8,
   `pickup_day_of_week` UInt8,
   `pickup_day_of_month` UInt8,
   `dropoff_hour` UInt8,
   `pickup_longitude` Float64,
   `pickup_latitude` Float64,
   `dropoff_longitude` Float64,
   `dropoff_latitude` Float64,
   `passenger_count` UInt8,
   `trip_distance` Float32,
   `fare_amount` Float32,
   `total_amount` Float32
) ENGINE = MergeTree ORDER BY id

为了填充此表,我们使用INSERT INTO SELECT SQL 查询,它创建特征,缩放特征并过滤掉任何异常值。请注意,我们的最终列也编码在vector列中。

链接的查询是我们首次尝试生成特征。我们希望这里可以进行更多工作,这可能会产生比所示更好的结果。欢迎提出建议!

一点 Python

我们已经描述了算法中的迭代如何有效地简化为INSERT INTO SELECT,物化视图处理质心的维护。这意味着我们需要调用此语句 N 次,直到发生收敛。

我们没有等待达到点在质心之间不再移动的状态,而是使用了 1000 的阈值,即如果移动集群的点少于 1000 个,我们就停止。每 5 次迭代进行一次此检查。

对于特定的 K 值执行 K-Means 的伪代码变得非常简单,因为大部分工作由 ClickHouse 执行。

def kmeans(k, report_every = 5, min_cluster_move = 1000):
   startTime = time.time()
   # INITIALIZATION QUERY
   run_init_query(k)
   i = 0
   while True:
       # ITERATION QUERY
       run_iteration_query(k)
       # report every N iterations
       if (i + 1) % report_every == 0 or i == 0:
           num_moved = calculate_points_moved(k)
           if num_moved <= min_cluster_move:
               break
       i += 1
   execution_time = (time.time() - startTime))
   # COMPUTE d^2 ERROR
   d_2_error = compute_d2_error(k)
   # return the d^2, execution time and num of required iterations
   return d_2_error, execution_time, i+1

此循环的完整代码(包括查询)可以在notebook中找到。

选择 K

到目前为止,我们假设 K 已经被确定。有几种技术可以确定 K 的最佳值,其中最简单的方法是计算每个 K 值的每个点与其各自集群之间的聚合平方距离 (SSE)。这为我们提供了一个我们旨在最小化的成本指标。compute_d2_error方法使用以下 SQL 查询计算此值(假设 K 值为 5)

WITH 5 as k_val,
(
       SELECT max(iteration)
       FROM centroids WHERE k={k}
) AS c_iteration,
(
   SELECT groupArray((centroid, position))
   FROM
   (
       SELECT
           centroid,
           avgForEachMerge(vector) AS position
       FROM centroids
       WHERE iteration = c_iteration AND k=k_val
       GROUP BY centroid
   )
) AS c_centroids
SELECT
   sum(pow((arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).2, 2)) AS distance
FROM points

当我们增加 K 时,此值保证会减小,例如,如果我们将 K 设置为点数,则每个集群将有 1 个点,从而使我们的误差为 0。不幸的是,这不会很好地概括数据!

随着 K 的增加,SSE 通常会减小,因为数据点更接近其集群质心。目标是找到“肘点”,其中 SSE 的减少率急剧变化。此点表示增加 K 的好处的回报递减。在肘点选择 K 可以提供一个模型,该模型捕获数据中的固有分组,而不会过度拟合。识别此肘点的简单方法是绘制 K 与 SEE 的关系图,并直观地识别该值。对于我们的纽约市出租车数据,我们测量并绘制 K 值 2 到 20 的 SSE

k_vs_d_2.png

这里的肘点不像我们希望的那样清晰,但值 5 似乎是一个合理的候选值。

以上结果基于每个 K 值的单次端到端运行。K-Means 可能会收敛到局部最小值,并且不能保证附近的点最终会进入同一个集群。建议为每个 K 值运行多个值,每次都使用不同的初始质心,以找到最佳候选值。

结果

如果我们选择 5 作为我们的 K 值,则该算法大约需要 30 次迭代和 20 秒才能在 12 vCPU ClickHouse Cloud 节点上收敛。此方法考虑每次迭代的所有 300 万行。

k=5
initializing...OK
Iteration 0
Number changed cluster in first iteration: 421206
Iteration 1, 2, 3, 4
Number changed cluster in iteration 5: 87939
Iteration 5, 6, 7, 8, 9
Number changed cluster in iteration 10: 3610
Iteration 10, 11, 12, 13, 14
Number changed cluster in iteration 15: 1335
Iteration 15, 16, 17, 18, 19
Number changed cluster in iteration 20: 1104
Iteration 20, 21, 22, 23, 24
Number changed cluster in iteration 25: 390
stopping as moved less than 1000 clusters in last iteration
Execution time in seconds: 20.79200577735901
D^2 error for 5: 33000373.34968858

为了可视化这些集群,我们需要降低维度。为此,我们使用主成分分析 (PCA)。我们将 PCA 在 SQL 中的实现推迟到另一篇博客,而只使用 Python 和 10,000 个随机点的样本。我们可以通过检查主成分解释了多少方差来评估 PCA 在捕获数据基本属性方面的有效性。82% 低于通常使用的 90% 阈值,但足以理解我们聚类的有效性

Explained variances of the 3 principal components: 0.824

使用我们的 3 个主成分,我们可以绘制相同的随机 10,000 个点,并根据每个点的集群为其关联颜色。

kmeans_1.png

集群的 PCA 可视化显示了 PC1 和 PC3 上的一个密集平面,整齐地分为四个不同的集群,表明这些维度内的方差受到约束。沿着第二个主成分 (PC2),可视化变得更加稀疏,集群(编号 3)偏离了主组,可能特别有趣。

为了理解我们的集群,我们需要标签。理想情况下,我们将通过探索每个集群中每一列的分布来生成这些标签,寻找独特的特征和时间/空间模式。我们将尝试使用 SQL 查询简洁地做到这一点,以了解每个集群中每一列的分布。对于要关注的列,我们可以检查 PCA 成分的值并识别占主导地位的维度。执行此操作的代码可以在 notebook 中找到,并标识以下内容

PCA1:: ['pickup_day_of_month: 0.9999497049810415', 'dropoff_latitude: -0.006371842399701939', 'pickup_hour: 0.004444108327647353', 'dropoff_hour: 0.003868258226185553', …]

PCA 2:: ['total_amount: 0.5489526881298809', 'fare_amount: 0.5463895585884886', 'pickup_longitude: 0.43181504878694826', 'pickup_latitude: -0.3074228612885196', 'dropoff_longitude: 0.2756342866763702', 'dropoff_latitude: -0.19809343490462433', …]

PCA 3:: ['dropoff_hour: -0.6998176337701472', 'pickup_hour: -0.6995098287872831', 'pickup_day_of_week: 0.1134719682173672', 'pickup_longitude: -0.05495391127067617', …]

对于 PCA1,pickup_day_of_month很重要,这表明重点关注月份中的时间。对于 PC2 维度,上车和下车地点以及乘车费用似乎贡献很大。此组件可能侧重于特定的行程类型。最后,对于 PC3,行程发生的小时似乎是最相关的。为了了解这些列在每个集群中在时间、日期和价格方面的差异,我们可以再次仅使用 SQL 查询

WITH
   5 AS k_val,
   (
       SELECT max(iteration)
       FROM centroids
       WHERE k = k_val
   ) AS c_iteration,
   (
       SELECT groupArray((centroid, position))
       FROM
       (
           SELECT
               centroid,
               avgForEachMerge(vector) AS position
           FROM centroids
           WHERE (iteration = c_iteration) AND (k = k_val)
           GROUP BY centroid
       )
   ) AS c_centroids
SELECT
   (arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS cluster,
   floor(avg(pickup_day_of_month)) AS pickup_day_of_month,
   round(avg(pickup_hour)) AS avg_pickup_hour,
   round(avg(fare_amount)) AS avg_fare_amount,
   round(avg(total_amount)) AS avg_total_amount
FROM points
GROUP BY cluster
ORDER BY cluster ASC

┌─cluster─┬─pickup_day_of_month─┬─avg_pickup_hour─┬─avg_fare_amount─┬─avg_total_amount─┐
│   	011141113 │
│   	13141214 │
│   	218131113 │
│   	316144958 │
│   	426141214 │
└─────────┴─────────────────────┴─────────────────┴─────────────────┴──────────────────┘

9 rows in set. Elapsed: 0.625 sec. Processed 2.95 million rows, 195.09 MB (4.72 million rows/s., 312.17 MB/s.)
Peak memory usage: 720.16 MiB.

集群 3 显然与更昂贵的行程相关联。鉴于行程费用与主成分相关联,该主成分也将上车和下车地点识别为关键,因此这些可能与特定的行程类型相关联。其他集群需要更深入的分析,但似乎侧重于每月模式。我们可以将仅集群 3 的上车和下车地点绘制在地图可视化上。蓝色和红色点分别代表以下图中的上车和下车地点

clusters_nyc_map.png

仔细检查该图,此集群与往返 JFK 的机场行程相关联。

扩展

我们之前的示例仅使用了纽约市出租车行程的 300 万行子集。在更大的数据集上测试 2009 年的所有出租车行程(1.7 亿行),我们可以在大约 3 分钟内完成 k=5 的聚类,使用的 ClickHouse 服务有 60 个内核。

k=5
initializing...OK
…
Iteration 15, 16, 17, 18, 19
Number changed cluster in iteration 20: 288
stopping as moved less than 1000 clusters in last iteration
Execution time in seconds: 178.61135005950928
D^2 error for 5: 1839404623.265372
Completed in 178.61135005950928s and 20 iterations with error 1839404623.265372

这产生了与我们之前较小子集相似的集群。在 64 核m5d.16xlarge上使用 scikit-learn 运行相同的聚类需要 6132 秒,速度慢了 34 倍以上!重现此基准测试的步骤可以在 notebook 的末尾找到,并使用这些步骤进行 scikit-learn。

潜在的改进和未来的工作

聚类对选择的初始点非常敏感。K-Means++ 是对标准 K-Means 聚类的改进,它通过引入更智能的初始化过程来解决此问题,该过程旨在分散初始质心,减少初始质心放置不佳的可能性,并导致更快的收敛以及可能更好的聚类。我们将此作为练习留给读者进行改进。

K-Means 也难以处理分类变量。这可以部分通过单热编码(在 SQL 中也是可能的)以及专用于此类数据的算法(例如KModes 聚类)来处理。特定领域的自定义距离函数(而不仅仅是欧几里得距离)也很常见,应该可以使用用户定义函数 (UDF) 来实现。

最后,探索其他软聚类算法(例如用于正态分布特征的高斯混合模型)或分层聚类算法(例如凝聚聚类)也可能很有趣。后一种方法也克服了 K-Means 的主要限制之一 - 需要指定 K。我们很乐意看到尝试在 ClickHouse SQL 中实现这些!

立即开始使用 ClickHouse Cloud,并获得 300 美元的信用额度。在 30 天试用期结束时,继续使用按需付费计划,或联系我们以了解有关我们基于用量的折扣的更多信息。访问我们的定价页面了解详情。

分享这篇文章

订阅我们的新闻通讯

随时了解功能发布、产品路线图、支持和云产品!
正在加载表单...
关注我们
X imageSlack imageGitHub image
Telegram imageMeetup imageRss image
©2025ClickHouse, Inc. 总部位于加利福尼亚州湾区和荷兰阿姆斯特丹。