(Part 2.2) Golang Framework Hands-on — KisFlow Streaming Computing Framework — Project Construction / Basic Modules

Aceld
8 min readOct 22, 2024

--

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki

First, we need to define the core structure of KisFlow, the KisFlow struct. Based on the design philosophy described above, we understand that KisFlow represents the structure of an entire data computing stream. In this structure, each piece of data in a flow is processed sequentially by the functions attached to that flow.

2.2.1 KisFunction Family

KisFunction should be a chain of calls, so the basic form of the struct should be a linked list, where after the execution of one function, it can automatically schedule to the next function node. In KisFlow, there are various types of functions such as save, load, calculate, extend(sink), and verify. Therefore, we adopt the template classes for these five types of functions, making it more flexible and facilitating the isolation and modification of features for different types of functions in the future.

The overall class diagram design for KisFunction is as follows:

2.2.2 Definition of Abstraction Layer KisFunction

Create a new directory function in kis-flow to store the function code.
First, write the abstract interface in the kis/ directory.

kis-flow/kis/function.go

package kis

import (
"context"
"kis-flow/config"
)

// Function is the basic calculation module for streaming computing.
// KisFunction is a fundamental logic unit for streaming computation,
// and any number of KisFunctions can be combined into a KisFlow.
type Function interface {
// Call executes the streaming computation logic.
Call(ctx context.Context, flow Flow) error

// SetConfig configures the current Function instance.
SetConfig(s *config.KisFuncConfig) error

// GetConfig gets the configuration policy of the current Function instance.
GetConfig() *config.KisFuncConfig

// SetFlow sets the Flow instance that the current Function instance depends on.
SetFlow(f Flow) error

// GetFlow gets the Flow instance that the current Function instance depends on.
GetFlow() Flow

// CreateId generates a random instance KisID for the current Function instance.
CreateId()

// GetId gets the FID of the current Function.
GetId() string

// GetPrevId gets the FID of the previous Function node on the current Function.
GetPrevId() string

// GetNextId gets the FID of the next Function node on the current Function.
GetNextId() string

// Next returns the next layer of calculation flow Function.
// If the current layer is the last layer, it returns nil.
Next() Function

// Prev returns the previous layer of calculation flow Function.
// If the current layer is the last layer, it returns nil.
Prev() Function

// SetN sets the next Function instance.
SetN(f Function)

// SetP sets the previous Function instance.
SetP(f Function)
}

2.2.3 KisId: Random Unique Instance ID

The new concept of KisId has been introduced. KisID serves as the instance ID of Function, used within KisFlow to distinguish different instance objects. The difference between KisId and Fid in Function Config is that Fid describes the ID of a type of Function strategy, while KisId is the ID of the instantiated Function object in KisFlow, which is randomly generated and unique.

Create a kis-flow/id/ directory and create a kis_id.go file to implement the algorithm for generating kis_id.

kis-flow/id/kis_id.go

package id

import (
"github.com/google/uuid"
"kis-flow/common"
"strings"
)

// KisID generates a random instance ID.
// The format is "prefix1-[prefix2-][prefix3-]ID"
// Examples:
// flow-1234567890
// func-1234567890
// conn-1234567890
// func-1-1234567890
func KisID(prefix ...string) (kisId string) {
idStr := strings.Replace(uuid.New().String(), "-", "", -1)
kisId = formatKisID(idStr, prefix...)
return
}

func formatKisID(idStr string, prefix ...string) string {
var kisId string
for _, fix := range prefix {
kisId += fix
kisId += common.KisIdJoinChar
}
kisId += idStr
return kisId
}

The kisId module provides the KisID() method, which depends on the third-party distributed ID generation library github.com/google/uuid. The generated random ID is a string, and the caller can provide multiple prefixes, which are concatenated by the - symbol to get the random string ID, e.g., func-1234567890.

For the prefixes of KisId, some string enumerations are provided as follows:

kis-flow/common/const.go

// KisIdType is used to generate the KisId string prefix
const (
KisIdTypeFlow = "flow"
KisIdTypeConnnector = "conn"
KisIdTypeFunction = "func"
KisIdTypeGlobal = "global"
KisIdJoinChar = "-"
)

2.2.4 BaseFunction: Basic Parent Class

According to the design, we need to provide a BaseFunction as a subclass of Function to implement some basic functional interfaces. The Call() method is left empty for concrete types of KisFunctionX to override the implementation. Let's define the BaseFunction structure below.

A. Structure Definition

kis-flow/function/kis_base_function.go

package function

import (
"context"
"errors"
"kis-flow/common"
"kis-flow/config"
"kis-flow/id"
"kis-flow/kis"
)

type BaseFunction struct {
// Id is the instance ID of KisFunction, used within KisFlow to distinguish different instance objects
Id string
Config *config.KisFuncConfig
// flow
Flow kis.Flow // Contextual KisFlow
// link
N kis.Function // Next streaming computation Function
P kis.Function // Previous streaming computation Function
}

B. Method Implementation

kis-flow/function/kis_base_function.go

// Call
// BaseFunction is an empty implementation, aiming to allow other concrete types of KisFunctions, such as KisFunction_V, to inherit BaseFunction and override this method.
func (base *BaseFunction) Call(ctx context.Context, flow kis.Flow) error { return nil }

func (base *BaseFunction) Next() kis.Function {
return base.N
}

func (base *BaseFunction) Prev() kis.Function {
return base.P
}

func (base *BaseFunction) SetN(f kis.Function) {
base.N = f
}

func (base *BaseFunction) SetP(f kis.Function) {
base.P = f
}

func (base *BaseFunction) SetConfig(s *config.KisFuncConfig) error {
if s == nil {
return errors.New("KisFuncConfig is nil")
}
base.Config = s
return nil
}

func (base *BaseFunction) GetId() string {
return base.Id
}

func (base *BaseFunction) GetPrevId() string {
if base.P == nil {
// Function is the first node
return common.FunctionIdFirstVirtual
}
return base.P.GetId()
}

func (base *BaseFunction) GetNextId() string {
if base.N == nil {
// Function is the last node
return common.FunctionIdLastVirtual
}
return base.N.GetId()
}

func (base *BaseFunction) GetConfig() *config.KisFuncConfig {
return base.Config
}

func (base *BaseFunction) SetFlow(f kis.Flow) error {
if f == nil {
return errors.New("KisFlow is nil")
}
base.Flow = f
return nil
}

func (base *BaseFunction) GetFlow() kis.Flow {
return base.Flow
}

func (base *BaseFunction) CreateId() {
base.Id = id.KisID(common.KisIdTypeFunction)
}

Note the implementation of GetPrevId() and GetNextId() methods. If the current Function is the first or last node in the bidirectional linked list, their previous or next node does not exist, so the ID also does not exist. To prevent situations where the ID cannot be obtained during use, we provide two virtual FIDs for special case boundary processing, defined in const.go.

kis-flow/common/const.go

const (
// FunctionIdFirstVirtual is the previous virtual Function ID for the first node
FunctionIdFirstVirtual = "FunctionIdFirstVirtual"
// FunctionIdLastVirtual is the next virtual Function ID for the last node
FunctionIdLastVirtual = "FunctionIdLastVirtual"
)

2.2.5 Definition of KisFunction V/S/L/C/E Mode Classes

Next, we’ll implement subclasses of KisFunction for the five different modes: V, S, L, C, and E. We’ll use separate files to implement each.

A. KisFunctionV

kis-flow/function/kis_function_v.go

package function

import (
"context"
"fmt"
"kis-flow/kis"
)

type KisFunctionV struct {
BaseFunction
}

func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunctionV, flow = %+v\n", flow)
// TODO: Invoke the specific function execution method
return nil
}

B. KisFunctionS

kis-flow/function/kis_function_s.go

package function

import (
"context"
"fmt"
"kis-flow/kis"
)

type KisFunctionS struct {
BaseFunction
}

func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunctionS, flow = %+v\n", flow)
// TODO: Invoke the specific function execution method
return nil
}

C. KisFunctionL

kis-flow/function/kis_function_l.go

package function

import (
"context"
"fmt"
"kis-flow/kis"
)

type KisFunctionL struct {
BaseFunction
}

func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunctionL, flow = %+v\n", flow)
// TODO: Invoke the specific function execution method
return nil
}

D. KisFunctionC

kis-flow/function/kis_function_c.go

package function

import (
"context"
"fmt"
"kis-flow/kis"
)

type KisFunctionC struct {
BaseFunction
}

func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunction_C, flow = %+v\n", flow)
// TODO: Invoke the specific function execution method
return nil
}

E. KisFunctionE

