
Heritrix 3.1.0 Source Code Analysis (Part 12)

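Next we analyze the CrawlURI next() method of the BdbFrontier class, which returns the next CrawlURI to be crawled.

The method is not defined in BdbFrontier itself but in AbstractFrontier, the parent of its parent class (WorkQueueFrontier).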

org.archive.crawler.frontier.BdbFrontier

           org.archive.crawler.frontier.AbstractFrontier

/* (non-Javadoc)
     * @see org.archive.crawler.framework.Frontier#next()
     */
    public CrawlURI next() throws InterruptedException {
        CrawlURI crawlable = null;
        while(crawlable==null) {
            outboundLock.readLock().lockInterruptibly();
            // try filling outbound until we get something to work on
            crawlable = findEligibleURI();
            outboundLock.readLock().unlock();
        }
        return crawlable;
    }
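
The loop above acquires the read lock, asks for an eligible URI, and releases the lock with a plain unlock() call, repeating until something non-null comes back. As a minimal sketch of that poll-until-non-null pattern (not Heritrix code), the version below wraps the lookup in try/finally so the lock is released even if the lookup throws. The class name PollingSource, the Supplier standing in for findEligibleURI(), and the demo() helper are assumptions made for illustration.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Hypothetical stand-in for the frontier; not part of Heritrix.
class PollingSource<T> {
    private final ReentrantReadWriteLock outboundLock = new ReentrantReadWriteLock();
    private final Supplier<T> finder; // plays the role of findEligibleURI()

    PollingSource(Supplier<T> finder) {
        this.finder = finder;
    }

    // Keep asking the finder until it yields a non-null item.
    T next() throws InterruptedException {
        T item = null;
        while (item == null) {
            outboundLock.readLock().lockInterruptibly();
            try {
                item = finder.get();
            } finally {
                // released even if finder.get() throws
                outboundLock.readLock().unlock();
            }
        }
        return item;
    }

    // Demo: the finder comes up empty twice, then returns a value.
    static String demo() {
        int[] calls = {0};
        PollingSource<String> source = new PollingSource<>(
                () -> ++calls[0] < 3 ? null : "uri-after-" + calls[0] + "-calls");
        try {
            return source.next();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "uri-after-3-calls"
    }
}
```

Because the caller only ever sees a non-null result, whatever plays the role of findEligibleURI() is responsible for pausing when there is nothing to do; otherwise the loop would spin.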

next() in turn calls BdbFrontier's CrawlURI findEligibleURI() method, which is defined in its parent class WorkQueueFrontier.

/**
     * Return the next CrawlURI eligible to be processed (and presumably
     * visited/fetched) by a worker thread.
     *
     * Relies on the readyClassQueues having been loaded with
     * any work queues that are eligible to provide a URI. 
     *
     * @return next CrawlURI eligible to be processed, or null if none available
     *
     * @see org.archive.crawler.framework.Frontier#next()
     */
    protected CrawlURI findEligibleURI() {
            // wake any snoozed queues
            wakeQueues();
            // consider rescheduled URIS
            checkFutures();
                   
            // find a non-empty ready queue, if any 
            // TODO: refactor to untangle these loops, early-exits, etc!
            WorkQueue readyQ = null;
            findauri: while(true) {
                findaqueue: do {
                    String key = readyClassQueues.poll();
                    if(key==null) {
                        // no ready queues; try to activate one
                        if(!getInactiveQueuesByPrecedence().isEmpty() 
                            && highestPrecedenceWaiting < getPrecedenceFloor()) {
                            activateInactiveQueue();
                            continue findaqueue;
                        } else {
                            // nothing ready or readyable
                            break findaqueue;
                        }
                    }
                    readyQ = getQueueFor(key);
                    if(readyQ==null) {
                         // readyQ key wasn't in all queues: unexpected
                        logger.severe("Key "+ key +
                            " in readyClassQueues but not allQueues");
                        break findaqueue;
                    }
                    if(readyQ.getCount()==0) {
                        // readyQ is empty and ready: it's exhausted
                        readyQ.noteExhausted(); 
                        readyQ.makeDirty();
                        readyQ = null;
                        continue; 
                    }
                    if(!inProcessQueues.add(readyQ)) {
                        // double activation; discard this and move on
                        // (this guard allows other enqueuings to ready or 
                        // the various inactive-by-precedence queues to 
                        // sometimes redundantly enqueue a queue key)
                        readyQ = null; 
                        continue;
                    }
                    // queue has gone 'in process' 
                    readyQ.considerActive();
                    readyQ.setWakeTime(0); // clear obsolete wake time, if any

                    readyQ.setSessionBudget(getBalanceReplenishAmount());
                    readyQ.setTotalBudget(getQueueTotalBudget()); 
                    if (readyQ.isOverSessionBudget()) {
                        deactivateQueue(readyQ);
                        readyQ.makeDirty();
                        readyQ = null;
                        continue; 
                    }
                    if (readyQ.isOverTotalBudget()) {
                        retireQueue(readyQ);
                        readyQ.makeDirty();
                        readyQ = null;
                        continue; 
                    }
                } while (readyQ == null);
                
                if (readyQ == null) {
                    // no queues left in ready or readiable
                    break findauri; 
                }
           
                returnauri: while(true) { // loop left by explicit return or break on empty
                    CrawlURI curi = null;
                    curi = readyQ.peek(this);   
                    if(curi == null) {
                        // should not reach
                        logger.severe("No CrawlURI from ready non-empty queue "
                                + readyQ.classKey + "\n" 
                                + readyQ.shortReportLegend() + "\n"
                                + readyQ.shortReportLine() + "\n");
                        break returnauri;
                    }
                    
                    // from queues, override names persist but not map source
                    curi.setOverlayMapsSource(sheetOverlaysManager);
                    // TODO: consider optimizations avoiding this recalc of
                    // overrides when not necessary
                    sheetOverlaysManager.applyOverlaysTo(curi);
                    // check if curi belongs in different queue
                    String currentQueueKey;
                    try {
                        KeyedProperties.loadOverridesFrom(curi);
                        currentQueueKey = getClassKey(curi);
                    } finally {
                        KeyedProperties.clearOverridesFrom(curi); 
                    }
                    if (currentQueueKey.equals(curi.getClassKey())) {
                        // curi was in right queue, emit
                        noteAboutToEmit(curi, readyQ);
                        return curi;
                    }
                    // URI's assigned queue has changed since it
                    // was queued (eg because its IP has become
                    // known). Requeue to new queue.
                    // TODO: consider synchronization on readyQ
                    readyQ.dequeue(this,curi);
                    doJournalRelocated(curi);
                    curi.setClassKey(currentQueueKey);
                    decrementQueuedCount(1);
                    curi.setHolderKey(null);
                    sendToQueue(curi);
                    if(readyQ.getCount()==0) {
                        // readyQ is empty and ready: it's exhausted
                        // release held status, allowing any subsequent 
                        // enqueues to again put queue in ready
                        // FIXME: tiny window here where queue could 
                        // receive new URI, be readied, fail not-in-process?
                        inProcessQueues.remove(readyQ);
                        readyQ.noteExhausted();
                        readyQ.makeDirty();
                        readyQ = null;
                        continue findauri;
                    }
                }
            }
                
            if(inProcessQueues.size()==0) {
                // Nothing was ready or in progress or imminent to wake; ensure 
                // any piled-up pending-scheduled URIs are considered
                uriUniqFilter.requestFlush();
            }
            
            // if truly nothing ready, wait a moment before returning null
            // so that loop in surrounding next() has a chance of getting something
            // next time
            if(getTotalEligibleInactiveQueues()==0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // 
                } 
            }
            
            // nothing eligible
            return null; 
    }
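
The nested loops above are easier to follow once the budget checks, queue activation, and requeueing are set aside. The following is a deliberately simplified model of the queue-selection step only: poll a ready key, skip queues that are missing or empty, and use a set to guard against the same queue being handed out twice. The class ReadyQueueModel and all of its members are hypothetical. Unlike the real method, it silently skips an unknown key instead of logging and breaking out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Simplified, hypothetical model of the queue-selection loop; not Heritrix code.
class ReadyQueueModel {
    final Queue<String> readyKeys = new ArrayDeque<>();           // like readyClassQueues
    final Map<String, Deque<String>> allQueues = new HashMap<>(); // classKey -> pending URIs
    final Set<String> inProcess = new HashSet<>();                // like inProcessQueues

    // Adds a URI to its queue and (possibly redundantly) marks the queue ready.
    void schedule(String classKey, String uri) {
        allQueues.computeIfAbsent(classKey, k -> new ArrayDeque<>()).add(uri);
        readyKeys.add(classKey);
    }

    // Returns the head URI of the first usable ready queue, or null if none.
    String findEligible() {
        String key;
        while ((key = readyKeys.poll()) != null) {
            Deque<String> queue = allQueues.get(key);
            if (queue == null || queue.isEmpty()) {
                continue; // unknown or exhausted queue: skip it
            }
            if (!inProcess.add(key)) {
                continue; // already handed out: ignore the redundant ready entry
            }
            return queue.peek();
        }
        return null; // nothing eligible
    }

    public static void main(String[] args) {
        ReadyQueueModel model = new ReadyQueueModel();
        model.schedule("example.com", "http://example.com/a");
        model.schedule("example.com", "http://example.com/b"); // second ready entry, same queue
        model.schedule("example.org", "http://example.org/");
        System.out.println(model.findEligible()); // http://example.com/a
        System.out.println(model.findEligible()); // http://example.org/
        System.out.println(model.findEligible()); // null
    }
}
```

The second call skips the duplicate "example.com" entry because that key is already in the in-process set, which is the same role the inProcessQueues.add(readyQ) check plays in the method above.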

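First, the WorkQueue object is looked up by its classKey; here it is a BdbWorkQueue. (How the classKey value is obtained involves the work-queue scheduling of Heritrix 3.1.0 and will be analyzed in a later article.)

Then the BdbWorkQueue object's CrawlURI peek(final WorkQueueFrontier frontier) method is called; it is defined in its parent class WorkQueue.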

/**
     * Return the topmost queue item -- and remember it,
     * such that even later higher-priority inserts don't
     * change it. 
     * 
     * TODO: evaluate if this is really necessary
     * @param frontier Work queues manager
     * 
     * @return topmost queue item, or null
     */
    public synchronized CrawlURI peek(final WorkQueueFrontier frontier) {
        if(peekItem == null && count > 0) {
            try {
                peekItem = peekItem(frontier);
            } catch (IOException e) {
                //FIXME better exception handling
                logger.log(Level.SEVERE,"peek failure",e);
                e.printStackTrace();
                // throw new RuntimeException(e);
            }
            if(peekItem != null) {
                lastPeeked = peekItem.toString();
            }
        }
        return peekItem;
    }
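
The Javadoc above describes a "sticky" peek: once the head item has been looked at, it is remembered, so a later insert that would sort ahead of it does not change what peek() returns. A minimal sketch of that idea, using an in-memory PriorityQueue of strings instead of a BDB-backed queue, could look like this. The class StickyPeekQueue and its method names are hypothetical.

```java
import java.util.PriorityQueue;

// Hypothetical miniature of the "remember the peeked item" idea; not Heritrix code.
class StickyPeekQueue {
    private final PriorityQueue<String> items = new PriorityQueue<>();
    private String peekItem; // remembered head; null when nothing is held

    synchronized void enqueue(String item) {
        items.add(item);
    }

    // Returns the remembered head, looking one up only if none is held yet.
    synchronized String peek() {
        if (peekItem == null && !items.isEmpty()) {
            peekItem = items.peek();
        }
        return peekItem;
    }

    // Removes the remembered head so the next peek() picks a fresh one.
    synchronized void dequeue() {
        if (peekItem != null) {
            items.remove(peekItem);
            peekItem = null;
        }
    }

    public static void main(String[] args) {
        StickyPeekQueue queue = new StickyPeekQueue();
        queue.enqueue("b");
        System.out.println(queue.peek()); // b
        queue.enqueue("a");               // sorts ahead of "b"
        System.out.println(queue.peek()); // still b
        queue.dequeue();
        System.out.println(queue.peek()); // a
    }
}
```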

peek() in turn calls the CrawlURI peekItem(final WorkQueueFrontier frontier) method:

org.archive.crawler.frontier.BdbWorkQueue

protected CrawlURI peekItem(final WorkQueueFrontier frontier)
    throws IOException {
        final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
            .getWorkQueues();
        DatabaseEntry key = new DatabaseEntry(origin);
        CrawlURI curi = null;
        int tries = 1;
        while(true) {
            try {
                curi = queues.get(key);
            } catch (DatabaseException e) {
                LOGGER.log(Level.SEVERE,"peekItem failure; retrying",e);
            }
            
            // ensure CrawlURI, if any,  came from acceptable range: 
            if(!ArchiveUtils.startsWith(key.getData(),origin)) {
                LOGGER.severe(
                    "inconsistency: "+classKey+"("+
                    getPrefixClassKey(origin)+") with " + getCount() + " items gave "
                    + curi +"("+getPrefixClassKey(key.getData()));
                // clear curi to allow retry
                curi = null; 
                // reset key to original origin for retry
                key.setData(origin);
            }
            
            if (curi!=null) {
                // success
                break;
            }
            
            if (tries>3) {
                LOGGER.severe("no item where expected in queue "+classKey);
                break;
            }
            tries++;
            LOGGER.severe("Trying get #" + Integer.toString(tries)
                    + " in queue " + classKey + " with " + getCount()
                    + " items using key "
                    + getPrefixClassKey(key.getData()));
        }
 
        return curi;
    }
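
The startsWith check in peekItem() guards against a lookup that lands on an entry belonging to a different queue. To illustrate why such a check is needed, the sketch below assumes an ordered store in which a lookup returns the first entry at or after the requested key; a TreeMap plays that role here. This is an assumption about the store's behavior, not a description of the BDB JE API, and the class PrefixRangeLookup and the "|" separator are hypothetical. The retry logic of the original is left out.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for an ordered key/value store; not the BDB JE API.
class PrefixRangeLookup {
    private final TreeMap<String, String> store = new TreeMap<>();

    void put(String key, String value) {
        store.put(key, value);
    }

    // Returns the first value whose key is >= origin and still starts with origin, else null.
    String peekItem(String origin) {
        Map.Entry<String, String> entry = store.ceilingEntry(origin);
        if (entry == null || !entry.getKey().startsWith(origin)) {
            return null; // nearest entry belongs to another queue's key range
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        PrefixRangeLookup lookup = new PrefixRangeLookup();
        lookup.put("example.org|0001", "http://example.org/");
        // Nearest key at or after "example.com|" is in example.org's range, so it is rejected.
        System.out.println(lookup.peekItem("example.com|")); // null
        lookup.put("example.com|0001", "http://example.com/");
        System.out.println(lookup.peekItem("example.com|")); // http://example.com/
    }
}
```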

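As the code above shows, peekItem() then calls a method on the BdbMultipleWorkQueues object, passing in the key created by DatabaseEntry key = new DatabaseEntry(origin).

Here origin is a byte[] that is computed from the BdbWorkQueue's classKey value in the BdbWorkQueue constructor: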

/**
     * Create a virtual queue inside the given BdbMultipleWorkQueues 
     * 
     * @param classKey
     */
    public BdbWorkQueue(String classKey, BdbFrontier frontier) {
        super(classKey);
        this.origin = BdbMultipleWorkQueues.calculateOriginKey(classKey);
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine(getPrefixClassKey(this.origin) + " " + classKey);
        }
        // add the queue-front 'cap' entry; see...
        // http://sourceforge.net/tracker/index.php?func=detail&aid=1262665&group_id=73833&atid=539102
        frontier.getWorkQueues().addCap(origin);
    }

The computation itself is done by the static method byte[] calculateOriginKey(String classKey) of the BdbMultipleWorkQueues class:

/**
     * Calculate the 'origin' key for a virtual queue of items
     * with the given classKey. This origin key will be a 
     * prefix of the keys for all items in the queue. 
     * 
     * @param classKey String key to derive origin byte key from 
     * @return a byte array key 
     */
    static byte[] calculateOriginKey(String classKey) {
        byte[] classKeyBytes = null;
        int len = 0;
        try {
            classKeyBytes = classKey.getBytes("UTF-8");
            len = classKeyBytes.length;
        } catch (UnsupportedEncodingException e) {
            // should be impossible; all JVMs must support UTF-8
            e.printStackTrace();
        }
        byte[] keyData = new byte[len+1];
        System.arraycopy(classKeyBytes,0,keyData,0,len);
        keyData[len]=0;
        return keyData;
    }
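
In other words, the origin key is the UTF-8 bytes of the classKey followed by a single 0 byte. The standalone sketch below reproduces that result with StandardCharsets (which avoids the checked UnsupportedEncodingException) and shows why the trailing 0 matters: without it, the key for "com" would be a prefix of the key for "company". The class OriginKeyDemo and its startsWith helper are hypothetical; the helper only mirrors the kind of prefix check that peekItem() performs.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class; only the key layout follows the method above.
class OriginKeyDemo {
    // UTF-8 bytes of classKey plus one trailing 0 byte.
    static byte[] calculateOriginKey(String classKey) {
        byte[] classKeyBytes = classKey.getBytes(StandardCharsets.UTF_8);
        // copyOf pads the extra slot with 0, which acts as the terminator
        return Arrays.copyOf(classKeyBytes, classKeyBytes.length + 1);
    }

    // True if array begins with every byte of prefix.
    static boolean startsWith(byte[] array, byte[] prefix) {
        if (prefix.length > array.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (array[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] com = calculateOriginKey("com");
        byte[] company = calculateOriginKey("company");
        System.out.println(Arrays.toString(com));     // [99, 111, 109, 0]
        System.out.println(startsWith(company, com)); // false: 'p' != 0 at index 3
        System.out.println(startsWith(company, "com".getBytes(StandardCharsets.UTF_8))); // true
    }
}
```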

Finally, the CrawlURI get(DatabaseEntry headKey) method of the BdbMultipleWorkQueues object was covered in an earlier article in this series, so it is not reproduced here.

---------------------------------------------------------------------------

This Heritrix 3.1.0 source code analysis series is my own original work.

When reposting, please credit the source: 博客园 (cnblogs), 刺猬的温驯

Article link: http://www.cnblogs.com/chenying99/archive/2013/04/17/3025414.html
