I have a set of files. The path to the files are saved in a file., say all_files.txt
. Using apache spark, I need to do an operation on all the files and club the results.
The steps that I want to do are:
all_files.txt
all_files.txt
(Each line is a path to some file),
read the contents of each of the files into a single RDDThis is the code I wrote for the same:
def return_contents_from_file (file_name):
return spark.read.text(file_name).rdd.map(lambda r: r[0])
def run_spark():
file_name = 'path_to_file'
spark = SparkSession \
.builder \
.appName("PythonWordCount") \
.getOrCreate()
counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file
.flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files
.flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files
This is throwing the error:
line 323, in get_return_value py4j.protocol.Py4JError: An error occurred while calling o25.getnewargs. Trace: py4j.Py4JException: Method getnewargs([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:272) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745)
Can someone please tell me what I am doing wrong and how I should proceed further. Thanks in advance.
Using spark
inside flatMap
or any transformation that occures on executors is not allowed (spark
session is available on driver only). It is also not possible to create RDD of RDDs (see: Is it possible to create nested RDDs in Apache Spark?)
But you can achieve this transformation in another way - read all content of all_files.txt
into dataframe, use local map
to make them dataframes and local reduce
to union all, see example:
>>> filenames = spark.read.text('all_files.txt').collect()
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)