#include #include #include #include #include #include #include "fdc_int.h" #include "variables.h" #include "values.h" #include "store.h" #include "splitting.h" #include "bound.h" #ifndef COMPACT_DOMAINS #error "only works with COMPACT_DOMAINS" #endif #if !defined(USE_STORE) #error "must USE_STORE" #endif #define MAX_AGENTS 256 // number of workers to use int fd__workers = -1; static fd_int *_fd_copies[MAX_AGENTS]; static _fd_store _fd_agents_stores[MAX_AGENTS]; // used to deliver the id of the agent static int successful; // used to signal the main thread that an agent has found a solution // XXX: could be done through a (shared) variable? static pthread_mutex_t success_mutex = PTHREAD_MUTEX_INITIALIZER; static sem_t ready_semaphore, notify_semaphore; // used when optimising, to release an agent after checking its solution static sem_t resume_semaphore; extern unsigned long long _fd_count_solutions(fd_int[], int); // used to accumulate the number of solutions found (when counting solutions) static unsigned long long total_solutions = 0; int tid = 0; // XXX: team ID, only used in SPLITGO_MPI, but seen in dsearch-sg.c static void _fd_copy_problem(int n) { #ifndef PACK_PROBLEM int i, v; for (i = 0; i < n; ++i) { _fd_copies[i] = calloc(fd_variables_count, sizeof(fd_int)); // XXX: NULL? for (v = 0; v < fd_variables_count; ++v) _fd_copies[i][v] = _fd_var_copy(_fd_variables[v]); _fd_import_constraints(_fd_copies[i]); } #else /* PACK_PROBLEM */ int i; for (i = 0; i < n; ++i) _fd_copies[i] = _fd_variables; // XXX #endif /* PACK_PROBLEM */ } // XXX: turn the variables' domains into ``indexes'' into the store static void _fd_despicable_hack(int n) { store = _fd_agents_stores[n]; } int _fd_agent(int n) { if (!_fd_counting_solutions) { int result; struct timeval ti, tis, to, tos; _fd_debug("starting agent %d\n", n); gettimeofday(&ti, NULL); _fd_despicable_hack(n); memcpy(_fd_variables, _fd_copies[n], fd_variables_count * sizeof(fd_int)); result = _fd_dsearch(_fd_copies[n], n); gettimeofday(&to, NULL); timersub(&to, &ti, &to); _fd_debug("agent %d took %d.%06ds\n", n, to.tv_sec, to.tv_usec); while (result == FD_OK) { _fd_debug("agent %d was successful\n", n); // ensure that only one agent signals its success // XXX: only good (?) if only looking for the 1st solution pthread_mutex_lock(&success_mutex); // make sure the agent releases the mutex if it is cancelled // in sem_wait() pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, &success_mutex); _fd_debug("agent %d acquired success mutex\n", n); // tell the main thread that this agent is done if (sem_wait(&ready_semaphore)) perror("sem_wait (ready)"); successful = n; if (sem_post(¬ify_semaphore)) perror("sem_post"); pthread_mutex_unlock(&success_mutex); pthread_cleanup_pop(0); _fd_debug("agent %d released success mutex\n", n); if (!_fd_optimising) return result; if (sem_wait(&resume_semaphore)) perror("sem_post (resume)"); _fd_debug("[%d] resuming, new bound is %d\n", n, _fd_bound_value()); // find the next solution gettimeofday(&tis, NULL); result = _fd_dsearch_again(_fd_copies[n]); gettimeofday(&to, NULL); timersub(&to, &tis, &tos); timersub(&to, &ti, &to); _fd_debug("agent %d took %d.%06ds (%d.%06ds)\n", n, tos.tv_sec, tos.tv_usec, to.tv_sec, to.tv_usec); } // eventually, the agent will fail to find a solution _fd_debug("agent %d was unsuccessful\n", n); // tell the main thread that this agent is done if (sem_wait(&ready_semaphore)) perror("sem_wait (ready)"); successful = -n - 1; // negative agent number means failure if (sem_post(¬ify_semaphore)) perror("sem_post"); return result; } else /* _fd_counting_solutions != 0 */ { unsigned long long solutions; struct timeval ti, to; _fd_debug("starting agent %d\n", n); gettimeofday(&ti, NULL); _fd_despicable_hack(n); memcpy(_fd_variables, _fd_copies[n], fd_variables_count * sizeof(fd_int)); solutions = _fd_count_solutions(_fd_copies[n], n); gettimeofday(&to, NULL); timersub(&to, &ti, &to); _fd_debug("agent %d took %d.%06ds\n", n, to.tv_sec, to.tv_usec); _fd_debug("agent %d found %llu solutions\n", n, solutions); // tell the main thread that this agent is done if (sem_wait(&ready_semaphore)) perror("sem_wait (ready)"); successful = n; // and update the solutions' running total total_solutions += solutions; if (sem_post(¬ify_semaphore)) perror("sem_post"); return 0; } } void _fd_search_space_sizes(_fd_store stores[], int n) { #ifndef FAST double s; int i, v; // when there are no agents, just return if (*stores == NULL) return; for (i = 0; i < n; ++i) { s = 1; for (v = 0; v < fd__label_vars_count; ++v) s *= _fd_val_size(SVALUE(stores[i][fd__label_vars[v]->index])); _fd_debug("search space %d size is %g\n", i, s); } #endif /* FAST */ } int _fd_dsolve() { pthread_t threads[MAX_AGENTS]; int nagents = 4, started, running, solver; int status = FD_NOSOLUTION; int parts; int i; struct timeval ti, to, ts; char *s; fd_int *local_variables = _fd_variables; _fd_store local_store = store; gettimeofday(&ti, NULL); if (fd__workers == -1 && (s = getenv("FDC_AGENTS"))) fd__workers = atoi(s); if (fd__workers != -1) { nagents = fd__workers; if (nagents < 0) { nagents = 0; _fd_debug("using %d workers\n", nagents); } else if (nagents > MAX_AGENTS) { nagents = MAX_AGENTS; _fd_debug("using %d workers\n", nagents); } } #ifdef FILTER_DOMAINS // XXX: *must* revise as the splitting proceeds! if (_fd_filter_domains() == FD_NOSOLUTION) return FD_NOSOLUTION; #endif _fd_copy_problem(nagents); // create agents' stores for (i = 0; i < nagents; ++i) _fd_agents_stores[i] = calloc(fd_variables_count, sizeof(*store)); // XXX: NULL fd__setup_label_vars(); parts = fd__split_problem(nagents, _fd_agents_stores, fd__split_problem_f); if (parts < nagents) { _fd_debug("reducing to %d agents!\n", parts); nagents = parts; } _fd_search_space_sizes(_fd_agents_stores, parts); gettimeofday(&to, NULL); timersub(&to, &ti, &to); _fd_debug("setup took %d.%06ds\n", to.tv_sec, to.tv_usec); #if 0 for (i = 0; i < nagents; ++i) _fd_cprint2(_fd_copies[i]); return FD_NOSOLUTION; #endif if (!_fd_counting_solutions) { int have_solution = -1; if (nagents > 0) { _fd_init_store_depository(nagents); if (sem_init(&ready_semaphore, 0, 1)) perror("sem_init (ready)"); if (sem_init(¬ify_semaphore, 0, 0)) perror("sem_init (notify)"); if (_fd_optimising && sem_init(&resume_semaphore, 0, 0)) perror("sem_init (resume)"); for (i = started = 0; i < nagents; ++i) { long l = i; // XXX: avoid a gcc warning pthread_create(&threads[i], NULL, (void *) _fd_agent, (void *) l); started++; // don't start any more agents if one has already succeeded if (!_fd_optimising) { if (pthread_mutex_trylock(&success_mutex) == 0) pthread_mutex_unlock(&success_mutex); else break; } } _fd_debug("main thread waiting\n"); running = started; do { if (sem_wait(¬ify_semaphore)) perror("sem_wait"); solver = successful; if (sem_post(&ready_semaphore)) perror("sem_post"); _fd_debug("got an answer from %s%d\n", solver == -1 ? "-" : "", solver + (solver < 0)); if (_fd_optimising && solver >= 0) { // check and update bound if (_fd_bound_check_set(_fd_agents_stores[solver])) { have_solution = solver; gettimeofday(&ts, NULL); memcpy(store, _fd_agents_stores[solver], fd_variables_count * sizeof(*store)); _fd_debug("releasing %d\n", solver); } else _fd_debug("invalid solution, releasing %d\n", solver); if (sem_post(&resume_semaphore)) perror("sem_post (resume)"); continue; } running--; } while ((_fd_optimising || solver < 0) && running > 0); _fd_debug("%s solution\n", solver < 0 ? "found no" : "got a"); for (i = 0; i < started; ++i) { int s; if ((s = pthread_cancel(threads[i])) && s != ESRCH) _fd_debug("error cancelling thread %d = %d\n", i, s); } // make sure all agents have stopped before exiting // XXX: only really needed when not optimising? for (i = 0; i < started; ++i) { int s; if ((s = pthread_join(threads[i], NULL)) && s != ESRCH) _fd_debug("error joining thread %d = %d\n", i, s); } if (_fd_optimising) { if (have_solution >= 0) { _fd_debug("using agent %d's results\n", have_solution); timersub(&ts, &ti, &ts); _fd_debug("time to solution was %d.%06ds\n", ts.tv_sec, ts.tv_usec); status = FD_OK; } } else if (solver >= 0) { _fd_debug("using agent %d's results\n", solver); status = FD_OK; memcpy(store, _fd_agents_stores[solver], fd_variables_count * sizeof(*store)); } } else /* agents == 0 */ { if (_fd_optimising) _fd_fatal("need at least one agent when optimising"); _fd_agents_stores[0] = store; _fd_init_store_depository(1); status = _fd_dsearch(_fd_variables, 0); store = _fd_agents_stores[0]; // restore its value so it can be freed } _fd_statistics_steal(); return status; } else /* _fd_counting_solutions != 0 */ { if (nagents > 0) { _fd_init_store_depository(nagents); if (sem_init(&ready_semaphore, 0, 1)) perror("sem_init (ready)"); if (sem_init(¬ify_semaphore, 0, 0)) perror("sem_init (notify)"); for (i = started = 0; i < nagents; ++i) { long l = i; // XXX: avoid a gcc warning pthread_create(&threads[i], NULL, (void *) _fd_agent, (void *) l); started++; } _fd_debug("main thread waiting\n"); running = started; do { if (sem_wait(¬ify_semaphore)) perror("sem_wait"); if (sem_post(&ready_semaphore)) perror("sem_post"); running--; } while (running > 0); } else /* agents == 0 */ { _fd_agents_stores[0] = store; _fd_init_store_depository(1); total_solutions = _fd_count_solutions(_fd_variables, 0); } _fd_statistics_steal(); _fd_output("found %llu solutions\n", total_solutions); // XXX return FD_NOSOLUTION; // XXX } }