MAPREDUCE-7341. Intermediate Manifest Committer (#2971)
steveloughran wants to merge 52 commits into apache:trunk
Conversation
mukund-thakur left a comment:
Went through the prod code. Looks really promising. Glad to see the use of the state design pattern (not exactly, but similar).
Resolved review threads on:
.../src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbortTaskStage.java
.../src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/CommitJobStage.java
...n/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbstractJobCommitStage.java
...c/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java
attilapiros left a comment:
Just some nits in the documentation.
Resolved review threads on:
...uce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md
...hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md
...preduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_protocol.md
* renamed the prepare.target.files option to delete.target.files, as there's no longer parent dir preparation as an option.
* clean up the docs to match.
* optimise the deletion by using the list of created dirs from the mkdir phase: if a dir was created, there's no need to check for/delete the target files.
* test: this is a bit tricky, as the local FS lets you rename a file onto a file; we use the FS XML contracts to probe when an FS rejects it, which we know abfs does.

Change-Id: I0840a533a85280fad2450eda9fb999948e73ea8a
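The deletion optimisation in that commit message can be sketched as a small piece of bookkeeping (class and method names here are illustrative, not the committer's real API): if the mkdir phase itself created a destination directory, no pre-existing file can be shadowing it, so the probe-and-delete of a target file can be skipped.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of "if a dir was created, there's no need to
// check for/delete the target files".
public class TargetFileDeletion {

  /** Paths the mkdir phase actually created (rather than found existing). */
  private final Set<String> createdDirs = new HashSet<>();

  public void recordCreatedDir(String path) {
    createdDirs.add(path);
  }

  /**
   * Decide whether a file-at-destination probe/delete is needed before
   * renaming into {@code targetDir}.
   * @return true if the probe must still be issued.
   */
  public boolean needsTargetFileCheck(String targetDir) {
    // a dir we created ourselves cannot be shadowed by a pre-existing file
    return !createdDirs.contains(targetDir);
  }
}
```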
Co-authored-by: Attila Zsolt Piros <[email protected]>
@attilapiros thanks for those suggestions; just applied them through the "commit suggestions" button in GitHub. Nice.
It may be useful to move some of the logs in the PathOutputCommitter factory to INFO level, to make it easy to find out which committer is being used.
sidseth left a comment:
Left some comments. The main concern for me is still the dependency. I have not reviewed the ABFS changes, and am assuming the TaskPool was already reviewed.
```java
if (exception != null) {
  // failure, report and continue
  LOG.warn("{}: Deleting job directory {} failed: moving to trash", getName(), baseDir);
  moveToTrash = true;
```
This shouldn't be automatically enabled, right? Especially in the absence of any entity to go and clean up this data at a later point.
Good question. It's about handling timeouts of dir deletion in Azure: we've seen deletes of the whole of _temporary over OAuth time out. Here we delete each task attempt subdir in parallel, so the risk is much smaller: a single task attempt would have to contain too many dirs, whereas today it is the number of dirs in a task attempt times the number of task attempts.
```java
@Override
public boolean isTrashEnabled(Path path) {
  try {
    return fileSystem.getServerDefaults(path).getTrashInterval() > 0;
```
Will this ever be true? ABFS at least doesn't seem to implement getServerDefaults or its derivatives.
There are standard default policies here.
```java
// move to trash?
// this will be set if delete fails.
boolean moveToTrash = args.moveToTrash;
```
This gets quite confusing. There's an argument to specify whether to move to trash, and subsequently there's a check on whether the filesystem has trash enabled, which I think ends up overriding this parameter?
If the FileSystem does have trash enabled, would the delete FS operation automatically take care of moving the contents to trash?
So the config parameter can end up being ignored either way:
- FS enables trash, committer config has moveToTrash set to false: moveToTrash ignored in favor of the FS having trash enabled (implicitly via the delete).
- FS disables trash, committer config has moveToTrash set to true: moveToTrash ignored because the FS does not support trash.
- FS enables trash, committer config has moveToTrash set to true: redundant, because the delete will use the trash.
- FS disables trash, committer config has moveToTrash set to false: trash isn't used.
The moveToTrash flag seems to make no difference? I think I'm missing something in the overall "moveToTrash being implemented in the committer" scenario.
I think you are missing that FileSystem.delete() doesn't use trash; FsShell does, but otherwise applications have to use the trash APIs directly.
Now, if we were to update delete() to a builder, I'd have it return a future and let you opt() as to whether you wanted an atomic delete or a move to trash...
```java
 * Potentially faster on some stores.
 * Value: {@value}.
 */
public static final String OPT_CLEANUP_MOVE_TO_TRASH =
```
Left a comment about trash usage elsewhere, and how the config parameter ends up being ignored/irrelevant. Guessing I'm missing something there.
```java
  return dir;
}
if (st.isFile()) {
  return file;
```
Would a DirEntry with type = 'file' be valid? Likewise for a FileEntry. Should there be a sanity validation in DirEntry.dirEntry / new FileEntry?
Yes, a dir entry could be a file during task attempt scanning; these will be deleted in job commit.
I don't know if this ever really occurs in production (we've never had problems with the s3a committer with files being above files), but the FileOutputCommitter protocol implicitly cleans up during its treewalk, deleting the dest path before renaming a file or dir. I'm trying to stay as close to that protocol as I can.
```java
 */
@InterfaceAudience.LimitedPrivate("mapreduce")
@InterfaceStability.Unstable
public class AbfsManifestStoreOperations extends
```
On avoiding the new dependency on hadoop-mapreduce-client (or the reverse, where hadoop-mapreduce-client ends up depending on hadoop-cloud-storage / azure specifically):
Does it make sense to make ResilientCommitByRename a little more generic, and move it to hadoop-common as LimitedPrivate("cloud-storage")/Unstable? AzureBlobFileSystem is the only one to implement the interface right now.
AbfsManifestStoreOperations can then make decisions based on whether the FileSystem implements the interface (and can potentially be merged into ManifestStoreOperationsThroughFileSystem itself), or get renamed to something like ResilientRenameCapableManifestStoreOperationsThroughFileSystem.
All depends on moving ResilientCommitByRename to hadoop-common. I think this is quite a bit cleaner than adding the dependencies in either direction.
We did look at that at first, but it'd force us to do a lot more work in a public API, specifically "get rename right". I've played with this in the past in #2735, with the API we could do for async and slow operations: #2735 (comment).
The problem here is that rename() is the nightmare: even working out what is good is hard, everything is inconsistent, etc. Doing a special binding API in the abfs module lets us avoid worrying about this, lets us tune other operations if we need to worry about problems like rate limiting there too, and lets us remove it when abfs makes this failure mode go away.
I did put the etag source API into hadoop-common, with a PathCapabilities probe to verify that a store supports etag preservation over rename. So there is enough in hadoop-common for Hive to implement its own recovery. Because that API is in common, there's no dependency on the mapred- libraries in normal use of the client.
Added a statement in the javadocs of org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename to make clear that there mustn't be any references in there.
```java
}

@Override
public boolean storePreservesEtagsThroughRenames(final Path path) {
```
It varies depending on whether you are talking to a v1 Azure store (wasb) or abfs; the ABFS class doesn't provide an implementation of ResilientCommitByRename in that state. So yes, it is implicitly "if you have the store, you have etag preservation".
I added this to the ManifestStoreOperations interface so the (optional) verify stage could be enabled and then scan the output dir to verify everything went through. Other stores (gcs?) may be different.
```java
 * However, the actual API MUST NOT be used outside
 * the manifest committer and its tests.
 */
@InterfaceAudience.LimitedPrivate("mapreduce, object-stores")
```
Apologies for repeating this all over the place... object stores should not be depending on this / implementing this in their own packages (i.e. no MR dependencies for people using object stores).
There's none in the normal abfs client runtime, only when you are trying to create committers for them. s3a does this already with its committer.
```java
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class UnreliableManifestStoreOperations extends ManifestStoreOperations {
```
Should be under 'test', instead of alongside the main code.
I put it here in case I wanted to make it easier to do fault injection in production deployments, but since I've not made it configurable, I will move it into the test package.
New API in hadoop-common: I understand your concerns about making the mapreduce JAR a dependency, but the jar is optional. If you want the new committer, yes, you need the JAR, but that's always been the case. Putting it into hadoop-common becomes a commitment to maintain it (it will be used by someone); I'd rather put that off until we can do a proper rename operation which works well with stores. But we'd need to get rename() right there, and that's a nightmare.
Cleanup: now, I do agree things are over-complex. I was trying to be resilient to timeouts on dir deletes. This gives:
And all the recovery and tests are gone. What I could do here is reject that error on job setup, rather than waiting for cleanup. Do you agree? My next steps:
* javadocs of ResilientCommitByRename highlight that no mapreduce dependencies are allowed
* moved UnreliableManifestStoreOperations to test source

Change-Id: If6d655b958a78dcd132f913f41ade21009474228
…o trash

This simplifies the cleanup design. It does mean that if parallel task attempt deletion isn't enough to avoid timeouts, cleanup must be disabled completely. If that occurs on a regular basis in real-world deployments, we can revisit this decision.

Change-Id: Iffe28fe1e02c239d114ea51349960c52010273e8
I had an idea for testing this committer (and some of the others too). The basic idea is simple: as everything is based on the output directory content, we can check the unified diff after each of the operations. One possibility is to represent the whole directory content as a string. My asserts become:

```java
final Path textOutputPath = writeTextOutput(tContext);
dirState = assertDirStateDiff(dirState, Arrays.asList(
    "--- before",
    "+++ after",
    "@@ -1,0 +1,9 @@",
    "+file:///_temporary/job_ID_0001/01/tasks/attempt_ID_0001_m_000000_0/part-m-00000",
    "+~~~",
    "+key1\tval1",
    "+val1",
    "+val2",
    "+key2",
    "+key1",
    "+key2\tval2",
    "+~~~"));
```

As you can see, I replaced the changing IDs with an ID literal, and from the absolute paths I have removed the output directory part. And here comes what I don't like (the huge json files):

```java
commitTask(committer, tContext);
dirState = assertDirStateDiff(dirState, Arrays.asList(
    "--- before",
    "+++ after",
    "@@ -9,1 +9,24 @@",
    " ~~~",
    "+file:///_temporary/job_ID_0001/01/manifests/task_ID_0001_m_000000-manifest.json",
    "+~~~",
    "+{",
    "+ \"extraData\" : { },",
    "+ \"type\" : \"org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest/1\",",
    "+ \"version\" : 1,",
    "+ \"jobId\" : \"job_ID_0001\",",
    "+ \"jobAttemptNumber\" : 1,",
    "+ \"taskID\" : \"task_ID_0001_m_000000\",",
    "+ \"taskAttemptID\" : \"attempt_ID_0001_m_000000_0\",",
    "+ \"taskAttemptDir\" : \"file:/_temporary/job_ID_0001/01/tasks/attempt_ID_0001_m_000000_0\",",
    "+ \"files\" : [ {",
    "+ \"s\" : \"file:///_temporary/job_ID_0001/01/tasks/attempt_ID_0001_m_000000_0/part-m-00000\",",
    "+ \"d\" : \"file:/part-m-00000\",",
    "+ \"z\" : 40",
    "+ } ],",
    "+ \"directories\" : [ ],",
    "+ \"iostatistics\" : {",
    "+ \"counters\" : {",
    "+ \"committer_commit_job\" : 0,",
    "+ \"op_msync\" : 0,",
    "+ \"op_msync.failures\" : 0,",
    "+~~~"));
```

The code is not that complex:

```java
private static List<String> unifiedDiff(List<String> before, List<String> after) {
  List<String> res = UnifiedDiffUtils.generateUnifiedDiff(
      "before", "after", before, DiffUtils.diff(before, after), 1);
  // LOG.info("{}", "\"" + String.join("\", \"", res) + "\"");
  return res;
}

private static String replaceChangingParts(String replace, String input) {
  return input.replaceFirst("job_[0-9]*_", "job_ID_")
      .replaceFirst("attempt_[0-9]*_", "attempt_ID_")
      .replaceFirst("task_[0-9]*_", "task_ID_")
      .replace(replace, "");
}

private void getOutputDirContent(List<String> lines, Path path) throws IOException {
  FileStatus[] statuses = getFileSystem().listStatus(path);
  for (FileStatus status : statuses) {
    if (status.isDirectory()) {
      getOutputDirContent(lines, status.getPath());
    } else {
      lines.add(replaceChangingParts(
          getOutputDir().toUri().getPath(), status.getPath().toUri().toString()));
      lines.add("~~~");
      try (FSDataInputStream in = getFileSystem().open(status.getPath());
           InputStreamReader is = new InputStreamReader(in);
           BufferedReader br = new BufferedReader(is)) {
        br.lines().limit(20).forEach(l ->
            lines.add(replaceChangingParts(getOutputDir().toUri().getPath(), l)));
      }
      lines.add("~~~");
    }
  }
}

private List<String> assertDirStateDiff(List<String> previousDirState, List<String> diffList)
    throws IOException {
  List<String> currentDirState = getOutputDirContent();
  Assertions.assertThat(unifiedDiff(previousDirState, currentDirState))
      .isEqualTo(diffList);
  return currentDirState;
}

private List<String> getOutputDirContent() throws IOException {
  List<String> lines = new ArrayList<>();
  getOutputDirContent(lines, getOutputDir());
  return lines;
}
```

For the unified diff I am using a new dependency (it has an Apache-2.0 license). I think one of the advantages is …
OK. I'm going to say "sorry, no" to the idea of using diff to validate JSON files; I'll think a bit about dest file validation. JSON is there to be parsed; the bundled diagnostics and iostats change, and the file paths will differ between local, abfs and gcs. The way to validate it is to read it in and make assertions on it.
Alongside this PR, I have a private fork of google gcs which subclasses all the tests and runs them against google cloud, and end-to-end tests through spark standalone. These tests verify the committer works for dataframe and spark sql for orc/parquet and csv, and they load and validate the success file (and its truncated list of generated files) with the filesystem. This is all an evolution of the existing suites for the s3a committers, which is where the success file came from.
I would rather do the detailed tests there, as they are full integration tests. It is fairly tricky to get them building, however; it takes an hour+ for a full compile, which needs to be repeated every morning (-SNAPSHOT artifacts, see).
What I can do in the hadoop tests is add a test to load a success file and validate it against the output, and check that there are no unknown files there. I'd love some suggestions as improvements to the spark ones too; it's a mix of my own and some I moved from the apache spark sql suites and reworked to be targetable at different filesystems. One thing I don't test there is writing data over existing files in a complex partition tree... I should do that, which I can do after this patch is in.
* properly marshall filenames with spaces; this is only for testing, but our test paths have spaces in them.
* remove all remaining references to move to trash in committer code and docs.

Change-Id: I3098c175d386de6f9768b08f7399af0de075b17e
…logs even on the normal route, so that we can see when that is still being picked up.

Change-Id: Ia022beb0131720c105110ab4334ba1627d6e6bb6
Just pushed an update.
There's a hardcoded limit on the number of files which get listed in that success data (100), so that on big jobs the time to write the success file doesn't itself slow the job down. Is that too big a number? If paths are long you could still have 50 KiB of data or more. Tested: azure cardiff.
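The truncation described in that comment amounts to capping the committed-file list before serialising the success data. A minimal sketch (the constant value 100 comes from the comment; the class and method names are illustrative, not the committer's real API):

```java
import java.util.List;

// Only the first N committed files are recorded in the success data, so
// the time and space taken to write the success file stays bounded on
// jobs which commit hundreds of thousands of files.
public class SuccessFileData {

  public static final int MAX_FILES_IN_SUCCESS_DATA = 100;

  /** Truncate the committed-file list for inclusion in the success file. */
  public static List<String> filesForSuccessData(List<String> committedFiles) {
    int limit = Math.min(committedFiles.size(), MAX_FILES_IN_SUCCESS_DATA);
    return committedFiles.subList(0, limit);
  }
}
```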
Change-Id: I4a111f862c1ea725367c28b6c74377fec658824b
Review threads on:
...uce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md
...java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterFactory.java
```java
/**
 * Extension of StoreOperationsThroughFileSystem with ABFS awareness.
 * Purely for use by jobs committing work through the manifest committer.
 * The {@link AzureManifestCommitterFactory} will configure the
```
Typo: "will configure" appears twice.
Review thread on:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
```java
 * Constructor.
 * @param capacity capacity in permits/second.
 */
private RestrictedRateLimiting(int capacity) {
```
Should we change the name to maxCapacity?
I will call it capacityPerSecond to make it clear that it uses time as part of the process.
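To make the naming discussion concrete, here is a toy permits-per-second limiter (purely illustrative; Hadoop's RateLimiting implementation delegates to a production limiter rather than this windowed scheme). The point of the name capacityPerSecond is visible in the code: the capacity is replenished once per elapsed second, so time is part of the contract.

```java
// Minimal fixed-window rate limiter sketch: "capacity" alone would be
// ambiguous, while capacityPerSecond says how often it is restored.
public class CapacityPerSecondLimiter {

  private final int capacityPerSecond;
  private long windowStartMillis;
  private int usedInWindow;

  public CapacityPerSecondLimiter(int capacityPerSecond, long nowMillis) {
    this.capacityPerSecond = capacityPerSecond;
    this.windowStartMillis = nowMillis;
  }

  /** Try to acquire permits at the given clock time; returns success. */
  public synchronized boolean tryAcquire(int permits, long nowMillis) {
    if (nowMillis - windowStartMillis >= 1000) {
      // a new one-second window: capacity is restored
      windowStartMillis = nowMillis;
      usedInWindow = 0;
    }
    if (usedInWindow + permits > capacityPerSecond) {
      return false;
    }
    usedInWindow += permits;
    return true;
  }
}
```

The clock is passed in as a parameter so the behaviour is deterministic and testable; a real implementation would read the clock itself and block rather than reject.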
Resolved review thread on:
...p-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java
```java
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface RateLimiting {
```
I think we need to experiment and test more with this before moving it into the FS itself for all operations.
Resolved review thread on:
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java
```xml
  -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
```
Scope "provided" means used to compile, but not included in the dependency list of the published artifact. There are no new runtime dependencies unless you want to use the manifest committer.
```xml
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <scope>test</scope>
```
This does mean that the abfs test jar does need it on the classpath. If I declare this as provided, that will not be the case, but it will then be a compile-time dependency of the production code.
```xml
  <type>test-jar</type>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
```
The scope is test only; we have lots of other test-scope-only dependencies already and it is not an issue.
Note also that the hadoop-azure test jar will not publish any of its transitive dependencies at all, so nobody will notice, which is good, as we already pull in things like distcp.
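To make the scope discussion concrete, a declaration of the shape being described would look like this (a sketch based on the scopes mentioned in the thread, not the actual hadoop-azure pom):

```xml
<!-- scope "provided": compile against the committer classes, but do not
     export them as a runtime dependency of the published artifact -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <scope>provided</scope>
</dependency>
<!-- scope "test" with type "test-jar": only ever on the test classpath,
     never a dependency of the production code -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
```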
All trivial changes, mostly markdown and javadocs.

Change-Id: Ideec4e63d345b0ecb967aa75da8a82c1ff01ccda
I have addressed all the little nits in the code; thank you for reviewing the text and javadocs so thoroughly. I have also improved the rejection of filesystems by schema, adding wasb to the set of unsupported stores. Once we add a better rename API to the filesystem/filecontext, we can add path capabilities for renames.
I have also tried to clarify, in the javadocs and comments, places where there was ambiguity. This includes comments in the hadoop-azure pom where the references to the mapreduce jars are imported with scopes of provided and test. Everyone reviewing this patch needs to understand what these scopes mean, so they will understand why I have no intention of changing those declarations or adding anything to hadoop-common.
It is not a requirement to use the filesystem, nor is it exported as a new dependency. I am 100% confident of this because these same dependencies were added to hadoop-aws in HADOOP-13786, "Add S3A committers for zero-rename commits to S3 endpoints", and nobody has ever reported the filesystem not instantiating. Note: cloudstore storediag doesn't put these dependencies on the classpath when invoked via …
Tested: azure cardiff.
Use of set code upset checkstyle; let's try again.
Change-Id: I963fc4ddff55e896a4c0f806ef4336bee3a2c0c7
mukund-thakur left a comment:
+1, LGTM.
I have reviewed the main algorithm, all stage execution, and the abfs changes. Impressive stage-wise implementation and manifest committer integration.
Ready to go in, with the following things to track for the future:
1. There are pending questions from others. I have tried to clarify some of them based on my understanding.
2. We made the RateLimiting feature experimental as it is in an initial phase. It will require more testing from QE and performance teams.
3. Not moving ResilientCommitByRename to hadoop-common, as that would make it public and require a commitment to maintain it in future. We plan to add a new rename builder public API which can take parameters like etags.
4. Haven't reviewed the iostats integration in detail, but overall it is nice to see so many stats added related to the committer.
5. TaskPool is already-reviewed old code, so not reviewing it right now because of limited time.
6. Haven't been able to review the test code.
7. CreateOutputDirectoriesStage seemed a bit complex to me. Maybe we can do a session to understand it later.
```java
/**
 * Callback on stage entry.
 * Sets the sactiveStage and updates the
```
Nit: typo "sactiveStage".
Thanks, fixed.
```java
// update the manifest list in a synchronized block.

synchronized (manifests) {
  manifests.add(m);
```
One thing to notice here: manifests can grow in size and cause an OOM.
This surfaced on the s3a committer only on multi-TB terasorts, but there each file included a list of etags, so it was much bigger.
Merging all manifest dir lists lets us optimise dir creation; to do that and still do independent manifest committing of work would require double loading of manifests, one pass for dir setup and another for renaming. I did start off with some parallel work here, but it is both complex (you need two thread pools to avoid deadlocks) and didn't seem tangibly more efficient.
If it surfaces in real-world jobs then I can worry about it.
```java
leaves.put(path, entry);

// if it is a file to delete, record this.
if (entry.getStatus() == EntryStatus.file) {
```
When will this case happen? As we are only getting the directories.
That list of dirs is built up in task commit, where each task checks the status of the equivalent dir in the dest path. So we know which target dirs have files, as well as which don't exist.
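The probe being described can be sketched as a small classification step (hypothetical names, modelled on the EntryStatus usage in the quoted snippet but not the committer's real code): each destination path is recorded as absent, a directory, or a file, and job commit deletes the file entries before creating directories.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the task-commit probe: classify what exists at
// each destination directory path, so job commit knows which paths need
// a delete before mkdirs.
public class DestDirProbe {

  public enum EntryStatus { not_found, dir, file }

  /** Classify a destination path given what a status probe found. */
  public static EntryStatus classify(Boolean existsAsDirectory) {
    if (existsAsDirectory == null) {
      return EntryStatus.not_found;   // nothing at the destination
    }
    return existsAsDirectory ? EntryStatus.dir : EntryStatus.file;
  }

  /** Collect the paths which job commit must delete before mkdirs. */
  public static List<String> filesToDelete(Map<String, EntryStatus> entries) {
    List<String> toDelete = new ArrayList<>();
    for (Map.Entry<String, EntryStatus> e : entries.entrySet()) {
      if (e.getValue() == EntryStatus.file) {
        toDelete.add(e.getKey());
      }
    }
    return toDelete;
  }
}
```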
```java
    OP_COMMIT_FILE_RENAME, () ->
        operations.renameFile(source, dest));
}
return new CommitOutcome();
```
Nit: CommitOutcome is empty. Why not just void?
Leaving this open for others.
```java
 * @return status or null
 * @throws IOException IO Failure.
 */
protected Boolean delete(
```
I think it is just refactoring; delete is called from multiple stages.
```java
 * @param <IN> Type of arguments to the stage.
 * @param <OUT> Type of result.
 */
public abstract class AbstractJobCommitStage<IN, OUT>
```
I agree with the name change, as there are methods here which get called from multiple stages.
```java
trackDurationOfInvocation(getIOStatistics(), OP_LOAD_ALL_MANIFESTS, () ->
    TaskPool.foreach(manifestFiles)
        .executeWith(getIOProcessors())
        .stopOnFailure()
```
I think an exception in any thread will stop the execution and will be thrown from the method, and finally a stage failure is reported.
```java
// list the directory. This may block until the listing is complete,
// or, if the FS does incremental or asynchronous fetching, until the
// first page of results is ready.
final RemoteIterator<FileStatus> listing = listStatusIterator(srcDir);
```
This is an async paged list request, and non-recursive.
mehakmeet left a comment:
Looks good; some minor comments. I ran the whole Azure test suite with the new tests; it took 2.5 hours to complete. Maybe I have to tune down the configs in ITestAbfsTerasort for better performance?
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 02:39 h
[INFO] Finished at: 2022-03-16T13:50:48+05:30
[INFO] ------------------------------------------------------------------------
```java
  return result.getStatusCode() == HttpURLConnection.HTTP_OK
      && sourceEtag.equals(extractEtagHeader(result));
} catch (AzureBlobFileSystemException ignored) {
  // GetFileStatus on the destination failed, the rename did not take place
```
Can we add debug logs here for this scenario?
```java
/**
 * Prepare the test configuration.
 * @param contractTestBinding test binding
 * @return an extraced and patched configuration.
```

```java
 * the HDFS cluster binding is implicitly propagated to YARN.
 * If one is not requested, the local filesystem is used as the cluster FS.
 * @param conf configuration to start with.
 * @param useHDFS should an HDFS cluster be instantiated.
```
Aah, a relic of the s3a code.
```java
}

protected Job createJob(Configuration jobConf) throws IOException {
```
We never use this method.
Review threads on:
.../apache/hadoop/mapreduce/lib/output/committer/manifest/TestCreateOutputDirectoriesStage.java
...test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestManifestStoreOperations.java
@mehakmeet thanks for your comments. Make sure you aren't VPN'd to a VPN which redirects all azure/aws/gcs IO, otherwise all your requests go round the world; I have hit this on GCS.
Renamed the base stage class to JobOrTaskStage; fixed javadocs.

Change-Id: Icefbdfb81ca5b6f77640408d5ff395265444972a
A file output committer which uses a manifest file to pass the lists of directories to create and files to rename from task attempts to the job committer; job commit does dir creation and file rename across a pool of threads. Based on lessons/code from the S3A committer.
It is faster than the v1 algorithm and does not require atomic/O(1) directory rename. It will be slower than v2, as files are renamed in job commit, but since it has already prepared the lists of files to copy, directory creation and file rename (with deletes of destination artifacts) are all that is needed.
This committer works with any consistent filesystem/object store for which file rename is fast. It does not perform any directory renames, so has no requirements there. It uses listStatusIterator() to walk the tree of files to commit. This makes it straightforward to also build up the list of dirs to create, but it does mean it will underperform against any store for which a recursive listStatus(path, true) is significantly faster. That is true for S3A, but this committer is not for use there: rename() takes too long. It is targeted at Apache Spark and does not support job recovery; there's no fundamental reason why this could not be added by a sufficiently motivated individual.
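The two-phase job commit described above can be condensed into a sketch (store operations are stubbed out and the class names are illustrative, not the committer's real API): take the union of the directories listed in all task manifests, create them, then rename every file across a thread pool.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of the manifest-driven commit flow: manifests carry the dirs to
// create and the src->dest renames; job commit does mkdirs once per dir,
// then renames all files in parallel.
public class ManifestCommitSketch {

  public static class TaskManifest {
    public final Set<String> dirsToCreate = new HashSet<>();
    public final Map<String, String> renames = new HashMap<>(); // src -> dest
  }

  /** Returns the number of files renamed. */
  public static int commitJob(List<TaskManifest> manifests, int threads) {
    // phase 1: create the union of destination directories exactly once
    Set<String> allDirs = new TreeSet<>();
    for (TaskManifest m : manifests) {
      allDirs.addAll(m.dirsToCreate);
    }
    for (String dir : allDirs) {
      // stand-in for fs.mkdirs(new Path(dir))
    }
    // phase 2: rename every committed file across the thread pool
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (TaskManifest m : manifests) {
        for (Map.Entry<String, String> r : m.renames.entrySet()) {
          futures.add(pool.submit(() -> {
            // stand-in for fs.rename(new Path(r.getKey()), new Path(r.getValue()))
          }));
        }
      }
      for (Future<?> f : futures) {
        try {
          f.get(); // propagate any rename failure to fail the job commit
        } catch (InterruptedException | ExecutionException e) {
          throw new RuntimeException("rename failed", e);
        }
      }
      return futures.size();
    } finally {
      pool.shutdown();
    }
  }
}
```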
Status:
Features
Improvements
Tests
Functional tests are on a par with the s3a committers; I want to add better fault injection (the design lines up nicely for this, as we can test each stage in isolation) and some scale tests.
I'm adding Spark integration tests in https://github.com/hortonworks-spark/cloud-integration; already (July 2021) working.
and on to stage conf. Paths valid etc.