pyspark generate row hash of specific columns and add it as a new column

msashish · Sep 12, 2018 · Viewed 19.5k times

I am working with Spark 2.2.0 and PySpark 2.

I have created a DataFrame df and am now trying to add a new column "rowhash" that is the sha2 hash of specific columns in the DataFrame.

For example, say that df has the columns: (column1, column2, ..., column10)

I require sha2((column2||column3||column4||...||column8), 256) in a new column "rowhash".

For now, I have tried the methods below:

1) Used the hash() function, but since it gives an integer output it is of not much use.

2) Tried the sha2() function, but it is failing.

Say columnarray holds the array of column names I need.

def concat(columnarray):
    # builds a single Python string like 'column2||column3||...',
    # not a Spark Column expression
    concat_str = ''
    for val in columnarray:
        concat_str = concat_str + '||' + str(val)
    concat_str = concat_str[2:]  # strip the leading '||'
    return concat_str

and then

df1 = df1.withColumn("row_sha2", sha2(concat(columnarray), 256))

This fails with a "cannot resolve" error, because the concatenated string is passed to sha2() as the name of a single (nonexistent) column rather than as a Column expression.

Thanks gaw for your answer. Since I have to hash only specific columns, I created a list of those column names (in hash_col) and changed your function to:

import hashlib
from pyspark.sql import Row

def sha_concat(row, columnarray):
    row_dict = row.asDict()          # transform Row to a dict
    concat_str = ''
    for v in columnarray:
        concat_str = concat_str + '||' + str(row_dict.get(v))
    concat_str = concat_str[2:]      # strip the leading '||'
    # preserve concatenated value for testing (this can be removed later)
    row_dict["sha_values"] = concat_str
    row_dict["sha_hash"] = hashlib.sha256(concat_str).hexdigest()
    return Row(**row_dict)

Then called it as:

    df1.rdd.map(lambda row: sha_concat(row, hash_col)).toDF().show(truncate=False)

It is now however failing with error:

    UnicodeEncodeError: 'ascii' codec can't encode character u'\ufffd' in position 8: ordinal not in range(128)

I can see the value \ufffd in one of the columns, so I am unsure if there is a way to handle this?
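Note: under Python 2, str() on a unicode value (and hashlib.sha256() on a unicode string) implicitly encodes it as ASCII, which is what raises this error for characters like \ufffd. A minimal sketch of a unicode-safe variant, assuming the data can be encoded as UTF-8:

import hashlib
from pyspark.sql import Row

def sha_concat(row, columnarray):
    row_dict = row.asDict()
    concat_str = u''                 # keep the concatenation in unicode
    for v in columnarray:
        concat_str = concat_str + u'||' + unicode(row_dict.get(v))
    concat_str = concat_str[2:]
    row_dict["sha_values"] = concat_str
    # encode explicitly before hashing; hashlib expects bytes
    row_dict["sha_hash"] = hashlib.sha256(concat_str.encode('utf-8')).hexdigest()
    return Row(**row_dict)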

Answer

pault · Sep 12, 2018

You can use pyspark.sql.functions.concat_ws() to concatenate your columns and pyspark.sql.functions.sha2() to get the SHA256 hash.

Using the data from @gaw:

from pyspark.sql.functions import sha2, concat_ws
df = spark.createDataFrame(
    [(1,"2",5,1),(3,"4",7,8)],
    ("col1","col2","col3","col4")
)
df.withColumn("row_sha2", sha2(concat_ws("||", *df.columns), 256)).show(truncate=False)
#+----+----+----+----+----------------------------------------------------------------+
#|col1|col2|col3|col4|row_sha2                                                        |
#+----+----+----+----+----------------------------------------------------------------+
#|1   |2   |5   |1   |1b0ae4beb8ce031cf585e9bb79df7d32c3b93c8c73c27d8f2c2ddc2de9c8edcd|
#|3   |4   |7   |8   |57f057bdc4178b69b1b6ab9d78eabee47133790cba8cf503ac1658fa7a496db1|
#+----+----+----+----+----------------------------------------------------------------+

You can pass in either 0 or 256 as the second argument to sha2(), as per the docs:

Returns the hex string result of SHA-2 family of hash functions (SHA-224, SHA-256, SHA-384, and SHA-512). The numBits indicates the desired bit length of the result, which must have a value of 224, 256, 384, 512, or 0 (which is equivalent to 256).
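As a quick sketch (reusing the df defined above), both settings produce the same digest:

from pyspark.sql.functions import sha2, concat_ws

# numBits=0 is documented as equivalent to 256, so these two columns match
df.select(
    sha2(concat_ws("||", *df.columns), 0).alias("sha_0"),
    sha2(concat_ws("||", *df.columns), 256).alias("sha_256")
).show(truncate=False)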

The function concat_ws takes a separator and a list of columns to join. I am passing in || as the separator and df.columns as the list of columns.

I am using all of the columns here, but you can specify whatever subset of columns you'd like; in your case that would be columnarray. (You need the * to unpack the list.)
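For example, a minimal sketch with a hypothetical columnarray restricted to two of the columns:

from pyspark.sql.functions import sha2, concat_ws

columnarray = ["col2", "col3"]  # hypothetical subset of column names to hash
df.withColumn("row_sha2", sha2(concat_ws("||", *columnarray), 256)).show(truncate=False)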