1、概述
在不用爬虫框架的情况,经过多方学习,尝试实现了一个分布式爬虫系统,并且可以将数据保存到不同地方,类似MySQL、HBase等。 基于面向接口的编码思想来开发,因此这个系统具有一定的扩展性,有兴趣的朋友直接看一下代码,就能理解其设计思想,虽然代码目前来说很多地方还是比较紧耦合,但只要花些时间和精力,很多都是可抽取出来并且可配置化的。
因为时间的关系,我只写了京东和苏宁易购两个网站的爬虫,但是完全可以实现不同网站爬虫的随机调度,基于其代码结构,再写国美、天猫等的商品爬取,难度不大,但是估计需要花很多时间和精力。因为在解析网页的数据时,实际上需要花很多时间,比如我在爬取苏宁易购商品的价格时,价格是异步获取的,并且其api是一长串的数字组合,我花了几个小时的时间才发现其规律,当然也承认,我的经验不足。
这个系统的设计,除了基本的数据爬取以外,更关注以下几个方面的问题:
1.如何实现分布式 ,同一个程序打包后分发到不同的节点运行时,不影响整体的数据爬取
2.如何实现url随机循环调度 ,核心是针对不同的顶级域名做随机
3.如何定时向url仓库中添加种子url ,达到不让爬虫系统停下来的目的
4.如何实现对爬虫节点程序的监控 ,并能够发邮件报警
5.如何实现一个随机IP代理库 ,目的跟第2点有点类似,都是为了反反爬虫
下面会针对这个系统来做一个整体的基本介绍,其实我在代码中都有非常详细的注释,有兴趣的朋友可以参考一下代码,最后我会给出一些我爬虫时的数据分析。
另外需要注意的是,这个爬虫系统是基于Java实现的,但是语言本身仍然不是最重要的,有兴趣的朋友可以尝试用Python实现。
2 分布式爬虫系统架构 整体系统架构如下:
所以从上面的架构可以看出,整个系统主要分为三个部分:
爬虫系统就是用来爬取数据的,因为系统设计为分布式,因此,爬虫程序本身可以运行在不同的服务器节点上。
url调度系统核心在于url仓库,所谓的url仓库其实就是用Redis保存了需要爬取的url列表,并且在我们的url调度器中根据一定的策略来消费其中的url,从这个角度考虑,url仓库其实也是一个url队列。
监控报警系统主要是对爬虫节点进行监控,虽然并行执行的爬虫节点中的某一个挂掉了对整体数据爬取本身没有影响(只是降低了爬虫的速度),但是我们还是希望知道能够主动接收到节点挂掉的通知,而不是被动地发现。
下面将会针对以上三个方面并结合部分代码片段来对整个系统的设计思路做一些基本的介绍,对系统完整实现有浓厚兴趣的朋友可以直接参考源代码。
3 爬虫系统 (说明:zookeeper监控属于监控报警系统,url调度器属于URL调度系统)
爬虫系统是一个独立运行的进程,我们把我们的爬虫系统打包成jar包,然后分发到不同的节点上执行,这样并行爬取数据可以提高爬虫的效率。
3.1 随机IP代理器 加入随机IP代理主要是为了反反爬虫,因此如果有一个IP代理库,并且可以在构建http客户端时可以随机地使用不同的代理,那么对我们进行反反爬虫则会有很大的帮助。
在系统中使用IP代理库,需要先在文本文件中添加可用的代理地址信息:
# IPProxyRepository .txt
58.60 .255 .104 :8118 219.135 .164 .245 :3128 27.44 .171 .27 :9999 219.135 .164 .245 :3128 58.60 .255 .104 :8118 58.252 .6 .165 :9000 ......
需要注意的是,上面的代理IP是我在西刺代理上拿到的一些代理IP,不一定可用,建议是自己花钱购买一批代理IP,这样可以节省很多时间和精力去寻找代理IP。
然后在构建http客户端的工具类中,当第一次使用工具类时,会把这些代理IP加载进内存中,加载到Java的一个HashMap:
private static Map IPProxyRepository = new HashMap<>();private static String[] keysArray = null ;
static { InputStream in = HttpUtil.class.getClassLoader().getResourceAsStream("IPProxyRepository.txt" ); InputStreamReader isr = new InputStreamReader(in); BufferedReader bfr = new BufferedReader(isr); String line = null ; try { while ((line = bfr.readLine()) != null ) { String[] split = line.split(":" ); String host = split[0 ]; int port = Integer.valueOf(split[1 ]); IPProxyRepository.put(host, port); } Set keys = IPProxyRepository.keySet(); keysArray = keys.toArray(new String[keys.size()]); } catch (IOException e) { e.printStackTrace(); } }
之后,在每次构建http客户端时,都会先到map中看是否有代理IP,有则使用,没有则不使用代理:
CloseableHttpClient httpClient = null ; HttpHost proxy = null ;if (IPProxyRepository.size() > 0 ) {
proxy = getRandomProxy(); httpClient = HttpClients.custom().setProxy(proxy).build(); } else { httpClient = HttpClients.custom().build(); } HttpGet request = new HttpGet(url); ......
随机代理对象则通过下面的方法生成:
public static HttpHost getRandomProxy () { Random random = new Random(); String host = keysArray[random.nextInt(keysArray.length)]; int port = IPProxyRepository.get(host); HttpHost proxy = new HttpHost(host, port); return proxy; }
这样,通过上面的设计,基本就实现了随机IP代理器的功能,当然,其中还有很多可以完善的地方,比如,当使用这个IP代理而请求失败时,是否可以把这一情况记录下来,当超过一定次数时,再将其从代理库中删除,同时生成日志供开发人员或运维人员参考,这是完全可以实现的,不过我就不做这一步功能了。
3.2 网页下载器 网页下载器就是用来下载网页中的数据,主要基于下面的接口开发:
public interface IDownload { public Page download (String url) ; }
基于此,在系统中只实现了一个http get的下载器,但是也可以完成我们所需要的功能了:
public class HttpGetDownloadImpl implements IDownload { @Override public Page download (String url) { Page page = new Page(); String content = HttpUtil.getHttpContent(url); page.setUrl(url); page.setContent(content); return page; } }
3.3 网页解析器 网页解析器就是把下载的网页中我们感兴趣的数据解析出来,并保存到某个对象中,供数据存储器进一步处理以保存到不同的持久化仓库中,其基于下面的接口进行开发:
public interface IParser { public void parser (Page page) ; }
网页解析器在整个系统的开发中也算是比较重头戏的一个组件,功能不复杂,主要是代码比较多,针对不同的商城不同的商品,对应的解析器可能就不一样了,因此需要针对特别的商城的商品进行开发,因为很显然,京东用的网页模板跟苏宁易购的肯定不一样,天猫用的跟京东用的也肯定不一样,所以这个完全是看自己的需要来进行开发了,只是说,在解析器开发的过程当中会发现有部分重复代码,这时就可以把这些代码抽象出来开发一个工具类了。
目前在系统中爬取的是京东和苏宁易购的手机商品数据,因此与就写了这两个实现类:
public class JDHtmlParserImpl implements IParser { ...... }public class SNHtmlParserImpl implements IParser { ......
}
3.4 数据存储器 数据存储器主要是将网页解析器解析出来的数据对象保存到不同的,而对于本次爬取的手机商品,数据对象是下面一个Page对象:
public class Page { private String content; private String id; private String source; private String brand; private String title; private float price; private int commentCount; private String url; private String imgUrl;
private String params; private List urls = new ArrayList<>(); }
对应的,在MySQL中,表数据结构如下:
DROP TABLE IF EXISTS `phone` ;CREATE TABLE `phone` ( `id` varchar (30 ) CHARACTER SET armscii8 NOT NULL COMMENT '商品id' , `source` varchar (30 ) NOT NULL COMMENT '商品来源,如jd suning gome等' , `brand` varchar (30 ) DEFAULT NULL
COMMENT '手机品牌' , `title` varchar (255 ) DEFAULT NULL COMMENT '商品页面的手机标题' , `price` float (10 ,2 ) DEFAULT NULL COMMENT '手机价格' , `comment_count` varchar (30 ) DEFAULT NULL COMMENT '手机评论' , `url` varchar (500 ) DEFAULT NULL COMMENT '手机详细信息地址' , `img_url` varchar (500 ) DEFAULT NULL COMMENT '图片地址' , `params` text COMMENT '手机参数,json格式存储' , PRIMARY KEY (`id` ,`source`
) ) ENGINE =InnoDB DEFAULT CHARSET =utf8;
而在HBase中的表结构则为如下:
create 'phone' , 'cf1' , 'cf2' hbase(main ):135 :0 > desc 'phone' Table phone is ENABLED phone COLUMN FAMILIES DESCRIPTION {NAME => 'cf1' , BLOOMFILTER => 'ROW' , VERSIONS => '1' , IN_MEMORY => 'false' , KEEP_DELETED_CELLS => 'FALSE' , DATA_BLOCK _ENCODING => 'NONE' , TTL => 'FOREVER' , COMPRESSION => 'NONE' , MIN_VERSIONS => '0' , BLOCKCACHE => 'true' , BLOCKSIZE => '65536'
, REPLICATION_SCOPE => '0' } {NAME => 'cf2' , BLOOMFILTER => 'ROW' , VERSIONS => '1' , IN_MEMORY => 'false' , KEEP_DELETED_CELLS => 'FALSE' , DATA_BLOCK _ENCODING => 'NONE' , TTL => 'FOREVER' , COMPRESSION => 'NONE' , MIN_VERSIONS => '0' , BLOCKCACHE => 'true' , BLOCKSIZE => '65536' , REPLICATION_SCOPE => '0' } 2 row (s) in 0.0350 seconds
即在HBase中建立了两个列族,分别为cf1、cf2,其中cf1用来保存id source price comment brand url字段信息,cf2用来保存title params imgUrl字段信息。
不同的数据存储用的是不同的实现类,但是其都是基于下面同一个接口开发的:
public interface IStore { public void store (Page page) ; }
然后基于此开发了MySQL的存储实现类、HBase的存储实现类还有控制台的输出实现类,如MySQL的存储实现类,其实就是简单的数据插入语句:
public class MySQLStoreImpl implements IStore { private QueryRunner queryRunner = new QueryRunner(DBCPUtil.getDataSource()); @Override public void store (Page page) { String sql = "insert into phone(id, source, brand, title, price, comment_count, url, img_url, params) values(?, ?, ?, ?, ?, ?, ?, ?, ?)" ; try { queryRunner.update(sql, page.getId(), page.getSource(), page.getBrand(), page.getTitle(), page.getPrice(), page.getCommentCount(), page.getUrl(), page.getImgUrl(), page.getParams()); } catch (SQLException e) { e.printStackTrace(); } } }
而HBase的存储实现类,则是HBase Java API的常用插入语句代码:
...... Put pricePut = new Put(rowKey);
pricePut.addColumn(cf1, "price" .getBytes(), page.getPrice() != null ? String.valueOf(page.getPrice()).getBytes() : "" .getBytes()); puts.add(pricePut); Put commentPut = new Put(rowKey); commentPut.addColumn(cf1, "comment" .getBytes(), page.getCommentCount() != null ? String.valueOf(page.getCommentCount()).getBytes() : "" .getBytes()); puts.add(commentPut); Put brandPut = new Put(rowKey); brandPut.addColumn(cf1, "brand" .getBytes(), page.getBrand() != null ? page.getBrand().getBytes() : "" .getBytes()); puts.add(brandPut); ......
当然,至于要将数据存储在哪个地方,在初始化爬虫程序时,是可以手动选择的:
iSpider.setStore(new HBaseStoreImpl());
目前还没有把代码写成可以同时存储在多个地方,按照目前代码的架构,要实现这一点也比较简单,修改一下相应代码就好了。实际上,是可以先把数据保存到MySQL中,然后通过Sqoop导入到HBase中,详细操作可以参考我写的Sqoop文章。
仍然需要注意的是,如果确定需要将数据保存到HBase中,请保证你有可用的集群环境,并且需要将如下配置文档添加到classpath下:
core-site.xml hbase-site.xml hdfs-site.xml
对大数据感兴趣的同学可以折腾一下这一点,如果之前没有接触过的,直接使用MySQL存储就好了,只需要在初始化爬虫程序时注入MySQL存储器即可:
iSpider.setStore(new MySQLStoreImpl());
4 URL调度系统
URL调度系统是实现整个爬虫系统分布式的桥梁与关键,正是通过URL调度系统的使用,才使得整个爬虫系统可以较为高效(Redis作为存储)随机地获取url,并实现整个系统的分布式。
4.1 URL仓库 通过架构图可以看出,所谓的URL仓库不过是Redis仓库,即在我们的系统中使用Redis来保存url地址列表,正是这样,才能保证我们的程序实现分布式,只要保存了url是唯一的,这样不管我们的爬虫程序有多少个,最终保存下来的数据都是只有唯一一份的,而不会重复,是通过这样来实现分布式的。
同时url仓库中的url地址在获取时的策略是通过队列的方式来实现的,待会通过URL调度器的实现即可知道。
另外,在我们的url仓库中,主要保存了下面的数据:
Redis的数据类型为list。
种子URL是持久化存储的,一定时间后,由URL定时器通过种子URL获取URL,并将其注入到我们的爬虫程序需要使用的高优先级URL队列中,这样就可以保存我们的爬虫程序可以源源不断地爬取数据而不需要中止程序的执行。
Redis的数据类型为set。
什么是高优先级URL队列?其实它就是用来保存列表url的。
那么什么是列表url呢?
说白了就是一个列表中含有多个商品,以京东为列,我们打开一个手机列表为例:
该地址中包含的不是一个具体商品的url,而是包含了多个我们需要爬取的数据(手机商品)的列表,通过对每个高级url的解析,我们可以获取到非常多的具体商品url,而具体的商品url,就是低优先url,其会保存到低优先级URL队列中。
那么以这个系统为例,保存的数据类似如下:
jd.com.higher --https: ... suning.com.higher --https: ...
Redis的数据类型为set。
低优先级URL其实就是具体某个商品的URL,如下面一个手机商品:
通过下载该url的数据,并对其进行解析,就能够获取到我们想要的数据。
那么以这个系统为例,保存的数据类似如下:
jd.com.lower ... suning.com.lower ...
4.2 URL调度器 所谓url调度器,其实说白了就是url仓库java代码的调度策略,不过因为其核心在于调度,所以将其放到URL调度器中来进行说明,目前其调度基于以下接口开发:
public interface IRepository { public String poll () ; public void offerHigher
(String highUrl) ; public void offerLower (String lowUrl) ; }
其基于Redis作为URL仓库的实现如下:
public class
RandomRedisRepositoryImpl implements IRepository { public RandomRedisRepositoryImpl () { init(); } public void init () { Jedis jedis = JedisUtil.getJedis(); Set domains = jedis.smembers(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); String higherUrlKey; String lowerUrlKey; for (String domain : domains) { higherUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; lowerUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX; jedis.del(higherUrlKey, lowerUrlKey); } JedisUtil.returnJedis(jedis); } @Override
public String poll () { Jedis jedis = JedisUtil.getJedis(); String randomDomain = jedis.srandmember(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); String key = randomDomain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; String url = jedis.lpop(key); if (url == null ) { key = randomDomain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX; url = jedis.lpop(key); } JedisUtil.returnJedis(jedis); return url; } @Override public void offerHigher (String highUrl) { offerUrl(highUrl, SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX); } @Override public void
offerLower (String lowUrl) { offerUrl(lowUrl, SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX); } public void offerUrl (String url, String urlTypeSuffix) { Jedis jedis = JedisUtil.getJedis(); String domain = SpiderUtil.getTopDomain(url); String key = domain + urlTypeSuffix; jedis.lpush(key, url); JedisUtil.returnJedis(jedis); } }
通过代码分析也是可以知道,其核心就在如何调度url仓库(Redis)中的url。
4.3 URL定时器 一段时间后,高优先级URL队列和低优先URL队列中的url都会被消费完,为了让程序可以继续爬取数据,同时减少人为的干预,可以预先在Redis中插入种子url,之后定时让URL定时器从种子url中取出url定存放到高优先级URL队列中,以此达到程序定时不间断爬取数据的目的。
url消费完毕后,是否需要循环不断爬取数据根据个人业务需求而不同,因此这一步不是必需的,只是也提供了这样的操作。因为事实上,我们需要爬取的数据也是每隔一段时间就会更新的,如果希望我们爬取的数据也跟着定时更新,那么这时定时器就有非常重要的作用了。不过需要注意的是,一旦决定需要循环重复爬取数据,则在设计存储器实现时需要考虑重复数据的问题,即重复数据应该是更新操作,目前在我设计的存储器不包括这个功能,有兴趣的朋友可以自己实现,只需要在插入数据前判断数据库中是否存在该数据即可。
另外需要注意的一点是,URL定时器是一个独立的进程,需要单独启动。
定时器基于Quartz实现,下面是其job的代码:
public class UrlJob implements
Job { private Logger logger = LoggerFactory.getLogger(UrlJob.class); @Override public void execute (JobExecutionContext context) throws JobExecutionException { Jedis jedis = JedisUtil.getJedis(); Set seedUrls = jedis.smembers(SpiderConstants.SPIDER_SEED_URLS_KEY); for (String seedUrl : seedUrls) { String domain = SpiderUtil.getTopDomain(seedUrl); jedis.sadd(domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX, seedUrl); logger.info("获取种子:{}" , seedUrl); } JedisUtil.returnJedis(jedis); } }
调度器的实现如下:
public class UrlJobScheduler
{ public UrlJobScheduler () { init(); } public void init () { try { Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); scheduler.start(); String name = "URL_SCHEDULER_JOB" ; String group = "URL_SCHEDULER_JOB_GROUP" ; JobDetail jobDetail = new JobDetail(name, group, UrlJob.class); String cronExpression = "0 10 1 * * ?" ; Trigger trigger = new CronTrigger(name, group, cronExpression); scheduler.scheduleJob(jobDetail, trigger); } catch (SchedulerException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } } public static
void main (String[] args) { UrlJobScheduler urlJobScheduler = new UrlJobScheduler(); urlJobScheduler.start(); } private void start () { while (true ) { } } }
5 监控报警系统 监控报警系统的加入主要是为了让使用者可以主动发现节点宕机,而不是被动地发现,因为实际中爬虫程序可能是持续不断运行的,并且我们会在多个节点上部署我们的爬虫程序,因此很有必要对节点进行监控,并且在节点出现问题时可以及时发现并修正,需要注意的是,监控报警系统是一个独立的进程,需要单独启动。
5.1 基本原理 首先需要先在zookeeper中创建一个/ispider
节点:
[zk: localhost:2181(CONNECTED) 1] create /ispider ispider Created /ispider
监控报警系统的开发主要依赖于zookeeper实现,监控程序对zookeeper下面的这个节点目录进行监听:
[zk: localhost:2181(CONNECTED) 0] ls /ispider []
爬虫程序启动时会在该节点目录下注册一个临时节点目录:
[zk: localhost:2181(CONNECTED) 0] ls /ispider [192.168.43.166]
当节点出现宕机时,该临时节点目录就会被zookeeper删除
[zk: localhost:2181(CONNECTED) 0] ls /ispider []
同时因为我们监听了节点目录/ispider
,所以当zookeeper删除其下的节点目录时(或增加一个节点目录),zookeeper会给我们的监控程序发送通知,即我们的监控程序会得到回调,这样便可以在回调程序中执行报警的系统动作,从而完成监控报警的功能。
5.2 zookeeper Java API使用说明 可以使用zookeeper原生的Java API,我在另外写的一个RPC框架(底层基于Netty实现远程通信)中就是使用原生的API,不过显然代码会复杂很多,并且本身需要对zookeeper有更多的学习和了解,这样用起来才会容易一些。
所以为了降低开发的难度,这里使用第三方封装的API,即curator,来进行zookeeper客户端程序的开发。
5.3 爬虫系统zookeeper注册 在启动爬虫系统时,我们的程序都会启动一个zookeeper客户端来向zookeeper来注册自身的节点信息,主要是ip地址,并在/ispider
节点目录以创建一个以该爬虫程序所在的节点IP地址命名的节点,如/ispider/192.168.43.116
,实现的代码如下:
private void registerZK () { String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181" ; int baseSleepTimeMs = 1000 ; int maxRetries = 3 ; RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); CuratorFramework curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy); curator.start();
String ip = null ; try { ip = InetAddress.getLocalHost().getHostAddress(); curator.create().withMode(CreateMode.EPHEMERAL).forPath("/ispider/" + ip, ip.getBytes()); } catch (UnknownHostException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } }
应该注意到的是,我们创建的节点为临时节点,要想实现监控报警功能,必须要为临时节点。
5.4 监控程序 首先需要先监听zookeeper中的一个节点目录,在我们的系统中,设计是监听/ispider
这个节点目录:
public SpiderMonitorTask () { String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181" ; int baseSleepTimeMs = 1000 ; int maxRetries = 3 ; RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy); curator.start(); try { previousNodes = curator.getChildren().usingWatcher(this ).forPath("/ispider" ); } catch (Exception e) {
e.printStackTrace(); } }
在上面注册了zookeeper中的watcher
,也就是接收通知的回调程序,在该程序中,执行我们报警的逻辑:
@Override public void process (WatchedEvent event) { try { List currentNodes = curator.getChildren().usingWatcher(this ).forPath("/ispider" ); if (currentNodes.size() > previousNodes.size()) { for (String node : currentNodes) { if (!previousNodes.contains(node)) { logger.info("----有新的爬虫节点{}新增进来" , node); } } } else if (currentNodes.size() < previousNodes.size()) { for (String node : previousNodes) { if (!currentNodes.contains(node)) {
logger.info("----有爬虫节点{}挂掉了" , node); MailUtil.sendMail("有爬虫节点挂掉了,请人工查看爬虫节点的情况,节点信息为:" , node); } } } previousNodes = currentNodes; } catch (Exception e) { e.printStackTrace(); } }
当然,判断节点是否挂掉,上面的逻辑还是存在一定的问题的,按照上面的逻辑,假如某一时刻新增节点和删除节点事件同时发生,那么其就不能判断出来,所以如果需要更精准的话,可以将上面的程序代码修改一下。
5.5 邮件发送模块 使用模板代码就可以了,不过需要注意的是,在使用时,发件人的信息请使用自己的邮箱。
下面是爬虫节点挂掉时接收到的邮件:
实际上,如果购买了短信服务,那么通过短信API也可以向我们的手机发送短信。
6 实战:爬取京东、苏宁易购全网手机商品数据 因为前面在介绍这个系统的时候也提到了,我只写了京东和苏宁易购的网页解析器,所以接下来也就是爬取其全网的手机商品数据。
6.1 环境说明 需要确保Redis、Zookeeper服务可用,另外如果需要使用HBase来存储数据,需要确保Hadoop集群中的HBase可用,并且相关配置文件已经加入到爬虫程序的classpath中。
还有一点需要注意的是,URL定时器和监控报警系统是作为单独的进程来运行的,并且也是可选的。
6.2 爬虫结果 进行了两次爬取,分别尝试将数据保存到MySQL和HBase中,给出如下数据情况。
6.2.1 保存到MySQL mysql> select count(*) from phone;
+----------+ | count(*) | +----------+ | 12052 | +----------+ 1 row in set mysql> select count(*) from phone where source='jd.com'; +----------+ | count(*) | +----------+ | 9578 | +----------+ 1 row in set mysql> select count(*) from phone where source='suning .com'; +----------+ | count(*) | +----------+ | 2474 | +----------+ 1 row in set
在可视化工具中查看数据情况:
6.2.2 保存到HBase hbase(main): 225 : 0 * count 'phone' Current count: 1000 , row: 11155386088_ jd.com Current count: 2000 , row: 136191393_ suning.com Current count: 3000 , row: 16893837301_ jd.com Current count:
4000 , row: 19036619855_ jd.com Current count: 5000 , row: 1983786945_ jd.com Current count: 6000 , row: 1997392141_ jd.com Current count: 7000 , row: 21798495372_ jd.com Current count: 8000 , row: 24154264902_ jd.com Current count: 9000 , row: 25687565618_ jd.com Current count: 10000 , row: 26458674797_ jd.com Current count: 11000 , row: 617169906_ suning.com Current count: 12000 , row: 769705049_ suning.com 12348 row(s) in 1.5720 seconds => 12348
在HDFS中查看数据情况:
6.2.3 数据量与实际情况分析 京东手机的列表大概有160多页,每个列表有60个商品数据,所以总量在9600左右,我们的数据基本是符合的,后面通过日志分析其实可以知道,一般丢失的数据为连接超时导致的,所以在选取爬虫的环境时,更建议在网络环境好的主机上进行,同时如果可以有IP代理地址库就更好了,另外对于连接超时的情况,其实是可以进一步在我们的程序中加以控制,一旦出现爬取数据失败的url,可以将其加入到重试url队列中,目前这一点功能我是没有做,有兴趣的同学可以试一下。
再来看看苏宁的,其有100页左右的手机列表,每页也是60个商品数据,所以总量在6000左右。但可以看到,我们的数据却只有3000这样的数量级(缺少的依然是频繁爬取造成的连接失败问题),这是为什么呢?
这是因为,打开苏宁的某个列表页面后,其是先加载30个商品,当鼠标向下滑动时,才会通过另外的API去加载其它的30个商品数据,每一个列表页面都是如此,所以,实际上,我们是缺少了一半的商品数据没有爬取。知道这个原因之后,实现也不难,但是因为时间关系,我就没有做了,有兴趣的朋友折腾一下吧。
6.3 通过日志分析爬虫系统的性能 在我们的爬虫系统中,每个关键的地方,如网页下载、数据解析等都是有打logger的,所以通过日志,可以大概分析出相关的时间参数。
2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://list.jd.com/list.html?cat=9987,653,655&page=1,消耗时长:590 ms,代理信息:null:null 2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO] - 解析列表页面:https://list.jd.com/list.html?cat=9987,653,655&page=1, 消耗时长:46ms 2018-04-01 21:26:03 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析列表页面:https://list.suning.com/0-20006-0.html, 消耗时长:49ms 2018-04-01 21:26:04 [pool-1-thread-5] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://item.jd.com/6737464.html,消耗时长:219 ms,代理信息:null:null 2018-04-01 21:26:04 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://list.jd.com/list.html?cat=9987,653,655&page=2&sort=sort_rank_asc&trans=1&JL=6_0_0,消耗时长:276 ms,代理信息:null:null 2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://list.suning.com/0-20006-99.html,消耗时长:300 ms,代理信息:null:null 2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析列表页面:https://list.suning.com/0-20006-99.html, 消耗时长:4ms ...... 2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://club.jd.com/comment/productCommentSummaries.action?referenceIds=23934388891,消耗时长:176 ms,代理信息:null:null 2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO] - 解析商品页面:https://item.jd.com/23934388891.html, 消耗时长:413ms 2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://review.suning.com/ajax/review_satisfy/general-00000000010017793337-0070079092-----satisfy.htm,消耗时长:308 ms,代理信息:null:null 2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析商品页面:https://product.suning.com/0070079092/10017793337.html, 消耗时长:588ms ......
平均下来,下载一个商品网页数据的时间在200~500毫秒不等,当然这个还需要取决于当时的网络情况。
另外,如果想要真正计算爬取一个商品的数据,可以通过日志下面的数据来计算:
下载一个商品页面数据的时间
获取价格数据的时间
获取评论数据的时间
在我的主机上(CPU:E5 10核心,内存:32GB,分别开启1个虚拟机和3个虚拟机),情况如下:
节点数 每节点线程数 商品数量 时间 1 5 京东+苏宁易购近13000个商品数据 141分钟 3 5 京东+苏宁易购近13000个商品数据 65分钟
可以看到,当使用3个节点时,时间并不会相应地缩小为原来的1/3,这是因为此时影响爬虫性能的问题主要是网络问题,节点数量多,线程数量大,网络请求也多,但是带宽一定,并且在没有使用代理的情况,请求频繁,连接失败的情况也会增多,对时间也有一定的影响,如果使用随机代理库,情况将会好很多。
但可以肯定的是,在横向扩展增加爬虫节点之后,确实可以大大缩小我们的爬虫时间,这也是分布式爬虫系统的好处。
7 爬虫系统中使用的反反爬虫策略 在整个爬虫系统的设计中,主要使用下面的策略来达到反反爬虫的目的:
8 总结 需要说明的是,本系统是基于Java实现的,但个人觉得,语言本身依然不是问题,核心在于对整个系统的设计上以及理解上,写此文章是希望分享这样一种分布式爬虫系统的架构给大家,如果对源代码感兴趣,可以到我的GitHub上查看。
代码地址:https://github.com/xpleaf/ispider
原文出处:http://blog.51cto.com/xpleaf/2093952
本文获取作者授权,版权为作者所有。