Skip to content

MAPREDUCE-7341. Intermediate Manifest Committer#2971

Closed
steveloughran wants to merge 52 commits intoapache:trunkfrom
steveloughran:mr/MAPREDUCE-7341-manifest-committer
Closed

MAPREDUCE-7341. Intermediate Manifest Committer#2971
steveloughran wants to merge 52 commits intoapache:trunkfrom
steveloughran:mr/MAPREDUCE-7341-manifest-committer

Conversation

@steveloughran
Copy link
Contributor

@steveloughran steveloughran commented May 3, 2021

A File output committer which uses a manifest file to pass the lists of directories to create and files to rename from task attempts to job committer; job commit does dir creation and file rename across a pool of threads. Based on lessons/code from the S3A committer,

It is faster than the V1 algorithm, does not require atomic/O(1) directory rename. It will be slower that v2 as files are renamed in job commit, but as it has prepared the lists of files to copy, directory creation and file rename (with deletes of destination artifacts) are all that is needed.

This committer works with any consistent filesystem/objects store for which file rename is fast. It does not perform any directory renames, so has no requirements there. It uses listStatusIterator() to walk the tree of files to commit. This makes it straightforward to also build up the list of dirs to create -but it does mean it will underperform against any store for which a recursive listStatus(path, true) is significantly faster. That is true for S3A, but this committer is not for use there. rename() takes too long.

It is targeted at Apache Spark and does not support job recovery. There's no fundamental reason why this could not be added by a sufficiently motivated individual.

Status:

  • file formats.
  • implementation architecture "stages you can chain" done
  • Fork of S3A ITest contract test done, with implementations for local FS and ABFS.
  • ABFS Terasort ITest
  • Basic spark tests with validation of _SUCCESS output

Features

  • build stage config from job/task config
  • directory prepare/cleanup (parallelised)
  • wire up committer
  • progress callbacks
  • delete all task attempts in parallel for performance/scale on gcs and abfs when oauth authenticated
  • Always save _SUCCESS results to a report dir, even on failure
  • each task attempt to save its preparation time to the iostats in task manifest; job stats to aggregate these for ease of measuring task commit performance.

