1 /*
2 * Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
18 */
19
20 #include "config.h"
21
22 #include <string.h>
23 #include <unistd.h>
24 #include <errno.h>
25
26 #include <gio/gunixfdlist.h>
27 #include <gio/gunixinputstream.h>
28
29 #include "tracker-extract-client.h"
30
31 /* Size of buffers used when sending data over a pipe, using DBus FD passing */
32 #define DBUS_PIPE_BUFFER_SIZE 65536
33
34 #define DBUS_SERVICE_EXTRACT "org.freedesktop.Tracker1.Extract"
35 #define DBUS_PATH_EXTRACT "/org/freedesktop/Tracker1/Extract"
36 #define DBUS_INTERFACE_EXTRACT "org.freedesktop.Tracker1.Extract"
37
38 static GDBusConnection *connection = NULL;
39
40 typedef void (* SendAndSpliceCallback) (void *buffer,
41 gssize buffer_size,
42 GError *error, /* Don't free */
43 gpointer user_data);
44
45 typedef struct {
46 GInputStream *unix_input_stream;
47 GInputStream *buffered_input_stream;
48 GOutputStream *output_stream;
49 SendAndSpliceCallback callback;
50 GCancellable *cancellable;
51 gpointer data;
52 gboolean splice_finished;
53 gboolean dbus_finished;
54 GError *error;
55 } SendAndSpliceData;
56
57 typedef struct {
58 TrackerExtractInfo *info;
59 GSimpleAsyncResult *res;
60 } MetadataCallData;
61
62 static SendAndSpliceData *
63 send_and_splice_data_new (GInputStream *unix_input_stream,
64 GInputStream *buffered_input_stream,
65 GOutputStream *output_stream,
66 GCancellable *cancellable,
67 SendAndSpliceCallback callback,
68 gpointer user_data)
69 {
70 SendAndSpliceData *data;
71
72 data = g_slice_new0 (SendAndSpliceData);
73 data->unix_input_stream = unix_input_stream;
74 data->buffered_input_stream = buffered_input_stream;
75 data->output_stream = output_stream;
76
77 if (cancellable) {
78 data->cancellable = g_object_ref (cancellable);
79 } else {
80 data->cancellable = g_cancellable_new ();
81 }
82
83 data->callback = callback;
84 data->data = user_data;
85
86 return data;
87 }
88
89 static void
90 send_and_splice_data_free (SendAndSpliceData *data)
91 {
92 if (data->cancellable) {
93 g_object_unref (data->cancellable);
94 }
95
96 g_output_stream_close (data->output_stream, NULL, NULL);
97 g_input_stream_close (data->buffered_input_stream, NULL, NULL);
98
99 g_object_unref (data->unix_input_stream);
100 g_object_unref (data->buffered_input_stream);
101 g_object_unref (data->output_stream);
102
103 if (data->error) {
104 g_error_free (data->error);
105 }
106
107 g_slice_free (SendAndSpliceData, data);
108 }
109
110 static MetadataCallData *
111 metadata_call_data_new (TrackerExtractInfo *info,
112 GSimpleAsyncResult *res)
113 {
114 MetadataCallData *data;
115
116 data = g_slice_new (MetadataCallData);
117 data->res = g_object_ref (res);
118 data->info = tracker_extract_info_ref (info);
119
120 return data;
121 }
122
123 static void
124 metadata_call_data_free (MetadataCallData *data)
125 {
126 tracker_extract_info_unref (data->info);
127 g_object_unref (data->res);
128 g_slice_free (MetadataCallData, data);
129 }
130
131 static void
132 dbus_send_and_splice_async_finish (SendAndSpliceData *data)
133 {
134 if (!data->error) {
135 (* data->callback) (g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (data->output_stream)),
136 g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (data->output_stream)),
137 NULL,
138 data->data);
139 } else {
140 (* data->callback) (NULL, -1, data->error, data->data);
141 }
142
143 send_and_splice_data_free (data);
144 }
145
146 static void
147 send_and_splice_splice_callback (GObject *source,
148 GAsyncResult *result,
149 gpointer user_data)
150 {
151 SendAndSpliceData *data = user_data;
152 GError *error = NULL;
153
154 g_output_stream_splice_finish (G_OUTPUT_STREAM (source), result, &error);
155
156 if (error) {
157 if (!data->error) {
158 data->error = error;
159 } else {
160 g_error_free (error);
161 }
162
163 /* Ensure the other operation is cancelled */
164 if (!data->dbus_finished) {
165 g_cancellable_cancel (data->cancellable);
166 }
167 }
168
169 data->splice_finished = TRUE;
170
171 if (data->dbus_finished) {
172 dbus_send_and_splice_async_finish (data);
173 }
174 }
175
176 static void
177 send_and_splice_dbus_callback (GObject *source,
178 GAsyncResult *result,
179 gpointer user_data)
180 {
181 SendAndSpliceData *data = user_data;
182 GDBusMessage *reply;
183 GError *error = NULL;
184
185 reply = g_dbus_connection_send_message_with_reply_finish (G_DBUS_CONNECTION (source),
186 result, &error);
187
188 if (reply) {
189 if (g_dbus_message_get_message_type (reply) == G_DBUS_MESSAGE_TYPE_ERROR) {
190 g_dbus_message_to_gerror (reply, &error);
191 }
192
193 g_object_unref (reply);
194 }
195
196 if (error) {
197 if (!data->error) {
198 data->error = error;
199 } else {
200 g_error_free (error);
201 }
202
203 /* Ensure the other operation is cancelled */
204 if (!data->splice_finished) {
205 g_cancellable_cancel (data->cancellable);
206 }
207 }
208
209 data->dbus_finished = TRUE;
210
211 if (data->splice_finished) {
212 dbus_send_and_splice_async_finish (data);
213 }
214 }
215
216 static void
217 dbus_send_and_splice_async (GDBusConnection *connection,
218 GDBusMessage *message,
219 int fd,
220 GCancellable *cancellable,
221 SendAndSpliceCallback callback,
222 gpointer user_data)
223 {
224 SendAndSpliceData *data;
225 GInputStream *unix_input_stream;
226 GInputStream *buffered_input_stream;
227 GOutputStream *output_stream;
228
229 unix_input_stream = g_unix_input_stream_new (fd, TRUE);
230 buffered_input_stream = g_buffered_input_stream_new_sized (unix_input_stream,
231 DBUS_PIPE_BUFFER_SIZE);
232 output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
233
234 data = send_and_splice_data_new (unix_input_stream,
235 buffered_input_stream,
236 output_stream,
237 cancellable,
238 callback,
239 user_data);
240
241 g_dbus_connection_send_message_with_reply (connection,
242 message,
243 G_DBUS_SEND_MESSAGE_FLAGS_NONE,
244 -1,
245 NULL,
246 cancellable,
247 send_and_splice_dbus_callback,
248 data);
249
250 g_output_stream_splice_async (data->output_stream,
251 data->buffered_input_stream,
252 G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
253 G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
254 0,
255 cancellable,
256 send_and_splice_splice_callback,
257 data);
258 }
259
260 static inline gchar *
261 get_metadata_fast_read (GDataInputStream *data_input_stream,
262 gsize *remaining,
263 GError *error)
264 {
265 gchar *output;
266 gsize len_read;
267
268 if (error) {
269 return NULL;
270 }
271
272 g_return_val_if_fail (*remaining > 0, NULL);
273
274 /* Read data */
275 output = g_data_input_stream_read_upto (data_input_stream, "\0", 1, &len_read, NULL, &error);
276
277 if (error) {
278 g_free (output);
279 return NULL;
280 }
281
282 *remaining -= len_read;
283
284 if (*remaining <= 0) {
285 g_warning ("Expected remaining bytes to be > 0 when it wasn't after g_data_input_stream_read_upto() call");
286 g_free (output);
287 return NULL;
288 }
289
290 /* Read NUL terminating byte.
291 *
292 * The g_data_input_stream_read_upto() function doesn't
293 * consume the bytes we read up to according to the
294 * documentation unlike the _until() variant which is now
295 * deprecated anyway.
296 */
297 g_data_input_stream_read_byte (data_input_stream, NULL, &error);
298
299 if (error) {
300 g_free (output);
301 return NULL;
302 }
303
304 *remaining -= 1;
305
306 return output;
307 }
308
309 static void
310 get_metadata_fast_cb (void *buffer,
311 gssize buffer_size,
312 GError *error,
313 gpointer user_data)
314 {
315 MetadataCallData *data;
316
317 data = user_data;
318
319 if (G_UNLIKELY (error)) {
320 g_simple_async_result_set_from_error (data->res, error);
321 } else {
322 GInputStream *input_stream;
323 GDataInputStream *data_input_stream;
324 gchar *preupdate, *postupdate, *sparql, *where;
325 TrackerSparqlBuilder *builder;
326 gssize remaining;
327
328 /* So the structure is like this:
329 *
330 * [buffer,'\0'][buffer,'\0'][...]
331 *
332 * We avoid strlen() using
333 * g_data_input_stream_read_upto() and the
334 * NUL-terminating byte given strlen() has a size_t
335 * limitation and costs us time evaluating string
336 * lengths.
337 */
338 preupdate = postupdate = sparql = where = NULL;
339 remaining = buffer_size;
340
341 input_stream = g_memory_input_stream_new_from_data (buffer, buffer_size, NULL);
342 data_input_stream = g_data_input_stream_new (input_stream);
343 g_data_input_stream_set_byte_order (G_DATA_INPUT_STREAM (data_input_stream),
344 G_DATA_STREAM_BYTE_ORDER_HOST_ENDIAN);
345
346 preupdate = get_metadata_fast_read (data_input_stream, &remaining, error);
pointer targets in passing argument 2 of 'get_metadata_fast_read' differ in signedness
(emitted by gcc)
347 postupdate = get_metadata_fast_read (data_input_stream, &remaining, error);
348 sparql = get_metadata_fast_read (data_input_stream, &remaining, error);
349 where = get_metadata_fast_read (data_input_stream, &remaining, error);
350
351 g_object_unref (data_input_stream);
352 g_object_unref (input_stream);
353
354 if (where) {
355 tracker_extract_info_set_where_clause (data->info, where);
356 g_free (where);
357 }
358
359 if (preupdate) {
360 builder = tracker_extract_info_get_preupdate_builder (data->info);
361 tracker_sparql_builder_prepend (builder, preupdate);
362 g_free (preupdate);
363 }
364
365 if (postupdate) {
366 builder = tracker_extract_info_get_postupdate_builder (data->info);
367 tracker_sparql_builder_prepend (builder, postupdate);
368 g_free (postupdate);
369 }
370
371 if (sparql) {
372 builder = tracker_extract_info_get_metadata_builder (data->info);
373 tracker_sparql_builder_prepend (builder, sparql);
374 g_free (sparql);
375 }
376
377 g_simple_async_result_set_op_res_gpointer (data->res,
378 tracker_extract_info_ref (data->info),
379 (GDestroyNotify) tracker_extract_info_unref);
380 }
381
382 g_simple_async_result_complete_in_idle (data->res);
383 metadata_call_data_free (data);
384 }
385
386 static void
387 get_metadata_fast_async (GDBusConnection *connection,
388 GFile *file,
389 const gchar *mime_type,
390 const gchar *graph,
391 GCancellable *cancellable,
392 GSimpleAsyncResult *res)
393 {
394 MetadataCallData *data;
395 TrackerExtractInfo *info;
396 GDBusMessage *message;
397 GUnixFDList *fd_list;
398 int pipefd[2], fd_index;
399 GError *error = NULL;
400 gchar *uri;
401
402 if (pipe (pipefd) < 0) {
403 gint err = errno;
404
405 g_critical ("Coudln't open pipe");
406 g_simple_async_result_set_error (res,
407 G_IO_ERROR,
408 g_io_error_from_errno (err),
409 "Could not open pipe to extractor");
410 g_simple_async_result_complete_in_idle (res);
411 return;
412 }
413
414 fd_list = g_unix_fd_list_new ();
415
416 if ((fd_index = g_unix_fd_list_append (fd_list, pipefd[1], &error)) == -1) {
417 g_simple_async_result_set_from_error (res, error);
418 g_simple_async_result_complete_in_idle (res);
419
420 g_object_unref (fd_list);
421 g_error_free (error);
422
423 /* FIXME: Close pipes? */
424
425 return;
426 }
427
428 message = g_dbus_message_new_method_call (DBUS_SERVICE_EXTRACT,
429 DBUS_PATH_EXTRACT,
430 DBUS_INTERFACE_EXTRACT,
431 "GetMetadataFast");
432
433 uri = g_file_get_uri (file);
434
435 g_dbus_message_set_body (message,
436 g_variant_new ("(sssh)",
437 uri,
438 mime_type,
439 graph,
440 fd_index));
441 g_dbus_message_set_unix_fd_list (message, fd_list);
442
443 /* We need to close the fd as g_unix_fd_list_append duplicates the fd */
444
445 g_object_unref (fd_list);
446 close (pipefd[1]);
447 g_free (uri);
448
449 info = tracker_extract_info_new (file, mime_type, graph);
450 data = metadata_call_data_new (info, res);
451
452 dbus_send_and_splice_async (connection,
453 message,
454 pipefd[0],
455 cancellable,
456 get_metadata_fast_cb,
457 data);
458 g_object_unref (message);
459 tracker_extract_info_unref (info);
460 }
461
462 /**
463 * tracker_extract_client_get_metadata:
464 * @file: a #GFile
465 * @mime_type: mimetype of @file
466 * @graph: graph that should be used for the generated insert clauses, or %NULL
467 * @cancellable: (allow-none): cancellable for the async operation, or %NULL
468 * @callback: (scope async): callback to call when the request is satisfied.
469 * @user_data: (closure): data for the callback function
470 *
471 * Asynchronously requests metadata for @file, this request is sent to the
472 * tracker-extract daemon.
473 *
474 * When the request is finished, @callback will be executed. You can then
475 * call tracker_extract_client_get_metadata_finish() to get the result of
476 * the operation.
477 *
478 * Since: 0.12
479 **/
480 void
481 tracker_extract_client_get_metadata (GFile *file,
482 const gchar *mime_type,
483 const gchar *graph,
484 GCancellable *cancellable,
485 GAsyncReadyCallback callback,
486 gpointer user_data)
487 {
488 GSimpleAsyncResult *res;
489 GError *error = NULL;
490
491 g_return_if_fail (G_IS_FILE (file));
492 g_return_if_fail (mime_type != NULL);
493 g_return_if_fail (!cancellable || G_IS_CANCELLABLE (cancellable));
494 g_return_if_fail (callback != NULL);
495
496 if (G_UNLIKELY (!connection)) {
497 connection = g_bus_get_sync (G_BUS_TYPE_SESSION, cancellable, &error);
498
499 if (error) {
500 g_simple_async_report_gerror_in_idle (G_OBJECT (file), callback, user_data, error);
501 g_error_free (error);
502 return;
503 }
504 }
505
506 res = g_simple_async_result_new (G_OBJECT (file), callback, user_data, NULL);
507 g_simple_async_result_set_handle_cancellation (res, TRUE);
508
509 get_metadata_fast_async (connection, file, mime_type, graph,
510 cancellable, res);
511 g_object_unref (res);
512 }
513
514 /**
515 * tracker_extract_client_get_metadata_finish:
516 * @file: a #GFile
517 * @res: a #GAsyncResult
518 * @error: return location for error, or %NULL to ignore.
519 *
520 * Finishes an asynchronous metadata request.
521 *
522 * Returns: (transfer full): the #TrackerExtractInfo holding the result.
523 *
524 * Since: 0.12
525 **/
526 TrackerExtractInfo *
527 tracker_extract_client_get_metadata_finish (GFile *file,
528 GAsyncResult *res,
529 GError **error)
530 {
531 g_return_val_if_fail (G_IS_FILE (file), NULL);
532 g_return_val_if_fail (G_IS_ASYNC_RESULT (res), NULL);
533 g_return_val_if_fail (!error || !*error, NULL);
534
535 if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error)) {
536 return NULL;
537 }
538
539 return g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (res));
540 }
541
542 /**
543 * tracker_extract_client_cancel_for_prefix:
544 * @prefix: a #GFile
545 *
546 * Cancels any ongoing extraction task for (or within) @prefix.
547 *
548 * Since: 0.12
549 **/
550 void
551 tracker_extract_client_cancel_for_prefix (GFile *prefix)
552 {
553 GDBusMessage *message;
554 gchar *uris[2];
555
556 if (G_UNLIKELY (!connection)) {
557 GError *error = NULL;
558
559 connection = g_bus_get_sync (G_BUS_TYPE_SESSION, NULL, &error);
560 if (error) {
561 g_warning ("Couldn't get session bus, cannot cancel extractor tasks: '%s'",
562 error->message);
563 g_error_free (error);
564 return;
565 }
566 }
567
568 uris[0] = g_file_get_uri (prefix);
569 uris[1] = NULL;
570
571 message = g_dbus_message_new_method_call (DBUS_SERVICE_EXTRACT,
572 DBUS_PATH_EXTRACT,
573 DBUS_INTERFACE_EXTRACT,
574 "CancelTasks");
575
576 g_dbus_message_set_body (message, g_variant_new ("(^as)", uris));
577 g_dbus_connection_send_message (connection, message,
578 G_DBUS_SEND_MESSAGE_FLAGS_NONE,
579 NULL, NULL);
580
581 g_free (uris[0]);
582 g_object_unref (message);
583 }