View Javadoc
1   package net.avcompris.status.core.impl;
2   
3   import static com.google.common.base.Preconditions.checkArgument;
4   import static com.google.common.base.Preconditions.checkNotNull;
5   import static com.google.common.collect.Lists.newArrayList;
6   import static net.avcompris.commons3.core.DateTimeHolderImpl.toDateTimeHolder;
7   import static net.avcompris.commons3.core.DateTimeHolderImpl.toDateTimeHolderOrNull;
8   import static net.avcompris.commons3.databeans.DataBeans.instantiate;
9   import static net.avcompris.commons3.utils.EnvUtils.getEnvProperty;
10  import static org.apache.commons.lang3.StringUtils.substringAfter;
11  
12  import java.io.File;
13  import java.io.FileNotFoundException;
14  import java.io.IOException;
15  import java.net.HttpURLConnection;
16  import java.net.URL;
17  import java.util.List;
18  
19  import javax.annotation.Nullable;
20  
21  import org.apache.commons.lang3.NotImplementedException;
22  import org.apache.commons.logging.Log;
23  import org.joda.time.DateTime;
24  import org.springframework.beans.factory.annotation.Autowired;
25  import org.springframework.stereotype.Service;
26  
27  import com.google.common.collect.Iterables;
28  
29  import net.avcompris.commons3.api.exception.ServiceException;
30  import net.avcompris.commons3.api.exception.UnexpectedException;
31  import net.avcompris.commons3.core.Permissions;
32  import net.avcompris.commons3.core.impl.AbstractServiceImpl;
33  import net.avcompris.commons3.utils.Clock;
34  import net.avcompris.commons3.utils.LogFactory;
35  import net.avcompris.status.api.Check;
36  import net.avcompris.status.api.InlineService;
37  import net.avcompris.status.api.ServiceStatus;
38  import net.avcompris.status.api.ServiceStatusHistory;
39  import net.avcompris.status.api.ServicesStatus;
40  import net.avcompris.status.api.ServicesStatusHistory;
41  import net.avcompris.status.api.StatusConfig;
42  import net.avcompris.status.api.StatusConfig.Expect;
43  import net.avcompris.status.api.StatusService;
44  import net.avcompris.status.api.TriggerType;
45  import net.avcompris.status.dao.CheckDto;
46  import net.avcompris.status.dao.EndpointDto;
47  import net.avcompris.status.dao.ServiceStatusHistoryDto;
48  import net.avcompris.status.dao.ServicesStatusHistoryDto;
49  import net.avcompris.status.dao.StatusDao;
50  import net.avcompris.status.query.CheckStatus;
51  
52  @Service
53  public final class StatusServiceImpl extends AbstractServiceImpl implements StatusService {
54  
55  	private static final Log logger = LogFactory.getLog(StatusServiceImpl.class);
56  
57  	private final StatusDao statusDao;
58  
59  	private final StatusConfig config;
60  
61  	@Autowired
62  	public StatusServiceImpl(final Permissions permissions, final Clock clock, //
63  			final StatusDao statusDao) throws IOException {
64  
65  		super(permissions, clock);
66  
67  		this.statusDao = checkNotNull(statusDao, "statusDao");
68  
69  		final String configFilePath = getEnvProperty("configFile", "/etc/avcompris/service-status.yml");
70  
71  		final File configFile = new File(configFilePath);
72  
73  		if (!configFile.isFile()) {
74  			throw new FileNotFoundException("configFile should exist: " + configFile.getCanonicalPath());
75  		}
76  
77  		config = ConfigLoader.loadConfig(configFile);
78  	}
79  
80  	@Override
81  	public StatusConfig getStatusConfig(final String correlationId) throws ServiceException {
82  
83  		checkNotNull(correlationId, "correlationId");
84  
85  		return config;
86  	}
87  
88  	@Override
89  	public ServicesStatus getServicesLiveStatus(final String correlationId) throws ServiceException {
90  
91  		checkNotNull(correlationId, "correlationId");
92  
93  		// LogFactory.setCorrelationId(correlationId);
94  
95  		final MutableServicesStatus servicesStatus = instantiate(MutableServicesStatus.class) //
96  				.setCheckStartedAt(toDateTimeHolder(clock.now())) //
97  				.setTriggerType(TriggerType.UNKNOWN);
98  
99  		final List<CheckThread> threads = newArrayList();
100 
101 		for (final StatusConfig.ServiceConfig service : config.getServices()) {
102 
103 			threads.add(new CheckThread( //
104 					correlationId, //
105 					service.getId(), //
106 					service.getEndpoint(), //
107 					service.getTimeOutMs().intValue(), //
108 					service.getExpect()));
109 		}
110 
111 		threads.stream().forEach((thread) -> thread.start());
112 
113 		threads.stream().forEach((thread) -> {
114 
115 			try {
116 
117 				thread.join();
118 
119 			} catch (final InterruptedException e) {
120 
121 				try {
122 
123 					addCheckError(thread.serviceId, thread.endpoint, thread.checkId, e);
124 
125 				} catch (final ServiceException e2) {
126 
127 					throw new RuntimeException(e2);
128 				}
129 			}
130 		});
131 
132 		for (final CheckThread thread : threads) {
133 
134 			final String checkId = thread.checkId;
135 
136 			if (checkId == null) {
137 				continue;
138 			}
139 
140 			final CheckDto check = wrap(() ->
141 
142 			statusDao.getCheck(checkId));
143 
144 			servicesStatus.addToItems(instantiate(MutableServiceStatus.class) //
145 					.setServiceId(thread.serviceId) //
146 					.setEndpoint(thread.endpoint) //
147 					.setCheck(dto2Check(check)));
148 		}
149 
150 		servicesStatus.setCheckEndedAt(toDateTimeHolder(clock.now()));
151 
152 		return servicesStatus;
153 	}
154 
155 	private StatusConfig.ServiceConfig extractServiceConfig(final String serviceId) {
156 
157 		for (final StatusConfig.ServiceConfig serviceConfig : config.getServices()) {
158 
159 			if (serviceId.contentEquals(serviceConfig.getId())) {
160 				return serviceConfig;
161 			}
162 		}
163 
164 		throw new IllegalArgumentException("Unknown serviceId: " + serviceId);
165 	}
166 
167 	@Override
168 	public ServiceStatus getServiceLiveStatus(final String correlationId, final String serviceId)
169 			throws ServiceException {
170 
171 		checkNotNull(correlationId, "correlationId");
172 		checkNotNull(serviceId, "serviceId");
173 
174 		final StatusConfig.ServiceConfig serviceConfig = extractServiceConfig(serviceId);
175 
176 		final CheckThread thread = new CheckThread( //
177 				correlationId, //
178 				serviceId, //
179 				serviceConfig.getEndpoint(), //
180 				serviceConfig.getTimeOutMs(), //
181 				serviceConfig.getExpect());
182 
183 		thread.run();
184 
185 		final String checkId = thread.checkId;
186 
187 		if (checkId == null) {
188 			throw new UnexpectedException("Could not acquire checkId");
189 		}
190 
191 		final CheckDto check = wrap(() ->
192 
193 		statusDao.getCheck(checkId));
194 
195 		return instantiate(MutableServiceStatus.class) //
196 				.setServiceId(thread.serviceId) //
197 				.setEndpoint(thread.endpoint) //
198 				.setCheck(dto2Check(check));
199 	}
200 
201 	@Override
202 	public ServicesStatusHistory getServicesStatusHistory(final String correlationId) throws ServiceException {
203 
204 		checkNotNull(correlationId, "correlationId");
205 
206 		final List<EndpointDto> endpoints = newArrayList();
207 
208 		for (final StatusConfig.ServiceConfig serviceConfig : config.getServices()) {
209 
210 			endpoints.add(instantiate(EndpointDto.class) //
211 					.setServiceId(serviceConfig.getId()) //
212 					.setEndpoint(serviceConfig.getEndpoint()));
213 		}
214 
215 		final ServicesStatusHistoryDto servicesStatusHistoryDto = wrap(()
216 
217 		-> statusDao.getServicesCachedStatus(Iterables.toArray(endpoints, EndpointDto.class)));
218 
219 		final MutableServicesStatusHistory history = instantiate(MutableServicesStatusHistory.class);
220 
221 		for (final ServiceStatusHistoryDto serviceStatusHistoryDto : servicesStatusHistoryDto.getItems()) {
222 
223 			history.addToItems(dto2ServiceStatusHistory(serviceStatusHistoryDto));
224 		}
225 
226 		return history;
227 	}
228 
229 	private static ServiceStatusHistory dto2ServiceStatusHistory(
230 			final ServiceStatusHistoryDto serviceStatusHistoryDto) {
231 
232 		final MutableServiceStatusHistory history = instantiate(MutableServiceStatusHistory.class) //
233 				.setServiceId(serviceStatusHistoryDto.getServiceId()) //
234 				.setEndpoint(serviceStatusHistoryDto.getEndpoint()) //
235 				.setStart(serviceStatusHistoryDto.getStart()) //
236 				.setTotal(serviceStatusHistoryDto.getTotal());
237 
238 		for (final CheckDto checkDto : serviceStatusHistoryDto.getChecks()) {
239 
240 			history.addToChecks(dto2Check(checkDto));
241 		}
242 
243 		return history;
244 	}
245 
246 	private static Check dto2Check(final CheckDto checkDto) {
247 
248 		final CheckStatus status = checkDto.getStatus();
249 
250 		final boolean success = status == CheckStatus.SUCCESS;
251 
252 		final DateTime startedAt = checkDto.getStartedAt();
253 		final DateTime endedAt = checkDto.getEndedAt();
254 
255 		return instantiate(MutableCheck.class) //
256 				.setId(checkDto.getId()) //
257 				.setStartedAt(toDateTimeHolder(startedAt)) //
258 				.setEndedAt(toDateTimeHolderOrNull(endedAt)) //
259 				.setElapsedMs(checkDto.getElapsedMs()) //
260 				.setTriggerType(TriggerType.UNKNOWN) //
261 				.setStatus(status) //
262 				.setSuccess(success) //
263 				.setErrorMessage(checkDto.getErrorMessage()) //
264 				.setStatusCode(checkDto.getStatusCode());
265 	}
266 
267 	@Override
268 	public ServiceStatusHistory getServiceStatusHistory(final String correlationId, final String serviceId)
269 			throws ServiceException {
270 
271 		checkNotNull(correlationId, "correlationId");
272 		checkNotNull(serviceId, "serviceId");
273 
274 		throw new NotImplementedException("");
275 	}
276 
277 	private String initCheck(final String serviceId, final String endpoint) throws ServiceException {
278 
279 		return wrap(()
280 
281 		-> statusDao.initCheck(serviceId, endpoint));
282 	}
283 
284 	private void addCheckError(final String serviceId, final String endpoint, @Nullable final String checkId,
285 			final Throwable error) throws ServiceException {
286 
287 		logger.warn("addCheckError(): " + endpoint, error);
288 
289 		wrap(()
290 
291 		-> statusDao.addCheckError(serviceId, endpoint, checkId, error.getMessage()));
292 	}
293 
294 	private void addCheckError(final String serviceId, final String endpoint, @Nullable final String checkId,
295 			final String errorMessage) throws ServiceException {
296 
297 		logger.warn("addCheckError(): " + endpoint + ": " + errorMessage);
298 
299 		wrap(()
300 
301 		-> statusDao.addCheckError(serviceId, endpoint, checkId, errorMessage));
302 	}
303 
304 	private void endCheck( //
305 			final String checkId, //
306 			final CheckResult result //
307 	) throws ServiceException {
308 
309 		wrap(()
310 
311 		-> statusDao.endCheck(checkId, result.elapsedMs, result.statusCode));
312 	}
313 
314 	private final class CheckThread extends Thread {
315 
316 		private final String correlationId;
317 		private final String serviceId;
318 		private final String endpoint;
319 		private final int timeOutMs;
320 		private final Expect expect;
321 
322 		@Nullable
323 		private String checkId;
324 
325 		public CheckThread( //
326 				final String correlationId, //
327 				final String serviceId, //
328 				final String endpoint, //
329 				final int timeOutMs, //
330 				final Expect expect) {
331 
332 			this.correlationId = checkNotNull(correlationId, "correlationId");
333 			this.serviceId = checkNotNull(serviceId, "serviceId");
334 			this.endpoint = checkNotNull(endpoint, "endpoint");
335 			this.timeOutMs = timeOutMs;
336 			this.expect = checkNotNull(expect, "expect");
337 		}
338 
339 		@Override
340 		public void run() {
341 
342 			LogFactory.setCorrelationId(correlationId);
343 
344 			final CheckResult result;
345 
346 			try {
347 
348 				checkId = initCheck(serviceId, endpoint);
349 
350 				result = doCheckEndpoint(endpoint);
351 
352 			} catch (final ServiceException e) {
353 
354 				try {
355 
356 					addCheckError(serviceId, endpoint, checkId, e);
357 
358 					return;
359 
360 				} catch (final ServiceException e2) {
361 
362 					throw new RuntimeException(e2);
363 				}
364 
365 			} catch (final IOException | RuntimeException | Error e) {
366 
367 				try {
368 
369 					addCheckError(serviceId, endpoint, checkId, e);
370 
371 					return;
372 
373 				} catch (final ServiceException e2) {
374 
375 					throw new RuntimeException(e2);
376 				}
377 			}
378 
379 			try {
380 
381 				endCheck(checkId, result);
382 
383 				if (expect.getStatusCode() != null) {
384 
385 					final int expectedStatusCode = expect.getStatusCode();
386 
387 					if (expectedStatusCode != result.statusCode) {
388 
389 						addCheckError(serviceId, endpoint, checkId,
390 								"Expected statusCode: " + expectedStatusCode + ", but was: " + result.statusCode);
391 
392 						return;
393 					}
394 				}
395 
396 			} catch (final ServiceException e) {
397 
398 				throw new RuntimeException(e);
399 			}
400 		}
401 	}
402 
403 	private static CheckResult doCheckEndpoint(final String endpoint) throws IOException {
404 
405 		checkArgument(endpoint.startsWith("GET "), "Endpoint should start with \"GET \", but was: %s", endpoint);
406 
407 		final String url = substringAfter(endpoint, "GET").trim();
408 
409 		checkArgument(url.startsWith("https://") //
410 				|| url.startsWith("http://"), //
411 				"URL should start with \"https://\" or \"http://\", but was: %s", endpoint);
412 
413 		final long startMs = System.currentTimeMillis();
414 
415 		final HttpURLConnection cxn = (HttpURLConnection) new URL(url).openConnection();
416 
417 		final long elapsedMs = System.currentTimeMillis() - startMs;
418 
419 		final int statusCode = cxn.getResponseCode();
420 
421 		cxn.disconnect();
422 
423 		return new CheckResult((int) elapsedMs, statusCode);
424 	}
425 
426 	private static class CheckResult {
427 
428 		public final int elapsedMs;
429 		public final int statusCode;
430 
431 		public CheckResult( //
432 				final int elapsedMs, //
433 				final int statusCode //
434 		) {
435 
436 			this.elapsedMs = elapsedMs;
437 			this.statusCode = statusCode;
438 		}
439 	}
440 
441 	@Override
442 	public ServiceStatus getInlineServiceLiveStatus(final String correlationId, final String serviceId,
443 			final InlineService inlineService) throws ServiceException {
444 
445 		checkNotNull(correlationId, "correlationId");
446 		checkNotNull(serviceId, "serviceId");
447 		checkNotNull(inlineService, "inlineService");
448 
449 		// LogFactory.setCorrelationId(correlationId);
450 
451 		final Expect expect = inlineService.getExpect() != null //
452 				? inlineService.getExpect() //
453 				: instantiate(InlineService.Expect.class) //
454 						.setStatusCode(200);
455 
456 		final CheckThread thread = new CheckThread( //
457 				correlationId, //
458 				serviceId, //
459 				inlineService.getEndpoint(), //
460 				inlineService.getTimeOutMs() != null //
461 						? inlineService.getTimeOutMs().intValue() //
462 						: 60_000, //
463 				expect);
464 
465 		thread.run();
466 
467 		final String checkId = thread.checkId;
468 
469 		if (checkId == null) {
470 			throw new UnexpectedException("Could not acquire checkId");
471 		}
472 
473 		final CheckDto check = wrap(() ->
474 
475 		statusDao.getCheck(checkId));
476 
477 		return instantiate(MutableServiceStatus.class) //
478 				.setServiceId(thread.serviceId) //
479 				.setEndpoint(thread.endpoint) //
480 				.setCheck(dto2Check(check));
481 	}
482 }