MessageStatusProcessor.java

package uk.gov.dhsc.htbhf.claimant.message;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import uk.gov.dhsc.htbhf.claimant.entity.Message;
import uk.gov.dhsc.htbhf.claimant.repository.MessageRepository;

import java.time.LocalDateTime;
import java.util.List;
import javax.transaction.Transactional;

import static uk.gov.dhsc.htbhf.claimant.message.MessageStatus.ERROR;

@Component
public class MessageStatusProcessor {

    private static final long THIRTY_SECONDS = 30;
    private static final int MAX_EXPONENT_SIZE = 50;

    private final MessageRepository messageRepository;
    private final long maximumRetryDelaySeconds;

    public MessageStatusProcessor(
            MessageRepository messageRepository,
            @Value("${message-processor.maximum-retry-delay-seconds}") long maximumRetryDelaySeconds
    ) {
        this.messageRepository = messageRepository;
        this.maximumRetryDelaySeconds = maximumRetryDelaySeconds;
    }

    /**
     * Method responsible for updating/deleting messages post processing dependent upon their status.
     * If a message is COMPLETED then it will be deleted from the queue, otherwise the Message in the
     * database will updated with the status and timestamp and its count will be incremented.
     *
     * @param message       The message that has been processed
     * @param messageStatus The status of the message
     */
    @Transactional(Transactional.TxType.REQUIRES_NEW)
    public void processStatusForMessage(Message message, MessageStatus messageStatus) {
        if (messageStatus == MessageStatus.COMPLETED) {
            messageRepository.delete(message);
        } else {
            updateMessageWithStatusAndIncrementCount(message, messageStatus);
        }
    }

    /**
     * Will set all the given messages to an ERROR status and update delivery counts
     * and timestamps accordingly.
     *
     * @param messages The messages to update
     */
    @Transactional(Transactional.TxType.REQUIRES_NEW)
    public void updateMessagesToErrorAndIncrementCount(List<Message> messages) {
        messages.forEach(message -> updateMessageWithStatusAndIncrementCount(message, ERROR));
    }

    private void updateMessageWithStatusAndIncrementCount(Message message, MessageStatus status) {
        int initialDeliveryCount = message.getDeliveryCount();
        LocalDateTime newTimestamp = LocalDateTime.now().plusSeconds(getRetryDelayInSeconds(initialDeliveryCount));

        message.setDeliveryCount(initialDeliveryCount + 1);
        message.setProcessAfter(newTimestamp);
        message.setStatus(status);
        messageRepository.save(message);
    }

    private long getRetryDelayInSeconds(int deliveryCount) {
        int exponent = Math.min(deliveryCount, MAX_EXPONENT_SIZE); // we overflow the size of a long if the exponent is too large
        long delay = ((long) Math.pow(2, exponent)) * THIRTY_SECONDS;
        return Math.min(delay, maximumRetryDelaySeconds);
    }
}