Coverage Report - org.apache.turbine.services.schedule.TurbineSchedulerService
 
Classes in this File Line Coverage Branch Coverage Complexity
TurbineSchedulerService
42%
41/97
36%
8/22
2,389
TurbineSchedulerService$MainLoop
50%
8/16
50%
2/4
2,389
 
 1  
 package org.apache.turbine.services.schedule;
 2  
 
 3  
 /*
 4  
  * Licensed to the Apache Software Foundation (ASF) under one
 5  
  * or more contributor license agreements.  See the NOTICE file
 6  
  * distributed with this work for additional information
 7  
  * regarding copyright ownership.  The ASF licenses this file
 8  
  * to you under the Apache License, Version 2.0 (the
 9  
  * "License"); you may not use this file except in compliance
 10  
  * with the License.  You may obtain a copy of the License at
 11  
  *
 12  
  *   http://www.apache.org/licenses/LICENSE-2.0
 13  
  *
 14  
  * Unless required by applicable law or agreed to in writing,
 15  
  * software distributed under the License is distributed on an
 16  
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 17  
  * KIND, either express or implied.  See the License for the
 18  
  * specific language governing permissions and limitations
 19  
  * under the License.
 20  
  */
 21  
 
 22  
 import java.util.Iterator;
 23  
 import java.util.List;
 24  
 
 25  
 import javax.servlet.ServletConfig;
 26  
 
 27  
 import org.apache.commons.logging.Log;
 28  
 import org.apache.commons.logging.LogFactory;
 29  
 import org.apache.torque.TorqueException;
 30  
 import org.apache.torque.util.Criteria;
 31  
 import org.apache.turbine.services.InitializationException;
 32  
 import org.apache.turbine.services.TurbineBaseService;
 33  
 import org.apache.turbine.util.TurbineException;
 34  
 
 35  
 /**
 36  
  * Service for a cron like scheduler.
 37  
  *
 38  
  * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
 39  
  * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
 40  
  * @version $Id: TurbineSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
 41  
  */
 42  
 public class TurbineSchedulerService
 43  
         extends TurbineBaseService
 44  
         implements ScheduleService
 45  
 {
 46  
     /** Logging */
 47  2
     protected static Log log = LogFactory.getLog(ScheduleService.LOGGER_NAME);
 48  
 
 49  
     /** The queue */
 50  2
     protected JobQueue scheduleQueue = null;
 51  
 
 52  
     /** Current status of the scheduler */
 53  2
     protected boolean enabled = false;
 54  
 
 55  
     /** The main loop for starting jobs. */
 56  
     protected MainLoop mainLoop;
 57  
 
 58  
     /** The thread used to process commands.  */
 59  
     protected Thread thread;
 60  
 
 61  
     /**
 62  
      * Creates a new instance.
 63  
      */
 64  
     public TurbineSchedulerService()
 65  2
     {
 66  2
         mainLoop = null;
 67  2
         thread = null;
 68  2
     }
 69  
 
 70  
     /**
 71  
      * Initializes the SchedulerService.
 72  
      *
 73  
      * @throws InitializationException Something went wrong in the init
 74  
      *         stage
 75  
      */
 76  
     @Override
 77  
     public void init()
 78  
             throws InitializationException
 79  
     {
 80  
         try
 81  
         {
 82  0
             setEnabled(getConfiguration().getBoolean("enabled", true));
 83  0
             scheduleQueue = new JobQueue();
 84  0
             mainLoop = new MainLoop();
 85  
 
 86  
             // Load all from cold storage.
 87  0
             List<JobEntry> jobs = JobEntryPeer.doSelect(new Criteria());
 88  
 
 89  0
             if (jobs != null && jobs.size() > 0)
 90  
             {
 91  0
                 Iterator<JobEntry> it = jobs.iterator();
 92  0
                 while (it.hasNext())
 93  
                 {
 94  0
                     it.next().calcRunTime();
 95  
                 }
 96  0
                 scheduleQueue.batchLoad(jobs);
 97  
 
 98  0
                 restart();
 99  
             }
 100  
 
 101  0
             setInit(true);
 102  
         }
 103  0
         catch (Exception e)
 104  
         {
 105  0
             String errorMessage = "Could not initialize the scheduler service";
 106  0
             log.error(errorMessage, e);
 107  0
             throw new InitializationException(errorMessage, e);
 108  0
         }
 109  0
     }
 110  
 
 111  
     /**
 112  
      * Called the first time the Service is used.<br>
 113  
      *
 114  
      * Load all the jobs from cold storage.  Add jobs to the queue
 115  
      * (sorted in ascending order by runtime) and start the scheduler
 116  
      * thread.
 117  
      *
 118  
      * @param config A ServletConfig.
 119  
      * @deprecated use init() instead.
 120  
      */
 121  
     @Deprecated
 122  
     public void init(ServletConfig config) throws InitializationException
 123  
     {
 124  0
         init();
 125  0
     }
 126  
 
 127  
     /**
 128  
      * Shutdowns the service.
 129  
      *
 130  
      * This methods interrupts the housekeeping thread.
 131  
      */
 132  
     @Override
 133  
     public void shutdown()
 134  
     {
 135  0
         if (getThread() != null)
 136  
         {
 137  0
             getThread().interrupt();
 138  
         }
 139  0
     }
 140  
 
 141  
     /**
 142  
      * Get a specific Job from Storage.
 143  
      *
 144  
      * @param oid The int id for the job.
 145  
      * @return A JobEntry.
 146  
      * @exception TurbineException job could not be retreived.
 147  
      */
 148  
     public JobEntry getJob(int oid)
 149  
             throws TurbineException
 150  
     {
 151  
         try
 152  
         {
 153  0
             JobEntry je = JobEntryPeer.retrieveByPK(oid);
 154  0
             return scheduleQueue.getJob(je);
 155  
         }
 156  0
         catch (TorqueException e)
 157  
         {
 158  0
             String errorMessage = "Error retrieving job from persistent storage.";
 159  0
             log.error(errorMessage, e);
 160  0
             throw new TurbineException(errorMessage, e);
 161  
         }
 162  
     }
 163  
 
 164  
     /**
 165  
      * Add a new job to the queue.
 166  
      *
 167  
      * @param je A JobEntry with the job to add.
 168  
      * @throws TurbineException job could not be added
 169  
      */
 170  
     public void addJob(JobEntry je)
 171  
             throws TurbineException
 172  
     {
 173  0
         updateJob(je);
 174  0
     }
 175  
 
 176  
     /**
 177  
      * Remove a job from the queue.
 178  
      *
 179  
      * @param je A JobEntry with the job to remove.
 180  
      * @exception TurbineException job could not be removed
 181  
      */
 182  
     public void removeJob(JobEntry je)
 183  
             throws TurbineException
 184  
     {
 185  
         try
 186  
         {
 187  
             // First remove from DB.
 188  0
             Criteria c = new Criteria().add(JobEntryPeer.JOB_ID, je.getPrimaryKey());
 189  0
             JobEntryPeer.doDelete(c);
 190  
 
 191  
             // Remove from the queue.
 192  0
             scheduleQueue.remove(je);
 193  
 
 194  
             // restart the scheduler
 195  0
             restart();
 196  
         }
 197  0
         catch (Exception e)
 198  
         {
 199  0
             String errorMessage = "Problem removing Scheduled Job: " + je.getTask();
 200  0
             log.error(errorMessage, e);
 201  0
             throw new TurbineException(errorMessage, e);
 202  0
         }
 203  0
     }
 204  
 
 205  
     /**
 206  
      * Add or update a job.
 207  
      *
 208  
      * @param je A JobEntry with the job to modify
 209  
      * @throws TurbineException job could not be updated
 210  
      */
 211  
     public void updateJob(JobEntry je)
 212  
             throws TurbineException
 213  
     {
 214  
         try
 215  
         {
 216  0
             je.calcRunTime();
 217  
 
 218  
             // Update the queue.
 219  0
             if (je.isNew())
 220  
             {
 221  0
                 scheduleQueue.add(je);
 222  
             }
 223  
             else
 224  
             {
 225  0
                 scheduleQueue.modify(je);
 226  
             }
 227  
 
 228  0
             je.save();
 229  
 
 230  0
             restart();
 231  
         }
 232  0
         catch (Exception e)
 233  
         {
 234  0
             String errorMessage = "Problem updating Scheduled Job: " + je.getTask();
 235  0
             log.error(errorMessage, e);
 236  0
             throw new TurbineException(errorMessage, e);
 237  0
         }
 238  0
     }
 239  
 
 240  
     /**
 241  
      * List jobs in the queue.  This is used by the scheduler UI.
 242  
      *
 243  
      * @return A List of jobs.
 244  
      */
 245  
     public List<JobEntry> listJobs()
 246  
     {
 247  6
         return scheduleQueue.list();
 248  
     }
 249  
 
 250  
     /**
 251  
      * Sets the enabled status of the scheduler
 252  
      *
 253  
      * @param enabled
 254  
      *
 255  
      */
 256  
     protected void setEnabled(boolean enabled)
 257  
     {
 258  4
         this.enabled = enabled;
 259  4
     }
 260  
 
 261  
     /**
 262  
      * Determines if the scheduler service is currently enabled.
 263  
      *
 264  
      * @return Status of the scheduler service.
 265  
      */
 266  
     public boolean isEnabled()
 267  
     {
 268  4
         return enabled;
 269  
     }
 270  
 
 271  
     /**
 272  
      * Starts or restarts the scheduler if not already running.
 273  
      */
 274  
     public synchronized void startScheduler()
 275  
     {
 276  2
         setEnabled(true);
 277  2
         restart();
 278  2
     }
 279  
 
 280  
     /**
 281  
      * Stops the scheduler if it is currently running.
 282  
      */
 283  
     public synchronized void stopScheduler()
 284  
     {
 285  2
         log.info("Stopping job scheduler");
 286  2
         Thread thread = getThread();
 287  2
         if (thread != null)
 288  
         {
 289  2
             thread.interrupt();
 290  
         }
 291  2
         enabled = false;
 292  2
     }
 293  
 
 294  
     /**
 295  
      * Return the thread being used to process commands, or null if
 296  
      * there is no such thread.  You can use this to invoke any
 297  
      * special methods on the thread, for example, to interrupt it.
 298  
      *
 299  
      * @return A Thread.
 300  
      */
 301  
     public synchronized Thread getThread()
 302  
     {
 303  2
         return thread;
 304  
     }
 305  
 
 306  
     /**
 307  
      * Set thread to null to indicate termination.
 308  
      */
 309  
     protected synchronized void clearThread()
 310  
     {
 311  2
         thread = null;
 312  2
     }
 313  
 
 314  
     /**
 315  
      * Start (or restart) a thread to process commands, or wake up an
 316  
      * existing thread if one is already running.  This method can be
 317  
      * invoked if the background thread crashed due to an
 318  
      * unrecoverable exception in an executed command.
 319  
      */
 320  
     public synchronized void restart()
 321  
     {
 322  8
         if (enabled)
 323  
         {
 324  4
             log.info("Starting job scheduler");
 325  4
             if (thread == null)
 326  
             {
 327  
                 // Create the the housekeeping thread of the scheduler. It will wait
 328  
                 // for the time when the next task needs to be started, and then
 329  
                 // launch a worker thread to execute the task.
 330  2
                 thread = new Thread(mainLoop, ScheduleService.SERVICE_NAME);
 331  
                 // Indicate that this is a system thread. JVM will quit only when there
 332  
                 // are no more enabled user threads. Settings threads spawned internally
 333  
                 // by Turbine as daemons allows commandline applications using Turbine
 334  
                 // to terminate in an orderly manner.
 335  2
                 thread.setDaemon(true);
 336  2
                 thread.start();
 337  
             }
 338  
             else
 339  
             {
 340  2
                 notify();
 341  
             }
 342  
         }
 343  8
     }
 344  
 
 345  
     /**
 346  
      *  Return the next Job to execute, or null if thread is
 347  
      *  interrupted.
 348  
      *
 349  
      * @return A JobEntry.
 350  
      * @exception TurbineException a generic exception.
 351  
      */
 352  
     protected synchronized JobEntry nextJob()
 353  
             throws TurbineException
 354  
     {
 355  
         try
 356  
         {
 357  3
             while (!Thread.interrupted())
 358  
             {
 359  
                 // Grab the next job off the queue.
 360  3
                 JobEntry je = scheduleQueue.getNext();
 361  
 
 362  3
                 if (je == null)
 363  
                 {
 364  
                     // Queue must be empty. Wait on it.
 365  0
                     wait();
 366  
                 }
 367  
                 else
 368  
                 {
 369  3
                     long now = System.currentTimeMillis();
 370  3
                     long when = je.getNextRuntime();
 371  
 
 372  3
                     if (when > now)
 373  
                     {
 374  
                         // Wait till next runtime.
 375  3
                         wait(when - now);
 376  
                     }
 377  
                     else
 378  
                     {
 379  
                         // Update the next runtime for the job.
 380  0
                         scheduleQueue.updateQueue(je);
 381  
                         // Return the job to run it.
 382  0
                         return je;
 383  
                     }
 384  
                 }
 385  1
             }
 386  
         }
 387  2
         catch (InterruptedException ex)
 388  
         {
 389  
             // ignore
 390  0
         }
 391  
 
 392  
         // On interrupt.
 393  2
         return null;
 394  
     }
 395  
 
 396  
     /**
 397  
      * Inner class.  This is isolated in its own Runnable class just
 398  
      * so that the main class need not implement Runnable, which would
 399  
      * allow others to directly invoke run, which is not supported.
 400  
      */
 401  2
     protected class MainLoop
 402  
             implements Runnable
 403  
     {
 404  
         /**
 405  
          * Method to run the class.
 406  
          */
 407  
         public void run()
 408  
         {
 409  2
             String taskName = null;
 410  
             try
 411  
             {
 412  2
                 while (enabled)
 413  
                 {
 414  2
                     JobEntry je = nextJob();
 415  2
                     if (je != null)
 416  
                     {
 417  0
                         taskName = je.getTask();
 418  
 
 419  
                         // Start the thread to run the job.
 420  0
                         Runnable wt = new WorkerThread(je);
 421  0
                         Thread helper = new Thread(wt);
 422  0
                         helper.start();
 423  
                     }
 424  
                     else
 425  
                     {
 426  
                         break;
 427  
                     }
 428  0
                 }
 429  
             }
 430  0
             catch (Exception e)
 431  
             {
 432  0
                 log.error("Error running a Scheduled Job: " + taskName, e);
 433  0
                 enabled = false;
 434  
             }
 435  
             finally
 436  
             {
 437  2
                 clearThread();
 438  2
             }
 439  2
         }
 440  
     }
 441  
 }