Querying Spark SQL DataFrame with complex types
How can I query an RDD with complex types such as maps/arrays?
For example, when I was writing this test code:
case class Test(name: String, map: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))
I thought the syntax would be something like:
sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world")
or
sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world")
but I get
Can't access nested field in type MapType(StringType,StringType,true)
and
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes
respectively.
scala apache-spark dataframe apache-spark-sql spark-dataframe
asked Feb 4 '15 at 22:12 by dvir
How about accepting that tome of an answer from @zero323 ?
– javadba
Dec 5 '17 at 6:08
3 Answers
Accepted answer (103 votes), answered Nov 22 '15 by zero323:
It depends on the type of the column. Let's start with some dummy data:
import org.apache.spark.sql.functions.{udf, lit}
import scala.util.Try
case class SubRecord(x: Int)
case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
case class Record(
an_array: Array[Int], a_map: Map[String, String],
a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])
val df = sc.parallelize(Seq(
Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
Array(
ArrayElement("foo", 1, Array(1.0, 2.0, 2.0)),
ArrayElement("bar", 2, Array(3.0, 4.0, 5.0)))),
Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
Array(ArrayElement("foz", 3, Array(5.0, 6.0)),
ArrayElement("baz", 4, Array(7.0, 8.0))))
)).toDF
df.registerTempTable("df")
df.printSchema
// root
// |-- an_array: array (nullable = true)
// | |-- element: integer (containsNull = false)
// |-- a_map: map (nullable = true)
// | |-- key: string
// | |-- value: string (valueContainsNull = true)
// |-- a_struct: struct (nullable = true)
// | |-- x: integer (nullable = false)
// |-- an_array_of_structs: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- foo: string (nullable = true)
// | | |-- bar: integer (nullable = false)
// | | |-- vals: array (nullable = true)
// | | | |-- element: double (containsNull = false)
array (ArrayType) columns:
Column.getItem method:
df.select($"an_array".getItem(1)).show
// +-----------+
// |an_array[1]|
// +-----------+
// | 2|
// | 5|
// +-----------+
Hive brackets syntax:
sqlContext.sql("SELECT an_array[1] FROM df").show
// +---+
// |_c0|
// +---+
// | 2|
// | 5|
// +---+
a UDF:
val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption)
df.select(get_ith($"an_array", lit(1))).show
// +---------------+
// |UDF(an_array,1)|
// +---------------+
// | 2|
// | 5|
// +---------------+
In addition to the methods listed above, Spark supports a growing list of built-in functions operating on complex types. Notable examples include higher-order functions like
transform (SQL only, 2.4+):
df.selectExpr("transform(an_array, x -> x + 1) an_array_inc").show
// +------------+
// |an_array_inc|
// +------------+
// | [2, 3, 4]|
// | [5, 6, 7]|
// +------------+
filter (SQL only, 2.4+):
df.selectExpr("filter(an_array, x -> x % 2 == 0) an_array_even").show
// +-------------+
// |an_array_even|
// +-------------+
// | [2]|
// | [4, 6]|
// +-------------+
aggregate (SQL only, 2.4+):
df.selectExpr("aggregate(an_array, 0, (acc, x) -> acc + x, acc -> acc) an_array_sum").show
// +------------+
// |an_array_sum|
// +------------+
// | 6|
// | 15|
// +------------+
array processing functions (array_*) like array_distinct (2.4+):
import org.apache.spark.sql.functions.array_distinct
df.select(array_distinct($"an_array_of_structs.vals"(0))).show
// +-------------------------------------------+
// |array_distinct(an_array_of_structs.vals[0])|
// +-------------------------------------------+
// | [1.0, 2.0]|
// | [5.0, 6.0]|
// +-------------------------------------------+
array_max (array_min, 2.4+):
import org.apache.spark.sql.functions.array_max
df.select(array_max($"an_array")).show
// +-------------------+
// |array_max(an_array)|
// +-------------------+
// | 3|
// | 6|
// +-------------------+
flatten (2.4+):
import org.apache.spark.sql.functions.flatten
df.select(flatten($"an_array_of_structs.vals")).show
// +---------------------------------+
// |flatten(an_array_of_structs.vals)|
// +---------------------------------+
// | [1.0, 2.0, 2.0, 3...|
// | [5.0, 6.0, 7.0, 8.0]|
// +---------------------------------+
arrays_zip (2.4+):
import org.apache.spark.sql.functions.arrays_zip
df.select(arrays_zip($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show(false)
// +--------------------------------------------------------------------+
// |arrays_zip(an_array_of_structs.vals[0], an_array_of_structs.vals[1])|
// +--------------------------------------------------------------------+
// |[[1.0, 3.0], [2.0, 4.0], [2.0, 5.0]] |
// |[[5.0, 7.0], [6.0, 8.0]] |
// +--------------------------------------------------------------------+
array_union (2.4+):
import org.apache.spark.sql.functions.array_union
df.select(array_union($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show
// +---------------------------------------------------------------------+
// |array_union(an_array_of_structs.vals[0], an_array_of_structs.vals[1])|
// +---------------------------------------------------------------------+
// | [1.0, 2.0, 3.0, 4...|
// | [5.0, 6.0, 7.0, 8.0]|
// +---------------------------------------------------------------------+
slice (2.4+):
import org.apache.spark.sql.functions.slice
df.select(slice($"an_array", 2, 2)).show
// +---------------------+
// |slice(an_array, 2, 2)|
// +---------------------+
// | [2, 3]|
// | [5, 6]|
// +---------------------+
map (MapType) columns
using Column.getField method:
df.select($"a_map".getField("foo")).show
// +----------+
// |a_map[foo]|
// +----------+
// | bar|
// | null|
// +----------+
using Hive brackets syntax:
sqlContext.sql("SELECT a_map['foz'] FROM df").show
// +----+
// | _c0|
// +----+
// |null|
// | baz|
// +----+
using a full path with dot syntax:
df.select($"a_map.foo").show
// +----+
// | foo|
// +----+
// | bar|
// |null|
// +----+
using a UDF:
val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k))
df.select(get_field($"a_map", lit("foo"))).show
// +--------------+
// |UDF(a_map,foo)|
// +--------------+
// | bar|
// | null|
// +--------------+
A growing number of map_* functions like map_keys (2.3+):
import org.apache.spark.sql.functions.map_keys
df.select(map_keys($"a_map")).show
// +---------------+
// |map_keys(a_map)|
// +---------------+
// | [foo]|
// | [foz]|
// +---------------+
or map_values (2.3+):
import org.apache.spark.sql.functions.map_values
df.select(map_values($"a_map")).show
// +-----------------+
// |map_values(a_map)|
// +-----------------+
// | [bar]|
// | [baz]|
// +-----------------+
Please check SPARK-23899 for a detailed list.
struct (StructType) columns using full path with dot syntax:
with DataFrame API:
df.select($"a_struct.x").show
// +---+
// | x|
// +---+
// | 1|
// | 2|
// +---+
with raw SQL:
sqlContext.sql("SELECT a_struct.x FROM df").show
// +---+
// | x|
// +---+
// | 1|
// | 2|
// +---+
Fields inside an array of structs can be accessed using dot syntax, names, and standard Column methods:
df.select($"an_array_of_structs.foo").show
// +----------+
// | foo|
// +----------+
// |[foo, bar]|
// |[foz, baz]|
// +----------+
sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show
// +---+
// |_c0|
// +---+
// |foo|
// |foz|
// +---+
df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show
// +------------------------------+
// |an_array_of_structs.vals[1][1]|
// +------------------------------+
// | 4.0|
// | 8.0|
// +------------------------------+
Fields of user-defined types (UDTs) can be accessed using UDFs. See SparkSQL referencing attributes of UDT for details.
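As a hedged illustration of the UDT-via-UDF point only: the DataFrame dfWithVec and its ML Vector column features below are assumptions, not part of the df defined above.

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

// the UDF receives the UDT's Scala type, so fields/elements can be read directly
val firstElem = udf((v: Vector) => v(0))
// dfWithVec.select(firstElem($"features")).show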
Notes:
- Depending on the Spark version, some of these methods can be available only with HiveContext. UDFs should work independently of version with both the standard SQLContext and HiveContext.
- Generally speaking, nested values are second-class citizens. Not all typical operations are supported on nested fields. Depending on the context, it could be better to flatten the schema and/or explode collections (see the flattening sketch at the end of this answer):
import org.apache.spark.sql.functions.explode
df.select(explode($"an_array_of_structs")).show
// +--------------------+
// | col|
// +--------------------+
// |[foo,1,WrappedArr...|
// |[bar,2,WrappedArr...|
// |[foz,3,WrappedArr...|
// |[baz,4,WrappedArr...|
// +--------------------+
Dot syntax can be combined with the wildcard character (*) to select (possibly multiple) fields without specifying names explicitly:
df.select($"a_struct.*").show
// +---+
// | x|
// +---+
// | 1|
// | 2|
// +---+
JSON columns can be queried using get_json_object and from_json functions. See How to query JSON data column using Spark DataFrames? for details.
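Following up on the flattening suggestion in the notes above, a minimal hedged sketch (reusing the df defined at the top of this answer) that pulls a few nested values up into plain top-level columns, after which ordinary column operations apply to them directly:

// flatten selected nested fields into top-level columns
val flat = df.select(
  $"a_struct.x".alias("a_struct_x"),            // struct field
  $"a_map".getField("foo").alias("a_map_foo"),  // map value by key
  $"an_array".getItem(0).alias("an_array_0")    // array element by index
)
flat.printSchema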
Is it possible to fetch all the elements in a struct array? Is something like this possible: sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show
– user1384205
Jun 6 '16 at 15:39
This should be the accepted answer.
– Patrick McGloin
Dec 16 '16 at 8:27
How to do the same thing as SELECT an_array_of_structs[0].foo FROM df using code, not Spark SQL? And is it supported to execute a UDF on an array-of-structs column (an_array_of_structs) using code? Like SELECT max(an_array_of_structs.bar) FROM df using code.
– DeepNightTwo
Mar 5 at 3:44
Answer (2 votes), answered Sep 16 '15 by sshroff:
Once you convert it to a DataFrame, you can simply fetch the data as:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}

// build Rows of (name, map); this assumes a pair RDD of (String, Map[String, String]) —
// with the question's RDD[Test], use t.name and t.map instead of _1 and _2
val rddRow = rdd.map(kv => {
  val k = kv._1
  val v = kv._2
  Row(k, v)
})

val myFld1 = StructField("name", StringType, true)
val myFld2 = StructField("map", MapType(StringType, StringType), true)
val schema = StructType(Array(myFld1, myFld2))

val rowrddDF = sqc.createDataFrame(rddRow, schema)
rowrddDF.registerTempTable("rowtbl")

val rowrddDFFinal = rowrddDF.select(rowrddDF("map.one"))
or
val rowrddDFFinal = rowrddDF.select("map.one")
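A hedged usage sketch: once the temp table is registered as above, the map values could also be queried with the bracket syntax from the accepted answer. The 'hello' key comes from the question's sample data, and backticks are used defensively because map is also a built-in SQL function name; untested here.

sqc.sql("SELECT name, `map`['hello'] FROM rowtbl").show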
when I try this I get error: value _1 is not a member of org.apache.spark.sql.Row
– Paul
Nov 23 '17 at 17:32
Answer (0 votes), answered Jun 6 '16 by Sumit Pal:
Here is what I did and it worked:
case class Test(name: String, m: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))
val rdddf = rdd.toDF
rdddf.registerTempTable("mytable")
sqlContext.sql("select m.hello from mytable").show
Results
+------+
| hello|
+------+
| world|
|people|
+------+
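Building on this, a hedged sketch of the filter the original question was actually after, i.e. a map lookup in a WHERE clause (note the quoted string literal 'world', which the question's attempts were missing; untested here):

sqlContext.sql("SELECT name FROM mytable WHERE m['hello'] = 'world'").show
// expected to keep only the "first" row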