(Part 2.1) Golang Framework Hands-on — KisFlow Streaming Computing Framework — Project Construction / Basic Modules

Aceld
Oct 21, 2024

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Twitter: https://twitter.com/Aceld_

[KisFlow-Golang Framework Hands-on]

<(Part 1) Overview>
<(Part 2.1) Project Construction / Basic Modules>

[V0.1]-Project Construction and Basic Module Definition

First, let’s create our project. The main directory of the project is called KisFlow, and the corresponding repository is created on Github: https://github.com/aceld/kis-flow. Then clone the project code to your local machine.

2.0 Project Construction

(If you are developing according to this tutorial, you need to create a new project in your repository and clone it locally for development.)

2.0.1 Create Project Directory

Next, we will create the necessary file directories for the project. The directory structure of the project is as follows:

kis-flow/
├── LICENSE
├── README.md
├── common/
├── example/
├── function/
├── conn/
├── config/
├── flow/
└── kis/

Here, we create seven folders:

  • common/: To store some common basic constants, enumeration parameters, and some utility methods.
  • flow/: To store the core code of KisFlow.
  • function/: To store the core code of KisFunction.
  • conn/: To store the core code of KisConnector.
  • config/: To store configuration information for flow, function, connector, etc.
  • example/: To store some test cases and unit test cases for KisFlow to verify the project effect in a timely manner.
  • kis/: To store the abstraction layer of all modules.

2.0.2 Create go.mod

cd to the root directory of the kis-flow project and execute the following command:

go mod init kis-flow

You will get the go.mod file, which is the package management file for the current project:

module kis-flow
go 1.18

Because later sections print many debugging logs, we integrate a log module first. KisFlow provides a default Logger object that writes to standard output, and exposes a SetLogger() method so that developers can install their own Logger.

2.1 KisLogger

2.1.1 Logger Abstract Interface

Create a log/ directory under kis-flow/ (it is not part of the tree above) and define the Logger interface in a new kis_log.go file:

kis-flow/log/kis_log.go

package log

import "context"

type KisLogger interface {
	// InfoFX Info level log interface with context, formatted string
	InfoFX(ctx context.Context, str string, v ...interface{})
	// ErrorFX Error level log interface with context, formatted string
	ErrorFX(ctx context.Context, str string, v ...interface{})
	// DebugFX Debug level log interface with context, formatted string
	DebugFX(ctx context.Context, str string, v ...interface{})
	// InfoF Info level log interface without context, formatted string
	InfoF(str string, v ...interface{})
	// ErrorF Error level log interface without context, formatted string
	ErrorF(str string, v ...interface{})
	// DebugF Debug level log interface without context, formatted string
	DebugF(str string, v ...interface{})
}

// kisLog Default KisLog object
var kisLog KisLogger

// SetLogger Set KisLog object, can be a user-defined Logger object
func SetLogger(newlog KisLogger) {
	kisLog = newlog
}

// Logger Get the kisLog object
func Logger() KisLogger {
	return kisLog
}

KisLogger provides three levels of logs: Info, Error, and Debug. It also provides two sets of log interfaces with and without context parameters.

A package-level variable, kisLog, holds the current Logger. SetLogger() lets developers install their own Logger object, and Logger() returns the one currently in use.
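As a minimal sketch of how SetLogger() might be used, the stand-alone program below copies the KisLogger interface and registers a user-defined logger. The prefixLogger type, its write helper, the demo function, and the "[INFO]" prefix format are hypothetical names chosen for illustration; they are not part of KisFlow. In a real project the type would live in your own package and be registered with log.SetLogger(...).

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
)

// KisLogger mirrors the interface defined in kis-flow/log/kis_log.go.
type KisLogger interface {
	InfoFX(ctx context.Context, str string, v ...interface{})
	ErrorFX(ctx context.Context, str string, v ...interface{})
	DebugFX(ctx context.Context, str string, v ...interface{})
	InfoF(str string, v ...interface{})
	ErrorF(str string, v ...interface{})
	DebugF(str string, v ...interface{})
}

var kisLog KisLogger

func SetLogger(newlog KisLogger) { kisLog = newlog }
func Logger() KisLogger          { return kisLog }

// prefixLogger is a hypothetical user-defined logger that prefixes every
// message with its level and writes it to out.
type prefixLogger struct {
	out io.Writer
}

// write prints the level tag followed by the formatted message.
func (l *prefixLogger) write(level, str string, v ...interface{}) {
	fmt.Fprintf(l.out, "[%s] ", level)
	fmt.Fprintf(l.out, str, v...)
}

func (l *prefixLogger) InfoF(str string, v ...interface{})  { l.write("INFO", str, v...) }
func (l *prefixLogger) ErrorF(str string, v ...interface{}) { l.write("ERROR", str, v...) }
func (l *prefixLogger) DebugF(str string, v ...interface{}) { l.write("DEBUG", str, v...) }

// The context-aware variants ignore ctx in this sketch.
func (l *prefixLogger) InfoFX(_ context.Context, str string, v ...interface{}) {
	l.write("INFO", str, v...)
}
func (l *prefixLogger) ErrorFX(_ context.Context, str string, v ...interface{}) {
	l.write("ERROR", str, v...)
}
func (l *prefixLogger) DebugFX(_ context.Context, str string, v ...interface{}) {
	l.write("DEBUG", str, v...)
}

// demo installs a prefixLogger backed by an in-memory buffer, logs two
// messages through the global accessor, and returns what was written.
func demo() string {
	var buf bytes.Buffer
	SetLogger(&prefixLogger{out: &buf})
	Logger().InfoF("flow %s started\n", "MyFlow1")
	Logger().ErrorFX(context.Background(), "retry %d failed\n", 3)
	return buf.String()
}

func main() {
	fmt.Print(demo())
}
```

Because callers only ever go through Logger(), swapping the implementation does not require touching any call site.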

2.1.2 Default Log Object kisDefaultLog

If the developer does not set a custom log object, KisFlow falls back to a default log object, kisDefaultLog. This struct implements every method of KisLogger and prints all logs to standard output. Create the kis_default_log.go file in the kis-flow/log/ directory:

kis-flow/log/kis_default_log.go

package log

import (
	"context"
	"fmt"
)

// kisDefaultLog Default provided log object
type kisDefaultLog struct{}

func (log *kisDefaultLog) InfoF(str string, v ...interface{}) {
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) ErrorF(str string, v ...interface{}) {
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) DebugF(str string, v ...interface{}) {
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) InfoFX(ctx context.Context, str string, v ...interface{}) {
	fmt.Println(ctx)
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) ErrorFX(ctx context.Context, str string, v ...interface{}) {
	fmt.Println(ctx)
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) DebugFX(ctx context.Context, str string, v ...interface{}) {
	fmt.Println(ctx)
	fmt.Printf(str, v...)
}

func init() {
	// If Logger is not set, use kisDefaultLog object by default at startup
	if Logger() == nil {
		SetLogger(&kisDefaultLog{})
	}
}

Here, in the init() initialization method, it checks whether a global Logger object has been set. If not, KisFlow will default to using kisDefaultLog as the global Logger logging object.

2.1.3 Unit Testing KisLogger

For now, we won’t focus too much on the development of the KisLogger methods. Instead, we'll prioritize getting the existing program up and running and conduct a unit test to test the creation of a KisLogger.

kis-flow/test/kis_log_test.go

package test

import (
	"context"
	"kis-flow/log"
	"testing"
)

func TestKisLogger(t *testing.T) {
	ctx := context.Background()

	// The default logger uses fmt.Printf and appends nothing,
	// so each message carries its own trailing newline.
	log.Logger().InfoFX(ctx, "TestKisLogger InfoFX\n")
	log.Logger().ErrorFX(ctx, "TestKisLogger ErrorFX\n")
	log.Logger().DebugFX(ctx, "TestKisLogger DebugFX\n")

	log.Logger().InfoF("TestKisLogger InfoF\n")
	log.Logger().ErrorF("TestKisLogger ErrorF\n")
	log.Logger().DebugF("TestKisLogger DebugF\n")
}

Navigate to the kis-flow/test/ directory and run the unit test command:

go test -test.v -test.paniconexit0 -test.run TestKisLogger

The result is as follows:

=== RUN   TestKisLogger
context.Background
TestKisLogger InfoFX
context.Background
TestKisLogger ErrorFX
context.Background
TestKisLogger DebugFX
TestKisLogger InfoF
TestKisLogger ErrorF
TestKisLogger DebugF
--- PASS: TestKisLogger (0.00s)
PASS
ok kis-flow/test 0.509s

2.2 KisConfig

In KisFlow, we define three core modules: KisFunction, KisFlow, and KisConnector. Therefore, KisConfig also needs to define these three modules separately. We will place all the code related to KisConfig in the kis-flow/config/ directory.

$ tree
.
├── LICENSE
├── README.md
├── common/
├── example/
├── config/
├── test/
└── go.mod

2.2.1 KisFuncConfig Definition

The design document for KisFuncConfig in YAML format is as follows:

kistype: func
fname: TestKisFunction_S1
fmode: Save
source:
  name: Test data source 1 - user order dimension
  must:
    - userid
    - orderid

option:
  cname: TestKisConnector_1
  retry_times: 3
  retry_duration: 500
  default_params:
    default1: default1_param
    default2: default2_param

Next, based on the configuration protocol above, we define the KisFunction strategy configuration struct and provide the corresponding initialization methods. We create a kis_func_config.go file in the kis-flow/config/ directory to hold the required Config definitions.

A. Struct Definition

kis-flow/config/kis_func_config.go

package config

import (
	"kis-flow/common"
	"kis-flow/log"
)

// FParam represents the fixed configuration parameters type for Function in the current Flow
type FParam map[string]string

// KisSource represents the business source of the current Function
type KisSource struct {
	Name string   `yaml:"name"` // Description of the data source for this Function
	Must []string `yaml:"must"` // Fields required by the source
}

// KisFuncOption optional configurations
type KisFuncOption struct {
	CName        string `yaml:"cname"`          // Connector name
	RetryTimes   int    `yaml:"retry_times"`    // Optional, maximum number of retries for Function scheduling (excluding normal scheduling)
	RetryDuriton int    `yaml:"retry_duration"` // Optional, maximum time interval for each retry of Function scheduling (unit: ms); the tag matches the retry_duration key in the YAML above
	Params       FParam `yaml:"default_params"` // Optional, fixed configuration parameters for Function in the current Flow
}

// KisFuncConfig a KisFunction strategy configuration
type KisFuncConfig struct {
	KisType string        `yaml:"kistype"`
	FName   string        `yaml:"fname"`
	FMode   string        `yaml:"fmode"`
	Source  KisSource     `yaml:"source"`
	Option  KisFuncOption `yaml:"option"`
}

Here, KisFuncConfig is the top-level struct; FParam, KisSource, and KisFuncOption are the types of its nested fields.

B. Method Definitions

Below, we first provide a simple constructor for creating KisFuncConfig.

kis-flow/config/kis_func_config.go

// NewFuncConfig creates a Function strategy configuration object that describes a KisFunction
func NewFuncConfig(funcName string, mode common.KisMode, source *KisSource, option *KisFuncOption) *KisFuncConfig {
	config := new(KisFuncConfig)
	config.FName = funcName

	if source == nil {
		log.Logger().ErrorF("funcName NewConfig Error, source is nil, funcName = %s\n", funcName)
		return nil
	}
	config.Source = *source
	config.FMode = string(mode)

	// Functions S and L must carry a KisConnector name, because S and L
	// establish their streaming relationship through a Connector
	if mode == common.S || mode == common.L {
		if option == nil {
			log.Logger().ErrorF("Function S/L need option->CName\n")
			return nil
		} else if option.CName == "" {
			log.Logger().ErrorF("Function S/L need option->CName\n")
			return nil
		}
	}

	if option != nil {
		config.Option = *option
	}

	return config
}

The code above uses two enum values, common.S and common.L. They belong to KisMode, an enum type with five values that describes the kind of a KisFunction. We define it in the kis-flow/common/const.go file.

kis-flow/common/const.go

package common

// KisMode is the working mode of a KisFunction
type KisMode string

const (
	// V is for feature verification KisFunction,
	// mainly for data filtering, validation, field sorting, idempotent preprocessing
	V KisMode = "Verify"
	// S is for feature storage KisFunction,
	// Save will store the data through KisConnector, and the temporary life cycle of the data is NsWindow
	S KisMode = "Save"
	// L is for feature loading KisFunction,
	// Load will load the data through KisConnector, and logically it can be merged with the corresponding S Function
	L KisMode = "Load"
	// C is for feature calculation KisFunction,
	// Calculate will compute on the data in KisFlow, generate new fields, and pass the data flow
	// to a downstream S for storage, or store it directly through KisConnector
	C KisMode = "Calculate"
	// E is for extending features KisFunction,
	// as a custom feature function for streaming computation, e.g. notifying a scheduler
	// to trigger task message sending, deleting some data, resetting status, etc.
	E KisMode = "Expand"
)

If fmode is Save or Load, the Function reads from or writes to external storage, so it must be associated with a KisConnector. That is why CName has to be provided for these two modes.
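The rule above can be sketched as a small stand-alone predicate. The KisMode constants are copied from const.go; needsConnector and checkConnector are hypothetical helper names introduced only for this illustration and are not part of KisFlow, where the check lives inside NewFuncConfig.

```go
package main

import "fmt"

// KisMode and its values are copied from kis-flow/common/const.go.
type KisMode string

const (
	V KisMode = "Verify"
	S KisMode = "Save"
	L KisMode = "Load"
	C KisMode = "Calculate"
	E KisMode = "Expand"
)

// needsConnector (hypothetical helper) reports whether a Function in the
// given mode must name a KisConnector.
func needsConnector(mode KisMode) bool {
	return mode == S || mode == L
}

// checkConnector (hypothetical helper) returns an error when the mode
// requires a connector name but cname is empty.
func checkConnector(mode KisMode, cname string) error {
	if needsConnector(mode) && cname == "" {
		return fmt.Errorf("function mode %s requires a connector name", mode)
	}
	return nil
}

func main() {
	for _, m := range []KisMode{V, S, L, C, E} {
		fmt.Printf("%-9s needs connector: %v\n", m, needsConnector(m))
	}
	fmt.Println(checkConnector(S, ""))
	fmt.Println(checkConnector(S, "connectorName1"))
}
```

Returning an error instead of logging and returning nil is a design choice of this sketch; it lets the caller decide how to report the problem.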

C. Create KisFuncConfig Unit Test

Now, we won’t do much method development for KisFuncConfig. We'll first run the existing program to do a unit test to test creating a KisFuncConfig.

kis-flow/test/kis_config_test.go

package test

import (
	"testing"

	"kis-flow/common"
	"kis-flow/config"
	"kis-flow/log"
)

func TestNewFuncConfig(t *testing.T) {
	source := config.KisSource{
		Name: "TikTokOrder",
		Must: []string{"order_id", "user_id"},
	}

	option := config.KisFuncOption{
		CName:        "connectorName1",
		RetryTimes:   3,
		RetryDuriton: 300,
		Params: config.FParam{
			"param1": "value1",
			"param2": "value2",
		},
	}

	myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)
	log.Logger().InfoF("funcName1: %+v\n", myFunc1)
}

We cd to the kis-flow/test/ directory and execute the unit test command:

go test -test.v -test.paniconexit0 -test.run TestNewFuncConfig

The result is as follows:

=== RUN   TestNewFuncConfig
funcName1: &{KisType: FName:funcName1 FMode:Save Source:{Name:TikTokOrder Must:[order_id user_id]} Option:{CName:connectorName1 RetryTimes:3 RetryDuriton:300 Params:map[param1:value1 param2:value2]}}

--- PASS: TestNewFuncConfig (0.00s)

With that, the basic creation of a KisFuncConfig strategy is complete.

2.2.2 Definition of KisFlowConfig

The YAML representation of KisFlowConfig in the design document is as follows:

kistype: flow
status: 1
flow_name: MyFlow1
flows:
  - fname: TestPrintInput
    params:
      args1: value1
      args2: value2
  - fname: TestKisFunction_S1
  - fname: TestPrintInput
    params:
      args1: value11
      args2: value22
      default2: newDefault
  - fname: TestPrintInput
  - fname: TestKisFunction_S1
    params:
      my_user_param1: ffffffxxxxxx
  - fname: TestPrintInput

A. Struct Definition

Next, we define the KisFlow strategy configuration struct based on the configuration protocol above and provide the corresponding initialization methods. We create a kis_flow_config.go file in the kis-flow/config/ directory to hold the required Config definitions.

kis-flow/config/kis_flow_config.go

package config

import "kis-flow/common"

// KisFlowFunctionParam represents the ID of a Function and carries fixed configuration parameters within a Flow configuration
type KisFlowFunctionParam struct {
	FuncName string `yaml:"fname"`  // Required
	Params   FParam `yaml:"params"` // Optional, used to customize fixed configuration parameters for the Function within the current Flow
}

// KisFlowConfig represents an object that spans the entire streaming computing context
type KisFlowConfig struct {
	KisType  string                 `yaml:"kistype"`
	Status   int                    `yaml:"status"`
	FlowName string                 `yaml:"flow_name"`
	Flows    []KisFlowFunctionParam `yaml:"flows"`
}

Here, a new parameter type KisFlowFunctionParam is provided. This represents the default parameters passed to the currently scheduled Function when configuring KisFlow. If not needed, this parameter can be omitted.

B. Method Definitions

We provide a constructor for creating a KisFlowConfig.

kis-flow/config/kis_flow_config.go

// NewFlowConfig creates a Flow strategy configuration object to describe a KisFlow information
func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {
	config := new(KisFlowConfig)
	config.FlowName = flowName
	config.Flows = make([]KisFlowFunctionParam, 0)
	config.Status = int(enable)
	return config
}

// AppendFunctionConfig adds a Function Config to the current Flow
func (fConfig *KisFlowConfig) AppendFunctionConfig(params KisFlowFunctionParam) {
	fConfig.Flows = append(fConfig.Flows, params)
}

Regarding the Function configuration carried by the flow, we dynamically add it through AppendFunctionConfig. This is done in anticipation that KisFlow configuration may be extracted from a database/dynamic remote configuration in the future. Thus, configurations need to be dynamically combined.
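NewFlowConfig takes a common.KisOnOff, and the unit test in the next step passes common.FlowEnable, but this part does not show where those names are defined. The sketch below is one plausible definition, assuming an int-based type in which FlowEnable is 1; that value is consistent with status: 1 in the YAML above and with Status:1 in the test output. FlowDisable = 0 is an assumption. The struct is trimmed to the fields the example needs, and in the project these definitions would presumably sit in kis-flow/common/const.go.

```go
package main

import "fmt"

// KisOnOff indicates whether a Flow is enabled (assumed definition).
type KisOnOff int

const (
	FlowEnable  KisOnOff = 1 // consistent with "status: 1" in the YAML design
	FlowDisable KisOnOff = 0 // assumed counterpart
)

// KisFlowConfig is trimmed to the fields this sketch needs.
type KisFlowConfig struct {
	Status   int
	FlowName string
}

// NewFlowConfig mirrors the constructor above, minus the Flows slice.
func NewFlowConfig(flowName string, enable KisOnOff) *KisFlowConfig {
	config := new(KisFlowConfig)
	config.FlowName = flowName
	config.Status = int(enable)
	return config
}

func main() {
	on := NewFlowConfig("flowName1", FlowEnable)
	off := NewFlowConfig("flowName2", FlowDisable)
	fmt.Printf("%+v\n%+v\n", *on, *off)
}
```

A dedicated type keeps call sites readable (FlowEnable rather than a bare 1) while the stored Status stays a plain int that maps directly onto the YAML field.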

C. KisFlowConfig Unit Test

Similarly, we create a simple unit test to test the creation of KisFlowConfig.

kis-flow/test/kis_config_test.go

func TestNewFlowConfig(t *testing.T) {
	flowFuncParams1 := config.KisFlowFunctionParam{
		FuncName: "funcName1",
		Params: config.FParam{
			"flowSetFunParam1": "value1",
			"flowSetFunParam2": "value2",
		},
	}

	flowFuncParams2 := config.KisFlowFunctionParam{
		FuncName: "funcName2",
		Params: config.FParam{
			"default": "value1",
		},
	}

	myFlow1 := config.NewFlowConfig("flowName1", common.FlowEnable)
	myFlow1.AppendFunctionConfig(flowFuncParams1)
	myFlow1.AppendFunctionConfig(flowFuncParams2)
	log.Logger().InfoF("myFlow1: %+v\n", myFlow1)
}

Navigate to the kis-flow/test/ directory and execute the following command to run the unit test:

$ go test -test.v -test.paniconexit0 -test.run TestNewFlowConfig

The result is as follows:

=== RUN   TestNewFlowConfig
myFlow1: &{KisType: Status:1 FlowName:flowName1 Flows:[{FuncName:funcName1 Params:map[flowSetFunParam1:value1 flowSetFunParam2:value2]} {FuncName:funcName2 Params:map[default:value1]}]}

--- PASS: TestNewFlowConfig (0.00s)
PASS
ok kis-flow/test 0.251s

2.2.3 KisConnConfig

The KisConnConfig in the design document is formatted as follows in YAML:

kistype: conn
cname: TestKisConnector_1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: userid_orderid_option
params:
  args1: value1
  args2: value2
load: null
save:
  - TestKisFunction_S1

A. Struct Definition

Next, we define the KisConnector strategy configuration struct based on the configuration protocol above and provide the corresponding initialization methods. We create a kis_conn_config.go file in the kis-flow/config/ directory to hold the required Config definitions.

kis-flow/config/kis_conn_config.go

package config

import (
	"errors"
	"fmt"
	"kis-flow/common"
)

// KisConnConfig represents the KisConnector strategy configuration
type KisConnConfig struct {
	// Configuration type
	KisType string `yaml:"kistype"`
	// Unique descriptor
	CName string `yaml:"cname"`
	// Basic storage medium address
	AddrString string `yaml:"addrs"`
	// Storage medium engine type, such as "Mysql", "Redis", "Kafka", etc.
	Type common.KisConnType `yaml:"type"`
	// Identifier for a single storage: for example, Key name for Redis, Table name for Mysql, Topic name for Kafka, etc.
	Key string `yaml:"key"`
	// Custom parameters in the configuration information
	Params map[string]string `yaml:"params"`
	// Function IDs bound to storage reading
	Load []string `yaml:"load"`
	// Function IDs bound to storage writing
	Save []string `yaml:"save"`
}
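Two fields of this struct rely on details that this part does not spell out: Type uses common.KisConnType (the unit test later passes common.REDIS), and AddrString packs several addresses into one comma-separated string. The sketch below is a guess at both, assuming a string-based KisConnType in which REDIS is "redis", consistent with type: redis in the YAML above. The MYSQL and KAFKA values and the splitAddrs helper are hypothetical and shown only for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// KisConnType names the storage engine behind a connector (assumed definition).
type KisConnType string

const (
	REDIS KisConnType = "redis" // consistent with "type: redis" in the YAML design
	MYSQL KisConnType = "mysql" // hypothetical value
	KAFKA KisConnType = "kafka" // hypothetical value
)

// splitAddrs (hypothetical helper) turns the comma-separated AddrString
// into individual addresses, trimming spaces and skipping empty entries.
func splitAddrs(addrString string) []string {
	var addrs []string
	for _, a := range strings.Split(addrString, ",") {
		if a = strings.TrimSpace(a); a != "" {
			addrs = append(addrs, a)
		}
	}
	return addrs
}

func main() {
	addrs := splitAddrs("0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990")
	fmt.Println(REDIS, len(addrs), addrs)
}
```

Keeping the addresses as a single string keeps the YAML compact; a connector implementation can split it only when it actually dials the storage backend.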

B. Method Definitions

kis-flow/config/kis_conn_config.go

// NewConnConfig creates a KisConnector strategy configuration object to describe a KisConnector information
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
	strategy := new(KisConnConfig)
	strategy.CName = cName
	strategy.AddrString = addr

	strategy.Type = t
	strategy.Key = key
	strategy.Params = param
	return strategy
}

// WithFunc binds Connector to Function
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {
	switch common.KisMode(fConfig.FMode) {
	case common.S:
		cConfig.Save = append(cConfig.Save, fConfig.FName)
	case common.L:
		cConfig.Load = append(cConfig.Load, fConfig.FName)
	default:
		return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
	}
	return nil
}

Here, the WithFunc method is also provided to dynamically add the relationship between Conn and Function.
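WithFunc accepts only Save and Load Functions; any other mode falls into the default branch and yields an error. The stand-alone sketch below exercises both outcomes. Its structs are trimmed stand-ins holding only the fields WithFunc touches, the function names in main are made up for the example, and the error is built with fmt.Errorf rather than errors.New(fmt.Sprintf(...)), which is a stylistic substitution of this sketch.

```go
package main

import "fmt"

type KisMode string

const (
	V KisMode = "Verify"
	S KisMode = "Save"
	L KisMode = "Load"
)

// Trimmed stand-ins for the config structs; only the fields WithFunc uses.
type KisFuncConfig struct {
	FName string
	FMode string
}

type KisConnConfig struct {
	CName string
	Load  []string
	Save  []string
}

// WithFunc records the Function under Save or Load, and rejects other modes.
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {
	switch KisMode(fConfig.FMode) {
	case S:
		cConfig.Save = append(cConfig.Save, fConfig.FName)
	case L:
		cConfig.Load = append(cConfig.Load, fConfig.FName)
	default:
		return fmt.Errorf("wrong KisMode %s", fConfig.FMode)
	}
	return nil
}

func main() {
	conn := &KisConnConfig{CName: "connectorName1"}
	funcs := []*KisFuncConfig{
		{FName: "funcSave", FMode: string(S)},
		{FName: "funcLoad", FMode: string(L)},
		{FName: "funcVerify", FMode: string(V)},
	}
	for _, f := range funcs {
		if err := conn.WithFunc(f); err != nil {
			fmt.Println("bind failed:", err)
		}
	}
	fmt.Printf("%+v\n", *conn)
}
```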

C. KisConnConfig Unit Test

Similarly, we create a simple unit test to test the creation of KisConnConfig.

kis-flow/test/kis_config_test.go

func TestNewConnConfig(t *testing.T) {
	source := config.KisSource{
		Name: "Public Account TikTok Store User Order Data",
		Must: []string{"order_id", "user_id"},
	}

	option := config.KisFuncOption{
		CName:        "connectorName1",
		RetryTimes:   3,
		RetryDuriton: 300,
		Params: config.FParam{
			"param1": "value1",
			"param2": "value2",
		},
	}

	myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)

	connParams := config.FParam{
		"param1": "value1",
		"param2": "value2",
	}

	myConnector1 := config.NewConnConfig("connectorName1", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams)
	if err := myConnector1.WithFunc(myFunc1); err != nil {
		log.Logger().ErrorF("WithFunc err: %s\n", err.Error())
	}

	log.Logger().InfoF("myConnector1: %+v\n", myConnector1)
}

Navigate to the kis-flow/test/ directory and execute the following command to run the unit test:

$ go test -test.v -test.paniconexit0 -test.run TestNewConnConfig

The result is as follows:

=== RUN   TestNewConnConfig
myConnector1: &{KisType: CName:connectorName1 AddrString:0.0.0.0:9987,0.0.0.0:9997 Type:redis Key:key Params:map[param1:value1 param2:value2] Load:[] Save:[funcName1]}

--- PASS: TestNewConnConfig (0.00s)
PASS
ok kis-flow/test 0.481s
