跳到主要内容
跳到主要内容

ClickHouse Go

一个简单的例子

让我们用一个简单的例子开始。这将连接到 ClickHouse 并从系统数据库中选择数据。要开始,您需要您的连接详情。

连接详情

要使用原生 TCP 连接到 ClickHouse,您需要以下信息

  • 主机和端口:通常,使用 TLS 时端口为 9440,不使用 TLS 时端口为 9000。

  • 数据库名称:开箱即用,有一个名为 default 的数据库,请使用您要连接的数据库的名称。

  • 用户名和密码:开箱即用,用户名是 default。请使用适合您用例的用户名。

您的 ClickHouse Cloud 服务的详细信息可在 ClickHouse Cloud 控制台中找到。选择您要连接的服务,然后单击连接

ClickHouse Cloud service connect button

选择原生,详细信息将在 clickhouse-client 命令示例中提供。

ClickHouse Cloud Native TCP connection details

如果您使用的是自托管 ClickHouse,则连接详细信息由您的 ClickHouse 管理员设置。

初始化模块

mkdir clickhouse-golang-example
cd clickhouse-golang-example
go mod init clickhouse-golang-example

复制一些示例代码

将此代码复制到 clickhouse-golang-example 目录中,命名为 main.go

package main

import (
"context"
"crypto/tls"
"fmt"
"log"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

func main() {
conn, err := connect()
if err != nil {
panic((err))
}

ctx := context.Background()
rows, err := conn.Query(ctx, "SELECT name,toString(uuid) as uuid_str FROM system.tables LIMIT 5")
if err != nil {
log.Fatal(err)
}

for rows.Next() {
var (
name, uuid string
)
if err := rows.Scan(
&name,
&uuid,
); err != nil {
log.Fatal(err)
}
log.Printf("name: %s, uuid: %s",
name, uuid)
}

}

func connect() (driver.Conn, error) {
var (
ctx = context.Background()
conn, err = clickhouse.Open(&clickhouse.Options{
Addr: []string{"<CLICKHOUSE_SECURE_NATIVE_HOSTNAME>:9440"},
Auth: clickhouse.Auth{
Database: "default",
Username: "default",
Password: "<DEFAULT_USER_PASSWORD>",
},
ClientInfo: clickhouse.ClientInfo{
Products: []struct {
Name string
Version string
}{
{Name: "an-example-go-client", Version: "0.1"},
},
},

Debugf: func(format string, v ...interface{}) {
fmt.Printf(format, v)
},
TLS: &tls.Config{
InsecureSkipVerify: true,
},
})
)

if err != nil {
return nil, err
}

if err := conn.Ping(ctx); err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
fmt.Printf("Exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
}
return nil, err
}
return conn, nil
}

运行 go mod tidy

go mod tidy

设置您的连接详情

之前您查找了您的连接详情。在 main.goconnect() 函数中设置它们

