java - How to update state (store result in database) using Storm


I have a problem updating state in a topology (local mode). I want to write the result to a database.

I run the topology this way:

TridentTopology topology = new TridentTopology();

StateFactory stateFactoryDB = new LocationDBFactory();
MessageSubmittedOpaqueTransSpout spout = new MessageSubmittedOpaqueTransSpout();

Stream globalSubmittedMessageStream = topology
    .newStream("globalsubmittedmessagestream", spout)
    .parallelismHint(1);

globalSubmittedMessageStream
    .project(new Fields("stormtxid", "bookingid", "timestamp", "messagessubmitted", "messagesfailed"))
    .each(new Fields("bookingid"), new FilterNotZero())
    .each(new Fields("timestamp"), new FilterNull())
    .each(new Fields("bookingid", "messagessubmitted"), new PrintFilter("success submitted - start: "))
    .partitionPersist(
        stateFactoryDB,
        new Fields("stormtxid", "bookingid", "timestamp", "messagesfailed", "messagessubmitted"),
        new LocationDBStateUpdater(),
        new Fields("test")
    );

The data is fetched correctly from the database (by MessageSubmittedOpaqueTransSpout) and emitted through each filter, but only the data from the first transaction is saved by the state created by LocationDBFactory in partitionPersist. None of the later transactions are updated.

Code of the updateState method in the LocationDBStateUpdater class:

public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
    for (TridentTuple tuple : tuples) {
        log.debug("^%*&^$&^#&%$@#^%$#^%$*&^%(&^%(&^%(&%(&%&  > {} ", tuple);
    }
    state.save(tuples, collector);
}

Code of the LocationDB class:

public void beginCommit(Long txid) {
    log.info("begincommit...... txid: {}", txid);
}

@Override
public void commit(Long txid) {
    log.info("commit...... txid: {}", txid);
}

public void save(List<TridentTuple> tuples, TridentCollector collector) {
    log.info("save ......... start");

    for (TridentTuple tuple : tuples) {
        log.info("#@#@@@@@@@@@@@@@@@@@@@@@@@@@ test tuple: {}", tuple);
    }

    for (TridentTuple t : tuples) {
        collector.emit(new Values("true"));
    }
    log.info("save ......... end");
}
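For reference, the save method above only logs and emits, it does not write anything yet. Below is a minimal sketch of how the same lifecycle (beginCommit, then save, then commit, the order that shows up in the logs for txid 1) could buffer rows and hand them over in one batch. It is plain Java with no Storm dependency: BufferedLocationState, the String rows and the Consumer sink are hypothetical names used only for illustration, and the sink marks the place where a JDBC batch insert might go. It does not explain why the later transactions never reach the state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical stand-in for a state object driven as beginCommit -> save -> commit.
// Nothing here uses Storm classes; rows are plain strings to keep the sketch small.
class BufferedLocationState {
    // Assumption: the sink receives one batch per transaction (e.g. a JDBC batch insert).
    private final Consumer<List<String>> sink;
    private final List<String> buffer = new ArrayList<>();
    private Long currentTxId;

    BufferedLocationState(Consumer<List<String>> sink) {
        this.sink = sink;
    }

    void beginCommit(Long txId) {
        currentTxId = txId;
        buffer.clear(); // drop leftovers from a batch that was never committed
    }

    void save(List<String> rows) {
        if (currentTxId == null) {
            throw new IllegalStateException("save called outside beginCommit/commit");
        }
        buffer.addAll(rows);
    }

    void commit(Long txId) {
        if (!Objects.equals(txId, currentTxId)) {
            throw new IllegalStateException("commit for unexpected txid: " + txId);
        }
        sink.accept(new ArrayList<>(buffer)); // write the whole batch at once
        buffer.clear();
        currentTxId = null;
    }
}

class BufferedLocationStateDemo {
    public static void main(String[] args) {
        List<String> fakeDatabase = new ArrayList<>(); // stands in for a real table
        BufferedLocationState state = new BufferedLocationState(fakeDatabase::addAll);

        state.beginCommit(1L);
        state.save(Arrays.asList("29,2011-01-27,6,0", "29,2011-01-27,0,7"));
        state.commit(1L);

        System.out.println(fakeDatabase.size() + " rows written");
    }
}
```

Writing in commit rather than in save keeps a partially processed batch out of the table. Whether that fits depends on how replays of the same txid should be handled, which this sketch does not cover.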

Logs:

6056 [thread-10] info  backtype.storm.daemon.worker - worker f818dc1b-0f34-43f8-b70a-a12afaef9d88 storm test-topology-1-1373534952 on cbe0de9a-4cef-483f-bc11-77a15b0ad6f9:4 has finished loading
6073 [thread-23] info  c.n.c.f.imps.curatorframeworkimpl - starting
0    [nioservercxn.factory:0.0.0.0/0.0.0.0:2000] warn  org.apache.zookeeper.server.nioservercnxn  - endofstreamexception: unable read additional data client sessionid 0x13fcd101957000d, client has closed socket
6098 [thread-23] info  backtype.storm.daemon.executor - prepared bolt spout0:(5)
6098 [thread-17] info  backtype.storm.daemon.executor - prepared bolt $spoutcoord-spout0:(2)
6106 [thread-25] info  c.n.c.f.imps.curatorframeworkimpl - starting
32   [nioservercxn.factory:0.0.0.0/0.0.0.0:2000] warn  org.apache.zookeeper.server.nioservercnxn  - endofstreamexception: unable read additional data client sessionid 0x13fcd101957000f, client has closed socket
6120 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - getcoordinator !!!
6120 [thread-25] info  backtype.storm.daemon.executor - opened spout $mastercoord-bg0:(1)
6122 [thread-25] info  backtype.storm.daemon.executor - activating spout $mastercoord-bg0:(1)
6122 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 1 !!!
6225 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - opening jdbc connection
6237 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 2 !!!
6288 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 3 !!!
6341 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 4 !!!
6460 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 5 !!!
6511 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 6 !!!
6562 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 7 !!!
6670 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 8 !!!
6725 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 9 !!!
6776 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 10 !!!
6828 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 11 !!!
6879 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 12 !!!
6931 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 13 !!!
6982 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 14 !!!
7034 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 15 !!!
7085 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 16 !!!
7136 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 17 !!!
7187 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 18 !!!
7238 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 19 !!!
7289 [thread-25] info  c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 20 !!!
8238 [thread-23] debug o.a.i.d.pooled.pooleddatasource - created connection 19739814.
8391 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ooo using connection [com.mysql.jdbc.jdbc4connection@12d34a6]
8392 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==>  preparing: select max(id) max_id msisdn_status
8446 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==> parameters:
8619 [thread-23] info  c.archermobile.storm.statusmanager - max id: 2902288
8620 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - resetting autocommit true on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
8773 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - closing jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
8773 [thread-23] debug o.a.i.d.pooled.pooleddatasource - returned connection 19739814 pool.
8774 [thread-23] info  c.a.s.messagesubmittedopaquetransspout - emitpartitionbatch, startid: 0, endid: 50, max id: 2902288, transactionattempt: 1:0, partition: 0
8779 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - opening jdbc connection
8779 [thread-23] debug o.a.i.d.pooled.pooleddatasource - checked out connection 19739814 pool.
8780 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - setting autocommit false on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
8931 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ooo using connection [com.mysql.jdbc.jdbc4connection@12d34a6]
8931 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ==>  preparing: select sum(f.messages_submitted) messages_submitted, sum(f.messages_failed) messages_failed, fb.booking_id,f.classification_id,f.submission_account_id, f.timestamp ( select count(if(msisdn_status.status >= 0, 1, null)) messages_submitted, count(if(msisdn_status.status < 0, 1, null)) messages_failed, msisdn_status.flight_id, date(msisdn_status.timestamp) timestamp, msisdn_status.classification_id, msisdn_status.submission_account_id msisdn_status ( msisdn_status.id > ? , msisdn_status.id <= ? , msisdn_status.timestamp not null) group flight_id, timestamp, classification_id, submission_account_id ) f left join jupiter.flight_reference fr on f.flight_id = fr.flight_id left join jupiterbs.flightboard fb on fb.flight_reference = fr.flight_reference group f.timestamp, fb.booking_id, f.classification_id, f.submission_account_id;
8932 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ==> parameters: 0(long), 50(long)
9141 [thread-23] info  c.archermobile.storm.statusmanager -  submitted list [msisdnstatus{29, 2011-01-27 00:00:00.0, -1, -1, 6, 0}, msisdnstatus{29, 2011-01-27 00:00:00.0, 1, -1, 0, 7}, msisdnstatus{29, 2011-01-27 00:00:00.0, 2, -1, 0, 2}, msisdnstatus{29, 2011-01-28 00:00:00.0, -1, -1, 23, 0}, msisdnstatus{29, 2011-01-28 00:00:00.0, 1, -1, 0, 8}, msisdnstatus{29, 2011-01-28 00:00:00.0, 2, -1, 0, 4}]
9141 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - resetting autocommit true on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9292 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - closing jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9292 [thread-23] debug o.a.i.d.pooled.pooleddatasource - returned connection 19739814 pool.
9295 [thread-21] info  c.a.storm.utils.printfilter - success submitted - start:  > [29, 0]
9295 [thread-21] info  c.a.storm.utils.printfilter - success submitted - start:  > [29, 7]
9295 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - opening jdbc connection
9295 [thread-21] info  c.a.storm.utils.printfilter - success submitted - start:  > [29, 2]
9296 [thread-23] debug o.a.i.d.pooled.pooleddatasource - checked out connection 19739814 pool.
9296 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - setting autocommit false on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9296 [thread-21] info  c.a.storm.utils.printfilter - success submitted - start:  > [29, 0]
9296 [thread-21] info  c.a.storm.utils.printfilter - success submitted - start:  > [29, 8]
9296 [thread-21] info  c.a.storm.utils.printfilter - success submitted - start:  > [29, 4]
9298 [thread-21] info  c.archermobile.storm.db.locationdb - begincommit...... txid: 1
9298 [thread-21] debug c.a.storm.db.locationdbstateupdater - ^%*&^$&^#&%$@#^%$#^%$*&^%(&^%(&^%(&%(&%&  > [1, 29, 2011-01-27 00:00:00.0, 6, 0]
9299 [thread-21] debug c.a.storm.db.locationdbstateupdater - ^%*&^$&^#&%$@#^%$#^%$*&^%(&^%(&^%(&%(&%&  > [1, 29, 2011-01-27 00:00:00.0, 0, 7]
9299 [thread-21] debug c.a.storm.db.locationdbstateupdater - ^%*&^$&^#&%$@#^%$#^%$*&^%(&^%(&^%(&%(&%&  > [1, 29, 2011-01-27 00:00:00.0, 0, 2]
9299 [thread-21] debug c.a.storm.db.locationdbstateupdater - ^%*&^$&^#&%$@#^%$#^%$*&^%(&^%(&^%(&%(&%&  > [1, 29, 2011-01-28 00:00:00.0, 23, 0]
9299 [thread-21] debug c.a.storm.db.locationdbstateupdater - ^%*&^$&^#&%$@#^%$#^%$*&^%(&^%(&^%(&%(&%&  > [1, 29, 2011-01-28 00:00:00.0, 0, 8]
9299 [thread-21] debug c.a.storm.db.locationdbstateupdater - ^%*&^$&^#&%$@#^%$#^%$*&^%(&^%(&^%(&%(&%&  > [1, 29, 2011-01-28 00:00:00.0, 0, 4]
9299 [thread-21] info  c.archermobile.storm.db.locationdb - save ......... start
9300 [thread-21] info  c.archermobile.storm.db.locationdb - #@#@@@@@@@@@@@@@@@@@@@@@@@@@ test tuple: [1, 29, 2011-01-27 00:00:00.0, 6, 0]
9300 [thread-21] info  c.archermobile.storm.db.locationdb - #@#@@@@@@@@@@@@@@@@@@@@@@@@@ test tuple: [1, 29, 2011-01-27 00:00:00.0, 0, 7]
9300 [thread-21] info  c.archermobile.storm.db.locationdb - #@#@@@@@@@@@@@@@@@@@@@@@@@@@ test tuple: [1, 29, 2011-01-27 00:00:00.0, 0, 2]
9300 [thread-21] info  c.archermobile.storm.db.locationdb - #@#@@@@@@@@@@@@@@@@@@@@@@@@@ test tuple: [1, 29, 2011-01-28 00:00:00.0, 23, 0]
9300 [thread-21] info  c.archermobile.storm.db.locationdb - #@#@@@@@@@@@@@@@@@@@@@@@@@@@ test tuple: [1, 29, 2011-01-28 00:00:00.0, 0, 8]
9300 [thread-21] info  c.archermobile.storm.db.locationdb - #@#@@@@@@@@@@@@@@@@@@@@@@@@@ test tuple: [1, 29, 2011-01-28 00:00:00.0, 0, 4]
9301 [thread-21] info  c.archermobile.storm.db.locationdb - save ......... end
9301 [thread-21] info  c.archermobile.storm.db.locationdb - commit...... txid: 1
9447 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ooo using connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9447 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==>  preparing: select max(id) max_id msisdn_status
9447 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==> parameters:
9599 [thread-23] info  c.archermobile.storm.statusmanager - max id: 2902288
9599 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - resetting autocommit true on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9750 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - closing jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9750 [thread-23] debug o.a.i.d.pooled.pooleddatasource - returned connection 19739814 pool.
9751 [thread-23] info  c.a.s.messagesubmittedopaquetransspout - emitpartitionbatch, startid: 50, endid: 100, max id: 2902288, transactionattempt: 2:0, partition: 0
9751 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - opening jdbc connection
9751 [thread-23] debug o.a.i.d.pooled.pooleddatasource - checked out connection 19739814 pool.
9751 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - setting autocommit false on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9902 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ooo using connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9902 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ==>  preparing: select sum(f.messages_submitted) messages_submitted, sum(f.messages_failed) messages_failed, fb.booking_id,f.classification_id,f.submission_account_id, f.timestamp ( select count(if(msisdn_status.status >= 0, 1, null)) messages_submitted, count(if(msisdn_status.status < 0, 1, null)) messages_failed, msisdn_status.flight_id, date(msisdn_status.timestamp) timestamp, msisdn_status.classification_id, msisdn_status.submission_account_id msisdn_status ( msisdn_status.id > ? , msisdn_status.id <= ? , msisdn_status.timestamp not null) group flight_id, timestamp, classification_id, submission_account_id ) f left join jupiter.flight_reference fr on f.flight_id = fr.flight_id left join jupiterbs.flightboard fb on fb.flight_reference = fr.flight_reference group f.timestamp, fb.booking_id, f.classification_id, f.submission_account_id;
9903 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ==> parameters: 50(long), 100(long)
10087 [thread-23] info  c.archermobile.storm.statusmanager -  submitted list [msisdnstatus{29, 2011-01-28 00:00:00.0, -1, -1, 35, 0}, msisdnstatus{29, 2011-01-28 00:00:00.0, 1, -1, 0, 6}, msisdnstatus{29, 2011-01-28 00:00:00.0, 2, -1, 0, 5}, msisdnstatus{29, 2011-01-28 00:00:00.0, 11, -1, 4, 0}]
10087 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - resetting autocommit true on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10238 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - closing jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10238 [thread-23] debug o.a.i.d.pooled.pooleddatasource - returned connection 19739814 pool.
10239 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - opening jdbc connection
10239 [thread-21] info  c.a.storm.utils.printfilter - success submitted - start:  > [29, 0]
10239 [thread-23] debug o.a.i.d.pooled.pooleddatasource - checked out connection 19739814 pool.
10239 [thread-21] info  c.a.storm.utils.printfilter - success submitted - start:  > [29, 6]
10239 [thread-21] info  c.a.storm.utils.printfilter - success submitted - start:  > [29, 5]
10239 [thread-21] info  c.a.storm.utils.printfilter - success submitted - start:  > [29, 0]
10239 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - setting autocommit false on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10393 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ooo using connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10393 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==>  preparing: select max(id) max_id msisdn_status
10393 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==> parameters:
10546 [thread-23] info  c.archermobile.storm.statusmanager - max id: 2902288
10546 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - resetting autocommit true on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10698 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - closing jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10699 [thread-23] debug o.a.i.d.pooled.pooleddatasource - returned connection 19739814 pool.
10699 [thread-23] info  c.a.s.messagesubmittedopaquetransspout - emitpartitionbatch, startid: 100, endid: 150, max id: 2902288, transactionattempt: 3:0, partition: 0
10699 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - opening jdbc connection
10699 [thread-23] debug o.a.i.d.pooled.pooleddatasource - checked out connection 19739814 pool.
10699 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - setting autocommit false on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10853 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ooo using connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10854 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ==>  preparing: select sum(f.messages_submitted) messages_submitted, sum(f.messages_failed) messages_failed, fb.booking_id,f.classification_id,f.submission_account_id, f.timestamp ( select count(if(msisdn_status.status >= 0, 1, null)) messages_submitted, count(if(msisdn_status.status < 0, 1, null)) messages_failed, msisdn_status.flight_id, date(msisdn_status.timestamp) timestamp, msisdn_status.classification_id, msisdn_status.submission_account_id msisdn_status ( msisdn_status.id > ? , msisdn_status.id <= ? , msisdn_status.timestamp not null) group flight_id, timestamp, classification_id, submission_account_id ) f left join jupiter.flight_reference fr on f.flight_id = fr.flight_id left join jupiterbs.flightboard fb on fb.flight_reference = fr.flight_reference group f.timestamp, fb.booking_id, f.classification_id, f.submission_account_id;
10854 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ==> parameters: 100(long), 150(long)
11036 [thread-23] info  c.archermobile.storm.statusmanager -  submitted list [msisdnstatus{29, 2011-01-28 00:00:00.0, -1, -1, 39, 0}, msisdnstatus{29, 2011-01-28 00:00:00.0, 1, -1, 0, 4}, msisdnstatus{29, 2011-01-28 00:00:00.0, 2, -1, 0, 1}, msisdnstatus{29, 2011-01-28 00:00:00.0, 11, -1, 4, 2}]
11036 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - resetting autocommit true on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
11188 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - closing jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
11189 [thread-23] debug o.a.i.d.pooled.pooleddatasource - returned connection 19739814 pool.
11189 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - opening jdbc connection
11189 [thread-23] debug o.a.i.d.pooled.pooleddatasource - checked out connection 19739814 pool.
11189 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - setting autocommit false on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
11190 [thread-21] info  c.a.storm.utils.printfilter - success submitted - start:  > [29, 0]
11190 [thread-21] info  c.a.storm.utils.printfilter - success submitted - start:  > [29, 4]
11190 [thread-21] info  c.a.storm.utils.printfilter - success submitted - start:  > [29, 1]
11190 [thread-21] info  c.a.storm.utils.printfilter - success submitted - start:  > [29, 2]
11340 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ooo using connection [com.mysql.jdbc.jdbc4connection@12d34a6]
11341 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==>  preparing: select max(id) max_id msisdn_status
11341 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==> parameters:
11494 [thread-23] info  c.archermobile.storm.statusmanager - max id: 2902288
11494 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - resetting autocommit true on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
11646 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - closing jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
11646 [thread-23] debug o.a.i.d.pooled.pooleddatasource - returned connection 19739814 pool.
..............

At the beginning of the logs there is an additional warning like this:

0    [nioservercxn.factory:0.0.0.0/0.0.0.0:2000] warn  org.apache.zookeeper.server.nioservercnxn  - endofstreamexception: unable read additional data client sessionid 0x13fcd101957000d, client has closed socket 

Could my problem be connected with this warning? What am I doing wrong? Or maybe there is another way to store the result in a database?

