跳到主要内容
跳到主要内容

EMQX 与 ClickHouse 集成

连接 EMQX

EMQX 是一个开源 MQTT Broker,具有高性能实时消息处理引擎,为大规模物联网设备的事件流提供动力。 作为最具扩展性的 MQTT Broker,EMQX 可以帮助您连接任何规模的任何设备。 将您的 IoT 数据移动并处理到任何地方。

EMQX Cloud 是由 EMQ 托管的物联网领域的 MQTT 消息中间件产品。 作为世界上首个完全托管的 MQTT 5.0 云消息服务,EMQX Cloud 为 MQTT 消息服务提供一站式 O&M 共址和一个独特的隔离环境。 在万物互联时代,EMQX Cloud 可以帮助您为物联网领域快速构建行业应用,轻松收集、传输、计算和持久化物联网数据。

借助云服务提供商提供的基础设施,EMQX Cloud 为全球数十个国家和地区提供服务,为 5G 和万物互联应用提供低成本、安全可靠的云服务。

EMQX Cloud Architecture

假设

  • 您熟悉 MQTT 协议,该协议被设计为一种极其轻量级的发布/订阅消息传输协议。
  • 您正在使用 EMQX 或 EMQX Cloud 进行实时消息处理引擎,为大规模物联网设备的事件流提供动力。
  • 您已准备好 Clickhouse Cloud 实例以持久化设备数据。
  • 我们正在使用 MQTT X 作为 MQTT 客户端测试工具来连接 EMQX Cloud 的部署以发布 MQTT 数据。 或其他连接到 MQTT Broker 的方法也可以完成这项工作。

获取您的 ClickHouse Cloud 服务

在此设置过程中,我们在 AWS 北弗吉尼亚州(us-east-1)部署了 ClickHouse 实例,而 EMQX Cloud 实例也部署在同一区域。

clickhouse_cloud_1

在设置过程中,您还需要注意连接设置。在本教程中,我们选择“Anywhere”,但如果您申请特定位置,您将需要将从 EMQX Cloud 部署获得的 NAT 网关 IP 地址添加到白名单。

clickhouse_cloud_2

然后您需要保存您的用户名和密码以供将来使用。

clickhouse_cloud_3

之后,您将获得一个正在运行的 Clickhouse 实例。 单击“连接”以获取 Clickhouse Cloud 的实例连接地址。

clickhouse_cloud_4

单击“连接到 SQL 控制台”以创建用于与 EMQX Cloud 集成的数据库和表。

clickhouse_cloud_5

您可以参考以下 SQL 语句,或根据实际情况修改 SQL。

CREATE TABLE emqx.temp_hum
(
client_id String,
timestamp DateTime,
topic String,
temp Float32,
hum Float32
)
ENGINE = MergeTree()
PRIMARY KEY (client_id, timestamp)

clickhouse_cloud_6

在 EMQX Cloud 上创建 MQTT 服务

在 EMQX Cloud 上创建专用 MQTT Broker 就像点击几下一样简单。

获取帐户

EMQX Cloud 为每个帐户的标准部署和专业部署提供 14 天免费试用。

如果您是 EMQX Cloud 的新手,请从 EMQX Cloud 注册页面开始,然后单击“免费开始”注册帐户。

EMQX Cloud Signup

创建 MQTT 集群

登录后,单击帐户菜单下的“Cloud Console”,您将能够看到绿色的按钮来创建新的部署。

EMQX Cloud Create 1

在本教程中,我们将使用专业部署,因为只有 Pro 版本提供数据集成功能,该功能可以将 MQTT 数据直接发送到 ClickHouse,而无需编写单行代码。

选择 Pro 版本并选择 N.Virginial 区域,然后单击 立即创建。只需几分钟,您将获得一个完全托管的 MQTT Broker

EMQX Cloud Create 2

现在单击面板转到集群视图。在此仪表板上,您将看到 MQTT Broker 的概览。

EMQX Cloud Overview

添加客户端凭据

EMQX Cloud 默认不允许匿名连接,因此您需要添加客户端凭据,以便您可以使用 MQTT 客户端工具将数据发送到此 Broker。

单击左侧菜单上的“身份验证和 ACL”,然后在子菜单中单击“身份验证”。 单击右侧的“添加”按钮,并为以后的 MQTT 连接提供用户名和密码。 在这里,我们将使用 emqxxxxxxx 作为用户名和密码。

EMQX Cloud Auth

单击“确认”,现在我们拥有一个完全托管的 MQTT Broker,准备就绪。

启用 NAT 网关

在开始设置 ClickHouse 集成之前,我们需要先启用 NAT 网关。 默认情况下,MQTT Broker 部署在私有 VPC 中,该 VPC 无法通过公共网络将数据发送到第三方系统。

返回“概览”页面并向下滚动到页面底部,您将在其中看到 NAT 网关小部件。 单击“订阅”按钮并按照说明进行操作。 请注意,NAT 网关是一项增值服务,但也提供 14 天免费试用。

EMQX Cloud Nat Gateway

创建完成后,您将在小部件中找到公共 IP 地址。 请注意,如果您在 ClickHouse Cloud 设置期间选择“从特定位置连接”,则需要将此 IP 地址添加到白名单。

将 EMQX Cloud 与 ClickHouse Cloud 集成

