Playing with GPars: Managing the upper limit of queues

I was an invited speaker at GR8Conf-Europe in Copenhagen, so I had the pleasure of watching all the other speakers present great Groovy, Griffon and Grails technology. Václav Pech's presentation on GPars in particular caught my attention. I started playing around right away and created a little example based on the Dataflow concept. My first attempt at a simple dataflow processing chain ended in an OutOfMemoryError. So I started a discussion with Václav, who gave me the necessary advice and fixes for my code. In this episode I will wrap up what we discussed (the original mailing list entry is here). My original example looked like this:

import groovyx.gpars.dataflow.DataFlowBroadcast
import groovyx.gpars.dataflow.DataFlowQueue
import static groovyx.gpars.dataflow.DataFlow.operator
import static groovyx.gpars.dataflow.DataFlow.task

def b = new DataFlowQueue()
def a0 = new DataFlowBroadcast()

operator([inputs:[a0.createReadChannel()], outputs:[b], maxForks:2]) { val ->
  bindOutput "$val[*]: ${val*val}"
}

operator([inputs:[a0.createReadChannel()], outputs:[b], maxForks:2]) { val ->
  bindOutput "$val[+]: ${val+val}"
}


task {
  int i = 1
  while (true) {
    println "${i++}: $b.val"
  }
}

// nothing throttles this loop: values pile up in the unbounded queues
for (i in 1..1000000) {
  a0 << i
}

The code creates an integer sequence that is passed through two operators, one computing the square (val*val) and the other the double (val+val). The DataFlowBroadcast ensures that both operators receive every value. Finally, the results of both operations are pushed via a DataFlowQueue to a printing task. As I said, this code failed: values were produced faster than they were consumed, and since nothing slowed the producer down, the backlog in the queues kept growing until the JVM ran out of memory.
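
To make the failure mode concrete, here is a deliberately simplified, single-threaded model in plain Java rather than GPars. The rates are assumptions chosen for illustration: each input yields two results (as the two operators do), while the consumer is taken to remove only consumedPerInput results in the same time. ArrayDeque stands in for the unbounded DataFlowQueue, and backlogAfter is a hypothetical helper name.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class UnboundedBacklog {

    /**
     * Single-threaded model of the first pipeline: each input value yields two
     * results (square and double) on the output queue, while the printing task is
     * assumed to remove only consumedPerInput results in the same time.
     * Returns the number of results still queued after all inputs were sent.
     */
    static int backlogAfter(int inputs, int consumedPerInput) {
        Queue<String> b = new ArrayDeque<>();          // unbounded, like DataFlowQueue
        for (int val = 1; val <= inputs; val++) {
            b.add(val + "[*]: " + ((long) val * val)); // first operator
            b.add(val + "[+]: " + (val + val));        // second operator
            for (int k = 0; k < consumedPerInput && !b.isEmpty(); k++) {
                b.poll();                              // the slower consumer
            }
        }
        return b.size();
    }

    public static void main(String[] args) {
        // Nothing tells the producer to slow down, so the backlog grows
        // linearly with the number of inputs.
        System.out.println(backlogAfter(1_000, 1));   // prints 1000
        System.out.println(backlogAfter(10_000, 1));  // prints 10000
        System.out.println(backlogAfter(10_000, 2));  // prints 0
    }
}
```

Only when consumption keeps pace with production (two results removed per input) does the queue stay empty; any shortfall accumulates without limit.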

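Václav suggested two alternative fixes. The first simply compares the length of the queues against a MAX value; as long as that limit is exceeded, the producing thread keeps calling Thread.yield(), which hints to the scheduler that other threads (here the operators and the printing task) should run, until the queues have drained. Here is the code: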

import groovyx.gpars.dataflow.DataFlowQueue
import static groovyx.gpars.dataflow.DataFlow.operator
import static groovyx.gpars.dataflow.DataFlow.splitter
import static groovyx.gpars.dataflow.DataFlow.task

def b = new DataFlowQueue()
def a0 = new DataFlowQueue()
def a1 = new DataFlowQueue()
def a2 = new DataFlowQueue()

MAX=100

splitter(a0, [a1, a2])
operator([inputs: [a1], outputs: [b], maxForks: 4]) { val ->
  bindOutput "$val[*]: ${val * val}"
}

operator([inputs: [a2], outputs: [b], maxForks: 4]) { val ->
  bindOutput "$val[+]: ${val + val}"
}

task {
  int i = 1
  // read each result exactly once; an extra b.val here would silently drop every other result
  while (true) {
    println "${i++}: $b.val"
  }
}

int i=0
while (i<50000) {
  a0 << i++
  // busy-wait while any queue, including the output queue b, is above the limit
  while ([a0.length(), a1.length(), a2.length(), b.length()].max()>MAX) Thread.yield()
}
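
As a rough analogue only, the same length-check idea can be sketched in plain Java with java.util.concurrent instead of GPars. Assumptions: LinkedBlockingQueue stands in for DataFlowQueue, the producer fills both input queues itself instead of using a splitter, single-threaded workers replace the operators, and POISON is a hypothetical end-of-stream marker so the threads can finish. The producer records the longest queue it observes; because the upstream queues are read before the downstream one, a short counting argument shows no queue in this setup can exceed 3 * max + 4 entries.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

public class YieldBoundedPipeline {

    static final int POISON = -1; // hypothetical end-of-stream marker

    /** Returns {resultsConsumed, longestQueueSeenByProducer}. */
    static int[] run(int n, int max) {
        BlockingQueue<Integer> a1 = new LinkedBlockingQueue<>(); // unbounded
        BlockingQueue<Integer> a2 = new LinkedBlockingQueue<>();
        BlockingQueue<String> b = new LinkedBlockingQueue<>();
        Thread square = worker(a1, b, v -> v + "[*]: " + ((long) v * v));
        Thread twice = worker(a2, b, v -> v + "[+]: " + (v + v));
        AtomicInteger consumed = new AtomicInteger();
        Thread printer = new Thread(() -> {
            try {
                for (int k = 0; k < 2 * n; k++) {
                    b.take();                   // stands in for println
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        printer.start();
        int peak = 0;
        try {
            for (int i = 0; i < n; i++) {
                a1.put(i);                      // the splitter's job, done inline
                a2.put(i);
                peak = Math.max(peak, longest(a1, a2, b));
                // Upper-bound check: hint the scheduler to run the other threads
                // until every queue is back at or below max.
                while (longest(a1, a2, b) > max) {
                    Thread.yield();
                }
            }
            a1.put(POISON);
            a2.put(POISON);
            square.join();
            twice.join();
            printer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new int[] {consumed.get(), peak};
    }

    // Reads the upstream queues before the downstream one, so an item that moves
    // from a1/a2 into b between two reads is counted twice rather than missed.
    static int longest(BlockingQueue<?> a1, BlockingQueue<?> a2, BlockingQueue<?> b) {
        int x = a1.size();
        int y = a2.size();
        int z = b.size();
        return Math.max(x, Math.max(y, z));
    }

    static Thread worker(BlockingQueue<Integer> in, BlockingQueue<String> out, IntFunction<String> f) {
        Thread t = new Thread(() -> {
            try {
                for (int v = in.take(); v != POISON; v = in.take()) {
                    out.put(f.apply(v));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }

    public static void main(String[] args) {
        int[] r = run(50_000, 100);
        System.out.println("consumed=" + r[0] + ", peak=" + r[1]);
    }
}
```

Note that this is a busy wait: the producer keeps a CPU busy while it spins. In plain Java the more common way to get an upper bound is a bounded ArrayBlockingQueue, whose put() blocks the producer instead of spinning on Thread.yield().
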

In Václav's version, the broadcast is replaced by a splitter feeding two DataFlowQueues, because DataFlowBroadcast has no length() method, which the upper-bound check needs.

Václav also suggested a second alternative, which implements a communication between consumer and producer rather than a global check on queue lengths. In that example, a special token (-1) is sent ahead of each chunk of MAX values and echoed back by the consumer through a separate queue, c, as a confirmation that the next chunk may be delivered. Because the broadcast delivers the token to both operators, the consumer sees it twice per chunk, which is why the producer reads c.val twice. This solution allows producer and consumer to be further apart from each other, since the producer no longer needs to inspect the queues in between. The downside is that each operator has to recognize and forward this special token in its processing logic. Also note that the length() method is no longer required, so we can go back to DataFlowBroadcast.

import groovyx.gpars.dataflow.DataFlowBroadcast
import groovyx.gpars.dataflow.DataFlowQueue
import static groovyx.gpars.dataflow.DataFlow.operator
import static groovyx.gpars.dataflow.DataFlow.task

def b = new DataFlowQueue()
def c = new DataFlowQueue()
def a0 = new DataFlowBroadcast()

operator([inputs: [a0.createReadChannel()], outputs: [b], maxForks: 2]) { val ->
  if (val == -1) bindOutput val
  else bindOutput "$val[*]: ${val * val}"
}

operator([inputs: [a0.createReadChannel()], outputs: [b], maxForks: 2]) { val ->
  if (val == -1) bindOutput val
  else bindOutput "$val[+]: ${val + val}"
}

task {
  int i = 1
  while (true) {
    def value = b.val
    // echo the token back to the producer as a confirmation
    if (value == -1) c << value
    else {
      println "${i++}: $value "
    }
  }
}
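int MAX = 100

int i=0
while (i<50000) {
  // send the token first, then a chunk of MAX values
  a0 << -1
  MAX.times {
    a0 << i++
  }
  // wait until the token has come back from both operators
  c.val
  c.val
}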
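
For comparison, the token scheme can also be sketched in plain Java. This is an illustration under stated assumptions, not GPars code: LinkedBlockingQueue stands in for DataFlowQueue, a hand-written broadcast helper replaces DataFlowBroadcast, the operators are single-threaded (unlike maxForks: 2, which may process values concurrently and out of order), and END is a hypothetical end-of-stream marker so the threads can finish. Because each operator handles its input in FIFO order, once both confirmations for a token are back, everything sent before that token has been consumed, so an input queue never holds more than two chunks plus one token (2 * chunk + 1 items).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

public class TokenFlowControl {

    static final int TOKEN = -1; // same marker value as in the Groovy code
    static final int END = -2;   // hypothetical end-of-stream marker

    /** Returns {resultsPrinted, confirmationsReceived, longestInputQueueSeen}. */
    static int[] run(int n, int chunk) {
        BlockingQueue<Integer> a1 = new LinkedBlockingQueue<>(); // one "read channel"
        BlockingQueue<Integer> a2 = new LinkedBlockingQueue<>(); // per operator
        BlockingQueue<Object> b = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> c = new LinkedBlockingQueue<>();  // confirmations
        Thread square = operator(a1, b, v -> v + "[*]: " + ((long) v * v));
        Thread twice = operator(a2, b, v -> v + "[+]: " + (v + v));
        AtomicInteger printed = new AtomicInteger();
        Thread printer = new Thread(() -> {
            try {
                int ends = 0;
                while (ends < 2) {
                    Object value = b.take();
                    if (value.equals(TOKEN)) c.put(TOKEN);  // echo the token back
                    else if (value.equals(END)) ends++;
                    else printed.incrementAndGet();         // stands in for println
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        printer.start();
        int confirmations = 0;
        int peak = 0;
        try {
            int i = 0;
            while (i < n) {
                broadcast(a1, a2, TOKEN);
                for (int k = 0; k < chunk && i < n; k++) {
                    broadcast(a1, a2, i++);
                    peak = Math.max(peak, a1.size());
                }
                c.take(); // the token reached both operators,
                c.take(); // so it comes back twice
                confirmations += 2;
            }
            broadcast(a1, a2, END);
            square.join();
            twice.join();
            printer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new int[] {printed.get(), confirmations, peak};
    }

    // Hand-written stand-in for DataFlowBroadcast: every value goes to both operators.
    static void broadcast(BlockingQueue<Integer> a1, BlockingQueue<Integer> a2, int v)
            throws InterruptedException {
        a1.put(v);
        a2.put(v);
    }

    static Thread operator(BlockingQueue<Integer> in, BlockingQueue<Object> out,
                           IntFunction<String> f) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    int v = in.take();
                    if (v == TOKEN || v == END) {
                        out.put(v);           // control values are forwarded unchanged
                        if (v == END) return;
                    } else {
                        out.put(f.apply(v));
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }

    public static void main(String[] args) {
        int[] r = run(50_000, 100);
        System.out.println("printed=" + r[0] + ", confirmations=" + r[1] + ", peak=" + r[2]);
    }
}
```

With 50,000 values in chunks of 100, the producer waits 500 times and collects 1,000 confirmations, two per chunk.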
