/* system headers (the original angle-bracket names were garbled; this
   list is inferred from what the code below uses) */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <errno.h>
#include <assert.h>
#include <unistd.h>
#include <alloca.h>
#include <time.h>
#include <sys/time.h>
#include <pthread.h>
#include <semaphore.h>
#include <mpi.h>

#include "fdc_int.h"
#include "variables.h"
#include "values.h"
#include "store.h"
#include "splitting.h"
#include "bound.h"

#ifndef COMPACT_DOMAINS
#error "only works with COMPACT_DOMAINS"
#endif

#if !defined(PACK_PROBLEM) || !defined(USE_STORE)
#error "must PACK_PROBLEM and USE_STORE"
#endif

/* sizes are taken from the MPI-2 standard */
#if UNIT_BITS == 64
/* some MPI implementations, such as New Madeleine's, don't have
   MPI_UNSIGNED_LONG_LONG */
# ifndef MPI_UNSIGNED_LONG_LONG
#  ifndef MPI_LONG_LONG
#   warning "neither MPI_UNSIGNED_LONG_LONG nor MPI_LONG_LONG are defined"
#  endif
#  define MPI_DOMAIN_TYPE MPI_LONG_LONG
# else
#  define MPI_DOMAIN_TYPE MPI_UNSIGNED_LONG_LONG
# endif
#elif UNIT_BITS == 32
# define MPI_DOMAIN_TYPE MPI_UNSIGNED_LONG
#else
# error "don't know which MPI type to use for MPI_DOMAIN_TYPE"
#endif

#define MAX_AGENTS 256

// number of workers to use
int fd__workers = -1;

// events within the main loop
#define EV_INT_SUCCESS   0x001 // an agent found a solution
#define EV_INT_FAIL      0x002 // an agent exhausted its search space
#define EV_INT_COUNT     0x004 // solutions count from an agent
#define EV_INT_TIMEOUT   0x008 // some timeout
#define EV_EXT_SUCCESS   0x010 // another process found a solution
#define EV_EXT_REQ_WORK  0x020 // another process exhausted its search space
#define EV_EXT_REQ_SHARE 0x030 // a process couldn't find work
#define EV_EXT_NO_WORK   0x040 // there is no work to give to another process
#define EV_EXT_NO_SHARE  0x050 // a process has no work to share
#define EV_EXT_STORE     0x080 // this process received more work
#define EV_EXT_FAIL      0x100 // there is no more work available
#define EV_EXT_DONE      0x200 // the search has succeeded and should stop
#define EV_EXT_QUIT      0x300 // no more work is available for sharing
#define EV_EXT_COUNT     0x400 // solutions count from a process
#define EV_EXT_BOUND     0x500 // best value of the objective function
#define EV_EXT_READY     0x600 // process has no work and nothing pending

#define WAIT_NORM 1000000L // pause between event probes (1ms)
#define WAIT_TOUT  250000L // pause between event probes on a deadline (0.25ms)

#define FEED1_TIMEOUT 20 // ms to wait for the reply to the 1st request for work
#define FEED2_TIMEOUT 10 // ms to wait for the reply to the 2nd request for work
#define POLL_TIMEOUT  10 // ms to wait for the reply to a poll for work

#define millisleep(ms)                                  \
    do {                                                \
        struct timespec ts = { 0, (ms) * 1000000L };    \
        nanosleep(&ts, NULL);                           \
    } while (0)

#ifdef STATS_MSGS
static unsigned long _fd_msgs_received = 0, _fd_msgs_sent = 0;

static int _fd_MPI_Recv(void *b, int c, MPI_Datatype d, int s, int t,
                        MPI_Comm w, MPI_Status *x)
{
    ++_fd_msgs_received;
    return MPI_Recv(b, c, d, s, t, w, x);
}
#define MPI_Recv _fd_MPI_Recv

static int _fd_MPI_Send(void *b, int c, MPI_Datatype d, int s, int t,
                        MPI_Comm w)
{
    ++_fd_msgs_sent;
    return MPI_Send(b, c, d, s, t, w);
}
#define MPI_Send _fd_MPI_Send

static int _fd_MPI_Isend(void *b, int c, MPI_Datatype d, int s, int t,
                         MPI_Comm w, MPI_Request *r)
{
    ++_fd_msgs_sent;
    return MPI_Isend(b, c, d, s, t, w, r);
}
#define MPI_Isend _fd_MPI_Isend

static int _fd_MPI_Ssend(void *b, int c, MPI_Datatype d, int s, int t,
                         MPI_Comm w)
{
    ++_fd_msgs_sent;
    return MPI_Ssend(b, c, d, s, t, w);
}
#define MPI_Ssend _fd_MPI_Ssend

static int _fd_MPI_Issend(void *b, int c, MPI_Datatype d, int s, int t,
                          MPI_Comm w, MPI_Request *r)
{
    ++_fd_msgs_sent;
    return MPI_Issend(b, c, d, s, t, w, r);
}
#define MPI_Issend _fd_MPI_Issend

void _fd_statistics_msgs()
{
    extern int tid;

    _fd_output("[%d] messages: %lu sent, %lu received\n",
               tid, _fd_msgs_sent, _fd_msgs_received);
}
#else /* STATS_MSGS */
#define _fd_statistics_msgs() ((void) 0)
#endif /* STATS_MSGS */
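/*
 * Big picture: each MPI process is a "team" running up to MAX_AGENTS
 * worker threads (agents).  Agents report to the main thread through
 * the ready/notify semaphores, which _fd_get_event() turns into the
 * EV_INT_* events; messages from other processes arrive tagged
 * FD_MSG_* and are turned into the matching EV_EXT_* events.  The main
 * loop in _fd_dsolve() reacts to both kinds.
 */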
static fd_int *_fd_copies[MAX_AGENTS];
static _fd_store _fd_agents_stores[MAX_AGENTS];
static _fd_store _fd_processes_stores[MAX_AGENTS]; // XXX: MAX_AGENTS!?
                                                   // only needed by process 0

// used to deliver the id of the agent
static int successful;

// used to signal the main thread that an agent has found a solution
// XXX: could be done through a (shared) variable?
static pthread_mutex_t success_mutex = PTHREAD_MUTEX_INITIALIZER;
static sem_t ready_semaphore, notify_semaphore;

// used when optimising, to release an agent after checking its solution
static sem_t resume_semaphore;

#if STEAL_WORK >= 2
// where agents will wait for more work
static pthread_mutex_t continue_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t continue_cond = PTHREAD_COND_INITIALIZER;
#endif

// number of agents which will actually run (depends on the splitting)
static int agents_to_run;

extern unsigned long long _fd_count_solutions(fd_int[], int);

// used to accumulate the number of solutions found (when counting solutions)
static unsigned long long total_solutions = 0;

// record issued MPI requests, so they can be cleaned up before
// calling MPI_Finalize
// XXX: this list should be checked once in a while, otherwise it will
//      keep growing as the program runs
static fd_list pending_requests;

int tid; // team ID
         // XXX: only used outside _fd_dsolve() for debugging

static void _fd_copy_problem(int n)
{
#ifndef PACK_PROBLEM
    int i, v;

    for (i = 0; i < n; ++i) {
        _fd_copies[i] = calloc(fd_variables_count, sizeof(fd_int)); // XXX: NULL?

        for (v = 0; v < fd_variables_count; ++v)
            _fd_copies[i][v] = _fd_var_copy(_fd_variables[v]);

        _fd_import_constraints(_fd_copies[i]);
    }
#else /* PACK_PROBLEM */
    int i;

    for (i = 0; i < n; ++i)
        _fd_copies[i] = _fd_variables; // XXX
#endif /* PACK_PROBLEM */
}

// XXX: turn the variables' domains into ``indexes'' into the store
static void _fd_despicable_hack(int n)
{
    store = _fd_agents_stores[n];
}
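/*
 * Agent life cycle, as implemented below: search the assigned
 * sub-problem; on success, take success_mutex and hand the result to
 * the main thread through the ready/notify semaphore pair; when
 * optimising, block on resume_semaphore until the main thread has
 * checked the solution against the bound.  With STEAL_WORK >= 2 a
 * failed agent parks on continue_cond until the team obtains more work.
 */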
int _fd_agent(int n)
{
    if (!_fd_counting_solutions) {
        int result;
        struct timeval ti, tis, to, tos;

        _fd_debug("[%d.%d] agent starting\n", tid, n);
        gettimeofday(&ti, NULL);

        _fd_despicable_hack(n);
        memcpy(_fd_variables, _fd_copies[n], fd_variables_count * sizeof(fd_int));

        // make sure agents release the mutexes they hold when cancelled
        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
                             &success_mutex);
#if STEAL_WORK >= 2
        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
                             &continue_mutex);
#endif

        do {
            gettimeofday(&tis, NULL);
            result = _fd_dsearch(_fd_copies[n], n);
            gettimeofday(&to, NULL);
            timersub(&to, &tis, &tos);
            timersub(&to, &ti, &to);
            _fd_debug("[%d.%d] took %d.%06ds (%d.%06ds)\n", tid, n,
                      tos.tv_sec, tos.tv_usec, to.tv_sec, to.tv_usec);

            while (result == FD_OK) {
                _fd_debug("[%d.%d] agent was successful\n", tid, n);

                // ensure that only one agent signals its success
                // XXX: only good (?) if only looking for the 1st solution
                pthread_mutex_lock(&success_mutex);
                _fd_debug("[%d.%d] acquired success mutex\n", tid, n);

                // tell the main thread that this agent is done
                if (sem_wait(&ready_semaphore))
                    perror("sem_wait (ready)");
                successful = n;
                if (sem_post(&notify_semaphore))
                    perror("sem_post");

                pthread_mutex_unlock(&success_mutex);
                _fd_debug("[%d.%d] released success mutex\n", tid, n);

                if (!_fd_optimising)
                    break;

                if (sem_wait(&resume_semaphore))
                    perror("sem_wait (resume)");

                _fd_debug("[%d.%d] resuming, new bound is %d\n", tid, n,
                          _fd_bound_value());

                // find the next solution
                gettimeofday(&tis, NULL);
                result = _fd_dsearch_again(_fd_copies[n]);
                gettimeofday(&to, NULL);
                timersub(&to, &tis, &tos);
                timersub(&to, &ti, &to);
                _fd_debug("[%d.%d] took %d.%06ds (%d.%06ds)\n", tid, n,
                          tos.tv_sec, tos.tv_usec, to.tv_sec, to.tv_usec);
            }

            // XXX: only interested in the 1st solution, for now
            if (!_fd_optimising && result == FD_OK)
                break;

            // eventually, the agent will fail to find a solution
            _fd_debug("[%d.%d] agent was unsuccessful\n", tid, n);

            // tell the main thread that this agent is done
            if (sem_wait(&ready_semaphore))
                perror("sem_wait (ready)");
            successful = -n - 1; // negative agent number means failure
#if STEAL_WORK >= 2
            pthread_mutex_lock(&continue_mutex);
#endif
            if (sem_post(&notify_semaphore))
                perror("sem_post");
#if STEAL_WORK >= 2
            do {
                // wait for more work
                _fd_debug("[%d.%d] waiting for more work\n", tid, n);
                pthread_cond_wait(&continue_cond, &continue_mutex);
            } while (n >= agents_to_run);
            if (pthread_mutex_unlock(&continue_mutex))
                perror("pthread_mutex_unlock");
#endif
        } while (STEAL_WORK >= 2);

#if STEAL_WORK >= 2
        pthread_cleanup_pop(0);
#endif
        pthread_cleanup_pop(0);

        return result;
    } else { /* _fd_counting_solutions != 0 */
        unsigned long long solutions;
        struct timeval ti, tis, to, tos;

        _fd_debug("[%d.%d] agent starting\n", tid, n);
        gettimeofday(&ti, NULL);

        _fd_despicable_hack(n);
        memcpy(_fd_variables, _fd_copies[n], fd_variables_count * sizeof(fd_int));

#if STEAL_WORK >= 2
        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
                             &continue_mutex);
#endif

        do {
            gettimeofday(&tis, NULL);
            solutions = _fd_count_solutions(_fd_copies[n], n);
            gettimeofday(&to, NULL);
            timersub(&to, &tis, &tos);
            timersub(&to, &ti, &to);
            _fd_debug("[%d.%d] took %d.%06ds (%d.%06ds)\n", tid, n,
                      tos.tv_sec, tos.tv_usec, to.tv_sec, to.tv_usec);
            _fd_debug("[%d.%d] found %llu solutions\n", tid, n, solutions);

            // tell the main thread that this agent is done
            if (sem_wait(&ready_semaphore))
                perror("sem_wait (ready)");
            successful = n;
            // and update the solutions' running total
            total_solutions += solutions;
#if STEAL_WORK >= 2
            pthread_mutex_lock(&continue_mutex);
#endif
            if (sem_post(&notify_semaphore))
                perror("sem_post");
#if STEAL_WORK >= 2
            do {
                // wait for more work
                _fd_debug("[%d.%d] waiting for more work\n", tid, n);
                pthread_cond_wait(&continue_cond, &continue_mutex);
            } while (n >= agents_to_run);
            if (pthread_mutex_unlock(&continue_mutex))
                perror("pthread_mutex_unlock");
#endif /* STEAL_WORK >= 2 */
        } while (STEAL_WORK >= 2);

#if STEAL_WORK >= 2
        pthread_cleanup_pop(0);
#endif
        return 0;
    }
}

static void clock_add(struct timespec *ts, long delta)
{
    clock_gettime(CLOCK_REALTIME, ts);

    ts->tv_nsec += delta;
    if (ts->tv_nsec >= 1000000000L) {
        ts->tv_sec += ts->tv_nsec / 1000000000L;
        ts->tv_nsec %= 1000000000L;
    }
}
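/* A minimal usage sketch for clock_add() (hypothetical, not part of the
   solver): build an absolute deadline for sem_timedwait().  The
   wait_notify_ms() name and its millisecond argument are illustrative
   assumptions only. */
#if 0
static int wait_notify_ms(long ms)
{
    struct timespec deadline;

    clock_add(&deadline, ms * 1000000L); /* now + ms milliseconds */
    return sem_timedwait(&notify_semaphore, &deadline);
}
#endif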
/* send TO a dataless message (non-blocking) */
static void _fd_send(int to, int tag)
{
    MPI_Request mpi_request = MPI_REQUEST_NULL;

    if (MPI_Isend(NULL, 0, MPI_CHAR, to, tag, MPI_COMM_WORLD, &mpi_request))
        _fd_fatal("MPI_Isend failed");
}

/* send TO a message with data (non-blocking) */
static void _fd_send_data(int to, int tag, void *buf, int n, MPI_Datatype type)
{
    MPI_Request mpi_request = MPI_REQUEST_NULL;

    if (MPI_Isend(buf, n, type, to, tag, MPI_COMM_WORLD, &mpi_request))
        _fd_fatal("MPI_Isend failed");
}

#if STEAL_WORK >= 2
/* send TO a message with data and keep the record of the REQUEST */
static void _fd_ssend_data(int to, int tag, void *buf, int n,
                           MPI_Datatype type, MPI_Request *request)
{
    if (request) {
#ifndef MAD_MPI
        if (MPI_Issend(buf, n, type, to, tag, MPI_COMM_WORLD, request))
            _fd_fatal("MPI_Issend failed");
#if 0
        {
            int mpi_flag;

            _fd_debug("[%d] MPI_Test = %d\n", tid,
                      MPI_Test(request, &mpi_flag, MPI_STATUS_IGNORE));
            if (mpi_flag)
                _fd_debug("[%d] send completed\n", tid);
            else
                _fd_debug("[%d] send didn't complete yet\n", tid);
        }
#endif
#else /* MAD_MPI */
        // XXX: NMAD doesn't have MPI_Issend
        // XXX: no record of the sending is left
        if (MPI_Ssend(buf, n, type, to, tag, MPI_COMM_WORLD))
            _fd_fatal("MPI_Ssend failed");
        *request = MPI_REQUEST_NULL;
#endif /* MAD_MPI */
    } else {
        MPI_Request mpi_request = MPI_REQUEST_NULL;

        if (MPI_Isend(buf, n, type, to, tag, MPI_COMM_WORLD, &mpi_request))
            _fd_fatal("MPI_Isend failed");
    }
}
#endif /* STEAL_WORK >= 2 */

static void fd__send_empty_store(int to, int tag, _fd_store buffer,
                                 MPI_Request *request)
{
    if (MPI_Isend(buffer, 0, MPI_DOMAIN_TYPE, to, tag, MPI_COMM_WORLD, request))
        _fd_fatal("MPI_Isend failed");
}

static void _fd_send_store(int to, int tag, _fd_store buffer,
                           MPI_Request *request, bool synchronous)
{
    if (synchronous) {
        _fd_ssend_data(to, tag, buffer, fd_variables_count * DOMAIN_WORDS,
                       MPI_DOMAIN_TYPE, request);
    } else {
        if (MPI_Isend(buffer, fd_variables_count * DOMAIN_WORDS,
                      MPI_DOMAIN_TYPE, to, tag, MPI_COMM_WORLD, request))
            _fd_fatal("MPI_Isend failed");
    }
}

static int _fd_recv_store(int from, int tag, _fd_store buffer)
{
    MPI_Status status;
    int count;

    if (MPI_Recv(buffer, fd_variables_count * DOMAIN_WORDS, MPI_DOMAIN_TYPE,
                 from, tag, MPI_COMM_WORLD, &status))
        _fd_fatal("MPI_Recv failed");

    MPI_Get_count(&status, MPI_DOMAIN_TYPE, &count);

    return count;
}

static void _fd_broadcast(int msg, int processes, int from)
{
#if 0
    MPI_Request *mpi_request /* = MPI_REQUEST_NULL */;
    int i;

    for (i = 0; i < processes; ++i)
        if (i != from) {
            mpi_request = malloc(sizeof(MPI_Request));

            if (MPI_Isend(NULL, 0, MPI_CHAR, i, msg, MPI_COMM_WORLD, mpi_request))
                _fd_fatal("MPI_Isend failed");

            fd_list_append(pending_requests, mpi_request);
        }
#else
    MPI_Request mpi_request = MPI_REQUEST_NULL;
    int i;

    for (i = 0; i < processes; ++i)
        if (i != from)
            if (MPI_Isend(NULL, 0, MPI_CHAR, i, msg, MPI_COMM_WORLD, &mpi_request))
                _fd_fatal("MPI_Isend failed");
#endif
}

static void _fd_broadcast_data(int msg, void *buf, int n, MPI_Datatype type,
                               int processes, int from)
{
    int i;

    for (i = 0; i < processes; ++i)
        if (i != from)
            _fd_send_data(i, msg, buf, n, type);
}

#ifdef GET_SIGNAL
#include <signal.h> /* inferred: sigaction, _NSIG */

void sig_handler(int sig)
{
    printf("******** caught signal %d ********\n", sig);
}

struct sigaction old_actions[_NSIG];
struct sigaction new_action = { sig_handler, 0, 0, NULL };
#endif
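/*
 * Note on the helpers above: _fd_send() and _fd_send_data() fire and
 * forget; the MPI_Request produced by MPI_Isend() is discarded, so
 * completion is left entirely to the MPI library.  Only the store
 * senders keep their request, because an unreceived store must remain
 * recoverable (see the cancellation logic near the end of _fd_dsolve()).
 */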
static bool _fd_probe_external(int tag, char *name, int *source, bool consume)
{
    MPI_Status mpi_status;
    int mpi_flag;
    int s;

    s = MPI_Iprobe(MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &mpi_flag, &mpi_status);
#if 0
    // XXX: what the heck is the return value supposed to mean?
    if (s) {
        char es[1024];
        int _;

        _fd_debug("MPI_Iprobe %s returned %d\n", name, s);
        MPI_Error_string(s, es, &_);
        _fd_debug("%s\n", es);
    }
#endif

    if (mpi_flag) {
        *source = mpi_status.MPI_SOURCE;
        _fd_debug("[%d] got %s from %d\n", tid, name, *source);

        // consume message
        if (consume)
            if (MPI_Recv(NULL, 0, MPI_CHAR, mpi_status.MPI_SOURCE, tag,
                         MPI_COMM_WORLD, &mpi_status))
                _fd_fatal("MPI_Recv failed");

        return true;
    }

    return false;
}

static int _fd_get_event(int running, int *source, int *timeout)
{
    double tin, tmout = 0;
    int event;

    _fd_debug("[%d] getting next event\n", tid);

    if (timeout && *timeout) {
        tin = MPI_Wtime();
        tmout = tin + *timeout / 1000.0; // timeout comes in milliseconds
    }

    for (;;) {
        // see if any of the agents has anything to say
        if (running > 0) {
            if (sem_trywait(&notify_semaphore)) {
                if (errno == EAGAIN)
                    ; //_fd_debug("[%d] notify semaphore is busy\n", tid);
                else
                    perror("sem_trywait (notify)");
            } else {
                // got something from one of the agents
                _fd_debug("[%d] got something from an agent\n", tid);

                if (!_fd_counting_solutions) {
                    int s = successful;

                    if (sem_post(&ready_semaphore))
                        perror("sem_post");

                    _fd_debug("[%d] got an answer from %s%d\n", tid,
                              s == -1 ? "-" : "", s + (s < 0));

                    if (s >= 0) {
                        *source = s;
                        event = EV_INT_SUCCESS;
                        break;
                    }

                    *source = -(s + 1);
                    return EV_INT_FAIL;
                } else { /* _fd_counting_solutions != 0 */
                    *source = successful;

                    if (sem_post(&ready_semaphore))
                        perror("sem_post");

                    event = EV_INT_COUNT;
                    break;
                }
            }
        }

#ifndef MAD_MPI
        {
            MPI_Status mpi_status;
            int mpi_flag;
            bool consume = true;
            char *name;

            MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
                       &mpi_flag, &mpi_status);
            if (mpi_flag) {
                *source = mpi_status.MPI_SOURCE;

                switch (mpi_status.MPI_TAG) {
                case FD_MSG_SOLUTION:
                    name = "FD_MSG_SOLUTION";
                    consume = false;
                    event = EV_EXT_SUCCESS;
                    break;
                case FD_MSG_FAIL:
                    name = "FD_MSG_FAIL";
                    event = EV_EXT_FAIL;
                    break;
                case FD_MSG_DONE:
                    name = "FD_MSG_DONE";
                    event = EV_EXT_DONE;
                    break;
                case FD_MSG_BOUND:
                    name = "FD_MSG_BOUND";
                    consume = false;
                    event = EV_EXT_BOUND;
                    break;
                case FD_MSG_COUNT:
                    name = "FD_MSG_COUNT";
                    consume = false;
                    event = EV_EXT_COUNT;
                    break;
                case FD_MSG_STORE:
                    name = "FD_MSG_STORE";
                    consume = false;
                    event = EV_EXT_STORE;
                    break;
                case FD_MSG_FEED_ME:
                    name = "FD_MSG_FEED_ME";
                    event = EV_EXT_REQ_WORK;
                    break;
                case FD_MSG_NO_WORK:
                    name = "FD_MSG_NO_WORK";
                    event = EV_EXT_NO_WORK;
                    break;
                case FD_MSG_NO_SHARE:
                    name = "FD_MSG_NO_SHARE";
                    event = EV_EXT_NO_SHARE;
                    break;
                case FD_MSG_SHARE:
                    name = "FD_MSG_SHARE";
                    event = EV_EXT_REQ_SHARE;
                    break;
                case FD_MSG_QUIT:
                    name = "FD_MSG_QUIT";
                    event = EV_EXT_QUIT;
                    break;
                case FD_MSG_READY:
                    name = "FD_MSG_READY";
                    event = EV_EXT_READY;
                    break;
                default:
                    _fd_debug("[%d] unknown tag %d\n", tid, mpi_status.MPI_TAG);
                    _fd_fatal("unknown tag in message");
                }

                _fd_debug("[%d] got %s from %d\n", tid, name, *source);

                // consume message
                if (consume)
                    if (MPI_Recv(NULL, 0, MPI_CHAR, mpi_status.MPI_SOURCE,
                                 mpi_status.MPI_TAG, MPI_COMM_WORLD, &mpi_status))
                        _fd_fatal("MPI_Recv failed");

                break;
            }
        }
#else /* MAD_MPI */
        // XXX: New Madeleine doesn't implement MPI_*probe(..., MPI_ANY_TAG, ...)
        if (_fd_probe_external(FD_MSG_SOLUTION, "FD_MSG_SOLUTION", source, false)) {
            event = EV_EXT_SUCCESS;
            break;
        }
        if (_fd_probe_external(FD_MSG_BOUND, "FD_MSG_BOUND", source, false)) {
            event = EV_EXT_BOUND;
            break;
        }
        if (_fd_probe_external(FD_MSG_QUIT, "FD_MSG_QUIT", source, true)) {
            event = EV_EXT_QUIT;
            break;
        }
        if (_fd_probe_external(FD_MSG_NO_WORK, "FD_MSG_NO_WORK", source, true)) {
            event = EV_EXT_NO_WORK;
            break;
        }
        if (_fd_probe_external(FD_MSG_STORE, "FD_MSG_STORE", source, false)) {
            event = EV_EXT_STORE;
            break;
        }
        if (_fd_probe_external(FD_MSG_SHARE, "FD_MSG_SHARE", source, true)) {
            event = EV_EXT_REQ_SHARE;
            break;
        }
        if (_fd_probe_external(FD_MSG_NO_SHARE, "FD_MSG_NO_SHARE", source, true)) {
            event = EV_EXT_NO_SHARE;
            break;
        }
        /* must be checked for after FD_MSG_STORE: if the `supplier' sends
           an FD_MSG_STORE, then finishes its work and sends an
           FD_MSG_FEED_ME and the checks are reversed, the latter will be
           ignored (since this is not yet the supplier) and the solver
           will not halt (XXX: only if not polling?) */
        if (_fd_probe_external(FD_MSG_FEED_ME, "FD_MSG_FEED_ME", source, true)) {
            event = EV_EXT_REQ_WORK;
            break;
        }
        if (_fd_probe_external(FD_MSG_FAIL, "FD_MSG_FAIL", source, true)) {
            event = EV_EXT_FAIL;
            break;
        }
        // XXX: only processes with rank != 0 shall receive DONE
        if (_fd_probe_external(FD_MSG_DONE, "FD_MSG_DONE", source, true)) {
            event = EV_EXT_DONE;
            break;
        }
        if (_fd_probe_external(FD_MSG_READY, "FD_MSG_READY", source, true)) {
            event = EV_EXT_READY;
            break;
        }
        if (_fd_counting_solutions)
            // XXX: only the process with rank 0 shall receive COUNTs
            if (_fd_probe_external(FD_MSG_COUNT, "FD_MSG_COUNT", source, false)) {
                event = EV_EXT_COUNT;
                break;
            }
#endif /* MAD_MPI */

        if (tmout && MPI_Wtime() >= tmout) {
            *timeout = 0;
            return EV_INT_TIMEOUT;
        }

        if (tmout) {
            struct timespec ts = { 0, WAIT_TOUT };

            nanosleep(&ts, NULL);
        } else {
            struct timespec ts = { 0, WAIT_NORM };

            nanosleep(&ts, NULL);
        }
    }

    if (tmout) {
        *timeout = (int) ((tmout - MPI_Wtime()) * 1000.0 + 0.5);
        if (*timeout < 1)
            *timeout = 1;
    }

    return event;
}
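/*
 * _fd_get_event() contract, in short: poll the agents and the network
 * in WAIT_NORM (or WAIT_TOUT) steps until an event arrives or the
 * caller-supplied *timeout (milliseconds) expires, in which case it
 * returns EV_INT_TIMEOUT and zeroes *timeout.  When an event arrives
 * before the deadline, *timeout is updated to the time remaining, so a
 * single deadline survives successive calls.  A minimal caller sketch
 * (hypothetical; is_interesting() is illustrative only):
 */
#if 0
{
    int src, timeout = FEED1_TIMEOUT; /* ms */
    int ev;

    do
        ev = _fd_get_event(running, &src, &timeout);
    while (ev != EV_INT_TIMEOUT && !is_interesting(ev));
}
#endif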
static void _fd_flush_events(int procno)
{
    MPI_Status mpi_status;
    int mpi_flag;
    int got_one;
    MPI_Request *mpi_request;

    // check the status of MPI pending requests
    while ((mpi_request = fd_list_remove(pending_requests))) {
        if (MPI_Test(mpi_request, &mpi_flag, MPI_STATUS_IGNORE))
            _fd_debug("[%d] MPI_Test failed\n", procno);

        if (!mpi_flag) {
#if 0
            // this is probably the solution just sent, not cancelling it
            _fd_debug("[%d] cancelling request\n", procno);
            if (MPI_Cancel(mpi_request))
                _fd_debug("[%d] MPI_Cancel failed\n", procno);

            _fd_debug("[%d] waiting for cancellation completion\n", procno);
            if (MPI_Wait(mpi_request, MPI_STATUS_IGNORE))
                _fd_debug("[%d] MPI_Wait failed\n", procno);
#else
            _fd_debug("[%d] found a not completed request\n", procno);
#endif
        }

        free(mpi_request);
    }
    fd_list_dispose(pending_requests);

#ifndef MAD_MPI
    // XXX: should be cancelled at the sending site?
    // consume pending incoming communications
    for (;;) {
        int source;
        char *name;
        MPI_Datatype type = MPI_CHAR;
        int count = 0;

        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
                   &mpi_flag, &mpi_status);
        if (!mpi_flag) // nothing to do if no incoming message is found
            break;

        source = mpi_status.MPI_SOURCE;

        switch (mpi_status.MPI_TAG) {
        case FD_MSG_SOLUTION:
            name = "FD_MSG_SOLUTION";
            type = MPI_DOMAIN_TYPE;
            count = fd_variables_count * DOMAIN_WORDS;
            _fd_error("[%d] dropping %s from %d\n", tid, name, source);
            break;
        case FD_MSG_FAIL:
            name = "FD_MSG_FAIL";
            break;
        case FD_MSG_DONE:
            name = "FD_MSG_DONE";
            break;
        case FD_MSG_BOUND:
            name = "FD_MSG_BOUND";
            type = MPI_INT;
            count = 1;
            break;
        case FD_MSG_COUNT:
            name = "FD_MSG_COUNT";
            type = MPI_LONG_LONG;
            count = 1;
            _fd_error("[%d] dropping %s from %d\n", tid, name, source);
            break;
        case FD_MSG_STORE:
            name = "FD_MSG_STORE";
            type = MPI_DOMAIN_TYPE;
            count = fd_variables_count * DOMAIN_WORDS;
            _fd_error("[%d] *** dropping %s from %d\n", tid, name, source);
            break;
        case FD_MSG_FEED_ME:
            name = "FD_MSG_FEED_ME";
            break;
        case FD_MSG_NO_WORK:
            name = "FD_MSG_NO_WORK";
            break;
        case FD_MSG_NO_SHARE:
            name = "FD_MSG_NO_SHARE";
            break;
        case FD_MSG_SHARE:
            name = "FD_MSG_SHARE";
            break;
        case FD_MSG_QUIT:
            name = "FD_MSG_QUIT";
            break;
        case FD_MSG_READY:
            name = "FD_MSG_READY";
            break;
        default:
            _fd_debug("[%d] unknown tag %d\n", tid, mpi_status.MPI_TAG);
            _fd_fatal("unknown tag in message");
        }

        _fd_debug("[%d] dropping %s from %d\n", tid, name, source);

        // consume message
        {
            char *buffer = alloca(fd_variables_count * sizeof(*store));

            if (MPI_Recv(buffer, count, type, source, mpi_status.MPI_TAG,
                         MPI_COMM_WORLD, MPI_STATUS_IGNORE))
                _fd_error("[%d] MPI_Recv failed (dropping %s)\n", tid, name);
        }
    }
#else /* MAD_MPI */
    // XXX: New Madeleine doesn't implement MPI_*probe(..., MPI_ANY_TAG, ...)
    //#ifdef MAD_MPI
    //  // XXX: otherwise, assume they will be cancelled at the sending site
    // consume pending incoming communications
    do {
        got_one = 0;

        MPI_Iprobe(MPI_ANY_SOURCE, FD_MSG_SOLUTION, MPI_COMM_WORLD,
                   &mpi_flag, &mpi_status);
        if (mpi_flag) {
            _fd_store null = alloca(fd_variables_count * sizeof(*null));

            _fd_debug("[%d] dropping FD_MSG_SOLUTION from %d\n", procno,
                      mpi_status.MPI_SOURCE);
            _fd_recv_store(mpi_status.MPI_SOURCE, FD_MSG_SOLUTION, null);
            got_one++;
        }

        MPI_Iprobe(MPI_ANY_SOURCE, FD_MSG_FEED_ME, MPI_COMM_WORLD,
                   &mpi_flag, &mpi_status);
        if (mpi_flag) {
            _fd_debug("[%d] dropping FD_MSG_FEED_ME from %d\n", procno,
                      mpi_status.MPI_SOURCE);
            // consume message
            if (MPI_Recv(NULL, 0, MPI_CHAR, mpi_status.MPI_SOURCE,
                         FD_MSG_FEED_ME, MPI_COMM_WORLD, &mpi_status))
                _fd_fatal("MPI_Recv failed");
            got_one++;
        }

        MPI_Iprobe(MPI_ANY_SOURCE, FD_MSG_NO_WORK, MPI_COMM_WORLD,
                   &mpi_flag, &mpi_status);
        if (mpi_flag) {
            _fd_debug("[%d] dropping FD_MSG_NO_WORK from %d\n", procno,
                      mpi_status.MPI_SOURCE);
            // consume message
            if (MPI_Recv(NULL, 0, MPI_CHAR, mpi_status.MPI_SOURCE,
                         FD_MSG_NO_WORK, MPI_COMM_WORLD, &mpi_status))
                _fd_fatal("MPI_Recv failed");
            got_one++;
        }

        MPI_Iprobe(MPI_ANY_SOURCE, FD_MSG_STORE, MPI_COMM_WORLD,
                   &mpi_flag, &mpi_status);
        if (mpi_flag) {
            _fd_store null = alloca(fd_variables_count * sizeof(*null));

            _fd_debug("[%d] dropping FD_MSG_STORE from %d\n", procno,
                      mpi_status.MPI_SOURCE);
            _fd_recv_store(mpi_status.MPI_SOURCE, FD_MSG_STORE, null);
            got_one++;
        }

        MPI_Iprobe(MPI_ANY_SOURCE, FD_MSG_FAIL, MPI_COMM_WORLD,
                   &mpi_flag, &mpi_status);
        if (mpi_flag) {
            _fd_debug("[%d] dropping FD_MSG_FAIL from %d\n", procno,
                      mpi_status.MPI_SOURCE);
            // consume message
            if (MPI_Recv(NULL, 0, MPI_CHAR, mpi_status.MPI_SOURCE,
                         FD_MSG_FAIL, MPI_COMM_WORLD, &mpi_status))
                _fd_fatal("MPI_Recv failed");
            got_one++;
        }

        MPI_Iprobe(MPI_ANY_SOURCE, FD_MSG_DONE, MPI_COMM_WORLD,
                   &mpi_flag, &mpi_status);
        if (mpi_flag) {
            _fd_debug("[%d] dropping FD_MSG_DONE from %d\n", procno,
                      mpi_status.MPI_SOURCE);
            // consume message
            if (MPI_Recv(NULL, 0, MPI_CHAR, mpi_status.MPI_SOURCE,
                         FD_MSG_DONE, MPI_COMM_WORLD, &mpi_status))
                _fd_fatal("MPI_Recv failed");
            got_one++;
        }

        MPI_Iprobe(MPI_ANY_SOURCE, FD_MSG_SHARE, MPI_COMM_WORLD,
                   &mpi_flag, &mpi_status);
        if (mpi_flag) {
            _fd_debug("[%d] dropping FD_MSG_SHARE from %d\n", procno,
                      mpi_status.MPI_SOURCE);
            // consume message
            if (MPI_Recv(NULL, 0, MPI_CHAR, mpi_status.MPI_SOURCE,
                         FD_MSG_SHARE, MPI_COMM_WORLD, &mpi_status))
                _fd_fatal("MPI_Recv failed");
            got_one++;
        }

        MPI_Iprobe(MPI_ANY_SOURCE, FD_MSG_NO_SHARE, MPI_COMM_WORLD,
                   &mpi_flag, &mpi_status);
        if (mpi_flag) {
            _fd_debug("[%d] dropping FD_MSG_NO_SHARE from %d\n", procno,
                      mpi_status.MPI_SOURCE);
            // consume message
            if (MPI_Recv(NULL, 0, MPI_CHAR, mpi_status.MPI_SOURCE,
                         FD_MSG_NO_SHARE, MPI_COMM_WORLD, &mpi_status))
                _fd_fatal("MPI_Recv failed");
            got_one++;
        }
    } while (got_one);
#endif /* MAD_MPI */
}

void _fd_cleanup_mpi_state(int process)
{
    // if (!_fd_counting_solutions)
    {
        // this rendez-vous ensures that no stop message is cancelled before
        // a still running process has a chance to see it
        // XXX: New Madeleine can't cope with messages en route
#if 0
        //#ifndef MAD_MPI
        _fd_debug("[%d] setting up barrier 1\n", process);
        if (MPI_Barrier(MPI_COMM_WORLD))
            _fd_error("[%d] MPI_Barrier failed\n", process);
        else
            _fd_debug("[%d] leaving barrier 1\n", process);
#endif

        _fd_flush_events(process);

#if 0
        _fd_debug("[%d] setting up barrier 2\n", process);
        if (MPI_Barrier(MPI_COMM_WORLD))
            _fd_error("[%d] MPI_Barrier failed\n", process);
        else
            _fd_debug("[%d] leaving barrier 2\n", process);
#endif
    }

    _fd_statistics_msgs();

    _fd_debug("[%d] calling MPI_Finalize\n", process);
    if (MPI_Finalize())
        _fd_error("[%d] MPI_Finalize failed\n", process);
    else
        _fd_debug("[%d] returned from MPI_Finalize\n", process);
}

static void _fd_exit_process(int process)
{
    _fd_cleanup_mpi_state(process);

    _fd_debug("[%d] exiting\n", process);
    exit(0);
}

void _fd_search_space_sizes(_fd_store stores[], int n)
{
#ifndef FAST
    double s;
    int i, v;

    // when there are no agents, just return
    if (*stores == NULL)
        return;

    for (i = 0; i < n; ++i) {
        s = 1;
        for (v = 0; v < fd__label_vars_count; ++v)
            s *= _fd_val_size(SVALUE(stores[i][fd__label_vars[v]->index]));
        _fd_debug("[%d] search space %d size is %g\n", tid, i, s);
    }
#endif /* FAST */
}

static void count_singletons(_fd_store store)
{
#ifndef FAST
    int i, n;

    n = 0;
    for (i = 0; i < fd_variables_count; ++i)
        if (_fd_val_single(SVALUE(store[i]), NULL))
            ++n;

    _fd_debug("[%d] store has %d/%d/%d singletons\n", tid, n,
              fd__label_vars_count, fd_variables_count);
#endif
}

#if STEAL_WORK >= 2
static int _fd_restart_agents(int tid, int nagents)
{
    int parts;

    // split the store among the agents
    parts = fd__split_problem(nagents, _fd_agents_stores,
                              fd__split_team_problem_f);
    if (parts < nagents)
        _fd_debug("[%d] WARNING: store only good for %d agents (out of %d)\n",
                  tid, parts, nagents);
    agents_to_run = parts;

    // reset the pool
    _fd_init_store_depository(agents_to_run);

    // give the agents the go-ahead
    if (pthread_mutex_lock(&continue_mutex))
        perror("pthread_mutex_lock");
    pthread_cond_broadcast(&continue_cond);
    if (pthread_mutex_unlock(&continue_mutex))
        perror("pthread_mutex_unlock");

    _fd_debug("[%d] told waiting agents to proceed\n", tid);

    return parts;
}

#ifndef ROUND_ROBIN_POLL
#define _fd_start_polling(x) _fd_poll_next(processes, x)

static int _fd_poll_next(int last, int skip)
{
    int poll;

    poll = last - 1;
    // skip over both self and the ex-supplier (if it is not process 0)
    while (poll == tid || (poll == skip && skip != 0))
        --poll;

    if (poll >= 0) {
        // send a forceful request for work
        _fd_debug("[%d] polling %d\n", tid, poll);
        _fd_send(poll, FD_MSG_SHARE);
    }

    return poll;
}
#else /* ROUND_ROBIN_POLL */
#define _fd_start_polling(x) _fd_poll_rr(true, processes, x)
#define _fd_poll_next(l, x) _fd_poll_rr(false, l, x)

static int _fd_poll_rr(bool starting, int np_or_last, int skip)
{
    static int processes = 0;
    static int tried, last_polled;
    int poll;

    if (starting) {
        tried = 0;

        if (processes == 0) {
            processes = np_or_last;
            last_polled = tid;
        }
    }

    poll = (last_polled + 1) % processes;
    // skip over both the ex-supplier and self
    for (; tried < processes && (poll == tid || poll == skip); ++tried)
        poll = (poll + 1) % processes;
    if (tried == processes)
        return -1;

    // send a forceful request for work
    _fd_debug("[%d] polling %d\n", tid, poll);
    _fd_send(poll, FD_MSG_SHARE);

    last_polled = poll;

    return poll;
}
#endif /* ROUND_ROBIN_POLL */
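/*
 * Polling strategies, with a worked example for 4 processes, tid == 1,
 * ex-supplier == 2: the default scan walks downwards and tries 3, then
 * 0 (process 0 is the last resort, so it is never skipped); with
 * ROUND_ROBIN_POLL the scan instead resumes after the last process
 * polled, wrapping around, and gives up once every other process has
 * been tried.
 */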
// try to share work with PEER, return true on success
// XXX: side-effect: when true is returned, the process will cease to
//      be the supplier
static bool _fd_share_work(int peer, MPI_Request *request,
                           fd_list saved_stores, bool polled)
{
    _fd_store saved;
    int mpi_flag;

    // test whether a previously sent store has yet to be received
    if (MPI_Test(request, &mpi_flag, MPI_STATUS_IGNORE))
        perror("MPI_Test");
    if (!mpi_flag) {
        // the previous send has not completed
        _fd_debug("[%d] there's a store in transit to %d\n", tid, peer);

        /* even though the request has not completed, the store may
           already have been received and processed; since process 0
           must receive or give an answer, otherwise there's a risk of
           deadlock, we'll force one when it is involved */
        if (peer != 0 && tid != 0)
            return true;

        if (MPI_Wait(request, MPI_STATUS_IGNORE))
            perror("MPI_Wait");
    }

    // see if there's a saved store which can be sent
    if ((saved = fd_list_remove(saved_stores))) {
        memcpy(_fd_processes_stores[peer], saved,
               fd_variables_count * sizeof(*saved));

        _fd_debug("[%d] sending store (saved) to %d\n", tid, peer);
        _fd_send_store(peer, FD_MSG_STORE, _fd_processes_stores[peer],
                       request, true);

        free(saved);

        return true;
    }

#ifndef INDEX_IN_POOL
    if (_fd_steal_store(_fd_processes_stores[peer], -1, 0))
#else
    if (_fd_steal_store(_fd_processes_stores[peer], NULL, -1, 0))
#endif
    {
        _fd_debug("[%d] sending store to %d\n", tid, peer);
        _fd_send_store(peer, FD_MSG_STORE, _fd_processes_stores[peer],
                       request, true);

        return true;
    }

    // tell the requester that there is no work available
    if (polled) {
        _fd_debug("[%d] no work to share with %d\n", tid, peer);
        _fd_send(peer, FD_MSG_NO_SHARE);
    } else {
        _fd_debug("[%d] can't find work requested by %d\n", tid, peer);
        _fd_send(peer, FD_MSG_NO_WORK);
    }

    return false;
}

/* Process PEER has polled last resort process 0 for work unsuccessfully,
   and (probably) has become idle.  If all remaining processes are idle,
   stop them. */
#define mark_process_idle(peer)                                         \
    do {                                                                \
        int _p = peer;                                                  \
                                                                        \
        if (!process_idle[_p]) {                                        \
            /* record polling process as idle */                        \
            gettimeofday(&to, NULL);                                    \
            timersub(&to, &ti, &to);                                    \
            _fd_debug("[%d] %d idled after %d.%06ds\n", tid, _p,        \
                      to.tv_sec, to.tv_usec);                           \
                                                                        \
            process_idle[_p] = true;                                    \
            if (++idle_processes == live_procs - 1) {                   \
                _fd_debug("[%d] sending 2nd quit\n", tid);              \
                                                                        \
                _fd_broadcast(FD_MSG_QUIT, processes, tid);             \
            }                                                           \
        }                                                               \
    } while (0)
#endif /* STEAL_WORK >= 2 */

int _fd_dsolve()
{
    pthread_t threads[MAX_AGENTS];
    int nagents = 4, started, running, peer;
    int status = FD_NOSOLUTION;
    int parts;
    int i;
    struct timeval ti, to, ts = { 0, 0 };
    char *s;
    int processes, live_procs;
#if STEAL_WORK >= 2
    bool *process_idle; // processes who have polled process 0
    int idle_processes = 0;
    bool quitting = false;
    MPI_Request *stores_pending; // to trace the reception of a store
    int supplier;
#endif
    MPI_Status mpi_status;
    int done, stopped;
    _fd_store solution;
    bool have_solution = false;
    int bound;
    unsigned long long external_solutions = 0; // other processes' solutions count
    fd_int *local_variables = _fd_variables;
    _fd_store local_store = store;

    gettimeofday(&ti, NULL);

    if (MPI_Comm_rank(MPI_COMM_WORLD, &tid))
        _fd_fatal("MPI_Comm_rank failed\n");
    _fd_debug("I'm MPI rank %d\n", tid);

    {
        char host[512];

        gethostname(host, sizeof(host));
        _fd_debug("[%d] process %d at %s\n", tid, getpid(), host);
    }

    if (MPI_Comm_size(MPI_COMM_WORLD, &processes))
        _fd_fatal("MPI_Comm_size failed\n");
    _fd_debug("[%d] There are %d of us\n", tid, processes);

    pending_requests = fd_list_new();

#if STEAL_WORK >= 2
    stores_pending = calloc(processes, sizeof(*stores_pending)); // XXX: NULL
    for (i = 0; i < processes; ++i)
        stores_pending[i] = MPI_REQUEST_NULL;

    // the last process is the one which can supply work to the others
    supplier = tid == processes - 1;

    process_idle = alloca(processes * sizeof(*process_idle));
    memset(process_idle, 0, processes * sizeof(*process_idle));
#endif

    if (fd__workers == -1 && (s = getenv("FDC_AGENTS")))
        fd__workers = atoi(s);
    if (fd__workers != -1) {
        nagents = fd__workers;
        if (nagents < 0) {
            nagents = 0;
            _fd_debug("[%d] using %d workers\n", tid, nagents);
        } else if (nagents > MAX_AGENTS) {
            nagents = MAX_AGENTS;
            _fd_debug("[%d] using %d workers\n", tid, nagents);
        }
    }

    // allocate stores for all the processes, in case one needs to borrow
    // work from this one
    for (i = 0; i < processes; ++i)
        _fd_processes_stores[i] = calloc(fd_variables_count, sizeof(*store)); // XXX: NULL

    fd__setup_label_vars();

    if (tid == 0) {
        // code for the controlling process
        MPI_Request mpi_request = MPI_REQUEST_NULL;

#ifdef FILTER_DOMAINS
        // XXX: *must* revise as the splitting proceeds!
        if (_fd_filter_domains() == FD_NOSOLUTION)
            return FD_NOSOLUTION;
#endif

        // do an initial partition among all the processes
        // XXX: check if processes <= MAX_AGENTS ('cause of _fd_processes_stores)
        parts = fd__split_problem(processes, _fd_processes_stores,
                                  fd__split_problem_f);
        _fd_search_space_sizes(_fd_processes_stores, parts);

        if (parts < processes)
            _fd_error("WARNING: work enough for %d processes only\n", parts);

        // now send the resulting parts to the other processes
        // XXX: could each have done what this did and now pick its share?
        for (i = 1; i < parts; ++i)
            _fd_send_store(i, FD_MSG_STORE, _fd_processes_stores[i],
                           &mpi_request, false);
        for (i = parts; i < processes; ++i)
            fd__send_empty_store(i, FD_MSG_STORE, _fd_processes_stores[i],
                                 &mpi_request);
        processes = parts;

        // install our part of the search space
        memcpy(store, _fd_processes_stores[0],
               fd_variables_count * sizeof(*store));
    } else {
        // code for the remaining processes
        _fd_debug("[%d] waiting for initial store\n", tid);

        // retrieve the domains assigned to the process
        if (!_fd_recv_store(0, FD_MSG_STORE, store)) {
            _fd_debug("[%d] empty initial store\n", tid);
            _fd_exit_process(tid);
        }
        _fd_debug("[%d] received initial store\n", tid);
    }

#if FILTER_DOMAINS > 1
    // XXX: *must* revise as the splitting proceeds!
    if (_fd_filter_domains() == FD_NOSOLUTION)
        return FD_NOSOLUTION; // XXX: should try to obtain a new store
#endif

    // now, get on with the search
    _fd_copy_problem(nagents);

    // allocate space to keep a solution
    if (_fd_optimising)
        solution = calloc(fd_variables_count, sizeof(*solution)); // XXX: NULL?
    else
        solution = store;
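    /* The team's share of the search space is now split again, this
       time among the local agents; fd__split_problem() may produce
       fewer parts than requested, in which case the surplus agents are
       simply not run. */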
    // create agents' stores
    for (i = 0; i < nagents; ++i)
        _fd_agents_stores[i] = calloc(fd_variables_count, sizeof(*store)); // XXX: NULL

    parts = fd__split_problem(nagents, _fd_agents_stores,
                              fd__split_team_problem_f);
    if (parts < nagents) {
        _fd_debug("[%d] reducing to %d agents!\n", tid, parts);
        nagents = parts;
    }
    // XXX: pretend there are stores for every agent (should always happen)
    agents_to_run = nagents;

    _fd_search_space_sizes(_fd_agents_stores, parts);

    gettimeofday(&to, NULL);
    timersub(&to, &ti, &to);
    _fd_debug("[%d] setup took %d.%06ds\n", tid, to.tv_sec, to.tv_usec);

#if 0
    for (i = 0; i < nagents; ++i)
        _fd_cprint2(_fd_copies[i]);
    return FD_NOSOLUTION;
#endif

    if (nagents > 0) {
        int timeout = 0;
#if STEAL_WORK >= 2
        int timeouts;
        int ex_supplier, polling;
        fd_list saved_stores = fd_list_new();
        _fd_message waiting_on;
        bool ready = false; // true means no pending communications
#ifdef ASK_EARLY
        bool waiting_for_work = false;
        bool getting_work_failed = false;
#endif
#endif

        _fd_init_store_depository(nagents);

        if (sem_init(&ready_semaphore, 0, 1))
            perror("sem_init (ready)");
        if (sem_init(&notify_semaphore, 0, 0))
            perror("sem_init (notify)");
        if (_fd_optimising && sem_init(&resume_semaphore, 0, 0))
            perror("sem_init (resume)");

        for (i = started = 0; i < nagents; ++i) {
            long l = i; // XXX: avoid a gcc warning

            pthread_create(&threads[i], NULL, (void *) _fd_agent, (void *) l);
            started++;

            // don't start any more agents if one has already succeeded
            if (!_fd_counting_solutions && !_fd_optimising) {
                if (pthread_mutex_trylock(&success_mutex) == 0)
                    pthread_mutex_unlock(&success_mutex);
                else
                    break;
            }
        }

        _fd_debug("[%d] main thread waiting\n", tid);

        running = started;
        live_procs = processes;
        done = stopped = 0;
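        /*
         * Work-acquisition protocol, as a rough sketch: an idle team
         * first broadcasts FD_MSG_FEED_ME (twice, with FEED1_TIMEOUT
         * and then FEED2_TIMEOUT); if no store arrives it escalates to
         * polling individual processes with FD_MSG_SHARE, ending at
         * process 0, the supplier of last resort.  A team that cannot
         * obtain work anywhere reports FD_MSG_READY to process 0,
         * which broadcasts FD_MSG_QUIT once every other team is idle.
         */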
        do {
            int event = _fd_get_event(running, &peer, &timeout);

            switch (event) {
#if STEAL_WORK >= 2
            case EV_INT_TIMEOUT:
                _fd_debug("[%d] timed out waiting for event\n", tid);

                switch (waiting_on) {
                case FD_MSG_ANY:
                    done = 1;
                    break;

                case FD_MSG_FEED_ME:
                    // no work sharing after having been told to stop
                    if (quitting || ready)
                        break;

                    if (timeouts++) {
                        if (tid != 0 || live_procs > 1) {
                            polling = _fd_start_polling(ex_supplier = tid);
                            waiting_on = FD_MSG_SHARE;
                            if (polling == 0)
                                timeout = 0; // last resort polling
                            else
                                timeout = POLL_TIMEOUT;
                        } else
#ifndef ASK_EARLY
                            done = 1;
#else
                            done = running == 0;
                        waiting_for_work = false;
                        getting_work_failed = true;
#endif
                    } else {
                        // resend request for work
                        _fd_debug("[%d] re-asking for work\n", tid);
                        _fd_broadcast(FD_MSG_FEED_ME, processes, tid);
                        timeout = FEED2_TIMEOUT;
                    }
                    break;

                case FD_MSG_SHARE:
                    // no work sharing after having been told to stop
                    if (quitting || ready)
                        break;

                    // the last polled process must have stopped
                    // try the next one
                    if ((polling = _fd_poll_next(polling, ex_supplier)) >= 0) {
                        if (polling == 0)
                            timeout = 0; // last resort polling
                        else
                            timeout = POLL_TIMEOUT; // reset timeout
                    } else {
                        // no additional work could be found
                        done = tid != 0 || live_procs == 1;
#ifdef ASK_EARLY
                        done = done && running == 0;
                        waiting_for_work = false;
                        getting_work_failed = true;
#endif
                        if (tid == 0) {
                            // XXX: ???
                            // become the supplier, to answer further requests
                            // for work, even if negatively
                            supplier = 1;
                        }
                    }
                    break;

                default:
                    _fd_debug("[%d] unknown timeout: %d\n", tid, waiting_on);
                    _fd_fatal("unknown timeout reason");
                }
                break;
#endif /* STEAL_WORK >= 2 */

            /* first solution internal events (from the workers) */
            case EV_INT_SUCCESS:
                _fd_debug("[%d] using agent %d's results\n", tid, peer);
                assert(!_fd_counting_solutions);

                if (_fd_optimising) {
                    // check and update bound
                    if (!_fd_bound_check_set(_fd_agents_stores[peer])) {
                        _fd_debug("[%d] invalid solution, releasing %d\n",
                                  tid, peer);
                        if (sem_post(&resume_semaphore))
                            perror("sem_post (resume)");
                        break;
                    }

                    gettimeofday(&ts, NULL);

                    // save solution
                    memcpy(solution, _fd_agents_stores[peer],
                           fd_variables_count * sizeof(*solution));

                    // release agent
                    _fd_debug("[%d] releasing agent %d\n", tid, peer);
                    if (sem_post(&resume_semaphore))
                        perror("sem_post (resume)");

                    _fd_debug("[%d] bound updated to %d\n", tid,
                              _fd_bound_value());

                    // broadcast new bound
                    bound = _fd_bound_value();
                    _fd_broadcast_data(FD_MSG_BOUND, &bound, 1, MPI_INT,
                                       processes, tid);

                    status = FD_OK;
                    have_solution = true;
                    break;
                }

                status = FD_OK;
                memcpy(store, _fd_agents_stores[peer],
                       fd_variables_count * sizeof(*store));
                done = 1;
                break;

            case EV_INT_FAIL:
                _fd_debug("[%d] agent %d found no solution\n", tid, peer);
                assert(!_fd_counting_solutions);
                running--;
#if STEAL_WORK < 2
                done = running == 0 && (tid != 0 || live_procs == 1);
#else /* STEAL_WORK < 2 */
#ifdef ASK_EARLY
                if (running == agents_to_run - 1) {
                    // the first agent halted and the other agents must
                    // be stopping soon; try to get work before they do
                    // no need if we already have a store saved up
                    if (fd_list_empty(saved_stores)) {
                        // ask the other processes for more work, unless this
                        // is the rank 0 process and all others have finished
                        if (tid != 0 || live_procs > 1) {
                            if (!supplier) {
                                _fd_debug("[%d] asking for work\n", tid);
                                _fd_broadcast(FD_MSG_FEED_ME, processes, tid);
                                waiting_on = FD_MSG_FEED_ME;
                                timeout = FEED1_TIMEOUT;
                                timeouts = 0;
                            } else {
                                // poll the other processes for work
                                polling = _fd_start_polling(ex_supplier = tid);
                                waiting_on = FD_MSG_SHARE;
                                if (polling == 0)
                                    timeout = 0; // last resort polling
                                else
                                    timeout = POLL_TIMEOUT; // set timeout
                            }

                            waiting_for_work = true;
                        }
                    }
                }
#endif /* ASK_EARLY */
                if (running == 0) {
                    _fd_store saved;

                    // see if there's some store saved up
                    if ((saved = fd_list_remove(saved_stores))) {
                        _fd_debug("[%d] using saved store\n", tid);

                        memcpy(store, saved, fd_variables_count * sizeof(*saved));
                        count_singletons(store);

                        // restart the agents
                        running = _fd_restart_agents(tid, nagents);

                        free(saved);
                    }
#ifdef ASK_EARLY
                    else if (waiting_for_work) {
                        // we're already trying to get work, let's
                        // wait and see how that pans out
                        break;
                    } else if (getting_work_failed) {
                        // if we already have tried to get work and
                        // failed, it's no use doing it again
                        done = tid != 0 || live_procs == 1;
                        break;
                    }
#endif /* ASK_EARLY */
                    else if (quitting || ready)
                        done = quitting && (tid != 0 || live_procs == 1);
                    else if (tid != 0 || live_procs > 1) {
                        // there is at least one other process still running
                        if (!supplier) {
                            // ask the other processes for more work
                            _fd_debug("[%d] asking for work\n", tid);
                            _fd_broadcast(FD_MSG_FEED_ME, processes, tid);
                            waiting_on = FD_MSG_FEED_ME;
                            timeout = FEED1_TIMEOUT;
                            timeouts = 0;
                        } else {
                            // poll the other processes for work
                            polling = _fd_start_polling(ex_supplier = tid);
                            waiting_on = FD_MSG_SHARE;
                            if (polling == 0)
                                timeout = 0; // last resort polling
                            else
                                timeout = POLL_TIMEOUT; // set timeout
                        }
#ifdef ASK_EARLY
                        waiting_for_work = true;
#endif
                    } else
                        done = 1;
                }
#endif /* STEAL_WORK < 2 */
                break;

            /* first solution external events (from the other processes) */
            case EV_EXT_SUCCESS:
                _fd_debug("[%d] received a solution from %d\n", tid, peer);
                assert(!_fd_counting_solutions);

                _fd_recv_store(peer, FD_MSG_SOLUTION, store);
                _fd_debug("[%d] using process %d's results\n", tid, peer);

                gettimeofday(&to, NULL);
                timersub(&to, &ti, &to);
#if defined(STATS_PROCS) || !defined(FAST)
                fd__info("[%d] process %d took %d.%06ds\n", tid, peer,
                         to.tv_sec, to.tv_usec);
#endif

                if (_fd_optimising) {
                    // see if it is a better solution
                    if (!have_solution || _fd_better_solution(store, solution)) {
                        _fd_debug("[%d] process %d solution is better\n",
                                  tid, peer);

                        memcpy(solution, store, fd_variables_count * sizeof(*store));
                        status = FD_OK;
                        have_solution = true;
                        ts.tv_sec = 0;
                        // XXX: could update the current bound
                    }

                    live_procs--;
                    done = live_procs == 1 && running == 0;
                    break;
                }

                status = FD_OK;
                done = 1;
                break;

            case EV_EXT_FAIL:
                _fd_debug("[%d] process %d found no solution\n", tid, peer);
                assert(!_fd_counting_solutions);

                gettimeofday(&to, NULL);
                timersub(&to, &ti, &to);
#if defined(STATS_PROCS) || !defined(FAST)
                fd__info("[%d] process %d took %d.%06ds\n", tid, peer,
                         to.tv_sec, to.tv_usec);
#endif

                live_procs--;
#if STEAL_WORK >= 2
                // become the supplier, so further requests for work will
                // be answered
                // XXX: some requests for work may get lost
                // XXX: side effect: this process *will* try to supply work
                supplier = 1;
#endif
                done = running == 0 && live_procs == 1;
#ifdef ASK_EARLY
                done = done && !waiting_for_work;
#endif
                break;

            case EV_EXT_DONE:
                _fd_debug("[%d] process %d told me to stop\n", tid, peer);
                assert(!_fd_counting_solutions);
                done = stopped = 1;
                break;

            /* optimisation external events (from the other processes) */
            case EV_EXT_BOUND:
                _fd_debug("[%d] received new bound from %d\n", tid, peer);
                assert(!_fd_counting_solutions && _fd_optimising);

                {
                    int bound;

                    if (MPI_Recv(&bound, 1, MPI_INT, peer, FD_MSG_BOUND,
                                 MPI_COMM_WORLD, &mpi_status))
                        _fd_fatal("MPI_Recv failed");

                    // try to set new bound
                    if (_fd_set_bound(bound)) {
                        _fd_debug("[%d] new bound is %d\n", tid, bound);

                        // bound has been reset, invalidate previous solution
                        if (have_solution && !_fd_bound_check(solution)) {
                            _fd_debug("[%d] invalidating current solution\n", tid);
                            status = FD_NOSOLUTION;
                            have_solution = false;
                        }
                    } else
                        _fd_debug("[%d] invalid bound %d\n", tid, bound);
                }
                break;

            /* counting solutions internal events (from the workers) */
            case EV_INT_COUNT:
                _fd_debug("[%d] agent %d is done\n", tid, peer);
                assert(_fd_counting_solutions);
                running--;
#if STEAL_WORK < 2
                done = running == 0 && (tid != 0 || live_procs == 1);
#else /* STEAL_WORK < 2 */
#ifdef ASK_EARLY
                if (running == agents_to_run - 1) {
                    // the first agent halted and the other agents must
                    // be stopping soon; try to get work before they do
                    // no need if we already have a store saved up
                    if (fd_list_empty(saved_stores)) {
                        // ask the other processes for more work, unless this
                        // is the rank 0 process and all others have finished
                        if (tid != 0 || live_procs > 1) {
                            if (!supplier) {
                                _fd_debug("[%d] asking for work\n", tid);
                                _fd_broadcast(FD_MSG_FEED_ME, processes, tid);
                                waiting_on = FD_MSG_FEED_ME;
                                timeout = FEED1_TIMEOUT;
                                timeouts = 0;
                            } else {
                                // poll the other processes for work
                                polling = _fd_start_polling(ex_supplier = tid);
                                waiting_on = FD_MSG_SHARE;
                                if (polling == 0)
                                    timeout = 0; // last resort polling
                                else
                                    timeout = POLL_TIMEOUT; // set timeout
                            }

                            waiting_for_work = true;
                        }
                    }
                }
#endif /* ASK_EARLY */
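                // from here on this case mirrors EV_INT_FAIL above:
                // reuse a saved store if one exists, otherwise try to
                // obtain more work before giving up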
                if (running == 0) {
                    _fd_store saved;

                    // see if there's some store saved up
                    if ((saved = fd_list_remove(saved_stores))) {
                        _fd_debug("[%d] using saved store\n", tid);

                        memcpy(store, saved, fd_variables_count * sizeof(*saved));
                        count_singletons(store);

                        // restart the agents
                        running = _fd_restart_agents(tid, nagents);

                        free(saved);
                    }
#ifdef ASK_EARLY
                    else if (waiting_for_work) {
                        // we're already trying to get work, let's
                        // wait and see how that pans out
                        break;
                    } else if (getting_work_failed) {
                        // if we already have tried to get work and
                        // failed, it's no use doing it again
                        done = tid != 0 || live_procs == 1;
                        break;
                    }
#endif /* ASK_EARLY */
                    else if (quitting || ready)
                        done = quitting && (tid != 0 || live_procs == 1);
                    else if (tid != 0 || live_procs > 1) {
                        // there is at least one other process still running
                        if (!supplier) {
                            // ask the other processes for more work
                            _fd_debug("[%d] asking for work\n", tid);
                            _fd_broadcast(FD_MSG_FEED_ME, processes, tid);
                            waiting_on = FD_MSG_FEED_ME;
                            timeout = FEED1_TIMEOUT;
                            timeouts = 0;
                        } else {
                            // poll the other processes for work
                            polling = _fd_start_polling(ex_supplier = tid);
                            waiting_on = FD_MSG_SHARE;
                            if (polling == 0)
                                timeout = 0; // last resort polling
                            else
                                timeout = POLL_TIMEOUT; // set timeout
                        }
#ifdef ASK_EARLY
                        waiting_for_work = true;
#endif
                    } else
                        done = 1;
                }
#endif /* STEAL_WORK < 2 */
                break;

            /* counting solutions external events (from the other processes) */
            case EV_EXT_COUNT:
                _fd_debug("[%d] receiving count from %d\n", tid, peer);
                assert(_fd_counting_solutions);

                {
                    unsigned long long ull;

                    if (MPI_Recv(&ull, 1, MPI_LONG_LONG, peer, FD_MSG_COUNT,
                                 MPI_COMM_WORLD, &mpi_status))
                        _fd_fatal("MPI_Recv failed");

                    external_solutions += ull;
                    _fd_debug("[%d] process %d found %llu solutions\n",
                              tid, peer, ull);
                }

                gettimeofday(&to, NULL);
                timersub(&to, &ti, &to);
#if defined(STATS_PROCS) || !defined(FAST)
                fd__info("[%d] process %d took %d.%06ds\n", tid, peer,
                         to.tv_sec, to.tv_usec);
#endif

                live_procs--;
#if STEAL_WORK >= 2
                // become the supplier, so further requests for work will
                // be answered
                // XXX: some requests for work may get lost
                // XXX: side effect: this process *will* try to supply work
                supplier = 1;
#endif
                done = running == 0 && live_procs == 1;
#ifdef ASK_EARLY
                done = done && !waiting_for_work;
#endif
                break;

#if STEAL_WORK >= 2
            /* work stealing events (external) */
            case EV_EXT_STORE:
                _fd_debug("[%d] process %d sent a store\n", tid, peer);

                if (waiting_on == FD_MSG_ANY)
                    _fd_debug("[%d] recovered store\n", tid);

                if (running == 0) {
                    // retrieve the store
                    _fd_recv_store(peer, FD_MSG_STORE, store);

                    // restart the agents
                    running = _fd_restart_agents(tid, nagents);

                    count_singletons(store);
                } else {
                    // save the store just received for later
                    _fd_store new = calloc(fd_variables_count, sizeof(*new)); // XXX: NULL

                    // retrieve the store
                    _fd_recv_store(peer, FD_MSG_STORE, new);
                    fd_list_append(saved_stores, new);
                }

                // this process becomes the supplier of work
                // XXX: between the moment the store was sent and now
                //      some requests for work may have been lost
                supplier = !ready || tid == 0;

                // cancel timeout
                timeout = 0;
                waiting_on = FD_MSG_NONE;
#ifdef ASK_EARLY
                waiting_for_work = false;
#endif
                break;

            case EV_EXT_REQ_WORK:
                _fd_debug("[%d] process %d wants more work\n", tid, peer);

                if (supplier) {
                    if (quitting || ready) {
                        _fd_send(peer, FD_MSG_NO_WORK);
                        break;
                    }

                    if (_fd_share_work(peer, stores_pending + peer,
                                       saved_stores, false))
                        supplier = 0;
                }
                break;

            case EV_EXT_NO_WORK:
                _fd_debug("[%d] process %d can't supply work\n", tid, peer);
                // no work sharing after having been told to stop
                if (quitting || ready)
                    break;

                // if the message is not expected, just ignore it
#ifndef ASK_EARLY
                if (running > 0 || waiting_on != FD_MSG_FEED_ME)
#else
                if (!waiting_for_work || waiting_on != FD_MSG_FEED_ME)
#endif
                    break;

                // poll the other processes for work
                if ((polling = _fd_start_polling(ex_supplier = peer)) >= 0) {
                    waiting_on = FD_MSG_SHARE;
                    if (polling == 0)
                        timeout = 0; // last resort polling
                    else
                        timeout = POLL_TIMEOUT; // set timeout
                } else {
                    // no additional work could be found
                    if (tid != 0)
#ifndef ASK_EARLY
                        done = 1;
#else
                        done = running == 0;
#endif
                    else {
                        // become the supplier, to answer further requests
                        // for work, even if negatively
                        supplier = 1;
                        done = running == 0 && live_procs == 1;
                    }

                    // cancel timeout
                    timeout = 0;
#ifdef ASK_EARLY
                    waiting_for_work = false;
                    getting_work_failed = true;
#endif
                }
                break;

            case EV_EXT_NO_SHARE:
                _fd_debug("[%d] process %d can't share work\n", tid, peer);

                // no work sharing after having been told to stop
                if (quitting || ready)
                    break;

                // if the message is not expected, just ignore it
#ifndef ASK_EARLY
                if (running > 0 || waiting_on != FD_MSG_SHARE)
#else
                if (!waiting_for_work || waiting_on != FD_MSG_SHARE)
#endif
                    break;

                if (peer != polling) {
                    _fd_debug("[%d] polling %d but got answer from %d\n",
                              tid, polling, peer);
                    // belated answer, ignore
                    break;
                }

                // try the next process
                if ((polling = _fd_poll_next(polling, ex_supplier)) >= 0) {
                    if (polling == 0)
                        timeout = 0; // last resort polling
                    else
                        timeout = POLL_TIMEOUT; // reset timeout
                } else {
                    // no additional work could be found
                    if (tid != 0)
#ifndef ASK_EARLY
                    {
                        quitting = true;
                        done = 1;
                    }
#else
                        done = running == 0;
#endif
                    else {
                        // become the supplier, to answer further requests
                        // for work, even if negatively
                        supplier = 1;
                        done = running == 0 && live_procs == 1;
                    }

                    // cancel timeout
                    timeout = 0;
#ifdef ASK_EARLY
                    waiting_for_work = false;
                    getting_work_failed = true;
#endif
                }
                break;

            case EV_EXT_REQ_SHARE:
                _fd_debug("[%d] process %d really wants work\n", tid, peer);

                // if we're idle, there's nothing to share
                if (running == 0 || quitting || ready) { // XXX: could be < nagents
                    // notify the asking process that there really isn't
                    // work to be shared
                    _fd_debug("[%d] no work to share with %d\n", tid, peer);
                    _fd_send(peer, FD_MSG_NO_SHARE);
                    break;
                }

                // see if there is work to share
                if (_fd_share_work(peer, stores_pending + peer,
                                   saved_stores, true))
                    supplier = 0;
                break;

            case EV_EXT_READY:
                // peer has no work and no outgoing communications pending
                _fd_debug("[%d] %d is done\n", tid, peer);
                assert(tid == 0);

                mark_process_idle(peer);
                // XXX: in mark_process_idle()
                //_fd_debug("[%d] sending 2nd quit\n", tid);
                //_fd_broadcast(FD_MSG_QUIT, processes, tid);
                //done = running == 0 && live_procs == 1;

                // become the supplier, to answer further requests
                // for work, even if negatively
                supplier = 1;
                break;

            case EV_EXT_QUIT:
                // possibilities for finding work have been exhausted
                _fd_debug("[%d] %d told me to stop\n", tid, peer);
                assert(tid != 0);

                quitting = true;
                done = running == 0 && ready;
                if (done)
                    waiting_on = FD_MSG_NONE;
                break;
#endif /* STEAL_WORK >= 2 */

            default:
                _fd_debug("[%d] unknown event in main loop: %d\n", tid, event);
                _fd_fatal("unknown event in main loop");
            }

#if STEAL_WORK >= 2
            if (done)
                _fd_debug("[%d] done (quitting = %d, ready = %d, timeout = %d)\n",
                          tid, quitting, ready, timeout);
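            /*
             * Termination is deliberately conservative: before a team
             * accepts "done" it re-enters the loop once in FD_MSG_ANY
             * mode to catch late messages, and it tries to cancel any
             * store it sent that was never received, taking that work
             * back rather than letting it vanish with a stopped peer.
             */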
            // if it is important that the full search space is explored,
            // make an extra effort for receiving messages that may have
            // been delayed
            if (done && waiting_on != FD_MSG_ANY &&
                (_fd_counting_solutions || _fd_optimising ||
                 (status != FD_OK && !stopped))) {
                _fd_debug("[%d] checking for any event before leaving\n", tid);
                done = 0;
                waiting_on = FD_MSG_ANY;
                timeout = 1;
                continue;
            }

#ifdef MPICH2
# define _count count
# define _cancelled cancelled
#elif OPEN_MPI && OMPI_MAJOR_VERSION == 1 && OMPI_MINOR_VERSION > 4
# define _count _ucount
#endif

            // try to ensure that no store sent to another process is
            // lost, which may happen when the process it was addressed
            // to stopped before receiving it
            // this applies when optimising or counting all solutions,
            // and when looking for the first solution and none has yet
            // been found
            if (done && !ready &&
                ((status != FD_OK && !stopped) || _fd_optimising)) {
                for (i = 0; i < processes; ++i) {
                    int tries = 3;
                    MPI_Status mpi_status = { 99, 99, 99, 99, 99 };
                    int mpi_flag;

                    if (MPI_Test(stores_pending + i, &mpi_flag, &mpi_status))
                        perror("MPI_Test");

                    if (!mpi_flag) { // request hasn't completed
                        _fd_debug("[%d] recovering outstanding store to %d\n",
                                  tid, i);

                        // try cancelling the send (or the cancel?)
                        if (MPI_Cancel(stores_pending + i))
                            perror("MPI_Cancel");
                        _fd_debug("[%d] cancelled request\n", tid);

                        // allow the cancel some time to complete
                        do {
                            if (MPI_Test(stores_pending + i, &mpi_flag,
                                         &mpi_status))
                                perror("MPI_Test");
                            if (mpi_flag)
                                break;

                            _fd_debug("[%d] cancelling store to %d didn't complete\n",
                                      tid, i);

                            if (--tries == 0)
                                break;

                            millisleep(5);
                        } while (!mpi_flag);

                        if (!mpi_flag) { // cancel didn't complete, give up for now
                            _fd_debug("[%d] giving up cancelling store to %i for now\n",
                                      tid, i);

                            // if no store is cancelled, will have to wait
                            // for this cancellation to complete; in the
                            // meantime, will check the incoming messages
                            done = 0;
                            timeout = 1;
                            waiting_on = FD_MSG_ANY;
                            continue;
                        }

                        _fd_debug("[%d] status = (%d, %d, %d, %d, %d)\n", tid,
                                  mpi_status.MPI_SOURCE, mpi_status.MPI_TAG,
                                  mpi_status.MPI_ERROR, mpi_status._count,
                                  mpi_status._cancelled);

                        MPI_Test_cancelled(&mpi_status, &mpi_flag);
                        if (!mpi_flag) {
                            _fd_debug("[%d] cancel did not succeed\n", tid);
                            continue;
                        }
                        _fd_debug("[%d] cancel succeeded\n", tid);
                    } else {
                        // this request may already have been cancelled
                        MPI_Test_cancelled(&mpi_status, &mpi_flag);
                        if (!mpi_flag) // the send has completed
                            continue;

                        _fd_debug("[%d] previously cancelled store to %d\n",
                                  tid, i);
                    }

                    assert(stores_pending[i] == MPI_REQUEST_NULL);

                    // recover the store
                    memcpy(store, _fd_processes_stores[i],
                           fd_variables_count * sizeof(*store));
                    count_singletons(store);

                    // and restart
                    running = _fd_restart_agents(tid, nagents);
                    done = 0;

                    // cancel timeout
                    timeout = 0;
                    break;
                }

                if (done) { // no store was cancelled
                    if (tid != 0) {
                        _fd_debug("[%d] ready to leave\n", tid);
                        _fd_send(0, FD_MSG_READY);

                        // cease to be the supplier
                        supplier = 0;
                    }

                    done = tid == 0 && live_procs == 1;
                    timeout = 0;
                    ready = true;
                    quitting = false;
                }
            }

#ifdef MPICH2
# undef _count
# undef _cancelled
#elif OPEN_MPI && OMPI_MAJOR_VERSION == 1 && OMPI_MINOR_VERSION > 4
# undef _count
#endif
#endif /* STEAL_WORK >= 2 */
        } while (!done);

        if (!_fd_counting_solutions) {
            // send word that this process is done
            if (tid == 0) {
                // stop the other processes
                if (live_procs > 1) {
                    _fd_debug("[%d] stopping all processes\n", tid);
                    _fd_broadcast(FD_MSG_DONE, processes, tid);
                } else
                    _fd_debug("[%d] all other processes already stopped\n", tid);

                if (_fd_optimising && status == FD_OK) {
                    assert(have_solution);

                    memcpy(store, solution, fd_variables_count * sizeof(*store));
                    if (ts.tv_sec) {
                        timersub(&ts, &ti, &ts);
                        _fd_debug("[%d] time to solution was %d.%06ds\n", tid,
                                  ts.tv_sec, ts.tv_usec);
                    }
                }
            } else if (!stopped) {
                if (status == FD_OK) {
                    MPI_Request *mpi_request = malloc(sizeof(MPI_Request));

                    _fd_debug("[%d] sending solution\n", tid);

                    if (_fd_optimising)
                        assert(have_solution);

                    _fd_send_store(0, FD_MSG_SOLUTION, solution, mpi_request, false);

                    if (ts.tv_sec) {
                        timersub(&ts, &ti, &ts);
                        _fd_debug("[%d] time to solution was %d.%06ds\n", tid,
                                  ts.tv_sec, ts.tv_usec);
                    }

                    fd_list_append(pending_requests, mpi_request);
                } else {
                    _fd_debug("[%d] sending failure notice\n", tid);
                    _fd_send(0, FD_MSG_FAIL);
                }
            }

            // stop all the agents
            _fd_debug("[%d] stopping agents\n", tid);
            for (i = 0; i < started; ++i) {
                int s;

                if ((s = pthread_cancel(threads[i])) && s != ESRCH)
                    _fd_debug("[%d] error cancelling thread %d = %d\n", tid, i, s);
            }

            // make sure all agents have stopped before exiting
            if (!_fd_optimising && (status == FD_OK || stopped))
                for (i = 0; i < started; ++i) {
                    int s;

                    _fd_debug("[%d] joining agent %d\n", tid, i);
                    if ((s = pthread_join(threads[i], NULL)) && s != ESRCH)
                        _fd_debug("[%d] error joining thread %d = %d\n", tid, i, s);
                }

#if STEAL_WORK >= 2
            fd_list_dispose_deep(saved_stores, free);
#endif /* STEAL_WORK >= 2 */
        } else { /* _fd_counting_solutions != 0 */
            // send solutions count to the rank 0 process, telling it that we
            // are done
            if (tid != 0)
                _fd_send_data(0, FD_MSG_COUNT, &total_solutions, 1, MPI_LONG_LONG);

#if STEAL_WORK >= 2
            // stop all the agents, which are waiting for work
            _fd_debug("[%d] stopping agents\n", tid);
            for (i = 0; i < started; ++i) {
                int s;

                if ((s = pthread_cancel(threads[i])) && s != ESRCH)
                    _fd_debug("[%d] error cancelling thread %d = %d\n", tid, i, s);
            }

            // make sure all agents have stopped before exiting
            for (i = 0; i < started; ++i) {
                int s;

                _fd_debug("[%d] joining agent %d\n", tid, i);
                if ((s = pthread_join(threads[i], NULL)) && s != ESRCH)
                    _fd_debug("[%d] error joining thread %d = %d\n", tid, i, s);
            }

            if (!fd_list_empty(saved_stores))
                _fd_fatal("saved_stores not empty on exit");
            fd_list_dispose(saved_stores);
#endif /* STEAL_WORK >= 2 */
        }

        _fd_statistics_steal();
    } else { /* nagents == 0 */
        _fd_agents_stores[0] = store;
        _fd_init_store_depository(1);

        if (!_fd_counting_solutions)
            status = _fd_dsearch(_fd_variables, 0);
        else
            total_solutions = _fd_count_solutions(_fd_variables, 0);

        store = _fd_agents_stores[0]; // restore its value so it can be freed
    }

    if (tid != 0)
        _fd_exit_process(tid);

    if (_fd_counting_solutions) {
        _fd_output("found %llu solutions\n",
                   total_solutions + external_solutions); // XXX
        status = FD_NOSOLUTION; // XXX
    }

    gettimeofday(&to, NULL);
    timersub(&to, &ti, &to);
#if defined(STATS_PROCS) || !defined(FAST)
    fd__info("process %d took %d.%06ds\n", tid, to.tv_sec, to.tv_usec);
#endif

    return status;
}