func connect() (driver.Conn, error) {
var (
ctx = context.Background()
conn, err = clickhouse.Open(&clickhouse.Options{
Addr: []string{"<CLICKHOUSE_SECURE_NATIVE_HOSTNAME>:9440"},
Auth: clickhouse.Auth{
Database: "default",
Username: "default",
Password: "<DEFAULT_USER_PASSWORD>",
},

运行示例

go run .
2023/03/06 14:18:33 name: COLUMNS, uuid: 00000000-0000-0000-0000-000000000000
2023/03/06 14:18:33 name: SCHEMATA, uuid: 00000000-0000-0000-0000-000000000000
2023/03/06 14:18:33 name: TABLES, uuid: 00000000-0000-0000-0000-000000000000
2023/03/06 14:18:33 name: VIEWS, uuid: 00000000-0000-0000-0000-000000000000
2023/03/06 14:18:33 name: hourly_data, uuid: a4e36bd4-1e82-45b3-be77-74a0fe65c52b

了解更多

本类别中的其余文档涵盖了 ClickHouse Go 客户端的详细信息。

ClickHouse Go 客户端

ClickHouse 支持两个官方 Go 客户端。这些客户端是互补的,并且有意支持不同的用例。

  • clickhouse-go - 高级语言客户端,支持 Go 标准 database/sql 接口或原生接口。
  • ch-go - 低级客户端。仅限原生接口。

clickhouse-go 提供了一个高级接口,允许用户使用面向行的语义和批处理来查询和插入数据,这些语义和批处理在数据类型方面是宽松的 - 只要不潜在地造成精度损失,值将被转换。与此同时,ch-go 提供了一个优化的面向列的接口,该接口提供快速数据块流式传输,具有低 CPU 和内存开销,但代价是类型严格性和更复杂的使用方式。

从 2.3 版本开始,Clickhouse-go 利用 ch-go 进行低级功能,例如编码、解码和压缩。请注意,clickhouse-go 也支持 Go database/sql 接口标准。这两个客户端都使用原生格式进行编码,以提供最佳性能,并且可以通过原生 ClickHouse 协议进行通信。clickhouse-go 还支持 HTTP 作为其传输机制,以应对用户需要代理或负载均衡流量的情况。

在选择客户端库时,用户应了解其各自的优点和缺点 - 请参阅选择客户端库。

原生格式原生协议HTTP 协议面向行的 API面向列的 API类型灵活性压缩查询占位符
clickhouse-go
ch-go

选择客户端

选择客户端库取决于您的使用模式和对最佳性能的需求。对于插入密集型用例,其中每秒需要数百万次插入,我们建议使用低级客户端 ch-go。此客户端避免了将数据从面向行的格式透视到列的相关开销,因为 ClickHouse 原生格式需要这样做。此外,它避免了任何反射或使用 interface{} (any) 类型来简化使用。

对于专注于聚合或较低吞吐量插入工作负载的查询工作负载,clickhouse-go 提供了熟悉的 database/sql 接口和更直接的行语义。用户还可以选择使用 HTTP 作为传输协议,并利用辅助函数将行编组为结构体和从结构体解组行。

clickhouse-go 客户端

clickhouse-go 客户端提供了两个 API 接口用于与 ClickHouse 通信

  • ClickHouse 客户端特定 API
  • database/sql 标准 - Golang 提供的围绕 SQL 数据库的通用接口。

虽然 database/sql 提供了与数据库无关的接口,允许开发人员抽象其数据存储,但它强制执行了一些影响性能的类型和查询语义。因此,在性能很重要的情况下,应使用客户端特定 API。但是,希望将 ClickHouse 集成到支持多个数据库的工具中的用户可能更喜欢使用标准接口。

这两个接口都使用原生格式和原生协议进行数据编码以进行通信。此外,标准接口支持通过 HTTP 进行通信。

原生格式原生协议HTTP 协议批量写入支持结构体编组压缩查询占位符
ClickHouse API
database/sql API

安装

驱动程序的 v1 版本已弃用,并且不会进行功能更新或支持新的 ClickHouse 类型。用户应迁移到 v2,后者提供卓越的性能。

要安装客户端的 2.x 版本,请将软件包添加到您的 go.mod 文件中

require github.com/ClickHouse/clickhouse-go/v2 main

或者,克隆存储库

git clone --branch v2 https://github.com/clickhouse/clickhouse-go.git $GOPATH/src/github

要安装另一个版本,请相应地修改路径或分支名称。

mkdir my-clickhouse-app && cd my-clickhouse-app

cat > go.mod <<-END
module my-clickhouse-app

go 1.18

require github.com/ClickHouse/clickhouse-go/v2 main
END

cat > main.go <<-END
package main

import (
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
conn, _ := clickhouse.Open(&clickhouse.Options{Addr: []string{"127.0.0.1:9000"}})
v, _ := conn.ServerVersion()
fmt.Println(v.String())
}
END

go mod tidy
go run main.go

版本控制与兼容性

客户端独立于 ClickHouse 发布。2.x 代表当前正在开发的主要版本。2.x 的所有版本都应彼此兼容。

ClickHouse 兼容性

客户端支持

  • 此处记录的所有当前支持的 ClickHouse 版本。由于不再支持 ClickHouse 版本,因此也不再针对客户端版本进行积极测试。
  • 自客户端发布之日起 2 年内的所有 ClickHouse 版本。请注意,仅 LTS 版本经过积极测试。

Golang 兼容性

客户端版本Golang 版本
=> 2.0 <= 2.21.17, 1.18
>= 2.31.18

ClickHouse 客户端 API

ClickHouse 客户端 API 的所有代码示例都可以在此处找到。

连接

以下示例(返回服务器版本)演示了如何连接到 ClickHouse - 假设 ClickHouse 未受到保护,并且可以使用默认用户访问。

请注意,我们使用默认原生端口进行连接。

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
fmt.Println(v)

完整示例

对于所有后续示例,除非明确显示,否则我们假设 ClickHouse conn 变量已创建并且可用。

连接设置

打开连接时,可以使用 Options 结构体来控制客户端行为。以下设置可用

  • Protocol - 原生或 HTTP。目前仅 database/sql API 支持 HTTP。
  • TLS - TLS 选项。非 nil 值启用 TLS。请参阅使用 TLS
  • Addr - 包含端口的地址切片。
  • Auth - 身份验证详细信息。请参阅身份验证
  • DialContext - 自定义拨号函数,用于确定如何建立连接。
  • Debug - true/false 以启用调试。
  • Debugf - 提供一个函数来消耗调试输出。需要将 debug 设置为 true。
  • Settings - ClickHouse 设置的映射。这些设置将应用于所有 ClickHouse 查询。使用 Context 允许为每个查询设置设置。
  • Compression - 启用块压缩。请参阅压缩
  • DialTimeout - 建立连接的最大时间。默认为 1 秒
  • MaxOpenConns - 任何时候使用的最大连接数。空闲池中可能有更多或更少的连接,但任何时候只能使用此数量。默认为 MaxIdleConns+5
  • MaxIdleConns - 池中要维护的连接数。如果可能,连接将被重用。默认为 5
  • ConnMaxLifetime - 保持连接可用的最大生命周期。默认为 1 小时。连接在此时间后将被销毁,并根据需要将新连接添加到池中。
  • ConnOpenStrategy - 确定应如何使用节点地址列表以及用于打开连接。请参阅连接到多个节点
  • BlockBufferSize - 一次解码到缓冲区中的最大块数。较大的值将增加并行化,但会增加内存消耗。块大小取决于查询,因此虽然您可以在连接上设置此值,但我们建议您根据查询返回的数据为每个查询覆盖此值。默认为 2
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
DialContext: func(ctx context.Context, addr string) (net.Conn, error) {
dialCount++
var d net.Dialer
return d.DialContext(ctx, "tcp", addr)
},
Debug: true,
Debugf: func(format string, v ...interface{}) {
fmt.Printf(format, v)
},
Settings: clickhouse.Settings{
"max_execution_time": 60,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
DialTimeout: time.Duration(10) * time.Second,
MaxOpenConns: 5,
MaxIdleConns: 5,
ConnMaxLifetime: time.Duration(10) * time.Minute,
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
BlockBufferSize: 10,
})
if err != nil {
return err
}

完整示例

连接池

客户端维护一个连接池,根据需要在查询之间重用这些连接。任何时候最多将使用 MaxOpenConns,最大池大小由 MaxIdleConns 控制。客户端将为每个查询执行从池中获取一个连接,并将其返回到池中以供重用。连接在批处理的生命周期内使用,并在 Send() 时释放。

除非用户设置 MaxOpenConns=1,否则不能保证池中的同一连接将用于后续查询。这很少需要,但对于用户使用临时表的情况可能是必需的。

另请注意,默认情况下 ConnMaxLifetime 为 1 小时。如果节点离开集群,这可能会导致 ClickHouse 的负载变得不平衡。当节点变得不可用时,可能会发生这种情况,连接将平衡到其他节点。即使有问题的节点返回到集群,这些连接也会持续存在,并且默认情况下不会刷新 1 小时。在重负载情况下,请考虑降低此值。

使用 TLS

在底层,所有客户端连接方法 (DSN/OpenDB/Open) 都将使用Go tls 包来建立安全连接。如果 Options 结构体包含非 nil tls.Config 指针,则客户端知道使用 TLS。

env, err := GetNativeTestEnvironment()
if err != nil {
return err
}
cwd, err := os.Getwd()
if err != nil {
return err
}
t := &tls.Config{}
caCert, err := ioutil.ReadFile(path.Join(cwd, "../../tests/resources/CAroot.crt"))
if err != nil {
return err
}
caCertPool := x509.NewCertPool()
successful := caCertPool.AppendCertsFromPEM(caCert)
if !successful {
return err
}
t.RootCAs = caCertPool
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.SslPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
TLS: t,
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
if err != nil {
return err
}
fmt.Println(v.String())

完整示例

此最小 TLS.Config 通常足以连接到 ClickHouse 服务器上的安全原生端口(通常为 9440)。如果 ClickHouse 服务器没有有效的证书(已过期、主机名错误、未由公共认可的根证书颁发机构签名),则 InsecureSkipVerify 可以为 true,但这强烈不建议。

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.SslPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
TLS: &tls.Config{
InsecureSkipVerify: true,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()

完整示例

如果需要额外的 TLS 参数,应用程序代码应在 tls.Config 结构体中设置所需的字段。这可以包括特定的密码套件、强制使用特定的 TLS 版本(如 1.2 或 1.3)、添加内部 CA 证书链、添加客户端证书(和私钥)(如果 ClickHouse 服务器需要)以及更专业的安全设置附带的大多数其他选项。

身份验证

在连接详细信息中指定 Auth 结构体以指定用户名和密码。

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
if err != nil {
return err
}
v, err := conn.ServerVersion()

完整示例

连接到多个节点

可以通过 Addr 结构体指定多个地址。

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
if err != nil {
return err
}
fmt.Println(v.String())

完整示例

有两种连接策略可用

  • ConnOpenInOrder(默认)- 地址按顺序使用。只有在无法使用列表中较早的地址进行连接的情况下,才会使用后面的地址。这实际上是一种故障转移策略。
  • ConnOpenRoundRobin - 使用轮循策略在地址之间平衡负载。

可以通过选项 ConnOpenStrategy 控制此行为

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)},
ConnOpenStrategy: clickhouse.ConnOpenRoundRobin,
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
if err != nil {
return err
}

完整示例

执行

可以通过 Exec 方法执行任意语句。这对于 DDL 和简单语句很有用。它不应用于较大的插入或查询迭代。

conn.Exec(context.Background(), `DROP TABLE IF EXISTS example`)
err = conn.Exec(context.Background(), `
CREATE TABLE IF NOT EXISTS example (
Col1 UInt8,
Col2 String
) engine=Memory
`)
if err != nil {
return err
}
conn.Exec(context.Background(), "INSERT INTO example VALUES (1, 'test-1')")

完整示例

请注意能够将 Context 传递给查询。这可以用于传递特定的查询级别设置 - 请参阅使用 Context

批量插入

要插入大量行,客户端提供批处理语义。这需要准备一个批处理,可以将行附加到该批处理中。这最终通过 Send() 方法发送。批处理将保存在内存中,直到执行 Send。

conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(context.Background(), "DROP TABLE IF EXISTS example")
err = conn.Exec(ctx, `
CREATE TABLE IF NOT EXISTS example (
Col1 UInt8
, Col2 String
, Col3 FixedString(3)
, Col4 UUID
, Col5 Map(String, UInt8)
, Col6 Array(String)
, Col7 Tuple(String, UInt8, Array(Map(String, String)))
, Col8 DateTime
) Engine = Memory
`)
if err != nil {
return err
}


batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 1000; i++ {
err := batch.Append(
uint8(42),
"ClickHouse",
"Inc",
uuid.New(),
map[string]uint8{"key": 1}, // Map(String, UInt8)
[]string{"Q", "W", "E", "R", "T", "Y"}, // Array(String)
[]interface{}{ // Tuple(String, UInt8, Array(Map(String, String)))
"String Value", uint8(5), []map[string]string{
{"key": "value"},
{"key": "value"},
{"key": "value"},
},
},
time.Now(),
)
if err != nil {
return err
}
}
return batch.Send()

完整示例

ClickHouse 的建议在此处适用。批处理不应在 go-routine 之间共享 - 为每个例程构建单独的批处理。

从上面的示例中,请注意变量类型需要与附加行时的列类型对齐。虽然映射通常很明显,但此接口试图保持灵活性,并且只要不造成精度损失,类型就会被转换。例如,以下示例演示了将字符串插入 datetime64。

batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 1000; i++ {
err := batch.Append(
"2006-01-02 15:04:05.999",
)
if err != nil {
return err
}
}
return batch.Send()

完整示例

有关每个列类型支持的完整 go 类型摘要,请参阅类型转换

查询行

用户可以使用 QueryRow 方法查询单行,也可以通过 Query 获取游标以迭代结果集。虽然前者接受要将数据序列化到的目标,但后者需要对每一行调用 Scan

row := conn.QueryRow(context.Background(), "SELECT * FROM example")
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 []interface{}
col8 time.Time
)
if err := row.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s, col4=%s, col5=%v, col6=%v, col7=%v, col8=%v\n", col1, col2, col3, col4, col5, col6, col7, col8)

完整示例

rows, err := conn.Query(ctx, "SELECT Col1, Col2, Col3 FROM example WHERE Col1 >= 2")
if err != nil {
return err
}
for rows.Next() {
var (
col1 uint8
col2 string
col3 time.Time
)
if err := rows.Scan(&col1, &col2, &col3); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s\n", col1, col2, col3)
}
rows.Close()
return rows.Err()

完整示例

请注意,在这两种情况下,我们都需要传递指向我们希望将相应列值序列化到的变量的指针。这些指针必须按照 SELECT 语句中指定的顺序传递 - 默认情况下,如果发生 SELECT *,则将使用列声明的顺序,如上所示。

与插入类似,Scan 方法要求目标变量具有适当的类型。这再次旨在保持灵活性,类型将在可能的情况下进行转换,前提是不可能造成精度损失,例如,上面的示例显示了 UUID 列被读取到字符串变量中。有关每个列类型支持的完整 go 类型列表,请参阅类型转换

最后,请注意能够将 Context 传递给 QueryQueryRow 方法。这可以用于查询级别设置 - 有关更多详细信息,请参阅使用 Context

异步插入

通过 Async 方法支持异步插入。这允许用户指定客户端是否应等待服务器完成插入,或者在收到数据后立即响应。这有效地控制了参数wait_for_async_insert

conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
if err := clickhouse_tests.CheckMinServerServerVersion(conn, 21, 12, 0); err != nil {
return nil
}
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(ctx, `DROP TABLE IF EXISTS example`)
const ddl = `
CREATE TABLE example (
Col1 UInt64
, Col2 String
, Col3 Array(UInt8)
, Col4 DateTime
) ENGINE = Memory
`
if err := conn.Exec(ctx, ddl); err != nil {
return err
}
for i := 0; i < 100; i++ {
if err := conn.AsyncInsert(ctx, fmt.Sprintf(`INSERT INTO example VALUES (
%d, '%s', [1, 2, 3, 4, 5, 6, 7, 8, 9], now()
)`, i, "Golang SQL database driver"), false); err != nil {
return err
}
}

完整示例

列式插入

插入可以以列格式插入。如果数据已经以这种结构定向,则可以避免透视到行的需要,从而提供性能优势。

batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example")
if err != nil {
return err
}
var (
col1 []uint64
col2 []string
col3 [][]uint8
col4 []time.Time
)
for i := 0; i < 1_000; i++ {
col1 = append(col1, uint64(i))
col2 = append(col2, "Golang SQL database driver")
col3 = append(col3, []uint8{1, 2, 3, 4, 5, 6, 7, 8, 9})
col4 = append(col4, time.Now())
}
if err := batch.Column(0).Append(col1); err != nil {
return err
}
if err := batch.Column(1).Append(col2); err != nil {
return err
}
if err := batch.Column(2).Append(col3); err != nil {
return err
}
if err := batch.Column(3).Append(col4); err != nil {
return err
}
return batch.Send()

完整示例

使用结构体

对于用户而言,Golang 结构体提供了 ClickHouse 中数据行的逻辑表示形式。为了帮助实现这一点,原生接口提供了几个方便的函数。

使用序列化进行选择

Select 方法允许将一组响应行编组到结构体切片中,只需一次调用。

var result []struct {
Col1 uint8
Col2 string
ColumnWithName time.Time `ch:"Col3"`
}

if err = conn.Select(ctx, &result, "SELECT Col1, Col2, Col3 FROM example"); err != nil {
return err
}

for _, v := range result {
fmt.Printf("row: col1=%d, col2=%s, col3=%s\n", v.Col1, v.Col2, v.ColumnWithName)
}

完整示例

扫描结构体

ScanStruct 允许将查询中的单行编组到结构体中。

var result struct {
Col1 int64
Count uint64 `ch:"count"`
}
if err := conn.QueryRow(context.Background(), "SELECT Col1, COUNT() AS count FROM example WHERE Col1 = 5 GROUP BY Col1").ScanStruct(&result); err != nil {
return err
}

完整示例

追加结构体

AppendStruct 允许将结构体追加到现有的批处理中,并将其解释为完整行。这要求结构体的列在名称和类型上都与表对齐。虽然所有列都必须具有等效的结构体字段,但某些结构体字段可能没有等效的列表示形式。这些字段将被简单地忽略。

batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 1_000; i++ {
err := batch.AppendStruct(&row{
Col1: uint64(i),
Col2: "Golang SQL database driver",
Col3: []uint8{1, 2, 3, 4, 5, 6, 7, 8, 9},
Col4: time.Now(),
ColIgnored: "this will be ignored",
})
if err != nil {
return err
}
}

完整示例

类型转换

在接受插入和响应编组的变量类型方面,客户端旨在尽可能灵活。在大多数情况下,ClickHouse 列类型都存在等效的 Golang 类型,例如,UInt64uint64。这些逻辑映射应始终受支持。如果首先发生变量或接收数据的转换,用户可能希望利用可以插入列或用于接收响应的变量类型。客户端旨在透明地支持这些转换,因此用户无需在插入之前精确地转换其数据以对齐,并在查询时提供灵活的编组。这种透明转换不允许精度损失。例如,uint32 不能用于接收来自 UInt64 列的数据。相反,字符串可以插入到 datetime64 字段中,前提是它满足格式要求。

当前为原始类型支持的类型转换在此处捕获

这项工作正在进行中,可以分为插入 (Append/AppendRow) 和读取时间 (通过 Scan)。如果您需要支持特定的转换,请提出问题。

复杂类型

日期/日期时间类型

ClickHouse go 客户端支持 DateDate32DateTimeDateTime64 日期/日期时间类型。日期可以作为格式为 2006-01-02 的字符串插入,也可以使用原生 go time.Time{}sql.NullTime 插入。DateTime 也支持后两种类型,但要求字符串以格式 2006-01-02 15:04:05 传递,并带有可选的时区偏移量,例如 2006-01-02 15:04:05 +08:00time.Time{}sql.NullTime 在读取时以及任何 sql.Scanner 接口的实现中都受支持。

时区信息的处理取决于 ClickHouse 类型以及值是被插入还是被读取

  • DateTime/DateTime64
    • 插入时,值以 UNIX 时间戳格式发送到 ClickHouse。如果未提供时区,客户端将假定客户端的本地时区。time.Time{}sql.NullTime 将相应地转换为 epoch。
    • 选择时,如果设置了列的时区,则在返回 time.Time 值时将使用该时区。否则,将使用服务器的时区。
  • Date/Date32
    • 插入时,在将日期转换为 unix 时间戳时,会考虑任何日期的时区,即,在作为日期存储之前,它将按时区偏移,因为 Date 类型在 ClickHouse 中没有区域设置。如果未在字符串值中指定,则将使用本地时区。
    • 选择时,扫描到 time.Time{}sql.NullTime{} 实例中的日期将在不包含时区信息的情况下返回。

数组

数组应作为切片插入。元素的类型规则与原始类型的规则一致,即在可能的情况下,元素将被转换。

在 Scan 时应提供指向切片的指针。

batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
var i int64
for i = 0; i < 10; i++ {
err := batch.Append(
[]string{strconv.Itoa(int(i)), strconv.Itoa(int(i + 1)), strconv.Itoa(int(i + 2)), strconv.Itoa(int(i + 3))},
[][]int64{{i, i + 1}, {i + 2, i + 3}, {i + 4, i + 5}},
)
if err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}
var (
col1 []string
col2 [][]int64
)
rows, err := conn.Query(ctx, "SELECT * FROM example")
if err != nil {
return err
}
for rows.Next() {
if err := rows.Scan(&col1, &col2); err != nil {
return err
}
fmt.Printf("row: col1=%v, col2=%v\n", col1, col2)
}
rows.Close()

完整示例

Map

Map 应作为 Golang 映射插入,其键和值符合前面定义的类型规则。

batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
var i int64
for i = 0; i < 10; i++ {
err := batch.Append(
map[string]uint64{strconv.Itoa(int(i)): uint64(i)},
map[string][]string{strconv.Itoa(int(i)): {strconv.Itoa(int(i)), strconv.Itoa(int(i + 1)), strconv.Itoa(int(i + 2)), strconv.Itoa(int(i + 3))}},
map[string]map[string]uint64{strconv.Itoa(int(i)): {strconv.Itoa(int(i)): uint64(i)}},
)
if err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}
var (
col1 map[string]uint64
col2 map[string][]string
col3 map[string]map[string]uint64
)
rows, err := conn.Query(ctx, "SELECT * FROM example")
if err != nil {
return err
}
for rows.Next() {
if err := rows.Scan(&col1, &col2, &col3); err != nil {
return err
}
fmt.Printf("row: col1=%v, col2=%v, col3=%v\n", col1, col2, col3)
}
rows.Close()

完整示例

元组

元组表示任意长度的列组。列可以显式命名,也可以仅指定类型,例如

//unnamed
Col1 Tuple(String, Int64)

//named
Col2 Tuple(name String, id Int64, age uint8)

在这些方法中,命名元组提供了更大的灵活性。虽然未命名元组必须使用切片插入和读取,但命名元组也与映射兼容。

if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Tuple(name String, age UInt8),
Col2 Tuple(String, UInt8),
Col3 Tuple(name String, id String)
)
Engine Memory
`); err != nil {
return err
}

defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
// both named and unnamed can be added with slices. Note we can use strongly typed lists and maps if all elements are the same type
if err = batch.Append([]interface{}{"Clicky McClickHouse", uint8(42)}, []interface{}{"Clicky McClickHouse Snr", uint8(78)}, []string{"Dale", "521211"}); err != nil {
return err
}
if err = batch.Append(map[string]interface{}{"name": "Clicky McClickHouse Jnr", "age": uint8(20)}, []interface{}{"Baby Clicky McClickHouse", uint8(1)}, map[string]string{"name": "Geoff", "id": "12123"}); err != nil {
return err
}
if err = batch.Send(); err != nil {
return err
}
var (
col1 map[string]interface{}
col2 []interface{}
col3 map[string]string
)
// named tuples can be retrieved into a map or slices, unnamed just slices
if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2, &col3); err != nil {
return err
}
fmt.Printf("row: col1=%v, col2=%v, col3=%v\n", col1, col2, col3)

完整示例

注意:支持类型化切片和映射,前提是命名元组中的子列都是相同的类型。

嵌套

Nested 字段等效于命名元组数组。用法取决于用户是否将 flatten_nested 设置为 1 或 0。

通过将 flatten_nested 设置为 0,Nested 列保持为单个元组数组。这允许用户使用映射切片进行插入和检索,并允许任意级别的嵌套。映射的键必须等于列的名称,如下例所示。

注意:由于映射表示元组,因此它们必须是 map[string]interface{} 类型。这些值目前不是强类型的。

conn, err := GetNativeConnection(clickhouse.Settings{
"flatten_nested": 0,
}, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(context.Background(), "DROP TABLE IF EXISTS example")
err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Nested(Col1_1 String, Col1_2 UInt8),
Col2 Nested(
Col2_1 UInt8,
Col2_2 Nested(
Col2_2_1 UInt8,
Col2_2_2 UInt8
)
)
) Engine Memory
`)
if err != nil {
return err
}

batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
var i int64
for i = 0; i < 10; i++ {
err := batch.Append(
[]map[string]interface{}{
{
"Col1_1": strconv.Itoa(int(i)),
"Col1_2": uint8(i),
},
{
"Col1_1": strconv.Itoa(int(i + 1)),
"Col1_2": uint8(i + 1),
},
{
"Col1_1": strconv.Itoa(int(i + 2)),
"Col1_2": uint8(i + 2),
},
},
[]map[string]interface{}{
{
"Col2_2": []map[string]interface{}{
{
"Col2_2_1": uint8(i),
"Col2_2_2": uint8(i + 1),
},
},
"Col2_1": uint8(i),
},
{
"Col2_2": []map[string]interface{}{
{
"Col2_2_1": uint8(i + 2),
"Col2_2_2": uint8(i + 3),
},
},
"Col2_1": uint8(i + 1),
},
},
)
if err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}
var (
col1 []map[string]interface{}
col2 []map[string]interface{}
)
rows, err := conn.Query(ctx, "SELECT * FROM example")
if err != nil {
return err
}
for rows.Next() {
if err := rows.Scan(&col1, &col2); err != nil {
return err
}
fmt.Printf("row: col1=%v, col2=%v\n", col1, col2)
}
rows.Close()

完整示例 - flatten_tested=0

如果对 flatten_nested 使用默认值 1,则嵌套列将展平为单独的数组。这需要使用嵌套切片进行插入和检索。虽然任意级别的嵌套可能有效,但这并未获得官方支持。

conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(ctx, "DROP TABLE IF EXISTS example")
err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Nested(Col1_1 String, Col1_2 UInt8),
Col2 Nested(
Col2_1 UInt8,
Col2_2 Nested(
Col2_2_1 UInt8,
Col2_2_2 UInt8
)
)
) Engine Memory
`)
if err != nil {
return err
}


batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
var i uint8
for i = 0; i < 10; i++ {
col1_1_data := []string{strconv.Itoa(int(i)), strconv.Itoa(int(i + 1)), strconv.Itoa(int(i + 2))}
col1_2_data := []uint8{i, i + 1, i + 2}
col2_1_data := []uint8{i, i + 1, i + 2}
col2_2_data := [][][]interface{}{
{
{i, i + 1},
},
{
{i + 2, i + 3},
},
{
{i + 4, i + 5},
},
}
err := batch.Append(
col1_1_data,
col1_2_data,
col2_1_data,
col2_2_data,
)
if err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}

完整示例 - flatten_nested=1

注意:Nested 列必须具有相同的维度。例如,在上面的示例中,Col_2_2Col_2_1 必须具有相同数量的元素。

由于更直接的界面和对嵌套的官方支持,我们建议使用 flatten_nested=0

地理类型

客户端支持地理类型 Point、Ring、Polygon 和 Multi Polygon。这些字段在 Golang 中使用 github.com/paulmach/orb 包。

if err = conn.Exec(ctx, `
CREATE TABLE example (
point Point,
ring Ring,
polygon Polygon,
mPolygon MultiPolygon
)
Engine Memory
`); err != nil {
return err
}

batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}

if err = batch.Append(
orb.Point{11, 22},
orb.Ring{
orb.Point{1, 2},
orb.Point{1, 2},
},
orb.Polygon{
orb.Ring{
orb.Point{1, 2},
orb.Point{12, 2},
},
orb.Ring{
orb.Point{11, 2},
orb.Point{1, 12},
},
},
orb.MultiPolygon{
orb.Polygon{
orb.Ring{
orb.Point{1, 2},
orb.Point{12, 2},
},
orb.Ring{
orb.Point{11, 2},
orb.Point{1, 12},
},
},
orb.Polygon{
orb.Ring{
orb.Point{1, 2},
orb.Point{12, 2},
},
orb.Ring{
orb.Point{11, 2},
orb.Point{1, 12},
},
},
},
); err != nil {
return err
}

if err = batch.Send(); err != nil {
return err
}

var (
point orb.Point
ring orb.Ring
polygon orb.Polygon
mPolygon orb.MultiPolygon
)

if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&point, &ring, &polygon, &mPolygon); err != nil {
return err
}

完整示例

UUID

UUID 类型由 github.com/google/uuid 包支持。用户还可以将 UUID 作为字符串或任何实现了 sql.ScannerStringify 类型的类型发送和编组。

if err = conn.Exec(ctx, `
CREATE TABLE example (
col1 UUID,
col2 UUID
)
Engine Memory
`); err != nil {
return err
}

batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
col1Data, _ := uuid.NewUUID()
if err = batch.Append(
col1Data,
"603966d6-ed93-11ec-8ea0-0242ac120002",
); err != nil {
return err
}

if err = batch.Send(); err != nil {
return err
}

var (
col1 uuid.UUID
col2 uuid.UUID
)

if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2); err != nil {
return err
}

完整示例

Decimal

Decimal 类型由 github.com/shopspring/decimal 包支持。

if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Decimal32(3),
Col2 Decimal(18,6),
Col3 Decimal(15,7),
Col4 Decimal128(8),
Col5 Decimal256(9)
) Engine Memory
`); err != nil {
return err
}

batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
if err = batch.Append(
decimal.New(25, 4),
decimal.New(30, 5),
decimal.New(35, 6),
decimal.New(135, 7),
decimal.New(256, 8),
); err != nil {
return err
}

if err = batch.Send(); err != nil {
return err
}

var (
col1 decimal.Decimal
col2 decimal.Decimal
col3 decimal.Decimal
col4 decimal.Decimal
col5 decimal.Decimal
)

if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2, &col3, &col4, &col5); err != nil {
return err
}
fmt.Printf("col1=%v, col2=%v, col3=%v, col4=%v, col5=%v\n", col1, col2, col3, col4, col5)

完整示例

Nullable

Nil 的 go 值表示 ClickHouse NULL。如果字段声明为 Nullable,则可以使用它。在插入时,Nil 可以传递给列的 normal 和 Nullable 版本。对于前者,将持久化该类型的默认值,例如,字符串的空字符串。对于 nullable 版本,NULL 值将存储在 ClickHouse 中。

在扫描时,用户必须传递一个指向支持 nil 的类型的指针,例如 *string,以便表示 Nullable 字段的 nil 值。在下面的示例中,col1(它是 Nullable(String))因此接收一个 **string。这允许表示 nil。

