From e43db26eb500e65e344415b011eaa1c639f1f7fe Mon Sep 17 00:00:00 2001 From: su-fang Date: Fri, 3 Mar 2023 17:15:32 +0800 Subject: [PATCH] Import Upstream version 0.9.3 --- .cargo_vcs_info.json | 6 + Cargo.toml | 56 + Cargo.toml.orig | 35 + LICENSE-APACHE | 201 +++ LICENSE-MIT | 25 + build.rs | 10 + src/lib.rs | 67 + src/parking_lot.rs | 1692 ++++++++++++++++++++++ src/spinwait.rs | 74 + src/thread_parker/generic.rs | 79 + src/thread_parker/linux.rs | 156 ++ src/thread_parker/mod.rs | 85 ++ src/thread_parker/redox.rs | 139 ++ src/thread_parker/sgx.rs | 94 ++ src/thread_parker/unix.rs | 242 ++++ src/thread_parker/wasm.rs | 54 + src/thread_parker/wasm_atomic.rs | 97 ++ src/thread_parker/windows/keyed_event.rs | 202 +++ src/thread_parker/windows/mod.rs | 188 +++ src/thread_parker/windows/waitaddress.rs | 138 ++ src/util.rs | 31 + src/word_lock.rs | 327 +++++ 22 files changed, 3998 insertions(+) create mode 100644 .cargo_vcs_info.json create mode 100644 Cargo.toml create mode 100644 Cargo.toml.orig create mode 100644 LICENSE-APACHE create mode 100644 LICENSE-MIT create mode 100644 build.rs create mode 100644 src/lib.rs create mode 100644 src/parking_lot.rs create mode 100644 src/spinwait.rs create mode 100644 src/thread_parker/generic.rs create mode 100644 src/thread_parker/linux.rs create mode 100644 src/thread_parker/mod.rs create mode 100644 src/thread_parker/redox.rs create mode 100644 src/thread_parker/sgx.rs create mode 100644 src/thread_parker/unix.rs create mode 100644 src/thread_parker/wasm.rs create mode 100644 src/thread_parker/wasm_atomic.rs create mode 100644 src/thread_parker/windows/keyed_event.rs create mode 100644 src/thread_parker/windows/mod.rs create mode 100644 src/thread_parker/windows/waitaddress.rs create mode 100644 src/util.rs create mode 100644 src/word_lock.rs diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json new file mode 100644 index 0000000..257de9a --- /dev/null +++ b/.cargo_vcs_info.json @@ -0,0 +1,6 @@ +{ + "git": { + "sha1": "c286ab4bad5bdbf1dabe8713b6410015e8293372" + }, + "path_in_vcs": "core" +} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..900a062 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,56 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2018" +name = "parking_lot_core" +version = "0.9.3" +authors = ["Amanieu d'Antras "] +description = "An advanced API for creating custom synchronization primitives." 
+keywords = [ + "mutex", + "condvar", + "rwlock", + "once", + "thread", +] +categories = ["concurrency"] +license = "MIT OR Apache-2.0" +repository = "https://github.com/Amanieu/parking_lot" + +#[dependencies.backtrace] +#version = "0.3.60" +#optional = true + +[dependencies.cfg-if] +version = "1.0.0" + +[dependencies.petgraph] +version = "0.6.0" +optional = true + +[dependencies.smallvec] +version = "1.6.1" + +[dependencies.thread-id] +version = "4.0.0" +optional = true + +[features] +deadlock_detection = [ + "petgraph", + "thread-id", + #"backtrace", +] +nightly = [] + +[target."cfg(unix)".dependencies.libc] +version = "0.2.95" diff --git a/Cargo.toml.orig b/Cargo.toml.orig new file mode 100644 index 0000000..cb8b004 --- /dev/null +++ b/Cargo.toml.orig @@ -0,0 +1,35 @@ +[package] +name = "parking_lot_core" +version = "0.9.3" +authors = ["Amanieu d'Antras "] +description = "An advanced API for creating custom synchronization primitives." +license = "MIT OR Apache-2.0" +repository = "https://github.com/Amanieu/parking_lot" +keywords = ["mutex", "condvar", "rwlock", "once", "thread"] +categories = ["concurrency"] +edition = "2018" + +[dependencies] +cfg-if = "1.0.0" +smallvec = "1.6.1" +petgraph = { version = "0.6.0", optional = true } +thread-id = { version = "4.0.0", optional = true } +backtrace = { version = "0.3.60", optional = true } + +[target.'cfg(unix)'.dependencies] +libc = "0.2.95" + +[target.'cfg(target_os = "redox")'.dependencies] +redox_syscall = "0.2.8" + +[target.'cfg(windows)'.dependencies] +windows-sys = { version = "0.36.0", features = [ + "Win32_Foundation", + "Win32_System_LibraryLoader", + "Win32_System_SystemServices", + "Win32_System_WindowsProgramming", +] } + +[features] +nightly = [] +deadlock_detection = ["petgraph", "thread-id", "backtrace"] diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..16fe87b --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..40b8817 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2016 The Rust Project Developers + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/build.rs b/build.rs
new file mode 100644
index 0000000..d29c769
--- /dev/null
+++ b/build.rs
@@ -0,0 +1,10 @@
+// Automatically detect tsan in a way that's compatible with both stable (which
+// doesn't support sanitizers) and nightly (which does). Works because build
+// scripts get `cfg` info, even if the cfg is unstable.
+fn main() {
+    println!("cargo:rerun-if-changed=build.rs");
+    let sanitizer_list = std::env::var("CARGO_CFG_SANITIZE").unwrap_or_default();
+    if sanitizer_list.contains("thread") {
+        println!("cargo:rustc-cfg=tsan_enabled");
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..4845356
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,67 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+//! This library exposes a low-level API for creating your own efficient
+//! synchronization primitives.
+//!
+//! # The parking lot
+//!
+//! To keep synchronization primitives small, all thread queuing and suspending
+//! functionality is offloaded to the *parking lot*. The idea behind this is based
+//! on the WebKit [`WTF::ParkingLot`](https://webkit.org/blog/6161/locking-in-webkit/)
+//! class, which essentially consists of a hash table mapping of lock addresses
+//! to queues of parked (sleeping) threads. The WebKit parking lot was itself
+//! inspired by Linux [futexes](http://man7.org/linux/man-pages/man2/futex.2.html),
+//! but it is more powerful since it allows invoking callbacks while holding a
+//! queue lock.
+//!
+//! There are two main operations that can be performed on the parking lot:
+//!
+//! - *Parking* refers to suspending the thread while simultaneously enqueuing it
+//!   on a queue keyed by some address.
+//! - *Unparking* refers to dequeuing a thread from a queue keyed by some address
+//!   and resuming it.
+//!
+//! See the documentation of the individual functions for more details.
+//!
+//! # Building custom synchronization primitives
+//!
+//! Building custom synchronization primitives is very simple since the parking
+//! lot takes care of all the hard parts for you. A simple example for a
+//! custom primitive would be to integrate a `Mutex` inside another data type.
+//! Since a mutex only requires 2 bits, it can share space with other data.
+//! For example, one could create an `ArcMutex` type that combines the atomic
+//! reference count and the two mutex bits in the same atomic word.
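+//!
+//! As an illustrative sketch (an editorial example, not part of the upstream
+//! documentation), a minimal word-sized lock can be built directly on
+//! `park`/`unpark_one`. The names `MyLock` and `LOCKED_BIT` are invented here;
+//! a real implementation would also track a "has parked threads" bit so that
+//! `unlock` can skip the `unpark_one` call entirely on the uncontended path:
+//!
+//! ```ignore
+//! use parking_lot_core::{park, unpark_one, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
+//! use std::sync::atomic::{AtomicUsize, Ordering};
+//!
+//! const LOCKED_BIT: usize = 1;
+//!
+//! pub struct MyLock(AtomicUsize);
+//!
+//! impl MyLock {
+//!     pub fn lock(&self) {
+//!         while self
+//!             .0
+//!             .compare_exchange(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
+//!             .is_err()
+//!         {
+//!             // Sleep on the address of the lock word. `validate` runs under
+//!             // the queue lock, so a racing unlock cannot be missed: either
+//!             // we see the lock as free and return Invalid, or the unlocker
+//!             // sees us in the queue and wakes us.
+//!             unsafe {
+//!                 park(
+//!                     &self.0 as *const AtomicUsize as usize,
+//!                     || self.0.load(Ordering::Relaxed) == LOCKED_BIT,
+//!                     || {},
+//!                     |_key, _was_last| {},
+//!                     DEFAULT_PARK_TOKEN,
+//!                     None,
+//!                 );
+//!             }
+//!         }
+//!     }
+//!
+//!     pub fn unlock(&self) {
+//!         self.0.store(0, Ordering::Release);
+//!         // Wake at most one thread queued on the same address.
+//!         unsafe {
+//!             unpark_one(&self.0 as *const AtomicUsize as usize, |_| DEFAULT_UNPARK_TOKEN);
+//!         }
+//!     }
+//! }
+//! ```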
+
+#![warn(missing_docs)]
+#![warn(rust_2018_idioms)]
+#![cfg_attr(
+    all(target_env = "sgx", target_vendor = "fortanix"),
+    feature(sgx_platform)
+)]
+#![cfg_attr(
+    all(
+        feature = "nightly",
+        target_family = "wasm",
+        target_feature = "atomics"
+    ),
+    feature(stdsimd)
+)]
+
+mod parking_lot;
+mod spinwait;
+mod thread_parker;
+mod util;
+mod word_lock;
+
+pub use self::parking_lot::deadlock;
+pub use self::parking_lot::{park, unpark_all, unpark_filter, unpark_one, unpark_requeue};
+pub use self::parking_lot::{
+    FilterOp, ParkResult, ParkToken, RequeueOp, UnparkResult, UnparkToken,
+};
+pub use self::parking_lot::{DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
+pub use self::spinwait::SpinWait;
diff --git a/src/parking_lot.rs b/src/parking_lot.rs
new file mode 100644
index 0000000..9b84525
--- /dev/null
+++ b/src/parking_lot.rs
@@ -0,0 +1,1692 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
+use crate::util::UncheckedOptionExt;
+use crate::word_lock::WordLock;
+use core::{
+    cell::{Cell, UnsafeCell},
+    ptr,
+    sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
+};
+use smallvec::SmallVec;
+use std::time::{Duration, Instant};
+
+// Don't use Instant on wasm32-unknown-unknown, it just panics.
+cfg_if::cfg_if! {
+    if #[cfg(all(
+        target_family = "wasm",
+        target_os = "unknown",
+        target_vendor = "unknown"
+    ))] {
+        #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+        struct TimeoutInstant;
+        impl TimeoutInstant {
+            fn now() -> TimeoutInstant {
+                TimeoutInstant
+            }
+        }
+        impl core::ops::Add<Duration> for TimeoutInstant {
+            type Output = Self;
+            fn add(self, _rhs: Duration) -> Self::Output {
+                TimeoutInstant
+            }
+        }
+    } else {
+        use std::time::Instant as TimeoutInstant;
+    }
+}
+
+static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);
+
+/// Holds the pointer to the currently active `HashTable`.
+///
+/// # Safety
+///
+/// Except for the initial value of null, it must always point to a valid `HashTable` instance.
+/// Any `HashTable` this global static has ever pointed to must never be freed.
+static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut());
+
+// Even with 3x more buckets than threads, the memory overhead per thread is
+// still only a few hundred bytes per thread.
+const LOAD_FACTOR: usize = 3;
+
+struct HashTable {
+    // Hash buckets for the table
+    entries: Box<[Bucket]>,
+
+    // Number of bits used for the hash function
+    hash_bits: u32,
+
+    // Previous table. This is only kept to keep leak detectors happy.
+    _prev: *const HashTable,
+}
+
+impl HashTable {
+    #[inline]
+    fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
+        let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
+        let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;
+
+        let now = TimeoutInstant::now();
+        let mut entries = Vec::with_capacity(new_size);
+        for i in 0..new_size {
+            // We must ensure the seed is not zero
+            entries.push(Bucket::new(now, i as u32 + 1));
+        }
+
+        Box::new(HashTable {
+            entries: entries.into_boxed_slice(),
+            hash_bits,
+            _prev: prev,
+        })
+    }
+}
+
+#[repr(align(64))]
+struct Bucket {
+    // Lock protecting the queue
+    mutex: WordLock,
+
+    // Linked list of threads waiting on this bucket
+    queue_head: Cell<*const ThreadData>,
+    queue_tail: Cell<*const ThreadData>,
+
+    // Next time at which point be_fair should be set
+    fair_timeout: UnsafeCell<FairTimeout>,
+}
+
+impl Bucket {
+    #[inline]
+    pub fn new(timeout: TimeoutInstant, seed: u32) -> Self {
+        Self {
+            mutex: WordLock::new(),
+            queue_head: Cell::new(ptr::null()),
+            queue_tail: Cell::new(ptr::null()),
+            fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)),
+        }
+    }
+}
+
+struct FairTimeout {
+    // Next time at which point be_fair should be set
+    timeout: TimeoutInstant,
+
+    // the PRNG state for calculating the next timeout
+    seed: u32,
+}
+
+impl FairTimeout {
+    #[inline]
+    fn new(timeout: TimeoutInstant, seed: u32) -> FairTimeout {
+        FairTimeout { timeout, seed }
+    }
+
+    // Determine whether we should force a fair unlock, and update the timeout
+    #[inline]
+    fn should_timeout(&mut self) -> bool {
+        let now = TimeoutInstant::now();
+        if now > self.timeout {
+            // Time between 0 and 1ms.
+            let nanos = self.gen_u32() % 1_000_000;
+            self.timeout = now + Duration::new(0, nanos);
+            true
+        } else {
+            false
+        }
+    }
+
+    // Pseudorandom number generator from the "Xorshift RNGs" paper by George Marsaglia.
+    fn gen_u32(&mut self) -> u32 {
+        self.seed ^= self.seed << 13;
+        self.seed ^= self.seed >> 17;
+        self.seed ^= self.seed << 5;
+        self.seed
+    }
+}
+
+struct ThreadData {
+    parker: ThreadParker,
+
+    // Key that this thread is sleeping on. This may change if the thread is
+    // requeued to a different key.
+    key: AtomicUsize,
+
+    // Linked list of parked threads in a bucket
+    next_in_queue: Cell<*const ThreadData>,
+
+    // UnparkToken passed to this thread when it is unparked
+    unpark_token: Cell<UnparkToken>,
+
+    // ParkToken value set by the thread when it was parked
+    park_token: Cell<ParkToken>,
+
+    // Is the thread parked with a timeout?
+    parked_with_timeout: Cell<bool>,
+
+    // Extra data for deadlock detection
+    #[cfg(feature = "deadlock_detection")]
+    deadlock_data: deadlock::DeadlockData,
+}
+
+impl ThreadData {
+    fn new() -> ThreadData {
+        // Keep track of the total number of live ThreadData objects and resize
+        // the hash table accordingly.
+        let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
+        grow_hashtable(num_threads);
+
+        ThreadData {
+            parker: ThreadParker::new(),
+            key: AtomicUsize::new(0),
+            next_in_queue: Cell::new(ptr::null()),
+            unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
+            park_token: Cell::new(DEFAULT_PARK_TOKEN),
+            parked_with_timeout: Cell::new(false),
+            #[cfg(feature = "deadlock_detection")]
+            deadlock_data: deadlock::DeadlockData::new(),
+        }
+    }
+}
+
+// Invokes the given closure with a reference to the current thread `ThreadData`.
+#[inline(always)]
+fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
+    // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
+    // to construct. 
Try to use a thread-local version if possible. Otherwise just + // create a ThreadData on the stack + let mut thread_data_storage = None; + thread_local!(static THREAD_DATA: ThreadData = ThreadData::new()); + let thread_data_ptr = THREAD_DATA + .try_with(|x| x as *const ThreadData) + .unwrap_or_else(|_| thread_data_storage.get_or_insert_with(ThreadData::new)); + + f(unsafe { &*thread_data_ptr }) +} + +impl Drop for ThreadData { + fn drop(&mut self) { + NUM_THREADS.fetch_sub(1, Ordering::Relaxed); + } +} + +/// Returns a reference to the latest hash table, creating one if it doesn't exist yet. +/// The reference is valid forever. However, the `HashTable` it references might become stale +/// at any point. Meaning it still exists, but it is not the instance in active use. +#[inline] +fn get_hashtable() -> &'static HashTable { + let table = HASHTABLE.load(Ordering::Acquire); + + // If there is no table, create one + if table.is_null() { + create_hashtable() + } else { + // SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed. + unsafe { &*table } + } +} + +/// Returns a reference to the latest hash table, creating one if it doesn't exist yet. +/// The reference is valid forever. However, the `HashTable` it references might become stale +/// at any point. Meaning it still exists, but it is not the instance in active use. +#[cold] +fn create_hashtable() -> &'static HashTable { + let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null())); + + // If this fails then it means some other thread created the hash table first. + let table = match HASHTABLE.compare_exchange( + ptr::null_mut(), + new_table, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => new_table, + Err(old_table) => { + // Free the table we created + // SAFETY: `new_table` is created from `Box::into_raw` above and only freed here. + unsafe { + Box::from_raw(new_table); + } + old_table + } + }; + // SAFETY: The `HashTable` behind `table` is never freed. It is either the table pointer we + // created here, or it is one loaded from `HASHTABLE`. + unsafe { &*table } +} + +// Grow the hash table so that it is big enough for the given number of threads. +// This isn't performance-critical since it is only done when a ThreadData is +// created, which only happens once per thread. +fn grow_hashtable(num_threads: usize) { + // Lock all buckets in the existing table and get a reference to it + let old_table = loop { + let table = get_hashtable(); + + // Check if we need to resize the existing table + if table.entries.len() >= LOAD_FACTOR * num_threads { + return; + } + + // Lock all buckets in the old table + for bucket in &table.entries[..] { + bucket.mutex.lock(); + } + + // Now check if our table is still the latest one. Another thread could + // have grown the hash table between us reading HASHTABLE and locking + // the buckets. + if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ { + break table; + } + + // Unlock buckets and try again + for bucket in &table.entries[..] { + // SAFETY: We hold the lock here, as required + unsafe { bucket.mutex.unlock() }; + } + }; + + // Create the new table + let mut new_table = HashTable::new(num_threads, old_table); + + // Move the entries from the old table to the new one + for bucket in &old_table.entries[..] { + // SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked + // lists. 
All `ThreadData` instances in these lists will remain valid as long as they are + // present in the lists, meaning as long as their threads are parked. + unsafe { rehash_bucket_into(bucket, &mut new_table) }; + } + + // Publish the new table. No races are possible at this point because + // any other thread trying to grow the hash table is blocked on the bucket + // locks in the old table. + HASHTABLE.store(Box::into_raw(new_table), Ordering::Release); + + // Unlock all buckets in the old table + for bucket in &old_table.entries[..] { + // SAFETY: We hold the lock here, as required + unsafe { bucket.mutex.unlock() }; + } +} + +/// Iterate through all `ThreadData` objects in the bucket and insert them into the given table +/// in the bucket their key correspond to for this table. +/// +/// # Safety +/// +/// The given `bucket` must have a correctly constructed linked list under `queue_head`, containing +/// `ThreadData` instances that must stay valid at least as long as the given `table` is in use. +/// +/// The given `table` must only contain buckets with correctly constructed linked lists. +unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) { + let mut current: *const ThreadData = bucket.queue_head.get(); + while !current.is_null() { + let next = (*current).next_in_queue.get(); + let hash = hash((*current).key.load(Ordering::Relaxed), table.hash_bits); + if table.entries[hash].queue_tail.get().is_null() { + table.entries[hash].queue_head.set(current); + } else { + (*table.entries[hash].queue_tail.get()) + .next_in_queue + .set(current); + } + table.entries[hash].queue_tail.set(current); + (*current).next_in_queue.set(ptr::null()); + current = next; + } +} + +// Hash function for addresses +#[cfg(target_pointer_width = "32")] +#[inline] +fn hash(key: usize, bits: u32) -> usize { + key.wrapping_mul(0x9E3779B9) >> (32 - bits) +} +#[cfg(target_pointer_width = "64")] +#[inline] +fn hash(key: usize, bits: u32) -> usize { + key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits) +} + +/// Locks the bucket for the given key and returns a reference to it. +/// The returned bucket must be unlocked again in order to not cause deadlocks. +#[inline] +fn lock_bucket(key: usize) -> &'static Bucket { + loop { + let hashtable = get_hashtable(); + + let hash = hash(key, hashtable.hash_bits); + let bucket = &hashtable.entries[hash]; + + // Lock the bucket + bucket.mutex.lock(); + + // If no other thread has rehashed the table before we grabbed the lock + // then we are good to go! The lock we grabbed prevents any rehashes. + if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ { + return bucket; + } + + // Unlock the bucket and try again + // SAFETY: We hold the lock here, as required + unsafe { bucket.mutex.unlock() }; + } +} + +/// Locks the bucket for the given key and returns a reference to it. But checks that the key +/// hasn't been changed in the meantime due to a requeue. +/// The returned bucket must be unlocked again in order to not cause deadlocks. +#[inline] +fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) { + loop { + let hashtable = get_hashtable(); + let current_key = key.load(Ordering::Relaxed); + + let hash = hash(current_key, hashtable.hash_bits); + let bucket = &hashtable.entries[hash]; + + // Lock the bucket + bucket.mutex.lock(); + + // Check that both the hash table and key are correct while the bucket + // is locked. 
Note that the key can't change once we locked the proper + // bucket for it, so we just keep trying until we have the correct key. + if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ + && key.load(Ordering::Relaxed) == current_key + { + return (current_key, bucket); + } + + // Unlock the bucket and try again + // SAFETY: We hold the lock here, as required + unsafe { bucket.mutex.unlock() }; + } +} + +/// Locks the two buckets for the given pair of keys and returns references to them. +/// The returned buckets must be unlocked again in order to not cause deadlocks. +/// +/// If both keys hash to the same value, both returned references will be to the same bucket. Be +/// careful to only unlock it once in this case, always use `unlock_bucket_pair`. +#[inline] +fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) { + loop { + let hashtable = get_hashtable(); + + let hash1 = hash(key1, hashtable.hash_bits); + let hash2 = hash(key2, hashtable.hash_bits); + + // Get the bucket at the lowest hash/index first + let bucket1 = if hash1 <= hash2 { + &hashtable.entries[hash1] + } else { + &hashtable.entries[hash2] + }; + + // Lock the first bucket + bucket1.mutex.lock(); + + // If no other thread has rehashed the table before we grabbed the lock + // then we are good to go! The lock we grabbed prevents any rehashes. + if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ { + // Now lock the second bucket and return the two buckets + if hash1 == hash2 { + return (bucket1, bucket1); + } else if hash1 < hash2 { + let bucket2 = &hashtable.entries[hash2]; + bucket2.mutex.lock(); + return (bucket1, bucket2); + } else { + let bucket2 = &hashtable.entries[hash1]; + bucket2.mutex.lock(); + return (bucket2, bucket1); + } + } + + // Unlock the bucket and try again + // SAFETY: We hold the lock here, as required + unsafe { bucket1.mutex.unlock() }; + } +} + +/// Unlock a pair of buckets +/// +/// # Safety +/// +/// Both buckets must be locked +#[inline] +unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) { + bucket1.mutex.unlock(); + if !ptr::eq(bucket1, bucket2) { + bucket2.mutex.unlock(); + } +} + +/// Result of a park operation. +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum ParkResult { + /// We were unparked by another thread with the given token. + Unparked(UnparkToken), + + /// The validation callback returned false. + Invalid, + + /// The timeout expired. + TimedOut, +} + +impl ParkResult { + /// Returns true if we were unparked by another thread. + #[inline] + pub fn is_unparked(self) -> bool { + if let ParkResult::Unparked(_) = self { + true + } else { + false + } + } +} + +/// Result of an unpark operation. +#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)] +pub struct UnparkResult { + /// The number of threads that were unparked. + pub unparked_threads: usize, + + /// The number of threads that were requeued. + pub requeued_threads: usize, + + /// Whether there are any threads remaining in the queue. This only returns + /// true if a thread was unparked. + pub have_more_threads: bool, + + /// This is set to true on average once every 0.5ms for any given key. It + /// should be used to switch to a fair unlocking mechanism for a particular + /// unlock. + pub be_fair: bool, + + /// Private field so new fields can be added without breakage. + _sealed: (), +} + +/// Operation that `unpark_requeue` should perform. 
+#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum RequeueOp { + /// Abort the operation without doing anything. + Abort, + + /// Unpark one thread and requeue the rest onto the target queue. + UnparkOneRequeueRest, + + /// Requeue all threads onto the target queue. + RequeueAll, + + /// Unpark one thread and leave the rest parked. No requeuing is done. + UnparkOne, + + /// Requeue one thread and leave the rest parked on the original queue. + RequeueOne, +} + +/// Operation that `unpark_filter` should perform for each thread. +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum FilterOp { + /// Unpark the thread and continue scanning the list of parked threads. + Unpark, + + /// Don't unpark the thread and continue scanning the list of parked threads. + Skip, + + /// Don't unpark the thread and stop scanning the list of parked threads. + Stop, +} + +/// A value which is passed from an unparker to a parked thread. +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub struct UnparkToken(pub usize); + +/// A value associated with a parked thread which can be used by `unpark_filter`. +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub struct ParkToken(pub usize); + +/// A default unpark token to use. +pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0); + +/// A default park token to use. +pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0); + +/// Parks the current thread in the queue associated with the given key. +/// +/// The `validate` function is called while the queue is locked and can abort +/// the operation by returning false. If `validate` returns true then the +/// current thread is appended to the queue and the queue is unlocked. +/// +/// The `before_sleep` function is called after the queue is unlocked but before +/// the thread is put to sleep. The thread will then sleep until it is unparked +/// or the given timeout is reached. +/// +/// The `timed_out` function is also called while the queue is locked, but only +/// if the timeout was reached. It is passed the key of the queue it was in when +/// it timed out, which may be different from the original key if +/// `unpark_requeue` was called. It is also passed a bool which indicates +/// whether it was the last thread in the queue. +/// +/// # Safety +/// +/// You should only call this function with an address that you control, since +/// you could otherwise interfere with the operation of other synchronization +/// primitives. +/// +/// The `validate` and `timed_out` functions are called while the queue is +/// locked and must not panic or call into any function in `parking_lot`. +/// +/// The `before_sleep` function is called outside the queue lock and is allowed +/// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but +/// it is not allowed to call `park` or panic. 
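+///
+/// # Example
+///
+/// An editorial sketch of the typical call shape (not part of the upstream
+/// documentation); `lock_word`, `LOCKED_BIT` and `PARKED_BIT` are hypothetical
+/// names belonging to the caller, not items of this crate:
+///
+/// ```ignore
+/// let key = &lock_word as *const AtomicUsize as usize;
+/// let result = unsafe {
+///     park(
+///         key,
+///         // Runs under the queue lock: only sleep while the lock is still
+///         // held and the "has waiters" bit is still set.
+///         || lock_word.load(Ordering::Relaxed) == LOCKED_BIT | PARKED_BIT,
+///         || {},            // before_sleep: the queue is unlocked here
+///         |_key, _last| {}, // timed_out: runs only if the timeout elapses
+///         DEFAULT_PARK_TOKEN,
+///         None,             // Some(instant) enables ParkResult::TimedOut
+///     )
+/// };
+/// match result {
+///     ParkResult::Unparked(_token) => {} // woken by an unpark_* call
+///     ParkResult::Invalid => {}          // validate failed; retry fast path
+///     ParkResult::TimedOut => {}         // only possible with a timeout
+/// }
+/// ```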
+#[inline]
+pub unsafe fn park(
+    key: usize,
+    validate: impl FnOnce() -> bool,
+    before_sleep: impl FnOnce(),
+    timed_out: impl FnOnce(usize, bool),
+    park_token: ParkToken,
+    timeout: Option<Instant>,
+) -> ParkResult {
+    // Grab our thread data, this also ensures that the hash table exists
+    with_thread_data(|thread_data| {
+        // Lock the bucket for the given key
+        let bucket = lock_bucket(key);
+
+        // If the validation function fails, just return
+        if !validate() {
+            // SAFETY: We hold the lock here, as required
+            bucket.mutex.unlock();
+            return ParkResult::Invalid;
+        }
+
+        // Append our thread data to the queue and unlock the bucket
+        thread_data.parked_with_timeout.set(timeout.is_some());
+        thread_data.next_in_queue.set(ptr::null());
+        thread_data.key.store(key, Ordering::Relaxed);
+        thread_data.park_token.set(park_token);
+        thread_data.parker.prepare_park();
+        if !bucket.queue_head.get().is_null() {
+            (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
+        } else {
+            bucket.queue_head.set(thread_data);
+        }
+        bucket.queue_tail.set(thread_data);
+        // SAFETY: We hold the lock here, as required
+        bucket.mutex.unlock();
+
+        // Invoke the pre-sleep callback
+        before_sleep();
+
+        // Park our thread and determine whether we were woken up by an unpark
+        // or by our timeout. Note that this isn't precise: we can still be
+        // unparked since we are still in the queue.
+        let unparked = match timeout {
+            Some(timeout) => thread_data.parker.park_until(timeout),
+            None => {
+                thread_data.parker.park();
+                // call deadlock detection on_unpark hook
+                deadlock::on_unpark(thread_data);
+                true
+            }
+        };
+
+        // If we were unparked, return now
+        if unparked {
+            return ParkResult::Unparked(thread_data.unpark_token.get());
+        }
+
+        // Lock our bucket again. Note that the hashtable may have been rehashed in
+        // the meantime. Our key may also have changed if we were requeued.
+        let (key, bucket) = lock_bucket_checked(&thread_data.key);
+
+        // Now we need to check again if we were unparked or timed out. Unlike the
+        // last check this is precise because we hold the bucket lock.
+        if !thread_data.parker.timed_out() {
+            // SAFETY: We hold the lock here, as required
+            bucket.mutex.unlock();
+            return ParkResult::Unparked(thread_data.unpark_token.get());
+        }
+
+        // We timed out, so we now need to remove our thread from the queue
+        let mut link = &bucket.queue_head;
+        let mut current = bucket.queue_head.get();
+        let mut previous = ptr::null();
+        let mut was_last_thread = true;
+        while !current.is_null() {
+            if current == thread_data {
+                let next = (*current).next_in_queue.get();
+                link.set(next);
+                if bucket.queue_tail.get() == current {
+                    bucket.queue_tail.set(previous);
+                } else {
+                    // Scan the rest of the queue to see if there are any other
+                    // entries with the given key.
+                    let mut scan = next;
+                    while !scan.is_null() {
+                        if (*scan).key.load(Ordering::Relaxed) == key {
+                            was_last_thread = false;
+                            break;
+                        }
+                        scan = (*scan).next_in_queue.get();
+                    }
+                }
+
+                // Callback to indicate that we timed out, and whether we were the
+                // last thread on the queue.
+                timed_out(key, was_last_thread);
+                break;
+            } else {
+                if (*current).key.load(Ordering::Relaxed) == key {
+                    was_last_thread = false;
+                }
+                link = &(*current).next_in_queue;
+                previous = current;
+                current = link.get();
+            }
+        }
+
+        // There should be no way for our thread to have been removed from the queue
+        // if we timed out. 
+ debug_assert!(!current.is_null()); + + // Unlock the bucket, we are done + // SAFETY: We hold the lock here, as required + bucket.mutex.unlock(); + ParkResult::TimedOut + }) +} + +/// Unparks one thread from the queue associated with the given key. +/// +/// The `callback` function is called while the queue is locked and before the +/// target thread is woken up. The `UnparkResult` argument to the function +/// indicates whether a thread was found in the queue and whether this was the +/// last thread in the queue. This value is also returned by `unpark_one`. +/// +/// The `callback` function should return an `UnparkToken` value which will be +/// passed to the thread that is unparked. If no thread is unparked then the +/// returned value is ignored. +/// +/// # Safety +/// +/// You should only call this function with an address that you control, since +/// you could otherwise interfere with the operation of other synchronization +/// primitives. +/// +/// The `callback` function is called while the queue is locked and must not +/// panic or call into any function in `parking_lot`. +#[inline] +pub unsafe fn unpark_one( + key: usize, + callback: impl FnOnce(UnparkResult) -> UnparkToken, +) -> UnparkResult { + // Lock the bucket for the given key + let bucket = lock_bucket(key); + + // Find a thread with a matching key and remove it from the queue + let mut link = &bucket.queue_head; + let mut current = bucket.queue_head.get(); + let mut previous = ptr::null(); + let mut result = UnparkResult::default(); + while !current.is_null() { + if (*current).key.load(Ordering::Relaxed) == key { + // Remove the thread from the queue + let next = (*current).next_in_queue.get(); + link.set(next); + if bucket.queue_tail.get() == current { + bucket.queue_tail.set(previous); + } else { + // Scan the rest of the queue to see if there are any other + // entries with the given key. + let mut scan = next; + while !scan.is_null() { + if (*scan).key.load(Ordering::Relaxed) == key { + result.have_more_threads = true; + break; + } + scan = (*scan).next_in_queue.get(); + } + } + + // Invoke the callback before waking up the thread + result.unparked_threads = 1; + result.be_fair = (*bucket.fair_timeout.get()).should_timeout(); + let token = callback(result); + + // Set the token for the target thread + (*current).unpark_token.set(token); + + // This is a bit tricky: we first lock the ThreadParker to prevent + // the thread from exiting and freeing its ThreadData if its wait + // times out. Then we unlock the queue since we don't want to keep + // the queue locked while we perform a system call. Finally we wake + // up the parked thread. + let handle = (*current).parker.unpark_lock(); + // SAFETY: We hold the lock here, as required + bucket.mutex.unlock(); + handle.unpark(); + + return result; + } else { + link = &(*current).next_in_queue; + previous = current; + current = link.get(); + } + } + + // No threads with a matching key were found in the bucket + callback(result); + // SAFETY: We hold the lock here, as required + bucket.mutex.unlock(); + result +} + +/// Unparks all threads in the queue associated with the given key. +/// +/// The given `UnparkToken` is passed to all unparked threads. +/// +/// This function returns the number of threads that were unparked. +/// +/// # Safety +/// +/// You should only call this function with an address that you control, since +/// you could otherwise interfere with the operation of other synchronization +/// primitives. 
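+///
+/// # Example
+///
+/// An editorial sketch (not part of the upstream documentation); `condvar` is
+/// a hypothetical atomic word owned by the caller:
+///
+/// ```ignore
+/// // Wake every thread parked on this address, handing each one the default
+/// // token; the return value is the number of threads actually woken.
+/// let woken = unsafe { unpark_all(&condvar as *const AtomicUsize as usize, DEFAULT_UNPARK_TOKEN) };
+/// ```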
+#[inline] +pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize { + // Lock the bucket for the given key + let bucket = lock_bucket(key); + + // Remove all threads with the given key in the bucket + let mut link = &bucket.queue_head; + let mut current = bucket.queue_head.get(); + let mut previous = ptr::null(); + let mut threads = SmallVec::<[_; 8]>::new(); + while !current.is_null() { + if (*current).key.load(Ordering::Relaxed) == key { + // Remove the thread from the queue + let next = (*current).next_in_queue.get(); + link.set(next); + if bucket.queue_tail.get() == current { + bucket.queue_tail.set(previous); + } + + // Set the token for the target thread + (*current).unpark_token.set(unpark_token); + + // Don't wake up threads while holding the queue lock. See comment + // in unpark_one. For now just record which threads we need to wake + // up. + threads.push((*current).parker.unpark_lock()); + current = next; + } else { + link = &(*current).next_in_queue; + previous = current; + current = link.get(); + } + } + + // Unlock the bucket + // SAFETY: We hold the lock here, as required + bucket.mutex.unlock(); + + // Now that we are outside the lock, wake up all the threads that we removed + // from the queue. + let num_threads = threads.len(); + for handle in threads.into_iter() { + handle.unpark(); + } + + num_threads +} + +/// Removes all threads from the queue associated with `key_from`, optionally +/// unparks the first one and requeues the rest onto the queue associated with +/// `key_to`. +/// +/// The `validate` function is called while both queues are locked. Its return +/// value will determine which operation is performed, or whether the operation +/// should be aborted. See `RequeueOp` for details about the different possible +/// return values. +/// +/// The `callback` function is also called while both queues are locked. It is +/// passed the `RequeueOp` returned by `validate` and an `UnparkResult` +/// indicating whether a thread was unparked and whether there are threads still +/// parked in the new queue. This `UnparkResult` value is also returned by +/// `unpark_requeue`. +/// +/// The `callback` function should return an `UnparkToken` value which will be +/// passed to the thread that is unparked. If no thread is unparked then the +/// returned value is ignored. +/// +/// # Safety +/// +/// You should only call this function with an address that you control, since +/// you could otherwise interfere with the operation of other synchronization +/// primitives. +/// +/// The `validate` and `callback` functions are called while the queue is locked +/// and must not panic or call into any function in `parking_lot`. +#[inline] +pub unsafe fn unpark_requeue( + key_from: usize, + key_to: usize, + validate: impl FnOnce() -> RequeueOp, + callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken, +) -> UnparkResult { + // Lock the two buckets for the given key + let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to); + + // If the validation function fails, just return + let mut result = UnparkResult::default(); + let op = validate(); + if op == RequeueOp::Abort { + // SAFETY: Both buckets are locked, as required. 
+ unlock_bucket_pair(bucket_from, bucket_to); + return result; + } + + // Remove all threads with the given key in the source bucket + let mut link = &bucket_from.queue_head; + let mut current = bucket_from.queue_head.get(); + let mut previous = ptr::null(); + let mut requeue_threads: *const ThreadData = ptr::null(); + let mut requeue_threads_tail: *const ThreadData = ptr::null(); + let mut wakeup_thread = None; + while !current.is_null() { + if (*current).key.load(Ordering::Relaxed) == key_from { + // Remove the thread from the queue + let next = (*current).next_in_queue.get(); + link.set(next); + if bucket_from.queue_tail.get() == current { + bucket_from.queue_tail.set(previous); + } + + // Prepare the first thread for wakeup and requeue the rest. + if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne) + && wakeup_thread.is_none() + { + wakeup_thread = Some(current); + result.unparked_threads = 1; + } else { + if !requeue_threads.is_null() { + (*requeue_threads_tail).next_in_queue.set(current); + } else { + requeue_threads = current; + } + requeue_threads_tail = current; + (*current).key.store(key_to, Ordering::Relaxed); + result.requeued_threads += 1; + } + if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne { + // Scan the rest of the queue to see if there are any other + // entries with the given key. + let mut scan = next; + while !scan.is_null() { + if (*scan).key.load(Ordering::Relaxed) == key_from { + result.have_more_threads = true; + break; + } + scan = (*scan).next_in_queue.get(); + } + break; + } + current = next; + } else { + link = &(*current).next_in_queue; + previous = current; + current = link.get(); + } + } + + // Add the requeued threads to the destination bucket + if !requeue_threads.is_null() { + (*requeue_threads_tail).next_in_queue.set(ptr::null()); + if !bucket_to.queue_head.get().is_null() { + (*bucket_to.queue_tail.get()) + .next_in_queue + .set(requeue_threads); + } else { + bucket_to.queue_head.set(requeue_threads); + } + bucket_to.queue_tail.set(requeue_threads_tail); + } + + // Invoke the callback before waking up the thread + if result.unparked_threads != 0 { + result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout(); + } + let token = callback(op, result); + + // See comment in unpark_one for why we mess with the locking + if let Some(wakeup_thread) = wakeup_thread { + (*wakeup_thread).unpark_token.set(token); + let handle = (*wakeup_thread).parker.unpark_lock(); + // SAFETY: Both buckets are locked, as required. + unlock_bucket_pair(bucket_from, bucket_to); + handle.unpark(); + } else { + // SAFETY: Both buckets are locked, as required. + unlock_bucket_pair(bucket_from, bucket_to); + } + + result +} + +/// Unparks a number of threads from the front of the queue associated with +/// `key` depending on the results of a filter function which inspects the +/// `ParkToken` associated with each thread. +/// +/// The `filter` function is called for each thread in the queue or until +/// `FilterOp::Stop` is returned. This function is passed the `ParkToken` +/// associated with a particular thread, which is unparked if `FilterOp::Unpark` +/// is returned. +/// +/// The `callback` function is also called while both queues are locked. It is +/// passed an `UnparkResult` indicating the number of threads that were unparked +/// and whether there are still parked threads in the queue. This `UnparkResult` +/// value is also returned by `unpark_filter`. 
+/// +/// The `callback` function should return an `UnparkToken` value which will be +/// passed to all threads that are unparked. If no thread is unparked then the +/// returned value is ignored. +/// +/// # Safety +/// +/// You should only call this function with an address that you control, since +/// you could otherwise interfere with the operation of other synchronization +/// primitives. +/// +/// The `filter` and `callback` functions are called while the queue is locked +/// and must not panic or call into any function in `parking_lot`. +#[inline] +pub unsafe fn unpark_filter( + key: usize, + mut filter: impl FnMut(ParkToken) -> FilterOp, + callback: impl FnOnce(UnparkResult) -> UnparkToken, +) -> UnparkResult { + // Lock the bucket for the given key + let bucket = lock_bucket(key); + + // Go through the queue looking for threads with a matching key + let mut link = &bucket.queue_head; + let mut current = bucket.queue_head.get(); + let mut previous = ptr::null(); + let mut threads = SmallVec::<[_; 8]>::new(); + let mut result = UnparkResult::default(); + while !current.is_null() { + if (*current).key.load(Ordering::Relaxed) == key { + // Call the filter function with the thread's ParkToken + let next = (*current).next_in_queue.get(); + match filter((*current).park_token.get()) { + FilterOp::Unpark => { + // Remove the thread from the queue + link.set(next); + if bucket.queue_tail.get() == current { + bucket.queue_tail.set(previous); + } + + // Add the thread to our list of threads to unpark + threads.push((current, None)); + + current = next; + } + FilterOp::Skip => { + result.have_more_threads = true; + link = &(*current).next_in_queue; + previous = current; + current = link.get(); + } + FilterOp::Stop => { + result.have_more_threads = true; + break; + } + } + } else { + link = &(*current).next_in_queue; + previous = current; + current = link.get(); + } + } + + // Invoke the callback before waking up the threads + result.unparked_threads = threads.len(); + if result.unparked_threads != 0 { + result.be_fair = (*bucket.fair_timeout.get()).should_timeout(); + } + let token = callback(result); + + // Pass the token to all threads that are going to be unparked and prepare + // them for unparking. + for t in threads.iter_mut() { + (*t.0).unpark_token.set(token); + t.1 = Some((*t.0).parker.unpark_lock()); + } + + // SAFETY: We hold the lock here, as required + bucket.mutex.unlock(); + + // Now that we are outside the lock, wake up all the threads that we removed + // from the queue. + for (_, handle) in threads.into_iter() { + handle.unchecked_unwrap().unpark(); + } + + result +} + +/// \[Experimental\] Deadlock detection +/// +/// Enabled via the `deadlock_detection` feature flag. +pub mod deadlock { + #[cfg(feature = "deadlock_detection")] + use super::deadlock_impl; + + #[cfg(feature = "deadlock_detection")] + pub(super) use super::deadlock_impl::DeadlockData; + + /// Acquire a resource identified by key in the deadlock detector + /// Noop if deadlock_detection feature isn't enabled. + /// + /// # Safety + /// + /// Call after the resource is acquired + #[inline] + pub unsafe fn acquire_resource(_key: usize) { + #[cfg(feature = "deadlock_detection")] + deadlock_impl::acquire_resource(_key); + } + + /// Release a resource identified by key in the deadlock detector. + /// Noop if deadlock_detection feature isn't enabled. + /// + /// # Panics + /// + /// Panics if the resource was already released or wasn't acquired in this thread. 
+ /// + /// # Safety + /// + /// Call before the resource is released + #[inline] + pub unsafe fn release_resource(_key: usize) { + #[cfg(feature = "deadlock_detection")] + deadlock_impl::release_resource(_key); + } + + /// Returns all deadlocks detected *since* the last call. + /// Each cycle consist of a vector of `DeadlockedThread`. + #[cfg(feature = "deadlock_detection")] + #[inline] + pub fn check_deadlock() -> Vec> { + deadlock_impl::check_deadlock() + } + + #[inline] + pub(super) unsafe fn on_unpark(_td: &super::ThreadData) { + #[cfg(feature = "deadlock_detection")] + deadlock_impl::on_unpark(_td); + } +} + +#[cfg(feature = "deadlock_detection")] +mod deadlock_impl { + use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS}; + use crate::thread_parker::{ThreadParkerT, UnparkHandleT}; + use crate::word_lock::WordLock; + use backtrace::Backtrace; + use petgraph; + use petgraph::graphmap::DiGraphMap; + use std::cell::{Cell, UnsafeCell}; + use std::collections::HashSet; + use std::sync::atomic::Ordering; + use std::sync::mpsc; + use thread_id; + + /// Representation of a deadlocked thread + pub struct DeadlockedThread { + thread_id: usize, + backtrace: Backtrace, + } + + impl DeadlockedThread { + /// The system thread id + pub fn thread_id(&self) -> usize { + self.thread_id + } + + /// The thread backtrace + pub fn backtrace(&self) -> &Backtrace { + &self.backtrace + } + } + + pub struct DeadlockData { + // Currently owned resources (keys) + resources: UnsafeCell>, + + // Set when there's a pending callstack request + deadlocked: Cell, + + // Sender used to report the backtrace + backtrace_sender: UnsafeCell>>, + + // System thread id + thread_id: usize, + } + + impl DeadlockData { + pub fn new() -> Self { + DeadlockData { + resources: UnsafeCell::new(Vec::new()), + deadlocked: Cell::new(false), + backtrace_sender: UnsafeCell::new(None), + thread_id: thread_id::get(), + } + } + } + + pub(super) unsafe fn on_unpark(td: &ThreadData) { + if td.deadlock_data.deadlocked.get() { + let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap(); + sender + .send(DeadlockedThread { + thread_id: td.deadlock_data.thread_id, + backtrace: Backtrace::new(), + }) + .unwrap(); + // make sure to close this sender + drop(sender); + + // park until the end of the time + td.parker.prepare_park(); + td.parker.park(); + unreachable!("unparked deadlocked thread!"); + } + } + + pub unsafe fn acquire_resource(key: usize) { + with_thread_data(|thread_data| { + (*thread_data.deadlock_data.resources.get()).push(key); + }); + } + + pub unsafe fn release_resource(key: usize) { + with_thread_data(|thread_data| { + let resources = &mut (*thread_data.deadlock_data.resources.get()); + + // There is only one situation where we can fail to find the + // resource: we are currently running TLS destructors and our + // ThreadData has already been freed. There isn't much we can do + // about it at this point, so just ignore it. + if let Some(p) = resources.iter().rposition(|x| *x == key) { + resources.swap_remove(p); + } + }); + } + + pub fn check_deadlock() -> Vec> { + unsafe { + // fast pass + if check_wait_graph_fast() { + // double check + check_wait_graph_slow() + } else { + Vec::new() + } + } + } + + // Simple algorithm that builds a wait graph f the threads and the resources, + // then checks for the presence of cycles (deadlocks). 
+ // This variant isn't precise as it doesn't lock the entire table before checking
+ unsafe fn check_wait_graph_fast() -> bool {
+ let table = get_hashtable();
+ let thread_count = NUM_THREADS.load(Ordering::Relaxed);
+ let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2);
+
+ for b in &(*table).entries[..] {
+ b.mutex.lock();
+ let mut current = b.queue_head.get();
+ while !current.is_null() {
+ if !(*current).parked_with_timeout.get()
+ && !(*current).deadlock_data.deadlocked.get()
+ {
+ // .resources are waiting for their owner
+ for &resource in &(*(*current).deadlock_data.resources.get()) {
+ graph.add_edge(resource, current as usize, ());
+ }
+ // owner waits for resource .key
+ graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ());
+ }
+ current = (*current).next_in_queue.get();
+ }
+ // SAFETY: We hold the lock here, as required
+ b.mutex.unlock();
+ }
+
+ petgraph::algo::is_cyclic_directed(&graph)
+ }
+
+ #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
+ enum WaitGraphNode {
+ Thread(*const ThreadData),
+ Resource(usize),
+ }
+
+ use self::WaitGraphNode::*;
+
+ // Contrary to the _fast variant this locks the entries table before looking for cycles.
+ // Returns all detected thread wait cycles.
+ // Note that once a cycle is reported it's never reported again.
+ unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> {
+ static DEADLOCK_DETECTION_LOCK: WordLock = WordLock::new();
+ DEADLOCK_DETECTION_LOCK.lock();
+
+ let mut table = get_hashtable();
+ loop {
+ // Lock all buckets in the old table
+ for b in &table.entries[..] {
+ b.mutex.lock();
+ }
+
+ // Now check if our table is still the latest one. Another thread could
+ // have grown the hash table between us getting and locking the hash table.
+ let new_table = get_hashtable();
+ if new_table as *const _ == table as *const _ {
+ break;
+ }
+
+ // Unlock buckets and try again
+ for b in &table.entries[..] {
+ // SAFETY: We hold the lock here, as required
+ b.mutex.unlock();
+ }
+
+ table = new_table;
+ }
+
+ let thread_count = NUM_THREADS.load(Ordering::Relaxed);
+ let mut graph =
+ DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);
+
+ for b in &table.entries[..] {
+ let mut current = b.queue_head.get();
+ while !current.is_null() {
+ if !(*current).parked_with_timeout.get()
+ && !(*current).deadlock_data.deadlocked.get()
+ {
+ // .resources are waiting for their owner
+ for &resource in &(*(*current).deadlock_data.resources.get()) {
+ graph.add_edge(Resource(resource), Thread(current), ());
+ }
+ // owner waits for resource .key
+ graph.add_edge(
+ Thread(current),
+ Resource((*current).key.load(Ordering::Relaxed)),
+ (),
+ );
+ }
+ current = (*current).next_in_queue.get();
+ }
+ }
+
+ for b in &table.entries[..] {
+ // SAFETY: We hold the lock here, as required
+ b.mutex.unlock();
+ }
+
+ // find cycles
+ let cycles = graph_cycles(&graph);
+
+ let mut results = Vec::with_capacity(cycles.len());
+
+ for cycle in cycles {
+ let (sender, receiver) = mpsc::channel();
+ for td in cycle {
+ let bucket = lock_bucket((*td).key.load(Ordering::Relaxed));
+ (*td).deadlock_data.deadlocked.set(true);
+ *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
+ let handle = (*td).parker.unpark_lock();
+ // SAFETY: We hold the lock here, as required
+ bucket.mutex.unlock();
+ // unpark the deadlocked thread!
+ // on unpark it'll notice the deadlocked flag and report back
+ handle.unpark();
+ }
+ // make sure to drop our sender before collecting results
+ drop(sender);
+ results.push(receiver.iter().collect());
+ }
+
+ DEADLOCK_DETECTION_LOCK.unlock();
+
+ results
+ }
+
+ // normalize a cycle to start with the "smallest" node
+ fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> {
+ let min_pos = input
+ .iter()
+ .enumerate()
+ .min_by_key(|&(_, &t)| t)
+ .map(|(p, _)| p)
+ .unwrap_or(0);
+ input
+ .iter()
+ .cycle()
+ .skip(min_pos)
+ .take(input.len())
+ .cloned()
+ .collect()
+ }
+
+ // returns all thread cycles in the wait graph
+ fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> {
+ use petgraph::visit::depth_first_search;
+ use petgraph::visit::DfsEvent;
+ use petgraph::visit::NodeIndexable;
+
+ let mut cycles = HashSet::new();
+ let mut path = Vec::with_capacity(g.node_bound());
+ // start from threads to get the correct threads cycle
+ let threads = g
+ .nodes()
+ .filter(|n| if let &Thread(_) = n { true } else { false });
+
+ depth_first_search(g, threads, |e| match e {
+ DfsEvent::Discover(Thread(n), _) => path.push(n),
+ DfsEvent::Finish(Thread(_), _) => {
+ path.pop();
+ }
+ DfsEvent::BackEdge(_, Thread(n)) => {
+ let from = path.iter().rposition(|&i| i == n).unwrap();
+ cycles.insert(normalize_cycle(&path[from..]));
+ }
+ _ => (),
+ });
+
+ cycles.iter().cloned().collect()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::{ThreadData, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
+ use std::{
+ ptr,
+ sync::{
+ atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering},
+ Arc,
+ },
+ thread,
+ time::Duration,
+ };
+
+ /// Calls a closure for every `ThreadData` currently parked on a given key
+ fn for_each(key: usize, mut f: impl FnMut(&ThreadData)) {
+ let bucket = super::lock_bucket(key);
+
+ let mut current: *const ThreadData = bucket.queue_head.get();
+ while !current.is_null() {
+ let current_ref = unsafe { &*current };
+ if current_ref.key.load(Ordering::Relaxed) == key {
+ f(current_ref);
+ }
+ current = current_ref.next_in_queue.get();
+ }
+
+ // SAFETY: We hold the lock here, as required
+ unsafe { bucket.mutex.unlock() };
+ }
+
+ macro_rules! test {
+ ( $( $name:ident(
+ repeats: $repeats:expr,
+ latches: $latches:expr,
+ delay: $delay:expr,
+ threads: $threads:expr,
+ single_unparks: $single_unparks:expr);
+ )* ) => {
+ $(#[test]
+ fn $name() {
+ let delay = Duration::from_micros($delay);
+ for _ in 0..$repeats {
+ run_parking_test($latches, delay, $threads, $single_unparks);
+ }
+ })*
+ };
+ }
+
+ test! {
+ unpark_all_one_fast(
+ repeats: 10000, latches: 1, delay: 0, threads: 1, single_unparks: 0
+ );
+ unpark_all_hundred_fast(
+ repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
+ );
+ unpark_one_one_fast(
+ repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
+ );
+ unpark_one_hundred_fast(
+ repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
+ );
+ unpark_one_fifty_then_fifty_all_fast(
+ repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
+ );
+ unpark_all_one(
+ repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
+ );
+ unpark_all_hundred(
+ repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
+ );
+ unpark_one_one(
+ repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
+ );
+ unpark_one_fifty(
+ repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
+ );
+ unpark_one_fifty_then_fifty_all(
+ repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
+ );
+ hundred_unpark_all_one_fast(
+ repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
+ );
+ hundred_unpark_all_one(
+ repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
+ );
+ }
+
+ fn run_parking_test(
+ num_latches: usize,
+ delay: Duration,
+ num_threads: usize,
+ num_single_unparks: usize,
+ ) {
+ let mut tests = Vec::with_capacity(num_latches);
+
+ for _ in 0..num_latches {
+ let test = Arc::new(SingleLatchTest::new(num_threads));
+ let mut threads = Vec::with_capacity(num_threads);
+ for _ in 0..num_threads {
+ let test = test.clone();
+ threads.push(thread::spawn(move || test.run()));
+ }
+ tests.push((test, threads));
+ }
+
+ for unpark_index in 0..num_single_unparks {
+ thread::sleep(delay);
+ for (test, _) in &tests {
+ test.unpark_one(unpark_index);
+ }
+ }
+
+ for (test, threads) in tests {
+ test.finish(num_single_unparks);
+ for thread in threads {
+ thread.join().expect("Test thread panic");
+ }
+ }
+ }
+
+ struct SingleLatchTest {
+ semaphore: AtomicIsize,
+ num_awake: AtomicUsize,
+ /// Holds the pointer to the last *unprocessed* woken up thread.
+ last_awoken: AtomicPtr<ThreadData>,
+ /// Total number of threads participating in this test.
+ num_threads: usize,
+ }
+
+ impl SingleLatchTest {
+ pub fn new(num_threads: usize) -> Self {
+ Self {
+ // This implements a fair (FIFO) semaphore, and it starts out unavailable.
+ semaphore: AtomicIsize::new(0),
+ num_awake: AtomicUsize::new(0),
+ last_awoken: AtomicPtr::new(ptr::null_mut()),
+ num_threads,
+ }
+ }
+
+ pub fn run(&self) {
+ // Get one slot from the semaphore
+ self.down();
+
+ // Report back to the test verification code that this thread woke up
+ let this_thread_ptr = super::with_thread_data(|t| t as *const _ as *mut _);
+ self.last_awoken.store(this_thread_ptr, Ordering::SeqCst);
+ self.num_awake.fetch_add(1, Ordering::SeqCst);
+ }
+
+ pub fn unpark_one(&self, single_unpark_index: usize) {
+ // last_awoken should be null at all times except between self.up() and at the bottom
+ // of this method where it's reset to null again
+ assert!(self.last_awoken.load(Ordering::SeqCst).is_null());
+
+ let mut queue: Vec<*mut ThreadData> = Vec::with_capacity(self.num_threads);
+ for_each(self.semaphore_addr(), |thread_data| {
+ queue.push(thread_data as *const _ as *mut _);
+ });
+ assert!(queue.len() <= self.num_threads - single_unpark_index);
+
+ let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);
+
+ self.up();
+
+ // Wait for a parked thread to wake up and update num_awake + last_awoken.
+ while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
+ thread::yield_now();
+ }
+
+ // At this point the other thread should have set last_awoken inside the run() method
+ let last_awoken = self.last_awoken.load(Ordering::SeqCst);
+ assert!(!last_awoken.is_null());
+ if !queue.is_empty() && queue[0] != last_awoken {
+ panic!(
+ "Woke up wrong thread:\n\tqueue: {:?}\n\tlast awoken: {:?}",
+ queue, last_awoken
+ );
+ }
+ self.last_awoken.store(ptr::null_mut(), Ordering::SeqCst);
+ }
+
+ pub fn finish(&self, num_single_unparks: usize) {
+ // The number of threads not unparked via unpark_one
+ let mut num_threads_left = self.num_threads.checked_sub(num_single_unparks).unwrap();
+
+ // Wake remaining threads up with unpark_all. Has to be in a loop, because there might
+ // still be threads that have not yet parked.
+ while num_threads_left > 0 {
+ let mut num_waiting_on_address = 0;
+ for_each(self.semaphore_addr(), |_thread_data| {
+ num_waiting_on_address += 1;
+ });
+ assert!(num_waiting_on_address <= num_threads_left);
+
+ let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);
+
+ let num_unparked =
+ unsafe { super::unpark_all(self.semaphore_addr(), DEFAULT_UNPARK_TOKEN) };
+ assert!(num_unparked >= num_waiting_on_address);
+ assert!(num_unparked <= num_threads_left);
+
+ // Wait for all unparked threads to wake up and update num_awake + last_awoken.
+ while self.num_awake.load(Ordering::SeqCst)
+ != num_awake_before_unpark + num_unparked
+ {
+ thread::yield_now()
+ }
+
+ num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
+ }
+ // By now, all threads should have been woken up
+ assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);
+
+ // Make sure no thread is parked on our semaphore address
+ let mut num_waiting_on_address = 0;
+ for_each(self.semaphore_addr(), |_thread_data| {
+ num_waiting_on_address += 1;
+ });
+ assert_eq!(num_waiting_on_address, 0);
+ }
+
+ pub fn down(&self) {
+ let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);
+
+ if old_semaphore_value > 0 {
+ // We acquired the semaphore. Done.
+ return;
+ }
+
+ // We need to wait.
+ let validate = || true;
+ let before_sleep = || {};
+ let timed_out = |_, _| {};
+ unsafe {
+ super::park(
+ self.semaphore_addr(),
+ validate,
+ before_sleep,
+ timed_out,
+ DEFAULT_PARK_TOKEN,
+ None,
+ );
+ }
+ }
+
+ pub fn up(&self) {
+ let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst);
+
+ // Check if anyone was waiting on the semaphore. If they were, then pass ownership to them.
+ if old_semaphore_value < 0 {
+ // We need to continue until we have actually unparked someone. It might be that
+ // the thread we want to pass ownership to has decremented the semaphore counter,
+ // but not yet parked.
+ loop {
+ match unsafe {
+ super::unpark_one(self.semaphore_addr(), |_| DEFAULT_UNPARK_TOKEN)
+ .unparked_threads
+ } {
+ 1 => break,
+ 0 => (),
+ i => panic!("Should not wake up {} threads", i),
+ }
+ }
+ }
+ }
+
+ fn semaphore_addr(&self) -> usize {
+ &self.semaphore as *const _ as usize
+ }
+ }
+}
diff --git a/src/spinwait.rs b/src/spinwait.rs
new file mode 100644
index 0000000..a57f4c1
--- /dev/null
+++ b/src/spinwait.rs
@@ -0,0 +1,74 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+use crate::thread_parker;
+use core::hint::spin_loop;
+
+// Wastes some CPU time for the given number of iterations,
+// using a hint to indicate to the CPU that we are spinning.
+#[inline]
+fn cpu_relax(iterations: u32) {
+ for _ in 0..iterations {
+ spin_loop()
+ }
+}
+
+/// A counter used to perform exponential backoff in spin loops.
+#[derive(Default)]
+pub struct SpinWait {
+ counter: u32,
+}
+
+impl SpinWait {
+ /// Creates a new `SpinWait`.
+ #[inline]
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Resets a `SpinWait` to its initial state.
+ #[inline]
+ pub fn reset(&mut self) {
+ self.counter = 0;
+ }
+
+ /// Spins until the sleep threshold has been reached.
+ ///
+ /// This function returns whether the sleep threshold has been reached, at
+ /// which point further spinning has diminishing returns and the thread
+ /// should be parked instead.
+ ///
+ /// The spin strategy will initially use a CPU-bound loop but will fall back
+ /// to yielding the CPU to the OS after a few iterations.
+ #[inline]
+ pub fn spin(&mut self) -> bool {
+ if self.counter >= 10 {
+ return false;
+ }
+ self.counter += 1;
+ if self.counter <= 3 {
+ cpu_relax(1 << self.counter);
+ } else {
+ thread_parker::thread_yield();
+ }
+ true
+ }
+
+ /// Spins without yielding the thread to the OS.
+ ///
+ /// Instead, the backoff is simply capped at a maximum value. This can be
+ /// used to improve throughput in `compare_exchange` loops that have high
+ /// contention.
+ #[inline]
+ pub fn spin_no_yield(&mut self) {
+ self.counter += 1;
+ if self.counter > 10 {
+ self.counter = 10;
+ }
+ cpu_relax(1 << self.counter);
+ }
+}
diff --git a/src/thread_parker/generic.rs b/src/thread_parker/generic.rs
new file mode 100644
index 0000000..990bcb7
--- /dev/null
+++ b/src/thread_parker/generic.rs
@@ -0,0 +1,79 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+//! A simple spin lock based thread parker. Used on platforms without better
+//! parking facilities available.
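For illustration: a caller typically drives the `SpinWait` type above from a compare-exchange loop, spinning with exponential backoff until the sleep threshold is reached. The sketch below is not part of the upstream sources; `acquire_flag` and its `AtomicBool` flag are invented names, though `SpinWait` itself is publicly re-exported by the crate.

    use core::sync::atomic::{AtomicBool, Ordering};
    use parking_lot_core::SpinWait;

    // Hypothetical helper: acquire a simple flag lock, backing off while it
    // is contended. A real lock would park the thread once spin() returns
    // false; this sketch merely restarts the backoff.
    fn acquire_flag(locked: &AtomicBool) {
        let mut spinwait = SpinWait::new();
        while locked
            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            if !spinwait.spin() {
                spinwait.reset();
            }
        }
    }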
+
+use core::sync::atomic::{AtomicBool, Ordering};
+use core::hint::spin_loop;
+use std::thread;
+use std::time::Instant;
+
+// Helper type for putting a thread to sleep until some other thread wakes it up
+pub struct ThreadParker {
+ parked: AtomicBool,
+}
+
+impl super::ThreadParkerT for ThreadParker {
+ type UnparkHandle = UnparkHandle;
+
+ const IS_CHEAP_TO_CONSTRUCT: bool = true;
+
+ #[inline]
+ fn new() -> ThreadParker {
+ ThreadParker {
+ parked: AtomicBool::new(false),
+ }
+ }
+
+ #[inline]
+ unsafe fn prepare_park(&self) {
+ self.parked.store(true, Ordering::Relaxed);
+ }
+
+ #[inline]
+ unsafe fn timed_out(&self) -> bool {
+ self.parked.load(Ordering::Relaxed) != false
+ }
+
+ #[inline]
+ unsafe fn park(&self) {
+ while self.parked.load(Ordering::Acquire) != false {
+ spin_loop();
+ }
+ }
+
+ #[inline]
+ unsafe fn park_until(&self, timeout: Instant) -> bool {
+ while self.parked.load(Ordering::Acquire) != false {
+ if Instant::now() >= timeout {
+ return false;
+ }
+ spin_loop();
+ }
+ true
+ }
+
+ #[inline]
+ unsafe fn unpark_lock(&self) -> UnparkHandle {
+ // We don't need to lock anything, just clear the state
+ self.parked.store(false, Ordering::Release);
+ UnparkHandle(())
+ }
+}
+
+pub struct UnparkHandle(());
+
+impl super::UnparkHandleT for UnparkHandle {
+ #[inline]
+ unsafe fn unpark(self) {}
+}
+
+#[inline]
+pub fn thread_yield() {
+ thread::yield_now();
+}
diff --git a/src/thread_parker/linux.rs b/src/thread_parker/linux.rs
new file mode 100644
index 0000000..5d4e229
--- /dev/null
+++ b/src/thread_parker/linux.rs
@@ -0,0 +1,156 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+use core::{
+ ptr,
+ sync::atomic::{AtomicI32, Ordering},
+};
+use libc;
+use std::thread;
+use std::time::Instant;
+
+// x32 Linux uses a non-standard type for tv_nsec in timespec.
+// See https://sourceware.org/bugzilla/show_bug.cgi?id=16437
+#[cfg(all(target_arch = "x86_64", target_pointer_width = "32"))]
+#[allow(non_camel_case_types)]
+type tv_nsec_t = i64;
+#[cfg(not(all(target_arch = "x86_64", target_pointer_width = "32")))]
+#[allow(non_camel_case_types)]
+type tv_nsec_t = libc::c_long;
+
+fn errno() -> libc::c_int {
+ #[cfg(target_os = "linux")]
+ unsafe {
+ *libc::__errno_location()
+ }
+ #[cfg(target_os = "android")]
+ unsafe {
+ *libc::__errno()
+ }
+}
+
+// Helper type for putting a thread to sleep until some other thread wakes it up
+pub struct ThreadParker {
+ futex: AtomicI32,
+}
+
+impl super::ThreadParkerT for ThreadParker {
+ type UnparkHandle = UnparkHandle;
+
+ const IS_CHEAP_TO_CONSTRUCT: bool = true;
+
+ #[inline]
+ fn new() -> ThreadParker {
+ ThreadParker {
+ futex: AtomicI32::new(0),
+ }
+ }
+
+ #[inline]
+ unsafe fn prepare_park(&self) {
+ self.futex.store(1, Ordering::Relaxed);
+ }
+
+ #[inline]
+ unsafe fn timed_out(&self) -> bool {
+ self.futex.load(Ordering::Relaxed) != 0
+ }
+
+ #[inline]
+ unsafe fn park(&self) {
+ while self.futex.load(Ordering::Acquire) != 0 {
+ self.futex_wait(None);
+ }
+ }
+
+ #[inline]
+ unsafe fn park_until(&self, timeout: Instant) -> bool {
+ while self.futex.load(Ordering::Acquire) != 0 {
+ let now = Instant::now();
+ if timeout <= now {
+ return false;
+ }
+ let diff = timeout - now;
+ if diff.as_secs() as libc::time_t as u64 != diff.as_secs() {
+ // Timeout overflowed, just sleep indefinitely
+ self.park();
+ return true;
+ }
+ let ts = libc::timespec {
+ tv_sec: diff.as_secs() as libc::time_t,
+ tv_nsec: diff.subsec_nanos() as tv_nsec_t,
+ };
+ self.futex_wait(Some(ts));
+ }
+ true
+ }
+
+ // Locks the parker to prevent the target thread from exiting. This is
+ // necessary to ensure that thread-local ThreadData objects remain valid.
+ // This should be called while holding the queue lock.
+ #[inline]
+ unsafe fn unpark_lock(&self) -> UnparkHandle {
+ // We don't need to lock anything, just clear the state
+ self.futex.store(0, Ordering::Release);
+
+ UnparkHandle { futex: &self.futex }
+ }
+}
+
+impl ThreadParker {
+ #[inline]
+ fn futex_wait(&self, ts: Option<libc::timespec>) {
+ let ts_ptr = ts
+ .as_ref()
+ .map(|ts_ref| ts_ref as *const _)
+ .unwrap_or(ptr::null());
+ let r = unsafe {
+ libc::syscall(
+ libc::SYS_futex,
+ &self.futex,
+ libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG,
+ 1,
+ ts_ptr,
+ )
+ };
+ debug_assert!(r == 0 || r == -1);
+ if r == -1 {
+ debug_assert!(
+ errno() == libc::EINTR
+ || errno() == libc::EAGAIN
+ || (ts.is_some() && errno() == libc::ETIMEDOUT)
+ );
+ }
+ }
+}
+
+pub struct UnparkHandle {
+ futex: *const AtomicI32,
+}
+
+impl super::UnparkHandleT for UnparkHandle {
+ #[inline]
+ unsafe fn unpark(self) {
+ // The thread data may have been freed at this point, but it doesn't
+ // matter since the syscall will just return EFAULT in that case.
+ let r = libc::syscall(
+ libc::SYS_futex,
+ self.futex,
+ libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG,
+ 1,
+ );
+ debug_assert!(r == 0 || r == 1 || r == -1);
+ if r == -1 {
+ debug_assert_eq!(errno(), libc::EFAULT);
+ }
+ }
+}
+
+#[inline]
+pub fn thread_yield() {
+ thread::yield_now();
+}
diff --git a/src/thread_parker/mod.rs b/src/thread_parker/mod.rs
new file mode 100644
index 0000000..fc162f4
--- /dev/null
+++ b/src/thread_parker/mod.rs
@@ -0,0 +1,85 @@
+use cfg_if::cfg_if;
+use std::time::Instant;
+
+/// Trait for the platform thread parker implementation.
+///
+/// All unsafe methods are unsafe because the Unix thread parker is based on
+/// pthread mutexes and condvars. Those primitives must not be moved and used
+/// from any other memory address than the one they were located at when they
+/// were initialized. As such, it's UB to call any unsafe method on
+/// `ThreadParkerT` if the implementing instance has moved since the last
+/// call to any of the unsafe methods.
+pub trait ThreadParkerT {
+ type UnparkHandle: UnparkHandleT;
+
+ const IS_CHEAP_TO_CONSTRUCT: bool;
+
+ fn new() -> Self;
+
+ /// Prepares the parker. This should be called before adding it to the queue.
+ unsafe fn prepare_park(&self);
+
+ /// Checks if the park timed out. This should be called while holding the
+ /// queue lock after park_until has returned false.
+ unsafe fn timed_out(&self) -> bool;
+
+ /// Parks the thread until it is unparked. This should be called after it has
+ /// been added to the queue, after unlocking the queue.
+ unsafe fn park(&self);
+
+ /// Parks the thread until it is unparked or the timeout is reached. This
+ /// should be called after it has been added to the queue, after unlocking
+ /// the queue. Returns true if we were unparked and false if we timed out.
+ unsafe fn park_until(&self, timeout: Instant) -> bool;
+
+ /// Locks the parker to prevent the target thread from exiting. This is
+ /// necessary to ensure that thread-local ThreadData objects remain valid.
+ /// This should be called while holding the queue lock.
+ unsafe fn unpark_lock(&self) -> Self::UnparkHandle;
+}
+
+/// Handle for a thread that is about to be unparked. We need to mark the thread
+/// as unparked while holding the queue lock, but we delay the actual unparking
+/// until after the queue lock is released.
+pub trait UnparkHandleT {
+ /// Wakes up the parked thread. This should be called after the queue lock is
+ /// released to avoid blocking the queue for too long.
+ ///
+ /// This method is unsafe for the same reason as the unsafe methods in
+ /// `ThreadParkerT`.
+ unsafe fn unpark(self);
+}
+
+cfg_if! {
+ if #[cfg(any(target_os = "linux", target_os = "android"))] {
+ #[path = "linux.rs"]
+ mod imp;
+ } else if #[cfg(unix)] {
+ #[path = "unix.rs"]
+ mod imp;
+ } else if #[cfg(windows)] {
+ #[path = "windows/mod.rs"]
+ mod imp;
+ } else if #[cfg(target_os = "redox")] {
+ #[path = "redox.rs"]
+ mod imp;
+ } else if #[cfg(all(target_env = "sgx", target_vendor = "fortanix"))] {
+ #[path = "sgx.rs"]
+ mod imp;
+ } else if #[cfg(all(
+ feature = "nightly",
+ target_family = "wasm",
+ target_feature = "atomics"
+ ))] {
+ #[path = "wasm_atomic.rs"]
+ mod imp;
+ } else if #[cfg(target_family = "wasm")] {
+ #[path = "wasm.rs"]
+ mod imp;
+ } else {
+ #[path = "generic.rs"]
+ mod imp;
+ }
+}
+
+pub use self::imp::{thread_yield, ThreadParker, UnparkHandle};
diff --git a/src/thread_parker/redox.rs b/src/thread_parker/redox.rs
new file mode 100644
index 0000000..fdf6bd1
--- /dev/null
+++ b/src/thread_parker/redox.rs
@@ -0,0 +1,139 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+use core::{
+ ptr,
+ sync::atomic::{AtomicI32, Ordering},
+};
+use std::thread;
+use std::time::Instant;
+use syscall::{
+ call::futex,
+ data::TimeSpec,
+ error::{Error, EAGAIN, EFAULT, EINTR, ETIMEDOUT},
+ flag::{FUTEX_WAIT, FUTEX_WAKE},
+};
+
+const UNPARKED: i32 = 0;
+const PARKED: i32 = 1;
+
+// Helper type for putting a thread to sleep until some other thread wakes it up
+pub struct ThreadParker {
+ futex: AtomicI32,
+}
+
+impl super::ThreadParkerT for ThreadParker {
+ type UnparkHandle = UnparkHandle;
+
+ const IS_CHEAP_TO_CONSTRUCT: bool = true;
+
+ #[inline]
+ fn new() -> ThreadParker {
+ ThreadParker {
+ futex: AtomicI32::new(UNPARKED),
+ }
+ }
+
+ #[inline]
+ unsafe fn prepare_park(&self) {
+ self.futex.store(PARKED, Ordering::Relaxed);
+ }
+
+ #[inline]
+ unsafe fn timed_out(&self) -> bool {
+ self.futex.load(Ordering::Relaxed) != UNPARKED
+ }
+
+ #[inline]
+ unsafe fn park(&self) {
+ while self.futex.load(Ordering::Acquire) != UNPARKED {
+ self.futex_wait(None);
+ }
+ }
+
+ #[inline]
+ unsafe fn park_until(&self, timeout: Instant) -> bool {
+ while self.futex.load(Ordering::Acquire) != UNPARKED {
+ let now = Instant::now();
+ if timeout <= now {
+ return false;
+ }
+ let diff = timeout - now;
+ if diff.as_secs() > i64::max_value() as u64 {
+ // Timeout overflowed, just sleep indefinitely
+ self.park();
+ return true;
+ }
+ let ts = TimeSpec {
+ tv_sec: diff.as_secs() as i64,
+ tv_nsec: diff.subsec_nanos() as i32,
+ };
+ self.futex_wait(Some(ts));
+ }
+ true
+ }
+
+ #[inline]
+ unsafe fn unpark_lock(&self) -> UnparkHandle {
+ // We don't need to lock anything, just clear the state
+ self.futex.store(UNPARKED, Ordering::Release);
+
+ UnparkHandle { futex: self.ptr() }
+ }
+}
+
+impl ThreadParker {
+ #[inline]
+ fn futex_wait(&self, ts: Option<TimeSpec>) {
+ let ts_ptr = ts
+ .as_ref()
+ .map(|ts_ref| ts_ref as *const _)
+ .unwrap_or(ptr::null());
+ let r = unsafe {
+ futex(
+ self.ptr(),
+ FUTEX_WAIT,
+ PARKED,
+ ts_ptr as usize,
+ ptr::null_mut(),
+ )
+ };
+ match r {
+ Ok(r) => debug_assert_eq!(r, 0),
+ Err(Error { errno }) => {
+ debug_assert!(errno == EINTR || errno == EAGAIN || errno == ETIMEDOUT);
+ }
+ }
+ }
+
+ #[inline]
+ fn ptr(&self) -> *mut i32 {
+ &self.futex as *const AtomicI32 as *mut i32
+ }
+}
+
+pub struct UnparkHandle {
+ futex: *mut i32,
+}
+
+impl super::UnparkHandleT for UnparkHandle {
+ #[inline]
+ unsafe fn unpark(self) {
+ // The thread data may have been freed at this point, but it doesn't
+ // matter since the syscall will just return EFAULT in that case.
+ let r = futex(self.futex, FUTEX_WAKE, PARKED, 0, ptr::null_mut());
+ match r {
+ Ok(num_woken) => debug_assert!(num_woken == 0 || num_woken == 1),
+ Err(Error { errno }) => debug_assert_eq!(errno, EFAULT),
+ }
+ }
+}
+
+#[inline]
+pub fn thread_yield() {
+ thread::yield_now();
+}
diff --git a/src/thread_parker/sgx.rs b/src/thread_parker/sgx.rs
new file mode 100644
index 0000000..bc76fe7
--- /dev/null
+++ b/src/thread_parker/sgx.rs
@@ -0,0 +1,94 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
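For illustration: the futex-based parkers above (Linux and Redox alike) share the same pattern when turning a relative Duration into a kernel timespec, falling back to an indefinite sleep when the seconds field would overflow. A standalone sketch of that conversion follows; the `to_timespec` helper is a hypothetical name, not part of the patch.

    use std::time::Duration;

    // Hypothetical helper mirroring the overflow guard in park_until above:
    // None means "sleep indefinitely", Some carries (seconds, nanoseconds).
    fn to_timespec(diff: Duration) -> Option<(i64, i32)> {
        if diff.as_secs() > i64::MAX as u64 {
            return None;
        }
        Some((diff.as_secs() as i64, diff.subsec_nanos() as i32))
    }

    // Example: a 2.5 second timeout splits into 2 whole seconds plus
    // 500_000_000 nanoseconds, matching the TimeSpec built in park_until.
    // assert_eq!(to_timespec(Duration::new(2, 500_000_000)), Some((2, 500_000_000)));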
+
+use core::sync::atomic::{AtomicBool, Ordering};
+use std::time::Instant;
+use std::{
+ io,
+ os::fortanix_sgx::{
+ thread::current as current_tcs,
+ usercalls::{
+ self,
+ raw::{Tcs, EV_UNPARK, WAIT_INDEFINITE},
+ },
+ },
+ thread,
+};
+
+// Helper type for putting a thread to sleep until some other thread wakes it up
+pub struct ThreadParker {
+ parked: AtomicBool,
+ tcs: Tcs,
+}
+
+impl super::ThreadParkerT for ThreadParker {
+ type UnparkHandle = UnparkHandle;
+
+ const IS_CHEAP_TO_CONSTRUCT: bool = true;
+
+ #[inline]
+ fn new() -> ThreadParker {
+ ThreadParker {
+ parked: AtomicBool::new(false),
+ tcs: current_tcs(),
+ }
+ }
+
+ #[inline]
+ unsafe fn prepare_park(&self) {
+ self.parked.store(true, Ordering::Relaxed);
+ }
+
+ #[inline]
+ unsafe fn timed_out(&self) -> bool {
+ self.parked.load(Ordering::Relaxed)
+ }
+
+ #[inline]
+ unsafe fn park(&self) {
+ while self.parked.load(Ordering::Acquire) {
+ let result = usercalls::wait(EV_UNPARK, WAIT_INDEFINITE);
+ debug_assert_eq!(result.expect("wait returned error") & EV_UNPARK, EV_UNPARK);
+ }
+ }
+
+ #[inline]
+ unsafe fn park_until(&self, _timeout: Instant) -> bool {
+ // FIXME: https://github.com/fortanix/rust-sgx/issues/31
+ panic!("timeout not supported in SGX");
+ }
+
+ #[inline]
+ unsafe fn unpark_lock(&self) -> UnparkHandle {
+ // We don't need to lock anything, just clear the state
+ self.parked.store(false, Ordering::Release);
+ UnparkHandle(self.tcs)
+ }
+}
+
+pub struct UnparkHandle(Tcs);
+
+impl super::UnparkHandleT for UnparkHandle {
+ #[inline]
+ unsafe fn unpark(self) {
+ let result = usercalls::send(EV_UNPARK, Some(self.0));
+ if cfg!(debug_assertions) {
+ if let Err(error) = result {
+ // `InvalidInput` may be returned if the thread we send to has
+ // already been unparked and exited.
+ if error.kind() != io::ErrorKind::InvalidInput {
+ panic!("send returned an unexpected error: {:?}", error);
+ }
+ }
+ }
+ }
+}
+
+#[inline]
+pub fn thread_yield() {
+ thread::yield_now();
+}
diff --git a/src/thread_parker/unix.rs b/src/thread_parker/unix.rs
new file mode 100644
index 0000000..a75e176
--- /dev/null
+++ b/src/thread_parker/unix.rs
@@ -0,0 +1,242 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+#[cfg(any(target_os = "macos", target_os = "ios"))]
+use core::ptr;
+use core::{
+ cell::{Cell, UnsafeCell},
+ mem::MaybeUninit,
+};
+use libc;
+use std::time::Instant;
+use std::{thread, time::Duration};
+
+// x32 Linux uses a non-standard type for tv_nsec in timespec.
+// See https://sourceware.org/bugzilla/show_bug.cgi?id=16437
+#[cfg(all(target_arch = "x86_64", target_pointer_width = "32"))]
+#[allow(non_camel_case_types)]
+type tv_nsec_t = i64;
+#[cfg(not(all(target_arch = "x86_64", target_pointer_width = "32")))]
+#[allow(non_camel_case_types)]
+type tv_nsec_t = libc::c_long;
+
+// Helper type for putting a thread to sleep until some other thread wakes it up
+pub struct ThreadParker {
+ should_park: Cell<bool>,
+ mutex: UnsafeCell<libc::pthread_mutex_t>,
+ condvar: UnsafeCell<libc::pthread_cond_t>,
+ initialized: Cell<bool>,
+}
+
+impl super::ThreadParkerT for ThreadParker {
+ type UnparkHandle = UnparkHandle;
+
+ const IS_CHEAP_TO_CONSTRUCT: bool = false;
+
+ #[inline]
+ fn new() -> ThreadParker {
+ ThreadParker {
+ should_park: Cell::new(false),
+ mutex: UnsafeCell::new(libc::PTHREAD_MUTEX_INITIALIZER),
+ condvar: UnsafeCell::new(libc::PTHREAD_COND_INITIALIZER),
+ initialized: Cell::new(false),
+ }
+ }
+
+ #[inline]
+ unsafe fn prepare_park(&self) {
+ self.should_park.set(true);
+ if !self.initialized.get() {
+ self.init();
+ self.initialized.set(true);
+ }
+ }
+
+ #[inline]
+ unsafe fn timed_out(&self) -> bool {
+ // We need to grab the mutex here because another thread may be
+ // concurrently executing UnparkHandle::unpark, which is done without
+ // holding the queue lock.
+ let r = libc::pthread_mutex_lock(self.mutex.get());
+ debug_assert_eq!(r, 0);
+ let should_park = self.should_park.get();
+ let r = libc::pthread_mutex_unlock(self.mutex.get());
+ debug_assert_eq!(r, 0);
+ should_park
+ }
+
+ #[inline]
+ unsafe fn park(&self) {
+ let r = libc::pthread_mutex_lock(self.mutex.get());
+ debug_assert_eq!(r, 0);
+ while self.should_park.get() {
+ let r = libc::pthread_cond_wait(self.condvar.get(), self.mutex.get());
+ debug_assert_eq!(r, 0);
+ }
+ let r = libc::pthread_mutex_unlock(self.mutex.get());
+ debug_assert_eq!(r, 0);
+ }
+
+ #[inline]
+ unsafe fn park_until(&self, timeout: Instant) -> bool {
+ let r = libc::pthread_mutex_lock(self.mutex.get());
+ debug_assert_eq!(r, 0);
+ while self.should_park.get() {
+ let now = Instant::now();
+ if timeout <= now {
+ let r = libc::pthread_mutex_unlock(self.mutex.get());
+ debug_assert_eq!(r, 0);
+ return false;
+ }
+
+ if let Some(ts) = timeout_to_timespec(timeout - now) {
+ let r = libc::pthread_cond_timedwait(self.condvar.get(), self.mutex.get(), &ts);
+ if ts.tv_sec < 0 {
+ // On some systems, negative timeouts will return EINVAL. In
+ // that case we won't sleep and will just busy loop instead,
+ // which is the best we can do.
+ debug_assert!(r == 0 || r == libc::ETIMEDOUT || r == libc::EINVAL);
+ } else {
+ debug_assert!(r == 0 || r == libc::ETIMEDOUT);
+ }
+ } else {
+ // Timeout calculation overflowed, just sleep indefinitely
+ let r = libc::pthread_cond_wait(self.condvar.get(), self.mutex.get());
+ debug_assert_eq!(r, 0);
+ }
+ }
+ let r = libc::pthread_mutex_unlock(self.mutex.get());
+ debug_assert_eq!(r, 0);
+ true
+ }
+
+ #[inline]
+ unsafe fn unpark_lock(&self) -> UnparkHandle {
+ let r = libc::pthread_mutex_lock(self.mutex.get());
+ debug_assert_eq!(r, 0);
+
+ UnparkHandle {
+ thread_parker: self,
+ }
+ }
+}
+
+impl ThreadParker {
+ /// Initializes the condvar to use CLOCK_MONOTONIC instead of CLOCK_REALTIME.
+ #[cfg(any(target_os = "macos", target_os = "ios", target_os = "android", target_os = "espidf"))]
+ #[inline]
+ unsafe fn init(&self) {}
+
+ /// Initializes the condvar to use CLOCK_MONOTONIC instead of CLOCK_REALTIME.
+ #[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "android", target_os = "espidf")))]
+ #[inline]
+ unsafe fn init(&self) {
+ let mut attr = MaybeUninit::<libc::pthread_condattr_t>::uninit();
+ let r = libc::pthread_condattr_init(attr.as_mut_ptr());
+ debug_assert_eq!(r, 0);
+ let r = libc::pthread_condattr_setclock(attr.as_mut_ptr(), libc::CLOCK_MONOTONIC);
+ debug_assert_eq!(r, 0);
+ let r = libc::pthread_cond_init(self.condvar.get(), attr.as_ptr());
+ debug_assert_eq!(r, 0);
+ let r = libc::pthread_condattr_destroy(attr.as_mut_ptr());
+ debug_assert_eq!(r, 0);
+ }
+}
+
+impl Drop for ThreadParker {
+ #[inline]
+ fn drop(&mut self) {
+ // On DragonFly pthread_mutex_destroy() returns EINVAL if called on a
+ // mutex that was just initialized with libc::PTHREAD_MUTEX_INITIALIZER.
+ // Once it is used (locked/unlocked) or pthread_mutex_init() is called,
+ // this behaviour no longer occurs. The same applies to condvars.
+ unsafe {
+ let r = libc::pthread_mutex_destroy(self.mutex.get());
+ debug_assert!(r == 0 || r == libc::EINVAL);
+ let r = libc::pthread_cond_destroy(self.condvar.get());
+ debug_assert!(r == 0 || r == libc::EINVAL);
+ }
+ }
+}
+
+pub struct UnparkHandle {
+ thread_parker: *const ThreadParker,
+}
+
+impl super::UnparkHandleT for UnparkHandle {
+ #[inline]
+ unsafe fn unpark(self) {
+ (*self.thread_parker).should_park.set(false);
+
+ // We notify while holding the lock here to avoid races with the target
+ // thread. In particular, the thread could exit after we unlock the
+ // mutex, which would make the condvar access invalid memory.
+ let r = libc::pthread_cond_signal((*self.thread_parker).condvar.get());
+ debug_assert_eq!(r, 0);
+ let r = libc::pthread_mutex_unlock((*self.thread_parker).mutex.get());
+ debug_assert_eq!(r, 0);
+ }
+}
+
+// Returns the current time on the clock used by pthread_cond_t as a timespec.
+#[cfg(any(target_os = "macos", target_os = "ios"))]
+#[inline]
+fn timespec_now() -> libc::timespec {
+ let mut now = MaybeUninit::<libc::timeval>::uninit();
+ let r = unsafe { libc::gettimeofday(now.as_mut_ptr(), ptr::null_mut()) };
+ debug_assert_eq!(r, 0);
+ // SAFETY: We know `libc::gettimeofday` has initialized the value.
+ let now = unsafe { now.assume_init() };
+ libc::timespec {
+ tv_sec: now.tv_sec,
+ tv_nsec: now.tv_usec as tv_nsec_t * 1000,
+ }
+}
+#[cfg(not(any(target_os = "macos", target_os = "ios")))]
+#[inline]
+fn timespec_now() -> libc::timespec {
+ let mut now = MaybeUninit::<libc::timespec>::uninit();
+ let clock = if cfg!(target_os = "android") {
+ // Android doesn't support pthread_condattr_setclock, so we need to
+ // specify the timeout in CLOCK_REALTIME.
+ libc::CLOCK_REALTIME
+ } else {
+ libc::CLOCK_MONOTONIC
+ };
+ let r = unsafe { libc::clock_gettime(clock, now.as_mut_ptr()) };
+ debug_assert_eq!(r, 0);
+ // SAFETY: We know `libc::clock_gettime` has initialized the value.
+ unsafe { now.assume_init() }
+}
+
+// Converts a relative timeout into an absolute timeout in the clock used by
+// pthread_cond_t.
+#[inline]
+fn timeout_to_timespec(timeout: Duration) -> Option<libc::timespec> {
+ // Handle overflows early on
+ if timeout.as_secs() > libc::time_t::max_value() as u64 {
+ return None;
+ }
+
+ let now = timespec_now();
+ let mut nsec = now.tv_nsec + timeout.subsec_nanos() as tv_nsec_t;
+ let mut sec = now.tv_sec.checked_add(timeout.as_secs() as libc::time_t);
+ if nsec >= 1_000_000_000 {
+ nsec -= 1_000_000_000;
+ sec = sec.and_then(|sec| sec.checked_add(1));
+ }
+
+ sec.map(|sec| libc::timespec {
+ tv_nsec: nsec,
+ tv_sec: sec,
+ })
+}
+
+#[inline]
+pub fn thread_yield() {
+ thread::yield_now();
+}
diff --git a/src/thread_parker/wasm.rs b/src/thread_parker/wasm.rs
new file mode 100644
index 0000000..657425f
--- /dev/null
+++ b/src/thread_parker/wasm.rs
@@ -0,0 +1,54 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+//! The wasm platform can't park when atomic support is not available.
+//! So this ThreadParker just panics on any attempt to park.
+
+use std::thread;
+use std::time::Instant;
+
+pub struct ThreadParker(());
+
+impl super::ThreadParkerT for ThreadParker {
+ type UnparkHandle = UnparkHandle;
+
+ const IS_CHEAP_TO_CONSTRUCT: bool = true;
+
+ fn new() -> ThreadParker {
+ ThreadParker(())
+ }
+
+ unsafe fn prepare_park(&self) {
+ panic!("Parking not supported on this platform");
+ }
+
+ unsafe fn timed_out(&self) -> bool {
+ panic!("Parking not supported on this platform");
+ }
+
+ unsafe fn park(&self) {
+ panic!("Parking not supported on this platform");
+ }
+
+ unsafe fn park_until(&self, _timeout: Instant) -> bool {
+ panic!("Parking not supported on this platform");
+ }
+
+ unsafe fn unpark_lock(&self) -> UnparkHandle {
+ panic!("Parking not supported on this platform");
+ }
+}
+
+pub struct UnparkHandle(());
+
+impl super::UnparkHandleT for UnparkHandle {
+ unsafe fn unpark(self) {}
+}
+
+pub fn thread_yield() {
+ thread::yield_now();
+}
diff --git a/src/thread_parker/wasm_atomic.rs b/src/thread_parker/wasm_atomic.rs
new file mode 100644
index 0000000..f332aff
--- /dev/null
+++ b/src/thread_parker/wasm_atomic.rs
@@ -0,0 +1,97 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+use core::{
+ arch::wasm32,
+ sync::atomic::{AtomicI32, Ordering},
+};
+use std::time::{Duration, Instant};
+use std::{convert::TryFrom, thread};
+
+// Helper type for putting a thread to sleep until some other thread wakes it up
+pub struct ThreadParker {
+ parked: AtomicI32,
+}
+
+const UNPARKED: i32 = 0;
+const PARKED: i32 = 1;
+
+impl super::ThreadParkerT for ThreadParker {
+ type UnparkHandle = UnparkHandle;
+
+ const IS_CHEAP_TO_CONSTRUCT: bool = true;
+
+ #[inline]
+ fn new() -> ThreadParker {
+ ThreadParker {
+ parked: AtomicI32::new(UNPARKED),
+ }
+ }
+
+ #[inline]
+ unsafe fn prepare_park(&self) {
+ self.parked.store(PARKED, Ordering::Relaxed);
+ }
+
+ #[inline]
+ unsafe fn timed_out(&self) -> bool {
+ self.parked.load(Ordering::Relaxed) == PARKED
+ }
+
+ #[inline]
+ unsafe fn park(&self) {
+ while self.parked.load(Ordering::Acquire) == PARKED {
+ let r = wasm32::memory_atomic_wait32(self.ptr(), PARKED, -1);
+ // we should have either woken up (0) or got a not-equal due to a
+ // race (1). We should never time out (2)
+ debug_assert!(r == 0 || r == 1);
+ }
+ }
+
+ #[inline]
+ unsafe fn park_until(&self, timeout: Instant) -> bool {
+ while self.parked.load(Ordering::Acquire) == PARKED {
+ if let Some(left) = timeout.checked_duration_since(Instant::now()) {
+ let nanos_left = i64::try_from(left.as_nanos()).unwrap_or(i64::max_value());
+ let r = wasm32::memory_atomic_wait32(self.ptr(), PARKED, nanos_left);
+ debug_assert!(r == 0 || r == 1 || r == 2);
+ } else {
+ return false;
+ }
+ }
+ true
+ }
+
+ #[inline]
+ unsafe fn unpark_lock(&self) -> UnparkHandle {
+ // We don't need to lock anything, just clear the state
+ self.parked.store(UNPARKED, Ordering::Release);
+ UnparkHandle(self.ptr())
+ }
+}
+
+impl ThreadParker {
+ #[inline]
+ fn ptr(&self) -> *mut i32 {
+ &self.parked as *const AtomicI32 as *mut i32
+ }
+}
+
+pub struct UnparkHandle(*mut i32);
+
+impl super::UnparkHandleT for UnparkHandle {
+ #[inline]
+ unsafe fn unpark(self) {
+ let num_notified = wasm32::memory_atomic_notify(self.0 as *mut i32, 1);
+ debug_assert!(num_notified == 0 || num_notified == 1);
+ }
+}
+
+#[inline]
+pub fn thread_yield() {
+ thread::yield_now();
+}
diff --git a/src/thread_parker/windows/keyed_event.rs b/src/thread_parker/windows/keyed_event.rs
new file mode 100644
index 0000000..bbe45a4
--- /dev/null
+++ b/src/thread_parker/windows/keyed_event.rs
@@ -0,0 +1,202 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+use core::{
+ ffi,
+ mem::{self, MaybeUninit},
+ ptr,
+};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::time::Instant;
+
+use windows_sys::Win32::{
+ Foundation::{CloseHandle, BOOLEAN, HANDLE, NTSTATUS, STATUS_SUCCESS, STATUS_TIMEOUT},
+ System::{
+ LibraryLoader::{GetModuleHandleA, GetProcAddress},
+ SystemServices::{GENERIC_READ, GENERIC_WRITE},
+ },
+};
+
+const STATE_UNPARKED: usize = 0;
+const STATE_PARKED: usize = 1;
+const STATE_TIMED_OUT: usize = 2;
+
+#[allow(non_snake_case)]
+pub struct KeyedEvent {
+ handle: HANDLE,
+ NtReleaseKeyedEvent: extern "system" fn(
+ EventHandle: HANDLE,
+ Key: *mut ffi::c_void,
+ Alertable: BOOLEAN,
+ Timeout: *mut i64,
+ ) -> NTSTATUS,
+ NtWaitForKeyedEvent: extern "system" fn(
+ EventHandle: HANDLE,
+ Key: *mut ffi::c_void,
+ Alertable: BOOLEAN,
+ Timeout: *mut i64,
+ ) -> NTSTATUS,
+}
+
+impl KeyedEvent {
+ #[inline]
+ unsafe fn wait_for(&self, key: *mut ffi::c_void, timeout: *mut i64) -> NTSTATUS {
+ (self.NtWaitForKeyedEvent)(self.handle, key, false.into(), timeout)
+ }
+
+ #[inline]
+ unsafe fn release(&self, key: *mut ffi::c_void) -> NTSTATUS {
+ (self.NtReleaseKeyedEvent)(self.handle, key, false.into(), ptr::null_mut())
+ }
+
+ #[allow(non_snake_case)]
+ pub fn create() -> Option<KeyedEvent> {
+ unsafe {
+ let ntdll = GetModuleHandleA(b"ntdll.dll\0".as_ptr());
+ if ntdll == 0 {
+ return None;
+ }
+
+ let NtCreateKeyedEvent =
+ GetProcAddress(ntdll, b"NtCreateKeyedEvent\0".as_ptr())?;
+ let NtReleaseKeyedEvent =
+ GetProcAddress(ntdll, b"NtReleaseKeyedEvent\0".as_ptr())?;
+ let NtWaitForKeyedEvent =
+ GetProcAddress(ntdll, b"NtWaitForKeyedEvent\0".as_ptr())?;
+
+ let NtCreateKeyedEvent: extern "system" fn(
+ KeyedEventHandle: *mut HANDLE,
+ DesiredAccess: u32,
+ ObjectAttributes: *mut ffi::c_void,
+ Flags: u32,
+ ) -> NTSTATUS = mem::transmute(NtCreateKeyedEvent);
+ let mut handle = MaybeUninit::uninit();
+ let status = NtCreateKeyedEvent(
+ handle.as_mut_ptr(),
+ GENERIC_READ | GENERIC_WRITE,
+ ptr::null_mut(),
+ 0,
+ );
+ if status != STATUS_SUCCESS {
+ return None;
+ }
+
+ Some(KeyedEvent {
+ handle: handle.assume_init(),
+ NtReleaseKeyedEvent: mem::transmute(NtReleaseKeyedEvent),
+ NtWaitForKeyedEvent: mem::transmute(NtWaitForKeyedEvent),
+ })
+ }
+ }
+
+ #[inline]
+ pub fn prepare_park(&'static self, key: &AtomicUsize) {
+ key.store(STATE_PARKED, Ordering::Relaxed);
+ }
+
+ #[inline]
+ pub fn timed_out(&'static self, key: &AtomicUsize) -> bool {
+ key.load(Ordering::Relaxed) == STATE_TIMED_OUT
+ }
+
+ #[inline]
+ pub unsafe fn park(&'static self, key: &AtomicUsize) {
+ let status = self.wait_for(key as *const _ as *mut ffi::c_void, ptr::null_mut());
+ debug_assert_eq!(status, STATUS_SUCCESS);
+ }
+
+ #[inline]
+ pub unsafe fn park_until(&'static self, key: &AtomicUsize, timeout: Instant) -> bool {
+ let now = Instant::now();
+ if timeout <= now {
+ // If another thread unparked us, we need to call
+ // NtWaitForKeyedEvent otherwise that thread will stay stuck at
+ // NtReleaseKeyedEvent.
+ if key.swap(STATE_TIMED_OUT, Ordering::Relaxed) == STATE_UNPARKED {
+ self.park(key);
+ return true;
+ }
+ return false;
+ }
+
+ // NT uses a timeout in units of 100ns. We use a negative value to
+ // indicate a relative timeout based on a monotonic clock.
+ let diff = timeout - now;
+ let value = (diff.as_secs() as i64)
+ .checked_mul(-10000000)
+ .and_then(|x| x.checked_sub((diff.subsec_nanos() as i64 + 99) / 100));
+
+ let mut nt_timeout = match value {
+ Some(x) => x,
+ None => {
+ // Timeout overflowed, just sleep indefinitely
+ self.park(key);
+ return true;
+ }
+ };
+
+ let status = self.wait_for(key as *const _ as *mut ffi::c_void, &mut nt_timeout);
+ if status == STATUS_SUCCESS {
+ return true;
+ }
+ debug_assert_eq!(status, STATUS_TIMEOUT);
+
+ // If another thread unparked us, we need to call NtWaitForKeyedEvent
+ // otherwise that thread will stay stuck at NtReleaseKeyedEvent.
+ if key.swap(STATE_TIMED_OUT, Ordering::Relaxed) == STATE_UNPARKED {
+ self.park(key);
+ return true;
+ }
+ false
+ }
+
+ #[inline]
+ pub unsafe fn unpark_lock(&'static self, key: &AtomicUsize) -> UnparkHandle {
+ // If the state was STATE_PARKED then we need to wake up the thread
+ if key.swap(STATE_UNPARKED, Ordering::Relaxed) == STATE_PARKED {
+ UnparkHandle {
+ key: key,
+ keyed_event: self,
+ }
+ } else {
+ UnparkHandle {
+ key: ptr::null(),
+ keyed_event: self,
+ }
+ }
+ }
+}
+
+impl Drop for KeyedEvent {
+ #[inline]
+ fn drop(&mut self) {
+ unsafe {
+ let ok = CloseHandle(self.handle);
+ debug_assert_eq!(ok, true.into());
+ }
+ }
+}
+
+// Handle for a thread that is about to be unparked. We need to mark the thread
+// as unparked while holding the queue lock, but we delay the actual unparking
+// until after the queue lock is released.
+pub struct UnparkHandle {
+ key: *const AtomicUsize,
+ keyed_event: &'static KeyedEvent,
+}
+
+impl UnparkHandle {
+ // Wakes up the parked thread. This should be called after the queue lock is
+ // released to avoid blocking the queue for too long.
+ #[inline]
+ pub unsafe fn unpark(self) {
+ if !self.key.is_null() {
+ let status = self.keyed_event.release(self.key as *mut ffi::c_void);
+ debug_assert_eq!(status, STATUS_SUCCESS);
+ }
+ }
+}
diff --git a/src/thread_parker/windows/mod.rs b/src/thread_parker/windows/mod.rs
new file mode 100644
index 0000000..1f5ed23
--- /dev/null
+++ b/src/thread_parker/windows/mod.rs
@@ -0,0 +1,188 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+use core::{
+ ptr,
+ sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
+};
+use std::time::Instant;
+
+mod keyed_event;
+mod waitaddress;
+
+enum Backend {
+ KeyedEvent(keyed_event::KeyedEvent),
+ WaitAddress(waitaddress::WaitAddress),
+}
+
+static BACKEND: AtomicPtr<Backend> = AtomicPtr::new(ptr::null_mut());
+
+impl Backend {
+ #[inline]
+ fn get() -> &'static Backend {
+ // Fast path: use the existing object
+ let backend_ptr = BACKEND.load(Ordering::Acquire);
+ if !backend_ptr.is_null() {
+ return unsafe { &*backend_ptr };
+ };
+
+ Backend::create()
+ }
+
+ #[cold]
+ fn create() -> &'static Backend {
+ // Try to create a new Backend
+ let backend;
+ if let Some(waitaddress) = waitaddress::WaitAddress::create() {
+ backend = Backend::WaitAddress(waitaddress);
+ } else if let Some(keyed_event) = keyed_event::KeyedEvent::create() {
+ backend = Backend::KeyedEvent(keyed_event);
+ } else {
+ panic!(
+ "parking_lot requires either NT Keyed Events (WinXP+) or \
+ WaitOnAddress/WakeByAddress (Win8+)"
+ );
+ }
+
+ // Try to set our new Backend as the global one
+ let backend_ptr = Box::into_raw(Box::new(backend));
+ match BACKEND.compare_exchange(
+ ptr::null_mut(),
+ backend_ptr,
+ Ordering::Release,
+ Ordering::Relaxed,
+ ) {
+ Ok(_) => unsafe { &*backend_ptr },
+ Err(global_backend_ptr) => {
+ unsafe {
+ // We lost the race, free our object and return the global one
+ Box::from_raw(backend_ptr);
+ &*global_backend_ptr
+ }
+ }
+ }
+ }
+}
+
+// Helper type for putting a thread to sleep until some other thread wakes it up
+pub struct ThreadParker {
+ key: AtomicUsize,
+ backend: &'static Backend,
+}
+
+impl ThreadParker {
+ pub const IS_CHEAP_TO_CONSTRUCT: bool = true;
+
+ #[inline]
+ pub fn new() -> ThreadParker {
+ // Initialize the backend here to ensure we don't get any panics
+ // later on, which could leave synchronization primitives in a broken
+ // state.
+ ThreadParker {
+ key: AtomicUsize::new(0),
+ backend: Backend::get(),
+ }
+ }
+
+ // Prepares the parker. This should be called before adding it to the queue.
+ #[inline]
+ pub fn prepare_park(&self) {
+ match *self.backend {
+ Backend::KeyedEvent(ref x) => x.prepare_park(&self.key),
+ Backend::WaitAddress(ref x) => x.prepare_park(&self.key),
+ }
+ }
+
+ // Checks if the park timed out. This should be called while holding the
+ // queue lock after park_until has returned false.
+ #[inline]
+ pub fn timed_out(&self) -> bool {
+ match *self.backend {
+ Backend::KeyedEvent(ref x) => x.timed_out(&self.key),
+ Backend::WaitAddress(ref x) => x.timed_out(&self.key),
+ }
+ }
+
+ // Parks the thread until it is unparked. This should be called after it has
+ // been added to the queue, after unlocking the queue.
+ #[inline]
+ pub unsafe fn park(&self) {
+ match *self.backend {
+ Backend::KeyedEvent(ref x) => x.park(&self.key),
+ Backend::WaitAddress(ref x) => x.park(&self.key),
+ }
+ }
+
+ // Parks the thread until it is unparked or the timeout is reached. This
+ // should be called after it has been added to the queue, after unlocking
+ // the queue. Returns true if we were unparked and false if we timed out.
+ #[inline]
+ pub unsafe fn park_until(&self, timeout: Instant) -> bool {
+ match *self.backend {
+ Backend::KeyedEvent(ref x) => x.park_until(&self.key, timeout),
+ Backend::WaitAddress(ref x) => x.park_until(&self.key, timeout),
+ }
+ }
+
+ // Locks the parker to prevent the target thread from exiting. This is
+ // necessary to ensure that thread-local ThreadData objects remain valid.
+ // This should be called while holding the queue lock.
+ #[inline]
+ pub unsafe fn unpark_lock(&self) -> UnparkHandle {
+ match *self.backend {
+ Backend::KeyedEvent(ref x) => UnparkHandle::KeyedEvent(x.unpark_lock(&self.key)),
+ Backend::WaitAddress(ref x) => UnparkHandle::WaitAddress(x.unpark_lock(&self.key)),
+ }
+ }
+}
+
+// Handle for a thread that is about to be unparked. We need to mark the thread
+// as unparked while holding the queue lock, but we delay the actual unparking
+// until after the queue lock is released.
+pub enum UnparkHandle {
+ KeyedEvent(keyed_event::UnparkHandle),
+ WaitAddress(waitaddress::UnparkHandle),
+}
+
+impl UnparkHandle {
+ // Wakes up the parked thread. This should be called after the queue lock is
+ // released to avoid blocking the queue for too long.
+ #[inline]
+ pub unsafe fn unpark(self) {
+ match self {
+ UnparkHandle::KeyedEvent(x) => x.unpark(),
+ UnparkHandle::WaitAddress(x) => x.unpark(),
+ }
+ }
+}
+
+// Yields the rest of the current timeslice to the OS
+#[inline]
+pub fn thread_yield() {
+ // Note that this is manually defined here rather than using the definition
+ // through `winapi`. The `winapi` definition comes from the `synchapi`
+ // header which enables the "synchronization.lib" library. It turns out,
+ // however that `Sleep` comes from `kernel32.dll` so this activation isn't
+ // necessary.
+ //
+ // This was originally identified in rust-lang/rust where on MinGW the
+ // libsynchronization.a library pulls in a dependency on a newer DLL not
+ // present in older versions of Windows. (see rust-lang/rust#49438)
+ //
+ // This is a bit of a hack for now and ideally we'd fix MinGW's own import
+ // libraries, but that'll probably take a lot longer than patching this here
+ // and avoiding the `synchapi` feature entirely.
+ extern "system" {
+ fn Sleep(a: u32);
+ }
+ unsafe {
+ // We don't use SwitchToThread here because it doesn't consider all
+ // threads in the system and the thread we are waiting for may not get
+ // selected.
+ Sleep(0);
+ }
+}
diff --git a/src/thread_parker/windows/waitaddress.rs b/src/thread_parker/windows/waitaddress.rs
new file mode 100644
index 0000000..dde0db7
--- /dev/null
+++ b/src/thread_parker/windows/waitaddress.rs
@@ -0,0 +1,138 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+use core::{
+ mem,
+ sync::atomic::{AtomicUsize, Ordering},
+};
+use std::{ffi, time::Instant};
+use windows_sys::Win32::{
+ Foundation::{GetLastError, BOOL, ERROR_TIMEOUT},
+ System::{
+ LibraryLoader::{GetModuleHandleA, GetProcAddress},
+ WindowsProgramming::INFINITE,
+ },
+};
+
+#[allow(non_snake_case)]
+pub struct WaitAddress {
+ WaitOnAddress: extern "system" fn(
+ Address: *mut ffi::c_void,
+ CompareAddress: *mut ffi::c_void,
+ AddressSize: usize,
+ dwMilliseconds: u32,
+ ) -> BOOL,
+ WakeByAddressSingle: extern "system" fn(Address: *mut ffi::c_void),
+}
+
+impl WaitAddress {
+ #[allow(non_snake_case)]
+ pub fn create() -> Option<WaitAddress> {
+ unsafe {
+ // MSDN claims that WaitOnAddress and WakeByAddressSingle are
+ // located in kernel32.dll, but they are lying...
+ let synch_dll =
+ GetModuleHandleA(b"api-ms-win-core-synch-l1-2-0.dll\0".as_ptr());
+ if synch_dll == 0 {
+ return None;
+ }
+
+ let WaitOnAddress = GetProcAddress(synch_dll, b"WaitOnAddress\0".as_ptr())?;
+ let WakeByAddressSingle =
+ GetProcAddress(synch_dll, b"WakeByAddressSingle\0".as_ptr())?;
+
+ Some(WaitAddress {
+ WaitOnAddress: mem::transmute(WaitOnAddress),
+ WakeByAddressSingle: mem::transmute(WakeByAddressSingle),
+ })
+ }
+ }
+
+ #[inline]
+ pub fn prepare_park(&'static self, key: &AtomicUsize) {
+ key.store(1, Ordering::Relaxed);
+ }
+
+ #[inline]
+ pub fn timed_out(&'static self, key: &AtomicUsize) -> bool {
+ key.load(Ordering::Relaxed) != 0
+ }
+
+ #[inline]
+ pub fn park(&'static self, key: &AtomicUsize) {
+ while key.load(Ordering::Acquire) != 0 {
+ let r = self.wait_on_address(key, INFINITE);
+ debug_assert!(r == true.into());
+ }
+ }
+
+ #[inline]
+ pub fn park_until(&'static self, key: &AtomicUsize, timeout: Instant) -> bool {
+ while key.load(Ordering::Acquire) != 0 {
+ let now = Instant::now();
+ if timeout <= now {
+ return false;
+ }
+ let diff = timeout - now;
+ let timeout = diff
+ .as_secs()
+ .checked_mul(1000)
+ .and_then(|x| x.checked_add((diff.subsec_nanos() as u64 + 999999) / 1000000))
+ .map(|ms| {
+ if ms > std::u32::MAX as u64 {
+ INFINITE
+ } else {
+ ms as u32
+ }
+ })
+ .unwrap_or(INFINITE);
+ if self.wait_on_address(key, timeout) == false.into() {
+ debug_assert_eq!(unsafe { GetLastError() }, ERROR_TIMEOUT);
+ }
+ }
+ true
+ }
+
+ #[inline]
+ pub fn unpark_lock(&'static self, key: &AtomicUsize) -> UnparkHandle {
+ // We don't need to lock anything, just clear the state
+ key.store(0, Ordering::Release);
+
+ UnparkHandle {
+ key: key,
+ waitaddress: self,
+ }
+ }
+
+ #[inline]
+ fn wait_on_address(&'static self, key: &AtomicUsize, timeout: u32) -> BOOL {
+ let cmp = 1usize;
+ (self.WaitOnAddress)(
+ key as *const _ as *mut ffi::c_void,
+ &cmp as *const _ as *mut ffi::c_void,
+ mem::size_of::<usize>(),
+ timeout,
+ )
+ }
+}
+
+// Handle for a thread that is about to be unparked. We need to mark the thread
+// as unparked while holding the queue lock, but we delay the actual unparking
+// until after the queue lock is released.
+pub struct UnparkHandle {
+ key: *const AtomicUsize,
+ waitaddress: &'static WaitAddress,
+}
+
+impl UnparkHandle {
+ // Wakes up the parked thread. This should be called after the queue lock is
+ // released to avoid blocking the queue for too long.
+ #[inline]
+ pub fn unpark(self) {
+ (self.waitaddress.WakeByAddressSingle)(self.key as *mut ffi::c_void);
+ }
+}
diff --git a/src/util.rs b/src/util.rs
new file mode 100644
index 0000000..d7aaa87
--- /dev/null
+++ b/src/util.rs
@@ -0,0 +1,31 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+// Option::unchecked_unwrap
+pub trait UncheckedOptionExt<T> {
+ unsafe fn unchecked_unwrap(self) -> T;
+}
+
+impl<T> UncheckedOptionExt<T> for Option<T> {
+ #[inline]
+ unsafe fn unchecked_unwrap(self) -> T {
+ match self {
+ Some(x) => x,
+ None => unreachable(),
+ }
+ }
+}
+
+// hint::unreachable_unchecked() in release mode
+#[inline]
+unsafe fn unreachable() -> ! {
+ if cfg!(debug_assertions) {
+ unreachable!();
+ } else {
+ core::hint::unreachable_unchecked()
+ }
+}
diff --git a/src/word_lock.rs b/src/word_lock.rs
new file mode 100644
index 0000000..1109401
--- /dev/null
+++ b/src/word_lock.rs
@@ -0,0 +1,327 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+use crate::spinwait::SpinWait;
+use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
+use core::{
+ cell::Cell,
+ mem, ptr,
+ sync::atomic::{fence, AtomicUsize, Ordering},
+};
+
+struct ThreadData {
+ parker: ThreadParker,
+
+ // Linked list of threads in the queue. The queue is split into two parts:
+ // the processed part and the unprocessed part. When new nodes are added to
+ // the list, they only have the next pointer set, and queue_tail is null.
+ //
+ // Nodes are processed with the queue lock held, which consists of setting
+ // the prev pointer for each node and setting the queue_tail pointer on the
+ // first processed node of the list.
+ //
+ // This setup allows nodes to be added to the queue without a lock, while
+ // still allowing O(1) removal of nodes from the processed part of the list.
+ // The only cost is the O(n) processing, but this only needs to be done
+ // once for each node, and therefore isn't too expensive.
+ queue_tail: Cell<*const ThreadData>,
+ prev: Cell<*const ThreadData>,
+ next: Cell<*const ThreadData>,
+}
+
+impl ThreadData {
+ #[inline]
+ fn new() -> ThreadData {
+ assert!(mem::align_of::<ThreadData>() > !QUEUE_MASK);
+ ThreadData {
+ parker: ThreadParker::new(),
+ queue_tail: Cell::new(ptr::null()),
+ prev: Cell::new(ptr::null()),
+ next: Cell::new(ptr::null()),
+ }
+ }
+}
+
+// Invokes the given closure with a reference to the current thread `ThreadData`.
+#[inline]
+fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
+ let mut thread_data_ptr = ptr::null();
+ // If ThreadData is expensive to construct, then we want to use a cached
+ // version in thread-local storage if possible.
diff --git a/src/word_lock.rs b/src/word_lock.rs
new file mode 100644
index 0000000..1109401
--- /dev/null
+++ b/src/word_lock.rs
@@ -0,0 +1,327 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+use crate::spinwait::SpinWait;
+use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
+use core::{
+    cell::Cell,
+    mem, ptr,
+    sync::atomic::{fence, AtomicUsize, Ordering},
+};
+
+struct ThreadData {
+    parker: ThreadParker,
+
+    // Linked list of threads in the queue. The queue is split into two parts:
+    // the processed part and the unprocessed part. When new nodes are added to
+    // the list, they only have the next pointer set, and queue_tail is null.
+    //
+    // Nodes are processed with the queue lock held, which consists of setting
+    // the prev pointer for each node and setting the queue_tail pointer on the
+    // first processed node of the list.
+    //
+    // This setup allows nodes to be added to the queue without a lock, while
+    // still allowing O(1) removal of nodes from the processed part of the list.
+    // The only cost is the O(n) processing, but this only needs to be done
+    // once for each node, and therefore isn't too expensive.
+    queue_tail: Cell<*const ThreadData>,
+    prev: Cell<*const ThreadData>,
+    next: Cell<*const ThreadData>,
+}
+
+impl ThreadData {
+    #[inline]
+    fn new() -> ThreadData {
+        assert!(mem::align_of::<ThreadData>() > !QUEUE_MASK);
+        ThreadData {
+            parker: ThreadParker::new(),
+            queue_tail: Cell::new(ptr::null()),
+            prev: Cell::new(ptr::null()),
+            next: Cell::new(ptr::null()),
+        }
+    }
+}
+
+// Invokes the given closure with a reference to the current thread's `ThreadData`.
+#[inline]
+fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
+    let mut thread_data_ptr = ptr::null();
+    // If ThreadData is expensive to construct, then we want to use a cached
+    // version in thread-local storage if possible.
+    if !ThreadParker::IS_CHEAP_TO_CONSTRUCT {
+        thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
+        if let Ok(tls_thread_data) = THREAD_DATA.try_with(|x| x as *const ThreadData) {
+            thread_data_ptr = tls_thread_data;
+        }
+    }
+    // Otherwise just create a ThreadData on the stack
+    let mut thread_data_storage = None;
+    if thread_data_ptr.is_null() {
+        thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new);
+    }
+
+    f(unsafe { &*thread_data_ptr })
+}
+
+const LOCKED_BIT: usize = 1;
+const QUEUE_LOCKED_BIT: usize = 2;
+const QUEUE_MASK: usize = !3;
+
+// Word-sized lock that is used to implement the parking_lot API. Since this
+// can't use parking_lot, it instead manages its own queue of waiting threads.
+pub struct WordLock {
+    state: AtomicUsize,
+}
+
+impl WordLock {
+    /// Returns a new, unlocked WordLock.
+    pub const fn new() -> Self {
+        WordLock {
+            state: AtomicUsize::new(0),
+        }
+    }
+
+    #[inline]
+    pub fn lock(&self) {
+        if self
+            .state
+            .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
+            .is_ok()
+        {
+            return;
+        }
+        self.lock_slow();
+    }
+
+    /// Must not be called on an already unlocked `WordLock`!
+    #[inline]
+    pub unsafe fn unlock(&self) {
+        let state = self.state.fetch_sub(LOCKED_BIT, Ordering::Release);
+        if state.is_queue_locked() || state.queue_head().is_null() {
+            return;
+        }
+        self.unlock_slow();
+    }
+
+    #[cold]
+    fn lock_slow(&self) {
+        let mut spinwait = SpinWait::new();
+        let mut state = self.state.load(Ordering::Relaxed);
+        loop {
+            // Grab the lock if it isn't locked, even if there is a queue on it
+            if !state.is_locked() {
+                match self.state.compare_exchange_weak(
+                    state,
+                    state | LOCKED_BIT,
+                    Ordering::Acquire,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => return,
+                    Err(x) => state = x,
+                }
+                continue;
+            }
+
+            // If there is no queue, try spinning a few times
+            if state.queue_head().is_null() && spinwait.spin() {
+                state = self.state.load(Ordering::Relaxed);
+                continue;
+            }
+
+            // Get our thread data and prepare it for parking
+            state = with_thread_data(|thread_data| {
+                // The pthread implementation is still unsafe, so we need to surround `prepare_park`
+                // with `unsafe {}`.
+                #[allow(unused_unsafe)]
+                unsafe {
+                    thread_data.parker.prepare_park();
+                }
+
+                // Add our thread to the front of the queue
+                let queue_head = state.queue_head();
+                if queue_head.is_null() {
+                    thread_data.queue_tail.set(thread_data);
+                    thread_data.prev.set(ptr::null());
+                } else {
+                    thread_data.queue_tail.set(ptr::null());
+                    thread_data.prev.set(ptr::null());
+                    thread_data.next.set(queue_head);
+                }
+                if let Err(x) = self.state.compare_exchange_weak(
+                    state,
+                    state.with_queue_head(thread_data),
+                    Ordering::AcqRel,
+                    Ordering::Relaxed,
+                ) {
+                    return x;
+                }
+
+                // Sleep until we are woken up by an unlock
+                // Ignoring unused unsafe, since it's only a few platforms where this is unsafe.
+                #[allow(unused_unsafe)]
+                unsafe {
+                    thread_data.parker.park();
+                }
+
+                // Loop back and try locking again
+                spinwait.reset();
+                self.state.load(Ordering::Relaxed)
+            });
+        }
+    }
+
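The constants above pack the entire lock into a single word: bit 0 is the lock itself, bit 1 protects the queue, and the remaining bits hold the queue-head pointer, which is exactly why `ThreadData::new` asserts an alignment greater than `!QUEUE_MASK`. A small self-contained sketch of that encoding (`Node` is a stand-in for ThreadData, invented for illustration):

    const LOCKED_BIT: usize = 1;
    const QUEUE_LOCKED_BIT: usize = 2;
    const QUEUE_MASK: usize = !3;

    // 4-byte alignment guarantees the two low bits of any &Node are zero.
    #[repr(align(4))]
    struct Node(u32);

    fn main() {
        let node = Node(0);
        let ptr = &node as *const Node;

        // Pack: pointer in the high bits, flags in the two low bits.
        let state = (ptr as usize) | LOCKED_BIT;

        // Unpack: masking recovers the original, properly aligned pointer.
        assert_eq!((state & QUEUE_MASK) as *const Node, ptr);
        assert!(state & LOCKED_BIT != 0);
        assert!(state & QUEUE_LOCKED_BIT == 0);
    }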
+    #[cold]
+    fn unlock_slow(&self) {
+        let mut state = self.state.load(Ordering::Relaxed);
+        loop {
+            // We just unlocked the WordLock. Just check if there is a thread
+            // to wake up. If the queue is locked then another thread is already
+            // taking care of waking up a thread.
+            if state.is_queue_locked() || state.queue_head().is_null() {
+                return;
+            }
+
+            // Try to grab the queue lock
+            match self.state.compare_exchange_weak(
+                state,
+                state | QUEUE_LOCKED_BIT,
+                Ordering::Acquire,
+                Ordering::Relaxed,
+            ) {
+                Ok(_) => break,
+                Err(x) => state = x,
+            }
+        }
+
+        // Now we have the queue lock and the queue is non-empty
+        'outer: loop {
+            // First, we need to fill in the prev pointers for any newly added
+            // threads. We do this until we reach a node that we previously
+            // processed, which has a non-null queue_tail pointer.
+            let queue_head = state.queue_head();
+            let mut queue_tail;
+            let mut current = queue_head;
+            loop {
+                queue_tail = unsafe { (*current).queue_tail.get() };
+                if !queue_tail.is_null() {
+                    break;
+                }
+                unsafe {
+                    let next = (*current).next.get();
+                    (*next).prev.set(current);
+                    current = next;
+                }
+            }
+
+            // Set queue_tail on the queue head to indicate that the whole list
+            // has prev pointers set correctly.
+            unsafe {
+                (*queue_head).queue_tail.set(queue_tail);
+            }
+
+            // If the WordLock is locked, then there is no point waking up a
+            // thread now. Instead we let the next unlocker take care of waking
+            // up a thread.
+            if state.is_locked() {
+                match self.state.compare_exchange_weak(
+                    state,
+                    state & !QUEUE_LOCKED_BIT,
+                    Ordering::Release,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => return,
+                    Err(x) => state = x,
+                }
+
+                // Need an acquire fence before reading the new queue
+                fence_acquire(&self.state);
+                continue;
+            }
+
+            // Remove the last thread from the queue and unlock the queue
+            let new_tail = unsafe { (*queue_tail).prev.get() };
+            if new_tail.is_null() {
+                loop {
+                    match self.state.compare_exchange_weak(
+                        state,
+                        state & LOCKED_BIT,
+                        Ordering::Release,
+                        Ordering::Relaxed,
+                    ) {
+                        Ok(_) => break,
+                        Err(x) => state = x,
+                    }
+
+                    // If the compare_exchange failed because a new thread was
+                    // added to the queue then we need to re-scan the queue to
+                    // find the previous element.
+                    if state.queue_head().is_null() {
+                        continue;
+                    } else {
+                        // Need an acquire fence before reading the new queue
+                        fence_acquire(&self.state);
+                        continue 'outer;
+                    }
+                }
+            } else {
+                unsafe {
+                    (*queue_head).queue_tail.set(new_tail);
+                }
+                self.state.fetch_and(!QUEUE_LOCKED_BIT, Ordering::Release);
+            }
+
+            // Finally, wake up the thread we removed from the queue. Note that
+            // we don't need to worry about any races here since the thread is
+            // guaranteed to be sleeping right now and we are the only one who
+            // can wake it up.
+            unsafe {
+                (*queue_tail).parker.unpark_lock().unpark();
+            }
+            break;
+        }
+    }
+}
+
+// ThreadSanitizer only has partial fence support, so when running under it, we
+// try to avoid false positives by using a discarded acquire load instead.
+#[inline]
+fn fence_acquire(a: &AtomicUsize) {
+    if cfg!(tsan_enabled) {
+        let _ = a.load(Ordering::Acquire);
+    } else {
+        fence(Ordering::Acquire);
+    }
+}
+
+trait LockState {
+    fn is_locked(self) -> bool;
+    fn is_queue_locked(self) -> bool;
+    fn queue_head(self) -> *const ThreadData;
+    fn with_queue_head(self, thread_data: *const ThreadData) -> Self;
+}
+
+impl LockState for usize {
+    #[inline]
+    fn is_locked(self) -> bool {
+        self & LOCKED_BIT != 0
+    }
+
+    #[inline]
+    fn is_queue_locked(self) -> bool {
+        self & QUEUE_LOCKED_BIT != 0
+    }
+
+    #[inline]
+    fn queue_head(self) -> *const ThreadData {
+        (self & QUEUE_MASK) as *const ThreadData
+    }
+
+    #[inline]
+    fn with_queue_head(self, thread_data: *const ThreadData) -> Self {
+        (self & !QUEUE_MASK) | thread_data as *const _ as usize
+    }
+}
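Taken together, `lock` is a single compare-and-swap on the fast path and `unlock` is a fetch_sub plus an optional wake-up, with all queue maintenance pushed into the two cold slow paths. A hedged sketch of how this internal type might be consumed elsewhere in the crate (WordLock is not part of the public API; the `increment` function and `COUNTER` are invented for illustration):

    use crate::word_lock::WordLock;

    static LOCK: WordLock = WordLock::new();
    static mut COUNTER: u32 = 0;

    fn increment() {
        LOCK.lock();
        // SAFETY: LOCK serializes all access to COUNTER, and `unlock` is
        // only called here, while the lock is known to be held.
        unsafe {
            COUNTER += 1;
            LOCK.unlock();
        }
    }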