[hg] galaxy 1581: Purge metadata files associated with a dataset...

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[hg] galaxy 1581: Purge metadata files associated with a dataset...

Greg Von Kuster
details:   http://www.bx.psu.edu/hg/galaxy/rev/4841a9e393c7
changeset: 1581:4841a9e393c7
user:      Greg Von Kuster <[hidden email]>
date:      Tue Oct 28 14:31:02 2008 -0400
description:
Purge metadata files associated with a dataset when the dataset is purged.  Also, since log.exception already logs the exception itself, removed the redundant exception text from the log.exception messages in lib/galaxy/jobs/__init__.py.

2 file(s) affected in this change:

lib/galaxy/jobs/__init__.py
scripts/cleanup_datasets/cleanup_datasets.py

diffs (279 lines):

diff -r 91f6455e19e4 -r 4841a9e393c7 lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py Tue Oct 28 12:57:39 2008 -0400
+++ b/lib/galaxy/jobs/__init__.py Tue Oct 28 14:31:02 2008 -0400
@@ -62,7 +62,7 @@
                 else :
                     self.use_policy = False
                     log.info("Scheduler policy not defined as expected, defaulting to FIFO")
-            except AttributeError, detail : # try may throw AttributeError
+            except AttributeError, detail: # try may throw AttributeError
                 self.use_policy = False
                 log.exception("Error while loading scheduler policy class, defaulting to FIFO")
         else :
@@ -117,8 +117,8 @@
         while self.running:
             try:
                 self.monitor_step()
-            except Exception, e:
-                log.exception( "Exception in monitor_step: %s" % str( e ) )
+            except:
+                log.exception( "Exception in monitor_step" )
             # Sleep
             self.sleeper.sleep( 1 )
 
@@ -184,9 +184,8 @@
                     job.info = msg
                     log.error( msg )
             except Exception, e:
-                msg = "failure running job %d: %s" % ( job.job_id, str( e ) )
-                job.info = msg
-                log.exception( msg )
+                job.info = "failure running job %d: %s" % ( job.job_id, str( e ) )
+                log.exception( "failure running job %d" % job.job_id )
         # Update the waiting list
         self.waiting = new_waiting
         # If special (e.g. fair) scheduling is enabled, dispatch all jobs
@@ -201,9 +200,8 @@
                     # squeue is empty, so stop dispatching
                     break
                 except Exception, e: # if something else breaks while dispatching
-                    msg = "failure running job %d: %s" % ( sjob.job_id, str( e ) )
-                    job.fail( msg )
-                    log.exception( msg )
+                    job.fail( "failure running job %d: %s" % ( sjob.job_id, str( e ) ) )
+                    log.exception( "failure running job %d" % sjob.job_id )
             
     def put( self, job_id, tool ):
         """Add a job to the queue (by job identifier)"""
@@ -473,8 +471,8 @@
                 os.remove( fname )
             if self.working_directory is not None:
                 os.rmdir( self.working_directory )
-        except Exception, e:
-            log.exception( "Unable to cleanup job %s, exception: %s" % ( str( self.job_id ), str( e ) ) )
+        except:
+            log.exception( "Unable to cleanup job %d" % self.job_id )
         
     def get_command_line( self ):
         return self.command_line
@@ -573,8 +571,8 @@
         while self.running:
             try:
                 self.monitor_step()
-            except Exception, e:
-                log.exception( "Exception in monitor_step: %s" % str( e ) )
+            except:
+                log.exception( "Exception in monitor_step" )
             # Sleep
             self.sleeper.sleep( 1 )
 
diff -r 91f6455e19e4 -r 4841a9e393c7 scripts/cleanup_datasets/cleanup_datasets.py
--- a/scripts/cleanup_datasets/cleanup_datasets.py Tue Oct 28 12:57:39 2008 -0400
+++ b/scripts/cleanup_datasets/cleanup_datasets.py Tue Oct 28 14:31:02 2008 -0400
@@ -47,6 +47,7 @@
     app = CleanupDatasetsApplication( database_connection=database_connection, file_path=file_path )
     h = app.model.History
     d = app.model.Dataset
+    m = app.model.MetadataFile
     cutoff_time = datetime.utcnow() - timedelta( days=options.days )
     now = strftime( "%Y-%m-%d %H:%M:%S" )
 
