python - How to resolve: Very large size tasks in Spark
Here I am pasting Python code that I run on Spark in order to perform some analysis on data. I am able to run the following program on a small data-set. When it comes to a large data-set, it says "Stage 1 contains a task of very large size (17693 KB). The maximum recommended task size is 100 KB".
import os
import sys
import unicodedata
from operator import add

try:
    from pyspark import SparkConf
    from pyspark import SparkContext
except ImportError as e:
    print("Error importing Spark modules", e)
    sys.exit(1)

def tokenize(text):
    resultDict = {}
    text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore')
    str1 = text[1]
    str2 = text[0]
    arrText = text.split(str1)
    ss1 = arrText[0].split("/")
    docId = ss1[0].strip()
    docName = ss1[1].strip()
    resultDict[docId + "_" + docName] = 1
    return resultDict.iteritems()

sc = SparkContext('local')
textFile = sc.textFile("path data")
fileContent = textFile.flatMap(tokenize)
rdd = sc.parallelize(fileContent.collect())
rdd = rdd.map(lambda x: (x[0], x[1])).reduceByKey(add)
print rdd.collect()  # reduceByKey(lambda a, b: a + b)
rdd.coalesce(1).saveAsTextFile("path result")
Here I am posting a little more of the warning. The job does not run after this. Can anyone help me with this?
16/06/10 19:19:58 WARN TaskSetManager: Stage 1 contains a task of very large size (17693 KB). The maximum recommended task size is 100 KB.
16/06/10 19:19:58 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 5314, localhost, partition 0, PROCESS_LOCAL, 18118332 bytes)
16/06/10 19:19:58 INFO Executor: Running task 0.0 in stage 1.0 (TID 5314)
16/06/10 19:43:00 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:43480 in memory (size: 3.9 KB, free: 511.1 MB)
16/06/10 19:43:00 INFO ContextCleaner: Cleaned accumulator 2
When Spark serializes tasks, it recursively serializes the full closure context. In this case, the logical culprit seems to be unicodedata, which you use in tokenize. I may be wrong, but I don't see any other heavy data structures in the code. (Caveat: I typically use Spark with Scala and my Python is rusty.) I wonder whether that library is backed by heavy data structures that are not available on the executor nodes.
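To make the closure-serialization point concrete, here is a minimal sketch of the mechanism (the heavy_table dict and its size are hypothetical, not from the question): any large driver-side object referenced inside a function passed to map is pickled into every task, which is exactly what produces the "task of very large size" warning.

from pyspark import SparkContext

sc = SparkContext('local', 'closure-size-demo')

# Hypothetical large driver-side structure. Anything the closure references
# is serialized into every task that uses that closure.
heavy_table = dict((i, i * i) for i in range(1000000))

def lookup(x):
    # Captures heavy_table from the enclosing scope, so the whole dict
    # travels with each task.
    return heavy_table.get(x, 0)

rdd = sc.parallelize(range(100), 4)
print(rdd.map(lookup).sum())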
The typical patterns for dealing with these types of problems are:
Make sure the libraries are available on the executor nodes.
Use broadcast variables to distribute heavy data structures to the executors (see the sketch below).
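As an illustration of the second pattern (reusing the hypothetical heavy_table from above), broadcasting the structure once gives each executor a single read-only copy, so only a small broadcast handle is serialized with the closure:

from pyspark import SparkContext

sc = SparkContext('local', 'broadcast-demo')

# Broadcast the heavy structure once instead of capturing it in the closure.
heavy_table = dict((i, i * i) for i in range(1000000))
heavy_table_bc = sc.broadcast(heavy_table)

def lookup(x):
    # Only the lightweight broadcast handle is captured; the data itself is
    # fetched once per executor and cached there.
    return heavy_table_bc.value.get(x, 0)

rdd = sc.parallelize(range(100), 4)
print(rdd.map(lookup).sum())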
Unrelated to that: unless you are using it as a debugging tool, you are doing an unnecessary collection of data to the driver with collect. The transformations can be chained:
sc.textFile(...).flatMap(...).map(...).reduceByKey(add).coalesce(1).saveAsTextFile(...)
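Applied to the code in the question, that would look roughly like the sketch below ("path data" and "path result" are the placeholders from the question, and tokenize is the function defined there). The map(lambda x: (x[0], x[1])) step is an identity on the (key, 1) pairs that tokenize already emits, so it can be dropped:

counts = (sc.textFile("path data")
            .flatMap(tokenize)
            .reduceByKey(add))
counts.coalesce(1).saveAsTextFile("path result")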