RsBundle  Artifact [76d4d166a8]

Artifact 76d4d166a8aae66a191f149ad96e3a9eec92012e:

  • File src/mcf/problem.rs — part of check-in [be42fa28cd] at 2020-07-18 12:24:34 on branch minorant-primal — Add `primal` field to `Minorant` (user: fifr size: 18719) [more...]

// Copyright (c) 2016-2020 Frank Fischer <frank-fischer@shadow-soft.de>
//
// This program is free software: you can redistribute it and/or
// modify it under the terms of the GNU General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program.  If not, see  <http://www.gnu.org/licenses/>
//

use crate::mcf;
use crate::problem::{
    FirstOrderProblem as ParallelProblem, ResultSender, UpdateSender, UpdateState as ParallelUpdateState,
};
use crate::{DVector, Minorant, Real};

use log::{debug, warn};
use num_traits::Float;
use threadpool::ThreadPool;

use std::f64::INFINITY;
use std::fmt;
use std::fs::File;
use std::io::Read;
use std::result;
use std::sync::{Arc, RwLock};

/// An error in the mmcf file format.
#[derive(Debug)]
pub enum MMCFReadError {
    Format(String),
    MissingNodField(&'static str),
    MissingSupField(&'static str),
    MissingArcField(&'static str),
    MissingMutField(&'static str),
    ExtraNodField,
    ExtraSupField,
    ExtraArcField,
    ExtraMutField,
    InvalidNode(usize, usize),
    InvalidArc(usize, usize),
    InvalidCom(usize, usize),
    InvalidMut(usize, usize),
    Solver(mcf::solver::Error),
    Io(std::io::Error),
}

impl fmt::Display for MMCFReadError {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        use MMCFReadError::*;
        match self {
            Format(msg) => write!(fmt, "Format error: {}", msg),
            MissingNodField(what) => write!(fmt, "Missing value '{}' in line in node file", what),
            MissingSupField(what) => write!(fmt, "Missing value '{}' in line in supply file", what),
            MissingArcField(what) => write!(fmt, "Missing value '{}' in line in arc file", what),
            MissingMutField(what) => write!(fmt, "Missing value '{}' in line in mut file", what),
            ExtraNodField => write!(fmt, "Extra field at the end of line in node file"),
            ExtraSupField => write!(fmt, "Extra field at the end of line in supply file"),
            ExtraArcField => write!(fmt, "Extra field at the end of line in arc file"),
            ExtraMutField => write!(fmt, "Extra field at the end of line in mut file"),
            InvalidNode(u, n) => write!(fmt, "Invalid node number {} (must be in 1..{})", u, n),
            InvalidArc(u, n) => write!(fmt, "Invalid arc number {} (must be in 1..{})", u, n),
            InvalidCom(u, n) => write!(fmt, "Invalid com number {} (must be in 1..{})", u, n),
            InvalidMut(u, n) => write!(fmt, "Invalid mut number {} (must be in 1..{})", u, n),
            Solver(err) => err.fmt(fmt),
            Io(err) => err.fmt(fmt),
        }
    }
}

impl std::error::Error for MMCFReadError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        use MMCFReadError::*;
        match self {
            Solver(err) => Some(err),
            Io(err) => Some(err),
            _ => None,
        }
    }
}

impl From<mcf::solver::Error> for MMCFReadError {
    fn from(err: mcf::solver::Error) -> MMCFReadError {
        MMCFReadError::Solver(err)
    }
}

impl From<std::io::Error> for MMCFReadError {
    fn from(err: std::io::Error) -> MMCFReadError {
        MMCFReadError::Io(err)
    }
}

impl From<std::num::ParseIntError> for MMCFReadError {
    fn from(err: std::num::ParseIntError) -> MMCFReadError {
        MMCFReadError::Format(format!("Parse error: {}", err))
    }
}

impl From<std::num::ParseFloatError> for MMCFReadError {
    fn from(err: std::num::ParseFloatError) -> MMCFReadError {
        MMCFReadError::Format(format!("Parse error: {}", err))
    }
}

/// Type of errors of the MMCFProblem.
pub type Error = crate::mcf::solver::Error;

/// Result type of the MMCFProblem.
pub type Result<T> = result::Result<T, Error>;

#[derive(Clone, Copy, Debug)]
struct ArcInfo {
    arc: usize,
    src: usize,
    snk: usize,
}

#[derive(Clone, Copy, Debug)]
struct Elem {
    ind: usize,
    val: Real,
}

/// A single MMCF subproblem, i.e. one network.
struct Subproblem {
    net: mcf::Solver,
    lhs: Vec<Vec<Elem>>,
    /// The right-hand side ... might be empty.
    rhs: DVector,
    cbase: DVector,
    c: DVector,
}

unsafe impl Send for Subproblem {}
unsafe impl Sync for Subproblem {}

pub struct MMCFProblem {
    pub multimodel: bool,

    subs: Vec<Arc<RwLock<Subproblem>>>,
    active_constraints: Arc<RwLock<Vec<usize>>>,
    inactive_constraints: Arc<RwLock<Vec<usize>>>,
    pool: Option<ThreadPool>,
}

impl Subproblem {
    fn evaluate<I>(&mut self, y: &[Real], active: I) -> Result<(Real, DVector, DVector)>
    where
        I: IntoIterator<Item = usize>,
        I::IntoIter: Clone,
    {
        let active = active.into_iter();
        // lhs and rhs in correct order
        let lhs = &self.lhs;
        let lhs = active.clone().map(|i| &lhs[i]);

        // compute costs
        self.c.clear();
        self.c.extend(self.cbase.iter());
        for (y, lhs) in y.iter().zip(lhs.clone()) {
            for elem in lhs {
                self.c[elem.ind] += y * elem.val;
            }
        }

        debug!("y={:?}", y);
        debug!("c={}", self.c);
        self.net.set_objective(&self.c)?;

        // solve subproblem
        self.net.solve()?;
        debug!("obj={}", self.net.objective()?);

        // compute minorant
        let (objective, mut subg) = if self.rhs.len() > 0 {
            let rhs = active.map(|i| self.rhs[i]);
            let rhsval = y.iter().zip(rhs.clone()).map(|(y, r)| y * r).sum::<Real>();
            (rhsval - self.net.objective()?, rhs.collect())
        } else {
            (-self.net.objective()?, dvec![0.0; y.len()])
        };

        let sol = self.net.get_solution()?;
        for (i, lhs) in lhs.enumerate() {
            subg[i] -= lhs.iter().map(|elem| elem.val * sol[elem.ind]).sum::<Real>();
        }

        Ok((objective, subg, sol))
    }

    fn evaluate_constraint(&self, primal: &DVector, cidx: usize) -> Real {
        let rhs = self.rhs.get(cidx).unwrap_or(&0.0);
        let lhs = self.lhs[cidx]
            .iter()
            .map(|elem| elem.val * primal[elem.ind])
            .sum::<Real>();
        rhs - lhs
    }
}

impl MMCFProblem {
    pub fn num_subproblems(&self) -> usize {
        self.subs.len()
    }

