博客 / 工程

使用 ClickHouse 训练机器学习模型

author avatar
Dale McDiarmid
2024 年 4 月 3 日 - 33 分钟阅读

这篇文章演示了如何通过特征存储使用 ClickHouse 中的数据来训练模型。作为其中的一部分,我们还展示了数据科学家和工程师在探索数据集和准备特征时需要执行的常见任务,如何使用 ClickHouse 在可能达到 PB 级的数据集上通过 SQL 在几秒钟内完成。为了帮助创建特征,我们使用了开源特征存储 Featureform,ClickHouse 最近已集成到其中。

我们首先快速回顾一下用户可能想要使用 ClickHouse 训练机器学习模型的原因,以及特征存储如何帮助实现这一过程。熟悉这些概念的用户可以直接跳到下面的示例

这篇文章将是系列文章中的第一篇,我们将在其中使用 Featureform 训练模型,逐步构建一个更复杂的机器学习平台,其中包含一套将我们的模型投入生产的工具。在这篇初始文章中,我们演示了使用特征存储来帮助使用 ClickHouse 中的数据训练 Logistic 回归和基于决策树的分类器的构建块。

特征 ≈ 列

我们在整篇文章中都使用了“特征”一词。作为提醒,特征是实体的某些属性,这些属性对于机器学习 (ML) 模型具有预测能力。从这个意义上讲,实体是特征的集合以及表示现实世界概念的类或标签。如果质量足够好,并且如果存在这种关系,则特征应有助于预测实体的类别。例如,银行交易可以被视为实体。这可能包含诸如交易金额和所涉及的购买者/卖方等特征,类别描述了交易是否为欺诈。

feature.png

在结构化数据的情况下,我们可以将特征视为列 - 来自表或结果集。我们在这里互换使用这些术语,但值得记住的是,特征通常需要一些先前的 数据工程 步骤和数据转换逻辑,然后才能使用。

特征存储

我们最近发布了一篇博文,描述了不同类型的特征存储、您可能需要特征存储的原因及其主要组件。作为其中的一部分,我们探讨了如何使用这些特征存储来训练机器学习模型。下面,我们为那些不熟悉这个概念的人做一个简短的回顾。

总而言之,特征存储是用于存储、处理和访问模型训练、推理和评估常用的特征的中心枢纽。这种抽象提供了便利功能,例如版本控制、访问管理以及自动将特征定义转换为 SQL 语句。

这里的主要价值在于提高特征的协作性和可重用性,从而缩短模型迭代时间。通过从数据科学家那里抽象出数据工程的复杂性,并且仅通过 API 公开版本化的高质量特征,可以提高模型的可靠性和质量。

feature_store.png

特征存储由许多关键组件组成。在这篇博文中,我们重点关注两个:转换引擎离线存储

在训练任何模型之前,必须首先分析数据以了解其特征、分布和关系。这种评估和理解过程变成一个迭代过程,产生一系列即席查询,这些查询通常会聚合和计算整个数据集的指标。执行此任务的数据科学家需要查询响应速度,以便快速迭代(以及其他因素,如成本效益和准确性)。原始数据很少是干净且格式良好的,因此必须在用于训练模型之前进行转换。所有这些任务都需要一个转换和查询引擎,该引擎最好可以扩展且不受内存限制。

离线存储保存转换产生的特征,在训练期间将它们提供给模型。这些特征通常按实体分组,并与标签(目标预测)关联。通常,模型需要选择性地消耗这些特征,无论是迭代地还是通过聚合,可能多次且以随机顺序。模型通常需要多个特征,需要将特征分组为“特征组” - 通常按实体 ID 和时间维度分组。这要求离线存储能够为特定时间点提供正确版本的特征和标签。这种“时间点正确性”对于模型通常至关重要,模型需要以增量方式进行训练。

ClickHouse 作为转换引擎和离线存储

featurestore_clickhouse.png

虽然 ClickHouse 是机器学习模型的天然数据源(例如,点击流量),但它也非常适合担任转换引擎和离线存储的角色。与其他方法相比,这具有几个明显的优势。

作为一个针对聚合进行优化并能够扩展到 PB 级数据集的实时数据仓库,ClickHouse 允许用户使用熟悉的 SQL 语言执行转换。数据可以存储在 ClickHouse 中,任何探索性和转换工作都在源头处理,而无需将数据从现有数据库流式传输到 Spark 等计算框架中。

