在本文中,我们将探讨 MLOps 的世界,以及如何对 ClickHouse 中的数据进行建模和转换,使其能够作为高效的特征存储来训练机器学习模型。本文中讨论的方法已被现有的 ClickHouse 用户使用,我们感谢他们分享他们的技术,以及现成的特征存储。
我们专注于 ClickHouse 作为数据源、离线存储和转换引擎。这些特征存储组件对于高效且正确地将数据传递到模型训练至关重要。虽然大多数现成的特征存储提供了抽象,但我们将深入探讨并描述如何对数据进行有效建模以构建和服务特征。对于希望构建自己的特征存储的用户,或者只是对现有存储使用的方法感到好奇的用户,请继续阅读。
为什么选择 ClickHouse?
我们在之前的博文中探讨了特征存储是什么,并建议用户在深入了解本文之前先熟悉这个概念。简单来说,特征存储是一个用于存储和管理将用于训练机器学习模型的数据的集中式存储库,旨在提高协作和可重用性,并减少模型迭代时间。
作为实时数据仓库,ClickHouse 可以满足特征存储的两个主要组件,而不仅仅是提供数据源。
- 转换引擎:ClickHouse 利用 SQL 来声明数据转换,并通过其分析和统计函数进行优化。它支持从各种来源查询数据,例如 Parquet、Postgres 和 MySQL,并在 PB 级别的数据上执行聚合操作。物化视图允许在插入时进行数据转换。此外,ClickHouse 可以通过 chDB 在 Python 中用于转换大型数据帧。
- 离线存储:ClickHouse 可以通过
INSERT INTO SELECT
语句持久化查询结果,并自动生成表模式。它支持高效的数据迭代和扩展,其中特征通常以包含时间戳的表的形式表示,用于进行时间点查询。ClickHouse 的稀疏索引和ASOF LEFT JOIN
子句有助于快速过滤和特征选择,从而优化训练管道的准备工作。这项工作是并行执行的,并在整个集群中执行,使离线存储能够扩展到 PB 级,同时保持特征存储的轻量级。
在这篇文章中,我们将展示如何在 ClickHouse 中对数据进行建模和管理以执行这些角色。
高级步骤
当使用 ClickHouse 作为离线特征存储的基础时,我们认为训练模型的步骤如下所示
-
探索 - 使用 SQL 查询熟悉 ClickHouse 中的源数据。
-
识别数据集和特征 - 识别可能的特征、它们各自的实体以及生成它们所需的数据子集。我们将此步骤中的子集称为“特征子集”。
-
创建特征 - 创建生成特征所需的 SQL 查询。
-
生成模型数据 - 合并特征以生成一组特征向量,通常使用 ASOF JOIN 基于公共键和时间戳接近度来实现。
-
生成测试集和训练集 - 将“特征子集”拆分为测试集和训练集(以及可能的验证集)。
-
训练模型 - 使用训练数据训练模型,可以使用不同的算法。
-
模型选择和调整 - 根据验证集评估模型,选择最佳模型并微调超参数。
-
模型评估 - 根据测试集评估最终模型。如果性能足够,则停止;否则,返回步骤 2。
我们关注步骤 (1) 到 (5),因为这些步骤是 ClickHouse 特定的。上述过程的一个关键特性是它具有高度迭代性。步骤 (3) 和 (4) 可以被宽泛地称为“特征工程”,并且通常比选择模型和优化超参数花费更多时间。因此,优化此过程并确保 ClickHouse 的高效使用可以带来巨大的时间和成本节省。
我们将在下面探讨每个步骤,并提出一种灵活的方法,该方法可以最佳地利用 ClickHouse 的功能,使用户能够高效地进行迭代。
数据集和示例
在我们的示例中,我们使用以下 Web 分析数据集,此处有描述。此数据集包含 1 亿行,其中一个事件表示对特定 URL 的请求。我们在用户中看到一个常见的用例,即能够在 ClickHouse 中对 Web 分析数据进行机器学习模型训练[1][2]。
由于其大小,下表已截断为我们将使用的列。请参阅此处以获取完整模式。
CREATE TABLE default.web_events
(
`EventTime` DateTime,
`UserID` UInt64,
`URL` String,
`UserAgent` UInt8,
`RefererCategoryID` UInt16,
`URLCategoryID` UInt16,
`FetchTiming` UInt32,
`ClientIP` UInt32,
`IsNotBounce` UInt8,
-- many more columns...
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
为了说明建模步骤,假设我们希望使用此数据集构建一个模型,该模型预测用户在请求到达时是否会跳出。如果我们考虑上述源数据,则由列IsNotBounce
表示。这表示我们的目标类或标签。
我们不会真正构建此模型并提供 Python 代码,而是专注于数据建模过程。因此,我们对特征的选择仅用于说明。
步骤 1 - 探索
探索和理解源数据需要用户熟悉 ClickHouse SQL。我们建议用户在此步骤中熟悉 ClickHouse 的各种分析函数。熟悉数据后,我们可以开始识别模型的特征以及生成它们所需的数据子集。
步骤 2 - 特征和子集
为了预测访问是否会跳出,我们的模型将需要一个训练集,其中每个数据点都包含组装成特征向量的适当特征。通常,这些特征将基于数据的一个子集。
我们在其他博客中探讨了特征和特征向量的概念,以及它们如何与结果集的列和行分别松散地相关。重要的是,它们需要在训练和请求时可用。
请注意,我们在上面强调了“结果集”。特征向量很少仅仅是表中的一行,其中包含子集列的特征。通常,必须发出一个复杂的查询来从聚合或转换中计算特征。
识别特征
在识别特征之前,我们应该了解两个会影响建模过程的关键属性。
-
与实体的关联 - 特征通常与一个它们所关联或“关联”的
实体
相关联。对于我们的问题,我们认为可能有助于做出预测的特征可能是用户或域的混合。基于用户的特征将是请求特定的,例如,用户的年龄、客户端 IP 或用户代理。域特征将与访问的页面相关联,例如每年访问次数
。将特征与实体实例关联需要该实体具有键或标识符。在我们的例子中,我们需要用户的标识符和域值。这些分别来自
UserID
和URL
列。可以使用 ClickHouse 中的域函数从 URL 中提取域,即domain(URL)
。 -
动态和复杂 - 虽然某些特征保持相对静态,例如用户的年龄,但其他特征,例如客户端 IP,会随着时间的推移而发生变化。在这种情况下,我们需要访问特征在特定时间戳存在时的值。这对于创建时间点正确的训练集至关重要。
虽然某些特征可能很简单,例如设备是否为移动设备或客户端 IP,但其他更复杂的特征需要聚合统计信息,这些信息会随着时间而变化——而这正是 ClickHouse 擅长的地方!
示例特征
例如,假设我们认为以下特征有助于预测网站访问是否会跳出。我们所有的特征都与时间戳相关联,因为它们是动态的并且会随着时间而变化。
- 访问的用户代理 - 与用户实体相关联,可在
UserAgent
列中找到。 - 推荐来源的类别 - (例如,搜索引擎、社交媒体、直接)。用户特征,可通过
RefererCategoryID
列获取。 - 每小时访问的域数量 - 在用户发出请求时。需要使用
GROUP BY
来计算此用户特征。 - 每小时访问域的唯一 IP 数量 - 域特征,需要使用
GROUP BY
。 - 页面的类别 - 用户特征,可通过
URLCategoryID
列获取。 - 每小时域的平均请求时间 - 需要使用
GROUP BY
从FetchTiming
列计算此值。
这些特征可能并不理想,仅供说明。将许多这些特征链接到用户实体有些过于简单化。例如,某些特征更准确地与请求或会话实体相关联。
特征子集
一旦我们对将要使用的特征有了了解,我们就可以识别构建这些特征所需的数据子集。此步骤是可选的,因为有时用户希望使用所有数据,或者数据不够大或复杂到不需要此步骤。
当应用时,我们经常看到用户为其模型数据创建表——我们称之为“特征子集”。这些包括
- 每个特征向量的实体值。
- 如果存在,则为事件的时间戳。
- 类别标签。
- 生成计划特征所需的列。用户可能希望添加其他他们在未来迭代中认为有用的列。
这种方法提供了几个优势
-
允许对数据进行排序并针对未来的访问进行优化。通常,模型数据读取和过滤的方式与源数据不同。在生成最终训练集时,这些表也可以比源数据更快地生成特征。
-
模型可能需要一个数据子集或转换后的数据集。识别、过滤和转换此子集可能是一个代价高昂的查询。通过将数据插入到中间模型表中,此查询只需要执行一次。因此,随后的特征生成和模型训练运行可以有效地从该表中获取数据。
我们可以使用此步骤来对任何数据进行去重。虽然原始数据可能不包含重复项,但如果我们提取特征的列子集,则结果子集可能会包含重复项。
假设对于我们的模型,我们创建了一个中间表 predict_bounce_subset
。此表需要 EventTime
、标签 IsNotBounce
以及实体键 Domain
和 UserID
。此外,我们还包括简单的特征列 UserAgent
、RefererCategoryID
和 URLCategoryID
,以及生成聚合特征所需的那些列——FetchTiming
和 ClientIP
。
CREATE TABLE predict_bounce_subset
(
EventTime DateTime64,
UserID UInt64,
Domain String,
UserAgent UInt8,
RefererCategoryID UInt16,
URLCategoryID UInt16,
FetchTiming UInt32,
ClientIP UInt32,
IsNotBounce UInt8
)
ENGINE = ReplacingMergeTree
ORDER BY (EventTime, Domain, UserID, UserAgent, RefererCategoryID, URLCategoryID, FetchTiming, ClientIP, IsNotBounce)
PRIMARY KEY (EventTime, Domain, UserID)
我们使用ReplacingMergeTree 作为我们的表引擎。此引擎对在排序键列中具有相同值的行进行去重。这在合并树的后台异步执行,尽管我们可以使用FINAL
修饰符 确保我们在查询时不会收到重复项。在这种情况下,这种去重方法往往比另一种方法更有效率——在插入时间以下使用我们希望按其对数据进行去重的列的 GROUP BY
。上面,我们假设所有列都应该用于识别唯一行。可以存在具有相同 EventTime
、UserID
、Domain
的事件,例如在访问具有不同 FetchTiming
值的域时发出的多个请求。
有关ReplacingMergeTree 的更多详细信息,请参见此处。
作为优化,我们仅通过
PRIMARY KEY
子句将排序键的子集加载到主键(保存在内存中)。默认情况下,会加载ORDER BY
中的所有列。在这种情况下,我们可能只想按EventTime
、Domain
和UserID
查询。
假设我们希望仅对与机器人无关的事件(由 Robotness=0
标识)训练我们的跳出预测模型。我们还需要 Domain
和 UserID
值。
我们可以使用 INSERT INTO SELECT
为我们的 predict_bounce_subset
表填充数据,从 web_events
读取行并应用过滤器,将我们的数据大小减少到 4200 万行。
INSERT INTO predict_bounce_subset SELECT
EventTime,
UserID,
domain(URL) AS Domain,
UserAgent,
RefererCategoryID,
URLCategoryID,
FetchTiming,
ClientIP,
IsNotBounce
FROM web_events
WHERE Robotness = 0 AND Domain != '' AND UserID != 0
0 rows in set. Elapsed: 7.886 sec. Processed 99.98 million rows, 12.62 GB (12.68 million rows/s., 1.60 GB/s.)
SELECT formatReadableQuantity(count()) AS count
FROM predict_bounce_subset FINAL
┌─count─────────┐
│ 42.89 million │
└───────────────┘
1 row in set. Elapsed: 0.003 sec.
请注意上面
FINAL
子句的使用,以确保我们只计算唯一行。
更新特征子集
虽然某些数据子集是静态的,但其他子集会随着源表中出现新事件而发生变化。因此,用户通常需要保持子集的最新状态。虽然这可以通过重建表的计划查询(例如,使用 dbt)来实现,但 ClickHouse(增量)物化视图可用于维护这些子集。
物化视图允许用户将计算成本从查询时间转移到插入时间。ClickHouse 物化视图只是一个触发器,当数据块插入到表中时(例如,web_events
表)会运行一个查询。然后将此查询的结果插入到第二个“目标”表中——在本例中为子集表。如果插入更多行,结果将再次发送到目标表。此合并结果等效于对所有原始数据运行查询。
下面显示了一个维护我们 predict_bounce_subset
的物化视图
CREATE MATERIALIZED VIEW predict_bounce_subset_mv TO predict_bounce_subset AS
SELECT
EventTime,
UserID,
domain(URL) AS Domain,
UserAgent,
RefererCategoryID,
URLCategoryID,
FetchTiming,
ClientIP,
IsNotBounce
FROM web_events
WHERE Robotness = 0 AND Domain != '' AND UserID != 0
这是一个简单的示例,我们建议用户了解有关物化视图功能的更多信息此处。在后续步骤中,我们假设我们将使用模型表 predict_bounce_subset
。
步骤 3 - 创建特征
对于模型训练,我们需要将我们计划的特征组合成一组具有标签 IsNotBounce
的特征向量,即
请注意我们如何从多个实体的特征中组合我们的特征向量。在许多情况下,我们正在寻找来自另一个实体的最接近时间点的特征值。
熟悉 ClickHouse 的经验丰富的 SQL 开发人员可能能够制定一个查询来生成上述特征向量。虽然这可能,但它无疑不仅会是一个复杂且计算量大的查询——尤其是在数十亿行的大型数据集上。
此外,如上所述,我们的特征向量是动态的,并且可能包含在不同训练迭代期间的任意数量的不同特征组合。此外,理想情况下,不同的工程师和数据科学家将使用相同的特征定义以及尽可能优化地编写查询。
鉴于上述需求,将我们的特征物化到表中,使用 INSERT INTO SELECT
将查询结果写入表中是有意义的。这实际上意味着我们只执行一次特征生成,将结果写入表中,以便在迭代时可以有效地读取和重用这些结果。这也允许我们声明一次特征查询,并以最佳方式共享结果与其他工程师和科学家。
目前,我们省略了用户如何声明、版本控制和共享其特征定义。毕竟,SQL 也是代码。这个问题有几种解决方案,其中一些解决了不同的挑战——请参见“构建或采用”。
特征表
特征表包含与实体相关联的特征实例,以及可选的时间戳。我们已经看到我们的用户为特征采用了两种表模型——要么为每个特征创建一个表,要么为每个实体创建一个表。每种方法都有其各自的优缺点,我们在下面进行探讨。但这两种方法都允许特征的可重用性,并且具有数据压缩良好的优势。
您无需为每个特征创建特征表。在某些情况下,可以按原样从模型表中使用它们,如果它们由列值表示。
特征表(每个特征)
使用每个特征一个表,表名表示特征本身。这种方法的主要优点是它可能简化了以后的连接。此外,这意味着用户可以使用物化视图来维护这些表,因为源数据会发生变化——请参阅“更新”。
这种方法的缺点是它不像“每个事件”那样具有良好的可扩展性。拥有数千个特征的用户将拥有数千个表。虽然可能可以管理,但创建物化视图来维护每个视图是不可行的。
例如,考虑负责存储域特征“每个域的唯一 IP 数量”的表。
CREATE TABLE number_unique_ips_per_hour
(
Domain String,
EventTime DateTime64,
Value Int64
)
ENGINE = MergeTree
ORDER BY (Domain, EventTime)
我们选择 ORDER BY
键来优化压缩和未来的读取。我们可以使用带有聚合的简单 INSERT INTO SELECT
为特征表填充数据以计算我们的特征。
INSERT INTO number_unique_ips_per_hour SELECT
Domain,
toStartOfHour(EventTime) AS EventTime,
uniqExact(ClientIP) AS Value
FROM predict_bounce_subset FINAL
GROUP BY
Domain,
EventTime
0 rows in set. Elapsed: 0.777 sec. Processed 43.80 million rows, 1.49 GB (56.39 million rows/s., 1.92 GB/s.)
SELECT count()
FROM number_unique_ips_per_hour
┌─count()─┐
│ 613382 │
└─────────┘
我们选择使用 Domain
和 Value
作为实体和特征值的列名。这使得我们未来的查询更简单一些。用户可能希望对所有特征使用通用的表结构,以及 Entity
和 Value
列。
CREATE TABLE <feature_name>
(
Entity Variant(UInt64, Int64, String),
EventTime DateTime64,
Value Variant(UInt64, Int64, Float64)
)
ENGINE = MergeTree
ORDER BY (Entity, EventTime)
这需要使用 Variant
类型。此类型允许列支持其他数据类型的联合,例如 Variant(String, Float64, Int64)
表示此类型的每一行都具有 String
、Float64
、Int64
或其中任何一个类型的值(NULL 值)。
此功能目前处于实验阶段,对于上述方法是可选的,但对于下面描述的“每个实体”方法是必需的。
特征表(每个实体)
在每个实体的方法中,我们对与同一个实体关联的所有特征使用相同的表。在这种情况下,我们使用下面的 FeatureId
列来表示特征的名称。
这种方法的优点在于其可扩展性。单个表可以轻松容纳数千个特征。
主要缺点是目前这种方法不支持物化视图。
请注意,对于域特征,我们被迫在下面的示例中使用新的 Variant
类型。虽然此表支持 UInt64
、Int64
和 Float64
特征值,但它可能支持更多。
-- domain Features
SET allow_experimental_variant_type=1
CREATE TABLE domain_features
(
Domain String,
FeatureId String,
EventTime DateTime,
Value Variant(UInt64, Int64, Float64)
)
ENGINE = MergeTree
ORDER BY (FeatureId, Domain, EventTime)
此表的 ORDER BY
针对按特定 FeatureId
和 Domain
过滤进行了优化 - 这是我们稍后将看到的典型访问模式。
要将我们的“每个域的唯一 IP 数”特征填充到此表中,我们需要一个类似于之前使用的查询。
INSERT INTO domain_features SELECT
Domain,
'number_unique_ips_per_hour' AS FeatureId,
toStartOfHour(EventTime) AS EventTime,
uniqExact(ClientIP) AS Value
FROM predict_bounce_subset FINAL
GROUP BY
Domain,
EventTime
0 rows in set. Elapsed: 0.573 sec. Processed 43.80 million rows, 1.49 GB (76.40 million rows/s., 2.60 GB/s.)
SELECT count()
FROM domain_features
┌─count()─┐
│ 613382 │
└─────────┘
ClickHouse 的新手可能会想知道“每个特征”方法是否比“每个实体”方法允许更快地检索特征。由于使用了排序键和 ClickHouse 的稀疏索引,因此这里应该没有区别。
更新特征表
虽然某些特征是静态的,但我们通常希望确保它们随着源数据或子集的变化而更新。如“更新子集”中所述,我们可以使用物化视图来实现这一点。
对于特征表,我们的物化视图通常更复杂,因为结果通常是聚合的结果,而不仅仅是简单的转换和过滤。因此,物化视图执行的查询将生成部分聚合状态。这些部分聚合状态表示聚合的中间状态,目标特征表可以将它们合并在一起。这要求我们的特征表使用带有适当 AggregateFunction
类型的 AggregatingMergeTree。
我们下面提供了一个“每个特征”表的示例 number_unique_ips_per_hour
。
CREATE TABLE number_unique_ips_per_hour
(
Entity String,
EventTime DateTime64,
-- the AggregateFunction merges states produced by the view
Value AggregateFunction(uniqExact, UInt32)
)
ENGINE = AggregatingMergeTree
ORDER BY (Entity, EventTime)
CREATE MATERIALIZED VIEW number_unique_ips_per_hour_mv TO number_unique_ips_per_hour AS
SELECT
domain(URL) AS Entity,
toStartOfHour(EventTime) AS EventTime,
-- our view uses the -State suffix to generate intermediate states
uniqExactState(ClientIP) AS Value
FROM predict_bounce_subset
GROUP BY
Entity,
EventTime
当新的行插入到 predict_bounce_subset
表中时,我们的 number_unique_ips_per_hour
特征表将更新。
查询 number_unique_ips_per_hour
时,必须使用 FINAL
子句或 GROUP BY Entity, EventTime
来确保聚合状态与聚合函数的 -Merge
变体(在本例中为 uniqExact
)一起合并。如下所示,这会更改用于获取实体的查询 - 请参阅 此处 以获取更多详细信息。
-- Select entities for a single domain
SELECT
EventTime,
Entity,
uniqExactMerge(Value) AS Value
FROM number_unique_ips_per_hour
WHERE Entity = 'smeshariki.ru'
GROUP BY
Entity,
EventTime
ORDER BY EventTime DESC LIMIT 5
┌───────────────EventTime─┬─Entity────────┬─Value─┐
│ 2013-07-31 23:00:00.000 │ smeshariki.ru │ 3810 │
│ 2013-07-31 22:00:00.000 │ smeshariki.ru │ 3895 │
│ 2013-07-31 21:00:00.000 │ smeshariki.ru │ 4053 │
│ 2013-07-31 20:00:00.000 │ smeshariki.ru │ 3893 │
│ 2013-07-31 19:00:00.000 │ smeshariki.ru │ 3926 │
└─────────────────────────┴───────────────┴───────┘
5 rows in set. Elapsed: 0.491 sec. Processed 8.19 thousand rows, 1.28 MB (16.67 thousand rows/s., 2.61 MB/s.)
Peak memory usage: 235.93 MiB.
虽然稍微复杂一些,但中间聚合状态允许我们使用上述表为不同的时间生成特征。例如,我们可以使用 此查询 从上述表计算每个域每天的唯一 IP 数,这是我们无法使用原始特征表做到的。
用户可能会注意到我们的子集表 predict_bounce_subset
已经使用物化视图更新,而该物化视图又附加了物化视图。如下所示,这意味着我们的物化视图实际上是“链式”的。有关链式物化视图的更多示例,请参阅 此处。
更新每个实体的特征表
上述“每个特征”建模方法意味着每个特征表一个物化视图(一个物化视图只能将结果发送到一个表)。在更大的用例中,这成为扩展的约束 - 物化视图会产生插入时间开销,我们不建议在单个表上使用超过 10 个。
“每个实体”特征表模型减少了特征表的数量。为了封装不同特征的多个查询,将需要我们使用带有 AggregateFunction
类型的 Variant 类型。目前不支持此功能。
作为替代方案,用户可以使用可刷新物化视图(目前处于实验阶段)。与 ClickHouse 的增量物化视图不同,这些视图会定期对整个数据集执行视图查询,并将结果存储在目标表中,目标表的内容会以原子方式替换。
用户可以使用此功能按计划定期更新特征表。我们下面提供了一个示例,其中“每个域的唯一 IP 数”每 10 分钟更新一次。
--enable experimental feature
SET allow_experimental_refreshable_materialized_view = 1
CREATE MATERIALIZED VIEW domain_features_mv REFRESH EVERY 10 MINUTES TO domain_features AS
SELECT
Domain,
'number_unique_ips_per_hour' AS FeatureId,
toStartOfHour(EventTime) AS EventTime,
uniqExact(ClientIP) AS Value
FROM predict_bounce_subset
GROUP BY
Domain,
EventTime
有关可刷新物化视图的更多详细信息,请参阅 此处。
步骤 4 - 生成模型数据
创建特征后,我们可以开始将这些特征组合到我们的模型数据中 - 这将构成我们的训练集、验证集和测试集的基础。
此表中的行将构成我们模型的基础,每一行对应一个特征向量。要生成它,我们需要连接我们的特征。其中一些特征将来自特征表,另一些则直接来自我们的“特征子集”表,即上面提到的 predict_bounce_subset
。
由于特征子集表包含我们的事件以及标签、时间戳和实体键,因此将其作为连接的基本表(左侧)是有意义的。这也有可能是最大的表,使其成为 连接左侧的明智选择。
特征将根据两个条件连接到此表
- 每个特征最接近特征子集表(
predict_bounce_subset
)中的行的的时间戳(EventTime
)。 - 特征表的对应实体列,例如
UserID
或Domain
。
要基于此等值连接和最接近的时间进行连接,需要使用 ASOF JOIN
。
我们将使用 INSERT INTO
将此连接的结果发送到一个表,我们将其称为“模型表”。这将用于生成未来的训练集、验证集和测试集。
在探索连接之前,我们声明我们的“模型表” predict_bounce
。
CREATE TABLE predict_bounce_model (
Row UInt64,
EventTime DateTime64,
UserID UInt64,
Domain String,
UserAgent UInt8,
RefererCategoryID UInt16,
URLCategoryID UInt16,
DomainsVisitedPerHour UInt32 COMMENT 'Number of domains visited in last hour by the user',
UniqueIPsPerHour UInt32 COMMENT 'Number of unique ips visiting the domain per hour',
AverageRequestTime Float32 COMMENT 'Average request time for the domain per hour',
IsNotBounce UInt8,
) ENGINE = MergeTree
ORDER BY (Row, EventTime)
此处的 Row
列将包含数据集中每一行的唯一条目。我们将此作为我们 ORDER BY
的一部分,并在稍后利用它来高效地生成训练集和测试集。
连接和对齐特征
要基于实体键和时间戳连接和对齐特征,我们可以使用 ASOF。我们如何构建连接取决于几个因素,即我们是否使用“每个特征”或“每个实体”特征表。
假设我们使用的是“每个特征”表,并且有以下可用
number_unique_ips_per_hour
- 包含每小时访问**每个域**的唯一 IP 数。以上示例。domains_visited_per_hour
- **每个用户**在过去一小时访问的域数。使用 此处 的查询生成。average_request_time
- **每个域**每小时的平均请求时间。使用 此处 的查询生成。
我们的连接在这里非常简单
INSERT INTO predict_bounce_model SELECT
rand() AS Row,
mt.EventTime AS EventTime,
mt.UserID AS UserID,
mt.Domain AS Domain,
mt.UserAgent,
mt.RefererCategoryID,
mt.URLCategoryID,
dv.Value AS DomainsVisitedPerHour,
uips.Value AS UniqueIPsPerHour,
art.Value AS AverageRequestTime,
mt.IsNotBounce
FROM predict_bounce_subset AS mt FINAL
ASOF JOIN domains_visited_per_hour AS dv ON (mt.UserID = dv.UserID) AND (mt.EventTime >= dv.EventTime)
ASOF JOIN number_unique_ips_per_hour AS uips ON (mt.Domain = uips.Domain) AND (mt.EventTime >= uips.EventTime)
ASOF JOIN average_request_time AS art ON (mt.Domain = art.Domain) AND (mt.EventTime >= art.EventTime)
0 rows in set. Elapsed: 13.440 sec. Processed 89.38 million rows, 3.10 GB (6.65 million rows/s., 230.36 MB/s.)
Peak memory usage: 2.94 GiB.
SELECT * FROM predict_bounce_model LIMIT 1 FORMAT Vertical
Row 1:
──────
Row: 57
EventTime: 2013-07-10 06:11:39.000
UserID: 1993141920794806602
Domain: smeshariki.ru
UserAgent: 7
RefererCategoryID: 16000
URLCategoryID: 9911
DomainsVisitedPerHour: 1
UniqueIPsPerHour: 16479
AverageRequestTime: 182.69382
IsNotBounce: 0
虽然相似,但如果使用“每个实体”特征表,此连接会变得稍微复杂(且成本更高)。假设我们有特征表 domain_features
和 user_features
,并使用 这些查询 进行填充。
INSERT INTO predict_bounce_model SELECT
rand() AS Row,
mt.EventTime AS EventTime,
mt.UserID AS UserID,
mt.Domain AS Domain,
mt.UserAgent,
mt.RefererCategoryID,
mt.URLCategoryID,
DomainsVisitedPerHour,
UniqueIPsPerHour,
AverageRequestTime,
mt.IsNotBounce
FROM predict_bounce_subset AS mt FINAL
ASOF LEFT JOIN (
SELECT Domain, EventTime, Value.UInt64 AS UniqueIPsPerHour
FROM domain_features
WHERE FeatureId = 'number_unique_ips_per_hour'
) AS df ON (mt.Domain = df.Domain) AND (mt.EventTime >= df.EventTime)
ASOF LEFT JOIN (
SELECT Domain, EventTime, Value.Float64 AS AverageRequestTime
FROM domain_features
WHERE FeatureId = 'average_request_time'
) AS art ON (mt.Domain = art.Domain) AND (mt.EventTime >= art.EventTime)
ASOF LEFT JOIN (
SELECT UserID, EventTime, Value.UInt64 AS DomainsVisitedPerHour
FROM user_features
WHERE FeatureId = 'domains_visited_per_hour'
) AS dv ON (mt.UserID = dv.UserID) AND (mt.EventTime >= dv.EventTime)
0 rows in set. Elapsed: 12.528 sec. Processed 58.65 million rows, 3.08 GB (4.68 million rows/s., 245.66 MB/s.)
Peak memory usage: 3.16 GiB.
上述连接使用哈希连接。24.7 添加了对
ASOF JOIN
的full_sorting_merge
算法的支持。此算法可以利用被连接表的排序顺序,从而避免连接之前的排序阶段。可以在任何排序和合并操作之前,根据彼此的连接键过滤连接的表,以最大程度地减少处理的数据量。这将使上述查询保持快速,同时消耗更少的资源。
步骤 5 - 生成测试集和训练集
生成模型数据后,我们可以根据需要生成训练集、验证集和测试集。每个集合将包含不同百分比的数据,例如 80%、10%、10%。在查询执行过程中,这些需要是一致的结果 - 我们不能允许测试数据渗入训练数据,反之亦然。此外,结果集需要稳定 - 一些算法可能会受到数据交付顺序的影响,从而产生不同的结果。
为了实现结果的一致性和稳定性,同时确保查询能够快速返回行,我们可以利用 Row
和 EventTime
列。
假设我们希望按照 EventTime
的顺序获得 80% 的训练数据。这可以通过对 predict_bounce_model
表执行一个简单的查询来获得,该查询对 Row
列执行 mod 100
操作
SELECT * EXCEPT Row
FROM predict_bounce_model
WHERE (Row % 100) < 80
ORDER BY EventTime, Row ASC
我们可以通过一些简单的查询确认这些行提供稳定的结果
SELECT
groupBitXor(sub) AS hash,
count() AS count
FROM
(
SELECT sipHash64(concat(*)) AS sub
FROM predict_bounce_model
WHERE (Row % 100) < 80
ORDER BY
EventTime ASC,
Row ASC
)
┌─────────────────hash─┬────count─┐
│ 14452214628073740040 │ 34315802 │
└──────────────────────┴──────────┘
1 row in set. Elapsed: 8.346 sec. Processed 42.89 million rows, 2.74 GB (5.14 million rows/s., 328.29 MB/s.)
Peak memory usage: 10.29 GiB.
--repeat query, omitted for brevity
┌─────────────────hash─┬────count─┐
│ 14452214628073740040 │ 34315802 │
└──────────────────────┴──────────┘
类似地,可以使用以下方法获得训练集和验证集,每个集合构成 10% 的数据
-- validation
SELECT * EXCEPT Row
FROM predict_bounce_model
WHERE (Row % 100) BETWEEN 80 AND 89
ORDER BY EventTime, Row ASC
-- test
SELECT * EXCEPT Row
FROM predict_bounce_model
WHERE (Row % 100) BETWEEN 90 AND 100
ORDER BY EventTime, Row ASC
用户在此处使用的确切方法可能取决于他们希望如何分离训练集和测试集。对于我们的模型,最好使用某个固定时间点之前的所有数据进行训练,而测试集使用此时间点之后的所有行。上面,我们的训练样本数据和测试样本数据包含来自整个时期的数据。这可能会导致两个集合之间出现一些信息泄漏,例如来自同一页面访问的事件。为了示例的目的,我们忽略了这一点。我们建议根据您打算如何使用行来调整模型表上的排序键 - 请参阅 此处 以获取有关选择排序键的建议。
构建与采用
所述过程涉及多个查询和复杂的过程。使用 ClickHouse 构建离线特征存储时,用户通常采用以下方法之一
-
构建自己的特征存储:这是最先进和最复杂的方法,允许用户根据其特定数据优化流程和模式。它通常被特征存储对其业务至关重要且涉及大量数据的公司采用,例如广告技术领域。
-
Dbt + Airflow:Dbt 广泛用于管理数据转换和处理复杂的查询以及数据建模。当与 Airflow(一个强大的工作流编排工具)结合使用时,用户可以自动化和计划上述过程。这种方法提供了模块化且可维护的工作流,平衡了自定义解决方案与现有工具,以处理大量数据和复杂查询,而无需完全自定义构建的特征存储或现有解决方案强加的工作流。
-
采用具有 ClickHouse 的特征存储:Featureform 等特征存储将 ClickHouse 集成到数据存储、转换和服务离线特征中。这些存储管理流程、版本化特征并执行治理和合规性规则,从而降低数据科学家的数据工程复杂性。采用这些技术取决于抽象和工作流与用例的契合程度。
来源:Featureform 特征存储
推理时的 ClickHouse
这篇博文讨论了使用 ClickHouse 作为离线存储来生成模型训练特征。模型训练完成后,可以部署模型进行预测,这需要实时数据,例如用户 ID 和域。预计算的特征(例如“过去一小时访问的域”)对于预测是必要的,但在推理期间计算成本过高。需要根据最新数据版本提供这些特征,特别是对于实时预测。
ClickHouse 作为实时分析数据库,由于其日志结构合并树,可以处理高并发查询,具有低延迟和高写入工作负载。这使其适用于在线存储中的特征服务。来自离线存储的特征可以使用现有功能物化到同一 ClickHouse 集群或不同实例中的新表中。有关此过程的更多详细信息将在以后的文章中介绍。
结论
这篇博文概述了使用 ClickHouse 作为离线特征存储和转换引擎的常见数据建模方法。虽然不全面,但这些方法提供了一个起点,并与 Featureform 等特征存储中使用的技术保持一致,Featureform 集成了 ClickHouse。我们欢迎改进的贡献和想法。如果您正在使用 ClickHouse 作为特征存储,请告诉我们!
了解如何在 ClickHouse 中建模机器学习数据,以加速您的数据管道并在可能数十亿行数据上快速构建特征。