I have some use cases that I would like to be more clarified, about Kafka topic partitioning -> spark streaming resource utilization.
I use spark standalone mode, so only settings I have are "total number of executors" and "executor memory". As far as I know and according to documentation, way to introduce parallelism into Spark streaming is using partitioned Kafka topic -> RDD will have same number of partitions as kafka, when I use spark-kafka direct stream integration.
So if I have 1 partition in the topic, and 1 executor core, that core will sequentially read from Kafka.
What happens if I have:
2 partitions in the topic and only 1 executor core? Will that core read first from one partition and then from the second one, so there will be no benefit in partitioning the topic?
2 partitions in the topic and 2 cores? Will then 1 executor core read from 1 partition, and second core from the second partition?
1 kafka partition and 2 executor cores?
Thank you.
The basic rule is that you can scale up to the number of Kafka partitions. If you set spark.executor.cores
greater than the number of partitions, some of the threads will be idle. If it's less than the number of partitions, Spark will have threads read from one partition then the other. So:
2 partitions, 1 executor: reads from one partition then then other. (I am not sure how Spark decides how much to read from each before switching)
2p, 2c: parallel execution
1p, 2c: one thread is idle
For case #1, note that having more partitions than executors is OK since it allows you to scale out later without having to re-partition. The trick is to make sure that your partitions are evenly divisible by the number of executors. Spark has to process all the partitions before passing data onto the next step in the pipeline. So, if you have 'remainder' partitions, this can slow down processing. For example, 5 partitions and 4 threads => processing takes the time of 2 partitions - 4 at once then one thread running the 5th partition by itself.
Also note that you may also see better processing throughput if you keep the number of partitions / RDDs the same throughout the pipeline by explicitly setting the number of data partitions in functions like reduceByKey()
.