需要引入的类包:mongo-java-driver-2.11.2.jar、mongo-hadoop-core_1.0.4-1.1.0.jar 一、从MongoDB上读数据,进行MapReduce后,把结果在在HDFS上。 1、Job的配置启动类: package com.test.similarity.dataimport; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.log4j.Logger; import com.test.similarity.util.Constants; import com.test.similarity.util.HdfsUtil; import com.mongodb.hadoop.MongoInputFormat; import com.mongodb.hadoop.util.MongoConfigUtil; /** * 从mongondb中读取数据,导入到指定的hdfs路径上. * @author 907897 */ public class ImportJob { private static final Logger logger = Logger.getLogger(ImportJob.class); public boolean run(String import_source,String import_data){ try{ Configuration conf = new Configuration(); MongoConfigUtil.setInputURI(conf,import_source); conf.set("mapred.job.tracker", Constants.MAPRED_JOB_TRACKER); conf.set("fs.default.name", Constants.FS_DEFAULT_NAME); Job job = new Job(conf, "MongoDBJob job"); // job 处理类配置 job.setJarByClass(ImportJob.class); job.setMapperClass(ImportMapper.class); job.setReducerClass(ImportReducer.class); FileSystem fileSys = FileSystem.get(conf); // job 类型配置 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(MongoInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); // 判断输出路径relation_data是否已存在。如存在:则删除之。 if (HdfsUtil.checkFileExist(fileSys, import_data)) { logger.info("输出路径:" + import_data + "已存在。程序已对其进行删除。"); HdfsUtil.deleteFileOrPath(fileSys, import_data); } // job 输入输出路径配置 FileOutputFormat.setOutputPath(job, new Path(import_data)); // job 运行 boolean runOK = job.waitForCompletion(true) ? true : false; return runOK; }catch(Exception e){ e.printStackTrace(); } return false; } } 2、对应的Mapper处理函数 package com.test.similarity.dataimport; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.bson.BSONObject; import com.test.similarity.asset.AssetUtil; import com.test.similarity.asset.AssetVO; public class ImportMapper extends Mapper<Object, BSONObject, Text, Text> { public void map(Object key, BSONObject value, Context context) throws IOException, InterruptedException{ // System.out.println("value:"+value); AssetVO assetVO = AssetUtil.trunBSONObjectToAssetVO(value); value = null; context.write(new Text(assetVO.getAssetId()), new Text(assetVO.toString())); } } 3、对应的Reducer处理函数 package com.test.similarity.dataimport; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class ImportReducer extends Reducer<Text, Text, Text, Text>{ public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Iterator<Text> iterator = values.iterator(); while(iterator.hasNext()) { context.write(iterator.next(),null); } } } 二、从HDFS上读数据,通过MapReduce处理后,结果存放于MongoDB中。 1、Job的配置启动类: package com.test.similarity.dataexport; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import com.test.similarity.util.Constants; import com.mongodb.hadoop.MongoOutputFormat; import com.mongodb.hadoop.util.MongoConfigUtil; public class ExportJob { //private static Log logger = LogFactory.getLog(ExportJob.class); public boolean run(String relation_data,String export_dest){ try{ Configuration conf = new Configuration(); MongoConfigUtil.setOutputURI(conf,export_dest); conf.set("mapred.job.tracker", Constants.MAPRED_JOB_TRACKER); conf.set("fs.default.name", Constants.FS_DEFAULT_NAME); Job job = new Job(conf, "MongoDBJob job"); // job 配置 job.setJarByClass(ExportJob.class); job.setMapperClass(ExportMapper.class); job.setReducerClass(ExportReducer.class); job.setOutputKeyClass(Text.class); // 输出的key类型,在OutputFormat会检查 job.setOutputValueClass(Text.class);// 输出的value类型,在OutputFormat会检查 job.setInputFormatClass(KeyValueTextInputFormat.class); job.setOutputFormatClass(MongoOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(relation_data)); // job 运行 boolean runOK = job.waitForCompletion(true) ? true : false; return runOK; }catch(Exception e){ e.printStackTrace(); } return false; } } 2、对应的Mapper处理函数 package com.test.similarity.dataexport; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class ExportMapper extends Mapper<Text, Text, Text, Text> { //private static Log logger = LogFactory.getLog(ExportMapper.class); public void map(Text key, Text value, Context context ) throws IOException, InterruptedException{ System.out.println("key:"+key.toString()); System.out.println("value:"+value.toString()); context.write(key, value); } } 3、对应的Reducer处理函数 package com.test.similarity.dataexport; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.bson.types.ObjectId; import com.test.similarity.util.Constants; import com.test.similarity.util.DateUtil; import com.test.similarity.util.StringUtil; import com.mongodb.BasicDBObject; import com.mongodb.hadoop.io.BSONWritable; public class ExportReducer extends Reducer<Text, Text, Text, BSONWritable>{ private static final String splitFlag = Constants.FIELD_SPLIT; // ; private static final String keyValueFlag = Constants.KEY_VALUE_SIGN; // = private static final String itemSplitFlag = Constants.ITEM_SPLIT_SIGN; // #@# public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String text = key.toString(); if (StringUtil.isNull(text)) { return; } BSONWritable bsonWritable = new BSONWritable(); BasicDBObject document = new BasicDBObject(); String[] result = text.toString().trim().split(itemSplitFlag); int pos = 0; String content = ""; document.put("_id",new ObjectId()); for (int i = 0; i < result.length; i++) { content = result[i]; pos = content.indexOf(keyValueFlag); document.put(content.substring(0, pos), content.substring(pos + 1)); } // document.put("createTime", DateUtil.getCurDate(1)); bsonWritable.setDoc(document); // System.out.println("bsonWritable:"+bsonWritable); context.write(key, bsonWritable); } }
相关推荐
mapreduce方式入库hbase hive hdfs,速度很快,里面详细讲述了代码的编写过程,值得下载
Hadoop中的HDFS和Mapreduce详细的讲解,原理以及样例
Hadoop介绍,HDFS和MapReduce工作原理
hdfs的api操作、mapreduce以及重写patitioner的例子,eclipse项目格式,导入就可以了
MapReduce 是 Google 在 2004 年发布的一个软件框架,用于支持大规模数据的分布式计算。 MongoDB 是一个开源的面向文档的 NoSQL 数据库系统,使用 C++ 编写。
Hadoop Core
Hadoop正是基于谷歌的mapreduce-osdi04和gfs-sosp2003这两篇论文构建起来的。
NULL 博文链接:https://jsh0401.iteye.com/blog/2096103
Hadoop简单应用案例,包括MapReduce、单词统计、HDFS基本操作、web日志分析、Zookeeper基本使用、Hive简单操作等
(2)打开网站localhost:8088和localhost:50070,查看MapReduce任务启动情况 (3)写wordcount代码并把代码生成jar包 (4)运行命令 (1):把linus下的文件放到hdfs上 (2):运行MapReduce (5):查看运行结果 ...
分布式文件管理系统 Hadoop MapReduce Hive
Hadoop HDFS和MapReduce架构浅析.pdf 更多资源请点击:https://blog.csdn.net/weixin_44155966
对Hadoop中的HDFS、MapReduce、Hbase系列知识的介绍。如果想初略了解Hadoop 可下载观看
Hadoop Eclipse开发MapReduce,HDFS,hive示例代码
学习hadoop的比较全的中文资料。文中几乎综合了所有入门用户需要的内容,包括hadoop项目的单机,为分布式,分布式的搭建和环境配置,以及具体的hdfs的内部结构等。
DS_MapReduce Hadoop MapReduce 实现与 HDFS 一起有 2 个主服务器 NameNode 和 Jobtracker。它们可以在不同的机器上运行。 Datanodes 和 TaskTracker 必须在同一台机器上运行。 GeneralClient 应该用于与 HDFS 通信...
Hadoop+HDFS和MapReduce架构浅析
1.HDFS命令行操作 1.1 HDFS命令行操作 1.1.1HDFS概述 HDFS概述 Hadoop由3个部分构成: Core HDFS MapReduce 1.1.2HDFS命令 HDFS命令 HDFS基本操作命令: -help [cmd] // 显示命令的帮助信息 -ls(r) <path> // 显示...
MongoDB MapReduce MapReduce是一种计算模型,简单的说就是将大批量的工作(数据)分解(MAP)执行,然后再将结果合并成最终结果(REDUCE)。这样做的好处是可以在任务被分解后,可以通过大量机器进行并行计算,减少...