@@ -63,7 +64,7 @@
             print "# Datasets will be removed from disk...\n"
         else:
             print "# Datasets will NOT be removed from disk...\n"
-        purge_histories( h, d, cutoff_time, options.remove_from_disk )
+        purge_histories( h, d, m, cutoff_time, options.remove_from_disk )
     elif options.info_purge_datasets:
         info_purge_datasets( d, cutoff_time )
     elif options.purge_datasets:
@@ -71,7 +72,7 @@
             print "# Datasets will be removed from disk...\n"
         else:
             print "# Datasets will NOT be removed from disk...\n"
-        purge_datasets( d, cutoff_time, options.remove_from_disk )
+        purge_datasets( d, m, cutoff_time, options.remove_from_disk )
     sys.exit(0)
 
 def info_delete_userless_histories( h, cutoff_time ):
@@ -79,7 +80,7 @@
     history_count = 0
     dataset_count = 0
     where = ( h.table.c.user_id==None ) & ( h.table.c.deleted=='f' ) & ( h.table.c.update_time < cutoff_time )
-    histories = h.query().filter( where ).options( eagerload( 'active_datasets' ) )
+    histories = h.query().filter( where ).options( eagerload( 'active_datasets' ) ).all()
 
     print '# The following datasets and associated userless histories will be deleted'
     start = time.clock()
@@ -105,13 +106,13 @@
 
     print '# The following datasets and associated userless histories have been deleted'
     start = time.clock()
-    histories = h.query().filter( h_where ).options( eagerload( 'active_datasets' ) )
+    histories = h.query().filter( h_where ).options( eagerload( 'active_datasets' ) ).all()
     for history in histories:
         for dataset_assoc in history.active_datasets:
             if not dataset_assoc.deleted:
                 # Mark all datasets as deleted
                 d_where = ( d.table.c.id==dataset_assoc.dataset_id )
-                datasets = d.query().filter( d_where )
+                datasets = d.query().filter( d_where ).all()
                 for dataset in datasets:
                     if not dataset.deleted:
                         dataset.deleted = True
@@ -139,13 +140,13 @@
 
     print '# The following datasets and associated deleted histories will be purged'
     start = time.clock()
-    histories = h.query().filter( h_where ).options( eagerload( 'datasets' ) )  
+    histories = h.query().filter( h_where ).options( eagerload( 'datasets' ) ).all()
     for history in histories:
         for dataset_assoc in history.datasets:
             # Datasets can only be purged if their HistoryDatasetAssociation has been deleted.
             if dataset_assoc.deleted:
                 d_where = ( d.table.c.id==dataset_assoc.dataset_id )
-                datasets = d.query().filter( d_where )
+                datasets = d.query().filter( d_where ).all()
                 for dataset in datasets:
                     if dataset.purgable and not dataset.purged:
                         print "%s" % dataset.file_name
@@ -160,7 +161,7 @@
     print '# %d histories ( including a total of %d datasets ) will be purged.  Freed disk space: ' %( history_count, dataset_count ), disk_space, '\n'
     print "Elapsed time: ", stop - start, "\n"
 
-def purge_histories( h, d, cutoff_time, remove_from_disk ):
+def purge_histories( h, d, m, cutoff_time, remove_from_disk ):
     # Purges deleted histories whose update_time is older than the cutoff_time.
     # The datasets associated with each history are also purged.
     history_count = 0
@@ -172,13 +173,13 @@
 
     print '# The following datasets and associated deleted histories have been purged'
     start = time.clock()
-    histories = h.query().filter( h_where ).options( eagerload( 'datasets' ) )      
+    histories = h.query().filter( h_where ).options( eagerload( 'datasets' ) ).all()    
     for history in histories:
         errors = False
         for dataset_assoc in history.datasets:
             if dataset_assoc.deleted:
                 d_where = ( d.table.c.id==dataset_assoc.dataset_id )
-                datasets = d.query().filter( d_where )
+                datasets = d.query().filter( d_where ).all()
                 for dataset in datasets:
                     if dataset.purgable and not dataset.purged:
                         file_size = dataset.file_size
@@ -186,7 +187,7 @@
                         dataset.file_size = 0
                         if remove_from_disk:
                             dataset.flush()
