Pyspark: PicklingError: Could not serialize object:
I have the following two data frames: df_whitelist and df_text
+-------+--------------------+
|keyword|     whitelist_terms|
+-------+--------------------+
|     LA|             LA city|
|     LA|         US LA in da|
| client|this client has i...|
| client|our client has do...|
+-------+--------------------+

+--------------------+----------+
|                Text|  Keywords|
+--------------------+----------+
|the client as ada...|client;ada|
|this client has l...| client;LA|
+--------------------+----------+
In df_whitelist, each keyword corresponds to a set of terms, e.g. keyword LA corresponds to “LA city” and “US LA in da”.
In df_text, I have text and some keywords found in this text.
For each piece of text, such as “the client has ada..”, and for each of its keywords (here “client” and “ada”), I want to go through all the whitelist terms for that keyword and count how many times each term occurs in the text.
What I have tried is the following:
import pyspark.sql.functions as F
import pyspark.sql.types as T
import re

def whitelisting(text, listOfKeyword, df_whitelist):
    keywords = listOfKeyword.split(";")
    found_whiteterms_count = 0
    for k in keywords:
        if df_whitelist.filter(df_whitelist.keyword == k).count() == 0:
            found_whiteterms_count = found_whiteterms_count + 0
        else:
            df = df_whitelist.filter(df_whitelist.keyword == k).select("whitelist_terms")
            n = df.rdd.map(lambda x: len(re.findall(x["whitelist_terms"], text))).reduce(lambda x, y: x + y)
            found_whiteterms_count = found_whiteterms_count + n
    return found_whiteterms_count

whitelisting_udf = F.udf(lambda text, listOfKeyword: whitelisting(text, listOfKeyword, df_whitelist), T.IntegerType())
text.withColumn("whitelist_counts", whitelisting_udf(text.Text, text.Keywords))
and I got the error:
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o1153.__getstate__. Trace:
py4j.Py4JException: Method __getstate__() does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.base/java.lang.Thread.run(Thread.java:844)
I cannot figure this out after trying for some time. Could anybody help point out the problem and how to fix it? Thanks.
pyspark pickle user-defined-functions
asked Nov 12 '17 at 13:21
user1740987
1 Answer
You are passing a pyspark dataframe, df_whitelist, to a UDF, and pyspark dataframes cannot be pickled. You are also doing computations on a dataframe inside a UDF, which is not possible. Keep in mind that your function is going to be called once for every row of your dataframe, so you should keep its computations simple, and only use a UDF when the work can't be done with pyspark sql functions.
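To make the pickling failure more concrete, here is a minimal pure-Python sketch, not Spark itself. Spark serializes the UDF and everything its closure captures before sending it to the workers, and a dataframe is a thin wrapper around a JVM object that only exists in the driver process. The class FakeDriverHandle below is a hypothetical stand-in for such a process-bound object; it uses a threading lock because that is a well-known example of something pickle refuses to serialize.

```python
import pickle
import threading


class FakeDriverHandle:
    """Hypothetical stand-in for an object bound to the driver process."""

    def __init__(self):
        # A lock only makes sense inside the process that created it,
        # much like a handle to a JVM object does.
        self._lock = threading.Lock()


def try_pickle(obj):
    """Return True if obj can be pickled, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# Plain Python data can be shipped to workers...
plain_whitelist = {"LA": ["LA city", "US LA in da"]}
print(try_pickle(plain_whitelist))

# ...but a process-bound handle cannot.
print(try_pickle(FakeDriverHandle()))
```

The practical consequence is the same as in the question: anything a UDF captures has to be plain, picklable data, not a dataframe.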
What you need to do instead is join the two dataframes on keyword.
Let's start with the two sample dataframes you provided:
# Assumes an active SparkSession named `spark` (as in the pyspark shell).
df_whitelist = spark.createDataFrame(
    [["LA", "LA city"], ["LA", "US LA in da"], ["client", "this client has i"], ["client", "our client"]],
    ["keyword", "whitelist_terms"])
df_text = spark.createDataFrame(
    [["the client as ada", "client;ada"], ["this client has l", "client;LA"]],
    ["Text", "Keywords"])
The Keywords column in df_text needs some processing: we have to split the string into an array and then explode it so that we have only one keyword per row:
import pyspark.sql.functions as F
df_text = df_text.select("Text", F.explode(F.split("Keywords", ";")).alias("keyword"))
+-----------------+-------+
| Text|keyword|
+-----------------+-------+
|the client as ada| client|
|the client as ada| ada|
|this client has l| client|
|this client has l| LA|
+-----------------+-------+
Now we can join the two dataframes on keyword:
df = df_text.join(df_whitelist, "keyword", "leftouter")
+-------+-----------------+-----------------+
|keyword| Text| whitelist_terms|
+-------+-----------------+-----------------+
| LA|this client has l| LA city|
| LA|this client has l| US LA in da|
| ada|the client as ada| null|
| client|the client as ada|this client has i|
| client|the client as ada| our client|
| client|this client has l|this client has i|
| client|this client has l| our client|
+-------+-----------------+-----------------+
The first condition in your UDF can be translated as follows: if a keyword in df_text is not present in df_whitelist, the count is 0. After the left join, this corresponds to the rows where the df_whitelist columns are NULL, because the keyword appears only in the left dataframe.
The second condition counts the number of times whitelist_terms appears in Text: Text.count(whitelist_terms)
We'll write a UDF to do this:
from pyspark.sql.types import IntegerType
# Count literal occurrences of the term in the text; a missing term counts as 0.
count_terms = F.udf(lambda Text, term: Text.count(term) if term is not None else 0, IntegerType())
df = df.select(
    "Text",
    "keyword",
    F.when(F.isnull("whitelist_terms"), 0).otherwise(count_terms("Text", "whitelist_terms")).alias("whitelist_counts"))
+-----------------+-------+----------------+
| Text|keyword|whitelist_counts|
+-----------------+-------+----------------+
|this client has l| LA| 0|
|this client has l| LA| 0|
|the client as ada| ada| 0|
|the client as ada| client| 0|
|the client as ada| client| 0|
|this client has l| client| 0|
|this client has l| client| 0|
+-----------------+-------+----------------+
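One difference from the code in the question is worth a quick check: the UDF above counts with str.count, which matches the term literally, while the question used re.findall, which treats the term as a regular expression. The sketch below uses made-up strings and two hypothetical helper names (count_literal, count_regex) to show where the two can disagree, and how re.escape brings them back in line.

```python
import re


def count_literal(text, term):
    """Count non-overlapping literal occurrences of term in text."""
    return text.count(term)


def count_regex(text, pattern):
    """Count non-overlapping regex matches of pattern in text."""
    return len(re.findall(pattern, text))


text = "a.b axb a.b"
term = "a.b"

print(count_literal(text, term))           # literal match only
print(count_regex(text, term))             # "." also matches the "x" in "axb"
print(count_regex(text, re.escape(term)))  # escaped pattern is literal again
```

If the whitelist terms are meant as plain phrases, the literal count is probably what you want; if they are meant as patterns, the UDF body would need re.findall instead.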
Finally we can aggregate to get back to a dataframe with one row per distinct Text:
res = df.groupBy("Text").agg(
    F.collect_set("keyword").alias("Keywords"),
    F.sum("whitelist_counts").alias("whitelist_counts"))
res.show()
+-----------------+-------------+----------------+
| Text| Keywords|whitelist_counts|
+-----------------+-------------+----------------+
|this client has l| [client, LA]| 0|
|the client as ada|[ada, client]| 0|
+-----------------+-------------+----------------+
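As a sanity check on small samples, the same logic (split the keywords, look up the terms per keyword, count, sum per text) can be written in plain Python and compared against the Spark result. This is only a reference sketch for testing, not a replacement for the join; the function name whitelist_counts and the list-of-tuples layout are assumptions made for the illustration.

```python
from collections import defaultdict

whitelist_rows = [
    ("LA", "LA city"),
    ("LA", "US LA in da"),
    ("client", "this client has i"),
    ("client", "our client"),
]
text_rows = [
    ("the client as ada", "client;ada"),
    ("this client has l", "client;LA"),
]


def whitelist_counts(text_rows, whitelist_rows):
    """Return {text: total literal occurrences of its keywords' terms}."""
    # Group the whitelist terms by keyword (the "right side" of the join).
    terms_by_keyword = defaultdict(list)
    for keyword, term in whitelist_rows:
        terms_by_keyword[keyword].append(term)

    totals = {}
    for text, keywords in text_rows:
        total = 0
        for keyword in keywords.split(";"):
            # A keyword without whitelist terms contributes 0, like the NULL rows.
            for term in terms_by_keyword.get(keyword, []):
                total += text.count(term)
        # Identical texts are merged, as groupBy("Text") does.
        totals[text] = totals.get(text, 0) + total
    return totals


print(whitelist_counts(text_rows, whitelist_rows))
```

On the sample data both texts come out at 0, which matches the table above: none of the four whitelist terms occurs verbatim in either text.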
Many thanks MaFF, I much appreciate the neat solution and the clear explanation. Just one point: given that there are millions of text items, 50 keywords, and each keyword may have a thousand terms, would the Cartesian join be the most efficient solution? Or are there alternatives in Spark with better efficiency?
– user1740987
Nov 12 '17 at 16:47
I am not sure what you mean exactly. A cartesian product will be slower than a join on a key, since it would drastically increase the number of rows in the resulting dataframe. If one of your dataframes is "small", you should consider broadcasting it (pyspark.sql.functions.broadcast) inside the join expression; it will increase efficiency.
– MaFF
Nov 12 '17 at 17:44
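A rough back-of-the-envelope comparison of the two row counts, using the figures from the comment above. The number of texts is taken as one million for "millions", and keywords_per_text = 2 is purely an assumption based on the sample rows; the helper names are hypothetical.

```python
def keyed_join_rows(n_texts, keywords_per_text, terms_per_keyword):
    """Rows after exploding keywords and joining on keyword."""
    return n_texts * keywords_per_text * terms_per_keyword


def cartesian_rows(n_texts, n_keywords, terms_per_keyword):
    """Rows when every text is paired with every whitelist row."""
    return n_texts * n_keywords * terms_per_keyword


n_texts = 1_000_000        # "millions of text items", lower bound
n_keywords = 50
terms_per_keyword = 1_000
keywords_per_text = 2      # assumption, as in the sample data

keyed = keyed_join_rows(n_texts, keywords_per_text, terms_per_keyword)
cartesian = cartesian_rows(n_texts, n_keywords, terms_per_keyword)

print(keyed)               # rows produced by the keyed join
print(cartesian)           # rows produced by a full cartesian product
print(cartesian // keyed)  # how many times larger the cartesian product is
```

Under these assumptions the whitelist itself has only 50,000 rows, so it is plausibly the "small" side worth broadcasting, though whether it fits depends on the term lengths and the memory available on the executors.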
answered Nov 12 '17 at 15:46
MaFF