    pub fn read_mnetgen(basename: &str) -> std::result::Result<MMCFProblem, MMCFReadError> {
        use MMCFReadError::*;

        let mut buffer = String::new();
        {
            let mut f = File::open(&format!("{}.nod", basename))?;
            f.read_to_string(&mut buffer)?;
        }

        let ncom;
        let nnodes;
        let narcs;
        let ncaps;

        {
            let mut data = buffer.split_whitespace();
            ncom = data.next().ok_or_else(|| MissingNodField("ncom"))?.parse::<usize>()?;
            nnodes = data.next().ok_or_else(|| MissingNodField("nnodes"))?.parse::<usize>()?;
            narcs = data.next().ok_or_else(|| MissingNodField("narcs"))?.parse::<usize>()?;
            ncaps = data.next().ok_or_else(|| MissingNodField("ncaps"))?.parse::<usize>()?;
            if data.next().is_some() {
                return Err(ExtraNodField);
            }
        }

        // read nodes
        let mut nets = Vec::with_capacity(ncom);
        for _ in 0..ncom {
            nets.push(mcf::Solver::new(nnodes)?)
        }
        {
            let mut f = File::open(&format!("{}.sup", basename))?;
            buffer.clear();
            f.read_to_string(&mut buffer)?;
        }
        for line in buffer.lines() {
            let mut data = line.split_whitespace();
            let node = data.next().ok_or_else(|| MissingSupField("node"))?.parse::<usize>()?;
            let com = data.next().ok_or_else(|| MissingSupField("com"))?.parse::<usize>()?;
            let supply = data.next().ok_or_else(|| MissingSupField("supply"))?.parse::<Real>()?;
            if data.next().is_some() {
                return Err(ExtraSupField);
            }
            if node < 1 || node > nnodes {
                return Err(InvalidNode(node, nnodes));
            }
            if com < 1 || com > ncom {
                return Err(InvalidCom(com, ncom));
            }
            nets[com - 1].set_balance(node - 1, supply)?;
        }

        // read arcs
        let mut arcmap = vec![vec![]; ncom];
        let mut cbase = vec![dvec![]; ncom];

        // lhs nonzeros
        let mut lhsidx = vec![vec![vec![]; ncom]; ncaps];
        {
            let mut f = File::open(&format!("{}.arc", basename))?;
            buffer.clear();
            f.read_to_string(&mut buffer)?;
        }
        for line in buffer.lines() {
            let mut data = line.split_whitespace();
            let arc = data.next().ok_or_else(|| MissingArcField("arc"))?.parse::<usize>()?;
            let src = data.next().ok_or_else(|| MissingArcField("src"))?.parse::<usize>()?;
            let snk = data.next().ok_or_else(|| MissingArcField("snk"))?.parse::<usize>()?;
            let com = data.next().ok_or_else(|| MissingArcField("com"))?.parse::<usize>()?;
            let cost = data.next().ok_or_else(|| MissingArcField("cost"))?.parse::<Real>()?;
            let cap = data.next().ok_or_else(|| MissingArcField("cap"))?.parse::<Real>()?;
            let mt = data.next().ok_or_else(|| MissingArcField("mt"))?.parse::<usize>()?;
            if data.next().is_some() {
                return Err(ExtraArcField);
            }
            if arc < 1 || arc > narcs {
                return Err(InvalidArc(arc, narcs));
            }
            if src < 1 || src > nnodes {
                return Err(InvalidNode(src, nnodes));
            }
            if snk < 1 || snk > nnodes {
                return Err(InvalidNode(snk, nnodes));
            }
            if com < 1 || com > ncom {
                return Err(InvalidCom(com, ncom));
            }
            if mt > ncaps {
                return Err(InvalidMut(mt, ncaps));
            }
            let com = com - 1;
            // set internal coeff
            let coeff = arcmap[com].len();
            arcmap[com].push(ArcInfo { arc, src, snk });
            // add arc
            nets[com].add_arc(src - 1, snk - 1, cost, if cap < 0.0 { INFINITY } else { cap })?;
            // set objective
            cbase[com].push(cost); // + 1e-6 * coeff
                                   // add to mutual capacity constraint
            if mt > 0 {
                lhsidx[(mt - 1) as usize][com].push(coeff);
            }
        }

        // read rhs of coupling constraints
        {
            let mut f = File::open(&format!("{}.mut", basename))?;
            buffer.clear();
            f.read_to_string(&mut buffer)?;
        }
        let mut rhs = dvec![0.0; ncaps];
        for line in buffer.lines() {
            let mut data = line.split_whitespace();
            let mt = data.next().ok_or_else(|| MissingMutField("mt"))?.parse::<usize>()?;
            let cap = data.next().ok_or_else(|| MissingMutField("cap"))?.parse::<Real>()?;
            if data.next().is_some() {
                return Err(ExtraMutField);
            }
            if mt < 1 || mt > ncaps {
                return Err(InvalidMut(mt, ncaps));
            }
            rhs[mt - 1] = cap;
        }

        // set lhs
        let lhs = (0..ncom)
            .map(|fidx| {
                (0..ncaps)
                    .map(|i| lhsidx[i][fidx].iter().map(|&j| Elem { ind: j, val: 1.0 }).collect())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        let subproblems = nets
            .into_iter()
            .zip(cbase)
            .zip(lhs)
            .zip(std::iter::once(rhs).chain(std::iter::repeat(dvec![])))
            .map(|(((net, cbase), lhs), rhs)| Subproblem {
                net,
                cbase,
                c: dvec![],
                lhs,
                rhs,
            })
            .map(RwLock::new)
            .map(Arc::new)
            .collect();

        Ok(MMCFProblem {
            multimodel: false,
            subs: subproblems,
            active_constraints: Arc::new(RwLock::new((0..ncaps).collect())),
            inactive_constraints: Arc::new(RwLock::new(vec![])),
            pool: None,
        })
    }

    /// Mark all constraints as being separated.
    pub fn set_separate_constraints(&mut self, separate: bool) {
        let nconstrs = self.subs[0].read().unwrap().lhs.len();
        if separate {
            self.active_constraints.write().unwrap().clear();
            self.inactive_constraints = Arc::new(RwLock::new((0..nconstrs).collect()));
        } else {
            self.active_constraints = Arc::new(RwLock::new((0..nconstrs).collect()));
            self.inactive_constraints.write().unwrap().clear();
        }
    }

    /// Compute costs for a primal solution.
    pub fn get_primal_costs(&self, fidx: usize, primals: &[DVector]) -> Real {
        let sub = self.subs[fidx].read().unwrap();
        if self.multimodel {
            primals[0].iter().enumerate().map(|(i, x)| x * sub.cbase[i]).sum()
        } else {
            let mut sum = 0.0;
            for p in primals {
                for (i, x) in p.iter().enumerate() {
                    sum += x * sub.cbase[i];
                }
            }
            sum
        }
    }

    /// Aggregate primal vectors.
    pub fn aggregate_primals_ref(&self, primals: &[(Real, &Vec<DVector>)]) -> Vec<DVector> {
        let mut aggr = primals[0]
            .1
            .iter()
            .map(|x| {
                let mut r = dvec![];
                r.scal(primals[0].0, x);
                r
            })
            .collect::<Vec<_>>();

        for &(alpha, primal) in &primals[1..] {
            for (j, x) in primal.iter().enumerate() {
                aggr[j].add_scaled(alpha, x);
            }
        }

        aggr
    }
}

impl ParallelProblem for MMCFProblem {
    type Err = Error;

    type Primal = DVector;

    fn num_variables(&self) -> usize {
        self.active_constraints.read().unwrap().len()
    }

    fn lower_bounds(&self) -> Option<Vec<Real>> {
        Some(vec![0.0; self.num_variables()])
    }

    fn num_subproblems(&self) -> usize {
        self.subs.len()
    }

    fn start(&mut self) {
        let pool = ThreadPool::new(4);
        self.pool = Some(pool);
    }

    fn stop(&mut self) {
        self.pool.take();
    }

    fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<()>
    where
        S: ResultSender<Self> + 'static,
    {
        if self.pool.is_none() {
            self.start()
        }
        let sub = self.subs[i].clone();
        // Attention: `y` might be shorter than the set of active constraints
        // because evaluation and problem update are not synchronized, i.e. the
        // evaluation may still use an older model with less constraints.
        let active_constraints = self.active_constraints.read().unwrap()[0..y.len()].to_vec();
        self.pool
            .as_ref()
            .unwrap()
            .execute(move || match sub.write().unwrap().evaluate(&y, active_constraints) {
                Ok((objective, subg, primal)) => {
                    tx.objective(objective).unwrap();
                    tx.minorant(Minorant {
                        constant: objective,
                        linear: subg,
                        primal,
                    })
                    .unwrap();
                }
                Err(err) => tx.error(err).unwrap(),
            });
        Ok(())
    }

    fn update<U, S>(&mut self, state: U, tx: S) -> Result<()>
    where
        U: ParallelUpdateState<Self::Primal>,
        S: UpdateSender<Self> + 'static,
    {
        if self.inactive_constraints.read().unwrap().is_empty() {
            return Ok(());
        }

        let inactive_constraints = self.inactive_constraints.clone();
        let active_constraints = self.active_constraints.clone();
        let nold = self.active_constraints.read().unwrap().len();
        let subs = self.subs.clone();

        self.pool.as_ref().unwrap().execute(move || {
            let newconstraints = inactive_constraints
                .read()
                .unwrap()
                .iter()
                .map(|&cidx| {
                    subs.iter()
                        .enumerate()
                        .map(|(fidx, sub)| {
                            let primal = state.aggregated_primal(fidx);
                            sub.read().unwrap().evaluate_constraint(&primal, cidx)
                        })
                        .sum::<Real>()
                })
                .enumerate()
                .filter_map(|(i, sg)| if sg < 1e-3 { Some(i) } else { None })
                .collect::<Vec<_>>();

            let mut inactive = inactive_constraints.write().unwrap();
            let mut active = active_constraints.write().unwrap();
            active.extend(newconstraints.into_iter().rev().map(|i| inactive.swap_remove(i)));

            // *** The following code needs `drain_filter`, which is experimental as
            // of rust 1.36 ***

            // active
            //     .extend(inactive.drain_filter(|&mut cidx| {
            //         subs.iter()
            //             .enumerate()
            //             .map(|(fidx, sub)| {
            //                 let primal = state.aggregated_primal(fidx);
            //                 sub.read().unwrap().evaluate_constraint(&primal, cidx)
            //             })
            //             .sum::<Real>() < 1e-3
            //     }));

            let nnew = active.len() - nold;
            if nnew > 0 {
                let actives = active.clone();
                if let Err(err) = tx.add_variables(
                    vec![(0.0, Real::infinity()); nnew],
                    Box::new(move |fidx: usize, primal: &DVector, inds: &[usize]| {
                        let sub = subs[fidx].read().unwrap();
                        Ok(inds
                            .iter()
                            .map(|&i| sub.evaluate_constraint(primal, actives[i]))
                            .collect())
                    }),
                ) {
                    warn!("Error sending problem update: {}", err);
                }
            }
        });

        Ok(())
    }
}