feature: supported ftp source reader and target#915
feature: supported ftp source reader and target#915vainhope merged 12 commits intoDTStack:masterfrom Jixiangup:feat_ftpsource_onsync
Conversation
|
check this |
|
Why do you base on branch DTstack:feat/ftp rather than branch master? |
because the front end wants to submit a pr to merge it, i changed it |
…fied type to determine whether to rewrite
| import com.dtstack.taier.develop.vo.develop.query.*; | ||
| import com.dtstack.taier.develop.vo.develop.result.*; |
| <!-- https://mvnrepository.com/artifact/org.apache.poi/poi --> | ||
| <dependency> | ||
| <groupId>org.apache.poi</groupId> | ||
| <artifactId>poi</artifactId> | ||
| </dependency> | ||
|
|
||
| <!-- https://mvnrepository.com/artifact/org.apache.poi/poi-ooxml --> | ||
| <dependency> | ||
| <groupId>org.apache.poi</groupId> | ||
| <artifactId>poi-ooxml</artifactId> | ||
| </dependency> |
There was a problem hiding this comment.
What is the purpose of depending on apache.poi?
|
| import com.dtstack.taier.common.enums.EScheduleJobType; | ||
| import com.dtstack.taier.common.enums.EScheduleStatus; | ||
| import com.dtstack.taier.common.enums.ResourceRefType; | ||
| import com.dtstack.taier.common.enums.*; |
| import org.apache.commons.lang.BooleanUtils; | ||
| import org.apache.commons.lang.StringUtils; | ||
| import org.apache.commons.lang3.ObjectUtils; | ||
| import org.apache.poi.ss.usermodel.*; |
| import org.springframework.stereotype.Service; | ||
| import org.springframework.transaction.annotation.Transactional; | ||
|
|
||
| import java.io.*; |
| import java.util.Objects; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.*; |
| private List<FTPColumn> getTxtColumns(DevelopTaskParsingFTPFileParamVO payload, InputStream fileInputStream) { | ||
| List<FTPColumn> columns = new ArrayList<>(); | ||
| try { | ||
| // sftp特性,当你关闭ChannelSftp时随即你获取到的流对象也会被关闭所以这里要爆漏channelSftp获取流对象 | ||
| // The sftp feature, when you close the ChannelSftp, the stream object you get will also be closed, so here we need to leak the ChannelSftp to get the stream object | ||
| BufferedReader bis = new BufferedReader(new InputStreamReader(fileInputStream, payload.getEncoding())); | ||
| // memory to store buffered stream per read | ||
| int limit = 1; | ||
| String line = null; | ||
| while ((line = bis.readLine()) != null && limit <= 1) { | ||
| String[] split = line.split(payload.getColumnSeparator()); | ||
| if (payload.getFirstColumnName() && limit == 1) { | ||
| for (int i = 0; i < split.length; i++) { | ||
| FTPColumn ftpColumn = new FTPColumn(); | ||
| ftpColumn.setName(split[i]); | ||
| ftpColumn.setType("string"); | ||
| ftpColumn.setIndex(i); | ||
| columns.add(ftpColumn); | ||
| } | ||
| } else if (!payload.getFirstColumnName() && limit == 1) { | ||
| for (int i = 0; i < split.length; i++) { | ||
| FTPColumn ftpColumn = new FTPColumn(); | ||
| ftpColumn.setName("column" + i); | ||
| ftpColumn.setType("string"); | ||
| ftpColumn.setIndex(i); | ||
| columns.add(ftpColumn); | ||
| } | ||
| } | ||
| limit++; | ||
| } | ||
| } catch (IOException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| return columns; | ||
| } |
There was a problem hiding this comment.
Unclosed stream. Check the code, please.
| import org.springframework.stereotype.Component; | ||
|
|
||
| import javax.annotation.PostConstruct; | ||
| import java.util.*; |
|
init sql lost sql change |
…lectionUtils, remove import all
check it |
|
|
ftp 手动映射字段 column为空 2022-12-06 20:16:25:org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Field names must be unique. Duplicate field: 'null' |
我有时间会确认它! |
| */ | ||
| public abstract class CollectionUtils { | ||
|
|
||
| public static boolean isEmpty(Collection<?> collection) { |
| @Override | ||
| public TaskResourceParam beforeProcessing(TaskResourceParam taskResourceParam) { | ||
| Map<String, Object> sourceMap = taskResourceParam.getSourceMap(); | ||
| assertSourceParam(sourceMap); |
There was a problem hiding this comment.
when create new task, path must be null

Related PRs
Please make sure acesss this PR with #911