1 package net.avcompris.status.core.impl;
2
3 import static com.google.common.base.Preconditions.checkArgument;
4 import static com.google.common.base.Preconditions.checkNotNull;
5 import static com.google.common.collect.Lists.newArrayList;
6 import static net.avcompris.commons3.core.DateTimeHolderImpl.toDateTimeHolder;
7 import static net.avcompris.commons3.core.DateTimeHolderImpl.toDateTimeHolderOrNull;
8 import static net.avcompris.commons3.databeans.DataBeans.instantiate;
9 import static net.avcompris.commons3.utils.EnvUtils.getEnvProperty;
10 import static org.apache.commons.lang3.StringUtils.substringAfter;
11
12 import java.io.File;
13 import java.io.FileNotFoundException;
14 import java.io.IOException;
15 import java.net.HttpURLConnection;
16 import java.net.URL;
17 import java.util.List;
18
19 import javax.annotation.Nullable;
20
21 import org.apache.commons.lang3.NotImplementedException;
22 import org.apache.commons.logging.Log;
23 import org.joda.time.DateTime;
24 import org.springframework.beans.factory.annotation.Autowired;
25 import org.springframework.stereotype.Service;
26
27 import com.google.common.collect.Iterables;
28
29 import net.avcompris.commons3.api.exception.ServiceException;
30 import net.avcompris.commons3.api.exception.UnexpectedException;
31 import net.avcompris.commons3.core.Permissions;
32 import net.avcompris.commons3.core.impl.AbstractServiceImpl;
33 import net.avcompris.commons3.utils.Clock;
34 import net.avcompris.commons3.utils.LogFactory;
35 import net.avcompris.status.api.Check;
36 import net.avcompris.status.api.InlineService;
37 import net.avcompris.status.api.ServiceStatus;
38 import net.avcompris.status.api.ServiceStatusHistory;
39 import net.avcompris.status.api.ServicesStatus;
40 import net.avcompris.status.api.ServicesStatusHistory;
41 import net.avcompris.status.api.StatusConfig;
42 import net.avcompris.status.api.StatusConfig.Expect;
43 import net.avcompris.status.api.StatusService;
44 import net.avcompris.status.api.TriggerType;
45 import net.avcompris.status.dao.CheckDto;
46 import net.avcompris.status.dao.EndpointDto;
47 import net.avcompris.status.dao.ServiceStatusHistoryDto;
48 import net.avcompris.status.dao.ServicesStatusHistoryDto;
49 import net.avcompris.status.dao.StatusDao;
50 import net.avcompris.status.query.CheckStatus;
51
52 @Service
53 public final class StatusServiceImpl extends AbstractServiceImpl implements StatusService {
54
55 private static final Log logger = LogFactory.getLog(StatusServiceImpl.class);
56
57 private final StatusDao statusDao;
58
59 private final StatusConfig config;
60
/**
 * Creates the service and loads its configuration file.
 *
 * <p>The location of the file is obtained from
 * {@code getEnvProperty("configFile", "/etc/avcompris/service-status.yml")};
 * the second argument is presumably the value used when the property is not
 * set (confirm against {@code EnvUtils}).</p>
 *
 * @param permissions passed through to the superclass constructor
 * @param clock       passed through to the superclass constructor
 * @param statusDao   the DAO this service stores and reads checks with; must
 *                    not be {@code null}
 * @throws FileNotFoundException if the resolved path is not an existing
 *                               regular file
 * @throws IOException           if the canonical path cannot be computed, or
 *                               (presumably) if {@code ConfigLoader} fails to
 *                               read the file
 */
@Autowired
public StatusServiceImpl(final Permissions permissions, final Clock clock,
        final StatusDao statusDao) throws IOException {

    super(permissions, clock);

    this.statusDao = checkNotNull(statusDao, "statusDao");

    final File configFile = new File(getEnvProperty("configFile", "/etc/avcompris/service-status.yml"));

    // Fail early, with the canonical path in the message, rather than
    // letting the loader stumble on a missing file or a directory.
    if (!configFile.isFile()) {
        throw new FileNotFoundException("configFile should exist: " + configFile.getCanonicalPath());
    }

    this.config = ConfigLoader.loadConfig(configFile);
}
79
80 @Override
81 public StatusConfig getStatusConfig(final String correlationId) throws ServiceException {
82
83 checkNotNull(correlationId, "correlationId");
84
85 return config;
86 }
87
88 @Override
89 public ServicesStatus getServicesLiveStatus(final String correlationId) throws ServiceException {
90
91 checkNotNull(correlationId, "correlationId");
92
93
94
95 final MutableServicesStatus servicesStatus = instantiate(MutableServicesStatus.class)
96 .setCheckStartedAt(toDateTimeHolder(clock.now()))
97 .setTriggerType(TriggerType.UNKNOWN);
98
99 final List<CheckThread> threads = newArrayList();
100
101 for (final StatusConfig.ServiceConfig service : config.getServices()) {
102
103 threads.add(new CheckThread(
104 correlationId,
105 service.getId(),
106 service.getEndpoint(),
107 service.getTimeOutMs().intValue(),
108 service.getExpect()));
109 }
110
111 threads.stream().forEach((thread) -> thread.start());
112
113 threads.stream().forEach((thread) -> {
114
115 try {
116
117 thread.join();
118
119 } catch (final InterruptedException e) {
120
121 try {
122
123 addCheckError(thread.serviceId, thread.endpoint, thread.checkId, e);
124
125 } catch (final ServiceException e2) {
126
127 throw new RuntimeException(e2);
128 }
129 }
130 });
131
132 for (final CheckThread thread : threads) {
133
134 final String checkId = thread.checkId;
135
136 if (checkId == null) {
137 continue;
138 }
139
140 final CheckDto check = wrap(() ->
141
142 statusDao.getCheck(checkId));
143
144 servicesStatus.addToItems(instantiate(MutableServiceStatus.class)
145 .setServiceId(thread.serviceId)
146 .setEndpoint(thread.endpoint)
147 .setCheck(dto2Check(check)));
148 }
149
150 servicesStatus.setCheckEndedAt(toDateTimeHolder(clock.now()));
151
152 return servicesStatus;
153 }
154
155 private StatusConfig.ServiceConfig extractServiceConfig(final String serviceId) {
156
157 for (final StatusConfig.ServiceConfig serviceConfig : config.getServices()) {
158
159 if (serviceId.contentEquals(serviceConfig.getId())) {
160 return serviceConfig;
161 }
162 }
163
164 throw new IllegalArgumentException("Unknown serviceId: " + serviceId);
165 }
166
167 @Override
168 public ServiceStatus getServiceLiveStatus(final String correlationId, final String serviceId)
169 throws ServiceException {
170
171 checkNotNull(correlationId, "correlationId");
172 checkNotNull(serviceId, "serviceId");
173
174 final StatusConfig.ServiceConfig serviceConfig = extractServiceConfig(serviceId);
175
176 final CheckThread thread = new CheckThread(
177 correlationId,
178 serviceId,
179 serviceConfig.getEndpoint(),
180 serviceConfig.getTimeOutMs(),
181 serviceConfig.getExpect());
182
183 thread.run();
184
185 final String checkId = thread.checkId;
186
187 if (checkId == null) {
188 throw new UnexpectedException("Could not acquire checkId");
189 }
190
191 final CheckDto check = wrap(() ->
192
193 statusDao.getCheck(checkId));
194
195 return instantiate(MutableServiceStatus.class)
196 .setServiceId(thread.serviceId)
197 .setEndpoint(thread.endpoint)
198 .setCheck(dto2Check(check));
199 }
200
201 @Override
202 public ServicesStatusHistory getServicesStatusHistory(final String correlationId) throws ServiceException {
203
204 checkNotNull(correlationId, "correlationId");
205
206 final List<EndpointDto> endpoints = newArrayList();
207
208 for (final StatusConfig.ServiceConfig serviceConfig : config.getServices()) {
209
210 endpoints.add(instantiate(EndpointDto.class)
211 .setServiceId(serviceConfig.getId())
212 .setEndpoint(serviceConfig.getEndpoint()));
213 }
214
215 final ServicesStatusHistoryDto servicesStatusHistoryDto = wrap(()
216
217 -> statusDao.getServicesCachedStatus(Iterables.toArray(endpoints, EndpointDto.class)));
218
219 final MutableServicesStatusHistory history = instantiate(MutableServicesStatusHistory.class);
220
221 for (final ServiceStatusHistoryDto serviceStatusHistoryDto : servicesStatusHistoryDto.getItems()) {
222
223 history.addToItems(dto2ServiceStatusHistory(serviceStatusHistoryDto));
224 }
225
226 return history;
227 }
228
229 private static ServiceStatusHistory dto2ServiceStatusHistory(
230 final ServiceStatusHistoryDto serviceStatusHistoryDto) {
231
232 final MutableServiceStatusHistory history = instantiate(MutableServiceStatusHistory.class)
233 .setServiceId(serviceStatusHistoryDto.getServiceId())
234 .setEndpoint(serviceStatusHistoryDto.getEndpoint())
235 .setStart(serviceStatusHistoryDto.getStart())
236 .setTotal(serviceStatusHistoryDto.getTotal());
237
238 for (final CheckDto checkDto : serviceStatusHistoryDto.getChecks()) {
239
240 history.addToChecks(dto2Check(checkDto));
241 }
242
243 return history;
244 }
245
246 private static Check dto2Check(final CheckDto checkDto) {
247
248 final CheckStatus status = checkDto.getStatus();
249
250 final boolean success = status == CheckStatus.SUCCESS;
251
252 final DateTime startedAt = checkDto.getStartedAt();
253 final DateTime endedAt = checkDto.getEndedAt();
254
255 return instantiate(MutableCheck.class)
256 .setId(checkDto.getId())
257 .setStartedAt(toDateTimeHolder(startedAt))
258 .setEndedAt(toDateTimeHolderOrNull(endedAt))
259 .setElapsedMs(checkDto.getElapsedMs())
260 .setTriggerType(TriggerType.UNKNOWN)
261 .setStatus(status)
262 .setSuccess(success)
263 .setErrorMessage(checkDto.getErrorMessage())
264 .setStatusCode(checkDto.getStatusCode());
265 }
266
267 @Override
268 public ServiceStatusHistory getServiceStatusHistory(final String correlationId, final String serviceId)
269 throws ServiceException {
270
271 checkNotNull(correlationId, "correlationId");
272 checkNotNull(serviceId, "serviceId");
273
274 throw new NotImplementedException("");
275 }
276
277 private String initCheck(final String serviceId, final String endpoint) throws ServiceException {
278
279 return wrap(()
280
281 -> statusDao.initCheck(serviceId, endpoint));
282 }
283
284 private void addCheckError(final String serviceId, final String endpoint, @Nullable final String checkId,
285 final Throwable error) throws ServiceException {
286
287 logger.warn("addCheckError(): " + endpoint, error);
288
289 wrap(()
290
291 -> statusDao.addCheckError(serviceId, endpoint, checkId, error.getMessage()));
292 }
293
294 private void addCheckError(final String serviceId, final String endpoint, @Nullable final String checkId,
295 final String errorMessage) throws ServiceException {
296
297 logger.warn("addCheckError(): " + endpoint + ": " + errorMessage);
298
299 wrap(()
300
301 -> statusDao.addCheckError(serviceId, endpoint, checkId, errorMessage));
302 }
303
304 private void endCheck(
305 final String checkId,
306 final CheckResult result
307 ) throws ServiceException {
308
309 wrap(()
310
311 -> statusDao.endCheck(checkId, result.elapsedMs, result.statusCode));
312 }
313
314 private final class CheckThread extends Thread {
315
316 private final String correlationId;
317 private final String serviceId;
318 private final String endpoint;
319 private final int timeOutMs;
320 private final Expect expect;
321
322 @Nullable
323 private String checkId;
324
325 public CheckThread(
326 final String correlationId,
327 final String serviceId,
328 final String endpoint,
329 final int timeOutMs,
330 final Expect expect) {
331
332 this.correlationId = checkNotNull(correlationId, "correlationId");
333 this.serviceId = checkNotNull(serviceId, "serviceId");
334 this.endpoint = checkNotNull(endpoint, "endpoint");
335 this.timeOutMs = timeOutMs;
336 this.expect = checkNotNull(expect, "expect");
337 }
338
339 @Override
340 public void run() {
341
342 LogFactory.setCorrelationId(correlationId);
343
344 final CheckResult result;
345
346 try {
347
348 checkId = initCheck(serviceId, endpoint);
349
350 result = doCheckEndpoint(endpoint);
351
352 } catch (final ServiceException e) {
353
354 try {
355
356 addCheckError(serviceId, endpoint, checkId, e);
357
358 return;
359
360 } catch (final ServiceException e2) {
361
362 throw new RuntimeException(e2);
363 }
364
365 } catch (final IOException | RuntimeException | Error e) {
366
367 try {
368
369 addCheckError(serviceId, endpoint, checkId, e);
370
371 return;
372
373 } catch (final ServiceException e2) {
374
375 throw new RuntimeException(e2);
376 }
377 }
378
379 try {
380
381 endCheck(checkId, result);
382
383 if (expect.getStatusCode() != null) {
384
385 final int expectedStatusCode = expect.getStatusCode();
386
387 if (expectedStatusCode != result.statusCode) {
388
389 addCheckError(serviceId, endpoint, checkId,
390 "Expected statusCode: " + expectedStatusCode + ", but was: " + result.statusCode);
391
392 return;
393 }
394 }
395
396 } catch (final ServiceException e) {
397
398 throw new RuntimeException(e);
399 }
400 }
401 }
402
403 private static CheckResult doCheckEndpoint(final String endpoint) throws IOException {
404
405 checkArgument(endpoint.startsWith("GET "), "Endpoint should start with \"GET \", but was: %s", endpoint);
406
407 final String url = substringAfter(endpoint, "GET").trim();
408
409 checkArgument(url.startsWith("https://") //
410 || url.startsWith("http://"), //
411 "URL should start with \"https://\" or \"http://\", but was: %s", endpoint);
412
413 final long startMs = System.currentTimeMillis();
414
415 final HttpURLConnection cxn = (HttpURLConnection) new URL(url).openConnection();
416
417 final long elapsedMs = System.currentTimeMillis() - startMs;
418
419 final int statusCode = cxn.getResponseCode();
420
421 cxn.disconnect();
422
423 return new CheckResult((int) elapsedMs, statusCode);
424 }
425
426 private static class CheckResult {
427
428 public final int elapsedMs;
429 public final int statusCode;
430
431 public CheckResult(
432 final int elapsedMs,
433 final int statusCode
434 ) {
435
436 this.elapsedMs = elapsedMs;
437 this.statusCode = statusCode;
438 }
439 }
440
441 @Override
442 public ServiceStatus getInlineServiceLiveStatus(final String correlationId, final String serviceId,
443 final InlineService inlineService) throws ServiceException {
444
445 checkNotNull(correlationId, "correlationId");
446 checkNotNull(serviceId, "serviceId");
447 checkNotNull(inlineService, "inlineService");
448
449
450
451 final Expect expect = inlineService.getExpect() != null
452 ? inlineService.getExpect()
453 : instantiate(InlineService.Expect.class)
454 .setStatusCode(200);
455
456 final CheckThread thread = new CheckThread(
457 correlationId,
458 serviceId,
459 inlineService.getEndpoint(),
460 inlineService.getTimeOutMs() != null
461 ? inlineService.getTimeOutMs().intValue()
462 : 60_000,
463 expect);
464
465 thread.run();
466
467 final String checkId = thread.checkId;
468
469 if (checkId == null) {
470 throw new UnexpectedException("Could not acquire checkId");
471 }
472
473 final CheckDto check = wrap(() ->
474
475 statusDao.getCheck(checkId));
476
477 return instantiate(MutableServiceStatus.class)
478 .setServiceId(thread.serviceId)
479 .setEndpoint(thread.endpoint)
480 .setCheck(dto2Check(check));
481 }
482 }