/*
 * Copyright (c) 2013-2014 Richard Braun.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <errno.h>
#include <stdalign.h>
#include <stddef.h>
#include <stdio.h>

#include <kern/bitmap.h>
#include <kern/error.h>
#include <kern/init.h>
#include <kern/kmem.h>
#include <kern/list.h>
#include <kern/log.h>
#include <kern/macros.h>
#include <kern/panic.h>
#include <kern/percpu.h>
#include <kern/spinlock.h>
#include <kern/syscnt.h>
#include <kern/thread.h>
#include <kern/work.h>
#include <machine/cpu.h>

#define WORK_PRIO_NORMAL        THREAD_SCHED_FS_PRIO_DEFAULT
#define WORK_PRIO_HIGH          THREAD_SCHED_FS_PRIO_MAX

#define WORK_INVALID_CPU        ((unsigned int)-1)

/*
 * Keep at least that many threads alive when a work pool is idle.
 */
#define WORK_THREADS_SPARE      4

/*
 * When computing the maximum number of worker threads, start with multiplying
 * the number of processors by the ratio below. If the result is greater than
 * the threshold, retry by decreasing the ratio until either the result is
 * less than the threshold or the ratio is 1.
 */
#define WORK_THREADS_RATIO      4
#define WORK_THREADS_THRESHOLD  512

#define WORK_MAX_THREADS        MAX(CONFIG_MAX_CPUS, WORK_THREADS_THRESHOLD)

/*
 * Work pool flags.
 */
#define WORK_PF_GLOBAL          0x1 /* System-wide work queue */
#define WORK_PF_HIGHPRIO        0x2 /* High priority worker threads */

struct work_thread {
    struct list node;
    struct thread *thread;
    struct work_pool *pool;
    unsigned int id;
};

/*
 * Pool of threads and works.
 *
 * Interrupts must be disabled when accessing a work pool. Holding the
 * lock is required for global pools only, whereas exclusive access on
 * per-processor pools is achieved by disabling preemption.
 *
 * There are two internal queues of pending works. When first scheduling
 * a work, it is inserted into queue0. After a periodic event, works still
 * present in queue0 are moved to queue1. If these works are still present
 * in queue1 at the next periodic event, it means they couldn't be processed
 * for a complete period between two periodic events, at which point it is
 * assumed that processing works on the same processor they were queued on
 * becomes less relevant. As a result, periodic events also trigger the
 * transfer of works from queue1 to the matching global pool. Global pools
 * only use one queue.
 */
struct work_pool {
    alignas(CPU_L1_SIZE) struct spinlock lock;
    int flags;
    struct work_queue queue0;
    struct work_queue queue1;
    struct work_thread *manager;
    struct syscnt sc_transfers;
    unsigned int cpu;
    unsigned int max_threads;
    unsigned int nr_threads;
    unsigned int nr_available_threads;
    struct list available_threads;
    struct list dead_threads;
    BITMAP_DECLARE(bitmap, WORK_MAX_THREADS);
};

static int work_thread_create(struct work_pool *pool, unsigned int id);

static struct work_pool work_pool_cpu_main __percpu;
static struct work_pool work_pool_cpu_highprio __percpu;
static struct work_pool work_pool_main;
static struct work_pool work_pool_highprio;

static struct kmem_cache work_thread_cache;

static unsigned int
work_pool_alloc_id(struct work_pool *pool)
{
    int bit;

    assert(pool->nr_threads < pool->max_threads);
    pool->nr_threads++;
    bit = bitmap_find_first_zero(pool->bitmap, pool->max_threads);
    assert(bit >= 0);
    bitmap_set(pool->bitmap, bit);
    return bit;
}

static void
work_pool_free_id(struct work_pool *pool, unsigned int id)
{
    assert(pool->nr_threads != 0);
    pool->nr_threads--;
    bitmap_clear(pool->bitmap, id);
}

static unsigned int
work_pool_cpu_id(const struct work_pool *pool)
{
    assert(!(pool->flags & WORK_PF_GLOBAL));
    return pool->cpu;
}

static unsigned int
work_pool_compute_max_threads(unsigned int nr_cpus)
{
    unsigned int max_threads, ratio;

    ratio = WORK_THREADS_RATIO;
    max_threads = nr_cpus * ratio;

    while ((ratio > 1) && (max_threads > WORK_THREADS_THRESHOLD)) {
        ratio--;
        max_threads = nr_cpus * ratio;
    }

    assert(max_threads != 0);
    assert(max_threads <= WORK_MAX_THREADS);
    return max_threads;
}
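
/*
 * For illustration, with the default ratio of 4 and threshold of 512:
 * a global pool on a 256-processor machine starts at 256 * 4 = 1024
 * threads, above the threshold, so the ratio drops to 3 (768, still too
 * high), then to 2, giving 256 * 2 = 512 <= 512, the final maximum.
 * Per-processor pools always pass nr_cpus == 1 and therefore get at most
 * WORK_THREADS_RATIO threads.
 */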

static void __init
work_pool_init(struct work_pool *pool)
{
    spinlock_init(&pool->lock);
    work_queue_init(&pool->queue0);
    work_queue_init(&pool->queue1);
    pool->manager = NULL;
}

static void __init
work_pool_build(struct work_pool *pool, unsigned int cpu, int flags)
{
    char name[SYSCNT_NAME_SIZE];
    const char *suffix;
    unsigned int id, nr_cpus, max_threads;
    int error;

    pool->flags = flags;

    if (flags & WORK_PF_GLOBAL) {
        nr_cpus = cpu_count();
        pool->cpu = WORK_INVALID_CPU;
    } else {
        nr_cpus = 1;
        suffix = (flags & WORK_PF_HIGHPRIO) ? "h" : "";
        snprintf(name, sizeof(name), "work_transfers/%u%s", cpu, suffix);
        syscnt_register(&pool->sc_transfers, name);
        pool->cpu = cpu;
    }

    max_threads = work_pool_compute_max_threads(nr_cpus);
    pool->max_threads = max_threads;
    pool->nr_threads = 0;
    pool->nr_available_threads = 0;
    list_init(&pool->available_threads);
    list_init(&pool->dead_threads);
    bitmap_zero(pool->bitmap, WORK_MAX_THREADS);

    id = work_pool_alloc_id(pool);
    error = work_thread_create(pool, id);

    if (error) {
        goto error_thread;
    }

    return;

error_thread:
    panic("work: unable to create initial worker thread");
}

static struct work_pool *
work_pool_cpu_select(int flags)
{
    return (flags & WORK_HIGHPRIO)
           ? cpu_local_ptr(work_pool_cpu_highprio)
           : cpu_local_ptr(work_pool_cpu_main);
}

static void
work_pool_acquire(struct work_pool *pool, unsigned long *flags)
{
    if (pool->flags & WORK_PF_GLOBAL) {
        spinlock_lock_intr_save(&pool->lock, flags);
    } else {
        thread_preempt_disable_intr_save(flags);
    }
}

static void
work_pool_release(struct work_pool *pool, unsigned long flags)
{
    if (pool->flags & WORK_PF_GLOBAL) {
        spinlock_unlock_intr_restore(&pool->lock, flags);
    } else {
        thread_preempt_enable_intr_restore(flags);
    }
}

static int
work_pool_nr_works(const struct work_pool *pool)
{
    return (work_queue_nr_works(&pool->queue0)
            + work_queue_nr_works(&pool->queue1));
}

static struct work *
work_pool_pop_work(struct work_pool *pool)
{
    if (!(pool->flags & WORK_PF_GLOBAL)) {
        if (work_queue_nr_works(&pool->queue1) != 0) {
            return work_queue_pop(&pool->queue1);
        }
    }

    return work_queue_pop(&pool->queue0);
}

static void
work_pool_wakeup_manager(struct work_pool *pool)
{
    if (work_pool_nr_works(pool) == 0) {
        return;
    }

    if (pool->manager != NULL) {
        thread_wakeup(pool->manager->thread);
    }
}

static void
work_pool_shift_queues(struct work_pool *pool, struct work_queue *old_queue)
{
    assert(!(pool->flags & WORK_PF_GLOBAL));

    work_queue_transfer(old_queue, &pool->queue1);
    work_queue_transfer(&pool->queue1, &pool->queue0);
    work_queue_init(&pool->queue0);

    if (work_queue_nr_works(old_queue) != 0) {
        syscnt_inc(&pool->sc_transfers);
    }
}
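
/*
 * For illustration, the life of a work scheduled on a busy processor:
 * it starts in queue0; the first periodic event shifts it to queue1,
 * from which works are popped first (see work_pool_pop_work()); if a
 * second periodic event fires before any worker picks it up, the shift
 * moves it into old_queue, and the caller transfers it to the matching
 * global pool, where any processor may run it.
 */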

static void
work_pool_push_work(struct work_pool *pool, struct work *work)
{
    work_queue_push(&pool->queue0, work);
    work_pool_wakeup_manager(pool);
}

static void
work_pool_concat_queue(struct work_pool *pool, struct work_queue *queue)
{
    work_queue_concat(&pool->queue0, queue);
    work_pool_wakeup_manager(pool);
}

static void
work_thread_destroy(struct work_thread *worker)
{
    thread_join(worker->thread);
    kmem_cache_free(&work_thread_cache, worker);
}

/*
 * Worker thread entry point.
 *
 * A worker may act as the pool manager: when no works are pending, the
 * manager is the single thread sleeping on the pool waiting for works,
 * while extra workers sleep on the available list. After popping a work,
 * a worker wakes a spare (or creates a new worker) if works remain, then
 * processes the work with the pool unlocked. Workers in excess of
 * WORK_THREADS_SPARE exit through the dead list and are reaped later.
 */
static void
work_process(void *arg)
{
    struct work_thread *self, *worker;
    struct work_pool *pool;
    struct work *work;
    struct spinlock *lock;
    unsigned long flags;
    unsigned int id;
    int error;

    self = arg;
    pool = self->pool;
    lock = (pool->flags & WORK_PF_GLOBAL) ? &pool->lock : NULL;

    work_pool_acquire(pool, &flags);

    for (;;) {
        if (pool->manager != NULL) {
            list_insert_tail(&pool->available_threads, &self->node);
            pool->nr_available_threads++;

            do {
                thread_sleep(lock, pool, "work_spr");
            } while (pool->manager != NULL);

            list_remove(&self->node);
            pool->nr_available_threads--;
        }

        if (!list_empty(&pool->dead_threads)) {
            worker = list_first_entry(&pool->dead_threads,
                                      struct work_thread, node);
            list_remove(&worker->node);
            work_pool_release(pool, flags);

            id = worker->id;
            work_thread_destroy(worker);

            /*
             * Release worker ID last so that, if the pool is full, no new
             * worker can be created unless all the resources of the worker
             * being destroyed have been freed. This is important to enforce
             * a strict boundary on the total amount of resources allocated
             * for a pool at any time.
             */
            work_pool_acquire(pool, &flags);
            work_pool_free_id(pool, id);
            continue;
        }

        if (work_pool_nr_works(pool) == 0) {
            if (pool->nr_threads > WORK_THREADS_SPARE) {
                break;
            }

            pool->manager = self;

            do {
                thread_sleep(lock, pool, "work_mgr");
            } while (work_pool_nr_works(pool) == 0);

            pool->manager = NULL;
        }

        work = work_pool_pop_work(pool);

        if (work_pool_nr_works(pool) != 0) {
            if (pool->nr_available_threads != 0) {
                worker = list_first_entry(&pool->available_threads,
                                          struct work_thread, node);
                thread_wakeup(worker->thread);
            } else if (pool->nr_threads < pool->max_threads) {
                id = work_pool_alloc_id(pool);
                work_pool_release(pool, flags);

                error = work_thread_create(pool, id);

                work_pool_acquire(pool, &flags);

                if (error) {
                    work_pool_free_id(pool, id);
                    log_warning("work: unable to create worker thread");
                }
            }
        }

        work_pool_release(pool, flags);
        work->fn(work);
        work_pool_acquire(pool, &flags);
    }

    list_insert_tail(&pool->dead_threads, &self->node);
    work_pool_release(pool, flags);
}

static int
work_thread_create(struct work_pool *pool, unsigned int id)
{
    char name[THREAD_NAME_SIZE];
    struct thread_attr attr;
    struct cpumap *cpumap;
    struct work_thread *worker;
    const char *suffix;
    unsigned short priority;
    int error;

    worker = kmem_cache_alloc(&work_thread_cache);

    if (worker == NULL) {
        return ENOMEM;
    }

    worker->pool = pool;
    worker->id = id;

    if (pool->flags & WORK_PF_HIGHPRIO) {
        suffix = "h";
        priority = WORK_PRIO_HIGH;
    } else {
        suffix = "";
        priority = WORK_PRIO_NORMAL;
    }

    if (pool->flags & WORK_PF_GLOBAL) {
        cpumap = NULL;
        snprintf(name, sizeof(name),
                 THREAD_KERNEL_PREFIX "work_process/g:%u%s",
                 worker->id, suffix);
    } else {
        unsigned int pool_id;

        error = cpumap_create(&cpumap);

        if (error) {
            goto error_cpumap;
        }

        pool_id = work_pool_cpu_id(pool);
        cpumap_zero(cpumap);
        cpumap_set(cpumap, pool_id);
        snprintf(name, sizeof(name),
                 THREAD_KERNEL_PREFIX "work_process/%u:%u%s",
                 pool_id, worker->id, suffix);
    }

    thread_attr_init(&attr, name);
    thread_attr_set_priority(&attr, priority);

    if (cpumap != NULL) {
        thread_attr_set_cpumap(&attr, cpumap);
    }

    error = thread_create(&worker->thread, &attr, work_process, worker);

    if (cpumap != NULL) {
        cpumap_destroy(cpumap);
    }

    if (error) {
        goto error_thread;
    }

    return 0;

error_thread:
error_cpumap:
    kmem_cache_free(&work_thread_cache, worker);
    return error;
}

static int __init
work_bootstrap(void)
{
    work_pool_init(cpu_local_ptr(work_pool_cpu_main));
    work_pool_init(cpu_local_ptr(work_pool_cpu_highprio));
    return 0;
}

INIT_OP_DEFINE(work_bootstrap,
               INIT_OP_DEP(cpu_setup, true),
               INIT_OP_DEP(spinlock_setup, true),
               INIT_OP_DEP(thread_bootstrap, true));

static int __init
work_setup(void)
{
    kmem_cache_init(&work_thread_cache, "work_thread",
                    sizeof(struct work_thread), 0, NULL, 0);

    for (unsigned int i = 1; i < cpu_count(); i++) {
        work_pool_init(percpu_ptr(work_pool_cpu_main, i));
        work_pool_init(percpu_ptr(work_pool_cpu_highprio, i));
    }

    work_pool_init(&work_pool_main);
    work_pool_init(&work_pool_highprio);

    for (unsigned int i = 0; i < cpu_count(); i++) {
        work_pool_build(percpu_ptr(work_pool_cpu_main, i), i, 0);
        work_pool_build(percpu_ptr(work_pool_cpu_highprio, i), i,
                        WORK_PF_HIGHPRIO);
    }

    work_pool_build(&work_pool_main, WORK_INVALID_CPU, WORK_PF_GLOBAL);
    work_pool_build(&work_pool_highprio, WORK_INVALID_CPU,
                    WORK_PF_GLOBAL | WORK_PF_HIGHPRIO);

    log_info("work: threads per pool (per-cpu/global): %u/%u, spare: %u",
             percpu_var(work_pool_cpu_main.max_threads, 0),
             work_pool_main.max_threads, WORK_THREADS_SPARE);

    return 0;
}

INIT_OP_DEFINE(work_setup,
               INIT_OP_DEP(cpu_mp_probe, true),
               INIT_OP_DEP(cpumap_setup, true),
               INIT_OP_DEP(kmem_setup, true),
               INIT_OP_DEP(log_setup, true),
               INIT_OP_DEP(spinlock_setup, true),
               INIT_OP_DEP(syscnt_setup, true),
               INIT_OP_DEP(thread_setup, true),
               INIT_OP_DEP(work_bootstrap, true));

void
work_schedule(struct work *work, int flags)
{
    struct work_pool *pool;
    unsigned long cpu_flags;

    thread_pin();
    pool = work_pool_cpu_select(flags);
    work_pool_acquire(pool, &cpu_flags);
    work_pool_push_work(pool, work);
    work_pool_release(pool, cpu_flags);
    thread_unpin();
}

void
work_queue_schedule(struct work_queue *queue, int flags)
{
    struct work_pool *pool;
    unsigned long cpu_flags;

    thread_pin();
    pool = work_pool_cpu_select(flags);
    work_pool_acquire(pool, &cpu_flags);
    work_pool_concat_queue(pool, queue);
    work_pool_release(pool, cpu_flags);
    thread_unpin();
}

void
work_report_periodic_event(void)
{
    struct work_queue queue, highprio_queue;

    assert(thread_check_intr_context());

    work_pool_shift_queues(cpu_local_ptr(work_pool_cpu_main), &queue);
    work_pool_shift_queues(cpu_local_ptr(work_pool_cpu_highprio),
                           &highprio_queue);

    if (work_queue_nr_works(&queue) != 0) {
        spinlock_lock(&work_pool_main.lock);
        work_pool_concat_queue(&work_pool_main, &queue);
        spinlock_unlock(&work_pool_main.lock);
    }

    if (work_queue_nr_works(&highprio_queue) != 0) {
        spinlock_lock(&work_pool_highprio.lock);
        work_pool_concat_queue(&work_pool_highprio, &highprio_queue);
        spinlock_unlock(&work_pool_highprio.lock);
    }
}
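
/*
 * Usage sketch (not part of this file): how a client module might defer
 * processing to a worker thread. This assumes the work_init() initializer
 * declared in kern/work.h and the structof() macro from kern/macros.h;
 * my_request and my_request_process() are hypothetical names used for
 * illustration only.
 *
 *     struct my_request {
 *         struct work work;
 *         int data;
 *     };
 *
 *     static void
 *     my_request_process(struct work *work)
 *     {
 *         struct my_request *req;
 *
 *         req = structof(work, struct my_request, work);
 *         // ... process req->data in thread context ...
 *     }
 *
 *     void
 *     my_request_submit(struct my_request *req)
 *     {
 *         work_init(&req->work, my_request_process);
 *         work_schedule(&req->work, 0);
 *     }
 *
 * Passing WORK_HIGHPRIO instead of 0 would select the high priority
 * per-processor pool in work_pool_cpu_select().
 */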