Skip to content

Commit 9cc7cf6

Browse files
keveman authored and tensorflower-gardener committed
Implement a file factory mechanism to handle network file systems.
- Env dispatches to a FileSystem interface
- FileSystemFactory is used to look up the correct FileSystem implementation based on the prefix of the filename
- Provide a registration mechanism to register different factories
Change: 119236345
1 parent 1bcb12b commit 9cc7cf6

14 files changed

Lines changed: 482 additions & 1137 deletions

tensorflow/core/kernels/immutable_constant_op_test.cc

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,18 @@ class TestReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
5454
uint64 length_;
5555
};
5656

57-
// A mock file system and environment class that creates ReadOnlyMemoryRegion
58-
// from allocated memory.
59-
class TestFileSystem : public NullFileSystem {
57+
// A mock environment class that creates ReadOnlyMemoryRegion from allocated
58+
// memory.
59+
class TestEnvironment : public EnvWrapper {
6060
public:
61-
~TestFileSystem() override = default;
61+
explicit TestEnvironment(Env* env) : EnvWrapper(env) {}
62+
~TestEnvironment() override = default;
6263
Status NewReadOnlyMemoryRegionFromFile(
6364
const string& fname, ReadOnlyMemoryRegion** result) override {
6465
float val = 0;
6566
// For the tests create in-memory regions with float values equal to the
6667
// first letter of the region name.
67-
switch (GetNameFromURI(fname).front()) {
68+
switch (fname.front()) {
6869
case '2':
6970
val = 2.0f;
7071
break;
@@ -83,23 +84,20 @@ class TestFileSystem : public NullFileSystem {
8384
}
8485
};
8586

86-
REGISTER_FILE_SYSTEM("test", TestFileSystem);
87-
8887
struct ImmutableConstantOpTest {};
8988

9089
TEST(ImmutableConstantOpTest, Simple) {
9190
const TensorShape kTestTensorShape({4, 1});
9291
const TensorShape kTestTensorShapeT({1, 4});
9392
GraphDefBuilder b(GraphDefBuilder::kFailImmediately);
94-
Node* node1 =
95-
ops::ImmutableConst(DT_FLOAT, kTestTensorShape, "test://2", b.opts());
96-
Node* node2 =
97-
ops::ImmutableConst(DT_FLOAT, kTestTensorShapeT, "test://3", b.opts());
93+
Node* node1 = ops::ImmutableConst(DT_FLOAT, kTestTensorShape, "2", b.opts());
94+
Node* node2 = ops::ImmutableConst(DT_FLOAT, kTestTensorShapeT, "3", b.opts());
9895
Node* result = ops::MatMul(node1, node2, b.opts());
9996
GraphDef graph_def;
10097
TF_ASSERT_OK(b.ToGraphDef(&graph_def));
98+
std::unique_ptr<Env> env_ptr(new TestEnvironment(Env::Default()));
10199
SessionOptions session_options;
102-
session_options.env = Env::Default();
100+
session_options.env = env_ptr.get();
103101
session_options.config.mutable_graph_options()
104102
->mutable_optimizer_options()
105103
->set_opt_level(OptimizerOptions_Level_L0);
@@ -122,15 +120,14 @@ TEST(ImmutableConstantOpTest, ExecutionError) {
122120
const TensorShape kBadTensorShape({40, 100});
123121
const TensorShape kTestTensorShapeT({1, 4});
124122
GraphDefBuilder b(GraphDefBuilder::kFailImmediately);
125-
Node* node1 =
126-
ops::ImmutableConst(DT_FLOAT, kBadTensorShape, "test://2", b.opts());
127-
Node* node2 =
128-
ops::ImmutableConst(DT_FLOAT, kTestTensorShapeT, "test://3", b.opts());
123+
Node* node1 = ops::ImmutableConst(DT_FLOAT, kBadTensorShape, "2", b.opts());
124+
Node* node2 = ops::ImmutableConst(DT_FLOAT, kTestTensorShapeT, "3", b.opts());
129125
Node* result = ops::MatMul(node1, node2, b.opts());
130126
GraphDef graph_def;
131127
TF_ASSERT_OK(b.ToGraphDef(&graph_def));
128+
std::unique_ptr<Env> env_ptr(new TestEnvironment(Env::Default()));
132129
SessionOptions session_options;
133-
session_options.env = Env::Default();
130+
session_options.env = env_ptr.get();
134131
std::unique_ptr<Session> session(NewSession(session_options));
135132
ASSERT_TRUE(session != nullptr) << "Failed to create session";
136133
TF_ASSERT_OK(session->Create(graph_def)) << "Can't create test graph";

tensorflow/core/platform/env.cc

Lines changed: 2 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -15,100 +15,16 @@ limitations under the License.
1515

1616
#include "tensorflow/core/platform/env.h"
1717
#include "tensorflow/core/lib/core/errors.h"
18-
#include "tensorflow/core/lib/gtl/map_util.h"
1918
#include "tensorflow/core/lib/gtl/stl_util.h"
2019
#include "tensorflow/core/platform/protobuf.h"
2120

2221
namespace tensorflow {
2322

2423
Env::~Env() {}
2524

26-
Status Env::GetFileSystemForFile(const string& fname, FileSystem** result) {
27-
string scheme = GetSchemeFromURI(fname);
28-
FileSystem* file_system = GlobalFileSystemRegistry()->Lookup(scheme);
29-
if (!file_system) {
30-
return errors::Unimplemented("File system scheme ", scheme,
31-
" not implemented");
32-
}
33-
*result = file_system;
34-
return Status::OK();
35-
}
36-
37-
Status Env::NewRandomAccessFile(const string& fname,
38-
RandomAccessFile** result) {
39-
FileSystem* fs;
40-
TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
41-
return fs->NewRandomAccessFile(fname, result);
42-
}
43-
44-
Status Env::NewWritableFile(const string& fname, WritableFile** result) {
45-
FileSystem* fs;
46-
TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
47-
return fs->NewWritableFile(fname, result);
48-
}
49-
50-
Status Env::NewAppendableFile(const string& fname, WritableFile** result) {
51-
FileSystem* fs;
52-
TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
53-
return fs->NewAppendableFile(fname, result);
54-
}
55-
56-
Status Env::NewReadOnlyMemoryRegionFromFile(const string& fname,
57-
ReadOnlyMemoryRegion** result) {
58-
FileSystem* fs;
59-
TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
60-
return fs->NewReadOnlyMemoryRegionFromFile(fname, result);
61-
}
62-
63-
bool Env::FileExists(const string& fname) {
64-
FileSystem* fs;
65-
if (!GetFileSystemForFile(fname, &fs).ok()) {
66-
return false;
67-
}
68-
return fs->FileExists(fname);
69-
}
25+
RandomAccessFile::~RandomAccessFile() {}
7026

71-
Status Env::GetChildren(const string& dir, std::vector<string>* result) {
72-
FileSystem* fs;
73-
TF_RETURN_IF_ERROR(GetFileSystemForFile(dir, &fs));
74-
return fs->GetChildren(dir, result);
75-
}
76-
77-
Status Env::DeleteFile(const string& fname) {
78-
FileSystem* fs;
79-
TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
80-
return fs->DeleteFile(fname);
81-
}
82-
83-
Status Env::CreateDir(const string& dirname) {
84-
FileSystem* fs;
85-
TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs));
86-
return fs->CreateDir(dirname);
87-
}
88-
89-
Status Env::DeleteDir(const string& dirname) {
90-
FileSystem* fs;
91-
TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs));
92-
return fs->DeleteDir(dirname);
93-
}
94-
95-
Status Env::GetFileSize(const string& fname, uint64* file_size) {
96-
FileSystem* fs;
97-
TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
98-
return fs->GetFileSize(fname, file_size);
99-
}
100-
101-
Status Env::RenameFile(const string& src, const string& target) {
102-
FileSystem* src_fs;
103-
FileSystem* target_fs;
104-
TF_RETURN_IF_ERROR(GetFileSystemForFile(src, &src_fs));
105-
TF_RETURN_IF_ERROR(GetFileSystemForFile(target, &target_fs));
106-
if (src_fs != target_fs) {
107-
return errors::Unimplemented("Renaming ", src, " to ", target,
108-
" not implemented");
109-
}
110-
return src_fs->RenameFile(src, target);
111-
}
27+
WritableFile::~WritableFile() {}
11228

11329
Thread::~Thread() {}
11430

tensorflow/core/platform/env.h

Lines changed: 107 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,19 @@ limitations under the License.
1818

1919
#include <stdint.h>
2020
#include <string>
21-
#include <unordered_map>
2221
#include <vector>
23-
#include "tensorflow/core/lib/core/errors.h"
2422
#include "tensorflow/core/lib/core/status.h"
2523
#include "tensorflow/core/lib/core/stringpiece.h"
26-
#include "tensorflow/core/platform/file_system.h"
2724
#include "tensorflow/core/platform/macros.h"
28-
#include "tensorflow/core/platform/mutex.h"
2925
#include "tensorflow/core/platform/protobuf.h"
3026
#include "tensorflow/core/platform/types.h"
3127

3228
namespace tensorflow {
3329

30+
class RandomAccessFile;
31+
class ReadOnlyMemoryRegion;
3432
class Thread;
33+
class WritableFile;
3534
struct ThreadOptions;
3635

3736
/// \brief An interface used by the tensorflow implementation to
@@ -56,11 +55,6 @@ class Env {
5655
/// The result of Default() belongs to this library and must never be deleted.
5756
static Env* Default();
5857

59-
/// \brief Returns the FileSystem object to handle operations on the file
60-
/// specified by 'fname'. The FileSystem object is used as the implementation
61-
/// for the file system related (non-virtual) functions that follow.
62-
virtual Status GetFileSystemForFile(const string& fname, FileSystem** result);
63-
6458
/// \brief Creates a brand new random access read-only file with the
6559
/// specified name.
6660

@@ -70,7 +64,8 @@ class Env {
7064
/// status.
7165
///
7266
/// The returned file may be concurrently accessed by multiple threads.
73-
Status NewRandomAccessFile(const string& fname, RandomAccessFile** result);
67+
virtual Status NewRandomAccessFile(const string& fname,
68+
RandomAccessFile** result) = 0;
7469

7570
/// \brief Creates an object that writes to a new file with the specified
7671
/// name.
@@ -81,7 +76,8 @@ class Env {
8176
/// returns non-OK.
8277
///
8378
/// The returned file will only be accessed by one thread at a time.
84-
Status NewWritableFile(const string& fname, WritableFile** result);
79+
virtual Status NewWritableFile(const string& fname,
80+
WritableFile** result) = 0;
8581

8682
/// \brief Creates an object that either appends to an existing file, or
8783
/// writes to a new file (if the file does not exist to begin with).
@@ -91,7 +87,8 @@ class Env {
9187
/// non-OK.
9288
///
9389
/// The returned file will only be accessed by one thread at a time.
94-
Status NewAppendableFile(const string& fname, WritableFile** result);
90+
virtual Status NewAppendableFile(const string& fname,
91+
WritableFile** result) = 0;
9592

9693
/// \brief Creates a readonly region of memory with the file context.
9794
///
@@ -100,33 +97,34 @@ class Env {
10097
/// the caller. On failure stores nullptr in *result and returns non-OK.
10198
///
10299
/// The returned memory region can be accessed from many threads in parallel.
103-
Status NewReadOnlyMemoryRegionFromFile(const string& fname,
104-
ReadOnlyMemoryRegion** result);
100+
virtual Status NewReadOnlyMemoryRegionFromFile(
101+
const string& fname, ReadOnlyMemoryRegion** result) = 0;
105102

106103
/// Returns true iff the named file exists.
107-
bool FileExists(const string& fname);
104+
virtual bool FileExists(const string& fname) = 0;
108105

109106
/// \brief Stores in *result the names of the children of the specified
110107
/// directory. The names are relative to "dir".
111108
///
112109
/// Original contents of *result are dropped.
113-
Status GetChildren(const string& dir, std::vector<string>* result);
110+
virtual Status GetChildren(const string& dir,
111+
std::vector<string>* result) = 0;
114112

115113
/// Deletes the named file.
116-
Status DeleteFile(const string& fname);
114+
virtual Status DeleteFile(const string& fname) = 0;
117115

118116
/// Creates the specified directory.
119-
Status CreateDir(const string& dirname);
117+
virtual Status CreateDir(const string& dirname) = 0;
120118

121119
/// Deletes the specified directory.
122-
Status DeleteDir(const string& dirname);
120+
virtual Status DeleteDir(const string& dirname) = 0;
123121

124122
/// Stores the size of `fname` in `*file_size`.
125-
Status GetFileSize(const string& fname, uint64* file_size);
123+
virtual Status GetFileSize(const string& fname, uint64* file_size) = 0;
126124

127125
/// \brief Renames file src to target. If target already exists, it will be
128126
/// replaced.
129-
Status RenameFile(const string& src, const string& target);
127+
virtual Status RenameFile(const string& src, const string& target) = 0;
130128

131129
// TODO(jeff,sanjay): Add back thread/thread-pool support if needed.
132130
// TODO(jeff,sanjay): if needed, tighten spec so relative to epoch, or
@@ -186,6 +184,68 @@ class Env {
186184
void operator=(const Env&);
187185
};
188186

187+
/// A file abstraction for randomly reading the contents of a file.
188+
class RandomAccessFile {
189+
public:
190+
RandomAccessFile() {}
191+
virtual ~RandomAccessFile();
192+
193+
/// \brief Reads up to `n` bytes from the file starting at `offset`.
194+
///
195+
/// `scratch[0..n-1]` may be written by this routine. Sets `*result`
196+
/// to the data that was read (including if fewer than `n` bytes were
197+
/// successfully read). May set `*result` to point at data in
198+
/// `scratch[0..n-1]`, so `scratch[0..n-1]` must be live when
199+
/// `*result` is used.
200+
///
201+
/// On OK returned status: `n` bytes have been stored in `*result`.
202+
/// On non-OK returned status: `[0..n]` bytes have been stored in `*result`.
203+
///
204+
/// Returns `OUT_OF_RANGE` if fewer than n bytes were stored in `*result`
205+
/// because of EOF.
206+
///
207+
/// Safe for concurrent use by multiple threads.
208+
virtual Status Read(uint64 offset, size_t n, StringPiece* result,
209+
char* scratch) const = 0;
210+
211+
private:
212+
/// No copying allowed
213+
RandomAccessFile(const RandomAccessFile&);
214+
void operator=(const RandomAccessFile&);
215+
};
216+
217+
/// \brief A file abstraction for sequential writing.
218+
///
219+
/// The implementation must provide buffering since callers may append
220+
/// small fragments at a time to the file.
221+
class WritableFile {
222+
public:
223+
WritableFile() {}
224+
virtual ~WritableFile();
225+
226+
virtual Status Append(const StringPiece& data) = 0;
227+
virtual Status Close() = 0;
228+
virtual Status Flush() = 0;
229+
virtual Status Sync() = 0;
230+
231+
private:
232+
/// No copying allowed
233+
WritableFile(const WritableFile&);
234+
void operator=(const WritableFile&);
235+
};
236+
237+
/// \brief A readonly memmapped file abstraction.
238+
///
239+
/// The implementation must guarantee that all memory is accessible when the
240+
/// object exists, independently from the Env that created it.
241+
class ReadOnlyMemoryRegion {
242+
public:
243+
ReadOnlyMemoryRegion() {}
244+
virtual ~ReadOnlyMemoryRegion() = default;
245+
virtual const void* data() = 0;
246+
virtual uint64 length() = 0;
247+
};
248+
189249
/// \brief An implementation of Env that forwards all calls to another Env.
190250
///
191251
/// May be useful to clients who wish to override just part of the
@@ -199,11 +259,33 @@ class EnvWrapper : public Env {
199259
/// Returns the target to which this Env forwards all calls
200260
Env* target() const { return target_; }
201261

202-
Status GetFileSystemForFile(const string& fname,
203-
FileSystem** result) override {
204-
return target_->GetFileSystemForFile(fname, result);
262+
// The following text is boilerplate that forwards all methods to target()
263+
Status NewRandomAccessFile(const string& f, RandomAccessFile** r) override {
264+
return target_->NewRandomAccessFile(f, r);
265+
}
266+
Status NewWritableFile(const string& f, WritableFile** r) override {
267+
return target_->NewWritableFile(f, r);
268+
}
269+
Status NewAppendableFile(const string& f, WritableFile** r) override {
270+
return target_->NewAppendableFile(f, r);
271+
}
272+
Status NewReadOnlyMemoryRegionFromFile(
273+
const string& fname, ReadOnlyMemoryRegion** result) override {
274+
return target_->NewReadOnlyMemoryRegionFromFile(fname, result);
275+
}
276+
bool FileExists(const string& f) override { return target_->FileExists(f); }
277+
Status GetChildren(const string& dir, std::vector<string>* r) override {
278+
return target_->GetChildren(dir, r);
279+
}
280+
Status DeleteFile(const string& f) override { return target_->DeleteFile(f); }
281+
Status CreateDir(const string& d) override { return target_->CreateDir(d); }
282+
Status DeleteDir(const string& d) override { return target_->DeleteDir(d); }
283+
Status GetFileSize(const string& f, uint64* s) override {
284+
return target_->GetFileSize(f, s);
285+
}
286+
Status RenameFile(const string& s, const string& t) override {
287+
return target_->RenameFile(s, t);
205288
}
206-
207289
uint64 NowMicros() override { return target_->NowMicros(); }
208290
void SleepForMicroseconds(int micros) override {
209291
target_->SleepForMicroseconds(micros);

0 commit comments

Comments
 (0)