1 package org.apache.turbine.services.schedule;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Iterator;
23 import java.util.List;
24
25 import javax.servlet.ServletConfig;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.torque.TorqueException;
30 import org.apache.torque.util.Criteria;
31 import org.apache.turbine.services.InitializationException;
32 import org.apache.turbine.services.TurbineBaseService;
33 import org.apache.turbine.util.TurbineException;
34
35
36
37
38
39
40
41
42 public class TurbineSchedulerService
43 extends TurbineBaseService
44 implements ScheduleService
45 {
46
47 protected static Log log = LogFactory.getLog(ScheduleService.LOGGER_NAME);
48
49
50 protected JobQueue scheduleQueue = null;
51
52
53 protected boolean enabled = false;
54
55
56 protected MainLoop mainLoop;
57
58
59 protected Thread thread;
60
61
62
63
64 public TurbineSchedulerService()
65 {
66 mainLoop = null;
67 thread = null;
68 }
69
70
71
72
73
74
75
76 @Override
77 public void init()
78 throws InitializationException
79 {
80 try
81 {
82 setEnabled(getConfiguration().getBoolean("enabled", true));
83 scheduleQueue = new JobQueue();
84 mainLoop = new MainLoop();
85
86
87 List<JobEntry> jobs = JobEntryPeer.doSelect(new Criteria());
88
89 if (jobs != null && jobs.size() > 0)
90 {
91 Iterator<JobEntry> it = jobs.iterator();
92 while (it.hasNext())
93 {
94 it.next().calcRunTime();
95 }
96 scheduleQueue.batchLoad(jobs);
97
98 restart();
99 }
100
101 setInit(true);
102 }
103 catch (Exception e)
104 {
105 String errorMessage = "Could not initialize the scheduler service";
106 log.error(errorMessage, e);
107 throw new InitializationException(errorMessage, e);
108 }
109 }
110
111
112
113
114
115
116
117
118
119
120
121 @Deprecated
122 public void init(ServletConfig config) throws InitializationException
123 {
124 init();
125 }
126
127
128
129
130
131
132 @Override
133 public void shutdown()
134 {
135 if (getThread() != null)
136 {
137 getThread().interrupt();
138 }
139 }
140
141
142
143
144
145
146
147
148 public JobEntry getJob(int oid)
149 throws TurbineException
150 {
151 try
152 {
153 JobEntry je = JobEntryPeer.retrieveByPK(oid);
154 return scheduleQueue.getJob(je);
155 }
156 catch (TorqueException e)
157 {
158 String errorMessage = "Error retrieving job from persistent storage.";
159 log.error(errorMessage, e);
160 throw new TurbineException(errorMessage, e);
161 }
162 }
163
164
165
166
167
168
169
170 public void addJob(JobEntry je)
171 throws TurbineException
172 {
173 updateJob(je);
174 }
175
176
177
178
179
180
181
182 public void removeJob(JobEntry je)
183 throws TurbineException
184 {
185 try
186 {
187
188 Criteria c = new Criteria().add(JobEntryPeer.JOB_ID, je.getPrimaryKey());
189 JobEntryPeer.doDelete(c);
190
191
192 scheduleQueue.remove(je);
193
194
195 restart();
196 }
197 catch (Exception e)
198 {
199 String errorMessage = "Problem removing Scheduled Job: " + je.getTask();
200 log.error(errorMessage, e);
201 throw new TurbineException(errorMessage, e);
202 }
203 }
204
205
206
207
208
209
210
211 public void updateJob(JobEntry je)
212 throws TurbineException
213 {
214 try
215 {
216 je.calcRunTime();
217
218
219 if (je.isNew())
220 {
221 scheduleQueue.add(je);
222 }
223 else
224 {
225 scheduleQueue.modify(je);
226 }
227
228 je.save();
229
230 restart();
231 }
232 catch (Exception e)
233 {
234 String errorMessage = "Problem updating Scheduled Job: " + je.getTask();
235 log.error(errorMessage, e);
236 throw new TurbineException(errorMessage, e);
237 }
238 }
239
240
241
242
243
244
245 public List<JobEntry> listJobs()
246 {
247 return scheduleQueue.list();
248 }
249
250
251
252
253
254
255
256 protected void setEnabled(boolean enabled)
257 {
258 this.enabled = enabled;
259 }
260
261
262
263
264
265
266 public boolean isEnabled()
267 {
268 return enabled;
269 }
270
271
272
273
274 public synchronized void startScheduler()
275 {
276 setEnabled(true);
277 restart();
278 }
279
280
281
282
283 public synchronized void stopScheduler()
284 {
285 log.info("Stopping job scheduler");
286 Thread thread = getThread();
287 if (thread != null)
288 {
289 thread.interrupt();
290 }
291 enabled = false;
292 }
293
294
295
296
297
298
299
300
301 public synchronized Thread getThread()
302 {
303 return thread;
304 }
305
306
307
308
309 protected synchronized void clearThread()
310 {
311 thread = null;
312 }
313
314
315
316
317
318
319
320 public synchronized void restart()
321 {
322 if (enabled)
323 {
324 log.info("Starting job scheduler");
325 if (thread == null)
326 {
327
328
329
330 thread = new Thread(mainLoop, ScheduleService.SERVICE_NAME);
331
332
333
334
335 thread.setDaemon(true);
336 thread.start();
337 }
338 else
339 {
340 notify();
341 }
342 }
343 }
344
345
346
347
348
349
350
351
352 protected synchronized JobEntry nextJob()
353 throws TurbineException
354 {
355 try
356 {
357 while (!Thread.interrupted())
358 {
359
360 JobEntry je = scheduleQueue.getNext();
361
362 if (je == null)
363 {
364
365 wait();
366 }
367 else
368 {
369 long now = System.currentTimeMillis();
370 long when = je.getNextRuntime();
371
372 if (when > now)
373 {
374
375 wait(when - now);
376 }
377 else
378 {
379
380 scheduleQueue.updateQueue(je);
381
382 return je;
383 }
384 }
385 }
386 }
387 catch (InterruptedException ex)
388 {
389
390 }
391
392
393 return null;
394 }
395
396
397
398
399
400
401 protected class MainLoop
402 implements Runnable
403 {
404
405
406
407 public void run()
408 {
409 String taskName = null;
410 try
411 {
412 while (enabled)
413 {
414 JobEntry je = nextJob();
415 if (je != null)
416 {
417 taskName = je.getTask();
418
419
420 Runnable wt = new WorkerThread(je);
421 Thread helper = new Thread(wt);
422 helper.start();
423 }
424 else
425 {
426 break;
427 }
428 }
429 }
430 catch (Exception e)
431 {
432 log.error("Error running a Scheduled Job: " + taskName, e);
433 enabled = false;
434 }
435 finally
436 {
437 clearThread();
438 }
439 }
440 }
441 }