Hadoop/Hive: querying raw HDFS data through a Lucene-indexed Hive table

Hive with an index: an ordinary Hive query is no different from a plain Hadoop MapReduce job; both brute-force scan the raw data. If the scan could go through an index the way a database does, it would be dramatically faster.
Last time I used such an index directly on MapReduce (see the earlier write-up). This time the same idea is extended into Hive, which in practice just means a special InputFormat. Usage is as follows.
1. Create the index (nothing much to say here; see the full source at the end). The arguments are the mode ("create"), the input file, the output directory, and the number of reducers:
hadoop jar ./higo-manager-1.3.1-SNAPSHOT.jar com.alipay.higo.hadoop.sequenceIndex.SequenceIndexExample create /group/tbdev/lingning/yannian.mu/input/1.txt /group/tbdev/lingning/yannian.mu/output 20
2. Create the Hive table (nothing special here apart from the INPUTFORMAT):
CREATE EXTERNAL TABLE yannian_hive_index_test(col1 String,col2 String,col3 String,col4 String,col5 String)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
STORED AS INPUTFORMAT 'com.alipay.higo.hadoop.sequenceIndex.SequenceIndexInputFormatForHive'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/group/tbdev/lingning/yannian.mu/output';
3. Add the required jars:
add jar ./higo-index-1.3.1-SNAPSHOT.jar;    -- the InputFormat implementation
add jar ./lucene-core-3.5-SNAPSHOT.jar;     -- the Lucene dependency
4. Configure the Lucene-based filtering that runs before the query (two alternative query strings are sketched right after these settings):
-- the table's fields; order and count must match the CREATE TABLE statement exactly
set hive.fields.sequence=col1,col2,col3,col4,col5;
-- the fields actually read locally; only these are pulled from the index
set lucene.fields=col1,col3,col2;
-- the Lucene query; here, scan only the rows whose col1 starts with "1"
set lucene.query=col1:1*;
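The query string is handed straight to Lucene's QueryParser (the record reader parses it with StandardAnalyzer and a default field named index), so regular Lucene query syntax should work here. Two hedged examples, with made-up column values, assuming standard Lucene 3.x syntax:
-- hypothetical: exact term on col1 combined with a prefix on col3
set lucene.query=col1:12345 AND col3:ab*;
-- hypothetical: lexicographic term range on col2
set lucene.query=col2:[100 TO 200];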
5. Analyze the Lucene-filtered result with ordinary HiveQL (a follow-up aggregation is sketched below):
select col1,col3 from yannian_hive_index_test limit 1000;
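Since the filtered rows come back as normal Hive rows, any further HiveQL applies on top of them. For example, a hypothetical aggregation over the prefix-filtered rows:
SELECT col3, COUNT(*) AS cnt
FROM yannian_hive_index_test
GROUP BY col3
ORDER BY cnt DESC
LIMIT 100;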
Not too much trouble, is it? Here is the complete implementation.
package com.alipay.higo.hadoop.sequenceIndex;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.rmi.server.UID;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.HashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;

/**
 * A SequenceFile-style container that stores whole Lucene indexes one after another.
 * @author yannian.mu
 */
public class SequenceIndex {
  private static final Log LOG = LogFactory.getLog(SequenceIndex.class);
  private static final byte VERSION_WITH_METADATA = (byte) 6;
  private static final int SYNC_ESCAPE = -1; // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
  private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash
  public static final int SYNC_INTERVAL = 100 * SYNC_SIZE;
  private static byte[] VERSION = new byte[] { (byte) 'S', (byte) 'E',
      (byte) 'I', VERSION_WITH_METADATA };

  public static Writer create(FileSystem fs, Configuration conf, Path name,
      int bufferSize, short replication, long blockSize,
      Progressable progress, Metadata metadata) throws IOException {
    return new Writer(fs, conf, name, bufferSize, replication, blockSize, progress, metadata);
  }

  public static Reader open(FileSystem fs, Path file, int bufferSize, long start,
      long length, Configuration conf, boolean tempReader) throws IOException {
    return new Reader(fs, file, bufferSize, start, length, conf, tempReader);
  }

  /** Writes (key, Lucene directory) records with SequenceFile-style sync markers. */
  public static class Writer implements java.io.Closeable {
    Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();
    Metadata metadata = null;
    long lastSyncPos; // position of last sync
    byte[] sync; // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = System.currentTimeMillis();
        digester.update((new UID() + "@" + time).getBytes());
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public Writer(FileSystem fs, Configuration conf, Path name,
        int bufferSize, short replication, long blockSize,
        Progressable progress, Metadata metadata) throws IOException {
      this.conf = conf;
      this.out = fs.create(name, true, bufferSize, replication, blockSize, progress);
      this.metadata = metadata;
      out.write(VERSION);
      this.metadata.write(out);
      out.write(sync); // write the sync bytes
      out.flush();
    }

    public void sync() throws IOException {
      if (sync != null && lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE); // mark the start of the sync
        out.write(sync); // write sync
        lastSyncPos = out.getPos(); // update lastSyncPos
      }
    }

    public Configuration getConf() {
      return conf;
    }

    public synchronized void close() throws IOException {
      if (out != null) {
        if (ownOutputStream) {
          out.close();
        } else {
          out.flush();
        }
        out = null;
      }
    }

    synchronized void checkAndWriteSync() throws IOException {
      if (sync != null && out.getPos() >= lastSyncPos + SYNC_INTERVAL) { // time to sync
        sync();
      }
    }

    /** Appends one record: key, file count, then (name, length, raw bytes) for every file in the directory. */
    public synchronized void append(Text key, Directory dir) throws IOException {
      checkAndWriteSync();
      String[] names = dir.listAll();
      out.writeInt(key.getLength());
      out.write(key.getBytes(), 0, key.getLength());
      out.writeInt(names.length);
      for (String name : names) {
        Text nameText = new Text(name);
        out.writeInt(nameText.getLength());
        out.write(nameText.getBytes(), 0, nameText.getLength());
        long filelen = dir.fileLength(name);
        out.writeLong(filelen);
        this.writeTo(filelen, dir.openInput(name), out);
      }
    }

    private void writeTo(long end, IndexInput input, FSDataOutputStream out) throws IOException {
      long pos = 0;
      int bufflen = 1024;
      while (pos < end) {
        int length = bufflen;
        long nextPos = pos + length;
        if (nextPos > end) { // at the last buffer
          length = (int) (end - pos);
        }
        byte[] buff = new byte[length];
        input.readBytes(buff, 0, length);
        out.write(buff, 0, length);
        pos = nextPos;
      }
    }

    public synchronized long getLength() throws IOException {
      return out.getPos();
    }
  }

  /** Reads records written by Writer; a second stream (shardIn) backs the directories handed out. */
  public static class Reader implements java.io.Closeable {
    private Path file;
    private FSDataInputStream in;
    private FSDataInputStream shardIn;
    private byte version;
    private Metadata metadata = null;
    private byte[] sync = new byte[SYNC_HASH_SIZE];
    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
    private boolean syncSeen;
    private long end;
    private Configuration conf;

    private Reader(FileSystem fs, Path file, int bufferSize, long start,
        long length, Configuration conf, boolean tempReader) throws IOException {
      this.file = file;
      this.in = fs.open(file, bufferSize);
      this.shardIn = fs.open(file, bufferSize);
      this.conf = conf;
      seek(start);
      this.end = in.getPos() + length;
      init(tempReader);
    }

    private void init(boolean tempReader) throws IOException {
      byte[] versionBlock = new byte[VERSION.length];
      in.readFully(versionBlock);

      if ((versionBlock[0] != VERSION[0])
          || (versionBlock[1] != VERSION[1])
          || (versionBlock[2] != VERSION[2]))
        throw new IOException(file + " not a SequenceIndex");

      version = versionBlock[3];
      if (version > VERSION[3])
        throw new VersionMismatchException(VERSION[3], version);

      this.metadata = new Metadata();
      if (version >= VERSION_WITH_METADATA) { // if version >= 6
        this.metadata.readFields(in);
      }
      if (version > 1) {
        in.readFully(sync); // read sync bytes
      }
    }

    public synchronized void close() throws IOException {
      in.close();
      this.shardIn.close();
    }

    private synchronized int readKeyLength() throws IOException {
      if (in.getPos() >= end) {
        return -1;
      }
      int length = in.readInt();
      if (version > 1 && sync != null && length == SYNC_ESCAPE) { // process a sync entry
        in.readFully(syncCheck); // read syncCheck
        if (!Arrays.equals(sync, syncCheck)) // check it
          throw new IOException("File is corrupt!");
        syncSeen = true;
        if (in.getPos() >= end) {
          return -1;
        }
        length = in.readInt(); // re-read length
      } else {
        syncSeen = false;
      }
      return length;
    }

    /** Reads the next record, registering every embedded file with the given directory. Returns -1 at end of split. */
    public synchronized int next(Text key, SequenceIndexDirectory dir) throws IOException {
      int length = readKeyLength();
      if (length == -1) {
        return -1;
      }
      dir.setShareStream(this.shardIn);
      byte[] keydata = new byte[length];
      in.readFully(keydata, 0, length);
      key.set(keydata);
      int filecount = in.readInt();
      for (int i = 0; i < filecount; i++) {
        int namelen = in.readInt();
        byte[] namebyte = new byte[namelen];
        in.readFully(namebyte, 0, namelen);
        Text name = new Text(namebyte);
        long filelen = in.readLong();
        long pos = in.getPos();
        in.seek(pos + filelen); // jump over the embedded file body
        dir.addFile(name.toString(), pos, filelen);
      }
      return length;
    }

    public Metadata getMetadata() {
      return this.metadata;
    }

    Configuration getConf() {
      return conf;
    }

    public synchronized void seek(long position) throws IOException {
      in.seek(position);
    }

    /** Positions the stream at the first sync marker at or after the given position. */
    public synchronized void sync(long position) throws IOException {
      if (position + SYNC_SIZE >= end) {
        seek(end);
        return;
      }
      try {
        seek(position + 4); // skip escape
        in.readFully(syncCheck);
        int syncLen = sync.length;
        for (int i = 0; in.getPos() < end; i++) {
          int j = 0;
          for (; j < syncLen; j++) {
            if (sync[j] != syncCheck[(i + j) % syncLen])
              break;
          }
          if (j == syncLen) {
            in.seek(in.getPos() - SYNC_SIZE); // position before sync
            return;
          }
          syncCheck[i % syncLen] = in.readByte();
        }
      } catch (ChecksumException e) { // checksum failure
        handleChecksumException(e);
      }
    }

    private void handleChecksumException(ChecksumException e) throws IOException {
      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
        LOG.warn("Bad checksum at " + getPosition() + ". Skipping entries.");
        sync(getPosition() + this.conf.getInt("io.bytes.per.checksum", 512));
      } else {
        throw e;
      }
    }

    public boolean syncSeen() {
      return syncSeen;
    }

    public synchronized long getPosition() throws IOException {
      return in.getPos();
    }

    public String toString() {
      return file.toString();
    }
  }

  /** Thin Writable wrapper around a Lucene Directory; never actually serialized by the framework. */
  public static class HadoopDirectory implements WritableComparable {
    Directory dir = null;

    public Directory getDir() {
      return dir;
    }

    public void setDir(Directory dir) {
      this.dir = dir;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      throw new UnsupportedOperationException();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      throw new UnsupportedOperationException();
    }

    @Override
    public int compareTo(Object arg0) {
      throw new UnsupportedOperationException();
    }
  }

  /** Read-only Lucene Directory whose "files" are byte ranges of one shared HDFS stream. */
  public static class SequenceIndexDirectory extends Directory {
    private static int BUFFER_SIZE = 1024;

    private static final class FileEntry {
      long offset;
      long length;

      public FileEntry(long offset, long length) {
        this.offset = offset;
        this.length = length;
      }
    }

    private FSDataInputStream shareStream;
    private HashMap<String, FileEntry> entries = new HashMap<String, FileEntry>();

    @Override
    public synchronized void close() throws IOException {
      if (shareStream == null)
        throw new IOException("Already closed");
      entries.clear();
      shareStream = null;
    }

    public void setShareStream(FSDataInputStream _stream) {
      this.shareStream = _stream;
    }

    public void addFile(String name, long offset, long length) {
      entries.put(name, new FileEntry(offset, length));
    }

    @Override
    public synchronized IndexInput openInput(String id) throws IOException {
      return openInput(id, BUFFER_SIZE);
    }

    @Override
    public synchronized IndexInput openInput(String id, int readBufferSize) throws IOException {
      if (shareStream == null)
        throw new IOException("Stream closed");
      FileEntry entry = entries.get(id);
      if (entry == null) {
        throw new IOException("No sub-file with id " + id
            + " found (files: " + entries.keySet() + ")");
      }
      return new ShareIndexInput(id, shareStream, entry.offset, entry.length);
    }

    @Override
    public String[] listAll() {
      return entries.keySet().toArray(new String[entries.size()]);
    }

    @Override
    public boolean fileExists(String name) {
      return entries.containsKey(name);
    }

    @Override
    public long fileModified(String name) throws IOException {
      throw new UnsupportedOperationException();
    }

    @Override
    @Deprecated
    public void touchFile(String name) throws IOException {
      throw new UnsupportedOperationException();
    }

    @Override
    public void deleteFile(String name) {
      throw new UnsupportedOperationException();
    }

    public void renameFile(String from, String to) {
      throw new UnsupportedOperationException();
    }

    @Override
    public long fileLength(String name) throws IOException {
      FileEntry e = entries.get(name);
      if (e == null)
        throw new FileNotFoundException(name);
      return e.length;
    }

    @Override
    public IndexOutput createOutput(String name) {
      throw new UnsupportedOperationException();
    }

    @Override
    public Lock makeLock(String name) {
      throw new UnsupportedOperationException();
    }

    /** IndexInput over one sub-file; it repositions the shared stream to fileOffset + local position on every read. */
    public static class ShareIndexInput extends BufferedIndexInput {
      public class Descriptor {
        FSDataInputStream in = null;

        public FSDataInputStream getIn() {
          return in;
        }

        public void setIn(FSDataInputStream in) {
          this.in = in;
        }

        public void close() throws IOException {
          // the stream is shared with the enclosing Reader, so it must not be closed here
        }
      }

      private final Descriptor descriptor;
      private boolean isOpen;
      private boolean isClone;
      private long fileOffset;
      private long length;
      private String filename;

      @Override
      public String toString() {
        return "ShareIndexInput [length=" + length + ", fileOffset="
            + fileOffset + ", filename=" + filename + "]";
      }

      public ShareIndexInput(String _filename, FSDataInputStream shareStream,
          long _fileOffset, long _length) throws IOException {
        super("sequenceIndex input");
        this.filename = _filename;
        this.descriptor = new Descriptor();
        this.descriptor.setIn(shareStream);
        this.fileOffset = _fileOffset;
        this.length = _length;
        this.isOpen = true;
        this.isClone = false;
      }

      @Override
      protected void readInternal(byte[] b, int offset, int len) throws IOException {
        synchronized (descriptor.in) {
          long position = getFilePointer();
          if ((position + this.fileOffset) != descriptor.in.getPos()) {
            descriptor.in.seek(position + this.fileOffset);
          }
          int total = 0;
          do {
            int i = descriptor.in.read(b, offset + total, len - total);
            if (i == -1) {
              throw new IOException("Read past EOF");
            }
            total += i;
          } while (total < len);
        }
      }

      @Override
      public void close() throws IOException {
        if (!isClone) {
          if (isOpen) {
            descriptor.close();
            isOpen = false;
          } else {
            throw new IOException("Index file already closed");
          }
        }
      }

      @Override
      public long length() {
        return length;
      }

      protected void finalize() throws IOException {
        if (!isClone && isOpen) {
          close();
        }
      }

      public Object clone() {
        ShareIndexInput clone = (ShareIndexInput) super.clone();
        clone.isClone = true;
        return clone;
      }

      @Override
      protected void seekInternal(long pos) throws IOException {
        // nothing to do: the shared stream is repositioned lazily in readInternal()
      }
    }
  }
}
###################################################################################
package com.alipay.higo.hadoop.sequenceIndex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.queryParser.ParseException;
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.HadoopDirectory;

/** Driver: "create" builds test indexes, "searchold" runs the Hive-style reader, anything else runs the new-API search. */
public class SequenceIndexExample {
  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    String type = args[0];
    String input = args[1];
    String output = args[2];
    Integer numreduce = Integer.parseInt(args[3]);
    if (type.equals("create")) {
      create(input, output, numreduce);
    } else if (type.equals("searchold")) {
      searchOld(input, output, numreduce);
    } else {
      search(input, output, numreduce);
    }
  }

  private static void search(String input, String output, int numreduce)
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(new Configuration());
    job.setInputFormatClass(SequenceIndexInputFormat.class);
    SequenceIndexInputFormat.addInputPath(job, new Path(input));
    job.setMapperClass(IndexMap.class);
    job.setJarByClass(SequenceIndexExample.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    TextOutputFormat.setOutputPath(job, new Path(output));
    job.setNumReduceTasks(numreduce);
    job.waitForCompletion(true);
  }

  private static void searchOld(String input, String output, int numreduce)
      throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = new Configuration();
    conf.set("hive.fields.sequence", "index,col1,col2,col3,col4,col5");
    conf.set("lucene.fields", "index,col3");
    conf.set("lucene.query", "index:500");
    JobConf jobconf = new JobConf(conf, SequenceIndexInputFormatForHive.class);
    jobconf.setJobName("oldsearch");
    jobconf.setNumReduceTasks(numreduce);
    jobconf.setInputFormat(SequenceIndexInputFormatForHive.class);
    jobconf.setMapperClass(OldMapper.class);
    jobconf.setOutputKeyClass(Text.class);
    jobconf.setOutputValueClass(Text.class);
    SequenceIndexInputFormatForHive.addInputPath(jobconf, new Path(input));
    org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobconf, new Path(output));
    RunningJob rj = JobClient.runJob(jobconf);
  }

  /** Old-API mapper that simply echoes the rows produced by the Hive-style record reader. */
  public static class OldMapper implements org.apache.hadoop.mapred.Mapper<LongWritable, BytesWritable, Text, Text> {
    @Override
    public void configure(JobConf job) {
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public void map(LongWritable key, BytesWritable value,
        OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
      // only the valid portion of the BytesWritable buffer is the row text
      output.collect(new Text(String.valueOf(key.get())),
          new Text(new String(value.getBytes(), 0, value.getLength())));
    }
  }

  private static void create(String input, String output, int numreduce)
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(new Configuration());
    FileInputFormat.addInputPath(job, new Path(input));
    job.setJarByClass(SequenceIndexExample.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setReducerClass(IndexReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(HadoopDirectory.class);
    job.setOutputFormatClass(SequenceIndexOutputFormat.class);
    SequenceIndexOutputFormat.setOutputPath(job, new Path(output));
    job.setNumReduceTasks(numreduce);
    job.waitForCompletion(true);
  }

  /** Opens the Lucene index carried by each record and searches it directly inside the mapper. */
  public static class IndexMap extends Mapper<Text, HadoopDirectory, Text, Text> {
    protected void map(Text key, HadoopDirectory value, Context context)
        throws IOException, InterruptedException {
      Directory dir = value.getDir();
      IndexReader reader = IndexReader.open(dir);
      StandardAnalyzer an = new StandardAnalyzer(Version.LUCENE_35);
      QueryParser q = new QueryParser(Version.LUCENE_35, "index", an);
      IndexSearcher searcher = new IndexSearcher(reader);
      TopDocs docs;
      try {
        docs = searcher.search(q.parse("index:500"), 20);
      } catch (ParseException e) {
        throw new RuntimeException(e);
      }
      ScoreDoc[] list = docs.scoreDocs;
      if (list != null && list.length > 0) {
        StringBuffer buff = new StringBuffer();
        for (ScoreDoc doc : list) {
          Document document = searcher.doc(doc.doc);
          for (Fieldable f : document.getFields()) {
            buff.append(f.name() + "="
                + document.getFieldable(f.name()).stringValue()
                + ",");
          }
          context.write(key, new Text(buff.toString()));
        }
      }
    }
  }

  /** Generates a batch of small RAM-based Lucene indexes as test data and writes them out as one record each. */
  public static class IndexReducer extends Reducer<LongWritable, Text, Text, HadoopDirectory> {
    boolean setup = true;

    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
        throws java.io.IOException, InterruptedException {
      if (setup) {
        setup = false; // only generate the test indexes once per reducer
        for (int k = 0; k < 10000; k++) {
          HadoopDirectory hdir = new HadoopDirectory();
          hdir.setDir(new RAMDirectory());
          IndexWriter writer = new IndexWriter(hdir.getDir(), null,
              new KeepOnlyLastCommitDeletionPolicy(),
              MaxFieldLength.UNLIMITED);
          writer.setUseCompoundFile(false);
          writer.setMergeFactor(2);
          System.out.println(k);
          for (int i = 0; i < 1000; i++) {
            Document doc = new Document();
            doc.add(new Field("index", String.valueOf(i), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
            for (int j = 0; j < 10; j++) {
              doc.add(new Field("col" + j, String.valueOf(i) + "," + j + "," + k,
                  Store.YES, Index.NOT_ANALYZED_NO_NORMS));
            }
            writer.addDocument(doc);
          }
          writer.optimize();
          writer.close();
          context.write(new Text(String.valueOf(k)), hdir);
        }
      }
    }
  }
}
  #####################################################################
package com.alipay.higo.hadoop.sequenceIndex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.HadoopDirectory;
import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.SequenceIndexDirectory;

/** New-API input format: each record is (key, Lucene directory) read from a SequenceIndex file. */
public class SequenceIndexInputFormat extends FileInputFormat<Text, HadoopDirectory> {
  @Override
  public RecordReader<Text, HadoopDirectory> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException {
    try {
      return new SequenceIndexRecordReader(split, context);
    } catch (InterruptedException e) {
      throw new IOException(e);
    }
  }

  @Override
  protected long getFormatMinSplitSize() {
    return SequenceIndex.SYNC_INTERVAL;
  }

  public static class SequenceIndexRecordReader extends RecordReader<Text, HadoopDirectory> {
    private SequenceIndex.Reader in;
    private long start;
    private long end;
    private boolean more = true;
    private Text key = null;
    private HadoopDirectory value = null;
    protected Configuration conf;

    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      // all initialization happens in the constructors
    }

    public SequenceIndexRecordReader(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      FileSplit fileSplit = (FileSplit) split;
      this.init(context.getConfiguration(), fileSplit.getPath(), fileSplit.getStart(), fileSplit.getLength());
    }

    public SequenceIndexRecordReader(Configuration _conf, Path _path, long _start, long _len)
        throws IOException, InterruptedException {
      this.init(_conf, _path, _start, _len);
    }

    private void init(Configuration _conf, Path path, long _start, long len)
        throws IOException, InterruptedException {
      conf = _conf;
      FileSystem fs = path.getFileSystem(conf);
      this.in = SequenceIndex.open(fs, path, conf.getInt(
          "io.file.buffer.size", 4096), 0, fs.getFileStatus(path)
          .getLen(), conf, false); // analogous to new SequenceFile.Reader(fs, path, conf)
      this.end = _start + len;

      if (_start > in.getPosition()) {
        in.sync(_start); // sync to start
      }
      this.start = in.getPosition();
      more = _start < end;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      if (!more) {
        return false;
      }
      long pos = in.getPosition();
      this.key = new Text();
      this.value = new HadoopDirectory();
      SequenceIndexDirectory dir = new SequenceIndexDirectory();
      this.value.setDir(dir);
      if (this.in.next(this.key, dir) < 0 || (pos >= end && in.syncSeen())) {
        more = false;
        key = null;
        value = null;
      }
      return more;
    }

    @Override
    public Text getCurrentKey() {
      return key;
    }

    @Override
    public HadoopDirectory getCurrentValue() {
      return value;
    }

    public long getpos() throws IOException {
      return in.getPosition();
    }

    public float getProgress() throws IOException {
      if (end == start) {
        return 0.0f;
      } else {
        return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
      }
    }

    public synchronized void close() throws IOException {
      in.close();
    }
  }
}
#############################################################
package com.alipay.higo.hadoop.sequenceIndex;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.document.MapFieldSelector;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.queryParser.ParseException;
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.Version;

import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.HadoopDirectory;
import com.alipay.higo.hadoop.sequenceIndex.SequenceIndexInputFormat.SequenceIndexRecordReader;

/** Old-API input format used by Hive: runs the configured Lucene query on each sub-index and emits matching rows as '\001'-delimited text. */
public class SequenceIndexInputFormatForHive extends SequenceFileInputFormat<LongWritable, BytesWritable> {
  public RecordReader<LongWritable, BytesWritable> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    FileSplit part = (FileSplit) split;
    return new HiveTarRecordReader(job, part);
  }

  public static class HiveTarRecordReader implements RecordReader<LongWritable, BytesWritable> {
    private SequenceIndexRecordReader seqReader = null;
    IndexReader reader = null;
    private String hive_fields = "";
    private ArrayList<String> rowFields = new ArrayList<String>();
    private ArrayList<String> lucene_fields = new ArrayList<String>();
    private String lucene_query = "";
    private HadoopDirectory dir;
    private IndexSearcher searcher;
    private ScoreDoc[] list;
    private FileSplit split;
    private int lineIndex = -1;

    public HiveTarRecordReader(Configuration conf, FileSplit _split) throws IOException {
      this.hive_fields = conf.get("hive.fields.sequence", "");
      this.split = _split;
      for (String f : this.hive_fields.split(",")) {
        this.rowFields.add(f);
      }
      for (String f : conf.get("lucene.fields", "").split(",")) {
        this.lucene_fields.add(f);
      }
      this.lucene_query = conf.get("lucene.query");
      try {
        seqReader = new SequenceIndexRecordReader(conf, _split.getPath(), _split.getStart(), _split.getLength());
      } catch (InterruptedException e) {
        throw new IOException(e);
      }
    }

    public synchronized boolean next(LongWritable pos, BytesWritable k) throws IOException {
      // when the current hit list is exhausted, advance to the next sub-index and rerun the Lucene query
      while (lineIndex == -1 || list == null || lineIndex >= list.length) {
        try {
          if (!seqReader.nextKeyValue()) {
            return false;
          }
        } catch (InterruptedException e1) {
          throw new IOException(e1);
        }
        if (this.searcher != null) {
          this.searcher.close();
        }
        if (this.reader != null) {
          this.reader.close();
        }
        if (this.dir != null) {
          this.dir.getDir().close();
        }
        this.dir = seqReader.getCurrentValue();
        try {
          this.reader = IndexReader.open(dir.getDir());
        } catch (IOException e) {
          throw new IOException(this.split.toString() + "@@@" + dir.getDir().toString()
              + "@@@" + dir.getDir().getClass().getName(), e);
        }
        StandardAnalyzer an = new StandardAnalyzer(Version.LUCENE_35);
        QueryParser q = new QueryParser(Version.LUCENE_35, "index", an);
        this.searcher = new IndexSearcher(reader);
        TopDocs docs;
        try {
          // the hit limit in the original post was lost in transcription; collect up to every doc in this sub-index
          docs = this.searcher.search(q.parse(this.lucene_query), Math.max(reader.maxDoc(), 1));
        } catch (ParseException e) {
          throw new RuntimeException(e);
        }
        this.list = docs.scoreDocs;
        this.lineIndex = 0;
      }
      ScoreDoc doc = this.list[this.lineIndex];
      Document document = this.searcher.doc(doc.doc, new MapFieldSelector(this.lucene_fields));
      HashMap<String, String> val = new HashMap<String, String>();
      for (Fieldable f : document.getFields()) {
        String fname = f.name();
        val.put(fname, document.getFieldable(fname).stringValue());
      }
      // build a '\001'-delimited row in the column order Hive expects; missing fields become "-"
      StringBuffer buff = new StringBuffer();
      String joinchar = "";
      for (String f : this.rowFields) {
        buff.append(joinchar);
        if (val.containsKey(f)) {
          buff.append(val.get(f));
        } else {
          buff.append("-");
        }
        joinchar = "\001";
      }
      pos.set(this.seqReader.getpos());
      String line = buff.toString();
      byte[] textBytes = line.getBytes();
      k.set(textBytes, 0, textBytes.length);
      lineIndex++;
      return true;
    }

    public void close() throws IOException {
      seqReader.close();
    }

    public LongWritable createKey() {
      return new LongWritable();
    }

    public BytesWritable createValue() {
      return new BytesWritable();
    }

    public long getPos() throws IOException {
      return seqReader.getpos();
    }

    public float getProgress() throws IOException {
      return seqReader.getProgress();
    }
  }
}
  ###################################################################################
package com.alipay.higo.hadoop.sequenceIndex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.HadoopDirectory;

/** New-API output format that appends each (key, directory) pair to a SequenceIndex file. */
public class SequenceIndexOutputFormat extends FileOutputFormat<Text, HadoopDirectory> {

  public RecordWriter<Text, HadoopDirectory> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    Path file = getDefaultWorkFile(context, "");
    FileSystem fs = file.getFileSystem(conf);
    final SequenceIndex.Writer out = SequenceIndex.create(fs, conf, file,
        conf.getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(),
        fs.getDefaultBlockSize(), null, new Metadata());

    return new RecordWriter<Text, HadoopDirectory>() {
      public void write(Text key, HadoopDirectory value) throws IOException {
        out.append(key, value.getDir());
      }

      public void close(TaskAttemptContext context) throws IOException {
        out.close();
      }
    };
  }
}
Written by 开心延年.