/*
* Copyright (c) 2023 Frank Fischer <frank-fischer@shadow-soft.de>
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
*/
use super::msg::{ResultMsg, WorkerMsg};
use crate::problem::{FirstOrderProblem, ResultSendError, ResultSender};
use crate::Real;
use log::{debug, info};
use mpi::environment::Universe;
use mpi::topology::{Communicator, SystemCommunicator};
use serde::Serialize;
use crate::mpi::msg::{recv_msg, send_msg};
use crate::mpi::problem::DistributedFirstOrderProblem;
use std::sync::mpsc::{channel, Sender};
use std::thread;
struct WorkerResultSender<P: FirstOrderProblem> {
index: usize,
sender: Sender<ResultMsg<P::Minorant, P::Err>>,
}
impl<P: FirstOrderProblem> Drop for WorkerResultSender<P> {
fn drop(&mut self) {
let _ = self.sender.send(ResultMsg::Done { index: self.index });
}
}
impl<P: FirstOrderProblem> ResultSender<P> for WorkerResultSender<P> {
fn objective(&self, value: Real) -> Result<(), ResultSendError> {
self.sender
.send(ResultMsg::ObjectiveValue {
index: self.index,
value,
})
.map_err(|_| ResultSendError::Connection)
}
fn minorant(&self, minorant: P::Minorant) -> Result<(), ResultSendError> {
self.sender
.send(ResultMsg::Minorant {
index: self.index,
minorant,
})
.map_err(|_| ResultSendError::Connection)
}
fn error(&self, err: P::Err) -> Result<(), ResultSendError> {
self.sender
.send(ResultMsg::Error {
index: self.index,
error: err,
})
.map_err(|_| ResultSendError::Connection)
}
}
pub struct Worker<P: FirstOrderProblem + 'static> {
universe: Universe,
problem: P,
}
impl<P: DistributedFirstOrderProblem> Worker<P>
where
P::Minorant: Serialize,
P::Err: Serialize,
{
pub fn new(universe: Universe, problem: P) -> Self {
Worker { universe, problem }
}
pub fn run(&mut self) {
let (result_tx, result_rx) = channel();
let myrank = self.universe.world().rank();
info!("Start worker process {}", myrank);
// spawn the thread receiving incoming requests from the host
thread::scope(|scope| {
scope.spawn(move || {
let world = SystemCommunicator::world();
let host = world.process_at_rank(0);
loop {
debug!("Worker recv...");
let msg = recv_msg(&host);
match msg {
WorkerMsg::Terminate => {
result_tx.send(ResultMsg::Terminate).unwrap();
break;
}
WorkerMsg::ApplyUpdate(update) => {
if let Err(err) = self.problem.apply_update(&update) {
send_msg(&host, &ResultMsg::<P::Minorant, _>::Error { index: 0, error: err });
}
}
WorkerMsg::Evaluate { i, y } => {
debug!("Evaluate sub:{} on worker:{}", i, myrank);
if let Err(err) = self.problem.evaluate(
i,
y,
WorkerResultSender {
index: i,
sender: result_tx.clone(),
},
) {
send_msg(&host, &ResultMsg::<P::Minorant, _>::Error { index: i, error: err });
}
debug!("Evaluate sub:{} on worker:{} returned", i, myrank);
}
}
}
debug!("Terminate receive thread on worker:{}", myrank);
});
// spawn the thread receiving evaluation results forwarding them to the host
scope.spawn(move || {
let world = SystemCommunicator::world();
let host = world.process_at_rank(0);
loop {
let rmsg = result_rx.recv().expect("channel receive error");
debug!("Send result message on {}", myrank);
send_msg(&host, &rmsg);
if let ResultMsg::Terminate = rmsg {
break;
}
}
debug!("Terminate send thread on worker:{}", myrank);
});
});
info!("Terminate worker process {}", myrank);
}
}