hadoop - Spark job with large text file in gzip format -
i'm running spark job taking way long time process input file. input file 6.8 gb in gzip format , contains 110 m lines of text. know it's in gzip format, it's not splittable , 1 executor used read file.
as part of debug process, decided see how long take convert gzip file parquet. idea once convert parquet files , if run original spark job on file, in case use multiple executors , input file processed in parallel.
but small job taking long time expected. here code:
val input = sqlcontext.read.text("input.gz") input.write.parquet("s3n://temp-output/")
when extracted file in laptop (16 gb ram), took less 2 minutes. when run on spark cluster, expectation take same or less time since executor memory using 58 gb. took ~20 minutes.
what missing here? i'm sorry if sounds pretty amateur i'm new in spark.
what's optimal way run spark job on gzip file? assume not have option create file in other file format (bzip2, snappy, lzo).
when doing input-process-output type of spark jobs, there 3 separate issues consider:
- input parallelism
- processing parallelism
- output parallelism
in case, input parallelism 1 because in question claim cannot change input format or granularity.
you doing no processing can't gains there.
however, can control output parallelism, give 2 benefits:
multiple cpus write, decreasing total time of write operation.
your output split in multiple files allowing take advantage of input parallelism in later processing.
to increase parallelism, have increase number of partitions, can repartition()
, e.g.,
val numpartitions = ... input.repartition(numpartitions).write.parquet("s3n://temp-output/")
when choosing optimal number of partitions, there number of different factors consider.
- data size
- parition skew
- cluster ram size
- number of cores in cluster
- the type of follow-on processing you'll do
- the size of cluster (ram & cores) you'll use follow-on processing
- the system writing to
without knowing goals , constraints difficult make solid recommendation here couple general guidelines work with:
since partitions won't skewed (the above use of
repartition
use hash partitioner corrects skew), fastest throughput if set number of partitions equal number of executor cores, assuming using nodes sufficient i/o.when process data, want entire partition able "fit" in ram allocated single executor core. "fit" means here depends on processing. if doing simple
map
transformation, data may streamed. if doing involving ordering ram needs grow substantially. if using spark 1.6+, you'll benefit of more flexible memory management. if using earlier version, you'll have more careful. job execution grinds halt when spark has start "buffering" disk. on-disk size , in-ram size can very, different. latter varies based on how process data , how benefit spark can predicate pushdown (parquet supports that). use spark ui see how ram various job stages take.
btw, unless data has specific structure, not hard-code partition numbers because code run sub-optimally on clusters of varying sizes. instead, use following trick determine number of executors in cluster. can multiply number of cores per executor based on machines using.
// -1 driver node val numexecutors = sparkcontext.getexecutorstoragestatus.length - 1
just point of reference, on our team, use rather complex data structures, means ram size >> disk size, aim keep s3 objects in 50-250mb range processing on nodes each executor core has 10-20gb ram.
hope helps.
Comments
Post a Comment