ClickHouse 预构建的统计和聚合函数使这种 SQL 易于编写和维护。从根本上讲,这种架构受益于数据局部性,提供无与伦比的性能,并通过聚合查询将数十亿行数据提炼为数千行。

这些转换的结果也可以通过 INSERT INTO SELECT 语句持久保存在 ClickHouse 中,或者简单地作为视图公开。由于转换通常按实体 ID 分组并返回许多列作为结果,ClickHouse 的模式推断可以自动检测这些结果所需的类型,并生成适当的表模式来存储它们。

然后,这些结果表和视图可以构成离线存储的基础,为模型训练提供数据。

特征被有效地提取并表示为返回表格数据的 SQL 查询。

总而言之,通过使用 ClickHouse 作为转换引擎和离线存储,用户可以受益于数据局部性以及 ClickHouse 在集群中并行化和执行计算密集型任务的能力。这允许离线存储扩展到 PB 级,使特征存储充当轻量级协调层和 API,通过该层访问和共享数据。

这些图表突出了特征存储的其他关键组件,特别是在线存储。一旦模型经过训练,就会部署用于实时预测,这既需要像用户 ID 这样的即时数据,也需要预先计算的特征,例如历史购买记录,这些特征的生成成本太高,无法即时生成。这些特征存储在在线存储中以供快速访问,对于欺诈检测等延迟敏感型任务至关重要。它们从离线存储更新,以确保它们反映最新的数据。有关更多信息,请参阅我们之前的博文。虽然 ClickHouse 可以用作在线存储,但在本文中,我们重点关注训练示例,从而重点关注转换引擎和离线存储的角色。

Featureform

我们之前的文章探讨了不同类型的特征存储:物理存储字面存储虚拟存储。虚拟存储最适合 ClickHouse,为其提供机会用作离线存储和转换引擎。在这种架构中,特征存储不负责管理转换以及特征的持久性和版本控制,而仅充当协调器。

为了实现我们由 ClickHouse 驱动的虚拟特征存储的愿景,我们将 Featureform 确定为理想的集成解决方案。Featureform 除了是开源的,使我们能够轻松贡献之外,还为离线存储、在线存储和向量数据库提供了成熟的(按设计)集成点。

Featureform 和 ClickHouse 入门

首先,我们通过简单的 pip 安装来安装 featureform

pip install featureform

虽然 Featureform 采用模块化架构,可以部署在 Kubernetes 中,但对于我们的初始示例,我们将使用出色的入门体验。在这种模式下,架构的所有组件都部署为单个 docker 容器。更棒的是,一个简单的标志确保 ClickHouse 与预加载的测试数据集一起部署。

(.venv) clickhouse@PY test_project % featureform deploy docker --quickstart --include_clickhouse
DeprecationWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html
Deploying Featureform on Docker
Starting Docker deployment on Darwin 23.3.0
Checking if featureform container exists...
    Container featureform not found. Creating new container...
    'featureform' container started
Checking if quickstart-postgres container exists...
    Container quickstart-postgres not found. Creating new container...
    'quickstart-postgres' container started
Checking if quickstart-redis container exists...
    Container quickstart-redis not found. Creating new container...
    'quickstart-redis' container started
Checking if quickstart-clickhouse container exists...
    Container quickstart-clickhouse not found. Creating new container...
    'quickstart-clickhouse' container started
…

Featureform is now running!
To access the dashboard, visit https://127.0.0.1:80
Run jupyter notebook in the quickstart directory to get started.

这为我们提供了 ClickHouse、FeatureForm、Redis 和 Postgres 容器。后两者我们现在可以忽略。

测试数据集

此时,我们可以通过 ClickHouse 客户端连接到我们的 ClickHouse 容器,并确认测试数据集已加载。

(.venv) dalemcdiarmid@PY test_project % clickhouse client
ClickHouse client version 24.1.1.1286 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 24.2.1.

55733a345323 :) SHOW TABLES IN fraud

SHOW TABLES FROM fraud

┌─name───────┐
│ creditcard │
└────────────┘

1 row in set. Elapsed: 0.005 sec.

对于我们的测试数据集,我们使用了在 Kaggle 上分发的流行欺诈数据集,该数据集包含超过 550,000 个匿名交易,目标是开发欺诈检测算法。

灵感来源

为了将这篇博文重点放在使用 ClickHouse 和 Featureform 的机制上,我们寻找了先前成功为此数据集训练模型的努力。

认识到数据科学家通常更喜欢在 Notebook 环境中工作,Kaggle 上有许多 Notebook 尝试将模型拟合到此数据集,对交易是否为欺诈进行分类。被“信用卡欺诈检测,准确率 100%”这个有点不切实际的标题所吸引,但代码的布局和有条不紊的性质给我们留下了深刻的印象,我们以此为基础作为我们的示例。向 Anmol Arora 致敬,感谢他们的贡献。

我们承认,这些 Notebook 中使用的模型(Logistic 回归/决策树)并不代表分类的“最先进”算法。但是,这篇博文的目的是展示如何使用 ClickHouse 和 Featureform 训练模型,而不是代表预测欺诈的“最佳”或“最新”技术。随时调整任何示例以使用更现代的技术!附注:我也不是 ML 专家。

连接 Featureform 和 ClickHouse

请记住,Featureform 将提供特征存储接口,通过该接口我们可以访问 ClickHouse 中的数据,从而使我们能够定义可重用且版本化的特征,我们可以将其传递给模型进行训练。这需要我们在 Featureform 中注册我们的 ClickHouse 服务器。这只需要几行代码,我们首先定义一个 Featureform 客户端(对于这个简单的测试示例,是本地且不安全的),然后再注册我们的本地 ClickHouse 实例。

请注意,我们使用环境变量 FEATUREFORM_HOST 来指定 Featureform 实例的位置。这些连接详细信息也可以直接传递给客户端。

# install any dependencies into our notebook
!pip install featureform==1.12.6
!pip install river -U
!pip install scikit-learn -U
!pip install seaborn
!pip install googleapis-common-protos
!pip install matplotlib
!pip install matplotlib-inline
!pip install ipywidgets

%env FEATUREFORM_HOST=localhost:7878
%matplotlib inline
from featureform import Client
import featureform as ff
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import SGDClassifier, LogisticRegression
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
import numpy as np

# Featureform client. Local instance and insecure
client = Client(insecure=True)
# Register our local container with Featureform
clickhouse = ff.register_clickhouse(
   name="clickhouse",
   description="A ClickHouse deployment for example",
   host="host.docker.internal",
   port=9000,
   user="default",
   password="",
   database="fraud"
)
client.apply(verbose=True)

Featureform 中的 client.apply() 方法创建定义的资源。否则,这些资源将被延迟评估,并根据需要(即在下游使用时)创建。注册此数据集会导致它在 ClickHouse 中使用 INSERT INTO SELECT 在内部复制,从而有效地创建数据的不可变副本以供 Featureform 版本控制的分析。有关更多详细信息,请参见下文

注册表

除了充当转换引擎和离线存储之外,ClickHouse 首先是 ML Ops 工作流程中的数据源。在我们从欺诈表中转换和提取特征之前,我们首先需要在 Featureform 中注册此表。这会在 Featureform 中创建必要的元数据,以便我们可以对数据进行版本控制和跟踪。此代码非常少。

creditcard = clickhouse.register_table(
   name="creditcard",
   table="creditcard",
)
data = client.dataframe(creditcard, limit=100)
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 31 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   id      100 non-null    int64  
 1   V1      100 non-null    float64
 2   V2      100 non-null    float64
 3   V3      100 non-null    float64
 4   V4      100 non-null    float64
 5   V5      100 non-null    float64
 6   V6      100 non-null    float64
 7   V7      100 non-null    float64
 8   V8      100 non-null    float64
 9   V9      100 non-null    float64
 10  V10     100 non-null    float64
 11  V11     100 non-null    float64
 12  V12     100 non-null    float64
 13  V13     100 non-null    float64
 14  V14     100 non-null    float64
 15  V15     100 non-null    float64
 16  V16     100 non-null    float64
 17  V17     100 non-null    float64
 18  V18     100 non-null    float64
 19  V19     100 non-null    float64
 20  V20     100 non-null    float64
 21  V21     100 non-null    float64
 22  V22     100 non-null    float64
 23  V23     100 non-null    float64
 24  V24     100 non-null    float64
 25  V25     100 non-null    float64
 26  V26     100 non-null    float64
 27  V27     100 non-null    float64
 28  V28     100 non-null    float64
 29  Amount  100 non-null    float64
 30  Class   100 non-null    int64  
dtypes: float64(29), int64(2)
memory usage: 24.3 KB

我们的第二行代码在此处提取基础数据的 100 行作为数据帧 - 此 LIMIT 被下推到 ClickHouse,为我们提供了最后一次机会来查看我们正在使用的列。

这与 数据集的描述 一致,其中除了交易 Amount 之外,列 V1V28 还提供了匿名特征。我们的 Class 表示交易是否为欺诈,表示为值为 1 和 0 的布尔值。

转换数据

在 Featureform 中注册我们的数据集后,我们可以开始使用 ClickHouse 作为转换引擎。

为了有效训练任何分类器,我们应该了解类的分布。

虽然我们可以在之前的数据帧上计算此值,但这会将我们的分析限制为它包含的 100 行。

data['Class'].value_counts().plot.pie(autopct='%3.1f%%',shadow=True, legend= True,startangle =45)
plt.title('Distribution of Class',size=14)
plt.show()

理想情况下,我们希望为表中的所有行计算此值。对于我们的数据集,我们可能只需删除限制即可将 55 万行的整个数据集加载到内存中。但是,从网络或内存的角度来看,这对于更大的数据集是不可行的。为了将这项工作下推到 ClickHouse,我们可以定义一个转换。

@clickhouse.sql_transformation(inputs=[creditcard])
def classCounts(creditcard):
    return "SELECT Class,count() c FROM {{creditcard}} GROUP BY Class"

client.dataframe(classCounts).pivot_table(index='Class', columns=None, values='c', aggfunc='sum').plot.pie(
    explode=[0.1, 0], autopct='%3.1f%%', shadow=True, legend=True, startangle=45, subplots=True)

我们在这里使用简单的聚合来计算每个类的计数。请注意,此处使用了模板来查询。变量 {{creditcard}} 将替换为信用卡数据集的表名。这将不是原始表 - 正如我们之前提到的,Featureform 在 ClickHouse 中创建数据的不可变副本,这将是一个名称特定于版本的表。创建后,我们可以将转换结果转换为数据帧并绘制结果。

class_split.png

此数据集是平衡的(50% 欺诈和 50% 非欺诈)。虽然这大大简化了未来的训练,但它让我们怀疑此数据集是否是人工的。这也可能极大地促成了原始 Notebook 中宣传的相当高的准确率得分。

我们的主要数据充当原材料,然后将其转换为包含用于服务和训练机器学习模型的特征和标签集的数据集。这些转换可以直接应用于主要数据集,也可以按顺序应用于其他先前转换的数据集。必须了解的是,Featureform 本身不执行数据转换。相反,它协调 ClickHouse 执行转换。转换的结果也将作为版本化表存储在 ClickHouse 中,以便以后快速检索 - 此过程称为物化。

同样,当首次探索数据时,用户通常使用数据帧的 describe 方法来了解列的属性。

data.describe()

这要求我们计算数据集中每列的多个统计信息。不幸的是,此操作的 SQL 查询非常复杂,因为它利用了动态列选择

import re
@clickhouse.sql_transformation(inputs=[creditcard])
def describe_creditcard(creditcard):
    return "SELECT * APPLY count, * APPLY avg, * APPLY std, * APPLY x -> (quantiles(0.25)(x)[1]), * APPLY x -> (quantiles(0.5)(x)[1]), * APPLY x -> (quantiles(0.75)(x)[1]), * APPLY min, * APPLY max FROM {{creditcard}}"

df = client.dataframe(describe_creditcard, limit=1)
df_melted = df.melt()
df_melted['Var'] = df_melted['variable'].apply(lambda x: re.search(r"\((\w*)\)", x).group(1))
df_melted['Statistic'] = df_melted['variable'].apply(
    lambda x: re.search(r"(.*)\(\w*\)", x).group(1).replace('arrayElement(', ''))
df_melted.pivot(index='Statistic', columns='Var', values='value').reset_index()

dataframe_ch_describe.png

在 Featureform 集成的更高版本中,我们希望将这种复杂性从用户那里抽象出来。理想情况下,describe() 应该为用户制定所需的 ClickHouse 查询,并为用户透明地执行此查询。敬请期待。

用户可以在 Featureform 示例存储库中找到我们的最终 Notebook,其中包含许多旨在执行数据分析的转换。这些转换再现了原始 Notebook 作者所做的工作,该作者直接使用了 pandas。我们在下面重点介绍一些更有趣的转换。

相关矩阵可以帮助我们了解数据集中变量/列之间的任何线性关系。在 pandas 中,这需要在数据帧上调用 corr 方法。此特定操作在非常大的数据集上计算量非常大。在 SQL 中,这需要使用 ClickHouse corrMatrix 函数。以下转换透视结果,使其与流行的 Seaborn 可视化库预期的格式一致。

