ClickHouse Go
一个简单的示例
让我们用一个简单的例子来介绍 Go。这个例子将连接到 ClickHouse 并从系统数据库中选择数据。要开始,你需要你的连接详细信息。
连接详细信息
要使用原生 TCP 连接到 ClickHouse,你需要以下信息:
主机和端口:通常,使用 TLS 时端口为 9440,不使用 TLS 时端口为 9000。
数据库名称:默认情况下有一个名为
default
的数据库,请使用你想要连接到的数据库的名称。用户名和密码:默认情况下用户名为
default
。请使用适合你的用例的用户名。
你的 ClickHouse Cloud 服务的详细信息可以在 ClickHouse Cloud 控制台中找到。选择你将连接到的服务,然后点击 **连接**。
选择 **原生**,详细信息将在 clickhouse-client
命令的示例中提供。
如果你使用的是自管理的 ClickHouse,连接详细信息由你的 ClickHouse 管理员设置。
初始化模块
mkdir clickhouse-golang-example
cd clickhouse-golang-example
go mod init clickhouse-golang-example
复制一些示例代码
将这段代码复制到 clickhouse-golang-example
目录中,命名为 main.go
。
package main
import (
"context"
"crypto/tls"
"fmt"
"log"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)
func main() {
conn, err := connect()
if err != nil {
panic((err))
}
ctx := context.Background()
rows, err := conn.Query(ctx, "SELECT name,toString(uuid) as uuid_str FROM system.tables LIMIT 5")
if err != nil {
log.Fatal(err)
}
for rows.Next() {
var (
name, uuid string
)
if err := rows.Scan(
&name,
&uuid,
); err != nil {
log.Fatal(err)
}
log.Printf("name: %s, uuid: %s",
name, uuid)
}
}
func connect() (driver.Conn, error) {
var (
ctx = context.Background()
conn, err = clickhouse.Open(&clickhouse.Options{
Addr: []string{"<CLICKHOUSE_SECURE_NATIVE_HOSTNAME>:9440"},
Auth: clickhouse.Auth{
Database: "default",
Username: "default",
Password: "<DEFAULT_USER_PASSWORD>",
},
ClientInfo: clickhouse.ClientInfo{
Products: []struct {
Name string
Version string
}{
{Name: "an-example-go-client", Version: "0.1"},
},
},
Debugf: func(format string, v ...interface{}) {
fmt.Printf(format, v)
},
TLS: &tls.Config{
InsecureSkipVerify: true,
},
})
)
if err != nil {
return nil, err
}
if err := conn.Ping(ctx); err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
fmt.Printf("Exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
}
return nil, err
}
return conn, nil
}
运行 go mod tidy
go mod tidy
设置你的连接详细信息
你之前已经查找了你的连接详细信息。在 main.go
的 connect()
函数中设置它们。
func connect() (driver.Conn, error) {
var (
ctx = context.Background()
conn, err = clickhouse.Open(&clickhouse.Options{
Addr: []string{"<CLICKHOUSE_SECURE_NATIVE_HOSTNAME>:9440"},
Auth: clickhouse.Auth{
Database: "default",
Username: "default",
Password: "<DEFAULT_USER_PASSWORD>",
},
运行示例
go run .
2023/03/06 14:18:33 name: COLUMNS, uuid: 00000000-0000-0000-0000-000000000000
2023/03/06 14:18:33 name: SCHEMATA, uuid: 00000000-0000-0000-0000-000000000000
2023/03/06 14:18:33 name: TABLES, uuid: 00000000-0000-0000-0000-000000000000
2023/03/06 14:18:33 name: VIEWS, uuid: 00000000-0000-0000-0000-000000000000
2023/03/06 14:18:33 name: hourly_data, uuid: a4e36bd4-1e82-45b3-be77-74a0fe65c52b
了解更多
本类别中的其余文档涵盖了 ClickHouse Go 客户端的详细信息。
ClickHouse Go 客户端
ClickHouse 支持两个官方 Go 客户端。这两个客户端是互补的,并且有意地支持不同的用例。
- clickhouse-go - 高级语言客户端,支持 Go 标准数据库/sql 接口或原生接口。
- ch-go - 低级客户端。仅原生接口。
clickhouse-go 提供了一个高级接口,允许用户使用面向行的语义和批处理来查询和插入数据,这些语义和批处理对数据类型比较宽松 - 只要不发生潜在的精度损失,值就会被转换。而 ch-go 提供了一个优化的面向列的接口,它以牺牲类型严格性和更复杂的使用方式为代价,提供了快速的数据块流式传输,并具有低 CPU 和内存开销。
从 2.3 版本开始,clickhouse-go 使用 ch-go 来执行低级函数,例如编码、解码和压缩。请注意,clickhouse-go 还支持 Go database/sql
接口标准。这两个客户端都使用原生格式进行编码,以提供最佳性能,并且可以通过 ClickHouse 原生协议进行通信。clickhouse-go 还支持 HTTP 作为其传输机制,用于用户需要代理或负载均衡流量的情况。
在选择客户端库时,用户应该了解其各自的优缺点 - 请参阅选择客户端库。
原生格式 | 原生协议 | HTTP 协议 | 面向行 API | 面向列 API | 类型灵活性 | 压缩 | 查询占位符 | |
---|---|---|---|---|---|---|---|---|
clickhouse-go | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
ch-go | ✅ | ✅ | ✅ | ✅ |
选择客户端
选择客户端库取决于你的使用模式和对最佳性能的需求。对于插入量很大的用例,每秒需要插入数百万条数据,我们建议使用低级客户端 ch-go。该客户端避免了将数据从面向行的格式转换为列的关联开销,因为 ClickHouse 原生格式需要这样做。此外,它避免了任何反射或使用 interface{}
(any
) 类型来简化使用。
对于以聚合为中心或插入吞吐量较低的查询工作负载,clickhouse-go 提供了一个熟悉的 database/sql
接口和更简单的面向行语义。用户还可以选择使用 HTTP 作为传输协议,并利用帮助器函数将行编组到结构体中,以及从结构体中解组。
clickhouse-go 客户端
clickhouse-go 客户端提供了两种 API 接口来与 ClickHouse 通信。
- ClickHouse 客户端特定的 API
database/sql
标准 - 由 Golang 提供的围绕 SQL 数据库的通用接口。
虽然 database/sql
提供了一个与数据库无关的接口,允许开发人员抽象他们的数据存储,但它强制执行了一些类型和查询语义,这些语义会影响性能。因此,在 性能很重要 的情况下,应该使用客户端特定的 API。但是,希望将 ClickHouse 集成到支持多个数据库的工具中的用户,可能更喜欢使用标准接口。
这两个接口都使用 原生格式 和原生协议进行数据编码和通信。此外,标准接口支持通过 HTTP 进行通信。
原生格式 | 原生协议 | HTTP 协议 | 批量写入支持 | 结构体编组 | 压缩 | 查询占位符 | |
---|---|---|---|---|---|---|---|
ClickHouse API | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | |
database/sql API | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
安装
驱动程序的 v1 版本已弃用,不会发布新功能更新或对新 ClickHouse 类型的支持。用户应该迁移到 v2 版本,它提供了更好的性能。
要安装客户端的 2.x 版本,请将包添加到你的 go.mod 文件中。
require github.com/ClickHouse/clickhouse-go/v2 main
或者,克隆存储库。
git clone --branch v2 https://github.com/clickhouse/clickhouse-go.git $GOPATH/src/github
要安装其他版本,请相应地修改路径或分支名称。
mkdir my-clickhouse-app && cd my-clickhouse-app
cat > go.mod <<-END
module my-clickhouse-app
go 1.18
require github.com/ClickHouse/clickhouse-go/v2 main
END
cat > main.go <<-END
package main
import (
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
)
func main() {
conn, _ := clickhouse.Open(&clickhouse.Options{Addr: []string{"127.0.0.1:9000"}})
v, _ := conn.ServerVersion()
fmt.Println(v.String())
}
END
go mod tidy
go run main.go
版本控制和兼容性
客户端独立于 ClickHouse 发布。2.x 代表当前正在开发的主要版本。所有 2.x 版本应该彼此兼容。
ClickHouse 兼容性
客户端支持
- 所有目前支持的 ClickHouse 版本,如 此处 记录。当 ClickHouse 版本不再受支持时,它们也不再针对客户端版本进行主动测试。
- 从客户端发布日期起 2 年内的所有 ClickHouse 版本。请注意,仅 LTS 版本会进行主动测试。
Golang 兼容性
客户端版本 | Golang 版本 |
---|---|
=> 2.0 <= 2.2 | 1.17, 1.18 |
>= 2.3 | 1.18 |
ClickHouse 客户端 API
所有 ClickHouse 客户端 API 的代码示例都可以在 此处 找到。
连接
以下示例演示了如何连接到 ClickHouse,它返回服务器版本 - 假设 ClickHouse 未经保护,并且可以通过默认用户访问。
请注意,我们使用默认的原生端口进行连接。
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
fmt.Println(v)
在所有后续示例中,除非明确显示,我们假设使用了 ClickHouse conn
变量,并且该变量可用。
连接设置
打开连接时,可以使用 Options 结构体来控制客户端行为。以下设置可用:
Protocol
- Native 或 HTTP。目前,HTTP 仅受 database/sql API 支持。TLS
- TLS 选项。非空值启用 TLS。请参阅 使用 TLS。Addr
- 包含端口的地址列表。Auth
- 身份验证详细信息。请参阅 身份验证。DialContext
- 自定义拨号函数,用于确定如何建立连接。Debug
- 布尔值,用于启用调试功能。Debugf
- 提供一个函数来使用调试输出。需要将debug
设置为 true。Settings
- ClickHouse 设置的映射。这些设置将应用于所有 ClickHouse 查询。 使用上下文 允许为每个查询设置设置。Compression
- 为块启用压缩。请参阅 压缩。DialTimeout
- 建立连接的最大时间。默认为1s
。MaxOpenConns
- 任何时间可用的最大连接数。空闲池中可能存在更多或更少的连接,但一次只能使用这个数量的连接。默认为 MaxIdleConns+5。MaxIdleConns
- 池中维护的连接数量。如果可能,连接将被重用。默认为5
。ConnMaxLifetime
- 可用连接的最大生命周期。默认为 1 小时。连接在此时间后会被销毁,并在需要时向池中添加新连接。ConnOpenStrategy
- 确定如何使用节点地址列表和使用这些地址打开连接。请参阅 连接到多个节点。BlockBufferSize
- 一次解码到缓冲区的最大块数。较大的值会以内存为代价提高并行化。块大小取决于查询,因此虽然您可以在连接上设置此值,但我们建议您根据数据返回情况在每个查询中覆盖此值。默认为2
。
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
DialContext: func(ctx context.Context, addr string) (net.Conn, error) {
dialCount++
var d net.Dialer
return d.DialContext(ctx, "tcp", addr)
},
Debug: true,
Debugf: func(format string, v ...interface{}) {
fmt.Printf(format, v)
},
Settings: clickhouse.Settings{
"max_execution_time": 60,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
DialTimeout: time.Duration(10) * time.Second,
MaxOpenConns: 5,
MaxIdleConns: 5,
ConnMaxLifetime: time.Duration(10) * time.Minute,
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
BlockBufferSize: 10,
})
if err != nil {
return err
}
连接池
客户端维护一个连接池,根据需要在查询之间重用这些连接。最多,MaxOpenConns
将在任何时间使用,最大池大小由 MaxIdleConns
控制。客户端将为每个查询执行从池中获取一个连接,并将连接返回到池中以供重用。连接用于批处理的整个生命周期,并在 Send()
上释放。
无法保证池中的相同连接将用于后续查询,除非用户将 MaxOpenConns=1
。这很少需要,但可能需要在用户使用临时表的情况下。
此外,请注意 ConnMaxLifetime
默认情况下为 1 小时。如果节点离开集群,这会导致 ClickHouse 的负载变得不平衡。当节点变得不可用时,可能会发生这种情况,连接将平衡到其他节点。这些连接将持续存在,默认情况下不会在 1 小时内刷新,即使有问题的节点返回到集群也是如此。在高负载情况下,请考虑降低此值。
使用 TLS
在低级别,所有客户端连接方法(DSN/OpenDB/Open)都将使用 Go tls 包来建立安全连接。如果 Options 结构包含一个非空 tls.Config
指针,则客户端知道使用 TLS。
env, err := GetNativeTestEnvironment()
if err != nil {
return err
}
cwd, err := os.Getwd()
if err != nil {
return err
}
t := &tls.Config{}
caCert, err := ioutil.ReadFile(path.Join(cwd, "../../tests/resources/CAroot.crt"))
if err != nil {
return err
}
caCertPool := x509.NewCertPool()
successful := caCertPool.AppendCertsFromPEM(caCert)
if !successful {
return err
}
t.RootCAs = caCertPool
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.SslPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
TLS: t,
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
if err != nil {
return err
}
fmt.Println(v.String())
此最小 TLS.Config
通常足以连接到 ClickHouse 服务器上的安全本机端口(通常为 9440)。如果 ClickHouse 服务器没有有效的证书(过期、主机名错误、未由公开认可的根证书颁发机构签署),InsecureSkipVerify 可以为 true,但这强烈不建议。
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.SslPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
TLS: &tls.Config{
InsecureSkipVerify: true,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
如果需要其他 TLS 参数,应用程序代码应在 tls.Config
结构中设置所需字段。这可能包括特定的密码套件、强制使用特定的 TLS 版本(如 1.2 或 1.3)、添加内部 CA 证书链、如果 ClickHouse 服务器需要,添加客户端证书(和私钥),以及大多数其他选项,这些选项伴随着更专业的安全设置。
身份验证
在连接详细信息中指定 Auth 结构以指定用户名和密码。
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
if err != nil {
return err
}
v, err := conn.ServerVersion()
连接到多个节点
可以通过 Addr
结构指定多个地址。
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
if err != nil {
return err
}
fmt.Println(v.String())
两种连接策略可用
ConnOpenInOrder
(默认) - 按顺序使用地址。如果使用列表中较早的地址无法连接,则仅使用较晚的地址。这实际上是一种故障转移策略。ConnOpenRoundRobin
- 使用循环策略在地址之间平衡负载。
这可以通过选项 ConnOpenStrategy
来控制
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)},
ConnOpenStrategy: clickhouse.ConnOpenRoundRobin,
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
if err != nil {
return err
}
执行
可以使用 Exec
方法执行任意语句。这对于 DDL 和简单语句很有用。它不应用于较大的插入或查询迭代。
conn.Exec(context.Background(), `DROP TABLE IF EXISTS example`)
err = conn.Exec(context.Background(), `
CREATE TABLE IF NOT EXISTS example (
Col1 UInt8,
Col2 String
) engine=Memory
`)
if err != nil {
return err
}
conn.Exec(context.Background(), "INSERT INTO example VALUES (1, 'test-1')")
请注意将上下文传递给查询的能力。这可用于传递特定于查询级别的设置 - 请参阅 使用上下文。
批量插入
要插入大量行,客户端提供批处理语义。这需要准备一个批处理,可以将行追加到该批处理。最后,通过 Send()
方法发送这些批处理。批处理将保存在内存中,直到执行 Send。
conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(context.Background(), "DROP TABLE IF EXISTS example")
err = conn.Exec(ctx, `
CREATE TABLE IF NOT EXISTS example (
Col1 UInt8
, Col2 String
, Col3 FixedString(3)
, Col4 UUID
, Col5 Map(String, UInt8)
, Col6 Array(String)
, Col7 Tuple(String, UInt8, Array(Map(String, String)))
, Col8 DateTime
) Engine = Memory
`)
if err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 1000; i++ {
err := batch.Append(
uint8(42),
"ClickHouse",
"Inc",
uuid.New(),
map[string]uint8{"key": 1}, // Map(String, UInt8)
[]string{"Q", "W", "E", "R", "T", "Y"}, // Array(String)
[]interface{}{ // Tuple(String, UInt8, Array(Map(String, String)))
"String Value", uint8(5), []map[string]string{
{"key": "value"},
{"key": "value"},
{"key": "value"},
},
},
time.Now(),
)
if err != nil {
return err
}
}
return batch.Send()
有关 ClickHouse 的建议,请参阅 此处。批处理不应在 go-routine 之间共享 - 每个例程构建一个单独的批处理。
从上面的示例中,请注意需要将变量类型与列类型对齐,以便在追加行时对齐。虽然映射通常很明显,但此接口试图灵活,并且将在不发生精度损失的情况下转换类型。例如,以下演示将字符串插入 datetime64 中。
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 1000; i++ {
err := batch.Append(
"2006-01-02 15:04:05.999",
)
if err != nil {
return err
}
}
return batch.Send()
有关每种列类型支持的 Go 类型完整摘要,请参阅 类型转换。
查询行/s
用户可以使用 QueryRow
方法查询单行,或通过 Query
获取一个游标来迭代结果集。前者接受数据的目标以序列化,后者需要在每一行上调用 Scan
。
row := conn.QueryRow(context.Background(), "SELECT * FROM example")
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 []interface{}
col8 time.Time
)
if err := row.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s, col4=%s, col5=%v, col6=%v, col7=%v, col8=%v\n", col1, col2, col3, col4, col5, col6, col7, col8)
rows, err := conn.Query(ctx, "SELECT Col1, Col2, Col3 FROM example WHERE Col1 >= 2")
if err != nil {
return err
}
for rows.Next() {
var (
col1 uint8
col2 string
col3 time.Time
)
if err := rows.Scan(&col1, &col2, &col3); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s\n", col1, col2, col3)
}
rows.Close()
return rows.Err()
请注意,在这两种情况下,我们都需要将要将相应的列值序列化到的变量的指针传递给它。这些必须按照 SELECT
语句中指定的顺序传递 - 默认情况下,如果使用 SELECT *
,则将使用列声明的顺序,如上所示。
类似于插入,Scan 方法要求目标变量是适当的类型。这再次旨在灵活,并尽可能地转换类型,前提是不会发生精度损失,例如,上面的示例显示 UUID 列被读入字符串变量。有关每种列类型支持的 Go 类型完整列表,请参阅 类型转换。
最后,请注意将上下文传递给 Query 和 QueryRow 方法的能力。这可用于查询级别的设置 - 请参阅 使用上下文 以获取更多详细信息。
异步插入
异步插入通过 Async 方法支持。这允许用户指定客户端是否应该等待服务器完成插入,或者在数据被接收后响应。这实际上控制着参数 wait_for_async_insert。
conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
if err := clickhouse_tests.CheckMinServerServerVersion(conn, 21, 12, 0); err != nil {
return nil
}
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(ctx, `DROP TABLE IF EXISTS example`)
const ddl = `
CREATE TABLE example (
Col1 UInt64
, Col2 String
, Col3 Array(UInt8)
, Col4 DateTime
) ENGINE = Memory
`
if err := conn.Exec(ctx, ddl); err != nil {
return err
}
for i := 0; i < 100; i++ {
if err := conn.AsyncInsert(ctx, fmt.Sprintf(`INSERT INTO example VALUES (
%d, '%s', [1, 2, 3, 4, 5, 6, 7, 8, 9], now()
)`, i, "Golang SQL database driver"), false); err != nil {
return err
}
}
列式插入
插入可以以列格式插入。如果数据已经以这种结构定向,那么这可以通过避免将数据透视到行来提供性能优势。
batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example")
if err != nil {
return err
}
var (
col1 []uint64
col2 []string
col3 [][]uint8
col4 []time.Time
)
for i := 0; i < 1_000; i++ {
col1 = append(col1, uint64(i))
col2 = append(col2, "Golang SQL database driver")
col3 = append(col3, []uint8{1, 2, 3, 4, 5, 6, 7, 8, 9})
col4 = append(col4, time.Now())
}
if err := batch.Column(0).Append(col1); err != nil {
return err
}
if err := batch.Column(1).Append(col2); err != nil {
return err
}
if err := batch.Column(2).Append(col3); err != nil {
return err
}
if err := batch.Column(3).Append(col4); err != nil {
return err
}
return batch.Send()
使用结构体
对于用户,Golang 结构体提供了 ClickHouse 中数据行的逻辑表示。为了帮助做到这一点,本机接口提供了一些方便的功能。
使用序列化选择
Select 方法允许将一组响应行通过一次调用封送到结构体切片中。
var result []struct {
Col1 uint8
Col2 string
ColumnWithName time.Time `ch:"Col3"`
}
if err = conn.Select(ctx, &result, "SELECT Col1, Col2, Col3 FROM example"); err != nil {
return err
}
for _, v := range result {
fmt.Printf("row: col1=%d, col2=%s, col3=%s\n", v.Col1, v.Col2, v.ColumnWithName)
}
扫描结构体
ScanStruct 允许将查询中的单个行封送到结构体中。
var result struct {
Col1 int64
Count uint64 `ch:"count"`
}
if err := conn.QueryRow(context.Background(), "SELECT Col1, COUNT() AS count FROM example WHERE Col1 = 5 GROUP BY Col1").ScanStruct(&result); err != nil {
return err
}
追加结构体
AppendStruct 允许将结构体追加到现有的 批处理 中,并将其解释为完整的行。这要求结构体的列在名称和类型上都与表对齐。虽然所有列都必须具有等效的结构体字段,但某些结构体字段可能没有等效的列表示。这些将被忽略。
batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 1_000; i++ {
err := batch.AppendStruct(&row{
Col1: uint64(i),
Col2: "Golang SQL database driver",
Col3: []uint8{1, 2, 3, 4, 5, 6, 7, 8, 9},
Col4: time.Now(),
ColIgnored: "this will be ignored",
})
if err != nil {
return err
}
}
类型转换
客户端旨在尽可能灵活,以便接受插入和响应封送的变量类型。在大多数情况下,ClickHouse 列类型存在等效的 Golang 类型,例如,UInt64 到 uint64。这些逻辑映射应该始终得到支持。用户可能希望使用可以插入列或用于接收响应的变量类型,如果变量或接收数据的转换先进行。客户端旨在透明地支持这些转换,因此用户不需要在插入之前精确地转换数据,并提供在查询时灵活的封送。这种透明转换不允许精度损失。例如,不能使用 uint32 来接收来自 UInt64 列的数据。相反,如果字符串符合格式要求,则可以将字符串插入 datetime64 字段。
当前支持的基本类型的类型转换已捕获 此处。
这项工作正在进行中,可以分为插入 (Append
/AppendRow
) 和读取时间(通过 Scan
)。如果您需要特定转换的支持,请提出问题。
复杂类型
日期/时间类型
ClickHouse 的 Go 客户端支持 Date
、Date32
、DateTime
和 DateTime64
日期/时间类型。日期可以以字符串形式插入,格式为 2006-01-02
,也可以使用原生 Go 的 time.Time{}
或 sql.NullTime
。DateTime 也支持后两种类型,但需要以字符串形式传递,格式为 2006-01-02 15:04:05
,可选地带有时区偏移量,例如 2006-01-02 15:04:05 +08:00
。time.Time{}
和 sql.NullTime
在读取时也都支持,以及任何实现了 sql.Scanner
接口的实现。
时区信息的处理取决于 ClickHouse 类型以及值是插入还是读取。
- DateTime/DateTime64
- 在插入时,值将以 UNIX 时间戳格式发送到 ClickHouse。如果没有提供时区,客户端将假设客户端的本地时区。
time.Time{}
或sql.NullTime
将相应地转换为 epoch。 - 在选择时,如果设置了返回
time.Time
值时的列时区,将使用该时区。如果没有设置,将使用服务器的时区。
- 在插入时,值将以 UNIX 时间戳格式发送到 ClickHouse。如果没有提供时区,客户端将假设客户端的本地时区。
- Date/Date32
- 在插入时,任何日期的时区在将日期转换为 UNIX 时间戳时都会被考虑在内,即它将在存储为日期之前被时区偏移,因为日期类型在 ClickHouse 中没有区域设置。如果字符串值中没有指定此信息,将使用本地时区。
- 在选择时,日期将被扫描到
time.Time{}
或sql.NullTime{}
实例中,并将返回不带时区信息。
Array
数组应该以切片的形式插入。元素的类型规则与 原始类型 的类型规则一致,即在可能的情况下,元素将被转换。
在 Scan 时应该提供指向切片的指针。
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
var i int64
for i = 0; i < 10; i++ {
err := batch.Append(
[]string{strconv.Itoa(int(i)), strconv.Itoa(int(i + 1)), strconv.Itoa(int(i + 2)), strconv.Itoa(int(i + 3))},
[][]int64{{i, i + 1}, {i + 2, i + 3}, {i + 4, i + 5}},
)
if err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}
var (
col1 []string
col2 [][]int64
)
rows, err := conn.Query(ctx, "SELECT * FROM example")
if err != nil {
return err
}
for rows.Next() {
if err := rows.Scan(&col1, &col2); err != nil {
return err
}
fmt.Printf("row: col1=%v, col2=%v\n", col1, col2)
}
rows.Close()
Map
映射应该以 Golang 映射的形式插入,其键和值符合 之前 定义的类型规则。
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
var i int64
for i = 0; i < 10; i++ {
err := batch.Append(
map[string]uint64{strconv.Itoa(int(i)): uint64(i)},
map[string][]string{strconv.Itoa(int(i)): {strconv.Itoa(int(i)), strconv.Itoa(int(i + 1)), strconv.Itoa(int(i + 2)), strconv.Itoa(int(i + 3))}},
map[string]map[string]uint64{strconv.Itoa(int(i)): {strconv.Itoa(int(i)): uint64(i)}},
)
if err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}
var (
col1 map[string]uint64
col2 map[string][]string
col3 map[string]map[string]uint64
)
rows, err := conn.Query(ctx, "SELECT * FROM example")
if err != nil {
return err
}
for rows.Next() {
if err := rows.Scan(&col1, &col2, &col3); err != nil {
return err
}
fmt.Printf("row: col1=%v, col2=%v, col3=%v\n", col1, col2, col3)
}
rows.Close()
Tuples
元组表示一组任意长度的列。这些列可以是显式命名的,也可以只指定类型,例如:
//unnamed
Col1 Tuple(String, Int64)
//named
Col2 Tuple(name String, id Int64, age uint8)
在这两种方法中,命名元组提供了更大的灵活性。虽然未命名元组必须使用切片进行插入和读取,但命名元组也与映射兼容。
if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Tuple(name String, age UInt8),
Col2 Tuple(String, UInt8),
Col3 Tuple(name String, id String)
)
Engine Memory
`); err != nil {
return err
}
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
// both named and unnamed can be added with slices. Note we can use strongly typed lists and maps if all elements are the same type
if err = batch.Append([]interface{}{"Clicky McClickHouse", uint8(42)}, []interface{}{"Clicky McClickHouse Snr", uint8(78)}, []string{"Dale", "521211"}); err != nil {
return err
}
if err = batch.Append(map[string]interface{}{"name": "Clicky McClickHouse Jnr", "age": uint8(20)}, []interface{}{"Baby Clicky McClickHouse", uint8(1)}, map[string]string{"name": "Geoff", "id": "12123"}); err != nil {
return err
}
if err = batch.Send(); err != nil {
return err
}
var (
col1 map[string]interface{}
col2 []interface{}
col3 map[string]string
)
// named tuples can be retrieved into a map or slices, unnamed just slices
if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2, &col3); err != nil {
return err
}
fmt.Printf("row: col1=%v, col2=%v, col3=%v\n", col1, col2, col3)
注意:支持类型化的切片和映射,前提是命名元组中的子列都是相同的类型。
Nested
嵌套字段等效于命名元组的数组。使用方法取决于用户是否将 flatten_nested 设置为 1 或 0。
通过将 flatten_nested 设置为 0,嵌套列保持为单个元组数组。这允许用户使用映射切片进行插入和检索,以及任意级别的嵌套。映射的键必须等于列的名称,如下面的示例所示。
注意:由于映射表示元组,因此它们必须是 map[string]interface{}
类型。目前,值没有强类型。
conn, err := GetNativeConnection(clickhouse.Settings{
"flatten_nested": 0,
}, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(context.Background(), "DROP TABLE IF EXISTS example")
err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Nested(Col1_1 String, Col1_2 UInt8),
Col2 Nested(
Col2_1 UInt8,
Col2_2 Nested(
Col2_2_1 UInt8,
Col2_2_2 UInt8
)
)
) Engine Memory
`)
if err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
var i int64
for i = 0; i < 10; i++ {
err := batch.Append(
[]map[string]interface{}{
{
"Col1_1": strconv.Itoa(int(i)),
"Col1_2": uint8(i),
},
{
"Col1_1": strconv.Itoa(int(i + 1)),
"Col1_2": uint8(i + 1),
},
{
"Col1_1": strconv.Itoa(int(i + 2)),
"Col1_2": uint8(i + 2),
},
},
[]map[string]interface{}{
{
"Col2_2": []map[string]interface{}{
{
"Col2_2_1": uint8(i),
"Col2_2_2": uint8(i + 1),
},
},
"Col2_1": uint8(i),
},
{
"Col2_2": []map[string]interface{}{
{
"Col2_2_1": uint8(i + 2),
"Col2_2_2": uint8(i + 3),
},
},
"Col2_1": uint8(i + 1),
},
},
)
if err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}
var (
col1 []map[string]interface{}
col2 []map[string]interface{}
)
rows, err := conn.Query(ctx, "SELECT * FROM example")
if err != nil {
return err
}
for rows.Next() {
if err := rows.Scan(&col1, &col2); err != nil {
return err
}
fmt.Printf("row: col1=%v, col2=%v\n", col1, col2)
}
rows.Close()
如果 flatten_nested
使用默认值 1,嵌套列将展平成单独的数组。这需要使用嵌套切片进行插入和检索。虽然任意级别的嵌套可能有效,但这并不被正式支持。
conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(ctx, "DROP TABLE IF EXISTS example")
err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Nested(Col1_1 String, Col1_2 UInt8),
Col2 Nested(
Col2_1 UInt8,
Col2_2 Nested(
Col2_2_1 UInt8,
Col2_2_2 UInt8
)
)
) Engine Memory
`)
if err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
var i uint8
for i = 0; i < 10; i++ {
col1_1_data := []string{strconv.Itoa(int(i)), strconv.Itoa(int(i + 1)), strconv.Itoa(int(i + 2))}
col1_2_data := []uint8{i, i + 1, i + 2}
col2_1_data := []uint8{i, i + 1, i + 2}
col2_2_data := [][][]interface{}{
{
{i, i + 1},
},
{
{i + 2, i + 3},
},
{
{i + 4, i + 5},
},
}
err := batch.Append(
col1_1_data,
col1_2_data,
col2_1_data,
col2_2_data,
)
if err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}
注意:嵌套列必须具有相同的维度。例如,在上面的示例中,Col_2_2
和 Col_2_1
必须具有相同数量的元素。
由于更直观的接口以及对嵌套的正式支持,我们建议使用 flatten_nested=0
。
Geo Types
客户端支持 Point、Ring、Polygon 和 Multi Polygon 地理类型。这些字段在 Golang 中使用包 github.com/paulmach/orb。
if err = conn.Exec(ctx, `
CREATE TABLE example (
point Point,
ring Ring,
polygon Polygon,
mPolygon MultiPolygon
)
Engine Memory
`); err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
if err = batch.Append(
orb.Point{11, 22},
orb.Ring{
orb.Point{1, 2},
orb.Point{1, 2},
},
orb.Polygon{
orb.Ring{
orb.Point{1, 2},
orb.Point{12, 2},
},
orb.Ring{
orb.Point{11, 2},
orb.Point{1, 12},
},
},
orb.MultiPolygon{
orb.Polygon{
orb.Ring{
orb.Point{1, 2},
orb.Point{12, 2},
},
orb.Ring{
orb.Point{11, 2},
orb.Point{1, 12},
},
},
orb.Polygon{
orb.Ring{
orb.Point{1, 2},
orb.Point{12, 2},
},
orb.Ring{
orb.Point{11, 2},
orb.Point{1, 12},
},
},
},
); err != nil {
return err
}
if err = batch.Send(); err != nil {
return err
}
var (
point orb.Point
ring orb.Ring
polygon orb.Polygon
mPolygon orb.MultiPolygon
)
if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&point, &ring, &polygon, &mPolygon); err != nil {
return err
}
UUID
UUID 类型由 github.com/google/uuid 包支持。用户也可以将 uuid 作为字符串或任何实现 sql.Scanner
或 Stringify
的类型发送和编组。
if err = conn.Exec(ctx, `
CREATE TABLE example (
col1 UUID,
col2 UUID
)
Engine Memory
`); err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
col1Data, _ := uuid.NewUUID()
if err = batch.Append(
col1Data,
"603966d6-ed93-11ec-8ea0-0242ac120002",
); err != nil {
return err
}
if err = batch.Send(); err != nil {
return err
}
var (
col1 uuid.UUID
col2 uuid.UUID
)
if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2); err != nil {
return err
}
Decimal
Decimal 类型由 github.com/shopspring/decimal 包支持。
if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Decimal32(3),
Col2 Decimal(18,6),
Col3 Decimal(15,7),
Col4 Decimal128(8),
Col5 Decimal256(9)
) Engine Memory
`); err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
if err = batch.Append(
decimal.New(25, 4),
decimal.New(30, 5),
decimal.New(35, 6),
decimal.New(135, 7),
decimal.New(256, 8),
); err != nil {
return err
}
if err = batch.Send(); err != nil {
return err
}
var (
col1 decimal.Decimal
col2 decimal.Decimal
col3 decimal.Decimal
col4 decimal.Decimal
col5 decimal.Decimal
)
if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2, &col3, &col4, &col5); err != nil {
return err
}
fmt.Printf("col1=%v, col2=%v, col3=%v, col4=%v, col5=%v\n", col1, col2, col3, col4, col5)
Nullable
Go 值 Nil 表示 ClickHouse 的 NULL。如果字段被声明为 Nullable,则可以使用此值。在插入时,可以为列的普通版本和 Nullable 版本传递 Nil。对于前者,将持久化类型的默认值,例如,对于字符串,将持久化空字符串。对于 Nullable 版本,将在 ClickHouse 中存储 NULL 值。
在 Scan 时,用户必须传递指向支持 nil 的类型的指针,例如 *string,以便为 Nullable 字段表示 nil 值。在下面的示例中,col1 是 Nullable(String),因此接收 **string。这允许表示 nil。
if err = conn.Exec(ctx, `
CREATE TABLE example (
col1 Nullable(String),
col2 String,
col3 Nullable(Int8),
col4 Nullable(Int64)
)
Engine Memory
`); err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
if err = batch.Append(
nil,
nil,
nil,
sql.NullInt64{Int64: 0, Valid: false},
); err != nil {
return err
}
if err = batch.Send(); err != nil {
return err
}
var (
col1 *string
col2 string
col3 *int8
col4 sql.NullInt64
)
if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2, &col3, &col4); err != nil {
return err
}
客户端还支持 sql.Null*
类型,例如 sql.NullInt64
。它们与其等效的 ClickHouse 类型兼容。
Big Ints - Int128, Int256, UInt128, UInt256
大于 64 位的数字类型使用原生 Go big 包表示。
if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Int128,
Col2 UInt128,
Col3 Array(Int128),
Col4 Int256,
Col5 Array(Int256),
Col6 UInt256,
Col7 Array(UInt256)
) Engine Memory`); err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
col1Data, _ := new(big.Int).SetString("170141183460469231731687303715884105727", 10)
col2Data := big.NewInt(128)
col3Data := []*big.Int{
big.NewInt(-128),
big.NewInt(128128),
big.NewInt(128128128),
}
col4Data := big.NewInt(256)
col5Data := []*big.Int{
big.NewInt(256),
big.NewInt(256256),
big.NewInt(256256256256),
}
col6Data := big.NewInt(256)
col7Data := []*big.Int{
big.NewInt(256),
big.NewInt(256256),
big.NewInt(256256256256),
}
if err = batch.Append(col1Data, col2Data, col3Data, col4Data, col5Data, col6Data, col7Data); err != nil {
return err
}
if err = batch.Send(); err != nil {
return err
}
var (
col1 big.Int
col2 big.Int
col3 []*big.Int
col4 big.Int
col5 []*big.Int
col6 big.Int
col7 []*big.Int
)
if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7); err != nil {
return err
}
fmt.Printf("col1=%v, col2=%v, col3=%v, col4=%v, col5=%v, col6=%v, col7=%v\n", col1, col2, col3, col4, col5, col6, col7)
Compression
对压缩方法的支持取决于所使用的底层协议。对于原生协议,客户端支持 LZ4
和 ZSTD
压缩。这仅在块级别执行。可以通过在连接中包含 Compression
配置来启用压缩。
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionZSTD,
},
MaxOpenConns: 1,
})
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(context.Background(), "DROP TABLE IF EXISTS example")
if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Array(String)
) Engine Memory
`); err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 1000; i++ {
if err := batch.Append([]string{strconv.Itoa(i), strconv.Itoa(i + 1), strconv.Itoa(i + 2), strconv.Itoa(i + 3)}); err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}
如果使用 HTTP 上的标准接口,则可以使用其他压缩技术。有关更多详细信息,请参阅 database/sql API - Compression。
Parameter Binding
客户端支持 Exec、Query 和 QueryRow 方法的参数绑定。如下面的示例所示,可以使用命名参数、编号参数和位置参数来支持此功能。我们提供了以下示例。
var count uint64
// positional bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 >= ? AND Col3 < ?", 500, now.Add(time.Duration(750)*time.Second)).Scan(&count); err != nil {
return err
}
// 250
fmt.Printf("Positional bind count: %d\n", count)
// numeric bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= $2 AND Col3 > $1", now.Add(time.Duration(150)*time.Second), 250).Scan(&count); err != nil {
return err
}
// 100
fmt.Printf("Numeric bind count: %d\n", count)
// named bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= @col1 AND Col3 > @col3", clickhouse.Named("col1", 100), clickhouse.Named("col3", now.Add(time.Duration(50)*time.Second))).Scan(&count); err != nil {
return err
}
// 50
fmt.Printf("Named bind count: %d\n", count)
Special Cases
默认情况下,如果将切片作为查询的参数传递,它们将被展开为逗号分隔的值列表。如果用户需要一组带有包装 [ ]
的值进行注入,则应使用 ArraySet。
如果需要组/元组,并带有包装 ( )
(例如,用于 IN 运算符),则用户可以使用 GroupSet。这对于需要多个组的情况特别有用,如下面的示例所示。
最后,DateTime64 字段需要精度才能确保参数被正确地呈现。客户端不知道该字段的精度级别,因此用户必须提供它。为了方便此操作,我们提供了 DateNamed
参数。
var count uint64
// arrays will be unfolded
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 IN (?)", []int{100, 200, 300, 400, 500}).Scan(&count); err != nil {
return err
}
fmt.Printf("Array unfolded count: %d\n", count)
// arrays will be preserved with []
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col4 = ?", clickhouse.ArraySet{300, 301}).Scan(&count); err != nil {
return err
}
fmt.Printf("Array count: %d\n", count)
// Group sets allow us to form ( ) lists
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 IN ?", clickhouse.GroupSet{[]interface{}{100, 200, 300, 400, 500}}).Scan(&count); err != nil {
return err
}
fmt.Printf("Group count: %d\n", count)
// More useful when we need nesting
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE (Col1, Col5) IN (?)", []clickhouse.GroupSet{{[]interface{}{100, 101}}, {[]interface{}{200, 201}}}).Scan(&count); err != nil {
return err
}
fmt.Printf("Group count: %d\n", count)
// Use DateNamed when you need a precision in your time#
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col3 >= @col3", clickhouse.DateNamed("col3", now.Add(time.Duration(500)*time.Millisecond), clickhouse.NanoSeconds)).Scan(&count); err != nil {
return err
}
fmt.Printf("NamedDate count: %d\n", count)
Using Context
Go 上下文提供了一种方法,可以在 API 边界之间传递截止时间、取消信号和其他请求范围内的值。连接上的所有方法都接受上下文作为其第一个变量。虽然之前的示例使用了 context.Background(),但用户可以使用此功能来传递设置和截止时间以及取消查询。
传递使用 withDeadline
创建的上下文允许对查询设置执行时间限制。请注意,这是一个绝对时间,过期只会释放连接并将取消信号发送到 ClickHouse。WithCancel
可以选择用于显式地取消查询。
帮助程序 clickhouse.WithQueryID
和 clickhouse.WithQuotaKey
允许指定查询 ID 和配额密钥。查询 ID 可用于在日志中跟踪查询和取消查询。配额密钥可用于根据唯一的密钥值对 ClickHouse 使用情况施加限制 - 有关更多详细信息,请参阅 配额管理。
用户也可以使用上下文来确保仅对特定查询应用设置 - 而不是对整个连接应用设置,如 连接设置 中所示。
最后,用户可以通过 clickhouse.WithBlockSize
控制块缓冲区的大小。这将覆盖连接级设置 BlockBufferSize
,并控制在任何时间解码并保存在内存中的块的最大数量。较大的值可能意味着更多并行化,但会占用更多内存。
以下示例展示了上述内容。
dialCount := 0
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
DialContext: func(ctx context.Context, addr string) (net.Conn, error) {
dialCount++
var d net.Dialer
return d.DialContext(ctx, "tcp", addr)
},
})
if err != nil {
return err
}
if err := clickhouse_tests.CheckMinServerServerVersion(conn, 22, 6, 1); err != nil {
return nil
}
// we can use context to pass settings to a specific API call
ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{
"allow_experimental_object_type": "1",
}))
conn.Exec(ctx, "DROP TABLE IF EXISTS example")
// to create a JSON column we need allow_experimental_object_type=1
if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 JSON
)
Engine Memory
`); err != nil {
return err
}
// queries can be cancelled using the context
ctx, cancel := context.WithCancel(context.Background())
go func() {
cancel()
}()
if err = conn.QueryRow(ctx, "SELECT sleep(3)").Scan(); err == nil {
return fmt.Errorf("expected cancel")
}
// set a deadline for a query - this will cancel the query after the absolute time is reached.
// queries will continue to completion in ClickHouse
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
defer cancel()
if err := conn.Ping(ctx); err == nil {
return fmt.Errorf("expected deadline exceeeded")
}
// set a query id to assist tracing queries in logs e.g. see system.query_log
var one uint8
queryId, _ := uuid.NewUUID()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQueryID(queryId.String()))
if err = conn.QueryRow(ctx, "SELECT 1").Scan(&one); err != nil {
return err
}
conn.Exec(context.Background(), "DROP QUOTA IF EXISTS foobar")
defer func() {
conn.Exec(context.Background(), "DROP QUOTA IF EXISTS foobar")
}()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQuotaKey("abcde"))
// set a quota key - first create the quota
if err = conn.Exec(ctx, "CREATE QUOTA IF NOT EXISTS foobar KEYED BY client_key FOR INTERVAL 1 minute MAX queries = 5 TO default"); err != nil {
return err
}
type Number struct {
Number uint64 `ch:"number"`
}
for i := 1; i <= 6; i++ {
var result []Number
if err = conn.Select(ctx, &result, "SELECT number FROM numbers(10)"); err != nil {
return err
}
}
Progress/Profile/Log Information
可以在查询上请求进度、概要文件和日志信息。进度信息将报告关于在 ClickHouse 中已读取和处理的行数和字节数的统计信息。相反,概要文件信息提供了返回给客户端的数据摘要,包括字节数、行数和块数的总数。最后,日志信息提供了有关线程的统计信息,例如内存使用情况和数据速度。
要获取此信息,用户需要使用 上下文,用户可以在其中传递回调函数。
totalRows := uint64(0)
// use context to pass a call back for progress and profile info
ctx := clickhouse.Context(context.Background(), clickhouse.WithProgress(func(p *clickhouse.Progress) {
fmt.Println("progress: ", p)
totalRows += p.Rows
}), clickhouse.WithProfileInfo(func(p *clickhouse.ProfileInfo) {
fmt.Println("profile info: ", p)
}), clickhouse.WithLogs(func(log *clickhouse.Log) {
fmt.Println("log info: ", log)
}))
rows, err := conn.Query(ctx, "SELECT number from numbers(1000000) LIMIT 1000000")
if err != nil {
return err
}
for rows.Next() {
}
fmt.Printf("Total Rows: %d\n", totalRows)
rows.Close()
Dynamic Scanning
用户可能需要读取他们不知道其模式或返回字段类型的表。这在进行临时数据分析或编写通用工具时很常见。为了实现这一点,可以在查询响应中获得列类型信息。这可以与 Go 反射一起使用来创建正确类型变量的运行时实例,这些实例可以传递给 Scan。
const query = `
SELECT
1 AS Col1
, 'Text' AS Col2
`
rows, err := conn.Query(context.Background(), query)
if err != nil {
return err
}
var (
columnTypes = rows.ColumnTypes()
vars = make([]interface{}, len(columnTypes))
)
for i := range columnTypes {
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}
for rows.Next() {
if err := rows.Scan(vars...); err != nil {
return err
}
for _, v := range vars {
switch v := v.(type) {
case *string:
fmt.Println(*v)
case *uint8:
fmt.Println(*v)
}
}
}
External tables
外部表 允许客户端使用 SELECT 查询将数据发送到 ClickHouse。这些数据被放入一个临时表中,并可以用于查询本身进行评估。
要使用查询将外部数据发送到客户端,用户必须通过 ext.NewTable 构建一个外部表,然后通过上下文传递它。
table1, err := ext.NewTable("external_table_1",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
if err != nil {
return err
}
for i := 0; i < 10; i++ {
if err = table1.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now()); err != nil {
return err
}
}
table2, err := ext.NewTable("external_table_2",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
for i := 0; i < 10; i++ {
table2.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now())
}
ctx := clickhouse.Context(context.Background(),
clickhouse.WithExternalTable(table1, table2),
)
rows, err := conn.Query(ctx, "SELECT * FROM external_table_1")
if err != nil {
return err
}
for rows.Next() {
var (
col1 uint8
col2 string
col3 time.Time
)
rows.Scan(&col1, &col2, &col3)
fmt.Printf("col1=%d, col2=%s, col3=%v\n", col1, col2, col3)
}
rows.Close()
var count uint64
if err := conn.QueryRow(ctx, "SELECT COUNT(*) FROM external_table_1").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1: %d\n", count)
if err := conn.QueryRow(ctx, "SELECT COUNT(*) FROM external_table_2").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_2: %d\n", count)
if err := conn.QueryRow(ctx, "SELECT COUNT(*) FROM (SELECT * FROM external_table_1 UNION ALL SELECT * FROM external_table_2)").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1 UNION external_table_2: %d\n", count)
Open Telemetry
ClickHouse 允许将 跟踪上下文 作为原生协议的一部分进行传递。客户端允许通过函数 clickhouse.withSpan
创建一个 Span,并通过上下文传递它来实现此目的。
var count uint64
rows := conn.QueryRow(clickhouse.Context(context.Background(), clickhouse.WithSpan(
trace.NewSpanContext(trace.SpanContextConfig{
SpanID: trace.SpanID{1, 2, 3, 4, 5},
TraceID: trace.TraceID{5, 4, 3, 2, 1},
}),
)), "SELECT COUNT() FROM (SELECT number FROM system.numbers LIMIT 5)")
if err := rows.Scan(&count); err != nil {
return err
}
fmt.Printf("count: %d\n", count)
有关利用跟踪的完整详细信息,请参阅OpenTelemetry 支持。
数据库/SQL API
database/sql
或“标准”API 允许用户在应用程序代码应与底层数据库无关的情况下使用客户端,方法是符合标准接口。这样做会带来一些代价 - 额外的抽象和间接层以及与 ClickHouse 不一定一致的原语。但是,在需要连接到多个数据库的情况下,这些成本通常是可以接受的。
此外,此客户端支持使用 HTTP 作为传输层 - 数据仍然会以本机格式编码以获得最佳性能。
以下旨在反映 ClickHouse API 文档的结构。
标准 API 的完整代码示例可以在此处找到。
连接
连接可以通过具有格式 clickhouse://<host>:<port>?<query_option>=<value>
的 DSN 字符串和 Open
方法,或通过 clickhouse.OpenDB
方法来实现。后者不是 database/sql
规范的一部分,但会返回 sql.DB
实例。此方法提供了诸如分析之类的功能,而通过 database/sql
规范无法以明显的方式公开这些功能。
func Connect() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
return conn.Ping()
}
func ConnectDSN() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://%s:%d?username=%s&password=%s", env.Host, env.Port, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
在所有后续示例中,除非明确显示,我们假设使用了 ClickHouse conn
变量,并且该变量可用。
连接设置
以下参数可以在 DSN 字符串中传递
hosts
- 逗号分隔的单个地址主机列表,用于负载平衡和故障转移 - 请参阅连接到多个节点。username/password
- 身份验证凭据 - 请参阅身份验证database
- 选择当前默认数据库dial_timeout
- 持续时间字符串是一个可能带符号的十进制数字序列,每个数字都有可选的小数部分和一个单位后缀,例如300ms
、1s
。有效的时间单位是ms
、s
、m
。connection_open_strategy
-random/in_order
(默认值为random
) - 请参阅连接到多个节点round_robin
- 从集合中选择一个循环服务器in_order
- 按指定顺序选择第一个活动服务器
debug
- 启用调试输出(布尔值)compress
- 指定压缩算法 -none
(默认值)、zstd
、lz4
、gzip
、deflate
、br
。如果设置为true
,将使用lz4
。仅lz4
和zstd
支持本机通信。compress_level
- 压缩级别(默认值为0
)。请参阅压缩。这与算法有关gzip
--2
(最快速度)到9
(最佳压缩)deflate
--2
(最快速度)到9
(最佳压缩)br
-0
(最快速度)到11
(最佳压缩)zstd
、lz4
- 被忽略
secure
- 建立安全的 SSL 连接(默认值为false
)skip_verify
- 跳过证书验证(默认值为false
)block_buffer_size
- 允许用户控制块缓冲区大小。请参阅BlockBufferSize。(默认值为2
)
func ConnectSettings() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://127.0.0.1:9001,127.0.0.1:9002,%s:%d/%s?username=%s&password=%s&dial_timeout=10s&connection_open_strategy=round_robin&debug=true&compress=lz4", env.Host, env.Port, env.Database, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
连接池
用户可以影响提供的节点地址列表的使用,如连接到多个节点中所述。但是,连接管理和池化由设计委托给 sql.DB
。
通过 HTTP 连接
默认情况下,连接是通过本机协议建立的。对于需要 HTTP 的用户,可以通过修改 DSN 以包含 HTTP 协议或通过在连接选项中指定协议来启用此功能。
func ConnectHTTP() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Protocol: clickhouse.HTTP,
})
return conn.Ping()
}
func ConnectDSNHTTP() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s", env.Host, env.HttpPort, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
连接到多个节点
如果使用 OpenDB
,请使用与用于 ClickHouse API 的方法相同的方法连接到多个主机 - 可选地指定 ConnOpenStrategy。
对于基于 DSN 的连接,字符串接受多个主机和一个 connection_open_strategy
参数,该参数的值可以设置为 round_robin
或 in_order
。
func MultiStdHost() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
ConnOpenStrategy: clickhouse.ConnOpenRoundRobin,
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
if err != nil {
return err
}
fmt.Println(v.String())
return nil
}
func MultiStdHostDSN() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://127.0.0.1:9001,127.0.0.1:9002,%s:%d?username=%s&password=%s&connection_open_strategy=round_robin", env.Host, env.Port, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
使用 TLS
如果使用 DSN 连接字符串,可以通过参数“secure=true”启用 SSL。OpenDB 方法使用与本机 API 的 TLS相同的方法,依赖于指定一个非 nil TLS 结构体。虽然 DSN 连接字符串支持参数 skip_verify 以跳过 SSL 验证,但 OpenDB 方法是更高级 TLS 配置所必需的 - 因为它允许传递配置。
func ConnectSSL() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
cwd, err := os.Getwd()
if err != nil {
return err
}
t := &tls.Config{}
caCert, err := ioutil.ReadFile(path.Join(cwd, "../../tests/resources/CAroot.crt"))
if err != nil {
return err
}
caCertPool := x509.NewCertPool()
successful := caCertPool.AppendCertsFromPEM(caCert)
if !successful {
return err
}
t.RootCAs = caCertPool
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.SslPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
TLS: t,
})
return conn.Ping()
}
func ConnectDSNSSL() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("https://%s:%d?secure=true&skip_verify=true&username=%s&password=%s", env.Host, env.HttpsPort, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
身份验证
如果使用 OpenDB,可以通过通常的选项传递身份验证信息。对于基于 DSN 的连接,可以在连接字符串中传递用户名和密码 - 作为参数或作为编码在地址中的凭据。
func ConnectAuth() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
return conn.Ping()
}
func ConnectDSNAuth() error {
env, err := GetStdTestEnvironment()
conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s", env.Host, env.HttpPort, env.Username, env.Password))
if err != nil {
return err
}
if err = conn.Ping(); err != nil {
return err
}
conn, err = sql.Open("clickhouse", fmt.Sprintf("http://%s:%s@%s:%d", env.Username, env.Password, env.Host, env.HttpPort))
if err != nil {
return err
}
return conn.Ping()
}
执行
获得连接后,用户可以通过 Exec 方法发出 sql
语句以执行。
conn.Exec(`DROP TABLE IF EXISTS example`)
_, err = conn.Exec(`
CREATE TABLE IF NOT EXISTS example (
Col1 UInt8,
Col2 String
) engine=Memory
`)
if err != nil {
return err
}
_, err = conn.Exec("INSERT INTO example VALUES (1, 'test-1')")
此方法不支持接收上下文 - 默认情况下,它使用后台上下文执行。如果需要,用户可以使用 ExecContext - 请参阅使用上下文。
批处理插入
可以通过使用 Being
方法创建 sql.Tx
来实现批处理语义。由此,可以使用 Prepare
方法(使用 INSERT
语句)获得批处理。这将返回一个 sql.Stmt
,可以使用 Exec
方法向其中追加行。批处理将累积在内存中,直到对原始 sql.Tx
执行 Commit
。
batch, err := scope.Prepare("INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 1000; i++ {
_, err := batch.Exec(
uint8(42),
"ClickHouse", "Inc",
uuid.New(),
map[string]uint8{"key": 1}, // Map(String, UInt8)
[]string{"Q", "W", "E", "R", "T", "Y"}, // Array(String)
[]interface{}{ // Tuple(String, UInt8, Array(Map(String, String)))
"String Value", uint8(5), []map[string]string{
map[string]string{"key": "value"},
map[string]string{"key": "value"},
map[string]string{"key": "value"},
},
},
time.Now(),
)
if err != nil {
return err
}
}
return scope.Commit()
查询行
可以使用 QueryRow 方法查询单行。这将返回一个 *sql.Row,可以在其上调用 Scan,并使用指向变量的指针,将列封送入这些变量中。QueryRowContext 变体允许传递除后台上下文之外的上下文 - 请参阅使用上下文。
row := conn.QueryRow("SELECT * FROM example")
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 interface{}
col8 time.Time
)
if err := row.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
迭代多行需要 Query
方法。这将返回一个 *sql.Rows
结构体,可以在其上调用 Next 以迭代行。QueryContext 等效项允许传递上下文。
rows, err := conn.Query("SELECT * FROM example")
if err != nil {
return err
}
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 interface{}
col8 time.Time
)
for rows.Next() {
if err := rows.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s, col4=%s, col5=%v, col6=%v, col7=%v, col8=%v\n", col1, col2, col3, col4, col5, col6, col7, col8)
}
异步插入
可以通过使用 ExecContext 方法执行插入来实现异步插入。这应该传递一个启用了异步模式的上下文,如下所示。这允许用户指定客户端是否应等待服务器完成插入或在数据已接收后响应。这实际上控制着参数wait_for_async_insert。
const ddl = `
CREATE TABLE example (
Col1 UInt64
, Col2 String
, Col3 Array(UInt8)
, Col4 DateTime
) ENGINE = Memory
`
if _, err := conn.Exec(ddl); err != nil {
return err
}
ctx := clickhouse.Context(context.Background(), clickhouse.WithStdAsync(false))
{
for i := 0; i < 100; i++ {
_, err := conn.ExecContext(ctx, fmt.Sprintf(`INSERT INTO example VALUES (
%d, '%s', [1, 2, 3, 4, 5, 6, 7, 8, 9], now()
)`, i, "Golang SQL database driver"))
if err != nil {
return err
}
}
}
列式插入
不支持使用标准接口。
使用结构体
不支持使用标准接口。
类型转换
标准 database/sql
接口应支持与ClickHouse API相同的类型。有一些例外,主要适用于复杂类型,我们在下面记录这些例外。与 ClickHouse API 类似,客户端旨在尽可能灵活地接受插入和封送响应的变量类型。请参阅类型转换以了解更多详细信息。
复杂类型
除非另有说明,否则复杂类型处理应与ClickHouse API相同。差异是 database/sql
内部机制的结果。
映射
与 ClickHouse API 不同,标准 API 要求映射在扫描类型时具有强类型。例如,用户不能为 Map(String,String)
字段传递 map[string]interface{}
,而必须使用 map[string]string
。interface{}
变量始终兼容,可用于更复杂的结构。在读取时不支持结构体。
var (
col1Data = map[string]uint64{
"key_col_1_1": 1,
"key_col_1_2": 2,
}
col2Data = map[string]uint64{
"key_col_2_1": 10,
"key_col_2_2": 20,
}
col3Data = map[string]uint64{}
col4Data = []map[string]string{
{"A": "B"},
{"C": "D"},
}
col5Data = map[string]uint64{
"key_col_5_1": 100,
"key_col_5_2": 200,
}
)
if _, err := batch.Exec(col1Data, col2Data, col3Data, col4Data, col5Data); err != nil {
return err
}
if err = scope.Commit(); err != nil {
return err
}
var (
col1 interface{}
col2 map[string]uint64
col3 map[string]uint64
col4 []map[string]string
col5 map[string]uint64
)
if err := conn.QueryRow("SELECT * FROM example").Scan(&col1, &col2, &col3, &col4, &col5); err != nil {
return err
}
fmt.Printf("col1=%v, col2=%v, col3=%v, col4=%v, col5=%v", col1, col2, col3, col4, col5)
插入行为与 ClickHouse API 相同。
压缩
标准 API 支持与本机ClickHouse API相同的压缩算法,即块级别的 lz4
和 zstd
压缩。此外,gzip、deflate 和 br 压缩支持 HTTP 连接。如果启用了这些算法中的任何一种,压缩将在插入和查询响应期间在块上执行。其他请求(例如 ping 或查询请求)将保持未压缩状态。这与 lz4
和 zstd
选项一致。
如果使用 OpenDB
方法建立连接,则可以传递一个 Compression 配置。这包括指定压缩级别(见下文)的能力。如果通过 sql.Open
使用 DSN 连接,请使用参数 compress
。这可以是特定的压缩算法(即 gzip
、deflate
、br
、zstd
或 lz4
)或布尔标志。如果设置为 true,将使用 lz4
。默认值为 none
,即压缩禁用。
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionBrotli,
Level: 5,
},
Protocol: clickhouse.HTTP,
})
conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s&compress=gzip&compress_level=5", env.Host, env.HttpPort, env.Username, env.Password))
应用压缩的级别可以通过 DSN 参数 compress_level 或 Compression 选项的 Level 字段控制。这默认为 0,但与算法有关
gzip
--2
(最快速度)到9
(最佳压缩)deflate
--2
(最快速度)到9
(最佳压缩)br
-0
(最快速度)到11
(最佳压缩)zstd
、lz4
- 被忽略
参数绑定
标准 API 支持与ClickHouse API相同的参数绑定功能,允许将参数传递给 Exec、Query 和 QueryRow 方法(及其等效的Context 变体)。支持位置参数、命名参数和编号参数。
var count uint64
// positional bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 >= ? AND Col3 < ?", 500, now.Add(time.Duration(750)*time.Second)).Scan(&count); err != nil {
return err
}
// 250
fmt.Printf("Positional bind count: %d\n", count)
// numeric bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= $2 AND Col3 > $1", now.Add(time.Duration(150)*time.Second), 250).Scan(&count); err != nil {
return err
}
// 100
fmt.Printf("Numeric bind count: %d\n", count)
// named bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= @col1 AND Col3 > @col3", clickhouse.Named("col1", 100), clickhouse.Named("col3", now.Add(time.Duration(50)*time.Second))).Scan(&count); err != nil {
return err
}
// 50
fmt.Printf("Named bind count: %d\n", count)
请注意,特殊情况仍然适用。
使用上下文
标准 API 支持与ClickHouse API相同的功能,即通过上下文传递截止时间、取消信号和其他请求范围的值。与 ClickHouse API 不同,这是通过使用方法的 Context
变体来实现的,例如 Exec
(默认情况下使用后台上下文)有一个变体 ExecContext
,可以将上下文作为第一个参数传递给它。这允许在应用程序流程的任何阶段传递上下文。例如,用户可以在通过 ConnContext
建立连接时,或在通过 QueryRowContext
请求查询行时传递上下文。下面展示了所有可用方法的示例。
有关使用上下文传递截止时间、取消信号、查询 ID、配额密钥和连接设置的更多详细信息,请参阅ClickHouse API中的使用上下文。
ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{
"allow_experimental_object_type": "1",
}))
conn.ExecContext(ctx, "DROP TABLE IF EXISTS example")
// to create a JSON column we need allow_experimental_object_type=1
if _, err = conn.ExecContext(ctx, `
CREATE TABLE example (
Col1 JSON
)
Engine Memory
`); err != nil {
return err
}
// queries can be cancelled using the context
ctx, cancel := context.WithCancel(context.Background())
go func() {
cancel()
}()
if err = conn.QueryRowContext(ctx, "SELECT sleep(3)").Scan(); err == nil {
return fmt.Errorf("expected cancel")
}
// set a deadline for a query - this will cancel the query after the absolute time is reached. Again terminates the connection only,
// queries will continue to completion in ClickHouse
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
defer cancel()
if err := conn.PingContext(ctx); err == nil {
return fmt.Errorf("expected deadline exceeeded")
}
// set a query id to assist tracing queries in logs e.g. see system.query_log
var one uint8
ctx = clickhouse.Context(context.Background(), clickhouse.WithQueryID(uuid.NewString()))
if err = conn.QueryRowContext(ctx, "SELECT 1").Scan(&one); err != nil {
return err
}
conn.ExecContext(context.Background(), "DROP QUOTA IF EXISTS foobar")
defer func() {
conn.ExecContext(context.Background(), "DROP QUOTA IF EXISTS foobar")
}()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQuotaKey("abcde"))
// set a quota key - first create the quota
if _, err = conn.ExecContext(ctx, "CREATE QUOTA IF NOT EXISTS foobar KEYED BY client_key FOR INTERVAL 1 minute MAX queries = 5 TO default"); err != nil {
return err
}
// queries can be cancelled using the context
ctx, cancel = context.WithCancel(context.Background())
// we will get some results before cancel
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
"max_block_size": "1",
}))
rows, err := conn.QueryContext(ctx, "SELECT sleepEachRow(1), number FROM numbers(100);")
if err != nil {
return err
}
var (
col1 uint8
col2 uint8
)
for rows.Next() {
if err := rows.Scan(&col1, &col2); err != nil {
if col2 > 3 {
fmt.Println("expected cancel")
return nil
}
return err
}
fmt.Printf("row: col2=%d\n", col2)
if col2 == 3 {
cancel()
}
}
会话
虽然原生连接固有地具有会话,但通过 HTTP 连接的用户需要创建一个会话 ID 用于传递上下文作为设置。这允许使用绑定到会话的功能,例如临时表。
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Protocol: clickhouse.HTTP,
Settings: clickhouse.Settings{
"session_id": uuid.NewString(),
},
})
if _, err := conn.Exec(`DROP TABLE IF EXISTS example`); err != nil {
return err
}
_, err = conn.Exec(`
CREATE TEMPORARY TABLE IF NOT EXISTS example (
Col1 UInt8
)
`)
if err != nil {
return err
}
scope, err := conn.Begin()
if err != nil {
return err
}
batch, err := scope.Prepare("INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 10; i++ {
_, err := batch.Exec(
uint8(i),
)
if err != nil {
return err
}
}
rows, err := conn.Query("SELECT * FROM example")
if err != nil {
return err
}
var (
col1 uint8
)
for rows.Next() {
if err := rows.Scan(&col1); err != nil {
return err
}
fmt.Printf("row: col1=%d\n", col1)
}
动态扫描
类似于ClickHouse API,列类型信息可用,允许用户创建正确类型变量的运行时实例,这些实例可以传递给 Scan。这允许读取类型未知的列。
const query = `
SELECT
1 AS Col1
, 'Text' AS Col2
`
rows, err := conn.QueryContext(context.Background(), query)
if err != nil {
return err
}
columnTypes, err := rows.ColumnTypes()
if err != nil {
return err
}
vars := make([]interface{}, len(columnTypes))
for i := range columnTypes {
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}
for rows.Next() {
if err := rows.Scan(vars...); err != nil {
return err
}
for _, v := range vars {
switch v := v.(type) {
case *string:
fmt.Println(*v)
case *uint8:
fmt.Println(*v)
}
}
}
外部表
外部表 允许客户端使用 SELECT
查询将数据发送到 ClickHouse。这些数据将被放到一个临时表中,并且可以在查询本身中用于评估。
要使用查询将外部数据发送到客户端,用户必须通过 ext.NewTable
构建一个外部表,然后通过上下文传递它。
table1, err := ext.NewTable("external_table_1",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
if err != nil {
return err
}
for i := 0; i < 10; i++ {
if err = table1.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now()); err != nil {
return err
}
}
table2, err := ext.NewTable("external_table_2",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
for i := 0; i < 10; i++ {
table2.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now())
}
ctx := clickhouse.Context(context.Background(),
clickhouse.WithExternalTable(table1, table2),
)
rows, err := conn.QueryContext(ctx, "SELECT * FROM external_table_1")
if err != nil {
return err
}
for rows.Next() {
var (
col1 uint8
col2 string
col3 time.Time
)
rows.Scan(&col1, &col2, &col3)
fmt.Printf("col1=%d, col2=%s, col3=%v\n", col1, col2, col3)
}
rows.Close()
var count uint64
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_table_1").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1: %d\n", count)
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_table_2").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_2: %d\n", count)
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM (SELECT * FROM external_table_1 UNION ALL SELECT * FROM external_table_2)").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1 UNION external_table_2: %d\n", count)
Open Telemetry
ClickHouse 允许将跟踪上下文作为原生协议的一部分传递。客户端允许通过函数 clickhouse.withSpan
创建一个 Span 并通过 Context 传递它来实现这一点。当使用 HTTP 作为传输时,不支持此功能。
var count uint64
rows := conn.QueryRowContext(clickhouse.Context(context.Background(), clickhouse.WithSpan(
trace.NewSpanContext(trace.SpanContextConfig{
SpanID: trace.SpanID{1, 2, 3, 4, 5},
TraceID: trace.TraceID{5, 4, 3, 2, 1},
}),
)), "SELECT COUNT() FROM (SELECT number FROM system.numbers LIMIT 5)")
if err := rows.Scan(&count); err != nil {
return err
}
fmt.Printf("count: %d\n", count)
性能提示
- 尽可能使用 ClickHouse API,特别是对于基本类型。这避免了大量的反射和间接调用。
- 如果读取大型数据集,请考虑修改BlockBufferSize。这将增加内存占用,但意味着在行迭代期间可以并行解码更多块。默认值为 2 比较保守,可以最大程度地减少内存开销。较高的值意味着内存中的块更多。这需要测试,因为不同的查询会生成不同的块大小。因此,它可以通过 Context 在查询级别 上设置。
- 插入数据时,请明确您的类型。虽然客户端的目标是灵活,例如允许字符串解析为 UUID 或 IP,但这需要数据验证,并且在插入时会产生成本。
- 尽可能使用面向列的插入。同样,这些应该具有强类型,避免客户端转换您的值。
- 请遵循 ClickHouse 的建议,以获得最佳插入性能。