python - Make database I/O reliable in a multithreaded program


First, apologies for posting an entire block of code. I think it's important for being able to answer this (broad) question.

I have an application that collects data (eventually from an external source; randomly generated strings for now) and logs it to a MySQL database in a single-column table.

There are two threads plus the main thread. One thread collects the data (sim_collectdata()) and puts it in a queue. The second thread takes data from the queue and puts it in the database (logdata()).

My question concerns the robustness and error handling of the database I/O. This is going to be running in a remote location with minimal human interaction for extended periods of time, and it needs to be pretty robust.

What can I do to ensure that database issues don't cause the whole thing to fail? It's not a huge deal if a record or two is lost occasionally, but it needs to be able to recover. The MySQL database server is running locally, so I don't think I need to be terribly worried about losing the connection (is that a valid assumption?). What about issues related to failed writes?
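One common way to survive transient write failures is to wrap the insert in a retry loop with exponential backoff and, if it still fails, hand the record to a fallback (for example, re-queue it). Here is a minimal, database-agnostic sketch; the function name `write_with_retry` and its parameters are made up for illustration, not part of any library:

```python
import time


def write_with_retry(write, record, retries=3, delay=0.1, on_give_up=None):
    """Attempt write(record); on failure, retry with exponential backoff.

    write:      callable that performs the actual DB insert (may raise)
    on_give_up: optional callback invoked with the record after the last
                failed attempt, e.g. to re-queue it or spool it to disk
    """
    for attempt in range(retries):
        try:
            write(record)
            return True
        except Exception:
            # Back off before the next attempt: delay, 2*delay, 4*delay, ...
            time.sleep(delay * (2 ** attempt))
    if on_give_up is not None:
        on_give_up(record)  # e.g. input_queue.put(record)
    return False
```

In your `logdata()` loop, `write` would be a small function that does the `cur.execute(...)` (reconnecting first if needed), and `on_give_up` could put the item back on `input_queue`.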

I'm also curious: does it make sense for the logging thread to start and control the DB connection, or should it be created in the main thread and passed to the logging thread as necessary?
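For what it's worth, creating the connection inside the logging thread keeps all database state owned by a single thread, which sidesteps any question of sharing a connection across threads (some drivers, like sqlite3, even require this). A minimal sketch of that shape, using sqlite3 as a stand-in for MySQL and Python 3 module names:

```python
import queue
import sqlite3
import threading


def logdata(input_queue, db_path):
    # The connection is created, used, and closed entirely inside this
    # thread; no other thread ever touches it.
    con = sqlite3.connect(db_path)
    con.execute("CREATE TABLE IF NOT EXISTS example (data TEXT)")
    while True:
        d = input_queue.get()
        if d is None:            # sentinel: shut down cleanly
            input_queue.task_done()
            con.close()
            return
        with con:                # commits on success, rolls back on error
            con.execute("INSERT INTO example(data) VALUES (?)", (d,))
        input_queue.task_done()
```

The main thread then only needs `threading.Thread(target=logdata, args=(q, "log.db")).start()` and never holds a connection object itself.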

```python
import random
import string
import sys
import threading
import time
import Queue

import MySQLdb as mdb


def sim_collectdata(input_queue, stop_event):
    '''Provides output simulating the serial
    data from the data logging hardware.
    '''
    while not stop_event.is_set():
        # Generate a random string
        lst = [random.choice(string.ascii_letters) for n in xrange(random.randint(15, 40))]
        data = "".join(lst)

        input_queue.put("data:" + data)

        # Wait a random time
        stop_event.wait(random.randint(1, 5))

    input_queue.put(None)  # send a signal telling the logging thread we're done
    print "[Collection thread] Terminated data collection."
    return


def logdata(input_queue):
    # Make the database connection
    try:
        con = mdb.connect('localhost', 'database_user', 'password', 'database_name')
        cur = con.cursor()
    except mdb.Error, e:
        print "Error %d: %s" % (e.args[0], e.args[1])
        sys.exit(1)

    # Log the data
    while True:
        d = input_queue.get()
        if d is None:
            input_queue.task_done()
            print "[Logging thread] Finished logging."
            con.close()
            return

        if d.startswith("data:"):
            data = d[5:]  # remove the 'data:' prefix
            with con:
                cur = con.cursor(mdb.cursors.DictCursor)
                # Parameterized query, so the data is escaped properly
                cur.execute("INSERT INTO example(data) VALUES(%s)", (data,))
                input_queue.task_done()


def main():
    input_queue = Queue.Queue()

    # Used to signal thread termination
    stop_event = threading.Event()

    # Start the data collection and logging threads
    print "[Main] Starting data collection thread..."
    collection_thread = threading.Thread(target=sim_collectdata, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "[Main] Starting logging thread..."
    logging_thread = threading.Thread(target=logdata, args=(input_queue,))
    logging_thread.start()
    print "Done."

    # Listen for keyboard interrupts
    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # Stop data collection. Let the logging thread finish logging
        # whatever is still in the queue.
        stop_event.set()
        collection_thread.join()
        logging_thread.join()

main()
```
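Since an occasional lost record is tolerable but recovery matters, one further option is to spool records that could not be written to a local file and replay them when the database is healthy again. A rough sketch of that idea; the function names and the file format (one JSON value per line) are made up for illustration:

```python
import json


def spool_failed(record, spool_path):
    """Append a record that could not be written to a local spool file."""
    with open(spool_path, "a") as f:
        f.write(json.dumps(record) + "\n")


def replay_spool(spool_path, write):
    """Try to re-insert spooled records; keep only the ones that still fail."""
    try:
        with open(spool_path) as f:
            records = [json.loads(line) for line in f]
    except IOError:
        return  # nothing spooled yet
    remaining = []
    for rec in records:
        try:
            write(rec)
        except Exception:
            remaining.append(rec)
    with open(spool_path, "w") as f:
        for rec in remaining:
            f.write(json.dumps(rec) + "\n")
```

The logging thread could call `replay_spool()` periodically (or at startup), so a brief database outage costs nothing but latency rather than data.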

