RsBundle  Diff

Differences From Artifact [06f2bedc27]:

  • File src/mpi/problem.rs — part of check-in [7bbf05c830] at 2023-08-29 10:15:39 on branch add-subproblems — mpi: fix reference in doc (user: fifr size: 15999) [more...]

To Artifact [ea96420eea]:

  • File src/mpi/problem.rs — part of check-in [e5cba3104f] at 2024-01-02 16:50:28 on branch trunk — Update `mpi` to v0.7 (user: fifr size: 15999) [more...]

19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use crate::problem::{
    FirstOrderProblem, ResultSender, SubgradientExtender, UpdateSendError, UpdateSender, UpdateState,
};
use crate::{DVector, Minorant, Real};

use log::{debug, error, info};
use mpi::environment::Universe;
use mpi::topology::{Communicator, SystemCommunicator};
use mpi::Rank;
use serde::{Deserialize, Serialize};
use std::sync::mpsc::{channel, Sender};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use thiserror::Error;

use crate::mpi::msg::{recv_msg, send_msg};







|







19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use crate::problem::{
    FirstOrderProblem, ResultSender, SubgradientExtender, UpdateSendError, UpdateSender, UpdateState,
};
use crate::{DVector, Minorant, Real};

use log::{debug, error, info};
use mpi::environment::Universe;
use mpi::topology::{Communicator, SimpleCommunicator};
use mpi::Rank;
use serde::{Deserialize, Serialize};
use std::sync::mpsc::{channel, Sender};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use thiserror::Error;

use crate::mpi::msg::{recv_msg, send_msg};
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
        let nsubs = {
            let mut p = self.problem.write().unwrap();
            p.start();
            p.num_subproblems()
        };

        if self.compute_tx.is_none() {
            let world = SystemCommunicator::world();
            let nworkers = world.size() - 1;

            let (compute_tx, compute_rx) = channel::<ClientMessage<P>>();

            // First start one receiver thread for each worker. Each thread gets the result-channels
            // (for the algorithm) via another channel.
            let result_txs = (0..nworkers)
                .map(|client_idx| {
                    let rank = client_idx as Rank + 1;
                    let (req_tx, req_rx) = channel::<(usize, Box<dyn ResultSender<Self>>)>();
                    let compute_tx = compute_tx.clone();
                    thread::spawn(move || {
                        let world = SystemCommunicator::world();
                        let worker = world.process_at_rank(rank);
                        let mut result_txs = (0..nsubs).map(|_| None).collect::<Vec<_>>();
                        loop {
                            let msg = recv_msg(&worker);

                            let index = match msg {
                                ResultMsg::ObjectiveValue { index, .. } => index,







|












|







202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
        let nsubs = {
            let mut p = self.problem.write().unwrap();
            p.start();
            p.num_subproblems()
        };

        if self.compute_tx.is_none() {
            let world = SimpleCommunicator::world();
            let nworkers = world.size() - 1;

            let (compute_tx, compute_rx) = channel::<ClientMessage<P>>();

            // First start one receiver thread for each worker. Each thread gets the result-channels
            // (for the algorithm) via another channel.
            let result_txs = (0..nworkers)
                .map(|client_idx| {
                    let rank = client_idx as Rank + 1;
                    let (req_tx, req_rx) = channel::<(usize, Box<dyn ResultSender<Self>>)>();
                    let compute_tx = compute_tx.clone();
                    thread::spawn(move || {
                        let world = SimpleCommunicator::world();
                        let worker = world.process_at_rank(rank);
                        let mut result_txs = (0..nsubs).map(|_| None).collect::<Vec<_>>();
                        loop {
                            let msg = recv_msg(&worker);

                            let index = match msg {
                                ResultMsg::ObjectiveValue { index, .. } => index,
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
                    });
                    req_tx
                })
                .collect::<Vec<_>>();

            // This thread forwards computation requests to the workers.
            thread::spawn(move || {
                let world = SystemCommunicator::world();
                let mut numjobs = vec![0; nworkers as usize];
                let mut start_times = vec![Instant::now(); nworkers as usize];
                while let Ok(clientmsg) = compute_rx.recv() {
                    match clientmsg {
                        ClientMessage::WorkerTask { msg, result_tx } => match msg {
                            WorkerMsg::Terminate | WorkerMsg::ApplyUpdate(..) => {
                                // send message to all workers







|







265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
                    });
                    req_tx
                })
                .collect::<Vec<_>>();

            // This thread forwards computation requests to the workers.
            thread::spawn(move || {
                let world = SimpleCommunicator::world();
                let mut numjobs = vec![0; nworkers as usize];
                let mut start_times = vec![Instant::now(); nworkers as usize];
                while let Ok(clientmsg) = compute_rx.recv() {
                    match clientmsg {
                        ClientMessage::WorkerTask { msg, result_tx } => match msg {
                            WorkerMsg::Terminate | WorkerMsg::ApplyUpdate(..) => {
                                // send message to all workers