@clickhouse.sql_transformation(inputs=[creditcard])
def credit_correlation_matrix(creditcard):
    return """WITH matrix AS
    (
        SELECT arrayJoin(arrayMap(row -> arrayMap(col -> round(col, 3), row), 
               corrMatrix(id, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14, 
               V15, V16, V17, V18, V19, V20, V21, V22, V23, V24, V25, V26, V27, V28, Amount, Class))) AS matrix
        FROM {{creditcard}}
    )
    SELECT
        matrix[1] AS id, matrix[2] AS V1, matrix[3] AS V2,  matrix[4] AS V3, matrix[5] AS V4, 
        matrix[6] AS V5,  matrix[7] AS V6, matrix[8] AS V7, matrix[9] AS V8, matrix[10] AS V9, 
        matrix[11] AS V10, matrix[12] AS V11, matrix[13] AS V12, matrix[14] AS V13, 
        matrix[15] AS V14, matrix[16] AS V15, matrix[17] AS V16, matrix[18] AS V17,
        matrix[19] AS V18, matrix[20] AS V19, matrix[21] AS V20,  matrix[22] AS V21, 
        matrix[23] AS V22, matrix[24] AS V23, matrix[25] AS V24, matrix[26] AS V25, 
        matrix[27] AS V26, matrix[28] AS V27, matrix[29] AS V28, matrix[30] AS Amount,  
        matrix[31] AS Class
    FROM matrix"""

client.dataframe(credit_correlation_matrix)
paper = plt.figure(figsize=[20, 10])
sns.heatmap(client.dataframe(credit_correlation_matrix, limit=100), cmap='crest', annot=True)
plt.show()

corr_matrix.png

由此,我们可以得出与原始 Notebook 相同的观察结果,即我们的一些特征/列高度相关。

  • V17 和 V18
  • V16 和 V17
  • V16 和 V18
  • V14 与 V4 呈负相关
  • V12 也与 V10 和 V11 呈负相关。
  • V11 与 V10 呈负相关,与 V4 呈正相关。
  • V3 与 V10 和 V12 呈正相关。
  • V9 和 V10 也呈正相关。
  • 一些特征显示与目标类变量强相关。

Logistic 回归假设自变量之间不存在完全多重共线性。因此,我们可能希望在训练之前删除一些强相关的特征及其关联的列。

通常,我们可以使用此处的结果来识别冗余特征。两个特征之间的高度相关性表明它们可能传达相似的信息,并且可以潜在地删除一个而不会损失显着的预测能力。此外,与目标变量的相关性可以突出显示哪些特征最相关。但是,原始 Notebook 在不这样做的情况下也获得了良好的结果。

虽然我们的大部分 V* 特征都在同一范围内,但我们可以从我们早期的 .describe() 中看到 Amount 具有不同的尺度。我们可以通过简单的箱线图确认此尺度。

@clickhouse.sql_transformation(inputs=[creditcard])
def amountQuantitles(creditcard):
    return "SELECT arrayJoin(quantiles(0, 0.25, 0.5, 0.75, 1.)(Amount)) AS Amount FROM {{creditcard}}"
client.dataframe(amountQuantitles).plot.box()

这证实我们需要在回归模型中使用这些值之前对这些值应用标量。

box_plot.png

虽然决策树可以处理缺失值,但 Logistic 回归技术无法在模型拟合过程中固有地直接处理缺失数据。虽然有一些处理此问题的技术,但我们应该事先确认是否需要这样做。此外,识别重复项也很有帮助。这两者都可以通过两个简单的转换来处理。

@clickhouse.sql_transformation(inputs=[creditcard])
def anynull_creditcard(creditcard):
    return "SELECT * APPLY x -> sum(if(x IS NULL, 1, 0)) FROM {{creditcard}}"

client.dataframe(anynull_creditcard, limit=1).melt()

@clickhouse.sql_transformation(inputs=[creditcard])
def credit_duplicates(creditcard):
    return "SELECT *, count() AS cnt FROM {{creditcard}} GROUP BY * HAVING cnt > 1"

client.dataframe(credit_duplicates, limit=1)

我们不在此处包含结果(大量无聊的 0 和空表!),但我们的数据没有重复项或缺失值。

缩放值

