Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 61 additions & 12 deletions src/agents/query_engine/query_element/And.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@ class And : public Operator<N> {
* @param clauses Array with N clauses (each clause is supposed to be a Source or an Operator).
* @param link_templates Vector with all the query elements in clauses which are LinkTemplate
* objects. This is stored in the And object just to make shure they don't get released before
* @param not_operator_flag A flag indicating if this AND operator should act like a AND_NOT
* operator instead. An AND_NOT operator is like an AND operator but it assumes a NOT attached
* to its last clause. For instance AND_NOT(A, B, C) is true if A AND B AND NOT C is true.
* the And operation ends.
*/
And(const array<shared_ptr<QueryElement>, N>& clauses,
const vector<shared_ptr<QueryElement>>& link_templates = {})
: Operator<N>(clauses) {
const vector<shared_ptr<QueryElement>>& link_templates = {},
bool not_operator_flag = false)
: Operator<N>(clauses), not_operator_flag(not_operator_flag) {
initialize(clauses);
this->link_templates = link_templates;
}
Expand All @@ -42,12 +46,14 @@ class And : public Operator<N> {
*/
~And() {
graceful_shutdown();
LOG_LOCAL_DEBUG("AND operator destructor. Deleting query answers...");
for (size_t i = 0; i < N; i++) {
for (auto answer : this->query_answer[i]) {
delete answer;
}
this->query_answer[i].clear();
}
LOG_LOCAL_DEBUG("AND operator destructor. Deleting query answers... DONE");
}

// --------------------------------------------------------------------------------------------
Expand All @@ -59,7 +65,9 @@ class And : public Operator<N> {
}

virtual void graceful_shutdown() {
LOG_LOCAL_DEBUG("And::graceful_shutdown() BEGIN");
if (Operator<N>::is_flow_finished()) {
LOG_LOCAL_DEBUG("And::graceful_shutdown() early END");
return;
}
Operator<N>::graceful_shutdown();
Expand All @@ -68,6 +76,7 @@ class And : public Operator<N> {
delete this->operator_thread;
this->operator_thread = NULL;
}
LOG_LOCAL_DEBUG("And::graceful_shutdown() END");
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -124,6 +133,8 @@ class And : public Operator<N> {
thread* operator_thread;
unsigned int query_answer_count;
vector<shared_ptr<QueryElement>> link_templates;
bool not_operator_flag;
unsigned int num_and_clauses;

void initialize(const array<shared_ptr<QueryElement>, N>& clauses) {
this->operator_thread = NULL;
Expand All @@ -133,7 +144,13 @@ class And : public Operator<N> {
}
this->no_more_answers_to_arrive = false;
this->query_answer_count = 0;
this->id = "And(";
if (this->not_operator_flag) {
this->id = "AndNot(";
this->num_and_clauses = N - 1;
} else {
this->id = "And(";
this->num_and_clauses = N;
}
for (unsigned int i = 0; i < N; i++) {
this->id += clauses[i]->id;
if (i != (N - 1)) {
Expand All @@ -145,13 +162,20 @@ class And : public Operator<N> {
}

bool ready_to_process_candidate() {
for (unsigned int i = 0; i < N; i++) {
for (unsigned int i = 0; i < this->num_and_clauses; i++) {
if ((!this->all_answers_arrived[i]) &&
(this->query_answer[i].size() <= (this->next_input_to_process[i] + 1))) {
return false;
}
}
return true;
if (this->not_operator_flag) {
// If running an AndNot operator, all answers of the NOT clause must arrive
// BEFORE any combination is eavaluated because we must be sure that any potentially
// approved combination aren't returned in the NOT clause.
return this->all_answers_arrived[this->num_and_clauses];
} else {
return true;
}
}

void ingest_newly_arrived_answers() {
Expand Down Expand Up @@ -184,27 +208,42 @@ class And : public Operator<N> {

void operate_candidate(const CandidateRecord& candidate) {
QueryAnswer* new_query_answer = QueryAnswer::copy(candidate.answer[0]);
for (unsigned int i = 1; i < N; i++) {
for (unsigned int i = 1; i < this->num_and_clauses; i++) {
if (!new_query_answer->merge(candidate.answer[i])) {
delete new_query_answer;
return;
}
}
if (this->not_operator_flag) {
if (this->query_answer[this->num_and_clauses].size() == 0) {
LOG_DEBUG("NOT clause didn't match. Disregarding it.");
} else {
for (auto answer : this->query_answer[this->num_and_clauses]) {
LOG_DEBUG(new_query_answer->to_string() + " AND NOT " + answer->to_string());
if (new_query_answer->assignment.is_compatible(answer->assignment)) {
LOG_DEBUG("Discarding query answer");
delete new_query_answer;
return;
}
}
}
}
this->query_answer_count++;
LOG_DEBUG("Reporting answer: " + new_query_answer->to_string());
this->output_buffer->add_query_answer(new_query_answer);
}

bool processed_all_input() {
if (this->border.size() > 0) {
return false;
} else {
for (unsigned int i = 0; i < N; i++) {
for (unsigned int i = 0; i < this->num_and_clauses; i++) {
if ((this->next_input_to_process[i] == this->query_answer[i].size()) &&
(this->all_answers_arrived[i])) {
return true;
}
}
for (unsigned int i = 0; i < N; i++) {
for (unsigned int i = 0; i < this->num_and_clauses; i++) {
if (this->next_input_to_process[i] < this->query_answer[i].size()) {
return false;
}
Expand All @@ -217,10 +256,12 @@ class And : public Operator<N> {
CandidateRecord candidate;
unsigned int index_in_queue;
bool abort_candidate;
for (unsigned int new_candidate_count = 0; new_candidate_count < N; new_candidate_count++) {
for (unsigned int new_candidate_count = 0; new_candidate_count < this->num_and_clauses;
new_candidate_count++) {
abort_candidate = false;
candidate.fitness = 1.0;
for (unsigned int answer_queue_index = 0; answer_queue_index < N; answer_queue_index++) {
for (unsigned int answer_queue_index = 0; answer_queue_index < this->num_and_clauses;
answer_queue_index++) {
index_in_queue = last_used_candidate.index[answer_queue_index];
if (answer_queue_index == new_candidate_count) {
index_in_queue++;
Expand All @@ -241,6 +282,10 @@ class And : public Operator<N> {
if (abort_candidate) {
continue;
}
if (this->not_operator_flag) {
candidate.answer[this->num_and_clauses] = NULL;
candidate.index[this->num_and_clauses] = 0;
}
if (visited.find(candidate) == visited.end()) {
this->border.push(candidate);
this->visited.insert(candidate);
Expand Down Expand Up @@ -277,7 +322,7 @@ class And : public Operator<N> {
// processed_all_input() is double-checked on purpose to avoid race condition
processed_all_input()) {
this->output_buffer->query_answers_finished();
LOG_INFO(this->id << " processed " << this->query_answer_count << " answers.");
LOG_INFO(this->id << " reported " << this->query_answer_count << " answers.");
}
STOP_WATCH_STOP(and_operator);
Utils::sleep();
Expand All @@ -291,12 +336,16 @@ class And : public Operator<N> {
}
CandidateRecord candidate;
double fitness = 1.0;
for (unsigned int i = 0; i < N; i++) {
for (unsigned int i = 0; i < this->num_and_clauses; i++) {
candidate.answer[i] = this->query_answer[i][this->next_input_to_process[i]],
candidate.index[i] = this->next_input_to_process[i];
this->next_input_to_process[i]++;
fitness *= candidate.answer[i]->importance;
}
if (this->not_operator_flag) {
candidate.answer[this->num_and_clauses] = NULL;
candidate.index[this->num_and_clauses] = 0;
}
candidate.fitness = fitness;
this->border.push(candidate);
this->visited.insert(candidate);
Expand Down
Loading
Loading