I have a problem updating state in a Trident topology (local mode). I want to write the result to a database.
I run the topology this way:
TridentTopology topology = new TridentTopology();
StateFactory stateFactoryDb = new LocationDBFactory();
MessageSubmittedOpaqueTransSpout spout = new MessageSubmittedOpaqueTransSpout();

// Single-threaded source stream fed by the opaque transactional spout.
Stream globalSubmittedMessageStream = topology
        .newStream("globalsubmittedmessagestream", spout)
        .parallelismHint(1);

// Filter the tuples, print them, then persist them through the state updater.
globalSubmittedMessageStream
        .project(new Fields("stormtxid", "bookingid", "timestamp", "messagessubmitted", "messagesfailed"))
        .each(new Fields("bookingid"), new FilterNotZero())
        .each(new Fields("timestamp"), new FilterNull())
        .each(new Fields("bookingid", "messagessubmitted"), new PrintFilter("success submitted - start: "))
        .partitionPersist(
                stateFactoryDb,
                new Fields("stormtxid", "bookingid", "timestamp", "messagesfailed", "messagessubmitted"),
                new LocationDBStateUpdater(),
                new Fields("test"));
The data is correctly fetched from the database (by MessageSubmittedOpaqueTransSpout) and emitted through each filter, but only the data from the first transaction is saved by the state created by LocationDBFactory in partitionPersist. None of the later transactions are updated.
The code of the updateState method in the LocationDBStateUpdater class:
public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
    // Log every tuple in the batch before handing it to the state.
    for (TridentTuple tuple : tuples) {
        log.debug("^%*&^$&^#&%$@#^%$#^%$*&^%(&^%(&^%(&%(&%& > {} ", tuple);
    }
    state.save(tuples, collector);
}
The code of the LocationDB class:
@Override
public void beginCommit(Long txid) {
    log.info("begincommit...... txid: {}", txid);
}

@Override
public void commit(Long txid) {
    log.info("commit...... txid: {}", txid);
}

public void save(List<TridentTuple> tuples, TridentCollector collector) {
    log.info("save ......... start");
    for (TridentTuple tuple : tuples) {
        log.info("#@#@@@@@@@@@@@@@@@@@@@@@@@@@ test tuple: {}", tuple);
    }
    // Emit one "true" value per input tuple into the new-values stream ("test").
    for (TridentTuple t : tuples) {
        collector.emit(new Values("true"));
    }
    log.info("save ......... end");
}
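To make clear what I expect to see in the logs for every batch, here is a minimal sketch that runs without Storm. SketchState, RecordingState, and CommitLifecycleSketch are hypothetical stand-ins I made up for illustration, not Storm classes. The sketch only assumes that Trident calls beginCommit, then the state updater, then commit, once per batch and in that order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Trident's State interface; not the real Storm type.
interface SketchState {
    void beginCommit(Long txid);
    void commit(Long txid);
}

// Stand-in for LocationDB that records the calls it receives instead of logging them.
class RecordingState implements SketchState {
    final List<String> calls = new ArrayList<>();

    @Override
    public void beginCommit(Long txid) {
        calls.add("beginCommit " + txid);
    }

    @Override
    public void commit(Long txid) {
        calls.add("commit " + txid);
    }

    // Plays the role of save(tuples, collector); only the batch size is recorded.
    void save(List<String> tuples) {
        calls.add("save " + tuples.size());
    }
}

class CommitLifecycleSketch {
    // Drives the state the way I expect each batch to be handled:
    // beginCommit, then the updater (save), then commit, once per txid.
    static List<String> run(List<List<String>> batches) {
        RecordingState state = new RecordingState();
        long txid = 1;
        for (List<String> batch : batches) {
            state.beginCommit(txid);
            state.save(batch);
            state.commit(txid);
            txid++;
        }
        return state.calls;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b", "c", "d", "e", "f"), // 6 tuples, as in txid 1
                Arrays.asList("g", "h", "i", "j"));          // 4 tuples, as in txid 2
        for (String call : run(batches)) {
            System.out.println(call);
        }
    }
}
```

In my real topology I only ever get the first three of these calls (for txid 1); the calls for txid 2 and later never show up.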
Logs:
6056 [thread-10] info backtype.storm.daemon.worker - worker f818dc1b-0f34-43f8-b70a-a12afaef9d88 storm test-topology-1-1373534952 on cbe0de9a-4cef-483f-bc11-77a15b0ad6f9:4 has finished loading
6073 [thread-23] info c.n.c.f.imps.curatorframeworkimpl - starting
0 [nioservercxn.factory:0.0.0.0/0.0.0.0:2000] warn org.apache.zookeeper.server.nioservercnxn - endofstreamexception: unable read additional data client sessionid 0x13fcd101957000d, client has closed socket
6098 [thread-23] info backtype.storm.daemon.executor - prepared bolt spout0:(5)
6098 [thread-17] info backtype.storm.daemon.executor - prepared bolt $spoutcoord-spout0:(2)
6106 [thread-25] info c.n.c.f.imps.curatorframeworkimpl - starting
32 [nioservercxn.factory:0.0.0.0/0.0.0.0:2000] warn org.apache.zookeeper.server.nioservercnxn - endofstreamexception: unable read additional data client sessionid 0x13fcd101957000f, client has closed socket
6120 [thread-25] info c.a.s.messagesubmittedopaquetransspout - getcoordinator !!!
6120 [thread-25] info backtype.storm.daemon.executor - opened spout $mastercoord-bg0:(1)
6122 [thread-25] info backtype.storm.daemon.executor - activating spout $mastercoord-bg0:(1)
6122 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 1 !!!
6225 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - opening jdbc connection
6237 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 2 !!!
6288 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 3 !!!
6341 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 4 !!!
6460 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 5 !!!
6511 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 6 !!!
6562 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 7 !!!
6670 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 8 !!!
6725 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 9 !!!
6776 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 10 !!!
6828 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 11 !!!
6879 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 12 !!!
6931 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 13 !!!
6982 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 14 !!!
7034 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 15 !!!
7085 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 16 !!!
7136 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 17 !!!
7187 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 18 !!!
7238 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 19 !!!
7289 [thread-25] info c.a.s.messagesubmittedopaquetransspout - coordinator - isready method, txid: 20 !!!
8238 [thread-23] debug o.a.i.d.pooled.pooleddatasource - created connection 19739814.
8391 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ooo using connection [com.mysql.jdbc.jdbc4connection@12d34a6]
8392 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==> preparing: select max(id) max_id msisdn_status
8446 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==> parameters:
8619 [thread-23] info c.archermobile.storm.statusmanager - max id: 2902288
8620 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - resetting autocommit true on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
8773 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - closing jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
8773 [thread-23] debug o.a.i.d.pooled.pooleddatasource - returned connection 19739814 pool.
8774 [thread-23] info c.a.s.messagesubmittedopaquetransspout - emitpartitionbatch, startid: 0, endid: 50, max id: 2902288, transactionattempt: 1:0, partition: 0
8779 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - opening jdbc connection
8779 [thread-23] debug o.a.i.d.pooled.pooleddatasource - checked out connection 19739814 pool.
8780 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - setting autocommit false on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
8931 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ooo using connection [com.mysql.jdbc.jdbc4connection@12d34a6]
8931 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ==> preparing: select sum(f.messages_submitted) messages_submitted, sum(f.messages_failed) messages_failed, fb.booking_id,f.classification_id,f.submission_account_id, f.timestamp ( select count(if(msisdn_status.status >= 0, 1, null)) messages_submitted, count(if(msisdn_status.status < 0, 1, null)) messages_failed, msisdn_status.flight_id, date(msisdn_status.timestamp) timestamp, msisdn_status.classification_id, msisdn_status.submission_account_id msisdn_status ( msisdn_status.id > ? , msisdn_status.id <= ? , msisdn_status.timestamp not null) group flight_id, timestamp, classification_id, submission_account_id ) f left join jupiter.flight_reference fr on f.flight_id = fr.flight_id left join jupiterbs.flightboard fb on fb.flight_reference = fr.flight_reference group f.timestamp, fb.booking_id, f.classification_id, f.submission_account_id;
8932 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ==> parameters: 0(long), 50(long)
9141 [thread-23] info c.archermobile.storm.statusmanager - submitted list [msisdnstatus{29, 2011-01-27 00:00:00.0, -1, -1, 6, 0}, msisdnstatus{29, 2011-01-27 00:00:00.0, 1, -1, 0, 7}, msisdnstatus{29, 2011-01-27 00:00:00.0, 2, -1, 0, 2}, msisdnstatus{29, 2011-01-28 00:00:00.0, -1, -1, 23, 0}, msisdnstatus{29, 2011-01-28 00:00:00.0, 1, -1, 0, 8}, msisdnstatus{29, 2011-01-28 00:00:00.0, 2, -1, 0, 4}]
9141 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - resetting autocommit true on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9292 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - closing jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9292 [thread-23] debug o.a.i.d.pooled.pooleddatasource - returned connection 19739814 pool.
9295 [thread-21] info c.a.storm.utils.printfilter - success submitted - start: > [29, 0]
9295 [thread-21] info c.a.storm.utils.printfilter - success submitted - start: > [29, 7]
9295 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - opening jdbc connection
9295 [thread-21] info c.a.storm.utils.printfilter - success submitted - start: > [29, 2]
9296 [thread-23] debug o.a.i.d.pooled.pooleddatasource - checked out connection 19739814 pool.
9296 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - setting autocommit false on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9296 [thread-21] info c.a.storm.utils.printfilter - success submitted - start: > [29, 0]
9296 [thread-21] info c.a.storm.utils.printfilter - success submitted - start: > [29, 8]
9296 [thread-21] info c.a.storm.utils.printfilter - success submitted - start: > [29, 4]
9298 [thread-21] info c.archermobile.storm.db.locationdb - begincommit...... txid: 1
9298 [thread-21] debug c.a.storm.db.locationdbstateupdater - ^%*&^$&^#&%$@#^%$#^%$*&^%(&^%(&^%(&%(&%& > [1, 29, 2011-01-27 00:00:00.0, 6, 0]
9299 [thread-21] debug c.a.storm.db.locationdbstateupdater - ^%*&^$&^#&%$@#^%$#^%$*&^%(&^%(&^%(&%(&%& > [1, 29, 2011-01-27 00:00:00.0, 0, 7]
9299 [thread-21] debug c.a.storm.db.locationdbstateupdater - ^%*&^$&^#&%$@#^%$#^%$*&^%(&^%(&^%(&%(&%& > [1, 29, 2011-01-27 00:00:00.0, 0, 2]
9299 [thread-21] debug c.a.storm.db.locationdbstateupdater - ^%*&^$&^#&%$@#^%$#^%$*&^%(&^%(&^%(&%(&%& > [1, 29, 2011-01-28 00:00:00.0, 23, 0]
9299 [thread-21] debug c.a.storm.db.locationdbstateupdater - ^%*&^$&^#&%$@#^%$#^%$*&^%(&^%(&^%(&%(&%& > [1, 29, 2011-01-28 00:00:00.0, 0, 8]
9299 [thread-21] debug c.a.storm.db.locationdbstateupdater - ^%*&^$&^#&%$@#^%$#^%$*&^%(&^%(&^%(&%(&%& > [1, 29, 2011-01-28 00:00:00.0, 0, 4]
9299 [thread-21] info c.archermobile.storm.db.locationdb - save ......... start
9300 [thread-21] info c.archermobile.storm.db.locationdb - #@#@@@@@@@@@@@@@@@@@@@@@@@@@ test tuple: [1, 29, 2011-01-27 00:00:00.0, 6, 0]
9300 [thread-21] info c.archermobile.storm.db.locationdb - #@#@@@@@@@@@@@@@@@@@@@@@@@@@ test tuple: [1, 29, 2011-01-27 00:00:00.0, 0, 7]
9300 [thread-21] info c.archermobile.storm.db.locationdb - #@#@@@@@@@@@@@@@@@@@@@@@@@@@ test tuple: [1, 29, 2011-01-27 00:00:00.0, 0, 2]
9300 [thread-21] info c.archermobile.storm.db.locationdb - #@#@@@@@@@@@@@@@@@@@@@@@@@@@ test tuple: [1, 29, 2011-01-28 00:00:00.0, 23, 0]
9300 [thread-21] info c.archermobile.storm.db.locationdb - #@#@@@@@@@@@@@@@@@@@@@@@@@@@ test tuple: [1, 29, 2011-01-28 00:00:00.0, 0, 8]
9300 [thread-21] info c.archermobile.storm.db.locationdb - #@#@@@@@@@@@@@@@@@@@@@@@@@@@ test tuple: [1, 29, 2011-01-28 00:00:00.0, 0, 4]
9301 [thread-21] info c.archermobile.storm.db.locationdb - save ......... end
9301 [thread-21] info c.archermobile.storm.db.locationdb - commit...... txid: 1
9447 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ooo using connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9447 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==> preparing: select max(id) max_id msisdn_status
9447 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==> parameters:
9599 [thread-23] info c.archermobile.storm.statusmanager - max id: 2902288
9599 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - resetting autocommit true on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9750 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - closing jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9750 [thread-23] debug o.a.i.d.pooled.pooleddatasource - returned connection 19739814 pool.
9751 [thread-23] info c.a.s.messagesubmittedopaquetransspout - emitpartitionbatch, startid: 50, endid: 100, max id: 2902288, transactionattempt: 2:0, partition: 0
9751 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - opening jdbc connection
9751 [thread-23] debug o.a.i.d.pooled.pooleddatasource - checked out connection 19739814 pool.
9751 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - setting autocommit false on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9902 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ooo using connection [com.mysql.jdbc.jdbc4connection@12d34a6]
9902 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ==> preparing: select sum(f.messages_submitted) messages_submitted, sum(f.messages_failed) messages_failed, fb.booking_id,f.classification_id,f.submission_account_id, f.timestamp ( select count(if(msisdn_status.status >= 0, 1, null)) messages_submitted, count(if(msisdn_status.status < 0, 1, null)) messages_failed, msisdn_status.flight_id, date(msisdn_status.timestamp) timestamp, msisdn_status.classification_id, msisdn_status.submission_account_id msisdn_status ( msisdn_status.id > ? , msisdn_status.id <= ? , msisdn_status.timestamp not null) group flight_id, timestamp, classification_id, submission_account_id ) f left join jupiter.flight_reference fr on f.flight_id = fr.flight_id left join jupiterbs.flightboard fb on fb.flight_reference = fr.flight_reference group f.timestamp, fb.booking_id, f.classification_id, f.submission_account_id;
9903 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ==> parameters: 50(long), 100(long)
10087 [thread-23] info c.archermobile.storm.statusmanager - submitted list [msisdnstatus{29, 2011-01-28 00:00:00.0, -1, -1, 35, 0}, msisdnstatus{29, 2011-01-28 00:00:00.0, 1, -1, 0, 6}, msisdnstatus{29, 2011-01-28 00:00:00.0, 2, -1, 0, 5}, msisdnstatus{29, 2011-01-28 00:00:00.0, 11, -1, 4, 0}]
10087 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - resetting autocommit true on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10238 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - closing jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10238 [thread-23] debug o.a.i.d.pooled.pooleddatasource - returned connection 19739814 pool.
10239 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - opening jdbc connection
10239 [thread-21] info c.a.storm.utils.printfilter - success submitted - start: > [29, 0]
10239 [thread-23] debug o.a.i.d.pooled.pooleddatasource - checked out connection 19739814 pool.
10239 [thread-21] info c.a.storm.utils.printfilter - success submitted - start: > [29, 6]
10239 [thread-21] info c.a.storm.utils.printfilter - success submitted - start: > [29, 5]
10239 [thread-21] info c.a.storm.utils.printfilter - success submitted - start: > [29, 0]
10239 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - setting autocommit false on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10393 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ooo using connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10393 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==> preparing: select max(id) max_id msisdn_status
10393 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==> parameters:
10546 [thread-23] info c.archermobile.storm.statusmanager - max id: 2902288
10546 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - resetting autocommit true on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10698 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - closing jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10699 [thread-23] debug o.a.i.d.pooled.pooleddatasource - returned connection 19739814 pool.
10699 [thread-23] info c.a.s.messagesubmittedopaquetransspout - emitpartitionbatch, startid: 100, endid: 150, max id: 2902288, transactionattempt: 3:0, partition: 0
10699 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - opening jdbc connection
10699 [thread-23] debug o.a.i.d.pooled.pooleddatasource - checked out connection 19739814 pool.
10699 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - setting autocommit false on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10853 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ooo using connection [com.mysql.jdbc.jdbc4connection@12d34a6]
10854 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ==> preparing: select sum(f.messages_submitted) messages_submitted, sum(f.messages_failed) messages_failed, fb.booking_id,f.classification_id,f.submission_account_id, f.timestamp ( select count(if(msisdn_status.status >= 0, 1, null)) messages_submitted, count(if(msisdn_status.status < 0, 1, null)) messages_failed, msisdn_status.flight_id, date(msisdn_status.timestamp) timestamp, msisdn_status.classification_id, msisdn_status.submission_account_id msisdn_status ( msisdn_status.id > ? , msisdn_status.id <= ? , msisdn_status.timestamp not null) group flight_id, timestamp, classification_id, submission_account_id ) f left join jupiter.flight_reference fr on f.flight_id = fr.flight_id left join jupiterbs.flightboard fb on fb.flight_reference = fr.flight_reference group f.timestamp, fb.booking_id, f.classification_id, f.submission_account_id;
10854 [thread-23] debug c.a.s.d.j.getsubmittedstatistics - ==> parameters: 100(long), 150(long)
11036 [thread-23] info c.archermobile.storm.statusmanager - submitted list [msisdnstatus{29, 2011-01-28 00:00:00.0, -1, -1, 39, 0}, msisdnstatus{29, 2011-01-28 00:00:00.0, 1, -1, 0, 4}, msisdnstatus{29, 2011-01-28 00:00:00.0, 2, -1, 0, 1}, msisdnstatus{29, 2011-01-28 00:00:00.0, 11, -1, 4, 2}]
11036 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - resetting autocommit true on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
11188 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - closing jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
11189 [thread-23] debug o.a.i.d.pooled.pooleddatasource - returned connection 19739814 pool.
11189 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - opening jdbc connection
11189 [thread-23] debug o.a.i.d.pooled.pooleddatasource - checked out connection 19739814 pool.
11189 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - setting autocommit false on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
11190 [thread-21] info c.a.storm.utils.printfilter - success submitted - start: > [29, 0]
11190 [thread-21] info c.a.storm.utils.printfilter - success submitted - start: > [29, 4]
11190 [thread-21] info c.a.storm.utils.printfilter - success submitted - start: > [29, 1]
11190 [thread-21] info c.a.storm.utils.printfilter - success submitted - start: > [29, 2]
11340 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ooo using connection [com.mysql.jdbc.jdbc4connection@12d34a6]
11341 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==> preparing: select max(id) max_id msisdn_status
11341 [thread-23] debug c.a.s.d.j.getmaxsubmittedid - ==> parameters:
11494 [thread-23] info c.archermobile.storm.statusmanager - max id: 2902288
11494 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - resetting autocommit true on jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
11646 [thread-23] debug o.a.i.t.jdbc.jdbctransaction - closing jdbc connection [com.mysql.jdbc.jdbc4connection@12d34a6]
11646 [thread-23] debug o.a.i.d.pooled.pooleddatasource - returned connection 19739814 pool.
..............
At the beginning of the logs I can see an additional warning like this:
0 [nioservercxn.factory:0.0.0.0/0.0.0.0:2000] warn org.apache.zookeeper.server.nioservercnxn - endofstreamexception: unable read additional data client sessionid 0x13fcd101957000d, client has closed socket
Could my problem be connected with this warning? What am I doing wrong? Or maybe there is another way to store the result in the database?