Use the right level of parallelism

Clusters will not be fully utilized unless the level of parallelism for each operation is high enough. Spark automatically sets the number of partitions of an input file according to its size, and does the same for distributed shuffles. By default, Spark creates one partition for each HDFS block of the file (64 MB by default).

You can also pass the desired number of partitions as a second argument when creating an RDD.

Let's see an example of creating an RDD from a text file:

val rdd = sc.textFile("file.txt", 5)

The statement above creates an RDD from the text file with 5 partitions. Now suppose we have a cluster with 4 cores and each partition takes 5 minutes to process: 4 partitions are processed in parallel, and the fifth partition is processed only when a core becomes free. The final result is therefore ready after 10 minutes, and while that last partition is being processed the other cores sit idle.

To avoid this problem, create the RDD with a number of partitions equal to the number of cores in the cluster; this way all partitions are processed in parallel and the resources are used evenly.
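As a rough sketch (assuming a SparkContext named sc is already available), you can read the default parallelism, which normally reflects the total number of cores, and size the RDD accordingly:

// Sketch: sc is an existing SparkContext.
val totalCores = sc.defaultParallelism          // usually the total number of cores available
val rdd = sc.textFile("file.txt", totalCores)   // one partition per core
// An existing RDD can also be rebalanced afterwards:
val rebalanced = rdd.repartition(totalCores)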

As a rule of thumb, tasks should take at least 100 ms to execute; you can check this by monitoring the task execution latency from the Spark shell. If your tasks take considerably longer than that, keep increasing the level of parallelism (by, say, a factor of 1.5) until performance stops improving.

DataFrames produced by shuffle operations have a number of partitions equal to the spark.sql.shuffle.partitions parameter, whose default value is 200.
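For example (a sketch assuming a SparkSession named spark; the value 48 below is only an illustration), the setting can be inspected and overridden at runtime:

// Sketch: spark is an existing SparkSession.
spark.conf.get("spark.sql.shuffle.partitions")        // "200" by default
spark.conf.set("spark.sql.shuffle.partitions", "48")  // e.g. match the cluster's total cores
// DataFrames produced by shuffles (joins, groupBy, ...) will now have 48 partitions.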

How to estimate the size of a Dataset

An approximate calculation of the size of a dataset is

number of megabytes = M = (N*V*W) / 1024^2

where

    N  =  number of records

    V  =  number of variables

    W  =  average width in bytes of a variable

In approximating W, remember:

Type of variable                                   Width
Integers, -127 <= x <= 100                             1
Integers, -32,767 <= x <= 32,740                       2
Integers, -2,147,483,647 <= x <= 2,147,483,620         4
Floats, single precision                               4
Floats, double precision                               8
Strings                                   maximum length

Say that you have a 20,000-observation dataset. That dataset contains

    1  string identifier of length 20                     20

    10  small integers (1 byte each)                      10

    4  standard integers (2 bytes each)                    8

    5  floating-point numbers (4 bytes each)              20

    --------------------------------------------------------

    20  variables total                                   58

Thus the average width of a variable is

W = 58/20 = 2.9  bytes

The size of your dataset is

M = 20000*20*2.9/1024^2 = 1.11 megabytes

This result slightly understates the size of the dataset because we have not included any variable labels, value labels, or notes that you might add to the data. That does not amount to much. For instance, imagine that you added variable labels to all 20 variables and that the average length of the text of the labels was 22 characters.

That would amount to a total of 20*22 = 440 bytes, or 440/1024^2 = .00042 megabytes.

Explanation of formula

M = 20000*20*2.9/1024^2 = 1.11 megabytes

N*V*W is, of course, the total size of the data. The 1,024^2 in the denominator rescales the result to megabytes.

Yes, the result is divided by 1,024^2 even though 1,000^2 = a million. Computer memory comes in binary increments. Although we think of k as standing for kilo, in the computer business k is really a “binary” thousand, 2^10 = 1,024. A megabyte is a binary million, that is, a binary k squared:

1 MB = 1024 KB = 1024*1024 = 1,048,576 bytes

With cheap memory, we sometimes talk about a gigabyte. Here is how a binary gig works:

1 GB = 1024 MB = 1024^3 = 1,073,741,824 bytes
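As a quick sanity check, the worked example above can be reproduced with a small Scala helper (the function name is just illustrative):

// Estimated dataset size in megabytes: M = N * V * W / 1024^2
def estimateMegabytes(records: Long, variables: Long, avgWidthBytes: Double): Double =
  records * variables * avgWidthBytes / (1024.0 * 1024.0)

// The worked example: 20,000 records, 20 variables, average width 2.9 bytes.
estimateMegabytes(20000L, 20L, 2.9) // ≈ 1.11 megabytes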

How to estimate the number of partitions and the executor's and driver's params (YARN Cluster Mode)

yarn.nodemanager.resource.memory-mb = ((Node's RAM GB - 2 GB) * 1024) MB
Total number of the Node's cores = yarn.nodemanager.resource.cpu-vcores

- Executor's params (Worker Node):

  • Executor (VM) x Node = ((total number of the Node's cores) / 5) - 1

    • 5 is the upper bound for cores per executor, because more than 5 cores per executor can degrade HDFS I/O throughput.
    • If the total number of the Node's cores is less than or equal to 8, we divide it by 2.

    • If the total number of the Node's cores is equal to 1, Executor x Node is equal to 1.

  • numExecutors (Number of executors to launch for this session) = number of Nodes * Executor (VM) x Node

    • The Driver is included in the executors.
  • executorCores (Number of cores to use for each executor) = (total number of the Node's cores - 5) / Executor x Node

  • executorMemory (Amount of memory to use per executor process) = (yarn.nodemanager.resource.memory-mb - 1024) / (Executor (VM) x Node + 1); all of these formulas are gathered in a code sketch after the driver's params below.

For executorMemory we have to make a further refinement. If you review the BlockManager source code (./core/src/main/scala/org/apache/spark/storage/BlockManager.scala), you will note that the memory allocation is based on the algorithm:

Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction.

where memoryFraction = spark.storage.memoryFraction and safetyFraction = spark.storage.safetyFraction

The default values of spark.storage.memoryFraction and spark.storage.safetyFraction are respectively 0.6 and 0.9, so the real executorMemory is:

executorMemory = ((yarn.nodemanager.resource.memory-mb - 1024) / (Executor (VM) x Node + 1)) * memoryFraction * safetyFraction.

- Driver's params (Application Master Node):

  • driverCores = executorCores
  • driverMemory = executorMemory
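All of the arithmetic above can be condensed into a small Scala sketch (illustrative only: the case class and function names are made up here, integer division is used as in the formulas, and 0.6/0.9 are the default memoryFraction and safetyFraction):

// Illustrative sketch of the sizing formulas above; not an official Spark API.
case class NodeSpecs(nodes: Int, coresPerNode: Int, ramPerNodeGb: Int)

def estimateYarnParams(c: NodeSpecs): Unit = {
  val nodeManagerMemoryMb = (c.ramPerNodeGb - 2) * 1024           // yarn.nodemanager.resource.memory-mb
  val executorsPerNode =                                          // Executor (VM) x Node
    if (c.coresPerNode == 1) 1
    else if (c.coresPerNode <= 8) c.coresPerNode / 2
    else (c.coresPerNode / 5) - 1
  val numExecutors  = c.nodes * executorsPerNode                  // the Driver is included
  val executorCores = (c.coresPerNode - 5) / executorsPerNode
  val rawMemoryMb   = (nodeManagerMemoryMb - 1024).toDouble / (executorsPerNode + 1)
  val executorMemoryMb = rawMemoryMb * 0.6 * 0.9                  // memoryFraction * safetyFraction
  println(s"numExecutors=$numExecutors, executorCores=$executorCores (= driverCores), " +
          s"executorMemoryMb=$executorMemoryMb (= driverMemoryMb)")
}

For the 4-node, 16-vCPU, 52 GB cluster of the example that follows, this prints 8 executors with 5 cores and roughly 9031.68 MB each.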

Example

I have 3 Worker Nodes and one Application Master Node, each with 16 vCPUs and 52 GB of memory.

yarn.nodemanager.resource.memory-mb = (52 - 2) * 1024 = 51200 MB

yarn.scheduler.maximum-allocation-mb = 20830 MB (Must be greater than executorMemory)

- Executor's params (Worker Node):

  • Executor x Node = ((16) / 5) - 1 = 2
  • numExecutors = 2 * 4 = 8
  • executorCores = (16 - 5) / 2 = 5
  • executorMemory = ((51200 - 1024) / 3) * 0.6 * 0.9 = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB

- Driver's params (Application Master Node):

  • driverCores = 5
  • driverMemory = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB

See in the diagram how the params are estimated.

Example

I have to process a dataset that has 10,000,000 rows and 100 double variables.

number of megabytes  =  M  =  10,000,000*100*8/1024^2  ≈  7,629 megabytes

Partition = 7,629/64 ≈ 119
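Expressed in code (a sketch; the 64 MB divisor is the default HDFS block size mentioned at the start of this section, and df stands for any DataFrame holding this dataset):

// Sketch: estimate the dataset size and derive a partition count from it.
val sizeMb = 10000000L * 100L * 8.0 / (1024.0 * 1024.0) // ≈ 7629 MB
val numPartitions = (sizeMb / 64.0).toInt               // ≈ 119
// df.repartition(numPartitions) would spread the data over that many partitions.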

As in the previous example, I have 3 Worker Nodes and one Application Master Node, each with 16 vCPUs and 52 GB of memory.

yarn.nodemanager.resource.memory-mb = (52 - 2) * 1024 = 51200 MB

yarn.scheduler.maximum-allocation-mb = 20830 MB (Must be greater than executorMemory)

- Executor's params (Worker Node):

  • Executor x Node = ((16) / 5) - 1 = 2
  • numExecutors = 2 * 4 = 8
  • executorCores = (16 - 5) / 2 = 5
  • executorMemory = ((51200 - 1024) / 3) * 0.6 * 0.9 = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB

- Driver's params (Application Master Node):

  • driverCores = 5
  • driverMemory = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB
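Once estimated, these values are typically passed to spark-submit; as a hedged sketch, they can also be set when building a SparkSession (note that in client mode the driver's memory must be fixed before the driver JVM starts, so spark-submit or spark-defaults.conf is the safer place for it). The figures below are the ones from the examples above, with memory rounded down to whole megabytes, and the application name is hypothetical:

import org.apache.spark.sql.SparkSession

// Sketch: applying the estimated params from the examples above.
val spark = SparkSession.builder()
  .appName("tuned-app")                           // hypothetical application name
  .config("spark.executor.instances", "8")        // numExecutors
  .config("spark.executor.cores", "5")            // executorCores
  .config("spark.executor.memory", "9031m")       // executorMemory, rounded down
  .config("spark.driver.cores", "5")              // driverCores
  .config("spark.driver.memory", "9031m")         // driverMemory, rounded down
  .getOrCreate()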
