aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMiklos Szeredi <mszeredi@suse.cz>2011-11-16 14:54:37 +0100
committerMiklos Szeredi <mszeredi@suse.cz>2011-11-16 14:54:37 +0100
commit97228b866ab9a37a55244463a6b9fc3fdc6d0419 (patch)
tree3c6cff730acdf78adf835ba1ad5986e5e3410f8c
parentf3286cdf8682bff65c73663f55dc67bd316d2de8 (diff)
downloadsshfs-97228b866ab9a37a55244463a6b9fc3fdc6d0419.tar
sshfs-97228b866ab9a37a55244463a6b9fc3fdc6d0419.tar.gz
sshfs-97228b866ab9a37a55244463a6b9fc3fdc6d0419.tar.bz2
sshfs-97228b866ab9a37a55244463a6b9fc3fdc6d0419.zip
Submit max 32k reads and writes to the sftp server
Also don't limit the kernel to 64k reads and writes, rather split into 32k sized chunks and send them to the server all at once. This is more efficient and less demanding from the server. Reported by Ludovic Courtès. Fix suggested by Niels Möller
-rw-r--r--ChangeLog8
-rw-r--r--sshfs.c375
2 files changed, 269 insertions, 114 deletions
diff --git a/ChangeLog b/ChangeLog
index 4c6ba2d..8f11917 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,11 @@
+2011-11-16 Miklos Szeredi <miklos@szeredi.hu>
+
+ * Submit max 32k reads and writes to the sftp server. Also don't
+ limit the kernel to 64k reads and writes, rather split into 32k
+ sized chunks and send them to the server all at once. This is
+ more efficient and less demanding from the server. Reported by
+ Ludovic Courtès. Fix suggested by Niels Möller
+
2011-11-14 Miklos Szeredi <miklos@szeredi.hu>
* Fix double free if reconnection races with request sending.
diff --git a/sshfs.c b/sshfs.c
index 632bb1c..e14aa71 100644
--- a/sshfs.c
+++ b/sshfs.c
@@ -146,14 +146,27 @@ struct request {
struct list_head list;
};
+struct sshfs_io {
+ int num_reqs;
+ pthread_cond_t finished;
+ int error;
+};
+
+struct read_req {
+ struct sshfs_io *sio;
+ struct list_head list;
+ struct buffer data;
+ size_t size;
+ ssize_t res;
+};
+
struct read_chunk {
- sem_t ready;
off_t offset;
size_t size;
- struct buffer data;
int refs;
- int res;
long modifver;
+ struct list_head reqs;
+ struct sshfs_io sio;
};
struct sshfs_file {
@@ -1191,8 +1204,14 @@ static void request_free(struct request *req)
static void chunk_free(struct read_chunk *chunk)
{
- buf_free(&chunk->data);
- sem_destroy(&chunk->ready);
+ while (!list_empty(&chunk->reqs)) {
+ struct read_req *rreq;
+
+ rreq = list_entry(chunk->reqs.prev, struct read_req, list);
+ list_del(&rreq->list);
+ buf_free(&rreq->data);
+ g_free(rreq);
+ }
g_free(chunk);
}
@@ -2348,131 +2367,178 @@ static int sshfs_release(const char *path, struct fuse_file_info *fi)
return 0;
}
-static int sshfs_sync_read(struct sshfs_file *sf, char *rbuf, size_t size,
- off_t offset)
-{
- int err;
- struct buffer buf;
- struct buffer data;
- struct buffer *handle = &sf->handle;
- buf_init(&buf, 0);
- buf_add_buf(&buf, handle);
- buf_add_uint64(&buf, offset);
- buf_add_uint32(&buf, size);
- err = sftp_request(SSH_FXP_READ, &buf, SSH_FXP_DATA, &data);
- if (!err) {
- uint32_t retsize;
- err = -EIO;
- if (buf_get_uint32(&data, &retsize) != -1) {
- if (retsize > size)
- fprintf(stderr, "long read\n");
- else {
- buf_get_mem(&data, rbuf, retsize);
- err = retsize;
- }
- }
- buf_free(&data);
- } else if (err == MY_EOF)
- err = 0;
- buf_free(&buf);
- return err;
-}
-
static void sshfs_read_end(struct request *req)
{
- struct read_chunk *chunk = (struct read_chunk *) req->data;
+ struct read_req *rreq = (struct read_req *) req->data;
if (req->error)
- chunk->res = req->error;
+ rreq->res = req->error;
else if (req->replied) {
- chunk->res = -EIO;
+ rreq->res = -EIO;
if (req->reply_type == SSH_FXP_STATUS) {
uint32_t serr;
if (buf_get_uint32(&req->reply, &serr) != -1) {
if (serr == SSH_FX_EOF)
- chunk->res = 0;
+ rreq->res = 0;
+ } else {
+ rreq->res = -sftp_error_to_errno(serr);
}
} else if (req->reply_type == SSH_FXP_DATA) {
uint32_t retsize;
if (buf_get_uint32(&req->reply, &retsize) != -1) {
- if (retsize > chunk->size)
+ if (retsize > rreq->size) {
fprintf(stderr, "long read\n");
- else {
- chunk->res = retsize;
- chunk->data = req->reply;
+ } else if (buf_check_get(&req->reply, retsize) != -1) {
+ rreq->res = retsize;
+ rreq->data = req->reply;
buf_init(&req->reply, 0);
}
}
- } else
+ } else {
fprintf(stderr, "protocol error\n");
- } else
- chunk->res = -EIO;
+ }
+ } else {
+ rreq->res = -EIO;
+ }
- sem_post(&chunk->ready);
- chunk_put(chunk);
+ rreq->sio->num_reqs--;
+ if (!rreq->sio->num_reqs)
+ pthread_cond_broadcast(&rreq->sio->finished);
}
static void sshfs_read_begin(struct request *req)
{
- struct read_chunk *chunk = (struct read_chunk *) req->data;
- chunk->refs++;
+ struct read_req *rreq = (struct read_req *) req->data;
+ rreq->sio->num_reqs++;
}
-static void sshfs_send_async_read(struct sshfs_file *sf,
- struct read_chunk *chunk)
+static struct read_chunk *sshfs_send_read(struct sshfs_file *sf, size_t size,
+ off_t offset)
{
- struct buffer buf;
+ struct read_chunk *chunk = g_new0(struct read_chunk, 1);
struct buffer *handle = &sf->handle;
- struct iovec iov;
- buf_init(&buf, 0);
- buf_add_buf(&buf, handle);
- buf_add_uint64(&buf, chunk->offset);
- buf_add_uint32(&buf, chunk->size);
- buf_to_iov(&buf, &iov);
- sftp_request_send(SSH_FXP_READ, &iov, 1, sshfs_read_begin,
- sshfs_read_end, 0, chunk, NULL);
- buf_free(&buf);
+ pthread_cond_init(&chunk->sio.finished, NULL);
+ list_init(&chunk->reqs);
+ chunk->size = size;
+ chunk->offset = offset;
+ chunk->refs = 1;
+
+ while (size) {
+ int err;
+ struct buffer buf;
+ struct iovec iov[1];
+ struct read_req *rreq;
+ size_t bsize = size < sshfs.max_read ? size : sshfs.max_read;
+
+ rreq = g_new0(struct read_req, 1);
+ rreq->sio = &chunk->sio;
+ rreq->size = bsize;
+ buf_init(&rreq->data, 0);
+ list_add(&rreq->list, &chunk->reqs);
+
+ buf_init(&buf, 0);
+ buf_add_buf(&buf, handle);
+ buf_add_uint64(&buf, offset);
+ buf_add_uint32(&buf, bsize);
+ buf_to_iov(&buf, &iov[0]);
+ err = sftp_request_send(SSH_FXP_READ, iov, 1,
+ sshfs_read_begin,
+ sshfs_read_end,
+ 0, rreq, NULL);
+
+ buf_free(&buf);
+ if (err)
+ break;
+
+ size -= bsize;
+ offset += bsize;
+ }
+
+ return chunk;
}
-static void submit_read(struct sshfs_file *sf, size_t size, off_t offset,
- struct read_chunk **chunkp)
+static int wait_chunk(struct read_chunk *chunk, char *buf, size_t size)
{
- struct read_chunk *chunk = g_new0(struct read_chunk, 1);
+ int res = 0;
+ struct read_req *rreq;
- sem_init(&chunk->ready, 0, 0);
- buf_init(&chunk->data, 0);
- chunk->offset = offset;
- chunk->size = size;
- chunk->refs = 1;
- pthread_mutex_lock(&sshfs.lock);
- chunk->modifver = sshfs.modifver;
- pthread_mutex_unlock(&sshfs.lock);
- sshfs_send_async_read(sf, chunk);
pthread_mutex_lock(&sshfs.lock);
- chunk_put(*chunkp);
- *chunkp = chunk;
+ while (chunk->sio.num_reqs)
+ pthread_cond_wait(&chunk->sio.finished, &sshfs.lock);
pthread_mutex_unlock(&sshfs.lock);
-}
-static int wait_chunk(struct read_chunk *chunk, char *buf, size_t size)
-{
- int res;
- while (sem_wait(&chunk->ready));
- res = chunk->res;
+
+ if (chunk->sio.error) {
+ if (chunk->sio.error != MY_EOF)
+ res = chunk->sio.error;
+
+ goto out;
+ }
+
+ while (!list_empty(&chunk->reqs) && size) {
+ rreq = list_entry(chunk->reqs.prev, struct read_req, list);
+
+ if (rreq->res < 0) {
+ chunk->sio.error = rreq->res;
+ break;
+ } if (rreq->res == 0) {
+ chunk->sio.error = MY_EOF;
+ break;
+ } else if (size < (size_t) rreq->res) {
+ buf_get_mem(&rreq->data, buf, size);
+ rreq->res -= size;
+ rreq->size -= size;
+ res += size;
+ break;
+ } else {
+ buf_get_mem(&rreq->data, buf, rreq->res);
+ res += rreq->res;
+ if ((size_t) rreq->res < rreq->size) {
+ chunk->sio.error = MY_EOF;
+ break;
+ }
+ buf += rreq->res;
+ size -= rreq->res;
+ list_del(&rreq->list);
+ buf_free(&rreq->data);
+ g_free(rreq);
+ }
+ }
+
if (res > 0) {
- if ((size_t) res > size)
- res = size;
- buf_get_mem(&chunk->data, buf, res);
chunk->offset += res;
chunk->size -= res;
- chunk->res -= res;
}
- sem_post(&chunk->ready);
+
+out:
chunk_put_locked(chunk);
return res;
}
+static int sshfs_sync_read(struct sshfs_file *sf, char *buf, size_t size,
+ off_t offset)
+{
+ struct read_chunk *chunk;
+
+ chunk = sshfs_send_read(sf, size, offset);
+ return wait_chunk(chunk, buf, size);
+}
+
+static void submit_read(struct sshfs_file *sf, size_t size, off_t offset,
+ struct read_chunk **chunkp)
+{
+ struct read_chunk *chunk;
+
+ chunk = sshfs_send_read(sf, size, offset);
+ pthread_mutex_lock(&sshfs.lock);
+ chunk->modifver = sshfs.modifver;
+ chunk_put(*chunkp);
+ *chunkp = chunk;
+ chunk->refs++;
+ pthread_mutex_unlock(&sshfs.lock);
+}
+
static struct read_chunk *search_read_chunk(struct sshfs_file *sf, off_t offset)
{
struct read_chunk *ch = sf->readahead;
@@ -2576,14 +2642,110 @@ static void sshfs_write_end(struct request *req)
sshfs_file_put(sf);
}
+static int sshfs_async_write(struct sshfs_file *sf, const char *wbuf,
+ size_t size, off_t offset)
+{
+ int err = 0;
+ struct buffer *handle = &sf->handle;
+
+ while (!err && size) {
+ struct buffer buf;
+ struct iovec iov[2];
+ size_t bsize = size < sshfs.max_write ? size : sshfs.max_write;
+
+ buf_init(&buf, 0);
+ buf_add_buf(&buf, handle);
+ buf_add_uint64(&buf, offset);
+ buf_add_uint32(&buf, bsize);
+ buf_to_iov(&buf, &iov[0]);
+ iov[1].iov_base = (void *) wbuf;
+ iov[1].iov_len = bsize;
+ err = sftp_request_send(SSH_FXP_WRITE, iov, 2,
+ sshfs_write_begin, sshfs_write_end,
+ 0, sf, NULL);
+ buf_free(&buf);
+ size -= bsize;
+ wbuf += bsize;
+ offset += bsize;
+ }
+
+ return err;
+}
+
+static void sshfs_sync_write_begin(struct request *req)
+{
+ struct sshfs_io *sio = (struct sshfs_io *) req->data;
+ sio->num_reqs++;
+}
+
+static void sshfs_sync_write_end(struct request *req)
+{
+ uint32_t serr;
+ struct sshfs_io *sio = (struct sshfs_io *) req->data;
+
+ if (req->error) {
+ sio->error = req->error;
+ } else if (req->replied) {
+ if (req->reply_type != SSH_FXP_STATUS) {
+ fprintf(stderr, "protocol error\n");
+ } else if (buf_get_uint32(&req->reply, &serr) != -1 &&
+ serr != SSH_FX_OK) {
+ sio->error = -EIO;
+ }
+ }
+ sio->num_reqs--;
+ if (!sio->num_reqs)
+ pthread_cond_broadcast(&sio->finished);
+}
+
+
+static int sshfs_sync_write(struct sshfs_file *sf, const char *wbuf,
+ size_t size, off_t offset)
+{
+ int err = 0;
+ struct buffer *handle = &sf->handle;
+ struct sshfs_io sio = { .error = 0, .num_reqs = 0 };
+
+ pthread_cond_init(&sio.finished, NULL);
+
+ while (!err && size) {
+ struct buffer buf;
+ struct iovec iov[2];
+ size_t bsize = size < sshfs.max_write ? size : sshfs.max_write;
+
+ buf_init(&buf, 0);
+ buf_add_buf(&buf, handle);
+ buf_add_uint64(&buf, offset);
+ buf_add_uint32(&buf, bsize);
+ buf_to_iov(&buf, &iov[0]);
+ iov[1].iov_base = (void *) wbuf;
+ iov[1].iov_len = bsize;
+ err = sftp_request_send(SSH_FXP_WRITE, iov, 2,
+ sshfs_sync_write_begin,
+ sshfs_sync_write_end,
+ 0, &sio, NULL);
+ buf_free(&buf);
+ size -= bsize;
+ wbuf += bsize;
+ offset += bsize;
+ }
+
+ pthread_mutex_lock(&sshfs.lock);
+ while (sio.num_reqs)
+ pthread_cond_wait(&sio.finished, &sshfs.lock);
+ pthread_mutex_unlock(&sshfs.lock);
+
+ if (!err)
+ err = sio.error;
+
+ return err;
+}
+
static int sshfs_write(const char *path, const char *wbuf, size_t size,
off_t offset, struct fuse_file_info *fi)
{
int err;
- struct buffer buf;
struct sshfs_file *sf = get_sshfs_file(fi);
- struct buffer *handle = &sf->handle;
- struct iovec iov[2];
(void) path;
@@ -2591,22 +2753,12 @@ static int sshfs_write(const char *path, const char *wbuf, size_t size,
return -EIO;
sshfs_inc_modifver();
- buf_init(&buf, 0);
- buf_add_buf(&buf, handle);
- buf_add_uint64(&buf, offset);
- buf_add_uint32(&buf, size);
- buf_to_iov(&buf, &iov[0]);
- iov[1].iov_base = (void *) wbuf;
- iov[1].iov_len = size;
- if (!sshfs.sync_write && !sf->write_error) {
- err = sftp_request_send(SSH_FXP_WRITE, iov, 2,
- sshfs_write_begin, sshfs_write_end,
- 0, sf, NULL);
- } else {
- err = sftp_request_iov(SSH_FXP_WRITE, iov, 2, SSH_FXP_STATUS,
- NULL);
- }
- buf_free(&buf);
+
+ if (!sshfs.sync_write && !sf->write_error)
+ err = sshfs_async_write(sf, wbuf, size, offset);
+ else
+ err = sshfs_sync_write(sf, wbuf, size, offset);
+
return err ? err : (int) size;
}
@@ -3272,8 +3424,9 @@ int main(int argc, char *argv[])
g_thread_init(NULL);
sshfs.blksize = 4096;
- sshfs.max_read = 65536;
- sshfs.max_write = 65536;
+ /* SFTP spec says all servers should allow at least 32k I/O */
+ sshfs.max_read = 32768;
+ sshfs.max_write = 32768;
sshfs.nodelay_workaround = 1;
sshfs.nodelaysrv_workaround = 0;
sshfs.rename_workaround = 0;
@@ -3353,12 +3506,6 @@ int main(int argc, char *argv[])
if (fuse_is_lib_option("ac_attr_timeout="))
fuse_opt_insert_arg(&args, 1, "-oauto_cache,ac_attr_timeout=0");
- tmp = g_strdup_printf("-omax_read=%u", sshfs.max_read);
- fuse_opt_insert_arg(&args, 1, tmp);
- g_free(tmp);
- tmp = g_strdup_printf("-omax_write=%u", sshfs.max_write);
- fuse_opt_insert_arg(&args, 1, tmp);
- g_free(tmp);
#if FUSE_VERSION >= 27
libver = fuse_version();
assert(libver >= 27);