当前位置: 首页 > news >正文

FastFlow(2)---任务调度Task Schedule

1. 发送任务Task给指定的Farm的Worker

下面的程序将任务0与9发送给worker 0来处理

#include <vector>
#include <ff/farm.hpp>
#include <ff/pipeline.hpp>
#include <iostream>

using namespace ff;

struct Worker : ff_node_t<long> {
    long *svc(long *task) {
        std::cout << "Worker " << get_my_id() 
            << " has got the task "<< *task << "\n";
        delete task;
        return GO_ON;
    }
};

struct Emitter : ff_node_t<long> {
    Emitter(ff_loadbalancer *const lb) :lb(lb) {}
    long *svc(long*) {
        for (long i = 0; i < size; ++i){
            if (i == 0 || i == (size - 1))
                lb->ff_send_out_to(new long(i), 0);
            else
                ff_send_out(new long(i));
        }
        return EOS;
    }
    ff_loadbalancer *lb;
    const long size = 10;
};

int main(int argc, char* argv[]) {
    assert(argc > 1);
    int nworkers = atoi(argv[1]);

    std::vector<std::unique_ptr<ff_node> > Workers;
    for (int i = 0; i < nworkers; ++i)
        Workers.push_back(make_unique<Worker>());

    ff_Farm<long> farm(std::move(Workers));
    Emitter E(farm.getlb());
    farm.add_emitter(E);
    farm.remove_collector();

    if (farm.run_and_wait_end() < 0) error("running farm");

    return 0;
}

运行结果:

 2. 广播任务给所有的Workers处理

#include <vector>
#include <ff/farm.hpp>
#include <ff/pipeline.hpp>
#include <iostream>

using namespace ff;

struct WorkerA : ff_node_t<long> {
    long *svc(long *task) {
        std::cout << "WorkerA has got the task  " << *task << "\n";
        return  task;
    }
};

struct WorkerB : ff_node_t<long> {
    long *svc(long *task) {
        std::cout << "WorkerB has got the task  " << *task << "\n";
        return  task;
    }
};

struct Emitter : ff_node_t<long> {
    Emitter(ff_loadbalancer *const lb) :lb(lb) {}
    long *svc(long*) {
        for (long i = 0; i < size; ++i){
            lb->broadcast_task(new long(i));
        }
        return EOS;
    }
    ff_loadbalancer* const lb;
    const long size = 10;
};

struct Collector : ff_node_t<long> {
    Collector(ff_gatherer *const gt) :gt(gt) {}
    long *svc(long *task) {
        std::cout << "received task from Worker " << gt->get_channel_id() << "\n";
        if (gt->get_channel_id() == 0) delete task;
        return GO_ON;
    }
    ff_gatherer* const gt;
};

int main(int argc, char* argv[]) {
    assert(argc > 1);
    int nworkers = atoi(argv[1]);
    assert(nworkers >= 2);

    std::vector<std::unique_ptr<ff_node> > Workers;
    for (int i = 0; i < nworkers/2; ++i)
        Workers.push_back(make_unique<WorkerA>());

    for (int i = nworkers / 2; i < nworkers; ++i)
        Workers.push_back(make_unique<WorkerB>());

    ff_Farm<long> farm(std::move(Workers));
    Emitter E(farm.getlb());
    farm.add_emitter(E);
    Collector C(farm.getgt());
    farm.add_collector(C);

    if (farm.run_and_wait_end() < 0) error("running farm");

    return 0;
}

运行结果:

 

相关文章:

  • 根据当前日期获取前一天日期-小工具
  • 金仓数据库KingbaseES客户端应用参考手册--12. sys_dumpall
  • 最详细说明spring cloud和Spring Cloud Alibaba的联系和区别
  • 【0基础学算法】二分查找 (超详细讲解+私人笔记+源码)
  • 计算机网络作业(存储单位k、KB、MB、GB、TB、PB;手机运行内存和内存的区别)
  • 正则表达式 (Regex) 2022教程
  • 第五章Redis 的发布和订阅
  • Dueling Network Architectures for Deep Reinforcement Learning(Dueling-DQN)
  • Vue 3.2 + TypeScript + Pinia + Vite2 + Element-Plus 管理系统(开源啦 )
  • vue工程化vue-cli创建项目以及图形创建vue项目
  • 浏览器http提交protobuf二进制数据正常,微信小程序失败解决方案
  • 实现 QQuickImageProvider 的若干问题的思路
  • 算法——查找
  • C/C++语言100题练习计划 82——加勒比海盗船(贪心算法实现)
  • RHCE(四)--- DNS服务的正反向解析配置
  • [PHP内核探索]PHP中的哈希表
  • 【腾讯Bugly干货分享】从0到1打造直播 App
  • 230. Kth Smallest Element in a BST
  • 3.7、@ResponseBody 和 @RestController
  • 30天自制操作系统-2
  • Angular4 模板式表单用法以及验证
  • Apache的80端口被占用以及访问时报错403
  • Babel配置的不完全指南
  • css的样式优先级
  • Java Agent 学习笔记
  • k个最大的数及变种小结
  • Python学习之路13-记分
  • zookeeper系列(七)实战分布式命名服务
  • 关于for循环的简单归纳
  • 蓝海存储开关机注意事项总结
  • 前嗅ForeSpider教程:创建模板
  • 人脸识别最新开发经验demo
  • 日剧·日综资源集合(建议收藏)
  • MyCAT水平分库
  • 如何在招聘中考核.NET架构师
  • $.extend({},旧的,新的);合并对象,后面的覆盖前面的
  • (07)Hive——窗口函数详解
  • (html转换)StringEscapeUtils类的转义与反转义方法
  • (Redis使用系列) Springboot 使用Redis+Session实现Session共享 ,简单的单点登录 五
  • (笔试题)合法字符串
  • (理论篇)httpmoudle和httphandler一览
  • (三)centos7案例实战—vmware虚拟机硬盘挂载与卸载
  • (十)DDRC架构组成、效率Efficiency及功能实现
  • (五)网络优化与超参数选择--九五小庞
  • ******IT公司面试题汇总+优秀技术博客汇总
  • .chm格式文件如何阅读
  • .NET Core 实现 Redis 批量查询指定格式的Key
  • .NET 编写一个可以异步等待循环中任何一个部分的 Awaiter
  • .NET 药厂业务系统 CPU爆高分析
  • .NET 中 GetProcess 相关方法的性能
  • .NetCore 如何动态路由
  • .net程序集学习心得
  • .net分布式压力测试工具(Beetle.DT)
  • .NET平台开源项目速览(15)文档数据库RavenDB-介绍与初体验
  • .NET下的多线程编程—1-线程机制概述