MessageProcessor.java
package uk.gov.dhsc.htbhf.claimant.message;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.PageRequest;
import org.springframework.util.CollectionUtils;
import uk.gov.dhsc.htbhf.claimant.entity.Message;
import uk.gov.dhsc.htbhf.claimant.exception.EventFailedException;
import uk.gov.dhsc.htbhf.claimant.repository.MessageRepository;
import uk.gov.dhsc.htbhf.claimant.service.audit.EventAuditor;
import uk.gov.dhsc.htbhf.logging.event.FailureEvent;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;
import static uk.gov.dhsc.htbhf.claimant.message.MessageStatus.ERROR;
import static uk.gov.dhsc.htbhf.logging.ExceptionDetailGenerator.constructExceptionDetail;
/**
* Component that is triggered on a schedule and is responsible for finding all the messages that need to be
* processed and passing them off to the {@link MessageTypeProcessor} matching the {@link MessageType} of
* the message stored in the database.
*/
@RequiredArgsConstructor
@Slf4j
public class MessageProcessor {
private final MessageStatusProcessor messageStatusProcessor;
private final MessageRepository messageRepository;
private final EventAuditor eventAuditor;
private final Map<MessageType, MessageTypeProcessor> messageProcessorsByType;
@Value("${message-processor.message-limit}")
private final int messageProcessingLimit;
public void processMessagesOfType(MessageType messageType) {
List<Message> messages = messageRepository.findAllMessagesOfTypeWithTimestampBeforeNow(messageType, PageRequest.of(0, messageProcessingLimit));
processMessagesOfType(messages, messageType);
}
private void processMessagesOfType(List<Message> messages, MessageType messageType) {
if (CollectionUtils.isEmpty(messages)) {
log.trace("No {} messages found to process", messageType);
return;
}
log.info("Processing {} {} message(s)", messages.size(), messageType);
MessageTypeProcessor messageTypeProcessor = getMessageTypeProcessor(messageType, messages);
List<MessageStatus> statuses = messages.stream()
.map(message -> processMessage(message, messageTypeProcessor))
.collect(Collectors.toList());
logResults(statuses, messageType, messageTypeProcessor);
}
private MessageTypeProcessor getMessageTypeProcessor(MessageType messageType, List<Message> messages) {
MessageTypeProcessor messageTypeProcessor = messageProcessorsByType.get(messageType);
if (messageTypeProcessor == null) {
messageStatusProcessor.updateMessagesToErrorAndIncrementCount(messages);
throw new IllegalArgumentException("No message type processor found in application context for message type: "
+ messageType + ", there are " + messages.size() + " message(s) due to be processed");
}
return messageTypeProcessor;
}
private MessageStatus processMessage(Message message, MessageTypeProcessor messageTypeProcessor) {
MessageStatus status = invokeMessageTypeProcessor(message, messageTypeProcessor);
messageStatusProcessor.processStatusForMessage(message, status);
return status;
}
private MessageStatus invokeMessageTypeProcessor(Message message, MessageTypeProcessor messageTypeProcessor) {
try {
return messageTypeProcessor.processMessage(message);
} catch (EventFailedException efe) {
log.error("Failure event caught for message with id {}, exception detail: {}", message.getId(), constructExceptionDetail(efe), efe);
FailureEvent failureEvent = efe.getFailureEvent();
eventAuditor.auditFailedEvent(failureEvent);
messageTypeProcessor.processFailedMessage(message, failureEvent);
return ERROR;
} catch (RuntimeException e) {
log.error("Unable to process message with id {}, exception detail: {}", message.getId(), constructExceptionDetail(e), e);
return ERROR;
}
}
private void logResults(List<MessageStatus> statuses, MessageType messageType, MessageTypeProcessor messageTypeProcessor) {
Map<MessageStatus, Long> statusesCountMap = statuses.stream()
.peek(messageStatus -> logNullMessageStatus(messageTypeProcessor, messageStatus))
.filter(Objects::nonNull)
.collect(groupingBy(identity(), counting()));
statusesCountMap.forEach((messageStatus, count) -> log.info("Processed {} {} message(s) with status {}", count, messageType, messageStatus.name()));
}
private void logNullMessageStatus(MessageTypeProcessor messageTypeProcessor, MessageStatus messageStatus) {
if (messageStatus == null) {
log.error("Received null message status from MessageTypeProcessor: {}", messageTypeProcessor.getClass().getCanonicalName());
}
}
}