Commit 9b1c2e08 by Andrew Tulloch Committed by Tianqi Chen

[Runtime] [ThreadPool] Make SpscTaskQueue::Pop(..) spin_count configurable (#3577)

In cases where we have multiple models or threadpools active, spinning around
`sched_yield()` may not be desirable, as it prevents the OS from effectively
scheduling other threads.

Thus, allow users to conditionally disable this behaviour (via an environment
variable `TVM_THREAD_POOL_SPIN_COUNT`, similar to existing environment flags for
the thread pool such as `TVM_BIND_THREADS`, etc).

This substantially improves tail latencies in some of our multi-tenant
workloads in practice.

Unit tests have been added - on my laptop, running:

```
TVM_THREAD_POOL_SPIN_COUNT=0 ./build/threading_backend_test;
TVM_THREAD_POOL_SPIN_COUNT=1 ./build/threading_backend_test;
./build/threading_backend_test;
```

gives https://gist.github.com/ajtulloch/1805ca6cbaa27f5d442d23f9d0021ce6 (i.e.
97ms -> <1ms after this change)
parent 19eb829e
...@@ -6,9 +6,9 @@ ...@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, * Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an * software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
...@@ -44,6 +44,19 @@ const constexpr int kL1CacheBytes = 64; ...@@ -44,6 +44,19 @@ const constexpr int kL1CacheBytes = 64;
namespace tvm { namespace tvm {
namespace runtime { namespace runtime {
namespace {
constexpr uint32_t kDefaultSpinCount = 300000;
uint32_t GetSpinCount() {
const char* val = getenv("TVM_THREAD_POOL_SPIN_COUNT");
if (!val) {
return kDefaultSpinCount;
}
return atoi(val);
}
} // namespace
// stride in the page, fit to cache line. // stride in the page, fit to cache line.
constexpr int kSyncStride = 64 / sizeof(std::atomic<int>); constexpr int kSyncStride = 64 / sizeof(std::atomic<int>);
...@@ -176,7 +189,7 @@ class SpscTaskQueue { ...@@ -176,7 +189,7 @@ class SpscTaskQueue {
* \param spin_count The number of iterations to spin before sleep. * \param spin_count The number of iterations to spin before sleep.
* \return Whether pop is successful (true) or we need to exit now (false). * \return Whether pop is successful (true) or we need to exit now (false).
*/ */
bool Pop(Task* output, uint32_t spin_count = 300000) { bool Pop(Task* output, uint32_t spin_count) {
// Busy wait a bit when the queue is empty. // Busy wait a bit when the queue is empty.
// If a new task comes to the queue quickly, this wait avoid the worker from sleeping. // If a new task comes to the queue quickly, this wait avoid the worker from sleeping.
// The default spin count is set by following the typical omp convention // The default spin count is set by following the typical omp convention
...@@ -335,7 +348,11 @@ class ThreadPool { ...@@ -335,7 +348,11 @@ class ThreadPool {
SpscTaskQueue* queue = queues_[worker_id].get(); SpscTaskQueue* queue = queues_[worker_id].get();
SpscTaskQueue::Task task; SpscTaskQueue::Task task;
ParallelLauncher::ThreadLocal()->is_worker = true; ParallelLauncher::ThreadLocal()->is_worker = true;
while (queue->Pop(&task)) { // Initialize the spin count (from envvar TVM_THREAD_POOL_SPIN_COUNT) on
// the global first use of the ThreadPool.
// TODO(tulloch): should we make this configurable via standard APIs?
static size_t spin_count = GetSpinCount();
while (queue->Pop(&task, spin_count)) {
CHECK(task.launcher != nullptr); CHECK(task.launcher != nullptr);
TVMParallelGroupEnv* penv = &(task.launcher->env); TVMParallelGroupEnv* penv = &(task.launcher->env);
void* cdata = task.launcher->cdata; void* cdata = task.launcher->cdata;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <atomic>
#include <memory>
#include <thread>
#include <gtest/gtest.h>
#include <tvm/runtime/c_backend_api.h>
constexpr size_t N = 128;
static FTVMParallelLambda atomic_add_task_id = [](int task_id, TVMParallelGroupEnv* penv,
void* cdata) -> int {
auto* data = reinterpret_cast<std::atomic<size_t>*>(cdata);
const size_t N_per_task = (N + penv->num_task - 1) / penv->num_task;
for (size_t i = task_id * N_per_task; i < N && i < (task_id + 1) * N_per_task; ++i) {
data->fetch_add(i, std::memory_order_relaxed);
}
return 0;
};
TEST(ThreadingBackend, TVMBackendParallelLaunch) {
std::atomic<size_t> acc(0);
TVMBackendParallelLaunch(atomic_add_task_id, &acc, 0);
EXPECT_EQ(acc.load(std::memory_order_relaxed), N * (N - 1) / 2);
}
TEST(ThreadingBackend, TVMBackendParallelLaunchMultipleThreads) {
// TODO(tulloch) use parameterised tests when available.
size_t num_jobs_per_thread = 3;
size_t max_num_threads = 2;
for (size_t num_threads = 1; num_threads < max_num_threads; ++num_threads) {
std::vector<std::unique_ptr<std::thread>> ts;
for (size_t i = 0; i < num_threads; ++i) {
ts.emplace_back(new std::thread([&]() {
for (size_t j = 0; j < num_jobs_per_thread; ++j) {
std::atomic<size_t> acc(0);
TVMBackendParallelLaunch(atomic_add_task_id, &acc, 0);
EXPECT_EQ(acc.load(std::memory_order_relaxed), N * (N - 1) / 2);
}
}));
}
for (auto& t : ts) {
t->join();
}
}
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
testing::FLAGS_gtest_death_test_style = "threadsafe";
return RUN_ALL_TESTS();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment