好奇怪,我调用 tokio sleep,就掉在那sleep函数里了,但改成调用thread::sleep就可以 下面是完整代码 use tokio; use std::sync::Arc; use tokio::sync::Mutex; use tokio::sleep; //use std::thread; use std:uration; use crate::manager::Manager; use crossbeam_channel::Receiver; pub async fn my_async_task( the_manager: Arc<Mutex<Manager>>, index: usize, signal_receiver: Receiver<()>, ) -> String { while true { let result = the_manager.lock().await.get(); if result == 0 { return "".to_string(); } if result == 1 { break; } // result = 2 let _ = signal_receiver.recv(); println!("debug my_async_task get recv signal {:#?}", index); //sleep(Duration::from_millis(10000)).await; } println!("debug my_async_task start {:#?}", index); sleep(Duration::from_millis(300)).await; //thread::sleep(Duration::from_secs(2)); println!("debug my_async_task finished {:#?}", index); the_manager.lock().await.release(); format!("{}", index) } +++++++++++++++++++++++++++++++++ use crossbeam_channel::Sender; pub struct Manager { count: usize, total: usize, job_max: usize, signal_sender: Sender<()>, } impl Manager { pub fn new(job_max: usize, total: usize, sender: Sender<()>) -> Manager { Manager { count: 0, job_max, total, signal_sender: sender, } } pub fn get(&mut self) -> usize { if self.total < 1 { return 0; } if self.count < self.job_max { self.count = self.count + 1; return 1; } 2 } pub fn release(&mut self) { if self.count > 0 { self.count = self.count - 1; } if self.total > 0 { self.total = self.total - 1; if self.total > 0 { let _ = self.signal_sender.send(()); } } } } ++++++++++++++++++++++++++++++++++ mod manager; mod my_async_task; use tokio; use futures::stream; use futures::stream::StreamExt; use std::sync::Arc; use tokio::runtime::Builder; use tokio::sync::Mutex; use crate::{manager::Manager, my_async_task::my_async_task}; use crossbeam_channel::unbounded; fn main() { let job_max = 2; let total = 6; // Create a channel of unbounded capacity. let (s, rx) = unbounded(); let manager = Arc::new(Mutex::new(Manager::new(job_max, total, s))); let runtime = Builder::new_multi_thread() .enable_all() .thread_name("multi-thread") .worker_threads(1) .build() .unwrap(); let v = runtime.block_on(async { let mut tasks = vec![]; for index in 0..total { let rx_clone = rx.clone(); tasks.push(tokio::spawn(my_async_task( manager.clone(), index, rx_clone, ))); } let fetches = stream::iter(tasks) .buffer_unordered(job_max) .collect::<Vec<_>>(); fetches.await }); println!("length={}", v.len()); } |