NonceQueue.kt

package io.github.psychoplasma.nonceq.queue

import java.math.BigInteger
import java.time.Instant

import mu.KotlinLogging


/**
 * Circular Queue implementation for incremental nonce values
 *
 * Circular Queue structure contains three variables
 *  - `head`: cursor indicates the most recent record in the queue
 *  - `tail`: cursor indicates the oldest record in the queue
 *  - `queue`: mapping(nonce value => nonce record), keeps incremental nonce records
 *
 *  ```
 *  fifo queue with capacity n
 *  __________________________________
 *  |  k  | k+1 | .. | .. | .. |k+n-1|
 *  __________________________________
 *    ^                           ^
 *   tail                        head
 *  ```
 *
 * And there is only one queue for every different address space
 *
 * All state (cursors and records) is read from and written to [repository];
 * this class keeps no queue state of its own.
 *
 * NOTE(review): every operation issues several repository calls and no locking
 * is visible in this class — confirm that callers serialize access per address.
 *
 * @property repository storage for the head/tail cursors and the nonce records.
 * @property capacity number of stored records at which [next] stops handing out values.
 * @property expiry age limit handed to `Nonce.isExpired`; presumably milliseconds,
 *  since records are stamped with `Instant.now().toEpochMilli()` — TODO confirm.
 */
