From 93a59f0f4208347d38a411f4c1092d1c80b4fde2 Mon Sep 17 00:00:00 2001
From: chenshijun <csj_sky@126.com>
Date: 星期五, 02 八月 2019 17:07:15 +0800
Subject: [PATCH] github项目rqlite的db.go 用于操作数据库 github项目servicecomb-service-center/syncer/serf,用于serf的接口调用

---
 agent.go      |  213 +++++++
 db.go         |  514 +++++++++++++++++
 config.go     |  110 +++
 agent_test.go |   74 ++
 db_test.go    |  880 +++++++++++++++++++++++++++++
 5 files changed, 1,791 insertions(+), 0 deletions(-)

diff --git a/agent.go b/agent.go
new file mode 100644
index 0000000..f1b2e28
--- /dev/null
+++ b/agent.go
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package serf
+
+import (
+	"context"
+	"errors"
+	"time"
+
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/hashicorp/serf/cmd/serf/command/agent"
+	"github.com/hashicorp/serf/serf"
+)
+
+// Agent warps the serf agent
+type Agent struct {
+	*agent.Agent
+	conf    *Config
+	readyCh chan struct{}
+	errorCh chan error
+}
+
+// Create create serf agent with config
+func Create(conf *Config) (*Agent, error) {
+	// config cover to serf config
+	serfConf, err := conf.convertToSerf()
+	if err != nil {
+		return nil, err
+	}
+
+	// create serf agent with serf config
+	serfAgent, err := agent.Create(conf.Config, serfConf, nil)
+	if err != nil {
+		return nil, err
+	}
+	return &Agent{
+		Agent:   serfAgent,
+		conf:    conf,
+		readyCh: make(chan struct{}),
+		errorCh: make(chan error),
+	}, nil
+}
+
+// Start agent
+func (a *Agent) Start(ctx context.Context) {
+	err := a.Agent.Start()
+	if err != nil {
+		log.Errorf(err, "start serf agent failed")
+		a.errorCh <- err
+		return
+	}
+	a.RegisterEventHandler(a)
+
+	err = a.retryJoin(ctx)
+	if err != nil {
+		log.Errorf(err, "start serf agent failed")
+		if err != ctx.Err() && a.errorCh != nil {
+			a.errorCh <- err
+		}
+	}
+}
+
+// HandleEvent Handles serf.EventMemberJoin events,
+// which will wait for members to join until the number of group members is equal to "groupExpect"
+// when the startup mode is "ModeCluster",
+// used for logical grouping of serf nodes
+func (a *Agent) HandleEvent(event serf.Event) {
+	if event.EventType() != serf.EventMemberJoin {
+		return
+	}
+
+	if a.conf.Mode == ModeCluster {
+		if len(a.GroupMembers(a.conf.ClusterName)) < groupExpect {
+			return
+		}
+	}
+	a.DeregisterEventHandler(a)
+	close(a.readyCh)
+}
+
+// Ready Returns a channel that will be closed when serf is ready
+func (a *Agent) Ready() <-chan struct{} {
+	return a.readyCh
+}
+
+// Error Returns a channel that will be transmit a serf error
+func (a *Agent) Error() <-chan error {
+	return a.errorCh
+}
+
+// Stop serf agent
+func (a *Agent) Stop() {
+	if a.errorCh != nil {
+		a.Leave()
+		a.Shutdown()
+		close(a.errorCh)
+		a.errorCh = nil
+	}
+}
+
+// LocalMember returns the Member information for the local node
+func (a *Agent) LocalMember() *serf.Member {
+	serfAgent := a.Agent.Serf()
+	if serfAgent != nil {
+		member := serfAgent.LocalMember()
+		return &member
+	}
+	return nil
+}
+
+// GroupMembers returns a point-in-time snapshot of the members of by groupName
+func (a *Agent) GroupMembers(groupName string) (members []serf.Member) {
+	serfAgent := a.Agent.Serf()
+	if serfAgent != nil {
+		for _, member := range serfAgent.Members() {
+			log.Debugf("member = %s, groupName = %s", member.Name, member.Tags[tagKeyClusterName])
+			if member.Tags[tagKeyClusterName] == groupName {
+				members = append(members, member)
+			}
+		}
+	}
+	return
+}
+
+// Member get member information with node
+func (a *Agent) Member(node string) *serf.Member {
+	serfAgent := a.Agent.Serf()
+	if serfAgent != nil {
+		ms := serfAgent.Members()
+		for _, m := range ms {
+			if m.Name == node {
+				return &m
+			}
+		}
+	}
+	return nil
+}
+
+// SerfConfig get serf config
+func (a *Agent) SerfConfig() *serf.Config {
+	return a.Agent.SerfConfig()
+}
+
+// Join serf clusters through one or more members
+func (a *Agent) Join(addrs []string, replay bool) (n int, err error) {
+	return a.Agent.Join(addrs, replay)
+}
+
+// UserEvent sends a UserEvent on Serf
+func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error {
+	return a.Agent.UserEvent(name, payload, coalesce)
+}
+
+// Query sends a Query on Serf
+func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) {
+	return a.Agent.Query(name, payload, params)
+}
+
+func (a *Agent) retryJoin(ctx context.Context) (err error) {
+	if len(a.conf.RetryJoin) == 0 {
+		log.Infof("retry join mumber %d", len(a.conf.RetryJoin))
+		return nil
+	}
+
+	// Count of attempts
+	attempt := 0
+	ticker := time.NewTicker(a.conf.RetryInterval)
+	for {
+		log.Infof("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
+		var n int
+
+		// Try to join the specified serf nodes
+		n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin)
+		if err == nil {
+			log.Infof("serf: Join completed. Synced with %d initial agents", n)
+			break
+		}
+		attempt++
+
+		// If RetryMaxAttempts is greater than 0, agent will exit
+		// and throw an error when the number of attempts exceeds RetryMaxAttempts,
+		// else agent will try to join other nodes until successful always
+		if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts {
+			err = errors.New("serf: maximum retry join attempts made, exiting")
+			log.Errorf(err, err.Error())
+			break
+		}
+		select {
+		case <-ctx.Done():
+			err = ctx.Err()
+			goto done
+		// Waiting for ticker to trigger
+		case <-ticker.C:
+		}
+	}
+done:
+	ticker.Stop()
+	return
+}
diff --git a/agent_test.go b/agent_test.go
new file mode 100644
index 0000000..af68f8f
--- /dev/null
+++ b/agent_test.go
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package serf
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/serf/serf"
+)
+
+func TestAgent(t *testing.T) {
+	conf := DefaultConfig()
+	agent, err := Create(conf)
+	if err != nil {
+		t.Errorf("create agent failed, error: %s", err)
+	}
+	agent.Start(context.Background())
+	<- agent.readyCh
+	go func() {
+		agent.ShutdownCh()
+	}()
+	time.Sleep(time.Second)
+
+	err = agent.UserEvent("test", []byte("test"), true)
+	if err != nil {
+		t.Errorf("send user event failed, error: %s", err)
+	}
+
+	_, err = agent.Query("test", []byte("test"), &serf.QueryParam{})
+	if err != nil {
+		t.Errorf("query for other node failed, error: %s", err)
+	}
+	agent.LocalMember()
+
+	agent.Member("testnode")
+
+	agent.SerfConfig()
+
+	_, err = agent.Join([]string{"127.0.0.1:9999"}, true)
+	if err != nil {
+		t.Logf("join to other node failed, error: %s", err)
+	}
+
+	err = agent.Leave()
+	if err != nil {
+		t.Errorf("angent leave failed, error: %s", err)
+	}
+
+	err = agent.ForceLeave("testnode")
+	if err != nil {
+		t.Errorf("angent force leave failed, error: %s", err)
+	}
+
+	err = agent.Shutdown()
+	if err != nil {
+		t.Errorf("angent shutdown failed, error: %s", err)
+	}
+}
diff --git a/config.go b/config.go
new file mode 100644
index 0000000..8a25908
--- /dev/null
+++ b/config.go
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package serf
+
+import (
+	"fmt"
+	"strconv"
+
+	"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
+	"github.com/hashicorp/memberlist"
+	"github.com/hashicorp/serf/cmd/serf/command/agent"
+	"github.com/hashicorp/serf/serf"
+)
+
+const (
+	DefaultBindPort    = 30190
+	DefaultRPCPort     = 30191
+	DefaultClusterPort = 30192
+	ModeSingle         = "single"
+	ModeCluster        = "cluster"
+	retryMaxAttempts   = 3
+	groupExpect        = 3
+	tagKeyClusterName  = "syncer-cluster-name"
+	TagKeyClusterPort  = "syncer-cluster-port"
+	TagKeyRPCPort      = "syncer-rpc-port"
+)
+
+// DefaultConfig default config
+func DefaultConfig() *Config {
+	agentConf := agent.DefaultConfig()
+	agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort)
+	agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort)
+	return &Config{
+		Mode:        ModeSingle,
+		Config:      agentConf,
+		ClusterPort: DefaultClusterPort,
+	}
+}
+
+// Config struct
+type Config struct {
+	// config from serf agent
+	*agent.Config
+	Mode string `json:"mode"`
+
+	// name to group members into cluster
+	ClusterName string `json:"cluster_name"`
+
+	// port to communicate between cluster members
+	ClusterPort int `yaml:"cluster_port"`
+	RPCPort     int `yaml:"-"`
+}
+
+// readConfigFile reads configuration from config file
+func (c *Config) readConfigFile(filepath string) error {
+	if filepath != "" {
+		// todo:
+	}
+	return nil
+}
+
+// convertToSerf convert Config to serf.Config
+func (c *Config) convertToSerf() (*serf.Config, error) {
+	serfConf := serf.DefaultConfig()
+
+	bindIP, bindPort, err := utils.SplitHostPort(c.BindAddr, DefaultBindPort)
+	if err != nil {
+		return nil, fmt.Errorf("invalid bind address: %s", err)
+	}
+
+	switch c.Profile {
+	case "lan":
+		serfConf.MemberlistConfig = memberlist.DefaultLANConfig()
+	case "wan":
+		serfConf.MemberlistConfig = memberlist.DefaultWANConfig()
+	case "local":
+		serfConf.MemberlistConfig = memberlist.DefaultLocalConfig()
+	default:
+		serfConf.MemberlistConfig = memberlist.DefaultLANConfig()
+	}
+
+	serfConf.MemberlistConfig.BindAddr = bindIP
+	serfConf.MemberlistConfig.BindPort = bindPort
+	serfConf.NodeName = c.NodeName
+	serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)}
+
+	if c.ClusterName != "" {
+		serfConf.Tags[tagKeyClusterName] = c.ClusterName
+		serfConf.Tags[TagKeyClusterPort] = strconv.Itoa(c.ClusterPort)
+	}
+
+	if c.Mode == ModeCluster && c.RetryMaxAttempts <= 0 {
+		c.RetryMaxAttempts = retryMaxAttempts
+	}
+	return serfConf, nil
+}
diff --git a/db.go b/db.go
new file mode 100644
index 0000000..5585515
--- /dev/null
+++ b/db.go
@@ -0,0 +1,514 @@
+// Package db exposes a lightweight abstraction over the SQLite code.
+// It performs some basic mapping of lower-level types to rqlite types.
+package db
+
+import (
+	"database/sql/driver"
+	"expvar"
+	"fmt"
+	"io"
+	"net/url"
+	"strings"
+	"time"
+
+	"github.com/mattn/go-sqlite3"
+)
+
+const bkDelay = 250
+
+const (
+	fkChecks         = "PRAGMA foreign_keys"
+	fkChecksEnabled  = "PRAGMA foreign_keys=ON"
+	fkChecksDisabled = "PRAGMA foreign_keys=OFF"
+
+	numExecutions      = "executions"
+	numExecutionErrors = "execution_errors"
+	numQueries         = "queries"
+	numETx             = "execute_transactions"
+	numQTx             = "query_transactions"
+)
+
+// DBVersion is the SQLite version.
+var DBVersion string
+
+// stats captures stats for the DB layer.
+var stats *expvar.Map
+
+func init() {
+	DBVersion, _, _ = sqlite3.Version()
+	stats = expvar.NewMap("db")
+	stats.Add(numExecutions, 0)
+	stats.Add(numExecutionErrors, 0)
+	stats.Add(numQueries, 0)
+	stats.Add(numETx, 0)
+	stats.Add(numQTx, 0)
+}
+
+// Result represents the outcome of an operation that changes rows.
+type Result struct {
+	LastInsertID int64   `json:"last_insert_id,omitempty"`
+	RowsAffected int64   `json:"rows_affected,omitempty"`
+	Error        string  `json:"error,omitempty"`
+	Time         float64 `json:"time,omitempty"`
+}
+
+// Rows represents the outcome of an operation that returns query data.
+type Rows struct {
+	Columns []string        `json:"columns,omitempty"`
+	Types   []string        `json:"types,omitempty"`
+	Values  [][]interface{} `json:"values,omitempty"`
+	Error   string          `json:"error,omitempty"`
+	Time    float64         `json:"time,omitempty"`
+}
+
+// DB is the SQL database.
+type DB struct {
+	path     string // Path to database file.
+	dsnQuery string // DSN query params, if any.
+	memory   bool   // In-memory only.
+	fqdsn    string // Fully-qualified DSN for opening SQLite.
+}
+
+// New returns an instance of the database at path. If the database
+// has already been created and opened, this database will share
+// the data of that database when connected.
+func New(path, dsnQuery string, memory bool) (*DB, error) {
+	q, err := url.ParseQuery(dsnQuery)
+	if err != nil {
+		return nil, err
+	}
+	if memory {
+		q.Set("mode", "memory")
+		q.Set("cache", "shared")
+	}
+
+	if !strings.HasPrefix(path, "file:") {
+		path = fmt.Sprintf("file:%s", path)
+	}
+
+	var fqdsn string
+	if len(q) > 0 {
+		fqdsn = fmt.Sprintf("%s?%s", path, q.Encode())
+	} else {
+		fqdsn = path
+	}
+
+	return &DB{
+		path:     path,
+		dsnQuery: dsnQuery,
+		memory:   memory,
+		fqdsn:    fqdsn,
+	}, nil
+}
+
+// Connect returns a connection to the database.
+func (d *DB) Connect() (*Conn, error) {
+	drv := sqlite3.SQLiteDriver{}
+	c, err := drv.Open(d.fqdsn)
+	if err != nil {
+		return nil, err
+	}
+
+	return &Conn{
+		sqlite: c.(*sqlite3.SQLiteConn),
+	}, nil
+}
+
+// Conn represents a connection to a database. Two Connection objects
+// to the same database are READ_COMMITTED isolated.
+type Conn struct {
+	sqlite *sqlite3.SQLiteConn
+}
+
+// TransactionActive returns whether a transaction is currently active
+// i.e. if the database is NOT in autocommit mode.
+func (c *Conn) TransactionActive() bool {
+	return !c.sqlite.AutoCommit()
+}
+
+// AbortTransaction aborts -- rolls back -- any active transaction. Calling code
+// should know exactly what it is doing if it decides to call this function. It
+// can be used to clean up any dangling state that may result from certain
+// error scenarios.
+func (c *Conn) AbortTransaction() error {
+	_, err := c.Execute([]string{`ROLLBACK`}, false, false)
+	return err
+}
+
+// Execute executes queries that modify the database.
+func (c *Conn) Execute(queries []string, tx, xTime bool) ([]*Result, error) {
+	stats.Add(numExecutions, int64(len(queries)))
+	if tx {
+		stats.Add(numETx, 1)
+	}
+
+	type Execer interface {
+		Exec(query string, args []driver.Value) (driver.Result, error)
+	}
+
+	var allResults []*Result
+	err := func() error {
+		var execer Execer
+		var rollback bool
+		var t driver.Tx
+		var err error
+
+		// Check for the err, if set rollback.
+		defer func() {
+			if t != nil {
+				if rollback {
+					t.Rollback()
+					return
+				}
+				t.Commit()
+			}
+		}()
+
+		// handleError sets the error field on the given result. It returns
+		// whether the caller should continue processing or break.
+		handleError := func(result *Result, err error) bool {
+			stats.Add(numExecutionErrors, 1)
+
+			result.Error = err.Error()
+			allResults = append(allResults, result)
+			if tx {
+				rollback = true // Will trigger the rollback.
+				return false
+			}
+			return true
+		}
+
+		execer = c.sqlite
+
+		// Create the correct execution object, depending on whether a
+		// transaction was requested.
+		if tx {
+			t, err = c.sqlite.Begin()
+			if err != nil {
+				return err
+			}
+		}
+
+		// Execute each query.
+		for _, q := range queries {
+			if q == "" {
+				continue
+			}
+
+			result := &Result{}
+			start := time.Now()
+
+			r, err := execer.Exec(q, nil)
+			if err != nil {
+				if handleError(result, err) {
+					continue
+				}
+				break
+			}
+			if r == nil {
+				continue
+			}
+
+			lid, err := r.LastInsertId()
+			if err != nil {
+				if handleError(result, err) {
+					continue
+				}
+				break
+			}
+			result.LastInsertID = lid
+
+			ra, err := r.RowsAffected()
+			if err != nil {
+				if handleError(result, err) {
+					continue
+				}
+				break
+			}
+			result.RowsAffected = ra
+			if xTime {
+				result.Time = time.Now().Sub(start).Seconds()
+			}
+			allResults = append(allResults, result)
+		}
+
+		return nil
+	}()
+
+	return allResults, err
+}
+
+// Query executes queries that return rows, but don't modify the database.
+func (c *Conn) Query(queries []string, tx, xTime bool) ([]*Rows, error) {
+	stats.Add(numQueries, int64(len(queries)))
+	if tx {
+		stats.Add(numQTx, 1)
+	}
+
+	type Queryer interface {
+		Query(query string, args []driver.Value) (driver.Rows, error)
+	}
+
+	var allRows []*Rows
+	err := func() (err error) {
+		var queryer Queryer
+		var t driver.Tx
+		defer func() {
+			// XXX THIS DOESN'T ACTUALLY WORK! Might as WELL JUST COMMIT?
+			if t != nil {
+				if err != nil {
+					t.Rollback()
+					return
+				}
+				t.Commit()
+			}
+		}()
+
+		queryer = c.sqlite
+
+		// Create the correct query object, depending on whether a
+		// transaction was requested.
+		if tx {
+			t, err = c.sqlite.Begin()
+			if err != nil {
+				return err
+			}
+		}
+
+		for _, q := range queries {
+			if q == "" {
+				continue
+			}
+
+			rows := &Rows{}
+			start := time.Now()
+
+			rs, err := queryer.Query(q, nil)
+			if err != nil {
+				rows.Error = err.Error()
+				allRows = append(allRows, rows)
+				continue
+			}
+			defer rs.Close()
+			columns := rs.Columns()
+
+			rows.Columns = columns
+			rows.Types = rs.(*sqlite3.SQLiteRows).DeclTypes()
+			dest := make([]driver.Value, len(rows.Columns))
+			for {
+				err := rs.Next(dest)
+				if err != nil {
+					if err != io.EOF {
+						rows.Error = err.Error()
+					}
+					break
+				}
+
+				values := normalizeRowValues(dest, rows.Types)
+				rows.Values = append(rows.Values, values)
+			}
+			if xTime {
+				rows.Time = time.Now().Sub(start).Seconds()
+			}
+			allRows = append(allRows, rows)
+		}
+
+		return nil
+	}()
+
+	return allRows, err
+}
+
+// EnableFKConstraints allows control of foreign key constraint checks.
+func (c *Conn) EnableFKConstraints(e bool) error {
+	q := fkChecksEnabled
+	if !e {
+		q = fkChecksDisabled
+	}
+	_, err := c.sqlite.Exec(q, nil)
+	return err
+}
+
+// FKConstraints returns whether FK constraints are set or not.
+func (c *Conn) FKConstraints() (bool, error) {
+	r, err := c.sqlite.Query(fkChecks, nil)
+	if err != nil {
+		return false, err
+	}
+
+	dest := make([]driver.Value, len(r.Columns()))
+	types := r.(*sqlite3.SQLiteRows).DeclTypes()
+	if err := r.Next(dest); err != nil {
+		return false, err
+	}
+
+	values := normalizeRowValues(dest, types)
+	if values[0] == int64(1) {
+		return true, nil
+	}
+	return false, nil
+}
+
+// Load loads the connected database from the database connected to src.
+// It overwrites the data contained in this database. It is the caller's
+// responsibility to ensure that no other connections to this database
+// are accessed while this operation is in progress.
+func (c *Conn) Load(src *Conn) error {
+	return copyDatabase(c.sqlite, src.sqlite)
+}
+
+// Backup writes a snapshot of the database over the given database
+// connection, erasing all the contents of the destination database.
+// The consistency of the snapshot is READ_COMMITTED relative to any
+// other connections currently open to this database. The caller must
+// ensure that all connections to the destination database are not
+// accessed during this operation.
+func (c *Conn) Backup(dst *Conn) error {
+	return copyDatabase(dst.sqlite, c.sqlite)
+}
+
+// Dump writes a snapshot of the database in SQL text format. The consistency
+// of the snapshot is READ_COMMITTED relative to any other connections
+// currently open to this database.
+func (c *Conn) Dump(w io.Writer) error {
+	if _, err := w.Write([]byte("PRAGMA foreign_keys=OFF;\nBEGIN TRANSACTION;\n")); err != nil {
+		return err
+	}
+
+	// Get the schema.
+	query := `SELECT "name", "type", "sql" FROM "sqlite_master"
+              WHERE "sql" NOT NULL AND "type" == 'table' ORDER BY "name"`
+	rows, err := c.Query([]string{query}, false, false)
+	if err != nil {
+		return err
+	}
+	row := rows[0]
+	for _, v := range row.Values {
+		table := v[0].(string)
+		var stmt string
+
+		if table == "sqlite_sequence" {
+			stmt = `DELETE FROM "sqlite_sequence";`
+		} else if table == "sqlite_stat1" {
+			stmt = `ANALYZE "sqlite_master";`
+		} else if strings.HasPrefix(table, "sqlite_") {
+			continue
+		} else {
+			stmt = v[2].(string)
+		}
+
+		if _, err := w.Write([]byte(fmt.Sprintf("%s;\n", stmt))); err != nil {
+			return err
+		}
+
+		tableIndent := strings.Replace(table, `"`, `""`, -1)
+		query = fmt.Sprintf(`PRAGMA table_info("%s")`, tableIndent)
+		r, err := c.Query([]string{query}, false, false)
+		if err != nil {
+			return err
+		}
+		var columnNames []string
+		for _, w := range r[0].Values {
+			columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, w[1].(string)))
+		}
+
+		query = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' FROM "%s";`,
+			tableIndent,
+			strings.Join(columnNames, ","),
+			tableIndent)
+		r, err = c.Query([]string{query}, false, false)
+		if err != nil {
+			return err
+		}
+		for _, x := range r[0].Values {
+			y := fmt.Sprintf("%s;\n", x[0].(string))
+			if _, err := w.Write([]byte(y)); err != nil {
+				return err
+			}
+		}
+	}
+
+	// Do indexes, triggers, and views.
+	query = `SELECT "name", "type", "sql" FROM "sqlite_master"
+			  WHERE "sql" NOT NULL AND "type" IN ('index', 'trigger', 'view')`
+	rows, err = c.Query([]string{query}, false, false)
+	if err != nil {
+		return err
+	}
+	row = rows[0]
+	for _, v := range row.Values {
+		if _, err := w.Write([]byte(fmt.Sprintf("%s;\n", v[2]))); err != nil {
+			return err
+		}
+	}
+
+	if _, err := w.Write([]byte("COMMIT;\n")); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// Close closes the connection.
+func (c *Conn) Close() error {
+	if c != nil {
+		return c.sqlite.Close()
+	}
+	return nil
+}
+
+func copyDatabase(dst *sqlite3.SQLiteConn, src *sqlite3.SQLiteConn) error {
+	bk, err := dst.Backup("main", src, "main")
+	if err != nil {
+		return err
+	}
+
+	for {
+		done, err := bk.Step(-1)
+		if err != nil {
+			bk.Finish()
+			return err
+		}
+		if done {
+			break
+		}
+		time.Sleep(bkDelay * time.Millisecond)
+	}
+
+	return bk.Finish()
+}
+
+// normalizeRowValues performs some normalization of values in the returned rows.
+// Text values come over (from sqlite-go) as []byte instead of strings
+// for some reason, so we have explicitly convert (but only when type
+// is "text" so we don't affect BLOB types)
+func normalizeRowValues(row []driver.Value, types []string) []interface{} {
+	values := make([]interface{}, len(types))
+	for i, v := range row {
+		if isTextType(types[i]) {
+			switch val := v.(type) {
+			case []byte:
+				values[i] = string(val)
+			default:
+				values[i] = val
+			}
+		} else {
+			values[i] = v
+		}
+	}
+	return values
+}
+
+// isTextType returns whether the given type has a SQLite text affinity.
+// http://www.sqlite.org/datatype3.html
+func isTextType(t string) bool {
+	return t == "text" ||
+		t == "json" ||
+		t == "" ||
+		strings.HasPrefix(t, "varchar") ||
+		strings.HasPrefix(t, "varying character") ||
+		strings.HasPrefix(t, "nchar") ||
+		strings.HasPrefix(t, "native character") ||
+		strings.HasPrefix(t, "nvarchar") ||
+		strings.HasPrefix(t, "clob")
+}
diff --git a/db_test.go b/db_test.go
new file mode 100644
index 0000000..01ace9a
--- /dev/null
+++ b/db_test.go
@@ -0,0 +1,880 @@
+package db
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"strings"
+	"testing"
+
+	"github.com/rqlite/rqlite/testdata/chinook"
+)
+
+/*
+ * Lowest-layer database tests
+ */
+
+func TestNewDBOnDisk(t *testing.T) {
+	t.Parallel()
+
+	db, err := New(mustTempFilename(), "", false)
+	if err != nil {
+		t.Fatalf("failed to create new database: %s", err.Error())
+	}
+	if db == nil {
+		t.Fatal("database is nil")
+	}
+}
+
+func TestNewDBInMemory(t *testing.T) {
+	t.Parallel()
+
+	db, err := New(mustTempFilename(), "", true)
+	if err != nil {
+		t.Fatalf("failed to create new database: %s", err.Error())
+	}
+	if db == nil {
+		t.Fatal("database is nil")
+	}
+}
+
+func TestDBCreateOnDiskConnection(t *testing.T) {
+	t.Parallel()
+
+	db, path := mustCreateDatabase()
+	defer os.Remove(path)
+
+	conn, err := db.Connect()
+	if err != nil {
+		t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
+	}
+	if conn == nil {
+		t.Fatal("connection to on-disk database is nil")
+	}
+	if err := conn.Close(); err != nil {
+		t.Fatalf("failed to close on-disk connection: %s", err.Error())
+	}
+}
+
+func TestDBCreateInMemoryConnection(t *testing.T) {
+	t.Parallel()
+
+	db := mustCreateInMemoryDatabase()
+
+	conn, err := db.Connect()
+	if err != nil {
+		t.Fatalf("failed to create connection to in-memory database: %s", err.Error())
+	}
+	if conn == nil {
+		t.Fatal("connection to in-memory database is nil")
+	}
+	if err := conn.Close(); err != nil {
+		t.Fatalf("failed to close in-memory connection: %s", err.Error())
+	}
+}
+
+func TestDumpOnDisk(t *testing.T) {
+	t.Parallel()
+
+	db, path := mustCreateDatabase()
+	defer os.Remove(path)
+	c, err := db.Connect()
+	if err != nil {
+		t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
+	}
+
+	_, err = c.Execute([]string{chinook.DB}, false, false)
+	if err != nil {
+		t.Fatalf("failed to load chinook dump: %s", err.Error())
+	}
+
+	var b strings.Builder
+	if err := c.Dump(&b); err != nil {
+		t.Fatalf("failed to dump database: %s", err.Error())
+	}
+
+	if b.String() != chinook.DB {
+		t.Fatal("dumped database does not equal entered database")
+	}
+}
+
+func TestDumpInMemory(t *testing.T) {
+	t.Parallel()
+
+	db := mustCreateInMemoryDatabase()
+	c, err := db.Connect()
+	if err != nil {
+		t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
+	}
+
+	_, err = c.Execute([]string{chinook.DB}, false, false)
+	if err != nil {
+		t.Fatalf("failed to load chinook dump: %s", err.Error())
+	}
+
+	var b strings.Builder
+	if err := c.Dump(&b); err != nil {
+		t.Fatalf("failed to dump database: %s", err.Error())
+	}
+
+	if b.String() != chinook.DB {
+		t.Fatal("dumped database does not equal entered database")
+	}
+}
+
+type testF func(test *testing.T, c *Conn)
+
+var dbTestfunctions []testF = []testF{
+	testTableCreation,
+	testSQLiteMasterTable,
+	testTextTypes,
+	testEmptyStatements,
+	testSimpleSingleStatements,
+	testSimpleJoinStatements,
+	testSimpleSingleConcatStatements,
+	testSimpleMultiStatements,
+	testSimpleSingleMultiLineStatements,
+	testSimpleFailingStatementsExecute,
+	testSimpleFailingStatementsQuery,
+	testSimplePragmaTableInfo,
+	testCommonTableExpressions,
+	testForeignKeyConstraints,
+	testUniqueConstraints,
+	testActiveTransaction,
+	testAbortTransaction,
+	testPartialFail,
+	testSimpleTransaction,
+	testPartialFailTransaction,
+	testBackup,
+	testLoad,
+}
+
+func TestDatabaseInMemory(t *testing.T) {
+	t.Parallel()
+
+	for _, f := range dbTestfunctions {
+		dbInMem := mustCreateInMemoryDatabase()
+		connInMem, err := dbInMem.Connect()
+		if err != nil {
+			t.Fatalf("failed to create connection to in-memory database: %s", err.Error())
+		}
+
+		f(t, connInMem)
+		connInMem.Close()
+	}
+}
+
+func TestDatabaseOnDisk(t *testing.T) {
+	t.Parallel()
+
+	for _, f := range dbTestfunctions {
+		dbDisk, path := mustCreateDatabase()
+		connDisk, err := dbDisk.Connect()
+		if err != nil {
+			t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
+		}
+
+		f(t, connDisk)
+		connDisk.Close()
+		os.Remove(path)
+	}
+}
+
+func testTableCreation(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	r, err := c.Query([]string{"SELECT * FROM foo"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query empty table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id","name"],"types":["integer","text"]}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
+	}
+}
+
+func testSQLiteMasterTable(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	r, err := c.Query([]string{"SELECT * FROM sqlite_master"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query master table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["type","name","tbl_name","rootpage","sql"],"types":["text","text","text","int","text"],"values":[["table","foo","foo",2,"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"]]}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
+	}
+}
+
+func testEmptyStatements(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{""}, false, false)
+	if err != nil {
+		t.Fatalf("failed to execute empty statement: %s", err.Error())
+	}
+	_, err = c.Execute([]string{";"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to execute empty statement with semicolon: %s", err.Error())
+	}
+}
+
+func testTextTypes(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{"CREATE TABLE foo (c0 VARCHAR(36), c1 JSON, c2 NCHAR, c3 NVARCHAR, c4 CLOB)"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+	_, err = c.Execute([]string{`INSERT INTO foo(c0, c1, c2, c3, c4) VALUES("fiona", '{"mittens": "foobar"}', "bob", "dana", "declan")`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to insert record: %s", err.Error())
+	}
+
+	r, err := c.Query([]string{"SELECT * FROM foo"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["c0","c1","c2","c3","c4"],"types":["varchar(36)","json","nchar","nvarchar","clob"],"values":[["fiona","{\"mittens\": \"foobar\"}","bob","dana","declan"]]}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
+	}
+}
+
+func testSimpleSingleStatements(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	_, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to insert record: %s", err.Error())
+	}
+
+	_, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("aoife")`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to insert record: %s", err.Error())
+	}
+
+	r, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"aoife"]]}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+	r, err = c.Query([]string{`SELECT * FROM foo WHERE name="aoife"`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[2,"aoife"]]}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+	r, err = c.Query([]string{`SELECT * FROM foo WHERE name="dana"`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id","name"],"types":["integer","text"]}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+	r, err = c.Query([]string{`SELECT * FROM foo ORDER BY name`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[2,"aoife"],[1,"fiona"]]}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+	r, err = c.Query([]string{`SELECT *,name FROM foo`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id","name","name"],"types":["integer","text","text"],"values":[[1,"fiona","fiona"],[2,"aoife","aoife"]]}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+}
+
+func testSimpleJoinStatements(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{"CREATE TABLE names (id INTEGER NOT NULL PRIMARY KEY, name TEXT, ssn TEXT)"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	_, err = c.Execute([]string{
+		`INSERT INTO "names" VALUES(1,'bob','123-45-678')`,
+		`INSERT INTO "names" VALUES(2,'tom','111-22-333')`,
+		`INSERT INTO "names" VALUES(3,'matt','222-22-333')`,
+	}, false, false)
+	if err != nil {
+		t.Fatalf("failed to insert record: %s", err.Error())
+	}
+
+	_, err = c.Execute([]string{"CREATE TABLE staff (id INTEGER NOT NULL PRIMARY KEY, employer TEXT, ssn TEXT)"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	_, err = c.Execute([]string{`INSERT INTO "staff" VALUES(1,'acme','222-22-333')`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to insert record: %s", err.Error())
+	}
+
+	r, err := c.Query([]string{`SELECT names.id,name,names.ssn,employer FROM names INNER JOIN staff ON staff.ssn = names.ssn`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query table using JOIN: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id","name","ssn","employer"],"types":["integer","text","text","text"],"values":[[3,"matt","222-22-333","acme"]]}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+}
+
+func testSimpleSingleConcatStatements(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	_, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to insert record: %s", err.Error())
+	}
+
+	r, err := c.Query([]string{`SELECT id || "_bar", name FROM foo`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id || \"_bar\"","name"],"types":["","text"],"values":[["1_bar","fiona"]]}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+}
+
+func testSimpleMultiStatements(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	re, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`, `INSERT INTO foo(name) VALUES("dana")`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to insert record: %s", err.Error())
+	}
+	if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1}]`, asJSON(re); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+	ro, err := c.Query([]string{`SELECT * FROM foo`, `SELECT * FROM foo`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query empty table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"dana"]]},{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"dana"]]}]`, asJSON(ro); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+}
+
+func testSimpleSingleMultiLineStatements(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{`
+CREATE TABLE foo (
+    id INTEGER NOT NULL PRIMARY KEY,
+    name TEXT
+)`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	re, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`, `INSERT INTO foo(name) VALUES("dana")`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to insert record: %s", err.Error())
+	}
+	if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1}]`, asJSON(re); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+}
+
+func testSimpleFailingStatementsExecute(t *testing.T, c *Conn) {
+	r, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
+	if err != nil {
+		t.Fatalf("error executing insertion into non-existent table: %s", err.Error())
+	}
+	if exp, got := `[{"error":"no such table: foo"}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+	r, err = c.Execute([]string{`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+	if exp, got := `[{}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+	r, err = c.Execute([]string{`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to attempt creation of duplicate table: %s", err.Error())
+	}
+	if exp, got := `[{"error":"table foo already exists"}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+	r, err = c.Execute([]string{`INSERT INTO foo(id, name) VALUES(11, "fiona")`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to insert record: %s", err.Error())
+	}
+	if exp, got := `[{"last_insert_id":11,"rows_affected":1}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+	r, err = c.Execute([]string{`INSERT INTO foo(id, name) VALUES(11, "fiona")`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to attempt duplicate record insertion: %s", err.Error())
+	}
+	if exp, got := `[{"error":"UNIQUE constraint failed: foo.id"}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+	r, err = c.Execute([]string{`utter nonsense`}, false, false)
+	if err != nil {
+		if exp, got := `[{"error":"near \"utter\": syntax error"}]`, asJSON(r); exp != got {
+			t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+		}
+	}
+}
+
+func testSimpleFailingStatementsQuery(t *testing.T, c *Conn) {
+	ro, err := c.Query([]string{`SELECT * FROM bar`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to attempt query of non-existent table: %s", err.Error())
+	}
+	if exp, got := `[{"error":"no such table: bar"}]`, asJSON(ro); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+	ro, err = c.Query([]string{`SELECTxx * FROM foo`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to attempt nonsense query: %s", err.Error())
+	}
+	if exp, got := `[{"error":"near \"SELECTxx\": syntax error"}]`, asJSON(ro); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+	r, err := c.Query([]string{`utter nonsense`}, false, false)
+	if err != nil {
+		if exp, got := `[{"error":"near \"utter\": syntax error"}]`, asJSON(r); exp != got {
+			t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+		}
+	}
+}
+
+func testSimplePragmaTableInfo(t *testing.T, c *Conn) {
+	r, err := c.Execute([]string{`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+	if exp, got := `[{}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+	res, err := c.Query([]string{`PRAGMA table_info("foo")`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query a common table expression: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["cid","name","type","notnull","dflt_value","pk"],"types":["","","","","",""],"values":[[0,"id","INTEGER",1,null,1],[1,"name","TEXT",0,null,0]]}]`, asJSON(res); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+}
+
+func testCommonTableExpressions(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{"CREATE TABLE test(x foo)"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	_, err = c.Execute([]string{`INSERT INTO test VALUES(1)`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to insert record: %s", err.Error())
+	}
+
+	r, err := c.Query([]string{`WITH bar AS (SELECT * FROM test) SELECT * FROM test WHERE x = 1`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query a common table expression: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["x"],"types":["foo"],"values":[[1]]}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+	r, err = c.Query([]string{`WITH bar AS (SELECT * FROM test) SELECT * FROM test WHERE x = 2`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query a common table expression: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["x"],"types":["foo"]}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+}
+
+func testUniqueConstraints(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT, CONSTRAINT name_unique UNIQUE (name))"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	r, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
+	if err != nil {
+		t.Fatalf("error executing insertion into table: %s", err.Error())
+	}
+	if exp, got := `[{"last_insert_id":1,"rows_affected":1}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for INSERT\nexp: %s\ngot: %s", exp, got)
+	}
+
+	// UNIQUE constraint should fire.
+	r, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
+	if err != nil {
+		t.Fatalf("error executing insertion into table: %s", err.Error())
+	}
+	if exp, got := `[{"error":"UNIQUE constraint failed: foo.name"}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for INSERT\nexp: %s\ngot: %s", exp, got)
+	}
+}
+
+func testActiveTransaction(t *testing.T, c *Conn) {
+	if c.TransactionActive() {
+		t.Fatal("transaction incorrectly marked as active")
+	}
+
+	if _, err := c.Execute([]string{`BEGIN`}, false, false); err != nil {
+		t.Fatalf("error starting transaction: %s", err.Error())
+	}
+
+	if !c.TransactionActive() {
+		t.Fatal("transaction incorrectly marked as inactive")
+	}
+
+	if _, err := c.Execute([]string{`COMMIT`}, false, false); err != nil {
+		t.Fatalf("error starting transaction: %s", err.Error())
+	}
+
+	if c.TransactionActive() {
+		t.Fatal("transaction incorrectly marked as active")
+	}
+
+	if _, err := c.Execute([]string{`BEGIN`}, false, false); err != nil {
+		t.Fatalf("error starting transaction: %s", err.Error())
+	}
+
+	if !c.TransactionActive() {
+		t.Fatal("transaction incorrectly marked as inactive")
+	}
+
+	if _, err := c.Execute([]string{`ROLLBACK`}, false, false); err != nil {
+		t.Fatalf("error starting transaction: %s", err.Error())
+	}
+
+	if c.TransactionActive() {
+		t.Fatal("transaction incorrectly marked as active")
+	}
+}
+
+func testAbortTransaction(t *testing.T, c *Conn) {
+	if err := c.AbortTransaction(); err != nil {
+		t.Fatalf("error abrorting non-active transaction: %s", err.Error())
+	}
+
+	if _, err := c.Execute([]string{`BEGIN`}, false, false); err != nil {
+		t.Fatalf("error starting transaction: %s", err.Error())
+	}
+
+	if !c.TransactionActive() {
+		t.Fatal("transaction incorrectly marked as inactive")
+	}
+
+	if err := c.AbortTransaction(); err != nil {
+		t.Fatalf("error abrorting non-active transaction: %s", err.Error())
+	}
+
+	if c.TransactionActive() {
+		t.Fatal("transaction incorrectly marked as active")
+	}
+}
+
+func testPartialFail(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	stmts := []string{
+		`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(2, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(4, "fiona")`,
+	}
+	r, err := c.Execute(stmts, false, false)
+	if err != nil {
+		t.Fatalf("failed to insert records: %s", err.Error())
+	}
+	if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1},{"error":"UNIQUE constraint failed: foo.id"},{"last_insert_id":4,"rows_affected":1}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+	ro, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+}
+
+func testSimpleTransaction(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	stmts := []string{
+		`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(2, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(3, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(4, "fiona")`,
+	}
+	r, err := c.Execute(stmts, true, false)
+	if err != nil {
+		t.Fatalf("failed to insert records: %s", err.Error())
+	}
+	if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1},{"last_insert_id":3,"rows_affected":1},{"last_insert_id":4,"rows_affected":1}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+	ro, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+}
+
+func testPartialFailTransaction(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	stmts := []string{
+		`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(2, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(4, "fiona")`,
+	}
+	r, err := c.Execute(stmts, true, false)
+	if err != nil {
+		t.Fatalf("failed to insert records: %s", err.Error())
+	}
+	if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1},{"error":"UNIQUE constraint failed: foo.id"}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+	ro, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id","name"],"types":["integer","text"]}]`, asJSON(ro); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+}
+
+func testForeignKeyConstraints(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, ref INTEGER REFERENCES foo(id))"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	// Explicitly disable constraints.
+	if err := c.EnableFKConstraints(false); err != nil {
+		t.Fatalf("failed to enable foreign key constraints: %s", err.Error())
+	}
+
+	// Check constraints
+	fk, err := c.FKConstraints()
+	if err != nil {
+		t.Fatalf("failed to check FK constraints: %s", err.Error())
+	}
+	if fk != false {
+		t.Fatal("FK constraints are not disabled")
+	}
+
+	stmts := []string{
+		`INSERT INTO foo(id, ref) VALUES(1, 2)`,
+	}
+	r, err := c.Execute(stmts, false, false)
+	if err != nil {
+		t.Fatalf("failed to execute FK test statement: %s", err.Error())
+	}
+	if exp, got := `[{"last_insert_id":1,"rows_affected":1}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+	// Explicitly enable constraints.
+	if err := c.EnableFKConstraints(true); err != nil {
+		t.Fatalf("failed to enable foreign key constraints: %s", err.Error())
+	}
+
+	// Check constraints
+	fk, err = c.FKConstraints()
+	if err != nil {
+		t.Fatalf("failed to check FK constraints: %s", err.Error())
+	}
+	if fk != true {
+		t.Fatal("FK constraints are not enabled")
+	}
+
+	stmts = []string{
+		`INSERT INTO foo(id, ref) VALUES(1, 3)`,
+	}
+	r, err = c.Execute(stmts, false, false)
+	if err != nil {
+		t.Fatalf("failed to execute FK test statement: %s", err.Error())
+	}
+	if exp, got := `[{"error":"UNIQUE constraint failed: foo.id"}]`, asJSON(r); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+}
+
+func testBackup(t *testing.T, c *Conn) {
+	_, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	stmts := []string{
+		`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(2, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(3, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(4, "fiona")`,
+	}
+	_, err = c.Execute(stmts, true, false)
+	if err != nil {
+		t.Fatalf("failed to insert records: %s", err.Error())
+	}
+
+	dstDB, path := mustCreateDatabase()
+	defer os.Remove(path)
+	dstConn, err := dstDB.Connect()
+	if err != nil {
+		t.Fatalf("failed to connect to destination database: %s", err.Error())
+	}
+
+	err = c.Backup(dstConn)
+	if err != nil {
+		t.Fatalf("failed to backup database: %s", err.Error())
+	}
+
+	ro, err := dstConn.Query([]string{`SELECT * FROM foo`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+}
+
+func testLoad(t *testing.T, c *Conn) {
+	dstDB, path := mustCreateDatabase()
+	defer os.Remove(path)
+	dstConn, err := dstDB.Connect()
+	if err != nil {
+		t.Fatalf("failed to connect to destination database: %s", err.Error())
+	}
+	defer dstConn.Close()
+
+	_, err = dstConn.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+	if err != nil {
+		t.Fatalf("failed to create table: %s", err.Error())
+	}
+
+	stmts := []string{
+		`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(2, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(3, "fiona")`,
+		`INSERT INTO foo(id, name) VALUES(4, "fiona")`,
+	}
+	_, err = dstConn.Execute(stmts, true, false)
+	if err != nil {
+		t.Fatalf("failed to insert records: %s", err.Error())
+	}
+
+	ro, err := dstConn.Query([]string{`SELECT * FROM foo`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+
+	err = c.Load(dstConn)
+	if err != nil {
+		t.Fatalf("failed to load database: %s", err.Error())
+	}
+
+	ro, err = c.Query([]string{`SELECT * FROM foo`}, false, false)
+	if err != nil {
+		t.Fatalf("failed to query table: %s", err.Error())
+	}
+	if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
+		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+	}
+}
+
+// mustExecute executes a spath, and panics on failure. Used for statements
+// that should never fail, even taking into account test setup.
+func mustExecute(c *Conn, stmt string) {
+	_, err := c.Execute([]string{stmt}, false, false)
+	if err != nil {
+		panic(fmt.Sprintf("failed to execute statement: %s", err.Error()))
+	}
+}
+
+// mustQuery executes a statement, and panics on failure. Used for statements
+// that should never fail, even taking into account test setup.
+func mustQuery(c *Conn, stmt string) {
+	_, err := c.Query([]string{stmt}, false, false)
+	if err != nil {
+		panic(fmt.Sprintf("failed to query: %s", err.Error()))
+	}
+}
+
+func mustCreateDatabase() (*DB, string) {
+	path := mustTempFilename()
+	db, err := New(mustTempFilename(), "", false)
+	if err != nil {
+		panic("failed to create in-memory database")
+	}
+	return db, path
+}
+
+func mustCreateInMemoryDatabase() *DB {
+	db, err := New(mustTempFilename(), "", true)
+	if err != nil {
+		panic("failed to create in-memory database")
+	}
+	return db
+}
+
+func mustTempFilename() string {
+	fd, err := ioutil.TempFile("", "rqlilte-tmp-test-")
+	if err != nil {
+		panic(err.Error())
+	}
+	if err := fd.Close(); err != nil {
+		panic(err.Error())
+	}
+	if err := os.Remove(fd.Name()); err != nil {
+		panic(err.Error())
+	}
+	return fd.Name()
+}
+
+func asJSON(v interface{}) string {
+	b, err := json.Marshal(v)
+	if err != nil {
+		panic("failed to JSON marshal value")
+	}
+	return string(b)
+}

--
Gitblit v1.8.0