I am trying to send a large CSV to Kafka. The basic structure is to read a line of the CSV and zip it with the header:
a = dict(zip(header, line.split(",")))
This then gets converted to JSON with:
message = json.dumps(a)
I then use the kafka-python library to send the message:
from kafka import SimpleProducer, KafkaClient
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
producer.send_messages("topic", message)
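Put together, the per-line flow looks roughly like this (a sketch assuming the same Python 2-era kafka-python SimpleProducer API and the same file.csv as below):
import json
from kafka import SimpleProducer, KafkaClient

kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)

with open("file.csv") as f:
    header = next(f).rstrip("\n").split(",")
    for line in f:
        # zip each row with the header, then serialize to JSON
        a = dict(zip(header, line.rstrip("\n").split(",")))
        producer.send_messages("topic", json.dumps(a))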
Using PySpark I have easily created an RDD of messages from the CSV file:
sc = SparkContext()
text = sc.textFile("file.csv")
header = text.first().split(',')
def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr
noHeader = text.mapPartitionsWithIndex(remove_header)
messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(",")))))
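As a quick sanity check (purely illustrative, not part of the pipeline), you can peek at one of the generated messages:
# Inspect a single serialized record to confirm the header/row zipping worked.
print(messageRDD.take(1))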
Now I want to send these messages. I define a function:
def sendkafka(message):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    return producer.send_messages('topic', message)
Then I create a new RDD to send the messages:
sentRDD = messageRDD.map(lambda x: sendkafka(x))
I then call sentRDD.count(), which starts churning and sending the messages.
Unfortunately this is very slow: it sends about 1000 messages a second, on a 10-node cluster with 4 CPUs and 8 GB of memory per node.
In comparison, creating the messages from the 10-million-row (~2 GB) CSV takes about 7 seconds.
I think the issue is that I am instantiating a Kafka producer inside the function. However, if I don't, Spark complains that the producer doesn't exist, even though I have tried defining it globally.
Perhaps someone can shed some light on how this problem may be approached.
Thank you.
You can create a single producer per partition and use either mapPartitions or foreachPartition (see the foreachPartition sketch after the example below):
from kafka import SimpleProducer, KafkaClient

def sendkafka(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        yield producer.send_messages('topic', message)
sentRDD = messageRDD.mapPartitions(sendkafka)
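The foreachPartition version looks much the same; since foreachPartition is an action, nothing like count() is needed to trigger it. A rough sketch using the same legacy SimpleProducer API (the function name is just for illustration):
from kafka import SimpleProducer, KafkaClient

def send_partition(messages):
    # One client/producer per partition, reused for every record in it.
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        producer.send_messages('topic', message)

messageRDD.foreachPartition(send_partition)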
If the above alone doesn't help, you can try to extend it using an asynchronous producer.
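One option is kafka-python's newer KafkaProducer, whose send() call is asynchronous and batches records internally. This is only a sketch, and it swaps in a different API than the SimpleProducer used above:
from kafka import KafkaProducer

def send_partition_async(messages):
    # send() returns immediately and buffers records; a background thread
    # groups them into larger requests instead of one request per record.
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    for message in messages:
        producer.send("topic", message.encode("utf-8"))
    producer.flush()  # block until everything buffered has been delivered

messageRDD.foreachPartition(send_partition_async)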
In Spark 2.x it is also possible to use the Kafka data source. You'll have to include the spark-sql-kafka JAR matching your Spark and Scala versions (here 2.2.0 and 2.11 respectively):
spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
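That property can go into spark-defaults.conf as shown, or be passed at submit time (the script name here is a placeholder):
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 your_script.py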
Convert the data to a DataFrame (if it is not a DataFrame already):
messageDF = spark.createDataFrame(messageRDD, "string")
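The Kafka sink expects the payload in a string or binary column named value (with optional key and topic columns). createDataFrame with a plain "string" schema produces exactly one such column, which you can confirm with:
# Expect a single column named "value" of type string.
messageDF.printSchema()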
Then write using DataFrameWriter:
(messageDF.write
.format("kafka")
.option("topic", topic_name)
.option("kafka.bootstrap.servers", bootstrap_servers)
.save())
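To spot-check what actually landed on the topic, the same data source can read it back as a batch query (supported since Spark 2.2); a sketch reusing topic_name and bootstrap_servers from above:
readback = (spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("subscribe", topic_name)
    .load())

# value comes back as binary, so cast it before displaying
readback.selectExpr("CAST(value AS STRING)").show(5, truncate=False)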