internal class NonceQueue(
    private val repository: NonceQueueRepository,
    private val capacity: Long,
    private val expiry: Long = 10_000L,
) {
    private val logger = KotlinLogging.logger {}

    /** Thrown by [next] when no value can be handed out because the queue is at capacity. */
    class QueueOverflowException(message: String) : IllegalStateException(message)

    /**
     * Inserts [value], and sets head and tail to [value].
     *
     * Does nothing when a record for [value] already exists.
     *
     * This should be used only for initialization,
     * otherwise queue can behave unexpectedly!
     */
    fun insert(address: String, value: BigInteger) {
        if (has(address, value)) return

        repository.setHead(address, value)
        repository.setTail(address, value)

        // Create a new nonce record
        repository.putNonce(
            address,
            value,
            Nonce(value.toString(), false, Instant.now().toEpochMilli()),
        )

        logger.debug { "Inserting initial nonce: $value for $address" }
    }

    /**
     * Returns the next value from the queue.
     * Also, it will try to remove the tail in each iteration.
     *
     * ```
     *  Iteration works as follows;
     *
     *  Let's say there are skipped values in slots k+2 and k+4,
     *  and the head is at k+1.
     *  ______________________________________________
     *  |  k  | k+1 | empty | k+3 | empty | .. |k+n-1|
     *  ______________________________________________
     *    ^      ^
     *   tail   head
     *
     *  if we request the next value, k+2 will be returned
     *  and the head will move to k+2.
     *  ____________________________________________
     *  |  k  | k+1 | k+2 | k+3 | empty | .. |k+n-1|
     *  ____________________________________________
     *    ^            ^
     *   tail         head
     *
     *  However the next value is already in use and the one after
     *  the next value is available. And if we request a next value,
     *  k+3 will be skipped due to the fact that it is already in use.
     *  The head will move to k+3. And it will iterate to the next slot
     *  until it finds an empty one.
     *  ____________________________________________
     *  |  k  | k+1 | k+2 | k+3 | empty | .. |k+n-1|
     *  ____________________________________________
     *    ^                 ^
     *   tail              head
     *
     *  And finally it will return the empty slot's value
     *  ____________________________________________
     *  |  k  | k+1 | k+2 | k+3 | empty | .. |k+n-1|
     *  ____________________________________________
     *    ^                        ^
     *   tail                     head
     * ```
     *
     * The tail is cleaned up before the capacity check, so a queue that is at
     * capacity only because of used or expired tail records frees those slots
     * instead of rejecting the request.
     *
     * @return the value that was reserved; a fresh, unused record is stored for it.
     * @throws QueueOverflowException if the queue is still at capacity after tail cleanup.
     */
    fun next(address: String): BigInteger {
        while (true) {
            // Remove the tail if it is used which
            // prevents the queue from growing indefinitely.
            // OR remove expired record from the tail.
            // This must run before the capacity check: tail cleanup only happens
            // here, so checking first would leave a full queue full forever.
            removeTail(address)

            // If the queue size reaches the max capacity,
            // do not move any forward (don't accept any request),
            // instead throw an exception which should be handled in the caller.
            if (isFull(address)) break

            // Move cursor to the next value
            val head = repository.getHead(address).inc()
            repository.setHead(address, head)

            // If there is no record at the head, or it's expired, use it.
            // A single lookup is used so the decision is made on one consistent read.
            // NOTE(review): this check does not look at `used`; whether an expired
            // record can still be marked used depends on Nonce.isExpired, which is
            // not visible here — confirm a submitted nonce cannot be re-issued.
            val existing = repository.getNonce(address, head)
            if (existing == null || existing.isExpired(expiry)) {
                // Create a new nonce record
                repository.putNonce(
                    address,
                    head,
                    Nonce(head.toString(), false, Instant.now().toEpochMilli()),
                )

                logger.debug { "Next nonce: $head for $address" }
                return head
            }

            // If there is a record at the head which is not expired, then iterate to the next value.
            // Iteration will find an unused value (with no record) or
            // it will throw a QueueOverflowException when it hits the capacity
        }

        throw QueueOverflowException(
            "nonce queue is full for address: $address with capacity: $capacity",
        )
    }

    /**
     * Removes [value] from the queue and sets head to the value
     * right before [value] so that the removed [value] can be used
     * again in the next iteration/s.
     *
     * Does nothing when there is no record for [value].
     *
     * ```
     *  __________________________________
     *  |  k  | k+1 | .. | .. | .. |k+n-1|
     *  __________________________________
     *    ^                           ^
     *   tail                        head
     *
     *  if the value k+2 is removed, then head moves to k+1.
     *  ____________________________________________
     *  |  k  | k+1 | empty | k+3 | k+4 | .. |k+n-1|
     *  ____________________________________________
     *    ^      ^
     *   tail   head
     *
     *  In the next iteration/s, k+2 will be available again.
     * ```
     *
     * Notice that if there is removed value/s with a lower number,
     * then it won't update the head so that the lower value/s can be used first.
     * ```
     *  For example, with the above case, if the value k+4, also, is removed,
     *  then head won't move to slot k+3, instead it will stay at k+1
     *  because there is already empty slot with a lower value which is k+2.
     *  ______________________________________________
     *  |  k  | k+1 | empty | k+3 | empty | .. |k+n-1|
     *  _____________________________________________
     *    ^      ^
     *   tail   head
     * ```
     */
    fun remove(address: String, value: BigInteger) {
        if (!has(address, value)) return

        logger.debug { "Discarding nonce: $value for $address" }

        repository.deleteNonce(address, value)

        if (repository.getHead(address) >= value) {
            // Move the head to the value right before the removed value
            // so that, in the next iteration/s, the removed value can be used again
            val head = value.dec()
            repository.setHead(address, head)

            logger.debug { "Moving head to: $head for $address" }
        }
        // If the head has already been moved to a lower value previously
        // (i.e. skipped nonce with lower value), then do not update the head.
    }

    /**
     * Marks [value] as used for later removal from the tail.
     *
     * Does nothing when there is no record for [value].
     */
    fun markUsed(address: String, value: BigInteger) {
        // Single lookup: a missing record simply means there is nothing to mark.
        val nonce = repository.getNonce(address, value) ?: return
        nonce.used = true
        repository.putNonce(address, value, nonce)
    }

    /**
     * Checks if the queue is empty.
     *
     * Notice that size of zero doesn't always mean that the queue is empty.
     * Even though size is zero, head cursor can point a next value.
     * For example if a value is requested and then removed, assuming
     * that the tail is the previous value (requested value - 1) which
     * will be removed upon requesting the next value, then the head will
     * move to previous value, although there is no record in the queue.
     *
     * @return `true` only when the repository reports zero records and the head is zero.
     */
    fun isEmpty(address: String): Boolean {
        val head = repository.getHead(address)
        return repository.size(address) == 0L && head == BigInteger.ZERO
    }

    /**
     * Removes all the records from the queue and reset head and tail cursors
     *
     * The work is delegated to `repository.clear`.
     */
    fun reset(address: String) {
        repository.clear(address)
        logger.debug { "Resetting nonce queue for $address" }
    }

    /**
     * Checks if there is a record with the given [value]
     */
    fun has(address: String, value: BigInteger): Boolean =
        repository.getNonce(address, value) != null

    /**
     * Removes the tail recursively if it is marked as used,
     * not to grow the queue indefinitely.
     * OR removes an expired record from the tail(not recursively).
     *
     * Returns immediately when there is no record at the tail slot.
     */
    private fun removeTail(address: String) {
        while (true) {
            val tail = repository.getTail(address)
            val tailNonce = repository.getNonce(address, tail) ?: return

            // Remove the tail if it's already used
            if (tailNonce.used) {
                logger.debug {
                    "Removing tail nonce: ${tailNonce.value} for $address"
                }
                repository.deleteNonce(address, tail)
                repository.setTail(address, tail.inc())

                // Remove tail till head
                if (tail < repository.getHead(address)) {
                    continue
                }
            }
            // If not used but expired, remove the record at the tail slot to use it again.
            // Notice that expired means the record is neither used nor discarded
            // in the given period of time. So we have to discard it by force
            // with the assumption that it's not submitted to blockchain.
            else if (tailNonce.isExpired(expiry)) {
                logger.debug {
                    "Expired nonce: ${tailNonce.value} for $address"
                }
                // remove() also pulls the head back to just before the tail,
                // so the freed value is the next one handed out.
                remove(address, tail)
                // Do not update tail cursor, because we are just
                // emptying a slot to use in the next iteration.
            }

            break
        }
    }

    /**
     * Checks whether the number of stored records has reached [capacity].
     *
     * Uses `>=` so that a size already beyond [capacity] is also treated as full
     * rather than letting the queue keep growing.
     */
    private fun isFull(address: String): Boolean =
        repository.size(address) >= capacity
}