缩放特征是在尝试训练 Logistic 回归模型之前的重要步骤,该模型使用梯度下降算法来优化模型的成本函数。当特征处于不同的尺度时,不平衡会导致收敛缓慢,因为学习算法在某些方向上步长较小,而在其他方向上步长较大,可能会振荡或采取较长的路径才能达到最小值。

原始 Notebook 使用 Scikit Learn 中提供的 StandardScalar 缩放了数据集的所有列。我们使用另一个转换来复制这一点。

@clickhouse.sql_transformation(inputs=[creditcard])
def scaled_credit_cards(creditcard):
    column_averages = ', '.join([f'avg(V{i})' for i in range(1, 29)])
    column_std_deviations = ', '.join([f'stddevPop(V{i})' for i in range(1, 29)])
    columns_scaled = ', '.join(f'(V{i} - avgs[{i}]) / stds[{i}] AS V{i}' for i in range(1,29))
    return f"WITH ( SELECT [{column_averages}, avg(Amount)] FROM {{{{creditcard}}}} ) AS avgs, (SELECT [{column_std_deviations}, stddevPop(Amount)] FROM {{{{creditcard}}}}) AS stds SELECT id, { columns_scaled }, (Amount - avgs[29])/stds[29] AS Amount, Class FROM {{{{creditcard}}}}"

client.apply()

在这里,我们计算每列的平均值和标准差,然后计算并返回列 value - avg/std.deviation - 从而复制 StandardScalar 操作。

此转换的结果表示数据的版本,该版本作为表存储在 ClickHouse 中,我们可以将其用于模型训练。

模型训练

在数据缩放后,我们可以训练我们的第一个模型。使用 ClickHouse 作为训练源的优势之一是它能够快速聚合和转换数据。Featureform 物化这些转换,将它们存储为新表,以便快速迭代和重用。虽然我们的转换在这里非常简单 - 一个简单的标量函数,但它也可能是对数万亿行数据进行 GROUP BY 以返回包含数千行的结果集。

在此角色中,ClickHouse 现在充当离线存储以提供数据。

作为第一步,我们需要定义一个实体。根据我们之前的描述,此实体由一组特征和一个类组成。在 Featureform 中,这很简单

@ff.entity
class Transaction:
    # Register multiple columns from a dataset as features
    transaction_features = ff.MultiFeature(
        scaled_credit_cards,
        client.dataframe(scaled_credit_cards, limit=10),
        include_columns=[f"V{i}" for i in range(1, 29)] + ["Amount"],
        entity_column="id",
        exclude_columns=["Class"],
    )
    fraud = ff.Label(
        scaled_credit_cards[["id", "Class"]], type=ff.Bool,
    )

请注意,我们需要指定实体标识符(id 列)并使用我们先前创建的转换 scaled_credit_cards 作为源。我们从特征中排除 Class,正如您可能预期的那样,因为它我们的目标标签,并相应地指定它。

在定义实体后,我们可以注册训练集。此处的术语可能有点令人困惑,因为这将同时包含我们的训练集和测试集。由于我们不进行任何超参数调整或尝试识别最佳算法,因此我们现在跳过使用验证集。

fraud_training_set = ff.register_training_set(
    "fraud_training_set",
    label=Transaction.fraud,
    features=Transaction.transaction_features,
)
client.apply()

与往常一样,我们的资源是延迟评估的,因此我们显式调用 apply。这会导致创建我们的训练集,再次作为版本化表在 ClickHouse 中创建,该表仅通过 Featureform API 公开。

在底层,每个特征都由其自己的表表示。这允许新实体由特征组成。这在我们的简单示例中可能不是必需的,但它解锁了协作和可重用性,我们稍后将讨论。

在我们的“训练集”准备就绪后,我们现在需要将其拆分为训练集和验证集。对于 Featureform,这是一个简单的调用,要求我们指定拆分比率。为此,我们使用经典的 80/20 拆分。

# fetch the training set through the FF client
ds = client.training_set(fraud_training_set)
# split our training set ino a train and test set
train, test = ds.train_test_split(test_size=0.2, train_size=0.8, shuffle=True, random_state=3, batch_size=1000)

shuffle 确保 ClickHouse 以随机顺序传递数据(考虑到我们下面的训练方法,这很有用)。我们还将 random_state 指定为对此随机化的种子,这样如果我们多次调用 Notebook,则数据将以相同的随机顺序传递。

