From d70649cfb61e64fabce40199ad1d53d6a4973f0e Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期一, 31 五月 2021 16:23:28 +0800
Subject: [PATCH] no business
---
agent.go | 437 ++----------------------------------------------
/dev/null | 83 ---------
config.go | 10
3 files changed, 27 insertions(+), 503 deletions(-)
diff --git a/agent.go b/agent.go
index a2ae892..75be578 100644
--- a/agent.go
+++ b/agent.go
@@ -22,44 +22,34 @@
"errors"
"fmt"
"github.com/hashicorp/memberlist"
- "io/ioutil"
"net"
- "os"
"strconv"
- "sync"
- //"os"
- "strings"
"time"
- "basic.com/valib/serf.git/serf"
"basic.com/valib/serf.git/cmd/serf/command/agent"
+ "basic.com/valib/serf.git/serf"
//"github.com/apache/servicecomb-service-center/pkg/log"
"basic.com/valib/logger.git"
- "github.com/satori/go.uuid"
-)
-
-const (
- QueryEventGetDB = "GetDatabase"
- QueryEventUpdateDBData = "UpdateDBData"
- UserEventSyncSql = "SyncSql"
- UserEventSyncDbTablePersonCache = "SyncCache"
- UserEventSyncVirtualIp = "SyncVirtualIp" //婕傜Щip淇敼
)
// Agent warps the serf agent
type Agent struct {
*agent.Agent
- conf *Config
- readyCh chan struct{}
- errorCh chan error
+ conf *Config
+ readyCh chan struct{}
+ errorCh chan error
+ handleEv HandleEventFunc
}
+//鐢ㄦ埛鑷畾涔変簨浠跺鐞�
+type HandleEventFunc func(event serf.Event)
+
type NodeInfo struct {
- ClusterID string `json:"clusterID"`
- NodeID string `json:"nodeID"`
- NodeAddress string `json:"nodeAddress"`
- IsAlive int `json:"isAlive"`
+ ClusterID string `json:"clusterID"`
+ NodeID string `json:"nodeID"`
+ NodeAddress string `json:"nodeAddress"`
+ IsAlive int `json:"isAlive"`
}
// Create create serf agent with config
@@ -95,6 +85,12 @@
}, nil
}
+func (a *Agent) RegisterHandleEventFunc(f HandleEventFunc) {
+ if f != nil {
+ a.handleEv = f
+ }
+}
+
// Start agent
func (a *Agent) Start(ctx context.Context) {
err := a.Agent.Start()
@@ -116,234 +112,14 @@
go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
-var SyncDbTablePersonCacheChan = make(chan []byte,512)
-var SyncVirtualIpChan = make(chan []byte, 512)
-
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
// when the startup mode is "ModeCluster",
// used for logical grouping of serf nodes
func (a *Agent) HandleEvent(event serf.Event) {
-
- switch ev := event.(type) {
- case serf.UserEvent:
- if ev.Name == UserEventSyncSql {
- logger.Info("receive a UserEventSyncSql event")
- var sqlUe SqlUserEvent
- err := json.Unmarshal(ev.Payload, &sqlUe)
- if err !=nil {
- logger.Error("sqlUe unmarshal err:",err)
- return
- }
-
- logger.Info("ev.LTime:", ev.LTime ,"owner:", sqlUe.Owner, "sql:", sqlUe.Sql)
-
- if sqlUe.Owner != a.conf.NodeName {
- go func() {
- flag, e := ExecuteSqlByGorm(sqlUe.Sql)
- logger.Info("ev.LTime:",ev.LTime,"userEvent exec ",sqlUe.Sql,",Result:",flag,", err:",e)
- logLT := strconv.Itoa(int(ev.LTime))
- logT := time.Now().Format("2006-01-02 15:04:05")
- logSql := strings.ReplaceAll(strings.Join(sqlUe.Sql, ";"), "'", "''")
- logResult := "0"
- if flag {
- logResult = "1"
- }
- logErr := ""
- if e != nil {
- logErr = e.Error()
- }
- ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+sqlUe.Owner+"',"+logResult+",'"+logErr+"')"})
- }()
- }
- } else if ev.Name == UserEventSyncDbTablePersonCache {
- logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload))
- SyncDbTablePersonCacheChan <- ev.Payload
- } else if ev.Name == UserEventSyncVirtualIp {
- logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
- SyncVirtualIpChan <- ev.Payload
- }
-
-
- case *serf.Query:
-
- if ev.Name == QueryEventGetDB {
- //bak file and send resp
- filename, err := BakDbFile()
- if err != nil {
- logger.Error("bak db file error!")
- return
- }
- logger.Info(filename)
-
- filebuf, err := ioutil.ReadFile(filename)
- logger.Info("filebuf: ", len(filebuf))
- if err != nil {
- logger.Error("file to []bytes error: %s\n", err)
- return
- }
-
- err = os.Remove(filename)
- if err != nil {
- logger.Error("remove file%s\n failed", filename)
- return
- }
-
- logger.Info("query payload: ", len(ev.Payload))
- if query, ok := event.(*serf.Query); ok {
- if err := query.Respond(filebuf); err != nil {
- logger.Error("err: %s\n", err)
- return
- }
- }
- } else if ev.Name == QueryEventUpdateDBData {
- //logger.Info(string(ev.Payload))
- //var tmpstringslice []string
- //tmpstringslice = append(tmpstringslice, string(ev.Payload))
- //logger.Info(tmpstringslice)
- //rows, err := ExecuteQuerySql(tmpstringslice)
- //if err != nil {
- // logger.Error("err: ", err)
- // return
- //}
- //var rowsReturn []Rows
- //for _, r := range rows {
- // rowsReturn = append(rowsReturn, *r)
- //}
- logger.Info("receive QueryEventUpdateDBData, current node:", a.conf.NodeName)
- var fromP QueryTableDataParam
- err := json.Unmarshal(ev.Payload, &fromP)
- if err !=nil {
- logger.Error("Query tableNames unmarshal err")
- if query, ok := event.(*serf.Query); ok {
- if err := query.Respond([]byte("request unmarshal err")); err != nil {
- logger.Error("query.Respond err: %s\n", err)
- return
- }
- }
- return
- }
- logger.Info("Query tableNames:",fromP.Tables)
- datas, err := ExecuteQueryByGorm(fromP.Tables)
- if err !=nil {
- logger.Error("queryByGorm err:", err)
- if query, ok := event.(*serf.Query); ok {
- if err := query.Respond([]byte("queryByGorm err")); err != nil {
- logger.Error("query.Respond err: %s\n", err)
- return
- }
- }
- return
- }
- bytesReturn, err := json.Marshal(datas)
- logger.Info("results.len: ", len(bytesReturn))
-
- var targetNode *memberlist.Node
- nodes := a.Serf().Memberlist().Members()
- if nodes != nil && len(nodes) > 0 {
- for _,n :=range nodes {
- if n.Name == fromP.From {
- targetNode = n
- break
- }
- }
- }
- logger.Debug("targetNode:",targetNode.Name)
- if targetNode !=nil {
- go func() {
- addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
- sendErr := rawSendTcpMsg(addr, bytesReturn)
-
- logLT := strconv.Itoa(int(ev.LTime))
- logT := time.Now().Format("2006-01-02 15:04:05")
- logSql := strings.ReplaceAll("QueryEventUpdateDBData from "+targetNode.Name,"'","''")
- logResult := "0"
- logErr := ""
- if sendErr ==nil {
- logResult = "1"
- logger.Debug("sendToTcp success")
- } else {
- logErr = sendErr.Error()
- logger.Debug("sendToTcp err:",sendErr)
- }
-
-
- ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+targetNode.Name+"',"+logResult+",'"+logErr+"')"})
- }()
- } else {
- logger.Debug("targetNode is nil")
- }
-
- //if query, ok := event.(*serf.Query); ok {
- // if err := query.Respond(bytesReturn); err != nil {
- // logger.Error("err: %s\n", err)
- // return
- // }
- //}
- }
- case serf.MemberEvent:
- if event.EventType() == serf.EventMemberLeave {
- if ev.Members !=nil && len(ev.Members) ==1 {
- leaveMember := ev.Members[0]
- leaveSql := "update cluster_node set isDelete=1 where node_id='"+leaveMember.Name+"'"
- flag,e := ExecuteSqlByGorm([]string{ leaveSql })
-
- logger.Info("EventMemberLeave,current Members:",ev.Members)
- logLT := ""
- logT := time.Now().Format("2006-01-02 15:04:05")
- logSql := strings.ReplaceAll(leaveSql, "'","''")
- logResult := "0"
- if flag {
- logResult = "1"
- }
- logErr := ""
- if e != nil {
- logErr = e.Error()
- }
- ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
- }
- return
- } else if event.EventType() == serf.EventMemberJoin {
- if ev.Members !=nil && len(ev.Members) ==1 {
- leaveMember := ev.Members[0]
- joinSql := "update cluster_node set isDelete=0 where node_id='"+leaveMember.Name+"'"
- flag,e := ExecuteSqlByGorm([]string{joinSql})
-
- logger.Info("EventMemberJoin,current Members:",ev.Members)
- logLT := ""
- logT := time.Now().Format("2006-01-02 15:04:05")
- logSql := strings.ReplaceAll(joinSql, "'", "''")
- logResult := "0"
- if flag {
- logResult = "1"
- }
- logErr := ""
- if e != nil {
- logErr = e.Error()
- }
- ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
- }
- return
- }
-
-
- default:
- logger.Warn("Unknown event type: %s\n", ev.EventType().String())
+ if a.handleEv != nil {
+ a.handleEv(event)
}
-
- //if event.EventType() != serf.EventMemberJoin {
- // logger.Info("event.EventType() != serf.EventMemberJoin")
- // return
- //}
- //
- //if a.conf.Mode == ModeCluster {
- // if len(a.GroupMembers(a.conf.ClusterID)) < groupExpect {
- // logger.Error("len(a.GroupMembers(a.conf.ClusterID)) < groupExpect")
- // return
- // }
- //}
- //a.DeregisterEventHandler(a)
- //close(a.readyCh)
}
@@ -499,176 +275,9 @@
return
}
-//GetDbFromCluster get the newest database after join cluster
-//dbPathWrite the path where to write after got a database,
-func (a *Agent) GetDbFromCluster(dbPathWrite string) {
- //members: get name of first member
- mbs := a.GroupMembers(a.conf.ClusterID)
- var specmembername string
- for _, m := range mbs {
- if m.Addr.String() != a.conf.BindAddr {
- specmembername = m.Name
- break
- }
- }
- logger.Info(specmembername)
-
- //query: get db file.
- params := serf.QueryParam{
- FilterNodes: strings.Fields(specmembername),
- }
-
- resp, err := a.Query(QueryEventGetDB, []byte(""), ¶ms)
- if err == nil || !strings.Contains(err.Error(), "cannot contain") {
- logger.Error("err: ", err)
- }
-
- go func() {
- respCh := resp.ResponseCh()
- for {
- select {
- case r := <-respCh:
- logger.Info("x length is: ", len(r.Payload))
-
- // // byte to file.
- SerfDbConn.Close()
- SerfDbConn = nil
- err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
- if err != nil {
- logger.Error("query byte to file error!", err)
- }
- err := InitDbConn("")
- if err != nil {
- logger.Error("create db conn of test.db error: ", err)
- }
- return
- }
- }
- }()
-}
-
-type QueryTableDataParam struct {
- Tables []string `json:"tables"`
- From string `json:"from"`
-}
-
-var QueryTcpResponseChan = make(chan []byte)
-//GetDbFromCluster get the newest database after join cluster
-//dbPathWrite the path where to write after got a database,
-func (a *Agent) GetTableDataFromCluster(tableNames []string, timeout time.Duration) (*[]string,error) {
- //members: get name of first member
- mbs := a.GroupMembers(a.conf.ClusterID)
- var specmembername string
- for _, m := range mbs {
- logger.Info("m",m)
- if m.Name != a.conf.NodeName { //鍓嶇紑锛欴SVAD:鍒嗘瀽鏈嶅姟鍣� DSPAD:杩涘嚭鍏ad
- if strings.HasPrefix(a.conf.NodeName, "DSVAD"){
- if strings.HasPrefix(m.Name, "DSVAD") {
- specmembername = m.Name
- break
- }
- }else{
- specmembername = m.Name
- break
- }
- }
- }
- logger.Info("mbs:",mbs,"a.conf.BindAddr:",a.conf.BindAddr,"specmembername:",specmembername)
- if specmembername == "" {//濡傛灉鏈壘鍒扮洰鏍囪妭鐐癸紝璇存槑褰撳墠闆嗙兢鍐呴櫎浜嗘湰鑺傜偣锛屾病鏈夊叾浠栧彲鐢ㄨ妭鐐�
- return nil,errors.New("specmembername not found")
- }
-
- //query: get db file.
- params := serf.QueryParam{
- FilterNodes: strings.Fields(specmembername),
- }
-
- //get db tables
- var fromP = QueryTableDataParam{
- Tables: tableNames,
- From: a.conf.NodeName,
- }
- tBytes, _ := json.Marshal(fromP)
-
- resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms)
- if err == nil || !strings.Contains(err.Error(), "cannot contain") {
- logger.Error("err: ", err)
- }
- logger.Info("Query.resp.err:",err,"resp:",resp)
-
- var dumpSqls []string
-
- var wg sync.WaitGroup
- wg.Add(1)
- ticker := time.NewTicker(timeout)
- go func(tk *time.Ticker) {
- defer tk.Stop()
- defer wg.Done()
- for {
- select {
- case <-tk.C:
- return
- case msg := <- QueryTcpResponseChan:
- logger.Info("Query response's len:", len(msg))
- err := json.Unmarshal(msg, &dumpSqls)
- if err == nil {
- logger.Error("dumpSql:", dumpSqls)
- logger.Error("data dump success")
- }
- return
- }
- }
- }(ticker)
- wg.Wait()
- return &dumpSqls,nil
-
- //r, err = c.Query([]string{query}, false, false)
- //if err != nil {
- // return err
- //}
- //for _, x := range r[0].Values {
- // y := logger.Info("%s;\n", x[0].(string))
- // if _, err := w.Write([]byte(y)); err != nil {
- // return err
- // }
- //}
-
-}
-
-type SqlUserEvent struct {
- Owner string `json:"owner"`
- Sql []string `json:"sql"`
-}
-
-//SyncSql boardcast sql to cluster
-func (a *Agent) SyncSql(sqlOp []string) {
- // event : use to send command to operate db.
- var sqlUe = SqlUserEvent{
- Owner: a.conf.NodeName,
- Sql: sqlOp,
- }
- ueB, err := json.Marshal(sqlUe)
- if err !=nil {
- logger.Error("sqlUE marshal err:",err)
- return
- }
- err = a.UserEvent(UserEventSyncSql, ueB, false)
- if err == nil || !strings.Contains(err.Error(), "cannot contain") {
- logger.Error("err: ", err)
- }
-}
-
-//鏇存柊鍚屾搴撶殑姣斿缂撳瓨
-func (a *Agent) SyncDbTablePersonCache(b []byte) {
- err := a.UserEvent(UserEventSyncDbTablePersonCache, b, false)
- if err !=nil{
- logger.Error("UserEventSyncDbTablePersonCache err:",err)
- }
-}
-
//Init serf Init
-func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string) (*Agent, error) {
- agent, err := InitNode(clusterID, password, nodeID, snapshotPath)
+func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string, hef HandleEventFunc) (*Agent, error) {
+ agent, err := InitNode(clusterID, password, nodeID, snapshotPath, hef)
if err != nil {
logger.Error("InitNode failed, error: %s", err)
return agent, err
@@ -684,7 +293,7 @@
}
//InitNode web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝
-func InitNode(clusterID string, password string, nodeID string, snapshotPath string) (*Agent, error) {
+func InitNode(clusterID string, password string, nodeID string, snapshotPath string, hef HandleEventFunc) (*Agent, error) {
conf := DefaultConfig()
logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
conf.ClusterID = clusterID
diff --git a/config.go b/config.go
index 81b019b..643226a 100644
--- a/config.go
+++ b/config.go
@@ -50,8 +50,6 @@
ReplayOnJoinDefault = false
SnapshotPathDefault = "./serfSnapShot"
MaxEventBufferCount = 2048
-
- TcpTransportPort = 30194 //tcp浼犺緭澶ф暟鎹噺鎺ュ彛
)
// DefaultConfig default config
@@ -73,14 +71,14 @@
type Config struct {
// config from serf agent
*agent.Config
- Mode string `json:"mode"`
+ Mode string `json:"mode"`
// name to group members into cluster
- ClusterID string `json:"cluster_name"`
+ ClusterID string `json:"cluster_name"`
// port to communicate between cluster members
- ClusterPort int `yaml:"cluster_port"`
- RPCPort int `yaml:"-"`
+ ClusterPort int `yaml:"cluster_port"`
+ RPCPort int `yaml:"-"`
}
// readConfigFile reads configuration from config file
diff --git a/db.go b/db.go
deleted file mode 100644
index cce6d76..0000000
--- a/db.go
+++ /dev/null
@@ -1,514 +0,0 @@
-// Package db exposes a lightweight abstraction over the SQLite code.
-// It performs some basic mapping of lower-level types to rqlite types.
-package syncdb
-
-import (
- "database/sql/driver"
- "expvar"
- "fmt"
- "io"
- "net/url"
- "strings"
- "time"
-
- "github.com/mattn/go-sqlite3"
-)
-
-const bkDelay = 250
-
-const (
- fkChecks = "PRAGMA foreign_keys"
- fkChecksEnabled = "PRAGMA foreign_keys=ON"
- fkChecksDisabled = "PRAGMA foreign_keys=OFF"
-
- numExecutions = "executions"
- numExecutionErrors = "execution_errors"
- numQueries = "queries"
- numETx = "execute_transactions"
- numQTx = "query_transactions"
-)
-
-// DBVersion is the SQLite version.
-var DBVersion string
-
-// stats captures stats for the DB layer.
-var stats *expvar.Map
-
-func init() {
- DBVersion, _, _ = sqlite3.Version()
- stats = expvar.NewMap("db")
- stats.Add(numExecutions, 0)
- stats.Add(numExecutionErrors, 0)
- stats.Add(numQueries, 0)
- stats.Add(numETx, 0)
- stats.Add(numQTx, 0)
-}
-
-// Result represents the outcome of an operation that changes rows.
-type Result struct {
- LastInsertID int64 `json:"last_insert_id,omitempty"`
- RowsAffected int64 `json:"rows_affected,omitempty"`
- Error string `json:"error,omitempty"`
- Time float64 `json:"time,omitempty"`
-}
-
-// Rows represents the outcome of an operation that returns query data.
-type Rows struct {
- Columns []string `json:"columns,omitempty"`
- Types []string `json:"types,omitempty"`
- Values [][]interface{} `json:"values,omitempty"`
- Error string `json:"error,omitempty"`
- Time float64 `json:"time,omitempty"`
-}
-
-// DB is the SQL database.
-type DB struct {
- path string // Path to database file.
- dsnQuery string // DSN query params, if any.
- memory bool // In-memory only.
- fqdsn string // Fully-qualified DSN for opening SQLite.
-}
-
-// New returns an instance of the database at path. If the database
-// has already been created and opened, this database will share
-// the data of that database when connected.
-func New(path, dsnQuery string, memory bool) (*DB, error) {
- q, err := url.ParseQuery(dsnQuery)
- if err != nil {
- return nil, err
- }
- if memory {
- q.Set("mode", "memory")
- q.Set("cache", "shared")
- }
-
- if !strings.HasPrefix(path, "file:") {
- path = fmt.Sprintf("file:%s", path)
- }
-
- var fqdsn string
- if len(q) > 0 {
- fqdsn = fmt.Sprintf("%s?%s", path, q.Encode())
- } else {
- fqdsn = path
- }
-
- return &DB{
- path: path,
- dsnQuery: dsnQuery,
- memory: memory,
- fqdsn: fqdsn,
- }, nil
-}
-
-// Connect returns a connection to the database.
-func (d *DB) Connect() (*Conn, error) {
- drv := sqlite3.SQLiteDriver{}
- c, err := drv.Open(d.fqdsn)
- if err != nil {
- return nil, err
- }
-
- return &Conn{
- sqlite: c.(*sqlite3.SQLiteConn),
- }, nil
-}
-
-// Conn represents a connection to a database. Two Connection objects
-// to the same database are READ_COMMITTED isolated.
-type Conn struct {
- sqlite *sqlite3.SQLiteConn
-}
-
-// TransactionActive returns whether a transaction is currently active
-// i.e. if the database is NOT in autocommit mode.
-func (c *Conn) TransactionActive() bool {
- return !c.sqlite.AutoCommit()
-}
-
-// AbortTransaction aborts -- rolls back -- any active transaction. Calling code
-// should know exactly what it is doing if it decides to call this function. It
-// can be used to clean up any dangling state that may result from certain
-// error scenarios.
-func (c *Conn) AbortTransaction() error {
- _, err := c.Execute([]string{`ROLLBACK`}, false, false)
- return err
-}
-
-// Execute executes queries that modify the database.
-func (c *Conn) Execute(queries []string, tx, xTime bool) ([]*Result, error) {
- stats.Add(numExecutions, int64(len(queries)))
- if tx {
- stats.Add(numETx, 1)
- }
-
- type Execer interface {
- Exec(query string, args []driver.Value) (driver.Result, error)
- }
-
- var allResults []*Result
- err := func() error {
- var execer Execer
- var rollback bool
- var t driver.Tx
- var err error
-
- // Check for the err, if set rollback.
- defer func() {
- if t != nil {
- if rollback {
- t.Rollback()
- return
- }
- t.Commit()
- }
- }()
-
- // handleError sets the error field on the given result. It returns
- // whether the caller should continue processing or break.
- handleError := func(result *Result, err error) bool {
- stats.Add(numExecutionErrors, 1)
-
- result.Error = err.Error()
- allResults = append(allResults, result)
- if tx {
- rollback = true // Will trigger the rollback.
- return false
- }
- return true
- }
-
- execer = c.sqlite
-
- // Create the correct execution object, depending on whether a
- // transaction was requested.
- if tx {
- t, err = c.sqlite.Begin()
- if err != nil {
- return err
- }
- }
-
- // Execute each query.
- for _, q := range queries {
- if q == "" {
- continue
- }
-
- result := &Result{}
- start := time.Now()
-
- r, err := execer.Exec(q, nil)
- if err != nil {
- if handleError(result, err) {
- continue
- }
- break
- }
- if r == nil {
- continue
- }
-
- lid, err := r.LastInsertId()
- if err != nil {
- if handleError(result, err) {
- continue
- }
- break
- }
- result.LastInsertID = lid
-
- ra, err := r.RowsAffected()
- if err != nil {
- if handleError(result, err) {
- continue
- }
- break
- }
- result.RowsAffected = ra
- if xTime {
- result.Time = time.Now().Sub(start).Seconds()
- }
- allResults = append(allResults, result)
- }
-
- return nil
- }()
-
- return allResults, err
-}
-
-// Query executes queries that return rows, but don't modify the database.
-func (c *Conn) Query(queries []string, tx, xTime bool) ([]*Rows, error) {
- stats.Add(numQueries, int64(len(queries)))
- if tx {
- stats.Add(numQTx, 1)
- }
-
- type Queryer interface {
- Query(query string, args []driver.Value) (driver.Rows, error)
- }
-
- var allRows []*Rows
- err := func() (err error) {
- var queryer Queryer
- var t driver.Tx
- defer func() {
- // XXX THIS DOESN'T ACTUALLY WORK! Might as WELL JUST COMMIT?
- if t != nil {
- if err != nil {
- t.Rollback()
- return
- }
- t.Commit()
- }
- }()
-
- queryer = c.sqlite
-
- // Create the correct query object, depending on whether a
- // transaction was requested.
- if tx {
- t, err = c.sqlite.Begin()
- if err != nil {
- return err
- }
- }
-
- for _, q := range queries {
- if q == "" {
- continue
- }
-
- rows := &Rows{}
- start := time.Now()
-
- rs, err := queryer.Query(q, nil)
- if err != nil {
- rows.Error = err.Error()
- allRows = append(allRows, rows)
- continue
- }
- defer rs.Close()
- columns := rs.Columns()
-
- rows.Columns = columns
- rows.Types = rs.(*sqlite3.SQLiteRows).DeclTypes()
- dest := make([]driver.Value, len(rows.Columns))
- for {
- err := rs.Next(dest)
- if err != nil {
- if err != io.EOF {
- rows.Error = err.Error()
- }
- break
- }
-
- values := normalizeRowValues(dest, rows.Types)
- rows.Values = append(rows.Values, values)
- }
- if xTime {
- rows.Time = time.Now().Sub(start).Seconds()
- }
- allRows = append(allRows, rows)
- }
-
- return nil
- }()
-
- return allRows, err
-}
-
-// EnableFKConstraints allows control of foreign key constraint checks.
-func (c *Conn) EnableFKConstraints(e bool) error {
- q := fkChecksEnabled
- if !e {
- q = fkChecksDisabled
- }
- _, err := c.sqlite.Exec(q, nil)
- return err
-}
-
-// FKConstraints returns whether FK constraints are set or not.
-func (c *Conn) FKConstraints() (bool, error) {
- r, err := c.sqlite.Query(fkChecks, nil)
- if err != nil {
- return false, err
- }
-
- dest := make([]driver.Value, len(r.Columns()))
- types := r.(*sqlite3.SQLiteRows).DeclTypes()
- if err := r.Next(dest); err != nil {
- return false, err
- }
-
- values := normalizeRowValues(dest, types)
- if values[0] == int64(1) {
- return true, nil
- }
- return false, nil
-}
-
-// Load loads the connected database from the database connected to src.
-// It overwrites the data contained in this database. It is the caller's
-// responsibility to ensure that no other connections to this database
-// are accessed while this operation is in progress.
-func (c *Conn) Load(src *Conn) error {
- return copyDatabase(c.sqlite, src.sqlite)
-}
-
-// Backup writes a snapshot of the database over the given database
-// connection, erasing all the contents of the destination database.
-// The consistency of the snapshot is READ_COMMITTED relative to any
-// other connections currently open to this database. The caller must
-// ensure that all connections to the destination database are not
-// accessed during this operation.
-func (c *Conn) Backup(dst *Conn) error {
- return copyDatabase(dst.sqlite, c.sqlite)
-}
-
-// Dump writes a snapshot of the database in SQL text format. The consistency
-// of the snapshot is READ_COMMITTED relative to any other connections
-// currently open to this database.
-func (c *Conn) Dump(w io.Writer) error {
- if _, err := w.Write([]byte("PRAGMA foreign_keys=OFF;\nBEGIN TRANSACTION;\n")); err != nil {
- return err
- }
-
- // Get the schema.
- query := `SELECT "name", "type", "sql" FROM "sqlite_master"
- WHERE "sql" NOT NULL AND "type" == 'table' ORDER BY "name"`
- rows, err := c.Query([]string{query}, false, false)
- if err != nil {
- return err
- }
- row := rows[0]
- for _, v := range row.Values {
- table := v[0].(string)
- var stmt string
-
- if table == "sqlite_sequence" {
- stmt = `DELETE FROM "sqlite_sequence";`
- } else if table == "sqlite_stat1" {
- stmt = `ANALYZE "sqlite_master";`
- } else if strings.HasPrefix(table, "sqlite_") {
- continue
- } else {
- stmt = v[2].(string)
- }
-
- if _, err := w.Write([]byte(fmt.Sprintf("%s;\n", stmt))); err != nil {
- return err
- }
-
- tableIndent := strings.Replace(table, `"`, `""`, -1)
- query = fmt.Sprintf(`PRAGMA table_info("%s")`, tableIndent)
- r, err := c.Query([]string{query}, false, false)
- if err != nil {
- return err
- }
- var columnNames []string
- for _, w := range r[0].Values {
- columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, w[1].(string)))
- }
-
- query = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' FROM "%s";`,
- tableIndent,
- strings.Join(columnNames, ","),
- tableIndent)
- r, err = c.Query([]string{query}, false, false)
- if err != nil {
- return err
- }
- for _, x := range r[0].Values {
- y := fmt.Sprintf("%s;\n", x[0].(string))
- if _, err := w.Write([]byte(y)); err != nil {
- return err
- }
- }
- }
-
- // Do indexes, triggers, and views.
- query = `SELECT "name", "type", "sql" FROM "sqlite_master"
- WHERE "sql" NOT NULL AND "type" IN ('index', 'trigger', 'view')`
- rows, err = c.Query([]string{query}, false, false)
- if err != nil {
- return err
- }
- row = rows[0]
- for _, v := range row.Values {
- if _, err := w.Write([]byte(fmt.Sprintf("%s;\n", v[2]))); err != nil {
- return err
- }
- }
-
- if _, err := w.Write([]byte("COMMIT;\n")); err != nil {
- return err
- }
-
- return nil
-}
-
-// Close closes the connection.
-func (c *Conn) Close() error {
- if c != nil {
- return c.sqlite.Close()
- }
- return nil
-}
-
-func copyDatabase(dst *sqlite3.SQLiteConn, src *sqlite3.SQLiteConn) error {
- bk, err := dst.Backup("main", src, "main")
- if err != nil {
- return err
- }
-
- for {
- done, err := bk.Step(-1)
- if err != nil {
- bk.Finish()
- return err
- }
- if done {
- break
- }
- time.Sleep(bkDelay * time.Millisecond)
- }
-
- return bk.Finish()
-}
-
-// normalizeRowValues performs some normalization of values in the returned rows.
-// Text values come over (from sqlite-go) as []byte instead of strings
-// for some reason, so we have explicitly convert (but only when type
-// is "text" so we don't affect BLOB types)
-func normalizeRowValues(row []driver.Value, types []string) []interface{} {
- values := make([]interface{}, len(types))
- for i, v := range row {
- if isTextType(types[i]) {
- switch val := v.(type) {
- case []byte:
- values[i] = string(val)
- default:
- values[i] = val
- }
- } else {
- values[i] = v
- }
- }
- return values
-}
-
-// isTextType returns whether the given type has a SQLite text affinity.
-// http://www.sqlite.org/datatype3.html
-func isTextType(t string) bool {
- return t == "text" ||
- t == "json" ||
- t == "" ||
- strings.HasPrefix(t, "varchar") ||
- strings.HasPrefix(t, "varying character") ||
- strings.HasPrefix(t, "nchar") ||
- strings.HasPrefix(t, "native character") ||
- strings.HasPrefix(t, "nvarchar") ||
- strings.HasPrefix(t, "clob")
-}
diff --git a/db_test.go b/db_test.go
deleted file mode 100644
index 42a7da3..0000000
--- a/db_test.go
+++ /dev/null
@@ -1,880 +0,0 @@
-package syncdb
-
-import (
- "encoding/json"
- "fmt"
- "io/ioutil"
- "os"
- "strings"
- "testing"
-
- "github.com/rqlite/rqlite/testdata/chinook"
-)
-
-/*
- * Lowest-layer database tests
- */
-
-func TestNewDBOnDisk(t *testing.T) {
- t.Parallel()
-
- db, err := New(mustTempFilename(), "", false)
- if err != nil {
- t.Fatalf("failed to create new database: %s", err.Error())
- }
- if db == nil {
- t.Fatal("database is nil")
- }
-}
-
-func TestNewDBInMemory(t *testing.T) {
- t.Parallel()
-
- db, err := New(mustTempFilename(), "", true)
- if err != nil {
- t.Fatalf("failed to create new database: %s", err.Error())
- }
- if db == nil {
- t.Fatal("database is nil")
- }
-}
-
-func TestDBCreateOnDiskConnection(t *testing.T) {
- t.Parallel()
-
- db, path := mustCreateDatabase()
- defer os.Remove(path)
-
- conn, err := db.Connect()
- if err != nil {
- t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
- }
- if conn == nil {
- t.Fatal("connection to on-disk database is nil")
- }
- if err := conn.Close(); err != nil {
- t.Fatalf("failed to close on-disk connection: %s", err.Error())
- }
-}
-
-func TestDBCreateInMemoryConnection(t *testing.T) {
- t.Parallel()
-
- db := mustCreateInMemoryDatabase()
-
- conn, err := db.Connect()
- if err != nil {
- t.Fatalf("failed to create connection to in-memory database: %s", err.Error())
- }
- if conn == nil {
- t.Fatal("connection to in-memory database is nil")
- }
- if err := conn.Close(); err != nil {
- t.Fatalf("failed to close in-memory connection: %s", err.Error())
- }
-}
-
-func TestDumpOnDisk(t *testing.T) {
- t.Parallel()
-
- db, path := mustCreateDatabase()
- defer os.Remove(path)
- c, err := db.Connect()
- if err != nil {
- t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
- }
-
- _, err = c.Execute([]string{chinook.DB}, false, false)
- if err != nil {
- t.Fatalf("failed to load chinook dump: %s", err.Error())
- }
-
- var b strings.Builder
- if err := c.Dump(&b); err != nil {
- t.Fatalf("failed to dump database: %s", err.Error())
- }
-
- if b.String() != chinook.DB {
- t.Fatal("dumped database does not equal entered database")
- }
-}
-
-func TestDumpInMemory(t *testing.T) {
- t.Parallel()
-
- db := mustCreateInMemoryDatabase()
- c, err := db.Connect()
- if err != nil {
- t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
- }
-
- _, err = c.Execute([]string{chinook.DB}, false, false)
- if err != nil {
- t.Fatalf("failed to load chinook dump: %s", err.Error())
- }
-
- var b strings.Builder
- if err := c.Dump(&b); err != nil {
- t.Fatalf("failed to dump database: %s", err.Error())
- }
-
- if b.String() != chinook.DB {
- t.Fatal("dumped database does not equal entered database")
- }
-}
-
-type testF func(test *testing.T, c *Conn)
-
-var dbTestfunctions []testF = []testF{
- testTableCreation,
- testSQLiteMasterTable,
- testTextTypes,
- testEmptyStatements,
- testSimpleSingleStatements,
- testSimpleJoinStatements,
- testSimpleSingleConcatStatements,
- testSimpleMultiStatements,
- testSimpleSingleMultiLineStatements,
- testSimpleFailingStatementsExecute,
- testSimpleFailingStatementsQuery,
- testSimplePragmaTableInfo,
- testCommonTableExpressions,
- testForeignKeyConstraints,
- testUniqueConstraints,
- testActiveTransaction,
- testAbortTransaction,
- testPartialFail,
- testSimpleTransaction,
- testPartialFailTransaction,
- testBackup,
- testLoad,
-}
-
-func TestDatabaseInMemory(t *testing.T) {
- t.Parallel()
-
- for _, f := range dbTestfunctions {
- dbInMem := mustCreateInMemoryDatabase()
- connInMem, err := dbInMem.Connect()
- if err != nil {
- t.Fatalf("failed to create connection to in-memory database: %s", err.Error())
- }
-
- f(t, connInMem)
- connInMem.Close()
- }
-}
-
-func TestDatabaseOnDisk(t *testing.T) {
- t.Parallel()
-
- for _, f := range dbTestfunctions {
- dbDisk, path := mustCreateDatabase()
- connDisk, err := dbDisk.Connect()
- if err != nil {
- t.Fatalf("failed to create connection to on-disk database: %s", err.Error())
- }
-
- f(t, connDisk)
- connDisk.Close()
- os.Remove(path)
- }
-}
-
-func testTableCreation(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- r, err := c.Query([]string{"SELECT * FROM foo"}, false, false)
- if err != nil {
- t.Fatalf("failed to query empty table: %s", err.Error())
- }
- if exp, got := `[{"columns":["id","name"],"types":["integer","text"]}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
- }
-}
-
-func testSQLiteMasterTable(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- r, err := c.Query([]string{"SELECT * FROM sqlite_master"}, false, false)
- if err != nil {
- t.Fatalf("failed to query master table: %s", err.Error())
- }
- if exp, got := `[{"columns":["type","name","tbl_name","rootpage","sql"],"types":["text","text","text","int","text"],"values":[["table","foo","foo",2,"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"]]}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
- }
-}
-
-func testEmptyStatements(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{""}, false, false)
- if err != nil {
- t.Fatalf("failed to execute empty statement: %s", err.Error())
- }
- _, err = c.Execute([]string{";"}, false, false)
- if err != nil {
- t.Fatalf("failed to execute empty statement with semicolon: %s", err.Error())
- }
-}
-
-func testTextTypes(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{"CREATE TABLE foo (c0 VARCHAR(36), c1 JSON, c2 NCHAR, c3 NVARCHAR, c4 CLOB)"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
- _, err = c.Execute([]string{`INSERT INTO foo(c0, c1, c2, c3, c4) VALUES("fiona", '{"mittens": "foobar"}', "bob", "dana", "declan")`}, false, false)
- if err != nil {
- t.Fatalf("failed to insert record: %s", err.Error())
- }
-
- r, err := c.Query([]string{"SELECT * FROM foo"}, false, false)
- if err != nil {
- t.Fatalf("failed to query: %s", err.Error())
- }
- if exp, got := `[{"columns":["c0","c1","c2","c3","c4"],"types":["varchar(36)","json","nchar","nvarchar","clob"],"values":[["fiona","{\"mittens\": \"foobar\"}","bob","dana","declan"]]}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
- }
-}
-
-func testSimpleSingleStatements(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- _, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
- if err != nil {
- t.Fatalf("failed to insert record: %s", err.Error())
- }
-
- _, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("aoife")`}, false, false)
- if err != nil {
- t.Fatalf("failed to insert record: %s", err.Error())
- }
-
- r, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
- if err != nil {
- t.Fatalf("failed to query table: %s", err.Error())
- }
- if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"aoife"]]}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-
- r, err = c.Query([]string{`SELECT * FROM foo WHERE name="aoife"`}, false, false)
- if err != nil {
- t.Fatalf("failed to query table: %s", err.Error())
- }
- if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[2,"aoife"]]}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-
- r, err = c.Query([]string{`SELECT * FROM foo WHERE name="dana"`}, false, false)
- if err != nil {
- t.Fatalf("failed to query table: %s", err.Error())
- }
- if exp, got := `[{"columns":["id","name"],"types":["integer","text"]}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-
- r, err = c.Query([]string{`SELECT * FROM foo ORDER BY name`}, false, false)
- if err != nil {
- t.Fatalf("failed to query table: %s", err.Error())
- }
- if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[2,"aoife"],[1,"fiona"]]}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-
- r, err = c.Query([]string{`SELECT *,name FROM foo`}, false, false)
- if err != nil {
- t.Fatalf("failed to query table: %s", err.Error())
- }
- if exp, got := `[{"columns":["id","name","name"],"types":["integer","text","text"],"values":[[1,"fiona","fiona"],[2,"aoife","aoife"]]}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-}
-
-func testSimpleJoinStatements(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{"CREATE TABLE names (id INTEGER NOT NULL PRIMARY KEY, name TEXT, ssn TEXT)"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- _, err = c.Execute([]string{
- `INSERT INTO "names" VALUES(1,'bob','123-45-678')`,
- `INSERT INTO "names" VALUES(2,'tom','111-22-333')`,
- `INSERT INTO "names" VALUES(3,'matt','222-22-333')`,
- }, false, false)
- if err != nil {
- t.Fatalf("failed to insert record: %s", err.Error())
- }
-
- _, err = c.Execute([]string{"CREATE TABLE staff (id INTEGER NOT NULL PRIMARY KEY, employer TEXT, ssn TEXT)"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- _, err = c.Execute([]string{`INSERT INTO "staff" VALUES(1,'acme','222-22-333')`}, false, false)
- if err != nil {
- t.Fatalf("failed to insert record: %s", err.Error())
- }
-
- r, err := c.Query([]string{`SELECT names.id,name,names.ssn,employer FROM names INNER JOIN staff ON staff.ssn = names.ssn`}, false, false)
- if err != nil {
- t.Fatalf("failed to query table using JOIN: %s", err.Error())
- }
- if exp, got := `[{"columns":["id","name","ssn","employer"],"types":["integer","text","text","text"],"values":[[3,"matt","222-22-333","acme"]]}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-}
-
-func testSimpleSingleConcatStatements(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- _, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
- if err != nil {
- t.Fatalf("failed to insert record: %s", err.Error())
- }
-
- r, err := c.Query([]string{`SELECT id || "_bar", name FROM foo`}, false, false)
- if err != nil {
- t.Fatalf("failed to query table: %s", err.Error())
- }
- if exp, got := `[{"columns":["id || \"_bar\"","name"],"types":["","text"],"values":[["1_bar","fiona"]]}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-}
-
-func testSimpleMultiStatements(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- re, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`, `INSERT INTO foo(name) VALUES("dana")`}, false, false)
- if err != nil {
- t.Fatalf("failed to insert record: %s", err.Error())
- }
- if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1}]`, asJSON(re); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-
- ro, err := c.Query([]string{`SELECT * FROM foo`, `SELECT * FROM foo`}, false, false)
- if err != nil {
- t.Fatalf("failed to query empty table: %s", err.Error())
- }
- if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"dana"]]},{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"dana"]]}]`, asJSON(ro); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-}
-
-func testSimpleSingleMultiLineStatements(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{`
-CREATE TABLE foo (
- id INTEGER NOT NULL PRIMARY KEY,
- name TEXT
-)`}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- re, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`, `INSERT INTO foo(name) VALUES("dana")`}, false, false)
- if err != nil {
- t.Fatalf("failed to insert record: %s", err.Error())
- }
- if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1}]`, asJSON(re); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-}
-
-func testSimpleFailingStatementsExecute(t *testing.T, c *Conn) {
- r, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
- if err != nil {
- t.Fatalf("error executing insertion into non-existent table: %s", err.Error())
- }
- if exp, got := `[{"error":"no such table: foo"}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-
- r, err = c.Execute([]string{`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
- if exp, got := `[{}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
- r, err = c.Execute([]string{`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`}, false, false)
- if err != nil {
- t.Fatalf("failed to attempt creation of duplicate table: %s", err.Error())
- }
- if exp, got := `[{"error":"table foo already exists"}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-
- r, err = c.Execute([]string{`INSERT INTO foo(id, name) VALUES(11, "fiona")`}, false, false)
- if err != nil {
- t.Fatalf("failed to insert record: %s", err.Error())
- }
- if exp, got := `[{"last_insert_id":11,"rows_affected":1}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
- r, err = c.Execute([]string{`INSERT INTO foo(id, name) VALUES(11, "fiona")`}, false, false)
- if err != nil {
- t.Fatalf("failed to attempt duplicate record insertion: %s", err.Error())
- }
- if exp, got := `[{"error":"UNIQUE constraint failed: foo.id"}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-
- r, err = c.Execute([]string{`utter nonsense`}, false, false)
- if err != nil {
- if exp, got := `[{"error":"near \"utter\": syntax error"}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
- }
-}
-
-func testSimpleFailingStatementsQuery(t *testing.T, c *Conn) {
- ro, err := c.Query([]string{`SELECT * FROM bar`}, false, false)
- if err != nil {
- t.Fatalf("failed to attempt query of non-existent table: %s", err.Error())
- }
- if exp, got := `[{"error":"no such table: bar"}]`, asJSON(ro); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-
- ro, err = c.Query([]string{`SELECTxx * FROM foo`}, false, false)
- if err != nil {
- t.Fatalf("failed to attempt nonsense query: %s", err.Error())
- }
- if exp, got := `[{"error":"near \"SELECTxx\": syntax error"}]`, asJSON(ro); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
- r, err := c.Query([]string{`utter nonsense`}, false, false)
- if err != nil {
- if exp, got := `[{"error":"near \"utter\": syntax error"}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
- }
-}
-
-func testSimplePragmaTableInfo(t *testing.T, c *Conn) {
- r, err := c.Execute([]string{`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
- if exp, got := `[{}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-
- res, err := c.Query([]string{`PRAGMA table_info("foo")`}, false, false)
- if err != nil {
- t.Fatalf("failed to query a common table expression: %s", err.Error())
- }
- if exp, got := `[{"columns":["cid","name","type","notnull","dflt_value","pk"],"types":["","","","","",""],"values":[[0,"id","INTEGER",1,null,1],[1,"name","TEXT",0,null,0]]}]`, asJSON(res); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-
-}
-
-func testCommonTableExpressions(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{"CREATE TABLE test(x foo)"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- _, err = c.Execute([]string{`INSERT INTO test VALUES(1)`}, false, false)
- if err != nil {
- t.Fatalf("failed to insert record: %s", err.Error())
- }
-
- r, err := c.Query([]string{`WITH bar AS (SELECT * FROM test) SELECT * FROM test WHERE x = 1`}, false, false)
- if err != nil {
- t.Fatalf("failed to query a common table expression: %s", err.Error())
- }
- if exp, got := `[{"columns":["x"],"types":["foo"],"values":[[1]]}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-
- r, err = c.Query([]string{`WITH bar AS (SELECT * FROM test) SELECT * FROM test WHERE x = 2`}, false, false)
- if err != nil {
- t.Fatalf("failed to query a common table expression: %s", err.Error())
- }
- if exp, got := `[{"columns":["x"],"types":["foo"]}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-}
-
-func testUniqueConstraints(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT, CONSTRAINT name_unique UNIQUE (name))"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- r, err := c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
- if err != nil {
- t.Fatalf("error executing insertion into table: %s", err.Error())
- }
- if exp, got := `[{"last_insert_id":1,"rows_affected":1}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for INSERT\nexp: %s\ngot: %s", exp, got)
- }
-
- // UNIQUE constraint should fire.
- r, err = c.Execute([]string{`INSERT INTO foo(name) VALUES("fiona")`}, false, false)
- if err != nil {
- t.Fatalf("error executing insertion into table: %s", err.Error())
- }
- if exp, got := `[{"error":"UNIQUE constraint failed: foo.name"}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for INSERT\nexp: %s\ngot: %s", exp, got)
- }
-}
-
-func testActiveTransaction(t *testing.T, c *Conn) {
- if c.TransactionActive() {
- t.Fatal("transaction incorrectly marked as active")
- }
-
- if _, err := c.Execute([]string{`BEGIN`}, false, false); err != nil {
- t.Fatalf("error starting transaction: %s", err.Error())
- }
-
- if !c.TransactionActive() {
- t.Fatal("transaction incorrectly marked as inactive")
- }
-
- if _, err := c.Execute([]string{`COMMIT`}, false, false); err != nil {
- t.Fatalf("error starting transaction: %s", err.Error())
- }
-
- if c.TransactionActive() {
- t.Fatal("transaction incorrectly marked as active")
- }
-
- if _, err := c.Execute([]string{`BEGIN`}, false, false); err != nil {
- t.Fatalf("error starting transaction: %s", err.Error())
- }
-
- if !c.TransactionActive() {
- t.Fatal("transaction incorrectly marked as inactive")
- }
-
- if _, err := c.Execute([]string{`ROLLBACK`}, false, false); err != nil {
- t.Fatalf("error starting transaction: %s", err.Error())
- }
-
- if c.TransactionActive() {
- t.Fatal("transaction incorrectly marked as active")
- }
-}
-
-func testAbortTransaction(t *testing.T, c *Conn) {
- if err := c.AbortTransaction(); err != nil {
- t.Fatalf("error abrorting non-active transaction: %s", err.Error())
- }
-
- if _, err := c.Execute([]string{`BEGIN`}, false, false); err != nil {
- t.Fatalf("error starting transaction: %s", err.Error())
- }
-
- if !c.TransactionActive() {
- t.Fatal("transaction incorrectly marked as inactive")
- }
-
- if err := c.AbortTransaction(); err != nil {
- t.Fatalf("error abrorting non-active transaction: %s", err.Error())
- }
-
- if c.TransactionActive() {
- t.Fatal("transaction incorrectly marked as active")
- }
-}
-
-func testPartialFail(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- stmts := []string{
- `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
- }
- r, err := c.Execute(stmts, false, false)
- if err != nil {
- t.Fatalf("failed to insert records: %s", err.Error())
- }
- if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1},{"error":"UNIQUE constraint failed: foo.id"},{"last_insert_id":4,"rows_affected":1}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
- ro, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
- if err != nil {
- t.Fatalf("failed to query table: %s", err.Error())
- }
- if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-}
-
-func testSimpleTransaction(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- stmts := []string{
- `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(3, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
- }
- r, err := c.Execute(stmts, true, false)
- if err != nil {
- t.Fatalf("failed to insert records: %s", err.Error())
- }
- if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1},{"last_insert_id":3,"rows_affected":1},{"last_insert_id":4,"rows_affected":1}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
- ro, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
- if err != nil {
- t.Fatalf("failed to query table: %s", err.Error())
- }
- if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-}
-
-func testPartialFailTransaction(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- stmts := []string{
- `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
- }
- r, err := c.Execute(stmts, true, false)
- if err != nil {
- t.Fatalf("failed to insert records: %s", err.Error())
- }
- if exp, got := `[{"last_insert_id":1,"rows_affected":1},{"last_insert_id":2,"rows_affected":1},{"error":"UNIQUE constraint failed: foo.id"}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
- ro, err := c.Query([]string{`SELECT * FROM foo`}, false, false)
- if err != nil {
- t.Fatalf("failed to query table: %s", err.Error())
- }
- if exp, got := `[{"columns":["id","name"],"types":["integer","text"]}]`, asJSON(ro); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-}
-
-func testForeignKeyConstraints(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, ref INTEGER REFERENCES foo(id))"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- // Explicitly disable constraints.
- if err := c.EnableFKConstraints(false); err != nil {
- t.Fatalf("failed to enable foreign key constraints: %s", err.Error())
- }
-
- // Check constraints
- fk, err := c.FKConstraints()
- if err != nil {
- t.Fatalf("failed to check FK constraints: %s", err.Error())
- }
- if fk != false {
- t.Fatal("FK constraints are not disabled")
- }
-
- stmts := []string{
- `INSERT INTO foo(id, ref) VALUES(1, 2)`,
- }
- r, err := c.Execute(stmts, false, false)
- if err != nil {
- t.Fatalf("failed to execute FK test statement: %s", err.Error())
- }
- if exp, got := `[{"last_insert_id":1,"rows_affected":1}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-
- // Explicitly enable constraints.
- if err := c.EnableFKConstraints(true); err != nil {
- t.Fatalf("failed to enable foreign key constraints: %s", err.Error())
- }
-
- // Check constraints
- fk, err = c.FKConstraints()
- if err != nil {
- t.Fatalf("failed to check FK constraints: %s", err.Error())
- }
- if fk != true {
- t.Fatal("FK constraints are not enabled")
- }
-
- stmts = []string{
- `INSERT INTO foo(id, ref) VALUES(1, 3)`,
- }
- r, err = c.Execute(stmts, false, false)
- if err != nil {
- t.Fatalf("failed to execute FK test statement: %s", err.Error())
- }
- if exp, got := `[{"error":"UNIQUE constraint failed: foo.id"}]`, asJSON(r); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-}
-
-func testBackup(t *testing.T, c *Conn) {
- _, err := c.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- stmts := []string{
- `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(3, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
- }
- _, err = c.Execute(stmts, true, false)
- if err != nil {
- t.Fatalf("failed to insert records: %s", err.Error())
- }
-
- dstDB, path := mustCreateDatabase()
- defer os.Remove(path)
- dstConn, err := dstDB.Connect()
- if err != nil {
- t.Fatalf("failed to connect to destination database: %s", err.Error())
- }
-
- err = c.Backup(dstConn)
- if err != nil {
- t.Fatalf("failed to backup database: %s", err.Error())
- }
-
- ro, err := dstConn.Query([]string{`SELECT * FROM foo`}, false, false)
- if err != nil {
- t.Fatalf("failed to query table: %s", err.Error())
- }
- if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-}
-
-func testLoad(t *testing.T, c *Conn) {
- dstDB, path := mustCreateDatabase()
- defer os.Remove(path)
- dstConn, err := dstDB.Connect()
- if err != nil {
- t.Fatalf("failed to connect to destination database: %s", err.Error())
- }
- defer dstConn.Close()
-
- _, err = dstConn.Execute([]string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, false, false)
- if err != nil {
- t.Fatalf("failed to create table: %s", err.Error())
- }
-
- stmts := []string{
- `INSERT INTO foo(id, name) VALUES(1, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(2, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(3, "fiona")`,
- `INSERT INTO foo(id, name) VALUES(4, "fiona")`,
- }
- _, err = dstConn.Execute(stmts, true, false)
- if err != nil {
- t.Fatalf("failed to insert records: %s", err.Error())
- }
-
- ro, err := dstConn.Query([]string{`SELECT * FROM foo`}, false, false)
- if err != nil {
- t.Fatalf("failed to query table: %s", err.Error())
- }
- if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-
- err = c.Load(dstConn)
- if err != nil {
- t.Fatalf("failed to load database: %s", err.Error())
- }
-
- ro, err = c.Query([]string{`SELECT * FROM foo`}, false, false)
- if err != nil {
- t.Fatalf("failed to query table: %s", err.Error())
- }
- if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"fiona"],[3,"fiona"],[4,"fiona"]]}]`, asJSON(ro); exp != got {
- t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
- }
-}
-
-// mustExecute executes a spath, and panics on failure. Used for statements
-// that should never fail, even taking into account test setup.
-func mustExecute(c *Conn, stmt string) {
- _, err := c.Execute([]string{stmt}, false, false)
- if err != nil {
- panic(fmt.Sprintf("failed to execute statement: %s", err.Error()))
- }
-}
-
-// mustQuery executes a statement, and panics on failure. Used for statements
-// that should never fail, even taking into account test setup.
-func mustQuery(c *Conn, stmt string) {
- _, err := c.Query([]string{stmt}, false, false)
- if err != nil {
- panic(fmt.Sprintf("failed to query: %s", err.Error()))
- }
-}
-
-func mustCreateDatabase() (*DB, string) {
- path := mustTempFilename()
- db, err := New(mustTempFilename(), "", false)
- if err != nil {
- panic("failed to create in-memory database")
- }
- return db, path
-}
-
-func mustCreateInMemoryDatabase() *DB {
- db, err := New(mustTempFilename(), "", true)
- if err != nil {
- panic("failed to create in-memory database")
- }
- return db
-}
-
-func mustTempFilename() string {
- fd, err := ioutil.TempFile("", "rqlilte-tmp-test-")
- if err != nil {
- panic(err.Error())
- }
- if err := fd.Close(); err != nil {
- panic(err.Error())
- }
- if err := os.Remove(fd.Name()); err != nil {
- panic(err.Error())
- }
- return fd.Name()
-}
-
-func asJSON(v interface{}) string {
- b, err := json.Marshal(v)
- if err != nil {
- panic("failed to JSON marshal value")
- }
- return string(b)
-}
diff --git a/dbself.go b/dbself.go
deleted file mode 100644
index 47fe633..0000000
--- a/dbself.go
+++ /dev/null
@@ -1,237 +0,0 @@
-package syncdb
-
-import (
- "errors"
- "fmt"
- "os"
- "os/exec"
- "path/filepath"
- "strings"
- "sync"
- "github.com/jinzhu/gorm"
- "basic.com/valib/logger.git"
-)
-
-const (
- PersonSqliteDBPath = "~/workspace/gitblit/dbserver/config/testdb.db"
- DbT_TableName = "dbtables"
- DBP_TableName = "dbtablepersons"
-)
-
-var syncMut sync.Mutex
-var SerfDbConn *Conn
-
-// get Conn of db for do execute.
-func InitDbConn(dbPath string) error {
-
- if dbPath == "" {
- dbPath = PersonSqliteDBPath
- }
-
- logger.Info("self: ========>", dbPath)
- db, err := New(dbPath, "", false)
- if err != nil {
- logger.Error("new db database: ", err)
- return err
- }
- dbConn, err := db.Connect()
- if err != nil {
- logger.Error("new db conn error; ", err)
- return err
- }
-
- SerfDbConn = dbConn
- return nil
-}
-
-//bak dbdata.
-func BakDbFile() (string, error) {
-
- path, err := getCurrentPath()
- if err != nil {
- logger.Error("getCurrentPath error; ", err)
- return "", err
- }
-
- filepath := path + "tmp.db"
- logger.Info("filepath:", filepath)
- db, err := New(filepath, "", false)
- if err != nil {
- logger.Error("new db database: ", err)
- return "", err
- }
-
- tmpconn, err := db.Connect()
- if err != nil {
- logger.Error("new db conn error; ", err)
- return "", err
- }
- defer tmpconn.Close()
-
- err = SerfDbConn.Backup(tmpconn)
- if err != nil {
- return "", err
- }
- return filepath, nil
-}
-
-// do exet when get querystring.
-func ExecuteWriteSql(sqlString []string) ([]*Result, error) {
- syncMut.Lock()
- defer syncMut.Unlock()
- allResults, err := SerfDbConn.Execute(sqlString, false, false)
- if err != nil {
- logger.Error("execute error!", err)
- return nil, err
- }
- return allResults, nil
-}
-
-var localDb *gorm.DB
-func InitLocalDb(db *gorm.DB) {
- localDb = db
-}
-
-// do exet when get querystring.
-func ExecuteQuerySql(sqlString []string) ([]*Rows, error) {
- syncMut.Lock()
- defer syncMut.Unlock()
- rows, err := SerfDbConn.Query(sqlString, false, false)
- if err != nil {
- logger.Error("execute error!", err)
- return nil, err
- }
- return rows, nil
-}
-
-func ExecuteSqlByGorm(sqls []string) (bool,error) {
- if localDb != nil {
- localDb.LogMode(false)
- defer localDb.LogMode(true)
- var err error
- tx := localDb.Begin()
- defer func() {
- if err !=nil && tx !=nil {
- tx.Rollback()
- }
- }()
- for _,sql :=range sqls {
- result := tx.Exec(sql)
- err = result.Error
- if err !=nil {
- logger.Error("ExecuteSqlByGorm err:",err,",sql:",sql)
- return false,err
- }
- if result.RowsAffected == 0 {
- logger.Debug("ExecuteSqlByGorm RowsAffected == 0",",sql:",sql)
- err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
- return false,err
- }
- }
- tx.Commit()
- return true,nil
- }
- return false,errors.New("localDb is nil")
-}
-
-type TableDesc struct {
- Cid int `json:"cid"`
- Name string `json:"name"`
- Type string `json:"type"`
- Notnull bool `json:"notnull"`
- DFltValue interface{} `json:"dflt_value"`
- Pk int `json:"pk"`
-}
-
-type DumpSql struct {
- Sql string `json:"sql"`
-}
-
-func ExecuteQueryByGorm(tableNames []string) ([]string, error) {
- localDb.LogMode(false)
- defer localDb.LogMode(true)
- if tableNames !=nil {
- var arr []string
- var dumpSql []DumpSql
- for _,table :=range tableNames {
- logger.Info("dump current tableName:", table)
- dumpSql = make([]DumpSql, 0)
- var tDescArr []TableDesc
- tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table)
- err := localDb.Raw(tSql).Scan(&tDescArr).Error
- logger.Debug("tDescArr err:", err, "len(tDescArr)=", len(tDescArr))
- if err !=nil {
- return nil,errors.New("tableDesc err")
- }
- logger.Info(table,"'Columns is:",tDescArr)
- if tDescArr == nil || len(tDescArr) == 0 {
- return nil,errors.New(table+" has no column")
- }
- var columnNames []string
- for _,col :=range tDescArr {
- columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name))
- }
- if table == DbT_TableName {
- tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where (analyServerId='' or analyServerId is NULL);`,
- table,
- strings.Join(columnNames, ","),
- table)
- } else if table == DBP_TableName {
- tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where tableId in (select id from dbTables where (analyServerId='' or analyServerId is NULL));`,
- table,
- strings.Join(columnNames, ","),
- table)
- } else {
- tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s";`,
- table,
- strings.Join(columnNames, ","),
- table)
- }
-
- logger.Info("tSql:",tSql)
-
- err = localDb.Raw(tSql).Scan(&dumpSql).Error
- logger.Debug("dump err:", err)
- if err !=nil {
- return nil,errors.New("dump err")
- }
- if len(dumpSql)>0 {
- for _,d :=range dumpSql {
- arr = append(arr, d.Sql)
- }
- }
-
- }
- return arr,nil
- }
- return nil,errors.New("tableNames is nil")
-}
-
-func Dumpdb() {
-
- var b strings.Builder
- if err := SerfDbConn.Dump(&b); err != nil {
- logger.Error("dump file ", err.Error())
- }
- logger.Info("%T\n", b)
-}
-
-// get current path
-func getCurrentPath() (string, error) {
- file, err := exec.LookPath(os.Args[0])
- if err != nil {
- return "", err
- }
- path, err := filepath.Abs(file)
- if err != nil {
- return "", err
- }
- i := strings.LastIndex(path, "/")
- if i < 0 {
- i = strings.LastIndex(path, "\\")
- }
- if i < 0 {
- return "", errors.New(`error: Can't find "/" or "\".`)
- }
- return string(path[0 : i+1]), nil
-}
diff --git a/transport.go b/transport.go
deleted file mode 100644
index e14ea10..0000000
--- a/transport.go
+++ /dev/null
@@ -1,83 +0,0 @@
-package syncdb
-
-import (
- "basic.com/valib/logger.git"
- "bufio"
- "bytes"
- "encoding/binary"
- "io"
- "net"
- "strconv"
-)
-
-func rawSendTcpMsg(addr string, sendBuf []byte) error {
- conn, err := net.Dial("tcp", addr)
- if err != nil {
- logger.Debug("net.Dialt err", err)
- return err
- }
-
- defer conn.Close()
-
- //鍙戦��
- sizeBuf := make([]byte,4)
- var buf bytes.Buffer
- binary.BigEndian.PutUint32(sizeBuf,uint32(len(sendBuf)))
- buf.Write(sizeBuf)
- buf.Write(sendBuf)
- _, err = conn.Write(buf.Bytes())
- if err != nil {
- logger.Debug("conn.Write err", err)
- return err
- } else {
- logger.Debug("raw send success")
- return nil
- }
-}
-
-func RawReceiveTcpMsg() {
-
- tcpAddr := "0.0.0.0:"+strconv.Itoa(TcpTransportPort)
- listener,err := net.Listen("tcp",tcpAddr)
- if err != nil {
- logger.Debug("RawReceive server listen err:",err)
- return
- }
- defer listener.Close()
-
- for{
- conn,err := listener.Accept()
- if err!=nil {
- logger.Debug("listener.Accept err:", err)
- continue
- }
- logger.Debug("A transport client connected :" +conn.RemoteAddr().String())
- go readStream(conn)
- }
-}
-
-func readStream(conn net.Conn) {
- defer conn.Close()
-
- var data []byte
- var reader io.Reader = bufio.NewReader(conn)
- sizeBuf :=make([]byte,4)
- if _,err := reader.Read(sizeBuf[:]);err !=nil {
- logger.Debug("read tcpStream msg length err:",err)
- } else {
- var msgLen uint32
- binary.Read(bytes.NewBuffer(sizeBuf),binary.BigEndian,&msgLen)
- dataLen := int(msgLen)
- logger.Debug("read tcpStream msg lenth:",dataLen)
- if dataLen >0 {
- data =make([]byte, dataLen)
- n,err := io.ReadAtLeast(reader, data, dataLen)
- if err ==nil {
- logger.Debug("io.ReadAtLeast n:",n)
- } else {
- logger.Debug("io.readAtLeast err:",err)
- }
- }
- }
- QueryTcpResponseChan <- data
-}
\ No newline at end of file
--
Gitblit v1.8.0