chenshijun
2019-09-04 aa3f73eef78b96c8f1a53336c5d6459098907a6b
agent.go
@@ -14,7 +14,7 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package main
package syncdb
import (
   "context"
@@ -117,14 +117,17 @@
   switch ev := event.(type) {
   case serf.UserEvent:
      var sqlUe SqlUserEvent
      fmt.Println(string(ev.Payload))
      var tmpstringslice []string
      tmpstringslice = append(tmpstringslice, string(ev.Payload))
      fmt.Println(tmpstringslice)
      results, err := ExecuteWriteSql(tmpstringslice)
      for _, result := range results {
         fmt.Println(result, "results err: ", err)
      err := json.Unmarshal(ev.Payload, &sqlUe)
      if err !=nil {
         fmt.Println("sqlUe unmarshal err:",err)
         return
      }
      if sqlUe.Owner != a.conf.NodeName {
         results, _ := ExecuteWriteSql(sqlUe.Sql)
         //flag, _ := ExecuteSqlByGorm(sqlUe.Sql)
         fmt.Println("userEvent exec ",sqlUe.Sql,",Result:",results)
      }
   case *serf.Query:
@@ -185,6 +188,19 @@
         //var res []*Rows
         //json.Unmarshal(bytesReturn, &res)
      }
      case serf.MemberEvent:
         if event.EventType() == serf.EventMemberLeave {
            if ev.Members !=nil && len(ev.Members) ==1 {
               leaveMember := ev.Members[0]
               leaveSql := "delete from cluster_node where node_id='"+leaveMember.Name+"'"
               fmt.Println("leaveSql:",leaveSql)
               //ExecuteSqlByGorm([]string{ leaveSql })
               fmt.Println("EventMemberLeave,current Members:",ev.Members)
            }
            return
         }
   default:
      fmt.Printf("Unknown event type: %s\n", ev.EventType().String())
@@ -405,10 +421,23 @@
   }()
}
type SqlUserEvent struct {
   Owner string `json:"owner"`
   Sql []string `json:"sql"`
}
//SyncSql boardcast sql to cluster
func (a *Agent) SyncSql(sqlOp string) {
func (a *Agent) SyncSql(sqlOp []string) {
   // event : use to send command to operate db.
   err := a.UserEvent("SyncSql", []byte(sqlOp), false)
   var sqlUe = SqlUserEvent{
      Owner: a.conf.NodeName,
      Sql: sqlOp,
   }
   ueB, err := json.Marshal(sqlUe)
   if err !=nil {
      fmt.Println("sqlUE marshal err:",err)
      return
   }
   err = a.UserEvent("SyncSql", []byte(ueB), false)
   if err == nil || !strings.Contains(err.Error(), "cannot contain") {
      fmt.Println("err: ", err)
   }
@@ -480,8 +509,8 @@
   n, err := a.Agent.Join(nodes, true)
   if err != nil || n == 0 {
      a.Stop()
      fmt.Println("Stop node")
      //a.Stop()
      //fmt.Println("Stop node")
      return fmt.Errorf("Error Encrypt Key!")
   }