1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.dbunit.dataset.stream;
22
23 import org.dbunit.dataset.AbstractTable;
24 import org.dbunit.dataset.DataSetException;
25 import org.dbunit.dataset.ITable;
26 import org.dbunit.dataset.ITableIterator;
27 import org.dbunit.dataset.ITableMetaData;
28 import org.dbunit.dataset.RowOutOfBoundsException;
29 import org.dbunit.util.concurrent.BoundedBuffer;
30 import org.dbunit.util.concurrent.Channel;
31 import org.dbunit.util.concurrent.Puttable;
32 import org.dbunit.util.concurrent.Takable;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36
37
38
39
40
41
42
43
44 public class StreamingIterator implements ITableIterator
45 {
46
47
48
49
50 private static final Logger logger = LoggerFactory.getLogger(StreamingIterator.class);
51
52 private static final Object EOD = new Object();
53
54 private final Takable _channel;
55 private StreamingTable _activeTable;
56 private Object _taken = null;
57 private boolean _eod = false;
58
59
60
61 private Exception _asyncException;
62
63
64
65
66
67
68
69
70
71 public StreamingIterator(IDataSetProducer source) throws DataSetException
72 {
73 Channel channel = new BoundedBuffer(30);
74 _channel = channel;
75
76 AsynchronousConsumer consumer = new AsynchronousConsumer(source, channel, this);
77 Thread thread = new Thread(consumer, "StreamingIterator");
78 thread.setDaemon(true);
79 thread.start();
80
81
82 try
83 {
84 _taken = _channel.take();
85 }
86 catch (InterruptedException e)
87 {
88 logger.debug("Thread '" + Thread.currentThread() + "' was interrupted");
89 throw resolveException(e);
90 }
91 }
92
93 private DataSetException resolveException(InterruptedException cause) throws DataSetException
94 {
95 String msg = "Current thread was interrupted (Thread=" + Thread.currentThread() + ")";
96 if(this._asyncException != null)
97 {
98 return new DataSetException(msg, this._asyncException);
99 }
100 else
101 {
102 return new DataSetException(msg, cause);
103 }
104 }
105
106
107
108
109 public boolean next() throws DataSetException
110 {
111 logger.debug("next() - start");
112
113
114 if (_eod)
115 {
116 return false;
117 }
118
119
120 while (_activeTable != null && _activeTable.next())
121 ;
122
123
124 if (_taken == EOD)
125 {
126 _eod = true;
127 _activeTable = null;
128
129 logger.debug("End of iterator.");
130 return false;
131 }
132
133
134 if (_taken instanceof ITableMetaData)
135 {
136 _activeTable = new StreamingTable((ITableMetaData)_taken);
137 return true;
138 }
139
140 throw new IllegalStateException(
141 "Unexpected object taken from asyncronous handler: " + _taken);
142 }
143
144 public ITableMetaData getTableMetaData() throws DataSetException
145 {
146 logger.debug("getTableMetaData() - start");
147
148 return _activeTable.getTableMetaData();
149 }
150
151 public ITable getTable() throws DataSetException
152 {
153 logger.debug("getTable() - start");
154
155 return _activeTable;
156 }
157
158 private void handleException(Exception e)
159 {
160
161 this._asyncException = e;
162 }
163
164
165
166
167 private class StreamingTable extends AbstractTable
168 {
169
170
171
172
173 private final Logger logger = LoggerFactory.getLogger(StreamingTable.class);
174
175 private ITableMetaData _metaData;
176 private int _lastRow = -1;
177 private boolean _eot = false;
178 private Object[] _rowValues;
179
180 public StreamingTable(ITableMetaData metaData)
181 {
182 _metaData = metaData;
183 }
184
185 boolean next() throws DataSetException
186 {
187 logger.debug("next() - start");
188
189
190 if (_eot)
191 {
192 return false;
193 }
194
195 try
196 {
197 _taken = _channel.take();
198 if (!(_taken instanceof Object[]))
199 {
200 _eot = true;
201 return false;
202 }
203
204 _lastRow++;
205 _rowValues = (Object[])_taken;
206 return true;
207 }
208 catch (InterruptedException e)
209 {
210 throw resolveException(e);
211 }
212 }
213
214
215
216
217 public ITableMetaData getTableMetaData()
218 {
219 logger.debug("getTableMetaData() - start");
220
221 return _metaData;
222 }
223
224 public int getRowCount()
225 {
226 logger.debug("getRowCount() - start");
227
228 throw new UnsupportedOperationException();
229 }
230
231 public Object getValue(int row, String columnName) throws DataSetException
232 {
233 if(logger.isDebugEnabled())
234 logger.debug("getValue(row={}, columnName={}) - start", Integer.toString(row), columnName);
235
236
237 while (!_eot && row > _lastRow)
238 {
239 next();
240 }
241
242 if (row < _lastRow)
243 {
244 throw new UnsupportedOperationException("Cannot go backward!");
245 }
246
247 if (_eot || row > _lastRow)
248 {
249 throw new RowOutOfBoundsException(row + " > " + _lastRow);
250 }
251
252 return _rowValues[getColumnIndex(columnName)];
253 }
254
255 public String toString()
256 {
257 StringBuilder sb = new StringBuilder();
258 sb.append(getClass().getName()).append("[");
259 sb.append("_metaData=")
260 .append(this._metaData == null ? "null" : this._metaData
261 .toString());
262 sb.append(", _eot=").append(this._eot);
263 sb.append(", _lastRow=").append(this._lastRow);
264 sb.append(", _rowValues=").append(
265 this._rowValues == null ? "null" : this._rowValues
266 .toString());
267 sb.append("]");
268 return sb.toString();
269 }
270 }
271
272
273
274
275 private static class AsynchronousConsumer implements Runnable, IDataSetConsumer
276 {
277
278
279
280
281 private static final Logger logger = LoggerFactory.getLogger(AsynchronousConsumer.class);
282
283 private final IDataSetProducer _producer;
284 private final Puttable _channel;
285 private final StreamingIterator _exceptionHandler;
286 private final Thread _invokerThread;
287
288 public AsynchronousConsumer(IDataSetProducer source, Puttable channel, StreamingIterator exceptionHandler)
289 {
290 _producer = source;
291 _channel = channel;
292 _exceptionHandler = exceptionHandler;
293 _invokerThread = Thread.currentThread();
294 }
295
296
297
298
299 public void run()
300 {
301 logger.debug("run() - start");
302
303 try
304 {
305 _producer.setConsumer(this);
306 _producer.produce();
307 }
308 catch (Exception e)
309 {
310 _exceptionHandler.handleException(e);
311
312 _invokerThread.interrupt();
313 }
314
315 logger.debug("End of thread " + Thread.currentThread());
316 }
317
318
319
320
321 public void startDataSet() throws DataSetException
322 {
323 }
324
325 public void endDataSet() throws DataSetException
326 {
327 logger.debug("endDataSet() - start");
328
329 try
330 {
331 _channel.put(EOD);
332 }
333 catch (InterruptedException e)
334 {
335 throw new DataSetException("Operation was interrupted");
336 }
337 }
338
339 public void startTable(ITableMetaData metaData) throws DataSetException
340 {
341 logger.debug("startTable(metaData={}) - start", metaData);
342
343 try
344 {
345 _channel.put(metaData);
346 }
347 catch (InterruptedException e)
348 {
349 throw new DataSetException("Operation was interrupted");
350 }
351 }
352
353 public void endTable() throws DataSetException
354 {
355 }
356
357 public void row(Object[] values) throws DataSetException
358 {
359 logger.debug("row(values={}) - start", values);
360
361 try
362 {
363 _channel.put(values);
364 }
365 catch (InterruptedException e)
366 {
367 throw new DataSetException("Operation was interrupted");
368 }
369 }
370 }
371
372 }