1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.dbunit.util.concurrent;
18
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22
23
24
25
26
27
28
29
30
31
32 public class BoundedBuffer implements BoundedChannel {
33
34
35
36
37 private static final Logger logger = LoggerFactory.getLogger(BoundedBuffer.class);
38
39 protected final Object[] array_;
40
41 protected int takePtr_ = 0;
42 protected int putPtr_ = 0;
43
44 protected int usedSlots_ = 0;
45 protected int emptySlots_;
46
47
48
49
50 protected final Object putMonitor_ = new Object();
51
52
53
54
55
56 public BoundedBuffer(int capacity) throws IllegalArgumentException {
57 if (capacity <= 0) throw new IllegalArgumentException();
58 array_ = new Object[capacity];
59 emptySlots_ = capacity;
60 }
61
62
63
64
65
66 public BoundedBuffer() {
67 this(DefaultChannelCapacity.get());
68 }
69
70
71
72
73
74
75 public synchronized int size() {
76 return usedSlots_;
77 }
78
79 public int capacity() {
80 return array_.length;
81 }
82
83 protected void incEmptySlots() {
84 synchronized(putMonitor_) {
85 ++emptySlots_;
86 putMonitor_.notify();
87 }
88 }
89
90 protected synchronized void incUsedSlots() {
91 ++usedSlots_;
92 notify();
93 }
94
95 protected final void insert(Object x) {
96 logger.debug("insert(x={}) - start", x);
97
98 --emptySlots_;
99 array_[putPtr_] = x;
100 if (++putPtr_ >= array_.length) putPtr_ = 0;
101 }
102
103 protected final Object extract() {
104 logger.debug("extract() - start");
105
106 --usedSlots_;
107 Object old = array_[takePtr_];
108 array_[takePtr_] = null;
109 if (++takePtr_ >= array_.length) takePtr_ = 0;
110 return old;
111 }
112
113 public Object peek() {
114 logger.debug("peek() - start");
115
116 synchronized(this) {
117 if (usedSlots_ > 0)
118 return array_[takePtr_];
119 else
120 return null;
121 }
122 }
123
124
125 public void put(Object x) throws InterruptedException {
126 logger.debug("put(x={}) - start", x);
127
128 if (x == null) throw new IllegalArgumentException();
129 if (Thread.interrupted()) throw new InterruptedException();
130
131 synchronized(putMonitor_) {
132 while (emptySlots_ <= 0) {
133 try { putMonitor_.wait(); }
134 catch (InterruptedException ex) {
135 putMonitor_.notify();
136 throw ex;
137 }
138 }
139 insert(x);
140 }
141 incUsedSlots();
142 }
143
144 public boolean offer(Object x, long msecs) throws InterruptedException {
145 logger.debug("offer(x={}, msecs={}) - start", x, msecs);
146
147 if (x == null) throw new IllegalArgumentException();
148 if (Thread.interrupted()) throw new InterruptedException();
149
150 synchronized(putMonitor_) {
151 long start = (msecs <= 0)? 0 : System.currentTimeMillis();
152 long waitTime = msecs;
153 while (emptySlots_ <= 0) {
154 if (waitTime <= 0) return false;
155 try { putMonitor_.wait(waitTime); }
156 catch (InterruptedException ex) {
157 putMonitor_.notify();
158 throw ex;
159 }
160 waitTime = msecs - (System.currentTimeMillis() - start);
161 }
162 insert(x);
163 }
164 incUsedSlots();
165 return true;
166 }
167
168
169
170 public Object take() throws InterruptedException {
171 logger.debug("take() - start");
172
173 if (Thread.interrupted()) throw new InterruptedException();
174 Object old = null;
175 synchronized(this) {
176 while (usedSlots_ <= 0) {
177 try { wait(); }
178 catch (InterruptedException ex) {
179 notify();
180 throw ex;
181 }
182 }
183 old = extract();
184 }
185 incEmptySlots();
186 return old;
187 }
188
189 public Object poll(long msecs) throws InterruptedException {
190 logger.debug("poll(msecs={}) - start", msecs);
191
192 if (Thread.interrupted()) throw new InterruptedException();
193 Object old = null;
194 synchronized(this) {
195 long start = (msecs <= 0)? 0 : System.currentTimeMillis();
196 long waitTime = msecs;
197
198 while (usedSlots_ <= 0) {
199 if (waitTime <= 0) return null;
200 try { wait(waitTime); }
201 catch (InterruptedException ex) {
202 notify();
203 throw ex;
204 }
205 waitTime = msecs - (System.currentTimeMillis() - start);
206
207 }
208 old = extract();
209 }
210 incEmptySlots();
211 return old;
212 }
213
214 }
215
216