EMQX Cloud 数据集成 用于配置处理和响应 EMQX 消息流和设备事件的规则。 数据集成不仅提供了清晰灵活的“可配置”架构解决方案,而且简化了开发过程,提高了用户可用性,降低了业务系统与 EMQX Cloud 之间的耦合度。 它还为 EMQX Cloud 专有功能的定制提供了卓越的基础设施。

EMQX Cloud Data Integration

EMQX Cloud 提供与流行数据系统的 30 多个原生集成。 ClickHouse 是其中之一。

data_integration_clickhouse

创建 ClickHouse 资源

单击左侧菜单上的“数据集成”,然后单击“查看所有资源”。 您将在“数据持久化”部分找到 ClickHouse,或者您可以搜索 ClickHouse。

单击 ClickHouse 卡片以创建新资源。

  • 注意:为此资源添加注释。
  • 服务器地址:这是您的 ClickHouse Cloud 服务的地址,请记住不要忘记端口。
  • 数据库名称:我们在上述步骤中创建的 emqx
  • 用户:用于连接到您的 ClickHouse Cloud 服务的用户名。
  • 密钥:连接密码。

data_integration_resource

创建新规则

在创建资源期间,您将看到一个弹出窗口,单击“新建”将引导您进入规则创建页面。

EMQX 提供了一个强大的规则引擎,可以在将原始 MQTT 消息发送到第三方系统之前对其进行转换和丰富。

这是本教程中使用的规则

SELECT
clientid as client_id,
(timestamp div 1000) as timestamp,
topic as topic,
payload.temp as temp,
payload.hum as hum
FROM
"temp_hum/emqx"

它将从 temp_hum/emqx 主题读取消息,并通过添加 client_id、topic 和时间戳信息来丰富 JSON 对象。

因此,您发送到主题的原始 JSON

{"temp": 28.5, "hum": 0.68}

data_integration_rule_1

您可以使用 SQL 测试来测试并查看结果。

data_integration_rule_2

现在单击“下一步”按钮。 此步骤是告诉 EMQX Cloud 如何将精炼的数据插入到您的 ClickHouse 数据库中。

添加响应动作

如果您只有一个资源,则无需修改“资源”和“动作类型”。 您只需要设置 SQL 模板。 这是本教程中使用的示例

INSERT INTO temp_hum (client_id, timestamp, topic, temp, hum) VALUES ('${client_id}', ${timestamp}, '${topic}', ${temp}, ${hum})

data_integration_rule_action

这是将数据插入 Clickhouse 的模板,您可以看到此处使用了变量。

查看规则详情

单击“确认”和“查看详情”。 现在,一切都应该设置好了。 您可以从规则详情页面看到数据集成正在工作。

data_integration_details

发送到 temp_hum/emqx 主题的所有 MQTT 消息都将持久化到您的 ClickHouse Cloud 数据库中。

将数据保存到 ClickHouse 中

我们将模拟温度和湿度数据,并通过 MQTT X 将这些数据报告给 EMQX Cloud,然后使用 EMQX Cloud 数据集成将数据保存到 ClickHouse Cloud 中。

work-flow

将 MQTT 消息发布到 EMQX Cloud

您可以使用任何 MQTT 客户端或 SDK 来发布消息。 在本教程中,我们将使用 MQTT X,这是 EMQ 提供的用户友好的 MQTT 客户端应用程序。

MQTTX Overview

单击 MQTTX 上的“新建连接”并填写连接表单

  • 名称:连接名称。 使用您想要的任何名称。
  • 主机:MQTT Broker 连接地址。 您可以从 EMQX Cloud 概览页面获取它。
  • 端口:MQTT Broker 连接端口。 您可以从 EMQX Cloud 概览页面获取它。
  • 用户名/密码:使用上面创建的凭据,在本教程中应为 emqxxxxxxx

MQTTX New

单击右上角的“连接”按钮,应建立连接。

现在您可以使用此工具将消息发送到 MQTT Broker。 输入:1. 将有效负载格式设置为“JSON”。 2. 设置主题:temp_hum/emqx(我们刚刚在规则中设置的主题)3. JSON 正文

{"temp": 23.1, "hum": 0.68}

单击右侧的发送按钮。 您可以更改温度值并向 MQTT Broker 发送更多数据。

发送到 EMQX Cloud 的数据应由规则引擎处理并自动插入到 ClickHouse Cloud 中。

MQTTX Publish

查看规则监控

检查规则监控并在成功次数中加一。

rule_monitor

检查持久化数据

现在是时候查看 ClickHouse Cloud 上的数据了。 理想情况下,您使用 MQTTX 发送的数据将进入 EMQX Cloud,并在原生数据集成的帮助下持久化到 ClickHouse Cloud 的数据库中。

您可以连接到 ClickHouse Cloud 面板上的 SQL 控制台,或使用任何客户端工具从您的 ClickHouse 中获取数据。 在本教程中,我们使用了 SQL 控制台。 通过执行 SQL

SELECT * FROM emqx.temp_hum;

clickhouse_result

总结

您没有编写任何代码,现在已将 MQTT 数据从 EMQX Cloud 转移到 ClickHouse Cloud。 借助 EMQX Cloud 和 ClickHouse Cloud,您无需管理基础设施,只需专注于编写您的 IoT 应用程序,并将数据安全地存储在 ClickHouse Cloud 中。