这篇文章演示了如何通过特征存储使用 ClickHouse 中的数据来训练模型。作为其中的一部分,我们还展示了数据科学家和工程师在探索数据集和准备特征时需要执行的常见任务,如何使用 ClickHouse 在潜在的 PB 级数据集上通过 SQL 在几秒钟内完成。为了协助特征创建,我们使用了开源特征存储 Featureform,ClickHouse 最近已与之集成。
我们首先快速回顾一下用户为什么要使用 ClickHouse 来训练机器学习模型以及特征存储如何帮助实现此过程。熟悉这些概念的用户可以直接跳到下面的示例。
这篇文章将是我们将使用 Featureform 训练模型的一系列文章中的第一篇,逐步构建一个更复杂的机器学习平台,并配备一套工具将我们的模型投入生产。在这篇初始文章中,我们演示了使用特征存储的构建块,以帮助使用 ClickHouse 中的数据训练基于逻辑回归和决策树的分类器。
特征 ≈ 列
我们在本文中使用了“特征”一词。提醒一下,特征是实体的某些属性,对机器学习 (ML) 模型具有预测能力。从这个意义上说,实体是特征的集合以及表示现实世界概念的类别或标签。如果质量足够高,并且存在这种关系,则这些特征应该有助于预测实体的类别。例如,银行交易可以被视为一个实体。这可能包含诸如交易金额和参与的购买/卖家等特征,其中类别描述交易是否欺诈。
在结构化数据的情况下,我们可以将特征视为一列——来自表或结果集。我们在这里互换使用这些术语,但值得记住的是,特征通常需要一些先验的数据工程步骤和数据转换逻辑才能使用。
特征存储
我们最近发布了一篇博文,描述了不同类型的特征存储、为什么您可能需要一个特征存储以及它们的 utama komponen。作为其中的一部分,我们探讨了如何使用它们来训练机器学习模型。下面,我们对新手做一个简短的回顾。
总之,特征存储是用于存储、处理和访问模型训练、推理和评估中常用的特征的集中式中心。此抽象提供了诸如版本控制、访问管理以及自动将特征定义转换为 SQL 语句等便利功能。
这里的主要价值在于提高特征的协作和可重用性,从而减少模型迭代时间。通过从数据科学家那里抽象出数据工程的复杂性,并且只通过 API 公开经过版本控制的高质量特征,可以提高模型的可靠性和质量。
特征存储包含许多关键组件。在本博文中,我们重点关注两个:**转换引擎**和**离线存储**。
在任何模型进行训练之前,必须首先分析数据以了解其特征、分布和关系。这个评估和理解的过程成为一个迭代过程,导致一系列临时查询,这些查询通常聚合并在整个数据集上计算指标。执行此任务的数据科学家需要查询响应能力才能快速迭代(以及其他因素,例如成本效益和准确性)。原始数据很少是干净且格式良好的,因此必须在用于训练模型之前进行转换。所有这些任务都需要一个**转换和查询引擎**,该引擎理想情况下可以扩展并且不受内存限制。
**离线存储**保存转换产生的特征,并在训练期间将它们提供给模型。这些特征通常作为实体分组,并与标签(目标预测)相关联。通常,模型需要选择性地使用这些特征,无论是迭代地还是通过聚合,可能多次并且以随机顺序。模型通常需要多个特征,这需要将特征组合在一个“特征组”中——通常通过实体 ID 和时间维度。这要求离线存储能够为特定时间点提供特征和标签的正确版本。这种“时间点正确性”通常对模型至关重要,因为模型需要增量训练。
ClickHouse 作为转换引擎和离线存储
虽然 ClickHouse 是机器学习模型(例如点击流量)的自然数据来源,但它也非常适合转换引擎和离线存储的角色。这提供了优于其他方法的几个独特优势。
作为针对聚合进行了优化的实时数据仓库,并且能够扩展到 PB 级数据集,ClickHouse 允许用户使用熟悉的 SQL 语言执行转换。与其需要将数据从现有数据库流式传输到计算框架(例如 Spark),不如将数据存储在 ClickHouse 中,并将任何探索性和转换工作处理在源处。
ClickHouse 预构建的统计和聚合函数使此 SQL 易于编写和维护。从根本上说,这种架构受益于数据本地性,提供无与伦比的性能,并允许通过聚合查询将数十亿行精简到几千行。
这些转换的结果也可以通过INSERT INTO SELECT 语句持久化到 ClickHouse 中,或者简单地作为视图公开。由于转换通常按实体 ID 分组并返回多个列作为结果,因此 ClickHouse 的模式推断可以自动从这些结果中检测所需的类型并生成相应的表模式以存储它们。
然后,这些生成的表和视图可以构成离线存储的基础,为模型训练提供数据。
特征有效地被提取并表示为返回表格数据的 SQL 查询。
总之,通过将 ClickHouse 用作转换引擎和离线存储,用户可以受益于数据本地性和 ClickHouse 在集群中并行执行计算密集型任务的能力。这允许离线存储扩展到 PB,使特征存储充当轻量级协调层和 API,通过该层访问和共享数据。
这些图表突出了特征存储的其他关键组件,特别是**在线存储**。一旦模型训练完成,就会部署用于实时预测,这需要立即数据(例如用户的 ID)和预计算特征(例如历史购买记录),因为实时生成这些特征成本过高。这些特征存储在在线存储中以便快速访问,对于延迟敏感的任务(如欺诈检测)至关重要。它们会从离线存储中更新,以确保反映最新数据。有关更多信息,请参阅我们之前的博文。虽然 ClickHouse 可以用作在线存储,但在本文中,我们重点关注训练示例,因此关注转换引擎和离线存储的作用。
Featureform
我们之前的文章探讨了不同类型的特征存储:物理存储、字面存储和虚拟存储。虚拟存储最适合 ClickHouse,使其有机会同时用作离线存储和转换引擎。在这种架构中,特征存储不负责管理转换以及特征的持久性和版本控制,而仅充当协调器。
为了实现我们由 ClickHouse 加速的虚拟特征存储的愿景,我们确定 Featureform 是一个理想的集成解决方案。Featureform 除了是开源的,允许我们轻松贡献之外,还提供了针对离线存储、在线存储和向量数据库的成熟(按设计)集成点。
开始使用 Featureform 和 ClickHouse
首先,我们使用简单的 pip 安装来安装 featureform
pip install featureform
虽然 Featureform 采用了一种模块化架构,可以部署在 Kubernetes 中,但对于我们的初始示例,我们将使用优秀的入门体验。在此模式下,架构的所有组件都部署为单个 Docker 容器。更好的是,一个简单的标志确保 ClickHouse 与预加载的测试数据集一起部署。
(.venv) clickhouse@PY test_project % featureform deploy docker --quickstart --include_clickhouse
DeprecationWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html
Deploying Featureform on Docker
Starting Docker deployment on Darwin 23.3.0
Checking if featureform container exists...
Container featureform not found. Creating new container...
'featureform' container started
Checking if quickstart-postgres container exists...
Container quickstart-postgres not found. Creating new container...
'quickstart-postgres' container started
Checking if quickstart-redis container exists...
Container quickstart-redis not found. Creating new container...
'quickstart-redis' container started
Checking if quickstart-clickhouse container exists...
Container quickstart-clickhouse not found. Creating new container...
'quickstart-clickhouse' container started
…
Featureform is now running!
To access the dashboard, visit https://127.0.0.1:80
Run jupyter notebook in the quickstart directory to get started.
这为我们提供了 ClickHouse、FeatureForm、Redis 和 Postgres 容器。目前我们可以忽略后两者。
测试数据集
此时,我们可以通过 ClickHouse 客户端连接到我们的 ClickHouse 容器,并确认测试数据集已加载。
(.venv) dalemcdiarmid@PY test_project % clickhouse client
ClickHouse client version 24.1.1.1286 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 24.2.1.
55733a345323 :) SHOW TABLES IN fraud
SHOW TABLES FROM fraud
┌─name───────┐
│ creditcard │
└────────────┘
1 row in set. Elapsed: 0.005 sec.
对于我们的测试数据集,我们使用了一个流行的欺诈数据集在 Kaggle 上分发,包含超过 550,000 条匿名交易,目的是开发欺诈检测算法。
灵感来源
希望将这篇博文重点放在使用 ClickHouse 和 Featureform 的机制上,我们寻找了之前成功为此数据集训练模型的努力。
认识到数据科学家通常更喜欢在 Notebook 环境中工作,Kaggle 上有一些尝试将模型拟合到此数据集,对交易是否属于欺诈进行分类。我们被"信用卡欺诈检测,准确率 100%"这个有点不切实际的标题所吸引,但也对代码的布局和有条理的特性印象深刻,因此将其用作我们示例的基础。感谢Anmol Arora 的贡献。
我们承认,这些笔记本中使用的模型(逻辑回归/决策树)并不代表分类的“最先进”算法。但是,这篇博文的目的是展示如何使用 ClickHouse 和 Featureform 训练模型,而不是展示预测欺诈的“最佳”或“最新”技术。随意调整任何示例以使用更现代的技术!附带说明:我也不是机器学习专家。
连接 Featureform 和 ClickHouse
请记住,Featureform 将提供特征存储接口,通过该接口我们可以访问 ClickHouse 中的数据,从而允许我们定义可重用且版本化的特征,这些特征可以传递给模型进行训练。这需要我们将 ClickHouse 服务器注册到 Featureform。这只需要几行代码,我们首先定义 Featureform 客户端(对于此简单的测试示例,它是本地且不安全的),然后注册我们的本地 ClickHouse 实例。
请注意,我们使用环境变量
FEATUREFORM_HOST
来指定 Featureform 实例的位置。这些连接详细信息也可以直接传递给客户端。
# install any dependencies into our notebook
!pip install featureform==1.12.6
!pip install river -U
!pip install scikit-learn -U
!pip install seaborn
!pip install googleapis-common-protos
!pip install matplotlib
!pip install matplotlib-inline
!pip install ipywidgets
%env FEATUREFORM_HOST=localhost:7878
%matplotlib inline
from featureform import Client
import featureform as ff
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import SGDClassifier, LogisticRegression
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
import numpy as np
# Featureform client. Local instance and insecure
client = Client(insecure=True)
# Register our local container with Featureform
clickhouse = ff.register_clickhouse(
name="clickhouse",
description="A ClickHouse deployment for example",
host="host.docker.internal",
port=9000,
user="default",
password="",
database="fraud"
)
client.apply(verbose=True)
Featureform 中的
client.apply()
方法创建了定义的资源。否则,这些资源会被延迟评估,并在需要时(即在下游使用时)创建。注册此数据集会导致它在 ClickHouse 中使用INSERT INTO SELECT
进行内部复制,有效地为 Featureform 版本控制的分析创建数据的不可变副本。有关更多详细信息,请参见下文。
注册表
除了充当转换引擎和离线存储外,ClickHouse 首先也是 ML Ops 工作流中的数据源。在我们可以从欺诈表中转换和提取特征之前,我们首先需要在 Featureform 中注册此表。这将在 Featureform 中创建必要的元数据,以便我们可以对数据进行版本控制和跟踪。这方面的代码非常少。
creditcard = clickhouse.register_table(
name="creditcard",
table="creditcard",
)
data = client.dataframe(creditcard, limit=100)
data.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 31 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 id 100 non-null int64
1 V1 100 non-null float64
2 V2 100 non-null float64
3 V3 100 non-null float64
4 V4 100 non-null float64
5 V5 100 non-null float64
6 V6 100 non-null float64
7 V7 100 non-null float64
8 V8 100 non-null float64
9 V9 100 non-null float64
10 V10 100 non-null float64
11 V11 100 non-null float64
12 V12 100 non-null float64
13 V13 100 non-null float64
14 V14 100 non-null float64
15 V15 100 non-null float64
16 V16 100 non-null float64
17 V17 100 non-null float64
18 V18 100 non-null float64
19 V19 100 non-null float64
20 V20 100 non-null float64
21 V21 100 non-null float64
22 V22 100 non-null float64
23 V23 100 non-null float64
24 V24 100 non-null float64
25 V25 100 non-null float64
26 V26 100 non-null float64
27 V27 100 non-null float64
28 V28 100 non-null float64
29 Amount 100 non-null float64
30 Class 100 non-null int64
dtypes: float64(29), int64(2)
memory usage: 24.3 KB
我们这里第二行将底层数据的 100 行作为数据帧获取 - 此LIMIT
会下推到 ClickHouse,为我们提供了最后一次查看我们正在使用的列的机会。
这与数据集描述一致,其中列V1
到V28
提供了匿名特征以及交易Amount
。我们的 Class 表示交易是否为欺诈,用值为 1 和 0 的布尔值表示。
转换数据
在 Featureform 中注册了数据集后,我们可以开始使用 ClickHouse 作为转换引擎。
为了有效地训练任何分类器,我们应该了解类的分布。
虽然我们可以在之前的数据帧上计算它,但这将我们的分析限制为它包含的 100 行。
data['Class'].value_counts().plot.pie(autopct='%3.1f%%',shadow=True, legend= True,startangle =45)
plt.title('Distribution of Class',size=14)
plt.show()
理想情况下,我们希望为表中的所有行计算此值。对于我们的数据集,我们可能只需通过删除限制将 550k 行的整个数据集加载到内存中。但是,从网络或内存的角度来看,这对于更大的数据集是不可行的。为了将这项工作下推到 ClickHouse,我们可以定义一个转换。
@clickhouse.sql_transformation(inputs=[creditcard])
def classCounts(creditcard):
return "SELECT Class,count() c FROM {{creditcard}} GROUP BY Class"
client.dataframe(classCounts).pivot_table(index='Class', columns=None, values='c', aggfunc='sum').plot.pie(
explode=[0.1, 0], autopct='%3.1f%%', shadow=True, legend=True, startangle=45, subplots=True)
我们在这里使用简单的聚合来计算每个类的计数。请注意查询中模板的使用。变量{{creditcard}}
将被信用卡数据集的表名替换。这不会是原始表 - 正如我们前面提到的,由于 Featureform 在 ClickHouse 中创建了数据的不可变副本,因此这将是一个具有特定于版本的名称的表。创建后,我们可以将转换结果转换为数据帧并绘制结果。
此数据集是平衡的(50% 欺诈和 50% 非欺诈)。虽然这大大简化了未来的训练,但它让我们怀疑此数据集是否为人工数据集。这也可能极大地导致原始笔记本中宣传的相当高的准确率分数。
以我们的主要数据作为原材料,然后将其转换为包含为服务和训练机器学习模型所需的一组特征和标签的数据集。这些转换可以直接应用于主要数据集,也可以在其他先前转换的数据集上进行排序和执行。必须了解,Featureform 本身不执行数据转换。相反,它协调 ClickHouse 执行转换。转换的结果也将作为版本化的表存储在 ClickHouse 中,以便以后快速检索 - 这个过程称为物化。
类似地,在首次探索数据时,用户通常会对数据帧使用describe
方法来了解列的属性。
data.describe()
这需要我们为数据集中每一列计算多个统计数据。不幸的是,此处的 SQL 查询非常复杂,因为它利用了动态列选择。
import re
@clickhouse.sql_transformation(inputs=[creditcard])
def describe_creditcard(creditcard):
return "SELECT * APPLY count, * APPLY avg, * APPLY std, * APPLY x -> (quantiles(0.25)(x)[1]), * APPLY x -> (quantiles(0.5)(x)[1]), * APPLY x -> (quantiles(0.75)(x)[1]), * APPLY min, * APPLY max FROM {{creditcard}}"
df = client.dataframe(describe_creditcard, limit=1)
df_melted = df.melt()
df_melted['Var'] = df_melted['variable'].apply(lambda x: re.search(r"\((\w*)\)", x).group(1))
df_melted['Statistic'] = df_melted['variable'].apply(
lambda x: re.search(r"(.*)\(\w*\)", x).group(1).replace('arrayElement(', ''))
df_melted.pivot(index='Statistic', columns='Var', values='value').reset_index()
在 Featureform 集成的后续版本中,我们希望将这种复杂性从用户那里抽象出来。理想情况下,
describe()
应该为用户制定所需的 ClickHouse 查询并为用户透明地执行此查询。敬请期待。
我们的最终笔记本(用户可以在Featureform 示例存储库中找到),包含许多旨在执行数据分析的转换。这些重现了原始笔记本作者使用 pandas 直接完成的工作。我们下面重点介绍了一些更有趣的转换。
相关矩阵可以帮助我们了解数据集中变量/列之间的任何线性关系。在 pandas 中,这需要在数据帧上进行corr
方法调用。此特定操作在非常大的数据集上计算量非常大。在 SQL 中,这需要使用 ClickHouse 的corrMatrix
函数。以下转换将结果透视,使其与流行的Seaborn 可视化库预期的格式保持一致。
@clickhouse.sql_transformation(inputs=[creditcard])
def credit_correlation_matrix(creditcard):
return """WITH matrix AS
(
SELECT arrayJoin(arrayMap(row -> arrayMap(col -> round(col, 3), row),
corrMatrix(id, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14,
V15, V16, V17, V18, V19, V20, V21, V22, V23, V24, V25, V26, V27, V28, Amount, Class))) AS matrix
FROM {{creditcard}}
)
SELECT
matrix[1] AS id, matrix[2] AS V1, matrix[3] AS V2, matrix[4] AS V3, matrix[5] AS V4,
matrix[6] AS V5, matrix[7] AS V6, matrix[8] AS V7, matrix[9] AS V8, matrix[10] AS V9,
matrix[11] AS V10, matrix[12] AS V11, matrix[13] AS V12, matrix[14] AS V13,
matrix[15] AS V14, matrix[16] AS V15, matrix[17] AS V16, matrix[18] AS V17,
matrix[19] AS V18, matrix[20] AS V19, matrix[21] AS V20, matrix[22] AS V21,
matrix[23] AS V22, matrix[24] AS V23, matrix[25] AS V24, matrix[26] AS V25,
matrix[27] AS V26, matrix[28] AS V27, matrix[29] AS V28, matrix[30] AS Amount,
matrix[31] AS Class
FROM matrix"""
client.dataframe(credit_correlation_matrix)
paper = plt.figure(figsize=[20, 10])
sns.heatmap(client.dataframe(credit_correlation_matrix, limit=100), cmap='crest', annot=True)
plt.show()
由此,我们可以得出与原始笔记本相同的观察结果,即我们的一些特征/列高度相关。
- V17 和 V18
- V16 和 V17
- V16 和 V18
- V14 与 V4 负相关
- V12 也与 V10 和 V11 负相关。
- V11 与 V10 负相关,与 V4 正相关。
- V3 与 V10 和 V12 正相关。
- V9 和 V10 也正相关。
- 几个特征与目标类别变量显示出很强的相关性。
逻辑回归假设自变量之间不存在完全的多重共线性。因此,我们可能希望在训练之前删除一些特征及其相关的列,这些特征和列是高度相关的。
通常,我们可能会使用这里的结果来识别冗余特征。两个特征之间的高度相关性表明它们可能传达了类似的信息,并且可以在不损失显着预测能力的情况下删除其中一个。此外,与目标变量的相关性可以突出显示哪些特征最相关。但是,原始笔记本在没有执行此操作的情况下获得了良好的结果。
虽然我们的大多数V*
特征都在相同的范围内,但我们可以从我们之前的.describe()
中看到Amount
具有不同的比例。我们可以使用简单的箱线图确认此比例。
@clickhouse.sql_transformation(inputs=[creditcard])
def amountQuantitles(creditcard):
return "SELECT arrayJoin(quantiles(0, 0.25, 0.5, 0.75, 1.)(Amount)) AS Amount FROM {{creditcard}}"
client.dataframe(amountQuantitles).plot.box()
这确认我们在将这些值用于任何回归模型之前需要对其应用标量。
虽然决策树可以处理缺失值,但逻辑回归技术本身无法在模型拟合过程中直接处理缺失数据。虽然有处理此问题的技术,但我们应该事先确认是否需要这样做。此外,识别重复项也很有帮助。这两者都可以通过两个简单的转换来处理。
@clickhouse.sql_transformation(inputs=[creditcard])
def anynull_creditcard(creditcard):
return "SELECT * APPLY x -> sum(if(x IS NULL, 1, 0)) FROM {{creditcard}}"
client.dataframe(anynull_creditcard, limit=1).melt()
@clickhouse.sql_transformation(inputs=[creditcard])
def credit_duplicates(creditcard):
return "SELECT *, count() AS cnt FROM {{creditcard}} GROUP BY * HAVING cnt > 1"
client.dataframe(credit_duplicates, limit=1)
我们这里不包含结果(很多无聊的 0 和空表!),但我们的数据没有重复项或缺失值。
缩放值
在尝试训练逻辑回归模型之前,缩放特征是一个重要的步骤,该模型使用梯度下降算法来优化模型的成本函数。当特征处于不同的尺度时,这种不平衡会导致收敛速度变慢,因为学习算法在某些方向上会采取较小的步骤,而在其他方向上会采取较大的步骤,可能导致振荡或需要更长的时间才能达到最小值。
原始笔记本使用 Scikit Learn 中提供的StandardScalar对所有列缩放数据集。我们使用另一个转换来复制此操作。
@clickhouse.sql_transformation(inputs=[creditcard])
def scaled_credit_cards(creditcard):
column_averages = ', '.join([f'avg(V{i})' for i in range(1, 29)])
column_std_deviations = ', '.join([f'stddevPop(V{i})' for i in range(1, 29)])
columns_scaled = ', '.join(f'(V{i} - avgs[{i}]) / stds[{i}] AS V{i}' for i in range(1,29))
return f"WITH ( SELECT [{column_averages}, avg(Amount)] FROM {{{{creditcard}}}} ) AS avgs, (SELECT [{column_std_deviations}, stddevPop(Amount)] FROM {{{{creditcard}}}}) AS stds SELECT id, { columns_scaled }, (Amount - avgs[29])/stds[29] AS Amount, Class FROM {{{{creditcard}}}}"
client.apply()
在这里,我们首先计算每一列的平均值和标准差,然后计算并返回 value - avg/std.deviation
- 从而复制 StandardScalar 操作。
此转换的结果表示 ClickHouse 中以表格形式存储的数据的版本,我们可以将其用于模型训练。
模型训练
在我们的数据进行缩放后,我们可以训练我们的第一个模型。使用 ClickHouse 作为训练源的优势之一是它能够快速聚合和转换数据。Featureform 将这些转换具体化,并将它们存储为一个新表,以便快速迭代和重用。虽然我们这里的转换非常简单 - 一个简单的标量函数,但它也可能是在数万亿行上进行 GROUP BY
以返回数千行的结果集。
在此角色中,ClickHouse 现在充当离线存储来提供数据。
第一步,我们需要定义一个实体。根据我们之前的描述,此实体包含一组特征和一个类别。在 Featureform 中,这很简单
@ff.entity
class Transaction:
# Register multiple columns from a dataset as features
transaction_features = ff.MultiFeature(
scaled_credit_cards,
client.dataframe(scaled_credit_cards, limit=10),
include_columns=[f"V{i}" for i in range(1, 29)] + ["Amount"],
entity_column="id",
exclude_columns=["Class"],
)
fraud = ff.Label(
scaled_credit_cards[["id", "Class"]], type=ff.Bool,
)
请注意,我们需要指定一个实体标识符(id
列)并使用我们之前创建的转换 scaled_credit_cards
作为源。正如您所料,我们将 Class
从我们的特征中排除,因为它是我们目标标签,并相应地指定它。
定义了实体后,我们可以注册一个训练集。这里的术语可能有点令人困惑,因为它将包含我们的训练集和测试集。我们现在将跳过使用验证集,因为我们没有进行任何超参数调整或尝试识别最佳算法。
fraud_training_set = ff.register_training_set(
"fraud_training_set",
label=Transaction.fraud,
features=Transaction.transaction_features,
)
client.apply()
像往常一样,我们的资源是延迟计算的,因此我们显式调用 apply
。这将导致创建我们的训练集,同样作为 ClickHouse 中的一个版本化表,并且只能通过 Featureform API 访问。
在后台,每个特征都由其自己的表表示。这允许新实体由特征组成。在我们的简单示例中,这可能不是必需的,但它释放了协作和可重用性,我们将在后面讨论。
我们的“训练集”准备就绪后,我们现在需要将其拆分为训练集和验证集。对于 Featureform,这是一个简单的调用,要求我们指定拆分的比例。为此,我们使用经典的 80/20 拆分。
# fetch the training set through the FF client
ds = client.training_set(fraud_training_set)
# split our training set ino a train and test set
train, test = ds.train_test_split(test_size=0.2, train_size=0.8, shuffle=True, random_state=3, batch_size=1000)
shuffle
确保 ClickHouse 以随机顺序提供数据(鉴于我们下面介绍的训练方法,这很有用)。我们还将 random_state
指定为此随机化的种子,这样如果我们多次调用笔记本,数据将以相同的随机顺序提供。
此时,我们可以简单地将 train
加载到内存中的数据帧中,并使用它来训练模型。为了展示如何处理大于本地内存的数据集,我们演示了一种迭代方法来使用数据。
这要求我们偏离原始笔记本并使用增量方法进行训练 - 特别是,我们使用 具有 log_loss 函数的随机梯度下降 (SGD) 算法 来训练逻辑回归模型。
clf = SGDClassifier(loss='log_loss')
for features, label in train:
clf.partial_fit(features, label, classes=[True,False])
我们的 Featureform 训练数据集可以有效地进行迭代,为我们提供之前定义的 1000 的批次数据。这些用于部分和增量地拟合我们的回归模型。
训练完成后(这可能需要几秒钟,具体取决于资源),我们可以利用我们的测试数据集来评估我们的模型。为简洁起见,我们将跳过根据我们的训练集评估性能。
from sklearn.metrics import confusion_matrix
def model_eval(actual, predicted):
acc_score = accuracy_score(actual, predicted)
conf_matrix = confusion_matrix(actual, predicted)
clas_rep = classification_report(actual, predicted)
print('Model Accuracy is: ', round(acc_score, 2))
print(conf_matrix)
print(clas_rep)
def plot_confusion_matrix(cm, classes=None, title='Confusion matrix'):
"""Plots a confusion matrix."""
if classes is not None:
sns.heatmap(cm, xticklabels=classes, yticklabels=classes, vmin=0., vmax=1., annot=True)
else:
sns.heatmap(cm, vmin=0., vmax=1.)
plt.title(title)
plt.ylabel('True label')
plt.xlabel('Predicted label')
# Make a test prediction
pred_test= np.array([])
label_test = np.array([])
for features, label in test:
batch_pred = np.array(clf.predict(features))
pred_test = np.concatenate([pred_test, batch_pred])
label_test = np.concatenate([label_test, np.array(label)])
model_eval(label_test, pred_test)
cm = confusion_matrix(label_test, pred_test)
plot_confusion_matrix(cm, classes=[True, False], title='')
此处定义的方法仅为我们提供模型准确性指标并绘制混淆矩阵。请注意,我们如何分批进行预测,使用 test
上的迭代器,并在计算准确性和绘制混淆矩阵之前收集这些预测。
Model Accuracy is: 0.96
[[55024 1825]
[ 2358 54519]]
precision recall f1-score support
0.0 0.96 0.97 0.96 56849
1.0 0.97 0.96 0.96 56877
accuracy 0.96 113726
macro avg 0.96 0.96 0.96 113726
weighted avg 0.96 0.96 0.96 113726
在没有调整的情况下,这里的结果令人印象深刻,准确率为 96%。原始笔记本进一步扩展了这一点,训练了决策树和随机森林分类器,最后使用 xgboost 方法。鉴于随机森林在此处提供了最佳性能,在测试集和训练集上都获得了 100% 的准确率,我们认为我们将尝试使用增量方法来重现这一点。
继续使用在线学习方法的方法,我们使用 Hoeffding 自适应树分类器 通过 River 库。该算法基于 Hoeffding 自适应树算法,一种决策树形式,它使用 Hoeffding 界来确定以高置信度拆分节点所需的最小样本数,确保所选拆分接近于在无限数据情况下可以做出的最佳拆分。此方法允许树在数据流式传输时增长并适应不断变化的数据分布。随着数据流入,算法会持续评估其拆分(节点)的性能,并可以通过用新的分支替换性能不佳的分支来进行调整。
使用 Featureform API 训练此模型与我们之前的 SGD 示例非常相似。
from river import tree
from river import metrics
ds = client.training_set(fraud_training_set)
train, test = ds.train_test_split(test_size=0.2, train_size=0.8, shuffle=True, random_state=3, batch_size=1000)
# Initialize the HoeffdingAdaptiveTreeClassifier
model = tree.HoeffdingAdaptiveTreeClassifier()
# we need to pivot into [({"feature1": value1, "feature2": value2}, label), ...] for the model
feature_names = [t.name for t in Transaction.transaction_features]
n = 0
for features, label in train:
# Update the model with the current instance
for i, feature in enumerate(features):
feature_dict = dict(zip(feature_names, feature))
model.learn_one(feature_dict, label[i])
n += 1
print(f'{n} processed')
此模型从我们的 Macbook 2023 开始训练大约需要 10 分钟。训练完成后,我们可以评估模型性能
predicted_labels = []
actual_labels = []
for features, label in test:
for i, feature in enumerate(features):
feature_dict = dict(zip(feature_names, feature))
predicted_labels.append(model.predict_one(feature_dict))
actual_labels.append(label[i])
# Evaluate the model
model_eval(actual_labels, predicted_labels)
# Plot the confusion matrix
cm = confusion_matrix(actual_labels, predicted_labels)
plot_confusion_matrix(cm)
Model Accuracy is: 0.98
[[55715 1067]
[ 1234 55710]]
precision recall f1-score support
0 0.98 0.98 0.98 56782
1 0.98 0.98 0.98 56944
accuracy 0.98 113726
macro avg 0.98 0.98 0.98 113726
weighted avg 0.98 0.98 0.98 113726
因此,质量可能不是 100%,但我们已经设法使用在线技术获得了与原始笔记本相当的性能。这是一种过于简化的解决问题的方法,我们肯定受益于数据已被清理(或人为生成?)。我们可能也更喜欢一个强调最小化假阴性(在上面的混淆矩阵中约 1k/500k)的模型。我们将进一步的模型改进留给用户作为练习,并欢迎您对我们的努力提出批评!
共享和版本控制
以上所有内容可能看起来与任何其他具有 ClickHouse 简单抽象的笔记本一样。但是,在后台,Featureform 管理状态创建和版本控制。可以通过用户界面 (https://127.0.0.1) 查看当前状态,该界面显示系统中跟踪的当前特征、实体、标签和训练集。
此状态对笔记本重启具有鲁棒性(它由 Featureform 保留到本地数据库中),但更重要的是,允许其他数据科学家和数据工程师使用这些对象。例如,上述特征可以由另一位工程师用来组合不同的实体,而该实体本身又可以被共享。所有这些都是通过上述 Python API 实现的,该 API 提供这些对象并支持协作。
如果您希望更多地探索 Featureform,我们建议您熟悉 它提供的抽象。我们也只探讨了特征和实体在 ClickHouse 中作为表(或视图)具体化的情况。Featureform 提供了许多其他功能,包括 按需特征 和 流式功能,在批处理不合适时。
在这篇博文中,我们提到了 Featureform 中的对象如何进行版本控制,但没有明确显示其方式。Featureform 使用一种称为变体的机制来对数据源、转换、特征、标签和训练集进行版本控制。默认情况下,这些资源中的每一个都是不可变的,确保您可以自信地利用其他人创建的版本化资源,而无需担心由于上游修改而导致的中断。
对于每个资源,都存在一个可以手动或自动配置的变体参数。认识到特征的后续变体不一定比以前的变体更好(不幸的是,机器学习不是线性改进的旅程),术语 "变体" 比 "版本" 更受欢迎。在我们的示例中,我们只是依赖于自动版本控制。如果我们更改了流程中的任何资源(例如,更改实体使用的特征),这将创建资源的新变体并触发所有依赖资源的重新创建。这种 血缘关系跟踪需要有向无环图 (DAG) 并使用类似于 dbt 等用户可能熟悉的工具的技术。
结论
在这篇文章中,我们探讨了如何使用 ClickHouse 作为特征存储来训练使用 Featureform 的机器学习模型。在此角色中,ClickHouse 充当转换引擎和离线存储。我们仅使用 SQL 对欺诈数据集进行了转换和缩放,并使用后续数据增量训练逻辑回归和基于决策树的模型,以预测交易是否欺诈。在此过程中,我们评论了如何使用 SQL 数据库和特征存储允许特征工程的协作方法,提高特征的可用性,进而减少模型迭代时间。在未来的文章中,我们将探讨如何将此模型投入生产并与 AWS Sagemaker 等工具集成以进行模型训练。