[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[tyndur-devel] [PATCH 1/6] LIOv1: Readahead



Signed-off-by: Kevin Wolf <kevin@xxxxxxxxxx>
---
 src/modules/include/io_struct.h                  |    3 +
 src/modules/include/lostio.h                     |    6 +++
 src/modules/lib/lostio/client/file.c             |   40 +++++++++++++++----
 src/modules/lib/lostio/handler.c                 |   46 +++++++++++++++++++++-
 src/modules/lib/lostio/include/lostio_internal.h |    3 +
 src/modules/lib/lostio/lostio.c                  |    1 +
 6 files changed, 90 insertions(+), 9 deletions(-)

diff --git a/src/modules/include/io_struct.h b/src/modules/include/io_struct.h
index e2500a2..c4ec7d3 100644
--- a/src/modules/include/io_struct.h
+++ b/src/modules/include/io_struct.h
@@ -45,6 +45,8 @@
 #define IO_OPEN_MODE_LINK 64
 #define IO_OPEN_MODE_SYNC 128
 
+#define IO_RES_FLAG_READAHEAD 1
+
 typedef uint32_t io_resource_id_t;
 typedef uint8_t io_direntry_type_t;
 typedef struct
@@ -52,6 +54,7 @@ typedef struct
     io_resource_id_t    id;
     pid_t               pid;
     io_resource_id_t    resid;
+    int                 flags;
 
     lio_stream_t        lio2_stream;
     lio_resource_t      lio2_res;
diff --git a/src/modules/include/lostio.h b/src/modules/include/lostio.h
index 8fc4e4d..4b31fba 100644
--- a/src/modules/include/lostio.h
+++ b/src/modules/include/lostio.h
@@ -51,6 +51,8 @@
 #define LOSTIO_FLAG_SYMLINK 0x40000
 /// EOF nicht automatisch setzen wenn beim Oeffnen die Groesse 0 ist
 #define LOSTIO_FLAG_NOAUTOEOF 0x80000
+/// Allow readahead (i.e. a second read must return the same data)
+#define LOSTIO_FLAG_READAHEAD 0x100000
 
 /*#define LOSTIO_MODE_READ 0x1
 #define LOSTIO_MODE_WRITE 0x2
@@ -110,6 +112,7 @@ typedef struct
     void            (*post_open)(lostio_filehandle_t*);
 
     size_t          (*read)(lostio_filehandle_t*,void*,size_t,size_t);
+    size_t          (*readahead)(lostio_filehandle_t*,void*,size_t,size_t);
     size_t          (*write)(lostio_filehandle_t*,size_t,size_t,void*);
     int             (*seek)(lostio_filehandle_t*,uint64_t,int);
     int             (*close)(lostio_filehandle_t*);
@@ -212,6 +215,9 @@ int lio_compat_close(io_resource_t* io_res);
 ssize_t lio_compat_read(void* dest, size_t blocksize, size_t blockcount,
     io_resource_t* io_res);
 
+/** Reads from a resource without changing the file pointer */
+ssize_t lio_compat_readahead(void* dest, size_t size, io_resource_t* io_res);
+
 /**
  * Schreibt in eine Ressource
  *
diff --git a/src/modules/lib/lostio/client/file.c b/src/modules/lib/lostio/client/file.c
index 8fe5ebe..160851b 100644
--- a/src/modules/lib/lostio/client/file.c
+++ b/src/modules/lib/lostio/client/file.c
@@ -118,6 +118,7 @@ static io_resource_t* lio2_open(const char* filename, uint8_t attr)
     *result = (io_resource_t) {
         .lio2_stream    =   s,
         .lio2_res       =   r,
+        .flags          =   IO_RES_FLAG_READAHEAD,
     };
 
 out_err:
@@ -198,8 +199,8 @@ int lio_compat_close(io_resource_t* io_res)
  *
  * @return Anzahl gelesener Bytes; 0 bei Dateiende; negativ im Fehlerfall.
  */
-ssize_t lio_compat_read(void* dest, size_t blocksize, size_t blockcount,
-    io_resource_t* io_res)
+static ssize_t lio1_read_fn(void* dest, size_t blocksize, size_t blockcount,
+    io_resource_t* io_res, char* rpc_fn)
 {
     size_t size;
 
@@ -209,17 +210,13 @@ ssize_t lio_compat_read(void* dest, size_t blocksize, size_t blockcount,
         .blockcount = blockcount,
     };
 
-    if (IS_LIO2(io_res)) {
-        return lio_read(io_res->lio2_stream, blocksize * blockcount, dest);
-    }
-
     // Wenn mehr als 256 Bytes gelesen werden sollen, wird SHM benutzt
     if ((blocksize * blockcount) > 256) {
 
         get_shm(blocksize * blockcount);
         read_request.shared_mem_id = shm_id;
 
-        response_t* resp = rpc_get_response(io_res->pid, "IO_READ ",
+        response_t* resp = rpc_get_response(io_res->pid, rpc_fn,
             sizeof(read_request), (char*) &(read_request));
         size = *((size_t*) resp->data);
 
@@ -236,7 +233,7 @@ ssize_t lio_compat_read(void* dest, size_t blocksize, size_t blockcount,
     } else {
         read_request.shared_mem_id = 0;
 
-        response_t* resp = rpc_get_response(io_res->pid, "IO_READ ",
+        response_t* resp = rpc_get_response(io_res->pid, rpc_fn,
             sizeof(read_request), (char*) &(read_request));
         size = resp->data_length;
 
@@ -258,6 +255,16 @@ ssize_t lio_compat_read(void* dest, size_t blocksize, size_t blockcount,
     return size;
 }
 
+ssize_t lio_compat_read(void* dest, size_t blocksize, size_t blockcount,
+    io_resource_t* io_res)
+{
+    /* Non-LIO2 resources take the RPC path; LIO2 streams use lio_read */
+    if (!IS_LIO2(io_res)) {
+        return lio1_read_fn(dest, blocksize, blockcount, io_res, "IO_READ ");
+    }
+    return lio_read(io_res->lio2_stream, blocksize * blockcount, dest);
+}
+
 /**
  * Schreibt in eine Ressource
  *
@@ -317,3 +324,20 @@ int lio_compat_eof(io_resource_t* io_res)
     return rpc_get_dword(io_res->pid, "IO_EOF  ", sizeof(io_eof_request_t),
         (char*) &eof_request);
 }
+
+/**
+ * Reads from a resource without changing its file pointer.
+ * @return Result of the underlying read, or a negative lio_seek result.
+ */
+ssize_t lio_compat_readahead(void* dest, size_t size, io_resource_t* io_res)
+{
+    if (IS_LIO2(io_res)) {
+        // Signed, so that a negative result from lio_seek is caught below
+        int64_t pos = lio_seek(io_res->lio2_stream, 0, LIO_SEEK_CUR);
+        if (pos < 0) {
+            return pos;
+        }
+        return lio_pread(io_res->lio2_stream, pos, size, dest);
+    }
+    return lio1_read_fn(dest, 1, size, io_res, "IO_READA");
+}
diff --git a/src/modules/lib/lostio/handler.c b/src/modules/lib/lostio/handler.c
index b3bca65..38fac60 100644
--- a/src/modules/lib/lostio/handler.c
+++ b/src/modules/lib/lostio/handler.c
@@ -125,6 +125,7 @@ void rpc_io_open(pid_t pid, uint32_t correlation_id, size_t data_size, void* dat
                 // Symlink wieder schliessen, sonst kann er anschliessend nicht
                 // mehr geloescht werden.
                 lostio_close(pid, filehandle->id);
+                goto out;
             }
         } else {
             io_res.id = filehandle->id;
@@ -137,7 +138,11 @@ void rpc_io_open(pid_t pid, uint32_t correlation_id, size_t data_size, void* dat
             free(int_source->res);
             free(int_source);
         }
-    
+        else if ((filehandle->node->flags & LOSTIO_FLAG_READAHEAD) != 0) {
+            io_res.flags |= IO_RES_FLAG_READAHEAD;
+        }
+
+    out:
         rpc_send_response(pid, correlation_id, sizeof(io_resource_t), (char*) &io_res);
     }
     //v();
@@ -216,6 +221,45 @@ void rpc_io_read(pid_t pid, uint32_t correlation_id, size_t data_size, void* dat
     //v();
 }
 
+/**
+ * RPC handler for "IO_READA": serves an io_read_request_t through the
+ * readahead callback of the file handle's type handler.
+ */
+void rpc_io_readahead(pid_t pid, uint32_t correlation_id, size_t data_size,
+    void* data)
+{
+    io_read_request_t* req = data;
+    lostio_filehandle_t* fh = get_filehandle(pid, req->id);
+    typehandle_t* th = NULL;
+
+    if (fh != NULL) {
+        th = get_typehandle(fh->node->type);
+    }
+
+    // Unknown handle or no readahead handler registered: answer with 0
+    if ((fh == NULL) || (th == NULL) || (th->readahead == NULL)) {
+        rpc_send_dword_response(pid, correlation_id, 0);
+        return;
+    }
+
+    // Shared memory was passed: read straight into it, reply with the size
+    if (req->shared_mem_id != 0) {
+        void* shm = open_shared_memory(req->shared_mem_id);
+        size_t shm_len = th->readahead(fh, shm, req->blocksize,
+            req->blockcount);
+
+        close_shared_memory(req->shared_mem_id);
+        rpc_send_dword_response(pid, correlation_id, shm_len);
+        return;
+    }
+
+    // No shared memory: read into a temporary stack buffer and send its
+    // contents back in the RPC response.
+    // NOTE(review): the buffer size is taken from the request unchecked.
+    uint8_t buf[req->blocksize * req->blockcount];
+    size_t len = th->readahead(fh, buf, req->blocksize, req->blockcount);
+    rpc_send_response(pid, correlation_id, len, (char*) buf);
+}
 
 /**
  * Ein Prozess hat ein fwrite auf einem File-Handle dieses Treibers ausgefuehrt
diff --git a/src/modules/lib/lostio/include/lostio_internal.h b/src/modules/lib/lostio/include/lostio_internal.h
index 9739205..4d65bad 100644
--- a/src/modules/lib/lostio/include/lostio_internal.h
+++ b/src/modules/lib/lostio/include/lostio_internal.h
@@ -54,6 +54,9 @@ void    rpc_io_close(pid_t pid, uint32_t correlation_id, size_t data_size, void*
 
 ///RPC-Handler fuer ein read
 void    rpc_io_read(pid_t pid, uint32_t correlation_id, size_t data_size, void* data);
+
+///RPC handler for a readahead
+void    rpc_io_readahead(pid_t pid, uint32_t correlation_id, size_t data_size, void* data);
 
 ///RPC-Handler fuer ein write
 void    rpc_io_write(pid_t pid, uint32_t correlation_id, size_t data_size, void* data);
diff --git a/src/modules/lib/lostio/lostio.c b/src/modules/lib/lostio/lostio.c
index 505f0a1..3cbc3b1 100644
--- a/src/modules/lib/lostio/lostio.c
+++ b/src/modules/lib/lostio/lostio.c
@@ -65,6 +65,7 @@ void lostio_init()
     register_message_handler("IO_OPEN ", &rpc_io_open);
     register_message_handler("IO_CLOSE", &rpc_io_close);
     register_message_handler("IO_READ ", &rpc_io_read);
+    register_message_handler("IO_READA", &rpc_io_readahead);
     register_message_handler("IO_WRITE", &rpc_io_write);
     register_message_handler("IO_SEEK ", &rpc_io_seek);
     register_message_handler("IO_EOF  ", &rpc_io_eof);
-- 
1.7.7