Plumber is a framework for creating data pipelines and stream processing tools.
Checkpoints run under special circumstances and backup current state of system.
Backends for our stateful processor.
Streams are the way we move data around. Streams are the input and output of our application. Streams are stateful and their state is just a part of System state.
Pipes are pure functions that get the state and an input and return some output. Remember that since Pipes get runned using Goroutiens you can block in them so you can do any kind of event buckets in them. ( Similar to Windows in ApacheFlink, see pipe/window.go)
Windows are a specific type of pipes that can block data flow and release a buffer of events based on various logics such as:
You can use plumber as a simple library for creating fast scalable data piplines and stream processing tools in Golang. Example:
package main
import (
"context"
"fmt"
"strings"
"time"
"github.com/amirrezaask/plumber"
"github.com/amirrezaask/plumber/checkpoint"
"github.com/amirrezaask/plumber/pipe"
"github.com/amirrezaask/plumber/pipeline"
"github.com/amirrezaask/plumber/state"
"github.com/amirrezaask/plumber/stream"
)
func toLower(s plumber.State, i interface{}) (interface{}, error) {
word := strings.ToLower(i.(string))
return word, nil
}
func toUpper(ctx *plumber.PipeCtx) {
for {
word := (<-ctx.In).(string)
word = strings.ToUpper(word)
ctx.Out <- word
}
}
func count(ctx *plumber.PipeCtx) {
for {
word := (<-ctx.In).(string)
counter, err := ctx.State.GetInt(string(word))
if err != nil {
ctx.Err <- err
return
}
counter = counter + 1
err = ctx.State.Set(string(word), counter)
if err != nil {
ctx.Err <- err
return
}
ctx.Out <- word
}
}
func main() {
r, err := state.NewRedis(context.Background(), "localhost", "6379", "", "", 0)
if err != nil {
panic(err)
}
//create our plumber pipeline
errs, err := pipeline.
NewDefaultSystem().
SetCheckpoint(checkpoint.WithInterval(time.Second * 1)).
SetState(r).
From(stream.NewArrayStream("amirreza", "parsa")).
Then(toUpper).
Then(pipe.MakePipe(toLower)).
Then(count).
To(stream.NewPrinterStream()).
Initiate()
if err != nil {
panic(err)
}
for err := range errs {
fmt.Println(err)
}
}
Plumber can also be used as a standalone binary that you feed configuration into it. It has all the benefits of plumber but you can write you processing logic in any language even in Bash. Example configuration:
{
"from": {
"type": "array",
"args": {
"words": ["amirreza"]
}
},
"to": {
"type": "printer",
"args": {
}
},
"checkpoint": {
"type": "time-based",
"args": {
"interval": 2
}
},
"state": {
"type": "map",
"args": {
}
},
"pipeline": [
{
"path": "echo",
"needs_state": false
},
{
"path": "cowsay",
"needs_state": false
}
]
}
FBL is tool to find broken links in articles and files
Hascal is a general purpose and open source programming language designed to build optimal, maintainable, reliable and efficient software.
FL Chart is a highly customizable Flutter chart library that supports Line Chart, Bar Chart, Pie Chart, Scatter Chart, and Radar Chart.
Loki, experimental language that compiles to C.
My world in code
Hello World in different languages !
PyBotNet framework, high level remote control