From 93a59f0f4208347d38a411f4c1092d1c80b4fde2 Mon Sep 17 00:00:00 2001
From: chenshijun <csj_sky@126.com>
Date: 星期五, 02 八月 2019 17:07:15 +0800
Subject: [PATCH] github项目rqlite的db.go 用于操作数据库 github项目servicecomb-service-center/syncer/serf,用于serf的接口调用
---
agent.go | 213 +++++++
db.go | 514 +++++++++++++++++
config.go | 110 +++
agent_test.go | 74 ++
db_test.go | 880 +++++++++++++++++++++++++++++
5 files changed, 1,791 insertions(+), 0 deletions(-)
diff --git a/agent.go b/agent.go
new file mode 100644
index 0000000..f1b2e28
--- /dev/null
+++ b/agent.go
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package serf
+
+import (
+ "context"
+ "errors"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/hashicorp/serf/cmd/serf/command/agent"
+ "github.com/hashicorp/serf/serf"
+)
+
+// Agent warps the serf agent
+type Agent struct {
+ *agent.Agent
+ conf *Config
+ readyCh chan struct{}
+ errorCh chan error
+}
+
+// Create create serf agent with config
+func Create(conf *Config) (*Agent, error) {
+ // config cover to serf config
+ serfConf, err := conf.convertToSerf()
+ if err != nil {
+ return nil, err
+ }
+
+ // create serf agent with serf config
+ serfAgent, err := agent.Create(conf.Config, serfConf, nil)
+ if err != nil {
+ return nil, err
+ }
+ return &Agent{
+ Agent: serfAgent,
+ conf: conf,
+ readyCh: make(chan struct{}),
+ errorCh: make(chan error),
+ }, nil
+}
+
+// Start agent
+func (a *Agent) Start(ctx context.Context) {
+ err := a.Agent.Start()
+ if err != nil {
+ log.Errorf(err, "start serf agent failed")
+ a.errorCh <- err
+ return
+ }
+ a.RegisterEventHandler(a)
+
+ err = a.retryJoin(ctx)
+ if err != nil {
+ log.Errorf(err, "start serf agent failed")
+ if err != ctx.Err() && a.errorCh != nil {
+ a.errorCh <- err
+ }
+ }
+}
+
+// HandleEvent Handles serf.EventMemberJoin events,
+// which will wait for members to join until the number of group members is equal to "groupExpect"
+// when the startup mode is "ModeCluster",
+// used for logical grouping of serf nodes
+func (a *Agent) HandleEvent(event serf.Event) {
+ if event.EventType() != serf.EventMemberJoin {
+ return
+ }
+
+ if a.conf.Mode == ModeCluster {
+ if len(a.GroupMembers(a.conf.ClusterName)) < groupExpect {
+ return
+ }
+ }
+ a.DeregisterEventHandler(a)
+ close(a.readyCh)
+}
+
+// Ready Returns a channel that will be closed when serf is ready
+func (a *Agent) Ready() <-chan struct{} {
+ return a.readyCh
+}
+
+// Error Returns a channel that will be transmit a serf error
+func (a *Agent) Error() <-chan error {
+ return a.errorCh
+}
+
+// Stop serf agent
+func (a *Agent) Stop() {
+ if a.errorCh != nil {
+ a.Leave()
+ a.Shutdown()
+ close(a.errorCh)
+ a.errorCh = nil
+ }
+}
+
+// LocalMember returns the Member information for the local node
+func (a *Agent) LocalMember() *serf.Member {
+ serfAgent := a.Agent.Serf()
+ if serfAgent != nil {
+ member := serfAgent.LocalMember()
+ return &member
+ }
+ return nil
+}
+
+// GroupMembers returns a point-in-time snapshot of the members of by groupName
+func (a *Agent) GroupMembers(groupName string) (members []serf.Member) {
+ serfAgent := a.Agent.Serf()
+ if serfAgent != nil {
+ for _, member := range serfAgent.Members() {
+ log.Debugf("member = %s, groupName = %s", member.Name, member.Tags[tagKeyClusterName])
+ if member.Tags[tagKeyClusterName] == groupName {
+ members = append(members, member)
+ }
+ }
+ }
+ return
+}
+
+// Member get member information with node
+func (a *Agent) Member(node string) *serf.Member {
+ serfAgent := a.Agent.Serf()
+ if serfAgent != nil {
+ ms := serfAgent.Members()
+ for _, m := range ms {
+ if m.Name == node {
+ return &m
+ }
+ }
+ }
+ return nil
+}
+
+// SerfConfig get serf config
+func (a *Agent) SerfConfig() *serf.Config {
+ return a.Agent.SerfConfig()
+}
+
+// Join serf clusters through one or more members
+func (a *Agent) Join(addrs []string, replay bool) (n int, err error) {
+ return a.Agent.Join(addrs, replay)
+}
+
+// UserEvent sends a UserEvent on Serf
+func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error {
+ return a.Agent.UserEvent(name, payload, coalesce)
+}
+
+// Query sends a Query on Serf
+func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) {
+ return a.Agent.Query(name, payload, params)
+}
+
+func (a *Agent) retryJoin(ctx context.Context) (err error) {
+ if len(a.conf.RetryJoin) == 0 {
+ log.Infof("retry join mumber %d", len(a.conf.RetryJoin))
+ return nil
+ }
+
+ // Count of attempts
+ attempt := 0
+ ticker := time.NewTicker(a.conf.RetryInterval)
+ for {
+ log.Infof("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
+ var n int
+
+ // Try to join the specified serf nodes
+ n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin)
+ if err == nil {
+ log.Infof("serf: Join completed. Synced with %d initial agents", n)
+ break
+ }
+ attempt++
+
+ // If RetryMaxAttempts is greater than 0, agent will exit
+ // and throw an error when the number of attempts exceeds RetryMaxAttempts,
+ // else agent will try to join other nodes until successful always
+ if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts {
+ err = errors.New("serf: maximum retry join attempts made, exiting")
+ log.Errorf(err, err.Error())
+ break
+ }
+ select {
+ case <-ctx.Done():
+ err = ctx.Err()
+ goto done
+ // Waiting for ticker to trigger
+ case <-ticker.C:
+ }
+ }
+done:
+ ticker.Stop()
+ return
+}
diff --git a/agent_test.go b/agent_test.go
new file mode 100644
index 0000000..af68f8f
--- /dev/null
+++ b/agent_test.go
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package serf
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/hashicorp/serf/serf"
+)
+
+func TestAgent(t *testing.T) {
+ conf := DefaultConfig()
+ agent, err := Create(conf)
+ if err != nil {
+ t.Errorf("create agent failed, error: %s", err)
+ }
+ agent.Start(context.Background())
+ <- agent.readyCh
+ go func() {
+ agent.ShutdownCh()
+ }()
+ time.Sleep(time.Second)
+
+ err = agent.UserEvent("test", []byte("test"), true)
+ if err != nil {
+ t.Errorf("send user event failed, error: %s", err)
+ }
+
+ _, err = agent.Query("test", []byte("test"), &serf.QueryParam{})
+ if err != nil {
+ t.Errorf("query for other node failed, error: %s", err)
+ }
+ agent.LocalMember()
+
+ agent.Member("testnode")
+
+ agent.SerfConfig()
+
+ _, err = agent.Join([]string{"127.0.0.1:9999"}, true)
+ if err != nil {
+ t.Logf("join to other node failed, error: %s", err)
+ }
+
+ err = agent.Leave()
+ if err != nil {
+ t.Errorf("angent leave failed, error: %s", err)
+ }
+
+ err = agent.ForceLeave("testnode")
+ if err != nil {
+ t.Errorf("angent force leave failed, error: %s", err)
+ }
+
+ err = agent.Shutdown()
+ if err != nil {
+ t.Errorf("angent shutdown failed, error: %s", err)
+ }
+}
diff --git a/config.go b/config.go
new file mode 100644
index 0000000..8a25908
--- /dev/null
+++ b/config.go
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package serf
+
+import (
+ "fmt"
+ "strconv"
+
+ "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
+ "github.com/hashicorp/memberlist"
+ "github.com/hashicorp/serf/cmd/serf/command/agent"
+ "github.com/hashicorp/serf/serf"
+)
+
+const (
+ DefaultBindPort = 30190
+ DefaultRPCPort = 30191
+ DefaultClusterPort = 30192
+ ModeSingle = "single"
+ ModeCluster = "cluster"
+ retryMaxAttempts = 3
+ groupExpect = 3
+ tagKeyClusterName = "syncer-cluster-name"
+ TagKeyClusterPort = "syncer-cluster-port"
+ TagKeyRPCPort = "syncer-rpc-port"
+)
+
+// DefaultConfig default config
+func DefaultConfig() *Config {
+ agentConf := agent.DefaultConfig()
+ agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort)
+ agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort)
+ return &Config{
+ Mode: ModeSingle,
+ Config: agentConf,
+ ClusterPort: DefaultClusterPort,
+ }
+}
+
+// Config struct
+type Config struct {
+ // config from serf agent
+ *agent.Config
+ Mode string `json:"mode"`
+
+ // name to group members into cluster
+ ClusterName string `json:"cluster_name"`
+
+ // port to communicate between cluster members
+ ClusterPort int `yaml:"cluster_port"`
+ RPCPort int `yaml:"-"`
+}
+
+// readConfigFile reads configuration from config file
+func (c *Config) readConfigFile(filepath string) error {
+ if filepath != "" {
+ // todo:
+ }
+ return nil
+}
+
+// convertToSerf convert Config to serf.Config
+func (c *Config) convertToSerf() (*serf.Config, error) {
+ serfConf := serf.DefaultConfig()
+
+ bindIP, bindPort, err := utils.SplitHostPort(c.BindAddr, DefaultBindPort)
+ if err != nil {
+ return nil, fmt.Errorf("invalid bind address: %s", err)
+ }
+
+ switch c.Profile {
+ case "lan":
+ serfConf.MemberlistConfig = memberlist.DefaultLANConfig()
+ case "wan":
+ serfConf.MemberlistConfig = memberlist.DefaultWANConfig()
+ case "local":
+ serfConf.MemberlistConfig = memberlist.DefaultLocalConfig()
+ default:
+ serfConf.MemberlistConfig = memberlist.DefaultLANConfig()
+ }
+
+ serfConf.MemberlistConfig.BindAddr = bindIP
+ serfConf.MemberlistConfig.BindPort = bindPort
+ serfConf.NodeName = c.NodeName
+ serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)}
+
+ if c.ClusterName != "" {
+ serfConf.Tags[tagKeyClusterName] = c.ClusterName
+ serfConf.Tags[TagKeyClusterPort] = strconv.Itoa(c.ClusterPort)
+ }
+
+ if c.Mode == ModeCluster && c.RetryMaxAttempts <= 0 {
+ c.RetryMaxAttempts = retryMaxAttempts
+ }
+ return serfConf, nil
+}
diff --git a/db.go b/db.go
new file mode 100644
index 0000000..5585515
--- /dev/null
+++ b/db.go
@@ -0,0 +1,514 @@
+// Package db exposes a lightweight abstraction over the SQLite code.
+// It performs some basic mapping of lower-level types to rqlite types.
+package db
+
+import (
+ "database/sql/driver"
+ "expvar"
+ "fmt"
+ "io"
+ "net/url"
+ "strings"
+ "time"
+
+ "github.com/mattn/go-sqlite3"
+)
+
+const bkDelay = 250
+
+const (
+ fkChecks = "PRAGMA foreign_keys"
+ fkChecksEnabled = "PRAGMA foreign_keys=ON"
+ fkChecksDisabled = "PRAGMA foreign_keys=OFF"
+
+ numExecutions = "executions"
+ numExecutionErrors = "execution_errors"
+ numQueries = "queries"
+ numETx = "execute_transactions"
+ numQTx = "query_transactions"
+)
+
+// DBVersion is the SQLite version.
+var DBVersion string
+
+// stats captures stats for the DB layer.
+var stats *expvar.Map
+
+func init() {
+ DBVersion, _, _ = sqlite3.Version()
+ stats = expvar.NewMap("db")
+ stats.Add(numExecutions, 0)
+ stats.Add(numExecutionErrors, 0)
+ stats.Add(numQueries, 0)
+ stats.Add(numETx, 0)
+ stats.Add(numQTx, 0)
+}
+
+// Result represents the outcome of an operation that changes rows.
+type Result struct {
+ LastInsertID int64 `json:"last_insert_id,omitempty"`
+ RowsAffected int64 `json:"rows_affected,omitempty"`
+ Error string `json:"error,omitempty"`
+ Time float64 `json:"time,omitempty"`
+}
+
+// Rows represents the outcome of an operation that returns query data.
+type Rows struct {
+ Columns []string `json:"columns,omitempty"`
+ Types []string `json:"types,omitempty"`
+ Values [][]interface{} `json:"values,omitempty"`
+ Error string `json:"error,omitempty"`
+ Time float64 `json:"time,omitempty"`
+}
+
+// DB is the SQL database.
+type DB struct {
+ path string // Path to database file.
+ dsnQuery string // DSN query params, if any.
+ memory bool // In-memory only.
+ fqdsn string // Fully-qualified DSN for opening SQLite.
+}
+
+// New returns an instance of the database at path. If the database
+// has already been created and opened, this database will share
+// the data of that database when connected.
+func New(path, dsnQuery string, memory bool) (*DB, error) {
+ q, err := url.ParseQuery(dsnQuery)
+ if err != nil {
+ return nil, err
+ }
+ if memory {
+ q.Set("mode", "memory")
+ q.Set("cache", "shared")
+ }
+
+ if !strings.HasPrefix(path, "file:") {
+ path = fmt.Sprintf("file:%s", path)
+ }
+
+ var fqdsn string
+ if len(q) > 0 {
+ fqdsn = fmt.Sprintf("%s?%s", path, q.Encode())
+ } else {
+ fqdsn = path
+ }
+
+ return &DB{
+ path: path,
+ dsnQuery: dsnQuery,
+ memory: memory,
+ fqdsn: fqdsn,
+ }, nil
+}
+
+// Connect returns a connection to the database.
+func (d *DB) Connect() (*Conn, error) {
+ drv := sqlite3.SQLiteDriver{}
+ c, err := drv.Open(d.fqdsn)
+ if err != nil {
+ return nil, err
+ }
+
+ return &Conn{
+ sqlite: c.(*sqlite3.SQLiteConn),
+ }, nil
+}
+
+// Conn represents a connection to a database. Two Connection objects
+// to the same database are READ_COMMITTED isolated.
+type Conn struct {
+ sqlite *sqlite3.SQLiteConn
+}
+
+// TransactionActive returns whether a transaction is currently active
+// i.e. if the database is NOT in autocommit mode.
+func (c *Conn) TransactionActive() bool {
+ return !c.sqlite.AutoCommit()
+}
+
+// AbortTransaction aborts -- rolls back -- any active transaction. Calling code
+// should know exactly what it is doing if it decides to call this function. It
+// can be used to clean up any dangling state that may result from certain
+// error scenarios.
+func (c *Conn) AbortTransaction() error {
+ _, err := c.Execute([]string{`ROLLBACK`}, false, false)
+ return err
+}
+
+// Execute executes queries that modify the database.
+func (c *Conn) Execute(queries []string, tx, xTime bool) ([]*Result, error) {
+ stats.Add(numExecutions, int64(len(queries)))
+ if tx {
+ stats.Add(numETx, 1)
+ }
+
+ type Execer interface {
+ Exec(query string, args []driver.Value) (driver.Result, error)
+ }
+
+ var allResults []*Result
+ err := func() error {
+ var execer Execer
+ var rollback bool
+ var t driver.Tx
+ var err error
+
+ // Check for the err, if set rollback.
+ defer func() {
+ if t != nil {
+ if rollback {
+ t.Rollback()
+ return
+ }
+ t.Commit()
+ }
+ }()
+
+ // handleError sets the error field on the given result. It returns
+ // whether the caller should continue processing or break.
+ handleError := func(result *Result, err error) bool {
+ stats.Add(numExecutionErrors, 1)
+
+ result.Error = err.Error()
+ allResults = append(allResults, result)
+ if tx {
+ rollback = true // Will trigger the rollback.
+ return false
+ }
+ return true
+ }
+
+ execer = c.sqlite
+
+ // Create the correct execution object, depending on whether a
+ // transaction was requested.
+ if tx {
+ t, err = c.sqlite.Begin()
+ if err != nil {
+ return err
+ }
+ }
+
+ // Execute each query.
+ for _, q := range queries {
+ if q == "" {
+ continue
+ }
+
+ result := &Result{}
+ start := time.Now()
+
+ r, err := execer.Exec(q, nil)
+ if err != nil {
+ if handleError(result, err) {
+ continue
+ }
+ break
+ }
+ if r == nil {
+ continue
+ }
+
+ lid, err := r.LastInsertId()
+ if err != nil {
+ if handleError(result, err) {
+ continue
+ }
+ break
+ }
+ result.LastInsertID = lid
+
+ ra, err := r.RowsAffected()
+ if err != nil {
+ if handleError(result, err) {
+ continue
+ }
+ break
+ }
+ result.RowsAffected = ra
+ if xTime {
+ result.Time = time.Now().Sub(start).Seconds()
+ }
+ allResults = append(allResults, result)
+ }
+
+ return nil
+ }()
+
+ return allResults, err
+}
+
+// Query executes queries that return rows, but don't modify the database.
+func (c *Conn) Query(queries []string, tx, xTime bool) ([]*Rows, error) {
+ stats.Add(numQueries, int64(len(queries)))
+ if tx {
+ stats.Add(numQTx, 1)
+ }
+
+ type Queryer interface {
+ Query(query string, args []driver.Value) (driver.Rows, error)
+ }
+
+ var allRows []*Rows
+ err := func() (err error) {
+ var queryer Queryer
+ var t driver.Tx
+ defer func() {
+ // XXX THIS DOESN'T ACTUALLY WORK! Might as WELL JUST COMMIT?
+ if t != nil {
+ if err != nil {
+ t.Rollback()
+ return
+ }
+ t.Commit()
+ }
+ }()
+
+ queryer = c.sqlite
+
+ // Create the correct query object, depending on whether a
+ // transaction was requested.
+ if tx {
+ t, err = c.sqlite.Begin()
+ if err != nil {
+ return err
+ }
+ }
+
+ for _, q := range queries {
+ if q == "" {
+ continue
+ }
+
+ rows := &Rows{}
+ start := time.Now()
+
+ rs, err := queryer.Query(q, nil)
+ if err != nil {
+ rows.Error = err.Error()
+ allRows = append(allRows, rows)
+ continue
+ }
+ defer rs.Close()
+ columns := rs.Columns()
+
+ rows.Columns = columns
+ rows.Types = rs.(*sqlite3.SQLiteRows).DeclTypes()
+ dest := make([]driver.Value, len(rows.Columns))
+ for {
+ err := rs.Next(dest)
+ if err != nil {
+ if err != io.EOF {
+ rows.Error = err.Error()
+ }
+ break
+ }
+
+ values := normalizeRowValues(dest, rows.Types)
+ rows.Values = append(rows.Values, values)
+ }
+ if xTime {
+ rows.Time = time.Now().Sub(start).Seconds()
+ }
+ allRows = append(allRows, rows)
+ }
+
+ return nil
+ }()
+
+ return allRows, err
+}
+
+// EnableFKConstraints allows control of foreign key constraint checks.
+func (c *Conn) EnableFKConstraints(e bool) error {
+ q := fkChecksEnabled
+ if !e {
+ q = fkChecksDisabled
+ }
+ _, err := c.sqlite.Exec(q, nil)
+ return err
+}
+
+// FKConstraints returns whether FK constraints are set or not.
+func (c *Conn) FKConstraints() (bool, error) {
+ r, err := c.sqlite.Query(fkChecks, nil)
+ if err != nil {
+ return false, err
+ }
+
+ dest := make([]driver.Value, len(r.Columns()))
+ types := r.(*sqlite3.SQLiteRows).DeclTypes()
+ if err := r.Next(dest); err != nil {
+ return false, err
+ }
+
+ values := normalizeRowValues(dest, types)
+ if values[0] == int64(1) {
+ return true, nil
+ }
+ return false, nil
+}
+
+// Load loads the connected database from the database connected to src.
+// It overwrites the data contained in this database. It is the caller's
+// responsibility to ensure that no other connections to this database
+// are accessed while this operation is in progress.
+func (c *Conn) Load(src *Conn) error {
+ return copyDatabase(c.sqlite, src.sqlite)
+}
+
+// Backup writes a snapshot of the database over the given database
+// connection, erasing all the contents of the destination database.
+// The consistency of the snapshot is READ_COMMITTED relative to any
+// other connections currently open to this database. The caller must
+// ensure that all connections to the destination database are not
+// accessed during this operation.
+func (c *Conn) Backup(dst *Conn) error {
+ return copyDatabase(dst.sqlite, c.sqlite)
+}
+
+// Dump writes a snapshot of the database in SQL text format. The consistency
+// of the snapshot is READ_COMMITTED relative to any other connections
+// currently open to this database.
+func (c *Conn) Dump(w io.Writer) error {
+ if _, err := w.Write([]byte("PRAGMA foreign_keys=OFF;\nBEGIN TRANSACTION;\n")); err != nil {
+ return err
+ }
+
+ // Get the schema.
+ query := `SELECT "name", "type", "sql" FROM "sqlite_master"
+ WHERE "sql" NOT NULL AND "type" == 'table' ORDER BY "name"`
+ rows, err := c.Query([]string{query}, false, false)
+ if err != nil {
+ return err
+ }
+ row := rows[0]
+ for _, v := range row.Values {
+ table := v[0].(string)
+ var stmt string
+
+ if table == "sqlite_sequence" {
+ stmt = `DELETE FROM "sqlite_sequence";`
+ } else if table == "sqlite_stat1" {
+ stmt = `ANALYZE "sqlite_master";`
+ } else if strings.HasPrefix(table, "sqlite_") {
+ continue
+ } else {
+ stmt = v[2].(string)
+ }
+
+ if _, err := w.Write([]byte(fmt.Sprintf("%s;\n", stmt))); err != nil {
+ return err
+ }
+
+ tableIndent := strings.Replace(table, `"`, `""`, -1)
+ query = fmt.Sprintf(`PRAGMA table_info("%s")`, tableIndent)
+ r, err := c.Query([]string{query}, false, false)
+ if err != nil {
+ return err
+ }
+ var columnNames []string
+ for _, w := range r[0].Values {
+ columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, w[1].(string)))
+ }
+
+ query = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' FROM "%s";`,
+ tableIndent,
+ strings.Join(columnNames, ","),
+ tableIndent)
+ r, err = c.Query([]string{query}, false, false)
+ if err != nil {
+ return err
+ }
+ for _, x := range r[0].Values {
+ y := fmt.Sprintf("%s;\n", x[0].(string))
+ if _, err := w.Write([]byte(y)); err != nil {
+ return err
+ }
+ }
+ }
+
+ // Do indexes, triggers, and views.
+ query = `SELECT "name", "type", "sql" FROM "sqlite_master"
+ WHERE "sql" NOT NULL AND "type" IN ('index', 'trigger', 'view')`
+ rows, err = c.Query([]string{query}, false, false)
+ if err != nil {
+ return err
+ }
+ row = rows[0]
+ for _, v := range row.Values {
+ if _, err := w.Write([]byte(fmt.Sprintf("%s;\n", v[2]))); err != nil {
+ return err
+ }
+ }
+
+ if _, err := w.Write([]byte("COMMIT;\n")); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// Close closes the connection.
+func (c *Conn) Close() error {
+ if c != nil {
+ return c.sqlite.Close()
+ }
+ return nil
+}
+
+func copyDatabase(dst *sqlite3.SQLiteConn, src *sqlite3.SQLiteConn) error {
+ bk, err := dst.Backup("main", src, "main")
+ if err != nil {
+ return err
+ }
+
+ for {
+ done, err := bk.Step(-1)
+ if err != nil {
+ bk.Finish()
+ return err
+ }
+ if done {
+ break
+ }
+ time.Sleep(bkDelay * time.Millisecond)
+ }
+
+ return bk.Finish()
+}
+
+// normalizeRowValues performs some normalization of values in the returned rows.
+// Text values come over (from sqlite-go) as []byte instead of strings
+// for some reason, so we have explicitly convert (but only when type
+// is "text" so we don't affect BLOB types)
+func normalizeRowValues(row []driver.Value, types []string) []interface{} {
+ values := make([]interface{}, len(types))
+ for i, v := range row {
+ if isTextType(types[i]) {
+ switch val := v.(type) {
+ case []byte:
+ values[i] = string(val)
+ default:
+ values[i] = val
+ }
+ } else {
+ values[i] = v
+ }
+ }
+ return values
+}
+
+// isTextType returns whether the given type has a SQLite text affinity.
+// http://www.sqlite.org/datatype3.html
+func isTextType(t string) bool {
+ return t == "text" ||
+ t == "json" ||
+ t == "" ||
+ strings.HasPrefix(t, "varchar") ||
+ strings.HasPrefix(t, "varying character") ||
+ strings.HasPrefix(t, "nchar") ||
+ strings.HasPrefix(t, "native character") ||
+ strings.HasPrefix(t, "nvarchar") ||
+ strings.HasPrefix(t, "clob")
+}
diff --git a/db_test.go b/db_test.go
new file mode 100644
index 0000000..01ace9a
--- /dev/null
+++ b/db_test.go
@@ -0,0 +1,880 @@
+package db
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "strings"
+ "testing"
+
+ "github.com/rqlite/rqlite/testdata/chinook"
+)
+
+/*
+ * Lowest-layer database tests
+ */
+
+func TestNewDBOnDisk(t *testing.T) {
+ t.Parallel()
+
+ db, err := New(mustTempFilename(), "", false)
+ if err != nil {
+ t.Fatalf("failed to create new database: %s", err.Error())
+ }
+ if db == nil {
+ t.Fatal("database is nil")
+ }
+}
+
+func TestNewDBInMemory(t *testing.T) {
+ t.Parallel()
+
+ db, err := New(mustTempFilename(), "", true)
+ if err != nil {
+ t.Fatalf("failed to create new database: %s", err.Error())
+ }
+ if db == nil {
+ t.Fatal("database is nil")
+ }
+}
+
+func TestDBCreateOnDiskConnection(t *testing.T) {
+ t.Parallel()
+
+ db, path := mustCreateDatabase()
+ defer os.Remove(path)
+
+ conn, err := db.Connect()
+ if err != nil {
+ t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
+ }
+ if conn == nil {
+ t.Fatal("connection to on-disk database is nil")
+ }
+ if err := conn.Close(); err != nil {
+ t.Fatalf("failed to close on-disk connection: %s", err.Error())
+ }
+}
+
+func TestDBCreateInMemoryConnection(t *testing.T) {
+ t.Parallel()
+
+ db := mustCreateInMemoryDatabase()
+
+ conn, err := db.Connect()
+ if err != nil {
+ t.Fatalf("failed to create connection to in-memory database: %s", err.Error())
+ }
+ if conn == nil {
+ t.Fatal("connection to in-memory database is nil")
+ }
+ if err := conn.Close(); err != nil {
+ t.Fatalf("failed to close in-memory connection: %s", err.Error())
+ }
+}
+
+func TestDumpOnDisk(t *testing.T) {
+ t.Parallel()
+
+ db, path := mustCreateDatabase()
+ defer os.Remove(path)
+ c, err := db.Connect()
+ if err != nil {
+ t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
+ }
+
+ _, err = c.Execute([]string{chinook.DB}, false, false)
+ if err != nil {
+ t.Fatalf("failed to load chinook dump: %s", err.Error())
+ }
+
+ var b strings.Builder
+ if err := c.Dump(&b); err != nil {
+ t.Fatalf("failed to dump database: %s", err.Error())
+ }
+
+ if b.String() != chinook.DB {
+ t.Fatal("dumped database does not equal entered database")
+ }
+}
+
+func TestDumpInMemory(t *testing.T) {
+ t.Parallel()
+
+ db := mustCreateInMemoryDatabase()
+ c, err := db.Connect()
+ if err != nil {
+ t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
+ }
+
+ _, err = c.Execute([]string{chinook.DB}, false, false)
+ if err != nil {
+ t.Fatalf("failed to load chinook dump: %s", err.Error())
+ }
+
+ var b strings.Builder
+ if err := c.Dump(&b); err != nil {
+ t.Fatalf("failed to dump database: %s", err.Error())
+ }
+
+ if b.String() != chinook.DB {
+ t.Fatal("dumped database does not equal entered database")
+ }
+}
+
+type testF func(test *testing.T, c *Conn)
+
+var dbTestfunctions []testF = []testF{
+ testTableCreation,
+ testSQLiteMasterTable,
+ testTextTypes,
+ testEmptyStatements,
+ testSimpleSingleStatements,
+ testSimpleJoinStatements,
+ testSimpleSingleConcatStatements,
+ testSimpleMultiStatements,
+ testSimpleSingleMultiLineStatements,
+ testSimpleFailingStatementsExecute,
+ testSimpleFailingStatementsQuery,
+ testSimplePragmaTableInfo,
+ testCommonTableExpressions,
+ testForeignKeyConstraints,
+ testUniqueConstraints,
+ testActiveTransaction,
+ testAbortTransaction,
+ testPartialFail,
+ testSimpleTransaction,
+ testPartialFailTransaction,
+ testBackup,
+ testLoad,
+}
+
+func TestDatabaseInMemory(t *testing.T) {
+ t.Parallel()
+
+ for _, f := range dbTestfunctions {
+ dbInMem := mustCreateInMemoryDatabase()
+ connInMem, err := dbInMem.Connect()
+ if err != nil {
+ t.Fatalf("failed to create connection to in-memory database: %s", err.Error())
+ }
+
+ f(t, connInMem)
+ connInMem.Close()
+ }
+}
+
+func TestDatabaseOnDisk(t *testing.T) {
+ t.Parallel()
+
+ for _, f := range dbTestfunctions {
+ dbDisk, path := mustCreateDatabase()
+ connDisk, err := dbDisk.Connect()
+ if err != nil {
+ t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
+ }
+
+ f(t, connDisk)
+ connDisk.Close()
+ os.Remove(path)
+ }
+}
+
+func testTableCreation(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ r, err := c.Query([]string{"SELECT * FROM foo"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query empty table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id","name"],"types":["integer","text"]}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
+ }
+}
+
+func testSQLiteMasterTable(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ r, err := c.Query([]string{"SELECT * FROM sqlite_master"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query master table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["type","name","tbl_name","rootpage","sql"],"types":["text","text","text","int","text"],"values":[["table","foo","foo",2,"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"]]}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
+ }
+}
+
+func testEmptyStatements(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{""}, false, false)
+ if err != nil {
+ t.Fatalf("failed to execute empty statement: %s", err.Error())
+ }
+ _, err = c.Execute([]string{";"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to execute empty statement with semicolon: %s", err.Error())
+ }
+}
+
+func testTextTypes(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{"CREATE TABLE foo (c0 VARCHAR(36), c1 JSON, c2 NCHAR, c3 NVARCHAR, c4 CLOB)"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+ _, err = c.Execute([]string{`INSERT INTO foo(c0, c1, c2, c3, c4) VALUES("fiona", '{"mittens": "foobar"}', "bob", "dana", "declan")`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to insert record: %s", err.Error())
+ }
+
+ r, err := c.Query([]string{"SELECT * FROM foo"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["c0","c1","c2","c3","c4"],"types":["varchar(36)","json","nchar","nvarchar","clob"],"values":[["fiona","{\"mittens\": \"foobar\"}","bob","dana","declan"]]}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
+ }
+}
+
+func testSimpleSingleStatements(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ _, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to insert record: %s", err.Error())
+ }
+
+ _, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("aoife")`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to insert record: %s", err.Error())
+ }
+
+ r, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"aoife"]]}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+
+ r, err = c.Query([]string{`SELECT * FROM foo WHERE name="aoife"`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[2,"aoife"]]}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+
+ r, err = c.Query([]string{`SELECT * FROM foo WHERE name="dana"`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id","name"],"types":["integer","text"]}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+
+ r, err = c.Query([]string{`SELECT * FROM foo ORDER BY name`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[2,"aoife"],[1,"fiona"]]}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+
+ r, err = c.Query([]string{`SELECT *,name FROM foo`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id","name","name"],"types":["integer","text","text"],"values":[[1,"fiona","fiona"],[2,"aoife","aoife"]]}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+}
+
+func testSimpleJoinStatements(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{"CREATE TABLE names (id INTEGER NOT NULL PRIMARY KEY, name TEXT, ssn TEXT)"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ _, err = c.Execute([]string{
+ `INSERT INTO "names" VALUES(1,'bob','123-45-678')`,
+ `INSERT INTO "names" VALUES(2,'tom','111-22-333')`,
+ `INSERT INTO "names" VALUES(3,'matt','222-22-333')`,
+ }, false, false)
+ if err != nil {
+ t.Fatalf("failed to insert record: %s", err.Error())
+ }
+
+ _, err = c.Execute([]string{"CREATE TABLE staff (id INTEGER NOT NULL PRIMARY KEY, employer TEXT, ssn TEXT)"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ _, err = c.Execute([]string{`INSERT INTO "staff" VALUES(1,'acme','222-22-333')`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to insert record: %s", err.Error())
+ }
+
+ r, err := c.Query([]string{`SELECT names.id,name,names.ssn,employer FROM names INNER JOIN staff ON staff.ssn = names.ssn`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query table using JOIN: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id","name","ssn","employer"],"types":["integer","text","text","text"],"values":[[3,"matt","222-22-333","acme"]]}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+}
+
+func testSimpleSingleConcatStatements(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ _, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to insert record: %s", err.Error())
+ }
+
+ r, err := c.Query([]string{`SELECT id || "_bar", name FROM foo`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id || \"_bar\"","name"],"types":["","text"],"values":[["1_bar","fiona"]]}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+}
+
+func testSimpleMultiStatements(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ re, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`, `INSERT INTO foo(name) VALUES("dana")`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to insert record: %s", err.Error())
+ }
+ if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1}]`, asJSON(re); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+
+ ro, err := c.Query([]string{`SELECT * FROM foo`, `SELECT * FROM foo`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query empty table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"dana"]]},{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"dana"]]}]`, asJSON(ro); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+}
+
+func testSimpleSingleMultiLineStatements(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{`
+CREATE TABLE foo (
+ id INTEGER NOT NULL PRIMARY KEY,
+ name TEXT
+)`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ re, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`, `INSERT INTO foo(name) VALUES("dana")`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to insert record: %s", err.Error())
+ }
+ if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1}]`, asJSON(re); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+}
+
+func testSimpleFailingStatementsExecute(t *testing.T, c *Conn) {
+ r, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
+ if err != nil {
+ t.Fatalf("error executing insertion into non-existent table: %s", err.Error())
+ }
+ if exp, got := `[{"error":"no such table: foo"}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+
+ r, err = c.Execute([]string{`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+ if exp, got := `[{}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+ r, err = c.Execute([]string{`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to attempt creation of duplicate table: %s", err.Error())
+ }
+ if exp, got := `[{"error":"table foo already exists"}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+
+ r, err = c.Execute([]string{`INSERT INTO foo(id, name) VALUES(11, "fiona")`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to insert record: %s", err.Error())
+ }
+ if exp, got := `[{"last_insert_id":11,"rows_affected":1}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+ r, err = c.Execute([]string{`INSERT INTO foo(id, name) VALUES(11, "fiona")`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to attempt duplicate record insertion: %s", err.Error())
+ }
+ if exp, got := `[{"error":"UNIQUE constraint failed: foo.id"}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+
+ r, err = c.Execute([]string{`utter nonsense`}, false, false)
+ if err != nil {
+ if exp, got := `[{"error":"near \"utter\": syntax error"}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+ }
+}
+
+func testSimpleFailingStatementsQuery(t *testing.T, c *Conn) {
+ ro, err := c.Query([]string{`SELECT * FROM bar`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to attempt query of non-existent table: %s", err.Error())
+ }
+ if exp, got := `[{"error":"no such table: bar"}]`, asJSON(ro); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+
+ ro, err = c.Query([]string{`SELECTxx * FROM foo`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to attempt nonsense query: %s", err.Error())
+ }
+ if exp, got := `[{"error":"near \"SELECTxx\": syntax error"}]`, asJSON(ro); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+ r, err := c.Query([]string{`utter nonsense`}, false, false)
+ if err != nil {
+ if exp, got := `[{"error":"near \"utter\": syntax error"}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+ }
+}
+
+func testSimplePragmaTableInfo(t *testing.T, c *Conn) {
+ r, err := c.Execute([]string{`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+ if exp, got := `[{}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+
+ res, err := c.Query([]string{`PRAGMA table_info("foo")`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query a common table expression: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["cid","name","type","notnull","dflt_value","pk"],"types":["","","","","",""],"values":[[0,"id","INTEGER",1,null,1],[1,"name","TEXT",0,null,0]]}]`, asJSON(res); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+
+}
+
+func testCommonTableExpressions(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{"CREATE TABLE test(x foo)"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ _, err = c.Execute([]string{`INSERT INTO test VALUES(1)`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to insert record: %s", err.Error())
+ }
+
+ r, err := c.Query([]string{`WITH bar AS (SELECT * FROM test) SELECT * FROM test WHERE x = 1`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query a common table expression: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["x"],"types":["foo"],"values":[[1]]}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+
+ r, err = c.Query([]string{`WITH bar AS (SELECT * FROM test) SELECT * FROM test WHERE x = 2`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query a common table expression: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["x"],"types":["foo"]}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+}
+
+func testUniqueConstraints(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT, CONSTRAINT name_unique UNIQUE (name))"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ r, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
+ if err != nil {
+ t.Fatalf("error executing insertion into table: %s", err.Error())
+ }
+ if exp, got := `[{"last_insert_id":1,"rows_affected":1}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for INSERT\nexp: %s\ngot: %s", exp, got)
+ }
+
+ // UNIQUE constraint should fire.
+ r, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
+ if err != nil {
+ t.Fatalf("error executing insertion into table: %s", err.Error())
+ }
+ if exp, got := `[{"error":"UNIQUE constraint failed: foo.name"}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for INSERT\nexp: %s\ngot: %s", exp, got)
+ }
+}
+
+func testActiveTransaction(t *testing.T, c *Conn) {
+ if c.TransactionActive() {
+ t.Fatal("transaction incorrectly marked as active")
+ }
+
+ if _, err := c.Execute([]string{`BEGIN`}, false, false); err != nil {
+ t.Fatalf("error starting transaction: %s", err.Error())
+ }
+
+ if !c.TransactionActive() {
+ t.Fatal("transaction incorrectly marked as inactive")
+ }
+
+ if _, err := c.Execute([]string{`COMMIT`}, false, false); err != nil {
+ t.Fatalf("error starting transaction: %s", err.Error())
+ }
+
+ if c.TransactionActive() {
+ t.Fatal("transaction incorrectly marked as active")
+ }
+
+ if _, err := c.Execute([]string{`BEGIN`}, false, false); err != nil {
+ t.Fatalf("error starting transaction: %s", err.Error())
+ }
+
+ if !c.TransactionActive() {
+ t.Fatal("transaction incorrectly marked as inactive")
+ }
+
+ if _, err := c.Execute([]string{`ROLLBACK`}, false, false); err != nil {
+ t.Fatalf("error starting transaction: %s", err.Error())
+ }
+
+ if c.TransactionActive() {
+ t.Fatal("transaction incorrectly marked as active")
+ }
+}
+
+func testAbortTransaction(t *testing.T, c *Conn) {
+ if err := c.AbortTransaction(); err != nil {
+ t.Fatalf("error abrorting non-active transaction: %s", err.Error())
+ }
+
+ if _, err := c.Execute([]string{`BEGIN`}, false, false); err != nil {
+ t.Fatalf("error starting transaction: %s", err.Error())
+ }
+
+ if !c.TransactionActive() {
+ t.Fatal("transaction incorrectly marked as inactive")
+ }
+
+ if err := c.AbortTransaction(); err != nil {
+ t.Fatalf("error abrorting non-active transaction: %s", err.Error())
+ }
+
+ if c.TransactionActive() {
+ t.Fatal("transaction incorrectly marked as active")
+ }
+}
+
+func testPartialFail(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ stmts := []string{
+ `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
+ }
+ r, err := c.Execute(stmts, false, false)
+ if err != nil {
+ t.Fatalf("failed to insert records: %s", err.Error())
+ }
+ if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1},{"error":"UNIQUE constraint failed: foo.id"},{"last_insert_id":4,"rows_affected":1}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+ ro, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+}
+
+func testSimpleTransaction(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ stmts := []string{
+ `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(3, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
+ }
+ r, err := c.Execute(stmts, true, false)
+ if err != nil {
+ t.Fatalf("failed to insert records: %s", err.Error())
+ }
+ if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1},{"last_insert_id":3,"rows_affected":1},{"last_insert_id":4,"rows_affected":1}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+ ro, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+}
+
+func testPartialFailTransaction(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ stmts := []string{
+ `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
+ }
+ r, err := c.Execute(stmts, true, false)
+ if err != nil {
+ t.Fatalf("failed to insert records: %s", err.Error())
+ }
+ if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1},{"error":"UNIQUE constraint failed: foo.id"}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+ ro, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id","name"],"types":["integer","text"]}]`, asJSON(ro); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+}
+
+func testForeignKeyConstraints(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, ref INTEGER REFERENCES foo(id))"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ // Explicitly disable constraints.
+ if err := c.EnableFKConstraints(false); err != nil {
+ t.Fatalf("failed to enable foreign key constraints: %s", err.Error())
+ }
+
+ // Check constraints
+ fk, err := c.FKConstraints()
+ if err != nil {
+ t.Fatalf("failed to check FK constraints: %s", err.Error())
+ }
+ if fk != false {
+ t.Fatal("FK constraints are not disabled")
+ }
+
+ stmts := []string{
+ `INSERT INTO foo(id, ref) VALUES(1, 2)`,
+ }
+ r, err := c.Execute(stmts, false, false)
+ if err != nil {
+ t.Fatalf("failed to execute FK test statement: %s", err.Error())
+ }
+ if exp, got := `[{"last_insert_id":1,"rows_affected":1}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+
+ // Explicitly enable constraints.
+ if err := c.EnableFKConstraints(true); err != nil {
+ t.Fatalf("failed to enable foreign key constraints: %s", err.Error())
+ }
+
+ // Check constraints
+ fk, err = c.FKConstraints()
+ if err != nil {
+ t.Fatalf("failed to check FK constraints: %s", err.Error())
+ }
+ if fk != true {
+ t.Fatal("FK constraints are not enabled")
+ }
+
+ stmts = []string{
+ `INSERT INTO foo(id, ref) VALUES(1, 3)`,
+ }
+ r, err = c.Execute(stmts, false, false)
+ if err != nil {
+ t.Fatalf("failed to execute FK test statement: %s", err.Error())
+ }
+ if exp, got := `[{"error":"UNIQUE constraint failed: foo.id"}]`, asJSON(r); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+}
+
+func testBackup(t *testing.T, c *Conn) {
+ _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ stmts := []string{
+ `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(3, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
+ }
+ _, err = c.Execute(stmts, true, false)
+ if err != nil {
+ t.Fatalf("failed to insert records: %s", err.Error())
+ }
+
+ dstDB, path := mustCreateDatabase()
+ defer os.Remove(path)
+ dstConn, err := dstDB.Connect()
+ if err != nil {
+ t.Fatalf("failed to connect to destination database: %s", err.Error())
+ }
+
+ err = c.Backup(dstConn)
+ if err != nil {
+ t.Fatalf("failed to backup database: %s", err.Error())
+ }
+
+ ro, err := dstConn.Query([]string{`SELECT * FROM foo`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+}
+
+func testLoad(t *testing.T, c *Conn) {
+ dstDB, path := mustCreateDatabase()
+ defer os.Remove(path)
+ dstConn, err := dstDB.Connect()
+ if err != nil {
+ t.Fatalf("failed to connect to destination database: %s", err.Error())
+ }
+ defer dstConn.Close()
+
+ _, err = dstConn.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
+ if err != nil {
+ t.Fatalf("failed to create table: %s", err.Error())
+ }
+
+ stmts := []string{
+ `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(3, "fiona")`,
+ `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
+ }
+ _, err = dstConn.Execute(stmts, true, false)
+ if err != nil {
+ t.Fatalf("failed to insert records: %s", err.Error())
+ }
+
+ ro, err := dstConn.Query([]string{`SELECT * FROM foo`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+
+ err = c.Load(dstConn)
+ if err != nil {
+ t.Fatalf("failed to load database: %s", err.Error())
+ }
+
+ ro, err = c.Query([]string{`SELECT * FROM foo`}, false, false)
+ if err != nil {
+ t.Fatalf("failed to query table: %s", err.Error())
+ }
+ if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
+ t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
+ }
+}
+
+// mustExecute executes a spath, and panics on failure. Used for statements
+// that should never fail, even taking into account test setup.
+func mustExecute(c *Conn, stmt string) {
+ _, err := c.Execute([]string{stmt}, false, false)
+ if err != nil {
+ panic(fmt.Sprintf("failed to execute statement: %s", err.Error()))
+ }
+}
+
+// mustQuery executes a statement, and panics on failure. Used for statements
+// that should never fail, even taking into account test setup.
+func mustQuery(c *Conn, stmt string) {
+ _, err := c.Query([]string{stmt}, false, false)
+ if err != nil {
+ panic(fmt.Sprintf("failed to query: %s", err.Error()))
+ }
+}
+
+func mustCreateDatabase() (*DB, string) {
+ path := mustTempFilename()
+ db, err := New(mustTempFilename(), "", false)
+ if err != nil {
+ panic("failed to create in-memory database")
+ }
+ return db, path
+}
+
+func mustCreateInMemoryDatabase() *DB {
+ db, err := New(mustTempFilename(), "", true)
+ if err != nil {
+ panic("failed to create in-memory database")
+ }
+ return db
+}
+
+func mustTempFilename() string {
+ fd, err := ioutil.TempFile("", "rqlilte-tmp-test-")
+ if err != nil {
+ panic(err.Error())
+ }
+ if err := fd.Close(); err != nil {
+ panic(err.Error())
+ }
+ if err := os.Remove(fd.Name()); err != nil {
+ panic(err.Error())
+ }
+ return fd.Name()
+}
+
+func asJSON(v interface{}) string {
+ b, err := json.Marshal(v)
+ if err != nil {
+ panic("failed to JSON marshal value")
+ }
+ return string(b)
+}
--
Gitblit v1.8.0