A look at Flink's TextOutputFormat
Published: 2019-06-19

This article takes a look at Flink's TextOutputFormat.

DataStream.writeAsText

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

```java
/**
 * Writes a DataStream to the file specified by path in text format.
 *
 * <p>For every element of the DataStream the result of {@link Object#toString()} is written.
 *
 * @param path
 *            The path pointing to the location the text file is written to
 * @param writeMode
 *            Controls the behavior for existing files. Options are
 *            NO_OVERWRITE and OVERWRITE.
 *
 * @return The closed DataStream.
 */
@PublicEvolving
public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
    TextOutputFormat<T> tof = new TextOutputFormat<>(new Path(path));
    tof.setWriteMode(writeMode);
    return writeUsingOutputFormat(tof);
}

/**
 * Writes the dataStream into an output, described by an OutputFormat.
 *
 * <p>The output is not participating in Flink's checkpointing!
 *
 * <p>For writing to a file system periodically, the use of the "flink-connector-filesystem"
 * is recommended.
 *
 * @param format The output format
 * @return The closed DataStream
 */
@PublicEvolving
public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) {
    return addSink(new OutputFormatSinkFunction<>(format));
}
```

  • DataStream.writeAsText creates a TextOutputFormat and wraps it in an OutputFormatSinkFunction, which is then added to the stream as a sink function
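The source of OutputFormatSinkFunction is not shown here, so the following is only a minimal plain-Java sketch of the wrapping idea, assuming the sink forwards its own start/element/shutdown calls to the format's configure + open, writeRecord and close. SimpleOutputFormat, OutputFormatSink and RecordingFormat are hypothetical names, not Flink classes, and the sketch has no Flink dependency.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down mirror of the OutputFormat contract (not Flink's interface).
interface SimpleOutputFormat<T> {
    void configure(Map<String, String> parameters);
    void open(int taskNumber, int numTasks) throws IOException;
    void writeRecord(T record) throws IOException;
    void close() throws IOException;
}

// Hypothetical adapter in the spirit of OutputFormatSinkFunction:
// the sink's lifecycle calls are forwarded to the wrapped format.
final class OutputFormatSink<T> {
    private final SimpleOutputFormat<T> format;

    OutputFormatSink(SimpleOutputFormat<T> format) {
        this.format = format;
    }

    // Called once when a parallel sink instance starts.
    void open(Map<String, String> parameters, int subtaskIndex, int parallelism) throws IOException {
        format.configure(parameters);
        format.open(subtaskIndex, parallelism);
    }

    // Called once per stream element.
    void invoke(T value) throws IOException {
        format.writeRecord(value);
    }

    // Called when the sink shuts down.
    void close() throws IOException {
        format.close();
    }
}

// Hypothetical format that only records which lifecycle methods were called, and in what order.
final class RecordingFormat implements SimpleOutputFormat<String> {
    final List<String> calls = new ArrayList<>();

    @Override public void configure(Map<String, String> parameters) { calls.add("configure"); }
    @Override public void open(int taskNumber, int numTasks) { calls.add("open " + taskNumber + "/" + numTasks); }
    @Override public void writeRecord(String record) { calls.add("write " + record); }
    @Override public void close() { calls.add("close"); }
}
```

Driving the sink with open(params, 0, 2), two invoke calls and close produces the call sequence configure, open 0/2, write a, write b, close. In Flink itself the subtask index and parallelism would come from the sink's RuntimeContext rather than being passed in by hand.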

TextOutputFormat

flink-java-1.7.0-sources.jar!/org/apache/flink/api/java/io/TextOutputFormat.java

```java
/**
 * A {@link FileOutputFormat} that writes objects to a text file.
 *
 * <p>Objects are converted to Strings using either {@link Object#toString()} or a {@link TextFormatter}.
 * @param <T> type of elements
 */
@PublicEvolving
public class TextOutputFormat<T> extends FileOutputFormat<T> {

    private static final long serialVersionUID = 1L;

    private static final int NEWLINE = '\n';

    private String charsetName;

    private transient Charset charset;

    // --------------------------------------------------------------------------------------------

    /**
     * Formatter that transforms values into their {@link String} representations.
     * @param <IN> type of input elements
     */
    public interface TextFormatter<IN> extends Serializable {
        String format(IN value);
    }

    public TextOutputFormat(Path outputPath) {
        this(outputPath, "UTF-8");
    }

    public TextOutputFormat(Path outputPath, String charset) {
        super(outputPath);
        this.charsetName = charset;
    }

    public String getCharsetName() {
        return charsetName;
    }

    public void setCharsetName(String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
        if (charsetName == null) {
            throw new NullPointerException();
        }

        if (!Charset.isSupported(charsetName)) {
            throw new UnsupportedCharsetException("The charset " + charsetName + " is not supported.");
        }

        this.charsetName = charsetName;
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);

        try {
            this.charset = Charset.forName(charsetName);
        }
        catch (IllegalCharsetNameException e) {
            throw new IOException("The charset " + charsetName + " is not valid.", e);
        }
        catch (UnsupportedCharsetException e) {
            throw new IOException("The charset " + charsetName + " is not supported.", e);
        }
    }

    @Override
    public void writeRecord(T record) throws IOException {
        byte[] bytes = record.toString().getBytes(charset);
        this.stream.write(bytes);
        this.stream.write(NEWLINE);
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public String toString() {
        return "TextOutputFormat (" + getOutputFilePath() + ") - " + this.charsetName;
    }
}
```

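  • TextOutputFormat extends FileOutputFormat. Its open method mainly delegates to FileOutputFormat.open and then resolves the configured charset; writeRecord writes the bytes of the record's toString() directly to the stream, followed by a newline (\n)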
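To make the byte-level effect of writeRecord concrete, here is a minimal Flink-free sketch that repeats its two steps against a plain OutputStream. TextRecordWriter is a hypothetical class; the OutputStream stands in (as an assumption) for the FSDataOutputStream that FileOutputFormat.open creates.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;

// Hypothetical stand-alone sketch of TextOutputFormat.writeRecord (no Flink dependency).
final class TextRecordWriter {
    private static final int NEWLINE = '\n';

    private final OutputStream stream;
    private final Charset charset;

    TextRecordWriter(OutputStream stream, String charsetName) {
        this.stream = stream;
        // Charset.forName throws IllegalCharsetNameException / UnsupportedCharsetException;
        // TextOutputFormat.open translates those into IOException.
        this.charset = Charset.forName(charsetName);
    }

    // Same two steps as TextOutputFormat.writeRecord: encode toString(), then a single '\n' byte.
    void writeRecord(Object record) throws IOException {
        byte[] bytes = record.toString().getBytes(charset);
        stream.write(bytes);
        stream.write(NEWLINE);
    }
}
```

Writing 42 and then "h\u00e9llo" into a ByteArrayOutputStream with UTF-8 yields the text "42\nhéllo\n", i.e. 10 bytes (é takes two bytes in UTF-8). Note that the separator is always a bare '\n', whatever the platform's line separator is.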

FileOutputFormat

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/FileOutputFormat.java

```java
/**
 * The abstract base class for all Rich output formats that are file based. Contains the logic to
 * open/close the target
 * file streams.
 */
@Public
public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implements InitializeOnMaster, CleanupWhenUnsuccessful {

    //......

    /**
     * Initialization of the distributed file system if it is used.
     *
     * @param parallelism The task parallelism.
     */
    @Override
    public void initializeGlobal(int parallelism) throws IOException {
        final Path path = getOutputFilePath();
        final FileSystem fs = path.getFileSystem();

        // only distributed file systems can be initialized at start-up time.
        if (fs.isDistributedFS()) {

            final WriteMode writeMode = getWriteMode();
            final OutputDirectoryMode outDirMode = getOutputDirectoryMode();

            if (parallelism == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
                // output is not written in parallel and should be written to a single file.
                // prepare distributed output path
                if (!fs.initOutPathDistFS(path, writeMode, false)) {
                    // output preparation failed! Cancel task.
                    throw new IOException("Output path could not be initialized.");
                }

            } else {
                // output should be written to a directory

                // only distributed file systems can be initialized at start-up time.
                if (!fs.initOutPathDistFS(path, writeMode, true)) {
                    throw new IOException("Output directory could not be created.");
                }
            }
        }
    }

    @Override
    public void tryCleanupOnError() {
        if (this.fileCreated) {
            this.fileCreated = false;

            try {
                close();
            } catch (IOException e) {
                LOG.error("Could not properly close FileOutputFormat.", e);
            }

            try {
                FileSystem.get(this.actualFilePath.toUri()).delete(actualFilePath, false);
            } catch (FileNotFoundException e) {
                // ignore, may not be visible yet or may be already removed
            } catch (Throwable t) {
                LOG.error("Could not remove the incomplete file " + actualFilePath + '.', t);
            }
        }
    }

    @Override
    public void configure(Configuration parameters) {
        // get the output file path, if it was not yet set
        if (this.outputFilePath == null) {
            // get the file parameter
            String filePath = parameters.getString(FILE_PARAMETER_KEY, null);
            if (filePath == null) {
                throw new IllegalArgumentException("The output path has been specified neither via constructor/setters" +
                        ", nor via the Configuration.");
            }

            try {
                this.outputFilePath = new Path(filePath);
            }
            catch (RuntimeException rex) {
                throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage());
            }
        }

        // check if have not been set and use the defaults in that case
        if (this.writeMode == null) {
            this.writeMode = DEFAULT_WRITE_MODE;
        }

        if (this.outputDirectoryMode == null) {
            this.outputDirectoryMode = DEFAULT_OUTPUT_DIRECTORY_MODE;
        }
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        if (taskNumber < 0 || numTasks < 1) {
            throw new IllegalArgumentException("TaskNumber: " + taskNumber + ", numTasks: " + numTasks);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Opening stream for output (" + (taskNumber+1) + "/" + numTasks + "). WriteMode=" + writeMode +
                    ", OutputDirectoryMode=" + outputDirectoryMode);
        }

        Path p = this.outputFilePath;
        if (p == null) {
            throw new IOException("The file path is null.");
        }

        final FileSystem fs = p.getFileSystem();

        // if this is a local file system, we need to initialize the local output directory here
        if (!fs.isDistributedFS()) {

            if (numTasks == 1 && outputDirectoryMode == OutputDirectoryMode.PARONLY) {
                // output should go to a single file

                // prepare local output path. checks for write mode and removes existing files in case of OVERWRITE mode
                if (!fs.initOutPathLocalFS(p, writeMode, false)) {
                    // output preparation failed! Cancel task.
                    throw new IOException("Output path '" + p.toString() + "' could not be initialized. Canceling task...");
                }
            }
            else {
                // numTasks > 1 || outDirMode == OutputDirectoryMode.ALWAYS

                if (!fs.initOutPathLocalFS(p, writeMode, true)) {
                    // output preparation failed! Cancel task.
                    throw new IOException("Output directory '" + p.toString() + "' could not be created. Canceling task...");
                }
            }
        }

        // Suffix the path with the parallel instance index, if needed
        this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + getDirectoryFileName(taskNumber)) : p;

        // create output file
        this.stream = fs.create(this.actualFilePath, writeMode);

        // at this point, the file creation must have succeeded, or an exception has been thrown
        this.fileCreated = true;
    }

    @Override
    public void close() throws IOException {
        final FSDataOutputStream s = this.stream;
        if (s != null) {
            this.stream = null;
            s.close();
        }
    }
}
```
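  • FileOutputFormat extends RichOutputFormat and implements the InitializeOnMaster (initializeGlobal method) and CleanupWhenUnsuccessful (tryCleanupOnError method) interfaces
  • initializeGlobal only does work when the output path is on a distributed file system: in that case the output path is initialized once, globally, at start-up (as a single file when parallelism is 1 and the OutputDirectoryMode is PARONLY, otherwise as a directory); for a local file system the same preparation happens later, in open. tryCleanupOnError, if a file was created, first closes the stream and then deletes the incomplete file
  • FileOutputFormat also implements the configure, open and close methods of the OutputFormat interface, leaving writeRecord to subclasses. configure sets the outputFilePath, writeMode and outputDirectoryMode properties (falling back to defaults for the latter two); open computes actualFilePath from taskNumber (when numTasks is greater than 1, or the OutputDirectoryMode is ALWAYS, each task writes its own file inside the configured outputFilePath directory, named taskNumber + 1) and then creates the stream; close simply closes the stream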
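The path rule in FileOutputFormat.open can be isolated as a small pure function. This is a sketch under stated assumptions: OutputDirectoryMode and OutputPaths are hypothetical stand-ins (Flink's enum is nested inside FileOutputFormat), paths are plain strings instead of Flink's Path, and getDirectoryFileName(taskNumber) is taken to return taskNumber + 1, as described above.

```java
// Hypothetical stand-in for FileOutputFormat.OutputDirectoryMode.
enum OutputDirectoryMode { ALWAYS, PARONLY }

final class OutputPaths {
    private OutputPaths() {}

    // Mirrors the suffix rule in FileOutputFormat.open(): with more than one task, or with
    // ALWAYS, the configured path is treated as a directory and each parallel instance writes
    // to a file named after its 1-based task index.
    static String actualFilePath(String outputPath, int taskNumber, int numTasks, OutputDirectoryMode mode) {
        if (taskNumber < 0 || numTasks < 1) {
            throw new IllegalArgumentException("TaskNumber: " + taskNumber + ", numTasks: " + numTasks);
        }
        boolean asDirectory = numTasks > 1 || mode == OutputDirectoryMode.ALWAYS;
        // Assumption: getDirectoryFileName(taskNumber) == String.valueOf(taskNumber + 1)
        return asDirectory ? outputPath + "/" + (taskNumber + 1) : outputPath;
    }
}
```

For example, with the output path /tmp/out and 4 parallel tasks under PARONLY, the tasks write /tmp/out/1 through /tmp/out/4; with a single task the file is /tmp/out itself, unless ALWAYS forces /tmp/out/1.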

RichOutputFormat

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/RichOutputFormat.java

```java
/**
 * An abstract stub implementation for Rich output formats.
 * Rich formats have access to their runtime execution context via {@link #getRuntimeContext()}.
 */
@Public
public abstract class RichOutputFormat<IT> implements OutputFormat<IT> {

    private static final long serialVersionUID = 1L;

    // --------------------------------------------------------------------------------------------
    //  Runtime context access
    // --------------------------------------------------------------------------------------------

    private transient RuntimeContext runtimeContext;

    public void setRuntimeContext(RuntimeContext t) {
        this.runtimeContext = t;
    }

    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized yet. Try accessing " +
                    "it in one of the other life cycle methods.");
        }
    }
}
```
  • RichOutputFormat implements the OutputFormat interface; its main addition is the RuntimeContext property
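The RuntimeContext handling in RichOutputFormat is a simple "inject later, fail fast if read too early" pattern. A minimal sketch of it follows; FakeRuntimeContext and RichFormatSketch are hypothetical names, and the fake context carries only two fields chosen for illustration, whereas Flink's RuntimeContext is a much richer interface.

```java
// Hypothetical stand-in for Flink's RuntimeContext, reduced to two illustrative fields.
final class FakeRuntimeContext {
    final int subtaskIndex;
    final int parallelism;

    FakeRuntimeContext(int subtaskIndex, int parallelism) {
        this.subtaskIndex = subtaskIndex;
        this.parallelism = parallelism;
    }
}

// Sketch of the RichOutputFormat pattern: the context is set by the runtime after construction
// (transient, so it is not shipped with the serialized format) and reading it too early fails fast.
abstract class RichFormatSketch {
    private transient FakeRuntimeContext runtimeContext;

    public void setRuntimeContext(FakeRuntimeContext t) {
        this.runtimeContext = t;
    }

    public FakeRuntimeContext getRuntimeContext() {
        if (runtimeContext != null) {
            return runtimeContext;
        }
        throw new IllegalStateException("The runtime context has not been initialized yet.");
    }
}
```

Calling getRuntimeContext() on a fresh instance throws IllegalStateException; after setRuntimeContext(new FakeRuntimeContext(1, 4)) it returns that context. The transient field matches the original class, where the context belongs to the running task rather than to the serialized format.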

OutputFormat

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/OutputFormat.java

```java
/**
 * The base interface for outputs that consumes records. The output format
 * describes how to store the final records, for example in a file.
 *
 * <p>The life cycle of an output format is the following:
 * <ol>
 *   <li>configure() is invoked a single time. The method can be used to implement initialization from
 *       the parameters (configuration) that may be attached upon instantiation.</li>
 *   <li>Each parallel output task creates an instance, configures it and opens it.</li>
 *   <li>All records of its parallel instance are handed to the output format.</li>
 *   <li>The output format is closed</li>
 * </ol>
 *
 * @param <IT> The type of the consumed records.
 */
@Public
public interface OutputFormat<IT> extends Serializable {

    /**
     * Configures this output format. Since output formats are instantiated generically and hence parameterless,
     * this method is the place where the output formats set their basic fields based on configuration values.
     *
     * <p>This method is always called first on a newly instantiated output format.
     *
     * @param parameters The configuration with all parameters.
     */
    void configure(Configuration parameters);

    /**
     * Opens a parallel instance of the output format to store the result of its parallel instance.
     *
     * <p>When this method is called, the output format it guaranteed to be configured.
     *
     * @param taskNumber The number of the parallel instance.
     * @param numTasks The number of parallel tasks.
     * @throws IOException Thrown, if the output could not be opened due to an I/O problem.
     */
    void open(int taskNumber, int numTasks) throws IOException;

    /**
     * Adds a record to the output.
     *
     * <p>When this method is called, the output format it guaranteed to be opened.
     *
     * @param record The records to add to the output.
     * @throws IOException Thrown, if the records could not be added to to an I/O problem.
     */
    void writeRecord(IT record) throws IOException;

    /**
     * Method that marks the end of the life-cycle of parallel output instance. Should be used to close
     * channels and streams and release resources.
     * After this method returns without an error, the output is assumed to be correct.
     *
     * <p>When this method is called, the output format it guaranteed to be opened.
     *
     * @throws IOException Thrown, if the input could not be closed properly.
     */
    void close() throws IOException;
}
```

  • The OutputFormat interface defines the configure, open, writeRecord and close methods
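The life cycle described in the OutputFormat Javadoc (configure once, then open, any number of writeRecord calls, then close) can be made explicit with a small state machine. This is a hypothetical in-memory sketch, not a Flink class: InMemoryOutputFormat only mirrors the four method names, uses a Map in place of Flink's Configuration, and rejects calls made out of order so the ordering guarantees become visible.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory format that follows configure -> open -> writeRecord* -> close
// and throws IllegalStateException when a method is called out of order.
final class InMemoryOutputFormat<T> {
    private enum State { NEW, CONFIGURED, OPEN, CLOSED }

    private State state = State.NEW;
    private final List<T> records = new ArrayList<>();

    void configure(Map<String, String> parameters) {
        require(State.NEW, "configure");        // always called first, a single time
        state = State.CONFIGURED;
    }

    void open(int taskNumber, int numTasks) throws IOException {
        require(State.CONFIGURED, "open");      // guaranteed to be configured
        if (taskNumber < 0 || numTasks < 1) {
            throw new IllegalArgumentException("TaskNumber: " + taskNumber + ", numTasks: " + numTasks);
        }
        state = State.OPEN;
    }

    void writeRecord(T record) throws IOException {
        require(State.OPEN, "writeRecord");     // guaranteed to be opened
        records.add(record);
    }

    void close() throws IOException {
        require(State.OPEN, "close");           // end of this parallel instance's life cycle
        state = State.CLOSED;
    }

    List<T> records() {
        return Collections.unmodifiableList(records);
    }

    private void require(State expected, String method) {
        if (state != expected) {
            throw new IllegalStateException(method + " called in state " + state);
        }
    }
}
```

A normal run (configure, open(0, 1), two writeRecord calls, close) keeps both records, while calling open before configure, or writeRecord after close, fails immediately. Real formats such as FileOutputFormat are more lenient (its close just checks whether the stream is null); the strict checks here exist only to illustrate the contract.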

Summary

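  • DataStream.writeAsText creates a TextOutputFormat and wraps it in an OutputFormatSinkFunction, which is then added to the stream as a sink function
  • TextOutputFormat extends FileOutputFormat. Its open method mainly delegates to FileOutputFormat.open and then resolves the charset; writeRecord writes the bytes of the record's toString() directly to the stream, followed by a newline (\n)
  • FileOutputFormat extends RichOutputFormat, implements the InitializeOnMaster (initializeGlobal) and CleanupWhenUnsuccessful (tryCleanupOnError) interfaces, and implements the configure, open and close methods of the OutputFormat interface, leaving writeRecord to subclasses
  • FileOutputFormat.open computes actualFilePath from taskNumber (when numTasks is greater than 1, or the OutputDirectoryMode is ALWAYS, each task writes its own file inside the configured outputFilePath directory, named taskNumber + 1) and then creates the stream
  • RichOutputFormat implements the OutputFormat interface and mainly adds the RuntimeContext property; the OutputFormat interface defines the configure, open, writeRecord and close methods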

Reposted from: http://iuupx.baihongyu.com/