1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.dbunit.util.concurrent;
17
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.lang.reflect.Constructor;
22 import java.lang.reflect.InvocationTargetException;
23
24
25
26
27
28
29
30
31
32
33
34 public abstract class SemaphoreControlledChannel implements BoundedChannel {
35
36
37
38
39 private static final Logger logger = LoggerFactory.getLogger(SemaphoreControlledChannel.class);
40
41 protected final Semaphore putGuard_;
42 protected final Semaphore takeGuard_;
43 protected int capacity_;
44
45
46
47
48
49
50
51 public SemaphoreControlledChannel(int capacity)
52 throws IllegalArgumentException {
53 if (capacity <= 0) throw new IllegalArgumentException();
54 capacity_ = capacity;
55 putGuard_ = new Semaphore(capacity);
56 takeGuard_ = new Semaphore(0);
57 }
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 public SemaphoreControlledChannel(int capacity, Class semaphoreClass)
74 throws IllegalArgumentException,
75 NoSuchMethodException,
76 SecurityException,
77 InstantiationException,
78 IllegalAccessException,
79 InvocationTargetException {
80 if (capacity <= 0) throw new IllegalArgumentException();
81 capacity_ = capacity;
82 Class[] longarg = { Long.TYPE };
83 Constructor ctor = semaphoreClass.getDeclaredConstructor(longarg);
84 Long[] cap = {Long.valueOf(capacity)};
85 putGuard_ = (Semaphore)(ctor.newInstance(cap));
86 Long[] zero = {0L};
87 takeGuard_ = (Semaphore)(ctor.newInstance(zero));
88 }
89
90
91
92 public int capacity() {
93 logger.debug("capacity() - start");
94 return capacity_; }
95
96
97
98
99
100
101
102 public int size() {
103 logger.debug("size() - start");
104 return (int)(takeGuard_.permits()); }
105
106
107
108
109 protected abstract void insert(Object x);
110
111
112
113
114 protected abstract Object extract();
115
116 public void put(Object x) throws InterruptedException {
117 logger.debug("put(x=" + x + ") - start");
118
119 if (x == null) throw new IllegalArgumentException();
120 if (Thread.interrupted()) throw new InterruptedException();
121 putGuard_.acquire();
122 try {
123 insert(x);
124 takeGuard_.release();
125 }
126 catch (ClassCastException ex) {
127 putGuard_.release();
128 throw ex;
129 }
130 }
131
132 public boolean offer(Object x, long msecs) throws InterruptedException {
133 logger.debug("offer(x=" + x + ", msecs=" + msecs + ") - start");
134
135 if (x == null) throw new IllegalArgumentException();
136 if (Thread.interrupted()) throw new InterruptedException();
137 if (!putGuard_.attempt(msecs))
138 return false;
139 else {
140 try {
141 insert(x);
142 takeGuard_.release();
143 return true;
144 }
145 catch (ClassCastException ex) {
146 putGuard_.release();
147 throw ex;
148 }
149 }
150 }
151
152 public Object take() throws InterruptedException {
153 logger.debug("take() - start");
154
155 if (Thread.interrupted()) throw new InterruptedException();
156 takeGuard_.acquire();
157 try {
158 Object x = extract();
159 putGuard_.release();
160 return x;
161 }
162 catch (ClassCastException ex) {
163 takeGuard_.release();
164 throw ex;
165 }
166 }
167
168 public Object poll(long msecs) throws InterruptedException {
169 logger.debug("poll(msecs=" + msecs + ") - start");
170
171 if (Thread.interrupted()) throw new InterruptedException();
172 if (!takeGuard_.attempt(msecs))
173 return null;
174 else {
175 try {
176 Object x = extract();
177 putGuard_.release();
178 return x;
179 }
180 catch (ClassCastException ex) {
181 takeGuard_.release();
182 throw ex;
183 }
184 }
185 }
186
187 }