path: root/tokio/src/runtime/thread_pool/worker.rs
use crate::loom::cell::CausalCell;
use crate::loom::sync::Arc;
use crate::park::Park;
use crate::runtime::park::Parker;
use crate::runtime::thread_pool::{current, slice, Owned, Shared, Spawner};
use crate::runtime::{self, blocking};
use crate::task::Task;

use std::cell::Cell;
use std::marker::PhantomData;
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;

thread_local! {
    /// Used to handle block_in_place
    static ON_BLOCK: Cell<Option<*const dyn Fn()>> = Cell::new(None)
}

cfg_blocking! {
    pub(crate) fn block_in_place<F, R>(f: F) -> R
    where
        F: FnOnce() -> R,
    {
        // Make the current worker give away its Worker to another thread so that we can safely block
        // this one without preventing progress on other futures the worker owns.
        ON_BLOCK.with(|ob| {
            let allow_blocking = ob
                .get()
                .expect("can only call blocking when on Tokio runtime");

            // This is safe, because ON_BLOCK was set from a &dyn Fn() in the worker that wraps
            // the worker's operation, and is unset just prior to when that closure is dropped.
            let allow_blocking = unsafe { &*allow_blocking };

            allow_blocking();
            f()
        })
    }
}
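
// A minimal usage sketch of the helper above, assuming it backs the public
// `tokio::task::block_in_place` entry point (as in later Tokio releases) and
// that a multi-threaded runtime is driving the task:
//
//     // Inside an async task running on the thread pool:
//     let contents = tokio::task::block_in_place(|| {
//         // The worker has handed its queue to another thread, so blocking
//         // here does not stall the other tasks this worker owns.
//         std::fs::read_to_string("config.toml")
//     });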

pub(crate) struct Worker {
    /// Parks the thread. Requires the calling worker to have obtained unique
    /// access via the generation synchronization action.
    inner: Arc<Inner>,

    /// Scheduler slices
    slices: Arc<slice::Set>,

    /// Slice assigned to this worker
    index: usize,

    /// Worker generation. This is used to synchronize access to the internal
    /// data.
    generation: usize,

    /// To indicate that the Worker has been given away and should no longer be used
    gone: Cell<bool>,
}

/// Internal worker state. This may be referenced from multiple threads, but the
/// generation guard protects unsafe access
struct Inner {
    /// Used to park the thread
    park: CausalCell<Parker>,
}

unsafe impl Send for Worker {}

/// Used to ensure the invariants are respected
struct GenerationGuard<'a> {
    /// Worker reference
    worker: &'a Worker,

    /// Prevent `Sync` access
    _p: PhantomData<Cell<()>>,
}
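
// Note: `PhantomData<Cell<()>>` makes the guard `!Sync` (because `Cell<()>`
// is `!Sync`) without adding any runtime data. A standalone illustration of
// the pattern (hedged sketch, not part of this module):
//
//     use std::{cell::Cell, marker::PhantomData};
//
//     struct NotSync {
//         _not_sync: PhantomData<Cell<()>>,
//     }
//
//     fn assert_sync<T: Sync>() {}
//     // assert_sync::<NotSync>(); // fails to compile: `Cell<()>` is not `Sync`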

struct WorkerGone;

// TODO: Move into slices
pub(super) fn create_set(pool_size: usize, parker: Parker) -> (Arc<slice::Set>, Vec<Worker>) {
    // Create the parks...
    let parkers: Vec<_> = (0..pool_size).map(|_| parker.clone()).collect();

    let mut slices = Arc::new(slice::Set::new(&parkers));

    // Establish the circular link between the individual worker state
    // structure and the container.
    Arc::get_mut(&mut slices).unwrap().set_ptr();

    // This will contain each worker.
    let workers = parkers
        .into_iter()
        .enumerate()
        .map(|(index, parker)| Worker::new(slices.clone(), index, parker))
        .collect();

    (slices, workers)
}

/// The number of ticks between polls of the global queue. Polling the global
/// queue periodically helps to ensure fairness.
///
/// The number is fairly arbitrary. I believe this value was copied from golang.
const GLOBAL_POLL_INTERVAL: u16 = 61;
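
// Illustrative sketch of how such an interval is typically consulted in a
// worker's scheduling loop (the real loop lives elsewhere in this crate; the
// names below are assumptions, not this module's API):
//
//     fn next_task(&mut self) -> Option<Task> {
//         self.tick = self.tick.wrapping_add(1);
//         if self.tick % GLOBAL_POLL_INTERVAL == 0 {
//             // Periodically prefer the global (injection) queue so tasks
//             // submitted from outside the pool are not starved by local work.
//             self.global().pop().or_else(|| self.local().pop())
//         } else {
//             self.local().pop().or_else(|| self.global().pop())
//         }
//     }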

impl Worker {
    // Safe as acquiring a lock is required before doing anything potentially
    // dangerous.
    pub(super) fn new(slices: Arc<slice::Set>, index: usize, park: Parker) -> Self {
        Worker {
            inner: Arc::new(Inner {
                park: CausalCell::new(park),
            }),
            slices,
            index,
            generation: 0,
            gone: Cell::new(false),
        }
    }

    pub(super) fn run(self, blocking_pool: blocking::Spawner) {
        // First, acquire a lock on the worker.
        let guard = match self.acquire_lock() {
            Some(guard) => guard,
            None => return,
        };

        let spawner = Spawner::new(self.slices.clone());

        // Track the current worker
        current::set(&self.slices, self.index, || {
            // Enter a runtime context
            let _enter = crate::runtime::enter();

            crate::runtime::global::with_thread_pool(&spawner, || {
                blocking_pool.enter(|| {
                    ON_BLOCK.with(|ob| {
                        // Ensure that the ON_BLOCK is removed from the thread-local context
                        // when leaving the scope. This handles cases that involve panicking.
                        struct Reset<'a>(&'a Cell<Option<*const dyn Fn()>>);

                        impl<'a> Drop for Reset<'a> {
                            fn drop(&mut self) {
                                self.0.set(None);
                            }
                        }

                        let _reset = Reset(ob);

                        let allow_blocking: &dyn Fn() = &|| self.block_in_place(&blocking_pool);

                        ob.set(Some(unsafe {
                            // NOTE: We cannot use a safe cast to raw pointer here, since we are
                            // _also_ erasing the lifetime of these pointers. That is safe here,
                            // because we know that ob will be set back to None before allow_blocking
                            // is dropped.
                            #[allow(clippy::useless_transmute)]
                            std::mem::transmute::<_, *const dyn Fn()>(allow_blocking)
                        }));

                        let _ = guard.run();

                        // Ensure that we reset ob before allow_blocking is dropped.
                        drop(_reset);
                    });
                })
            })
        });

        if self.gone.get() {
            // Synchronize with the pool for the load(Acquire) in is_closed to
            // get an up-to-date value.
            self.slices.wait_for_unlocked();

            if self.slices.is_closed() {
                // If the pool is shutting down, some other thread may be
                // waiting to clean up after the task that we were holding on
                // to. If we completed that task, we did nothing (because
                // task.run() returned None), and so crucially we did not wake
                // up any such thread.
                //
                // So, we have to do that here.
                self.slices.notify_all();
            }
        }
    }

    /// Acquire the lock
    fn acquire_lock(&self) -> Option<GenerationGuard<'_>> {
        // Safety: Only getting `&self` access to access atomic field
        let owned = unsafe { &*self.slices.owned()[self.index].get() };

        // The lock is only to establish mutual exclusion. Other synchronization
        // handles memory orderings
        let prev = owned.generation.compare_and_swap(
            self.generation,
            self.generation.wrapping_add(1),
            Relaxed,
        );

        if prev == self.generation {
            Some(GenerationGuard {
                worker: self,
                _p: PhantomData,
            })
        } else {
            None
        }
    }
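
    // A standalone sketch of the generation-based claim performed above
    // (illustrative only; `Slot` and `try_claim` are assumptions, not this
    // crate's types):
    //
    //     use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
    //
    //     struct Slot { generation: AtomicUsize }
    //
    //     fn try_claim(slot: &Slot, expected: usize) -> bool {
    //         // Bump the generation only if it still matches the value this
    //         // worker handle was created with; a stale handle fails here.
    //         slot.generation
    //             .compare_exchange(expected, expected.wrapping_add(1), Relaxed, Relaxed)
    //             .is_ok()
    //     }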

    /// Enter an in-place blocking section
    fn block_in_place(&self, blocking_pool: &blocking::Spawner) {
        // If our Worker has already been given away, then blocking is fine!
        if self.gone.get() {
            return;
        }

        // make sure no subsequent code thinks that it is on a worker
        current::clear();

        // Track that the worker is gone
        self.gone.set(true);

        // If this method is called, we need to move the entire worker onto a
        // separate (blocking) thread before returning. Once we return, the
        // caller is going to execute some blocking code which would otherwise
        // block our reactor from making progress. Since we are _in the middle_
        // of running a task, this isn't trivial, as the Worker is "active".
        // We do have the luxury of knowing that we are on the worker thread,
        // so we can assert exclusive access to any Worker-specific state.
        //
        // More specifically, the caller is _currently_ "stuck" in
        // Entry::run_task at:
        //
        //   if let Some(task) = task.run(self.shared().into()) {
        //
        // And _we_ get to decide when it continues (specifically, by choosing
        // when we return from the second callback, i.e., after the FnOnce
        // passed to blocking has returned).
        //
        // Here's what we'll have to do:
        //
        //  - Reconstruct our `Worker` struct
        //  - Spawn the reconstructed `Worker` on another blocking thread
        //  - Clear any state indicating what worker we are on, since at this
        //    point we are effectively no longer "on" that worker.
        //  - Allow the caller of `blocking` to continue.
        //
        // Once the caller completes the blocking operations, we need to ensure
        // that async code can continue running in that context. Luckily, since
        // `Arc<slice::Set>` has a fallback for when
        // curre