scala - Why so many tasks in my Spark job?
I have a Spark job that takes a file with 8 records from HDFS, does a simple aggregation, and saves the result back to Hadoop. I notice there are hundreds of tasks when I do this.
I am also not sure why there are multiple jobs for this. I thought a job was created when an action happened. I can speculate as to why, but my understanding was that this code should be one job broken down into stages, not multiple jobs. Why doesn't it just break the work down into stages? How come it breaks it into jobs?
As for the 200-plus tasks: since the amount of data and the number of nodes are minuscule, it doesn't make sense that there are about 25 tasks for each row of data when there is only one aggregation and a couple of filters. Why wouldn't it just have one task per partition per atomic operation?
Here is the relevant Scala code:
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object TestProj {

  def main(args: Array[String]): Unit = {

    /* set the application name in the SparkConf object */
    val appConf = new SparkConf().setAppName("Test Proj")

    /* env settings that I don't need to set in the REPL */
    val sc = new SparkContext(appConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")

    /* the below RDD will have the schema defined in the Record class */
    val rddCase = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
      .map(x => x.split(" "))  // split each file record into an array of strings on spaces
      .map(x => Record(
        x(0).toInt,
        x(1).asInstanceOf[String],
        x(2).asInstanceOf[String],
        x(3).toInt
      ))

    /* the below DataFrame groups on the first letter of the first name and counts it */
    val aggDF = rddCase.toDF()
      .groupBy($"firstName".substr(1, 1).alias("firstLetter"))
      .count
      .orderBy($"firstLetter")

    /* save to HDFS */
    aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg")
  }

  case class Record(id: Int, firstName: String, lastName: String, quantity: Int)
}
Below is the screenshot after clicking on the application:
Below are the stages shown when viewing the specific "job" with id 0:
Below is the first part of the screen when clicking on the stage with 200 tasks:
This is the second part of the screen inside the stage:
Below is the view after clicking on the "Executors" tab:
As requested, here are the stages for job id 1:
Here are the details for the stage in job id 1 with 200 tasks:
This is a classic Spark question.
The two tasks used for reading (Stage Id 0 in the second figure) come from the defaultMinPartitions setting, which is set to 2. You can get this parameter by reading the value of sc.defaultMinPartitions in the REPL. It should also be visible in the Spark UI under the "Environment" tab.
You can take a look at the code on GitHub to see that this is exactly what is happening. If you want more partitions to be used on read, just add the number as a parameter, e.g. sc.textFile("a.txt", 20).
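The read-side partition count can be checked directly. Below is a minimal sketch, assuming a local master with 2 threads and a temporary 8-line file standing in for the HDFS input; the object name `ReadPartitions`, the `run` helper, and the file contents are hypothetical. Note that the second argument to `textFile` is a minimum-partitions hint, so the actual count can differ from the number requested.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}

object ReadPartitions {

  /** Returns (defaultMinPartitions, partitions by default, partitions when asking for 20). */
  def run(): (Int, Int, Int) = {
    // Assumption: local master with 2 threads, only so the sketch runs without a cluster.
    val conf = new SparkConf().setAppName("read-partitions").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical stand-in for the 8-record input file from the question.
      val tmp = Files.createTempFile("ex", ".txt")
      val rows = (1 to 8).map(i => s"$i first$i last$i $i").mkString("\n")
      Files.write(tmp, rows.getBytes(StandardCharsets.UTF_8))
      val path = tmp.toUri.toString

      val byDefault = sc.textFile(path)     // falls back to sc.defaultMinPartitions
      val wider     = sc.textFile(path, 20) // 20 is a minimum-partitions hint

      (sc.defaultMinPartitions, byDefault.partitions.length, wider.partitions.length)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (minParts, defaultCount, widerCount) = run()
    println(s"defaultMinPartitions = $minParts")
    println(s"partitions by default = $defaultCount, with hint of 20 = $widerCount")
  }
}
```

Each partition of the input becomes one task in the reading stage, which is why the first stage in the question shows so few tasks.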
Now the interesting part is the 200 partitions that show up in the second stage (Stage Id 1 in the second figure). Every time there is a shuffle, Spark needs to decide how many partitions the shuffled RDD will have. For DataFrame operations this is controlled by spark.sql.shuffle.partitions and, as you can imagine, the default is 200.
You can change that using:
sqlContext.setConf("spark.sql.shuffle.partitions", "4")
If you run your code with this configuration, you will see that the 200 partitions are no longer there. How to set this parameter is something of an art. Maybe choose 2x the number of cores you have (or whatever).
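To see the effect of the setting, one can count the partitions of the aggregated DataFrame. The following is a sketch under assumptions: it uses the Spark 1.x `SQLContext` API to match the question, a local master, and a few made-up records; `ShufflePartitions` and `run` are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ShufflePartitions {

  case class Record(id: Int, firstName: String, lastName: String, quantity: Int)

  /** Returns (configured shuffle partitions, partitions of the aggregated result). */
  def run(): (String, Int) = {
    // Assumption: local master with 2 threads, for illustration only.
    val conf = new SparkConf().setAppName("shuffle-partitions").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      // Lower the number of post-shuffle partitions from the default of 200.
      sqlContext.setConf("spark.sql.shuffle.partitions", "4")

      // Made-up records standing in for the question's input.
      val records = Seq(
        Record(1, "Ann", "Lee", 3),
        Record(2, "Abe", "Kim", 5),
        Record(3, "Bob", "Ray", 2)
      )

      // Same shape of aggregation as in the question: group on the first letter.
      val aggDF = sc.parallelize(records).toDF()
        .groupBy($"firstName".substr(1, 1).alias("firstLetter"))
        .count()

      (sqlContext.getConf("spark.sql.shuffle.partitions"), aggDF.rdd.partitions.length)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (setting, partitions) = run()
    println(s"spark.sql.shuffle.partitions = $setting, result partitions = $partitions")
  }
}
```

The setting has to be applied before the DataFrame is executed; the same key can also be passed at submit time with `--conf spark.sql.shuffle.partitions=4`.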
I think Spark 2.0 has a way to automatically infer the best number of partitions for shuffle RDDs. Looking forward to that!
Finally, the number of jobs you get depends on how many RDD actions the optimized DataFrame code ends up producing. If you read the Spark specs, they say that each RDD action triggers one job. When your action involves a DataFrame or Spark SQL, the Catalyst optimizer figures out an execution plan and generates RDD-based code to execute it. It's hard to say exactly why it uses two actions in your case. You may need to look at the optimized query plan to see exactly what it is doing.
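The plan can be printed with `explain`. Here is a minimal sketch, again assuming the Spark 1.x `SQLContext` API, a local master, and made-up records; `InspectPlan` and `run` are hypothetical names. One candidate worth checking in the output, though not confirmed for this case, is the global `orderBy`: range partitioning typically samples the data first, and that sampling can show up as a separate job.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object InspectPlan {

  case class Record(id: Int, firstName: String, lastName: String, quantity: Int)

  /** Prints the query plans and returns the physical plan as a string. */
  def run(): String = {
    // Assumption: local master with 2 threads, for illustration only.
    val conf = new SparkConf().setAppName("inspect-plan").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      // Made-up records standing in for the question's input.
      val records = Seq(
        Record(1, "Ann", "Lee", 3),
        Record(2, "Abe", "Kim", 5),
        Record(3, "Bob", "Ray", 2)
      )

      // Same query shape as in the question: group, count, then sort.
      val aggDF = sc.parallelize(records).toDF()
        .groupBy($"firstName".substr(1, 1).alias("firstLetter"))
        .count()
        .orderBy($"firstLetter")

      // extended = true prints the logical plans as well as the physical plan.
      aggDF.explain(true)

      aggDF.queryExecution.executedPlan.toString
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    run()
  }
}
```

Comparing the physical plan against the jobs and stages listed in the Spark UI is usually the quickest way to map each job back to an operator.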