RsBundle  Artifact [e79992a54d]

Artifact e79992a54dc4aeac95af5ba76c0442029bc9426c:

  • File src/mcf/problem.rs — part of check-in [d02e699a01] at 2023-04-05 21:24:04 on branch mpi — Fix many clippy/linter warnings (user: fifr size: 20996)

// Copyright (c) 2016-2023 Frank Fischer <frank-fischer@shadow-soft.de>
//
// This program is free software: you can redistribute it and/or
// modify it under the terms of the GNU General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program.  If not, see  <http://www.gnu.org/licenses/>
//

use crate::mcf::{self, Solver};
#[cfg(feature = "mpi")]
use crate::mpi::DistributedFirstOrderProblem;
use crate::problem::{
    FirstOrderProblem as ParallelProblem, ResultSender, UpdateSender, UpdateState as ParallelUpdateState,
};
use crate::{DVector, Minorant, Real};

use log::{debug, warn};
use num_traits::Float;
use thiserror::Error;
use threadpool::ThreadPool;

use std::fs::File;
use std::io::{BufRead, BufReader};
use std::result;

use std::sync::{Arc, RwLock};
use std::time::Duration;

/// An error in the mmcf file format.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum MMCFReadError {
    #[error("format error")]
    Format(#[source] Box<dyn std::error::Error>),
    #[error("missing value '{0}' in line in node file")]
    MissingNodField(&'static str),
    #[error("missing value '{0}' in line in supply file")]
    MissingSupField(&'static str),
    #[error("missing value '{0}' in line in arc file")]
    MissingArcField(&'static str),
    #[error("missing value '{0}' in line in mut file")]
    MissingMutField(&'static str),
    #[error("extra field at the end of line in node file")]
    ExtraNodField,
    #[error("extra field at the end of line in supply file")]
    ExtraSupField,
    #[error("extra field at the end of line in arc file")]
    ExtraArcField,
    #[error("extra field at the end of line in mut file")]
    ExtraMutField,
    #[error("invalid node number {0} (must be in 1..{1})")]
    InvalidNode(usize, usize),
    #[error("invalid arc number {0} (must be in 1..{1})")]
    InvalidArc(usize, usize),
    #[error("invalid com number {0} (must be in 1..{1})")]
    InvalidCom(usize, usize),
    #[error("invalid mut number {0} (must be in 1..{1})")]
    InvalidMut(usize, usize),
    #[error("error in MCF solver")]
    Solver(#[from] crate::mcf::solver::Error),
    #[error("io error")]
    Io(#[from] std::io::Error),
}

impl From<std::num::ParseIntError> for MMCFReadError {
    fn from(err: std::num::ParseIntError) -> MMCFReadError {
        MMCFReadError::Format(Box::new(err))
    }
}

impl From<std::num::ParseFloatError> for MMCFReadError {
    fn from(err: std::num::ParseFloatError) -> MMCFReadError {
        MMCFReadError::Format(Box::new(err))
    }
}

/// Type of errors of the MMCFProblem.
pub type Error = crate::mcf::solver::Error;

/// Result type of the MMCFProblem.
pub type Result<T> = result::Result<T, Error>;

/// Data of a single arc read from an instance file.
#[derive(Clone, Copy, Debug)]
#[allow(dead_code)]
struct ArcInfo {
    /// The number of this arc
    arc: usize,
    /// The source node number
    src: usize,
    /// The sink node number
    snk: usize,
}

/// A single coefficient in a linear coupling constraint
#[derive(Clone, Copy, Debug)]
struct Elem {
    /// The variable (arc) index
    ind: usize,
    /// The coefficient
    val: Real,
}

/// A single MMCF subproblem, i.e. one network.
struct Subproblem {
    /// The (net, cost) pair.
    net: Box<dyn Solver>,
    c: DVector,
    /// optional delay for evaluation
    delay: Option<Duration>,
}

/// Constraint data of one subproblem.
///
/// This information is used to create the augmented objective function, i.e.
/// cbase - lhs'* y as well as the constant term rhs' * y
struct SubData {
    /// The lhs of all constraints. For each constraint this is a list of coefficients.
    lhs: Vec<Vec<Elem>>,
    /// The right-hand side of each constraint
    rhs: DVector,
    /// The base costs of each arc
    cbase: DVector,
}

unsafe impl Send for Subproblem {}
unsafe impl Sync for Subproblem {}

/// Configuration options for MMCFProblem
#[derive(Default)]
pub struct MMCFConfig {
    /// If coupling constraints should be separated.
    pub separate_constraints: bool,
    /// A list of subproblems to be delayed.
    pub delayed_subs: Vec<usize>,
    /// Delay
    pub delay: Duration,
}

/// A single multi-commodity flow problem instance.
///
/// The current implementation always keeps a list of all potential
/// coupling constraints but, depending on the separation choice, the
/// may not be in the model. If separation is used a list of "active"
/// constraints is maintained holding the list of those constraints
/// that are in the model. "inactive" constraints may be separated
/// later on.
pub struct MMCFProblem {
    /// The list of subproblems (single network flows)
    subs: Vec<Arc<RwLock<Subproblem>>>,
    /// The coupling constraints for each subproblem.
    subdatas: Arc<Vec<SubData>>,
    /// The list of currently active constraints
    active_constraints: Arc<RwLock<Vec<usize>>>,
    /// The list of inactive constraints
    inactive_constraints: Arc<RwLock<Vec<usize>>>,
    /// The thread pool for parallel evaluation
    pool: Option<ThreadPool>,
}

impl Subproblem {
    fn evaluate<I>(&mut self, data: &SubData, y: &[Real], active: I) -> Result<(Real, DVector, DVector)>
    where
        I: IntoIterator<Item = usize>,
        I::IntoIter: Clone,
    {
        let active = active.into_iter();
        // lhs and rhs in correct order
        let lhs = &data.lhs;
        let lhs = active.clone().map(|i| &lhs[i]);

        // compute costs
        self.c.clear();
        self.c.extend(data.cbase.iter());
        for (y, lhs) in y.iter().zip(lhs.clone()) {
            for elem in lhs {
                self.c[elem.ind] += y * elem.val;
            }
        }

        debug!("y={:?}", y);
        debug!("c={}", self.c);

        // solve subproblem
        self.net.set_objective(&self.c)?;
        self.net.solve()?;
        debug!("obj={}", self.net.objective()?);

        // compute minorant
        let (objective, mut subg) = if data.rhs.len() > 0 {
            let rhs = active.map(|i| data.rhs[i]);
            let rhsval = y.iter().zip(rhs.clone()).map(|(y, r)| y * r).sum::<Real>();
            (rhsval - self.net.objective()?, rhs.collect())
        } else {
            (-self.net.objective()?, dvec![0.0; y.len()])
        };

        let sol = self.net.get_solution()?;
        for (i, lhs) in lhs.enumerate() {
            subg[i] -= lhs.iter().map(|elem| elem.val * sol[elem.ind]).sum::<Real>();
        }

        if let Some(delay) = self.delay {
            std::thread::sleep(delay);
        }

        Ok((objective, subg, sol))
    }
}

impl SubData {
    fn evaluate_constraint(&self, primal: &DVector, cidx: usize) -> Real {
        let rhs = self.rhs.get(cidx).unwrap_or(&0.0);
        let lhs = self.lhs[cidx]
            .iter()
            .map(|elem| elem.val * primal[elem.ind])
            .sum::<Real>();
        rhs - lhs
    }
}

impl MMCFProblem {
    pub fn num_subproblems(&self) -> usize {
        self.subs.len()
    }

    pub fn read_mnetgen(basename: &str, config: MMCFConfig) -> result::Result<MMCFProblem, MMCFReadError> {
        MMCFProblem::read_mnetgen_with_solver::<mcf::NetSpxSolver>(basename, config)
    }

    pub fn read_mnetgen_with_solver<S>(basename: &str, config: MMCFConfig) -> result::Result<MMCFProblem, MMCFReadError>
    where
        S: Solver + 'static,
    {
        use MMCFReadError::*;

        let ncom;
        let nnodes;
        let narcs;
        let ncaps;

        {
            let mut data = File::open(&format!("{}.nod", basename)).map(BufReader::new)?.lines();
            ncom = data.next().ok_or(MissingNodField("ncom"))??.parse::<usize>()?;
            nnodes = data.next().ok_or(MissingNodField("nnodes"))??.parse::<usize>()?;
            narcs = data.next().ok_or(MissingNodField("narcs"))??.parse::<usize>()?;
            ncaps = data.next().ok_or(MissingNodField("ncaps"))??.parse::<usize>()?;
            if data.next().is_some() {
                return Err(ExtraNodField);
            }
        }

        // read nodes
        let mut nets = Vec::with_capacity(ncom);
        for i in 0..ncom {
            nets.push(S::new(i, nnodes)?)
        }
        for line in File::open(&format!("{}.sup", basename)).map(BufReader::new)?.lines() {
            let line = line?;
            let mut data = line.split_whitespace();
            let node = data.next().ok_or(MissingSupField("node"))?.parse::<usize>()?;
            let com = data.next().ok_or(MissingSupField("com"))?.parse::<usize>()?;
            let supply = data.next().ok_or(MissingSupField("supply"))?.parse::<Real>()?;
            if data.next().is_some() {
                return Err(ExtraSupField);
            }
            if node < 1 || node > nnodes {
                return Err(InvalidNode(node, nnodes));
            }
            if com < 1 || com > ncom {
                return Err(InvalidCom(com, ncom));
            }
            nets[com - 1].set_balance(node - 1, supply)?;
        }

        // read arcs
        let mut arcmap = vec![vec![]; ncom];
        let mut cbase = vec![dvec![]; ncom];

        // lhs nonzeros
        let mut lhsidx = vec![vec![vec![]; ncom]; ncaps];
        for line in File::open(&format!("{}.arc", basename)).map(BufReader::new)?.lines() {
            let line = line?;
            let mut data = line.split_whitespace();
            let arc = data.next().ok_or(MissingArcField("arc"))?.parse::<usize>()?;
            let src = data.next().ok_or(MissingArcField("src"))?.parse::<usize>()?;
            let snk = data.next().ok_or(MissingArcField("snk"))?.parse::<usize>()?;
            let com = data.next().ok_or(MissingArcField("com"))?.parse::<usize>()?;
            let cost = data.next().ok_or(MissingArcField("cost"))?.parse::<Real>()?;
            let cap = data.next().ok_or(MissingArcField("cap"))?.parse::<Real>()?;
            let mt = data.next().ok_or(MissingArcField("mt"))?.parse::<usize>()?;
            if data.next().is_some() {
                return Err(ExtraArcField);
            }
            if arc < 1 || arc > narcs {
                return Err(InvalidArc(arc, narcs));
            }
            if src < 1 || src > nnodes {
                return Err(InvalidNode(src, nnodes));
            }
            if snk < 1 || snk > nnodes {
                return Err(InvalidNode(snk, nnodes));
            }
            if com < 1 || com > ncom {
                return Err(InvalidCom(com, ncom));
            }
            if mt > ncaps {
                return Err(InvalidMut(mt, ncaps));
            }
            let com = com - 1;
            // set internal coeff
            let coeff = arcmap[com].len();
            arcmap[com].push(ArcInfo { arc, src, snk });
            // add arc
            nets[com].add_arc(src - 1, snk - 1, cost, if cap < 0.0 { f64::INFINITY } else { cap })?;
            // set objective
            cbase[com].push(cost); // + 1e-6 * coeff
                                   // add to mutual capacity constraint
            if mt > 0 {
                lhsidx[(mt - 1)][com].push(coeff);
            }
        }

        // read rhs of coupling constraints
        let mut rhs = dvec![0.0; ncaps];
        for line in File::open(&format!("{}.mut", basename)).map(BufReader::new)?.lines() {
            let line = line?;
            let mut data = line.split_whitespace();
            let mt = data.next().ok_or(MissingMutField("mt"))?.parse::<usize>()?;
            let cap = data.next().ok_or(MissingMutField("cap"))?.parse::<Real>()?;
            if data.next().is_some() {
                return Err(ExtraMutField);
            }
            if mt < 1 || mt > ncaps {
                return Err(InvalidMut(mt, ncaps));
            }
            rhs[mt - 1] = cap;
        }

        // set lhs
        let lhs = (0..ncom)
            .map(|fidx| {
                (0..ncaps)
                    .map(|i| lhsidx[i][fidx].iter().map(|&j| Elem { ind: j, val: 1.0 }).collect())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        let subdatas = Arc::new(
            cbase
                .into_iter()
                .zip(lhs)
                .zip(std::iter::once(rhs).chain(std::iter::repeat(dvec![])))
                .map(|((cbase, lhs), rhs)| SubData { lhs, rhs, cbase })
                .collect(),
        );

        let subproblems = nets
            .into_iter()
            .enumerate()
            .map(|(i, net)| Subproblem {
                net: Box::new(net),
                c: dvec![],
                delay: if config.delayed_subs.contains(&i) {
                    Some(config.delay)
                } else {
                    None
                },
            })
            .map(RwLock::new)
            .map(Arc::new)
            .collect();

        let mut mcf = MMCFProblem {
            subs: subproblems,
            subdatas,
            active_constraints: Arc::new(RwLock::new((0..ncaps).collect())),
            inactive_constraints: Arc::new(RwLock::new(vec![])),
            pool: None,
        };
        mcf.set_separate_constraints(config.separate_constraints);
        Ok(mcf)
    }

    /// Set whether coupling constraints should be separated.
    ///
    /// If `separate` is true than all constraints will be made
    /// inactive, otherwise all constraints will be made active.
    pub fn set_separate_constraints(&mut self, separate: bool) {
        let nconstrs = self.subdatas[0].lhs.len();
        if separate {
            self.active_constraints.write().unwrap().clear();
            self.inactive_constraints = Arc::new(RwLock::new((0..nconstrs).collect()));
        } else {
            self.active_constraints = Arc::new(RwLock::new((0..nconstrs).collect()));
            self.inactive_constraints.write().unwrap().clear();
        }
    }

    /// Compute costs for a primal solution.
    pub fn get_primal_costs(&self, fidx: usize, primals: &DVector) -> Real {
        let sub = &self.subdatas[fidx];
        primals.iter().enumerate().map(|(i, x)| x * sub.cbase[i]).sum()
    }

    /// Aggregate primal vectors.
    pub fn aggregate_primals_ref(&self, primals: &[(Real, &Vec<DVector>)]) -> Vec<DVector> {
        let mut aggr = primals[0]
            .1
            .iter()
            .map(|x| {
                let mut r = dvec![];
                r.scal(primals[0].0, x);
                r
            })
            .collect::<Vec<_>>();

        for &(alpha, primal) in &primals[1..] {
            for (j, x) in primal.iter().enumerate() {
                aggr[j].add_scaled(alpha, x);
            }
        }

        aggr
    }

    fn create_update<U>(&self, state: U, apply: impl FnOnce(Vec<usize>) -> Result<()> + Send + 'static) -> Result<bool>
    where
        U: ParallelUpdateState<DVector>,
    {
        if self.inactive_constraints.read().unwrap().is_empty() {
            return Ok(false);
        }

        let subdatas = self.subdatas.clone();
        let inactive_constraints = self.inactive_constraints.clone();

        self.pool.as_ref().unwrap().execute(move || {
            let newconstraints = {
                inactive_constraints
                    .read()
                    .unwrap()
                    .iter()
                    .map(|&cidx| {
                        subdatas
                            .iter()
                            .enumerate()
                            .map(|(fidx, sub)| {
                                let primal = state.aggregated_primal(fidx);
                                sub.evaluate_constraint(primal, cidx)
                            })
                            .sum::<Real>()
                    })
                    .enumerate()
                    .filter_map(|(i, sg)| if sg < 1e-3 { Some(i) } else { None })
                    .collect::<Vec<_>>()
            };

            if !newconstraints.is_empty() {
                apply(newconstraints).unwrap();
            }
        });

        Ok(true)
    }

    fn apply_update(
        update: &[usize],
        active: Arc<RwLock<Vec<usize>>>,
        inactive: Arc<RwLock<Vec<usize>>>,
    ) -> result::Result<(), Error> {
        let mut inactive = inactive.write().unwrap();
        let mut active = active.write().unwrap();
        active.extend(update.iter().rev().map(|&i| inactive.swap_remove(i)));
        Ok(())
    }

    fn send_update<S>(
        subdatas: Arc<Vec<SubData>>,
        actives: Arc<RwLock<Vec<usize>>>,
        update: &[usize],
        tx: S,
    ) -> Result<()>
    where
        S: UpdateSender<Self> + 'static,
    {
        let nnew = update.len();
        let actives = actives.read().unwrap().clone();
        if let Err(err) = tx.add_variables(
            vec![(0.0, Real::infinity()); nnew],
            Box::new(move |fidx: usize, m: &mut (Real, DVector, DVector)| {
                let sub = &subdatas[fidx];
                let n = m.dim();
                let primal = &m.2;
                m.1.extend((n..actives.len()).map(|i| sub.evaluate_constraint(primal, actives[i])));
                Ok(())
            }),
        ) {
            warn!("Error sending problem update: {}", err);
        }

        Ok(())
    }
}

impl ParallelProblem for MMCFProblem {
    type Err = Error;

    type Minorant = (Real, DVector, DVector);

    fn num_variables(&self) -> usize {
        self.active_constraints.read().unwrap().len()
    }

    fn lower_bounds(&self) -> Option<Vec<Real>> {
        Some(vec![0.0; self.num_variables()])
    }

    fn num_subproblems(&self) -> usize {
        self.subs.len()
    }

    fn start(&mut self) {
        let pool = ThreadPool::new(4);
        self.pool = Some(pool);
    }

    fn stop(&mut self) {
        self.pool.take();
    }

    fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<()>
    where
        S: ResultSender<Self> + 'static,
    {
        if self.pool.is_none() {
            self.start()
        }
        let sub = self.subs[i].clone();
        let subdatas = self.subdatas.clone();
        // Attention: `y` might be shorter than the set of active constraints
        // because evaluation and problem update are not synchronized, i.e. the
        // evaluation may still use an older model with less constraints.
        let active_constraints = self.active_constraints.read().unwrap()[0..y.len()].to_vec();
        self.pool.as_ref().unwrap().execute(move || {
            let subdata = &subdatas[i];
            match sub.write().unwrap().evaluate(subdata, &y, active_constraints) {
                Ok((objective, subg, primal)) => {
                    tx.objective(objective).unwrap();
                    tx.minorant((objective, subg, primal)).unwrap();
                }
                Err(err) => tx.error(err).unwrap(),
            }
        });
        Ok(())
    }

    fn update<U, S>(&mut self, state: U, tx: S) -> Result<()>
    where
        U: ParallelUpdateState<<Self::Minorant as Minorant>::Primal>,
        S: UpdateSender<Self> + 'static,
    {
        if self.inactive_constraints.read().unwrap().is_empty() {
            return Ok(());
        }

        let inactive_constraints = self.inactive_constraints.clone();
        let active_constraints = self.active_constraints.clone();
        let subdatas = self.subdatas.clone();
        MMCFProblem::create_update(self, state, move |update| {
            MMCFProblem::apply_update(&update, active_constraints.clone(), inactive_constraints)?;
            MMCFProblem::send_update(subdatas, active_constraints, &update, tx)?;
            Ok(())
        })?;

        Ok(())
    }
}

impl DistributedFirstOrderProblem for MMCFProblem {
    type Update = Vec<usize>;

    fn create_update<U>(
        &self,
        state: U,
        apply: impl FnOnce(Self::Update) -> result::Result<(), Self::Err> + Send + 'static,
    ) -> result::Result<bool, Self::Err>
    where
        U: ParallelUpdateState<<Self::Minorant as Minorant>::Primal>,
    {
        MMCFProblem::create_update(self, state, apply)
    }

    fn apply_update(&mut self, update: &Self::Update) -> result::Result<(), Self::Err> {
        MMCFProblem::apply_update(
            update,
            self.active_constraints.clone(),
            self.inactive_constraints.clone(),
        )
    }

    fn send_update<S>(&self, update: &Self::Update, tx: S) -> Result<()>
    where
        S: UpdateSender<Self> + 'static,
        Self: Sized,
    {
        MMCFProblem::send_update(self.subdatas.clone(), self.active_constraints.clone(), update, tx)
    }
}

unsafe impl Send for MMCFProblem {}
unsafe impl Sync for MMCFProblem {}