RsBundle  Diff

Differences From Artifact [06f2bedc27]:

  • File src/mpi/problem.rs — part of check-in [7bbf05c830] at 2023-08-29 10:15:39 on branch add-subproblems — mpi: fix reference in doc (user: fifr size: 15999) [more...]

To Artifact [ea96420eea]:

  • File src/mpi/problem.rs — part of check-in [e5cba3104f] at 2024-01-02 16:50:28 on branch trunk — Update `mpi` to v0.7 (user: fifr size: 15999) [more...]

19
20
21
22
23
24
25
26

27
28
29
30
31
32
33
19
20
21
22
23
24
25

26
27
28
29
30
31
32
33







-
+







use crate::problem::{
    FirstOrderProblem, ResultSender, SubgradientExtender, UpdateSendError, UpdateSender, UpdateState,
};
use crate::{DVector, Minorant, Real};

use log::{debug, error, info};
use mpi::environment::Universe;
use mpi::topology::{Communicator, SystemCommunicator};
use mpi::topology::{Communicator, SimpleCommunicator};
use mpi::Rank;
use serde::{Deserialize, Serialize};
use std::sync::mpsc::{channel, Sender};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use thiserror::Error;

use crate::mpi::msg::{recv_msg, send_msg};
202
203
204
205
206
207
208
209

210
211
212
213
214
215
216
217
218
219
220
221
222

223
224
225
226
227
228
229
202
203
204
205
206
207
208

209
210
211
212
213
214
215
216
217
218
219
220
221

222
223
224
225
226
227
228
229







-
+












-
+







        let nsubs = {
            let mut p = self.problem.write().unwrap();
            p.start();
            p.num_subproblems()
        };

        if self.compute_tx.is_none() {
            let world = SystemCommunicator::world();
            let world = SimpleCommunicator::world();
            let nworkers = world.size() - 1;

            let (compute_tx, compute_rx) = channel::<ClientMessage<P>>();

            // First start one receiver thread for each worker. Each thread gets the result-channels
            // (for the algorithm) via another channel.
            let result_txs = (0..nworkers)
                .map(|client_idx| {
                    let rank = client_idx as Rank + 1;
                    let (req_tx, req_rx) = channel::<(usize, Box<dyn ResultSender<Self>>)>();
                    let compute_tx = compute_tx.clone();
                    thread::spawn(move || {
                        let world = SystemCommunicator::world();
                        let world = SimpleCommunicator::world();
                        let worker = world.process_at_rank(rank);
                        let mut result_txs = (0..nsubs).map(|_| None).collect::<Vec<_>>();
                        loop {
                            let msg = recv_msg(&worker);

                            let index = match msg {
                                ResultMsg::ObjectiveValue { index, .. } => index,
265
266
267
268
269
270
271
272

273
274
275
276
277
278
279
265
266
267
268
269
270
271

272
273
274
275
276
277
278
279







-
+







                    });
                    req_tx
                })
                .collect::<Vec<_>>();

            // This thread forwards computation requests to the workers.
            thread::spawn(move || {
                let world = SystemCommunicator::world();
                let world = SimpleCommunicator::world();
                let mut numjobs = vec![0; nworkers as usize];
                let mut start_times = vec![Instant::now(); nworkers as usize];
                while let Ok(clientmsg) = compute_rx.recv() {
                    match clientmsg {
                        ClientMessage::WorkerTask { msg, result_tx } => match msg {
                            WorkerMsg::Terminate | WorkerMsg::ApplyUpdate(..) => {
                                // send message to all workers