Spark UDF not running in parallel










I'm attempting to use the Python port of Google's phonenumbers library to normalize 50 million phone numbers. I'm reading a Parquet file from S3 into a Spark DataFrame and then running operations on it. The following function, parsePhoneNumber, is expressed as a UDF:



import phonenumbers

def isValidNumber(phoneNum):
    try:
        pn = phonenumbers.parse(phoneNum, "US")
    except phonenumbers.NumberParseException:
        return False
    else:
        return phonenumbers.is_valid_number(pn) and phonenumbers.is_possible_number(pn)

def parsePhoneNumber(phoneNum):
    if isValidNumber(phoneNum):
        parsedNumber = phonenumbers.parse(phoneNum, "US")
        formattedNumber = phonenumbers.format_number(parsedNumber, phonenumbers.PhoneNumberFormat.E164)
        return (True, parsedNumber.country_code, formattedNumber, parsedNumber.national_number, parsedNumber.extension)
    else:
        # Same arity as the success branch so the UDF's return schema lines up.
        return (False, None, None, None, None)
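The registration of parsePhoneNumber_udf is not shown in the question; a minimal sketch of how a UDF like this is typically declared with a struct return type might look as follows (the schema field names here are assumptions, chosen to match the tuple returned by parsePhoneNumber):

from pyspark.sql.functions import udf
from pyspark.sql.types import (StructType, StructField, BooleanType,
                               IntegerType, LongType, StringType)

# Struct mirroring (is_valid, country_code, e164, national_number, extension)
phoneSchema = StructType([
    StructField("is_valid", BooleanType()),
    StructField("country_code", IntegerType()),
    StructField("e164", StringType()),
    StructField("national_number", LongType()),
    StructField("extension", StringType()),
])

parsePhoneNumber_udf = udf(parsePhoneNumber, phoneSchema)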


And below is a sample of how I use the UDF to derive new columns:



newDataFrame = oldDataFrame.withColumn("new_column", parsePhoneNumber_udf(oldDataFrame.phone)).select("id", "new_column.national_number")
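Assuming the struct schema sketched above, several fields of new_column can also be pulled out in a single select, for example (field names follow the assumed schema):

from pyspark.sql.functions import col

newDataFrame = (oldDataFrame
    .withColumn("new_column", parsePhoneNumber_udf(col("phone")))
    .select("id",
            col("new_column.is_valid"),
            col("new_column.e164"),
            col("new_column.national_number")))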


Executing the UDF by running display(newDataFrame), newDataFrame.show(5), or something similar uses only one executor in the cluster, but nothing in the UDF appears to be causing it to run on only one worker.



If I'm doing anything that would prevent this from running in parallel, can you provide some insight?



The execution environment is a cloud cluster managed by Databricks.



Edit: Below is the output of oldDataFrame.explain



== Parsed Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Analyzed Logical Plan ==
id: string, person_id: string, phone: string, type: string, source_id: string, created_date: string, modified_date: string
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Optimized Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Physical Plan ==
*FileScan parquet [id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/mnt/person-data/parquet/phone], PartitionFilters: , PushedFilters: , ReadSchema: struct<id:string,person_id:string,phone:string,type:string,source_id:string,created_date:strin...









python apache-spark pyspark databricks






asked Jan 23 '18 at 16:39 by Sean Lindo, edited Jan 23 '18 at 17:43







  • How do you create SparkSession? – T. Gawęda, Jan 23 '18 at 16:45

  • I don't believe it's necessary to build a SparkSession on Databricks; therefore, I'm using sqlContext directly. – Sean Lindo, Jan 23 '18 at 16:50

  • Ok, but in which mode is your SparkSession running? If it's local, it will run on only one node. – T. Gawęda, Jan 23 '18 at 16:50

  • Please run sc.master, where sc is the SparkContext. – T. Gawęda, Jan 23 '18 at 16:55

  • Running sc.master returns a URL. – Sean Lindo, Jan 23 '18 at 17:03












1 Answer
You are all good. display, with default arguments, shows at most the first 1000 rows. Similarly, newDataFrame.show(5) shows only the first five rows.

At the same time, the execution plan (oldDataFrame.explain) shows no shuffles, so in both cases Spark will evaluate only the minimum number of partitions needed to get the required number of rows; for these values that is probably a single partition.



If you want to be sure:



  • Check if oldDataFrame.rdd.getNumPartitions() is larger than one.

  • If it is, force execution of all partitions with df.foreach(lambda _: None) or newDataFrame.foreach(lambda _: None).

You should then see more active executors; a sketch of both checks follows.
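For example, in a notebook cell (reusing oldDataFrame and newDataFrame from the question; this is only a sketch of the checks described above):

# 1. Confirm the source DataFrame has more than one partition.
print(oldDataFrame.rdd.getNumPartitions())

# 2. Force every partition (and therefore the UDF) to be evaluated,
#    then watch the executors in the Spark UI while it runs.
newDataFrame.foreach(lambda _: None)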






answered Jan 23 '18 at 17:53 by hi-zir, edited Jan 23 '18 at 18:15
  • show() uses a normal limit; it should do a local limit and then a global limit. Maybe the Parquet file has only one partition? – T. Gawęda, Jan 23 '18 at 17:57

  • I've been partitioning our CSV files into 25 partitions when converting them to Parquet, so the output of oldDataFrame.rdd.getNumPartitions() is 25. In addition, running oldDataFrame.foreach(lambda _: None) executes 4 tasks on each of the 4 workers. To that point, is using withColumn the appropriate strategy here? I assume simply mapping over the DataFrame would execute in parallel as well. – Sean Lindo, Jan 23 '18 at 18:00

  • @sean Spark gets more and more complicated; maybe they optimized this. So, upvote deserved ;) – T. Gawęda, Jan 23 '18 at 18:02

  • @T.Gawęda As far as I remember, show(n) will evaluate to df.limit(n).queryExecution... and with a narrow transformation (no shuffle) it will collect only as much as it needs. – hi-zir, Jan 23 '18 at 18:05

  • @T.Gawęda The worst thing about having an optimizer is that you never know what exactly happens out there. And now we have a cost-based one :) – hi-zir, Jan 23 '18 at 19:40









