拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 正确使用std::atomic>与非平凡物件?

正确使用std::atomic>与非平凡物件?

白鹭 - 2022-03-02 2243 0 0

我正在尝试通过std::atomic<std::shared_ptr>>对容器等非平凡物件进行操作来实作无锁包装器。我在这两个主题中找到了一些相关的信息:

  • 记忆栅栏
  • 原子使用

但这仍然不是我需要的。

举个例子:

TEST_METHOD(FechAdd)
    {
        constexpr size_t loopCount = 5000000;
        auto&& container           = std::atomic<size_t>(0);
        auto thread1               = std::jthread([&]()
              {
                  for (size_t i = 0; i < loopCount; i  )
                      container  ;
              });

        auto thread2 = std::jthread([&]()
            {
                for (size_t i = 0; i < loopCount; i  )
                    container  ;
            });
        thread1.join();
        thread2.join();
        Assert::AreEqual(loopCount * 2, container.load());
    }

此函式作业正常,因为后增量运算子使用内部fetch_add()原子操作。

另一方面:

TEST_METHOD(LoadStore)
{
    constexpr size_t loopCount = 5000000;
    auto&& container           = std::atomic<size_t>(0);
    auto thread1               = std::jthread([&]()
          {
              for (size_t i = 0; i < loopCount; i  )
              {
                  auto value = container.load();
                  value  ;
                  container.store(value);
              }
          });

    auto thread2 = std::jthread([&]()
        {
            for (size_t i = 0; i < loopCount; i  )
            {
                auto value = container.load();
                value  ;
                container.store(value);
            }
        });
    thread1.join();
    thread2.join();
    Assert::AreEqual(loopCount * 2, container.load());
}

而如果我用.load().store()操作和这两个操作之间的增量替换它,结果就不一样了。这是两个原子操作,因此无法在这些操作之间进行同步。

我的最终目标是通过std::atomic<std::shared_ptr>加载物件的实际状态,执行一些非常量操作,并再次通过存盘操作保存它。

TEST_METHOD(AtomicSharedPtr)
{
    constexpr size_t loopCount = 5000000;
    auto&& container           = std::atomic(std::make_shared<std::unordered_set<int>>());
    auto thread1               = std::jthread([&]([[maybe_unused]] std::stop_token token)
          {
              for (size_t i = 0; i < loopCount; i  )
              {
                  // some other lock-free synchronization primitives as barrier, conditions or?
                  auto reader = container.load();
                  reader->emplace(5);
                  container.store(reader);
              }
          });

    auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
        {
            for (size_t i = 0; i < loopCount; i  )
            {
                // some other lock-free synchronization primitives as barrier, conditions or?
                auto reader = container.load();
                reader->erase(5);
                container.store(reader);
            }
        });
}

I knew that the second thread also has only shared_ptr from atomic and non-const operations on shared_ptr, which can only cause data race.

So any hint on how to implement a lock-free wrapper that will work with non-const operations of the object stored in std::atomic<std::shared_ptr>?

uj5u.com热心网友回复:

首先,旁注。 std::atomic<std::shared_ptr<T>>提供对指标的原子访问,并且提供任何同步T在这里要注意这一点非常重要。并且您的代码显示您正在尝试同步T,而不是指标,因此 没有按照atomic您的想法进行操作。为了使用std::atomic<std::shared_ptr<T>>,您必须将指向的物件T视为const

有两种方法可以以执行绪安全的方式处理任意资料的读取-修改-写入。第一个显然是使用锁。这通常执行起来更快,并且由于它的简单性,通常错误更少,因此强烈建议。如果你真的想用原子操作来做到这一点,这很困难,而且执行速度较慢。

它通常看起来像这样,您对指向的资料进行深度复制,对副本进行变异,然后尝试用新资料替换旧资料。如果其他人在此期间更改了资料,则您将其全部丢弃并重新开始整个突变。

template<class T, class F>
bool readModifyWrite(std::atomic<std::shared_ptr<T>>& container, F&& function) {
   do {
       const auto&& oldT = container.load();
       //first a deep copy, to enforce immutability
       auto&& newT = std::make_shared(oldT.get());
       //then mutate the T
       if (!function(*newT))
           return false; //function aborted
       //then attempt to save the modified T.
       //if someone else changed the container during our modification, start over
  } while(container.compare_exchange_strong(oldT, newT) == false);
    //Note that this may take MANY tries to eventually succeed.
    return true;
}

然后用法类似于您所拥有的:

auto&& container           = std::atomic(std::make_shared<std::unordered_set<int>>());
auto thread1               = std::jthread([&]([[maybe_unused]] std::stop_token token)
      {
          for (size_t i = 0; i < loopCount; i  )
          {
              readModifyWrite(container, [](auto& reader) {
                 reader.emplace(5);
                 return true;
              });
          }
      });

auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
    {
        for (size_t i = 0; i < loopCount; i  )
        {
              readModifyWrite(container, [](auto& reader) {
                 reader.erase(5);
                 return true;
              });
        }
    });
}

请注意,由于一个执行绪是插入5 loopCount次数,另一个是擦除5 loopCount次数,但它们之间不同步,因此第一个执行绪可能会连续写入多次(对于集合而言是空操作),然后是第二个执行绪可能会连续擦除多次(这是一组无操作),因此您无法真正保证此处的最终结果,但我假设您知道这一点。

但是,如果您想使用突变进行同步,那会变得相当复杂。如果成功或中止,变异函式必须回传,然后呼叫者readModifyWrite必须处理修改中止的情况。(注意,readModifyWrite有效地从函式回传值,所以它从修改步骤回传值。写步骤不影响回传值)

auto thread1               = std::jthread([&]([[maybe_unused]] std::stop_token token)
      {
          for (size_t i = 0; i < loopCount; )
          {
              bool did_emplace = readModifyWrite(container, [](auto& reader) {
                 return reader.emplace(5);
              });
              if (did_emplace) i  ;
          }
      });

auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
    {
        for (size_t i = 0; i < loopCount; )
        {
              bool did_erase = readModifyWrite(container, [](auto& reader) {
                 return reader.erase(5);
              });
              if (did_erase) i  ;
        }
    });
}
标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *