parallel processing - How to limit number of unprocessed Futures in Scala?
I cannot find out whether there is a way to limit the number of unprocessed Futures in Scala. For example, in the following code:

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    for (i <- 1 to n) {
      val f = Future {
        // some work with a bunch of object creation
      }
    }

If n is too big, this throws an OOM. Is there a way to limit the number of unprocessed Futures, either with queue-like waiting or with an exception?
So, the simplest answer is that you can create an ExecutionContext that blocks or throttles the execution of new tasks beyond a certain limit. See this blog post. For a more fleshed-out example of a blocking Java ExecutorService, here is an example. [You can use it directly if you want; the library is on Maven Central here.] It wraps some nonblocking ExecutorService, which you can create using the factory methods of java.util.concurrent.Executors.
To convert a Java ExecutorService into a Scala ExecutionContext, just call ExecutionContext.fromExecutorService( executorService ). So, using the library linked above, you might have code like...
    import java.util.concurrent.Executors
    import scala.concurrent.ExecutionContext
    import com.mchange.v3.concurrent.BoundedExecutorService

    val executorService = new BoundedExecutorService(
      Executors.newFixedThreadPool( 10 ), // a pool of ten Threads
      100,                                // block new tasks when 100 are in process
      50                                  // restart accepting tasks when the number of in-process tasks falls below 50
    )

    implicit val executionContext = ExecutionContext.fromExecutorService( executorService )

    // do stuff that creates lots of futures here...
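If adding a dependency is not an option, the same blocking idea can be roughly sketched with only the standard library. This is a minimal sketch and not the library's implementation: the names BlockingBoundedContext and boundedDemo are hypothetical, the thread count and limit are arbitrary, and it uses a single limit rather than separate block and restart thresholds. It is written as top-level definitions, so it assumes a Scala 3 source file, a script, or the REPL.

```scala
import java.util.concurrent.{Executors, Semaphore}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper: an ExecutionContext that blocks the submitting thread
// once `maxPending` tasks are queued or running.
final class BlockingBoundedContext(threads: Int, maxPending: Int) extends ExecutionContext {
  private val pool    = Executors.newFixedThreadPool(threads)
  private val permits = new Semaphore(maxPending)

  override def execute(task: Runnable): Unit = {
    permits.acquire() // blocks the caller while maxPending tasks are outstanding
    try {
      pool.execute(new Runnable {
        def run(): Unit = try task.run() finally permits.release()
      })
    } catch {
      case t: Throwable => permits.release(); throw t // submission failed, give the permit back
    }
  }

  override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()

  def shutdown(): Unit = pool.shutdown()
}

// Creates 1000 futures while never having more than 100 of them pending.
def boundedDemo(): Int = {
  val ec = new BlockingBoundedContext(threads = 4, maxPending = 100)
  try {
    val futures = (1 to 1000).map(i => Future(i * 2)(ec))
    futures.map(f => Await.result(f, 30.seconds)).sum
  } finally ec.shutdown()
}
```

One caveat with this sketch: callbacks such as map or flatMap that run on the same context are submitted from pool threads, which can then block waiting for a permit. It is probably safer to run combinators on a separate ExecutionContext.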
This approach is fine if you want a bounded ExecutorService that lasts as long as your whole application. But if you are creating lots of Futures at a localized point in your code, you will want to shut down the ExecutorService when you are done with it. I define loan-pattern methods in Scala [Maven Central] that both create the context and shut it down after I'm done. The code ends up looking like...
    import com.mchange.sc.v2.concurrent.ExecutionContexts

    ExecutionContexts.withBoundedFixedThreadPool( size = 10, blockBound = 100, restartBeneath = 50 ) { implicit executionContext =>
      // do stuff that creates lots of futures here...

      // make sure the Futures have completed before the scope ends!
      // that's important! otherwise, some Futures may never run
    }
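For readers who want to see the loan pattern itself, here is a minimal standard-library sketch. The names withFixedThreadPool and loanDemo are hypothetical and not part of the library above, and this version uses a plain unbounded fixed thread pool, so it illustrates only the create-lend-shut-down shape, not the bounding. It assumes a Scala 3 source file, a script, or the REPL.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

// Hypothetical helper: lends a fixed-size ExecutionContext to `body`
// and shuts the underlying pool down when `body` returns or throws.
def withFixedThreadPool[T](size: Int)(body: ExecutionContext => T): T = {
  val ec: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(size))
  try body(ec)
  finally ec.shutdown() // tasks submitted after this point are rejected
}

def loanDemo(): Int =
  withFixedThreadPool(4) { implicit ec =>
    val futures = (1 to 100).map(i => Future(i))
    // Wait inside the scope so that nothing is submitted after shutdown.
    Await.result(Future.sequence(futures), 30.seconds).sum
  }
```

The Await inside the block is what keeps the Futures from outliving the context they run on.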
Rather than using an ExecutorService that blocks outright, you can use an instance that slows things down by forcing the task-scheduling (Future-creating) Thread to execute the task itself rather than running it asynchronously. You'd make a java.util.concurrent.ThreadPoolExecutor using ThreadPoolExecutor.CallerRunsPolicy. But ThreadPoolExecutor is fairly complex to build directly.
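As a rough illustration of the caller-runs approach, the sketch below builds a ThreadPoolExecutor directly. The pool size, queue capacity, and the name callerRunsDemo are arbitrary choices for the example, and it assumes a Scala 3 source file, a script, or the REPL.

```scala
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

def callerRunsDemo(): Int = {
  val executor = new ThreadPoolExecutor(
    4, 4,                                     // core and maximum pool size
    0L, TimeUnit.MILLISECONDS,                // keep-alive time for threads above the core size
    new ArrayBlockingQueue[Runnable](100),    // at most 100 tasks waiting in the queue
    new ThreadPoolExecutor.CallerRunsPolicy() // queue full: the submitting thread runs the task itself
  )
  val ec = ExecutionContext.fromExecutorService(executor)
  try {
    // When the queue is full, this loop slows down because the Future body
    // runs synchronously on the thread that is creating the Futures.
    val futures = (1 to 1000).map(i => Future(i * 2)(ec))
    futures.map(f => Await.result(f, 30.seconds)).sum
  } finally ec.shutdown()
}
```

The bounded queue is what makes the policy matter: with an unbounded queue the executor never rejects a task, so the caller never runs one.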
A newer, sexier, more Scala-centric alternative to all of this would be to check out Akka Streams as an alternative to Future for concurrent execution, with "back-pressure" to prevent OutOfMemoryErrors.