我们 ClickHouse 非常喜欢 开放数据挑战赛,所以当我们看到 MTA(大都会运输署)在他们的网站上宣布了这样一个挑战赛时,我们无法抗拒参与的诱惑。我们专注于闸机数据集,允许分析纽约市的地铁使用情况,并在我们的新 Playground 中提供此数据,用户可以免费查询数据。
MTA(大都会运输署)运营纽约市的公共交通系统,包括地铁、公交车和通勤铁路服务,每天为数百万乘客提供服务。MTA 开放数据挑战赛是一项为期一个月的竞赛,面向开发者和数据爱好者。MTA 鼓励参与者使用他们的数据集创建项目,以创造性的方式利用数据,无论是通过 Web 应用程序、可视化还是报告。提交的作品必须至少使用来自 data.ny.gov 的一个数据集,并将根据创造力、实用性、执行力和透明度进行评判。
虽然 MTA 挑战赛有 176 个数据集可供使用,但其中大多数数据集都很小,只有几百行。它们仍然是优秀的资源,但它们的数据量实际上并不适合 ClickHouse 的最佳应用场景。
ClickHouse 是一个为规模而设计的 OLAP 数据库,因此,我们希望找到最大的数据集进行探索!碰巧这是闸机数据集,它包含多年来的 1 亿行数据。该数据集包含纽约市闸机的入口/出口值信息,从而可以分析城市中人员的流动情况。乍一看,这个数据集似乎很简单,但正如我们发现的那样,清理并以可用形式提供它比最初预期的要付出更多的努力。
数据集本身涵盖了 2014 年到 2022 年。一个(更干净的)闸机数据版本可用于最近几年。我们还加载了这个数据集并提供了示例查询。然而,为了提供所有可用数据,我们在本博客中重点关注历史数据。
在这篇文章中,我们将探讨加载和清理此数据的步骤,使其可用于进一步分析,以便其他人可以在他们自己的 ClickHouse 实例中重现它。这突出了 ClickHouse 为数据工程提供的部分关键功能,许多步骤和查询可以为其他数据集重用。
对于那些只对最终数据集感兴趣的人,我们已将其放在我们新的 ClickHouse Playground 中,我们在其中为您准备了 220 多个查询和 35 个数据集,供您试用! 要贡献新的查询和数据集,请访问演示仓库。
本博客中的所有步骤都可以使用 clickhouse-local 重现,这是一个易于使用的 ClickHouse 版本,非常适合需要在本地和远程文件上使用 SQL 执行快速处理的开发者,而无需安装完整的数据库服务器。
初始数据探索和加载
为了简化加载,我们已将闸机数据(以 TSV 文件形式分发)放在公共存储桶上。我们可以使用简单的 S3 查询来探索可用的列。这依赖于 ClickHouse 模式推断来推断列类型
DESCRIBE TABLE s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/mta/*.tsv')
SETTINGS describe_compact_output = 1
┌─name───────────────────────────────────────────────────────┬─type────────────────────┐
│ C/A │ Nullable(String) │
│ Unit │ Nullable(String) │
│ SCP │ Nullable(String) │
│ Station │ Nullable(String) │
│ Line Name │ Nullable(String) │
│ Division │ Nullable(String) │
│ Date │ Nullable(DateTime64(9)) │
│ Time │ Nullable(String) │
│ Description │ Nullable(String) │
│ Entries │ Nullable(Int64) │
│ Exits │ Nullable(Int64) │
└────────────────────────────────────────────────────────────┴─────────────────────────┘
11 rows in set. Elapsed: 0.309 sec.
如果我们对数据进行抽样并查看数据集描述,我们可以看到每一行代表在特定时间报告的闸机的入口和出口计数。描述强调这些计数是定期报告的,因此这些统计数据代表了之前的期间。下面,我们直接在 S3 中(就地)查询数据
SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/mta/*.tsv')
LIMIT 1
FORMAT Vertical
Row 1:
──────
C/A: A002
Unit: R051
SCP: 02-00-00
Station: LEXINGTON AVE
Line Name: NQR456
Division: BMT
Date: 2014-12-31 00:00:00.000000000
Time: 23:00:00
Description: REGULAR
Entries: 4943320
Exits : 1674736
1 rows in set. Elapsed: 1.113 sec.
为了更轻松地处理并防止重复下载数据,我们可以将此数据加载到本地表中。要从推断的模式创建此表并加载数据,我们可以使用以下命令。
CREATE TABLE subway_transits_2014_2022_raw
ENGINE = MergeTree
ORDER BY tuple() EMPTY
AS SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/mta/*.tsv')
这将使用该模式创建一个空表。我们将此表用作仅用于数据探索的暂存表,目前省略排序键:加载此数据变得像简单的 INSERT INTO SELECT
一样简单
INSERT INTO subway_transits_2014_2022_raw
SETTINGS max_insert_threads = 16, parallel_distributed_insert_select = 2
SELECT *
FROM s3Cluster('default', 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/mta/*.tsv')
SETTINGS max_insert_threads = 16, parallel_distributed_insert_select = 2
0 rows in set. Elapsed: 39.236 sec. Processed 94.88 million rows, 13.82 GB (2.42 million rows/s., 352.14 MB/s.)
Peak memory usage: 1.54 GiB.
SELECT count()
FROM subway_transits_2014_2022_raw
┌──count()─┐
│ 94875892 │ -- 94.88 million
└──────────┘
1 row in set. Elapsed: 0.002 sec.
我们应用了一些简单的优化来加速此加载,例如使用 s3Cluster 函数。您可以在优化 S3 插入和读取性能指南* 中阅读有关这些的更多信息。上面的计时(以及后续的计时)来自我们的 sql.clickhouse.com 环境,该环境由 3 个节点组成,每个节点有 30 个 vCPU。您的性能会有所不同,但考虑到数据集的大小,性能将很大程度上取决于网络连接。
模式改进
检查表模式会发现很多优化机会。
SHOW CREATE TABLE subway_transits_2014_2022_raw
CREATE TABLE subway_transits_2014_2022_raw
(
`C/A` Nullable(String),
`Unit` Nullable(String),
`SCP` Nullable(String),
`Station` Nullable(String),
`Line Name` Nullable(String),
`Division` Nullable(String),
`Date` Nullable(DateTime64(9)),
`Time` Nullable(String),
`Description` Nullable(String),
`Entries` Nullable(Int64),
`Exits ` Nullable(Int64)
)
ENGINE = SharedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')
ORDER BY tuple()
除了列名不太理想(首选小写且没有特殊字符)之外,Nullable 类型不是必需的。这消耗额外的空间来区分 Null 值和空值,应避免使用。此外,我们的 Date
和 Time
应合并为一个 date_time
列 - ClickHouse 具有丰富的日期时间函数集,使 DateTime 类型能够根据时间、日期或两者进行查询。
快速查看数据列描述 揭示了一些额外的优化机会。入口和出口不能超过 Int32,超过后会回绕(单独的问题),并且只能为正数。大多数 String 列也是低基数的,我们可以通过快速查询来确认这一点
SELECT
uniq(`C/A`),
uniq(Unit),
uniq(SCP),
uniq(Station),
uniq(`Line Name`),
uniq(Division),
uniq(Description)
FROM subway_transits_2014_2022_raw
FORMAT Vertical
Query id: c925aaa4-6302-41e4-9f1e-1ba88587c3bc
Row 1:
──────
uniq(C/A): 762
uniq(Unit): 476
uniq(SCP): 334
uniq(Station): 579
uniq(Line Name): 130
uniq(Division): 7
uniq(Description): 2
1 row in set. Elapsed: 0.959 sec. Processed 94.88 million rows, 10.27 GB (98.91 million rows/s., 10.71 GB/s.)
Peak memory usage: 461.18 MiB.
因此,将这些设为 LowCardinality(String)
类型是合理的,这将带来更好的压缩和更快的查询速度!
任何来自纽约市的人都熟悉线路命名系统。列 Line Name
表示闸机可用的线路,即
“停靠在该站点的火车线路,例如 456”
456 因此代表 4、5 和 6 号线。浏览数据后发现这些线路的顺序不一致。例如,
456NQR
与 NQR456
相同
SELECT `Line Name`
FROM subway_transits_2014_2022_raw
WHERE (`Line Name` = 'NQR456') OR (`Line Name` = '456NQR')
LIMIT 1 BY `Line Name`
┌─Line Name─┐
│ NQR456 │
│ 456NQR │
└───────────┘
2 rows in set. Elapsed: 0.059 sec. Processed 94.88 million rows, 1.20 GB (1.60 billion rows/s., 20.20 GB/s.)
Peak memory usage: 105.88 MiB.
为了简化未来的查询,我们将此字符串标记化为 Array(LowCardinality(String))
并对值进行排序。
最后,station
和 date_time
似乎是排序键的合理首选。
因此,我们的表模式变为
CREATE TABLE subway_transits_2014_2022_v1
(
`ca` LowCardinality(String),
`unit` LowCardinality(String),
`scp` LowCardinality(String),
`station` LowCardinality(String),
`line_names` Array(LowCardinality(String)),
`division` LowCardinality(String),
`date_time` DateTime32,
`description` LowCardinality(String),
`entries` UInt32,
`exits` UInt32
)
ENGINE = MergeTree
ORDER BY (station, date_time)
我们可以通过从我们之前的 subway_transits_2014_2022_raw
表中读取数据来加载此数据,使用 SELECT
来转换行。
INSERT INTO subway_transits_2014_2022_v1 SELECT
`C/A` AS ca,
Unit AS unit,
SCP AS scp,
Station AS station,
arraySort(ngrams(assumeNotNull(`Line Name`), 1)) AS line_names,
Division AS division,
parseDateTimeBestEffort(trimBoth(concat(CAST(Date, 'Date32'), ' ', Time))) AS date_time,
Description AS description,
Entries AS entries,
`Exits ` AS exits
FROM subway_transits_2014_2022_raw
SETTINGS max_insert_threads = 16
0 rows in set. Elapsed: 4.235 sec. Processed 94.88 million rows, 14.54 GB (22.40 million rows/s., 3.43 GB/s.)
清理 MTA 交通数据集
现在让我们回顾一下我们为清理数据而采取的步骤。我们发现了一些主要问题,我们将依次介绍。
挑战 1:累积值和异常值
MTA 提供了更长版本的数据描述,其中提供了对一些数据质量问题和挑战的见解。尤其重要的是,entries
和 exit
值是累积的。
> 提供有关每个闸机入口和出口累积寄存器值的每四小时数据,类似于里程表读数。由于需要错开以防止系统一次性被审计读数淹没,因此四小时间隔将因站点而异。在全系统范围内,站点已设置为在 00 到 03 小时之间开始审计传输,然后在当天第一次审计后每四小时进行一次审计。期间内进入或离开闸机的人数可以通过将其与较早的读数进行比较来获得。
这些累积值难以使用,并且需要查询来计算每个闸机的时间排序导数。我们注意到,4 小时周期性数据交付将使将站点的使用归因于特定时期仍然非常不精确。这是我们无法解决的问题,因此低于此周期粒度的计数不太可能是准确的。
这些值也存在一些明显的数据质量问题
> 闸机审计通常不是每四小时都可用,闸机有时会向下计数而不是向上计数,出口和入口计数器会定期重置,并且闸机的审计时间戳因闸机而异。此外,数据长度为 10 位数字,溢出时将回滚为零。
理想情况下,我们希望根据闸机前一个时间值与当前时间值之间的差异来计算每行的入口和出口数量。这要求我们能够可靠地识别闸机。
虽然闸机具有 scp
标识符,但这在各个站点之间不是唯一的。相反,我们可以使用 scp、ca(站点上的售票亭标识符)和 unit(站点的远程单元 ID)的组合来识别特定站点。
要计算每个闸机的入口和出口数量,需要一个窗口函数。以下命令计算每行的 entries_change
和 exits_change
列。
WITH 1000 AS threshold_per_hour
SELECT
*,
any(date_time) OVER (PARTITION BY ca, unit, scp ORDER BY date_time ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS p_date_time,
any(entries) OVER (PARTITION BY ca, unit, scp ORDER BY date_time ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS p_entries,
any(exits) OVER (PARTITION BY ca, unit, scp ORDER BY date_time ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS p_exits,
dateDiff('hour', p_date_time, date_time) AS hours,
if((entries < p_entries) OR (((entries - p_entries) / if(hours > 0, hours, 1)) > threshold_per_hour), 0, entries - p_entries) AS entries_change,
if((exits < p_exits) OR (((exits - p_exits) / if(hours > 0, hours, 1)) > threshold_per_hour), 0, exits - p_exits) AS exits_change
FROM subway_transits_2014_2022_v1
ORDER BY
ca ASC,
unit ASC,
scp ASC,
date_time ASC
查询的一些要点
- 我们按
ca
、unit
、scp
和date_time
(升序)排序。这确保了每个闸机的行按时间顺序递增地一起处理,从而使我们能够计算增量。 - 该函数使用
PARTITION BY ca, unit, scp
为每个闸机创建一个窗口。在每个窗口中,数据再次按时间递增排序。ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
子句用于添加列p_entries
和p_exits
。这些列包含每行的先前入口和出口值。前一行的时间在p_date_time
中捕获。 - 列
entries_change
和exits_change
分别包含入口和出口的先前值和当前值之间的增量。重要的是,如果更改为负数,我们将返回 0 值,假设这表示回滚。此外,数据分析显示显著的异常值,其中更改值高得不切实际,例如,一小时内有 10,000 人使用了闸机。如果更改值超过每小时 N 个(1000 个)的阈值,我们还会返回 0 以滤除这些值。选择阈值是基于每小时可以通过闸机的实际人数(每分钟 10-15 人)。这种方法是不完善的,更复杂的方法可能会考虑历史趋势。
挑战 2:缺少/不一致的站点名称
虽然每个年份的数据集都使用相同的模式,但 2022 年的数据集缺少站点名称。
SELECT toYear(date_time) AS year
FROM mta.subway_transits_2014_2022_v1
WHERE station = ''
GROUP BY year
┌─year─┐
1. │ 2022 │
└──────┘
1 row in set. Elapsed: 0.016 sec. Processed 10.98 million rows, 54.90 MB (678.86 million rows/s., 3.39 GB/s.)
Peak memory usage: 98.99 MiB.
为了使此数据集更易于使用,我们理想情况下会根据唯一的闸机 ID 到站点名称的映射(从早期数据填充)来填充 2022 年的站点名称。
但是,如果我们分析站点名称,我们可以看到即使对于同一闸机,站点名称也很少一致!例如,不一致地使用 AV
和 AVE
表示 “avenue” 似乎会导致同一站点出现多个条目。
SELECT DISTINCT station
FROM subway_transits_2014_2022_v1
WHERE station LIKE '%AV%'
ORDER BY station ASC
LIMIT 10
FORMAT PrettyCompactMonoBlock
┌─station──────┐
│ 1 AV │
│ 1 AVE │
│ 138 ST-3 AVE │
│ 14 ST-6 AVE │
│ 149 ST-3 AVE │
│ 18 AV │
│ 18 AVE │
│ 2 AV │
│ 2 AVE │
│ 20 AV │
└──────────────┘
10 rows in set. Elapsed: 0.024 sec. Processed 36.20 million rows, 41.62 MB (1.53 billion rows/s., 1.75 GB/s.)
Peak memory usage: 26.68 MiB.
如果我们能够建立闸机到站点名称的映射,我们可以通过始终如一地选择其中一个名称(例如,始终是最长的名称)并重新映射所有数据来解决此类简单问题。请注意,这不会解决更复杂的映射,例如将名称 '42 ST-TIMES SQ' 和 'TIMES SQ-42 ST' 映射到 “TIMES SQ”。我们现在可以将这些推迟到查询时处理。
为了保存我们的映射,我们可以使用字典。这种内存结构将允许通过 (ca, unit, scp)
元组查找站点名称。我们使用下面显示的查询填充此字典,为每个闸机选择最长的站点名称。后者是通过使用 groupArrayDistinct
函数生成分配给每个 (ca, unit, scp)
的不同站点名称列表来实现的。然后按长度对该列表进行排序,并选择第一个(最长的)条目。
CREATE DICTIONARY station_names
(
`ca` String,
`unit` String,
`scp` String,
`station_name` String
)
PRIMARY KEY (ca, unit, scp)
SOURCE(CLICKHOUSE(QUERY $query$
SELECT
ca,
unit,
scp,
arrayReverseSort(station -> length(station), groupArrayDistinct(station))[1] AS station_name
FROM subway_transits_2014_2022_v1
WHERE station != ''
GROUP BY
ca,
unit,
scp
$query$))
LIFETIME(MIN 0 MAX 0)
LAYOUT(complex_key_hashed())
有关字典的更多详细信息,包括可用类型以及如何配置它们,请参阅字典文档。
我们可以使用 dictGet` 函数有效地检索特定的站点名称。例如
SELECT dictGet(station_names, 'station_name', ('R148', 'R033', '01-04-01'))
┌─name───────────┐
│ 42 ST-TIMES SQ │
└────────────────┘
1 row in set. Elapsed: 0.001 sec.
请注意,第一次调用字典时,请求可能会很慢,具体取决于数据是在创建时急切加载还是在第一次请求时延迟加载。这可以使用 dictionaries_lazy_load 进行配置。
结合解决方案以获得最终数据
我们现在可以结合我们的窗口函数和字典查找来生成数据的最终版本。这里的想法很简单:使用窗口函数和 dictGet
对我们表的 v1 执行查询,并将结果插入到新表中。我们的最终表模式
CREATE TABLE mta.subway_transits_2014_2022_v2
(
`ca` LowCardinality(String),
`unit` LowCardinality(String),
`scp` LowCardinality(String),
`line_names` Array(LowCardinality(String)),
`division` LowCardinality(String),
`date_time` DateTime,
`description` LowCardinality(String),
`entries` UInt32,
`exits` UInt32,
`station` LowCardinality(String),
`entries_change` UInt32,
`exits_change` UInt32
)
ENGINE = MergeTree
ORDER BY (ca, unit, scp, date_time)
使用 INSERT INTO SELECT
INSERT INTO mta.subway_transits_2014_2022_v2 WITH 2000 AS threshold_per_hour SELECT
ca, unit, scp, line_names, division, date_time, description, entries, exits,
dictGet(station_names, 'station_name', (ca, unit, scp)) as station,
entries_change, exits_change
FROM
(
SELECT
*,
any(date_time) OVER (PARTITION BY ca, unit, scp ORDER BY date_time ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS p_date_time,
any(entries) OVER (PARTITION BY ca, unit, scp ORDER BY date_time ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS p_entries,
any(exits) OVER (PARTITION BY ca, unit, scp ORDER BY date_time ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS p_exits,
dateDiff('hour', p_date_time, date_time) AS hours,
if((entries < p_entries) OR (((entries - p_entries) / if(hours > 0, hours, 1)) > threshold_per_hour), 0, entries - p_entries) AS entries_change,
if((exits < p_exits) OR (((exits - p_exits) / if(hours > 0, hours, 1)) > threshold_per_hour), 0, exits - p_exits) AS exits_change
FROM subway_transits_2014_2022_v1
ORDER BY
ca ASC,
unit ASC,
scp ASC,
date_time ASC
) SETTINGS max_insert_threads=16
0 rows in set. Elapsed: 24.305 sec. Processed 94.88 million rows, 2.76 GB (3.90 million rows/s., 113.67 MB/s.)
我们的最终表
SELECT *
FROM mta.subway_transits_2014_2022_v2
LIMIT 1
FORMAT Vertical
Row 1:
──────
ca: A002
unit: R051
scp: 02-00-00
line_names: ['4','5','6','N','Q','R']
division: BMT
date_time: 2014-01-02 03:00:00
description: REGULAR
entries: 4469306
exits: 1523801
station: LEXINGTON AVE
entries_change: 0
exits_change: 0
1 rows in set. Elapsed: 0.005 sec.
MTA 交通数据集的示例查询
您可以在 ClickHouse Playground 中运行以下查询。我们为每个查询提供了一些默认图表,以帮助您入门。
如果您想为 MTA 数据集或其他数据集提出进一步的查询或改进建议,请随时联系我们并在演示仓库上提出问题。
让我们首先确认最受欢迎的站点与官方数据一致。例如,我们将使用 2018 年的数据
SELECT station, sum(entries_change) AS total_entries, formatReadableQuantity(total_entries) AS total_entries_read FROM mta.subway_transits_2014_2022_v2 WHERE toYear(date_time) = '2018' GROUP BY station ORDER BY sum(entries_change) DESC LIMIT 10✎
我们的结果质量受到数据(数据非常嘈杂)和我们用于删除异常值的方法的影响。但是,这些结果似乎与 MTA 报告的高级数据一致。另请注意,某些站点条目(例如时代广场)在我们的数据中具有单独的入口点,即 '42 ST-TIMES SQ' 和 'TIMES SQ-42 ST' to 'TIMES SQ'。我们将此清理练习留作待办事项,目前在查询时使用条件语句解决。
如果我们检查完整期间前 10 个站点的交通流量,COVID 造成的下降是显而易见的
SELECT station, toYear(date_time) AS year, sum(entries_change) AS total_entries FROM mta.subway_transits_2014_2022_v2 WHERE station IN ( SELECT station FROM mta.subway_transits_2014_2022_v2 GROUP BY station ORDER BY sum(entries_change) DESC LIMIT 10 ) GROUP BY year, station ORDER BY year ASC✎
尽管我们做出了努力,但这些数据仍然非常嘈杂。存在明显的异常情况,需要进一步努力才能消除 - 我们欢迎有关方法的建议。相反,来自 2022 年的交通数据似乎更加可靠且质量更高。我们还将其加载到 transit_data
表中,并提供了一些示例查询。
使用此数据,我们可以观察日常通勤模式,以显示哪些站点在哪个高峰时段繁忙
SELECT station_complex, toHour(hour_of_day) AS hour, CAST(avg(total_entries), 'UInt64') AS avg_entries FROM ( SELECT toStartOfHour(transit_timestamp) AS hour_of_day, station_complex, sum(ridership) AS total_entries FROM mta.transit_data WHERE toDayOfWeek(transit_timestamp) <= 5 GROUP BY station_complex, hour_of_day ) GROUP BY hour, station_complex ORDER BY hour ASC, avg_entries DESC LIMIT 3 BY hour✎
我们还可以轻松比较周末和工作日的交通流量。这突出显示了一年中某些明显的时段,例如 7 月 4 日,此时通勤交通量显着降低。
SELECT toStartOfWeek(transit_timestamp) AS week, 'weekday' AS period, sum(ridership) AS total FROM mta.transit_data WHERE toDayOfWeek(transit_timestamp) <= 5 GROUP BY week ORDER BY week ASC UNION ALL SELECT toStartOfWeek(transit_timestamp) AS week, 'weekend' AS period, sum(ridership) AS total FROM mta.transit_data WHERE toDayOfWeek(transit_timestamp) > 5 GROUP BY week ORDER BY week ASC✎
结论
我们很高兴参与 MTA 挑战赛(至少和清理数据一样有趣!),并希望我们的工作能够让每个人更轻松地对数据进行一些有趣的分析。
如果您在 演示仓库中提出了任何查询(和图表),我们非常欢迎您分享。