PySpark sum of last values by ID over timeseries window










0















I have this DataFrame in PySpark:



[Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662)]


I need to sum only last known value for each id in second window.



Outpt should look like this:



 [Row(time=datetime.datetime(2018, 7, 23, 14, 4, 22), sum=176213),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 23), sum=176112),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 24), sum=175933),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 25), sum=175543),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 26), sum=175219),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 27), sum=175002),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 28), sum=174892)...]


I have tried this:



w = Window.partitionBy(F.col('id')).orderBy('timestamp')
_df.withColumn('last_known', F.last('value').over(w)).sort('time').take(1000)


and it produces new column with last known value for each id, but I don`t know how to sum it.



[Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662, last_known=35187)]


Other solution is:



_df.orderBy('time').groupBy('timestamp', 'id').agg(F.last('value').alias('last'))
.groupBy('timestamp').agg(F.sum('last').alias('sum'))
.sort('timestamp').take(50)


Output looks promising, but double aggregate seems like asking for troubles... and it should run on TB of data, so speed is concern too.



[Row(timestamp=1532354662, sum=176142),
Row(timestamp=1532354663, sum=176142),
Row(timestamp=1532354664, sum=176139),
Row(timestamp=1532354665, sum=176137),
Row(timestamp=1532354666, sum=176133),
Row(timestamp=1532354667, sum=176128),
Row(timestamp=1532354668, sum=176125),
Row(timestamp=1532354669, sum=176122),
Row(timestamp=1532354670, sum=176120),
Row(timestamp=1532354671, sum=176118),
Row(timestamp=1532354672, sum=176117),
Row(timestamp=1532354673, sum=176114),


Any help would be much appreciated!



EDIT
Terry`s answer is best right now. If someone has better idea please post.










share|improve this question




























    0















    I have this DataFrame in PySpark:



    [Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662),
    Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662),
    Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662),
    Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662),
    Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662),
    Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662),
    Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662),
    Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662),
    Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662),
    Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662),
    Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662),
    Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662),
    Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662),
    Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662),
    Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662),
    Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662),
    Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662),
    Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662),
    Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662),
    Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662)]


    I need to sum only last known value for each id in second window.



    Outpt should look like this:



     [Row(time=datetime.datetime(2018, 7, 23, 14, 4, 22), sum=176213),
    Row(time=datetime.datetime(2018, 7, 23, 14, 4, 23), sum=176112),
    Row(time=datetime.datetime(2018, 7, 23, 14, 4, 24), sum=175933),
    Row(time=datetime.datetime(2018, 7, 23, 14, 4, 25), sum=175543),
    Row(time=datetime.datetime(2018, 7, 23, 14, 4, 26), sum=175219),
    Row(time=datetime.datetime(2018, 7, 23, 14, 4, 27), sum=175002),
    Row(time=datetime.datetime(2018, 7, 23, 14, 4, 28), sum=174892)...]


    I have tried this:



    w = Window.partitionBy(F.col('id')).orderBy('timestamp')
    _df.withColumn('last_known', F.last('value').over(w)).sort('time').take(1000)


    and it produces new column with last known value for each id, but I don`t know how to sum it.



    [Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662, last_known=35184),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662, last_known=35285),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662, last_known=35211),
    Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662, last_known=35275),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662, last_known=35187),
    Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662, last_known=35184),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662, last_known=35285),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662, last_known=35211),
    Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662, last_known=35275),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662, last_known=35187),
    Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662, last_known=35184),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662, last_known=35285),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662, last_known=35211),
    Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662, last_known=35275),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662, last_known=35187),
    Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662, last_known=35184),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662, last_known=35285),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662, last_known=35211),
    Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662, last_known=35275),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662, last_known=35187),
    Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662, last_known=35184),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662, last_known=35285),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662, last_known=35211),
    Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662, last_known=35275),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662, last_known=35187),
    Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662, last_known=35184),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662, last_known=35285),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662, last_known=35211),
    Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662, last_known=35275),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662, last_known=35187),
    Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662, last_known=35184),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662, last_known=35285),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662, last_known=35211),
    Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662, last_known=35275),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662, last_known=35187),
    Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662, last_known=35184),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662, last_known=35285),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662, last_known=35211),
    Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662, last_known=35275),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662, last_known=35187),
    Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662, last_known=35184),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662, last_known=35285),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662, last_known=35211),
    Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662, last_known=35275),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662, last_known=35187),
    Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662, last_known=35184),
    Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662, last_known=35285),
    Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662, last_known=35211),
    Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662, last_known=35275),
    Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662, last_known=35187)]


    Other solution is:



    _df.orderBy('time').groupBy('timestamp', 'id').agg(F.last('value').alias('last'))
    .groupBy('timestamp').agg(F.sum('last').alias('sum'))
    .sort('timestamp').take(50)


    Output looks promising, but double aggregate seems like asking for troubles... and it should run on TB of data, so speed is concern too.



    [Row(timestamp=1532354662, sum=176142),
    Row(timestamp=1532354663, sum=176142),
    Row(timestamp=1532354664, sum=176139),
    Row(timestamp=1532354665, sum=176137),
    Row(timestamp=1532354666, sum=176133),
    Row(timestamp=1532354667, sum=176128),
    Row(timestamp=1532354668, sum=176125),
    Row(timestamp=1532354669, sum=176122),
    Row(timestamp=1532354670, sum=176120),
    Row(timestamp=1532354671, sum=176118),
    Row(timestamp=1532354672, sum=176117),
    Row(timestamp=1532354673, sum=176114),


    Any help would be much appreciated!



    EDIT
    Terry`s answer is best right now. If someone has better idea please post.










    share|improve this question


























      0












      0








      0








      I have this DataFrame in PySpark:



      [Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662),
      Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662),
      Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662),
      Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662),
      Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662),
      Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662),
      Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662)]


      I need to sum only last known value for each id in second window.



      Outpt should look like this:



       [Row(time=datetime.datetime(2018, 7, 23, 14, 4, 22), sum=176213),
      Row(time=datetime.datetime(2018, 7, 23, 14, 4, 23), sum=176112),
      Row(time=datetime.datetime(2018, 7, 23, 14, 4, 24), sum=175933),
      Row(time=datetime.datetime(2018, 7, 23, 14, 4, 25), sum=175543),
      Row(time=datetime.datetime(2018, 7, 23, 14, 4, 26), sum=175219),
      Row(time=datetime.datetime(2018, 7, 23, 14, 4, 27), sum=175002),
      Row(time=datetime.datetime(2018, 7, 23, 14, 4, 28), sum=174892)...]


      I have tried this:



      w = Window.partitionBy(F.col('id')).orderBy('timestamp')
      _df.withColumn('last_known', F.last('value').over(w)).sort('time').take(1000)


      and it produces new column with last known value for each id, but I don`t know how to sum it.



      [Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662, last_known=35187)]


      Other solution is:



      _df.orderBy('time').groupBy('timestamp', 'id').agg(F.last('value').alias('last'))
      .groupBy('timestamp').agg(F.sum('last').alias('sum'))
      .sort('timestamp').take(50)


      Output looks promising, but double aggregate seems like asking for troubles... and it should run on TB of data, so speed is concern too.



      [Row(timestamp=1532354662, sum=176142),
      Row(timestamp=1532354663, sum=176142),
      Row(timestamp=1532354664, sum=176139),
      Row(timestamp=1532354665, sum=176137),
      Row(timestamp=1532354666, sum=176133),
      Row(timestamp=1532354667, sum=176128),
      Row(timestamp=1532354668, sum=176125),
      Row(timestamp=1532354669, sum=176122),
      Row(timestamp=1532354670, sum=176120),
      Row(timestamp=1532354671, sum=176118),
      Row(timestamp=1532354672, sum=176117),
      Row(timestamp=1532354673, sum=176114),


      Any help would be much appreciated!



      EDIT
      Terry`s answer is best right now. If someone has better idea please post.










      share|improve this question
















      I have this DataFrame in PySpark:



      [Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662),
      Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662),
      Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662),
      Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662),
      Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662),
      Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662),
      Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662)]


      I need to sum only last known value for each id in second window.



      Outpt should look like this:



       [Row(time=datetime.datetime(2018, 7, 23, 14, 4, 22), sum=176213),
      Row(time=datetime.datetime(2018, 7, 23, 14, 4, 23), sum=176112),
      Row(time=datetime.datetime(2018, 7, 23, 14, 4, 24), sum=175933),
      Row(time=datetime.datetime(2018, 7, 23, 14, 4, 25), sum=175543),
      Row(time=datetime.datetime(2018, 7, 23, 14, 4, 26), sum=175219),
      Row(time=datetime.datetime(2018, 7, 23, 14, 4, 27), sum=175002),
      Row(time=datetime.datetime(2018, 7, 23, 14, 4, 28), sum=174892)...]


      I have tried this:



      w = Window.partitionBy(F.col('id')).orderBy('timestamp')
      _df.withColumn('last_known', F.last('value').over(w)).sort('time').take(1000)


      and it produces new column with last known value for each id, but I don`t know how to sum it.



      [Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662, last_known=35187),
      Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662, last_known=35184),
      Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662, last_known=35285),
      Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662, last_known=35211),
      Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662, last_known=35275),
      Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662, last_known=35187)]


      Other solution is:



      _df.orderBy('time').groupBy('timestamp', 'id').agg(F.last('value').alias('last'))
      .groupBy('timestamp').agg(F.sum('last').alias('sum'))
      .sort('timestamp').take(50)


      Output looks promising, but double aggregate seems like asking for troubles... and it should run on TB of data, so speed is concern too.



      [Row(timestamp=1532354662, sum=176142),
      Row(timestamp=1532354663, sum=176142),
      Row(timestamp=1532354664, sum=176139),
      Row(timestamp=1532354665, sum=176137),
      Row(timestamp=1532354666, sum=176133),
      Row(timestamp=1532354667, sum=176128),
      Row(timestamp=1532354668, sum=176125),
      Row(timestamp=1532354669, sum=176122),
      Row(timestamp=1532354670, sum=176120),
      Row(timestamp=1532354671, sum=176118),
      Row(timestamp=1532354672, sum=176117),
      Row(timestamp=1532354673, sum=176114),


      Any help would be much appreciated!



      EDIT
      Terry`s answer is best right now. If someone has better idea please post.







      python apache-spark pyspark apache-spark-sql pyspark-sql






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 15 '18 at 22:22







      Doman

















      asked Nov 15 '18 at 14:42









      DomanDoman

      4815




      4815






















          1 Answer
          1






          active

          oldest

          votes


















          0














          I believe you can replace your first groupBy for a 'dropDuplicates' and set ascending = False in orderBy. Like this:



          df.orderBy(['time'], ascending = False) 
          .dropDuplicates(['timestamp', 'id'])
          .groupBy('timestamp').agg(F.sum('value').alias('sum')).sort('timestamp').show()





          share|improve this answer






















            Your Answer






            StackExchange.ifUsing("editor", function ()
            StackExchange.using("externalEditor", function ()
            StackExchange.using("snippets", function ()
            StackExchange.snippets.init();
            );
            );
            , "code-snippets");

            StackExchange.ready(function()
            var channelOptions =
            tags: "".split(" "),
            id: "1"
            ;
            initTagRenderer("".split(" "), "".split(" "), channelOptions);

            StackExchange.using("externalEditor", function()
            // Have to fire editor after snippets, if snippets enabled
            if (StackExchange.settings.snippets.snippetsEnabled)
            StackExchange.using("snippets", function()
            createEditor();
            );

            else
            createEditor();

            );

            function createEditor()
            StackExchange.prepareEditor(
            heartbeatType: 'answer',
            autoActivateHeartbeat: false,
            convertImagesToLinks: true,
            noModals: true,
            showLowRepImageUploadWarning: true,
            reputationToPostImages: 10,
            bindNavPrevention: true,
            postfix: "",
            imageUploader:
            brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
            contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
            allowUrls: true
            ,
            onDemand: true,
            discardSelector: ".discard-answer"
            ,immediatelyShowMarkdownHelp:true
            );



            );













            draft saved

            draft discarded


















            StackExchange.ready(
            function ()
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53321915%2fpyspark-sum-of-last-values-by-id-over-timeseries-window%23new-answer', 'question_page');

            );

            Post as a guest















            Required, but never shown

























            1 Answer
            1






            active

            oldest

            votes








            1 Answer
            1






            active

            oldest

            votes









            active

            oldest

            votes






            active

            oldest

            votes









            0














            I believe you can replace your first groupBy for a 'dropDuplicates' and set ascending = False in orderBy. Like this:



            df.orderBy(['time'], ascending = False) 
            .dropDuplicates(['timestamp', 'id'])
            .groupBy('timestamp').agg(F.sum('value').alias('sum')).sort('timestamp').show()





            share|improve this answer



























              0














              I believe you can replace your first groupBy for a 'dropDuplicates' and set ascending = False in orderBy. Like this:



              df.orderBy(['time'], ascending = False) 
              .dropDuplicates(['timestamp', 'id'])
              .groupBy('timestamp').agg(F.sum('value').alias('sum')).sort('timestamp').show()





              share|improve this answer

























                0












                0








                0







                I believe you can replace your first groupBy for a 'dropDuplicates' and set ascending = False in orderBy. Like this:



                df.orderBy(['time'], ascending = False) 
                .dropDuplicates(['timestamp', 'id'])
                .groupBy('timestamp').agg(F.sum('value').alias('sum')).sort('timestamp').show()





                share|improve this answer













                I believe you can replace your first groupBy for a 'dropDuplicates' and set ascending = False in orderBy. Like this:



                df.orderBy(['time'], ascending = False) 
                .dropDuplicates(['timestamp', 'id'])
                .groupBy('timestamp').agg(F.sum('value').alias('sum')).sort('timestamp').show()






                share|improve this answer












                share|improve this answer



                share|improve this answer










                answered Nov 15 '18 at 16:14









                TerryTerry

                417412




                417412





























                    draft saved

                    draft discarded
















































                    Thanks for contributing an answer to Stack Overflow!


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid


                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.

                    To learn more, see our tips on writing great answers.




                    draft saved


                    draft discarded














                    StackExchange.ready(
                    function ()
                    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53321915%2fpyspark-sum-of-last-values-by-id-over-timeseries-window%23new-answer', 'question_page');

                    );

                    Post as a guest















                    Required, but never shown





















































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown

































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown







                    這個網誌中的熱門文章

                    How to read a connectionString WITH PROVIDER in .NET Core?

                    In R, how to develop a multiplot heatmap.2 figure showing key labels successfully

                    Museum of Modern and Contemporary Art of Trento and Rovereto