if err = conn.Exec(ctx, `
CREATE TABLE example (
col1 Nullable(String),
col2 String,
col3 Nullable(Int8),
col4 Nullable(Int64)
)
Engine Memory
`); err != nil {
return err
}

batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
if err = batch.Append(
nil,
nil,
nil,
sql.NullInt64{Int64: 0, Valid: false},
); err != nil {
return err
}

if err = batch.Send(); err != nil {
return err
}

var (
col1 *string
col2 string
col3 *int8
col4 sql.NullInt64
)

if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2, &col3, &col4); err != nil {
return err
}

完整示例

客户端还额外支持 sql.Null* 类型,例如 sql.NullInt64。这些类型与其等效的 ClickHouse 类型兼容。

大整数 - Int128、Int256、UInt128、UInt256

大于 64 位的数字类型使用原生 go big 包表示。

if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Int128,
Col2 UInt128,
Col3 Array(Int128),
Col4 Int256,
Col5 Array(Int256),
Col6 UInt256,
Col7 Array(UInt256)
) Engine Memory`); err != nil {
return err
}

batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}

col1Data, _ := new(big.Int).SetString("170141183460469231731687303715884105727", 10)
col2Data := big.NewInt(128)
col3Data := []*big.Int{
big.NewInt(-128),
big.NewInt(128128),
big.NewInt(128128128),
}
col4Data := big.NewInt(256)
col5Data := []*big.Int{
big.NewInt(256),
big.NewInt(256256),
big.NewInt(256256256256),
}
col6Data := big.NewInt(256)
col7Data := []*big.Int{
big.NewInt(256),
big.NewInt(256256),
big.NewInt(256256256256),
}

if err = batch.Append(col1Data, col2Data, col3Data, col4Data, col5Data, col6Data, col7Data); err != nil {
return err
}

if err = batch.Send(); err != nil {
return err
}

var (
col1 big.Int
col2 big.Int
col3 []*big.Int
col4 big.Int
col5 []*big.Int
col6 big.Int
col7 []*big.Int
)

if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7); err != nil {
return err
}
fmt.Printf("col1=%v, col2=%v, col3=%v, col4=%v, col5=%v, col6=%v, col7=%v\n", col1, col2, col3, col4, col5, col6, col7)

完整示例

压缩

对压缩方法的支持取决于正在使用的底层协议。对于原生协议,客户端支持 LZ4ZSTD 压缩。这仅在块级别执行。可以通过在连接中包含 Compression 配置来启用压缩。

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionZSTD,
},
MaxOpenConns: 1,
})
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(context.Background(), "DROP TABLE IF EXISTS example")
if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Array(String)
) Engine Memory
`); err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 1000; i++ {
if err := batch.Append([]string{strconv.Itoa(i), strconv.Itoa(i + 1), strconv.Itoa(i + 2), strconv.Itoa(i + 3)}); err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}

完整示例

如果通过 HTTP 使用标准接口,则可以使用其他压缩技术。有关更多详细信息,请参阅 database/sql API - 压缩

参数绑定

客户端支持 ExecQueryQueryRow 方法的参数绑定。如下例所示,这通过命名、编号和位置参数支持。我们在下面提供了这些示例。

var count uint64
// positional bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 >= ? AND Col3 < ?", 500, now.Add(time.Duration(750)*time.Second)).Scan(&count); err != nil {
return err
}
// 250
fmt.Printf("Positional bind count: %d\n", count)
// numeric bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= $2 AND Col3 > $1", now.Add(time.Duration(150)*time.Second), 250).Scan(&count); err != nil {
return err
}
// 100
fmt.Printf("Numeric bind count: %d\n", count)
// named bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= @col1 AND Col3 > @col3", clickhouse.Named("col1", 100), clickhouse.Named("col3", now.Add(time.Duration(50)*time.Second))).Scan(&count); err != nil {
return err
}
// 50
fmt.Printf("Named bind count: %d\n", count)

完整示例

特殊情况

默认情况下,如果切片作为参数传递给查询,则切片将展开为逗号分隔的值列表。如果用户需要一组用 [ ] 包裹的值进行注入,则应使用 ArraySet

如果需要用 ( ) 包裹的组/元组,例如,用于 IN 运算符,用户可以使用 GroupSet。这对于需要多个组的情况尤其有用,如下例所示。

最后,DateTime64 字段需要精度,以确保参数得到适当的呈现。客户端不知道字段的精度级别,但是,用户必须提供它。为了方便这一点,我们提供了 DateNamed 参数。

var count uint64
// arrays will be unfolded
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 IN (?)", []int{100, 200, 300, 400, 500}).Scan(&count); err != nil {
return err
}
fmt.Printf("Array unfolded count: %d\n", count)
// arrays will be preserved with []
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col4 = ?", clickhouse.ArraySet{300, 301}).Scan(&count); err != nil {
return err
}
fmt.Printf("Array count: %d\n", count)
// Group sets allow us to form ( ) lists
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 IN ?", clickhouse.GroupSet{[]interface{}{100, 200, 300, 400, 500}}).Scan(&count); err != nil {
return err
}
fmt.Printf("Group count: %d\n", count)
// More useful when we need nesting
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE (Col1, Col5) IN (?)", []clickhouse.GroupSet{{[]interface{}{100, 101}}, {[]interface{}{200, 201}}}).Scan(&count); err != nil {
return err
}
fmt.Printf("Group count: %d\n", count)
// Use DateNamed when you need a precision in your time#
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col3 >= @col3", clickhouse.DateNamed("col3", now.Add(time.Duration(500)*time.Millisecond), clickhouse.NanoSeconds)).Scan(&count); err != nil {
return err
}
fmt.Printf("NamedDate count: %d\n", count)

完整示例

使用 Context

Go 上下文提供了一种跨 API 边界传递截止时间、取消信号和其他请求作用域值的方法。连接上的所有方法都接受上下文作为它们的第一个变量。虽然之前的示例使用了 context.Background(),但用户可以使用此功能来传递设置和截止时间以及取消查询。

传递使用 withDeadline 创建的上下文允许对查询设置执行时间限制。请注意,这是一个绝对时间,过期只会释放连接并向 ClickHouse 发送取消信号。或者,可以使用 WithCancel 显式取消查询。

助手 clickhouse.WithQueryIDclickhouse.WithQuotaKey 允许指定查询 ID 和配额密钥。查询 ID 对于跟踪日志中的查询和取消目的非常有用。配额密钥可用于根据唯一密钥值对 ClickHouse 使用施加限制 - 有关更多详细信息,请参阅 配额管理

用户还可以使用上下文来确保设置仅应用于特定查询 - 而不是应用于整个连接,如 连接设置 中所示。

最后,用户可以通过 clickhouse.WithBlockSize 控制块缓冲区的大小。这将覆盖连接级别设置 BlockBufferSize,并控制在任何时候解码和保存在内存中的最大块数。较大的值可能意味着更多的并行化,但会牺牲内存。

以上示例在下面显示。

dialCount := 0
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
DialContext: func(ctx context.Context, addr string) (net.Conn, error) {
dialCount++
var d net.Dialer
return d.DialContext(ctx, "tcp", addr)
},
})
if err != nil {
return err
}
if err := clickhouse_tests.CheckMinServerServerVersion(conn, 22, 6, 1); err != nil {
return nil
}
// we can use context to pass settings to a specific API call
ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{
"allow_experimental_object_type": "1",
}))

conn.Exec(ctx, "DROP TABLE IF EXISTS example")

