Querying Spark SQL DataFrame with complex types

How can I query an RDD with complex types such as maps/arrays?
For example, when I was writing this test code:



case class Test(name: String, map: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))


I thought the syntax would be something like:



sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world")


or



sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world")


but I get




Can't access nested field in type MapType(StringType,StringType,true)




and




org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes




respectively.

scala apache-spark dataframe apache-spark-sql spark-dataframe

asked Feb 4 '15 at 22:12 by dvir, edited Mar 22 at 15:27

  • How about accepting that tome of an answer from @zero323 ?
    – javadba
    Dec 5 '17 at 6:08

3 Answers


It depends on the type of the column. Let's start with some dummy data:



import org.apache.spark.sql.functions.{udf, lit}
import scala.util.Try

case class SubRecord(x: Int)
case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
case class Record(
  an_array: Array[Int], a_map: Map[String, String],
  a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])


val df = sc.parallelize(Seq(
  Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
    Array(
      ArrayElement("foo", 1, Array(1.0, 2.0, 2.0)),
      ArrayElement("bar", 2, Array(3.0, 4.0, 5.0)))),
  Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
    Array(ArrayElement("foz", 3, Array(5.0, 6.0)),
      ArrayElement("baz", 4, Array(7.0, 8.0))))
)).toDF




df.registerTempTable("df")
df.printSchema

// root
// |-- an_array: array (nullable = true)
// | |-- element: integer (containsNull = false)
// |-- a_map: map (nullable = true)
// | |-- key: string
// | |-- value: string (valueContainsNull = true)
// |-- a_struct: struct (nullable = true)
// | |-- x: integer (nullable = false)
// |-- an_array_of_structs: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- foo: string (nullable = true)
// | | |-- bar: integer (nullable = false)
// | | |-- vals: array (nullable = true)
// | | | |-- element: double (containsNull = false)



  • array (ArrayType) columns:




    • Column.getItem method



      df.select($"an_array".getItem(1)).show

      // +-----------+
      // |an_array[1]|
      // +-----------+
      // | 2|
      // | 5|
      // +-----------+



    • Hive brackets syntax:



      sqlContext.sql("SELECT an_array[1] FROM df").show

      // +---+
      // |_c0|
      // +---+
      // | 2|
      // | 5|
      // +---+



    • a UDF



      val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption)

      df.select(get_ith($"an_array", lit(1))).show

      // +---------------+
      // |UDF(an_array,1)|
      // +---------------+
      // | 2|
      // | 5|
      // +---------------+



    • In addition to the methods listed above, Spark supports a growing list of built-in functions operating on complex types. Notable examples include higher-order functions like transform (SQL only, 2.4+):



      df.selectExpr("transform(an_array, x -> x + 1) an_array_inc").show
      // +------------+
      // |an_array_inc|
      // +------------+
      // | [2, 3, 4]|
      // | [5, 6, 7]|
      // +------------+



    • filter (SQL only, 2.4+)



      df.selectExpr("filter(an_array, x -> x % 2 == 0) an_array_even").show
      // +-------------+
      // |an_array_even|
      // +-------------+
      // | [2]|
      // | [4, 6]|
      // +-------------+



    • aggregate (SQL only, 2.4+):



      df.selectExpr("aggregate(an_array, 0, (acc, x) -> acc + x, acc -> acc) an_array_sum").show
      // +------------+
      // |an_array_sum|
      // +------------+
      // | 6|
      // | 15|
      // +------------+



    • array processing functions (array_*) like array_distinct (2.4+):



      import org.apache.spark.sql.functions.array_distinct

      df.select(array_distinct($"an_array_of_structs.vals"(0))).show
      // +-------------------------------------------+
      // |array_distinct(an_array_of_structs.vals[0])|
      // +-------------------------------------------+
      // | [1.0, 2.0]|
      // | [5.0, 6.0]|
      // +-------------------------------------------+



    • array_max (array_min, 2.4+):



      import org.apache.spark.sql.functions.array_max

      df.select(array_max($"an_array")).show
      // +-------------------+
      // |array_max(an_array)|
      // +-------------------+
      // | 3|
      // | 6|
      // +-------------------+



    • flatten (2.4+)



      import org.apache.spark.sql.functions.flatten

      df.select(flatten($"an_array_of_structs.vals")).show
      // +---------------------------------+
      // |flatten(an_array_of_structs.vals)|
      // +---------------------------------+
      // | [1.0, 2.0, 2.0, 3...|
      // | [5.0, 6.0, 7.0, 8.0]|
      // +---------------------------------+



    • arrays_zip (2.4+):



      import org.apache.spark.sql.functions.arrays_zip

      df.select(arrays_zip($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show(false)
      // +--------------------------------------------------------------------+
      // |arrays_zip(an_array_of_structs.vals[0], an_array_of_structs.vals[1])|
      // +--------------------------------------------------------------------+
      // |[[1.0, 3.0], [2.0, 4.0], [2.0, 5.0]] |
      // |[[5.0, 7.0], [6.0, 8.0]] |
      // +--------------------------------------------------------------------+



    • array_union (2.4+):



      import org.apache.spark.sql.functions.array_union

      df.select(array_union($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show
      // +---------------------------------------------------------------------+
      // |array_union(an_array_of_structs.vals[0], an_array_of_structs.vals[1])|
      // +---------------------------------------------------------------------+
      // | [1.0, 2.0, 3.0, 4...|
      // | [5.0, 6.0, 7.0, 8.0]|
      // +---------------------------------------------------------------------+



    • slice (2.4+):



      import org.apache.spark.sql.functions.slice

      df.select(slice($"an_array", 2, 2)).show
      // +---------------------+
      // |slice(an_array, 2, 2)|
      // +---------------------+
      // | [2, 3]|
      // | [5, 6]|
      // +---------------------+




  • map (MapType) columns




    • using Column.getField method:



      df.select($"a_map".getField("foo")).show

      // +----------+
      // |a_map[foo]|
      // +----------+
      // | bar|
      // | null|
      // +----------+



    • using Hive brackets syntax:



      sqlContext.sql("SELECT a_map['foz'] FROM df").show

      // +----+
      // | _c0|
      // +----+
      // |null|
      // | baz|
      // +----+



    • using a full path with dot syntax:



      df.select($"a_map.foo").show

      // +----+
      // | foo|
      // +----+
      // | bar|
      // |null|
      // +----+



    • using a UDF



      val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k))

      df.select(get_field($"a_map", lit("foo"))).show

      // +--------------+
      // |UDF(a_map,foo)|
      // +--------------+
      // | bar|
      // | null|
      // +--------------+



    • a growing number of map_* functions, like map_keys (2.3+):



      import org.apache.spark.sql.functions.map_keys

      df.select(map_keys($"a_map")).show
      // +---------------+
      // |map_keys(a_map)|
      // +---------------+
      // | [foo]|
      // | [foz]|
      // +---------------+



    • or map_values (2.3+)



      import org.apache.spark.sql.functions.map_values

      df.select(map_values($"a_map")).show
      // +-----------------+
      // |map_values(a_map)|
      // +-----------------+
      // | [bar]|
      // | [baz]|
      // +-----------------+


    Please check SPARK-23899 for a detailed list.




  • struct (StructType) columns using full path with dot syntax:




    • with DataFrame API



      df.select($"a_struct.x").show

      // +---+
      // | x|
      // +---+
      // | 1|
      // | 2|
      // +---+



    • with raw SQL



      sqlContext.sql("SELECT a_struct.x FROM df").show

      // +---+
      // | x|
      // +---+
      // | 1|
      // | 2|
      // +---+




  • fields inside arrays of structs can be accessed using dot syntax, names, and standard Column methods:



    df.select($"an_array_of_structs.foo").show

    // +----------+
    // | foo|
    // +----------+
    // |[foo, bar]|
    // |[foz, baz]|
    // +----------+

    sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show

    // +---+
    // |_c0|
    // +---+
    // |foo|
    // |foz|
    // +---+

    df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show

    // +------------------------------+
    // |an_array_of_structs.vals[1][1]|
    // +------------------------------+
    // | 4.0|
    // | 8.0|
    // +------------------------------+


  • fields of user defined types (UDTs) can be accessed using UDFs. See SparkSQL referencing attributes of UDT for details; a minimal sketch follows below.
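
    A minimal sketch, not from the original answer: it assumes a Spark ML Vector column (backed by VectorUDT), and the names first_element, vecDf and features are hypothetical:

    import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.sql.functions.udf

    // the UDT's internals are opaque to SQL, but a plain Scala UDF can unpack them
    val first_element = udf((v: Vector) => v(0))

    // assuming a DataFrame vecDf with a Vector column named "features":
    // vecDf.select(first_element($"features")).show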


Notes:



  • depending on the Spark version, some of these methods can be available only with HiveContext. UDFs should work independently of the version, with both the standard SQLContext and HiveContext.


  • generally speaking, nested values are second-class citizens. Not all typical operations are supported on nested fields. Depending on the context, it could be better to flatten the schema and/or explode collections:



    df.select(explode($"an_array_of_structs")).show

    // +--------------------+
    // | col|
    // +--------------------+
    // |[foo,1,WrappedArr...|
    // |[bar,2,WrappedArr...|
    // |[foz,3,WrappedArr...|
    // |[baz,4,WrappedArr...|
    // +--------------------+



  • Dot syntax can be combined with the wildcard character (*) to select (possibly multiple) fields without specifying names explicitly:



    df.select($"a_struct.*").show
    // +---+
    // | x|
    // +---+
    // | 1|
    // | 2|
    // +---+


  • JSON columns can be queried using the get_json_object and from_json functions. See How to query JSON data column using Spark DataFrames? for details; a minimal sketch follows below.
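
    A minimal sketch, not from the original answer; the DataFrame jsonDf and its column json_col are hypothetical:

    import org.apache.spark.sql.functions.{get_json_object, from_json}
    import org.apache.spark.sql.types.{StructType, StructField, StringType}

    val jsonDf = Seq("""{"k": "foo"}""", """{"k": "bar"}""").toDF("json_col")

    // extract a single field with a JSONPath expression
    jsonDf.select(get_json_object($"json_col", "$.k")).show

    // or parse the whole column into a struct (2.1+) and use getField / dot syntax
    val jsonSchema = StructType(Seq(StructField("k", StringType)))
    jsonDf.select(from_json($"json_col", jsonSchema).getField("k")).show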






answered Nov 22 '15 at 1:03 by zero323 (accepted answer, 103 votes), edited Oct 24 at 3:06

  • Is it possible to fetch all the elements in a struct array? Is something like this possible.. sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show
    – user1384205
    Jun 6 '16 at 15:39






  • This should be the accepted answer.
    – Patrick McGloin
    Dec 16 '16 at 8:27










  • How to do the same thing as SELECT an_array_of_structs[0].foo FROM df using code not spark sql? And is it supported to execute a UDF on an array of structs column(an_array_of_structs) using code? Like SELECT max(an_array_of_structs.bar) FROM df using code.
    – DeepNightTwo
    Mar 5 at 3:44

















Once you convert it to a DataFrame, you can simply fetch the data as:



import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, MapType}

// assumes rdd is an RDD of (String, Map[String, String]) pairs
val rddRow = rdd.map { kv =>
  val k = kv._1
  val v = kv._2
  Row(k, v)
}

val myFld1 = StructField("name", StringType, true)
val myFld2 = StructField("map", MapType(StringType, StringType), true)
val arr = Array(myFld1, myFld2)
val schema = StructType(arr)
val rowrddDF = sqlContext.createDataFrame(rddRow, schema)
rowrddDF.registerTempTable("rowtbl")
val rowrddDFFinal = rowrddDF.select(rowrddDF("map.one"))
// or
val rowrddDFFinal = rowrddDF.select("map.one")
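
Since the table is registered, the same lookup can also be written in raw SQL; a sketch assuming a key "one" exists in the map column (backticks are used defensively because map is also a SQL function name):

sqlContext.sql("SELECT `map`['one'] FROM rowtbl").show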





  • when I try this I get error: value _1 is not a member of org.apache.spark.sql.Row
    – Paul
    Nov 23 '17 at 17:32

















Here is what I did and it worked:



case class Test(name: String, m: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))
val rdddf = rdd.toDF
rdddf.registerTempTable("mytable")
sqlContext.sql("select m.hello from mytable").show


Results



+------+
| hello|
+------+
| world|
|people|
+------+





share|improve this answer






















    Your Answer






    StackExchange.ifUsing("editor", function ()
    StackExchange.using("externalEditor", function ()
    StackExchange.using("snippets", function ()
    StackExchange.snippets.init();
    );
    );
    , "code-snippets");

    StackExchange.ready(function()
    var channelOptions =
    tags: "".split(" "),
    id: "1"
    ;
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function()
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled)
    StackExchange.using("snippets", function()
    createEditor();
    );

    else
    createEditor();

    );

    function createEditor()
    StackExchange.prepareEditor(
    heartbeatType: 'answer',
    convertImagesToLinks: true,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: 10,
    bindNavPrevention: true,
    postfix: "",
    imageUploader:
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    ,
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    );



    );













     

    draft saved


    draft discarded


















    StackExchange.ready(
    function ()
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f28332494%2fquerying-spark-sql-dataframe-with-complex-types%23new-answer', 'question_page');

    );

    Post as a guest















    Required, but never shown

























    3 Answers
    3






    active

    oldest

    votes








    3 Answers
    3






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes








    up vote
    103
    down vote



    accepted










    It depends on a type of the column. Lets start with some dummy data:



    import org.apache.spark.sql.functions.udf, lit
    import scala.util.Try

    case class SubRecord(x: Int)
    case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
    case class Record(
    an_array: Array[Int], a_map: Map[String, String],
    a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])


    val df = sc.parallelize(Seq(
    Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
    Array(
    ArrayElement("foo", 1, Array(1.0, 2.0, 2.0)),
    ArrayElement("bar", 2, Array(3.0, 4.0, 5.0)))),
    Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
    Array(ArrayElement("foz", 3, Array(5.0, 6.0)),
    ArrayElement("baz", 4, Array(7.0, 8.0))))
    )).toDF




    df.registerTempTable("df")
    df.printSchema

    // root
    // |-- an_array: array (nullable = true)
    // | |-- element: integer (containsNull = false)
    // |-- a_map: map (nullable = true)
    // | |-- key: string
    // | |-- value: string (valueContainsNull = true)
    // |-- a_struct: struct (nullable = true)
    // | |-- x: integer (nullable = false)
    // |-- an_array_of_structs: array (nullable = true)
    // | |-- element: struct (containsNull = true)
    // | | |-- foo: string (nullable = true)
    // | | |-- bar: integer (nullable = false)
    // | | |-- vals: array (nullable = true)
    // | | | |-- element: double (containsNull = false)



    • array (ArrayType) columns:




      • Column.getItem method



        df.select($"an_array".getItem(1)).show

        // +-----------+
        // |an_array[1]|
        // +-----------+
        // | 2|
        // | 5|
        // +-----------+



      • Hive brackets syntax:



        sqlContext.sql("SELECT an_array[1] FROM df").show

        // +---+
        // |_c0|
        // +---+
        // | 2|
        // | 5|
        // +---+



      • an UDF



        val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption)

        df.select(get_ith($"an_array", lit(1))).show

        // +---------------+
        // |UDF(an_array,1)|
        // +---------------+
        // | 2|
        // | 5|
        // +---------------+



      • Additionally to the methods listed above Spark supports a growing list of built-in functions operating on complex types. Notable examples include higher order functions like transform (SQL only, 2.4+):



        df.selectExpr("transform(an_array, x -> x + 1) an_array_inc").show
        // +------------+
        // |an_array_inc|
        // +------------+
        // | [2, 3, 4]|
        // | [5, 6, 7]|
        // +------------+



      • filter (SQL only, 2.4+)



        df.selectExpr("filter(an_array, x -> x % 2 == 0) an_array_even").show
        // +-------------+
        // |an_array_even|
        // +-------------+
        // | [2]|
        // | [4, 6]|
        // +-------------+



      • aggregate (SQL only, 2.4+):



        df.selectExpr("aggregate(an_array, 0, (acc, x) -> acc + x, acc -> acc) an_array_sum").show
        // +------------+
        // |an_array_sum|
        // +------------+
        // | 6|
        // | 15|
        // +------------+



      • array processing functions (array_*) like array_distinct (2.4+):



        import org.apache.spark.sql.functions.array_distinct

        df.select(array_distinct($"an_array_of_structs.vals"(0))).show
        // +-------------------------------------------+
        // |array_distinct(an_array_of_structs.vals[0])|
        // +-------------------------------------------+
        // | [1.0, 2.0]|
        // | [5.0, 6.0]|
        // +-------------------------------------------+



      • array_max (array_min, 2.4+):



        import org.apache.spark.sql.functions.array_max

        df.select(array_max($"an_array")).show
        // +-------------------+
        // |array_max(an_array)|
        // +-------------------+
        // | 3|
        // | 6|
        // +-------------------+



      • flatten (2.4+)



        import org.apache.spark.sql.functions.flatten

        df.select(flatten($"an_array_of_structs.vals")).show
        // +---------------------------------+
        // |flatten(an_array_of_structs.vals)|
        // +---------------------------------+
        // | [1.0, 2.0, 2.0, 3...|
        // | [5.0, 6.0, 7.0, 8.0]|
        // +---------------------------------+



      • arrays_zip (2.4+):



        import org.apache.spark.sql.functions.arrays_zip

        df.select(arrays_zip($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show(false)
        // +--------------------------------------------------------------------+
        // |arrays_zip(an_array_of_structs.vals[0], an_array_of_structs.vals[1])|
        // +--------------------------------------------------------------------+
        // |[[1.0, 3.0], [2.0, 4.0], [2.0, 5.0]] |
        // |[[5.0, 7.0], [6.0, 8.0]] |
        // +--------------------------------------------------------------------+



      • array_union (2.4+):



        import org.apache.spark.sql.functions.array_union

        df.select(array_union($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show
        // +---------------------------------------------------------------------+
        // |array_union(an_array_of_structs.vals[0], an_array_of_structs.vals[1])|
        // +---------------------------------------------------------------------+
        // | [1.0, 2.0, 3.0, 4...|
        // | [5.0, 6.0, 7.0, 8.0]|
        // +---------------------------------------------------------------------+



      • slice (2.4+):



        import org.apache.spark.sql.functions.slice

        df.select(slice($"an_array", 2, 2)).show
        // +---------------------+
        // |slice(an_array, 2, 2)|
        // +---------------------+
        // | [2, 3]|
        // | [5, 6]|
        // +---------------------+




    • map (MapType) columns




      • using Column.getField method:



        df.select($"a_map".getField("foo")).show

        // +----------+
        // |a_map[foo]|
        // +----------+
        // | bar|
        // | null|
        // +----------+



      • using Hive brackets syntax:



        sqlContext.sql("SELECT a_map['foz'] FROM df").show

        // +----+
        // | _c0|
        // +----+
        // |null|
        // | baz|
        // +----+



      • using a full path with dot syntax:



        df.select($"a_map.foo").show

        // +----+
        // | foo|
        // +----+
        // | bar|
        // |null|
        // +----+



      • using an UDF



        val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k))

        df.select(get_field($"a_map", lit("foo"))).show

        // +--------------+
        // |UDF(a_map,foo)|
        // +--------------+
        // | bar|
        // | null|
        // +--------------+



      • Growing number of map_* functions like map_keys (2.3+)



        import org.apache.spark.sql.functions.map_keys

        df.select(map_keys($"a_map")).show
        // +---------------+
        // |map_keys(a_map)|
        // +---------------+
        // | [foo]|
        // | [foz]|
        // +---------------+



      • or map_values (2.3+)



        import org.apache.spark.sql.functions.map_values

        df.select(map_values($"a_map")).show
        // +-----------------+
        // |map_values(a_map)|
        // +-----------------+
        // | [bar]|
        // | [baz]|
        // +-----------------+


      Please check SPARK-23899 for a detailed list.




    • struct (StructType) columns using full path with dot syntax:




      • with DataFrame API



        df.select($"a_struct.x").show

        // +---+
        // | x|
        // +---+
        // | 1|
        // | 2|
        // +---+



      • with raw SQL



        sqlContext.sql("SELECT a_struct.x FROM df").show

        // +---+
        // | x|
        // +---+
        // | 1|
        // | 2|
        // +---+




    • fields inside array of structs can be accessed using dot-syntax, names and standard Column methods:



      df.select($"an_array_of_structs.foo").show

      // +----------+
      // | foo|
      // +----------+
      // |[foo, bar]|
      // |[foz, baz]|
      // +----------+

      sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show

      // +---+
      // |_c0|
      // +---+
      // |foo|
      // |foz|
      // +---+

      df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show

      // +------------------------------+
      // |an_array_of_structs.vals[1][1]|
      // +------------------------------+
      // | 4.0|
      // | 8.0|
      // +------------------------------+


    • user defined types (UDTs) fields can be accessed using UDFs. See SparkSQL referencing attributes of UDT for details.


    Notes:



    • depending on a Spark version some of these methods can be available only with HiveContext. UDFs should work independent of version with both standard SQLContext and HiveContext.


    • generally speaking nested values are a second class citizens. Not all typical operations are supported on nested fields. Depending on a context it could be better to flatten the schema and / or explode collections



      df.select(explode($"an_array_of_structs")).show

      // +--------------------+
      // | col|
      // +--------------------+
      // |[foo,1,WrappedArr...|
      // |[bar,2,WrappedArr...|
      // |[foz,3,WrappedArr...|
      // |[baz,4,WrappedArr...|
      // +--------------------+



    • Dot syntax can be combined with wildcard character (*) to select (possibly multiple) fields without specifying names explicitly:



      df.select($"a_struct.*").show
      // +---+
      // | x|
      // +---+
      // | 1|
      // | 2|
      // +---+


    • JSON columns can be queried using get_json_object and from_json functions. See How to query JSON data column using Spark DataFrames? for details.






    share|improve this answer






















    • Is it possible to fetch all the elements in a struct array? Is something like this possible.. sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show
      – user1384205
      Jun 6 '16 at 15:39






    • 3




      This should be the accepted answer.
      – Patrick McGloin
      Dec 16 '16 at 8:27










    • How to do the same thing as SELECT an_array_of_structs[0].foo FROM df using code not spark sql? And is it supported to execute a UDF on an array of structs column(an_array_of_structs) using code? Like SELECT max(an_array_of_structs.bar) FROM df using code.
      – DeepNightTwo
      Mar 5 at 3:44














    up vote
    103
    down vote



    accepted










    It depends on a type of the column. Lets start with some dummy data:



    import org.apache.spark.sql.functions.udf, lit
    import scala.util.Try

    case class SubRecord(x: Int)
    case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
    case class Record(
    an_array: Array[Int], a_map: Map[String, String],
    a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])


    val df = sc.parallelize(Seq(
    Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
    Array(
    ArrayElement("foo", 1, Array(1.0, 2.0, 2.0)),
    ArrayElement("bar", 2, Array(3.0, 4.0, 5.0)))),
    Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
    Array(ArrayElement("foz", 3, Array(5.0, 6.0)),
    ArrayElement("baz", 4, Array(7.0, 8.0))))
    )).toDF




    df.registerTempTable("df")
    df.printSchema

    // root
    // |-- an_array: array (nullable = true)
    // | |-- element: integer (containsNull = false)
    // |-- a_map: map (nullable = true)
    // | |-- key: string
    // | |-- value: string (valueContainsNull = true)
    // |-- a_struct: struct (nullable = true)
    // | |-- x: integer (nullable = false)
    // |-- an_array_of_structs: array (nullable = true)
    // | |-- element: struct (containsNull = true)
    // | | |-- foo: string (nullable = true)
    // | | |-- bar: integer (nullable = false)
    // | | |-- vals: array (nullable = true)
    // | | | |-- element: double (containsNull = false)



    • array (ArrayType) columns:




      • Column.getItem method



        df.select($"an_array".getItem(1)).show

        // +-----------+
        // |an_array[1]|
        // +-----------+
        // | 2|
        // | 5|
        // +-----------+



      • Hive brackets syntax:



        sqlContext.sql("SELECT an_array[1] FROM df").show

        // +---+
        // |_c0|
        // +---+
        // | 2|
        // | 5|
        // +---+



      • an UDF



        val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption)

        df.select(get_ith($"an_array", lit(1))).show

        // +---------------+
        // |UDF(an_array,1)|
        // +---------------+
        // | 2|
        // | 5|
        // +---------------+



      • Additionally to the methods listed above Spark supports a growing list of built-in functions operating on complex types. Notable examples include higher order functions like transform (SQL only, 2.4+):



        df.selectExpr("transform(an_array, x -> x + 1) an_array_inc").show
        // +------------+
        // |an_array_inc|
        // +------------+
        // | [2, 3, 4]|
        // | [5, 6, 7]|
        // +------------+



      • filter (SQL only, 2.4+)



        df.selectExpr("filter(an_array, x -> x % 2 == 0) an_array_even").show
        // +-------------+
        // |an_array_even|
        // +-------------+
        // | [2]|
        // | [4, 6]|
        // +-------------+



      • aggregate (SQL only, 2.4+):



        df.selectExpr("aggregate(an_array, 0, (acc, x) -> acc + x, acc -> acc) an_array_sum").show
        // +------------+
        // |an_array_sum|
        // +------------+
        // | 6|
        // | 15|
        // +------------+



      • array processing functions (array_*) like array_distinct (2.4+):



        import org.apache.spark.sql.functions.array_distinct

        df.select(array_distinct($"an_array_of_structs.vals"(0))).show
        // +-------------------------------------------+
        // |array_distinct(an_array_of_structs.vals[0])|
        // +-------------------------------------------+
        // | [1.0, 2.0]|
        // | [5.0, 6.0]|
        // +-------------------------------------------+



      • array_max (array_min, 2.4+):



        import org.apache.spark.sql.functions.array_max

        df.select(array_max($"an_array")).show
        // +-------------------+
        // |array_max(an_array)|
        // +-------------------+
        // | 3|
        // | 6|
        // +-------------------+



      • flatten (2.4+)



        import org.apache.spark.sql.functions.flatten

        df.select(flatten($"an_array_of_structs.vals")).show
        // +---------------------------------+
        // |flatten(an_array_of_structs.vals)|
        // +---------------------------------+
        // | [1.0, 2.0, 2.0, 3...|
        // | [5.0, 6.0, 7.0, 8.0]|
        // +---------------------------------+



      • arrays_zip (2.4+):



        import org.apache.spark.sql.functions.arrays_zip

        df.select(arrays_zip($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show(false)
        // +--------------------------------------------------------------------+
        // |arrays_zip(an_array_of_structs.vals[0], an_array_of_structs.vals[1])|
        // +--------------------------------------------------------------------+
        // |[[1.0, 3.0], [2.0, 4.0], [2.0, 5.0]] |
        // |[[5.0, 7.0], [6.0, 8.0]] |
        // +--------------------------------------------------------------------+



      • array_union (2.4+):



        import org.apache.spark.sql.functions.array_union

        df.select(array_union($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show
        // +---------------------------------------------------------------------+
        // |array_union(an_array_of_structs.vals[0], an_array_of_structs.vals[1])|
        // +---------------------------------------------------------------------+
        // | [1.0, 2.0, 3.0, 4...|
        // | [5.0, 6.0, 7.0, 8.0]|
        // +---------------------------------------------------------------------+



      • slice (2.4+):



        import org.apache.spark.sql.functions.slice

        df.select(slice($"an_array", 2, 2)).show
        // +---------------------+
        // |slice(an_array, 2, 2)|
        // +---------------------+
        // | [2, 3]|
        // | [5, 6]|
        // +---------------------+




    • map (MapType) columns




      • using Column.getField method:



        df.select($"a_map".getField("foo")).show

        // +----------+
        // |a_map[foo]|
        // +----------+
        // | bar|
        // | null|
        // +----------+



      • using Hive brackets syntax:



        sqlContext.sql("SELECT a_map['foz'] FROM df").show

        // +----+
        // | _c0|
        // +----+
        // |null|
        // | baz|
        // +----+



      • using a full path with dot syntax:



        df.select($"a_map.foo").show

        // +----+
        // | foo|
        // +----+
        // | bar|
        // |null|
        // +----+



      • using an UDF



        val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k))

        df.select(get_field($"a_map", lit("foo"))).show

        // +--------------+
        // |UDF(a_map,foo)|
        // +--------------+
        // | bar|
        // | null|
        // +--------------+



      • Growing number of map_* functions like map_keys (2.3+)



        import org.apache.spark.sql.functions.map_keys

        df.select(map_keys($"a_map")).show
        // +---------------+
        // |map_keys(a_map)|
        // +---------------+
        // | [foo]|
        // | [foz]|
        // +---------------+



      • or map_values (2.3+)



        import org.apache.spark.sql.functions.map_values

        df.select(map_values($"a_map")).show
        // +-----------------+
        // |map_values(a_map)|
        // +-----------------+
        // | [bar]|
        // | [baz]|
        // +-----------------+


      Please check SPARK-23899 for a detailed list.




    • struct (StructType) columns using full path with dot syntax:




      • with DataFrame API



        df.select($"a_struct.x").show

        // +---+
        // | x|
        // +---+
        // | 1|
        // | 2|
        // +---+



      • with raw SQL



        sqlContext.sql("SELECT a_struct.x FROM df").show

        // +---+
        // | x|
        // +---+
        // | 1|
        // | 2|
        // +---+




    • fields inside array of structs can be accessed using dot-syntax, names and standard Column methods:



      df.select($"an_array_of_structs.foo").show

      // +----------+
      // | foo|
      // +----------+
      // |[foo, bar]|
      // |[foz, baz]|
      // +----------+

      sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show

      // +---+
      // |_c0|
      // +---+
      // |foo|
      // |foz|
      // +---+

      df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show

      // +------------------------------+
      // |an_array_of_structs.vals[1][1]|
      // +------------------------------+
      // | 4.0|
      // | 8.0|
      // +------------------------------+


    • user defined types (UDTs) fields can be accessed using UDFs. See SparkSQL referencing attributes of UDT for details.


    Notes:



    • depending on a Spark version some of these methods can be available only with HiveContext. UDFs should work independent of version with both standard SQLContext and HiveContext.


    • generally speaking nested values are a second class citizens. Not all typical operations are supported on nested fields. Depending on a context it could be better to flatten the schema and / or explode collections



      df.select(explode($"an_array_of_structs")).show

      // +--------------------+
      // | col|
      // +--------------------+
      // |[foo,1,WrappedArr...|
      // |[bar,2,WrappedArr...|
      // |[foz,3,WrappedArr...|
      // |[baz,4,WrappedArr...|
      // +--------------------+



    • Dot syntax can be combined with wildcard character (*) to select (possibly multiple) fields without specifying names explicitly:



      df.select($"a_struct.*").show
      // +---+
      // | x|
      // +---+
      // | 1|
      // | 2|
      // +---+


    • JSON columns can be queried using get_json_object and from_json functions. See How to query JSON data column using Spark DataFrames? for details.






    share|improve this answer






















    • Is it possible to fetch all the elements in a struct array? Is something like this possible.. sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show
      – user1384205
      Jun 6 '16 at 15:39






    • 3




      This should be the accepted answer.
      – Patrick McGloin
      Dec 16 '16 at 8:27










    • How to do the same thing as SELECT an_array_of_structs[0].foo FROM df using code not spark sql? And is it supported to execute a UDF on an array of structs column(an_array_of_structs) using code? Like SELECT max(an_array_of_structs.bar) FROM df using code.
      – DeepNightTwo
      Mar 5 at 3:44












    up vote
    103
    down vote



    accepted







    up vote
    103
    down vote



    accepted






    It depends on a type of the column. Lets start with some dummy data:



    import org.apache.spark.sql.functions.udf, lit
    import scala.util.Try

    case class SubRecord(x: Int)
    case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
    case class Record(
    an_array: Array[Int], a_map: Map[String, String],
    a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])


    val df = sc.parallelize(Seq(
    Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
    Array(
    ArrayElement("foo", 1, Array(1.0, 2.0, 2.0)),
    ArrayElement("bar", 2, Array(3.0, 4.0, 5.0)))),
    Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
    Array(ArrayElement("foz", 3, Array(5.0, 6.0)),
    ArrayElement("baz", 4, Array(7.0, 8.0))))
    )).toDF




    df.registerTempTable("df")
    df.printSchema

    // root
    // |-- an_array: array (nullable = true)
    // | |-- element: integer (containsNull = false)
    // |-- a_map: map (nullable = true)
    // | |-- key: string
    // | |-- value: string (valueContainsNull = true)
    // |-- a_struct: struct (nullable = true)
    // | |-- x: integer (nullable = false)
    // |-- an_array_of_structs: array (nullable = true)
    // | |-- element: struct (containsNull = true)
    // | | |-- foo: string (nullable = true)
    // | | |-- bar: integer (nullable = false)
    // | | |-- vals: array (nullable = true)
    // | | | |-- element: double (containsNull = false)



    • array (ArrayType) columns:




      • Column.getItem method



        df.select($"an_array".getItem(1)).show

        // +-----------+
        // |an_array[1]|
        // +-----------+
        // | 2|
        // | 5|
        // +-----------+



      • Hive brackets syntax:



        sqlContext.sql("SELECT an_array[1] FROM df").show

        // +---+
        // |_c0|
        // +---+
        // | 2|
        // | 5|
        // +---+



      • an UDF



        val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption)

        df.select(get_ith($"an_array", lit(1))).show

        // +---------------+
        // |UDF(an_array,1)|
        // +---------------+
        // | 2|
        // | 5|
        // +---------------+



      • Additionally to the methods listed above Spark supports a growing list of built-in functions operating on complex types. Notable examples include higher order functions like transform (SQL only, 2.4+):



        df.selectExpr("transform(an_array, x -> x + 1) an_array_inc").show
        // +------------+
        // |an_array_inc|
        // +------------+
        // | [2, 3, 4]|
        // | [5, 6, 7]|
        // +------------+



      • filter (SQL only, 2.4+)



        df.selectExpr("filter(an_array, x -> x % 2 == 0) an_array_even").show
        // +-------------+
        // |an_array_even|
        // +-------------+
        // | [2]|
        // | [4, 6]|
        // +-------------+



      • aggregate (SQL only, 2.4+):



        df.selectExpr("aggregate(an_array, 0, (acc, x) -> acc + x, acc -> acc) an_array_sum").show
        // +------------+
        // |an_array_sum|
        // +------------+
        // | 6|
        // | 15|
        // +------------+



      • array processing functions (array_*) like array_distinct (2.4+):



        import org.apache.spark.sql.functions.array_distinct

        df.select(array_distinct($"an_array_of_structs.vals"(0))).show
        // +-------------------------------------------+
        // |array_distinct(an_array_of_structs.vals[0])|
        // +-------------------------------------------+
        // | [1.0, 2.0]|
        // | [5.0, 6.0]|
        // +-------------------------------------------+



      • array_max (array_min, 2.4+):



        import org.apache.spark.sql.functions.array_max

        df.select(array_max($"an_array")).show
        // +-------------------+
        // |array_max(an_array)|
        // +-------------------+
        // | 3|
        // | 6|
        // +-------------------+



      • flatten (2.4+)



        import org.apache.spark.sql.functions.flatten

        df.select(flatten($"an_array_of_structs.vals")).show
        // +---------------------------------+
        // |flatten(an_array_of_structs.vals)|
        // +---------------------------------+
        // | [1.0, 2.0, 2.0, 3...|
        // | [5.0, 6.0, 7.0, 8.0]|
        // +---------------------------------+



      • arrays_zip (2.4+):



        import org.apache.spark.sql.functions.arrays_zip

        df.select(arrays_zip($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show(false)
        // +--------------------------------------------------------------------+
        // |arrays_zip(an_array_of_structs.vals[0], an_array_of_structs.vals[1])|
        // +--------------------------------------------------------------------+
        // |[[1.0, 3.0], [2.0, 4.0], [2.0, 5.0]] |
        // |[[5.0, 7.0], [6.0, 8.0]] |
        // +--------------------------------------------------------------------+



      • array_union (2.4+):



        import org.apache.spark.sql.functions.array_union

        df.select(array_union($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show
        // +---------------------------------------------------------------------+
        // |array_union(an_array_of_structs.vals[0], an_array_of_structs.vals[1])|
        // +---------------------------------------------------------------------+
        // | [1.0, 2.0, 3.0, 4...|
        // | [5.0, 6.0, 7.0, 8.0]|
        // +---------------------------------------------------------------------+



      • slice (2.4+):



        import org.apache.spark.sql.functions.slice

        df.select(slice($"an_array", 2, 2)).show
        // +---------------------+
        // |slice(an_array, 2, 2)|
        // +---------------------+
        // | [2, 3]|
        // | [5, 6]|
        // +---------------------+




    • map (MapType) columns




      • using Column.getField method:



        df.select($"a_map".getField("foo")).show

        // +----------+
        // |a_map[foo]|
        // +----------+
        // | bar|
        // | null|
        // +----------+



      • using Hive brackets syntax:



        sqlContext.sql("SELECT a_map['foz'] FROM df").show

        // +----+
        // | _c0|
        // +----+
        // |null|
        // | baz|
        // +----+



      • using a full path with dot syntax:



        df.select($"a_map.foo").show

        // +----+
        // | foo|
        // +----+
        // | bar|
        // |null|
        // +----+



      • using an UDF



        val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k))

        df.select(get_field($"a_map", lit("foo"))).show

        // +--------------+
        // |UDF(a_map,foo)|
        // +--------------+
        // | bar|
        // | null|
        // +--------------+



      • Growing number of map_* functions like map_keys (2.3+)



        import org.apache.spark.sql.functions.map_keys

        df.select(map_keys($"a_map")).show
        // +---------------+
        // |map_keys(a_map)|
        // +---------------+
        // | [foo]|
        // | [foz]|
        // +---------------+



      • or map_values (2.3+)



        import org.apache.spark.sql.functions.map_values

        df.select(map_values($"a_map")).show
        // +-----------------+
        // |map_values(a_map)|
        // +-----------------+
        // | [bar]|
        // | [baz]|
        // +-----------------+


      Please check SPARK-23899 for a detailed list.




    • struct (StructType) columns using full path with dot syntax:




      • with DataFrame API



        df.select($"a_struct.x").show

        // +---+
        // | x|
        // +---+
        // | 1|
        // | 2|
        // +---+



      • with raw SQL



        sqlContext.sql("SELECT a_struct.x FROM df").show

        // +---+
        // | x|
        // +---+
        // | 1|
        // | 2|
        // +---+




    • fields inside array of structs can be accessed using dot-syntax, names and standard Column methods:



      df.select($"an_array_of_structs.foo").show

      // +----------+
      // | foo|
      // +----------+
      // |[foo, bar]|
      // |[foz, baz]|
      // +----------+

      sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show

      // +---+
      // |_c0|
      // +---+
      // |foo|
      // |foz|
      // +---+

      df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show

      // +------------------------------+
      // |an_array_of_structs.vals[1][1]|
      // +------------------------------+
      // | 4.0|
      // | 8.0|
      // +------------------------------+


    • user defined types (UDTs) fields can be accessed using UDFs. See SparkSQL referencing attributes of UDT for details.


    Notes:



    • depending on a Spark version some of these methods can be available only with HiveContext. UDFs should work independent of version with both standard SQLContext and HiveContext.


    • generally speaking nested values are a second class citizens. Not all typical operations are supported on nested fields. Depending on a context it could be better to flatten the schema and / or explode collections



      df.select(explode($"an_array_of_structs")).show

      // +--------------------+
      // | col|
      // +--------------------+
      // |[foo,1,WrappedArr...|
      // |[bar,2,WrappedArr...|
      // |[foz,3,WrappedArr...|
      // |[baz,4,WrappedArr...|
      // +--------------------+



    • Dot syntax can be combined with wildcard character (*) to select (possibly multiple) fields without specifying names explicitly:



      df.select($"a_struct.*").show
      // +---+
      // | x|
      // +---+
      // | 1|
      // | 2|
      // +---+


    • JSON columns can be queried using get_json_object and from_json functions. See How to query JSON data column using Spark DataFrames? for details.






    share|improve this answer














    It depends on a type of the column. Lets start with some dummy data:



    import org.apache.spark.sql.functions.udf, lit
    import scala.util.Try

    case class SubRecord(x: Int)
    case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
    case class Record(
    an_array: Array[Int], a_map: Map[String, String],
    a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])


    val df = sc.parallelize(Seq(
    Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
    Array(
    ArrayElement("foo", 1, Array(1.0, 2.0, 2.0)),
    ArrayElement("bar", 2, Array(3.0, 4.0, 5.0)))),
    Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
    Array(ArrayElement("foz", 3, Array(5.0, 6.0)),
    ArrayElement("baz", 4, Array(7.0, 8.0))))
    )).toDF




    df.registerTempTable("df")
    df.printSchema

    // root
    // |-- an_array: array (nullable = true)
    // | |-- element: integer (containsNull = false)
    // |-- a_map: map (nullable = true)
    // | |-- key: string
    // | |-- value: string (valueContainsNull = true)
    // |-- a_struct: struct (nullable = true)
    // | |-- x: integer (nullable = false)
    // |-- an_array_of_structs: array (nullable = true)
    // | |-- element: struct (containsNull = true)
    // | | |-- foo: string (nullable = true)
    // | | |-- bar: integer (nullable = false)
    // | | |-- vals: array (nullable = true)
    // | | | |-- element: double (containsNull = false)



    • array (ArrayType) columns:




      • Column.getItem method



        df.select($"an_array".getItem(1)).show

        // +-----------+
        // |an_array[1]|
        // +-----------+
        // | 2|
        // | 5|
        // +-----------+



      • Hive brackets syntax:



        sqlContext.sql("SELECT an_array[1] FROM df").show

        // +---+
        // |_c0|
        // +---+
        // | 2|
        // | 5|
        // +---+



      • an UDF



        val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption)

        df.select(get_ith($"an_array", lit(1))).show

        // +---------------+
        // |UDF(an_array,1)|
        // +---------------+
        // | 2|
        // | 5|
        // +---------------+



      • Additionally to the methods listed above Spark supports a growing list of built-in functions operating on complex types. Notable examples include higher order functions like transform (SQL only, 2.4+):



        df.selectExpr("transform(an_array, x -> x + 1) an_array_inc").show
        // +------------+
        // |an_array_inc|
        // +------------+
        // | [2, 3, 4]|
        // | [5, 6, 7]|
        // +------------+



      • filter (SQL only, 2.4+)



        df.selectExpr("filter(an_array, x -> x % 2 == 0) an_array_even").show
        // +-------------+
        // |an_array_even|
        // +-------------+
        // | [2]|
        // | [4, 6]|
        // +-------------+



      • aggregate (SQL only, 2.4+):



        df.selectExpr("aggregate(an_array, 0, (acc, x) -> acc + x, acc -> acc) an_array_sum").show
        // +------------+
        // |an_array_sum|
        // +------------+
        // | 6|
        // | 15|
        // +------------+



      • array processing functions (array_*) like array_distinct (2.4+):



        import org.apache.spark.sql.functions.array_distinct

        df.select(array_distinct($"an_array_of_structs.vals"(0))).show
        // +-------------------------------------------+
        // |array_distinct(an_array_of_structs.vals[0])|
        // +-------------------------------------------+
        // | [1.0, 2.0]|
        // | [5.0, 6.0]|
        // +-------------------------------------------+



      • array_max (array_min, 2.4+):



        import org.apache.spark.sql.functions.array_max

        df.select(array_max($"an_array")).show
        // +-------------------+
        // |array_max(an_array)|
        // +-------------------+
        // | 3|
        // | 6|
        // +-------------------+



      • flatten (2.4+)



        import org.apache.spark.sql.functions.flatten

        df.select(flatten($"an_array_of_structs.vals")).show
        // +---------------------------------+
        // |flatten(an_array_of_structs.vals)|
        // +---------------------------------+
        // | [1.0, 2.0, 2.0, 3...|
        // | [5.0, 6.0, 7.0, 8.0]|
        // +---------------------------------+



      • arrays_zip (2.4+):



        import org.apache.spark.sql.functions.arrays_zip

        df.select(arrays_zip($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show(false)
        // +--------------------------------------------------------------------+
        // |arrays_zip(an_array_of_structs.vals[0], an_array_of_structs.vals[1])|
        // +--------------------------------------------------------------------+
        // |[[1.0, 3.0], [2.0, 4.0], [2.0, 5.0]] |
        // |[[5.0, 7.0], [6.0, 8.0]] |
        // +--------------------------------------------------------------------+



      • array_union (2.4+):



        import org.apache.spark.sql.functions.array_union

        df.select(array_union($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show
        // +---------------------------------------------------------------------+
        // |array_union(an_array_of_structs.vals[0], an_array_of_structs.vals[1])|
        // +---------------------------------------------------------------------+
        // | [1.0, 2.0, 3.0, 4...|
        // | [5.0, 6.0, 7.0, 8.0]|
        // +---------------------------------------------------------------------+



      • slice (2.4+):



        import org.apache.spark.sql.functions.slice

        df.select(slice($"an_array", 2, 2)).show
        // +---------------------+
        // |slice(an_array, 2, 2)|
        // +---------------------+
        // | [2, 3]|
        // | [5, 6]|
        // +---------------------+




    • map (MapType) columns




      • using Column.getField method:



        df.select($"a_map".getField("foo")).show

        // +----------+
        // |a_map[foo]|
        // +----------+
        // | bar|
        // | null|
        // +----------+



      • using Hive brackets syntax:



        sqlContext.sql("SELECT a_map['foz'] FROM df").show

        // +----+
        // | _c0|
        // +----+
        // |null|
        // | baz|
        // +----+



      • using a full path with dot syntax:



        df.select($"a_map.foo").show

        // +----+
        // | foo|
        // +----+
        // | bar|
        // |null|
        // +----+



      • using an UDF



        val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k))

        df.select(get_field($"a_map", lit("foo"))).show

        // +--------------+
        // |UDF(a_map,foo)|
        // +--------------+
        // | bar|
        // | null|
        // +--------------+



      • Growing number of map_* functions like map_keys (2.3+)



        import org.apache.spark.sql.functions.map_keys

        df.select(map_keys($"a_map")).show
        // +---------------+
        // |map_keys(a_map)|
        // +---------------+
        // | [foo]|
        // | [foz]|
        // +---------------+



      • or map_values (2.3+)



        import org.apache.spark.sql.functions.map_values

        df.select(map_values($"a_map")).show
        // +-----------------+
        // |map_values(a_map)|
        // +-----------------+
        // | [bar]|
        // | [baz]|
        // +-----------------+


      Please check SPARK-23899 for a detailed list.
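
      For example (a minimal sketch, assuming Spark 2.4+; element_at is one of the functions added alongside the map_* family and looks a key up in a map column):

        import org.apache.spark.sql.functions.element_at

        // returns the value stored under the given key, or null if the key is absent
        df.select(element_at($"a_map", "foo")).show
        // expected values: bar for the first row, null for the second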




    • struct (StructType) columns using full path with dot syntax:




      • with DataFrame API



        df.select($"a_struct.x").show

        // +---+
        // | x|
        // +---+
        // | 1|
        // | 2|
        // +---+



      • with raw SQL



        sqlContext.sql("SELECT a_struct.x FROM df").show

        // +---+
        // | x|
        // +---+
        // | 1|
        // | 2|
        // +---+




    • fields inside an array of structs can be accessed using dot syntax, field names, and standard Column methods:



      df.select($"an_array_of_structs.foo").show

      // +----------+
      // | foo|
      // +----------+
      // |[foo, bar]|
      // |[foz, baz]|
      // +----------+

      sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show

      // +---+
      // |_c0|
      // +---+
      // |foo|
      // |foz|
      // +---+

      df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show

      // +------------------------------+
      // |an_array_of_structs.vals[1][1]|
      // +------------------------------+
      // | 4.0|
      // | 8.0|
      // +------------------------------+


    • fields of user defined types (UDTs) can be accessed using UDFs. See SparkSQL referencing attributes of UDT for details.
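
      For illustration, a minimal sketch (assuming an org.apache.spark.ml.linalg.Vector column named a_vector, which is not part of the example schema above):

      import org.apache.spark.ml.linalg.Vector
      import org.apache.spark.sql.functions.udf

      // a UDF is needed because the elements of a VectorUDT column
      // cannot be reached with getItem / dot syntax
      val firstElement = udf((v: Vector) => v(0))
      df.select(firstElement($"a_vector")).show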


    Notes:



    • depending on the Spark version, some of these methods may be available only with HiveContext. UDFs should work independently of the version, with both the standard SQLContext and HiveContext.


    • generally speaking, nested values are second-class citizens. Not all typical operations are supported on nested fields. Depending on the context, it can be better to flatten the schema and/or explode collections (a schema-flattening sketch follows the explode example below):



      df.select(explode($"an_array_of_structs")).show

      // +--------------------+
      // | col|
      // +--------------------+
      // |[foo,1,WrappedArr...|
      // |[bar,2,WrappedArr...|
      // |[foz,3,WrappedArr...|
      // |[baz,4,WrappedArr...|
      // +--------------------+
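
      A minimal schema-flattening sketch (promoting nested fields of the example schema to top-level columns; the alias names are arbitrary):

      df.select(
        $"a_struct.x".alias("a_struct_x"),
        $"a_map.foo".alias("a_map_foo")
      ).show
      // expected columns: a_struct_x (1, 2) and a_map_foo (bar, null)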



    • Dot syntax can be combined with the wildcard character (*) to select (possibly multiple) fields without specifying their names explicitly:



      df.select($"a_struct.*").show
      // +---+
      // | x|
      // +---+
      // | 1|
      // | 2|
      // +---+


    • JSON columns can be queried using get_json_object and from_json functions. See How to query JSON data column using Spark DataFrames? for details.
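
      A minimal sketch, assuming a StringType column named json_col (not part of the example schema) that holds documents like {"key": "value"}:

      import org.apache.spark.sql.functions.{from_json, get_json_object}
      import org.apache.spark.sql.types.{StringType, StructField, StructType}

      // path-based extraction (1.6+)
      df.select(get_json_object($"json_col", "$.key"))

      // parse into a struct column and use dot / getField access (2.1+)
      val jsonSchema = StructType(Seq(StructField("key", StringType)))
      df.select(from_json($"json_col", jsonSchema).getField("key"))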







    edited Oct 24 at 3:06 by Community; answered Nov 22 '15 at 1:03 by zero323
    • Is it possible to fetch all the elements in a struct array? Is something like this possible.. sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show
      – user1384205 Jun 6 '16 at 15:39

    • This should be the accepted answer.
      – Patrick McGloin Dec 16 '16 at 8:27

    • How to do the same thing as SELECT an_array_of_structs[0].foo FROM df using code not spark sql? And is it supported to execute a UDF on an array of structs column(an_array_of_structs) using code? Like SELECT max(an_array_of_structs.bar) FROM df using code.
      – DeepNightTwo Mar 5 at 3:44
















    up vote
    2
    down vote













    Once you convert it to a DataFrame, you can simply fetch the data like this:



    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}

    // the question's RDD holds Test instances, not tuples, so build the Row
    // from the case class fields (kv._1 / kv._2 fails here; see the comment below)
    val rddRow = rdd.map(t => Row(t.name, t.map))

    val myFld1 = StructField("name", StringType, true)
    val myFld2 = StructField("map", MapType(StringType, StringType), true)
    val schema = StructType(Array(myFld1, myFld2))

    val rowrddDF = sqlContext.createDataFrame(rddRow, schema)
    rowrddDF.registerTempTable("rowtbl")

    val rowrddDFFinal = rowrddDF.select(rowrddDF("map.hello"))
    // or
    val rowrddDFFinal2 = rowrddDF.select("map.hello")





    answered Sep 16 '15 at 1:24 by sshroff


    • when I try this I get error: value _1 is not a member of org.apache.spark.sql.Row
      – Paul
      Nov 23 '17 at 17:32
    up vote
    0
    down vote













    Here is what I did and it worked:



    case class Test(name: String, m: Map[String, String])
    val map = Map("hello" -> "world", "hey" -> "there")
    val map2 = Map("hello" -> "people", "hey" -> "you")
    val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))
    val rdddf = rdd.toDF
    rdddf.registerTempTable("mytable")
    sqlContext.sql("select m.hello from mytable").show


    Results



    +------+
    | hello|
    +------+
    | world|
    |people|
    +------+
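
    On Spark 2.x the same idea works with SparkSession instead of SQLContext (a minimal sketch; createOrReplaceTempView replaces the deprecated registerTempTable):

    import spark.implicits._

    val rdddf = rdd.toDF
    rdddf.createOrReplaceTempView("mytable")
    spark.sql("SELECT m['hello'] FROM mytable").show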





    edited Jun 16 '16 at 15:56 by zero323; answered Jun 6 '16 at 18:00 by Sumit Pal