001/* 
002    Licensed to the Apache Software Foundation (ASF) under one
003    or more contributor license agreements.  See the NOTICE file
004    distributed with this work for additional information
005    regarding copyright ownership.  The ASF licenses this file
006    to you under the Apache License, Version 2.0 (the
007    "License"); you may not use this file except in compliance
008    with the License.  You may obtain a copy of the License at
009
010       http://www.apache.org/licenses/LICENSE-2.0
011
012    Unless required by applicable law or agreed to in writing,
013    software distributed under the License is distributed on an
014    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015    KIND, either express or implied.  See the License for the
016    specific language governing permissions and limitations
017    under the License.  
018 */
019package org.apache.wiki.workflow;
020
021import org.apache.commons.collections4.queue.CircularFifoQueue;
022import org.apache.commons.lang3.time.StopWatch;
023import org.apache.logging.log4j.LogManager;
024import org.apache.logging.log4j.Logger;
025import org.apache.wiki.api.core.Context;
026import org.apache.wiki.api.core.Engine;
027import org.apache.wiki.api.core.Session;
028import org.apache.wiki.api.exceptions.WikiException;
029import org.apache.wiki.auth.AuthorizationManager;
030import org.apache.wiki.auth.acl.UnresolvedPrincipal;
031import org.apache.wiki.event.WikiEvent;
032import org.apache.wiki.event.WikiEventEmitter;
033import org.apache.wiki.event.WorkflowEvent;
034import org.apache.wiki.util.TextUtil;
035
036import java.io.*;
037import java.nio.file.Files;
038import java.security.Principal;
039import java.util.*;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.CopyOnWriteArrayList;
042
043
044/**
045 * <p>
046 * Monitor class that tracks running Workflows. The WorkflowManager also keeps track of the names of
047 * users or groups expected to approve particular Workflows.
048 * </p>
049 */
050public class DefaultWorkflowManager implements WorkflowManager, Serializable {
051
052    private static final Logger LOG = LogManager.getLogger( DefaultWorkflowManager.class );
053    static final String SERIALIZATION_FILE = "wkflmgr.ser";
054
055    /** We use this also a generic serialization id */
056    private static final long serialVersionUID = 6L;
057
058    DecisionQueue m_queue;
059    Set< Workflow > m_workflows;
060    final Map< String, Principal > m_approvers;
061    Queue< Workflow > m_completed;
062    private Engine m_engine;
063    private int retainCompleted;
064
065    /**
066     * Constructs a new WorkflowManager, with an empty workflow cache.
067     */
068    public DefaultWorkflowManager() {
069        m_workflows = ConcurrentHashMap.newKeySet();
070        m_approvers = new ConcurrentHashMap<>();
071        m_queue = new DecisionQueue();
072        WikiEventEmitter.attach( this );
073    }
074
075    /**
076     * {@inheritDoc}
077     */
078    @Override
079    public Set< Workflow > getWorkflows() {
080        final Set< Workflow > workflows = ConcurrentHashMap.newKeySet();
081        workflows.addAll( m_workflows );
082        return workflows;
083    }
084
085    /**
086     * {@inheritDoc}
087     */
088    @Override
089    public List< Workflow > getCompletedWorkflows() {
090        return new CopyOnWriteArrayList< >( m_completed );
091    }
092
093    /**
094     * {@inheritDoc}
095     *
096     * Any properties that begin with {@link #PROPERTY_APPROVER_PREFIX} will be assumed to be Decisions that require approval. For a given
097     * property key, everything after the prefix denotes the Decision's message key. The property value indicates the Principal (Role,
098     * GroupPrincipal, WikiPrincipal) that must approve the Decision. For example, if the property key/value pair is
099     * {@code jspwiki.approver.workflow.saveWikiPage=Admin}, the Decision's message key is <code>workflow.saveWikiPage</code>. The Principal
100     * <code>Admin</code> will be resolved via {@link org.apache.wiki.auth.AuthorizationManager#resolvePrincipal(String)}.
101     */
102    @Override
103    public void initialize( final Engine engine, final Properties props ) {
104        m_engine = engine;
105        retainCompleted = TextUtil.getIntegerProperty( engine.getWikiProperties(), "jspwiki.workflow.completed.retain", 2048 );
106        m_completed = new CircularFifoQueue<>( retainCompleted );
107
108        // Identify the workflows requiring approvals
109        for( final Object o : props.keySet() ) {
110            final String prop = ( String )o;
111            if( prop.startsWith( PROPERTY_APPROVER_PREFIX ) ) {
112                // For the key, everything after the prefix is the workflow name
113                final String key = prop.substring( PROPERTY_APPROVER_PREFIX.length() );
114                if( key.length() > 0 ) {
115                    // Only use non-null/non-blank approvers
116                    final String approver = props.getProperty( prop );
117                    if( approver != null && !approver.isEmpty() ) {
118                        m_approvers.put( key, new UnresolvedPrincipal( approver ) );
119                    }
120                }
121            }
122        }
123
124        unserializeFromDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) );
125    }
126
127    /**
128     *  Reads the serialized data from the disk back to memory.
129     *
130     * @return the date when the data was last written on disk or {@code 0} if there has been problems reading from disk.
131     */
132    @SuppressWarnings( "unchecked" )
133    synchronized long unserializeFromDisk( final File f ) {
134        long saved = 0L;
135        final StopWatch sw = new StopWatch();
136        sw.start();
137        try( final ObjectInputStream in = new ObjectInputStream( new BufferedInputStream( Files.newInputStream( f.toPath() ) ) ) ) {
138            final long ver = in.readLong();
139            if( ver != serialVersionUID ) {
140                LOG.warn( "File format has changed; Unable to recover workflows and decision queue from disk." );
141            } else {
142                saved        = in.readLong();
143                m_workflows  = ( Set< Workflow > )in.readObject();
144                m_queue      = ( DecisionQueue )in.readObject();
145                m_completed = new CircularFifoQueue<>( retainCompleted );
146                m_completed.addAll( ( Collection< Workflow > )in.readObject() );
147                LOG.debug( "Read serialized data successfully in " + sw );
148            }
149        } catch( final IOException | ClassNotFoundException e ) {
150            LOG.warn( "unable to recover from disk workflows and decision queue: " + e.getMessage() );
151        }
152        sw.stop();
153
154        return saved;
155    }
156
157    /**
158     *  Serializes workflows and decisionqueue to disk.  The format is private, don't touch it.
159     */
160    synchronized void serializeToDisk( final File f ) {
161        try( final ObjectOutputStream out = new ObjectOutputStream( new BufferedOutputStream( Files.newOutputStream( f.toPath() ) ) ) ) {
162            final StopWatch sw = new StopWatch();
163            sw.start();
164
165            out.writeLong( serialVersionUID );
166            out.writeLong( System.currentTimeMillis() ); // Timestamp
167            out.writeObject( m_workflows );
168            out.writeObject( m_queue );
169            out.writeObject( m_completed );
170
171            sw.stop();
172
173            LOG.debug( "serialization done - took " + sw );
174        } catch( final IOException ioe ) {
175            LOG.error( "Unable to serialize!", ioe );
176        }
177    }
178
179    /**
180     * {@inheritDoc}
181     */
182    @Override
183    public boolean requiresApproval( final String messageKey ) {
184        return  m_approvers.containsKey( messageKey );
185    }
186
187    /**
188     * {@inheritDoc}
189     */
190    @Override
191    public Principal getApprover( final String messageKey ) throws WikiException {
192        Principal approver = m_approvers.get( messageKey );
193        if ( approver == null ) {
194            throw new WikiException( "Workflow '" + messageKey + "' does not require approval." );
195        }
196
197        // Try to resolve UnresolvedPrincipals
198        if ( approver instanceof UnresolvedPrincipal ) {
199            final String name = approver.getName();
200            approver = m_engine.getManager( AuthorizationManager.class ).resolvePrincipal( name );
201
202            // If still unresolved, throw exception; otherwise, freshen our cache
203            if ( approver instanceof UnresolvedPrincipal ) {
204                throw new WikiException( "Workflow approver '" + name + "' cannot not be resolved." );
205            }
206
207            m_approvers.put( messageKey, approver );
208        }
209        return approver;
210    }
211
212    /**
213     * Protected helper method that returns the associated Engine
214     *
215     * @return the wiki engine
216     */
217    protected Engine getEngine() {
218        if ( m_engine == null ) {
219            throw new IllegalStateException( "Engine cannot be null; please initialize WorkflowManager first." );
220        }
221        return m_engine;
222    }
223
224    /**
225     * Returns the DecisionQueue associated with this WorkflowManager
226     *
227     * @return the decision queue
228     */
229    @Override
230    public DecisionQueue getDecisionQueue() {
231        return m_queue;
232    }
233
234    /**
235     * {@inheritDoc}
236     */
237    @Override
238    public List< Workflow > getOwnerWorkflows( final Session session ) {
239        final List< Workflow > workflows = new ArrayList<>();
240        if ( session.isAuthenticated() ) {
241            final Principal[] sessionPrincipals = session.getPrincipals();
242            for( final Workflow w : m_workflows ) {
243                final Principal owner = w.getOwner();
244                if (Arrays.stream(sessionPrincipals).anyMatch(sessionPrincipal -> sessionPrincipal.equals(owner))) {
245                    workflows.add(w);
246                }
247            }
248        }
249        return workflows;
250    }
251
252    /**
253     * Listens for {@link WorkflowEvent} objects emitted by Workflows. In particular, this method listens for {@link WorkflowEvent#CREATED},
254     * {@link WorkflowEvent#ABORTED}, {@link WorkflowEvent#COMPLETED} and {@link WorkflowEvent#DQ_REMOVAL} events. If a workflow is created,
255     * it is automatically added to the cache. If one is aborted or completed, it is automatically removed. If a removal from decision queue
256     * is issued, the current step from workflow, which is assumed to be a {@link Decision}, is removed from the {@link DecisionQueue}.
257     * 
258     * @param event the event passed to this listener
259     */
260    @Override
261    public void actionPerformed( final WikiEvent event ) {
262        if( event instanceof WorkflowEvent ) {
263            if( event.getSrc() instanceof Workflow ) {
264                final Workflow workflow = event.getSrc();
265                switch( event.getType() ) {
266                // Remove from manager
267                case WorkflowEvent.ABORTED   :
268                case WorkflowEvent.COMPLETED : remove( workflow ); break;
269                // Add to manager
270                case WorkflowEvent.CREATED   : add( workflow ); break;
271                default: break;
272                }
273            } else if( event.getSrc() instanceof Decision ) {
274                final Decision decision = event.getSrc();
275                switch( event.getType() ) {
276                // Add to DecisionQueue
277                case WorkflowEvent.DQ_ADDITION : addToDecisionQueue( decision ); break;
278                // Remove from DecisionQueue
279                case WorkflowEvent.DQ_REMOVAL  : removeFromDecisionQueue( decision, event.getArg( 0, Context.class ) ); break;
280                default: break;
281                }
282            }
283            serializeToDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) );
284        }
285    }
286
287    /**
288     * Protected helper method that adds a newly created Workflow to the cache, and sets its {@code workflowManager} and
289     * {@code Id} properties if not set.
290     *
291     * @param workflow the workflow to add
292     */
293    protected void add( final Workflow workflow ) {
294        m_workflows.add( workflow );
295    }
296
297    /**
298     * Protected helper method that removes a specified Workflow from the cache, and moves it to the workflow history list. This method
299     * defensively checks to see if the workflow has not yet been removed.
300     *
301     * @param workflow the workflow to remove
302     */
303    protected void remove( final Workflow workflow ) {
304        if( m_workflows.contains( workflow ) ) {
305            m_workflows.remove( workflow );
306            m_completed.add( workflow );
307        }
308    }
309
310    protected void removeFromDecisionQueue( final Decision decision, final Context context ) {
311        // If current workflow is waiting for input, restart it and remove Decision from DecisionQueue
312        final int workflowId = decision.getWorkflowId();
313        final Optional< Workflow > optw = m_workflows.stream().filter( w -> w.getId() == workflowId ).findAny();
314        if( optw.isPresent() ) {
315            final Workflow w = optw.get();
316            if( w.getCurrentState() == Workflow.WAITING && decision.equals( w.getCurrentStep() ) ) {
317                getDecisionQueue().remove( decision );
318                // Restart workflow
319                try {
320                    w.restart( context );
321                } catch( final WikiException e ) {
322                    LOG.error( "restarting workflow #" + w.getId() + " caused " + e.getMessage(), e );
323                }
324            }
325        }
326    }
327
328    protected void addToDecisionQueue( final Decision decision ) {
329        getDecisionQueue().add( decision );
330    }
331
332}