Improvements

  • improve performance of PrepareDirectoriesStage
  • RateLimiting class in hadoop common to wrap guava one; hide from modules the origin of the limiter. This could automatically update IOStats source. (oh no, it'll need a test too...)

Tests

Functional tests are done on a par with s3a committers, want to add better fault injection (the design lines up for this nicely as we can test each stage in isolation), and some scale tests.

I'm adding spark integration tests in https://github.com/hortonworks-spark/cloud-integration ;already (july 2021) working

  • job/task conf to manifest committer conf
    and on to stage conf. Paths valid etc.
  • round trip of manifest
  • _SUCCESS
  • individual stages with large fake dir tree
  • directory merge
  • fail fast if job or task attempt directory exists
  • abstract protocol test
  • Source File missing on rename
  • Dest File is file/dir
  • Parent entry of a directory to create is actually a file.
  • other job overwrites parent dir with file
  • two jobs overwrite same file (will only surface in validate)
  • two task attempts writing to same dest
  • multi-task job commit with failure in one of the tasks while others are active
  • SPARK_WRITE_UUID picked up from job to _SUCCESS
  • overwrite an existing tree with new data
  • cleanup stage: rename to trash option
  • cleanup stage: parallel TA delete. Use iostats to count #of deletes, or return # in return value.
  • many task attempts in a task dir
  • wrong task ID in a loaded TA? actually, we don't check.
  • rate limited job commit on local fs with triggering

@apache apache deleted a comment from hadoop-yetus May 3, 2021
@apache apache deleted a comment from hadoop-yetus May 3, 2021
@apache apache deleted a comment from hadoop-yetus May 5, 2021
@steveloughran steveloughran force-pushed the mr/MAPREDUCE-7341-manifest-committer branch from 4631327 to 7d65947 Compare May 10, 2021 12:55
@apache apache deleted a comment from hadoop-yetus May 11, 2021
@apache apache deleted a comment from hadoop-yetus May 11, 2021
@apache apache deleted a comment from hadoop-yetus May 12, 2021
@steveloughran steveloughran marked this pull request as draft May 13, 2021 19:13
@steveloughran steveloughran added enhancement fs/azure changes related to azure; submitter must declare test endpoint MapReduce labels May 13, 2021
@apache apache deleted a comment from hadoop-yetus May 22, 2021
@steveloughran steveloughran force-pushed the mr/MAPREDUCE-7341-manifest-committer branch from 5e8cdd3 to 4f34112 Compare May 25, 2021 15:08
@apache apache deleted a comment from hadoop-yetus Jun 1, 2021
@apache apache deleted a comment from hadoop-yetus Jun 1, 2021
@apache apache deleted a comment from hadoop-yetus Jun 1, 2021
@apache apache deleted a comment from hadoop-yetus Jun 1, 2021
@steveloughran steveloughran force-pushed the mr/MAPREDUCE-7341-manifest-committer branch from 7872e5f to efad653 Compare June 5, 2021 11:41
Copy link
Contributor

@mukund-thakur mukund-thakur left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went through the prod code. Looks really promising.
And glad to see the usage of state design pattern ( not exactly but similar)

@apache apache deleted a comment from hadoop-yetus Jun 8, 2021
@apache apache deleted a comment from hadoop-yetus Jun 8, 2021
@steveloughran steveloughran force-pushed the mr/MAPREDUCE-7341-manifest-committer branch from efad653 to 3f07572 Compare June 9, 2021 17:12
Copy link

@attilapiros attilapiros left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just some nits in the documentation

steveloughran and others added 6 commits March 10, 2022 18:02
* renamed the prepare.target.files option to delete.target.files as
  there's no longer parent dir preparation as an option.
* clean up the docs to match
* optimise the deletion by using the list of created dirs from
  the mkdir phase: if a dir was created, there's no need to check
  for/delete the target files.
* test

The test is a bit tricky as the local fs lets you rename a file onto a file;
we use the FS XML contracts to probe when does an fs reject it, which
we know abfs does.

Change-Id: I0840a533a85280fad2450eda9fb999948e73ea8a
Co-authored-by: Attila Zsolt Piros <[email protected]>
Co-authored-by: Attila Zsolt Piros <[email protected]>
Co-authored-by: Attila Zsolt Piros <[email protected]>
Co-authored-by: Attila Zsolt Piros <[email protected]>
Co-authored-by: Attila Zsolt Piros <[email protected]>
@steveloughran
Copy link
Contributor Author

@attilapiros thanks for those suggestions, just tried them through the "commit suggestions" button in github. nice

@sidseth
Copy link
Contributor

sidseth commented Mar 11, 2022

May be useful to move some of the logs in PathOutputCommitter factory to INFO level? - to easily find out which committer is being used.

Copy link
Contributor

@sidseth sidseth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments. The main concern for me is still the dependency.
Have not reviewed the ABFS changes, and assuming the TaskPool was already reviewed.

if (exception != null) {
// failure, report and continue
LOG.warn("{}: Deleting job directory {} failed: moving to trash", getName(), baseDir);
moveToTrash = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be automatically enabled, right? Especially in the absence of any entity to go and clean-up this data at a later point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good q. its about handling timeout of dir deletion in azure, which we've seen trying to delete the whole of _temporary and oauth...things were timing out. here we do delete each TA subdir in parallel. so the risk is much smaller -a single task would have to contain too many dirs, whereas today its #of dirs in a task attempt * number of task attempts.

@Override
public boolean isTrashEnabled(Path path) {
try {
return fileSystem.getServerDefaults(path).getTrashInterval() > 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this ever be true? ABFS at least doesn't seem to implement getServerDefaults / derivatives.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's standard policies here.


// move to trash?
// this will be set if delete fails.
boolean moveToTrash = args.moveToTrash;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gets quite confusing. There's an argument to specify whether to move to trash, and subsequently there's a check on whether the filesystem has trash enabled - which I think ends up overriding this parameter?

If the FileSystem does have trash enabled - the delete FS operation would automatically take care of moving the contents to trash?

So the config parameter can end up being ignored either way

  1. FS enables trash, committer config has moveToTrash set to false <-- moveToTrash ignored in favor of the FS having trash enabled (implicitly via the delete).
  2. FS disables trash, committer config has moveToTrash set to true <-- moveToTrash ignored because the FS does nor support trash.
  3. FS enables trash, committer config has moveToTrash set to true <-- Redundant, because the delete will use the trash.
  4. FS disables trash, committer config has moveToTrash set to false <-- trash isn't used.

moveToTrash flag seems to not make a difference?

I think I'm missing something in the overall "moveToTrash being implemented in the committer" scenario.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are missing that FileSystem.delete() doesn't use trash, FsShell does, but otherwise the apps get to use the trash apis direct.

now, if we were to update delete() to a buillder I'd have it return a future and let you opt() as to whether you wanted atomic delete or a move to trash...

* Potentially faster on some stores.
* Value: {@value}.
*/
public static final String OPT_CLEANUP_MOVE_TO_TRASH =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a comment about trash usage elsewhere, and how the config parameter ends up being ignored / irrelevant. Guessing I'm missing something there.

return dir;
}
if (st.isFile()) {
return file;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a DirEntry with type = 'file' be valid? Likewise for a FileEntry.
Should there be a sanity validation in DirEntry.dirEntry / new FileEntry

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, a dir entry could be a file during task attempt scanning -these will be deleted in job commit.

I don't know if this ever really occurs in production (never had problems with the s3a committer with files being above files), but the FileOutputCommitter protocol implicitly cleans up during its treewalk, deleting the dest path before renaming a file or dir. I'm trying to stay as close to that protocol as I can

*/
@InterfaceAudience.LimitedPrivate("mapreduce")
@InterfaceStability.Unstable
public class AbfsManifestStoreOperations extends
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On avoiding the new dependency on hadoop-mapreduce-client (or the reverse where hadoop-mapreduce-client ends up depending on hadoop-cloud-storage / azure specifically)

Does it make sense to make ResilientCommitByRename a little more generic, and move it to hadoop-common as a LimitedPrivate("cloud-storage"?)/Unstable.
AzureBlobFileSystem is the only one to implement the interface right now.

AbfsManifestStoreOperations - can then make decisions based on whether the FileSystem implements the interface (and can potentially be merged into ManifestStoreOperationsThroughFileSystem itself) / or gets renamed to something like ResilientRenameCapableManifestStoreOperationsThroughFileSystem.

All depends on moving ResilientCommitByRename to hadoop-common. I think this is quite a bit cleaner than adding the dependencies in either direction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we did look at that at first but it'd force us to do a lot more work in a public API, specifically "get rename right". I've played with this in the past #2735 , with the API we could do for async and slow operations:
#2735 (comment)

problem here is that rename() is the nightmare, even working out what is good is hard, everything is inconsistent etc etc. Doing a special binding api in the abfs module lets us avoid worrying about this, lets us tune other operations if need to worry about problems like rate limiting there too, and lets us remove it when abfs makes this failure mode go away.

I did put the etag source API into hadoop common, with a PathCapabilities probe to verify that a store supports etag preservation over rename. So there is enough in hadoop common for hive to implement its own recovery. because that API is in common, it means that there's no dependency on the mapred- libraries in normal use of the client.
Added a statement in the javadocs of org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename to make clear that there mustn't be any references in there

}

@Override
public boolean storePreservesEtagsThroughRenames(final Path path) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed. This is already checked from the FileSystem itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it varies depending on whether you are talking to a v1 azure store (wasb) or abfs. the ABFS class doesn't provide an implementation of ResilientCommitByRename in that state. so yes, it is implicit "if you have the store, you have etag preservation"

I added this to the ManifestStoreOperations interface so the (optional) verify stage could be enabled and then scan the output dir to verify everything went through. Other stores (gcs?) may be different.

* However, the actual API MUST NOT be used outside
* the manifest committer and its tests.
*/
@InterfaceAudience.LimitedPrivate("mapreduce, object-stores")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for repeating this all over the place ... object-stores should not be depending on this / implementing this in their own packages (i.e. no MR dependencies for people using object stores).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's none in the normal abfs client runtime, but only when you are trying to create committers for them. s3a does this already with its committer.

*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class UnreliableManifestStoreOperations extends ManifestStoreOperations {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be under 'test', instead of alongside the main code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put it here in case i wanted to make it easier for fault injection in production deployments, but since I've not made it configurable, i will move it into the test package.

@steveloughran
Copy link
Contributor Author

new API in hadoop-common

The hadoop-mapreduce-client-core JAR is only used in AbfsManifestStoreOperations and

I understand your concerns about making mapreduce JAR a dependency, but the jar is optional. you
don't need it for FS operations. maven tags it as provided so it isn't published transitively, and
the only imports are in a new committer factory and the store operations.

If you want the new commmitter, yes, you need the JAR. But that's always been the case
(and why spark, ParquetOutputFormat etc all import it).

Putting it into hadoop common becomes a commitment to maintain it -it will be used by someone-
and adds the need for better testing and strict spec.

I'd rather put that off to doing a proper Rename operation which worked well with stores
as well as HDFS, and always raised an exception on failure, and come in on an interface both
FileSystem and FileContext implemented. Being builder based, etags would be an optional
builder param for stores which cared.

But we'd need to get rename() right there, and that's a nightmare.

cleanup

FileSystem.delete() doesn't use trash, so the moveToTrash flag
is needed.

FS trash moveToTrash outcome
true false files deleted
true true files to trash
false false files deleted
false true files deleted

Now, I do agree things are over complex. i was trying to be resilient to timeouts on dir deletes,
but as I think that will be rare, how about I change the code

  • no attempt to downgrade from delete to move to trash if delete raises an exception
  • if moveToTrash is true, then fs trash must be enabled.

This gives.

FS trash moveToTrash outcome
true false files deleted
true true files to trash
false false files deleted
false true error

And all the recovery and tests are gone.

What I could do here is reject that error on job setup, rather than waiting for cleanup.

do you agree?

my next steps

  • will cut recovery from delete failures in cleanup.
  • log committer at info in factory
  • whatever else yetus complains of

* javadocs of ResilientCommitByRename highlight no mapreduce dependencies
  allowed
* moved UnreliableManifestStoreOperations to test source

Change-Id: If6d655b958a78dcd132f913f41ade21009474228
…o trash

This simplifies the cleanup design.
It does mean that if parallel task attempt deletion isn't enough to
avoid timeouts, cleanup must be disabled completely.

If that occurs on a regular basis in real-world deployments, we
can revisit this decision.

Change-Id: Iffe28fe1e02c239d114ea51349960c52010273e8
@attilapiros
Copy link

I had an idea for testing this committer (and some of the others too).
Actually I started to put it together but now I am a bit uncertain how valuable this will be so I decided to share it with you in the current state before I waste too much time.

The basic idea is simple: as everything is based on the output directory content we can check the unified diff after each of the operations. One possibility is to represent the whole directory content as string (here the before and after just arbitrary labels).

My asserts become:

    final Path textOutputPath = writeTextOutput(tContext);
    dirState = assertDirStateDiff(dirState, Arrays.asList(
        "--- before",
        "+++ after",
        "@@ -1,0 +1,9 @@",
        "+file:///_temporary/job_ID_0001/01/tasks/attempt_ID_0001_m_000000_0/part-m-00000",
        "+~~~",
        "+key1\tval1",
        "+val1",
        "+val2",
        "+key2",
        "+key1",
        "+key2\tval2",
        "+~~~"));

As you see I replaced the changing IDs with an ID literal and from the absolute paths I have removed the output directory part.

And here comes what I don't like (the huge json files):

  commitTask(committer, tContext);
  dirState = assertDirStateDiff(dirState, Arrays.asList(
    "--- before",
    "+++ after",
    "@@ -9,1 +9,24 @@",
    " ~~~",
    "+file:///_temporary/job_ID_0001/01/manifests/task_ID_0001_m_000000-manifest.json",
    "+~~~",
    "+{",
    "+  \"extraData\" : { },",
    "+  \"type\" : \"org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest/1\",",
    "+  \"version\" : 1,",
    "+  \"jobId\" : \"job_ID_0001\",",
    "+  \"jobAttemptNumber\" : 1,",
    "+  \"taskID\" : \"task_ID_0001_m_000000\",",
    "+  \"taskAttemptID\" : \"attempt_ID_0001_m_000000_0\",",
    "+  \"taskAttemptDir\" : \"file:/_temporary/job_ID_0001/01/tasks/attempt_ID_0001_m_000000_0\",",
    "+  \"files\" : [ {",
    "+    \"s\" : \"file:///_temporary/job_ID_0001/01/tasks/attempt_ID_0001_m_000000_0/part-m-00000\",",
    "+    \"d\" : \"file:/part-m-00000\",",
    "+    \"z\" : 40",
    "+  } ],",
    "+  \"directories\" : [ ],",
    "+  \"iostatistics\" : {",
    "+    \"counters\" : {",
    "+      \"committer_commit_job\" : 0,",
    "+      \"op_msync\" : 0,",
    "+      \"op_msync.failures\" : 0,",
    "+~~~"));

Here assertDirStateDiff always cut the files longer then 20 lines. But we could use a an external resource too for the longer files.

The code is not that complex:

  private static List<String> unifiedDiff(List<String> before, List<String> after) {
    List<String> res =
      UnifiedDiffUtils.generateUnifiedDiff("before", "after", before, DiffUtils.diff(before, after), 1);
    // LOG.info("{}", "\"" + String.join("\", \"", res) + "\"");
    return res;
  }

  private static String replaceChangingParts(String replace, String input) {
    return input.replaceFirst("job_[0-9]*_", "job_ID_")
      .replaceFirst("attempt_[0-9]*_", "attempt_ID_")
      .replaceFirst("task_[0-9]*_", "task_ID_")
      .replace(replace, "");
  }

  private void getOutputDirContent(List<String> lines, Path path) throws IOException {
    FileStatus[] statuses = getFileSystem().listStatus(path);
    for (FileStatus status : statuses) {
      if (status.isDirectory()) {
        getOutputDirContent(lines, status.getPath());
      } else {
        lines.add(replaceChangingParts(getOutputDir().toUri().getPath().toString(), status.getPath().toUri().toString()));
        lines.add("~~~");
        try(FSDataInputStream in = getFileSystem().open(status.getPath());
            InputStreamReader is = new InputStreamReader(in);
            BufferedReader br = new BufferedReader(is)) {
          br.lines().limit(20).forEach((l) -> lines.add(replaceChangingParts(getOutputDir().toUri().getPath().toString(), l)));
        }
        lines.add("~~~");
      }
    }
  }

  private List<String> assertDirStateDiff(List<String> previousDirState, List<String> diffList) throws IOException {
    List<String> currentDirState = getOutputDirContent();
    Assertions.assertThat(unifiedDiff(previousDirState, currentDirState))
      .isEqualTo(diffList);
    return currentDirState;
  }

  private List<String> getOutputDirContent() throws IOException {
    List<String> lines = new ArrayList<>();
    getOutputDirContent(lines, getOutputDir());
    return lines;
  }

For the unified diff I am using a new dependency (it has Apache-2.0 License):

    <dependency>
      <groupId>io.github.java-diff-utils</groupId>
      <artifactId>java-diff-utils</artifactId>
      <version>4.9</version>
    </dependency>

I think one of the advantage is getOutputDirContent is always a full scan and if something need to be removed it will be in contained in the unified diff and never be left out.
Do you see any value in this idea? Is not this too much restrictions on this code?

@steveloughran
Copy link
Contributor Author

OK.

I'm going to say "sorry, no" to the idea of using diff to validate JSON files; think a bit about dest file validation.

JSON is there to be parsed, the bundled diagnostics and iostats change, and the file paths will between local, abfs and gcs.

the way to validate it is to read it in and make assertions on it.

Alongside this PR, i have a private fork of google gcs which subclasses all the tests and runs them against google cloud

and end to end test through spark standalone
https://github.com/hortonworks-spark/cloud-integration

these tests verify the committer works for dataframe, and spark sql for orc/parquet and csv
https://github.com/hortonworks-spark/cloud-integration/blob/master/cloud-examples/src/test/scala/com/cloudera/spark/cloud/abfs/commit/AbfsCommitDataframeSuite.scala#L83
https://github.com/hortonworks-spark/cloud-integration/tree/master/cloud-examples/src/test/scala/org/apache/spark/sql/hive/orc/abfs
https://github.com/hortonworks-spark/cloud-integration/tree/master/cloud-examples/src/test/scala/org/apache/spark/sql/hive/orc/gs

these tests are loading and validating the success file (and its truncated list of generated files) with the filesystem
https://github.com/hortonworks-spark/cloud-integration/blob/master/cloud-examples/src/main/scala/com/cloudera/spark/cloud/s3/S3AOperations.scala#L54

this is all an evolution of the existing suites for the s3a committers -which is where the success file came from.

I would rather do the detailed test here as they are full integration tests. It is fairly tricky to get them building however; takes an hour+ for a full compile, which needs to be repeated every morning (-SNAPSHOT artifacts, see).

what i can do in the hadoop tests is add a test to load a success file and validate it against the output, and that there are no unknown files there.

i'd love some suggestions as improvements to the spark ones too. it's a mix of my own and some I moved from the apache spark sql suites and reworked to be targetable at different filesystems. one thing i don't test there is writing data over existing files in a complex partition tree...i should do that, which i can do after this patch is in...

* properly marshall filenames with spaces.
  this is only for testing, but as our test paths have spaces in..
* remove all remaining references to move to trash in committer code and docs.

Change-Id: I3098c175d386de6f9768b08f7399af0de075b17e
logs even on the normal route, so that we can see when that is
still be picked up.

Change-Id: Ia022beb0131720c105110ab4334ba1627d6e6bb6
@steveloughran
Copy link
Contributor Author

just pushed an update with

  • PathOutputCommitter logs at factory
  • a bit more vaildation of the manifest summary data (which showed the testing-only-path list wasn't marshalling spaces properly, a bug which must still be in the s3a code)

there's a hardcoded limit on the number of files which get listed in that success data (100), so that on big jobs the time to write the success file doesn't itself slow the job down.

is that too big a number? as if paths are long you could still have 50 kib of data or more.
could cut down to something minimal, like, say, 20. enough for basic tests but not for performance issues.

tested azure cardiff.

Change-Id: I4a111f862c1ea725367c28b6c74377fec658824b
/**
* Extension of StoreOperationsThroughFileSystem with ABFS awareness.
* Purely for use by jobs committing work through the manifest committer.
* The {@link AzureManifestCommitterFactory} will configure the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: will configure twice.

* Constructor.
* @param capacity capacity in permits/second.
*/
private RestrictedRateLimiting(int capacity) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we change the name to maxCapacity?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i will call it capacityPerSecond to make it clear it uses time as part of the process

*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface RateLimiting {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to experiment/test more with test before moving for all operations in FS itself.

-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scope "provided" means used to compile but not included in the dependency list of the published artifact

there are no new runtime dependencies unless you want to use the manifest committer

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>test</scope>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this does mean that abfs-test jar does need it on the classpath.

if i declare this as provided, that will not be the case.

but it will then be a compile time dependency of the production code

<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scope is test only. we have lots of other test scope only dependencies already and it is not an issue

note also the hadoop-azure-test jar will not publish any of its transitive dependencies at all. so nobody will notice. which is good as we already pull in things like distcp

all trivial changes and mostly markdown and javadocs.

Change-Id: Ideec4e63d345b0ecb967aa75da8a82c1ff01ccda
@steveloughran
Copy link
Contributor Author

steveloughran commented Mar 15, 2022

@mukund-thakur,

I have addressed all the little nits in the code -thank you for reviewing the text and Java docs so thoroughly.

I have also improved that rejection of filesystem by schema, adding wasb to the set of unsupported stores,
and a test for it.

Once we add a better rename api to the filesystem/filecontext, we can add path capabilities for renames
to probe for.

I have also tried to clarify in the Java docs and comments places where there was ambiguity.

This includes comments in the hadoop-azure pom where the references to the mapreduce jars are imported with scopes of provided and test.

Everyone reviewing this patch needs to understand what these scopes mean, so they will understand why I have no intention of changing those declarations or adding anything to hadoop-common.

scope name classpath of transitive?
provided src/main and src/test builds no
test src/test builds no

It is not a requirement to use the file system, nor is it exported as a new dependency. I am 100% confident of this because these same dependencies were added to hadoop-aws in HADOOP-13786, Add S3A committers for zero-rename commits to S3 endpoints -and nobody has ever reported the filesystem not instantiating.

note, cloudstore storediag doesn't put these dependencies on the cp when invoked via hadoop jar cloudstore.jar storediag so verifying my statements is straightforward.

tested: azure cardiff

@steveloughran
Copy link
Contributor Author

use of set code upset checkstyle. lets try again

Change-Id: I963fc4ddff55e896a4c0f806ef4336bee3a2c0c7
Copy link
Contributor

@mukund-thakur mukund-thakur left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM
I have reviewed the main algorithm, all stage execution and abfs changes. Impressive stage wise
implementation and mainifest committer integration.
Ready to go in with following things to track for future:

  1. There are pending questions from others. I have tried to clarify some of them based on my understanding.

2)We made the RateLimiting feature experimental as it is in initial phase. Will require more testing from QE and performance team.

  1. Not moving the ResilientCommitByRename to hadoop-common as that will make it a public and require commitment for maintenance
    in future. We plan to add new rename builder public api which can take parameters like etag etc.

  2. Haven't reviewed the iostats integration in detail but overall looks nice to see so many stats added related to committer.

  3. TaskPool is already reviewed old code. So not reviewing it right now because of less time.

  4. Haven't been able to review test code.

  5. CreateOutputDirectoriesStage seemed a bit complex to me. Maybe we can do session to understand that later.


/**
* Callback on stage entry.
* Sets the sactiveStage and updates the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit : typo sactiveStage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, fixed

// update the manifest list in a synchronized block.

synchronized (manifests) {
manifests.add(m);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing to notice here is manifests can grow in size and cause OOM.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

surfaced on s3a committer only on multi TB terasorts...but there each file included a list of etags, so was much bigger

merging all manifest dir lists lets us optimise dir creation; to do that and stil do independent manifest committing of work would require double loading of manifests, one for dir setup and another for renaming. I did start off with some parallel work here but it is both complex (need two thread pools to avoid deadlocks) and didn't seem tangibly more efficient

if it surfaces in real world jobs then i can worry about it.

leaves.put(path, entry);

// if it is a file to delete, record this.
if (entry.getStatus() == EntryStatus.file) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will this case happen? as we are only getting the directories.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that list of dirs is built up in task commit, where each task checks the status of the equivalent dir in the dest path. so we know which target dirs have files, as well as which don't exist

OP_COMMIT_FILE_RENAME, () ->
operations.renameFile(source, dest));
}
return new CommitOutcome();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: CommitOutcome is empty only. Why not just void?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leave open for others

* @return status or null
* @throws IOException IO Failure.
*/
protected Boolean delete(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is just refactoring and delete is called from multiple Stages.

* @param <IN> Type of arguments to the stage.
* @param <OUT> Type of result.
*/
public abstract class AbstractJobCommitStage<IN, OUT>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with name change as there are methods here which is getting called from multiple stages.

trackDurationOfInvocation(getIOStatistics(), OP_LOAD_ALL_MANIFESTS, () ->
TaskPool.foreach(manifestFiles)
.executeWith(getIOProcessors())
.stopOnFailure()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think any exception in any thred will stop the ececution and will be thrown be from the method and finally a stage failure is reported.

// list the directory. This may block until the listing is complete,
// or, if the FS does incremental or asynchronous fetching, until the
// first page of results is ready.
final RemoteIterator<FileStatus> listing = listStatusIterator(srcDir);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is async paged list request and non recursive.

Copy link
Contributor

@mehakmeet mehakmeet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. Some minor comments ran the whole Azure test suite with the new tests. took 2.5hr to complete, maybe I have to tune down the Configs in ITestAbfsTerasort for better performance?

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  02:39 h
[INFO] Finished at: 2022-03-16T13:50:48+05:30
[INFO] ------------------------------------------------------------------------

return result.getStatusCode() == HttpURLConnection.HTTP_OK
&& sourceEtag.equals(extractEtagHeader(result));
} catch (AzureBlobFileSystemException ignored) {
// GetFileStatus on the destination failed, the rename did not take place
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add Debug logs here for this scenario?

/**
* Prepare the test configuration.
* @param contractTestBinding test binding
* @return an extraced and patched configuration.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: "extracted"

* the HDFS cluster binding is implicitly propagated to YARN.
* If one is not requested, the local filesystem is used as the cluster FS.
* @param conf configuration to start with.
* @param useHDFS should an HDFS cluster be instantiated.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a param

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aah, relic of the s3a code

}


protected Job createJob(Configuration jobConf) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We never use this method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cut

@steveloughran
Copy link
Contributor Author

@mehakmeet thanks for your comments. make sure you aren't VPN'd to a vpn which redirects all azure/aws/gcs io, otherwise all your requests go round the world. i have hit this on GCS.

renaming base stage class JobOrTaskStage, fix javadocs

Change-Id: Icefbdfb81ca5b6f77640408d5ff395265444972a
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement fs/azure changes related to azure; submitter must declare test endpoint MapReduce

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants