DoubleCloud 即将停止运营。利用限时免费迁移服务迁移到 ClickHouse。立即联系我们 ->->

博客 / 工程

如何使用 ClickHouse SQL 扩展 K 均值聚类

author avatar
Dale McDiarmid
2024 年 4 月 11 日

简介

最近,在帮助一个想要从 ClickHouse 中保存的向量计算质心的用户时,我们意识到同样的解决方案可以用来实现 K 均值聚类。他们希望在可能数十亿个数据点上大规模地解决这个问题,同时确保内存可以得到严格管理。在这篇文章中,我们尝试仅使用 SQL 来实现 K 均值聚类,并展示它可以扩展到数十亿行。

在撰写这篇博文时,我们了解到 Boris Tyshkevich 所做的工作。虽然我们在本文中使用了不同的方法,但我们想感谢 Boris 的工作,以及他比我们更早地想到这个主意!

作为用 ClickHouse SQL 实现 K 均值的一部分,我们在不到 3 分钟的时间内对 1.7 亿个纽约出租车行程进行了聚类。使用相同资源的等效 scikit-learn 操作需要超过 100 分钟,并且需要 90GB 的 RAM。没有内存限制,并且 ClickHouse 自动分配计算,我们证明 ClickHouse 可以加速机器学习工作负载并减少迭代时间。

kmeans_cluster_1.png

本文所有代码均可在笔记本中找到 这里.

为什么在 ClickHouse SQL 中使用 K 均值?

使用 ClickHouse SQL 进行 K 均值的关键动机是训练不受内存限制,这使得能够对 PB 数据集进行聚类,这得益于质心的增量计算(使用设置来限制内存开销)。相比之下,使用基于 Python 的方法在服务器之间分配此工作负载将需要额外的框架和复杂性。

此外,我们可以轻松地增加我们聚类中的并行级别以使用 Clickhouse 实例的全部资源。如果需要处理更大的数据集,我们只需扩展数据库服务——这在 ClickHouse Cloud 中是一个简单的操作。

将数据转换为 K 均值是一个简单的 SQL 查询,每秒可以处理数十亿行。有了 ClickHouse 中的质心和点,我们可以仅使用 SQL 计算统计数据,例如模型误差,并可能将我们的聚类用于其他操作,例如用于向量搜索的产品量化。

K 均值回顾

K 均值是一种无监督机器学习算法,用于将数据集划分为 K 个不同的、非重叠的子组(聚类),其中每个数据点都属于与最近平均值(聚类的质心)最接近的聚类。该过程从随机初始化 K 个质心开始,或者基于一些启发式算法。这些质心作为聚类的初始代表。然后,算法在两个主要步骤中迭代,直到收敛:分配和更新。

在分配步骤中,根据每个数据点与质心之间的欧几里得距离(或其他距离度量),将其分配给最接近的聚类。在更新步骤中,质心被重新计算为分配给它们各自聚类的所有点的平均值,这可能会改变它们的位置。

此过程保证收敛,随着点到聚类的分配最终稳定下来,并且在迭代之间不会改变。聚类的数量 K 需要事先指定,并且会严重影响算法的有效性,最佳值取决于数据集和聚类的目标。有关更多详细信息,我们推荐此优秀的概述.

点和质心

我们的用户提出的关键问题是有效地计算质心的能力。假设我们有一个简单的 transactions 表的数据模式,其中每行代表特定客户的银行交易。ClickHouse 中的向量表示为 Array 类型。

CREATE TABLE transactions
(
  id UInt32,
  vector Array(Float32), 
  -- e.g.[0.6860357,-1.0086979,0.83166444,-1.0089169,0.22888935]
  customer UInt32,
  ...other columns omitted for brevity
)
ENGINE = MergeTree ORDER BY id

我们的用户想要找到每个客户的质心,实际上是与每个客户相关联的所有交易向量的平均位置。要找到平均向量集,我们可以使用 avgForEach[1][2] 函数。例如,考虑计算 3 个向量的平均值,每个向量有 4 个元素

WITH vectors AS
   (
       SELECT c1 AS vector
       FROM VALUES([1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12])
   )
SELECT avgForEach(vector) AS centroid
FROM vectors

┌─centroid──┐
│ [5,6,7,8] │
└───────────┘

在我们的原始 transactions 表中,因此计算每个客户的平均值变为

SELECT customer, avgForEach(vector) AS centroid FROM transactions GROUP BY customer

虽然简单,但这种方法有一些局限性。首先,对于非常大的数据集,当 vector 包含许多 Float32 点,并且 customer 列具有许多唯一元素(高基数)时,此查询可能非常占用内存。其次,也许与 K 均值更相关的是,如果插入新行,此方法需要我们重新运行查询,这是低效的。我们可以通过物化视图和 AggregatingMergeTree 引擎来解决这些问题。

使用物化视图增量计算质心

物化视图允许我们将计算质心的成本转移到插入时间。与其他数据库不同,ClickHouse 物化视图只是一个触发器,在数据块被插入表时运行查询。此查询的结果将插入到第二个“目标”表中。在我们的例子中,物化视图查询将计算我们的质心,并将结果插入到 centroids 表中。

Incremental computing centroids with MV.png

这里有一些重要的细节

  • 我们的查询(计算质心)必须以可以与后续结果集合并的格式生成结果集——因为插入的每个块都会生成一个结果集。我们不是仅仅将平均值发送到 centroids 表(平均值的平均值将是不正确的),而是发送“平均状态”。平均状态表示包含每个向量位置的总和以及计数。这是使用 avgForEachState 函数实现的——请注意,我们只是在函数名称后面添加了 State!AggregatingMergeTree 表引擎是存储这些聚合状态所必需的。我们将在下面对此进行更多探讨。
  • 整个过程是增量的,centroids 表包含最终状态,即每个质心对应一行。读者会注意到,接收插入的表具有 Null 表引擎。这会导致插入的行被丢弃,从而节省与每次迭代写入完整数据集相关的 IO。
  • 物化视图的查询只在数据块被插入时执行。每个块中的行数可能会有所不同,具体取决于插入方法。我们建议在客户端端制定块时,每块至少包含 1000 行,例如使用 Go 客户端。如果服务器被设置为形成块(例如,通过 HTTP 插入时),大小也可以指定
  • 如果使用 INSERT INTO SELECT 从另一个表或外部源(例如 S3)读取行到 ClickHouse,则可以通过几个关键参数控制块大小,这些参数在 之前的博客 中进行了详细讨论。这些设置(以及 插入线程数)会对使用的内存(更大的块 = 更多的内存)和摄取速度(更大的块 = 更快)产生重大影响。这些设置意味着可以使用内存量 可以进行精细控制,以换取性能。

AggregatingMergeTree

我们的目标表 centroids 使用引擎 AggregatingMergeTree

CREATE TABLE centroids
(
   customer UInt32,
   vector AggregateFunction(avgForEach, Array(Float32))
)
ENGINE = AggregatingMergeTree  ORDER BY customer

我们这里的 vector 列包含由 avgForEachState 函数在上面产生的聚合状态。这些是必须合并以产生最终答案的中间质心。此列需要具有适当的类型 AggregateFunction(avgForEach, Array(Float32))

与所有 ClickHouse MergeTree 表一样,AggregatingMergeTree 将数据存储为必须透明合并的部分,以允许更有效的查询。当合并包含我们聚合状态的部分时,必须这样做,以便仅合并与同一客户相关的状态。这实际上是通过使用 ORDER BY 子句按 customer 列对表进行排序来实现的。在查询时,我们还必须确保中间状态被分组和合并。这可以通过确保我们按 customerGROUP BY 并使用 avgForEach 函数的合并等效项 avgForEachMerge 来实现。

SELECT customer, avgForEachMerge(vector) AS centroid
FROM centroids GROUP BY customer

所有聚合函数都有一个等效的状态函数,通过在其名称后附加 State 来获得,它生成一个可以存储的中间表示,然后可以检索并与 Merge 等效项合并。有关更多详细信息,我们建议您阅读 这篇博文 以及我们 Mark 的视频。

与我们之前的 GROUP BY 相比,此查询将非常快。计算平均值的大部分工作已转移到插入时间,查询时间合并的少量行留了下来。考虑以下两种方法在 48GiB、12 vCPU 云服务上使用 1 亿条随机交易时的性能。加载数据的步骤 在这里

对比从 transactions 表计算质心的性能

SELECT customer, avgForEach(vector) AS centroid
FROM transactions GROUP BY customer
ORDER BY customer ASC
LIMIT 1 FORMAT Vertical

10 rows in set. Elapsed: 147.526 sec. Processed 100.00 million rows, 41.20 GB (677.85 thousand rows/s., 279.27 MB/s.)

Row 1:
──────
customer: 1
centroid: [0.49645231463677153,0.5042792240640065,...,0.5017436349466129]

1 row in set. Elapsed: 36.017 sec. Processed 100.00 million rows, 41.20 GB (2.78 million rows/s., 1.14 GB/s.)
Peak memory usage: 437.54 MiB.

centroids 表相比,速度快 1700 倍以上

SELECT customer, avgForEachMerge(vector) AS centroid
FROM centroids GROUP BY customer
ORDER BY customer ASC
LIMIT 1
FORMAT Vertical

Row 1:
──────
customer: 1
centroid: [0.49645231463677153,0.5042792240640065,...,0.5017436349466129]

1 row in set. Elapsed: 0.085 sec. Processed 10.00 thousand rows, 16.28 MB (117.15 thousand rows/s., 190.73 MB/s.)

将所有内容整合在一起

有了我们增量计算质心的能力,让我们关注 K 均值聚类。假设我们尝试对表 points 进行聚类,其中每一行都有一个向量表示。在这里,我们将根据相似性进行聚类,而不是像我们对交易所做的那样,仅仅根据客户来确定我们的质心。

一次迭代

我们需要能够在算法的每次迭代后存储当前的质心。现在,假设我们已经确定了 K 的最佳值。我们用于质心的目标表可能如下所示

CREATE TABLE centroids
(
  k UInt32,
  iteration UInt32,
  centroid UInt32,
  vector AggregateFunction(avgForEach, Array(Float32))
)
ENGINE = AggregatingMergeTree 
ORDER BY (k, iteration, centroid)

k 列的值设置为我们选择的 K 值。我们这里的 centroid 列表示质心编号本身,其值介于 0 和 K-1 之间。与其为算法的每次迭代使用单独的表,不如我们简单地包含一个 iteration 列,并确保我们的排序键是 (k, iteration, centroid)。ClickHouse 将确保仅针对每个唯一的 K、质心和迭代合并中间状态。这意味着我们的最终行数将很小,从而确保快速查询这些质心。

我们的物化视图用于计算我们的质心,它应该很熟悉,只是需要对 GROUP BY k、centroid 和 iteration 进行一些小的调整

CREATE TABLE temp
(
   k UInt32,
   iteration UInt32,
   centroid UInt32,
   vector Array(Float32)
)
ENGINE = Null

CREATE MATERIALIZED VIEW centroids_mv TO centroids
AS SELECT k, iteration, centroid, avgForEachState(vector) AS vector
FROM temp GROUP BY k, centroid, iteration

请注意,我们的查询执行的是插入到 temp 表中的块,而不是我们的数据源表事务,它没有 iterationcentroid 列。此临时表将接收我们的插入,并再次使用 Null 表引擎来避免写入数据。有了这些构建块,我们可以可视化算法的一次迭代,假设 K = 5

kmeans_clickhouse.png

以上显示了我们如何插入到我们的临时表中,并通过使用 points 表作为我们的源数据执行 INSERT INTO SELECT 来计算我们的质心。此插入有效地表示了算法的一次迭代。这里的 SELECT 查询至关重要,因为它需要指定交易向量及其当前质心和迭代(以及固定的 K 值)。我们如何计算后两者?完整的 INSERT INTO SELECT 如下所示

INSERT INTO temp 
WITH
  5 as k_val,
  -- (1) obtain the max value of iteration - will be the previous iteration
  (
      SELECT max(iteration)
      FROM centroids 
      -- As later we will reuse this table for all values of K
      WHERE k = k_val
  ) AS c_iteration,
  (
      -- (3) convert centroids into a array of tuples 
      -- i.e. [(0, [vector]), (1, [vector]), ... , (k-1, [vector])]
      SELECT groupArray((centroid, position))
      FROM
      (
         -- (2) compute the centroids from the previous iteration
          SELECT
              centroid,
              avgForEachMerge(vector) AS position
          FROM centroids
          WHERE iteration = c_iteration AND k = k_val
          GROUP BY centroid
      )
  ) AS c_centroids
SELECT
  k_val AS k,
  -- (4) increment the iteration
  c_iteration + 1 AS iteration,
  -- (5) find the closest centroid for this vector using Euclidean distance
  (arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS centroid,
  vector AS v
FROM points

首先,在 (1) 中,此查询标识前一次迭代的编号。然后,在 (2) 中的 CTE 中使用它来确定针对此迭代(以及选择的 K)产生的质心,使用前面显示的相同的 avgForEachMerge 查询。这些质心通过 groupArray 查询折叠成包含元组数组的单行,以方便与点进行轻松匹配。在 SELECT 中,我们增加迭代编号 (4) 并计算新的最近质心(使用欧几里得距离 L2Distance 函数),对每个点使用 arrayMaparraySort 函数。

通过将行插入到这里的临时表中,并使用基于前一次迭代的质心,我们可以让物化视图计算新的质心(迭代值 +1)。

初始化质心

以上假设我们有一些针对迭代 1 的初始质心,用于计算成员资格。这要求我们初始化系统。我们可以通过简单地选择和插入 K 个随机点来执行此操作,使用以下查询 (k=5)

INSERT INTO temp WITH 
  5 as k_val,
  vectors AS
  (
      SELECT vector
      FROM points
      -- select random points, use k to make pseudo-random
      ORDER BY cityHash64(concat(toString(id), toString(k_val))) ASC
      LIMIT k_val -- k
  )
SELECT
  k_val as k,
  1 AS iteration,
  rowNumberInAllBlocks() AS centroid,
  vector
FROM vectors

成功的聚类对质心的初始位置非常敏感;分配不当会导致收敛缓慢或聚类不佳。稍后我们将对此进行讨论。

质心分配和何时停止迭代

以上所有内容都代表一次迭代(和初始化步骤)。在每次迭代后,我们需要根据对聚类是否收敛的经验测量做出是否停止的决定。最简单的方法是当点在迭代之间不再改变质心(以及聚类)时停止。

为了确定哪些点属于哪些质心,我们可以随时使用我们之前在 INSERT INTO SELECT 中的 SELECT

为了计算在上一次迭代中移动聚类的点数,我们首先计算前两次迭代的质心 (1) 和 (2)。使用这些,我们确定每次迭代中每个点的质心 (3) 和 (4)。如果这些相同 (5),我们返回 0,否则返回 1。这些值的总和 (6) 为我们提供了移动聚类的点数。

WITH 5 as k_val,
(
      SELECT max(iteration)
      FROM centroids
) AS c_iteration,
(
  -- (1) current centroids
  SELECT groupArray((centroid, position))
  FROM
  (
      SELECT
          centroid,
          avgForEachMerge(vector) AS position
      FROM centroids
      WHERE iteration = c_iteration AND k = k_val
      GROUP BY centroid
  )
) AS c_centroids,
(
  -- (2) previous centroids
  SELECT groupArray((centroid, position))
  FROM
  (
      SELECT
          centroid,
          avgForEachMerge(vector) AS position
      FROM centroids
      WHERE iteration = (c_iteration-1) AND k = k_val
      GROUP BY centroid
  )
) AS c_p_centroids
-- (6) sum differences
SELECT sum(changed) FROM (
  SELECT id,
  -- (3) current centroid for point
  (arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS cluster,
  -- (4) previous centroid for point
  (arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_p_centroids))[1]).1 AS cluster_p,
  -- (5) difference in allocation
  if(cluster = cluster_p, 0, 1) as changed
  FROM points
)

测试数据集

以上内容主要是理论上的。让我们看看以上内容是否在真实数据集上有效!为此,我们将使用流行的纽约出租车数据集的 300 万行子集,因为这些聚类可能具有相关性。要从 S3 创建和插入数据

