aboutsummaryrefslogtreecommitdiff
path: root/sshfs.c
diff options
context:
space:
mode:
Diffstat (limited to 'sshfs.c')
-rw-r--r--sshfs.c229
1 files changed, 191 insertions, 38 deletions
diff --git a/sshfs.c b/sshfs.c
index d51f661..ae88cf8 100644
--- a/sshfs.c
+++ b/sshfs.c
@@ -88,6 +88,7 @@ static int infd;
static int outfd;
static int debug = 0;
static int sync_write = 0;
+static int sync_read = 0;
static char *base_path;
struct buffer {
@@ -116,19 +117,21 @@ struct request {
struct list_head list;
};
-struct openfile {
- unsigned int read_ctr;
- unsigned int write_ctr;
- int rw;
- struct buffer read_handle;
- struct buffer write_handle;
+struct read_chunk {
+ sem_t ready;
+ off_t offset;
+ size_t size;
+ struct buffer data;
+ int refs;
+ int res;
};
struct sshfs_file {
struct buffer handle;
- struct list_head write_req_list;
- pthread_cond_t write_req_finished;
+ struct list_head write_reqs;
+ pthread_cond_t write_finished;
int write_error;
+ struct read_chunk *readahead;
};
static GHashTable *reqtab;
@@ -182,14 +185,16 @@ static struct opt ssh_opts[] = {
enum {
SOPT_DIRECTPORT,
SOPT_SSHCMD,
- SOPT_SYNC,
+ SOPT_SYNC_WRITE,
+ SOPT_SYNC_READ,
SOPT_LAST /* Last entry in this list! */
};
static struct opt sshfs_opts[] = {
[SOPT_DIRECTPORT] = { .optname = "directport" },
[SOPT_SSHCMD] = { .optname = "ssh_command" },
- [SOPT_SYNC] = { .optname = "sshfs_sync" },
+ [SOPT_SYNC_WRITE] = { .optname = "sshfs_sync" },
+ [SOPT_SYNC_READ] = { .optname = "no_readahead" },
[SOPT_LAST] = { .optname = NULL }
};
@@ -692,6 +697,29 @@ static int sftp_read(uint8_t *type, struct buffer *buf)
return res;
}
+static void request_free(struct request *req)
+{
+ buf_free(&req->reply);
+ sem_destroy(&req->ready);
+ g_free(req);
+}
+
+static void chunk_free(struct read_chunk *chunk)
+{
+ buf_free(&chunk->data);
+ sem_destroy(&chunk->ready);
+ g_free(chunk);
+}
+
+static void chunk_put(struct read_chunk *chunk)
+{
+ if (chunk) {
+ chunk->refs--;
+ if (!chunk->refs)
+ chunk_free(chunk);
+ }
+}
+
static void *process_requests(void *_data)
{
(void) _data;
@@ -728,14 +756,12 @@ static void *process_requests(void *_data)
req->reply = buf;
req->reply_type = type;
req->replied = 1;
- if (req->want_reply) {
+ if (req->want_reply)
sem_post(&req->ready);
- } else {
+ else {
if (req->end_func)
req->end_func(req);
- buf_free(&req->reply);
- sem_destroy(&req->ready);
- free(req);
+ request_free(req);
}
} else
buf_free(&buf);
@@ -769,7 +795,7 @@ static int sftp_request_common(uint8_t type, const struct buffer *buf,
int err;
struct buffer buf2;
uint32_t id = sftp_get_id();
- struct request *req = (struct request *) malloc(sizeof(struct request));
+ struct request *req = g_new0(struct request, 1);
buf_init(&buf2, buf->len + 4);
buf_add_uint32(&buf2, id);
@@ -845,9 +871,7 @@ static int sftp_request_common(uint8_t type, const struct buffer *buf,
if (end_func)
end_func(req);
buf_free(&buf2);
- buf_free(&req->reply);
- sem_destroy(&req->ready);
- free(req);
+ request_free(req);
return err;
}
@@ -952,6 +976,7 @@ static int sshfs_mkdir(const char *path, mode_t mode)
buf_free(&buf);
return err;
}
+
static int sshfs_mknod(const char *path, mode_t mode, dev_t rdev)
{
int err;
@@ -1098,8 +1123,8 @@ static int sshfs_open(const char *path, struct fuse_file_info *fi)
return -EINVAL;
sf = g_new0(struct sshfs_file, 1);
- list_init(&sf->write_req_list);
- pthread_cond_init(&sf->write_req_finished, NULL);
+ list_init(&sf->write_reqs);
+ pthread_cond_init(&sf->write_finished, NULL);
buf_init(&buf, 0);
buf_add_path(&buf, path);
buf_add_uint32(&buf, pflags);
@@ -1118,7 +1143,7 @@ static int sshfs_flush(const char *path, struct fuse_file_info *fi)
{
int err;
struct sshfs_file *sf = (struct sshfs_file *) fi->fh;
- struct list_head write_req_list;
+ struct list_head write_reqs;
struct list_head *curr_list;
if (sync_write)
@@ -1126,13 +1151,13 @@ static int sshfs_flush(const char *path, struct fuse_file_info *fi)
(void) path;
pthread_mutex_lock(&lock);
- if (!list_empty(&sf->write_req_list)) {
- curr_list = sf->write_req_list.prev;
- list_del(&sf->write_req_list);
- list_init(&sf->write_req_list);
- list_add(&write_req_list, curr_list);
- while (!list_empty(&write_req_list))
- pthread_cond_wait(&sf->write_req_finished, &lock);
+ if (!list_empty(&sf->write_reqs)) {
+ curr_list = sf->write_reqs.prev;
+ list_del(&sf->write_reqs);
+ list_init(&sf->write_reqs);
+ list_add(&write_reqs, curr_list);
+ while (!list_empty(&write_reqs))
+ pthread_cond_wait(&sf->write_finished, &lock);
}
err = sf->write_error;
sf->write_error = 0;
@@ -1154,19 +1179,18 @@ static int sshfs_release(const char *path, struct fuse_file_info *fi)
sshfs_flush(path, fi);
sftp_request(SSH_FXP_CLOSE, handle, 0, NULL);
buf_free(handle);
+ chunk_put(sf->readahead);
g_free(sf);
return 0;
}
-static int sshfs_read(const char *path, char *rbuf, size_t size, off_t offset,
- struct fuse_file_info *fi)
+static int sshfs_sync_read(struct sshfs_file *sf, char *rbuf, size_t size,
+ off_t offset)
{
int err;
struct buffer buf;
struct buffer data;
- struct sshfs_file *sf = (struct sshfs_file *) fi->fh;
struct buffer *handle = &sf->handle;
- (void) path;
buf_init(&buf, 0);
buf_add_buf(&buf, handle);
buf_add_uint64(&buf, offset);
@@ -1190,11 +1214,137 @@ static int sshfs_read(const char *path, char *rbuf, size_t size, off_t offset,
return err;
}
+static void sshfs_read_end(struct request *req)
+{
+ struct read_chunk *chunk = (struct read_chunk *) req->data;
+ if (req->replied) {
+ chunk->res = -EPROTO;
+
+ if (req->reply_type == SSH_FXP_STATUS) {
+ uint32_t serr;
+ if (buf_get_uint32(&req->reply, &serr) != -1) {
+ if (serr == SSH_FX_EOF)
+ chunk->res = 0;
+ }
+ } else if (req->reply_type == SSH_FXP_DATA) {
+ uint32_t retsize;
+ if (buf_get_uint32(&req->reply, &retsize) != -1) {
+ if (retsize > chunk->size)
+ fprintf(stderr, "long read\n");
+ else {
+ chunk->res = retsize;
+ chunk->data = req->reply;
+ buf_init(&req->reply, 0);
+ }
+ }
+ } else
+ fprintf(stderr, "protocol error\n");
+ } else
+ chunk->res = -EIO;
+
+ sem_post(&chunk->ready);
+ pthread_mutex_lock(&lock);
+ chunk_put(chunk);
+ pthread_mutex_unlock(&lock);
+}
+static void sshfs_read_begin(struct request *req)
+{
+ struct read_chunk *chunk = (struct read_chunk *) req->data;
+ pthread_mutex_lock(&lock);
+ chunk->refs++;
+ pthread_mutex_unlock(&lock);
+}
+
+static int sshfs_async_read(struct sshfs_file *sf, struct read_chunk *chunk)
+{
+ int err;
+ struct buffer buf;
+ struct buffer *handle = &sf->handle;
+ buf_init(&buf, 0);
+ buf_add_buf(&buf, handle);
+ buf_add_uint64(&buf, chunk->offset);
+ buf_add_uint32(&buf, chunk->size);
+ err = sftp_request_async(SSH_FXP_READ, &buf, sshfs_read_begin,
+ sshfs_read_end, chunk);
+ buf_free(&buf);
+ return err;
+}
+
+static int submit_read(struct sshfs_file *sf, size_t size, off_t offset,
+ struct read_chunk **chunkp)
+{
+ int err;
+ struct read_chunk *chunk = g_new0(struct read_chunk, 1);
+
+ sem_init(&chunk->ready, 0, 0);
+ buf_init(&chunk->data, 0);
+ chunk->offset = offset;
+ chunk->size = size;
+ chunk->refs = 1;
+ err = sshfs_async_read(sf, chunk);
+ if (!err) {
+ pthread_mutex_lock(&lock);
+ chunk_put(*chunkp);
+ *chunkp = chunk;
+ pthread_mutex_unlock(&lock);
+ } else
+ chunk_put(chunk);
+
+ return err;
+}
+
+static int wait_chunk(struct read_chunk *chunk, char *buf)
+{
+ int res;
+ sem_wait(&chunk->ready);
+ res = chunk->res;
+ if (res > 0)
+ buf_get_mem(&chunk->data, buf, res);
+ chunk_put(chunk);
+ return res;
+}
+
+static struct read_chunk *search_read_chunk(struct sshfs_file *sf, size_t size,
+ off_t offset)
+{
+ struct read_chunk *ch = sf->readahead;
+ if (ch && ch->size == size && ch->offset == offset) {
+ sf->readahead = NULL;
+ return ch;
+ } else
+ return NULL;
+}
+
+static int sshfs_read(const char *path, char *rbuf, size_t size, off_t offset,
+ struct fuse_file_info *fi)
+{
+ int res = 0;
+ struct sshfs_file *sf = (struct sshfs_file *) fi->fh;
+ if (sync_read)
+ res = sshfs_sync_read(sf, rbuf, size, offset);
+ else {
+ struct read_chunk *chunk;
+ (void) path;
+
+ pthread_mutex_lock(&lock);
+ chunk = search_read_chunk(sf, size, offset);
+ pthread_mutex_unlock(&lock);
+ if (!chunk)
+ res = submit_read(sf, size, offset, &chunk);
+ if (!res)
+ submit_read(sf, size, offset + size, &sf->readahead);
+ if (!res)
+ res = wait_chunk(chunk, rbuf);
+ }
+
+ return res;
+}
+
static void sshfs_write_begin(struct request *req)
{
struct sshfs_file *sf = (struct sshfs_file *) req->data;
pthread_mutex_lock(&lock);
- list_add(&req->list, &sf->write_req_list);
+ list_add(&req->list, &sf->write_reqs);
pthread_mutex_unlock(&lock);
}
@@ -1211,7 +1361,7 @@ static void sshfs_write_end(struct request *req)
sf->write_error = -EIO;
}
list_del(&req->list);
- pthread_cond_broadcast(&sf->write_req_finished);
+ pthread_cond_broadcast(&sf->write_finished);
pthread_mutex_unlock(&lock);
}
@@ -1317,6 +1467,7 @@ static void usage(const char *progname)
" -p PORT equivalent to '-o port=PORT'\n"
" -C equivalent to '-o compression=yes'\n"
" -o sshfs_sync synchronous writes\n"
+" -o no_readahead synchronous reads (no speculative readahead)\n"
" -o cache=YESNO enable caching {yes,no} (default: yes)\n"
" -o cache_timeout=N sets timeout for caches in seconds (default: 20)\n"
" -o cache_X_timeout=N sets timeout for {stat,dir,link} cache\n"
@@ -1369,11 +1520,11 @@ int main(int argc, char *argv[])
}
break;
}
- newargv[newargc++] = strdup(arg);
+ newargv[newargc++] = g_strdup(arg);
} else if (!host && strchr(arg, ':'))
host = g_strdup(arg);
else
- newargv[newargc++] = strdup(arg);
+ newargv[newargc++] = g_strdup(arg);
}
if (!host) {
fprintf(stderr, "missing host\n");
@@ -1390,8 +1541,10 @@ int main(int argc, char *argv[])
base_path = g_strdup(base_path);
process_options(&newargc, newargv, sshfs_opts, 1);
- if (sshfs_opts[SOPT_SYNC].present)
+ if (sshfs_opts[SOPT_SYNC_WRITE].present)
sync_write = 1;
+ if (sshfs_opts[SOPT_SYNC_READ].present)
+ sync_read = 1;
if (sshfs_opts[SOPT_DIRECTPORT].present)
res = connect_to(host, sshfs_opts[SOPT_DIRECTPORT].value);
else {