-                            errmsg = purge_dataset( dataset )
+                            errmsg = purge_dataset( dataset, m )
                             if errmsg:
                                 errors = True
                                 print errmsg
@@ -196,6 +197,14 @@
                             dataset.purged = True
                             dataset.flush()
                             print "%s" % dataset.file_name
+                            # Mark all associated MetadataFiles as deleted and purged
+                            print "The following metadata files associated with dataset '%s' have been marked purged" % dataset.file_name
+                            for hda in dataset.history_associations:
+                                for metadata_file in m.filter( m.table.c.hda_id==hda.id ).all():
+                                    metadata_file.deleted = True
+                                    metadata_file.purged = True
+                                    metadata_file.flush()
+                                    print "%s" % metadata_file.file_name()
                         dataset_count += 1
                         try:
                             disk_space += file_size
@@ -218,7 +227,7 @@
 
     print '# The following deleted datasets will be purged'    
     start = time.clock()
-    datasets = d.query().filter( where )
+    datasets = d.query().filter( where ).all()
     for dataset in datasets:
         print "%s" % dataset.file_name
         dataset_count += 1
@@ -230,7 +239,7 @@
     print '# %d datasets will be purged.  Freed disk space: ' %dataset_count, disk_space, '\n'
     print "Elapsed time: ", stop - start, "\n"
 
-def purge_datasets( d, cutoff_time, remove_from_disk ):
+def purge_datasets( d, m, cutoff_time, remove_from_disk ):
     # Purges deleted datasets whose update_time is older than cutoff_time.  Files may or may
     # not be removed from disk.
     dataset_count = 0
@@ -240,11 +249,11 @@
 
     print '# The following deleted datasets have been purged'
     start = time.clock()
-    datasets = d.query().filter( where )
+    datasets = d.query().filter( where ).all()
     for dataset in datasets:
         file_size = dataset.file_size
         if remove_from_disk:
-            errmsg = purge_dataset( dataset )
+            errmsg = purge_dataset( dataset, m )
             if errmsg:
                print errmsg
             else:
@@ -255,6 +264,14 @@
             dataset.file_size = 0
             dataset.flush()
             print "%s" % dataset.file_name
+            # Mark all associated MetadataFiles as deleted and purged
+            print "The following metadata files associated with dataset '%s' have been marked purged" % dataset.file_name
+            for hda in dataset.history_associations:
+                for metadata_file in m.filter( m.table.c.hda_id==hda.id ).all():
+                    metadata_file.deleted = True
+                    metadata_file.purged = True
+                    metadata_file.flush()
+                    print "%s" % metadata_file.file_name()
             dataset_count += 1
         try:
             disk_space += file_size
@@ -266,11 +283,10 @@
         print '# Freed disk space: ', disk_space, '\n'
     print "Elapsed time: ", stop - start, "\n"
 
-def purge_dataset( dataset ):
+def purge_dataset( dataset, m ):
     # Removes the file from disk and updates the database accordingly.
     if dataset.deleted:
         # Remove files from disk and update the database
-        purgable = False
         try:
             dataset.purged = True
             dataset.file_size = 0
@@ -284,15 +300,24 @@
                 if not shared_data.deleted:
                     break #only purge when not shared
             else:
+                # Remove dataset file from disk
                 os.unlink( dataset.file_name )
-                purgable = True
+                # Mark all associated MetadataFiles as deleted and purged and remove them from disk
+                print "The following metadata files associated with dataset '%s' have been purged" % dataset.file_name
+                for hda in dataset.history_associations:
+                    for metadata_file in m.filter( m.table.c.hda_id==hda.id ).all():
+                        os.unlink( metadata_file.file_name() )
+                        metadata_file.deleted = True
+                        metadata_file.purged = True
+                        metadata_file.flush()
+                        print "%s" % metadata_file.file_name()
+                try:
+                    # Remove associated extra files from disk if they exist
+                    os.unlink( dataset.extra_files_path )
+                except:
+                    pass
         except Exception, exc:
             return "# Error, exception: %s caught attempting to purge %s\n" %( str( exc ), dataset.file_name )
-        try:
-            if purgable:
-                os.unlink( dataset.extra_files_path )
-        except:
-            pass
     else:
         return "# Error: '%s' has not previously been deleted, so it cannot be purged\n" %dataset.file_name
     return ""