CREATE TABLE trips (
  trip_id         	UInt32,
  pickup_datetime 	DateTime,
  dropoff_datetime	DateTime,
  pickup_longitude	Nullable(Float64),
  pickup_latitude 	Nullable(Float64),
  dropoff_longitude   Nullable(Float64),
  dropoff_latitude	Nullable(Float64),
  passenger_count 	UInt8,
  trip_distance   	Float32,
  fare_amount     	Float32,
  extra           	Float32,
  tip_amount      	Float32,
  tolls_amount    	Float32,
  total_amount    	Float32,
  payment_type    	Enum('CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4, 'UNK' = 5),
  pickup_ntaname  	LowCardinality(String),
  dropoff_ntaname 	LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY (pickup_datetime, dropoff_datetime);

INSERT INTO trips SELECT trip_id, pickup_datetime, dropoff_datetime, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, passenger_count, trip_distance, fare_amount, extra, tip_amount, tolls_amount, total_amount, payment_type, pickup_ntaname, dropoff_ntaname
FROM gcs('https://storage.googleapis.com/clickhouse-public-datasets/nyc-taxi/trips_{0..2}.gz', 'TabSeparatedWithNames');

特征选择

特征选择对于良好的聚类至关重要,因为它直接影响形成的聚类的质量。我们在这里不会详细介绍我们如何选择特征。对于有兴趣的人,我们将在 笔记本 中包含这些说明。最终,我们得到了以下 points

CREATE TABLE points
(
   `id` UInt32,
   `vector` Array(Float32),
   `pickup_hour` UInt8,
   `pickup_day_of_week` UInt8,
   `pickup_day_of_month` UInt8,
   `dropoff_hour` UInt8,
   `pickup_longitude` Float64,
   `pickup_latitude` Float64,
   `dropoff_longitude` Float64,
   `dropoff_latitude` Float64,
   `passenger_count` UInt8,
   `trip_distance` Float32,
   `fare_amount` Float32,
   `total_amount` Float32
) ENGINE = MergeTree ORDER BY id

要填充此表,我们使用 INSERT INTO SELECT SQL 查询,它创建特征、缩放特征并过滤任何异常值。请注意,我们的最终列也编码在 vector 列中。

链接的查询是我们首次尝试生成特征。我们预计这里可以做更多工作,这可能会产生比所示结果更好的结果。欢迎您提出建议!

一点 Python

我们已经描述了算法中的迭代如何有效地减少到 INSERT INTO SELECT,物化视图负责维护质心。这意味着我们需要调用此语句 N 次,直到收敛。

与其等到没有点在质心之间移动的状态,不如我们使用 1000 的阈值,即如果移动聚类的点数少于 1000 个,我们就停止。每隔 5 次迭代执行一次此检查。

给定 ClickHouse 执行的大部分工作,针对特定 K 值执行 K 均值的伪代码变得非常简单。

def kmeans(k, report_every = 5, min_cluster_move = 1000):
   startTime = time.time()
   # INITIALIZATION QUERY
   run_init_query(k)
   i = 0
   while True:
       # ITERATION QUERY
       run_iteration_query(k)
       # report every N iterations
       if (i + 1) % report_every == 0 or i == 0:
           num_moved = calculate_points_moved(k)
           if num_moved <= min_cluster_move:
               break
       i += 1
   execution_time = (time.time() - startTime))
   # COMPUTE d^2 ERROR
   d_2_error = compute_d2_error(k)
   # return the d^2, execution time and num of required iterations
   return d_2_error, execution_time, i+1

此循环的完整代码,包括查询,可以在 笔记本 中找到。

选择 K

到目前为止,我们假设 K 已经确定。有多种技术可以确定 K 的最佳值,最简单的方法是计算每个 K 值的每个点与其各自聚类之间的聚合平方距离 (SSE)。这给了我们一个我们试图最小化的成本指标。方法 compute_d2_error 使用以下 SQL 查询来计算此值(假设 K 的值为 5)

WITH 5 as k_val,
(
       SELECT max(iteration)
       FROM centroids WHERE k={k}
) AS c_iteration,
(
   SELECT groupArray((centroid, position))
   FROM
   (
       SELECT
           centroid,
           avgForEachMerge(vector) AS position
       FROM centroids
       WHERE iteration = c_iteration AND k=k_val
       GROUP BY centroid
   )
) AS c_centroids
SELECT
   sum(pow((arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).2, 2)) AS distance
FROM points

此值保证随着 K 的增加而减小,例如,如果我们将 K 设置为点的数量,每个聚类将有 1 个点,从而使我们的误差为 0。不幸的是,这将不会很好地概括数据!

随着 K 的增加,SSE 通常会减小,因为数据点更靠近它们的聚类质心。目标是找到“肘点”,即 SSE 的下降率急剧变化的点。该点表明增加 K 的收益递减。在肘点处选择 K 可以提供一个模型,该模型可以捕获数据中固有的分组,而不会过度拟合。识别此肘点的一种简单方法是绘制 K 与 SEE 并直观地识别该值。对于我们的纽约出租车数据,我们测量并绘制了 K 值 2 到 20 的 SSE

k_vs_d_2.png

这里的肘点不如我们想要的那么清晰,但 5 的值似乎是一个合理的候选者。

以上结果基于每个 K 值的单次端到端运行。K 均值可以收敛到局部最小值,无法保证附近的点最终会落在同一个聚类中。建议针对每个 K 值运行多个值,每次使用不同的初始质心,以找到最佳候选者。

结果

如果我们选择 5 作为 K 的值,算法大约需要 30 次迭代和 20 秒才能在一个 12 vCPU ClickHouse 云节点上收敛。这种方法考虑了每次迭代的 300 万行。

k=5
initializing...OK
Iteration 0
Number changed cluster in first iteration: 421206
Iteration 1, 2, 3, 4
Number changed cluster in iteration 5: 87939
Iteration 5, 6, 7, 8, 9
Number changed cluster in iteration 10: 3610
Iteration 10, 11, 12, 13, 14
Number changed cluster in iteration 15: 1335
Iteration 15, 16, 17, 18, 19
Number changed cluster in iteration 20: 1104
Iteration 20, 21, 22, 23, 24
Number changed cluster in iteration 25: 390
stopping as moved less than 1000 clusters in last iteration
Execution time in seconds: 20.79200577735901
D^2 error for 5: 33000373.34968858

为了可视化这些聚类,我们需要降低维度。为此,我们使用主成分分析 (PCA)。我们将 PCA 在 SQL 中的实现留待另一篇博客,只使用 Python 和 10,000 个随机点的样本。我们可以通过检查主成分占了多少方差来评估 PCA 在捕获数据基本属性方面的有效性。82% 低于通常使用的 90% 的阈值,但足以理解我们聚类有效性。

Explained variances of the 3 principal components: 0.824

使用我们的 3 个主成分,我们可以绘制相同的 10,000 个随机点,并根据其聚类为每个点关联一种颜色。

kmeans_1.png

聚类的 PCA 可视化显示了跨 PC1 和 PC3 的一个密集平面,整齐地划分为四个不同的聚类,这表明这些维度内的方差受限。沿着第二主成分 (PC2),可视化变得稀疏,一个聚类(编号 3)偏离了主群体,可能特别有趣。

为了理解我们的聚类,我们需要标签。理想情况下,我们会通过探索每个聚类中每列的分布来生成这些标签,寻找独特的特征和时间/空间模式。我们将尝试使用 SQL 查询简洁地做到这一点,以了解每个聚类中每列的分布。对于要关注的列,我们可以检查 PCA 成分的数值,并识别占主导地位的维度。在笔记本中可以找到用于执行此操作的代码,并识别以下内容:

PCA1:: ['pickup_day_of_month: 0.9999497049810415', 'dropoff_latitude: -0.006371842399701939', 'pickup_hour: 0.004444108327647353', 'dropoff_hour: 0.003868258226185553', …]

PCA 2:: ['total_amount: 0.5489526881298809', 'fare_amount: 0.5463895585884886', 'pickup_longitude: 0.43181504878694826', 'pickup_latitude: -0.3074228612885196', 'dropoff_longitude: 0.2756342866763702', 'dropoff_latitude: -0.19809343490462433', …]

PCA 3:: ['dropoff_hour: -0.6998176337701472', 'pickup_hour: -0.6995098287872831', 'pickup_day_of_week: 0.1134719682173672', 'pickup_longitude: -0.05495391127067617', …]

对于 PCA1,pickup_day_of_month 很重要,这表明关注月份的时间。对于 PC2,维度、接送地点、行程的成本似乎贡献很大。这个成分可能集中在特定类型的行程上。最后,对于 PC3,行程发生的时数似乎最相关。为了了解这些列在时间、日期和价格方面如何根据聚类而有所不同,我们再次可以使用 SQL 查询。

WITH
   5 AS k_val,
   (
       SELECT max(iteration)
       FROM centroids
       WHERE k = k_val
   ) AS c_iteration,
   (
       SELECT groupArray((centroid, position))
       FROM
       (
           SELECT
               centroid,
               avgForEachMerge(vector) AS position
           FROM centroids
           WHERE (iteration = c_iteration) AND (k = k_val)
           GROUP BY centroid
       )
   ) AS c_centroids
SELECT
   (arraySort(c -> (c.2), arrayMap(x -> (x.1, L2Distance(x.2, vector)), c_centroids))[1]).1 AS cluster,
   floor(avg(pickup_day_of_month)) AS pickup_day_of_month,
   round(avg(pickup_hour)) AS avg_pickup_hour,
   round(avg(fare_amount)) AS avg_fare_amount,
   round(avg(total_amount)) AS avg_total_amount
FROM points
GROUP BY cluster
ORDER BY cluster ASC

┌─cluster─┬─pickup_day_of_month─┬─avg_pickup_hour─┬─avg_fare_amount─┬─avg_total_amount─┐
│   	011141113 │
│   	13141214 │
│   	218131113 │
│   	316144958 │
│   	426141214 │
└─────────┴─────────────────────┴─────────────────┴─────────────────┴──────────────────┘

9 rows in set. Elapsed: 0.625 sec. Processed 2.95 million rows, 195.09 MB (4.72 million rows/s., 312.17 MB/s.)
Peak memory usage: 720.16 MiB.

聚类 3 明显与更昂贵的行程相关。鉴于行程的成本与一个主成分相关联,该成分还将接送地点确定为关键要素,这些地点可能与特定类型的行程相关联。其他聚类需要更深入的分析,但似乎侧重于月度模式。我们可以在地图可视化上绘制仅聚类 3 的接送地点。蓝色和红色点分别代表下图中的接送地点。

clusters_nyc_map.png

仔细观察该图,可以发现该聚类与往返肯尼迪机场的机场行程相关联。

缩放

我们之前的示例只使用了纽约市出租车行程的 300 万行子集。在针对 2009 年所有出租车行程(1.7 亿行)的更大数据集进行测试时,我们可以使用 ClickHouse 服务,在约 3 分钟内完成 k=5 的聚类,使用 60 个内核。

k=5
initializing...OK
…
Iteration 15, 16, 17, 18, 19
Number changed cluster in iteration 20: 288
stopping as moved less than 1000 clusters in last iteration
Execution time in seconds: 178.61135005950928
D^2 error for 5: 1839404623.265372
Completed in 178.61135005950928s and 20 iterations with error 1839404623.265372

这将生成与我们之前较小的子集相似的聚类。在 64 核 m5d.16xlarge 上使用 scikit-learn 运行相同的聚类需要 6132 秒,慢了 34 倍以上!在笔记本结尾处可以找到用于复制此基准测试的步骤,以及使用这些步骤 用于 scikit-learn 的步骤。

潜在的改进和未来工作

聚类对选定的初始点非常敏感。K-Means++ 是对标准 K-Means 聚类的一种改进,它通过引入一种更智能的初始化过程来解决这个问题,该过程旨在分散初始质心,从而降低了初始质心放置不当的可能性,并导致更快的收敛速度以及可能更好的聚类结果。我们将其留作练习,供读者改进。

K-Means 也难以处理分类变量。这可以通过独热编码(在 SQL 中也可能)以及专门的算法(例如为这类数据而设计的KModes 聚类)来部分解决。专门领域的自定义距离函数,而不是欧几里得距离,也很常见,应该可以使用用户定义函数 (UDF) 来实现。

最后,探索其他软聚类算法(如高斯混合模型,适用于正态分布特征)或层次聚类算法(如凝聚层次聚类)可能也很有趣。这些后一种方法也克服了 K-Means 的一个主要限制——需要指定 K。我们很想看到在 ClickHouse SQL 中实现这些算法的尝试!

立即开始使用 ClickHouse Cloud 并获得 300 美元的积分。在您的 30 天试用期结束后,您可以继续使用按需付费计划,或者联系我们 了解更多有关我们的按使用量计费折扣的信息。请访问我们的价格页面 了解详情。

分享此帖子

订阅我们的时事通讯

随时了解功能发布、产品路线图、支持和云服务!
正在加载表格...
关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image