Python - PySpark error when working with a window function (Spark 2.1.0 reports an issue with a column not being found)?
Update: I have created the following JIRA issue: https://issues.apache.org/jira/browse/spark-20086. Status: Fixed! (Over the weekend, amazingly quickly!)

Update 2: The issue has been fixed, see https://github.com/apache/spark/pull/17432; the fix will be in versions 2.1.1 and 2.2.0.

You can get newer Spark nightly builds at http://people.apache.org/~pwendell/spark-nightly/. You will still run into the issue if you are on <= 2.1.0.
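A quick way to confirm which version a running session actually reports (just a convenience snippet, not part of the original report):

import pyspark

sc = pyspark.SparkContext.getOrCreate()
# Versions <= 2.1.0 are affected; 2.1.1 and 2.2.0 contain the fix.
print(sc.version)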
Original post:

I get an error when working with a PySpark window function. Here is some example code:
import pyspark
import pyspark.sql.functions as sf
import pyspark.sql.types as sparktypes
from pyspark.sql import Window

sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)

rdd = sc.parallelize([(1, 2.0), (1, 3.0), (1, 1.), (1, -2.), (1, -1.)])
df = sqlc.createDataFrame(rdd, ["x", "amtpaid"])
df.show()
gives:
+---+-------+
|  x|amtpaid|
+---+-------+
|  1|    2.0|
|  1|    3.0|
|  1|    1.0|
|  1|   -2.0|
|  1|   -1.0|
+---+-------+
Next, compute the cumulative sum:
win_spec_max = (Window
                .partitionBy(['x'])
                .rowsBetween(Window.unboundedPreceding, 0))
df = df.withColumn('amtpaidcumsum', sf.sum(sf.col('amtpaid')).over(win_spec_max))
df.show()
which gives:
+---+-------+-------------+
|  x|amtpaid|amtpaidcumsum|
+---+-------+-------------+
|  1|    2.0|          2.0|
|  1|    3.0|          5.0|
|  1|    1.0|          6.0|
|  1|   -2.0|          4.0|
|  1|   -1.0|          3.0|
+---+-------+-------------+
Next, compute the cumulative max:
df = df.withColumn('amtpaidcumsummax', sf.max(sf.col('amtpaidcumsum')).over(win_spec_max))
df.show()
This gives the following error:
Py4JJavaError: An error occurred while calling o2609.showString.
with traceback:
Py4JJavaError Traceback (most recent call last)
<ipython-input-215-3106d06b6e49> in <module>()
----> 1 df.show()

/users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
    316         """
    317         if isinstance(truncate, bool) and truncate:
--> 318             print(self._jdf.showString(n, 20))
    319         else:
    320             print(self._jdf.showString(n, int(truncate)))

/users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(
But interestingly enough, if I introduce a change before the second window operation by inserting a column, it does not give the error:
df = df.withColumn('maxbound', sf.lit(6.))
df.show()

+---+-------+-------------+--------+
|  x|amtpaid|amtpaidcumsum|maxbound|
+---+-------+-------------+--------+
|  1|    2.0|          2.0|     6.0|
|  1|    3.0|          5.0|     6.0|
|  1|    1.0|          6.0|     6.0|
|  1|   -2.0|          4.0|     6.0|
|  1|   -1.0|          3.0|     6.0|
+---+-------+-------------+--------+

# then apply the second window operation
df = df.withColumn('amtpaidcumsummax', sf.max(sf.col('amtpaidcumsum')).over(win_spec_max))
df.show()

+---+-------+-------------+--------+----------------+
|  x|amtpaid|amtpaidcumsum|maxbound|amtpaidcumsummax|
+---+-------+-------------+--------+----------------+
|  1|    2.0|          2.0|     6.0|             2.0|
|  1|    3.0|          5.0|     6.0|             5.0|
|  1|    1.0|          6.0|     6.0|             6.0|
|  1|   -2.0|          4.0|     6.0|             6.0|
|  1|   -1.0|          3.0|     6.0|             6.0|
+---+-------+-------------+--------+----------------+
I do not understand this behaviour.
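One speculative workaround, offered only as a sketch and not verified against this particular bug, is to rebuild the DataFrame from its RDD and schema between the two window operations so that Spark re-analyzes the plan from scratch:

# Speculative workaround: force a fresh analysis of the plan by rebuilding the
# DataFrame before applying the second window function.
df = sqlc.createDataFrame(df.rdd, df.schema)
df = df.withColumn('amtpaidcumsummax',
                   sf.max(sf.col('amtpaidcumsum')).over(win_spec_max))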
Well, so far so good, but then when I try another operation I get a similar error:
def _udf_compare_cumsum_sll(x):
    if x['amtpaidcumsummax'] >= x['maxbound']:
        output = 0
    else:
        output = x['amtpaid']
    return output

udf_compare_cumsum_sll = sf.udf(_udf_compare_cumsum_sll, sparktypes.FloatType())

df = df.withColumn('amtpaidadjusted',
                   udf_compare_cumsum_sll(sf.struct([df[x] for x in df.columns])))
df.show()
which gives:
Py4JJavaError Traceback (most recent call last)
<ipython-input-18-3106d06b6e49> in <module>()
----> 1 df.show()

[... same Python-side frames as in the traceback above ...]

Py4JJavaError: An error occurred while calling o91.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 36.0 failed 1 times, most recent failure: Lost task 0.0 in stage 36.0 (TID 645, localhost, executor driver): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: amtpaidcumsum#10
I wonder if anyone can reproduce this behaviour ...
Here is the complete log:
py4jjavaerrortraceback (most recent call last) <ipython-input-69-3106d06b6e49> in <module>() ----> 1 df.show() /users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate) 316 """ 317 if isinstance(truncate, bool) , truncate: --> 318 print(self._jdf.showstring(n, 20)) 319 else: 320 print(self._jdf.showstring(n, int(truncate))) /users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args) 1131 answer = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 temp_arg in temp_args: /users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.py4jjavaerror e: 65 s = e.java_exception.tostring() /users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 317 raise py4jjavaerror( 318 "an error occurred while calling {0}{1}{2}.\n". --> 319 format(target_id, ".", name), value) 320 else: 321 raise py4jerror( py4jjavaerror: error occurred while calling o703.showstring. : org.apache.spark.sparkexception: job aborted due stage failure: task 0 in stage 119.0 failed 1 times, recent failure: lost task 0.0 in stage 119.0 (tid 1817, localhost, executor driver): org.apache.spark.sql.catalyst.errors.package$treenodeexception: binding attribute, tree: amtpaidcumsum#2076 @ org.apache.spark.sql.catalyst.errors.package$.attachtree(package.scala:56) @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1.applyorelse(boundattribute.scala:88) @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1.applyorelse(boundattribute.scala:87) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:288) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:288) @ org.apache.spark.sql.catalyst.trees.currentorigin$.withorigin(treenode.scala:70) @ org.apache.spark.sql.catalyst.trees.treenode.transformdown(treenode.scala:287) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformdown$1.apply(treenode.scala:293) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformdown$1.apply(treenode.scala:293) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$5$$anonfun$apply$11.apply(treenode.scala:360) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.immutable.list.foreach(list.scala:381) @ scala.collection.traversablelike$class.map(traversablelike.scala:234) @ scala.collection.immutable.list.map(list.scala:285) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$5.apply(treenode.scala:358) @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:188) @ org.apache.spark.sql.catalyst.trees.treenode.transformchildren(treenode.scala:329) @ org.apache.spark.sql.catalyst.trees.treenode.transformdown(treenode.scala:293) @ org.apache.spark.sql.catalyst.trees.treenode.transform(treenode.scala:277) @ org.apache.spark.sql.catalyst.expressions.bindreferences$.bindreference(boundattribute.scala:87) @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$$anonfun$bind$1.apply(generatemutableprojection.scala:38) @ 
org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$$anonfun$bind$1.apply(generatemutableprojection.scala:38) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59) @ scala.collection.mutable.arraybuffer.foreach(arraybuffer.scala:48) @ scala.collection.traversablelike$class.map(traversablelike.scala:234) @ scala.collection.abstracttraversable.map(traversable.scala:104) @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$.bind(generatemutableprojection.scala:38) @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$.generate(generatemutableprojection.scala:44) @ org.apache.spark.sql.execution.sparkplan.newmutableprojection(sparkplan.scala:353) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1$1.apply(windowexec.scala:203) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1$1.apply(windowexec.scala:202) @ org.apache.spark.sql.execution.window.aggregateprocessor$.apply(aggregateprocessor.scala:98) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2.org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1(windowexec.scala:198) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$6.apply(windowexec.scala:225) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$6.apply(windowexec.scala:222) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1$$anonfun$16.apply(windowexec.scala:318) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1$$anonfun$16.apply(windowexec.scala:318) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.indexedseqoptimized$class.foreach(indexedseqoptimized.scala:33) @ scala.collection.mutable.arrayops$ofref.foreach(arrayops.scala:186) @ scala.collection.traversablelike$class.map(traversablelike.scala:234) @ scala.collection.mutable.arrayops$ofref.map(arrayops.scala:186) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1.<init>(windowexec.scala:318) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14.apply(windowexec.scala:290) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14.apply(windowexec.scala:289) @ org.apache.spark.rdd.rdd$$anonfun$mappartitions$1$$anonfun$apply$23.apply(rdd.scala:796) @ org.apache.spark.rdd.rdd$$anonfun$mappartitions$1$$anonfun$apply$23.apply(rdd.scala:796) @ org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd.scala:38) @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323) @ org.apache.spark.rdd.rdd.iterator(rdd.scala:287) @ org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd.scala:38) @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323) @ org.apache.spark.rdd.rdd.iterator(rdd.scala:287) @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:87) @ org.apache.spark.scheduler.task.run(task.scala:99) @ 
org.apache.spark.executor.executor$taskrunner.run(executor.scala:282) @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1142) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:617) @ java.lang.thread.run(thread.java:745) caused by: java.lang.runtimeexception: couldn't find amtpaidcumsum#2076 in [sum#2299,max#2300,x#2066l,amtpaid#2067] @ scala.sys.package$.error(package.scala:27) @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1$$anonfun$applyorelse$1.apply(boundattribute.scala:94) @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1$$anonfun$applyorelse$1.apply(boundattribute.scala:88) @ org.apache.spark.sql.catalyst.errors.package$.attachtree(package.scala:52) ... 62 more driver stacktrace: @ org.apache.spark.scheduler.dagscheduler.org$apache$spark$scheduler$dagscheduler$$failjobandindependentstages(dagscheduler.scala:1435) @ org.apache.spark.scheduler.dagscheduler$$anonfun$abortstage$1.apply(dagscheduler.scala:1423) @ org.apache.spark.scheduler.dagscheduler$$anonfun$abortstage$1.apply(dagscheduler.scala:1422) @ scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59) @ scala.collection.mutable.arraybuffer.foreach(arraybuffer.scala:48) @ org.apache.spark.scheduler.dagscheduler.abortstage(dagscheduler.scala:1422) @ org.apache.spark.scheduler.dagscheduler$$anonfun$handletasksetfailed$1.apply(dagscheduler.scala:802) @ org.apache.spark.scheduler.dagscheduler$$anonfun$handletasksetfailed$1.apply(dagscheduler.scala:802) @ scala.option.foreach(option.scala:257) @ org.apache.spark.scheduler.dagscheduler.handletasksetfailed(dagscheduler.scala:802) @ org.apache.spark.scheduler.dagschedulereventprocessloop.doonreceive(dagscheduler.scala:1650) @ org.apache.spark.scheduler.dagschedulereventprocessloop.onreceive(dagscheduler.scala:1605) @ org.apache.spark.scheduler.dagschedulereventprocessloop.onreceive(dagscheduler.scala:1594) @ org.apache.spark.util.eventloop$$anon$1.run(eventloop.scala:48) @ org.apache.spark.scheduler.dagscheduler.runjob(dagscheduler.scala:628) @ org.apache.spark.sparkcontext.runjob(sparkcontext.scala:1918) @ org.apache.spark.sparkcontext.runjob(sparkcontext.scala:1931) @ org.apache.spark.sparkcontext.runjob(sparkcontext.scala:1944) @ org.apache.spark.sql.execution.sparkplan.executetake(sparkplan.scala:333) @ org.apache.spark.sql.execution.collectlimitexec.executecollect(limit.scala:38) @ org.apache.spark.sql.dataset$$anonfun$org$apache$spark$sql$dataset$$execute$1$1.apply(dataset.scala:2371) @ org.apache.spark.sql.execution.sqlexecution$.withnewexecutionid(sqlexecution.scala:57) @ org.apache.spark.sql.dataset.withnewexecutionid(dataset.scala:2765) @ org.apache.spark.sql.dataset.org$apache$spark$sql$dataset$$execute$1(dataset.scala:2370) @ org.apache.spark.sql.dataset.org$apache$spark$sql$dataset$$collect(dataset.scala:2377) @ org.apache.spark.sql.dataset$$anonfun$head$1.apply(dataset.scala:2113) @ org.apache.spark.sql.dataset$$anonfun$head$1.apply(dataset.scala:2112) @ org.apache.spark.sql.dataset.withtypedcallback(dataset.scala:2795) @ org.apache.spark.sql.dataset.head(dataset.scala:2112) @ org.apache.spark.sql.dataset.take(dataset.scala:2327) @ org.apache.spark.sql.dataset.showstring(dataset.scala:248) @ sun.reflect.generatedmethodaccessor83.invoke(unknown source) @ sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl.java:43) @ java.lang.reflect.method.invoke(method.java:498) @ 
py4j.reflection.methodinvoker.invoke(methodinvoker.java:244) @ py4j.reflection.reflectionengine.invoke(reflectionengine.java:357) @ py4j.gateway.invoke(gateway.java:280) @ py4j.commands.abstractcommand.invokemethod(abstractcommand.java:132) @ py4j.commands.callcommand.execute(callcommand.java:79) @ py4j.gatewayconnection.run(gatewayconnection.java:214) @ java.lang.thread.run(thread.java:745) caused by: org.apache.spark.sql.catalyst.errors.package$treenodeexception: binding attribute, tree: null @ org.apache.spark.sql.catalyst.errors.package$.attachtree(package.scala:56) @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1.applyorelse(boundattribute.scala:88) @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1.applyorelse(boundattribute.scala:87) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:288) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:288) @ org.apache.spark.sql.catalyst.trees.currentorigin$.withorigin(treenode.scala:70) @ org.apache.spark.sql.catalyst.trees.treenode.transformdown(treenode.scala:287) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformdown$1.apply(treenode.scala:293) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformdown$1.apply(treenode.scala:293) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$5$$anonfun$apply$11.apply(treenode.scala:360) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.immutable.list.foreach(list.scala:381) @ scala.collection.traversablelike$class.map(traversablelike.scala:234) @ scala.collection.immutable.list.map(list.scala:285) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$5.apply(treenode.scala:358) @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:188) @ org.apache.spark.sql.catalyst.trees.treenode.transformchildren(treenode.scala:329) @ org.apache.spark.sql.catalyst.trees.treenode.transformdown(treenode.scala:293) @ org.apache.spark.sql.catalyst.trees.treenode.transform(treenode.scala:277) @ org.apache.spark.sql.catalyst.expressions.bindreferences$.bindreference(boundattribute.scala:87) @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$$anonfun$bind$1.apply(generatemutableprojection.scala:38) @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$$anonfun$bind$1.apply(generatemutableprojection.scala:38) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59) @ scala.collection.mutable.arraybuffer.foreach(arraybuffer.scala:48) @ scala.collection.traversablelike$class.map(traversablelike.scala:234) @ scala.collection.abstracttraversable.map(traversable.scala:104) @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$.bind(generatemutableprojection.scala:38) @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$.generate(generatemutableprojection.scala:44) @ org.apache.spark.sql.execution.sparkplan.newmutableprojection(sparkplan.scala:353) @ 
org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1$1.apply(windowexec.scala:203) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1$1.apply(windowexec.scala:202) @ org.apache.spark.sql.execution.window.aggregateprocessor$.apply(aggregateprocessor.scala:98) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2.org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1(windowexec.scala:198) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$6.apply(windowexec.scala:225) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$6.apply(windowexec.scala:222) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1$$anonfun$16.apply(windowexec.scala:318) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1$$anonfun$16.apply(windowexec.scala:318) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.indexedseqoptimized$class.foreach(indexedseqoptimized.scala:33) @ scala.collection.mutable.arrayops$ofref.foreach(arrayops.scala:186) @ scala.collection.traversablelike$class.map(traversablelike.scala:234) @ scala.collection.mutable.arrayops$ofref.map(arrayops.scala:186) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1.<init>(windowexec.scala:318) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14.apply(windowexec.scala:290) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14.apply(windowexec.scala:289) @ org.apache.spark.rdd.rdd$$anonfun$mappartitions$1$$anonfun$apply$23.apply(rdd.scala:796) @ org.apache.spark.rdd.rdd$$anonfun$mappartitions$1$$anonfun$apply$23.apply(rdd.scala:796) @ org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd.scala:38) @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323) @ org.apache.spark.rdd.rdd.iterator(rdd.scala:287) @ org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd.scala:38) @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323) @ org.apache.spark.rdd.rdd.iterator(rdd.scala:287) @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:87) @ org.apache.spark.scheduler.task.run(task.scala:99) @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:282) @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1142) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:617) ... 1 more caused by: java.lang.runtimeexception: couldn't find amtpaidcumsum#2076 in [sum#2299,max#2300,x#2066l,amtpaid#2067] @ scala.sys.package$.error(package.scala:27) @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1$$anonfun$applyorelse$1.apply(boundattribute.scala:94) @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1$$anonfun$applyorelse$1.apply(boundattribute.scala:88) @ org.apache.spark.sql.catalyst.errors.package$.attachtree(package.scala:52) ... 62 more
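As an aside (not part of the original question): the struct-based UDF above only encodes a simple conditional, so the same logic can usually be expressed with Spark's built-in when/otherwise column expressions instead of a Python UDF. Whether that sidesteps this particular bug is not verified here; this is just a sketch using the column names from the DataFrame built above.

# Equivalent logic to _udf_compare_cumsum_sll, using built-in expressions only
df = df.withColumn(
    'amtpaidadjusted',
    sf.when(sf.col('amtpaidcumsummax') >= sf.col('maxbound'), sf.lit(0.0))
      .otherwise(sf.col('amtpaid')))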
It appears there is an issue with window operator support in Spark 2.1 and 2.2.0-SNAPSHOT (built today from master). See the following reproduction in Scala.
I think you should report the issue in Spark's JIRA.
val inventory = Seq(
  (1, 2.0), (1, 3.0), (1, 1.0), (1, -2.0), (1, -1.0)).toDF("x", "amtpaid")

scala> inventory.printSchema
root
 |-- x: integer (nullable = false)
 |-- amtpaid: double (nullable = false)

import org.apache.spark.sql.expressions.Window
val byXWithAllRowsBefore = Window.partitionBy("x").rowsBetween(Window.unboundedPreceding, Window.currentRow)

import org.apache.spark.sql.functions.sum
val sumOverAmtPaid = inventory.withColumn("amtpaidcumsum", sum($"amtpaid") over byXWithAllRowsBefore)

scala> sumOverAmtPaid.show
+---+-------+-------------+
|  x|amtpaid|amtpaidcumsum|
+---+-------+-------------+
|  1|    2.0|          2.0|
|  1|    3.0|          5.0|
|  1|    1.0|          6.0|
|  1|   -2.0|          4.0|
|  1|   -1.0|          3.0|
+---+-------+-------------+

scala> sumOverAmtPaid.printSchema
root
 |-- x: integer (nullable = false)
 |-- amtpaid: double (nullable = false)
 |-- amtpaidcumsum: double (nullable = true)
So far so good. Same as in Python.

Cumulative max

The following won't work, due to a java.lang.RuntimeException:
import org.apache.spark.sql.functions.max val cumulativemax = sumoveramtpaid .withcolumn("amtpaidcumsummax", max($"amtpaidcumsum") on byxwithallrowsbefore) scala> cumulativemax.show 17/03/24 22:12:16 error executor: exception in task 0.0 in stage 11.0 (tid 210) org.apache.spark.sql.catalyst.errors.package$treenodeexception: binding attribute, tree: amtpaidcumsum#11 @ org.apache.spark.sql.catalyst.errors.package$.attachtree(package.scala:56) @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1.applyorelse(boundattribute.scala:88) @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1.applyorelse(boundattribute.scala:87) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$2.apply(treenode.scala:267) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$2.apply(treenode.scala:267) @ org.apache.spark.sql.catalyst.trees.currentorigin$.withorigin(treenode.scala:70) @ org.apache.spark.sql.catalyst.trees.treenode.transformdown(treenode.scala:266) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformdown$1.apply(treenode.scala:272) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformdown$1.apply(treenode.scala:272) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$4$$anonfun$apply$11.apply(treenode.scala:335) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.immutable.list.foreach(list.scala:381) @ scala.collection.traversablelike$class.map(traversablelike.scala:234) @ scala.collection.immutable.list.map(list.scala:285) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$4.apply(treenode.scala:333) @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:187) @ org.apache.spark.sql.catalyst.trees.treenode.mapchildren(treenode.scala:304) @ org.apache.spark.sql.catalyst.trees.treenode.transformdown(treenode.scala:272) @ org.apache.spark.sql.catalyst.trees.treenode.transform(treenode.scala:256) @ org.apache.spark.sql.catalyst.expressions.bindreferences$.bindreference(boundattribute.scala:87) @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$$anonfun$bind$1.apply(generatemutableprojection.scala:38) @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$$anonfun$bind$1.apply(generatemutableprojection.scala:38) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59) @ scala.collection.mutable.arraybuffer.foreach(arraybuffer.scala:48) @ scala.collection.traversablelike$class.map(traversablelike.scala:234) @ scala.collection.abstracttraversable.map(traversable.scala:104) @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$.bind(generatemutableprojection.scala:38) @ org.apache.spark.sql.catalyst.expressions.codegen.generatemutableprojection$.generate(generatemutableprojection.scala:44) @ org.apache.spark.sql.execution.sparkplan.newmutableprojection(sparkplan.scala:353) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1$1.apply(windowexec.scala:201) @ 
org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1$1.apply(windowexec.scala:200) @ org.apache.spark.sql.execution.window.aggregateprocessor$.apply(aggregateprocessor.scala:98) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2.org$apache$spark$sql$execution$window$windowexec$$anonfun$$processor$1(windowexec.scala:196) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$6.apply(windowexec.scala:223) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$windowframeexpressionfactorypairs$2$$anonfun$6.apply(windowexec.scala:220) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1$$anonfun$16.apply(windowexec.scala:319) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1$$anonfun$16.apply(windowexec.scala:319) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.indexedseqoptimized$class.foreach(indexedseqoptimized.scala:33) @ scala.collection.mutable.arrayops$ofref.foreach(arrayops.scala:186) @ scala.collection.traversablelike$class.map(traversablelike.scala:234) @ scala.collection.mutable.arrayops$ofref.map(arrayops.scala:186) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14$$anon$1.<init>(windowexec.scala:319) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14.apply(windowexec.scala:289) @ org.apache.spark.sql.execution.window.windowexec$$anonfun$14.apply(windowexec.scala:288) @ org.apache.spark.rdd.rdd$$anonfun$mappartitions$1$$anonfun$apply$23.apply(rdd.scala:797) @ org.apache.spark.rdd.rdd$$anonfun$mappartitions$1$$anonfun$apply$23.apply(rdd.scala:797) @ org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd.scala:38) @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323) @ org.apache.spark.rdd.rdd.iterator(rdd.scala:287) @ org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd.scala:38) @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323) @ org.apache.spark.rdd.rdd.iterator(rdd.scala:287) @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:87) @ org.apache.spark.scheduler.task.run(task.scala:108) @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:320) @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1142) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:617) @ java.lang.thread.run(thread.java:745) caused by: java.lang.runtimeexception: couldn't find amtpaidcumsum#11 in [sum#234,max#235,x#5,amtpaid#6] @ scala.sys.package$.error(package.scala:27) @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1$$anonfun$applyorelse$1.apply(boundattribute.scala:94) @ org.apache.spark.sql.catalyst.expressions.bindreferences$$anonfun$bindreference$1$$anonfun$applyorelse$1.apply(boundattribute.scala:88) @ org.apache.spark.sql.catalyst.errors.package$.attachtree(package.scala:52) ... 62 more
The RuntimeException says:
couldn't find amtpaidcumsum#11 in [sum#234,max#235,x#5,amtpaid#6]
It appears there is a sum column available, isn't there? Let's use it instead of $"amtpaidcumsum" in max.
This time Spark reports an AnalysisException that does include the amtpaidcumsum column (!)

org.apache.spark.sql.AnalysisException: cannot resolve '`sum`' given input columns: [x, amtpaid, amtpaidcumsum];;
scala> val cumulativemax = sumoveramtpaid.withcolumn("amtpaidcumsummax", max($"sum") on byxwithallrowsbefore) org.apache.spark.sql.analysisexception: cannot resolve '`sum`' given input columns: [x, amtpaid, amtpaidcumsum];; 'project [x#5, amtpaid#6, amtpaidcumsum#11, max('sum) windowspecdefinition(x#5, rows between unbounded preceding , current row) amtpaidcumsummax#237] +- project [x#5, amtpaid#6, amtpaidcumsum#11] +- project [x#5, amtpaid#6, amtpaidcumsum#11, amtpaidcumsum#11] +- window [sum(amtpaid#6) windowspecdefinition(x#5, rows between unbounded preceding , current row) amtpaidcumsum#11], [x#5] +- project [x#5, amtpaid#6] +- project [_1#2 x#5, _2#3 amtpaid#6] +- localrelation [_1#2, _2#3] @ org.apache.spark.sql.catalyst.analysis.package$analysiserrorat.failanalysis(package.scala:42) @ org.apache.spark.sql.catalyst.analysis.checkanalysis$$anonfun$checkanalysis$1$$anonfun$apply$2.applyorelse(checkanalysis.scala:89) @ org.apache.spark.sql.catalyst.analysis.checkanalysis$$anonfun$checkanalysis$1$$anonfun$apply$2.applyorelse(checkanalysis.scala:86) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformup$1.apply(treenode.scala:289) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$transformup$1.apply(treenode.scala:289) @ org.apache.spark.sql.catalyst.trees.currentorigin$.withorigin(treenode.scala:70) @ org.apache.spark.sql.catalyst.trees.treenode.transformup(treenode.scala:288) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$4.apply(treenode.scala:306) @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:187) @ org.apache.spark.sql.catalyst.trees.treenode.mapchildren(treenode.scala:304) @ org.apache.spark.sql.catalyst.trees.treenode.transformup(treenode.scala:286) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$4.apply(treenode.scala:306) @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:187) @ org.apache.spark.sql.catalyst.trees.treenode.mapchildren(treenode.scala:304) @ org.apache.spark.sql.catalyst.trees.treenode.transformup(treenode.scala:286) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$4.apply(treenode.scala:306) @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:187) @ org.apache.spark.sql.catalyst.trees.treenode.mapchildren(treenode.scala:304) @ org.apache.spark.sql.catalyst.trees.treenode.transformup(treenode.scala:286) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$3.apply(treenode.scala:286) @ org.apache.spark.sql.catalyst.trees.treenode$$anonfun$4.apply(treenode.scala:306) @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:187) @ org.apache.spark.sql.catalyst.trees.treenode.mapchildren(treenode.scala:304) @ org.apache.spark.sql.catalyst.trees.treenode.transformup(treenode.scala:286) @ org.apache.spark.sql.catalyst.plans.queryplan$$anonfun$transformexpressionsup$1.apply(queryplan.scala:256) @ 
org.apache.spark.sql.catalyst.plans.queryplan$$anonfun$transformexpressionsup$1.apply(queryplan.scala:256) @ org.apache.spark.sql.catalyst.plans.queryplan.transformexpression$1(queryplan.scala:267) @ org.apache.spark.sql.catalyst.plans.queryplan.org$apache$spark$sql$catalyst$plans$queryplan$$recursivetransform$1(queryplan.scala:277) @ org.apache.spark.sql.catalyst.plans.queryplan$$anonfun$org$apache$spark$sql$catalyst$plans$queryplan$$recursivetransform$1$1.apply(queryplan.scala:281) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:234) @ scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59) @ scala.collection.mutable.arraybuffer.foreach(arraybuffer.scala:48) @ scala.collection.traversablelike$class.map(traversablelike.scala:234) @ scala.collection.abstracttraversable.map(traversable.scala:104) @ org.apache.spark.sql.catalyst.plans.queryplan.org$apache$spark$sql$catalyst$plans$queryplan$$recursivetransform$1(queryplan.scala:281) @ org.apache.spark.sql.catalyst.plans.queryplan$$anonfun$6.apply(queryplan.scala:286) @ org.apache.spark.sql.catalyst.trees.treenode.mapproductiterator(treenode.scala:187) @ org.apache.spark.sql.catalyst.plans.queryplan.mapexpressions(queryplan.scala:286) @ org.apache.spark.sql.catalyst.plans.queryplan.transformexpressionsup(queryplan.scala:256) @ org.apache.spark.sql.catalyst.analysis.checkanalysis$$anonfun$checkanalysis$1.apply(checkanalysis.scala:86) @ org.apache.spark.sql.catalyst.analysis.checkanalysis$$anonfun$checkanalysis$1.apply(checkanalysis.scala:79) @ org.apache.spark.sql.catalyst.trees.treenode.foreachup(treenode.scala:127) @ org.apache.spark.sql.catalyst.analysis.checkanalysis$class.checkanalysis(checkanalysis.scala:79) @ org.apache.spark.sql.catalyst.analysis.analyzer.checkanalysis(analyzer.scala:90) @ org.apache.spark.sql.execution.queryexecution.assertanalyzed(queryexecution.scala:53) @ org.apache.spark.sql.dataset$.ofrows(dataset.scala:67) @ org.apache.spark.sql.dataset.org$apache$spark$sql$dataset$$withplan(dataset.scala:2832) @ org.apache.spark.sql.dataset.select(dataset.scala:1137) @ org.apache.spark.sql.dataset.withcolumn(dataset.scala:1882) ... 48 elided
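For completeness, here is the full PySpark pipeline from the question gathered in one place. Per the updates at the top, it is expected to work on 2.1.1 / 2.2.0 where SPARK-20086 is fixed (assuming the fix covers exactly this chained-window case); on <= 2.1.0 the workarounds discussed above apply.

import pyspark
import pyspark.sql.functions as sf
from pyspark.sql import Window

sc = pyspark.SparkContext.getOrCreate()
sqlc = pyspark.SQLContext(sc)

df = sqlc.createDataFrame(
    sc.parallelize([(1, 2.0), (1, 3.0), (1, 1.0), (1, -2.0), (1, -1.0)]),
    ["x", "amtpaid"])

win = Window.partitionBy("x").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Cumulative sum, then cumulative max over the derived column; the second step is
# what fails on <= 2.1.0 with "Couldn't find amtpaidcumsum#..." during attribute binding.
df = (df
      .withColumn("amtpaidcumsum", sf.sum("amtpaid").over(win))
      .withColumn("amtpaidcumsummax", sf.max("amtpaidcumsum").over(win)))
df.show()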