基于serf的数据库同步模块库
github项目rqlite的db.go 用于操作数据库
github项目servicecomb-service-center/syncer/serf,用于serf的接口调用
5个文件已添加
1791 ■■■■■ 已修改文件
agent.go 213 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent_test.go 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.go 110 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db.go 514 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db_test.go 880 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
New file
@@ -0,0 +1,213 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package serf
import (
    "context"
    "errors"
    "time"
    "github.com/apache/servicecomb-service-center/pkg/log"
    "github.com/hashicorp/serf/cmd/serf/command/agent"
    "github.com/hashicorp/serf/serf"
)
// Agent warps the serf agent
type Agent struct {
    *agent.Agent
    conf    *Config
    readyCh chan struct{}
    errorCh chan error
}
// Create create serf agent with config
func Create(conf *Config) (*Agent, error) {
    // config cover to serf config
    serfConf, err := conf.convertToSerf()
    if err != nil {
        return nil, err
    }
    // create serf agent with serf config
    serfAgent, err := agent.Create(conf.Config, serfConf, nil)
    if err != nil {
        return nil, err
    }
    return &Agent{
        Agent:   serfAgent,
        conf:    conf,
        readyCh: make(chan struct{}),
        errorCh: make(chan error),
    }, nil
}
// Start agent
func (a *Agent) Start(ctx context.Context) {
    err := a.Agent.Start()
    if err != nil {
        log.Errorf(err, "start serf agent failed")
        a.errorCh <- err
        return
    }
    a.RegisterEventHandler(a)
    err = a.retryJoin(ctx)
    if err != nil {
        log.Errorf(err, "start serf agent failed")
        if err != ctx.Err() && a.errorCh != nil {
            a.errorCh <- err
        }
    }
}
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
// when the startup mode is "ModeCluster",
// used for logical grouping of serf nodes
func (a *Agent) HandleEvent(event serf.Event) {
    if event.EventType() != serf.EventMemberJoin {
        return
    }
    if a.conf.Mode == ModeCluster {
        if len(a.GroupMembers(a.conf.ClusterName)) < groupExpect {
            return
        }
    }
    a.DeregisterEventHandler(a)
    close(a.readyCh)
}
// Ready Returns a channel that will be closed when serf is ready
func (a *Agent) Ready() <-chan struct{} {
    return a.readyCh
}
// Error Returns a channel that will be transmit a serf error
func (a *Agent) Error() <-chan error {
    return a.errorCh
}
// Stop serf agent
func (a *Agent) Stop() {
    if a.errorCh != nil {
        a.Leave()
        a.Shutdown()
        close(a.errorCh)
        a.errorCh = nil
    }
}
// LocalMember returns the Member information for the local node
func (a *Agent) LocalMember() *serf.Member {
    serfAgent := a.Agent.Serf()
    if serfAgent != nil {
        member := serfAgent.LocalMember()
        return &member
    }
    return nil
}
// GroupMembers returns a point-in-time snapshot of the members of by groupName
func (a *Agent) GroupMembers(groupName string) (members []serf.Member) {
    serfAgent := a.Agent.Serf()
    if serfAgent != nil {
        for _, member := range serfAgent.Members() {
            log.Debugf("member = %s, groupName = %s", member.Name, member.Tags[tagKeyClusterName])
            if member.Tags[tagKeyClusterName] == groupName {
                members = append(members, member)
            }
        }
    }
    return
}
// Member get member information with node
func (a *Agent) Member(node string) *serf.Member {
    serfAgent := a.Agent.Serf()
    if serfAgent != nil {
        ms := serfAgent.Members()
        for _, m := range ms {
            if m.Name == node {
                return &m
            }
        }
    }
    return nil
}
// SerfConfig get serf config
func (a *Agent) SerfConfig() *serf.Config {
    return a.Agent.SerfConfig()
}
// Join serf clusters through one or more members
func (a *Agent) Join(addrs []string, replay bool) (n int, err error) {
    return a.Agent.Join(addrs, replay)
}
// UserEvent sends a UserEvent on Serf
func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error {
    return a.Agent.UserEvent(name, payload, coalesce)
}
// Query sends a Query on Serf
func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) {
    return a.Agent.Query(name, payload, params)
}
func (a *Agent) retryJoin(ctx context.Context) (err error) {
    if len(a.conf.RetryJoin) == 0 {
        log.Infof("retry join mumber %d", len(a.conf.RetryJoin))
        return nil
    }
    // Count of attempts
    attempt := 0
    ticker := time.NewTicker(a.conf.RetryInterval)
    for {
        log.Infof("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
        var n int
        // Try to join the specified serf nodes
        n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin)
        if err == nil {
            log.Infof("serf: Join completed. Synced with %d initial agents", n)
            break
        }
        attempt++
        // If RetryMaxAttempts is greater than 0, agent will exit
        // and throw an error when the number of attempts exceeds RetryMaxAttempts,
        // else agent will try to join other nodes until successful always
        if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts {
            err = errors.New("serf: maximum retry join attempts made, exiting")
            log.Errorf(err, err.Error())
            break
        }
        select {
        case <-ctx.Done():
            err = ctx.Err()
            goto done
        // Waiting for ticker to trigger
        case <-ticker.C:
        }
    }
done:
    ticker.Stop()
    return
}
agent_test.go
New file
@@ -0,0 +1,74 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package serf
import (
    "context"
    "testing"
    "time"
    "github.com/hashicorp/serf/serf"
)
func TestAgent(t *testing.T) {
    conf := DefaultConfig()
    agent, err := Create(conf)
    if err != nil {
        t.Errorf("create agent failed, error: %s", err)
    }
    agent.Start(context.Background())
    <- agent.readyCh
    go func() {
        agent.ShutdownCh()
    }()
    time.Sleep(time.Second)
    err = agent.UserEvent("test", []byte("test"), true)
    if err != nil {
        t.Errorf("send user event failed, error: %s", err)
    }
    _, err = agent.Query("test", []byte("test"), &serf.QueryParam{})
    if err != nil {
        t.Errorf("query for other node failed, error: %s", err)
    }
    agent.LocalMember()
    agent.Member("testnode")
    agent.SerfConfig()
    _, err = agent.Join([]string{"127.0.0.1:9999"}, true)
    if err != nil {
        t.Logf("join to other node failed, error: %s", err)
    }
    err = agent.Leave()
    if err != nil {
        t.Errorf("angent leave failed, error: %s", err)
    }
    err = agent.ForceLeave("testnode")
    if err != nil {
        t.Errorf("angent force leave failed, error: %s", err)
    }
    err = agent.Shutdown()
    if err != nil {
        t.Errorf("angent shutdown failed, error: %s", err)
    }
}
config.go
New file
@@ -0,0 +1,110 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package serf
import (
    "fmt"
    "strconv"
    "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
    "github.com/hashicorp/memberlist"
    "github.com/hashicorp/serf/cmd/serf/command/agent"
    "github.com/hashicorp/serf/serf"
)
const (
    DefaultBindPort    = 30190
    DefaultRPCPort     = 30191
    DefaultClusterPort = 30192
    ModeSingle         = "single"
    ModeCluster        = "cluster"
    retryMaxAttempts   = 3
    groupExpect        = 3
    tagKeyClusterName  = "syncer-cluster-name"
    TagKeyClusterPort  = "syncer-cluster-port"
    TagKeyRPCPort      = "syncer-rpc-port"
)
// DefaultConfig default config
func DefaultConfig() *Config {
    agentConf := agent.DefaultConfig()
    agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort)
    agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort)
    return &Config{
        Mode:        ModeSingle,
        Config:      agentConf,
        ClusterPort: DefaultClusterPort,
    }
}
// Config struct
type Config struct {
    // config from serf agent
    *agent.Config
    Mode string `json:"mode"`
    // name to group members into cluster
    ClusterName string `json:"cluster_name"`
    // port to communicate between cluster members
    ClusterPort int `yaml:"cluster_port"`
    RPCPort     int `yaml:"-"`
}
// readConfigFile reads configuration from config file
func (c *Config) readConfigFile(filepath string) error {
    if filepath != "" {
        // todo:
    }
    return nil
}
// convertToSerf convert Config to serf.Config
func (c *Config) convertToSerf() (*serf.Config, error) {
    serfConf := serf.DefaultConfig()
    bindIP, bindPort, err := utils.SplitHostPort(c.BindAddr, DefaultBindPort)
    if err != nil {
        return nil, fmt.Errorf("invalid bind address: %s", err)
    }
    switch c.Profile {
    case "lan":
        serfConf.MemberlistConfig = memberlist.DefaultLANConfig()
    case "wan":
        serfConf.MemberlistConfig = memberlist.DefaultWANConfig()
    case "local":
        serfConf.MemberlistConfig = memberlist.DefaultLocalConfig()
    default:
        serfConf.MemberlistConfig = memberlist.DefaultLANConfig()
    }
    serfConf.MemberlistConfig.BindAddr = bindIP
    serfConf.MemberlistConfig.BindPort = bindPort
    serfConf.NodeName = c.NodeName
    serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)}
    if c.ClusterName != "" {
        serfConf.Tags[tagKeyClusterName] = c.ClusterName
        serfConf.Tags[TagKeyClusterPort] = strconv.Itoa(c.ClusterPort)
    }
    if c.Mode == ModeCluster && c.RetryMaxAttempts <= 0 {
        c.RetryMaxAttempts = retryMaxAttempts
    }
    return serfConf, nil
}
db.go
New file
@@ -0,0 +1,514 @@
// Package db exposes a lightweight abstraction over the SQLite code.
// It performs some basic mapping of lower-level types to rqlite types.
package db
import (
    "database/sql/driver"
    "expvar"
    "fmt"
    "io"
    "net/url"
    "strings"
    "time"
    "github.com/mattn/go-sqlite3"
)
const bkDelay = 250
const (
    fkChecks         = "PRAGMA foreign_keys"
    fkChecksEnabled  = "PRAGMA foreign_keys=ON"
    fkChecksDisabled = "PRAGMA foreign_keys=OFF"
    numExecutions      = "executions"
    numExecutionErrors = "execution_errors"
    numQueries         = "queries"
    numETx             = "execute_transactions"
    numQTx             = "query_transactions"
)
// DBVersion is the SQLite version.
var DBVersion string
// stats captures stats for the DB layer.
var stats *expvar.Map
func init() {
    DBVersion, _, _ = sqlite3.Version()
    stats = expvar.NewMap("db")
    stats.Add(numExecutions, 0)
    stats.Add(numExecutionErrors, 0)
    stats.Add(numQueries, 0)
    stats.Add(numETx, 0)
    stats.Add(numQTx, 0)
}
// Result represents the outcome of an operation that changes rows.
type Result struct {
    LastInsertID int64   `json:"last_insert_id,omitempty"`
    RowsAffected int64   `json:"rows_affected,omitempty"`
    Error        string  `json:"error,omitempty"`
    Time         float64 `json:"time,omitempty"`
}
// Rows represents the outcome of an operation that returns query data.
type Rows struct {
    Columns []string        `json:"columns,omitempty"`
    Types   []string        `json:"types,omitempty"`
    Values  [][]interface{} `json:"values,omitempty"`
    Error   string          `json:"error,omitempty"`
    Time    float64         `json:"time,omitempty"`
}
// DB is the SQL database.
type DB struct {
    path     string // Path to database file.
    dsnQuery string // DSN query params, if any.
    memory   bool   // In-memory only.
    fqdsn    string // Fully-qualified DSN for opening SQLite.
}
// New returns an instance of the database at path. If the database
// has already been created and opened, this database will share
// the data of that database when connected.
func New(path, dsnQuery string, memory bool) (*DB, error) {
    q, err := url.ParseQuery(dsnQuery)
    if err != nil {
        return nil, err
    }
    if memory {
        q.Set("mode", "memory")
        q.Set("cache", "shared")
    }
    if !strings.HasPrefix(path, "file:") {
        path = fmt.Sprintf("file:%s", path)
    }
    var fqdsn string
    if len(q) > 0 {
        fqdsn = fmt.Sprintf("%s?%s", path, q.Encode())
    } else {
        fqdsn = path
    }
    return &DB{
        path:     path,
        dsnQuery: dsnQuery,
        memory:   memory,
        fqdsn:    fqdsn,
    }, nil
}
// Connect returns a connection to the database.
func (d *DB) Connect() (*Conn, error) {
    drv := sqlite3.SQLiteDriver{}
    c, err := drv.Open(d.fqdsn)
    if err != nil {
        return nil, err
    }
    return &Conn{
        sqlite: c.(*sqlite3.SQLiteConn),
    }, nil
}
// Conn represents a connection to a database. Two Connection objects
// to the same database are READ_COMMITTED isolated.
type Conn struct {
    sqlite *sqlite3.SQLiteConn
}
// TransactionActive returns whether a transaction is currently active
// i.e. if the database is NOT in autocommit mode.
func (c *Conn) TransactionActive() bool {
    return !c.sqlite.AutoCommit()
}
// AbortTransaction aborts -- rolls back -- any active transaction. Calling code
// should know exactly what it is doing if it decides to call this function. It
// can be used to clean up any dangling state that may result from certain
// error scenarios.
func (c *Conn) AbortTransaction() error {
    _, err := c.Execute([]string{`ROLLBACK`}, false, false)
    return err
}
// Execute executes queries that modify the database.
func (c *Conn) Execute(queries []string, tx, xTime bool) ([]*Result, error) {
    stats.Add(numExecutions, int64(len(queries)))
    if tx {
        stats.Add(numETx, 1)
    }
    type Execer interface {
        Exec(query string, args []driver.Value) (driver.Result, error)
    }
    var allResults []*Result
    err := func() error {
        var execer Execer
        var rollback bool
        var t driver.Tx
        var err error
        // Check for the err, if set rollback.
        defer func() {
            if t != nil {
                if rollback {
                    t.Rollback()
                    return
                }
                t.Commit()
            }
        }()
        // handleError sets the error field on the given result. It returns
        // whether the caller should continue processing or break.
        handleError := func(result *Result, err error) bool {
            stats.Add(numExecutionErrors, 1)
            result.Error = err.Error()
            allResults = append(allResults, result)
            if tx {
                rollback = true // Will trigger the rollback.
                return false
            }
            return true
        }
        execer = c.sqlite
        // Create the correct execution object, depending on whether a
        // transaction was requested.
        if tx {
            t, err = c.sqlite.Begin()
            if err != nil {
                return err
            }
        }
        // Execute each query.
        for _, q := range queries {
            if q == "" {
                continue
            }
            result := &Result{}
            start := time.Now()
            r, err := execer.Exec(q, nil)
            if err != nil {
                if handleError(result, err) {
                    continue
                }
                break
            }
            if r == nil {
                continue
            }
            lid, err := r.LastInsertId()
            if err != nil {
                if handleError(result, err) {
                    continue
                }
                break
            }
            result.LastInsertID = lid
            ra, err := r.RowsAffected()
            if err != nil {
                if handleError(result, err) {
                    continue
                }
                break
            }
            result.RowsAffected = ra
            if xTime {
                result.Time = time.Now().Sub(start).Seconds()
            }
            allResults = append(allResults, result)
        }
        return nil
    }()
    return allResults, err
}
// Query executes queries that return rows, but don't modify the database.
func (c *Conn) Query(queries []string, tx, xTime bool) ([]*Rows, error) {
    stats.Add(numQueries, int64(len(queries)))
    if tx {
        stats.Add(numQTx, 1)
    }
    type Queryer interface {
        Query(query string, args []driver.Value) (driver.Rows, error)
    }
    var allRows []*Rows
    err := func() (err error) {
        var queryer Queryer
        var t driver.Tx
        defer func() {
            // XXX THIS DOESN'T ACTUALLY WORK! Might as WELL JUST COMMIT?
            if t != nil {
                if err != nil {
                    t.Rollback()
                    return
                }
                t.Commit()
            }
        }()
        queryer = c.sqlite
        // Create the correct query object, depending on whether a
        // transaction was requested.
        if tx {
            t, err = c.sqlite.Begin()
            if err != nil {
                return err
            }
        }
        for _, q := range queries {
            if q == "" {
                continue
            }
            rows := &Rows{}
            start := time.Now()
            rs, err := queryer.Query(q, nil)
            if err != nil {
                rows.Error = err.Error()
                allRows = append(allRows, rows)
                continue
            }
            defer rs.Close()
            columns := rs.Columns()
            rows.Columns = columns
            rows.Types = rs.(*sqlite3.SQLiteRows).DeclTypes()
            dest := make([]driver.Value, len(rows.Columns))
            for {
                err := rs.Next(dest)
                if err != nil {
                    if err != io.EOF {
                        rows.Error = err.Error()
                    }
                    break
                }
                values := normalizeRowValues(dest, rows.Types)
                rows.Values = append(rows.Values, values)
            }
            if xTime {
                rows.Time = time.Now().Sub(start).Seconds()
            }
            allRows = append(allRows, rows)
        }
        return nil
    }()
    return allRows, err
}
// EnableFKConstraints allows control of foreign key constraint checks.
func (c *Conn) EnableFKConstraints(e bool) error {
    q := fkChecksEnabled
    if !e {
        q = fkChecksDisabled
    }
    _, err := c.sqlite.Exec(q, nil)
    return err
}
// FKConstraints returns whether FK constraints are set or not.
func (c *Conn) FKConstraints() (bool, error) {
    r, err := c.sqlite.Query(fkChecks, nil)
    if err != nil {
        return false, err
    }
    dest := make([]driver.Value, len(r.Columns()))
    types := r.(*sqlite3.SQLiteRows).DeclTypes()
    if err := r.Next(dest); err != nil {
        return false, err
    }
    values := normalizeRowValues(dest, types)
    if values[0] == int64(1) {
        return true, nil
    }
    return false, nil
}
// Load loads the connected database from the database connected to src.
// It overwrites the data contained in this database. It is the caller's
// responsibility to ensure that no other connections to this database
// are accessed while this operation is in progress.
func (c *Conn) Load(src *Conn) error {
    return copyDatabase(c.sqlite, src.sqlite)
}
// Backup writes a snapshot of the database over the given database
// connection, erasing all the contents of the destination database.
// The consistency of the snapshot is READ_COMMITTED relative to any
// other connections currently open to this database. The caller must
// ensure that all connections to the destination database are not
// accessed during this operation.
func (c *Conn) Backup(dst *Conn) error {
    return copyDatabase(dst.sqlite, c.sqlite)
}
// Dump writes a snapshot of the database in SQL text format. The consistency
// of the snapshot is READ_COMMITTED relative to any other connections
// currently open to this database.
func (c *Conn) Dump(w io.Writer) error {
    if _, err := w.Write([]byte("PRAGMA foreign_keys=OFF;\nBEGIN TRANSACTION;\n")); err != nil {
        return err
    }
    // Get the schema.
    query := `SELECT "name", "type", "sql" FROM "sqlite_master"
              WHERE "sql" NOT NULL AND "type" == 'table' ORDER BY "name"`
    rows, err := c.Query([]string{query}, false, false)
    if err != nil {
        return err
    }
    row := rows[0]
    for _, v := range row.Values {
        table := v[0].(string)
        var stmt string
        if table == "sqlite_sequence" {
            stmt = `DELETE FROM "sqlite_sequence";`
        } else if table == "sqlite_stat1" {
            stmt = `ANALYZE "sqlite_master";`
        } else if strings.HasPrefix(table, "sqlite_") {
            continue
        } else {
            stmt = v[2].(string)
        }
        if _, err := w.Write([]byte(fmt.Sprintf("%s;\n", stmt))); err != nil {
            return err
        }
        tableIndent := strings.Replace(table, `"`, `""`, -1)
        query = fmt.Sprintf(`PRAGMA table_info("%s")`, tableIndent)
        r, err := c.Query([]string{query}, false, false)
        if err != nil {
            return err
        }
        var columnNames []string
        for _, w := range r[0].Values {
            columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, w[1].(string)))
        }
        query = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' FROM "%s";`,
            tableIndent,
            strings.Join(columnNames, ","),
            tableIndent)
        r, err = c.Query([]string{query}, false, false)
        if err != nil {
            return err
        }
        for _, x := range r[0].Values {
            y := fmt.Sprintf("%s;\n", x[0].(string))
            if _, err := w.Write([]byte(y)); err != nil {
                return err
            }
        }
    }
    // Do indexes, triggers, and views.
    query = `SELECT "name", "type", "sql" FROM "sqlite_master"
              WHERE "sql" NOT NULL AND "type" IN ('index', 'trigger', 'view')`
    rows, err = c.Query([]string{query}, false, false)
    if err != nil {
        return err
    }
    row = rows[0]
    for _, v := range row.Values {
        if _, err := w.Write([]byte(fmt.Sprintf("%s;\n", v[2]))); err != nil {
            return err
        }
    }
    if _, err := w.Write([]byte("COMMIT;\n")); err != nil {
        return err
    }
    return nil
}
// Close closes the connection.
func (c *Conn) Close() error {
    if c != nil {
        return c.sqlite.Close()
    }
    return nil
}
func copyDatabase(dst *sqlite3.SQLiteConn, src *sqlite3.SQLiteConn) error {
    bk, err := dst.Backup("main", src, "main")
    if err != nil {
        return err
    }
    for {
        done, err := bk.Step(-1)
        if err != nil {
            bk.Finish()
            return err
        }
        if done {
            break
        }
        time.Sleep(bkDelay * time.Millisecond)
    }
    return bk.Finish()
}
// normalizeRowValues performs some normalization of values in the returned rows.
// Text values come over (from sqlite-go) as []byte instead of strings
// for some reason, so we have explicitly convert (but only when type
// is "text" so we don't affect BLOB types)
func normalizeRowValues(row []driver.Value, types []string) []interface{} {
    values := make([]interface{}, len(types))
    for i, v := range row {
        if isTextType(types[i]) {
            switch val := v.(type) {
            case []byte:
                values[i] = string(val)
            default:
                values[i] = val
            }
        } else {
            values[i] = v
        }
    }
    return values
}
// isTextType returns whether the given type has a SQLite text affinity.
// http://www.sqlite.org/datatype3.html
func isTextType(t string) bool {
    return t == "text" ||
        t == "json" ||
        t == "" ||
        strings.HasPrefix(t, "varchar") ||
        strings.HasPrefix(t, "varying character") ||
        strings.HasPrefix(t, "nchar") ||
        strings.HasPrefix(t, "native character") ||
        strings.HasPrefix(t, "nvarchar") ||
        strings.HasPrefix(t, "clob")
}
db_test.go
New file
@@ -0,0 +1,880 @@
package db
import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "os"
    "strings"
    "testing"
    "github.com/rqlite/rqlite/testdata/chinook"
)
/*
 * Lowest-layer database tests
 */
