FastFlow(2)---任务调度Task Schedule
1. 发送任务Task给指定的Farm的Worker
下面的程序将任务0与9发送给worker 0来处理
#include <vector>
#include <ff/farm.hpp>
#include <ff/pipeline.hpp>
#include <iostream>
using namespace ff;
struct Worker : ff_node_t<long> {
long *svc(long *task) {
std::cout << "Worker " << get_my_id()
<< " has got the task "<< *task << "\n";
delete task;
return GO_ON;
}
};
struct Emitter : ff_node_t<long> {
Emitter(ff_loadbalancer *const lb) :lb(lb) {}
long *svc(long*) {
for (long i = 0; i < size; ++i){
if (i == 0 || i == (size - 1))
lb->ff_send_out_to(new long(i), 0);
else
ff_send_out(new long(i));
}
return EOS;
}
ff_loadbalancer *lb;
const long size = 10;
};
int main(int argc, char* argv[]) {
assert(argc > 1);
int nworkers = atoi(argv[1]);
std::vector<std::unique_ptr<ff_node> > Workers;
for (int i = 0; i < nworkers; ++i)
Workers.push_back(make_unique<Worker>());
ff_Farm<long> farm(std::move(Workers));
Emitter E(farm.getlb());
farm.add_emitter(E);
farm.remove_collector();
if (farm.run_and_wait_end() < 0) error("running farm");
return 0;
}
运行结果:
2. 广播任务给所有的Workers处理
#include <vector>
#include <ff/farm.hpp>
#include <ff/pipeline.hpp>
#include <iostream>
using namespace ff;
struct WorkerA : ff_node_t<long> {
long *svc(long *task) {
std::cout << "WorkerA has got the task " << *task << "\n";
return task;
}
};
struct WorkerB : ff_node_t<long> {
long *svc(long *task) {
std::cout << "WorkerB has got the task " << *task << "\n";
return task;
}
};
struct Emitter : ff_node_t<long> {
Emitter(ff_loadbalancer *const lb) :lb(lb) {}
long *svc(long*) {
for (long i = 0; i < size; ++i){
lb->broadcast_task(new long(i));
}
return EOS;
}
ff_loadbalancer* const lb;
const long size = 10;
};
struct Collector : ff_node_t<long> {
Collector(ff_gatherer *const gt) :gt(gt) {}
long *svc(long *task) {
std::cout << "received task from Worker " << gt->get_channel_id() << "\n";
if (gt->get_channel_id() == 0) delete task;
return GO_ON;
}
ff_gatherer* const gt;
};
int main(int argc, char* argv[]) {
assert(argc > 1);
int nworkers = atoi(argv[1]);
assert(nworkers >= 2);
std::vector<std::unique_ptr<ff_node> > Workers;
for (int i = 0; i < nworkers/2; ++i)
Workers.push_back(make_unique<WorkerA>());
for (int i = nworkers / 2; i < nworkers; ++i)
Workers.push_back(make_unique<WorkerB>());
ff_Farm<long> farm(std::move(Workers));
Emitter E(farm.getlb());
farm.add_emitter(E);
Collector C(farm.getgt());
farm.add_collector(C);
if (farm.run_and_wait_end() < 0) error("running farm");
return 0;
}
运行结果: