ENH: improve suspend/resume handling of Pstream profiling (#2737)

- allow reporting even when profiling is suspended

- consolidate reporting into profilingPstream itself
  (avoids code scatter).

Example of possible advanced use for timing only one section of
code:

    ====
    // Profile local operations
    profilingPstream::enable();

    ... do something

    // Don't profile elsewhere
    profilingPstream::suspend();
    ====
This commit is contained in:
Mark Olesen
2023-04-05 15:11:18 +02:00
parent 9577a0f6b5
commit 5de59417f8
4 changed files with 460 additions and 416 deletions

View File

@ -26,6 +26,9 @@ License
\*---------------------------------------------------------------------------*/
#include "profilingPstream.H"
#include "List.H"
#include "Tuple2.H"
#include "UPstream.H"
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
@ -37,37 +40,16 @@ Foam::profilingPstream::timingList Foam::profilingPstream::times_(double(0));
Foam::profilingPstream::countList Foam::profilingPstream::counts_(uint64_t(0));
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::profilingPstream::profilingPstream()
{
enable();
}
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::profilingPstream::~profilingPstream()
{
disable();
}
// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
void Foam::profilingPstream::enable()
{
if (timer_)
{
timer_->resetCpuTime(); // Not necessarily required ...
}
else
if (!timer_)
{
timer_.reset(new cpuTime);
times_ = double(0);
counts_ = uint64_t(0);
}
suspend_ = false;
}
@ -79,6 +61,13 @@ void Foam::profilingPstream::disable() noexcept
}
void Foam::profilingPstream::reset()
{
times_ = double(0);
counts_ = uint64_t(0);
}
double Foam::profilingPstream::elapsedTime()
{
double total = 0;
@ -91,4 +80,412 @@ double Foam::profilingPstream::elapsedTime()
}
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
namespace Foam
{
// Loop over all values (with striding) and extract the value at given index
template<class Type>
inline static void extractValues
(
UList<Type>& result,
const int index,
const UList<Type>& allValues
)
{
if (result.empty())
{
return;
}
const label numProc = result.size();
const Type* values = allValues.cbegin();
const label stride = allValues.size() / numProc;
if (!values || !stride)
{
result = Type(0);
return;
}
for (label proci = 0; proci < numProc; ++proci, values += stride)
{
result[proci] = values[index];
}
}
// Loop over all values (with striding) and extract combined value
// using the given unary function
template<class Type, class Extract>
inline static void extractValues
(
UList<Type>& result,
const UList<Type>& allValues,
const Extract& extract
)
{
if (result.empty())
{
return;
}
const label numProc = result.size();
const Type* values = allValues.cbegin();
const label stride = allValues.size() / numProc;
if (!values || !stride)
{
result = Type(0);
return;
}
for (label proci = 0; proci < numProc; ++proci, values += stride)
{
result[proci] = extract(values);
}
}
inline static void printTimingDetail(const UList<double>& values)
{
const label numProc = values.size();
if (numProc)
{
Info<< indent << " times " << numProc << '(';
for (label proci = 0; proci < numProc; ++proci)
{
if (proci) Info<< ' ';
Info<< values[proci];
}
Info<< ')' << nl;
}
}
inline static void printTimingDetail(const UList<uint64_t>& values)
{
const label numProc = values.size();
if (numProc)
{
// Output via std::ostream to avoid conversion to Foam::label
// that Ostream performs
auto& os = Info.stdStream();
Info<< indent << " counts " << numProc << '(';
for (label proci = 0; proci < numProc; ++proci)
{
if (proci) os << ' ';
os << values[proci];
}
Info<< ')' << nl;
}
}
} // End namespace Foam
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
void Foam::profilingPstream::report(const int reportLevel)
{
const label numProc = (UPstream::parRun() ? UPstream::nProcs() : 1);
if (numProc < 2)
{
return;
}
// Use mpiGather on all values and perform the combinations
// and statistics locally. This reduces the overall number of MPI
// calls. For detailed output we need this information anyhow.
// NB: profilingPstream uses a FixedList for timings(), counts()
// so sizes are guaranteed to be consistent and identical everywhere.
List<double> allTimes;
List<uint64_t> allCounts;
// Avoid disturbing any information
const bool oldSuspend = suspend();
{
// The timings
const auto& procValues = times_;
if (UPstream::master())
{
allTimes.resize(numProc * procValues.size());
}
UPstream::mpiGather
(
procValues.cdata_bytes(), // Send
procValues.size_bytes(), // Num send per proc
allTimes.data_bytes(), // Recv
procValues.size_bytes(), // Num recv per proc
UPstream::commWorld()
);
}
if (reportLevel > 1)
{
// The counts
const auto& procValues = counts_;
if (UPstream::master())
{
allCounts.resize(numProc * procValues.size());
}
UPstream::mpiGather
(
procValues.cdata_bytes(), // Send
procValues.size_bytes(), // Num send per proc
allCounts.data_bytes(), // Recv
procValues.size_bytes(), // Num recv per proc
UPstream::commWorld()
);
}
// Resume if not previously suspended
if (!oldSuspend)
{
resume();
}
// (Time, Processor) for each of: min/max/sum(avg)
typedef FixedList<Tuple2<double, int>, 3> statData;
// Extract min/max/average
auto calcStats = [](const UList<double>& data) -> statData
{
statData stats;
stats = Tuple2<double, int>((data.empty() ? 0 : data[0]), 0);
const label np = data.size();
for (label proci = 1; proci < np; ++proci)
{
Tuple2<double, int> tup(data[proci], proci);
// 0: min, 1: max, 2: total(avg)
if (stats[0].first() > tup.first()) stats[0] = tup;
if (stats[1].first() < tup.first()) stats[1] = tup;
stats[2].first() += tup.first();
}
// From total -> average value
if (np) { stats[2].first() /= np; }
return stats;
};
const auto printTimingStats =
[&](Ostream& os, const char* tag, const statData& stats)
{
os << indent << tag << ": avg = " << stats[2].first()
<< ", min = " << stats[0].first()
<< " (proc " << stats[0].second() << ')'
<< ", max = " << stats[1].first()
<< " (proc " << stats[1].second() << ')'
<< nl;
};
if (UPstream::master())
{
Info<< "profiling(parallel):" << nl
<< incrIndent;
statData stats;
List<double> extractedTimes(numProc);
List<uint64_t> extractedCounts;
if (reportLevel > 1)
{
extractedCounts.resize(numProc);
}
// Total times
{
extractValues
(
extractedTimes,
allTimes,
[=](const double values[])
{
double total = 0;
for (unsigned i = 0; i < timingType::nCategories; ++i)
{
total += values[i];
}
return total;
}
);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "total ", stats);
if (reportLevel > 0) printTimingDetail(extractedTimes);
}
// all-all
{
const int index = int(timingType::ALL_TO_ALL);
extractValues(extractedTimes, index, allTimes);
extractValues(extractedCounts, index, allCounts);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "all-all ", stats);
if (reportLevel > 0) printTimingDetail(extractedTimes);
if (reportLevel > 1) printTimingDetail(extractedCounts);
}
// broadcast
{
const int index = int(timingType::BROADCAST);
extractValues(extractedTimes, index, allTimes);
extractValues(extractedCounts, index, allCounts);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "broadcast ", stats);
if (reportLevel > 0) printTimingDetail(extractedTimes);
if (reportLevel > 1) printTimingDetail(extractedCounts);
}
// probe
{
const int index = int(timingType::PROBE);
extractValues(extractedTimes, index, allTimes);
extractValues(extractedCounts, index, allCounts);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "probe ", stats);
if (reportLevel > 0) printTimingDetail(extractedTimes);
if (reportLevel > 1) printTimingDetail(extractedCounts);
}
// Reduce/scatter times
{
// const int index = int(timingType::REDUCE);
extractValues
(
extractedTimes,
allTimes,
[=](const double values[])
{
return
(
values[timingType::REDUCE]
+ values[timingType::GATHER]
+ values[timingType::SCATTER]
);
}
);
extractValues
(
extractedCounts,
allCounts,
[=](const uint64_t values[])
{
return
(
values[timingType::REDUCE]
+ values[timingType::GATHER]
+ values[timingType::SCATTER]
);
}
);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "reduce ", stats);
if (reportLevel > 0) printTimingDetail(extractedTimes);
if (reportLevel > 1) printTimingDetail(extractedCounts);
}
// Recv/send times
#if 0 // FUTURE?
{
// const int index = int(timingType::RECV);
extractValues
(
extractedTimes,
allTimes,
[=](const double values[])
{
return
(
values[timingType::RECV]
+ values[timingType::SEND]
);
}
);
extractValues
(
extractedCounts,
allCounts,
[=](const uint64_t values[])
{
return
(
values[timingType::RECV]
+ values[timingType::SEND]
);
}
);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "send/recv ", stats);
if (reportLevel > 0) printTimingDetail(extractedTimes);
if (reportLevel > 1) printTimingDetail(extractedCounts);
}
#endif
// request
{
const int index = int(timingType::REQUEST);
extractValues(extractedTimes, index, allTimes);
extractValues(extractedCounts, index, allCounts);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "request ", stats);
if (reportLevel > 0) printTimingDetail(extractedTimes);
if (reportLevel > 1) printTimingDetail(extractedCounts);
}
// wait
{
const int index = int(timingType::WAIT);
extractValues(extractedTimes, index, allTimes);
extractValues(extractedCounts, index, allCounts);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "wait ", stats);
if (reportLevel > 0) printTimingDetail(extractedTimes);
if (reportLevel > 1) printTimingDetail(extractedCounts);
}
Info<< decrIndent;
}
}
// ************************************************************************* //

View File

@ -60,16 +60,16 @@ public:
//- The enumerated timing categories (for times and counts arrays)
enum timingType : unsigned
{
BROADCAST = 0,
REDUCE,
ALL_TO_ALL = 0,
BROADCAST,
PROBE,
REDUCE,
GATHER, // gather (or recv)
SCATTER, // scatter (or send)
REQUEST,
WAIT,
GATHER,
SCATTER,
ALL_TO_ALL,
OTHER,
nCategories // Dimensioning size
nCategories // Dimensioning size
};
//- Fixed-size container for timing values
@ -98,28 +98,29 @@ private:
public:
// Constructors
// Static Member Functions
//- Default construct, enables global timer
profilingPstream();
// Management
//- True if timer is active (ie, enabled and not suspended)
static bool active() noexcept { return !suspend_ && timer_; }
//- Destructor, disables global timer
~profilingPstream();
// Member Functions
//- Create timer for measuring communication, or reset existing
//- Create timer for measuring communication or un-suspend existing
static void enable();
//- Remove timer for measuring communication activity
//- Remove timer for measuring communication activity.
//- Does not affect times/counts.
static void disable() noexcept;
//- Suspend use of timer (if active)
static void suspend() noexcept
//- Reset times/counts. Does not affect the timer itself
static void reset();
//- Suspend use of timer. Return old status
static bool suspend() noexcept
{
bool old(suspend_);
suspend_ = bool(timer_);
return old;
}
//- Resume use of timer (if previously active)
@ -128,14 +129,8 @@ public:
suspend_ = false;
}
//- Timer is active (not suspended and enabled)
static bool active() noexcept
{
return !suspend_ && bool(timer_);
}
//- The total of times
static double elapsedTime();
// Timing/Counts
//- Access to the timing information
static timingList& times() noexcept { return times_; }
@ -155,10 +150,13 @@ public:
return counts_[idx];
}
//- The total of times
static double elapsedTime();
//- Update timer prior to measurement
static void beginTiming()
{
if (active())
if (!suspend_ && timer_)
{
(void) timer_->cpuTimeIncrement();
}
@ -167,7 +165,7 @@ public:
//- Add time increment
static void addTime(const timingType idx)
{
if (active())
if (!suspend_ && timer_)
{
times_[idx] += timer_->cpuTimeIncrement();
++counts_[idx];
@ -227,6 +225,12 @@ public:
{
addTime(timingType::OTHER);
}
// Output
//- Report current information. Uses parallel communication!
static void report(const int reportLevel = 0);
};

View File

@ -26,12 +26,9 @@ License
\*---------------------------------------------------------------------------*/
#include "parProfiling.H"
#include "addToRunTimeSelectionTable.H"
#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "profilingPstream.H"
#include "Tuple2.H"
#include "FixedList.H"
#include "Pstream.H"
#include "addToRunTimeSelectionTable.H"
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
@ -62,9 +59,9 @@ Foam::functionObjects::parProfiling::parProfiling
)
:
functionObject(name),
detailLevel_(0)
reportLevel_(0)
{
dict.readIfPresent("detail", detailLevel_);
dict.readIfPresent("detail", reportLevel_);
profilingPstream::enable();
}
@ -77,368 +74,14 @@ Foam::functionObjects::parProfiling::~parProfiling()
}
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
namespace Foam
{
// Loop over all values (with striding) and extract the value at given index
template<class Type>
inline static void extractValues
(
UList<Type>& result,
const int index,
const UList<Type>& allValues
)
{
if (result.empty())
{
return;
}
const label numProc = result.size();
const Type* values = allValues.cbegin();
const label stride = allValues.size() / numProc;
if (!values || !stride)
{
result = Type(0);
return;
}
for (label proci = 0; proci < numProc; ++proci, values += stride)
{
result[proci] = values[index];
}
}
// Loop over all values (with striding) and extract combined value
// using the given unary function
template<class Type, class Extract>
inline static void extractValues
(
UList<Type>& result,
const UList<Type>& allValues,
const Extract& extract
)
{
if (result.empty())
{
return;
}
const label numProc = result.size();
const Type* values = allValues.cbegin();
const label stride = allValues.size() / numProc;
if (!values || !stride)
{
result = Type(0);
return;
}
for (label proci = 0; proci < numProc; ++proci, values += stride)
{
result[proci] = extract(values);
}
}
inline static void printTimingDetail(const UList<double>& values)
{
const label numProc = values.size();
if (numProc)
{
Info<< indent << " times " << numProc << '(';
for (label proci = 0; proci < numProc; ++proci)
{
if (proci) Info<< ' ';
Info<< values[proci];
}
Info<< ')' << nl;
}
}
inline static void printTimingDetail(const UList<uint64_t>& values)
{
const label numProc = values.size();
if (numProc)
{
// Output via std::ostream to avoid conversion to Foam::label
// that Ostream performs
auto& os = Info.stdStream();
Info<< indent << " counts " << numProc << '(';
for (label proci = 0; proci < numProc; ++proci)
{
if (proci) os << ' ';
os << values[proci];
}
Info<< ')' << nl;
}
}
} // End namespace Foam
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
void Foam::functionObjects::parProfiling::report()
{
const label numProc = (UPstream::parRun() ? UPstream::nProcs() : 1);
if (!profilingPstream::active() || numProc < 2)
if (UPstream::parRun() && UPstream::nProcs() > 1)
{
return;
}
// Use mpiGather on all values and perform the combinations
// and statistics locally. This reduces the overall number of MPI
// calls. For detailed output we need this information anyhow.
// NB: profilingPstream uses a FixedList for timings(), counts()
// so the sizes are guaranteed to be consistent and identical
// everywhere.
List<double> allTimes;
List<uint64_t> allCounts;
// Avoid disturbing the counts
profilingPstream::suspend();
{
// The timings
const auto& procTimes = profilingPstream::times();
if (Pstream::master())
{
allTimes.resize(numProc * procTimes.size());
}
UPstream::mpiGather
(
procTimes.cdata_bytes(), // Send
procTimes.size_bytes(), // Num send per proc
allTimes.data_bytes(), // Recv
procTimes.size_bytes(), // Num recv per proc
UPstream::commWorld()
);
}
if (detailLevel_ > 1)
{
// The counts
const auto& procCounts = profilingPstream::counts();
if (Pstream::master())
{
allCounts.resize(numProc * procCounts.size());
}
UPstream::mpiGather
(
procCounts.cdata_bytes(), // Send
procCounts.size_bytes(), // Num send per proc
allCounts.data_bytes(), // Recv
procCounts.size_bytes(), // Num recv per proc
UPstream::commWorld()
);
}
profilingPstream::resume();
// (Time, Processor) for each of: min/max/sum(avg)
typedef FixedList<Tuple2<double, int>, 3> statData;
// Extract min/max/average
auto calcStats = [](const UList<double>& data) -> statData
{
statData stats;
stats = Tuple2<double, int>((data.empty() ? 0 : data[0]), 0);
const label np = data.size();
for (label proci = 1; proci < np; ++proci)
{
Tuple2<double, int> tup(data[proci], proci);
// 0: min, 1: max, 2: total(avg)
if (stats[0].first() > tup.first()) stats[0] = tup;
if (stats[1].first() < tup.first()) stats[1] = tup;
stats[2].first() += tup.first();
}
// From total -> average value
if (np) { stats[2].first() /= np; }
return stats;
};
const auto printTimingStats =
[&](Ostream& os, const char* tag, const statData& stats)
{
os << indent << tag << ": avg = " << stats[2].first()
<< ", min = " << stats[0].first()
<< " (proc " << stats[0].second() << ')'
<< ", max = " << stats[1].first()
<< " (proc " << stats[1].second() << ')'
<< nl;
};
if (Pstream::master())
{
statData stats;
List<double> extractedTimes(numProc);
List<uint64_t> extractedCounts;
if (detailLevel_ > 1)
{
extractedCounts.resize(numProc);
}
Info<< type() << ':' << nl
<< incrIndent;
// Total times
{
extractValues
(
extractedTimes,
allTimes,
[=](const double values[])
{
double total = 0;
for (unsigned i = 0; i < profilingPstream::nCategories; ++i)
{
total += values[i];
}
return total;
}
);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "total ", stats);
if (detailLevel_ > 0) printTimingDetail(extractedTimes);
}
// all-all
{
const int index = int(profilingPstream::ALL_TO_ALL);
extractValues(extractedTimes, index, allTimes);
extractValues(extractedCounts, index, allCounts);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "all-all ", stats);
if (detailLevel_ > 0) printTimingDetail(extractedTimes);
if (detailLevel_ > 1) printTimingDetail(extractedCounts);
}
// broadcast
{
const int index = int(profilingPstream::BROADCAST);
extractValues(extractedTimes, index, allTimes);
extractValues(extractedCounts, index, allCounts);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "broadcast ", stats);
if (detailLevel_ > 0) printTimingDetail(extractedTimes);
if (detailLevel_ > 1) printTimingDetail(extractedCounts);
}
// probe
{
const int index = int(profilingPstream::PROBE);
extractValues(extractedTimes, index, allTimes);
extractValues(extractedCounts, index, allCounts);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "probe ", stats);
if (detailLevel_ > 0) printTimingDetail(extractedTimes);
if (detailLevel_ > 1) printTimingDetail(extractedCounts);
}
// Reduce/scatter times
{
// const int index = int(profilingPstream::REDUCE);
extractValues
(
extractedTimes,
allTimes,
[=](const double values[])
{
return
(
values[profilingPstream::REDUCE]
+ values[profilingPstream::GATHER]
+ values[profilingPstream::SCATTER]
);
}
);
extractValues
(
extractedCounts,
allCounts,
[=](const uint64_t values[])
{
return
(
values[profilingPstream::REDUCE]
+ values[profilingPstream::GATHER]
+ values[profilingPstream::SCATTER]
);
}
);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "reduce ", stats);
if (detailLevel_ > 0) printTimingDetail(extractedTimes);
if (detailLevel_ > 1) printTimingDetail(extractedCounts);
}
// request
{
const int index = int(profilingPstream::REQUEST);
extractValues(extractedTimes, index, allTimes);
extractValues(extractedCounts, index, allCounts);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "request ", stats);
if (detailLevel_ > 0) printTimingDetail(extractedTimes);
if (detailLevel_ > 1) printTimingDetail(extractedCounts);
}
// wait
{
const int index = int(profilingPstream::WAIT);
extractValues(extractedTimes, index, allTimes);
extractValues(extractedCounts, index, allCounts);
stats = calcStats(extractedTimes);
printTimingStats(Info(), "wait ", stats);
if (detailLevel_ > 0) printTimingDetail(extractedTimes);
if (detailLevel_ > 1) printTimingDetail(extractedCounts);
}
Info<< decrIndent;
Info<< nl;
profilingPstream::report(reportLevel_);
}
}

View File

@ -74,9 +74,9 @@ class parProfiling
{
// Private Data
//- The level of detail
//- The reporting level
// 0: summary, 1: per-proc times, 2: per-proc times/counts
int detailLevel_;
int reportLevel_;
public: