DoubleCloud 即将关闭。迁移到 ClickHouse,享受限时免费迁移服务。立即联系我们 ->->

博客 / 工程

如何使用 ClickHouse SQL 扩展 K-Means 聚类

author avatar
Dale McDiarmid
2024 年 4 月 11 日

简介

最近,在帮助一位用户计算存储在 ClickHouse 中的向量质心时,我们意识到相同的解决方案可以用于实现 K-Means 聚类。他们希望在可能数十亿个数据点上实现大规模计算,同时确保内存可以严格管理。在这篇文章中,我们将尝试仅使用 SQL 实现 K-Means 聚类,并展示它可以扩展到数十亿行。

在撰写这篇博文时,我们了解到 Boris Tyshkevich 所做的工作。虽然我们在博文中使用不同的方法,但我们想表彰 Boris 的工作,并感谢他比我们更早地提出了这个想法!

作为使用 ClickHouse SQL 实现 K-Means 的一部分,我们在不到 3 分钟内对 1.7 亿个纽约市出租车行程进行了聚类。使用相同资源的等效 scikit-learn 操作需要超过 100 分钟,并且需要 90GB 的 RAM。在没有内存限制的情况下,ClickHouse 自动分配计算,我们展示了 ClickHouse 如何加速机器学习工作负载并缩短迭代时间。

kmeans_cluster_1.png

这篇博文的所有代码都可以在 这里 的笔记本中找到。

为什么在 ClickHouse SQL 中使用 K-Means?

使用 ClickHouse SQL 进行 K-Means 的主要动机是,训练不受内存限制,由于质心的增量计算(以及限制内存开销的设置),可以对 PB 级数据集进行聚类。相比之下,使用基于 Python 的方法跨服务器分配此工作负载将需要额外的框架和复杂性。

此外,我们可以轻松地提高 聚类中的并行级别,以充分利用 Clickhouse 实例的全部资源。如果需要处理更大的数据集,我们只需扩展数据库服务——这在 ClickHouse Cloud 中是一个简单的操作。

将数据转换为 K-Means 是一个简单的 SQL 查询,每秒可以处理数十亿行。将质心和点存储在 ClickHouse 中,我们可以仅使用 SQL 计算统计信息,例如模型误差,并且可以将我们的聚类用于其他操作,例如用于向量搜索的产品量化。

K-Means 回顾

K-Means 是一种 无监督机器学习算法,用于将数据集划分为 K 个不同的非重叠子组(聚类),其中每个数据点都属于具有最近均值(聚类质心)的聚类。该过程首先通过随机或基于某种启发式方法初始化 K 个质心。这些质心作为聚类的初始代表。然后,该算法迭代两个主要步骤,直到收敛:分配和更新。

在分配步骤中,根据每个数据点与其质心之间的欧几里德距离(或其他距离度量)将其分配到最近的聚类。在更新步骤中,质心重新计算为分配到其各自聚类中的所有点的平均值,从而可能改变其位置。

该过程保证会收敛,点到聚类的分配最终会稳定下来,并且在迭代之间不会改变。聚类数量 K 需要事先指定,并且会极大地影响算法的有效性,最佳值取决于数据集和聚类的目标。有关更多详细信息,我们建议您阅读这篇 优秀的概述

点和质心

我们的用户提出的关键问题是有效地计算质心的能力。假设我们有一个针对 transactions 表的简单数据模式,其中每行代表特定客户的银行交易。ClickHouse 中的向量表示为 Array 类型。

CREATE TABLE transactions
(
  id UInt32,
  vector Array(Float32), 
  -- e.g.[0.6860357,-1.0086979,0.83166444,-1.0089169,0.22888935]
  customer UInt32,
  ...other columns omitted for brevity
)
ENGINE = MergeTree ORDER BY id

我们的用户想要找到每个客户的质心,实际上是与每个客户关联的所有交易向量的平均位置。为了找到平均向量的集合,我们可以使用 avgForEach[1][2] 函数。例如,考虑计算 3 个向量的平均值,每个向量具有 4 个元素

WITH vectors AS
   (
       SELECT c1 AS vector
       FROM VALUES([1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12])
   )
SELECT avgForEach(vector) AS centroid
FROM vectors

┌─centroid──┐
│ [5,6,7,8] │
└───────────┘

在我们最初的 transactions 表中,计算每个客户的平均值变为

SELECT customer, avgForEach(vector) AS centroid FROM transactions GROUP BY customer

虽然简单,但这种方法有一些局限性。首先,对于非常大的数据集,当 vector 包含许多 Float32 点并且 customer 列具有许多唯一元素(高基数)时,此查询可能非常占用内存。其次,可能与 K-Means 更相关的是,如果插入新行,则此方法需要我们重新运行查询,这效率低下。我们可以通过物化视图和 AggregatingMergeTree 引擎来解决这些问题。

使用物化视图增量计算质心

物化视图允许我们将计算质心的成本转移到插入时间。与其他数据库不同,ClickHouse 物化视图只是一个触发器,它在将数据块插入表时运行查询。此查询的结果将插入到第二个“目标”表中。在本例中,物化视图查询将计算我们的质心,并将结果插入到 centroids 表中。

Incremental computing centroids with MV.png

这里有一些重要的细节

  • 计算质心的查询必须以可以与后续结果集合并的格式生成结果集——因为插入的每个数据块都会生成一个结果集。与其仅仅将平均值发送到 centroids 表 (平均值的平均值将不正确),我们将发送 “平均状态”。平均状态表示包含每个向量位置的总和以及计数。这是使用 avgForEachState 函数实现的——请注意,我们只是在函数名末尾添加了 State!AggregatingMergeTree 表引擎是存储这些聚合状态所必需的。我们将在下面对此进行更深入的探讨。
  • 整个过程是增量的,centroids 表包含最终状态,即每个质心对应一行。读者会注意到,接收插入的表具有 Null 表引擎。这会导致插入的行被丢弃,从而节省了与每次迭代写入完整数据集相关的 IO 操作。
  • 物化视图的查询仅在数据块被插入时执行。每个数据块中的行数可能因插入方法而异。我们建议在客户端侧(例如,使用 Go 客户端)制定数据块时,每个数据块至少包含 1000 行。如果让服务器来生成数据块(例如,通过 HTTP 插入时),则大小也可以 指定
  • 如果使用 INSERT INTO SELECT 从 ClickHouse 读取另一张表或外部数据源(例如 S3)中的行,则可以通过几个关键参数来控制块大小,这些参数在之前的博客中进行了详细讨论。这些设置(以及插入线程数量)会对使用的内存(块越大,内存使用量就越大)和数据加载速度(块越大,速度就越快)产生重大影响。这些设置意味着可以使用内存量进行微调,以换取性能。

AggregatingMergeTree

我们的目标表 centroids 使用引擎AggregatingMergeTree

CREATE TABLE centroids
(
   customer UInt32,
   vector AggregateFunction(avgForEach, Array(Float32))
)
ENGINE = AggregatingMergeTree  ORDER BY customer

此处的 vector 列包含由上面的avgForEachState 函数生成的聚合状态。这些是必须合并以产生最终答案的中间质心。该列需要使用合适的类型AggregateFunction(avgForEach, Array(Float32))

与所有 ClickHouse MergeTree 表一样,AggregatingMergeTree 将数据存储为必须透明合并的多个部分,以允许更有效地查询。当合并包含聚合状态的部分时,必须确保只合并属于同一客户的那些状态。这可以通过使用 ORDER BY 子句按 customer 列对表进行排序来有效实现。在查询时,我们还必须确保对中间状态进行分组和合并。这可以通过确保我们按 customer 列进行 GROUP BY 并使用 avgForEach 函数的 Merge 等效项:avgForEachMerge 来实现。

SELECT customer, avgForEachMerge(vector) AS centroid
FROM centroids GROUP BY customer

所有聚合函数都有一个等效的状态函数,可以通过在其名称末尾追加 State 来获得,它会生成一个可以存储的中间表示,然后可以检索并与 Merge 等效项合并。有关更多详细信息,我们推荐这篇博客和来自我们Mark 的视频

与我们之前的 GROUP BY 相比,此查询的速度非常快。计算平均值的大部分工作都已移至插入时间,查询时间合并的行数很少。请考虑在 48GiB、12 个 vCPU 云服务上使用 1 亿个随机交易时的以下两种方法的性能。加载数据的步骤在这里

对比从 transactions 表计算质心的性能

