330 lines
9.8 KiB
C++
330 lines
9.8 KiB
C++
/*
|
|
* Copyright (c) Facebook, Inc. and its affiliates.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <folly/FileUtil.h>
|
|
#include <folly/io/async/AtomicNotificationQueue.h>
|
|
#include <folly/system/Pid.h>
|
|
|
|
namespace folly {
|
|
|
|
template <typename Task>
|
|
AtomicNotificationQueue<Task>::Queue::Queue(Queue&& other) noexcept
|
|
: head_(std::exchange(other.head_, nullptr)),
|
|
size_(std::exchange(other.size_, 0)) {}
|
|
|
|
template <typename Task>
|
|
typename AtomicNotificationQueue<Task>::Queue&
|
|
AtomicNotificationQueue<Task>::Queue::operator=(Queue&& other) noexcept {
|
|
clear();
|
|
std::swap(head_, other.head_);
|
|
std::swap(size_, other.size_);
|
|
return *this;
|
|
}
|
|
|
|
template <typename Task>
|
|
AtomicNotificationQueue<Task>::Queue::~Queue() {
|
|
clear();
|
|
}
|
|
|
|
template <typename Task>
|
|
bool AtomicNotificationQueue<Task>::Queue::empty() const {
|
|
return !head_;
|
|
}
|
|
|
|
template <typename Task>
|
|
ssize_t AtomicNotificationQueue<Task>::Queue::size() const {
|
|
return size_;
|
|
}
|
|
|
|
template <typename Task>
|
|
typename AtomicNotificationQueue<Task>::Node&
|
|
AtomicNotificationQueue<Task>::Queue::front() {
|
|
return *head_;
|
|
}
|
|
|
|
template <typename Task>
|
|
void AtomicNotificationQueue<Task>::Queue::pop() {
|
|
std::unique_ptr<Node>(std::exchange(head_, head_->next));
|
|
--size_;
|
|
}
|
|
|
|
template <typename Task>
|
|
void AtomicNotificationQueue<Task>::Queue::clear() {
|
|
while (!empty()) {
|
|
pop();
|
|
}
|
|
}
|
|
|
|
template <typename Task>
|
|
AtomicNotificationQueue<Task>::Queue::Queue(Node* head, ssize_t size)
|
|
: head_(head), size_(size) {}
|
|
|
|
template <typename Task>
|
|
typename AtomicNotificationQueue<Task>::Queue
|
|
AtomicNotificationQueue<Task>::Queue::fromReversed(Node* tail) {
|
|
// Reverse a linked list.
|
|
Node* head{nullptr};
|
|
ssize_t size = 0;
|
|
while (tail) {
|
|
head = std::exchange(tail, std::exchange(tail->next, head));
|
|
++size;
|
|
}
|
|
return Queue(head, size);
|
|
}
|
|
|
|
template <typename Task>
|
|
AtomicNotificationQueue<Task>::AtomicQueue::~AtomicQueue() {
|
|
DCHECK(!head_);
|
|
if (reinterpret_cast<intptr_t>(head_.load(std::memory_order_relaxed)) ==
|
|
kQueueArmedTag) {
|
|
return;
|
|
}
|
|
if (auto head = head_.load(std::memory_order_acquire)) {
|
|
auto queueContentsToDrop = Queue::fromReversed(head);
|
|
}
|
|
}
|
|
|
|
template <typename Task>
|
|
template <typename... Args>
|
|
bool AtomicNotificationQueue<Task>::AtomicQueue::push(Args&&... args) {
|
|
std::unique_ptr<Node> node(new Node(std::forward<Args>(args)...));
|
|
auto head = head_.load(std::memory_order_relaxed);
|
|
while (true) {
|
|
node->next =
|
|
reinterpret_cast<intptr_t>(head) == kQueueArmedTag ? nullptr : head;
|
|
if (head_.compare_exchange_weak(
|
|
head,
|
|
node.get(),
|
|
std::memory_order_acq_rel,
|
|
std::memory_order_relaxed)) {
|
|
node.release();
|
|
return reinterpret_cast<intptr_t>(head) == kQueueArmedTag;
|
|
}
|
|
}
|
|
}
|
|
|
|
template <typename Task>
|
|
bool AtomicNotificationQueue<Task>::AtomicQueue::hasTasks() const {
|
|
auto head = head_.load(std::memory_order_relaxed);
|
|
return head && reinterpret_cast<intptr_t>(head) != kQueueArmedTag;
|
|
}
|
|
|
|
template <typename Task>
|
|
typename AtomicNotificationQueue<Task>::Queue
|
|
AtomicNotificationQueue<Task>::AtomicQueue::getTasks() {
|
|
auto head = head_.exchange(nullptr, std::memory_order_acquire);
|
|
if (head && reinterpret_cast<intptr_t>(head) != kQueueArmedTag) {
|
|
return Queue::fromReversed(head);
|
|
}
|
|
return {};
|
|
}
|
|
|
|
template <typename Task>
|
|
typename AtomicNotificationQueue<Task>::Queue
|
|
AtomicNotificationQueue<Task>::AtomicQueue::arm() {
|
|
auto head = head_.load(std::memory_order_relaxed);
|
|
if (!head &&
|
|
head_.compare_exchange_strong(
|
|
head,
|
|
reinterpret_cast<Node*>(kQueueArmedTag),
|
|
std::memory_order_release,
|
|
std::memory_order_relaxed)) {
|
|
return {};
|
|
}
|
|
DCHECK(reinterpret_cast<intptr_t>(head) != kQueueArmedTag);
|
|
return getTasks();
|
|
}
|
|
|
|
template <typename Task>
|
|
AtomicNotificationQueue<Task>::AtomicNotificationQueue() {}
|
|
|
|
template <typename Task>
|
|
AtomicNotificationQueue<Task>::~AtomicNotificationQueue() {
|
|
// Empty the queue
|
|
atomicQueue_.getTasks();
|
|
}
|
|
|
|
template <typename Task>
|
|
void AtomicNotificationQueue<Task>::setMaxReadAtOnce(uint32_t maxAtOnce) {
|
|
maxReadAtOnce_ = maxAtOnce;
|
|
}
|
|
|
|
template <typename Task>
|
|
size_t AtomicNotificationQueue<Task>::size() const {
|
|
auto queueSize = pushCount_.load(std::memory_order_relaxed) -
|
|
taskExecuteCount_.load(std::memory_order_relaxed);
|
|
return queueSize >= 0 ? queueSize : 0;
|
|
}
|
|
|
|
template <typename Task>
|
|
bool AtomicNotificationQueue<Task>::empty() const {
|
|
return queue_.empty() && !atomicQueue_.hasTasks();
|
|
}
|
|
|
|
template <typename Task>
|
|
bool AtomicNotificationQueue<Task>::arm() {
|
|
if (!queue_.empty()) {
|
|
return false;
|
|
}
|
|
auto queue = atomicQueue_.arm();
|
|
if (queue.empty()) {
|
|
return true;
|
|
} else {
|
|
queue_ = std::move(queue);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
template <typename Task>
|
|
template <typename... Args>
|
|
bool AtomicNotificationQueue<Task>::push(Args&&... args) {
|
|
pushCount_.fetch_add(1, std::memory_order_relaxed);
|
|
return atomicQueue_.push(std::forward<Args>(args)...);
|
|
}
|
|
|
|
template <typename Task>
|
|
typename AtomicNotificationQueue<Task>::TryPushResult
|
|
AtomicNotificationQueue<Task>::tryPush(Task&& task, uint32_t maxSize) {
|
|
auto pushed = pushCount_.load(std::memory_order_relaxed);
|
|
while (true) {
|
|
auto executed = taskExecuteCount_.load(std::memory_order_relaxed);
|
|
if (pushed - executed >= maxSize) {
|
|
return TryPushResult::FAILED_LIMIT_REACHED;
|
|
}
|
|
if (pushCount_.compare_exchange_weak(
|
|
pushed,
|
|
pushed + 1,
|
|
std::memory_order_relaxed,
|
|
std::memory_order_relaxed)) {
|
|
break;
|
|
}
|
|
}
|
|
return atomicQueue_.push(std::move(task)) ? TryPushResult::SUCCESS_AND_ARMED
|
|
: TryPushResult::SUCCESS;
|
|
}
|
|
|
|
namespace detail {
|
|
template <
|
|
typename Task,
|
|
typename Consumer,
|
|
typename = std::enable_if_t<std::is_same<
|
|
invoke_result_t<Consumer, Task&&>,
|
|
AtomicNotificationQueueTaskStatus>::value>>
|
|
AtomicNotificationQueueTaskStatus invokeConsumerWithTask(
|
|
Consumer&& consumer, Task&& task, std::shared_ptr<RequestContext>&& rctx) {
|
|
RequestContextScopeGuard rcsg(std::move(rctx));
|
|
return consumer(std::forward<Task>(task));
|
|
}
|
|
|
|
template <
|
|
typename Task,
|
|
typename Consumer,
|
|
typename = std::enable_if_t<std::is_same<
|
|
invoke_result_t<Consumer, Task&&, std::shared_ptr<RequestContext>&&>,
|
|
AtomicNotificationQueueTaskStatus>::value>,
|
|
typename = void>
|
|
AtomicNotificationQueueTaskStatus invokeConsumerWithTask(
|
|
Consumer&& consumer, Task&& task, std::shared_ptr<RequestContext>&& rctx) {
|
|
return consumer(
|
|
std::forward<Task>(task),
|
|
std::forward<std::shared_ptr<RequestContext>>(rctx));
|
|
}
|
|
|
|
template <
|
|
typename Task,
|
|
typename Consumer,
|
|
typename = std::enable_if_t<
|
|
std::is_same<invoke_result_t<Consumer, Task&&>, void>::value>,
|
|
typename = void,
|
|
typename = void>
|
|
AtomicNotificationQueueTaskStatus invokeConsumerWithTask(
|
|
Consumer&& consumer, Task&& task, std::shared_ptr<RequestContext>&& rctx) {
|
|
RequestContextScopeGuard rcsg(std::move(rctx));
|
|
consumer(std::forward<Task>(task));
|
|
return AtomicNotificationQueueTaskStatus::CONSUMED;
|
|
}
|
|
|
|
template <
|
|
typename Task,
|
|
typename Consumer,
|
|
typename = std::enable_if_t<std::is_same<
|
|
invoke_result_t<Consumer, Task&&, std::shared_ptr<RequestContext>&&>,
|
|
void>::value>,
|
|
typename = void,
|
|
typename = void,
|
|
typename = void>
|
|
AtomicNotificationQueueTaskStatus invokeConsumerWithTask(
|
|
Consumer&& consumer, Task&& task, std::shared_ptr<RequestContext>&& rctx) {
|
|
consumer(
|
|
std::forward<Task>(task),
|
|
std::forward<std::shared_ptr<RequestContext>>(rctx));
|
|
return AtomicNotificationQueueTaskStatus::CONSUMED;
|
|
}
|
|
|
|
} // namespace detail
|
|
|
|
template <typename Task>
|
|
template <typename Consumer>
|
|
bool AtomicNotificationQueue<Task>::drive(Consumer&& consumer) {
|
|
Queue nextQueue;
|
|
// Since we cannot know if a task will be discarded before trying to execute
|
|
// it, this check may cause this function to return early. That is, even
|
|
// though:
|
|
// 1. numConsumed < maxReadAtOnce_
|
|
// 2. atomicQueue_ is not empty
|
|
// This is not an issue in practice because these tasks will be executed in
|
|
// the next round.
|
|
//
|
|
// In short, if `size() > maxReadAtOnce_`:
|
|
// * at least maxReadAtOnce_ tasks will be "processed"
|
|
// * at most maxReadAtOnce_ tasks will be "executed" (rest being discarded)
|
|
if (maxReadAtOnce_ == 0 || queue_.size() < maxReadAtOnce_) {
|
|
nextQueue = atomicQueue_.getTasks();
|
|
}
|
|
const bool wasAnyProcessed = !queue_.empty() || !nextQueue.empty();
|
|
for (int32_t numConsumed = 0;
|
|
maxReadAtOnce_ == 0 || numConsumed < maxReadAtOnce_;) {
|
|
if (queue_.empty()) {
|
|
queue_ = std::move(nextQueue);
|
|
if (queue_.empty()) {
|
|
break;
|
|
}
|
|
}
|
|
// This is faster than fetch_add and is safe because only consumer thread
|
|
// writes to taskExecuteCount_.
|
|
taskExecuteCount_.store(
|
|
taskExecuteCount_.load(std::memory_order_relaxed) + 1,
|
|
std::memory_order_relaxed);
|
|
{
|
|
auto& curNode = queue_.front();
|
|
AtomicNotificationQueueTaskStatus consumeTaskStatus =
|
|
detail::invokeConsumerWithTask(
|
|
std::forward<Consumer>(consumer),
|
|
std::move(curNode.task),
|
|
std::move(curNode.rctx));
|
|
if (consumeTaskStatus == AtomicNotificationQueueTaskStatus::CONSUMED) {
|
|
++numConsumed;
|
|
}
|
|
}
|
|
queue_.pop();
|
|
}
|
|
return wasAnyProcessed;
|
|
}
|
|
|
|
} // namespace folly
|