此时,我们可以简单地将 train 加载到内存中的数据帧中,并使用它来训练模型。为了展示如何处理大于本地内存的数据集,我们演示了一种迭代方法来使用数据。

这要求我们偏离原始 Notebook,并使用增量方法进行训练 - 具体来说,我们使用 随机梯度下降 (SGD) 算法和 log_loss 函数来训练 Logistic 回归模型。

clf = SGDClassifier(loss='log_loss')
for features, label in train:
    clf.partial_fit(features, label, classes=[True,False])

我们的 Featureform 训练数据集可以有效地迭代,为我们提供先前定义大小为 1000 的批次。这些批次用于部分和增量地拟合我们的回归模型。

训练完成后(这可能需要几秒钟,具体取决于资源),我们可以利用我们的测试数据集来评估我们的模型。为了简洁起见,我们将跳过针对我们的训练集评估性能。

from sklearn.metrics import confusion_matrix
def model_eval(actual, predicted):
    acc_score = accuracy_score(actual, predicted)
    conf_matrix = confusion_matrix(actual, predicted)
    clas_rep = classification_report(actual, predicted)
    print('Model Accuracy is: ', round(acc_score, 2))
    print(conf_matrix)
    print(clas_rep)

def plot_confusion_matrix(cm, classes=None, title='Confusion matrix'):
    """Plots a confusion matrix."""
    if classes is not None:
        sns.heatmap(cm, xticklabels=classes, yticklabels=classes, vmin=0., vmax=1., annot=True)
    else:
        sns.heatmap(cm, vmin=0., vmax=1.)
    plt.title(title)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

# Make a test prediction
pred_test= np.array([])
label_test = np.array([])
for features, label in test:
    batch_pred = np.array(clf.predict(features))
    pred_test = np.concatenate([pred_test, batch_pred])
    label_test = np.concatenate([label_test, np.array(label)])
model_eval(label_test, pred_test)
cm = confusion_matrix(label_test, pred_test)
plot_confusion_matrix(cm, classes=[True, False], title='')

此处定义的方法只是为我们提供了模型准确率指标并绘制了我们的混淆矩阵。请注意,我们如何分批进行预测,使用 test 上的迭代器,并在计算准确率和绘制混淆矩阵之前收集这些预测。

Model Accuracy is:  0.96
[[55024  1825]
 [ 2358 54519]]
              precision    recall  f1-score   support

         0.0       0.96      0.97      0.96     56849
         1.0       0.97      0.96      0.96     56877

    accuracy                           0.96    113726
   macro avg       0.96      0.96      0.96    113726
weighted avg       0.96      0.96      0.96    113726

sgd_confusion.png

在没有调优的情况下,这里的结果令人印象深刻,准确率达到 96%。原始 Notebook 更进一步,训练了决策树和随机森林分类器,然后最终使用了 xgboost 方法。鉴于随机森林在此处提供了最佳性能,声称在测试集和训练集上的准确率均为 100%,我们认为我们应该尝试使用增量方法重现这一点。

继续采用在线学习方法,我们通过 River Library 使用 Hoeffding Adaptive Tree 分类器。此算法基于 Hoeffding Adaptive Tree 算法,这是一种决策树形式,它使用 Hoeffding 边界来确定以高置信度拆分节点所需的最小样本数,确保所选拆分接近于在给定无限数据的情况下可以做出的最佳拆分。这种方法允许树增长并适应随数据流而变化的数据分布。随着数据流动,该算法不断评估其拆分(节点)的性能,并且可以通过用新的分支替换性能不佳的分支来进行调整。

使用 Featureform API 训练此模型与我们之前的 SGD 示例非常相似。

from river import tree
from river import metrics

ds = client.training_set(fraud_training_set)
train, test = ds.train_test_split(test_size=0.2, train_size=0.8, shuffle=True, random_state=3, batch_size=1000)

# Initialize the HoeffdingAdaptiveTreeClassifier
model = tree.HoeffdingAdaptiveTreeClassifier()
# we need to pivot into [({"feature1": value1, "feature2": value2}, label), ...] for the model
feature_names = [t.name for t in Transaction.transaction_features]
n = 0
for features, label in train:
    # Update the model with the current instance
    for i, feature in enumerate(features):
        feature_dict = dict(zip(feature_names, feature))
        model.learn_one(feature_dict, label[i])
        n += 1
    print(f'{n} processed')

此模型在我们的 Macbook 2023 上大约需要 10 分钟才能完成训练。训练完成后,我们可以评估模型性能