SELECT customer, avgForEach(vector) AS centroid
FROM transactions GROUP BY customer
ORDER BY customer ASC
LIMIT 1 FORMAT Vertical

10 rows in set. Elapsed: 147.526 sec. Processed 100.00 million rows, 41.20 GB (677.85 thousand rows/s., 279.27 MB/s.)

Row 1:
──────
customer: 1
centroid: [0.49645231463677153,0.5042792240640065,...,0.5017436349466129]

1 row in set. Elapsed: 36.017 sec. Processed 100.00 million rows, 41.20 GB (2.78 million rows/s., 1.14 GB/s.)
Peak memory usage: 437.54 MiB.

centroids 表相比,速度快了 1700 倍

SELECT customer, avgForEachMerge(vector) AS centroid
FROM centroids GROUP BY customer
ORDER BY customer ASC
LIMIT 1
FORMAT Vertical

Row 1:
──────
customer: 1
centroid: [0.49645231463677153,0.5042792240640065,...,0.5017436349466129]

1 row in set. Elapsed: 0.085 sec. Processed 10.00 thousand rows, 16.28 MB (117.15 thousand rows/s., 190.73 MB/s.)

总结

有了对质心进行增量计算的能力,让我们关注 K 均值聚类。假设我们正在尝试对 points 表进行聚类,其中每一行都有一个向量表示。在这里,我们将根据相似性进行聚类,而不是像处理交易那样仅根据客户进行聚类。

单次迭代

我们需要能够在算法的每次迭代后存储当前质心。现在,假设我们已经确定了 K 的最优值。我们的质心目标表可能如下所示

CREATE TABLE centroids
(
  k UInt32,
  iteration UInt32,
  centroid UInt32,
  vector AggregateFunction(avgForEach, Array(Float32))
)
ENGINE = AggregatingMergeTree 
ORDER BY (k, iteration, centroid)

k 列的值设置为我们选择的 K 值。此处的 centroid 列表示质心编号本身,其值介于 0 和 K-1 之间。我们没有为算法的每次迭代使用单独的表,而是简单地包含一个 iteration 列,并确保排序键为 (k, iteration, centroid)。ClickHouse 将确保仅对每个唯一的 K、质心和迭代进行中间状态的合并。这意味着我们的最终行数将很小,从而确保快速查询这些质心。

我们的物化视图用于计算质心,只需对 GROUP BY k, centroiditeration 进行少量调整即可

CREATE TABLE temp
(
   k UInt32,
   iteration UInt32,
   centroid UInt32,
   vector Array(Float32)
)
ENGINE = Null

CREATE MATERIALIZED VIEW centroids_mv TO centroids
AS SELECT k, iteration, centroid, avgForEachState(vector) AS vector
FROM temp GROUP BY k, centroid, iteration

请注意,我们的查询是在插入到 temp 表中的块上执行的,而不是我们的数据源表 transactions,该表没有 iterationcentroid 列。此临时表将接收我们的插入,并再次使用 Null 表引擎以避免写入数据。有了这些构建块,我们可以将算法的单次迭代可视化,假设 K = 5

kmeans_clickhouse.png

上面显示了如何将数据插入到我们的临时表中,并通过执行使用 points 表作为源数据的 INSERT INTO SELECT 来计算我们的质心。此插入有效地代表了算法的一次迭代。此处的 SELECT 查询至关重要,因为它需要指定交易向量及其当前质心和迭代(以及 K 的固定值)。我们如何计算这两个值中的后者?完整的 INSERT INTO SELECT 如下所示

INSERT INTO temp 
WITH
  5 as k_val,
  -- (1) obtain the max value of iteration - will be the previous iteration
  (
      SELECT max(iteration)
      FROM centroids 
      -- As later we will reuse this table for all values of K
      WHERE k = k_val
  ) AS c_iteration,
  (
      -- (3) convert centroids into a array of tuples 
      -- i.e. [(0, [vector]), (1, [vector]), ... , (k-1, [vector])]
      SELECT groupArray((centroid, position))
      FROM
      (
         -- (2) compute the centroids from the previous iteration
          SELECT
              centroid,
              avgForEachMerge(vector) AS position
          FROM centroids
          WHERE iteration = c_iteration AND k = k_val
          GROUP BY centroid
      )
  ) AS c_centroids
