package common

import (
	"context"
	"time"

	"basic.com/libgowrapper/sdkstruct.git"
	"basic.com/pubsub/protomsg.git"
)

/////////////////////////////////////////////////////////////////

// FlowCreate wires up the receive/send plumbing for one flow.
//
// Two IPC addresses are derived from id by appending the suffixes `_1`
// (receive side) and `_2` (send side) and passing them, together with shm,
// to GetIpcAddress. A receiver, a sender and a to-rule forwarder are built
// on top of them, the forwarder's Push method is registered as the sender's
// callback, and the Run method of each of the three is started in its own
// goroutine with ctx.
//
// ipc2Rule and ruleCacheSize are handed to NewToRule unchanged; fn is the
// logging callback passed to every component.
//
// It returns the channel the receiver was created with (receive-only for the
// caller) and the channel the sender was created with (send-only for the
// caller). Both channels have a buffer of 3.
func FlowCreate(ctx context.Context, id string, shm bool, ipc2Rule string, ruleCacheSize int,
	fn func(...interface{})) (<-chan []byte, chan<- sdkstruct.MsgSDK) {

	const (
		postPull = `_1`
		postPush = `_2`
	)

	ipcRcv := GetIpcAddress(shm, id+postPull)
	ipcSnd := GetIpcAddress(shm, id+postPush)

	chRcv := make(chan []byte, 3)
	chSnd := make(chan sdkstruct.MsgSDK, 3)

	rcver := NewReciever(ipcRcv, chRcv, shm, fn)
	snder := NewSender(ipcSnd, chSnd, shm, fn)
	torule := NewToRule(ipc2Rule, ruleCacheSize, fn)

	snder.ApplyCallbackFunc(torule.Push)

	// NOTE(review): the Run methods are defined elsewhere; presumably they
	// return once ctx is cancelled — confirm.
	go rcver.Run(ctx)
	go snder.Run(ctx)
	go torule.Run(ctx)

	return chRcv, chSnd
}

// WorkFlowSimple is the worker loop of a flow. Until ctx is cancelled it
// repeatedly calls fnConsume, converts the returned elements to
// protomsg.SdkMessage values and passes them as one batch to fnRun together
// with out and typ.
//
// When fnConsume returns nothing the loop sleeps 10ms before polling again.
// Elements that are not a protomsg.SdkMessage are reported through fn and
// skipped instead of panicking the goroutine; a batch that ends up empty is
// not passed to fnRun.
//
// Throughput is reported through fn after every 25 fnRun calls, or as soon
// as more than one second has elapsed since the last report, whichever comes
// first.
func WorkFlowSimple(ctx context.Context, out chan<- sdkstruct.MsgSDK, typ string,
	fnConsume func() []interface{},
	fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
	fn func(...interface{})) {

	tm := time.Now()
	sc := 0

	for {
		// Non-blocking cancellation check before each poll.
		select {
		case <-ctx.Done():
			return
		default:
		}

		elems := fnConsume()
		if len(elems) == 0 { // also true for a nil slice
			time.Sleep(10 * time.Millisecond)
			continue
		}

		msgs := make([]protomsg.SdkMessage, 0, len(elems))
		for _, v := range elems {
			msg, ok := v.(protomsg.SdkMessage)
			if !ok {
				// An unchecked assertion here would panic and take the
				// whole process down from inside this goroutine.
				fn(typ, " skip element that is not a protomsg.SdkMessage")
				continue
			}
			msgs = append(msgs, msg)
		}
		if len(msgs) == 0 {
			continue
		}

		fnRun(msgs, out, typ)

		sc++
		if sc == 25 {
			fn(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm))
			sc = 0
			tm = time.Now()
		}

		if time.Since(tm) > time.Second {
			fn(typ, " RUN ", sc, " FRAME USE TIME: ", time.Since(tm))
			sc = 0
			tm = time.Now()
		}
	}
}

// FlowSimple runs a flow whose fnRun handles a single message at a time. It
// adapts fnRun to the batch signature expected by FlowBatch and delegates
// everything else to FlowBatch unchanged.
//
// Only the first message of each batch is passed to fnRun.
// NOTE(review): any further messages in a batch are dropped — confirm that
// fnConsume yields at most one element for simple flows.
func FlowSimple(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string,
	fnProduce func(interface{}), fnConsume func() []interface{},
	fnRun func(protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
	fnClose func(), fn func(...interface{})) {

	cb := func(msgs []protomsg.SdkMessage, ch chan<- sdkstruct.MsgSDK, typ string) {
		if len(msgs) == 0 {
			// Guard the index below; nothing to run.
			return
		}
		fnRun(msgs[0], ch, typ)
	}

	FlowBatch(ctx, in, out, typ, fnProduce, fnConsume, cb, fnClose, fn)
}

// FlowBatch runs a flow whose fnRun handles a batch of messages.
//
// It starts UnserilizeProto (fed from in, writing to an internal channel with
// a buffer of 3) and WorkFlowSimple (driven by fnConsume and fnRun) in
// separate goroutines, then loops in the calling goroutine: every message
// taken from the internal channel is checked with ValidRemoteMessage; valid
// messages are handed to fnProduce, invalid ones are logged through fn and
// passed to EjectResult with a nil result.
//
// The call blocks until ctx is cancelled, at which point fnClose is invoked
// and the function returns.
func FlowBatch(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string,
	fnProduce func(interface{}), fnConsume func() []interface{},
	fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
	fnClose func(), fn func(...interface{})) {

	chMsg := make(chan protomsg.SdkMessage, 3)
	go UnserilizeProto(ctx, in, chMsg, fn)
	go WorkFlowSimple(ctx, out, typ, fnConsume, fnRun, fn)

	for {
		// Block until there is something to do; a default branch with a
		// sleep would only add latency to message handling and shutdown.
		select {
		case <-ctx.Done():
			fnClose()
			return
		case rMsg := <-chMsg:
			if !ValidRemoteMessage(rMsg, typ, fn) {
				fn(typ, " validremotemessage invalid")
				EjectResult(nil, rMsg, out)
				continue
			}
			fnProduce(rMsg)
		}
	}
}