python - PySpark error when working with a window function (Spark 2.1.0 reports issue with column not found)?


Update: I have created the following JIRA issue: https://issues.apache.org/jira/browse/SPARK-20086. Status: Fixed! (Over the weekend! Amazingly quick!)

Update 2: The issue has been fixed in https://github.com/apache/spark/pull/17432 for versions 2.1.1 and 2.2.0. I got a newer Spark version from the nightly builds at http://people.apache.org/~pwendell/spark-nightly/. You will still run into the issue on <= 2.1.0.
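(To double-check which build you are actually on before trying the snippets below, a quick sanity check, assuming a running SparkContext sc:)

import pyspark
print(pyspark.__version__)  # affected if this is <= 2.1.0
print(sc.version)           # or ask the running SparkContext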

Original post:

I get an error when working with a PySpark window function. Here is some example code:

import pyspark
import pyspark.sql.functions as sf
import pyspark.sql.types as sparktypes
from pyspark.sql import window

sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
rdd = sc.parallelize([(1, 2.0), (1, 3.0), (1, 1.), (1, -2.), (1, -1.)])
df = sqlc.createDataFrame(rdd, ["x", "amtpaid"])
df.show()

gives:

+---+-------+
|  x|amtpaid|
+---+-------+
|  1|    2.0|
|  1|    3.0|
|  1|    1.0|
|  1|   -2.0|
|  1|   -1.0|
+---+-------+

Next, compute the cumulative sum:

win_spec_max = (window.Window
                .partitionBy(['x'])
                .rowsBetween(window.Window.unboundedPreceding, 0))
df = df.withColumn('amtpaidcumsum',
                   sf.sum(sf.col('amtpaid')).over(win_spec_max))
df.show()

which gives:

+---+-------+-------------+
|  x|amtpaid|amtpaidcumsum|
+---+-------+-------------+
|  1|    2.0|          2.0|
|  1|    3.0|          5.0|
|  1|    1.0|          6.0|
|  1|   -2.0|          4.0|
|  1|   -1.0|          3.0|
+---+-------+-------------+

Next, compute the cumulative max:

df = df.withColumn('amtpaidcumsummax',
                   sf.max(sf.col('amtpaidcumsum')).over(win_spec_max))
df.show()

which fails with the error:

Py4JJavaError: An error occurred while calling o2609.showString.

with the traceback:

Py4JJavaErrorTraceback (most recent call last)
<ipython-input-215-3106d06b6e49> in <module>()
----> 1 df.show()

/users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
    316         """
    317         if isinstance(truncate, bool) and truncate:
--> 318             print(self._jdf.showString(n, 20))
    319         else:
    320             print(self._jdf.showString(n, int(truncate)))

/users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

But interestingly enough, if I introduce a change before the second window operation by inserting a column, it does not give the error:

df = df.withColumn('maxbound', sf.lit(6.))
df.show()

+---+-------+-------------+--------+
|  x|amtpaid|amtpaidcumsum|maxbound|
+---+-------+-------------+--------+
|  1|    2.0|          2.0|     6.0|
|  1|    3.0|          5.0|     6.0|
|  1|    1.0|          6.0|     6.0|
|  1|   -2.0|          4.0|     6.0|
|  1|   -1.0|          3.0|     6.0|
+---+-------+-------------+--------+

# then apply the second window operation
df = df.withColumn('amtpaidcumsummax',
                   sf.max(sf.col('amtpaidcumsum')).over(win_spec_max))
df.show()

+---+-------+-------------+--------+----------------+
|  x|amtpaid|amtpaidcumsum|maxbound|amtpaidcumsummax|
+---+-------+-------------+--------+----------------+
|  1|    2.0|          2.0|     6.0|             2.0|
|  1|    3.0|          5.0|     6.0|             5.0|
|  1|    1.0|          6.0|     6.0|             6.0|
|  1|   -2.0|          4.0|     6.0|             6.0|
|  1|   -1.0|          3.0|     6.0|             6.0|
+---+-------+-------------+--------+----------------+

I do not understand this behaviour.
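Based on that observation, a possible workaround on affected versions (<= 2.1.0) seems to be to slip a trivial, unrelated projection between the two dependent window columns and drop it afterwards. This is only a sketch of the pattern above, untested beyond this example, and the hypothetical _dummy column is just for illustration:

# Workaround sketch (untested beyond the example above): put a trivial
# projection between the two dependent window operations on Spark <= 2.1.0.
df = df.withColumn('amtpaidcumsum',
                   sf.sum(sf.col('amtpaid')).over(win_spec_max))
df = df.withColumn('_dummy', sf.lit(0))   # hypothetical throw-away column
df = df.withColumn('amtpaidcumsummax',
                   sf.max(sf.col('amtpaidcumsum')).over(win_spec_max))
df = df.drop('_dummy')

As the next step shows, though, the plan can still break further downstream, so upgrading to 2.1.1 / 2.2.0 is the real fix.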

Well, so far so good. But when I try one more operation, I get a similar error:

def _udf_compare_cumsum_sll(x):
    if x['amtpaidcumsummax'] >= x['maxbound']:
        output = 0
    else:
        output = x['amtpaid']
    return output

udf_compare_cumsum_sll = sf.udf(_udf_compare_cumsum_sll, sparktypes.FloatType())
df = df.withColumn('amtpaidadjusted',
                   udf_compare_cumsum_sll(sf.struct([df[x] for x in df.columns])))
df.show()

which gives:

Py4JJavaErrorTraceback (most recent call last)
<ipython-input-18-3106d06b6e49> in <module>()
----> 1 df.show()

/users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
    316         """
    317         if isinstance(truncate, bool) and truncate:
--> 318             print(self._jdf.showString(n, 20))
    319         else:
    320             print(self._jdf.showString(n, int(truncate)))

/users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o91.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 36.0 failed 1 times, most recent failure: Lost task 0.0 in stage 36.0 (TID 645, localhost, executor driver): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: amtpaidcumsum#10

I wonder if you can reproduce this behaviour ...
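(As an aside, the same per-row logic can be written with the built-in sf.when instead of a Python UDF. This is only a sketch and, on <= 2.1.0, it may well run into the same window bug:)

# Equivalent logic without a Python UDF (sketch):
df = df.withColumn(
    'amtpaidadjusted',
    sf.when(sf.col('amtpaidcumsummax') >= sf.col('maxbound'), sf.lit(0.0))
      .otherwise(sf.col('amtpaid')))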

Here is the complete log ..

py4jjavaerrortraceback (most recent call last) <ipython-input-69-3106d06b6e49> in <module>() ----> 1 df.show()  /users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)     316         """     317         if isinstance(truncate, bool) , truncate: --> 318             print(self._jdf.showstring(n, 20))     319         else:     320             print(self._jdf.showstring(n, int(truncate)))  /users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)    1131         answer = self.gateway_client.send_command(command)    1132         return_value = get_return_value( -> 1133             answer, self.gateway_client, self.target_id, self.name)    1134    1135         temp_arg in temp_args:  /users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)      61     def deco(*a, **kw):      62         try: ---> 63             return f(*a, **kw)      64         except py4j.protocol.py4jjavaerror e:      65             s = e.java_exception.tostring()  /users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)     317                 raise py4jjavaerror(     318                     "an error occurred while calling {0}{1}{2}.\n". --> 319                     format(target_id, ".", name), value)     320             else:     321                 raise py4jerror(  py4jjavaerror: error occurred while calling o703.showstring. : org.apache.spark.sparkexception: job aborted due stage failure: task 0 in stage 119.0 failed 1 times, recent failure: lost task 0.0 in stage 119.0 (tid 1817, localhost, executor driver): org.apache.spark.sql.catalyst.errors.package$treenodeexception: binding attribute, tree: amtpaidcumsum#2076     @ org.apache.spark.sql.catalyst.errors.package$.attachtree(package.scala:56)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1.applyorelse(boundattribute.scala:88)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1.applyorelse(boundattribute.scala:87)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:288)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:288)     @ org.apache.spark.sql.catalyst.trees.currentorigin$.withorigin(treenode.scala:70)     @ org.apache.spark.sql.catalyst.trees.treenode.transformdown(treenode.scala:287)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformdown$1.apply(treenode.scala:293)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformdown$1.apply(treenode.scala:293)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$5$$anonfun$apply$11.apply(treenode.scala:360)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.immutable.list.foreach(list.scala:381)     @ scala.collection.traversablelike$class.map(traversablelike.scala:234)     @ scala.collection.immutable.list.map(list.scala:285)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$5.apply(treenode.scala:358)     @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:188)     @ org.apache.spark.sql.catalyst.trees.treenode.transformchildren(treenode.scala:329)     @ org.apache.spark.sql.catalyst.trees.treenode.transformdown(treenode.scala:293)     @ 
org.apache.spark.sql.catalyst.trees.treenode.transform(treenode.scala:277)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$.bindreference(boundattribute.scala:87)     @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$$anonfun$bind$1.apply(generatemutableprojection.scala:38)     @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$$anonfun$bind$1.apply(generatemutableprojection.scala:38)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59)     @ scala.collection.mutable.arraybuffer.foreach(arraybuffer.scala:48)     @ scala.collection.traversablelike$class.map(traversablelike.scala:234)     @ scala.collection.abstracttraversable.map(traversable.scala:104)     @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$.bind(generatemutableprojection.scala:38)     @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$.generate(generatemutableprojection.scala:44)     @ org.apache.spark.sql.execution.sparkplan.newmutableprojection(sparkplan.scala:353)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1$1.apply(windowexec.scala:203)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1$1.apply(windowexec.scala:202)     @ org.apache.spark.sql.execution.window.aggregateprocessor$.apply(aggregateprocessor.scala:98)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2.org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1(windowexec.scala:198)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$6.apply(windowexec.scala:225)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$6.apply(windowexec.scala:222)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1$$anonfun$16.apply(windowexec.scala:318)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1$$anonfun$16.apply(windowexec.scala:318)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.indexedseqoptimized$class.foreach(indexedseqoptimized.scala:33)     @ scala.collection.mutable.arrayops$ofref.foreach(arrayops.scala:186)     @ scala.collection.traversablelike$class.map(traversablelike.scala:234)     @ scala.collection.mutable.arrayops$ofref.map(arrayops.scala:186)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1.<init>(windowexec.scala:318)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14.apply(windowexec.scala:290)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14.apply(windowexec.scala:289)     @ org.apache.spark.rdd.rdd$$anonfun$mappartitions$1$$anonfun$apply$23.apply(rdd.scala:796)     @ org.apache.spark.rdd.rdd$$anonfun$mappartitions$1$$anonfun$apply$23.apply(rdd.scala:796)     @ org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd.scala:38)     @ 
org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323)     @ org.apache.spark.rdd.rdd.iterator(rdd.scala:287)     @ org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd.scala:38)     @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323)     @ org.apache.spark.rdd.rdd.iterator(rdd.scala:287)     @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:87)     @ org.apache.spark.scheduler.task.run(task.scala:99)     @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:282)     @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1142)     @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:617)     @ java.lang.thread.run(thread.java:745) caused by: java.lang.runtimeexception: couldn't find amtpaidcumsum#2076 in [sum#2299,max#2300,x#2066l,amtpaid#2067]     @ scala.sys.package$.error(package.scala:27)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1$$anonfun$applyorelse$1.apply(boundattribute.scala:94)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1$$anonfun$applyorelse$1.apply(boundattribute.scala:88)     @ org.apache.spark.sql.catalyst.errors.package$.attachtree(package.scala:52)     ... 62 more  driver stacktrace:     @ org.apache.spark.scheduler.dagscheduler.org$apache$spark$scheduler$dagscheduler$$failjobandindependentstages(dagscheduler.scala:1435)     @ org.apache.spark.scheduler.dagscheduler$$anonfun$abortstage$1.apply(dagscheduler.scala:1423)     @ org.apache.spark.scheduler.dagscheduler$$anonfun$abortstage$1.apply(dagscheduler.scala:1422)     @ scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59)     @ scala.collection.mutable.arraybuffer.foreach(arraybuffer.scala:48)     @ org.apache.spark.scheduler.dagscheduler.abortstage(dagscheduler.scala:1422)     @ org.apache.spark.scheduler.dagscheduler$$anonfun$handletasksetfailed$1.apply(dagscheduler.scala:802)     @ org.apache.spark.scheduler.dagscheduler$$anonfun$handletasksetfailed$1.apply(dagscheduler.scala:802)     @ scala.option.foreach(option.scala:257)     @ org.apache.spark.scheduler.dagscheduler.handletasksetfailed(dagscheduler.scala:802)     @ org.apache.spark.scheduler.dagschedulereventprocessloop.doonreceive(dagscheduler.scala:1650)     @ org.apache.spark.scheduler.dagschedulereventprocessloop.onreceive(dagscheduler.scala:1605)     @ org.apache.spark.scheduler.dagschedulereventprocessloop.onreceive(dagscheduler.scala:1594)     @ org.apache.spark.util.eventloop$$anon$1.run(eventloop.scala:48)     @ org.apache.spark.scheduler.dagscheduler.runjob(dagscheduler.scala:628)     @ org.apache.spark.sparkcontext.runjob(sparkcontext.scala:1918)     @ org.apache.spark.sparkcontext.runjob(sparkcontext.scala:1931)     @ org.apache.spark.sparkcontext.runjob(sparkcontext.scala:1944)     @ org.apache.spark.sql.execution.sparkplan.executetake(sparkplan.scala:333)     @ org.apache.spark.sql.execution.collectlimitexec.executecollect(limit.scala:38)     @ org.apache.spark.sql.dataset$$anonfun$org$apache$spark$sql$dataset$$execute$1$1.apply(dataset.scala:2371)     @ org.apache.spark.sql.execution.sqlexecution$.withnewexecutionid(sqlexecution.scala:57)     @ org.apache.spark.sql.dataset.withnewexecutionid(dataset.scala:2765)     @ org.apache.spark.sql.dataset.org$apache$spark$sql$dataset$$execute$1(dataset.scala:2370)     @ org.apache.spark.sql.dataset.org$apache$spark$sql$dataset$$collect(dataset.scala:2377)     @ 
org.apache.spark.sql.dataset$$anonfun$head$1.apply(dataset.scala:2113)     @ org.apache.spark.sql.dataset$$anonfun$head$1.apply(dataset.scala:2112)     @ org.apache.spark.sql.dataset.withtypedcallback(dataset.scala:2795)     @ org.apache.spark.sql.dataset.head(dataset.scala:2112)     @ org.apache.spark.sql.dataset.take(dataset.scala:2327)     @ org.apache.spark.sql.dataset.showstring(dataset.scala:248)     @ sun.reflect.generatedmethodaccessor83.invoke(unknown source)     @ sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl.java:43)     @ java.lang.reflect.method.invoke(method.java:498)     @ py4j.reflection.methodinvoker.invoke(methodinvoker.java:244)     @ py4j.reflection.reflectionengine.invoke(reflectionengine.java:357)     @ py4j.gateway.invoke(gateway.java:280)     @ py4j.commands.abstractcommand.invokemethod(abstractcommand.java:132)     @ py4j.commands.callcommand.execute(callcommand.java:79)     @ py4j.gatewayconnection.run(gatewayconnection.java:214)     @ java.lang.thread.run(thread.java:745) caused by: org.apache.spark.sql.catalyst.errors.package$treenodeexception: binding attribute, tree: null     @ org.apache.spark.sql.catalyst.errors.package$.attachtree(package.scala:56)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1.applyorelse(boundattribute.scala:88)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1.applyorelse(boundattribute.scala:87)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:288)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:288)     @ org.apache.spark.sql.catalyst.trees.currentorigin$.withorigin(treenode.scala:70)     @ org.apache.spark.sql.catalyst.trees.treenode.transformdown(treenode.scala:287)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformdown$1.apply(treenode.scala:293)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformdown$1.apply(treenode.scala:293)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$5$$anonfun$apply$11.apply(treenode.scala:360)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.immutable.list.foreach(list.scala:381)     @ scala.collection.traversablelike$class.map(traversablelike.scala:234)     @ scala.collection.immutable.list.map(list.scala:285)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$5.apply(treenode.scala:358)     @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:188)     @ org.apache.spark.sql.catalyst.trees.treenode.transformchildren(treenode.scala:329)     @ org.apache.spark.sql.catalyst.trees.treenode.transformdown(treenode.scala:293)     @ org.apache.spark.sql.catalyst.trees.treenode.transform(treenode.scala:277)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$.bindreference(boundattribute.scala:87)     @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$$anonfun$bind$1.apply(generatemutableprojection.scala:38)     @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$$anonfun$bind$1.apply(generatemutableprojection.scala:38)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ 
scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59)     @ scala.collection.mutable.arraybuffer.foreach(arraybuffer.scala:48)     @ scala.collection.traversablelike$class.map(traversablelike.scala:234)     @ scala.collection.abstracttraversable.map(traversable.scala:104)     @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$.bind(generatemutableprojection.scala:38)     @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$.generate(generatemutableprojection.scala:44)     @ org.apache.spark.sql.execution.sparkplan.newmutableprojection(sparkplan.scala:353)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1$1.apply(windowexec.scala:203)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1$1.apply(windowexec.scala:202)     @ org.apache.spark.sql.execution.window.aggregateprocessor$.apply(aggregateprocessor.scala:98)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2.org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1(windowexec.scala:198)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$6.apply(windowexec.scala:225)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$6.apply(windowexec.scala:222)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1$$anonfun$16.apply(windowexec.scala:318)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1$$anonfun$16.apply(windowexec.scala:318)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.indexedseqoptimized$class.foreach(indexedseqoptimized.scala:33)     @ scala.collection.mutable.arrayops$ofref.foreach(arrayops.scala:186)     @ scala.collection.traversablelike$class.map(traversablelike.scala:234)     @ scala.collection.mutable.arrayops$ofref.map(arrayops.scala:186)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1.<init>(windowexec.scala:318)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14.apply(windowexec.scala:290)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14.apply(windowexec.scala:289)     @ org.apache.spark.rdd.rdd$$anonfun$mappartitions$1$$anonfun$apply$23.apply(rdd.scala:796)     @ org.apache.spark.rdd.rdd$$anonfun$mappartitions$1$$anonfun$apply$23.apply(rdd.scala:796)     @ org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd.scala:38)     @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323)     @ org.apache.spark.rdd.rdd.iterator(rdd.scala:287)     @ org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd.scala:38)     @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323)     @ org.apache.spark.rdd.rdd.iterator(rdd.scala:287)     @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:87)     @ org.apache.spark.scheduler.task.run(task.scala:99)     @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:282)     @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1142)     @ 
java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:617)     ... 1 more caused by: java.lang.runtimeexception: couldn't find amtpaidcumsum#2076 in [sum#2299,max#2300,x#2066l,amtpaid#2067]     @ scala.sys.package$.error(package.scala:27)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1$$anonfun$applyorelse$1.apply(boundattribute.scala:94)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1$$anonfun$applyorelse$1.apply(boundattribute.scala:88)     @ org.apache.spark.sql.catalyst.errors.package$.attachtree(package.scala:52)     ... 62 more 

It appears there is an issue with the window operator support in Spark 2.1 and 2.2.0-SNAPSHOT (built today from master). See the following in Scala.

I think you should report this issue in Spark's JIRA.

val inventory = Seq(
  (1, 2.0), (1, 3.0), (1, 1.0), (1, -2.0), (1, -1.0)).toDF("x", "amtpaid")

scala> inventory.printSchema
root
 |-- x: integer (nullable = false)
 |-- amtpaid: double (nullable = false)

import org.apache.spark.sql.expressions.Window
val byxwithallrowsbefore = Window.partitionBy("x").rowsBetween(Window.unboundedPreceding, Window.currentRow)

import org.apache.spark.sql.functions.sum
val sumoveramtpaid = inventory.withColumn("amtpaidcumsum", sum($"amtpaid") over byxwithallrowsbefore)

scala> sumoveramtpaid.show
+---+-------+-------------+
|  x|amtpaid|amtpaidcumsum|
+---+-------+-------------+
|  1|    2.0|          2.0|
|  1|    3.0|          5.0|
|  1|    1.0|          6.0|
|  1|   -2.0|          4.0|
|  1|   -1.0|          3.0|
+---+-------+-------------+

scala> sumoveramtpaid.printSchema
root
 |-- x: integer (nullable = false)
 |-- amtpaid: double (nullable = false)
 |-- amtpaidcumsum: double (nullable = true)

So far so good. Same as in Python.

Cumulative max

The following won't work due to a java.lang.RuntimeException.

import org.apache.spark.sql.functions.max val cumulativemax = sumoveramtpaid   .withcolumn("amtpaidcumsummax", max($"amtpaidcumsum") on byxwithallrowsbefore)  scala> cumulativemax.show 17/03/24 22:12:16 error executor: exception in task 0.0 in stage 11.0 (tid 210) org.apache.spark.sql.catalyst.errors.package$treenodeexception: binding attribute, tree: amtpaidcumsum#11     @ org.apache.spark.sql.catalyst.errors.package$.attachtree(package.scala:56)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1.applyorelse(boundattribute.scala:88)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1.applyorelse(boundattribute.scala:87)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$2.apply(treenode.scala:267)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$2.apply(treenode.scala:267)     @ org.apache.spark.sql.catalyst.trees.currentorigin$.withorigin(treenode.scala:70)     @ org.apache.spark.sql.catalyst.trees.treenode.transformdown(treenode.scala:266)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformdown$1.apply(treenode.scala:272)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformdown$1.apply(treenode.scala:272)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$4$$anonfun$apply$11.apply(treenode.scala:335)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.immutable.list.foreach(list.scala:381)     @ scala.collection.traversablelike$class.map(traversablelike.scala:234)     @ scala.collection.immutable.list.map(list.scala:285)     @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$4.apply(treenode.scala:333)     @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:187)     @ org.apache.spark.sql.catalyst.trees.treenode.mapchildren(treenode.scala:304)     @ org.apache.spark.sql.catalyst.trees.treenode.transformdown(treenode.scala:272)     @ org.apache.spark.sql.catalyst.trees.treenode.transform(treenode.scala:256)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$.bindreference(boundattribute.scala:87)     @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$$anonfun$bind$1.apply(generatemutableprojection.scala:38)     @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$$anonfun$bind$1.apply(generatemutableprojection.scala:38)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59)     @ scala.collection.mutable.arraybuffer.foreach(arraybuffer.scala:48)     @ scala.collection.traversablelike$class.map(traversablelike.scala:234)     @ scala.collection.abstracttraversable.map(traversable.scala:104)     @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$.bind(generatemutableprojection.scala:38)     @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$.generate(generatemutableprojection.scala:44)     @ org.apache.spark.sql.execution.sparkplan.newmutableprojection(sparkplan.scala:353)     @ 
org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1$1.apply(windowexec.scala:201)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1$1.apply(windowexec.scala:200)     @ org.apache.spark.sql.execution.window.aggregateprocessor$.apply(aggregateprocessor.scala:98)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2.org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1(windowexec.scala:196)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$6.apply(windowexec.scala:223)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$6.apply(windowexec.scala:220)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1$$anonfun$16.apply(windowexec.scala:319)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1$$anonfun$16.apply(windowexec.scala:319)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)     @ scala.collection.indexedseqoptimized$class.foreach(indexedseqoptimized.scala:33)     @ scala.collection.mutable.arrayops$ofref.foreach(arrayops.scala:186)     @ scala.collection.traversablelike$class.map(traversablelike.scala:234)     @ scala.collection.mutable.arrayops$ofref.map(arrayops.scala:186)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1.<init>(windowexec.scala:319)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14.apply(windowexec.scala:289)     @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14.apply(windowexec.scala:288)     @ org.apache.spark.rdd.rdd$$anonfun$mappartitions$1$$anonfun$apply$23.apply(rdd.scala:797)     @ org.apache.spark.rdd.rdd$$anonfun$mappartitions$1$$anonfun$apply$23.apply(rdd.scala:797)     @ org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd.scala:38)     @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323)     @ org.apache.spark.rdd.rdd.iterator(rdd.scala:287)     @ org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd.scala:38)     @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323)     @ org.apache.spark.rdd.rdd.iterator(rdd.scala:287)     @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:87)     @ org.apache.spark.scheduler.task.run(task.scala:108)     @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:320)     @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1142)     @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:617)     @ java.lang.thread.run(thread.java:745) caused by: java.lang.runtimeexception: couldn't find amtpaidcumsum#11 in [sum#234,max#235,x#5,amtpaid#6]     @ scala.sys.package$.error(package.scala:27)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1$$anonfun$applyorelse$1.apply(boundattribute.scala:94)     @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1$$anonfun$applyorelse$1.apply(boundattribute.scala:88)     @ org.apache.spark.sql.catalyst.errors.package$.attachtree(package.scala:52)     ... 62 more 

The RuntimeException says:

couldn't find amtpaidcumsum#11 in [sum#234,max#235,x#5,amtpaid#6]

It appears there's a sum column there, doesn't it? Let's use it instead of $"amtpaidcumsum" in max.

This time Spark reports an AnalysisException that includes the amtpaidcumsum column (!)

org.apache.spark.sql.AnalysisException: cannot resolve 'sum' given input columns: [x, amtpaid, amtpaidcumsum];;

scala> val cumulativemax = sumoveramtpaid.withcolumn("amtpaidcumsummax", max($"sum") on byxwithallrowsbefore) org.apache.spark.sql.analysisexception: cannot resolve '`sum`' given input columns: [x, amtpaid, amtpaidcumsum];; 'project [x#5, amtpaid#6, amtpaidcumsum#11, max('sum) windowspecdefinition(x#5, rows between unbounded preceding , current row) amtpaidcumsummax#237] +- project [x#5, amtpaid#6, amtpaidcumsum#11]    +- project [x#5, amtpaid#6, amtpaidcumsum#11, amtpaidcumsum#11]       +- window [sum(amtpaid#6) windowspecdefinition(x#5, rows between unbounded preceding , current row) amtpaidcumsum#11], [x#5]          +- project [x#5, amtpaid#6]             +- project [_1#2 x#5, _2#3 amtpaid#6]                +- localrelation [_1#2, _2#3]    @ org.apache.spark.sql.catalyst.analysis.package$analysiserrorat.failanalysis(package.scala:42)   @ org.apache.spark.sql.catalyst.analysis.checkanalysis$$anonfun$checkanalysis$1$$anonfun$apply$2.applyorelse(checkanalysis.scala:89)   @ org.apache.spark.sql.catalyst.analysis.checkanalysis$$anonfun$checkanalysis$1$$anonfun$apply$2.applyorelse(checkanalysis.scala:86)   @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformup$1.apply(treenode.scala:289)   @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformup$1.apply(treenode.scala:289)   @ org.apache.spark.sql.catalyst.trees.currentorigin$.withorigin(treenode.scala:70)   @ org.apache.spark.sql.catalyst.trees.treenode.transformup(treenode.scala:288)   @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286)   @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286)   @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$4.apply(treenode.scala:306)   @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:187)   @ org.apache.spark.sql.catalyst.trees.treenode.mapchildren(treenode.scala:304)   @ org.apache.spark.sql.catalyst.trees.treenode.transformup(treenode.scala:286)   @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286)   @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286)   @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$4.apply(treenode.scala:306)   @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:187)   @ org.apache.spark.sql.catalyst.trees.treenode.mapchildren(treenode.scala:304)   @ org.apache.spark.sql.catalyst.trees.treenode.transformup(treenode.scala:286)   @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286)   @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286)   @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$4.apply(treenode.scala:306)   @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:187)   @ org.apache.spark.sql.catalyst.trees.treenode.mapchildren(treenode.scala:304)   @ org.apache.spark.sql.catalyst.trees.treenode.transformup(treenode.scala:286)   @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286)   @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286)   @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$4.apply(treenode.scala:306)   @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:187)   @ org.apache.spark.sql.catalyst.trees.treenode.mapchildren(treenode.scala:304)   @ org.apache.spark.sql.catalyst.trees.treenode.transformup(treenode.scala:286)   @ 
org.apache.spark.sql.catalyst.plans.queryplan$$anonfun$transformexpressionsup$1.apply(queryplan.scala:256)   @ org.apache.spark.sql.catalyst.plans.queryplan$$anonfun$transformexpressionsup$1.apply(queryplan.scala:256)   @ org.apache.spark.sql.catalyst.plans.queryplan.transformexpression$1(queryplan.scala:267)   @ org.apache.spark.sql.catalyst.plans.queryplan.org$apache$spark$sql$catalyst$plans$queryplan$$recursivetransform$1(queryplan.scala:277)   @ org.apache.spark.sql.catalyst.plans.queryplan$$anonfun$org$apache$spark$sql$catalyst$plans$queryplan$$recursivetransform$1$1.apply(queryplan.scala:281)   @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)   @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234)   @ scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59)   @ scala.collection.mutable.arraybuffer.foreach(arraybuffer.scala:48)   @ scala.collection.traversablelike$class.map(traversablelike.scala:234)   @ scala.collection.abstracttraversable.map(traversable.scala:104)   @ org.apache.spark.sql.catalyst.plans.queryplan.org$apache$spark$sql$catalyst$plans$queryplan$$recursivetransform$1(queryplan.scala:281)   @ org.apache.spark.sql.catalyst.plans.queryplan$$anonfun$6.apply(queryplan.scala:286)   @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:187)   @ org.apache.spark.sql.catalyst.plans.queryplan.mapexpressions(queryplan.scala:286)   @ org.apache.spark.sql.catalyst.plans.queryplan.transformexpressionsup(queryplan.scala:256)   @ org.apache.spark.sql.catalyst.analysis.checkanalysis$$anonfun$checkanalysis$1.apply(checkanalysis.scala:86)   @ org.apache.spark.sql.catalyst.analysis.checkanalysis$$anonfun$checkanalysis$1.apply(checkanalysis.scala:79)   @ org.apache.spark.sql.catalyst.trees.treenode.foreachup(treenode.scala:127)   @ org.apache.spark.sql.catalyst.analysis.checkanalysis$class.checkanalysis(checkanalysis.scala:79)   @ org.apache.spark.sql.catalyst.analysis.analyzer.checkanalysis(analyzer.scala:90)   @ org.apache.spark.sql.execution.queryexecution.assertanalyzed(queryexecution.scala:53)   @ org.apache.spark.sql.dataset$.ofrows(dataset.scala:67)   @ org.apache.spark.sql.dataset.org$apache$spark$sql$dataset$$withplan(dataset.scala:2832)   @ org.apache.spark.sql.dataset.select(dataset.scala:1137)   @ org.apache.spark.sql.dataset.withcolumn(dataset.scala:1882)   ... 48 elided 
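If upgrading to 2.1.1 / 2.2.0 is not an option, one possible mitigation (untested here, and not free, since it materializes the intermediate result) is to cut the plan between the two dependent window computations so that the second window does not sit directly on top of the first. In PySpark that could look like:

# Mitigation sketch for <= 2.1.0 (untested): materialize the first window
# result so the two Window operators never end up adjacent in one plan.
df_cumsum = df.withColumn('amtpaidcumsum',
                          sf.sum(sf.col('amtpaid')).over(win_spec_max))
df_cumsum = sqlc.createDataFrame(df_cumsum.rdd, df_cumsum.schema)  # break lineage
df_final = df_cumsum.withColumn('amtpaidcumsummax',
                                sf.max(sf.col('amtpaidcumsum')).over(win_spec_max))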
