RsBundle  Artifact [7f4a789290]

Artifact 7f4a789290cca01a0afc30d9441eea283cb38559:

  • File src/mpi/worker.rs — part of check-in [18f093f760] at 2023-07-08 17:50:44 on branch mpi — mpi::worker: `send` may fail in `Drop`. This is not problematic as the other end point should be closed anyway. (user: fifr size: 5313) [more...]

/*
 * Copyright (c) 2023 Frank Fischer <frank-fischer@shadow-soft.de>
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see  <http://www.gnu.org/licenses/>
 */

use super::msg::{ResultMsg, WorkerMsg};
use crate::problem::{FirstOrderProblem, ResultSendError, ResultSender};
use crate::Real;

use log::{debug, info};
use mpi::environment::Universe;
use mpi::topology::{Communicator, SystemCommunicator};
use serde::Serialize;

use crate::mpi::msg::{recv_msg, send_msg};
use crate::mpi::problem::DistributedFirstOrderProblem;
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Result sender that turns reported results into [`ResultMsg`] values and
/// pushes them into an in-process channel.
///
/// Every message produced by one sender carries the same `index`.
struct WorkerResultSender<P>
where
    P: FirstOrderProblem,
{
    /// Index attached to every message created by this sender.
    index: usize,
    /// Sending half of the channel the result messages are pushed into.
    sender: Sender<ResultMsg<P::Minorant, P::Err>>,
}

impl<P> Drop for WorkerResultSender<P>
where
    P: FirstOrderProblem,
{
    /// Pushes a final `ResultMsg::Done` carrying this sender's index into the
    /// channel when the sender goes out of scope.
    fn drop(&mut self) {
        let done = ResultMsg::Done { index: self.index };
        // `send` only fails when the receiving half has already been dropped.
        // Nobody is left to notify in that case, so the outcome is discarded
        // on purpose instead of panicking inside `drop`.
        let _ = self.sender.send(done);
    }
}

impl<P: FirstOrderProblem> ResultSender<P> for WorkerResultSender<P> {
    fn objective(&self, value: Real) -> Result<(), ResultSendError> {
        self.sender
            .send(ResultMsg::ObjectiveValue {
                index: self.index,
                value,
            })
            .map_err(|_| ResultSendError::Connection)
    }

    fn minorant(&self, minorant: P::Minorant) -> Result<(), ResultSendError> {
        self.sender
            .send(ResultMsg::Minorant {
                index: self.index,
                minorant,
            })
            .map_err(|_| ResultSendError::Connection)
    }

    fn error(&self, err: P::Err) -> Result<(), ResultSendError> {
        self.sender
            .send(ResultMsg::Error {
                index: self.index,
                error: err,
            })
            .map_err(|_| ResultSendError::Connection)
    }
}

/// A worker process that owns a problem and the MPI universe it runs in.
pub struct Worker<P>
where
    P: FirstOrderProblem + 'static,
{
    /// MPI universe; its world communicator provides this process' rank.
    universe: Universe,
    /// The problem that incoming updates and evaluations are applied to.
    problem: P,
}

impl<P: DistributedFirstOrderProblem> Worker<P>
where
    P::Minorant: Serialize,
    P::Err: Serialize,
{
    pub fn new(universe: Universe, problem: P) -> Self {
        Worker { universe, problem }
    }

    pub fn run(&mut self) {
        let (result_tx, result_rx) = channel();
        let myrank = self.universe.world().rank();

        info!("Start worker process {}", myrank);

        // spawn the thread receiving incoming requests from the host
        thread::scope(|scope| {
            scope.spawn(move || {
                let world = SystemCommunicator::world();
                let host = world.process_at_rank(0);
                loop {
                    debug!("Worker recv...");
                    let msg = recv_msg(&host);

                    match msg {
                        WorkerMsg::Terminate => {
                            result_tx.send(ResultMsg::Terminate).unwrap();
                            break;
                        }
                        WorkerMsg::ApplyUpdate(update) => {
                            if let Err(err) = self.problem.apply_update(&update) {
                                send_msg(&host, &ResultMsg::<P::Minorant, _>::Error { index: 0, error: err });
                            }
                        }
                        WorkerMsg::Evaluate { i, y } => {
                            debug!("Evaluate sub:{} on worker:{}", i, myrank);
                            if let Err(err) = self.problem.evaluate(
                                i,
                                y,
                                WorkerResultSender {
                                    index: i,
                                    sender: result_tx.clone(),
                                },
                            ) {
                                send_msg(&host, &ResultMsg::<P::Minorant, _>::Error { index: i, error: err });
                            }
                            debug!("Evaluate sub:{} on worker:{} returned", i, myrank);
                        }
                    }
                }
                debug!("Terminate receive thread on worker:{}", myrank);
            });

            // spawn the thread receiving evaluation results forwarding them to the host
            scope.spawn(move || {
                let world = SystemCommunicator::world();
                let host = world.process_at_rank(0);
                loop {
                    let rmsg = result_rx.recv().expect("channel receive error");
                    debug!("Send result message on {}", myrank);
                    send_msg(&host, &rmsg);
                    if let ResultMsg::Terminate = rmsg {
                        break;
                    }
                }
                debug!("Terminate send thread on worker:{}", myrank);
            });
        });

        info!("Terminate worker process {}", myrank);
    }
}