kis-flow/function/kis_function_e.go

package function

import (
"context"
"fmt"
"kis-flow/kis"
)

type KisFunctionE struct {
BaseFunction
}

func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunctionE, flow = %+v\n", flow)
// TODO: Invoke the specific function execution method
return nil
}

2.2.6 Creating KisFunction Instances

Here, we provide a method to create specific Function modes using the simple factory method pattern.

kis-flow/function/kis_base_function.go

func (base *BaseFunction) CreateId() {
base.Id = id.KisID(common.KisIdTypeFunction)
}

// NewKisFunction creates an NsFunction
// flow: the current associated flow instance
// s : the configuration strategy for the current function
func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
var f kis.Function
// Factory produces generic objects
switch common.KisMode(config.FMode) {
case common.V:
f = new(KisFunctionV)
break
case common.S:
f = new(KisFunctionS)
case common.L:
f = new(KisFunctionL)
case common.C:
f = new(KisFunctionC)
case common.E:
f = new(KisFunctionE)
default:
//LOG ERROR
return nil
}

// Generate a random instance unique ID
f.CreateId()

// Set basic information properties
if err := f.SetConfig(config); err != nil {
panic(err)
}

if err := f.SetFlow(flow); err != nil {
panic(err)
}
return f
}

Note that the NewKisFunction() method returns an abstract interface Function.

Also, note that currently, we have not implemented the Flow object yet. However, creating a KisFunciton requires passing a Flow object. For now, we can temporarily create a constructor for a Flow object, and we will refine this part of the code in the Flow section later on.
Create a flow.go file in kis-flow/kis/.

kis-flow/kis/flow.go

package kis

import (
"context"
"kis-flow/config"
)

type Flow interface {
// TODO
}

Create a kis_flow.go file under kis-flow/flow/ with the following:

kis-flow/flow/kis_flow.go

package flow

import "kis-flow/config"

// KisFlow is used to traverse the entire streaming computation context
type KisFlow struct {
Id string
Name string
// TODO
}

// TODO for test
// NewKisFlow creates a KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
flow := new(KisFlow)
// Basic information
flow.Id = id.KisID(common.KisIdTypeFlow)
flow.Name = conf.FlowName
return flow
}

2.2.7 Unit Testing for KisFunction Creation Instances

Now, let’s create a simple unit test for the KisFunction instance creation. Create a kis_function_test.go file in kis-flow/test/.

kis-flow/test/kis_function_test.go

package test

import (
"context"
"kis-flow/common"
"kis-flow/config"
"kis-flow/flow"
"kis-flow/function"
"testing"
)

func TestNewKisFunction(t *testing.T) {
ctx := context.Background()

// 1. Create a KisFunction configuration instance
source := config.KisSource{
Name: "Public Account TikTok Store User Order Data",
Must: []string{"order_id", "user_id"},
}

myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source, nil)
if myFuncConfig1 == nil {
panic("myFuncConfig1 is nil")
}

// 2. Create a KisFlow configuration instance
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)

// 3. Create a KisFlow object
flow1 := flow.NewKisFlow(myFlowConfig1)

// 4. Create a KisFunction object
func1 := function.NewKisFunction(flow1, myFuncConfig1)
if err := func1.Call(ctx, flow1); err != nil {
t.Errorf("func1.Call() error = %v", err)
}
}

The process is simple and divided into four small steps:

  • Create a KisFunction configuration instance
  • Create a KisFlow configuration instance
  • Create a KisFlow object
  • Create a KisFunction object Navigate to the kis-flow/test/ directory and execute:
go test -test.v -test.paniconexit0 -test.run TestNewKisFunction

The result is as follows:

=== RUN   TestNewKisFunction
KisFunctionC, flow = &{Id:flow-866de5abc8134fc9bb8e5248a3ce7137 Name:flowName1 Conf:0xc00014e780 Funcs:map[] FlowHead:<nil> FlowTail:<nil> flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:<nil> ThisFunctionId: PrevFunctionId: funcParams:map[] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

--- PASS: TestNewKisFunction (0.00s)
PASS
ok kis-flow/test 1.005s

We have successfully called the Call() method of the specific KisFunction_C instance.

2.5 [V0.1] Source Code

https://github.com/aceld/kis-flow/releases/tag/v0.1

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki

[KisFlow-Golang Framework Hands-on]

--

--

No responses yet