forked from basho/riak-java-client
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathREADME
More file actions
343 lines (217 loc) · 13.3 KB
/
README
File metadata and controls
343 lines (217 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
This document describes how to use this client to interact with Riak. See the [DEVELOPERS](/jonjlee/riak-java-client/wiki/Developers) document for a technical overview of the project.
# Overview #
This Java-based Riak client uses Commons HttpClient to perform HTTP requests. It provides:
* **HttpClient**-provided functionality such as connection pooling, timeouts, and retries, which is not provided by `HttpURLConnection`.
* **Link** and **Map/Reduce** query building support.
* **HTTP response data** returned directly to the client rather than via exceptions. While this slightly couples the domain model with the underlying HTTP model, it gives the benefit of allowing the full suite of HTTP to be used (in possibly unforseen ways) without requiring modifications to the client library. In reality, clients need to and do understand that each operation in fact translates to an HTTP operation. In any case, **PlainClient** provides the more traditional interface with domain objects and checked exceptions which can be used when appropriate.
* **Stream handling** for GET requests.
* **Exceptions** are unchecked and the **RiakExceptionHandler** interface allows all exceptions to be handled in a central location. This means the client does not need to wrap each operation in try/catch blocks.
# Including in your project #
To use `riak-client` in a [Maven](http://maven.apache.org/) project, add the following dependency to `pom.xml`:
<dependency>
<groupId>com.basho.riak</groupId>
<artifactId>riak-client</artifactId>
<version>0.9.1</version>
</dependency>
To build and install from source, first install Apache Maven (http://maven.apache.org/download.html). With Maven installed, run:
mvn clean install
# Quick start #
Connect to Riak:
RiakClient riak = new RiakClient("http://localhost:8098/riak");
Build an object:
RiakObject o = new RiakObject("bucket", "key", "value");
Store it:
riak.store(o);
Retrieve it:
FetchResponse r = riak.fetch("bucket", "key");
if (r.hasObject())
o = r.getObject();
Update it:
o.setValue("foo");
riak.store(o);
Handling siblings:
if (r.hasSiblings())
Collection<RiakObject> siblings = r.getSiblings();
# Connecting #
Connect to a Riak server by specifying the base URL of the Riak HTTP interface:
RiakClient riak = new RiakClient("http://localhost:8098/riak");
HttpClient parameters can be provided using a RiakConfig object:
RiakConfig config = new RiakConfig("http://localhost:8098/riak");
config.setTimeout(2000); // 2 second connection timeout
config.setMaxConnections(50); // 50 concurrent connections
RiakClient riak = new RiakClient(config);
The HttpClient instance itself can also be given:
MultiThreadedHttpConnectionManager m = new MultiThreadedHttpConnectionManager();
m.getParams().setIntParameter(HttpConnectionManagerParams.MAX_TOTAL_CONNECTIONS, 50);
HttpClient http = new HttpClient(m);
http.getParams().setLongParameter(HttpClientParams.CONNECTION_MANAGER_TIMEOUT, 2000);
RiakConfig config = new RiakConfig("http://localhost:8098/riak");
config.setHttpClient(http);
RiakClient riak = new RiakClient(config);
As an alternative to `RiakClient`, the `PlainClient` interface can be used, which exposes a different interface that hides HTTP response information from results.
PlainClient plain = PlainClient.getClient("http://localhost:8098/riak");
PlainClient plain = PlainClient.getClient(new RiakConfig("http://localhost:8098/riak"));
# Operations #
Using RiakClient, you can manipulate Riak objects and buckets.
## Store Objects ##
First create the object type corresponding to the interface being used and fill in any metadata and value. Then, store the object and check the server response. The response contains the updated vclock, last modified date, and vtag of the object, if it was returned by the server.
RiakObject o = new RiakObject("bucket", "key", "value");
o.getUsermeta().put("custom-max-age", "60");
o.setContentType("text/plain");
StoreResponse r = riak.store(o);
if (r.isSuccess()) obj.updateMeta(r);
Or, with Plain:
plain.store(new RiakObject("a", "b"));
The request `w` and `dw` values (number of write or durable write responses required for success) can be specified using a RequestMeta object:
client.store(obj, RequestMeta.writeParams(2 /* w-value */, 2 /* dw-value */));
## Fetch Objects ##
Simply request the object by bucket and key. The response contains the requested object.
FetchResponse r = riak.fetch("bucket", "key");
if (r.isSuccess())
RiakObject o = r.getObject();
With Plain:
RiakObject o = plain.fetch("bucket", "key");
The request `r` value (number of servers responding to a read request required for success) can be specified using a RequestMeta object:
client.fetch("bucket", "key", RequestMeta.readParams(2 /* r-value */));
## Fetch Object Metadata ##
This works identically to fetching an object, except that the object's value will not necessarily be populated.
client.fetchMeta("bucket", "key", /* optional */ RequestMeta.readParams(2));
## Modify Objects ##
To ensure that no conflicts are created when modifying an object, first fetch it, then update and store it.
FetchResponse r = riak.fetch("bucket", "key");
if (r.isSuccess()) {
RiakObject o = r.getObject();
o.setValue("foo");
client.store(o);
}
With Plain:
RiakObject o = plain.fetch("bucket", "key");
o.setValue("foo");
plain.store(o);
## Delete Objects ##
Simply execute the delete method. The `dw` value can be optionally specified as before with a RequestMeta object.
client.delete("bucket", "key", RequestMeta.writeParams(null /* no w value */, 2 /* dw value */));
## Handling Conflicts and Siblings ##
The Riak HTTP interface is able to return conflicting versions of the same object, known as siblings.
// ensure the that allow_mult bucket property is set to true, or Riak will not return siblings.
FetchResponse r = riak.fetch("bucket", "key");
if (r.isSuccess() && r.hasSiblings())
Collection<RiakObject> siblings = r.getSiblings();
With Plain:
Collection<? extends RiakObject> objects = plain.fetchAll("bucket", "key");
## Streaming Objects ##
To process an object as a stream, implement a StreamHandler and use the stream() method. The input stream of the HTTP response is given to the handler.
client.stream("bucket", "key", new MyStreamHandler(), RequestMeta.readParams(1 /* r value */));
RiakClient also contains a stream() method that returns a standard RiakObject with a value stream. The user is responsible for closing the response in this case.
FetchResponse r;
try {
r = riak.stream("bucket", "key");
if (r.isSuccess()) {
InputStream valueStream = r.getObject().getValueStream();
/* process valueStream */
}
} finally {
if (r != null) {
r.close();
}
}
## Buckets Keys and Schema ##
The bucket schema and a list the keys of all the objects in the bucket can be read using the listBucket() method and written using the setBucketSchema() method. The bucket schema is presented as a JSONObject and contains per-bucket information. For example, the `allow_mult` property allows Riak to return conflicting versions of the same object. See the Riak documentation for more information.
BucketResponse r = riak.listBucket("bucket");
if (r.isSuccess()) {
BucketInfo info = r.getBucketInfo();
Collection<String> keys = info.getKeys(); // list of all object keys in this bucket
// Update the schema and put
info.setAllowMult(true);
riak.setBucketSchema("bucket", info);
}
It is also possible to stream the keys in the bucket, which sends the `?keys=stream` query parameter to Riak.
BucketResponse r;
try {
r = riak.streamBucket("bucket");
if (r.isSuccess())
for (String key : r.getBucketInfo().getKeys())
// process key
} finally {
if (r != null)
r.close();
}
With Plain:
RiakBucketInfo info = plain.listBucket("bucket");
info.getSchema().put("allow_mult", true);
plain.setBucketSchema("bucket", info);
## Links and Link Walking ##
Links can be stored with each object. A link consists of the target object's bucket and key and a tag to identify the link.
RiakObject o = new RiakObject("bucket", "key");
o.getLinks().add(new RiakLink("bucket", "target-object", "link-tag"));
Link walking is performed by calling walk() with a walk specification (see the Riak documentation and JavaDocs for RiakWalkSpec). A list of lists of objects is returned. Each list of objects represents all the objects returned in a single step of the walk.
WalkResponse r = riak.walk("bucket", "key", "bucket,_,1");
if (r.isSuccess()) {
List<? extends List<RiakObject>> steps = r.getSteps();
for (List<RiakObject> step : steps) {
for (RiakObject o : step) {
// process the object
}
}
}
Alternatively, the link walk can be built from a `RiakObject`.
RiakObject o = new RiakObject(riak, "bucket", "key");
WalkResponse r = o.walk("bucket").run();
## Map/Reduce Queries ##
Refer to the [Riak Javascript Map/Reduce documentation](http://bitbucket.org/basho/riak/src/tip/doc/js-mapreduce.org) for a detailed explanation of how map/reduce works in Riak over HTTP.
First, build a Map/Reduce query by calling `mapReduceOverBucket()` or `mapReduceOverObjects()`. Then execute it by calling `submit()`.
MapReduceResponse r = riak.mapReduceOverBucket("bucket")
.link("bucket", "tag", false)
.map(JavascriptFunction.named("Riak.mapValuesJson"), false)
.reduce(new ErlangFunction("riak_mapreduce", "reduce_sort"), true)
.submit();
if (r.isSuccess()) {
JSONArray results = r.getResults();
// process the results array
}
A Map/Reduce query is built by chaining methods calls that to corresponding Riak Map/Reduce phase types.
* `map(function, keep)`
* `reduce(function, keep)`
* `link(bucket, tag, keep)`
The `keep` flag determines whether the results from that phase is returned by Riak.
The supported `map()` and `reduce()` function types are
* `ErlangFunction`: an Erlang function specified by a module and function
* `JavascriptFunction`: a named or anonymous Javascript function
* `LinkFunction`: a link specification consisting of a bucket name and tag
For example:
// Erlang function riak_mapreduce:reduce_sort
new ErlangFunction("riak_mapreduce", "reduce_sort");
// Built-in named Javascript function Riak.mapValuesJson
JavascriptFunction.named("Riak.mapValuesJson");
// Unnamed Javascript function
JavascriptFunction.anon("function(v) { return [v.values[0].data]; }");
// Follow links to bucket "b" tagged with tag "t"
new LinkFunction("b", "t");
If `submit()` succeeds, Riak returns the query result, which is a JSON array.
# HTTP Request/Response Information #
All of the above operations can also be called with a RequestMeta object to specify extra HTTP headers and query parameters:
RequestMeta meta = new RequestMeta();
meta.putHeader("X-Custom-Header", "value");
meta.addHeader("custom-query-param", "param");
client.fetch("bucket", "key", meta);
The operations also return results that implement the HttpResponse interface, which exposes the HTTP status code, headers, body, and original HttpMethod used for the request. The entity stream, however is closed, so the stream() function should be used to stream objects.
HttpResponse r = client.fetch("bucket", "key");
Map<String, String> httpHeaders = r.getHttpHeaders();
String httpBody = r.getBody();
# Exception Handling #
RiakClient will usually throw the unchecked exceptions RiakIORuntimeException and RiakResponseRuntimeException if there is an error talking to the server or if the server returns a response that can't be parsed. However, a RiakExceptionHandler can be installed in the client to prevent them from throwing the exceptions. Instead, they are passed to the exception handler. This allows a user, for example, to consolidate processing for these exceptional cases in a single class and avoid inline try/catch blocks.
In addition, the `ClientUtils.throwChecked()` method allows the unchecked exceptions to be easily converted to checked exceptions. Of course, in this case, methods using the `RiakClient` instance must be careful to declare the exception in their signatures, since the compiler will not produce the usual warnings.
RiakClient c = new RiakClient("");
c.setExceptionHandler(new RiakExceptionHandler() {
public void handle(RiakResponseRuntimeException e) {
// Log and ignore malformed responses. The operation in progress
// will return an HttpResponse with 0 status code.
LOG.warn("Received malformed server response", e);
}
public void handle(RiakIORuntimeException e) {
// Convert to a checked exception, which should be declared in
// the calling method signature
ClientUtils.throwChecked(
new IOException("Riak connection is down", e.getCause()));
}
});