What causes UnrecoverableTimeoutException and how should I fix or avoid it?

I have N sets of one producer feeding one consumer, and each consumer writes to Chronicle Queue. Today I found an error that I have not seen before:

Exception in thread "TaskConsumer2" Exception in thread "TaskConsumer0" net.openhft.chronicle.wire.UnrecoverableTimeoutException: Couldn't acquire write lock after 15000 ms for the lock file:./chronicle/roll/metadata.cq4t. Lock was held by me. You can manually unlock with net.openhft.chronicle.queue.main.UnlockMain
        at net.openhft.chronicle.queue.impl.single.TableStoreWriteLock.lock(TableStoreWriteLock.java:96)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.prepareAndReturnWriteContext(StoreAppender.java:430)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:406)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:394)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writeBytes(StoreAppender.java:194)
        at service.producerconsumer.TaskConsumer.runTask(TaskConsumer.java:80)
        at service.producerconsumer.TaskConsumer.run(TaskConsumer.java:142)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Couldn't acquire write lock after 15000 ms for the lock file:./chronicle/roll/metadata.cq4t. Lock was held by me. You can manually unlock with net.openhft.chronicle.queue.main.UnlockMain
        ... 8 more
net.openhft.chronicle.wire.UnrecoverableTimeoutException: Couldn't acquire write lock after 15000 ms for the lock file:./chronicle/roll/metadata.cq4t. Lock was held by me. You can manually unlock with net.openhft.chronicle.queue.main.UnlockMain
        at net.openhft.chronicle.queue.impl.single.TableStoreWriteLock.lock(TableStoreWriteLock.java:96)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.prepareAndReturnWriteContext(StoreAppender.java:430)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:406)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:394)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writeBytes(StoreAppender.java:194)
        at service.producerconsumer.TaskConsumer.runTask(TaskConsumer.java:80)
        at service.producerconsumer.TaskConsumer.run(TaskConsumer.java:142)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Couldn't acquire write lock after 15000 ms for the lock file:./chronicle/roll/metadata.cq4t. Lock was held by me. You can manually unlock with net.openhft.chronicle.queue.main.UnlockMain
        ... 8 more
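The phrase "Lock was held by me" means the write lock in metadata.cq4t is recorded as owned by the same process that is now waiting for it. The sketch below is a simplified, hypothetical model of one way that can happen, not Chronicle Queue's TableStoreWriteLock: the class OwnerTaggedLock and the owner id 42 are illustrative assumptions. If a thread acquires the lock and dies before releasing it, any other thread in the same process spins until the timeout and reports that the lock is held by "me".

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of an owner-tagged write lock with a timeout.
// NOT Chronicle Queue's implementation; it only illustrates the failure mode.
class OwnerTaggedLock {
    private static final long UNLOCKED = 0L;
    private final AtomicLong owner = new AtomicLong(UNLOCKED);

    // Spin until the lock word is free, or give up after timeoutMs.
    // The lock is not re-entrant: the same owner id cannot acquire it twice.
    void lock(long ownerId, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!owner.compareAndSet(UNLOCKED, ownerId)) {
            if (System.currentTimeMillis() > deadline) {
                long holder = owner.get();
                String who = holder == ownerId ? "me" : Long.toString(holder);
                throw new IllegalStateException("Couldn't acquire write lock after "
                        + timeoutMs + " ms. Lock was held by " + who);
            }
            Thread.yield();
        }
    }

    // Only the current owner can release the lock.
    void unlock(long ownerId) {
        owner.compareAndSet(ownerId, UNLOCKED);
    }

    // One thread takes the lock and dies before unlocking; a second thread
    // in the same "process" (same owner id) then times out.
    static String demo() {
        OwnerTaggedLock lock = new OwnerTaggedLock();
        long processId = 42L; // hypothetical stand-in for the process id
        Thread writer = new Thread(() -> {
            lock.lock(processId, 100);
            throw new RuntimeException("writer died before unlock");
        });
        writer.setUncaughtExceptionHandler((t, e) -> { }); // keep the demo quiet
        writer.start();
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        try {
            lock.lock(processId, 100);
            return "acquired";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // Couldn't acquire write lock after 100 ms. Lock was held by me
    }
}
```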

Is this caused by multiple threads accessing the Chronicle Queue? I reuse the same Chronicle Queue instance and use thread-local appenders. The following is a sample of how I use the class:

public class TaskConsumer implements Runnable {
    private final ChronicleQueue QUEUE;
    private CustomQueueClass queue;
    private ExcerptAppender APPENDER;
    //other code

    public TaskConsumer(ChronicleQueue queue) {
        this.QUEUE= queue;
        //instantiate queue
        //other code
    }

    private long millis;
    private long nanos;
    private ByteBuffer buffer;
    private InetAddress remoteAdd;
    private int remotePort;
    private String ni;
    private int remaining;
    private int senderId;
    private long seqNum;
    private MoldUdpHeader moldUdpHeader = new MoldUdpHeader();
    private final PrimitiveIntPair pair = new PrimitiveIntPair(0, 0);
    private final WriteBytesMarshallable marshallable = (bytes) -> bytes.writeLong(this.millis)
        .writeLong(this.nanos)
        .write(this.remoteAdd.getAddress())
        .writeInt(this.remotePort)
        .writeUtf8(this.ni)
        .writeInt(this.remaining)
        .writeInt(this.senderId)
        .writeLong(this.seqNum)
        .write(this.buffer.array(), 0, this.remaining);  //sbe-style writes seqNum, remoteAddress, and the ByteBuffer

    private void runTask() {
        LOGGER.debug(logMarker, "{} {} {} {} {} | senderId: {} seqNum: {} msgCnt: {}",
            () -> ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.of("Asia/Hong_Kong")),
            () -> remoteAdd.getHostName(), () -> remotePort, () -> ni,
            () -> remaining, () -> moldUdpHeader.getSenderId(), () -> moldUdpHeader.getSeqNum(),
            () -> moldUdpHeader.getMsgCnt());
        this.APPENDER.writeBytes(marshallable);  //<--error thrown here
    }

    public void run() {
        this.APPENDER = QUEUE.acquireAppender();
        TaskHolder task = null;
        while (true) {
            if (Thread.currentThread().isInterrupted()) {
                closeAppender();
                break;
            }
            if (task == null) {
                try {
                    task = queue.getForConsuming(TIMEOUT, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            if (task != null) {
                buffer = task.getByteBuffer();
                if (task.getUdpChannel() != null) {
                    remoteAdd = task.getUdpChannel().getGROUP_ADDRESS();
                    remotePort = task.getUdpChannel().getPORT();
                    millis = task.getMillis();
                    nanos = task.getNanos();
                    ni = task.getUdpChannel().getNETWORK_INTERFACE().getName();
                    remaining = buffer.remaining();

                    if (DECODING.equals("TRUE")) {
                        moldUdpHeader = (MoldUdpHeader) moldUdpHeader.decode(buffer, 0);
                    }
                    senderId = moldUdpHeader.getSenderId();
                    seqNum = moldUdpHeader.getSeqNum();

                    pair.setId(moldUdpHeader.getSenderId());
                    pair.setIndex(getIndex(task.getUdpChannel()));

                    triplesHashmap.computeIfAbsent(pair.copy(), k -> (DECODING.equals("TRUE")) ?
                        new Triple<>(new MutableLong(Long.MIN_VALUE), new LongArrayVisitationCounter(10000000), new PacketStats()) :
                        new Triple<>(new MutableLong(Long.MIN_VALUE), new LongArrayVisitationCounter(10), new PacketStats()));  //using a supplier to lazily instantiate
                    runTask();  //<--- error thrown here
                    synchronized (triplesHashmap.get(pair).getType3()) {
                        if (DECODING.equals("TRUE")) {
                            checkReadValueSequence(triplesHashmap.get(pair), pair, moldUdpHeader.getSeqNum());
                        } else {
                            PacketStats stats = triplesHashmap.get(pair).getType3();
                            stats.incrementPacketsRead();
                            stats.incrementBytesReadBy(remaining);
                        }
                    }
                }
                task.clearValues();
                queue.incrementReadIndex();
                task = null;
            }
        }
    }

    //other code
}

What is strange is that I have deployed the jar to multiple servers, but only this one server shows the error; the others work normally. I could wrap the call in a try-catch that ignores the error and recursively retries the task from the catch block, but I would like to understand what causes this and how to avoid it.
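If a retry is wanted as a stopgap, a loop with a bounded number of attempts is safer than recursing from the catch block: the call stack cannot grow without limit, and the final failure is rethrown instead of silently ignored. This is a generic, JDK-only sketch; BoundedRetry and its parameters are hypothetical names, and it assumes the failure arrives as an unchecked exception (runTask() declares no checked exception, so it must be one). Retrying does not remove the cause: if the lock is never released, every attempt will again wait the full 15000 ms.

```java
import java.util.function.Supplier;

// Hypothetical helper: retry a task a bounded number of times in a loop
// instead of recursing from a catch block.
class BoundedRetry {
    static <T> T retry(Supplier<T> task, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMs * attempt); // linear backoff between attempts
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // preserve interrupt status and stop
                        break;
                    }
                }
            }
        }
        throw last; // surface the final failure instead of swallowing it
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = retry(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("simulated lock timeout");
            return "written on attempt " + calls[0];
        }, 5, 10);
        System.out.println(result); // written on attempt 3
    }
}
```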

Answer from Peter Lawrey:

It sounds like you are doing the right thing. Sharing the Chronicle Queue and using a thread-local appender and tailer shouldn't be a problem. Some older versions had problems cleaning up resources, especially if a thread died. We have better control over that now.
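One defensive measure in the spirit of this answer is to make sure each consumer thread releases its per-thread resources on every exit path. In the question's run(), closeAppender() is only reached when the thread is interrupted, so an exception thrown from runTask() ends the thread without calling it. The JDK-only sketch below shows the try-with-resources pattern using hypothetical stand-ins (SharedQueue, Appender and ConsumerLoop are not Chronicle Queue classes); it complements, rather than replaces, running a newer version with better clean-up.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the shared queue: counts appenders still open.
class SharedQueue {
    final AtomicInteger openAppenders = new AtomicInteger();

    Appender acquireAppender() {
        openAppenders.incrementAndGet();
        return new Appender(this);
    }
}

// Hypothetical stand-in for a per-thread appender.
class Appender implements AutoCloseable {
    private final SharedQueue queue;
    private boolean closed;

    Appender(SharedQueue queue) { this.queue = queue; }

    void write(String msg) {
        // An empty message simulates a failing write.
        if (msg.isEmpty()) throw new IllegalStateException("simulated write failure");
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            queue.openAppenders.decrementAndGet();
        }
    }
}

class ConsumerLoop {
    // try-with-resources closes the appender on every exit path: normal
    // return, interrupt, or an exception that would otherwise kill the thread.
    static void consume(SharedQueue queue, String... messages) {
        try (Appender appender = queue.acquireAppender()) {
            for (String m : messages) {
                if (Thread.currentThread().isInterrupted()) return;
                appender.write(m);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SharedQueue queue = new SharedQueue();
        Thread ok = new Thread(() -> consume(queue, "a", "b"));
        Thread failing = new Thread(() -> consume(queue, "a", ""));
        failing.setUncaughtExceptionHandler(
                (t, e) -> System.out.println(t.getName() + " died: " + e.getMessage()));
        ok.start();
        failing.start();
        ok.join();
        failing.join();
        System.out.println("open appenders: " + queue.openAppenders.get()); // open appenders: 0
    }
}
```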

I suggest you try 5.23.37 or 5.24ea17.

BTW, evaluating DECODING.equals("TRUE") on every iteration is expensive. I suggest caching the result in a local variable outside the loop.
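A minimal sketch of that suggestion: compare the string once, before the loop, and branch on a boolean inside it. The parameter decodingSetting is a hypothetical stand-in for DECODING, and the two counters only stand in for the decode and statistics branches of the question's loop. Writing "TRUE".equals(...) with the constant first also makes the check null-safe.

```java
import java.util.Arrays;
import java.util.List;

class DecodingFlag {
    // decodingSetting is a hypothetical stand-in for the question's DECODING field.
    static int[] process(List<String> packets, String decodingSetting) {
        final boolean decoding = "TRUE".equals(decodingSetting); // evaluated once, null-safe
        int decoded = 0;
        int bytesCounted = 0;
        for (String packet : packets) {
            if (decoding) {
                decoded++;                      // stands in for moldUdpHeader.decode(...)
            } else {
                bytesCounted += packet.length(); // stands in for stats.incrementBytesReadBy(...)
            }
        }
        return new int[] {decoded, bytesCounted};
    }

    public static void main(String[] args) {
        List<String> packets = Arrays.asList("ab", "cde");
        System.out.println(Arrays.toString(process(packets, "TRUE")));  // [2, 0]
        System.out.println(Arrays.toString(process(packets, "FALSE"))); // [0, 5]
    }
}
```

If DECODING never changes at runtime, the same idea can be applied once at class level by deriving a static final boolean from it.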