# Use the right level of parallelism

Clusters will not be fully utilized unless the level of parallelism for each operation is high enough. **Spark automatically sets the number of partitions of an input file according to its size, and likewise for distributed shuffles.** By default, **Spark creates one partition for each block of the file in HDFS** (a block is 64 MB by default in older Hadoop versions, 128 MB since Hadoop 2.x).

You can also pass a minimum number of partitions as a second argument when creating an RDD.

Let's see an example of creating an RDD from a text file:

```
val rdd = sc.textFile("file.txt", 5)
```

The statement above creates an RDD from the text file with 5 partitions. Now suppose we have a cluster with 4 cores and each partition takes 5 minutes to process: 4 partitions are processed in parallel, and the fifth starts only once a core frees up. The final result is therefore ready in 10 minutes, and during the last 5 minutes three cores sit idle while the single remaining partition is processed.
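The arithmetic behind this can be sketched in a few lines (a back-of-the-envelope model, not Spark code; the function name and values are ours):

```python
import math

def wall_clock_minutes(partitions, cores, minutes_per_partition):
    # Partitions run in "waves" of at most `cores` tasks at a time.
    waves = math.ceil(partitions / cores)
    return waves * minutes_per_partition

# 5 partitions on 4 cores: the fifth partition forces a second wave.
print(wall_clock_minutes(5, 4, 5))   # 10 minutes
# 4 partitions on 4 cores: one wave, all cores busy.
print(wall_clock_minutes(4, 4, 5))   # 5 minutes
```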

To avoid this problem, we should **create the RDD with a number of partitions equal to the number of cores in the cluster**; that way *all partitions are processed in parallel and the resources are used evenly.*

**As a rule of thumb, tasks should take at least 100 ms to execute**; you can ensure that this is the case by monitoring the task execution latency from the Spark UI. If your tasks take considerably longer than that, keep increasing the level of parallelism, by say 1.5×, until performance stops improving.

A **DataFrame** shuffle creates a number of partitions equal to the **spark.sql.shuffle.partitions** parameter, whose default value is 200.
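If 200 does not fit your cluster, the value can be changed at runtime. A sketch (it assumes an existing SparkSession named `spark`; 96 is an arbitrary example value, e.g. a small multiple of the total number of cores):

```
spark.conf.set("spark.sql.shuffle.partitions", "96")
```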

## How to estimate the size of a Dataset

An approximate calculation for the size of a dataset is

```
number Of Megabytes = M = (N*V*W) / 1024^2
```

where

```
N = number of records
V = number of variables
W = average width in bytes of a variable
```

In approximating **W**, remember:

Type of variable | Width (bytes)
---|---
Integers, −127 <= x <= 100 | 1
Integers, −32,767 <= x <= 32,740 | 2
Integers, −2,147,483,647 <= x <= 2,147,483,620 | 4
Floats, single precision | 4
Floats, double precision | 8
Strings | maximum length

Say that you have a 20,000-observation dataset. That dataset contains

```
1 string identifier of length 20 20
10 small integers (1 byte each) 10
4 standard integers (2 bytes each) 8
5 floating-point numbers (4 bytes each) 20
--------------------------------------------------------
20 variables total 58
```

Thus the average width of a variable is

```
W = 58/20 = 2.9 bytes
```

The size of your dataset is

```
M = 20000*20*2.9/1024^2 = 1.11 megabytes
```
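The estimate can be wrapped in a small helper (a sketch; the function name is ours):

```python
def estimate_size_mb(n_records, n_vars, avg_width_bytes):
    # M = N * V * W / 1024^2
    return n_records * n_vars * avg_width_bytes / 1024 ** 2

# The 20,000-observation example: W = 58 / 20 = 2.9 bytes
print(round(estimate_size_mb(20_000, 20, 2.9), 2))  # ~1.11 MB
```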

This result slightly understates the size of the dataset because we have not included any variable labels, value labels, or notes that you might add to the data. That does not amount to much. For instance, imagine that you added variable labels to all 20 variables and that the average length of the text of the labels was 22 characters.

That would amount to a total of 20*22 = 440 bytes, or 440/1024^2 = .00042 megabytes.

**Explanation of formula**

```
M = 20000*20*2.9/1024^2 = 1.11 megabytes
```

N*V*W is, of course, the total size of the data in bytes. The 1024^2 in the denominator rescales the result to megabytes.

Yes, the result is divided by 1024^2 even though 1000^2 = a million. Computer memory comes in binary increments. Although we think of k as standing for kilo, in the computer business k is really a "binary" thousand, 2^10 = 1,024. A megabyte is a binary million, a binary k squared:

```
1 MB = 1024 KB = 1024*1024 = 1,048,576 bytes
```

With cheap memory, we sometimes talk about a gigabyte. Here is how a binary gig works:

```
1 GB = 1024 MB = 1024^3 = 1,073,741,824 bytes
```
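The binary units are easy to check directly (a trivial sketch):

```python
KB = 1024          # a "binary" thousand: 2**10
MB = 1024 ** 2     # 1,048,576 bytes
GB = 1024 ** 3     # 1,073,741,824 bytes
print(KB, MB, GB)
```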

## How to estimate the number of partitions and the executor's and driver's params **(YARN Cluster Mode)**

```
yarn.nodemanager.resource.memory-mb = ((node's RAM in GB - 2 GB) * 1024) MB
```

```
Total number of node's cores = yarn.nodemanager.resource.cpu-vcores
```

**-** **Executor's params (Worker Node):**

`Executor (VM) x Node = ((total number of node's cores) / 5) - 1` (integer division)

- 5 is the upper bound for cores per executor, because more than 5 cores per executor can degrade HDFS I/O throughput.
- If the total number of node's cores is less than or equal to 8, divide it by 2 instead.
- If the total number of node's cores is equal to 1, Executor (VM) x Node is equal to 1.

`numExecutors (number of executors to launch for this session) = number of nodes * Executor (VM) x Node`

- The driver is counted among these executors.

`executorCores (number of cores to use for each executor) = (total number of node's cores - 5) / Executor (VM) x Node`

`executorMemory (Amount of memory to use per executor process) = (yarn.nodemanager.resource.memory-mb - 1024) / (Executor (VM) x Node + 1)`

For **executorMemory** we have to apply some further reasoning. If you review the **BlockManager** source code (./core/src/main/scala/org/apache/spark/storage/BlockManager.scala), you will note that the memory allocation is based on the formula:

`Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction.`

where **memoryFraction = spark.storage.memoryFraction** and **safetyFraction = spark.storage.safetyFraction**

The default values of **spark.storage.memoryFraction** and **spark.storage.safetyFraction** are respectively **0.6** and **0.9** (note that these are the legacy storage settings, used before Spark 1.6 introduced the unified memory manager), so the real **executorMemory** is:

`executorMemory = ((yarn.nodemanager.resource.memory-mb - 1024) / (Executor (VM) x Node + 1)) * memoryFraction * safetyFraction.`
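The whole heuristic can be put together in one function (a sketch of the rules above; all names are ours, and the small-node branches follow the text literally):

```python
def yarn_executor_params(cores_per_node, ram_gb_per_node, num_nodes,
                         memory_fraction=0.6, safety_fraction=0.9):
    # Reserve 2 GB per node for the OS and Hadoop daemons.
    nm_memory_mb = (ram_gb_per_node - 2) * 1024
    # Executors per node: at most 5 cores per executor, minus one slot.
    if cores_per_node == 1:
        executors_per_node = 1
    elif cores_per_node <= 8:
        executors_per_node = cores_per_node // 2
    else:
        executors_per_node = cores_per_node // 5 - 1
    num_executors = executors_per_node * num_nodes  # driver included
    executor_cores = max(1, (cores_per_node - 5) // executors_per_node)
    raw_mb = (nm_memory_mb - 1024) / (executors_per_node + 1)
    executor_memory_mb = raw_mb * memory_fraction * safety_fraction
    return num_executors, executor_cores, executor_memory_mb

# The example cluster below: 4 nodes (3 workers + 1 AM), 16 vCPUs, 52 GB each.
print(yarn_executor_params(16, 52, 4))  # roughly (8, 5, 9031.68)
```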

**- Driver's params (Application Master Node):**

`driverCores = executorCores`

`driverMemory = executorMemory`

**Example**

I have 3 Worker Nodes and one Application Master Node, each with **16 vCPUs and 52 GB of memory**.

`yarn.nodemanager.resource.memory-mb = (52 - 2) * 1024 = 51200 MB`

`yarn.scheduler.maximum-allocation-mb = 20830 MB (Must be greater than executorMemory)`

**-** Executor's params (Worker Node):

`Executor x Node = ((16) / 5) - 1 = 2`

`numExecutors = 2 * 4 = 8`

`executorCores = (16 - 5) / 2 = 5`

`executorMemory = ((51200 - 1024) / 3) * 0.6 * 0.9 = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB`

**- **Driver's params (Application Master Node):

`driverCores = 5`

`driverMemory = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB`

**Example**

I have to process a dataset that has 10,000,000 rows and 100 double variables.

```
number of megabytes = M = 10,000,000*100*8/1024^2 = 7,629 megabytes
```

`Partitions = 7,629/64 ≈ 120`
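The same arithmetic as a sketch (the function name is ours; a 64 MB block size is assumed):

```python
import math

def estimate_partitions(n_records, n_vars, width_bytes, block_mb=64):
    # Dataset size in MB, then one partition per 64 MB block.
    size_mb = n_records * n_vars * width_bytes / 1024 ** 2
    return math.ceil(size_mb / block_mb)

# 10,000,000 rows of 100 doubles (8 bytes each):
print(estimate_partitions(10_000_000, 100, 8))  # 120
```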

As in the previous example, I have 3 **Worker **nodes and one **Application Master** Node each with **16 vCPUs, 52 GB memory**

`yarn.nodemanager.resource.memory-mb = (52 - 2) * 1024 = 51200 MB`

`yarn.scheduler.maximum-allocation-mb = 20830 MB (Must be greater than executorMemory)`

**-** Executor's params (Worker Node):

`Executor x Node = ((16) / 5) - 1 = 2`

`numExecutors = 2 * 4 = 8`

`executorCores = (16 - 5) / 2 = 5`

`executorMemory = ((51200 - 1024) / 3) * 0.6 * 0.9 = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB`

**- **Driver's params (Application Master Node):

`driverCores = 5`

`driverMemory = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB`