View Javadoc
1   package net.avcompris.status.dao.impl;
2   
3   import static com.google.common.base.Preconditions.checkNotNull;
4   import static com.google.common.base.Preconditions.checkState;
5   import static com.google.common.collect.Lists.newArrayList;
6   import static com.google.common.collect.Sets.newHashSet;
7   import static net.avcompris.commons3.databeans.DataBeans.instantiate;
8   import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
9   
10  import java.io.IOException;
11  import java.sql.Connection;
12  import java.sql.PreparedStatement;
13  import java.sql.ResultSet;
14  import java.sql.SQLException;
15  import java.sql.SQLIntegrityConstraintViolationException;
16  import java.util.List;
17  import java.util.Set;
18  
19  import javax.annotation.Nullable;
20  import javax.sql.DataSource;
21  
22  import org.springframework.beans.factory.annotation.Autowired;
23  import org.springframework.beans.factory.annotation.Value;
24  import org.springframework.stereotype.Component;
25  
26  import net.avcompris.commons3.dao.impl.AbstractDaoInRDS;
27  import net.avcompris.commons3.utils.Clock;
28  import net.avcompris.status.dao.CheckDto;
29  import net.avcompris.status.dao.EndpointDto;
30  import net.avcompris.status.dao.ServicesStatusHistoryDto;
31  import net.avcompris.status.dao.StatusDao;
32  import net.avcompris.status.query.CheckStatus;
33  
34  @Component
35  public final class StatusDaoInRDS extends AbstractDaoInRDS implements StatusDao {
36  
37  	private final boolean debug;
38  
39  	@Autowired
40  	public StatusDaoInRDS( //
41  			@Value("#{rds.dataSource}") final DataSource dataSource, //
42  			@Value("#{rds.tableNames.correlations}") final String tableName, //
43  			final Clock clock) {
44  
45  		super(dataSource, tableName, clock);
46  
47  		debug = System.getProperty("debug") != null;
48  	}
49  
50  	@Override
51  	public ServicesStatusHistoryDto getServicesCachedStatus(final EndpointDto... endpoints)
52  			throws SQLException, IOException {
53  
54  		checkNotNull(endpoints, "endpoints");
55  
56  		final MutableServicesStatusHistoryDto history = instantiate(MutableServicesStatusHistoryDto.class);
57  
58  		for (final EndpointDto e : endpoints) {
59  
60  			final String serviceId = e.getServiceId();
61  			final String endpoint = e.getEndpoint();
62  
63  			final MutableServiceStatusHistoryDto serviceStatus = instantiate(MutableServiceStatusHistoryDto.class) //
64  					.setServiceId(serviceId) //
65  					.setEndpoint(endpoint) //
66  					.setStart(0) //
67  					.setTotal(getCheckTotalCount(endpoint));
68  
69  			history.addToItems(serviceStatus);
70  
71  			for (final CheckDto check : getChecks(endpoint)) {
72  
73  				serviceStatus.addToChecks(check);
74  			}
75  		}
76  
77  		return history;
78  	}
79  
80  	private int getCheckTotalCount(final String endpoint) throws SQLException {
81  
82  		try (Connection cxn = getConnection()) {
83  
84  			try (PreparedStatement pstmt = cxn.prepareStatement("SELECT" //
85  					+ " COUNT(1)" //
86  					+ " FROM " + tableName + "_checks" //
87  					+ " WHERE endpoint = ?" //
88  			)) {
89  
90  				setString(pstmt, 1, endpoint);
91  
92  				try (ResultSet rs = pstmt.executeQuery()) {
93  
94  					rs.next();
95  
96  					return getInt(rs, 1);
97  				}
98  			}
99  		}
100 	}
101 
102 	private Iterable<CheckDto> getChecks(final String endpoint) throws SQLException {
103 
104 		final List<CheckDto> checks = newArrayList();
105 
106 		try (Connection cxn = getConnection()) {
107 
108 			try (PreparedStatement pstmt = cxn.prepareStatement("SELECT" //
109 					+ " c.check_id," //
110 					+ " c.status," //
111 					+ " c.started_at," //
112 					+ " c.ended_at," //
113 					+ " c.elapsed_ms," //
114 					+ " c.status_code," //
115 					+ " e.error_message" //
116 					+ " FROM " + tableName + "_checks AS c" //
117 					+ " LEFT JOIN " + tableName + "_checks_errors AS e" //
118 					+ " ON c.check_id = e.check_id" //
119 					+ " WHERE c.endpoint = ?" //
120 					+ " ORDER BY started_at DESC" //
121 			)) {
122 
123 				setString(pstmt, 1, endpoint);
124 
125 				try (ResultSet rs = pstmt.executeQuery()) {
126 
127 					final Set<String> checkIds = newHashSet();
128 
129 					while (rs.next()) {
130 
131 						final String checkId = getString(rs, "check_id");
132 
133 						if (checkIds.contains(checkId)) {
134 							continue;
135 						}
136 
137 						checks.add(instantiate(MutableCheckDto.class) //
138 								.setId(checkId) //
139 								.setStatus(getEnum(rs, "status", CheckStatus.class)) //
140 								.setStartedAt(getDateTime(rs, "started_at")) //
141 								.setEndedAt(getDateTime(rs, "ended_at")) //
142 								.setElapsedMs(getInteger(rs, "elapsed_ms")) //
143 								.setStatusCode(getInteger(rs, "status_code")) //
144 								.setErrorMessage(getString(rs, "error_message")));
145 					}
146 				}
147 			}
148 		}
149 
150 		return checks;
151 	}
152 
153 	@Override
154 	public String initCheck( //
155 			final String serviceId, //
156 			final String endpoint //
157 	) throws SQLException, IOException {
158 
159 		checkNotNull(serviceId, "serviceId");
160 		checkNotNull(endpoint, "endpoint");
161 
162 		final String checkId;
163 
164 		try (Connection cxn = getConnection()) {
165 
166 			ensureEndpoint(cxn, endpoint);
167 
168 			checkId = "K-" //
169 					+ System.currentTimeMillis() + "-" //
170 					+ randomAlphanumeric(20);
171 
172 			try (PreparedStatement pstmt = cxn.prepareStatement("INSERT INTO " + tableName + "_checks" //
173 					+ " (check_id," //
174 					+ " endpoint," //
175 					+ " status," //
176 					+ " started_at)" //
177 					+ " VALUES (?, ?, ?, ?)" //
178 			)) {
179 
180 				setString(pstmt, 1, checkId);
181 				setString(pstmt, 2, endpoint);
182 				setString(pstmt, 3, CheckStatus.RUNNING.name());
183 				setDateTime(pstmt, 4, clock.now());
184 
185 				pstmt.executeUpdate();
186 			}
187 		}
188 
189 		return checkId;
190 	}
191 
192 	private void ensureEndpoint( //
193 			final Connection cxn, //
194 			final String endpoint //
195 	) throws SQLException {
196 
197 		try (PreparedStatement pstmt = cxn.prepareStatement("SELECT" //
198 				+ " 1" //
199 				+ " FROM " + tableName //
200 				+ " WHERE endpoint = ?" //
201 		)) {
202 
203 			setString(pstmt, 1, endpoint);
204 
205 			try (ResultSet rs = pstmt.executeQuery()) {
206 
207 				if (rs.next()) {
208 
209 					return; // The endpoint already exists in the DataBase
210 				}
211 			}
212 		}
213 
214 		try (PreparedStatement pstmt = cxn.prepareStatement("INSERT INTO " + tableName //
215 				+ " (endpoint)" //
216 				+ " VALUES (?)" //
217 		)) {
218 
219 			setString(pstmt, 1, endpoint);
220 
221 			try {
222 
223 				pstmt.executeUpdate();
224 
225 			} catch (final SQLIntegrityConstraintViolationException e) {
226 
227 				// do nothing if rare concurrent condition occurs
228 			}
229 		}
230 	}
231 
232 	@Override
233 	public void addCheckError( //
234 			final String serviceId, //
235 			final String endpoint, //
236 			@Nullable final String checkId, //
237 			final String errorMessage //
238 	) throws SQLException, IOException {
239 
240 		checkNotNull(serviceId, "serviceId");
241 		checkNotNull(endpoint, "endpoint");
242 		checkNotNull(errorMessage, "errorMessage");
243 
244 		try (Connection cxn = getConnection()) {
245 
246 			if (checkId != null) {
247 
248 				cxn.setAutoCommit(false);
249 
250 				try (PreparedStatement pstmt = cxn.prepareStatement("UPDATE " + tableName + "_checks" //
251 						+ " SET error_message = ?," //
252 						+ " ended_at = ?," //
253 						+ " status = ?" //
254 						+ " WHERE check_id = ?" //
255 						+ " AND error_message IS NULL" //
256 				)) {
257 
258 					setString(pstmt, 1, errorMessage);
259 					setDateTime(pstmt, 2, clock.now());
260 					setString(pstmt, 3, CheckStatus.ERROR.name());
261 
262 					setString(pstmt, 4, checkId);
263 
264 					pstmt.executeUpdate();
265 				}
266 			}
267 
268 			try (PreparedStatement pstmt = cxn.prepareStatement("INSERT INTO " + tableName + "_checks_errors" //
269 					+ " (service_id," //
270 					+ " endpoint," //
271 					+ " check_id," //
272 					+ " error_at," //
273 					+ " error_message)" //
274 					+ " VALUES (?, ?, ?, ?, ?)" //
275 			)) {
276 
277 				setString(pstmt, 1, serviceId);
278 				setString(pstmt, 2, endpoint);
279 				setString(pstmt, 3, checkId);
280 				setDateTime(pstmt, 4, clock.now());
281 				setString(pstmt, 5, errorMessage);
282 
283 				pstmt.executeUpdate();
284 			}
285 
286 			if (checkId != null) {
287 				cxn.commit();
288 			}
289 		}
290 	}
291 
292 	@Override
293 	public void endCheck( //
294 			final String checkId, //
295 			final int elapsedMs, //
296 			final int statusCode //
297 	) throws SQLException, IOException {
298 
299 		checkNotNull(checkId, "checkId");
300 
301 		try (Connection cxn = getConnection()) {
302 
303 			try (PreparedStatement pstmt = cxn.prepareStatement("UPDATE " + tableName + "_checks" //
304 					+ " SET status = ?," //
305 					+ " ended_at = ?," //
306 					+ " elapsed_ms = ?," //
307 					+ " status_code = ?" //
308 					+ " WHERE check_id = ?" //
309 					+ " AND status = ?" //
310 					+ " AND ended_at IS NULL" //
311 					+ " AND error_message IS NULL" //
312 			)) {
313 
314 				setString(pstmt, 1, CheckStatus.SUCCESS.name());
315 				setDateTime(pstmt, 2, clock.now());
316 				setInt(pstmt, 3, elapsedMs);
317 				setInt(pstmt, 4, statusCode);
318 
319 				setString(pstmt, 5, checkId);
320 				setString(pstmt, 6, CheckStatus.RUNNING.name());
321 
322 				final int updated = pstmt.executeUpdate();
323 
324 				if (updated != 1) {
325 					throw new IllegalStateException("Cannot update checkId: " + checkId);
326 				}
327 			}
328 		}
329 	}
330 
331 	@Override
332 	public CheckDto getCheck( //
333 			final String checkId //
334 	) throws SQLException, IOException {
335 
336 		checkNotNull(checkId, "checkId");
337 
338 		try (Connection cxn = getConnection()) {
339 
340 			try (PreparedStatement pstmt = cxn.prepareStatement("SELECT" //
341 					+ " status," //
342 					+ " started_at," //
343 					+ " ended_at," //
344 					+ " elapsed_ms," //
345 					+ " status_code," //
346 					+ " error_message" //
347 					+ " FROM " + tableName + "_checks" //
348 					+ " WHERE check_id = ?" //
349 			)) {
350 
351 				setString(pstmt, 1, checkId);
352 
353 				try (ResultSet rs = pstmt.executeQuery()) {
354 
355 					if (!rs.next()) {
356 						checkState(false, "checkId not found: %s", checkId);
357 					}
358 
359 					return instantiate(MutableCheckDto.class) //
360 							.setId(checkId) //
361 							.setStatus(getEnum(rs, "status", CheckStatus.class)) //
362 							.setStartedAt(getDateTime(rs, "started_at")) //
363 							.setEndedAt(getDateTime(rs, "ended_at")) //
364 							.setElapsedMs(getInteger(rs, "elapsed_ms")) //
365 							.setStatusCode(getInteger(rs, "status_code")) //
366 							.setErrorMessage(getString(rs, "error_message"));
367 				}
368 			}
369 		}
370 	}
371 }