predicted_labels = []
actual_labels = []

for features, label in test:
    for i, feature in enumerate(features):
        feature_dict = dict(zip(feature_names, feature))
        predicted_labels.append(model.predict_one(feature_dict))
        actual_labels.append(label[i])

# Evaluate the model
model_eval(actual_labels, predicted_labels)

# Plot the confusion matrix
cm = confusion_matrix(actual_labels, predicted_labels)
plot_confusion_matrix(cm)
Model Accuracy is:  0.98
[[55715  1067]
 [ 1234 55710]]
              precision    recall  f1-score   support

           0       0.98      0.98      0.98     56782
           1       0.98      0.98      0.98     56944

    accuracy                           0.98    113726
   macro avg       0.98      0.98      0.98    113726
weighted avg       0.98      0.98      0.98    113726

confusion_matrix_decision.png

因此,质量可能不是 100% 完美,但我们已经设法通过在线技术获得了与原始 notebook 相当的性能。 这是一种过于简单化的问题处理方法,并且我们无疑受益于数据已被清理(或人为生成?)。 我们可能还会更喜欢一种更侧重于最小化假阴性(在上述混淆矩阵中大约 50 万个样本中出现 1 千个假阴性)的模型。 我们将进一步的模型改进作为用户的练习,并欢迎对我们的工作提出批评!

共享和版本控制

以上所有内容可能看起来像任何其他 notebook,只是对 ClickHouse 进行了简单的抽象。 然而,在幕后,Featureform 管理状态的创建和版本控制。 当前状态可以通过用户界面 (https://127.0.0.1) 查看,其中显示了系统中跟踪的当前特征、实体、标签和训练集。

sharing_versioning.png

此状态对于 notebook 重启是稳健的(它由 Featureform 持久化在本地数据库中),但更重要的是,允许其他数据科学家和数据工程师使用这些对象。 例如,上述特征可以被另一位工程师用来组合不同的实体,而该实体本身又可以被共享。 所有这一切都通过上述 Python API 实现,该 API 服务于这些对象并支持协作。

如果您希望更深入地探索 Featureform,我们建议您熟悉它提供的抽象。 我们还只探讨了特征和实体在 ClickHouse 中以表(或视图)的形式物化的情况。 Featureform 还提供了许多其他功能,包括 按需特征流式处理能力,适用于不适合批量处理的场景。

在本博客中,我们提到了 Featureform 中的对象是如何版本控制的,但没有明确展示如何操作。 Featureform 使用一种称为变体的机制来对数据源、转换、特征、标签和训练集进行版本控制。 默认情况下,这些资源中的每一个都是不可变的,确保您可以自信地使用其他人创建的版本化资源,而不会因上游修改而导致中断的风险。

对于每个资源,都存在一个变体参数,可以手动或自动配置。 考虑到特征的后续变体不一定是优于先前变体的改进(不幸的是,机器学习不是线性的改进过程),因此术语“变体”优于“版本”。 在我们的示例中,我们只是依赖于自动版本控制。 如果我们更改了流程中的任何资源(例如,更改了用于我们实体的特征),这将创建一个新的资源变体并触发所有依赖资源的重建。 这种沿袭跟踪需要有向无环图 (DAG),并采用类似于用户可能熟悉的 dbt 等工具的技术。

结论

在本文中,我们探讨了如何将 ClickHouse 用作特征存储来训练使用 Featureform 的机器学习模型。 在此角色中,ClickHouse 既充当转换引擎又充当离线存储。 我们仅使用 SQL 转换和扩展了欺诈数据集,并使用后续数据来增量训练 Logistic 回归和基于决策树的模型,以预测交易是否具有欺诈性。 在此过程中,我们评论了如何使用带有特征存储的 SQL 数据库来实现特征工程的协作方法,提高特征的可用性,从而缩短模型迭代时间。 在未来的文章中,我们将探讨如何将此模型投入生产,并与 AWS Sagemaker 等工具集成以进行模型训练。

立即开始使用 ClickHouse Cloud,并获得 300 美元的信用额度。 在 30 天试用期结束时,继续使用按需付费计划,或联系我们以了解有关我们基于用量的折扣的更多信息。 访问我们的定价页面了解详情。

分享这篇文章

订阅我们的新闻通讯

随时了解功能发布、产品路线图、支持和云产品!
正在加载表单...
关注我们
X imageSlack imageGitHub image
Telegram imageMeetup imageRss image