Issues with Lock-Free Dynamically Resizing Array Implementation


I've been working on implementing a lock-free array following the instructions from this paper. The goal is to create a thread-safe, dynamically resizing array using lock-free operations.

The implementation appears to work for small loads. However, when I insert 2e5 elements from two threads simultaneously (1e5 per thread), I get the following errors:

1. Process finished with exit code 139 (interrupted by signal 11: SIGSEGV). All I could find using the debugger is that my descriptor somehow ends up being null.

2. Heap corruption reported by malloc:

ds(86913,0x16dde7000) malloc: Heap corruption detected, free list is damaged at 0x60000192c000
*** Incorrect guard value: 4329210352
ds(86913,0x16de73000) malloc: *** error for object 0x60000192c000: pointer being freed was not allocated
ds(86913,0x16de73000) malloc: *** set a breakpoint in malloc_error_break to debug

Here is my implementation:

#ifndef DS_LOCKFREEARRAY_H
#define DS_LOCKFREEARRAY_H

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <memory>

namespace fast_ds {

    namespace internal {
        template<class T>
        class write_descriptor {
        public:
            T old_value_;
            T new_value_;
            size_t location_;
            bool completed_;

            write_descriptor() : location_(0), completed_(true) {}

            explicit write_descriptor(T oldValue, T newValue, size_t location, bool completed) : old_value_(oldValue),
                                                                                                 new_value_(newValue),
                                                                                                 location_(location),
                                                                                                 completed_(completed) {}
        };

        template<class T>
        class v_descriptor {
        public:
            size_t size_ = 0;
            write_descriptor<T> writeDescriptor_;

            v_descriptor() = default;

            explicit v_descriptor(size_t size, write_descriptor<T> w_descriptor) : size_(size),
                                                                                   writeDescriptor_(w_descriptor) {}
        };
    }

    template<class T>
    class LockFreeArray {
    private:
        static constexpr size_t kNumberOfBuckets = 32;
        static constexpr size_t kFirstBucketCapacity = 2;

        std::atomic<std::atomic<T> *> *data_;
        std::shared_ptr<internal::v_descriptor<T>> descriptor_ = std::make_shared<internal::v_descriptor<T>>();

        void CompleteWrite(internal::write_descriptor<T> write_descriptor) {
            if (!write_descriptor.completed_) {
                std::atomic<T> *memoryLocation = At(write_descriptor.location_);
                memoryLocation->compare_exchange_weak(write_descriptor.old_value_,
                                                      write_descriptor.new_value_);

                write_descriptor.completed_ = true;
            }
        }

        inline int HighestBitSet(unsigned int x) const {
            return 31 - __builtin_clz(x);
        }

        void AllocBucket(int bucket) {
            int bucketSize = 1 << (bucket + HighestBitSet(kFirstBucketCapacity));
            std::atomic<T> *currentBucket = data_[bucket].load();
            auto newBucket = new std::atomic<T>[bucketSize];

            if (!data_[bucket].compare_exchange_weak(currentBucket, newBucket)) {
                delete[] newBucket;
            }
        }

    public:
        explicit LockFreeArray() {
            data_ = new std::atomic<std::atomic<T> *>[kNumberOfBuckets];
            data_[0].store(new std::atomic<T>[kFirstBucketCapacity]);
        }

        LockFreeArray(const LockFreeArray &other) = delete;

        size_t Size() noexcept {
            auto current_descriptor = descriptor_;
            auto current_size = current_descriptor->size_;

            if (!descriptor_->writeDescriptor_.completed_) {
                current_size--;
            }

            return current_size;
        }

        std::atomic<T> *At(size_t index) const {
            auto pos = index + kFirstBucketCapacity;
            auto hibit = HighestBitSet(pos);
            auto index_in_bucket = (pos ^ (1 << hibit));
            return &data_[hibit - HighestBitSet(kFirstBucketCapacity)][index_in_bucket];
        }

        void PushBack(const T &value) {
            internal::v_descriptor<T> *current_descriptor;
            std::shared_ptr<internal::v_descriptor<T>> new_descriptor;
            std::shared_ptr<internal::v_descriptor<T>> current_descriptor_ptr = descriptor_;

            do {
                current_descriptor = descriptor_.get();
                if (current_descriptor == nullptr) {
                    continue;
                }
                current_descriptor_ptr = descriptor_;
                int current_size = current_descriptor->size_;
                internal::write_descriptor<T> wDescriptor = current_descriptor->writeDescriptor_;
                CompleteWrite(wDescriptor);

                if (int bucket = HighestBitSet(current_size + kFirstBucketCapacity) -
                                 HighestBitSet(kFirstBucketCapacity); data_[bucket].load() == 0) {
                    AllocBucket(bucket);
                }

                auto oldValue = At(current_size)->load();
                auto write_op = internal::write_descriptor<T>(oldValue, value,
                                                              current_size,
                                                              false);

                new_descriptor = std::make_shared<internal::v_descriptor<T>>(
                        internal::v_descriptor(current_size + 1, write_op));
            } while (!std::atomic_compare_exchange_strong(&descriptor_, &current_descriptor_ptr, new_descriptor));
            CompleteWrite(new_descriptor->writeDescriptor_);
        }
    };
}

#endif

And here is the main file:

#include <iostream>
#include <thread>
#include "include/vectors/LockFreeArray.h"

using namespace fast_ds;
using namespace std;

const int N = 100000;

auto v = LockFreeArray<int>();

void f1() {
    for(int i = 1; i <= N; ++i) {
        v.PushBack(i);
    }
}

void f2() {
    for(int i = 1 + N; i <= 2 * N; ++i) {
        v.PushBack(i);
    }
}

int main() {
    std::thread thread1(f1);
    std::thread thread2(f2);

    thread1.join();
    thread2.join();

    cout << v.Size() << endl;
}

Additional notes:

  1. I compile the code with Apple Clang 17 on an M1.
  2. I tried to use std::experimental::atomic_shared_ptr<T> and atomic<shared_ptr<T>> (from C++20), but neither seems to be implemented by this compiler or by GCC.
  3. I used raw pointers to keep the operations as fast as possible, but I am considering switching to smart pointers.

I am open to suggestions for improving my implementation.

Answer by Drew Dormann:

Your issue is with this line.

auto oldValue = At(current_size)->load();

When current_size is 0, what would At() return?

An array's size is never a valid index for that array.

Compiling your code with -fsanitize=address reveals this mistake.