From 80748b39e4e9eb948e9b511222fcaee81473e931 Mon Sep 17 00:00:00 2001 From: Miklos Szeredi Date: Wed, 16 Feb 2005 14:00:59 +0000 Subject: fix --- ChangeLog | 5 ++ opts.c | 9 +-- sshfs.c | 229 +++++++++++++++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 201 insertions(+), 42 deletions(-) diff --git a/ChangeLog b/ChangeLog index 3fc39f5..59bb2e5 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,8 @@ +2005-02-16 Miklos Szeredi + + * Added simple readahead (big performance gain in case of + sequential read pattern). Can be disabled with '-o no_readahead' + 2005-02-14 Miklos Szeredi * Added asynchronous writeback (big performance gain) and made diff --git a/opts.c b/opts.c index ab39f36..4027d18 100644 --- a/opts.c +++ b/opts.c @@ -9,6 +9,7 @@ #include "opts.h" #include #include +#include static int process_option(char *arg, struct opt opts[], int case_sensitive) { @@ -32,8 +33,8 @@ static int process_option(char *arg, struct opt opts[], int case_sensitive) opts[i].present = 1; if (eq) { if (opts[i].value) - free(opts[i].value); - opts[i].value = strdup(eq+1); + g_free(opts[i].value); + opts[i].value = g_strdup(eq+1); } return 1; } @@ -84,7 +85,7 @@ void process_options(int *argcp, char *argv[], struct opt opts[], arg = argv[argctr]; removed = process_option_group(arg, opts, case_sensitive); if (removed) - free(argv[argctr-1]); + g_free(argv[argctr-1]); else if (argctr != newargctr) argv[newargctr++] = argv[argctr-1]; @@ -92,7 +93,7 @@ void process_options(int *argcp, char *argv[], struct opt opts[], } } if (removed) - free(arg); + g_free(arg); else { if(argctr != newargctr) argv[newargctr] = arg; diff --git a/sshfs.c b/sshfs.c index d51f661..ae88cf8 100644 --- a/sshfs.c +++ b/sshfs.c @@ -88,6 +88,7 @@ static int infd; static int outfd; static int debug = 0; static int sync_write = 0; +static int sync_read = 0; static char *base_path; struct buffer { @@ -116,19 +117,21 @@ struct request { struct list_head list; }; -struct openfile { - unsigned int read_ctr; - unsigned int write_ctr; - int rw; - struct buffer read_handle; - struct buffer write_handle; +struct read_chunk { + sem_t ready; + off_t offset; + size_t size; + struct buffer data; + int refs; + int res; }; struct sshfs_file { struct buffer handle; - struct list_head write_req_list; - pthread_cond_t write_req_finished; + struct list_head write_reqs; + pthread_cond_t write_finished; int write_error; + struct read_chunk *readahead; }; static GHashTable *reqtab; @@ -182,14 +185,16 @@ static struct opt ssh_opts[] = { enum { SOPT_DIRECTPORT, SOPT_SSHCMD, - SOPT_SYNC, + SOPT_SYNC_WRITE, + SOPT_SYNC_READ, SOPT_LAST /* Last entry in this list! */ }; static struct opt sshfs_opts[] = { [SOPT_DIRECTPORT] = { .optname = "directport" }, [SOPT_SSHCMD] = { .optname = "ssh_command" }, - [SOPT_SYNC] = { .optname = "sshfs_sync" }, + [SOPT_SYNC_WRITE] = { .optname = "sshfs_sync" }, + [SOPT_SYNC_READ] = { .optname = "no_readahead" }, [SOPT_LAST] = { .optname = NULL } }; @@ -692,6 +697,29 @@ static int sftp_read(uint8_t *type, struct buffer *buf) return res; } +static void request_free(struct request *req) +{ + buf_free(&req->reply); + sem_destroy(&req->ready); + g_free(req); +} + +static void chunk_free(struct read_chunk *chunk) +{ + buf_free(&chunk->data); + sem_destroy(&chunk->ready); + g_free(chunk); +} + +static void chunk_put(struct read_chunk *chunk) +{ + if (chunk) { + chunk->refs--; + if (!chunk->refs) + chunk_free(chunk); + } +} + static void *process_requests(void *_data) { (void) _data; @@ -728,14 +756,12 @@ static void *process_requests(void *_data) req->reply = buf; req->reply_type = type; req->replied = 1; - if (req->want_reply) { + if (req->want_reply) sem_post(&req->ready); - } else { + else { if (req->end_func) req->end_func(req); - buf_free(&req->reply); - sem_destroy(&req->ready); - free(req); + request_free(req); } } else buf_free(&buf); @@ -769,7 +795,7 @@ static int sftp_request_common(uint8_t type, const struct buffer *buf, int err; struct buffer buf2; uint32_t id = sftp_get_id(); - struct request *req = (struct request *) malloc(sizeof(struct request)); + struct request *req = g_new0(struct request, 1); buf_init(&buf2, buf->len + 4); buf_add_uint32(&buf2, id); @@ -845,9 +871,7 @@ static int sftp_request_common(uint8_t type, const struct buffer *buf, if (end_func) end_func(req); buf_free(&buf2); - buf_free(&req->reply); - sem_destroy(&req->ready); - free(req); + request_free(req); return err; } @@ -952,6 +976,7 @@ static int sshfs_mkdir(const char *path, mode_t mode) buf_free(&buf); return err; } + static int sshfs_mknod(const char *path, mode_t mode, dev_t rdev) { int err; @@ -1098,8 +1123,8 @@ static int sshfs_open(const char *path, struct fuse_file_info *fi) return -EINVAL; sf = g_new0(struct sshfs_file, 1); - list_init(&sf->write_req_list); - pthread_cond_init(&sf->write_req_finished, NULL); + list_init(&sf->write_reqs); + pthread_cond_init(&sf->write_finished, NULL); buf_init(&buf, 0); buf_add_path(&buf, path); buf_add_uint32(&buf, pflags); @@ -1118,7 +1143,7 @@ static int sshfs_flush(const char *path, struct fuse_file_info *fi) { int err; struct sshfs_file *sf = (struct sshfs_file *) fi->fh; - struct list_head write_req_list; + struct list_head write_reqs; struct list_head *curr_list; if (sync_write) @@ -1126,13 +1151,13 @@ static int sshfs_flush(const char *path, struct fuse_file_info *fi) (void) path; pthread_mutex_lock(&lock); - if (!list_empty(&sf->write_req_list)) { - curr_list = sf->write_req_list.prev; - list_del(&sf->write_req_list); - list_init(&sf->write_req_list); - list_add(&write_req_list, curr_list); - while (!list_empty(&write_req_list)) - pthread_cond_wait(&sf->write_req_finished, &lock); + if (!list_empty(&sf->write_reqs)) { + curr_list = sf->write_reqs.prev; + list_del(&sf->write_reqs); + list_init(&sf->write_reqs); + list_add(&write_reqs, curr_list); + while (!list_empty(&write_reqs)) + pthread_cond_wait(&sf->write_finished, &lock); } err = sf->write_error; sf->write_error = 0; @@ -1154,19 +1179,18 @@ static int sshfs_release(const char *path, struct fuse_file_info *fi) sshfs_flush(path, fi); sftp_request(SSH_FXP_CLOSE, handle, 0, NULL); buf_free(handle); + chunk_put(sf->readahead); g_free(sf); return 0; } -static int sshfs_read(const char *path, char *rbuf, size_t size, off_t offset, - struct fuse_file_info *fi) +static int sshfs_sync_read(struct sshfs_file *sf, char *rbuf, size_t size, + off_t offset) { int err; struct buffer buf; struct buffer data; - struct sshfs_file *sf = (struct sshfs_file *) fi->fh; struct buffer *handle = &sf->handle; - (void) path; buf_init(&buf, 0); buf_add_buf(&buf, handle); buf_add_uint64(&buf, offset); @@ -1190,11 +1214,137 @@ static int sshfs_read(const char *path, char *rbuf, size_t size, off_t offset, return err; } +static void sshfs_read_end(struct request *req) +{ + struct read_chunk *chunk = (struct read_chunk *) req->data; + if (req->replied) { + chunk->res = -EPROTO; + + if (req->reply_type == SSH_FXP_STATUS) { + uint32_t serr; + if (buf_get_uint32(&req->reply, &serr) != -1) { + if (serr == SSH_FX_EOF) + chunk->res = 0; + } + } else if (req->reply_type == SSH_FXP_DATA) { + uint32_t retsize; + if (buf_get_uint32(&req->reply, &retsize) != -1) { + if (retsize > chunk->size) + fprintf(stderr, "long read\n"); + else { + chunk->res = retsize; + chunk->data = req->reply; + buf_init(&req->reply, 0); + } + } + } else + fprintf(stderr, "protocol error\n"); + } else + chunk->res = -EIO; + + sem_post(&chunk->ready); + pthread_mutex_lock(&lock); + chunk_put(chunk); + pthread_mutex_unlock(&lock); +} +static void sshfs_read_begin(struct request *req) +{ + struct read_chunk *chunk = (struct read_chunk *) req->data; + pthread_mutex_lock(&lock); + chunk->refs++; + pthread_mutex_unlock(&lock); +} + +static int sshfs_async_read(struct sshfs_file *sf, struct read_chunk *chunk) +{ + int err; + struct buffer buf; + struct buffer *handle = &sf->handle; + buf_init(&buf, 0); + buf_add_buf(&buf, handle); + buf_add_uint64(&buf, chunk->offset); + buf_add_uint32(&buf, chunk->size); + err = sftp_request_async(SSH_FXP_READ, &buf, sshfs_read_begin, + sshfs_read_end, chunk); + buf_free(&buf); + return err; +} + +static int submit_read(struct sshfs_file *sf, size_t size, off_t offset, + struct read_chunk **chunkp) +{ + int err; + struct read_chunk *chunk = g_new0(struct read_chunk, 1); + + sem_init(&chunk->ready, 0, 0); + buf_init(&chunk->data, 0); + chunk->offset = offset; + chunk->size = size; + chunk->refs = 1; + err = sshfs_async_read(sf, chunk); + if (!err) { + pthread_mutex_lock(&lock); + chunk_put(*chunkp); + *chunkp = chunk; + pthread_mutex_unlock(&lock); + } else + chunk_put(chunk); + + return err; +} + +static int wait_chunk(struct read_chunk *chunk, char *buf) +{ + int res; + sem_wait(&chunk->ready); + res = chunk->res; + if (res > 0) + buf_get_mem(&chunk->data, buf, res); + chunk_put(chunk); + return res; +} + +static struct read_chunk *search_read_chunk(struct sshfs_file *sf, size_t size, + off_t offset) +{ + struct read_chunk *ch = sf->readahead; + if (ch && ch->size == size && ch->offset == offset) { + sf->readahead = NULL; + return ch; + } else + return NULL; +} + +static int sshfs_read(const char *path, char *rbuf, size_t size, off_t offset, + struct fuse_file_info *fi) +{ + int res = 0; + struct sshfs_file *sf = (struct sshfs_file *) fi->fh; + if (sync_read) + res = sshfs_sync_read(sf, rbuf, size, offset); + else { + struct read_chunk *chunk; + (void) path; + + pthread_mutex_lock(&lock); + chunk = search_read_chunk(sf, size, offset); + pthread_mutex_unlock(&lock); + if (!chunk) + res = submit_read(sf, size, offset, &chunk); + if (!res) + submit_read(sf, size, offset + size, &sf->readahead); + if (!res) + res = wait_chunk(chunk, rbuf); + } + + return res; +} + static void sshfs_write_begin(struct request *req) { struct sshfs_file *sf = (struct sshfs_file *) req->data; pthread_mutex_lock(&lock); - list_add(&req->list, &sf->write_req_list); + list_add(&req->list, &sf->write_reqs); pthread_mutex_unlock(&lock); } @@ -1211,7 +1361,7 @@ static void sshfs_write_end(struct request *req) sf->write_error = -EIO; } list_del(&req->list); - pthread_cond_broadcast(&sf->write_req_finished); + pthread_cond_broadcast(&sf->write_finished); pthread_mutex_unlock(&lock); } @@ -1317,6 +1467,7 @@ static void usage(const char *progname) " -p PORT equivalent to '-o port=PORT'\n" " -C equivalent to '-o compression=yes'\n" " -o sshfs_sync synchronous writes\n" +" -o no_readahead synchronous reads (no speculative readahead)\n" " -o cache=YESNO enable caching {yes,no} (default: yes)\n" " -o cache_timeout=N sets timeout for caches in seconds (default: 20)\n" " -o cache_X_timeout=N sets timeout for {stat,dir,link} cache\n" @@ -1369,11 +1520,11 @@ int main(int argc, char *argv[]) } break; } - newargv[newargc++] = strdup(arg); + newargv[newargc++] = g_strdup(arg); } else if (!host && strchr(arg, ':')) host = g_strdup(arg); else - newargv[newargc++] = strdup(arg); + newargv[newargc++] = g_strdup(arg); } if (!host) { fprintf(stderr, "missing host\n"); @@ -1390,8 +1541,10 @@ int main(int argc, char *argv[]) base_path = g_strdup(base_path); process_options(&newargc, newargv, sshfs_opts, 1); - if (sshfs_opts[SOPT_SYNC].present) + if (sshfs_opts[SOPT_SYNC_WRITE].present) sync_write = 1; + if (sshfs_opts[SOPT_SYNC_READ].present) + sync_read = 1; if (sshfs_opts[SOPT_DIRECTPORT].present) res = connect_to(host, sshfs_opts[SOPT_DIRECTPORT].value); else { -- cgit v1.2.3