// to create a JSON column we need allow_experimental_object_type=1
if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 JSON
)
Engine Memory
`); err != nil {
return err
}

// queries can be cancelled using the context
ctx, cancel := context.WithCancel(context.Background())
go func() {
cancel()
}()
if err = conn.QueryRow(ctx, "SELECT sleep(3)").Scan(); err == nil {
return fmt.Errorf("expected cancel")
}

// set a deadline for a query - this will cancel the query after the absolute time is reached.
// queries will continue to completion in ClickHouse
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
defer cancel()
if err := conn.Ping(ctx); err == nil {
return fmt.Errorf("expected deadline exceeeded")
}

// set a query id to assist tracing queries in logs e.g. see system.query_log
var one uint8
queryId, _ := uuid.NewUUID()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQueryID(queryId.String()))
if err = conn.QueryRow(ctx, "SELECT 1").Scan(&one); err != nil {
return err
}

conn.Exec(context.Background(), "DROP QUOTA IF EXISTS foobar")
defer func() {
conn.Exec(context.Background(), "DROP QUOTA IF EXISTS foobar")
}()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQuotaKey("abcde"))
// set a quota key - first create the quota
if err = conn.Exec(ctx, "CREATE QUOTA IF NOT EXISTS foobar KEYED BY client_key FOR INTERVAL 1 minute MAX queries = 5 TO default"); err != nil {
return err
}

type Number struct {
Number uint64 `ch:"number"`
}
for i := 1; i <= 6; i++ {
var result []Number
if err = conn.Select(ctx, &result, "SELECT number FROM numbers(10)"); err != nil {
return err
}
}

完整示例

进度/Profile/日志信息

可以在查询中请求进度、Profile 和日志信息。进度信息将报告 ClickHouse 中已读取和处理的行数和字节数的统计信息。相反,Profile 信息提供返回给客户端的数据摘要,包括字节总数(未压缩)、行数和块数。最后,日志信息提供有关线程的统计信息,例如,内存使用情况和数据速度。

获取此信息需要用户使用 Context,用户可以在其中传递回调函数。

totalRows := uint64(0)
// use context to pass a call back for progress and profile info
ctx := clickhouse.Context(context.Background(), clickhouse.WithProgress(func(p *clickhouse.Progress) {
fmt.Println("progress: ", p)
totalRows += p.Rows
}), clickhouse.WithProfileInfo(func(p *clickhouse.ProfileInfo) {
fmt.Println("profile info: ", p)
}), clickhouse.WithLogs(func(log *clickhouse.Log) {
fmt.Println("log info: ", log)
}))

rows, err := conn.Query(ctx, "SELECT number from numbers(1000000) LIMIT 1000000")
if err != nil {
return err
}
for rows.Next() {
}

fmt.Printf("Total Rows: %d\n", totalRows)
rows.Close()

完整示例

动态扫描

用户可能需要读取他们不知道模式或返回字段类型的表。这在执行临时数据分析或编写通用工具的情况下很常见。为了实现这一点,列类型信息在查询响应中可用。这可以与 Go 反射一起使用,以创建可以传递给 Scan 的正确类型变量的运行时实例。

const query = `
SELECT
1 AS Col1
, 'Text' AS Col2
`
rows, err := conn.Query(context.Background(), query)
if err != nil {
return err
}
var (
columnTypes = rows.ColumnTypes()
vars = make([]interface{}, len(columnTypes))
)
for i := range columnTypes {
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}
for rows.Next() {
if err := rows.Scan(vars...); err != nil {
return err
}
for _, v := range vars {
switch v := v.(type) {
case *string:
fmt.Println(*v)
case *uint8:
fmt.Println(*v)
}
}
}

完整示例

外部表

外部表允许客户端将数据发送到 ClickHouse,并执行 SELECT 查询。此数据放入临时表中,并可以在查询本身中用于评估。

要将外部数据与查询一起发送到客户端,用户必须通过 ext.NewTable 构建外部表,然后在上下文中传递它。

table1, err := ext.NewTable("external_table_1",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
if err != nil {
return err
}

for i := 0; i < 10; i++ {
if err = table1.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now()); err != nil {
return err
}
}

table2, err := ext.NewTable("external_table_2",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)

for i := 0; i < 10; i++ {
table2.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now())
}
ctx := clickhouse.Context(context.Background(),
clickhouse.WithExternalTable(table1, table2),
)
rows, err := conn.Query(ctx, "SELECT * FROM external_table_1")
if err != nil {
return err
}
for rows.Next() {
var (
col1 uint8
col2 string
col3 time.Time
)
rows.Scan(&col1, &col2, &col3)
fmt.Printf("col1=%d, col2=%s, col3=%v\n", col1, col2, col3)
}
rows.Close()

var count uint64
if err := conn.QueryRow(ctx, "SELECT COUNT(*) FROM external_table_1").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1: %d\n", count)
if err := conn.QueryRow(ctx, "SELECT COUNT(*) FROM external_table_2").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_2: %d\n", count)
if err := conn.QueryRow(ctx, "SELECT COUNT(*) FROM (SELECT * FROM external_table_1 UNION ALL SELECT * FROM external_table_2)").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1 UNION external_table_2: %d\n", count)

完整示例

Open Telemetry

ClickHouse 允许将 跟踪上下文 作为原生协议的一部分传递。客户端允许通过函数 clickhouse.withSpan 创建 Span,并通过 Context 传递以实现此目的。

var count uint64
rows := conn.QueryRow(clickhouse.Context(context.Background(), clickhouse.WithSpan(
trace.NewSpanContext(trace.SpanContextConfig{
SpanID: trace.SpanID{1, 2, 3, 4, 5},
TraceID: trace.TraceID{5, 4, 3, 2, 1},
}),
)), "SELECT COUNT() FROM (SELECT number FROM system.numbers LIMIT 5)")
if err := rows.Scan(&count); err != nil {
return err
}
fmt.Printf("count: %d\n", count)

完整示例

有关利用跟踪的完整详细信息,请参见 OpenTelemetry 支持

Database/SQL API

database/sql 或“标准”API 允许用户在应用程序代码应与底层数据库无关的场景中使用客户端,方法是符合标准接口。这会带来一些代价 - 额外的抽象和间接层,以及不一定与 ClickHouse 对齐的原语。但是,在工具需要连接到多个数据库的场景中,这些成本通常是可以接受的。

此外,此客户端支持使用 HTTP 作为传输层 - 数据仍将以原生格式编码以获得最佳性能。

以下旨在镜像 ClickHouse API 文档的结构。

标准 API 的完整代码示例可以在 此处 找到。

连接

可以通过 DSN 字符串(格式为 clickhouse://<host>:<port>?<query_option>=<value>)和 Open 方法,或通过 clickhouse.OpenDB 方法来实现连接。后者不是 database/sql 规范的一部分,但返回一个 sql.DB 实例。此方法提供诸如 profiling 之类的功能,对于这些功能,没有明显的手段通过 database/sql 规范公开。

func Connect() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
return conn.Ping()
}


func ConnectDSN() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://%s:%d?username=%s&password=%s", env.Host, env.Port, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}

完整示例

对于所有后续示例,除非明确显示,否则我们假设 ClickHouse conn 变量已创建并且可用。

连接设置

以下参数可以在 DSN 字符串中传递

  • hosts - 用于负载均衡和故障转移的逗号分隔的单地址主机列表 - 请参阅 连接到多个节点
  • username/password - 身份验证凭据 - 请参阅 身份验证
  • database - 选择当前默认数据库
  • dial_timeout - 持续时间字符串是可能带符号的十进制数字序列,每个数字都有可选的小数部分和一个单位后缀,例如 300ms1s。有效的时间单位是 mssm
  • connection_open_strategy - random/in_order (默认 random) - 请参阅 连接到多个节点
    • round_robin - 从集合中选择一个轮询服务器
    • in_order - 按指定顺序选择第一个活动的服务器
  • debug - 启用调试输出(布尔值)
  • compress - 指定压缩算法 - none(默认)、zstdlz4gzipdeflatebr。如果设置为 true,将使用 lz4。只有 lz4zstd 支持原生通信。
  • compress_level - 压缩级别(默认为 0)。请参阅压缩。这是算法特定的
    • gzip - -2(最快速度)到 9(最佳压缩)
    • deflate - -2(最快速度)到 9(最佳压缩)
    • br - 0(最快速度)到 11(最佳压缩)
    • zstd, lz4 - 忽略
  • secure - 建立安全 SSL 连接(默认为 false
  • skip_verify - 跳过证书验证(默认为 false
  • block_buffer_size - 允许用户控制块缓冲区大小。请参阅 BlockBufferSize。(默认为 2
func ConnectSettings() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://127.0.0.1:9001,127.0.0.1:9002,%s:%d/%s?username=%s&password=%s&dial_timeout=10s&connection_open_strategy=round_robin&debug=true&compress=lz4", env.Host, env.Port, env.Database, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}

完整示例

连接池

用户可以影响提供的节点地址列表的使用,如 连接到多个节点 中所述。但是,连接管理和池化由设计委托给 sql.DB

通过 HTTP 连接

默认情况下,连接是通过原生协议建立的。对于需要 HTTP 的用户,可以通过修改 DSN 以包含 HTTP 协议或在连接选项中指定协议来启用此功能。

func ConnectHTTP() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Protocol: clickhouse.HTTP,
})
return conn.Ping()
}

func ConnectDSNHTTP() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s", env.Host, env.HttpPort, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}

完整示例

连接到多个节点

如果使用 OpenDB,请使用与 ClickHouse API 相同的方法连接到多个主机 - 可选地指定 ConnOpenStrategy

对于基于 DSN 的连接,字符串接受多个主机和一个 connection_open_strategy 参数,可以为该参数设置值 round_robinin_order

func MultiStdHost() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
ConnOpenStrategy: clickhouse.ConnOpenRoundRobin,
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
if err != nil {
return err
}
fmt.Println(v.String())
return nil
}

func MultiStdHostDSN() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://127.0.0.1:9001,127.0.0.1:9002,%s:%d?username=%s&password=%s&connection_open_strategy=round_robin", env.Host, env.Port, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}

完整示例

使用 TLS

如果使用 DSN 连接字符串,可以通过参数“secure=true”启用 SSL。OpenDB 方法使用与 原生 API for TLS 相同的方法,依赖于非 nil TLS 结构的规范。虽然 DSN 连接字符串支持参数 skip_verify 以跳过 SSL 验证,但 OpenDB 方法是更高级 TLS 配置所必需的 - 因为它允许传递配置。

func ConnectSSL() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
cwd, err := os.Getwd()
if err != nil {
return err
}
t := &tls.Config{}
caCert, err := ioutil.ReadFile(path.Join(cwd, "../../tests/resources/CAroot.crt"))
if err != nil {
return err
}
caCertPool := x509.NewCertPool()
successful := caCertPool.AppendCertsFromPEM(caCert)
if !successful {
return err
}
t.RootCAs = caCertPool


conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.SslPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
TLS: t,
})
return conn.Ping()
}

func ConnectDSNSSL() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("https://%s:%d?secure=true&skip_verify=true&username=%s&password=%s", env.Host, env.HttpsPort, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}

完整示例

身份验证

如果使用 OpenDB,可以通过常用选项传递身份验证信息。对于基于 DSN 的连接,用户名和密码可以在连接字符串中传递 - 可以作为参数,也可以作为编码在地址中的凭据。

func ConnectAuth() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
return conn.Ping()
}

func ConnectDSNAuth() error {
env, err := GetStdTestEnvironment()
conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s", env.Host, env.HttpPort, env.Username, env.Password))
if err != nil {
return err
}
if err = conn.Ping(); err != nil {
return err
}
conn, err = sql.Open("clickhouse", fmt.Sprintf("http://%s:%s@%s:%d", env.Username, env.Password, env.Host, env.HttpPort))
if err != nil {
return err
}
return conn.Ping()
}

完整示例

执行

获得连接后,用户可以发出 sql 语句以通过 Exec 方法执行。

conn.Exec(`DROP TABLE IF EXISTS example`)
_, err = conn.Exec(`
CREATE TABLE IF NOT EXISTS example (
Col1 UInt8,
Col2 String
) engine=Memory
`)
if err != nil {
return err
}
_, err = conn.Exec("INSERT INTO example VALUES (1, 'test-1')")

完整示例

此方法不支持接收上下文 - 默认情况下,它使用后台上下文执行。如果需要,用户可以使用 ExecContext - 请参阅 使用 Context

批量插入

可以通过使用 Being 方法创建 sql.Tx 来实现批处理语义。由此,可以使用 Prepare 方法和 INSERT 语句获得批处理。这将返回一个 sql.Stmt,可以使用 Exec 方法将行附加到其中。批处理将累积在内存中,直到在原始 sql.Tx 上执行 Commit

batch, err := scope.Prepare("INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 1000; i++ {
_, err := batch.Exec(
uint8(42),
"ClickHouse", "Inc",
uuid.New(),
map[string]uint8{"key": 1}, // Map(String, UInt8)
[]string{"Q", "W", "E", "R", "T", "Y"}, // Array(String)
[]interface{}{ // Tuple(String, UInt8, Array(Map(String, String)))
"String Value", uint8(5), []map[string]string{
map[string]string{"key": "value"},
map[string]string{"key": "value"},
map[string]string{"key": "value"},
},
},
time.Now(),
)
if err != nil {
return err
}
}
return scope.Commit()

完整示例

查询行/多行

可以使用 QueryRow 方法实现查询单行。这将返回一个 *sql.Row,可以在其上调用 Scan,并使用指向变量的指针,列应编组到这些变量中。QueryRowContext 变体允许传递除后台之外的上下文 - 请参阅 使用 Context

row := conn.QueryRow("SELECT * FROM example")
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 interface{}
col8 time.Time
)
if err := row.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}

完整示例

迭代多行需要 Query 方法。这将返回一个 *sql.Rows 结构,可以在其上调用 Next 以迭代行。QueryContext 等效项允许传递上下文。

rows, err := conn.Query("SELECT * FROM example")
if err != nil {
return err
}
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 interface{}
col8 time.Time
)
for rows.Next() {
if err := rows.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s, col4=%s, col5=%v, col6=%v, col7=%v, col8=%v\n", col1, col2, col3, col4, col5, col6, col7, col8)
}

完整示例

异步插入

可以通过使用 ExecContext 方法执行插入来实现异步插入。这应该传递一个启用了异步模式的上下文,如下所示。这允许用户指定客户端是应等待服务器完成插入,还是在收到数据后立即响应。这有效地控制了参数 wait_for_async_insert

const ddl = `
CREATE TABLE example (
Col1 UInt64
, Col2 String
, Col3 Array(UInt8)
, Col4 DateTime
) ENGINE = Memory
`
if _, err := conn.Exec(ddl); err != nil {
return err
}
ctx := clickhouse.Context(context.Background(), clickhouse.WithStdAsync(false))
{
for i := 0; i < 100; i++ {
_, err := conn.ExecContext(ctx, fmt.Sprintf(`INSERT INTO example VALUES (
%d, '%s', [1, 2, 3, 4, 5, 6, 7, 8, 9], now()
)`, i, "Golang SQL database driver"))
if err != nil {
return err
}
}
}

完整示例

列式插入

标准接口不支持。

使用结构体

标准接口不支持。

类型转换

标准 database/sql 接口应支持与 ClickHouse API 相同的类型。有一些例外,主要是针对复杂类型,我们在下面记录了这些例外。与 ClickHouse API 类似,客户端旨在尽可能灵活地接受用于插入和编组响应的可变类型。有关更多详细信息,请参阅 类型转换

复杂类型

除非另有说明,否则复杂类型处理应与 ClickHouse API 相同。差异是 database/sql 内部结构的结果。

Maps

与 ClickHouse API 不同,标准 API 要求 map 在扫描类型时是强类型的。例如,用户不能为 Map(String,String) 字段传递 map[string]interface{},而必须改用 map[string]stringinterface{} 变量将始终兼容,并且可以用于更复杂的结构。读取时不支持结构体。

var (
col1Data = map[string]uint64{
"key_col_1_1": 1,
"key_col_1_2": 2,
}
col2Data = map[string]uint64{
"key_col_2_1": 10,
"key_col_2_2": 20,
}
col3Data = map[string]uint64{}
col4Data = []map[string]string{
{"A": "B"},
{"C": "D"},
}
col5Data = map[string]uint64{
"key_col_5_1": 100,
"key_col_5_2": 200,
}
)
if _, err := batch.Exec(col1Data, col2Data, col3Data, col4Data, col5Data); err != nil {
return err
}
if err = scope.Commit(); err != nil {
return err
}
var (
col1 interface{}
col2 map[string]uint64
col3 map[string]uint64
col4 []map[string]string
col5 map[string]uint64
)
if err := conn.QueryRow("SELECT * FROM example").Scan(&col1, &col2, &col3, &col4, &col5); err != nil {
return err
}
fmt.Printf("col1=%v, col2=%v, col3=%v, col4=%v, col5=%v", col1, col2, col3, col4, col5)

完整示例

插入行为与 ClickHouse API 相同。

压缩

标准 API 支持与原生 ClickHouse API 相同的压缩算法,即块级别的 lz4zstd 压缩。此外,HTTP 连接还支持 gzip、deflate 和 br 压缩。如果启用其中任何一个,则在插入期间和查询响应期间对块执行压缩。其他请求,例如 ping 或查询请求,将保持未压缩状态。这与 lz4zstd 选项一致。

如果使用 OpenDB 方法建立连接,则可以传递 Compression 配置。这包括指定压缩级别的能力(见下文)。如果通过带有 DSN 的 sql.Open 连接,请使用参数 compress。这可以是特定的压缩算法,即 gzipdeflatebrzstdlz4,也可以是布尔标志。如果设置为 true,将使用 lz4。默认值为 none,即禁用压缩。

conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionBrotli,
Level: 5,
},
Protocol: clickhouse.HTTP,
})

完整示例

conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s&compress=gzip&compress_level=5", env.Host, env.HttpPort, env.Username, env.Password))

完整示例

应用的压缩级别可以通过 DSN 参数 compress_level 或 Compression 选项的 Level 字段来控制。这默认为 0,但它是算法特定的

  • gzip - -2(最快速度)到 9(最佳压缩)
  • deflate - -2(最快速度)到 9(最佳压缩)
  • br - 0(最快速度)到 11(最佳压缩)
  • zstd, lz4 - 忽略

参数绑定

标准 API 支持与 ClickHouse API 相同的参数绑定功能,允许将参数传递给 ExecQueryQueryRow 方法(及其等效的 Context 变体)。支持位置、命名和编号参数。

var count uint64
// positional bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 >= ? AND Col3 < ?", 500, now.Add(time.Duration(750)*time.Second)).Scan(&count); err != nil {
return err
}
// 250
fmt.Printf("Positional bind count: %d\n", count)
// numeric bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= $2 AND Col3 > $1", now.Add(time.Duration(150)*time.Second), 250).Scan(&count); err != nil {
return err
}
// 100
fmt.Printf("Numeric bind count: %d\n", count)
// named bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= @col1 AND Col3 > @col3", clickhouse.Named("col1", 100), clickhouse.Named("col3", now.Add(time.Duration(50)*time.Second))).Scan(&count); err != nil {
return err
}
// 50
fmt.Printf("Named bind count: %d\n", count)

完整示例

请注意 特殊情况 仍然适用。

使用 Context

标准 API 支持与 ClickHouse API 相同的功能,可以通过上下文传递截止时间、取消信号和其他请求作用域值。与 ClickHouse API 不同,这是通过使用方法的 Context 变体来实现的,即默认情况下使用后台上下文的方法(如 Exec)具有变体 ExecContext,可以将上下文作为第一个参数传递给该变体。这允许在应用程序流程的任何阶段传递上下文。例如,用户可以在通过 ConnContext 建立连接或通过 QueryRowContext 请求查询行时传递上下文。下面显示了所有可用方法的示例。

有关使用上下文传递截止时间、取消信号、查询 ID、配额密钥和连接设置的更多详细信息,请参阅 ClickHouse API 的使用 Context。

ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{
"allow_experimental_object_type": "1",
}))
conn.ExecContext(ctx, "DROP TABLE IF EXISTS example")
// to create a JSON column we need allow_experimental_object_type=1
if _, err = conn.ExecContext(ctx, `
CREATE TABLE example (
Col1 JSON
)
Engine Memory
`); err != nil {
return err
}

// queries can be cancelled using the context
ctx, cancel := context.WithCancel(context.Background())
go func() {
cancel()
}()
if err = conn.QueryRowContext(ctx, "SELECT sleep(3)").Scan(); err == nil {
return fmt.Errorf("expected cancel")
}

// set a deadline for a query - this will cancel the query after the absolute time is reached. Again terminates the connection only,
// queries will continue to completion in ClickHouse
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
defer cancel()
if err := conn.PingContext(ctx); err == nil {
return fmt.Errorf("expected deadline exceeeded")
}

// set a query id to assist tracing queries in logs e.g. see system.query_log
var one uint8
ctx = clickhouse.Context(context.Background(), clickhouse.WithQueryID(uuid.NewString()))
if err = conn.QueryRowContext(ctx, "SELECT 1").Scan(&one); err != nil {
return err
}

conn.ExecContext(context.Background(), "DROP QUOTA IF EXISTS foobar")
defer func() {
conn.ExecContext(context.Background(), "DROP QUOTA IF EXISTS foobar")
}()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQuotaKey("abcde"))
// set a quota key - first create the quota
if _, err = conn.ExecContext(ctx, "CREATE QUOTA IF NOT EXISTS foobar KEYED BY client_key FOR INTERVAL 1 minute MAX queries = 5 TO default"); err != nil {
return err
}

// queries can be cancelled using the context
ctx, cancel = context.WithCancel(context.Background())
// we will get some results before cancel
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
"max_block_size": "1",
}))
rows, err := conn.QueryContext(ctx, "SELECT sleepEachRow(1), number FROM numbers(100);")
if err != nil {
return err
}
var (
col1 uint8
col2 uint8
)

for rows.Next() {
if err := rows.Scan(&col1, &col2); err != nil {
if col2 > 3 {
fmt.Println("expected cancel")
return nil
}
return err
}
fmt.Printf("row: col2=%d\n", col2)
if col2 == 3 {
cancel()
}
}

完整示例

会话

虽然原生连接固有地具有会话,但通过 HTTP 的连接需要用户创建会话 ID 以在上下文中作为设置传递。这允许使用绑定到会话的功能,例如临时表。

conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Protocol: clickhouse.HTTP,
Settings: clickhouse.Settings{
"session_id": uuid.NewString(),
},
})
if _, err := conn.Exec(`DROP TABLE IF EXISTS example`); err != nil {
return err
}
_, err = conn.Exec(`
CREATE TEMPORARY TABLE IF NOT EXISTS example (
Col1 UInt8
)
`)
if err != nil {
return err
}
scope, err := conn.Begin()
if err != nil {
return err
}
batch, err := scope.Prepare("INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 10; i++ {
_, err := batch.Exec(
uint8(i),
)
if err != nil {
return err
}
}
rows, err := conn.Query("SELECT * FROM example")
if err != nil {
return err
}
var (
col1 uint8
)
for rows.Next() {
if err := rows.Scan(&col1); err != nil {
return err
}
fmt.Printf("row: col1=%d\n", col1)
}

完整示例

动态扫描

ClickHouse API 类似,列类型信息可用于允许用户创建可以传递给 Scan 的正确类型变量的运行时实例。这允许读取类型未知的列。

const query = `
SELECT
1 AS Col1
, 'Text' AS Col2
`
rows, err := conn.QueryContext(context.Background(), query)
if err != nil {
return err
}
columnTypes, err := rows.ColumnTypes()
if err != nil {
return err
}
vars := make([]interface{}, len(columnTypes))
for i := range columnTypes {
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}
for rows.Next() {
if err := rows.Scan(vars...); err != nil {
return err
}
for _, v := range vars {
switch v := v.(type) {
case *string:
fmt.Println(*v)
case *uint8:
fmt.Println(*v)
}
}
}

完整示例

外部表

外部表允许客户端将数据发送到 ClickHouse,并执行 SELECT 查询。此数据放入临时表中,并可以在查询本身中用于评估。

要将外部数据与查询一起发送到客户端,用户必须通过 ext.NewTable 构建外部表,然后在上下文中传递它。

table1, err := ext.NewTable("external_table_1",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
if err != nil {
return err
}

for i := 0; i < 10; i++ {
if err = table1.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now()); err != nil {
return err
}
}

table2, err := ext.NewTable("external_table_2",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)

for i := 0; i < 10; i++ {
table2.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now())
}
ctx := clickhouse.Context(context.Background(),
clickhouse.WithExternalTable(table1, table2),
)
rows, err := conn.QueryContext(ctx, "SELECT * FROM external_table_1")
if err != nil {
return err
}
for rows.Next() {
var (
col1 uint8
col2 string
col3 time.Time
)
rows.Scan(&col1, &col2, &col3)
fmt.Printf("col1=%d, col2=%s, col3=%v\n", col1, col2, col3)
}
rows.Close()


var count uint64
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_table_1").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1: %d\n", count)
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_table_2").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_2: %d\n", count)
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM (SELECT * FROM external_table_1 UNION ALL SELECT * FROM external_table_2)").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1 UNION external_table_2: %d\n", count)

完整示例

Open Telemetry

ClickHouse 允许将 跟踪上下文 作为原生协议的一部分传递。客户端允许通过函数 clickhouse.withSpan 创建 Span,并通过 Context 传递以实现此目的。当使用 HTTP 作为传输时,不支持此功能。

var count uint64
rows := conn.QueryRowContext(clickhouse.Context(context.Background(), clickhouse.WithSpan(
trace.NewSpanContext(trace.SpanContextConfig{
SpanID: trace.SpanID{1, 2, 3, 4, 5},
TraceID: trace.TraceID{5, 4, 3, 2, 1},
}),
)), "SELECT COUNT() FROM (SELECT number FROM system.numbers LIMIT 5)")
if err := rows.Scan(&count); err != nil {
return err
}
fmt.Printf("count: %d\n", count)

完整示例

性能提示

  • 尽可能利用 ClickHouse API,尤其是对于原始类型。这避免了大量的反射和间接。
  • 如果读取大型数据集,请考虑修改 BlockBufferSize。这将增加内存占用,但意味着在行迭代期间可以并行解码更多块。默认值 2 是保守的,并且最大限度地减少了内存开销。较高的值意味着内存中更多的块。这需要测试,因为不同的查询可能会产生不同的块大小。因此,可以通过 Context 在 查询级别 设置它。
  • 在插入数据时,请指定您的类型。虽然客户端旨在灵活,例如,允许解析字符串以用于 UUID 或 IP,但这需要数据验证并在插入时产生开销。
  • 尽可能使用面向列的插入。同样,这些应该是强类型的,避免客户端转换您的值。
  • 遵循 ClickHouse 建议 以获得最佳插入性能。