I'm preparing a toy spark.ml
example. Spark version 1.6.0
, running on top of Oracle JDK version 1.8.0_65
, pyspark, ipython notebook.
First, it hardly has anything to do with Spark, ML, StringIndexer: handling unseen labels. The exception is thrown while fitting a pipeline to a dataset, not transforming it. And suppressing the exception might not be a solution here, since, I'm afraid, the dataset gets messed pretty bad in this case.
My dataset is about 800Mb uncompressed, so it might be hard to reproduce (smaller subsets seem to dodge this issue).
The dataset looks like this:
+--------------------+-----------+-----+-------+-----+--------------------+
| url| ip| rs| lang|label| txt|
+--------------------+-----------+-----+-------+-----+--------------------+
|http://3d-detmold...|217.160.215|378.0| de| 0.0|homwillkommskip c...|
| http://3davto.ru/| 188.225.16|891.0| id| 1.0|оформить заказ пе...|
| http://404.szm.com/| 85.248.42| 58.0| cs| 0.0|kliknite tu alebo...|
| http://404.xls.hu/| 212.52.166|168.0| hu| 0.0|honlapkészítés404...|
|http://a--m--a--t...| 66.6.43|462.0| en| 0.0|back top archiv r...|
|http://a-wrf.ru/c...| 78.108.80|126.0|unknown| 1.0| |
|http://a-wrf.ru/s...| 78.108.80|214.0| ru| 1.0|установк фаркопна...|
+--------------------+-----------+-----+-------+-----+--------------------+
The value being predicted is label
. The whole pipeline applied to it:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression
train, test = munge(src_dataframe).randomSplit([70., 30.], seed=12345)
pipe_stages = [
StringIndexer(inputCol='lang', outputCol='lang_idx'),
OneHotEncoder(inputCol='lang_idx', outputCol='lang_onehot'),
Tokenizer(inputCol='ip', outputCol='ip_tokens'),
HashingTF(numFeatures=2**10, inputCol='ip_tokens', outputCol='ip_vector'),
Tokenizer(inputCol='txt', outputCol='txt_tokens'),
HashingTF(numFeatures=2**18, inputCol='txt_tokens', outputCol='txt_vector'),
VectorAssembler(inputCols=['lang_onehot', 'ip_vector', 'txt_vector'], outputCol='features'),
LogisticRegression(labelCol='label', featuresCol='features')
]
pipe = Pipeline(stages=pipe_stages)
pipemodel = pipe.fit(train)
And here is the stacktrace:
Py4JJavaError: An error occurred while calling o10793.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 627.0 failed 1 times, most recent failure: Lost task 18.0 in stage 627.0 (TID 23259, localhost): org.apache.spark.SparkException: Unseen label: pl-PL.
at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157)
at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1113)
at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:271)
at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:159)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Unseen label: pl-PL.
at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157)
at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
The most interesting line is:
org.apache.spark.SparkException: Unseen label: pl-PL.
No idea, how edited: some hasty coclusions, corrected thanks to @zero323pl-PL
which is a value from lang
column could have gotten mixed up in the label
column, which is a float
, not string
I've looked further into it and found, that pl-PL
is a value from the testing part of the dataset, not training. So now I don't even know where to look for the culprit: it might easily be the randomSplit
code, not StringIndexer
, and who knows what else.
How do I investigate this?
Unseen label
is a generic message which doesn't correspond to a specific column. Most likely problem is with a following stage:
StringIndexer(inputCol='lang', outputCol='lang_idx')
with pl-PL
present in train("lang")
and not present in test("lang")
.
You can correct it using setHandleInvalid
with skip
:
from pyspark.ml.feature import StringIndexer
train = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["k", "v"])
test = sc.parallelize([(3, "foo"), (4, "foobar")]).toDF(["k", "v"])
indexer = StringIndexer(inputCol="v", outputCol="vi")
indexer.fit(train).transform(test).show()
## Py4JJavaError: An error occurred while calling o112.showString.
## : org.apache.spark.SparkException: Job aborted due to stage failure:
## ...
## org.apache.spark.SparkException: Unseen label: foobar.
indexer.setHandleInvalid("skip").fit(train).transform(test).show()
## +---+---+---+
## | k| v| vi|
## +---+---+---+
## | 3|foo|1.0|
## +---+---+---+
or, in the latest versions, keep
:
indexer.setHandleInvalid("keep").fit(train).transform(test).show()
## +---+------+---+
## | k| v| vi|
## +---+------+---+
## | 3| foo|0.0|
## | 4|foobar|2.0|
## +---+------+---+