Advertisement
Advertisement

新足迹

 找回密码
 注册
新足迹 门户 工作学习 查看内容

rust language tokio sleep hang forever

2023-4-2 09:36| 发布者: DDD888 | 查看: 1187| 原文链接

好奇怪,我调用 tokio sleep,就掉在那sleep函数里了,但改成调用thread::sleep就可以

下面是完整代码

use tokio;

use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sleep;
//use std::thread;
use std:uration;

use crate::manager::Manager;
use crossbeam_channel::Receiver;
pub async fn my_async_task(
    the_manager: Arc<Mutex<Manager>>,
    index: usize,
    signal_receiver: Receiver<()>,
) -> String {
    while true {
        let result = the_manager.lock().await.get();
        if result == 0 {
            return "".to_string();
        }

        if result == 1 {
            break;
        }
        // result = 2
        let _ = signal_receiver.recv();
        println!("debug my_async_task get recv signal {:#?}", index);
        //sleep(Duration::from_millis(10000)).await;
    }
    println!("debug my_async_task start {:#?}", index);

    sleep(Duration::from_millis(300)).await;
    //thread::sleep(Duration::from_secs(2));
    println!("debug my_async_task finished {:#?}", index);

    the_manager.lock().await.release();
    format!("{}", index)
}
+++++++++++++++++++++++++++++++++
use crossbeam_channel::Sender;

pub struct Manager {
    count: usize,
    total: usize,
    job_max: usize,
    signal_sender: Sender<()>,
}

impl Manager {
    pub fn new(job_max: usize, total: usize, sender: Sender<()>) -> Manager {
        Manager {
            count: 0,
            job_max,
            total,
            signal_sender: sender,
        }
    }

    pub fn get(&mut self) -> usize {
        if self.total < 1 {
            return 0;
        }

        if self.count < self.job_max {
            self.count = self.count + 1;
            return 1;
        }
        2
    }
    pub fn release(&mut self) {
        if self.count > 0 {
            self.count = self.count - 1;
        }
        if self.total > 0 {
            self.total = self.total - 1;
            if self.total > 0 {
                let _ = self.signal_sender.send(());
            }
        }
    }
}
++++++++++++++++++++++++++++++++++
mod manager;
mod my_async_task;

use tokio;

use futures::stream;
use futures::stream::StreamExt;
use std::sync::Arc;
use tokio::runtime::Builder;
use tokio::sync::Mutex;

use crate::{manager::Manager, my_async_task::my_async_task};
use crossbeam_channel::unbounded;

fn main() {
    let job_max = 2;
    let total = 6;
    // Create a channel of unbounded capacity.
    let (s, rx) = unbounded();

    let manager = Arc::new(Mutex::new(Manager::new(job_max, total, s)));
    let runtime = Builder::new_multi_thread()
        .enable_all()
        .thread_name("multi-thread")
        .worker_threads(1)
        .build()
        .unwrap();

    let v = runtime.block_on(async {
        let mut tasks = vec![];
        for index in 0..total {
            let rx_clone = rx.clone();
            tasks.push(tokio::spawn(my_async_task(
                manager.clone(),
                index,
                rx_clone,
            )));
        }

        let fetches = stream::iter(tasks)
            .buffer_unordered(job_max)
            .collect::<Vec<_>>();

        fetches.await
    });
    println!("length={}", v.len());
}
Advertisement
Advertisement


Advertisement
Advertisement
返回顶部