GUIDE TO RUN SPARK
Most Spark developers spend considerable time troubleshooting the Fetch Failed exceptions observed during shuffle operations. This story gives insight into the most common causes of a Fetch Failed exception and presents the results of a recent poll conducted on the exception.
Shuffle operations are the backbone of almost all Spark jobs that aggregate, join, or restructure data. During a shuffle operation, data is shuffled across the nodes of the cluster in a two-step process:
a) Shuffle write: Shuffle map tasks write the data to be shuffled to a disk file, where the data is arranged according to the shuffle reduce tasks. The portion of shuffle data written by one shuffle map task for one shuffle reduce task is called a shuffle block. In addition, each shuffle map task informs the driver about the shuffle data it has written.
b) Shuffle read: Shuffle reduce tasks query the driver for the locations of their shuffle blocks. These tasks then connect to the executors hosting their shuffle blocks and start fetching the required blocks. Once a block is fetched, it is available for further computation in the reduce task.
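The two steps can be pictured with a toy, in-memory model. The sketch below is plain Python, not Spark code: the Executor and Driver classes, partition_for, and run_reduce_task are hypothetical names introduced only for illustration, and real shuffle blocks live in files on disk rather than in dictionaries.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_reducers):
    """Deterministic stand-in for a hash partitioner (an assumption of this sketch)."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


class Executor:
    """Hypothetical executor that keeps the shuffle blocks written by its map tasks."""

    def __init__(self, name):
        self.name = name
        self.alive = True
        self.blocks = {}  # (map_id, reduce_id) -> list of records

    def write_blocks(self, map_id, records, num_reducers):
        """Shuffle write: group one map task's records into one block per reduce task."""
        grouped = defaultdict(list)
        for key, value in records:
            grouped[partition_for(key, num_reducers)].append((key, value))
        for reduce_id, block in grouped.items():
            self.blocks[(map_id, reduce_id)] = block
        return sorted(grouped)  # reduce ids this map task produced blocks for

    def fetch(self, map_id, reduce_id):
        """Serve a block to a reduce task; fails if the executor has been lost."""
        if not self.alive:
            raise ConnectionError(f"fetch failed: {self.name} is gone")
        return self.blocks[(map_id, reduce_id)]


class Driver:
    """Hypothetical driver that tracks where each shuffle block is hosted."""

    def __init__(self):
        self.locations = defaultdict(list)  # reduce_id -> [(map_id, executor)]

    def register(self, map_id, executor, reduce_ids):
        for reduce_id in reduce_ids:
            self.locations[reduce_id].append((map_id, executor))


def run_reduce_task(driver, reduce_id):
    """Shuffle read: ask the driver for block locations, then fetch every block."""
    records = []
    for map_id, executor in driver.locations[reduce_id]:
        records.extend(executor.fetch(map_id, reduce_id))
    return records
```

In this model, setting `alive = False` on an executor after it has registered its blocks makes every reduce task that depends on those blocks fail in `fetch`, which is the situation a Fetch Failed exception reports.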
To learn more about the shuffle process, you can check out my previous story titled: Discovering Apache Spark Merge Magic.
Although the two-step shuffle process sounds simple, it is operationally intensive: it involves sorting data, disk reads and writes, and network transfers. The reliability of a shuffle operation is therefore always in question, and the evidence of this unreliability is the "FetchFailed Exception" commonly seen during shuffle operations. Most Spark developers spend a lot of time troubleshooting this very common exception: first they try to find its root cause, and then they look for the right fix.
A Fetch Failed exception reported in a shuffle reduce task indicates a failure to read one or more shuffle blocks from the hosting executors. Debugging a Fetch Failed exception is quite difficult because it can occur for many reasons. Identifying the right reason matters, because it points you to the right fix.
Troubleshooting hundreds of Spark jobs recently made me realize that the Fetch Failed exception is mainly due to the following reasons:
- Out of heap memory on executors
- Low memory overhead on executors
- Shuffle block greater than 2 GB
- Network timeout
To understand how often each of these reasons occurs, I also ran a survey recently in the most followed LinkedIn group on Spark, Apache Spark. To my surprise, many people responded to the survey and sent feedback, which further confirms that this exception is a common occurrence in their Spark jobs. The survey results are summarized below.
According to the survey results, the most frequently chosen reasons are "Out of heap memory on an executor" and "Shuffle block greater than 2 GB". These are followed by "Network timeout" and "Low memory overhead on an executor".
Let’s understand each of these reasons in detail:
'Out of heap memory on an executor': This reason indicates that the Fetch Failed exception occurred because an executor hosting the corresponding shuffle blocks crashed with a Java "Out of Memory" error. An Out of Memory error can arise when the executor runs out of heap space, or when the executor's garbage collector spends more time on garbage collection than on useful work.
To confirm this reason, check the details of the hosting executor (hostname/IP address/port) mentioned in the Fetch Failed exception. Once you have the executor details, you may notice the following task failures against the hosting executor:
- 'ExecutorLostFailure' due to exit code 143
- 'ExecutorLostFailure' due to executor heartbeat timeout
These task failures against the hosting executor indicate that the executor hosting the shuffle blocks was killed because of a Java Out of Memory error. You can also confirm the error explicitly in the executor's container logs. Since the hosting executor was killed, the shuffle blocks it hosted could not be fetched, which can result in Fetch Failed exceptions in one or more shuffle reduce tasks.
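The correlation step described above can be sketched as a small helper that pulls the hosting executor out of a Fetch Failed message and collects the task errors reported against that executor. This is a minimal sketch in plain Python: the message shape matched by the regular expression is an assumption (the exact fields differ between Spark versions), and hosting_executor, errors_on_host, and the (executor_id, error_text) input format are hypothetical names used only for illustration.

```python
import re

# Assumed message shape; adjust the pattern to what your Spark version prints.
BLOCK_MANAGER_RE = re.compile(
    r"FetchFailed\(BlockManagerId\("
    r"(?P<executor_id>[^,]+),\s*(?P<host>[^,]+),\s*(?P<port>\d+)"
)


def hosting_executor(fetch_failure):
    """Return (executor_id, host, port) from a Fetch Failed message, or None."""
    match = BLOCK_MANAGER_RE.search(fetch_failure)
    if match is None:
        return None
    return match["executor_id"].strip(), match["host"].strip(), int(match["port"])


def errors_on_host(fetch_failure, task_errors):
    """Collect the error texts reported against the hosting executor.

    task_errors is an iterable of (executor_id, error_text) pairs, a
    hypothetical format for errors copied from the Spark UI or logs.
    """
    executor = hosting_executor(fetch_failure)
    if executor is None:
        return []
    executor_id = executor[0]
    return [text for eid, text in task_errors if eid == executor_id]
```

The returned error texts can then be inspected for the two 'ExecutorLostFailure' signatures listed above.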
According to the poll results, this reason received the highest percentage of votes. I have also seen a high proportion of Fetch Failed exceptions caused by this reason in my own work.
'Low memory overhead on an executor': This reason indicates that the Fetch Failed exception occurred because an executor hosting the corresponding shuffle blocks crashed due to low memory overhead. A low memory overhead error occurs when an executor's physical RAM footprint exceeds the designated physical memory limits. This can happen when the executor heap is heavily used and there is also significant demand for off-heap memory.
To confirm this reason, check the details of the hosting executor (hostname/IP address/port) mentioned in the Fetch Failed exception. Once you have the executor details, you may notice the following task failure against the hosting executor:
- 'ExecutorLostFailure, # GB of # GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead'
This task failure against the hosting executor indicates that the executor hosting the shuffle blocks was killed for exceeding its designated physical memory limits. Again, since the hosting executor was killed, the shuffle blocks it hosted could not be fetched, which can result in Fetch Failed exceptions in one or more shuffle reduce tasks.
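To make the physical memory limit concrete, here is a rough arithmetic sketch. It assumes the documented default for the executor memory overhead, 10% of the executor memory with a minimum of 384 MiB, and it deliberately simplifies the footprint to heap plus off-heap usage. The function names are hypothetical, and a real container limit can include other components depending on the deployment.

```python
MIN_OVERHEAD_MIB = 384   # documented minimum overhead
OVERHEAD_FACTOR = 0.10   # documented default factor for JVM executors


def container_limit_mib(executor_memory_mib, overhead_mib=None):
    """Executor memory plus overhead; the overhead falls back to the default rule."""
    if overhead_mib is None:
        overhead_mib = max(MIN_OVERHEAD_MIB, int(executor_memory_mib * OVERHEAD_FACTOR))
    return executor_memory_mib + overhead_mib


def exceeds_limit(heap_used_mib, off_heap_used_mib, executor_memory_mib, overhead_mib=None):
    """True when the simplified footprint is above the container limit."""
    footprint = heap_used_mib + off_heap_used_mib
    return footprint > container_limit_mib(executor_memory_mib, overhead_mib)


# An 8 GiB executor with a nearly full heap and 1.5 GiB of off-heap usage:
print(container_limit_mib(8192))
print(exceeds_limit(8000, 1500, 8192))
print(exceeds_limit(8000, 1500, 8192, overhead_mib=2048))
```

With the default overhead the executor in this example goes over its limit, while an explicit 2 GiB overhead leaves enough room for the same usage.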
According to the poll results, this reason received the lowest percentage of votes. In my own work, however, I have seen it much more often; in fact, about as often as the "Out of heap memory on an executor" reason.
'Shuffle block greater than 2 GB': A Fetch Failed exception that lists "Too Large Frame", "Frame size exceeding", or "size exceeding Integer.MaxValue" as the cause of the error indicates that the corresponding shuffle reduce task attempted to fetch a shuffle block larger than 2 GB. This is mainly due to the Integer.MaxValue (2 GB) limit of the data structure abstraction (ByteBuffer) used to hold a shuffle block in memory.
However, as of Spark 2.4, this particular cause has been largely fixed.
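The 2 GB figure is the largest signed 32-bit integer, 2^31 - 1 bytes. The sketch below shows a back-of-the-envelope check under the assumption that shuffle data is spread evenly over all blocks, which skewed keys will violate. The function names and the block-size dictionary are hypothetical.

```python
INT_MAX_BYTES = 2**31 - 1  # 2147483647, the limit referred to as Integer.MaxValue


def average_block_bytes(total_shuffle_bytes, num_map_tasks, num_reduce_partitions):
    """Average shuffle block size, assuming a perfectly even distribution."""
    return total_shuffle_bytes / (num_map_tasks * num_reduce_partitions)


def oversized_blocks(block_sizes):
    """Return the (map_id, reduce_id) keys whose block size is over the limit."""
    return sorted(block for block, size in block_sizes.items() if size > INT_MAX_BYTES)


# 4 TiB of shuffle data written by 100 map tasks:
total = 4 * 2**40
print(average_block_bytes(total, 100, 10) > INT_MAX_BYTES)    # few, large blocks
print(average_block_bytes(total, 100, 200) > INT_MAX_BYTES)   # many, smaller blocks
```

Even when the average is comfortably below the limit, a single skewed key can push one block over it, so per-block sizes are a better signal than the average where they are available.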
According to the survey results, this reason received the second highest percentage of votes. In my own work, however, I have rarely seen a Fetch Failed exception caused by it.
'Network timeout': The fetch of a shuffle block is normally retried a configurable number of times (spark.shuffle.io.maxRetries) at configurable intervals (spark.shuffle.io.retryWait). When all retries are exhausted while fetching a shuffle block from its hosting executor, a Fetch Failed exception is raised in the shuffle reduce task. These Fetch Failed exceptions are usually grouped under the 'Network timeout' category.
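The retry behaviour can be sketched as a simple loop. This is an illustration in plain Python, not Spark's implementation: fetch_with_retries and its parameters are hypothetical, and the default values mirror the documented defaults of spark.shuffle.io.maxRetries (3) and spark.shuffle.io.retryWait (5s), which give a maximum retry delay of 15 seconds.

```python
import time


def fetch_with_retries(fetch_block, max_retries=3, retry_wait_s=5.0, sleep=time.sleep):
    """Call fetch_block() once, then retry up to max_retries times on I/O errors.

    Raises RuntimeError once every retry is exhausted, which stands in for
    the Fetch Failed exception raised in the reduce task.
    """
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            return fetch_block()
        except IOError as error:
            last_error = error
            if attempt < max_retries:
                sleep(retry_wait_s)  # wait before the next retry
    raise RuntimeError(f"fetch failed after {max_retries} retries") from last_error
```

The sleep function is injectable so the loop can be exercised without actually waiting; passing `sleep=waits.append` with an empty list records the waits instead.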
These Fetch Failed exceptions are difficult to correlate. They can arise from network issues, or when the executors hosting the corresponding shuffle blocks are overloaded.
According to the survey results, this reason received the third highest percentage of votes. I also experienced this cause frequently.
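As a rough triage aid, the signatures described for the four causes can be collected into a small lookup. This is only a sketch: the patterns are loose matches for the error fragments quoted in this story, real log lines vary by Spark version and cluster manager, and likely_cause is a hypothetical helper. Anything that matches no pattern falls through to the network timeout bucket, which is a guess rather than a diagnosis.

```python
import re

# Order matters: the more specific signatures are checked first.
SIGNATURES = [
    (
        "shuffle block greater than 2 GB",
        re.compile(r"too large frame|frame size exceed|size exceed\w* integer\.max", re.I),
    ),
    (
        "low memory overhead on an executor",
        re.compile(r"physical memory used", re.I),
    ),
    (
        "out of heap memory on an executor",
        re.compile(r"exit code 143|heartbeat timed out", re.I),
    ),
]

FALLBACK = "network timeout (or unclassified)"


def likely_cause(error_text):
    """Map an error message to the most likely of the four causes."""
    for cause, pattern in SIGNATURES:
        if pattern.search(error_text):
            return cause
    return FALLBACK
```

The result is a starting point for investigation; the executor's container logs remain the place to confirm the actual cause.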
I hope that after reading this story you have a clear idea of the various causes of Fetch Failed exceptions. I intend to cover the possible solutions for each cause in another story. If you need an urgent solution for a Fetch Failed exception, you can leave a message.
Finally, I would like to thank everyone who participated in the survey and provided feedback.
If you have any comments or questions about this story, please write in the comments section. I hope you find it useful. Here is the link to other comprehensive Apache Spark stories posted by me.
FAQs
Which of the following will cause a Spark job to fail?
Spark jobs might fail due to out of memory exceptions at the driver or executor end. When troubleshooting out of memory exceptions, you should understand how much memory and how many cores the application requires, as these are the essential parameters for optimizing the Spark application.
What happens when Spark job fails?
Failure of a worker node: the worker nodes are the nodes that run the application code on the Spark cluster. Any worker node running an executor can fail, resulting in the loss of its in-memory data. If any receivers were running on the failed nodes, their buffered data will be lost as well.
What is Spark shuffle service enabled?
If the service is enabled, Spark executors fetch shuffle files from the service instead of from each other. Thus, any shuffle state written by an executor may continue to be served beyond the executor's lifetime.
What is Spark shuffle partitions?
spark.sql.shuffle.partitions: Determines how many output partitions you will have by default after wide operations on DataFrames/Datasets.
What are the common issues of Apache Spark?
Some of the drawbacks of Apache Spark are the lack of support for real-time processing, problems with small files, no dedicated file management system, and high cost. Because of these limitations, industries have started shifting to Apache Flink, described as the "4G of Big Data".
How does Spark recover from failure?
In the programming world, faults and application failures at production scale are extremely common. To handle such incidents, Spark recovers what was lost once a failure occurs. This self-recovery feature of Spark comes from the Resilient Distributed Dataset (RDD).
What are the four main components of Spark?
Apache Spark consists of Spark Core Engine, Spark SQL, Spark Streaming, MLlib, GraphX, and SparkR. You can use Spark Core Engine along with any of the other five components mentioned above.
How does Spark handle node failure?
If a worker fails, the executors on that worker node are killed, along with the data in their memory. Using the lineage graph, their tasks are then completed on other worker nodes. The data is also replicated to other worker nodes to achieve fault tolerance.
Why is my Spark job so slow?
Sometimes, Spark runs slowly because there are too many concurrent tasks running. The capacity for high concurrency is a beneficial feature, as it provides Spark-native fine-grained sharing. This leads to maximum resource utilization while cutting down query latencies.
What is job failure?
A job fails due to one or more steps failing. A job succeeds, but one or more steps fail and we wish to report on those failures. All steps in a job succeed, but the job itself fails. This is often the result of a job configuration issue. A job fails that has no steps defined for it.