ENH: parallel runTimeModifiable - master only

This commit is contained in:
mattijs
2010-11-16 12:41:44 +00:00
parent 57a443a183
commit b5dddd8980
13 changed files with 700 additions and 275 deletions

View File

@ -32,17 +32,17 @@ Class
#include "PackedList.H"
#include "PstreamReduceOps.H"
#include "OSspecific.H"
#include "regIOobject.H" // for fileModificationSkew symbol
#ifdef FOAM_USE_STAT
# include "OSspecific.H"
# include "regIOobject.H" // for fileModificationSkew symbol
#else
#ifdef FOAM_USE_INOTIFY
# include <sys/inotify.h>
# include <sys/ioctl.h>
# include <errno.h>
# define EVENT_SIZE ( sizeof (struct inotify_event) )
# define EVENT_LEN (EVENT_SIZE + 16)
# define EVENT_BUF_LEN ( 1024 * EVENT_LEN )
#else
# include "OSspecific.H"
#endif
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
@ -111,78 +111,77 @@ namespace Foam
{
public:
#ifdef FOAM_USE_STAT
//- From watch descriptor to modified time
DynamicList<time_t> lastMod_;
const bool useInotify_;
//- initialize HashTable size
inline fileMonitorWatcher(const label sz = 20)
:
lastMod_(sz)
{}
// For inotify
inline bool addWatch(const label watchFd, const fileName& fName)
{
if (watchFd < lastMod_.size() && lastMod_[watchFd] != 0)
{
// Reuse of watchFd : should have lastMod set to 0.
FatalErrorIn("addWatch(const label, const fileName&)")
<< "Problem adding watch " << watchFd
<< " to file " << fName
<< abort(FatalError);
}
//- File descriptor for the inotify instance
int inotifyFd_;
lastMod_(watchFd) = lastModified(fName);
return true;
}
//- Current watchIDs and corresponding directory id
DynamicList<label> dirWatches_;
DynamicList<fileName> dirFiles_;
inline bool removeWatch(const label watchFd)
{
lastMod_[watchFd] = 0;
return true;
}
// For stat
//- From watch descriptor to modified time
DynamicList<time_t> lastMod_;
#else
//- File descriptor for the inotify instance
int inotifyFd_;
//- Current watchIDs and corresponding directory id
DynamicList<label> dirWatches_;
DynamicList<fileName> dirFiles_;
//- initialise inotify
inline fileMonitorWatcher(const label sz = 20)
inline fileMonitorWatcher(const bool useInotify, const label sz = 20)
:
inotifyFd_(inotify_init()),
dirWatches_(sz),
dirFiles_(sz)
useInotify_(useInotify)
{
if (inotifyFd_ < 0)
if (useInotify_)
{
static bool hasWarned = false;
if (!hasWarned)
#ifdef FOAM_USE_INOTIFY
inotifyFd_ = inotify_init();
dirWatches_.setCapacity(sz);
dirFiles_.setCapacity(sz);
if (inotifyFd_ < 0)
{
hasWarned = true;
WarningIn("fileMonitorWatcher(const label)")
<< "Failed allocating an inotify descriptor : "
<< string(strerror(errno)) << endl
<< " Please increase the number of allowable "
<< "inotify instances" << endl
<< " (/proc/sys/fs/inotify/max_user_instances"
<< " on Linux)" << endl
<< " , switch off runTimeModifiable." << endl
<< " or compile this file with FOAM_USE_STAT to use"
<< " time stamps instead of inotify." << endl
<< " Continuing without additional file monitoring."
<< endl;
static bool hasWarned = false;
if (!hasWarned)
{
hasWarned = true;
WarningIn("fileMonitorWatcher(const bool, const label)")
<< "Failed allocating an inotify descriptor : "
<< string(strerror(errno)) << endl
<< " Please increase the number of allowable "
<< "inotify instances" << endl
<< " (/proc/sys/fs/inotify/max_user_instances"
<< " on Linux)" << endl
<< " , switch off runTimeModifiable." << endl
<< " or compile this file without "
<< "FOAM_USE_INOTIFY"
<< " to use time stamps instead of inotify." << endl
<< " Continuing without additional file"
<< " monitoring."
<< endl;
}
}
#else
FatalErrorIn("fileMonitorWatcher(const bool, const label)")
<< "You selected inotify but this file was compiled"
<< " without FOAM_USE_INOTIFY"
<< "Please select another fileModification test method"
<< exit(FatalError);
#endif
}
else
{
lastMod_.setCapacity(sz);
}
}
//- remove all watches
inline ~fileMonitorWatcher()
{
if (inotifyFd_ >= 0)
#ifdef FOAM_USE_INOTIFY
if (useInotify_ && inotifyFd_ >= 0)
{
forAll(dirWatches_, i)
{
@ -197,57 +196,92 @@ namespace Foam
}
}
}
#endif
}
inline bool addWatch(const label watchFd, const fileName& fName)
{
if (inotifyFd_ < 0)
if (useInotify_)
{
return false;
if (inotifyFd_ < 0)
{
return false;
}
#ifdef FOAM_USE_INOTIFY
// Add/retrieve watch on directory containing file.
// Note that fName might be non-existing in special situations
// (master-only reading for IODictionaries)
const fileName dir = fName.path();
label dirWatchID = -1;
if (isDir(dir))
{
dirWatchID = inotify_add_watch
(
inotifyFd_,
dir.c_str(),
IN_CLOSE_WRITE
);
if (dirWatchID < 0)
{
FatalErrorIn("addWatch(const label, const fileName&)")
<< "Failed adding watch " << watchFd
<< " to directory " << fName << " due to "
<< string(strerror(errno))
<< exit(FatalError);
}
}
if (watchFd < dirWatches_.size() && dirWatches_[watchFd] != -1)
{
// Reuse of watchFd : should have dir watchID set to -1.
FatalErrorIn("addWatch(const label, const fileName&)")
<< "Problem adding watch " << watchFd
<< " to file " << fName
<< abort(FatalError);
}
dirWatches_(watchFd) = dirWatchID;
dirFiles_(watchFd) = fName.name();
#endif
}
else
{
if (watchFd < lastMod_.size() && lastMod_[watchFd] != 0)
{
// Reuse of watchFd : should have lastMod set to 0.
FatalErrorIn("addWatch(const label, const fileName&)")
<< "Problem adding watch " << watchFd
<< " to file " << fName
<< abort(FatalError);
}
lastMod_(watchFd) = lastModified(fName);
}
// Add/retrieve watch on directory containing file
label dirWatchID = inotify_add_watch
(
inotifyFd_,
fName.path().c_str(),
IN_CLOSE_WRITE
);
if (dirWatchID < 0)
{
FatalErrorIn("addWatch(const label, const fileName&)")
<< "Failed adding watch " << watchFd
<< " to directory " << fName << " due to "
<< string(strerror(errno))
<< exit(FatalError);
}
if (watchFd < dirWatches_.size() && dirWatches_[watchFd] != -1)
{
// Reuse of watchFd : should have dir watchID set to -1.
FatalErrorIn("addWatch(const label, const fileName&)")
<< "Problem adding watch " << watchFd
<< " to file " << fName
<< abort(FatalError);
}
dirWatches_(watchFd) = dirWatchID;
dirFiles_(watchFd) = fName.name();
return true;
}
inline bool removeWatch(const label watchFd)
{
if (inotifyFd_ < 0)
if (useInotify_)
{
return false;
}
if (inotifyFd_ < 0)
{
return false;
}
dirWatches_[watchFd] = -1;
dirWatches_[watchFd] = -1;
}
else
{
lastMod_[watchFd] = 0;
}
return true;
}
#endif
};
//! @endcond
@ -258,131 +292,146 @@ namespace Foam
void Foam::fileMonitor::checkFiles() const
{
#ifdef FOAM_USE_STAT
forAll(watcher_->lastMod_, watchFd)
if (useInotify_)
{
time_t oldTime = watcher_->lastMod_[watchFd];
#ifdef FOAM_USE_INOTIFY
// Large buffer for lots of events
char buffer[EVENT_BUF_LEN];
if (oldTime != 0)
while (true)
{
const fileName& fName = watchFile_[watchFd];
time_t newTime = lastModified(fName);
struct timeval zeroTimeout = {0, 0};
if (newTime == 0)
//- Pre-allocated structure containing file descriptors
fd_set fdSet;
// Add notify descriptor to select fd_set
FD_ZERO(&fdSet);
FD_SET(watcher_->inotifyFd_, &fdSet);
int ready = select
(
watcher_->inotifyFd_+1, // num filedescriptors in fdSet
&fdSet, // fdSet with only inotifyFd
NULL, // No writefds
NULL, // No errorfds
&zeroTimeout // eNo timeout
);
if (ready < 0)
{
state_[watchFd] = DELETED;
FatalErrorIn("fileMonitor::updateStates()")
<< "Problem in issuing select."
<< abort(FatalError);
}
else if (FD_ISSET(watcher_->inotifyFd_, &fdSet))
{
// Read events
ssize_t nBytes = read
(
watcher_->inotifyFd_,
buffer,
EVENT_BUF_LEN
);
if (nBytes < 0)
{
FatalErrorIn("fileMonitor::updateStates(const fileName&)")
<< "read of " << watcher_->inotifyFd_
<< " failed with " << label(nBytes)
<< abort(FatalError);
}
// Go through buffer, consuming events
int i = 0;
while (i < nBytes)
{
const struct inotify_event* inotifyEvent =
reinterpret_cast<const struct inotify_event*>
(
&buffer[i]
);
//Pout<< "watchFd:" << inotifyEvent->wd << nl
// << "mask:" << inotifyEvent->mask << nl
// << endl;
//Pout<< "file:" << fileName(inotifyEvent->name) << endl;
//Pout<< "len:" << inotifyEvent->len << endl;
if
(
(inotifyEvent->mask & IN_CLOSE_WRITE)
&& inotifyEvent->len
)
{
// Search for file
forAll(watcher_->dirWatches_, i)
{
label id = watcher_->dirWatches_[i];
if
(
id == inotifyEvent->wd
&& inotifyEvent->name == watcher_->dirFiles_[i]
)
{
// Correct directory and name
state_[i] = MODIFIED;
}
}
}
i += EVENT_SIZE + inotifyEvent->len;
}
}
else
{
if (newTime > (oldTime + regIOobject::fileModificationSkew))
// No data
return;
}
}
#endif
}
else
{
forAll(watcher_->lastMod_, watchFd)
{
time_t oldTime = watcher_->lastMod_[watchFd];
if (oldTime != 0)
{
const fileName& fName = watchFile_[watchFd];
time_t newTime = lastModified(fName);
if (newTime == 0)
{
watcher_->lastMod_[watchFd] = newTime;
state_[watchFd] = MODIFIED;
state_[watchFd] = DELETED;
}
else
{
state_[watchFd] = UNMODIFIED;
}
}
}
}
#else
// Large buffer for lots of events
char buffer[EVENT_BUF_LEN];
while (true)
{
struct timeval zeroTimeout = {0, 0};
//- Pre-allocated structure containing file descriptors
fd_set fdSet;
// Add notify descriptor to select fd_set
FD_ZERO(&fdSet);
FD_SET(watcher_->inotifyFd_, &fdSet);
int ready = select
(
watcher_->inotifyFd_+1, // num filedescriptors in fdSet
&fdSet, // fdSet with only inotifyFd
NULL, // No writefds
NULL, // No errorfds
&zeroTimeout // eNo timeout
);
if (ready < 0)
{
FatalErrorIn("fileMonitor::updateStates()")
<< "Problem in issuing select."
<< abort(FatalError);
}
else if (FD_ISSET(watcher_->inotifyFd_, &fdSet))
{
// Read events
ssize_t nBytes = read(watcher_->inotifyFd_, buffer, EVENT_BUF_LEN);
if (nBytes < 0)
{
FatalErrorIn("fileMonitor::updateStates(const fileName&)")
<< "read of " << watcher_->inotifyFd_
<< " failed with " << label(nBytes)
<< abort(FatalError);
}
// Go through buffer, consuming events
int i = 0;
while (i < nBytes)
{
const struct inotify_event* inotifyEvent =
reinterpret_cast<const struct inotify_event*>
(
&buffer[i]
);
//Pout<< "watchFd:" << inotifyEvent->wd << nl
// << "mask:" << inotifyEvent->mask << nl
// << endl;
//Pout<< "file:" << fileName(inotifyEvent->name) << endl;
//Pout<< "len:" << inotifyEvent->len << endl;
if ((inotifyEvent->mask & IN_CLOSE_WRITE) && inotifyEvent->len)
{
// Search for file
forAll(watcher_->dirWatches_, i)
if (newTime > (oldTime + regIOobject::fileModificationSkew))
{
label id = watcher_->dirWatches_[i];
if
(
id == inotifyEvent->wd
&& inotifyEvent->name == watcher_->dirFiles_[i]
)
{
// Correct directory and name
state_[i] = MODIFIED;
}
watcher_->lastMod_[watchFd] = newTime;
state_[watchFd] = MODIFIED;
}
else
{
state_[watchFd] = UNMODIFIED;
}
}
i += EVENT_SIZE + inotifyEvent->len;
}
}
else
{
// No data
return;
}
}
#endif
}
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::fileMonitor::fileMonitor()
Foam::fileMonitor::fileMonitor(const bool useInotify)
:
useInotify_(useInotify),
state_(20),
watchFile_(20),
freeWatchFds_(2),
watcher_(new fileMonitorWatcher(20))
watcher_(new fileMonitorWatcher(useInotify_, 20))
{}
@ -394,6 +443,8 @@ Foam::fileMonitor::~fileMonitor()
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
// Note: fName might not exist (on slaves if in master-only mode for
// regIOobject)
Foam::label Foam::fileMonitor::addWatch(const fileName& fName)
{
label watchFd;
@ -458,50 +509,97 @@ const
}
void Foam::fileMonitor::updateStates(const bool syncPar) const
void Foam::fileMonitor::updateStates
(
const bool masterOnly,
const bool syncPar
) const
{
checkFiles();
if (Pstream::master() || !masterOnly)
{
checkFiles();
}
if (syncPar)
{
PackedList<2> stats(state_.size());
forAll(state_, watchFd)
// Pack current state (might be on master only)
PackedList<2> stats(state_.size(), MODIFIED);
if (Pstream::master() || !masterOnly)
{
stats[watchFd] = static_cast<unsigned int>(state_[watchFd]);
forAll(state_, watchFd)
{
stats[watchFd] = static_cast<unsigned int>(state_[watchFd]);
}
}
// Save local state for warning message below
PackedList<2> thisProcStats(stats);
if (stats.storage().size() == 1)
// Save local state for warning message below
PackedList<2> thisProcStats;
if (!masterOnly)
{
// Optimisation valid for most cases.
reduce(stats.storage()[0], reduceFileStates());
thisProcStats = stats;
}
// Scatter or reduce to synchronise state
if (masterOnly)
{
// Scatter
if (stats.storage().size() == 1)
{
Pstream::scatter(stats.storage()[0]);
}
else
{
Pstream::listCombineScatter(stats.storage());
}
}
else
{
Pstream::listCombineGather
(
stats.storage(),
combineReduceFileStates()
);
// Reduce
if (stats.storage().size() == 1)
{
// Optimisation valid for most cases.
reduce(stats.storage()[0], reduceFileStates());
}
else
{
Pstream::listCombineGather
(
stats.storage(),
combineReduceFileStates()
);
}
}
// Update local state
forAll(state_, watchFd)
{
if (thisProcStats[watchFd] != UNMODIFIED)
if (masterOnly)
{
if (stats[watchFd] == UNMODIFIED)
// No need to check for inconsistent state. Just assign.
unsigned int stat = stats[watchFd];
state_[watchFd] = fileState(stat);
}
else
{
// Check for inconsistent state before assigning.
if (thisProcStats[watchFd] != UNMODIFIED)
{
WarningIn("fileMonitor::updateStates(const bool) const")
<< "Delaying reading " << watchFile_[watchFd]
<< " due to inconsistent "
"file time-stamps between processors"
<< endl;
}
else
{
unsigned int stat = stats[watchFd];
state_[watchFd] = fileState(stat);
if (stats[watchFd] == UNMODIFIED)
{
WarningIn("fileMonitor::updateStates(const bool) const")
<< "Delaying reading " << watchFile_[watchFd]
<< " due to inconsistent "
"file time-stamps between processors"
<< endl;
}
else
{
unsigned int stat = stats[watchFd];
state_[watchFd] = fileState(stat);
}
}
}
}
@ -511,10 +609,12 @@ void Foam::fileMonitor::updateStates(const bool syncPar) const
void Foam::fileMonitor::setUnmodified(const label watchFd)
{
#ifdef FOAM_USE_STAT
watcher_->lastMod_[watchFd] = lastModified(watchFile_[watchFd]);
#endif
state_[watchFd] = UNMODIFIED;
if (!useInotify_)
{
watcher_->lastMod_[watchFd] = lastModified(watchFile_[watchFd]);
}
}