SELECT
  k_val AS k,
  -- (4) increment the iteration
  c_iteration + 1 AS iteration,
  -- (5) find the closest centroid for this vector using Euclidean distance
  (arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS centroid,
  vector AS v
FROM points

首先,在 (1) 中,此查询标识了前一次迭代的编号。然后在 (2) 中的 CTE 中使用它来确定为此迭代(以及选择的 K)生成的质心,使用前面显示的相同的 avgForEachMerge 查询。这些质心通过 groupArray 查询折叠成包含元组数组的单行,以方便与点进行匹配。在 SELECT 中,我们递增迭代编号 (4) 并计算新的最近质心(使用欧几里得距离L2Distance 函数)使用arrayMaparraySort 函数对每个点进行计算。

通过将行插入到此处的 temp 中,并使用基于前一次迭代的质心,我们可以让物化视图计算新的质心(迭代值 +1)。

初始化质心

以上假设我们有一些用于迭代 1 的初始质心,这些质心用于计算成员资格。这要求我们初始化系统。我们可以通过简单地选择并插入 K 个随机点来完成此操作,使用以下查询(k=5)

INSERT INTO temp WITH 
  5 as k_val,
  vectors AS
  (
      SELECT vector
      FROM points
      -- select random points, use k to make pseudo-random
      ORDER BY cityHash64(concat(toString(id), toString(k_val))) ASC
      LIMIT k_val -- k
  )
SELECT
  k_val as k,
  1 AS iteration,
  rowNumberInAllBlocks() AS centroid,
  vector
FROM vectors

成功的聚类对质心的初始位置非常敏感;分配不当会导致收敛缓慢或聚类效果不佳。我们将在稍后讨论这一点。

质心分配和何时停止迭代

以上所有内容都代表了一次迭代(以及初始化步骤)。每次迭代后,我们需要根据聚类是否收敛的经验测量结果来决定是否停止。最简单的方法是,只要点在迭代之间不再改变质心(以及聚类)就停止。

要确定哪些点属于哪些质心,我们可以在任何时候使用我们之前在 INSERT INTO SELECT 中使用的 SELECT。

要计算上次迭代中移动聚类的点数,我们首先计算前两次迭代 (1) 和 (2) 的质心。使用这些,我们确定每次迭代 (3) 和 (4) 中每个点的质心。如果这些相同 (5),我们返回 0,否则返回 1。这些 (6) 值的总和为我们提供了移动聚类的点数。

WITH 5 as k_val,
(
      SELECT max(iteration)
      FROM centroids
) AS c_iteration,
(
  -- (1) current centroids
  SELECT groupArray((centroid, position))
  FROM
  (
      SELECT
          centroid,
          avgForEachMerge(vector) AS position
      FROM centroids
      WHERE iteration = c_iteration AND k = k_val
      GROUP BY centroid
  )
) AS c_centroids,
(
  -- (2) previous centroids
  SELECT groupArray((centroid, position))
  FROM
  (
      SELECT
          centroid,
          avgForEachMerge(vector) AS position
      FROM centroids
      WHERE iteration = (c_iteration-1) AND k = k_val
      GROUP BY centroid
  )
) AS c_p_centroids
-- (6) sum differences
SELECT sum(changed) FROM (
  SELECT id,
  -- (3) current centroid for point
  (arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS cluster,
  -- (4) previous centroid for point
  (arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_p_centroids))[1]).1 AS cluster_p,
  -- (5) difference in allocation
  if(cluster = cluster_p, 0, 1) as changed
  FROM points
)

测试数据集

以上主要是理论上的。让我们看看以上是否在实际数据集上有效!为此,我们将使用流行的纽约市出租车数据集的 300 万行子集,因为聚类可能与现实相关。要从 S3 创建和插入数据

