package common
|
|
import (
|
"context"
|
"time"
|
|
"basic.com/pubsub/protomsg.git"
|
|
"basic.com/libgowrapper/sdkstruct.git"
|
)
|
|
/////////////////////////////////////////////////////////////////
|
|
// FlowCreate create flow
|
func FlowCreate(ctx context.Context, id string, shm bool, ipc2Rule string, ruleCacheSize int, fn func(...interface{})) (<-chan []byte, chan<- sdkstruct.MsgSDK) {
|
|
const (
|
postPull = `_1`
|
postPush = `_2`
|
)
|
ipcRcv := GetIpcAddress(shm, id+postPull)
|
ipcSnd := GetIpcAddress(shm, id+postPush)
|
chRcv := make(chan []byte, 3)
|
chSnd := make(chan sdkstruct.MsgSDK, 3)
|
|
rcver := NewReciever(ipcRcv, chRcv, shm, fn)
|
snder := NewSender(ipcSnd, chSnd, shm, fn)
|
torule := NewToRule(ipc2Rule, ruleCacheSize, fn)
|
|
snder.ApplyCallbackFunc(torule.Push)
|
|
go rcver.Run(ctx)
|
go snder.Run(ctx)
|
go torule.Run(ctx)
|
|
return chRcv, chSnd
|
}
|
|
// WorkFlowSimple work
|
func WorkFlowSimple(ctx context.Context, out chan<- sdkstruct.MsgSDK, typ string,
|
fnConsume func() []interface{}, fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
|
fn func(...interface{})) {
|
|
tm := time.Now()
|
sc := 0
|
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
default:
|
|
elems := fnConsume()
|
if elems == nil || len(elems) == 0 {
|
time.Sleep(10 * time.Millisecond)
|
continue
|
}
|
|
var msgs []protomsg.SdkMessage
|
for _, v := range elems {
|
msgs = append(msgs, v.(protomsg.SdkMessage))
|
}
|
|
fnRun(msgs, out, typ)
|
|
sc++
|
if sc == 25 {
|
fn(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm))
|
sc = 0
|
tm = time.Now()
|
}
|
if time.Since(tm) > time.Second {
|
fn(typ, " RUN ", sc, " FRAME USE TIME: ", time.Since(tm))
|
sc = 0
|
tm = time.Now()
|
}
|
}
|
}
|
}
|
|
// FlowSimple wrap
|
func FlowSimple(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string,
|
fnProduce func(interface{}), fnConsume func() []interface{},
|
fnRun func(protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
|
fnClose func(), fn func(...interface{})) {
|
|
cb := func(msgs []protomsg.SdkMessage, ch chan<- sdkstruct.MsgSDK, typ string) {
|
fnRun(msgs[0], ch, typ)
|
}
|
|
FlowBatch(ctx, in, out, typ, fnProduce, fnConsume, cb, fnClose, fn)
|
|
}
|
|
// FlowBatch batch
|
func FlowBatch(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string,
|
fnProduce func(interface{}), fnConsume func() []interface{},
|
fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
|
fnClose func(), fn func(...interface{})) {
|
|
chMsg := make(chan protomsg.SdkMessage, 3)
|
go UnserilizeProto(ctx, in, chMsg, fn)
|
|
go WorkFlowSimple(ctx, out, typ, fnConsume, fnRun, fn)
|
|
for {
|
select {
|
case <-ctx.Done():
|
fnClose()
|
return
|
case rMsg := <-chMsg:
|
if !ValidRemoteMessage(rMsg, typ, fn) {
|
fn(typ, " validremotemessage invalid")
|
EjectResult(nil, rMsg, out)
|
continue
|
}
|
fnProduce(rMsg)
|
|
default:
|
time.Sleep(10 * time.Millisecond)
|
}
|
}
|
|
}
|