RsBundle  Diff

Differences From Artifact [755c0a7ef7]:

  • File src/mpi/worker.rs — part of check-in [cc34a43d31] at 2023-07-08 10:58:54 on branch mpi — mpi: single worker, multiple evaluations (user: fifr size: 4901)

To Artifact [56026b40be]:

  • File src/mpi/worker.rs — part of check-in [32faf2a516] at 2023-07-08 17:43:17 on branch mpi — mpi::worker: add debug output (user: fifr size: 5314)

87
88
89
90
91
92
93
94
95
96


97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120

121
122


123
124
125
126
127
128
129
130
131

132
133
134
135
136

137
138
139
140
141
142
        let (result_tx, result_rx) = channel();
        let myrank = self.universe.world().rank();

        info!("Start worker process {}", myrank);

        // spawn the thread receiving incoming requests from the host
        thread::scope(|scope| {
            scope.spawn(move || loop {
                let world = SystemCommunicator::world();
                let host = world.process_at_rank(0);


                let msg = recv_msg(&host);

                match msg {
                    WorkerMsg::Terminate => {
                        result_tx.send(ResultMsg::Terminate).unwrap();
                        break;
                    }
                    WorkerMsg::ApplyUpdate(update) => {
                        if let Err(err) = self.problem.apply_update(&update) {
                            send_msg(&host, &ResultMsg::<P::Minorant, _>::Error { index: 0, error: err });
                        }
                    }
                    WorkerMsg::Evaluate { i, y } => {
                        debug!("Evaluate sub:{} on worker:{}", i, myrank);
                        if let Err(err) = self.problem.evaluate(
                            i,
                            y,
                            WorkerResultSender {
                                index: i,
                                sender: result_tx.clone(),
                            },
                        ) {
                            send_msg(&host, &ResultMsg::<P::Minorant, _>::Error { index: i, error: err });
                        }

                    }
                }


            });

            // spawn the thread receiving evaluation results forwarding them to the host
            scope.spawn(move || {
                let world = SystemCommunicator::world();
                let host = world.process_at_rank(0);
                loop {
                    let rmsg = result_rx.recv().expect("channel receive error");
                    debug!("Send result message on {}", myrank);

                    if let ResultMsg::Terminate = rmsg {
                        break;
                    }
                    send_msg(&host, &rmsg);
                }

            });
        });

        info!("Terminate worker process {}", myrank);
    }
}







|


>
>
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>
|
|
>
>









>



<

>






87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140

141
142
143
144
145
146
147
148
        let (result_tx, result_rx) = channel();
        let myrank = self.universe.world().rank();

        info!("Start worker process {}", myrank);

        // spawn the thread receiving incoming requests from the host
        thread::scope(|scope| {
            scope.spawn(move || {
                let world = SystemCommunicator::world();
                let host = world.process_at_rank(0);
                loop {
                    debug!("Worker recv...");
                    let msg = recv_msg(&host);

                    match msg {
                        WorkerMsg::Terminate => {
                            result_tx.send(ResultMsg::Terminate).unwrap();
                            break;
                        }
                        WorkerMsg::ApplyUpdate(update) => {
                            if let Err(err) = self.problem.apply_update(&update) {
                                send_msg(&host, &ResultMsg::<P::Minorant, _>::Error { index: 0, error: err });
                            }
                        }
                        WorkerMsg::Evaluate { i, y } => {
                            debug!("Evaluate sub:{} on worker:{}", i, myrank);
                            if let Err(err) = self.problem.evaluate(
                                i,
                                y,
                                WorkerResultSender {
                                    index: i,
                                    sender: result_tx.clone(),
                                },
                            ) {
                                send_msg(&host, &ResultMsg::<P::Minorant, _>::Error { index: i, error: err });
                            }
                            debug!("Evaluate sub:{} on worker:{} returned", i, myrank);
                        }
                    }
                }
                debug!("Terminate receive thread on worker:{}", myrank);
            });

            // spawn the thread receiving evaluation results forwarding them to the host
            scope.spawn(move || {
                let world = SystemCommunicator::world();
                let host = world.process_at_rank(0);
                loop {
                    let rmsg = result_rx.recv().expect("channel receive error");
                    debug!("Send result message on {}", myrank);
                    send_msg(&host, &rmsg);
                    if let ResultMsg::Terminate = rmsg {
                        break;
                    }

                }
                debug!("Terminate send thread on worker:{}", myrank);
            });
        });

        info!("Terminate worker process {}", myrank);
    }
}