Skip to content

Method: introduce(DependencyTask)

1: package de.fhdw.wtf.common.task;
2:
3: import java.util.ArrayList;
4: import java.util.Collection;
5: import java.util.Iterator;
6: import java.util.concurrent.ConcurrentLinkedQueue;
7: import java.util.concurrent.ExecutionException;
8: import java.util.concurrent.ExecutorService;
9: import java.util.concurrent.Executors;
10: import java.util.concurrent.Future;
11:
12: import de.fhdw.wtf.common.task.result.TaskResult;
13:
14: /**
15: *
16: *
17: */
18: public final class TaskExecutorFixed implements TaskExecutor {
19:         
20:         private static final int MAX_NUM_THREADS = 10;
21:         
22:         private final ExecutorService executor;
23:         
24:         private final ConcurrentLinkedQueue<Future<TaskResult>> results;
25:         
26:         private final Collection<DependencyTask> knownTasks;
27:         
28:         /**
29:          * Constructor of {@link TaskExecutorFixed}.
30:          */
31:         private TaskExecutorFixed() {
32:                 this.executor = Executors.newFixedThreadPool(MAX_NUM_THREADS);
33:                 this.results = new ConcurrentLinkedQueue<>();
34:                 this.knownTasks = new ArrayList<>();
35:         }
36:         
37:         /**
38:          * Creates a {@link TaskExecutorFixed}-Object.
39:          *
40:          * @return The {@link TaskExecutorFixed}-Object.
41:          */
42:         public static TaskExecutorFixed create() {
43:                 return new TaskExecutorFixed();
44:         }
45:         
46:         /**
47:          * Starts all tasks contained by the given collection <code>tasks</code>. Successors of this tasks are specified by
48:          * its dependencies.
49:          *
50:          * @param tasks
51:          * : Collection of Tasks.
52:          */
53:         public void start(final Collection<? extends DependencyTask> tasks) {
54:                 for (final DependencyTask task : tasks) {
55:                         this.submit(task);
56:                 }
57:         }
58:         
59:         @Override
60:         public synchronized void submit(final DependencyTask a) {
61:                 final Future<TaskResult> result = this.executor.submit(a);
62:                 this.results.add(result);
63:         }
64:         
65:         private synchronized boolean isQueueEmpty() {
66:                 return this.results.isEmpty();
67:         }
68:         
69:         @Override
70:         public Collection<TaskResult> getResultsAndShutdown() throws InterruptedException, ExecutionException {
71:                 final Collection<TaskResult> result = new ArrayList<>();
72:                 while (!this.isQueueEmpty()) {
73:                         final Future<TaskResult> current = this.results.poll();
74:                         result.add(current.get());
75:                 }
76:                 this.executor.shutdown();
77:                 return result;
78:         }
79:         
80:         @Override
81:         public void introduce(final DependencyTask newTask) {
82:                 this.knownTasks.add(newTask);
83:         }
84:         
85:         @Override
86:         public void startAllKnownTasks() throws InterruptedException {
87:                 final Collection<DependencyTask> startThese = new ArrayList<>();
88:                 final Iterator<DependencyTask> i = this.knownTasks.iterator();
89:                 while (i.hasNext()) {
90:                         final DependencyTask current = i.next();
91:                         if (current.getDependencies().isEmpty()) {
92:                                 startThese.add(current);
93:                         }
94:                 }
95:                 this.results.addAll(this.executor.invokeAll(startThese));
96:         }
97:         
98: }