001 package org.apache.turbine.services.schedule; 002 003 /* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022 import java.util.Iterator; 023 import java.util.List; 024 025 import javax.servlet.ServletConfig; 026 027 import org.apache.commons.logging.Log; 028 import org.apache.commons.logging.LogFactory; 029 import org.apache.torque.TorqueException; 030 import org.apache.torque.util.Criteria; 031 import org.apache.turbine.services.InitializationException; 032 import org.apache.turbine.services.TurbineBaseService; 033 import org.apache.turbine.util.TurbineException; 034 035 /** 036 * Service for a cron like scheduler. 037 * 038 * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a> 039 * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a> 040 * @version $Id: TurbineSchedulerService.java 534527 2007-05-02 16:10:59Z tv $ 041 */ 042 public class TurbineSchedulerService 043 extends TurbineBaseService 044 implements ScheduleService 045 { 046 /** Logging */ 047 protected static Log log = LogFactory.getLog(ScheduleService.LOGGER_NAME); 048 049 /** The queue */ 050 protected JobQueue scheduleQueue = null; 051 052 /** Current status of the scheduler */ 053 protected boolean enabled = false; 054 055 /** The main loop for starting jobs. */ 056 protected MainLoop mainLoop; 057 058 /** The thread used to process commands. */ 059 protected Thread thread; 060 061 /** 062 * Creates a new instance. 063 */ 064 public TurbineSchedulerService() 065 { 066 mainLoop = null; 067 thread = null; 068 } 069 070 /** 071 * Initializes the SchedulerService. 072 * 073 * @throws InitializationException Something went wrong in the init 074 * stage 075 */ 076 @Override 077 public void init() 078 throws InitializationException 079 { 080 try 081 { 082 setEnabled(getConfiguration().getBoolean("enabled", true)); 083 scheduleQueue = new JobQueue(); 084 mainLoop = new MainLoop(); 085 086 // Load all from cold storage. 087 List<JobEntry> jobs = JobEntryPeer.doSelect(new Criteria()); 088 089 if (jobs != null && jobs.size() > 0) 090 { 091 Iterator<JobEntry> it = jobs.iterator(); 092 while (it.hasNext()) 093 { 094 it.next().calcRunTime(); 095 } 096 scheduleQueue.batchLoad(jobs); 097 098 restart(); 099 } 100 101 setInit(true); 102 } 103 catch (Exception e) 104 { 105 String errorMessage = "Could not initialize the scheduler service"; 106 log.error(errorMessage, e); 107 throw new InitializationException(errorMessage, e); 108 } 109 } 110 111 /** 112 * Called the first time the Service is used.<br> 113 * 114 * Load all the jobs from cold storage. Add jobs to the queue 115 * (sorted in ascending order by runtime) and start the scheduler 116 * thread. 117 * 118 * @param config A ServletConfig. 119 * @deprecated use init() instead. 120 */ 121 @Deprecated 122 public void init(ServletConfig config) throws InitializationException 123 { 124 init(); 125 } 126 127 /** 128 * Shutdowns the service. 129 * 130 * This methods interrupts the housekeeping thread. 131 */ 132 @Override 133 public void shutdown() 134 { 135 if (getThread() != null) 136 { 137 getThread().interrupt(); 138 } 139 } 140 141 /** 142 * Get a specific Job from Storage. 143 * 144 * @param oid The int id for the job. 145 * @return A JobEntry. 146 * @exception TurbineException job could not be retreived. 147 */ 148 public JobEntry getJob(int oid) 149 throws TurbineException 150 { 151 try 152 { 153 JobEntry je = JobEntryPeer.retrieveByPK(oid); 154 return scheduleQueue.getJob(je); 155 } 156 catch (TorqueException e) 157 { 158 String errorMessage = "Error retrieving job from persistent storage."; 159 log.error(errorMessage, e); 160 throw new TurbineException(errorMessage, e); 161 } 162 } 163 164 /** 165 * Add a new job to the queue. 166 * 167 * @param je A JobEntry with the job to add. 168 * @throws TurbineException job could not be added 169 */ 170 public void addJob(JobEntry je) 171 throws TurbineException 172 { 173 updateJob(je); 174 } 175 176 /** 177 * Remove a job from the queue. 178 * 179 * @param je A JobEntry with the job to remove. 180 * @exception TurbineException job could not be removed 181 */ 182 public void removeJob(JobEntry je) 183 throws TurbineException 184 { 185 try 186 { 187 // First remove from DB. 188 Criteria c = new Criteria().add(JobEntryPeer.JOB_ID, je.getPrimaryKey()); 189 JobEntryPeer.doDelete(c); 190 191 // Remove from the queue. 192 scheduleQueue.remove(je); 193 194 // restart the scheduler 195 restart(); 196 } 197 catch (Exception e) 198 { 199 String errorMessage = "Problem removing Scheduled Job: " + je.getTask(); 200 log.error(errorMessage, e); 201 throw new TurbineException(errorMessage, e); 202 } 203 } 204 205 /** 206 * Add or update a job. 207 * 208 * @param je A JobEntry with the job to modify 209 * @throws TurbineException job could not be updated 210 */ 211 public void updateJob(JobEntry je) 212 throws TurbineException 213 { 214 try 215 { 216 je.calcRunTime(); 217 218 // Update the queue. 219 if (je.isNew()) 220 { 221 scheduleQueue.add(je); 222 } 223 else 224 { 225 scheduleQueue.modify(je); 226 } 227 228 je.save(); 229 230 restart(); 231 } 232 catch (Exception e) 233 { 234 String errorMessage = "Problem updating Scheduled Job: " + je.getTask(); 235 log.error(errorMessage, e); 236 throw new TurbineException(errorMessage, e); 237 } 238 } 239 240 /** 241 * List jobs in the queue. This is used by the scheduler UI. 242 * 243 * @return A List of jobs. 244 */ 245 public List<JobEntry> listJobs() 246 { 247 return scheduleQueue.list(); 248 } 249 250 /** 251 * Sets the enabled status of the scheduler 252 * 253 * @param enabled 254 * 255 */ 256 protected void setEnabled(boolean enabled) 257 { 258 this.enabled = enabled; 259 } 260 261 /** 262 * Determines if the scheduler service is currently enabled. 263 * 264 * @return Status of the scheduler service. 265 */ 266 public boolean isEnabled() 267 { 268 return enabled; 269 } 270 271 /** 272 * Starts or restarts the scheduler if not already running. 273 */ 274 public synchronized void startScheduler() 275 { 276 setEnabled(true); 277 restart(); 278 } 279 280 /** 281 * Stops the scheduler if it is currently running. 282 */ 283 public synchronized void stopScheduler() 284 { 285 log.info("Stopping job scheduler"); 286 Thread thread = getThread(); 287 if (thread != null) 288 { 289 thread.interrupt(); 290 } 291 enabled = false; 292 } 293 294 /** 295 * Return the thread being used to process commands, or null if 296 * there is no such thread. You can use this to invoke any 297 * special methods on the thread, for example, to interrupt it. 298 * 299 * @return A Thread. 300 */ 301 public synchronized Thread getThread() 302 { 303 return thread; 304 } 305 306 /** 307 * Set thread to null to indicate termination. 308 */ 309 protected synchronized void clearThread() 310 { 311 thread = null; 312 } 313 314 /** 315 * Start (or restart) a thread to process commands, or wake up an 316 * existing thread if one is already running. This method can be 317 * invoked if the background thread crashed due to an 318 * unrecoverable exception in an executed command. 319 */ 320 public synchronized void restart() 321 { 322 if (enabled) 323 { 324 log.info("Starting job scheduler"); 325 if (thread == null) 326 { 327 // Create the the housekeeping thread of the scheduler. It will wait 328 // for the time when the next task needs to be started, and then 329 // launch a worker thread to execute the task. 330 thread = new Thread(mainLoop, ScheduleService.SERVICE_NAME); 331 // Indicate that this is a system thread. JVM will quit only when there 332 // are no more enabled user threads. Settings threads spawned internally 333 // by Turbine as daemons allows commandline applications using Turbine 334 // to terminate in an orderly manner. 335 thread.setDaemon(true); 336 thread.start(); 337 } 338 else 339 { 340 notify(); 341 } 342 } 343 } 344 345 /** 346 * Return the next Job to execute, or null if thread is 347 * interrupted. 348 * 349 * @return A JobEntry. 350 * @exception TurbineException a generic exception. 351 */ 352 protected synchronized JobEntry nextJob() 353 throws TurbineException 354 { 355 try 356 { 357 while (!Thread.interrupted()) 358 { 359 // Grab the next job off the queue. 360 JobEntry je = scheduleQueue.getNext(); 361 362 if (je == null) 363 { 364 // Queue must be empty. Wait on it. 365 wait(); 366 } 367 else 368 { 369 long now = System.currentTimeMillis(); 370 long when = je.getNextRuntime(); 371 372 if (when > now) 373 { 374 // Wait till next runtime. 375 wait(when - now); 376 } 377 else 378 { 379 // Update the next runtime for the job. 380 scheduleQueue.updateQueue(je); 381 // Return the job to run it. 382 return je; 383 } 384 } 385 } 386 } 387 catch (InterruptedException ex) 388 { 389 // ignore 390 } 391 392 // On interrupt. 393 return null; 394 } 395 396 /** 397 * Inner class. This is isolated in its own Runnable class just 398 * so that the main class need not implement Runnable, which would 399 * allow others to directly invoke run, which is not supported. 400 */ 401 protected class MainLoop 402 implements Runnable 403 { 404 /** 405 * Method to run the class. 406 */ 407 public void run() 408 { 409 String taskName = null; 410 try 411 { 412 while (enabled) 413 { 414 JobEntry je = nextJob(); 415 if (je != null) 416 { 417 taskName = je.getTask(); 418 419 // Start the thread to run the job. 420 Runnable wt = new WorkerThread(je); 421 Thread helper = new Thread(wt); 422 helper.start(); 423 } 424 else 425 { 426 break; 427 } 428 } 429 } 430 catch (Exception e) 431 { 432 log.error("Error running a Scheduled Job: " + taskName, e); 433 enabled = false; 434 } 435 finally 436 { 437 clearThread(); 438 } 439 } 440 } 441 }