Writing Your Own Wallaroo Go Application

In this section, we will go over the components required to write a Wallaroo Go application. We will start with the stateless reverse application from the examples section, then move on to an application that maintains and modifies state.

A Stateless Application - Reverse Words

The reverse application receives text as its input, reverses it, and sends the reversed text to its sink. To do this, our application needs to provide the following functions:

  • Input decoding - how to translate the incoming bytestream into a Go string
  • Computation - this is where the input string is going to get reversed
  • Output encoding - how to construct and format the bytestream that will be sent out by the sink

Computation

Let's start with the computation, because that's the purpose of the application:

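type Reverse struct{}

func (r *Reverse) Name() string {
  return "reverse"
}

func (r *Reverse) Compute(data interface{}) interface{} {
  // The Decoder below returns a *string, so dereference it here.
  input := *(data.(*string))

  // string reversal taken from
  // https://groups.google.com/forum/#!topic/golang-nuts/oPuBaYJ17t4

  // Copy the input into a slice of runes (Unicode code points). The
  // slice is sized by byte length, an upper bound on the number of
  // runes, so it is trimmed to the n runes actually read.
  n := 0
  runes := make([]rune, len(input))
  for _, ch := range input {
    runes[n] = ch
    n++
  }
  runes = runes[0:n]
  // Reverse the runes in place.
  for i := 0; i < n/2; i++ {
    runes[i], runes[n-1-i] = runes[n-1-i], runes[i]
  }
  // Convert back to UTF-8.
  output := string(runes)

  return output
}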

A Computation has no state, so it only needs to define its name and how to convert input data into output data. In this case, the string reversal uses code taken from the golang-nuts mailing list.
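The rune-based loop in Compute matters because Go strings are UTF-8 byte sequences: reversing the raw bytes would split multi-byte characters such as "é". A minimal standalone sketch of the same technique follows, using the built-in []rune conversion in place of the manual copy loop. reverseString is a hypothetical helper name, not part of the Wallaroo API.

```go
package main

import "fmt"

// reverseString reverses s by Unicode code point (rune) rather than by
// byte, so multi-byte UTF-8 characters such as "\u00e9" stay intact.
func reverseString(s string) string {
	runes := []rune(s) // decode UTF-8 into code points
	for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
		runes[i], runes[j] = runes[j], runes[i]
	}
	return string(runes) // re-encode as UTF-8
}

func main() {
	fmt.Println(reverseString("hello"))        // prints olleh
	fmt.Println(reverseString("h\u00e9llo")) // prints oll\u00e9h rendered as "olléh"
}
```

Reversal is by code point, so a character written as a base letter plus a combining mark (for example "e" followed by U+0301) will have its mark moved to the wrong side; handling that requires grapheme-cluster segmentation, which is outside the Go standard library.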

You'll notice that the argument to Compute has type interface{}. Your data types are opaque to Wallaroo and could be of any type. For this reason, you'll see interface{} used in a number of places in the Wallaroo Go API.
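Because values arrive as interface{}, each function has to recover the concrete type with a type assertion. The single-value form used in Compute, data.(*string), panics if the value has any other type. A standalone sketch of a more defensive alternative using a type switch follows; asString is a hypothetical helper, and whether a pipeline should tolerate unexpected types is a design choice left to the application.

```go
package main

import "fmt"

// asString recovers a string from an opaque interface{} value.
// It accepts *string (what the Decoder returns) and string (what
// Compute returns) and reports ok=false for anything else, instead
// of panicking like a single-value type assertion would.
func asString(data interface{}) (s string, ok bool) {
	switch v := data.(type) {
	case *string:
		if v == nil { // a typed nil pointer still matches *string
			return "", false
		}
		return *v, true
	case string:
		return v, true
	default:
		return "", false
	}
}

func main() {
	word := "wallaroo"
	fmt.Println(asString(&word)) // wallaroo true
	fmt.Println(asString("go"))  // go true
	_, ok := asString(42)
	fmt.Println(ok) // false
}
```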

Sink Encoder

Next, we are going to define how the output gets constructed for the sink. It is important to remember that when using a TCP sink, Wallaroo sends its output over the network, so data going through the sink needs to be of type []byte.

type Encoder struct{}

func (e *Encoder) Encode(data interface{}) []byte {
  // Compute returns a string; append a newline to delimit messages.
  msg := data.(string)
  return []byte(msg + "\n")
}
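The newline the Encoder appends is what lets a receiver split the outgoing TCP byte stream back into individual messages. A standalone sketch of that idea with no Wallaroo dependencies: encode mirrors Encoder.Encode, and splitLines is a hypothetical receiver-side helper built on bufio.Scanner. A line-oriented reader on the receiving end is an assumption here, not something the sink requires.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// encode mirrors Encoder.Encode: the message followed by a newline.
func encode(data interface{}) []byte {
	msg := data.(string)
	return []byte(msg + "\n")
}

// splitLines is a hypothetical receiver: it splits a byte stream of
// newline-terminated messages back into individual strings.
func splitLines(stream []byte) []string {
	var msgs []string
	sc := bufio.NewScanner(bytes.NewReader(stream))
	for sc.Scan() {
		msgs = append(msgs, sc.Text())
	}
	return msgs
}

func main() {
	// Simulate two encoded messages arriving back-to-back on one connection.
	var stream []byte
	stream = append(stream, encode("olleh")...)
	stream = append(stream, encode("dlrow")...)
	fmt.Println(splitLines(stream)) // [olleh dlrow]
}
```

This delimiter scheme breaks if a message itself contains a newline, and bufio.Scanner rejects lines longer than its default 64 KiB limit unless you call Scanner.Buffer. The length-prefix framing used by the source avoids both problems.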

Source Decoder

Now we need to decode the incoming bytes from the source.

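import "encoding/binary"

type Decoder struct{}

// HeaderLength: every message starts with a 4-byte length header.
func (d *Decoder) HeaderLength() uint64 {
  return 4
}

// PayloadLength reads the big-endian 32-bit payload size from the header.
func (d *Decoder) PayloadLength(b []byte) uint64 {
  return uint64(binary.BigEndian.Uint32(b[0:4]))
}

// Decode receives only the payload bytes and returns a *string,
// which Reverse.Compute dereferences.
func (d *Decoder) Decode(b []byte) interface{} {
  x := string(b)
  return &x
}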

This one is different. When using a TCP source, Wallaroo handles streams of bytes, and to do that efficiently it uses a technique called message framing. This means Wallaroo requires input data to follow a specific structure, and requires the application to provide the mechanism for decoding that data.

To read more about this, please refer to the Creating A Decoder section.

For our application, we will simply define the structure and how it is going to be parsed:

  1. Input messages have the following structure: a fixed-length PAYLOAD_SIZE header followed by PAYLOAD.
  2. Wallaroo requires three methods to parse this type of message:
    1. HeaderLength(), which returns the number of bytes used for PAYLOAD_SIZE in the message. This value tells Wallaroo how many bytes to read from the stream as the HEADER.
    2. PayloadLength([]byte), which receives the HEADER (the number of bytes returned by HeaderLength()), computes the size of PAYLOAD from it, and returns that size to Wallaroo, which then reads that many bytes from the stream.
    3. Decode([]byte), which receives the remainder of the message, PAYLOAD, and decodes it into a Go object.

In our case:

  • PAYLOAD_SIZE is a big-endian unsigned 32-bit integer, so we return 4 from HeaderLength() and use uint64(binary.BigEndian.Uint32(b[0:4])) to read it as an integer.
  • PAYLOAD is our text that we are going to reverse.

Application Setup

So now that we have input decoding, computation, and output encoding defined, how do we build them into an application?

For this, two things are needed:

  1. An entry point for Wallaroo to create the application. This is the function ApplicationSetup() that you need to define.
  2. The application topology that ApplicationSetup() returns, which Wallaroo uses to create the application.

Application Builder and Pipelines

An application is made up of pipelines, which in turn are constructed from a source, a sequence of one or more computations and state computations, and optionally a sink. Our reverse application has only one pipeline, so we only need to create one:

application := app.MakeApplication("Reverse Word")
application.NewPipeline("Reverse", app.MakeTCPSourceConfig(inHost, inPort, &Decoder{})).

Each pipeline must have a source, and each source must have a decoder, so NewPipeline takes a name and a TCPSourceConfig instance as its arguments.

Next, we add the computation step:

To(&ReverseBuilder{}).

And finally, we add the sink, using a TCPSinkConfig:

ToSink(app.MakeTCPSinkConfig(outHost, outPort, &Encoder{}))

The ApplicationSetup Entry Point

After Wallaroo starts, it calls the application's ApplicationSetup() function. This function is where the application builder steps from above run.

//export ApplicationSetup
func ApplicationSetup() *C.char {
  fs := flag.NewFlagSet("wallaroo", flag.ExitOnError)
  inHostsPortsArg := fs.String("in", "", "input host:port list")
  outHostsPortsArg := fs.String("out", "", "output host:port list")

  fs.Parse(wa.Args[1:])

  inHostsPorts := hostsPortsToList(*inHostsPortsArg)

  inHost := inHostsPorts[0][0]
  inPort := inHostsPorts[0][1]

  outHostsPorts := hostsPortsToList(*outHostsPortsArg)
  outHost := outHostsPorts[0][0]
  outPort := outHostsPorts[0][1]

  wa.Serialize = Serialize
  wa.Deserialize = Deserialize

  application := app.MakeApplication("Reverse Word")
  application.NewPipeline("Reverse", app.MakeTCPSourceConfig(inHost, inPort, &Decoder{})).
    To(&ReverseBuilder{}).
    ToSink(app.MakeTCPSinkConfig(outHost, outPort, &Encoder{}))

  return C.CString(application.ToJson())
}

Configuration objects are used to pass information about sources and sinks to the application builder.
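ApplicationSetup calls a helper, hostsPortsToList, whose definition this section doesn't show. From the flag help text ("host:port list") and the way its result is indexed ([0][0] for the host, [0][1] for the port), it appears to turn a comma-separated list into a slice of [host, port] pairs. The following is a hypothetical reimplementation under that assumption, together with the same FlagSet parsing ApplicationSetup performs; a literal argument slice stands in for wa.Args[1:], and the helper in the complete example may differ.

```go
package main

import (
	"flag"
	"fmt"
	"net"
	"strings"
)

// hostsPortsToList is a hypothetical stand-in for the helper used by
// ApplicationSetup. It splits a comma-separated "host:port" list into
// [][]string{{host, port}, ...}. net.SplitHostPort also accepts
// bracketed IPv6 addresses such as "[::1]:7010". Malformed entries
// panic, since setup cannot continue without valid addresses.
func hostsPortsToList(hostsPorts string) [][]string {
	var out [][]string
	for _, hp := range strings.Split(hostsPorts, ",") {
		hp = strings.TrimSpace(hp)
		if hp == "" {
			continue
		}
		host, port, err := net.SplitHostPort(hp)
		if err != nil {
			panic(fmt.Sprintf("bad host:port %q: %v", hp, err))
		}
		out = append(out, []string{host, port})
	}
	return out
}

func main() {
	// A FlagSet parses an explicit argument slice, as ApplicationSetup
	// does with wa.Args[1:]; a literal slice stands in for it here.
	fs := flag.NewFlagSet("wallaroo", flag.ExitOnError)
	in := fs.String("in", "", "input host:port list")
	out := fs.String("out", "", "output host:port list")
	fs.Parse([]string{"--in", "127.0.0.1:7010", "--out", "127.0.0.1:7002"})

	inHostsPorts := hostsPortsToList(*in)
	outHostsPorts := hostsPortsToList(*out)
	fmt.Println(inHostsPorts[0][0], inHostsPorts[0][1])   // 127.0.0.1 7010
	fmt.Println(outHostsPorts[0][0], outHostsPorts[0][1]) // 127.0.0.1 7002
}
```

An empty --in or --out value yields an empty list, so a more defensive ApplicationSetup would check the length before indexing [0].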

Running reverse

The complete example is available here. To run it, follow the Reverse application instructions.

Next Steps

To learn how to write a stateful application, continue to Writing Your Own Stateful Application.

Our Reverse application contains serialization and deserialization code that we didn't cover; to learn more, skip ahead to Interworker Serialization and Resilience.
