001    package org.apache.turbine.services.schedule;
002    
003    /*
004     * Licensed to the Apache Software Foundation (ASF) under one
005     * or more contributor license agreements.  See the NOTICE file
006     * distributed with this work for additional information
007     * regarding copyright ownership.  The ASF licenses this file
008     * to you under the Apache License, Version 2.0 (the
009     * "License"); you may not use this file except in compliance
010     * with the License.  You may obtain a copy of the License at
011     *
012     *   http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing,
015     * software distributed under the License is distributed on an
016     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017     * KIND, either express or implied.  See the License for the
018     * specific language governing permissions and limitations
019     * under the License.
020     */
021    
022    import java.util.Iterator;
023    import java.util.List;
024    
025    import javax.servlet.ServletConfig;
026    
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    import org.apache.torque.TorqueException;
030    import org.apache.torque.util.Criteria;
031    import org.apache.turbine.services.InitializationException;
032    import org.apache.turbine.services.TurbineBaseService;
033    import org.apache.turbine.util.TurbineException;
034    
035    /**
036     * Service for a cron like scheduler.
037     *
038     * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
039     * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
040     * @version $Id: TurbineSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
041     */
042    public class TurbineSchedulerService
043            extends TurbineBaseService
044            implements ScheduleService
045    {
046        /** Logging */
047        protected static Log log = LogFactory.getLog(ScheduleService.LOGGER_NAME);
048    
049        /** The queue */
050        protected JobQueue scheduleQueue = null;
051    
052        /** Current status of the scheduler */
053        protected boolean enabled = false;
054    
055        /** The main loop for starting jobs. */
056        protected MainLoop mainLoop;
057    
058        /** The thread used to process commands.  */
059        protected Thread thread;
060    
061        /**
062         * Creates a new instance.
063         */
064        public TurbineSchedulerService()
065        {
066            mainLoop = null;
067            thread = null;
068        }
069    
070        /**
071         * Initializes the SchedulerService.
072         *
073         * @throws InitializationException Something went wrong in the init
074         *         stage
075         */
076        @Override
077        public void init()
078                throws InitializationException
079        {
080            try
081            {
082                setEnabled(getConfiguration().getBoolean("enabled", true));
083                scheduleQueue = new JobQueue();
084                mainLoop = new MainLoop();
085    
086                // Load all from cold storage.
087                List<JobEntry> jobs = JobEntryPeer.doSelect(new Criteria());
088    
089                if (jobs != null && jobs.size() > 0)
090                {
091                    Iterator<JobEntry> it = jobs.iterator();
092                    while (it.hasNext())
093                    {
094                        it.next().calcRunTime();
095                    }
096                    scheduleQueue.batchLoad(jobs);
097    
098                    restart();
099                }
100    
101                setInit(true);
102            }
103            catch (Exception e)
104            {
105                String errorMessage = "Could not initialize the scheduler service";
106                log.error(errorMessage, e);
107                throw new InitializationException(errorMessage, e);
108            }
109        }
110    
111        /**
112         * Called the first time the Service is used.<br>
113         *
114         * Load all the jobs from cold storage.  Add jobs to the queue
115         * (sorted in ascending order by runtime) and start the scheduler
116         * thread.
117         *
118         * @param config A ServletConfig.
119         * @deprecated use init() instead.
120         */
121        @Deprecated
122        public void init(ServletConfig config) throws InitializationException
123        {
124            init();
125        }
126    
127        /**
128         * Shutdowns the service.
129         *
130         * This methods interrupts the housekeeping thread.
131         */
132        @Override
133        public void shutdown()
134        {
135            if (getThread() != null)
136            {
137                getThread().interrupt();
138            }
139        }
140    
141        /**
142         * Get a specific Job from Storage.
143         *
144         * @param oid The int id for the job.
145         * @return A JobEntry.
146         * @exception TurbineException job could not be retreived.
147         */
148        public JobEntry getJob(int oid)
149                throws TurbineException
150        {
151            try
152            {
153                JobEntry je = JobEntryPeer.retrieveByPK(oid);
154                return scheduleQueue.getJob(je);
155            }
156            catch (TorqueException e)
157            {
158                String errorMessage = "Error retrieving job from persistent storage.";
159                log.error(errorMessage, e);
160                throw new TurbineException(errorMessage, e);
161            }
162        }
163    
164        /**
165         * Add a new job to the queue.
166         *
167         * @param je A JobEntry with the job to add.
168         * @throws TurbineException job could not be added
169         */
170        public void addJob(JobEntry je)
171                throws TurbineException
172        {
173            updateJob(je);
174        }
175    
176        /**
177         * Remove a job from the queue.
178         *
179         * @param je A JobEntry with the job to remove.
180         * @exception TurbineException job could not be removed
181         */
182        public void removeJob(JobEntry je)
183                throws TurbineException
184        {
185            try
186            {
187                // First remove from DB.
188                Criteria c = new Criteria().add(JobEntryPeer.JOB_ID, je.getPrimaryKey());
189                JobEntryPeer.doDelete(c);
190    
191                // Remove from the queue.
192                scheduleQueue.remove(je);
193    
194                // restart the scheduler
195                restart();
196            }
197            catch (Exception e)
198            {
199                String errorMessage = "Problem removing Scheduled Job: " + je.getTask();
200                log.error(errorMessage, e);
201                throw new TurbineException(errorMessage, e);
202            }
203        }
204    
205        /**
206         * Add or update a job.
207         *
208         * @param je A JobEntry with the job to modify
209         * @throws TurbineException job could not be updated
210         */
211        public void updateJob(JobEntry je)
212                throws TurbineException
213        {
214            try
215            {
216                je.calcRunTime();
217    
218                // Update the queue.
219                if (je.isNew())
220                {
221                    scheduleQueue.add(je);
222                }
223                else
224                {
225                    scheduleQueue.modify(je);
226                }
227    
228                je.save();
229    
230                restart();
231            }
232            catch (Exception e)
233            {
234                String errorMessage = "Problem updating Scheduled Job: " + je.getTask();
235                log.error(errorMessage, e);
236                throw new TurbineException(errorMessage, e);
237            }
238        }
239    
240        /**
241         * List jobs in the queue.  This is used by the scheduler UI.
242         *
243         * @return A List of jobs.
244         */
245        public List<JobEntry> listJobs()
246        {
247            return scheduleQueue.list();
248        }
249    
250        /**
251         * Sets the enabled status of the scheduler
252         *
253         * @param enabled
254         *
255         */
256        protected void setEnabled(boolean enabled)
257        {
258            this.enabled = enabled;
259        }
260    
261        /**
262         * Determines if the scheduler service is currently enabled.
263         *
264         * @return Status of the scheduler service.
265         */
266        public boolean isEnabled()
267        {
268            return enabled;
269        }
270    
271        /**
272         * Starts or restarts the scheduler if not already running.
273         */
274        public synchronized void startScheduler()
275        {
276            setEnabled(true);
277            restart();
278        }
279    
280        /**
281         * Stops the scheduler if it is currently running.
282         */
283        public synchronized void stopScheduler()
284        {
285            log.info("Stopping job scheduler");
286            Thread thread = getThread();
287            if (thread != null)
288            {
289                thread.interrupt();
290            }
291            enabled = false;
292        }
293    
294        /**
295         * Return the thread being used to process commands, or null if
296         * there is no such thread.  You can use this to invoke any
297         * special methods on the thread, for example, to interrupt it.
298         *
299         * @return A Thread.
300         */
301        public synchronized Thread getThread()
302        {
303            return thread;
304        }
305    
306        /**
307         * Set thread to null to indicate termination.
308         */
309        protected synchronized void clearThread()
310        {
311            thread = null;
312        }
313    
314        /**
315         * Start (or restart) a thread to process commands, or wake up an
316         * existing thread if one is already running.  This method can be
317         * invoked if the background thread crashed due to an
318         * unrecoverable exception in an executed command.
319         */
320        public synchronized void restart()
321        {
322            if (enabled)
323            {
324                log.info("Starting job scheduler");
325                if (thread == null)
326                {
327                    // Create the the housekeeping thread of the scheduler. It will wait
328                    // for the time when the next task needs to be started, and then
329                    // launch a worker thread to execute the task.
330                    thread = new Thread(mainLoop, ScheduleService.SERVICE_NAME);
331                    // Indicate that this is a system thread. JVM will quit only when there
332                    // are no more enabled user threads. Settings threads spawned internally
333                    // by Turbine as daemons allows commandline applications using Turbine
334                    // to terminate in an orderly manner.
335                    thread.setDaemon(true);
336                    thread.start();
337                }
338                else
339                {
340                    notify();
341                }
342            }
343        }
344    
345        /**
346         *  Return the next Job to execute, or null if thread is
347         *  interrupted.
348         *
349         * @return A JobEntry.
350         * @exception TurbineException a generic exception.
351         */
352        protected synchronized JobEntry nextJob()
353                throws TurbineException
354        {
355            try
356            {
357                while (!Thread.interrupted())
358                {
359                    // Grab the next job off the queue.
360                    JobEntry je = scheduleQueue.getNext();
361    
362                    if (je == null)
363                    {
364                        // Queue must be empty. Wait on it.
365                        wait();
366                    }
367                    else
368                    {
369                        long now = System.currentTimeMillis();
370                        long when = je.getNextRuntime();
371    
372                        if (when > now)
373                        {
374                            // Wait till next runtime.
375                            wait(when - now);
376                        }
377                        else
378                        {
379                            // Update the next runtime for the job.
380                            scheduleQueue.updateQueue(je);
381                            // Return the job to run it.
382                            return je;
383                        }
384                    }
385                }
386            }
387            catch (InterruptedException ex)
388            {
389                // ignore
390            }
391    
392            // On interrupt.
393            return null;
394        }
395    
396        /**
397         * Inner class.  This is isolated in its own Runnable class just
398         * so that the main class need not implement Runnable, which would
399         * allow others to directly invoke run, which is not supported.
400         */
401        protected class MainLoop
402                implements Runnable
403        {
404            /**
405             * Method to run the class.
406             */
407            public void run()
408            {
409                String taskName = null;
410                try
411                {
412                    while (enabled)
413                    {
414                        JobEntry je = nextJob();
415                        if (je != null)
416                        {
417                            taskName = je.getTask();
418    
419                            // Start the thread to run the job.
420                            Runnable wt = new WorkerThread(je);
421                            Thread helper = new Thread(wt);
422                            helper.start();
423                        }
424                        else
425                        {
426                            break;
427                        }
428                    }
429                }
430                catch (Exception e)
431                {
432                    log.error("Error running a Scheduled Job: " + taskName, e);
433                    enabled = false;
434                }
435                finally
436                {
437                    clearThread();
438                }
439            }
440        }
441    }