CREATE TABLE trips (
  trip_id         	UInt32,
  pickup_datetime 	DateTime,
  dropoff_datetime	DateTime,
  pickup_longitude	Nullable(Float64),
  pickup_latitude 	Nullable(Float64),
  dropoff_longitude   Nullable(Float64),
  dropoff_latitude	Nullable(Float64),
  passenger_count 	UInt8,
  trip_distance   	Float32,
  fare_amount     	Float32,
  extra           	Float32,
  tip_amount      	Float32,
  tolls_amount    	Float32,
  total_amount    	Float32,
  payment_type    	Enum('CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4, 'UNK' = 5),
  pickup_ntaname  	LowCardinality(String),
  dropoff_ntaname 	LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY (pickup_datetime, dropoff_datetime);

INSERT INTO trips SELECT trip_id, pickup_datetime, dropoff_datetime, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, passenger_count, trip_distance, fare_amount, extra, tip_amount, tolls_amount, total_amount, payment_type, pickup_ntaname, dropoff_ntaname
FROM gcs('https://storage.googleapis.com/clickhouse-public-datasets/nyc-taxi/trips_{0..2}.gz', 'TabSeparatedWithNames');

特征选择

特征选择对于良好的聚类至关重要,因为它直接影响形成的聚类的质量。我们在此不会详细介绍如何选择特征。对于有兴趣的人,我们在笔记本中包含了笔记。我们最终得到以下 points

CREATE TABLE points
(
   `id` UInt32,
   `vector` Array(Float32),
   `pickup_hour` UInt8,
   `pickup_day_of_week` UInt8,
   `pickup_day_of_month` UInt8,
   `dropoff_hour` UInt8,
   `pickup_longitude` Float64,
   `pickup_latitude` Float64,
   `dropoff_longitude` Float64,
   `dropoff_latitude` Float64,
   `passenger_count` UInt8,
   `trip_distance` Float32,
   `fare_amount` Float32,
   `total_amount` Float32
) ENGINE = MergeTree ORDER BY id

要填充此表,我们使用 INSERT INTO SELECT SQL 查询,该查询创建特征、对其进行缩放并过滤任何异常值。请注意,我们的最终列也编码在 vector 列中。

链接的查询是我们生成特征的第一次尝试。我们希望这里可以进行更多工作,这可能会产生比显示结果更好的结果。欢迎提出建议!

一些 Python 代码

我们已经描述了算法中的迭代如何有效地简化为 INSERT INTO SELECT,物化视图负责维护质心。这意味着我们需要调用此语句 N 次,直到收敛。

我们没有等到没有点在质心之间移动的状态,而是使用 1000 的阈值,即如果移动聚类的点数少于 1000,我们就会停止。每 5 次迭代进行一次检查。

鉴于 ClickHouse 执行了大部分工作,用于执行特定 K 值的 K 均值的伪代码变得非常简单。

def kmeans(k, report_every = 5, min_cluster_move = 1000):
   startTime = time.time()
   # INITIALIZATION QUERY
   run_init_query(k)
   i = 0
   while True:
       # ITERATION QUERY
       run_iteration_query(k)
       # report every N iterations
       if (i + 1) % report_every == 0 or i == 0:
           num_moved = calculate_points_moved(k)
           if num_moved <= min_cluster_move:
               break
       i += 1
   execution_time = (time.time() - startTime))
   # COMPUTE d^2 ERROR
   d_2_error = compute_d2_error(k)
   # return the d^2, execution time and num of required iterations
   return d_2_error, execution_time, i+1

此循环的完整代码(包括查询)可以在笔记本中找到。

选择 K

到目前为止,我们假设已经确定了 K。有几种技术可以确定 K 的最佳值,最简单的方法是计算每个点的每个 K 值与其各自聚类的聚合平方距离 (SSE)。这为我们提供了一个我们旨在最小化的成本指标。compute_d2_error 方法使用以下 SQL 查询来计算此值(假设 K 的值为 5)

WITH 5 as k_val,
(
       SELECT max(iteration)
       FROM centroids WHERE k={k}
) AS c_iteration,
(
   SELECT groupArray((centroid, position))
   FROM
   (
       SELECT
           centroid,
           avgForEachMerge(vector) AS position
       FROM centroids
       WHERE iteration = c_iteration AND k=k_val
       GROUP BY centroid
   )
) AS c_centroids
SELECT
   sum(pow((arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).2, 2)) AS distance
FROM points

随着 K 的增加,此值保证会减小,例如,如果我们将 K 设置为点的数量,则每个聚类将包含 1 个点,因此我们的误差为 0。不幸的是,这不会很好地泛化数据!

随着 K 的增加,SSE 通常会减小,因为数据点更接近其聚类质心。目标是找到“拐点”,即 SSE 的下降率急剧变化的点。这个点表明增加 K 带来的益处正在递减。在拐点处选择 K 可以提供一个模型,该模型可以捕捉数据中固有的分组,而不会过拟合。识别此拐点的一种简单方法是绘制 K 与 SEE 的关系图并直观地确定该值。对于我们的纽约市出租车数据,我们测量并绘制了 K 值为 2 到 20 时的 SSE

k_vs_d_2.png

此处的拐点不像我们希望的那样清晰,但值为 5 似乎是一个合理的候选值。

以上结果基于对每个 K 值的单次端到端运行。K 均值可能会收敛到局部最小值,无法保证附近的点最终会出现在同一个聚类中。建议对每个 K 值运行多个值,每次使用不同的初始质心,以找到最佳候选值。

结果

如果我们将 5 选择为 K 的值,则算法在大约 30 次迭代和 20 秒内在 12 个 vCPU ClickHouse 云节点上收敛。这种方法在每次迭代中都考虑了所有 300 万行。

k=5
initializing...OK
Iteration 0
Number changed cluster in first iteration: 421206
Iteration 1, 2, 3, 4
Number changed cluster in iteration 5: 87939
Iteration 5, 6, 7, 8, 9
Number changed cluster in iteration 10: 3610
Iteration 10, 11, 12, 13, 14
Number changed cluster in iteration 15: 1335
Iteration 15, 16, 17, 18, 19
Number changed cluster in iteration 20: 1104
Iteration 20, 21, 22, 23, 24
Number changed cluster in iteration 25: 390
stopping as moved less than 1000 clusters in last iteration
Execution time in seconds: 20.79200577735901
D^2 error for 5: 33000373.34968858

要将这些聚类可视化,我们需要降低维数。为此,我们使用主成分分析 (PCA)。我们将 SQL 中 PCA 的实现推迟到另一篇博客,只使用 Python 和 10,000 个随机点的样本。我们可以通过检查主成分占用了多少方差来评估 PCA 在捕捉数据的基本属性方面的有效性。82% 低于通常使用的 90% 阈值,但足以了解我们聚类的有效性

Explained variances of the 3 principal components: 0.824

使用我们的 3 个主成分,我们可以绘制相同的 10,000 个随机点,并根据其聚类将颜色与每个点相关联。

kmeans_1.png

聚类的 PCA 可视化显示了跨 PC1 和 PC3 的密集平面,整齐地划分为四个不同的聚类,表明这些维度内的方差受限。沿着第二个主成分 (PC2),可视化变得更加稀疏,一个聚类(编号 3)与主组不同,可能特别有趣。

为了更好地理解我们的聚类结果,我们需要添加标签。理想情况下,我们可以通过探索每个聚类中每个列的分布来实现,寻找独特的特征和时间/空间模式。我们将尝试使用一个简洁的 SQL 查询来理解每个聚类中每个列的分布,以达到这个目的。为了确定关注的列,我们可以检查 PCA 成分的数值,并识别出主导维度。执行此操作的代码可以在笔记本中找到,并识别出以下内容

PCA1:: ['pickup_day_of_month: 0.9999497049810415', 'dropoff_latitude: -0.006371842399701939', 'pickup_hour: 0.004444108327647353', 'dropoff_hour: 0.003868258226185553', …]

PCA 2:: ['total_amount: 0.5489526881298809', 'fare_amount: 0.5463895585884886', 'pickup_longitude: 0.43181504878694826', 'pickup_latitude: -0.3074228612885196', 'dropoff_longitude: 0.2756342866763702', 'dropoff_latitude: -0.19809343490462433', …]

PCA 3:: ['dropoff_hour: -0.6998176337701472', 'pickup_hour: -0.6995098287872831', 'pickup_day_of_week: 0.1134719682173672', 'pickup_longitude: -0.05495391127067617', …]

对于 PCA1,pickup_day_of_month 很重要,表明重点是月份的时间。对于 PC2,维度,接送地点和行程费用似乎贡献很大。该成分可能专注于特定类型的行程。最后,对于 PC3,行程发生的时段似乎最相关。为了理解这些列在时间、日期和价格方面如何因聚类而异,我们再次可以使用 SQL 查询

WITH
   5 AS k_val,
   (
       SELECT max(iteration)
       FROM centroids
       WHERE k = k_val
   ) AS c_iteration,
   (
       SELECT groupArray((centroid, position))
       FROM
       (
           SELECT
               centroid,
               avgForEachMerge(vector) AS position
           FROM centroids
           WHERE (iteration = c_iteration) AND (k = k_val)
           GROUP BY centroid
       )
   ) AS c_centroids
SELECT
   (arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS cluster,
   floor(avg(pickup_day_of_month)) AS pickup_day_of_month,
   round(avg(pickup_hour)) AS avg_pickup_hour,
   round(avg(fare_amount)) AS avg_fare_amount,
   round(avg(total_amount)) AS avg_total_amount
FROM points
GROUP BY cluster
ORDER BY cluster ASC

┌─cluster─┬─pickup_day_of_month─┬─avg_pickup_hour─┬─avg_fare_amount─┬─avg_total_amount─┐
│   	011141113 │
│   	13141214 │
│   	218131113 │
│   	316144958 │
│   	426141214 │
└─────────┴─────────────────────┴─────────────────┴─────────────────┴──────────────────┘

9 rows in set. Elapsed: 0.625 sec. Processed 2.95 million rows, 195.09 MB (4.72 million rows/s., 312.17 MB/s.)
Peak memory usage: 720.16 MiB.

聚类 3 明显与更昂贵的行程相关。考虑到行程费用与一个主成分相关联,该成分也确定了接送地点为关键,这些行程可能与特定类型的行程相关联。其他聚类需要更深入的分析,但似乎侧重于每月模式。我们可以在地图可视化上绘制仅聚类 3 的接送地点。在下图中,蓝色和红色点分别代表接送地点

clusters_nyc_map.png

仔细观察该图后,发现该聚类与往返 JFK 机场的机场行程相关联。

缩放

我们之前的示例仅使用了纽约市出租车行程 300 万行的子集。在对 2009 年所有出租车行程(1.7 亿行)的更大数据集进行测试时,我们可以使用 ClickHouse 服务在 60 个核心上,在约 3 分钟内完成 k=5 的聚类。

k=5
initializing...OK
…
Iteration 15, 16, 17, 18, 19
Number changed cluster in iteration 20: 288
stopping as moved less than 1000 clusters in last iteration
Execution time in seconds: 178.61135005950928
D^2 error for 5: 1839404623.265372
Completed in 178.61135005950928s and 20 iterations with error 1839404623.265372

这产生了与我们之前较小的子集相似的聚类。使用 scikit-learn 在 64 核 m5d.16xlarge 上运行相同的聚类需要 6132 秒,慢了 34 倍!在笔记本的末尾可以找到重现此基准测试的步骤,以及使用 这些步骤 来使用 scikit-learn。

潜在改进和未来工作

聚类对选择的初始点非常敏感。K-Means++ 是一种比标准 K-Means 聚类更好的方法,它通过引入更智能的初始化过程来解决这个问题,该过程旨在分散初始质心,从而降低初始质心放置不当的可能性,并导致更快的收敛以及潜在的更好的聚类。我们将此作为练习留给读者进行改进。

K-Means 也难以处理分类变量。可以通过独热编码(在 SQL 中也可以实现)以及专门的算法(例如针对此类数据而设计的 KModes 聚类)来部分解决这个问题。特定领域的自定距离函数(而不是仅使用欧几里得距离)也很常见,应该可以使用用户定义函数(UDF)来实现。

最后,探索其他软聚类算法(例如,针对正态分布特征的高斯混合模型)或层次聚类算法(例如,凝聚式聚类)可能也很有趣。这些后一种方法也克服了 K-Means 的主要局限性之一——需要指定 K。我们希望看到在 ClickHouse SQL 中实现这些方法的尝试!

立即开始使用 ClickHouse Cloud,并获得 300 美元的积分。在您的 30 天试用期结束后,您可以继续使用按需付费计划,或 联系我们 以了解有关我们的按量计费折扣的更多信息。请访问我们的 价格页面 以获取详细信息。

分享此帖子

订阅我们的时事通讯

了解有关功能发布、产品路线图、支持和云服务的最新信息!
正在加载表单...
关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image
©2024ClickHouse, Inc. 总部位于加州湾区和荷兰阿姆斯特丹。