一个简单的例子
让我们用一个简单的例子开始。 这将连接到 ClickHouse 并从 system 数据库中选择数据。 要开始,您需要您的连接详细信息。
连接详细信息
要使用原生 TCP 连接到 ClickHouse,您需要这些信息
| 参数 | 描述 |
|---|
HOST 和 PORT | 通常,使用 TLS 时的端口是 9440,不使用 TLS 时的端口是 9000。 |
数据库名称 | 默认情况下,有一个名为 default 的数据库,使用您要连接的数据库的名称。 |
USERNAME 和 PASSWORD | 默认情况下,用户名是 default。 使用适合您用例的用户名。 |
您的 ClickHouse Cloud 服务的详细信息可在 ClickHouse Cloud 控制台中找到。 选择您将连接的服务,然后单击 Connect
选择 Native,详细信息可在示例 clickhouse-client 命令中找到。
如果您使用的是自托管 ClickHouse,则连接详细信息由您的 ClickHouse 管理员设置。
初始化一个模块
mkdir clickhouse-golang-example
cd clickhouse-golang-example
go mod init clickhouse-golang-example
复制一些示例代码
将此代码复制到 clickhouse-golang-example 目录中,作为 main.go。
package main
import (
"context"
"crypto/tls"
"fmt"
"log"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)
func main() {
conn, err := connect()
if err != nil {
panic(err)
}
ctx := context.Background()
rows, err := conn.Query(ctx, "SELECT name, toString(uuid) as uuid_str FROM system.tables LIMIT 5")
if err != nil {
log.Fatal(err)
}
defer rows.Close()
for rows.Next() {
var name, uuid string
if err := rows.Scan(&name, &uuid); err != nil {
log.Fatal(err)
}
log.Printf("name: %s, uuid: %s", name, uuid)
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
log.Fatal(err)
}
}
func connect() (driver.Conn, error) {
var (
ctx = context.Background()
conn, err = clickhouse.Open(&clickhouse.Options{
Addr: []string{"<CLICKHOUSE_SECURE_NATIVE_HOSTNAME>:9440"},
Auth: clickhouse.Auth{
Database: "default",
Username: "default",
Password: "<DEFAULT_USER_PASSWORD>",
},
ClientInfo: clickhouse.ClientInfo{
Products: []struct {
Name string
Version string
}{
{Name: "an-example-go-client", Version: "0.1"},
},
},
Debugf: func(format string, v ...interface{}) {
fmt.Printf(format, v)
},
TLS: &tls.Config{
InsecureSkipVerify: true,
},
})
)
if err != nil {
return nil, err
}
if err := conn.Ping(ctx); err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
fmt.Printf("Exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
}
return nil, err
}
return conn, nil
}
运行 go mod tidy
设置您的连接详细信息
之前您查找了您的连接详细信息。 在 main.go 的 connect() 函数中设置它们
func connect() (driver.Conn, error) {
var (
ctx = context.Background()
conn, err = clickhouse.Open(&clickhouse.Options{
#highlight-next-line
Addr: []string{"<CLICKHOUSE_SECURE_NATIVE_HOSTNAME>:9440"},
Auth: clickhouse.Auth{
#highlight-start
Database: "default",
Username: "default",
Password: "<DEFAULT_USER_PASSWORD>",
#highlight-end
},
运行示例
2023/03/06 14:18:33 name: COLUMNS, uuid: 00000000-0000-0000-0000-000000000000
2023/03/06 14:18:33 name: SCHEMATA, uuid: 00000000-0000-0000-0000-000000000000
2023/03/06 14:18:33 name: TABLES, uuid: 00000000-0000-0000-0000-000000000000
2023/03/06 14:18:33 name: VIEWS, uuid: 00000000-0000-0000-0000-000000000000
2023/03/06 14:18:33 name: hourly_data, uuid: a4e36bd4-1e82-45b3-be77-74a0fe65c52b
了解更多
此类别中的其余文档涵盖了 ClickHouse Go 客户端的详细信息。
ClickHouse Go 客户端
ClickHouse 支持两个官方的 Go 客户端。 这些客户端是互补的,并且有意支持不同的用例。
clickhouse-go 提供了一个高级接口,允许用户使用面向行语义和批量处理查询和插入数据,并且对数据类型比较宽松 - 只要没有潜在的精度损失,值将被转换。与此同时,ch-go 提供了一个优化的面向列的接口,它提供快速的数据块流,具有较低的 CPU 和内存开销,但代价是类型严格性和更复杂的用法。
从版本 2.3 开始,Clickhouse-go 针对编码、解码和压缩等底层函数使用 ch-go。 请注意,clickhouse-go 还支持 Go database/sql 接口标准。 两个客户端都使用原生格式进行编码,以提供最佳性能,并且可以通过原生 ClickHouse 协议进行通信。 clickhouse-go 还支持 HTTP 作为其传输机制,用于用户需要代理或负载均衡流量的情况。
在选择客户端库时,您应该了解每个客户端库的各自优缺点。 请参阅 “选择库” 以获取指导。
| 原生格式 | 原生协议 | HTTP 协议 | 行式 API | 列式 API | 类型灵活性 | 压缩 | 查询占位符 |
|---|
| clickhouse-go | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| ch-go | ✅ | ✅ | | | ✅ | | ✅ | |
选择客户端
选择客户端库取决于您的使用模式和对最佳性能的需求。 对于插入量大的用例,例如每秒需要数百万次插入,我们建议使用低级客户端 ch-go。 此客户端避免了将数据从面向行格式转换为列的关联开销,因为 ClickHouse 原生格式需要这样做。 此外,它避免了任何反射或使用 interface{} (any) 类型以简化用法。
对于专注于聚合或较低吞吐量插入工作负载的查询工作负载,clickhouse-go 提供了一个熟悉的 database/sql 接口和更直接的行语义。 您还可以选择使用 HTTP 作为传输协议,并利用辅助函数将行来回编组到结构体。
clickhouse-go 客户端
clickhouse-go 客户端提供两个 API 接口用于与 ClickHouse 通信
- ClickHouse 客户端特定 API
database/sql 标准 - Golang 提供的围绕 SQL 数据库的通用接口。
虽然 database/sql 提供了一个数据库无关的接口,允许开发人员抽象他们的数据存储,但它强制执行一些类型和查询语义,这会影响性能。 因此,当 性能很重要 时,应使用客户端特定的 API。 但是,希望将 ClickHouse 集成到支持多个数据库的工具中的用户可能更喜欢使用标准接口。
两个接口都使用 原生格式 和原生协议进行通信。 此外,标准接口支持通过 HTTP 进行通信。
| 原生格式 | 原生协议 | HTTP 协议 | 批量写入支持 | 结构体编组 | 压缩 | 查询占位符 |
|---|
| ClickHouse API | ✅ | ✅ | | ✅ | ✅ | ✅ | ✅ |
database/sql API | ✅ | ✅ | ✅ | ✅ | | ✅ | ✅ |
v1 驱动程序已弃用,将不会收到功能更新或对新 ClickHouse 类型支持。 您应该迁移到 v2,它提供卓越的性能。
要安装 2.x 版本的客户端,请将包添加到您的 go.mod 文件
require github.com/ClickHouse/clickhouse-go/v2 main
或者,克隆仓库
git clone --branch v2 https://github.com/clickhouse/clickhouse-go.git $GOPATH/src/github
要安装另一个版本,请相应地修改路径或分支名称。
mkdir my-clickhouse-app && cd my-clickhouse-app
cat > go.mod <<-END
module my-clickhouse-app
go 1.18
require github.com/ClickHouse/clickhouse-go/v2 main
END
cat > main.go <<-END
package main
import (
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
)
func main() {
conn, _ := clickhouse.Open(&clickhouse.Options{Addr: []string{"127.0.0.1:9000"}})
v, _ := conn.ServerVersion()
fmt.Println(v.String())
}
END
go mod tidy
go run main.go
版本控制和兼容性
客户端独立于 ClickHouse 发布。 2.x 代表当前正在开发的主要版本。 所有 2.x 版本都应相互兼容。
ClickHouse 兼容性
客户端支持
- 所有当前受支持的 ClickHouse 版本,如 此处 记录。 随着 ClickHouse 版本的不再受支持,它们也不会再对客户端发布进行积极测试。
- 所有 ClickHouse 版本,从客户端发布日期起 2 年内。 请注意,仅对 LTS 版本进行积极测试。
Golang 兼容性
| 客户端版本 | Golang 版本 |
|---|
| => 2.0 <= 2.2 | 1.17, 1.18 |
| >= 2.3 | 1.18 |
ClickHouse 客户端 API
所有 ClickHouse 客户端 API 代码示例都可以在 此处 找到。
以下示例,它返回服务器版本,演示了连接到 ClickHouse - 假设 ClickHouse 没有受到保护并且可以使用默认用户访问。
请注意,我们使用默认原生端口进行连接。
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
fmt.Println(v)
完整示例
对于后续所有示例,除非明确说明,我们假设已创建并可用 ClickHouse conn 变量。
连接设置
打开连接时,可以使用 Options 结构来控制客户端行为。 可用的设置如下
Protocol - Native 或 HTTP。 HTTP 当前仅受 database/sql API 支持。
TLS - TLS 选项。 非空值启用 TLS。 请参阅 使用 TLS。
Addr - 包含端口的地址切片。
Auth - 身份验证详细信息。 请参阅 身份验证。
DialContext - 自定义拨号函数,以确定如何建立连接。
Debug - true/false 以启用调试。
Debugf - 提供一个函数来消耗调试输出。 需要将 debug 设置为 true。
Settings - ClickHouse 设置的映射。 这些将应用于所有 ClickHouse 查询。 使用 Context 允许为每个查询设置设置。
Compression - 启用块压缩。 请参阅 压缩。
DialTimeout - 建立连接的最大时间。 默认为 1s。
MaxOpenConns - 任何时候可以使用的最大连接数。 更多或更少的连接可能在空闲池中,但只有这个数字可以在任何时候使用。 默认为 MaxIdleConns+5。
MaxIdleConns - 要在池中维护的连接数。 如果可能,将重用连接。 默认为 5。
ConnMaxLifetime - 保持连接可用的最大时间。 默认为 1 小时。 连接在经过这段时间后将被销毁,并根据需要向池中添加新连接。
ConnOpenStrategy - 确定应如何使用节点地址列表来打开连接。 请参阅 连接到多个节点。
BlockBufferSize - 一次解码到缓冲区中的最大块数。 较大的值将增加并行性,但代价是内存。 块大小取决于查询,因此虽然可以在连接上设置此值,但我们建议根据其返回的数据,为每个查询覆盖设置。 默认为 2。
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
DialContext: func(ctx context.Context, addr string) (net.Conn, error) {
dialCount++
var d net.Dialer
return d.DialContext(ctx, "tcp", addr)
},
Debug: true,
Debugf: func(format string, v ...interface{}) {
fmt.Printf(format, v)
},
Settings: clickhouse.Settings{
"max_execution_time": 60,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
DialTimeout: time.Duration(10) * time.Second,
MaxOpenConns: 5,
MaxIdleConns: 5,
ConnMaxLifetime: time.Duration(10) * time.Minute,
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
BlockBufferSize: 10,
})
if err != nil {
return err
}
完整示例
连接池
客户端维护一个连接池,根据需要重用这些连接进行查询。 最多,任何时候都可以使用 MaxOpenConns,最大池大小由 MaxIdleConns 控制。 客户端将在每个查询执行期间从池中获取一个连接,并将其返回到池中以供重用。 批处理的生命周期使用一个连接,并在 Send() 上释放。
除非用户设置 MaxOpenConns=1,否则不能保证池中的相同连接将用于后续查询。 这很少需要,但对于用户使用临时表的情况,可能需要这样做。
另外,请注意,ConnMaxLifetime 默认值为 1 小时。如果节点离开集群,这可能导致负载在 ClickHouse 上变得不平衡。当节点不可用时,连接将平衡到其他节点。这些连接将持续存在,默认情况下不会刷新 1 小时,即使出现问题节点返回到集群。在重负载情况下,请考虑降低此值。
连接轮询已为 Native (TCP) 和 HTTP 协议启用。
使用 TLS
在底层,所有客户端连接方法 (DSN/OpenDB/Open) 都将使用 Go tls 包 来建立安全连接。如果 Options 结构体包含一个非 nil 的 tls.Config 指针,客户端就知道要使用 TLS。
env, err := GetNativeTestEnvironment()
if err != nil {
return err
}
cwd, err := os.Getwd()
if err != nil {
return err
}
t := &tls.Config{}
caCert, err := ioutil.ReadFile(path.Join(cwd, "../../tests/resources/CAroot.crt"))
if err != nil {
return err
}
caCertPool := x509.NewCertPool()
successful := caCertPool.AppendCertsFromPEM(caCert)
if !successful {
return err
}
t.RootCAs = caCertPool
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.SslPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
TLS: t,
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
if err != nil {
return err
}
fmt.Println(v.String())
完整示例
这个最小的 TLS.Config 通常足以连接到 ClickHouse 服务器上的安全 native 端口 (通常为 9440)。如果 ClickHouse 服务器没有有效的证书 (过期、主机名错误、未由公共认可的根证书颁发机构签名),可以将 InsecureSkipVerify 设置为 true,但这强烈不建议这样做。
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.SslPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
TLS: &tls.Config{
InsecureSkipVerify: true,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
完整示例
如果需要额外的 TLS 参数,应用程序代码应在 tls.Config 结构体中设置所需的字段。这包括特定的密码套件、强制特定的 TLS 版本 (例如 1.2 或 1.3)、添加内部 CA 证书链、添加客户端证书 (和私钥) (如果 ClickHouse 服务器需要),以及更专业的安全设置中的大多数其他选项。
身份验证
在连接详细信息中指定 Auth 结构体以指定用户名和密码。
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
完整示例
连接到多个节点
可以通过 Addr 结构体指定多个地址。
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
if err != nil {
return err
}
fmt.Println(v.String())
完整示例
有两种连接策略可用
ConnOpenInOrder (默认) - 按照顺序使用地址。如果使用列表中较早的地址连接失败,则仅使用后续地址。这实际上是一种故障转移策略。
ConnOpenRoundRobin - 使用轮询策略在地址之间平衡负载。
可以通过选项 ConnOpenStrategy 进行控制
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)},
ConnOpenStrategy: clickhouse.ConnOpenRoundRobin,
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
if err != nil {
return err
}
完整示例
可以通过 Exec 方法执行任意语句。这对于 DDL 和简单语句很有用。不应将其用于较大的插入或查询迭代。
conn.Exec(context.Background(), `DROP TABLE IF EXISTS example`)
err = conn.Exec(context.Background(), `
CREATE TABLE IF NOT EXISTS example (
Col1 UInt8,
Col2 String
) engine=Memory
`)
if err != nil {
return err
}
conn.Exec(context.Background(), "INSERT INTO example VALUES (1, 'test-1')")
完整示例
请注意,可以将 Context 传递给查询。这可用于传递特定的查询级别设置 - 请参阅 使用 Context。
批量插入
为了插入大量行,客户端提供批量语义。这需要准备一个可以附加行的批处理。最后通过 Send() 方法发送。批处理在内存中保留,直到执行 Send。
建议在批处理上调用 Close 以防止泄漏连接。可以在准备批处理后通过 defer 关键字来完成。如果从未调用 Send,这将清理连接。请注意,如果未附加任何行,这将导致查询日志中显示 0 行插入。
conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(context.Background(), "DROP TABLE IF EXISTS example")
err = conn.Exec(ctx, `
CREATE TABLE IF NOT EXISTS example (
Col1 UInt8
, Col2 String
, Col3 FixedString(3)
, Col4 UUID
, Col5 Map(String, UInt8)
, Col6 Array(String)
, Col7 Tuple(String, UInt8, Array(Map(String, String)))
, Col8 DateTime
) Engine = Memory
`)
if err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
for i := 0; i < 1000; i++ {
err := batch.Append(
uint8(42),
"ClickHouse",
"Inc",
uuid.New(),
map[string]uint8{"key": 1}, // Map(String, UInt8)
[]string{"Q", "W", "E", "R", "T", "Y"}, // Array(String)
[]interface{}{ // Tuple(String, UInt8, Array(Map(String, String)))
"String Value", uint8(5), []map[string]string{
{"key": "value"},
{"key": "value"},
{"key": "value"},
},
},
time.Now(),
)
if err != nil {
return err
}
}
return batch.Send()
完整示例
适用于 ClickHouse 的建议 此处适用。批处理不应在 go-routine 之间共享 - 每个 routine 构建一个单独的批处理。
从上面的示例中,请注意需要变量类型与附加行时列类型对齐。虽然映射通常是显而易见的,但此接口试图保持灵活性,并且只要没有精度损失,类型将被转换。例如,以下演示了将字符串插入到 datetime64 中。
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
for i := 0; i < 1000; i++ {
err := batch.Append(
"2006-01-02 15:04:05.999",
)
if err != nil {
return err
}
}
return batch.Send()
完整示例
有关每个列类型支持的 go 类型的完整摘要,请参阅 类型转换。
查询行
您可以使用 QueryRow 方法查询单个行,或者通过 Query 获取用于迭代结果集的游标。虽然前者接受用于将数据序列化到的目标,但后者需要对每一行调用 Scan。
row := conn.QueryRow(context.Background(), "SELECT * FROM example")
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 []interface{}
col8 time.Time
)
if err := row.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s, col4=%s, col5=%v, col6=%v, col7=%v, col8=%v\n", col1, col2, col3, col4, col5, col6, col7, col8)
完整示例
rows, err := conn.Query(ctx, "SELECT Col1, Col2, Col3 FROM example WHERE Col1 >= 2")
if err != nil {
return err
}
for rows.Next() {
var (
col1 uint8
col2 string
col3 time.Time
)
if err := rows.Scan(&col1, &col2, &col3); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s\n", col1, col2, col3)
}
rows.Close()
return rows.Err()
完整示例
请注意,在两种情况下,我们都需要传递一个指向我们希望将各自的列值序列化到的变量的指针。这些必须按照 SELECT 语句中指定的顺序传递 - 默认情况下,如果使用 SELECT * (如上所示),则将使用列声明的顺序。
与插入类似,Scan 方法需要目标变量的类型合适。这同样旨在保持灵活性,只要没有可能发生精度损失,就可以转换类型,例如,上面的示例显示 UUID 列被读取到字符串变量中。有关每个列类型支持的 go 类型的完整列表,请参阅 类型转换。
最后,请注意可以将 Context 传递给 Query 和 QueryRow 方法。这可用于查询级别设置 - 有关更多详细信息,请参阅 使用 Context。
异步插入
异步插入通过 Async 方法支持。这允许用户指定客户端是应该等待服务器完成插入还是在收到数据后响应。这有效地控制了参数 wait_for_async_insert。
conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
if err := clickhouse_tests.CheckMinServerServerVersion(conn, 21, 12, 0); err != nil {
return nil
}
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(ctx, `DROP TABLE IF EXISTS example`)
const ddl = `
CREATE TABLE example (
Col1 UInt64
, Col2 String
, Col3 Array(UInt8)
, Col4 DateTime
) ENGINE = Memory
`
if err := conn.Exec(ctx, ddl); err != nil {
return err
}
for i := 0; i < 100; i++ {
if err := conn.AsyncInsert(ctx, fmt.Sprintf(`INSERT INTO example VALUES (
%d, '%s', [1, 2, 3, 4, 5, 6, 7, 8, 9], now()
)`, i, "Golang SQL database driver"), false); err != nil {
return err
}
}
完整示例
列式插入
可以以列格式插入插入。如果数据已经以这种结构组织,这可以提供性能优势,从而避免需要转换为行。
batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
var (
col1 []uint64
col2 []string
col3 [][]uint8
col4 []time.Time
)
for i := 0; i < 1_000; i++ {
col1 = append(col1, uint64(i))
col2 = append(col2, "Golang SQL database driver")
col3 = append(col3, []uint8{1, 2, 3, 4, 5, 6, 7, 8, 9})
col4 = append(col4, time.Now())
}
if err := batch.Column(0).Append(col1); err != nil {
return err
}
if err := batch.Column(1).Append(col2); err != nil {
return err
}
if err := batch.Column(2).Append(col3); err != nil {
return err
}
if err := batch.Column(3).Append(col4); err != nil {
return err
}
return batch.Send()
完整示例
使用结构体
对于用户来说,Golang 结构体提供了 ClickHouse 中一行数据的逻辑表示。为了协助完成此操作,native 接口提供了几个方便的函数。
使用 serialize 进行选择
Select 方法允许将一组响应行编组到一个结构体切片中,只需一次调用即可。
var result []struct {
Col1 uint8
Col2 string
ColumnWithName time.Time `ch:"Col3"`
}
if err = conn.Select(ctx, &result, "SELECT Col1, Col2, Col3 FROM example"); err != nil {
return err
}
for _, v := range result {
fmt.Printf("row: col1=%d, col2=%s, col3=%s\n", v.Col1, v.Col2, v.ColumnWithName)
}
完整示例
扫描结构体
ScanStruct 允许将来自查询的单个行编组到结构体中。
var result struct {
Col1 int64
Count uint64 `ch:"count"`
}
if err := conn.QueryRow(context.Background(), "SELECT Col1, COUNT() AS count FROM example WHERE Col1 = 5 GROUP BY Col1").ScanStruct(&result); err != nil {
return err
}
完整示例
附加结构体
AppendStruct 允许将结构体附加到现有的 批处理 并将其解释为完整的行。这要求结构体的列在名称和类型上与表对齐。虽然所有列都必须具有等效的结构体字段,但某些结构体字段可能没有等效的列表示形式。这些将被简单地忽略。
batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
for i := 0; i < 1_000; i++ {
err := batch.AppendStruct(&row{
Col1: uint64(i),
Col2: "Golang SQL database driver",
Col3: []uint8{1, 2, 3, 4, 5, 6, 7, 8, 9},
Col4: time.Now(),
ColIgnored: "this will be ignored",
})
if err != nil {
return err
}
}
完整示例
类型转换
客户端旨在尽可能灵活地接受变量类型,用于插入和响应的编组。在大多数情况下,ClickHouse 列类型存在等效的 Golang 类型,例如,UInt64 到 uint64。应始终支持这些逻辑映射。如果您希望使用可以在插入或用于接收响应之前进行转换的变量类型,则可能希望使用变量类型。客户端旨在透明地支持这些转换,因此用户无需在插入之前将数据精确对齐,并在查询时提供灵活的编组。这种透明转换不允许精度损失。例如,不能使用 uint32 来接收来自 UInt64 列的数据。相反,可以插入字符串到 datetime64 字段,前提是它满足格式要求。
当前支持的原始类型转换记录在 此处。
这项工作正在进行中,可以分为插入 (Append/AppendRow) 和读取时间 (通过 Scan)。如果您需要支持特定的转换,请提出 issue。
复杂类型
日期/日期时间类型
ClickHouse go 客户端支持 Date、Date32、DateTime 和 DateTime64 日期/日期时间类型。日期可以作为格式为 2006-01-02 的字符串插入,或使用 native go time.Time{} 或 sql.NullTime。DateTimes 也支持后两种类型,但要求字符串以格式 2006-01-02 15:04:05 传递,并带有可选的时区偏移量,例如 2006-01-02 15:04:05 +08:00。time.Time{} 和 sql.NullTime 在读取时都受支持,以及任何实现 sql.Scanner 接口的实现。
时区信息的处理取决于 ClickHouse 类型以及值是正在插入还是读取
- DateTime/DateTime64
- 在 插入 时,如果未提供时区,该值将以 UNIX 时间戳格式发送到 ClickHouse。客户端将假定客户端的本地时区。
time.Time{} 或 sql.NullTime 将相应地转换为 epoch。
- 在 选择 时,如果设置了时区,则将使用列的时区返回
time.Time 值。如果没有,将使用服务器的时区。
- Date/Date32
- 在 插入 时,在将日期转换为 unix 时间戳时,将考虑任何日期的时区,即它将在存储为日期之前通过时区进行偏移,因为 ClickHouse 中的 Date 类型没有区域设置。如果未在字符串值中指定此值,将使用本地时区。
- 在 选择 时,扫描到
time.Time{} 或 sql.NullTime{} 实例的日期将返回而不包含时区信息。
数组应作为切片插入。元素类型规则与 原始类型 一致,即,在可能的情况下,将转换元素。
应在 Scan 时提供指向切片的指针。
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
var i int64
for i = 0; i < 10; i++ {
err := batch.Append(
[]string{strconv.Itoa(int(i)), strconv.Itoa(int(i + 1)), strconv.Itoa(int(i + 2)), strconv.Itoa(int(i + 3))},
[][]int64{{i, i + 1}, {i + 2, i + 3}, {i + 4, i + 5}},
)
if err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}
var (
col1 []string
col2 [][]int64
)
rows, err := conn.Query(ctx, "SELECT * FROM example")
if err != nil {
return err
}
for rows.Next() {
if err := rows.Scan(&col1, &col2); err != nil {
return err
}
fmt.Printf("row: col1=%v, col2=%v\n", col1, col2)
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
return err
}
rows.Close()
完整示例
Map
Map 应作为 Golang map 插入,其键和值符合 之前 定义的类型规则。
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
var i int64
for i = 0; i < 10; i++ {
err := batch.Append(
map[string]uint64{strconv.Itoa(int(i)): uint64(i)},
map[string][]string{strconv.Itoa(int(i)): {strconv.Itoa(int(i)), strconv.Itoa(int(i + 1)), strconv.Itoa(int(i + 2)), strconv.Itoa(int(i + 3))}},
map[string]map[string]uint64{strconv.Itoa(int(i)): {strconv.Itoa(int(i)): uint64(i)}},
)
if err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}
var (
col1 map[string]uint64
col2 map[string][]string
col3 map[string]map[string]uint64
)
rows, err := conn.Query(ctx, "SELECT * FROM example")
if err != nil {
return err
}
for rows.Next() {
if err := rows.Scan(&col1, &col2, &col3); err != nil {
return err
}
fmt.Printf("row: col1=%v, col2=%v, col3=%v\n", col1, col2, col3)
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
return err
}
rows.Close()
完整示例
元组表示任意长度的列组。这些列可以显式命名,也可以仅指定类型,例如:
//unnamed
Col1 Tuple(String, Int64)
//named
Col2 Tuple(name String, id Int64, age uint8)
在这些方法中,命名元组提供更大的灵活性。而未命名元组必须使用切片插入和读取,命名元组也与映射兼容。
if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Tuple(name String, age UInt8),
Col2 Tuple(String, UInt8),
Col3 Tuple(name String, id String)
)
Engine Memory
`); err != nil {
return err
}
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
// both named and unnamed can be added with slices. Note we can use strongly typed lists and maps if all elements are the same type
if err = batch.Append([]interface{}{"Clicky McClickHouse", uint8(42)}, []interface{}{"Clicky McClickHouse Snr", uint8(78)}, []string{"Dale", "521211"}); err != nil {
return err
}
if err = batch.Append(map[string]interface{}{"name": "Clicky McClickHouse Jnr", "age": uint8(20)}, []interface{}{"Baby Clicky McClickHouse", uint8(1)}, map[string]string{"name": "Geoff", "id": "12123"}); err != nil {
return err
}
if err = batch.Send(); err != nil {
return err
}
var (
col1 map[string]interface{}
col2 []interface{}
col3 map[string]string
)
// named tuples can be retrieved into a map or slices, unnamed just slices
if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2, &col3); err != nil {
return err
}
fmt.Printf("row: col1=%v, col2=%v, col3=%v\n", col1, col2, col3)
完整示例
注意:支持类型的切片和映射,前提是命名元组中的子列都具有相同的类型。
嵌套字段等同于命名元组的数组。使用方式取决于用户是否将 flatten_nested 设置为 1 或 0。
通过将 flatten_nested 设置为 0,嵌套列将保持为元组的单个数组。这允许您使用映射的切片进行插入和检索,以及任意级别的嵌套。映射的键必须等于列的名称,如下例所示。
注意:由于映射表示一个元组,因此它们必须是 map[string]interface{} 类型。目前值没有进行强类型化。
conn, err := GetNativeConnection(clickhouse.Settings{
"flatten_nested": 0,
}, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(context.Background(), "DROP TABLE IF EXISTS example")
err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Nested(Col1_1 String, Col1_2 UInt8),
Col2 Nested(
Col2_1 UInt8,
Col2_2 Nested(
Col2_2_1 UInt8,
Col2_2_2 UInt8
)
)
) Engine Memory
`)
if err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
var i int64
for i = 0; i < 10; i++ {
err := batch.Append(
[]map[string]interface{}{
{
"Col1_1": strconv.Itoa(int(i)),
"Col1_2": uint8(i),
},
{
"Col1_1": strconv.Itoa(int(i + 1)),
"Col1_2": uint8(i + 1),
},
{
"Col1_1": strconv.Itoa(int(i + 2)),
"Col1_2": uint8(i + 2),
},
},
[]map[string]interface{}{
{
"Col2_2": []map[string]interface{}{
{
"Col2_2_1": uint8(i),
"Col2_2_2": uint8(i + 1),
},
},
"Col2_1": uint8(i),
},
{
"Col2_2": []map[string]interface{}{
{
"Col2_2_1": uint8(i + 2),
"Col2_2_2": uint8(i + 3),
},
},
"Col2_1": uint8(i + 1),
},
},
)
if err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}
var (
col1 []map[string]interface{}
col2 []map[string]interface{}
)
rows, err := conn.Query(ctx, "SELECT * FROM example")
if err != nil {
return err
}
for rows.Next() {
if err := rows.Scan(&col1, &col2); err != nil {
return err
}
fmt.Printf("row: col1=%v, col2=%v\n", col1, col2)
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
return err
}
rows.Close()
完整示例 - flatten_tested=0
如果 flatten_nested 使用默认值 1,则嵌套列将被展平为单独的数组。这需要使用嵌套切片进行插入和检索。虽然任意级别的嵌套可能有效,但这并非官方支持。
conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(ctx, "DROP TABLE IF EXISTS example")
err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Nested(Col1_1 String, Col1_2 UInt8),
Col2 Nested(
Col2_1 UInt8,
Col2_2 Nested(
Col2_2_1 UInt8,
Col2_2_2 UInt8
)
)
) Engine Memory
`)
if err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
var i uint8
for i = 0; i < 10; i++ {
col1_1_data := []string{strconv.Itoa(int(i)), strconv.Itoa(int(i + 1)), strconv.Itoa(int(i + 2))}
col1_2_data := []uint8{i, i + 1, i + 2}
col2_1_data := []uint8{i, i + 1, i + 2}
col2_2_data := [][][]interface{}{
{
{i, i + 1},
},
{
{i + 2, i + 3},
},
{
{i + 4, i + 5},
},
}
err := batch.Append(
col1_1_data,
col1_2_data,
col2_1_data,
col2_2_data,
)
if err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}
完整示例 - flatten_nested=1
注意:嵌套列必须具有相同的维度。例如,在上面的示例中,Col_2_2 和 Col_2_1 必须具有相同数量的元素。
由于更直接的接口和对嵌套的官方支持,我们建议 flatten_nested=0。
地理类型
客户端支持地理类型 Point、Ring、Polygon 和 Multi Polygon。这些字段在 Golang 中使用 github.com/paulmach/orb 包。
if err = conn.Exec(ctx, `
CREATE TABLE example (
point Point,
ring Ring,
polygon Polygon,
mPolygon MultiPolygon
)
Engine Memory
`); err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
if err = batch.Append(
orb.Point{11, 22},
orb.Ring{
orb.Point{1, 2},
orb.Point{1, 2},
},
orb.Polygon{
orb.Ring{
orb.Point{1, 2},
orb.Point{12, 2},
},
orb.Ring{
orb.Point{11, 2},
orb.Point{1, 12},
},
},
orb.MultiPolygon{
orb.Polygon{
orb.Ring{
orb.Point{1, 2},
orb.Point{12, 2},
},
orb.Ring{
orb.Point{11, 2},
orb.Point{1, 12},
},
},
orb.Polygon{
orb.Ring{
orb.Point{1, 2},
orb.Point{12, 2},
},
orb.Ring{
orb.Point{11, 2},
orb.Point{1, 12},
},
},
},
); err != nil {
return err
}
if err = batch.Send(); err != nil {
return err
}
var (
point orb.Point
ring orb.Ring
polygon orb.Polygon
mPolygon orb.MultiPolygon
)
if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&point, &ring, &polygon, &mPolygon); err != nil {
return err
}
完整示例
UUID
UUID 类型受 github.com/google/uuid 包的支持。您还可以将 UUID 作为字符串或任何实现 sql.Scanner 或 Stringify 的类型发送和编组。
if err = conn.Exec(ctx, `
CREATE TABLE example (
col1 UUID,
col2 UUID
)
Engine Memory
`); err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
col1Data, _ := uuid.NewUUID()
if err = batch.Append(
col1Data,
"603966d6-ed93-11ec-8ea0-0242ac120002",
); err != nil {
return err
}
if err = batch.Send(); err != nil {
return err
}
var (
col1 uuid.UUID
col2 uuid.UUID
)
if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2); err != nil {
return err
}
完整示例
十进制
由于 Go 缺乏内置的十进制类型,我们建议使用第三方包 github.com/shopspring/decimal 以原生方式处理十进制类型,而无需修改原始查询。
if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Decimal32(3),
Col2 Decimal(18,6),
Col3 Decimal(15,7),
Col4 Decimal128(8),
Col5 Decimal256(9)
) Engine Memory
`); err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
if err = batch.Append(
decimal.New(25, 4),
decimal.New(30, 5),
decimal.New(35, 6),
decimal.New(135, 7),
decimal.New(256, 8),
); err != nil {
return err
}
if err = batch.Send(); err != nil {
return err
}
var (
col1 decimal.Decimal
col2 decimal.Decimal
col3 decimal.Decimal
col4 decimal.Decimal
col5 decimal.Decimal
)
if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2, &col3, &col4, &col5); err != nil {
return err
}
fmt.Printf("col1=%v, col2=%v, col3=%v, col4=%v, col5=%v\n", col1, col2, col3, col4, col5)
完整示例
可为空
Go 值 Nil 表示 ClickHouse NULL。如果字段声明为可为空,则可以使用它。在插入时,Nil 可以传递给列的普通版本和可为空版本。对于前者,将持久化该类型的默认值,例如字符串的空字符串。对于可为空版本,将在 ClickHouse 中存储 NULL 值。
在扫描时,用户必须传递一个指向支持 nil 的类型的指针,例如 *string,以便表示可为空字段的 nil 值。在下面的示例中,col1 是 Nullable(String),因此接收一个 **string。这允许表示 nil。
if err = conn.Exec(ctx, `
CREATE TABLE example (
col1 Nullable(String),
col2 String,
col3 Nullable(Int8),
col4 Nullable(Int64)
)
Engine Memory
`); err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
if err = batch.Append(
nil,
nil,
nil,
sql.NullInt64{Int64: 0, Valid: false},
); err != nil {
return err
}
if err = batch.Send(); err != nil {
return err
}
var (
col1 *string
col2 string
col3 *int8
col4 sql.NullInt64
)
if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2, &col3, &col4); err != nil {
return err
}
完整示例
客户端还支持 sql.Null* 类型,例如 sql.NullInt64。它们与等效的 ClickHouse 类型兼容。
大整数 - Int128、Int256、UInt128、UInt256
大于 64 位的数字类型使用原生 go big 包表示。
if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Int128,
Col2 UInt128,
Col3 Array(Int128),
Col4 Int256,
Col5 Array(Int256),
Col6 UInt256,
Col7 Array(UInt256)
) Engine Memory`); err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
col1Data, _ := new(big.Int).SetString("170141183460469231731687303715884105727", 10)
col2Data := big.NewInt(128)
col3Data := []*big.Int{
big.NewInt(-128),
big.NewInt(128128),
big.NewInt(128128128),
}
col4Data := big.NewInt(256)
col5Data := []*big.Int{
big.NewInt(256),
big.NewInt(256256),
big.NewInt(256256256256),
}
col6Data := big.NewInt(256)
col7Data := []*big.Int{
big.NewInt(256),
big.NewInt(256256),
big.NewInt(256256256256),
}
if err = batch.Append(col1Data, col2Data, col3Data, col4Data, col5Data, col6Data, col7Data); err != nil {
return err
}
if err = batch.Send(); err != nil {
return err
}
var (
col1 big.Int
col2 big.Int
col3 []*big.Int
col4 big.Int
col5 []*big.Int
col6 big.Int
col7 []*big.Int
)
if err = conn.QueryRow(ctx, "SELECT * FROM example").Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7); err != nil {
return err
}
fmt.Printf("col1=%v, col2=%v, col3=%v, col4=%v, col5=%v, col6=%v, col7=%v\n", col1, col2, col3, col4, col5, col6, col7)
完整示例
对压缩方法的支持取决于所使用的底层协议。对于原生协议,客户端支持 LZ4 和 ZSTD 压缩。这仅在块级别进行。可以通过在连接中包含 Compression 配置来启用压缩。
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionZSTD,
},
MaxOpenConns: 1,
})
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(context.Background(), "DROP TABLE IF EXISTS example")
if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Array(String)
) Engine Memory
`); err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
for i := 0; i < 1000; i++ {
if err := batch.Append([]string{strconv.Itoa(i), strconv.Itoa(i + 1), strconv.Itoa(i + 2), strconv.Itoa(i + 3)}); err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}
完整示例
如果使用通过 HTTP 的标准接口,则可以使用其他压缩技术。有关更多详细信息,请参阅 database/sql API - 压缩。
参数绑定
客户端支持 Exec、Query 和 QueryRow 方法的参数绑定。如下例所示,这支持使用命名、编号和位置参数。我们在此处提供这些示例。
var count uint64
// positional bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 >= ? AND Col3 < ?", 500, now.Add(time.Duration(750)*time.Second)).Scan(&count); err != nil {
return err
}
// 250
fmt.Printf("Positional bind count: %d\n", count)
// numeric bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= $2 AND Col3 > $1", now.Add(time.Duration(150)*time.Second), 250).Scan(&count); err != nil {
return err
}
// 100
fmt.Printf("Numeric bind count: %d\n", count)
// named bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= @col1 AND Col3 > @col3", clickhouse.Named("col1", 100), clickhouse.Named("col3", now.Add(time.Duration(50)*time.Second))).Scan(&count); err != nil {
return err
}
// 50
fmt.Printf("Named bind count: %d\n", count)
完整示例
特殊情况
默认情况下,如果作为查询的参数传递,切片将展开为逗号分隔的值列表。如果您需要使用包装 [ ] 注入一组值,则应使用 ArraySet。
如果需要带有包装 ( ) 的组/元组,例如用于 IN 运算符,则可以使用 GroupSet。这对于需要多个组的情况特别有用,如下例所示。
最后,DateTime64 字段需要精度才能确保参数正确呈现。该字段的精度级别对客户端未知,因此用户必须提供它。为了便于操作,我们提供了 DateNamed 参数。
var count uint64
// arrays will be unfolded
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 IN (?)", []int{100, 200, 300, 400, 500}).Scan(&count); err != nil {
return err
}
fmt.Printf("Array unfolded count: %d\n", count)
// arrays will be preserved with []
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col4 = ?", clickhouse.ArraySet{300, 301}).Scan(&count); err != nil {
return err
}
fmt.Printf("Array count: %d\n", count)
// Group sets allow us to form ( ) lists
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 IN ?", clickhouse.GroupSet{[]interface{}{100, 200, 300, 400, 500}}).Scan(&count); err != nil {
return err
}
fmt.Printf("Group count: %d\n", count)
// More useful when we need nesting
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE (Col1, Col5) IN (?)", []clickhouse.GroupSet{{[]interface{}{100, 101}}, {[]interface{}{200, 201}}}).Scan(&count); err != nil {
return err
}
fmt.Printf("Group count: %d\n", count)
// Use DateNamed when you need a precision in your time#
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col3 >= @col3", clickhouse.DateNamed("col3", now.Add(time.Duration(500)*time.Millisecond), clickhouse.NanoSeconds)).Scan(&count); err != nil {
return err
}
fmt.Printf("NamedDate count: %d\n", count)
完整示例
使用上下文
Go 上下文提供了一种跨 API 边界传递截止时间、取消信号和其他请求范围值的手段。连接上的所有方法都将上下文作为其第一个变量接受。虽然之前的示例使用了 context.Background(),但您可以使用此功能传递设置和截止时间以及取消查询。
传递使用 withDeadline 创建的上下文允许对查询施加执行时间限制。请注意,这是一个绝对时间,并且到期只会释放连接并向 ClickHouse 发送取消信号。WithCancel 也可以用于显式取消查询。
辅助函数 clickhouse.WithQueryID 和 clickhouse.WithQuotaKey 允许指定查询 ID 和配额键。查询 ID 对于在日志中跟踪查询和用于取消目的很有用。配额键可用于根据唯一的键值对 ClickHouse 使用施加限制 - 有关更多详细信息,请参阅 配额管理 。
您还可以使用上下文来确保仅对特定查询应用设置 - 而不是应用于整个连接,如 连接设置 中所示。
最后,您可以通过 clickhouse.WithBlockSize 控制块缓冲区的大小。这会覆盖连接级别的设置 BlockBufferSize 并控制同时解码和保存在内存中的最大块数。较大的值可能意味着更多的并行化,但会消耗更多的内存。
上述示例如下所示。
dialCount := 0
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
DialContext: func(ctx context.Context, addr string) (net.Conn, error) {
dialCount++
var d net.Dialer
return d.DialContext(ctx, "tcp", addr)
},
})
if err != nil {
return err
}
if err := clickhouse_tests.CheckMinServerServerVersion(conn, 22, 6, 1); err != nil {
return nil
}
// we can use context to pass settings to a specific API call
ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{
"allow_experimental_object_type": "1",
}))
conn.Exec(ctx, "DROP TABLE IF EXISTS example")
// to create a JSON column we need allow_experimental_object_type=1
if err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 JSON
)
Engine Memory
`); err != nil {
return err
}
// queries can be cancelled using the context
ctx, cancel := context.WithCancel(context.Background())
go func() {
cancel()
}()
if err = conn.QueryRow(ctx, "SELECT sleep(3)").Scan(); err == nil {
return fmt.Errorf("expected cancel")
}
// set a deadline for a query - this will cancel the query after the absolute time is reached.
// queries will continue to completion in ClickHouse
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
defer cancel()
if err := conn.Ping(ctx); err == nil {
return fmt.Errorf("expected deadline exceeeded")
}
// set a query id to assist tracing queries in logs e.g. see system.query_log
var one uint8
queryId, _ := uuid.NewUUID()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQueryID(queryId.String()))
if err = conn.QueryRow(ctx, "SELECT 1").Scan(&one); err != nil {
return err
}
conn.Exec(context.Background(), "DROP QUOTA IF EXISTS foobar")
defer func() {
conn.Exec(context.Background(), "DROP QUOTA IF EXISTS foobar")
}()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQuotaKey("abcde"))
// set a quota key - first create the quota
if err = conn.Exec(ctx, "CREATE QUOTA IF NOT EXISTS foobar KEYED BY client_key FOR INTERVAL 1 minute MAX queries = 5 TO default"); err != nil {
return err
}
type Number struct {
Number uint64 `ch:"number"`
}
for i := 1; i <= 6; i++ {
var result []Number
if err = conn.Select(ctx, &result, "SELECT number FROM numbers(10)"); err != nil {
return err
}
}
完整示例
可以请求查询的进度、Profile 和 Log 信息。进度信息将报告 ClickHouse 中读取和处理的行数和字节数的统计信息。相反,Profile 信息提供返回给客户端的数据摘要,包括字节数(未压缩)、行数和块数的总计。最后,日志信息提供有关线程的统计信息,例如内存使用情况和数据速度。
获取此信息需要用户使用 上下文,用户可以向其传递回调函数。
totalRows := uint64(0)
// use context to pass a call back for progress and profile info
ctx := clickhouse.Context(context.Background(), clickhouse.WithProgress(func(p *clickhouse.Progress) {
fmt.Println("progress: ", p)
totalRows += p.Rows
}), clickhouse.WithProfileInfo(func(p *clickhouse.ProfileInfo) {
fmt.Println("profile info: ", p)
}), clickhouse.WithLogs(func(log *clickhouse.Log) {
fmt.Println("log info: ", log)
}))
rows, err := conn.Query(ctx, "SELECT number from numbers(1000000) LIMIT 1000000")
if err != nil {
return err
}
for rows.Next() {
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
return err
}
fmt.Printf("Total Rows: %d\n", totalRows)
rows.Close()
完整示例
动态扫描
您可能需要读取不知道模式或返回字段类型的表。这在执行临时数据分析或编写通用工具时很常见。为此,可以在查询响应中获取列类型信息。这可与 Go 反射结合使用,以创建正确类型的变量的运行时实例,这些变量可以传递给 Scan。
const query = `
SELECT
1 AS Col1
, 'Text' AS Col2
`
rows, err := conn.Query(context.Background(), query)
if err != nil {
return err
}
defer rows.Close()
var (
columnTypes = rows.ColumnTypes()
vars = make([]interface{}, len(columnTypes))
)
for i := range columnTypes {
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}
for rows.Next() {
if err := rows.Scan(vars...); err != nil {
return err
}
for _, v := range vars {
switch v := v.(type) {
case *string:
fmt.Println(*v)
case *uint8:
fmt.Println(*v)
}
}
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
return err
}
完整示例
外部表
外部表 允许客户端将数据发送到 ClickHouse,并进行 SELECT 查询。此数据放入临时表,并可以在查询本身中进行评估。
要使用查询将外部数据发送到客户端,用户必须通过 ext.NewTable 构建外部表,然后通过上下文传递此表。
table1, err := ext.NewTable("external_table_1",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
if err != nil {
return err
}
for i := 0; i < 10; i++ {
if err = table1.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now()); err != nil {
return err
}
}
table2, err := ext.NewTable("external_table_2",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
for i := 0; i < 10; i++ {
table2.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now())
}
ctx := clickhouse.Context(context.Background(),
clickhouse.WithExternalTable(table1, table2),
)
rows, err := conn.Query(ctx, "SELECT * FROM external_table_1")
if err != nil {
return err
}
for rows.Next() {
var (
col1 uint8
col2 string
col3 time.Time
)
rows.Scan(&col1, &col2, &col3)
fmt.Printf("col1=%d, col2=%s, col3=%v\n", col1, col2, col3)
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
return err
}
rows.Close()
var count uint64
if err := conn.QueryRow(ctx, "SELECT COUNT(*) FROM external_table_1").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1: %d\n", count)
if err := conn.QueryRow(ctx, "SELECT COUNT(*) FROM external_table_2").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_2: %d\n", count)
if err := conn.QueryRow(ctx, "SELECT COUNT(*) FROM (SELECT * FROM external_table_1 UNION ALL SELECT * FROM external_table_2)").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1 UNION external_table_2: %d\n", count)
完整示例
开放遥测
ClickHouse 允许通过原生协议传递一个 追踪上下文。客户端允许通过函数 clickhouse.withSpan 创建一个 Span 并通过 Context 传递来实现这一点。
var count uint64
rows := conn.QueryRow(clickhouse.Context(context.Background(), clickhouse.WithSpan(
trace.NewSpanContext(trace.SpanContextConfig{
SpanID: trace.SpanID{1, 2, 3, 4, 5},
TraceID: trace.TraceID{5, 4, 3, 2, 1},
}),
)), "SELECT COUNT() FROM (SELECT number FROM system.numbers LIMIT 5)")
if err := rows.Scan(&count); err != nil {
return err
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
return err
}
fmt.Printf("count: %d\n", count)
完整示例
有关利用追踪的完整细节,请参阅 OpenTelemetry 支持。
Database/SQL API
database/sql 或 “标准” API 允许您在应用程序代码应该与底层数据库无关的情况下使用客户端,通过遵循标准接口来实现。这样做会带来一些开销——额外的抽象和间接层以及不一定与 ClickHouse 对齐的原始类型。然而,在需要工具连接到多个数据库的场景中,这些成本通常是可以接受的。
此外,此客户端支持使用 HTTP 作为传输层——数据仍然以原生格式编码,以获得最佳性能。
以下旨在镜像 ClickHouse API 文档的结构。
标准 API 的完整代码示例可以在 此处 找到。
可以通过格式为 clickhouse://<host>:<port>?<query_option>=<value> 的 DSN 字符串和 Open 方法,或者通过 clickhouse.OpenDB 方法建立连接。后者不是 database/sql 规范的一部分,但返回一个 sql.DB 实例。此方法提供了一些功能,例如性能分析,而这些功能无法通过 database/sql 规范以明显的方式暴露。
func Connect() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
return conn.Ping()
}
func ConnectDSN() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://%s:%d?username=%s&password=%s", env.Host, env.Port, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
完整示例
对于后续所有示例,除非明确说明,我们假设已创建并可用 ClickHouse conn 变量。
连接设置
以下参数可以在 DSN 字符串中传递
hosts - 逗号分隔的单个地址主机列表,用于负载均衡和故障转移 - 请参阅 连接到多个节点。
username/password - 身份验证凭据 - 请参阅 身份验证
database - 选择当前默认数据库
dial_timeout - 一个持续时间字符串是一个带可选小数和单位后缀的十进制数字序列,例如 300ms、1s。有效时间单位为 ms、s、m。
connection_open_strategy - random/in_order (默认 random) - 请参阅 连接到多个节点
round_robin - 从集合中选择轮询服务器
in_order - 选择指定顺序中的第一个可用服务器
debug - 启用调试输出 (布尔值)
compress - 指定压缩算法 - none (默认), zstd, lz4, gzip, deflate, br。如果设置为 true,将使用 lz4。仅 lz4 和 zstd 支持原生通信。
compress_level - 压缩级别 (默认值为 0)。请参阅压缩。这取决于算法
gzip - -2 (最佳速度) 到 9 (最佳压缩)
deflate - -2 (最佳速度) 到 9 (最佳压缩)
br - 0 (最佳速度) 到 11 (最佳压缩)
zstd, lz4 - 忽略
secure - 建立安全的 SSL 连接 (默认值为 false)
skip_verify - 跳过证书验证 (默认值为 false)
block_buffer_size - 允许您控制块缓冲区大小。请参阅 BlockBufferSize。 (默认值为 2)
func ConnectSettings() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://127.0.0.1:9001,127.0.0.1:9002,%s:%d/%s?username=%s&password=%s&dial_timeout=10s&connection_open_strategy=round_robin&debug=true&compress=lz4", env.Host, env.Port, env.Database, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
完整示例
连接池
您可以影响所提供节点地址列表的使用方式,如 连接到多个节点 中所述。连接管理和池化由设计委托给 sql.DB。连接池已为 Native (TCP) 和 HTTP 协议启用。
通过 HTTP 连接
默认情况下,连接是通过原生协议建立的。对于需要 HTTP 的用户,可以通过修改 DSN 以包含 HTTP 协议,或者通过在连接选项中指定协议来启用它。
func ConnectHTTP() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Protocol: clickhouse.HTTP,
})
return conn.Ping()
}
func ConnectDSNHTTP() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s", env.Host, env.HttpPort, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
完整示例
连接到多个节点
如果使用 OpenDB,请使用与 ClickHouse API 相同的方法连接到多个主机,并使用相同的选项,包括可选地指定 ConnOpenStrategy。
对于基于 DSN 的连接,该字符串接受多个主机和一个 connection_open_strategy 参数,其值为 round_robin 或 in_order。
func MultiStdHost() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
ConnOpenStrategy: clickhouse.ConnOpenRoundRobin,
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
if err != nil {
return err
}
fmt.Println(v.String())
return nil
}
func MultiStdHostDSN() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://127.0.0.1:9001,127.0.0.1:9002,%s:%d?username=%s&password=%s&connection_open_strategy=round_robin", env.Host, env.Port, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
完整示例
使用 TLS
如果使用 DSN 连接字符串,可以通过参数 "secure=true" 启用 SSL。OpenDB 方法使用与 原生 API 的 TLS 相同的方法,依赖于指定非空 TLS 结构。虽然 DSN 连接字符串支持参数 skip_verify 以跳过 SSL 验证,但 OpenDB 方法需要进行更高级的 TLS 配置——因为它允许传递配置。
func ConnectSSL() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
cwd, err := os.Getwd()
if err != nil {
return err
}
t := &tls.Config{}
caCert, err := ioutil.ReadFile(path.Join(cwd, "../../tests/resources/CAroot.crt"))
if err != nil {
return err
}
caCertPool := x509.NewCertPool()
successful := caCertPool.AppendCertsFromPEM(caCert)
if !successful {
return err
}
t.RootCAs = caCertPool
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.SslPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
TLS: t,
})
return conn.Ping()
}
func ConnectDSNSSL() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("https://%s:%d?secure=true&skip_verify=true&username=%s&password=%s", env.Host, env.HttpsPort, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
完整示例
身份验证
如果使用 OpenDB,可以通过常规选项传递身份验证信息。对于基于 DSN 的连接,可以在连接字符串中传递用户名和密码——作为参数或编码在地址中的凭据。
func ConnectAuth() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
return conn.Ping()
}
func ConnectDSNAuth() error {
env, err := GetStdTestEnvironment()
conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s", env.Host, env.HttpPort, env.Username, env.Password))
if err != nil {
return err
}
if err = conn.Ping(); err != nil {
return err
}
conn, err = sql.Open("clickhouse", fmt.Sprintf("http://%s:%s@%s:%d", env.Username, env.Password, env.Host, env.HttpPort))
if err != nil {
return err
}
return conn.Ping()
}
完整示例
获得连接后,您可以通过 Exec 方法执行 sql 语句。
conn.Exec(`DROP TABLE IF EXISTS example`)
_, err = conn.Exec(`
CREATE TABLE IF NOT EXISTS example (
Col1 UInt8,
Col2 String
) engine=Memory
`)
if err != nil {
return err
}
_, err = conn.Exec("INSERT INTO example VALUES (1, 'test-1')")
完整示例
此方法不支持接收上下文——默认情况下,它使用后台上下文执行。如果需要,可以使用 ExecContext - 请参阅 使用 Context。
批量插入
可以通过通过 Being 方法创建 sql.Tx 来实现批量语义。从这个,可以使用带有 INSERT 语句的 Prepare 方法获得一个批处理。这将返回一个 sql.Stmt,可以将行附加到该语句,使用 Exec 方法。批处理将在内存中累积,直到在原始 sql.Tx 上执行 Commit。
batch, err := scope.Prepare("INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 1000; i++ {
_, err := batch.Exec(
uint8(42),
"ClickHouse", "Inc",
uuid.New(),
map[string]uint8{"key": 1}, // Map(String, UInt8)
[]string{"Q", "W", "E", "R", "T", "Y"}, // Array(String)
[]interface{}{ // Tuple(String, UInt8, Array(Map(String, String)))
"String Value", uint8(5), []map[string]string{
map[string]string{"key": "value"},
map[string]string{"key": "value"},
map[string]string{"key": "value"},
},
},
time.Now(),
)
if err != nil {
return err
}
}
return scope.Commit()
完整示例
查询行/s
可以使用 QueryRow 方法查询单行。这将返回一个 *sql.Row,可以在其上调用 Scan,并使用指针指向应将列编组到的变量。QueryRowContext 变体允许传递除后台以外的上下文 - 请参阅 使用 Context。
row := conn.QueryRow("SELECT * FROM example")
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 interface{}
col8 time.Time
)
if err := row.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
完整示例
迭代多行需要 Query 方法。这将返回一个 *sql.Rows 结构,可以在其上调用 Next 以迭代行。QueryContext 等效项允许传递上下文。
rows, err := conn.Query("SELECT * FROM example")
if err != nil {
return err
}
defer rows.Close()
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 interface{}
col8 time.Time
)
for rows.Next() {
if err := rows.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s, col4=%s, col5=%v, col6=%v, col7=%v, col8=%v\n", col1, col2, col3, col4, col5, col6, col7, col8)
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
return err
}
完整示例
异步插入
可以通过通过 ExecContext 方法执行插入来完成异步插入。应将启用异步模式的上下文传递给它,如下所示。这允许用户指定客户端是应该等待服务器完成插入还是在收到数据后响应。这有效地控制了参数 wait_for_async_insert。
const ddl = `
CREATE TABLE example (
Col1 UInt64
, Col2 String
, Col3 Array(UInt8)
, Col4 DateTime
) ENGINE = Memory
`
if _, err := conn.Exec(ddl); err != nil {
return err
}
ctx := clickhouse.Context(context.Background(), clickhouse.WithStdAsync(false))
{
for i := 0; i < 100; i++ {
_, err := conn.ExecContext(ctx, fmt.Sprintf(`INSERT INTO example VALUES (
%d, '%s', [1, 2, 3, 4, 5, 6, 7, 8, 9], now()
)`, i, "Golang SQL database driver"))
if err != nil {
return err
}
}
}
完整示例
列式插入
不支持使用标准接口。
使用结构体
不支持使用标准接口。
类型转换
标准 database/sql 接口应支持与 ClickHouse API 相同的类型。对于以下复杂类型,有一些例外情况,我们将在下面记录。与 ClickHouse API 类似,客户端旨在在接受插入和响应编组的变量类型方面尽可能灵活。有关更多详细信息,请参阅 类型转换。
复杂类型
除非另有说明,复杂类型处理应与 ClickHouse API 相同。差异是 database/sql 内部结构的结果。
Maps
与 ClickHouse API 不同,标准 API 要求在扫描类型时对 maps 进行强类型化。例如,您不能为 Map(String,String) 字段传递 map[string]interface{},而必须使用 map[string]string。interface{} 变量始终兼容,可用于更复杂的结构。读取时不支持结构体。
var (
col1Data = map[string]uint64{
"key_col_1_1": 1,
"key_col_1_2": 2,
}
col2Data = map[string]uint64{
"key_col_2_1": 10,
"key_col_2_2": 20,
}
col3Data = map[string]uint64{}
col4Data = []map[string]string{
{"A": "B"},
{"C": "D"},
}
col5Data = map[string]uint64{
"key_col_5_1": 100,
"key_col_5_2": 200,
}
)
if _, err := batch.Exec(col1Data, col2Data, col3Data, col4Data, col5Data); err != nil {
return err
}
if err = scope.Commit(); err != nil {
return err
}
var (
col1 interface{}
col2 map[string]uint64
col3 map[string]uint64
col4 []map[string]string
col5 map[string]uint64
)
if err := conn.QueryRow("SELECT * FROM example").Scan(&col1, &col2, &col3, &col4, &col5); err != nil {
return err
}
fmt.Printf("col1=%v, col2=%v, col3=%v, col4=%v, col5=%v", col1, col2, col3, col4, col5)
完整示例
插入行为与 ClickHouse API 相同。
标准 API 支持与原生 ClickHouse API 相同的压缩算法,即块级别的 lz4 和 zstd 压缩。此外,gzip、deflate 和 br 压缩支持 HTTP 连接。如果启用了任何这些压缩算法,则在插入和查询响应期间对块进行压缩。其他请求,例如 ping 或查询请求,将保持未压缩状态。这与 lz4 和 zstd 选项一致。
如果使用 OpenDB 方法建立连接,则可以传递 Compression 配置。这包括指定压缩级别的能力(如下所示)。如果通过带有 DSN 的 sql.Open 连接,请使用参数 compress。这可以是特定的压缩算法,即 gzip、deflate、br、zstd 或 lz4,也可以是布尔标志。如果设置为 true,将使用 lz4。默认值为 none,即禁用压缩。
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionBrotli,
Level: 5,
},
Protocol: clickhouse.HTTP,
})
完整示例
conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s&compress=gzip&compress_level=5", env.Host, env.HttpPort, env.Username, env.Password))
完整示例
应用压缩的级别可以通过 DSN 参数 compress_level 或 Compression 选项的 Level 字段来控制。默认值为 0,但具体取决于算法。
gzip - -2 (最佳速度) 到 9 (最佳压缩)
deflate - -2 (最佳速度) 到 9 (最佳压缩)
br - 0 (最佳速度) 到 11 (最佳压缩)
zstd, lz4 - 忽略
参数绑定
标准 API 支持与 ClickHouse API 相同的参数绑定功能,允许将参数传递给 Exec、Query 和 QueryRow 方法(以及它们的等效 Context 变体)。支持位置、命名和编号参数。
var count uint64
// positional bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 >= ? AND Col3 < ?", 500, now.Add(time.Duration(750)*time.Second)).Scan(&count); err != nil {
return err
}
// 250
fmt.Printf("Positional bind count: %d\n", count)
// numeric bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= $2 AND Col3 > $1", now.Add(time.Duration(150)*time.Second), 250).Scan(&count); err != nil {
return err
}
// 100
fmt.Printf("Numeric bind count: %d\n", count)
// named bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= @col1 AND Col3 > @col3", clickhouse.Named("col1", 100), clickhouse.Named("col3", now.Add(time.Duration(50)*time.Second))).Scan(&count); err != nil {
return err
}
// 50
fmt.Printf("Named bind count: %d\n", count)
完整示例
注意 特殊情况 仍然适用。
使用上下文
标准 API 支持与 ClickHouse API 相同的功能,可以通过上下文传递截止时间、取消信号和其他请求范围内的值。与 ClickHouse API 不同,这是通过使用方法的 Context 变体来实现的,例如,默认使用后台上下文的 Exec 方法,具有一个 ExecContext 变体,可以将上下文作为第一个参数传递。这允许在应用程序流程的任何阶段传递上下文。例如,您可以在通过 ConnContext 建立连接或通过 QueryRowContext 请求查询行时传递上下文。下面显示了所有可用方法的示例。
有关使用上下文传递截止时间、取消信号、查询 ID、配额密钥和连接设置的更多详细信息,请参阅 ClickHouse API 的使用上下文。
ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{
"allow_experimental_object_type": "1",
}))
conn.ExecContext(ctx, "DROP TABLE IF EXISTS example")
// to create a JSON column we need allow_experimental_object_type=1
if _, err = conn.ExecContext(ctx, `
CREATE TABLE example (
Col1 JSON
)
Engine Memory
`); err != nil {
return err
}
// queries can be cancelled using the context
ctx, cancel := context.WithCancel(context.Background())
go func() {
cancel()
}()
if err = conn.QueryRowContext(ctx, "SELECT sleep(3)").Scan(); err == nil {
return fmt.Errorf("expected cancel")
}
// set a deadline for a query - this will cancel the query after the absolute time is reached. Again terminates the connection only,
// queries will continue to completion in ClickHouse
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
defer cancel()
if err := conn.PingContext(ctx); err == nil {
return fmt.Errorf("expected deadline exceeeded")
}
// set a query id to assist tracing queries in logs e.g. see system.query_log
var one uint8
ctx = clickhouse.Context(context.Background(), clickhouse.WithQueryID(uuid.NewString()))
if err = conn.QueryRowContext(ctx, "SELECT 1").Scan(&one); err != nil {
return err
}
conn.ExecContext(context.Background(), "DROP QUOTA IF EXISTS foobar")
defer func() {
conn.ExecContext(context.Background(), "DROP QUOTA IF EXISTS foobar")
}()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQuotaKey("abcde"))
// set a quota key - first create the quota
if _, err = conn.ExecContext(ctx, "CREATE QUOTA IF NOT EXISTS foobar KEYED BY client_key FOR INTERVAL 1 minute MAX queries = 5 TO default"); err != nil {
return err
}
// queries can be cancelled using the context
ctx, cancel = context.WithCancel(context.Background())
// we will get some results before cancel
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
"max_block_size": "1",
}))
rows, err := conn.QueryContext(ctx, "SELECT sleepEachRow(1), number FROM numbers(100);")
if err != nil {
return err
}
defer rows.Close()
var (
col1 uint8
col2 uint8
)
for rows.Next() {
if err := rows.Scan(&col1, &col2); err != nil {
if col2 > 3 {
fmt.Println("expected cancel")
return nil
}
return err
}
fmt.Printf("row: col2=%d\n", col2)
if col2 == 3 {
cancel()
}
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
return err
}
完整示例
虽然本机连接固有地具有会话,但通过 HTTP 的连接需要用户在上下文中创建一个会话 ID 作为设置。这允许使用与会话绑定的功能,例如临时表。
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Protocol: clickhouse.HTTP,
Settings: clickhouse.Settings{
"session_id": uuid.NewString(),
},
})
if _, err := conn.Exec(`DROP TABLE IF EXISTS example`); err != nil {
return err
}
_, err = conn.Exec(`
CREATE TEMPORARY TABLE IF NOT EXISTS example (
Col1 UInt8
)
`)
if err != nil {
return err
}
scope, err := conn.Begin()
if err != nil {
return err
}
batch, err := scope.Prepare("INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 10; i++ {
_, err := batch.Exec(
uint8(i),
)
if err != nil {
return err
}
}
rows, err := conn.Query("SELECT * FROM example")
if err != nil {
return err
}
defer rows.Close()
var (
col1 uint8
)
for rows.Next() {
if err := rows.Scan(&col1); err != nil {
return err
}
fmt.Printf("row: col1=%d\n", col1)
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
return err
}
完整示例
动态扫描
与 ClickHouse API 类似,列类型信息可用,允许您创建正确类型的变量的运行时实例,这些变量可以传递给 Scan。这允许读取列,其中类型未知。
const query = `
SELECT
1 AS Col1
, 'Text' AS Col2
`
rows, err := conn.QueryContext(context.Background(), query)
if err != nil {
return err
}
defer rows.Close()
columnTypes, err := rows.ColumnTypes()
if err != nil {
return err
}
vars := make([]interface{}, len(columnTypes))
for i := range columnTypes {
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}
for rows.Next() {
if err := rows.Scan(vars...); err != nil {
return err
}
for _, v := range vars {
switch v := v.(type) {
case *string:
fmt.Println(*v)
case *uint8:
fmt.Println(*v)
}
}
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
return err
}
完整示例
外部表
外部表 允许客户端使用 SELECT 查询将数据发送到 ClickHouse。此数据放入临时表,并可以在查询本身中使用进行评估。
要使用查询将外部数据发送到客户端,用户必须通过 ext.NewTable 构建外部表,然后通过上下文传递此表。
table1, err := ext.NewTable("external_table_1",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
if err != nil {
return err
}
for i := 0; i < 10; i++ {
if err = table1.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now()); err != nil {
return err
}
}
table2, err := ext.NewTable("external_table_2",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
for i := 0; i < 10; i++ {
table2.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now())
}
ctx := clickhouse.Context(context.Background(),
clickhouse.WithExternalTable(table1, table2),
)
rows, err := conn.QueryContext(ctx, "SELECT * FROM external_table_1")
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var (
col1 uint8
col2 string
col3 time.Time
)
rows.Scan(&col1, &col2, &col3)
fmt.Printf("col1=%d, col2=%s, col3=%v\n", col1, col2, col3)
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
return err
}
var count uint64
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_table_1").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1: %d\n", count)
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_table_2").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_2: %d\n", count)
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM (SELECT * FROM external_table_1 UNION ALL SELECT * FROM external_table_2)").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1 UNION external_table_2: %d\n", count)
完整示例
开放遥测
ClickHouse 允许将 跟踪上下文 作为本机协议的一部分传递。客户端允许通过函数 clickhouse.withSpan 创建一个 Span,并通过 Context 传递它来实现这一点。当使用 HTTP 作为传输时,这不受支持。
var count uint64
rows := conn.QueryRowContext(clickhouse.Context(context.Background(), clickhouse.WithSpan(
trace.NewSpanContext(trace.SpanContextConfig{
SpanID: trace.SpanID{1, 2, 3, 4, 5},
TraceID: trace.TraceID{5, 4, 3, 2, 1},
}),
)), "SELECT COUNT() FROM (SELECT number FROM system.numbers LIMIT 5)")
if err := rows.Scan(&count); err != nil {
return err
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
return err
}
fmt.Printf("count: %d\n", count)
完整示例
- 尽可能使用 ClickHouse API,尤其是对于基本类型。这避免了大量的反射和间接寻址。
- 如果读取大型数据集,请考虑修改
BlockBufferSize。这将增加内存占用,但意味着在行迭代期间可以并行解码更多的块。默认值 2 是保守的,可最大程度地减少内存开销。较高的值意味着内存中更多的块。这需要测试,因为不同的查询会产生不同的块大小。因此,可以通过 Context 在 查询级别 上进行设置。
- 在插入数据时,请明确指定您的类型。虽然客户端旨在具有灵活性,例如允许解析字符串以获取 UUID 或 IP,但这需要数据验证,并在插入时产生成本。
- 尽可能使用面向列的插入。同样,这些应该被强烈地类型化,避免客户端需要转换您的值。
- 遵循 ClickHouse 建议 以获得最佳的插入性能。