unioil-loyalty-rn-app/ios/Pods/Flipper-Folly/folly/io/async/EventBaseAtomicNotification...

298 lines
8.9 KiB
C++

/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/FileUtil.h>
#include <folly/io/async/EventBaseAtomicNotificationQueue.h>
#include <folly/system/Pid.h>
namespace folly {
template <typename Task, typename Consumer>
EventBaseAtomicNotificationQueue<Task, Consumer>::
EventBaseAtomicNotificationQueue(Consumer&& consumer)
: pid_(get_cached_pid()),
notificationQueue_(),
consumer_(std::move(consumer)) {
#ifdef FOLLY_HAVE_EVENTFD
eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (eventfd_ == -1) {
if (errno == ENOSYS || errno == EINVAL) {
// eventfd not availalble
LOG(ERROR) << "failed to create eventfd for AtomicNotificationQueue: "
<< errno << ", falling back to pipe mode (is your kernel "
<< "> 2.6.30?)";
} else {
// some other error
folly::throwSystemError(
"Failed to create eventfd for AtomicNotificationQueue", errno);
}
}
#endif
if (eventfd_ == -1) {
if (pipe(pipeFds_)) {
folly::throwSystemError(
"Failed to create pipe for AtomicNotificationQueue", errno);
}
try {
// put both ends of the pipe into non-blocking mode
if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
folly::throwSystemError(
"failed to put AtomicNotificationQueue pipe read "
"endpoint into non-blocking mode",
errno);
}
if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
folly::throwSystemError(
"failed to put AtomicNotificationQueue pipe write "
"endpoint into non-blocking mode",
errno);
}
} catch (...) {
::close(pipeFds_[0]);
::close(pipeFds_[1]);
throw;
}
}
}
template <typename Task, typename Consumer>
EventBaseAtomicNotificationQueue<Task, Consumer>::
~EventBaseAtomicNotificationQueue() {
// discard pending tasks and disarm the queue
while (drive(
[](Task&&) { return AtomicNotificationQueueTaskStatus::DISCARD; })) {
}
// We must unregister before closing the fd. Otherwise the base class
// would unregister the fd after it's already closed, which is invalid
// (some other thread could've opened something that reused the fd).
unregisterHandler();
// Don't drain fd in the child process.
if (pid_ == get_cached_pid()) {
// Wait till we observe all the writes before closing fds
while (writesObserved_ <
(successfulArmCount_ - consumerDisarmedCount_) + writesLocal_) {
drainFd();
}
DCHECK(
writesObserved_ ==
(successfulArmCount_ - consumerDisarmedCount_) + writesLocal_);
}
if (eventfd_ >= 0) {
::close(eventfd_);
eventfd_ = -1;
}
if (pipeFds_[0] >= 0) {
::close(pipeFds_[0]);
pipeFds_[0] = -1;
}
if (pipeFds_[1] >= 0) {
::close(pipeFds_[1]);
pipeFds_[1] = -1;
}
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::setMaxReadAtOnce(
uint32_t maxAtOnce) {
notificationQueue_.setMaxReadAtOnce(maxAtOnce);
}
template <typename Task, typename Consumer>
size_t EventBaseAtomicNotificationQueue<Task, Consumer>::size() const {
return notificationQueue_.size();
}
template <typename Task, typename Consumer>
bool EventBaseAtomicNotificationQueue<Task, Consumer>::empty() const {
return notificationQueue_.empty();
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::drain() {
while (drive(consumer_)) {
}
}
template <typename Task, typename Consumer>
template <typename... Args>
void EventBaseAtomicNotificationQueue<Task, Consumer>::putMessage(
Args&&... args) {
if (notificationQueue_.push(std::forward<Args>(args)...)) {
notifyFd();
}
}
template <typename Task, typename Consumer>
bool EventBaseAtomicNotificationQueue<Task, Consumer>::tryPutMessage(
Task&& task, uint32_t maxSize) {
auto result = notificationQueue_.tryPush(std::forward<Task>(task), maxSize);
if (result ==
AtomicNotificationQueue<Task>::TryPushResult::SUCCESS_AND_ARMED) {
notifyFd();
}
return result !=
AtomicNotificationQueue<Task>::TryPushResult::FAILED_LIMIT_REACHED;
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::stopConsuming() {
evb_ = nullptr;
cancelLoopCallback();
unregisterHandler();
detachEventBase();
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::startConsuming(
EventBase* evb) {
startConsumingImpl(evb, false);
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::startConsumingInternal(
EventBase* evb) {
startConsumingImpl(evb, true);
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::startConsumingImpl(
EventBase* evb, bool internal) {
evb_ = evb;
initHandler(
evb_,
folly::NetworkSocket::fromFd(eventfd_ >= 0 ? eventfd_ : pipeFds_[0]));
auto registerHandlerResult = internal
? registerInternalHandler(READ | PERSIST)
: registerHandler(READ | PERSIST);
if (registerHandlerResult) {
edgeTriggeredSet_ = eventfd_ >= 0 && setEdgeTriggered();
++writesLocal_;
notifyFd();
} else {
edgeTriggeredSet_ = false;
}
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::notifyFd() {
checkPid();
ssize_t bytes_written = 0;
size_t bytes_expected = 0;
do {
if (eventfd_ >= 0) {
// eventfd(2) dictates that we must write a 64-bit integer
uint64_t signal = 1;
bytes_expected = sizeof(signal);
bytes_written = ::write(eventfd_, &signal, bytes_expected);
} else {
uint8_t signal = 1;
bytes_expected = sizeof(signal);
bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
}
} while (bytes_written == -1 && errno == EINTR);
if (bytes_written != ssize_t(bytes_expected)) {
folly::throwSystemError(
"failed to signal AtomicNotificationQueue after "
"write",
errno);
}
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::drainFd() {
checkPid();
uint64_t message = 0;
if (eventfd_ >= 0) {
auto result = readNoInt(eventfd_, &message, sizeof(message));
CHECK(result == sizeof(message) || errno == EAGAIN);
writesObserved_ += message;
} else {
ssize_t result;
while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) != -1) {
writesObserved_ += result;
}
CHECK(result == -1 && errno == EAGAIN);
}
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::
runLoopCallback() noexcept {
DCHECK(!armed_);
if (!notificationQueue_.arm()) {
activateEvent();
} else {
armed_ = true;
successfulArmCount_++;
}
}
template <typename Task, typename Consumer>
template <typename T>
bool EventBaseAtomicNotificationQueue<Task, Consumer>::drive(T&& consumer) {
auto wasEmpty = !notificationQueue_.drive(std::forward<T>(consumer));
if (wasEmpty && armed_) {
consumerDisarmedCount_++;
}
armed_ = false;
return !wasEmpty;
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::handlerReady(
uint16_t) noexcept {
execute();
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::execute() {
if (!edgeTriggeredSet_) {
drainFd();
}
drive(consumer_);
evb_->runInLoop(this, false, nullptr);
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::activateEvent() {
if (!EventHandler::activateEvent(0)) {
// Fallback for EventBase backends that don't support activateEvent
++writesLocal_;
notifyFd();
}
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::checkPid() const {
if (FOLLY_UNLIKELY(pid_ != get_cached_pid())) {
checkPidFail();
}
}
template <typename Task, typename Consumer>
[[noreturn]] FOLLY_NOINLINE void
EventBaseAtomicNotificationQueue<Task, Consumer>::checkPidFail() const {
folly::terminate_with<std::runtime_error>(
"Pid mismatch. Pid = " + folly::to<std::string>(get_cached_pid()) +
". Expecting " + folly::to<std::string>(pid_));
}
} // namespace folly