Write Mapper Reducer in Go
One common request is to be able to write mappers and reducers in Go. Many users like the syntax of Glow, which sends the whole executable binary to the agents and executes the functions via reflection.
However, reflection is not very performant.
Gleam also sends the driver binary to the agents, but the agents then invoke the mapper or reducer functions registered in that binary directly, without reflection.
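To illustrate why registering functions lets an agent avoid reflection, here is a minimal, hypothetical sketch of a registry. It is not Gleam's implementation: `Mapper` is redefined locally, and `registerMapper`, `runMapper`, and the integer IDs are assumptions made for illustration. The key idea is that registration happens during package initialization, so every copy of the binary maps the same ID to the same function, and invoking it is a plain function call.

```go
package main

import "fmt"

// Mapper has the same shape as gio.Mapper; it is redefined here so the
// sketch compiles without Gleam.
type Mapper func(row []interface{}) error

// registry keeps mappers in registration order; a mapper's index is its ID.
var registry []Mapper

// registerMapper is a hypothetical stand-in for gio.RegisterMapper.
func registerMapper(fn Mapper) int {
	registry = append(registry, fn)
	return len(registry) - 1
}

// runMapper calls a registered mapper by ID: a plain function call,
// with no reflection involved.
func runMapper(id int, row []interface{}) error {
	if id < 0 || id >= len(registry) {
		return fmt.Errorf("unknown mapper id %d", id)
	}
	return registry[id](row)
}

// seen records what the example mapper received.
var seen []interface{}

// Registration runs during package initialization, so every copy of the
// binary assigns the same ID to the same function.
var mapperRecord = registerMapper(func(row []interface{}) error {
	seen = append(seen, row...)
	return nil
})

func main() {
	if err := runMapper(mapperRecord, []interface{}{"hello"}); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(seen) // [hello]
}
```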
The driver program stays mostly the same. See the full working example here: https://github.com/chrislusf/gleam/blob/master/examples/word_count_in_go/word_count_in_go.go
```go
package main

import (
	"os"
	"strings" // used by tokenize below

	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
)

// Register the mapper and reducer functions (defined below).
var (
	MapperTokenizer = gio.RegisterMapper(tokenize)
	MapperAddOne    = gio.RegisterMapper(addOne)
	ReducerSum      = gio.RegisterReducer(sum)
)

func main() {
	// Init() will determine whether the driver program will execute the mapper/reducer or not.
	gio.Init()

	flow.New().
		TextFile("/etc/passwd").
		Pipe("tr 'A-Z' 'a-z'").
		Mapper(MapperTokenizer). // invoke the registered "tokenize" mapper function.
		Mapper(MapperAddOne).    // invoke the registered "addOne" mapper function.
		ReducerBy(ReducerSum).   // invoke the registered "sum" reducer function.
		Sort(flow.OrderBy(2, true)).
		Fprintf(os.Stdout, "%s %d\n").
		Run()
}
```
Mappers and reducers are defined by these two function types. See https://godoc.org/github.com/chrislusf/gleam/gio#Mapper
```go
type Mapper func([]interface{}) error
type Reducer func(x, y interface{}) (interface{}, error)
```
Both types use interface{}, so we give up strict compile-time type checking. It's a small price to pay for much higher performance than reflection.
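Because both signatures use interface{}, a wrong type is only caught at runtime. The sketch below exercises the Reducer shape locally, without Gleam, to show how such a function could fold the counts of rows that share a key. The `reduceByKey` helper and grouping on the first field are assumptions made for illustration, not Gleam's ReducerBy implementation.

```go
package main

import "fmt"

// Reducer mirrors the gio.Reducer signature, redefined locally for this sketch.
type Reducer func(x, y interface{}) (interface{}, error)

// reduceByKey (hypothetical) folds the second field of rows that share the
// same first field, as in the word count above.
func reduceByKey(rows [][]interface{}, r Reducer) (map[string]interface{}, error) {
	acc := make(map[string]interface{})
	for _, row := range rows {
		if len(row) < 2 {
			return nil, fmt.Errorf("row %v has fewer than 2 fields", row)
		}
		key, ok := row[0].(string)
		if !ok {
			return nil, fmt.Errorf("key %v is not a string", row[0])
		}
		prev, exists := acc[key]
		if !exists {
			acc[key] = row[1] // first value for this key
			continue
		}
		v, err := r(prev, row[1]) // fold the new value into the running total
		if err != nil {
			return nil, err
		}
		acc[key] = v
	}
	return acc, nil
}

// sum is the same reducer as in the example; the type assertions are
// checked only at runtime.
func sum(x, y interface{}) (interface{}, error) {
	return x.(uint64) + y.(uint64), nil
}

func main() {
	rows := [][]interface{}{
		{"root", uint64(1)},
		{"bin", uint64(1)},
		{"root", uint64(1)},
	}
	counts, err := reduceByKey(rows, sum)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(counts["root"], counts["bin"]) // 2 1
}
```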
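```go
// tokenize splits the first field of the row into words and emits each word.
func tokenize(row []interface{}) error {
	line := string(row[0].([]byte))
	// Any rune that is not an ASCII letter or digit separates words.
	for _, s := range strings.FieldsFunc(line, func(r rune) bool {
		return !('A' <= r && r <= 'Z' || 'a' <= r && r <= 'z' || '0' <= r && r <= '9')
	}) {
		gio.Emit(s)
	}
	return nil
}
```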
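To see what tokenize emits for one input line, the splitting rule can be run on its own. This standalone sketch copies the FieldsFunc predicate into a hypothetical `splitWords` helper and applies it to a typical /etc/passwd line, as it looks after the `tr` step has lowercased it.

```go
package main

import (
	"fmt"
	"strings"
)

// splitWords applies the same rule as tokenize: anything that is not an
// ASCII letter or digit separates words.
func splitWords(line string) []string {
	return strings.FieldsFunc(line, func(r rune) bool {
		return !('A' <= r && r <= 'Z' || 'a' <= r && r <= 'z' || '0' <= r && r <= '9')
	})
}

func main() {
	fmt.Println(splitWords("root:x:0:0:root:/root:/bin/bash"))
	// prints [root x 0 0 root root bin bash]
}
```

Each of these words is then paired with a count of 1 by addOne and summed per word by the reducer.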
```go
// addOne pairs each word with a count of 1.
func addOne(row []interface{}) error {
	word := string(row[0].([]byte))
	gio.Emit(word, 1)
	return nil
}
```
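```go
// sum adds two counts for the same word. The type assertions panic if a
// value is not a uint64, since interface{} gives no compile-time guarantee.
func sum(x, y interface{}) (interface{}, error) {
	return x.(uint64) + y.(uint64), nil
}
```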
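The sum reducer panics if a count arrives as some other numeric type. As a hedged alternative, a reducer can use a type switch and return an error instead. Which concrete numeric type Gleam's serialization actually delivers is not stated here, so the accepted cases (uint64, int64, int) and the `toUint64` and `safeSum` names are assumptions for this sketch.

```go
package main

import "fmt"

// toUint64 converts the numeric types a count might arrive as.
// The set of cases is an assumption, not taken from Gleam.
func toUint64(v interface{}) (uint64, error) {
	switch n := v.(type) {
	case uint64:
		return n, nil
	case int64:
		if n < 0 {
			return 0, fmt.Errorf("negative count %d", n)
		}
		return uint64(n), nil
	case int:
		if n < 0 {
			return 0, fmt.Errorf("negative count %d", n)
		}
		return uint64(n), nil
	default:
		return 0, fmt.Errorf("unexpected type %T", v)
	}
}

// safeSum has the Reducer signature but returns an error instead of
// panicking when a value is not a supported number.
func safeSum(x, y interface{}) (interface{}, error) {
	a, err := toUint64(x)
	if err != nil {
		return nil, err
	}
	b, err := toUint64(y)
	if err != nil {
		return nil, err
	}
	return a + b, nil
}

func main() {
	fmt.Println(safeSum(uint64(2), int64(3))) // 5 <nil>
	fmt.Println(safeSum(uint64(2), "three"))  // <nil> unexpected type string
}
```

The trade-off is a few extra branches per call in exchange for a readable error instead of a runtime panic on an agent.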
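Note that the call to gio.Init() at the start of main() is required. The agents run the same driver binary, and gio.Init() is what decides whether a given run should execute a registered mapper or reducer. Without it, the program may get into a fork loop.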
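The guard can be pictured with the following hypothetical sketch. It does not reproduce gio.Init(): the `-run-mapper` argument and the `runTaskIfRequested` helper are made up for illustration. If the binary is started in "task" mode it does only the task and exits; otherwise it continues as the driver.

```go
package main

import (
	"fmt"
	"os"
)

// runTaskIfRequested is a hypothetical stand-in for gio.Init. If the
// (made-up) argument "-run-mapper" is present, it runs only the task and
// reports true so the caller can exit before building the flow.
func runTaskIfRequested(args []string, task func()) bool {
	for _, a := range args {
		if a == "-run-mapper" {
			task()
			return true
		}
	}
	return false
}

func main() {
	if runTaskIfRequested(os.Args[1:], func() { fmt.Println("running mapper only") }) {
		os.Exit(0)
	}
	// Without the check above, every agent that starts this binary would
	// reach this point and launch the whole flow again.
	fmt.Println("driver: building and running the flow")
}
```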
The mappers receive text fields as []byte rather than string, which is why they convert with string(row[0].([]byte)). This is for compatibility with LuaJIT, which only knows []byte.
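If a mapper should tolerate text arriving either as []byte or as string, a small type switch avoids the panic of a bare type assertion. This is a hedged sketch: the `asString` helper is hypothetical and not part of the gio package.

```go
package main

import "fmt"

// asString accepts a text field as either []byte (what the mappers above
// receive) or string, and reports an error for anything else.
func asString(v interface{}) (string, error) {
	switch s := v.(type) {
	case []byte:
		return string(s), nil
	case string:
		return s, nil
	default:
		return "", fmt.Errorf("expected text, got %T", v)
	}
}

func main() {
	row := []interface{}{[]byte("root:x:0:0")}
	line, err := asString(row[0])
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(line) // root:x:0:0
}
```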