Sending Large CSV to Kafka using Python Spark

Phineas Dashevsky · Aug 31, 2015 · Viewed 7.3k times

I am trying to send a large CSV to Kafka. The basic structure is to read a line of the CSV and zip it with the header:

a = dict(zip(header, line.split(",")

This then gets converted to JSON with:

message = json.dumps(a)

I then use the kafka-python library to send the message:

from kafka import SimpleProducer, KafkaClient
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
producer.send_messages("topic", message)

Using PySpark, I have easily created an RDD of messages from the CSV file:

import json
from pyspark import SparkContext

sc = SparkContext()
text = sc.textFile("file.csv")
header = text.first().split(',')

# drop the header row, which lives in the first partition only
def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr
noHeader = text.mapPartitionsWithIndex(remove_header)

messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(",")))))

Now I want to send these messages, so I define a function:

def sendkafka(message):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    return producer.send_messages('topic', message)

Then I create a new RDD to send the messages:

sentRDD = messageRDD.map(lambda x: sendkafka(x))

I then call sentRDD.count(), which starts churning and sending the messages.

Unfortunately, this is very slow: it sends 1,000 messages a second, on a 10-node cluster with 4 CPUs and 8 GB of memory each.

In comparison, creating the messages takes about 7 seconds for the 10-million-row CSV (~2 GB).

I think the issue is that I am instantiating a Kafka producer inside the function. However, if I don't, Spark complains that the producer doesn't exist, even though I have tried defining it globally.
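
For reference, this is roughly what the global-producer attempt looks like (a sketch, not the exact code; the producer holds live network connections, so Spark typically cannot pickle the closure that references it and the tasks fail before anything is sent):

from kafka import SimpleProducer, KafkaClient

# Defined once on the driver; referencing it inside the lambda means Spark
# must serialize it with the task, which fails for a live producer object.
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)

sentRDD = messageRDD.map(lambda x: producer.send_messages('topic', x))
sentRDD.count()  # fails at task serialization instead of sending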

Perhaps someone can shed some light on how this problem may be approached.

Thank you,

Answer

zero323 · Sep 1, 2015

You can create a single producer per partition and use either mapPartitions or foreachPartition:

def sendkafka(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        yield producer.send_messages('topic', message)

sentRDD = messageRDD.mapPartitions(sendkafka)
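
A foreachPartition variant looks like this (a sketch; foreachPartition runs purely for its side effects and returns nothing, so no action such as count() is needed afterwards to trigger the sends):

def send_partition(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)  # one producer per partition
    for message in messages:
        producer.send_messages('topic', message)

messageRDD.foreachPartition(send_partition)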

If the above alone doesn't help, you can try to extend it using an asynchronous producer.
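
For example, a sketch assuming the async/batching flags of the kafka-python SimpleProducer API of that era (parameter names differ in later kafka-python versions, where async was renamed because it became a reserved word in Python):

def sendkafka_async(messages):
    kafka = KafkaClient("localhost:9092")
    # background-thread producer that batches sends; the batch size and
    # interval below are illustrative values, not tuned recommendations
    producer = SimpleProducer(kafka, async=True,
                              batch_send_every_n=1000,
                              batch_send_every_t=10)
    for message in messages:
        yield producer.send_messages('topic', message)

sentRDD = messageRDD.mapPartitions(sendkafka_async)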

In Spark 2.x it is also possible to use the Kafka data source. You'll have to include the spark-sql-kafka jar matching your Spark and Scala versions (here 2.2.0 and 2.11, respectively):

spark.jars.packages  org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

convert data to a DataFrame (if it is not DataFrame already):

messageDF = spark.createDataFrame(messageRDD, "string")

and write using DataFrameWriter:

(messageDF.write
    .format("kafka")
    .option("topic", topic_name)
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .save())
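
Put together, a minimal sketch (the package can equivalently be supplied at submit time; the topic and broker values below are placeholders):

# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 csv_to_kafka.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv-to-kafka").getOrCreate()

# messageRDD built as in the question; createDataFrame(rdd, "string") yields
# a single column named "value", which is what the Kafka sink expects
messageDF = spark.createDataFrame(messageRDD, "string")

(messageDF.write
    .format("kafka")
    .option("topic", "topic")                             # placeholder topic
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder brokers
    .save())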