func TestNewDBOnDisk(t *testing.T) {
    t.Parallel()
    db, err := New(mustTempFilename(), "", false)
    if err != nil {
        t.Fatalf("failed to create new database: %s", err.Error())
    }
    if db == nil {
        t.Fatal("database is nil")
    }
}
func TestNewDBInMemory(t *testing.T) {
    t.Parallel()
    db, err := New(mustTempFilename(), "", true)
    if err != nil {
        t.Fatalf("failed to create new database: %s", err.Error())
    }
    if db == nil {
        t.Fatal("database is nil")
    }
}
func TestDBCreateOnDiskConnection(t *testing.T) {
    t.Parallel()
    db, path := mustCreateDatabase()
    defer os.Remove(path)
    conn, err := db.Connect()
    if err != nil {
        t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
    }
    if conn == nil {
        t.Fatal("connection to on-disk database is nil")
    }
    if err := conn.Close(); err != nil {
        t.Fatalf("failed to close on-disk connection: %s", err.Error())
    }
}
func TestDBCreateInMemoryConnection(t *testing.T) {
    t.Parallel()
    db := mustCreateInMemoryDatabase()
    conn, err := db.Connect()
    if err != nil {
        t.Fatalf("failed to create connection to in-memory database: %s", err.Error())
    }
    if conn == nil {
        t.Fatal("connection to in-memory database is nil")
    }
    if err := conn.Close(); err != nil {
        t.Fatalf("failed to close in-memory connection: %s", err.Error())
    }
}
func TestDumpOnDisk(t *testing.T) {
    t.Parallel()
    db, path := mustCreateDatabase()
    defer os.Remove(path)
    c, err := db.Connect()
    if err != nil {
        t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
    }
    _, err = c.Execute([]string{chinook.DB}, false, false)
    if err != nil {
        t.Fatalf("failed to load chinook dump: %s", err.Error())
    }
    var b strings.Builder
    if err := c.Dump(&b); err != nil {
        t.Fatalf("failed to dump database: %s", err.Error())
    }
    if b.String() != chinook.DB {
        t.Fatal("dumped database does not equal entered database")
    }
}
func TestDumpInMemory(t *testing.T) {
    t.Parallel()
    db := mustCreateInMemoryDatabase()
    c, err := db.Connect()
    if err != nil {
        t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
    }
    _, err = c.Execute([]string{chinook.DB}, false, false)
    if err != nil {
        t.Fatalf("failed to load chinook dump: %s", err.Error())
    }
    var b strings.Builder
    if err := c.Dump(&b); err != nil {
        t.Fatalf("failed to dump database: %s", err.Error())
    }
    if b.String() != chinook.DB {
        t.Fatal("dumped database does not equal entered database")
    }
}
type testF func(test *testing.T, c *Conn)
var dbTestfunctions []testF = []testF{
    testTableCreation,
    testSQLiteMasterTable,
    testTextTypes,
    testEmptyStatements,
    testSimpleSingleStatements,
    testSimpleJoinStatements,
    testSimpleSingleConcatStatements,
    testSimpleMultiStatements,
    testSimpleSingleMultiLineStatements,
    testSimpleFailingStatementsExecute,
    testSimpleFailingStatementsQuery,
    testSimplePragmaTableInfo,
    testCommonTableExpressions,
    testForeignKeyConstraints,
    testUniqueConstraints,
    testActiveTransaction,
    testAbortTransaction,
    testPartialFail,
    testSimpleTransaction,
    testPartialFailTransaction,
    testBackup,
    testLoad,
}
func TestDatabaseInMemory(t *testing.T) {
    t.Parallel()
    for _, f := range dbTestfunctions {
        dbInMem := mustCreateInMemoryDatabase()
        connInMem, err := dbInMem.Connect()
        if err != nil {
            t.Fatalf("failed to create connection to in-memory database: %s", err.Error())
        }
        f(t, connInMem)
        connInMem.Close()
    }
}
func TestDatabaseOnDisk(t *testing.T) {
    t.Parallel()
    for _, f := range dbTestfunctions {
        dbDisk, path := mustCreateDatabase()
        connDisk, err := dbDisk.Connect()
        if err != nil {
            t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
        }
        f(t, connDisk)
        connDisk.Close()
        os.Remove(path)
    }
}
func testTableCreation(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    r, err := c.Query([]string{"SELECT * FROM foo"}, false, false)
    if err != nil {
        t.Fatalf("failed to query empty table: %s", err.Error())
    }
    if exp, got := `[{"columns":["id","name"],"types":["integer","text"]}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
    }
}
func testSQLiteMasterTable(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    r, err := c.Query([]string{"SELECT * FROM sqlite_master"}, false, false)
    if err != nil {
        t.Fatalf("failed to query master table: %s", err.Error())
    }
    if exp, got := `[{"columns":["type","name","tbl_name","rootpage","sql"],"types":["text","text","text","int","text"],"values":[["table","foo","foo",2,"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"]]}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
    }
}
func testEmptyStatements(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{""}, false, false)
    if err != nil {
        t.Fatalf("failed to execute empty statement: %s", err.Error())
    }
    _, err = c.Execute([]string{";"}, false, false)
    if err != nil {
        t.Fatalf("failed to execute empty statement with semicolon: %s", err.Error())
    }
}
func testTextTypes(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{"CREATE TABLE foo (c0 VARCHAR(36), c1 JSON, c2 NCHAR, c3 NVARCHAR, c4 CLOB)"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    _, err = c.Execute([]string{`INSERT INTO foo(c0, c1, c2, c3, c4) VALUES("fiona", '{"mittens": "foobar"}', "bob", "dana", "declan")`}, false, false)
    if err != nil {
        t.Fatalf("failed to insert record: %s", err.Error())
    }
    r, err := c.Query([]string{"SELECT * FROM foo"}, false, false)
    if err != nil {
        t.Fatalf("failed to query: %s", err.Error())
    }
    if exp, got := `[{"columns":["c0","c1","c2","c3","c4"],"types":["varchar(36)","json","nchar","nvarchar","clob"],"values":[["fiona","{\"mittens\": \"foobar\"}","bob","dana","declan"]]}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
    }
}
func testSimpleSingleStatements(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    _, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
    if err != nil {
        t.Fatalf("failed to insert record: %s", err.Error())
    }
    _, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("aoife")`}, false, false)
    if err != nil {
        t.Fatalf("failed to insert record: %s", err.Error())
    }
    r, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
    if err != nil {
        t.Fatalf("failed to query table: %s", err.Error())
    }
    if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"aoife"]]}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    r, err = c.Query([]string{`SELECT * FROM foo WHERE name="aoife"`}, false, false)
    if err != nil {
        t.Fatalf("failed to query table: %s", err.Error())
    }
    if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[2,"aoife"]]}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    r, err = c.Query([]string{`SELECT * FROM foo WHERE name="dana"`}, false, false)
    if err != nil {
        t.Fatalf("failed to query table: %s", err.Error())
    }
    if exp, got := `[{"columns":["id","name"],"types":["integer","text"]}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    r, err = c.Query([]string{`SELECT * FROM foo ORDER BY name`}, false, false)
    if err != nil {
        t.Fatalf("failed to query table: %s", err.Error())
    }
    if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[2,"aoife"],[1,"fiona"]]}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    r, err = c.Query([]string{`SELECT *,name FROM foo`}, false, false)
    if err != nil {
        t.Fatalf("failed to query table: %s", err.Error())
    }
    if exp, got := `[{"columns":["id","name","name"],"types":["integer","text","text"],"values":[[1,"fiona","fiona"],[2,"aoife","aoife"]]}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
}
func testSimpleJoinStatements(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{"CREATE TABLE names (id INTEGER NOT NULL PRIMARY KEY, name TEXT, ssn TEXT)"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    _, err = c.Execute([]string{
        `INSERT INTO "names" VALUES(1,'bob','123-45-678')`,
        `INSERT INTO "names" VALUES(2,'tom','111-22-333')`,
        `INSERT INTO "names" VALUES(3,'matt','222-22-333')`,
    }, false, false)
    if err != nil {
        t.Fatalf("failed to insert record: %s", err.Error())
    }
    _, err = c.Execute([]string{"CREATE TABLE staff (id INTEGER NOT NULL PRIMARY KEY, employer TEXT, ssn TEXT)"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    _, err = c.Execute([]string{`INSERT INTO "staff" VALUES(1,'acme','222-22-333')`}, false, false)
    if err != nil {
        t.Fatalf("failed to insert record: %s", err.Error())
    }
    r, err := c.Query([]string{`SELECT names.id,name,names.ssn,employer FROM names INNER JOIN staff ON staff.ssn = names.ssn`}, false, false)
    if err != nil {
        t.Fatalf("failed to query table using JOIN: %s", err.Error())
    }
    if exp, got := `[{"columns":["id","name","ssn","employer"],"types":["integer","text","text","text"],"values":[[3,"matt","222-22-333","acme"]]}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
}
func testSimpleSingleConcatStatements(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    _, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
    if err != nil {
        t.Fatalf("failed to insert record: %s", err.Error())
    }
    r, err := c.Query([]string{`SELECT id || "_bar", name FROM foo`}, false, false)
    if err != nil {
        t.Fatalf("failed to query table: %s", err.Error())
    }
    if exp, got := `[{"columns":["id || \"_bar\"","name"],"types":["","text"],"values":[["1_bar","fiona"]]}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
}
func testSimpleMultiStatements(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    re, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`, `INSERT INTO foo(name) VALUES("dana")`}, false, false)
    if err != nil {
        t.Fatalf("failed to insert record: %s", err.Error())
    }
    if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1}]`, asJSON(re); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    ro, err := c.Query([]string{`SELECT * FROM foo`, `SELECT * FROM foo`}, false, false)
    if err != nil {
        t.Fatalf("failed to query empty table: %s", err.Error())
    }
    if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"dana"]]},{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"dana"]]}]`, asJSON(ro); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
}
func testSimpleSingleMultiLineStatements(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{`
CREATE TABLE foo (
    id INTEGER NOT NULL PRIMARY KEY,
    name TEXT
)`}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    re, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`, `INSERT INTO foo(name) VALUES("dana")`}, false, false)
    if err != nil {
        t.Fatalf("failed to insert record: %s", err.Error())
    }
    if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1}]`, asJSON(re); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
}
func testSimpleFailingStatementsExecute(t *testing.T, c *Conn) {
    r, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
    if err != nil {
        t.Fatalf("error executing insertion into non-existent table: %s", err.Error())
    }
    if exp, got := `[{"error":"no such table: foo"}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    r, err = c.Execute([]string{`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    if exp, got := `[{}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    r, err = c.Execute([]string{`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`}, false, false)
    if err != nil {
        t.Fatalf("failed to attempt creation of duplicate table: %s", err.Error())
    }
    if exp, got := `[{"error":"table foo already exists"}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    r, err = c.Execute([]string{`INSERT INTO foo(id, name) VALUES(11, "fiona")`}, false, false)
    if err != nil {
        t.Fatalf("failed to insert record: %s", err.Error())
    }
    if exp, got := `[{"last_insert_id":11,"rows_affected":1}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    r, err = c.Execute([]string{`INSERT INTO foo(id, name) VALUES(11, "fiona")`}, false, false)
    if err != nil {
        t.Fatalf("failed to attempt duplicate record insertion: %s", err.Error())
    }
    if exp, got := `[{"error":"UNIQUE constraint failed: foo.id"}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    r, err = c.Execute([]string{`utter nonsense`}, false, false)
    if err != nil {
        if exp, got := `[{"error":"near \"utter\": syntax error"}]`, asJSON(r); exp != got {
            t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
        }
    }
}
func testSimpleFailingStatementsQuery(t *testing.T, c *Conn) {
    ro, err := c.Query([]string{`SELECT * FROM bar`}, false, false)
    if err != nil {
        t.Fatalf("failed to attempt query of non-existent table: %s", err.Error())
    }
    if exp, got := `[{"error":"no such table: bar"}]`, asJSON(ro); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    ro, err = c.Query([]string{`SELECTxx * FROM foo`}, false, false)
    if err != nil {
        t.Fatalf("failed to attempt nonsense query: %s", err.Error())
    }
    if exp, got := `[{"error":"near \"SELECTxx\": syntax error"}]`, asJSON(ro); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    r, err := c.Query([]string{`utter nonsense`}, false, false)
    if err != nil {
        if exp, got := `[{"error":"near \"utter\": syntax error"}]`, asJSON(r); exp != got {
            t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
        }
    }
}
func testSimplePragmaTableInfo(t *testing.T, c *Conn) {
    r, err := c.Execute([]string{`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    if exp, got := `[{}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    res, err := c.Query([]string{`PRAGMA table_info("foo")`}, false, false)
    if err != nil {
        t.Fatalf("failed to query a common table expression: %s", err.Error())
    }
    if exp, got := `[{"columns":["cid","name","type","notnull","dflt_value","pk"],"types":["","","","","",""],"values":[[0,"id","INTEGER",1,null,1],[1,"name","TEXT",0,null,0]]}]`, asJSON(res); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
}
func testCommonTableExpressions(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{"CREATE TABLE test(x foo)"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    _, err = c.Execute([]string{`INSERT INTO test VALUES(1)`}, false, false)
    if err != nil {
        t.Fatalf("failed to insert record: %s", err.Error())
    }
    r, err := c.Query([]string{`WITH bar AS (SELECT * FROM test) SELECT * FROM test WHERE x = 1`}, false, false)
    if err != nil {
        t.Fatalf("failed to query a common table expression: %s", err.Error())
    }
    if exp, got := `[{"columns":["x"],"types":["foo"],"values":[[1]]}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    r, err = c.Query([]string{`WITH bar AS (SELECT * FROM test) SELECT * FROM test WHERE x = 2`}, false, false)
    if err != nil {
        t.Fatalf("failed to query a common table expression: %s", err.Error())
    }
    if exp, got := `[{"columns":["x"],"types":["foo"]}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
}
func testUniqueConstraints(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT, CONSTRAINT name_unique UNIQUE (name))"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    r, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
    if err != nil {
        t.Fatalf("error executing insertion into table: %s", err.Error())
    }
    if exp, got := `[{"last_insert_id":1,"rows_affected":1}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for INSERT\nexp: %s\ngot: %s", exp, got)
    }
    // UNIQUE constraint should fire.
    r, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
    if err != nil {
        t.Fatalf("error executing insertion into table: %s", err.Error())
    }
    if exp, got := `[{"error":"UNIQUE constraint failed: foo.name"}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for INSERT\nexp: %s\ngot: %s", exp, got)
    }
}
func testActiveTransaction(t *testing.T, c *Conn) {
    if c.TransactionActive() {
        t.Fatal("transaction incorrectly marked as active")
    }
    if _, err := c.Execute([]string{`BEGIN`}, false, false); err != nil {
        t.Fatalf("error starting transaction: %s", err.Error())
    }
    if !c.TransactionActive() {
        t.Fatal("transaction incorrectly marked as inactive")
    }
    if _, err := c.Execute([]string{`COMMIT`}, false, false); err != nil {
        t.Fatalf("error starting transaction: %s", err.Error())
    }
    if c.TransactionActive() {
        t.Fatal("transaction incorrectly marked as active")
    }
    if _, err := c.Execute([]string{`BEGIN`}, false, false); err != nil {
        t.Fatalf("error starting transaction: %s", err.Error())
    }
    if !c.TransactionActive() {
        t.Fatal("transaction incorrectly marked as inactive")
    }
    if _, err := c.Execute([]string{`ROLLBACK`}, false, false); err != nil {
        t.Fatalf("error starting transaction: %s", err.Error())
    }
    if c.TransactionActive() {
        t.Fatal("transaction incorrectly marked as active")
    }
}
func testAbortTransaction(t *testing.T, c *Conn) {
    if err := c.AbortTransaction(); err != nil {
        t.Fatalf("error abrorting non-active transaction: %s", err.Error())
    }
    if _, err := c.Execute([]string{`BEGIN`}, false, false); err != nil {
        t.Fatalf("error starting transaction: %s", err.Error())
    }
    if !c.TransactionActive() {
        t.Fatal("transaction incorrectly marked as inactive")
    }
    if err := c.AbortTransaction(); err != nil {
        t.Fatalf("error abrorting non-active transaction: %s", err.Error())
    }
    if c.TransactionActive() {
        t.Fatal("transaction incorrectly marked as active")
    }
}
func testPartialFail(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    stmts := []string{
        `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
    }
    r, err := c.Execute(stmts, false, false)
    if err != nil {
        t.Fatalf("failed to insert records: %s", err.Error())
    }
    if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1},{"error":"UNIQUE constraint failed: foo.id"},{"last_insert_id":4,"rows_affected":1}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    ro, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
    if err != nil {
        t.Fatalf("failed to query table: %s", err.Error())
    }
    if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
}
func testSimpleTransaction(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    stmts := []string{
        `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(3, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
    }
    r, err := c.Execute(stmts, true, false)
    if err != nil {
        t.Fatalf("failed to insert records: %s", err.Error())
    }
    if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1},{"last_insert_id":3,"rows_affected":1},{"last_insert_id":4,"rows_affected":1}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    ro, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
    if err != nil {
        t.Fatalf("failed to query table: %s", err.Error())
    }
    if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
}
func testPartialFailTransaction(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    stmts := []string{
        `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
    }
    r, err := c.Execute(stmts, true, false)
    if err != nil {
        t.Fatalf("failed to insert records: %s", err.Error())
    }
    if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1},{"error":"UNIQUE constraint failed: foo.id"}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    ro, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
    if err != nil {
        t.Fatalf("failed to query table: %s", err.Error())
    }
    if exp, got := `[{"columns":["id","name"],"types":["integer","text"]}]`, asJSON(ro); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
}
func testForeignKeyConstraints(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, ref INTEGER REFERENCES foo(id))"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    // Explicitly disable constraints.
    if err := c.EnableFKConstraints(false); err != nil {
        t.Fatalf("failed to enable foreign key constraints: %s", err.Error())
    }
    // Check constraints
    fk, err := c.FKConstraints()
    if err != nil {
        t.Fatalf("failed to check FK constraints: %s", err.Error())
    }
    if fk != false {
        t.Fatal("FK constraints are not disabled")
    }
    stmts := []string{
        `INSERT INTO foo(id, ref) VALUES(1, 2)`,
    }
    r, err := c.Execute(stmts, false, false)
    if err != nil {
        t.Fatalf("failed to execute FK test statement: %s", err.Error())
    }
    if exp, got := `[{"last_insert_id":1,"rows_affected":1}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    // Explicitly enable constraints.
    if err := c.EnableFKConstraints(true); err != nil {
        t.Fatalf("failed to enable foreign key constraints: %s", err.Error())
    }
    // Check constraints
    fk, err = c.FKConstraints()
    if err != nil {
        t.Fatalf("failed to check FK constraints: %s", err.Error())
    }
    if fk != true {
        t.Fatal("FK constraints are not enabled")
    }
    stmts = []string{
        `INSERT INTO foo(id, ref) VALUES(1, 3)`,
    }
    r, err = c.Execute(stmts, false, false)
    if err != nil {
        t.Fatalf("failed to execute FK test statement: %s", err.Error())
    }
    if exp, got := `[{"error":"UNIQUE constraint failed: foo.id"}]`, asJSON(r); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
}
func testBackup(t *testing.T, c *Conn) {
    _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    stmts := []string{
        `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(3, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
    }
    _, err = c.Execute(stmts, true, false)
    if err != nil {
        t.Fatalf("failed to insert records: %s", err.Error())
    }
    dstDB, path := mustCreateDatabase()
    defer os.Remove(path)
    dstConn, err := dstDB.Connect()
    if err != nil {
        t.Fatalf("failed to connect to destination database: %s", err.Error())
    }
    err = c.Backup(dstConn)
    if err != nil {
        t.Fatalf("failed to backup database: %s", err.Error())
    }
    ro, err := dstConn.Query([]string{`SELECT * FROM foo`}, false, false)
    if err != nil {
        t.Fatalf("failed to query table: %s", err.Error())
    }
    if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
}
func testLoad(t *testing.T, c *Conn) {
    dstDB, path := mustCreateDatabase()
    defer os.Remove(path)
    dstConn, err := dstDB.Connect()
    if err != nil {
        t.Fatalf("failed to connect to destination database: %s", err.Error())
    }
    defer dstConn.Close()
    _, err = dstConn.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
    if err != nil {
        t.Fatalf("failed to create table: %s", err.Error())
    }
    stmts := []string{
        `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(3, "fiona")`,
        `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
    }
    _, err = dstConn.Execute(stmts, true, false)
    if err != nil {
        t.Fatalf("failed to insert records: %s", err.Error())
    }
    ro, err := dstConn.Query([]string{`SELECT * FROM foo`}, false, false)
    if err != nil {
        t.Fatalf("failed to query table: %s", err.Error())
    }
    if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
    err = c.Load(dstConn)
    if err != nil {
        t.Fatalf("failed to load database: %s", err.Error())
    }
    ro, err = c.Query([]string{`SELECT * FROM foo`}, false, false)
    if err != nil {
        t.Fatalf("failed to query table: %s", err.Error())
    }
    if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
        t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
    }
}
// mustExecute executes a spath, and panics on failure. Used for statements
// that should never fail, even taking into account test setup.
func mustExecute(c *Conn, stmt string) {
    _, err := c.Execute([]string{stmt}, false, false)
    if err != nil {
        panic(fmt.Sprintf("failed to execute statement: %s", err.Error()))
    }
}
// mustQuery executes a statement, and panics on failure. Used for statements
// that should never fail, even taking into account test setup.
func mustQuery(c *Conn, stmt string) {
    _, err := c.Query([]string{stmt}, false, false)
    if err != nil {
        panic(fmt.Sprintf("failed to query: %s", err.Error()))
    }
}
func mustCreateDatabase() (*DB, string) {
    path := mustTempFilename()
    db, err := New(mustTempFilename(), "", false)
    if err != nil {
        panic("failed to create in-memory database")
    }
    return db, path
}
func mustCreateInMemoryDatabase() *DB {
    db, err := New(mustTempFilename(), "", true)
    if err != nil {
        panic("failed to create in-memory database")
    }
    return db
}
func mustTempFilename() string {
    fd, err := ioutil.TempFile("", "rqlilte-tmp-test-")
    if err != nil {
        panic(err.Error())
    }
    if err := fd.Close(); err != nil {
        panic(err.Error())
    }
    if err := os.Remove(fd.Name()); err != nil {
        panic(err.Error())
    }
    return fd.Name()
}
func asJSON(v interface{}) string {
    b, err := json.Marshal(v)
    if err != nil {
        panic("failed to JSON marshal value")
    }
    return string(b)
}