sp; import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
public abstract class multiformat<K extends WritableComparable<?>, V extends Writable>
extends FileOutputFormat<K, V> {
private MultiRecordWriter writer = null;
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,
InterruptedException {
if (writer == null) {
writer = new MultiRecordWriter(job, getTaskOutputPath(job));
}
return writer;
}
private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
Path workPath = null;
OutputCommitter committer = super.getOutputCommitter(conf);
if (committer instanceof FileOutputCommitter) {
workPath = ((FileOutputCommitter) committer).getWorkPath();
} else {
Path outputPath = super.getOutputPath(conf);
if (outputPath == null) {
throw new IOException("Undefined job output-path");
}
workPath = outputPath;
}
return workPath;
}
/**通过key, value, conf来确定输出文件名(含扩展名)*/
protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);
public class MultiRecordWriter extends RecordWriter<K, V> {
/**RecordWriter的缓存*/
private HashMap<String, RecordWriter<K, V>> recordWriters = null;
private TaskAttemptContext job = null;
/**输出目录*/
private Path workPath = null;
public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
&nbs
上一页 [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] 下一页