This application demonstrates how to write to files by using the APIs
hdfsWrite() and hdfsPwrite(): hdfs_write_revised.c
Before running this application:
To build and run it, download it from this page to a MapR client or to a system with the
mapr-core package installed. Then, modify the run.sh
script in Building and Running C Applications on filesystem Clients to point to this sample
application. Run the script and then run the application.
The application includes these header files:
stdio.h
hdfs.h
errno.h
fcntl.h
The APIs are defined in hdfs.h. The file fcntl.h defines
the file-access flags.
The application performs the actions that are described in the following sections.
When you launch the application, provide the path and name of the file, the size of the file, and the number of bytes to write.
hdfs_write <filename> <filesize> <buffersize>
hdfsSetRpcTimeout() is specific to the libMapRClient
version of libhdfs and takes a value that is specified in seconds. The
default is 99 seconds. If you change this value, set it either to 0 (which eliminates
timeouts) or to a value greater than 30.
int err = hdfsSetRpcTimeout(30);
if (err) {
fprintf(stderr, "Failed to set rpc timeout!\n");
exit(-1);
}
The application tries to connect to the first filesystem cluster that is specified in the
mapr-clusters.conf file in the MAPR_HOME/conf directory
on the client. After connecting to the filesystem, the application returns a handle to the
filesystem.
hdfsFS fs = hdfsConnect("default", 0);
if (!fs) {
fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
exit(-1);
}The application stores the values of the arguments in a character array and in two
variables of type tSize. This datatype is defined in
hdfs.h and is a fixed-width, signed 32-byte integer type for storing the
size of data for read or write operations.
const char* rfile = argv[1];
tSize fileSize = strtoul(argv[2], NULL, 10);
tSize bufferSize = strtoul(argv[3], NULL, 10);The application opens the specified file, passing the following values to the
hdfsOpenFile() function:
O_WRONLY. This flag creates the file if the file does not exist and
truncates the file if the file does exist. If the file existed and you wanted to preserve
the content of the file, you would specify O_WRONLY | O_APPEND for flag.
These flags are defined in the header file fcntl.h.
Although there are two other parameters in the hdfsOpenFile() function –
the fourth and fifth, the libMapRClient version of libhdfs
ignores them.
hdfsFile writeFile = hdfsOpenFile(fs, rfile, O_WRONLY, 0, 0, 0);
if (!writeFile) {
fprintf(stderr, "Failed to open %s for writing!\n", rfile);
exit(-2);
}At this point that the application, creates a string to populate the buffer. This is the data that the application will write.
char* buffer = malloc(sizeof(char) * bufferSize);
if(buffer == NULL) {
fprintf(stderr, "Failed to allocate memory!\n");
return -2;
}
int i;
for (i=0; i<bufferSize; i++) {
buffer[i] = 'a' + i%26;
}The application calls the function writeLength():
int ret = writeLength(fs, writeFile, buffer, bufferSize, fileSize);
if (ret < 0) {
goto done;
}
This function writes the content of the buffer to the file, starting at offset 0.
int
writeLength(hdfsFS fs, hdfsFile writeFile, char *buffer, tSize bufferSize, tSize writeSize)
{
tSize writeBytes = 0;
tSize ret = 0;
uint64_t totalWrite = 0;
if (fs == NULL || writeFile == NULL || buffer == NULL) {
return -1;
}
if (writeSize == 0) {
return 0;
}
for (writeBytes=0; writeBytes<writeSize; writeBytes+=bufferSize) {
ret = hdfsWrite(fs, writeFile, (void*)buffer, bufferSize);
if (ret > 0) {
totalWrite += ret;
} else {
fprintf(stderr, "hdfsWrite failed with error %d \n", errno);
hdfsCloseFile(fs, writeFile);
return -1;
}
}
return 0;
}
The application next calls the function writeAtOffse():
tSize writeBytes = writeAtOffset(fs, writeFile, 0, buffer, bufferSize);
if (writeBytes < 0) {
goto done;
}
This function writes the content of the buffer to the file, starting at the specified offset. If the file already exists, the file is first truncated to this offset before the write operation begins. In this case, the specified offset is 0.
The difference between this function and the previous function is that, before writing, it
calls hdfsSeek() to move to the specified offset in the file.
tSize
writeAtOffset(hdfsFS fs, hdfsFile writeFile, tOffset offset,
char *buffer, tSize bufferSize)
{
tSize ret = 0;
if (fs == NULL || writeFile == NULL || buffer == NULL) {
return -1;
}
ret = hdfsSeek(fs, writeFile, offset);
if (!ret) {
//hdfsWrite will return -1 if ret != number of bytes asked to
//be written.
ret = hdfsWrite(fs, writeFile, buffer, bufferSize);
if (ret < 0) {
fprintf(stderr, "hdfsWrite failed with error %d \n", errno);
}
} else {
fprintf(stderr, "hdfsSeek failed with error %d \n", errno);
}
if (ret < 0) {
//hdfsWrite does a flush in case of an error, explicit flush
//is not required.
hdfsCloseFile(fs, writeFile);
}
//Current offset within the file will be positioned at (offset + writeBytes)th byte.
return ret;
}
The application next calls the function positionalWrite():
writeBytes = positionalWrite(fs, writeFile, 20, buffer, bufferSize);
if (writeBytes < 0) {
goto done;
}
This function writes the content of the buffer to the file, starting at the offset that you specify.
tSize
positionalWrite(hdfsFS fs, hdfsFile writeFile, tOffset offset,
char *buffer, tSize bufferSize)
{
tSize writeBytes = 0;
if (fs == NULL || writeFile == NULL || buffer == NULL) {
return -1;
}
writeBytes = hdfsPwrite(fs, writeFile, offset, buffer, bufferSize);
if (writeBytes < 0) {
fprintf(stderr, "hdfsPwrite failed with error %d \n", errno);
hdfsCloseFile(fs, writeFile);
}
//Current offset within the file will not be advanced if hdfsPwrite is used
return writeBytes;
}hdfsCloseFile(fs, writeFile);free(buffer);hdfsDisconnect(fs);/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include "hdfs.h"
#include <errno.h>
#include <fcntl.h>
tSize
writeAtOffset(hdfsFS fs, hdfsFile writeFile, tOffset offset,
char *buffer, tSize bufferSize)
{
tSize ret = 0;
if (fs == NULL || writeFile == NULL || buffer == NULL) {
return -1;
}
ret = hdfsSeek(fs, writeFile, offset);
if (!ret) {
//hdfsWrite will return -1 if ret != number of bytes asked to
//be written.
ret = hdfsWrite(fs, writeFile, buffer, bufferSize);
if (ret < 0) {
fprintf(stderr, "hdfsWrite failed with error %d \n", errno);
}
} else {
fprintf(stderr, "hdfsSeek failed with error %d \n", errno);
}
if (ret < 0) {
//hdfsWrite does a flush in case of an error, explicit flush
//is not required.
hdfsCloseFile(fs, writeFile);
}
//Current offset within the file will be positioned at (offset + writeBytes)th byte.
return ret;
}
tSize
positionalWrite(hdfsFS fs, hdfsFile writeFile, tOffset offset,
char *buffer, tSize bufferSize)
{
tSize writeBytes = 0;
if (fs == NULL || writeFile == NULL || buffer == NULL) {
return -1;
}
writeBytes = hdfsPwrite(fs, writeFile, offset, buffer, bufferSize);
if (writeBytes < 0) {
fprintf(stderr, "hdfsPwrite failed with error %d \n", errno);
hdfsCloseFile(fs, writeFile);
}
//Current offset within the file will not be advanced if hdfsPwrite is used
return writeBytes;
}
int
writeLength(hdfsFS fs, hdfsFile writeFile, char *buffer, tSize bufferSize, tSize writeSize)
{
tSize writeBytes = 0;
tSize ret = 0;
uint64_t totalWrite = 0;
if (fs == NULL || writeFile == NULL || buffer == NULL) {
return -1;
}
if (writeSize == 0) {
return 0;
}
for (writeBytes=0; writeBytes<writeSize; writeBytes+=bufferSize) {
ret = hdfsWrite(fs, writeFile, (void*)buffer, bufferSize);
if (ret > 0) {
totalWrite += ret;
} else {
fprintf(stderr, "hdfsWrite failed with error %d \n", errno);
hdfsCloseFile(fs, writeFile);
return -1;
}
}
return 0;
}
int
main(int argc, char **argv)
{
if (argc != 4) {
fprintf(stderr, "Usage: hdfs_write <filename> <filesize> <buffersize>\n");
exit(-1);
}
int err = hdfsSetRpcTimeout(30);
if (err) {
fprintf(stderr, "Oops! Failed to set rpc timeout!\n");
exit(-1);
}
hdfsFS fs = hdfsConnect("default", 0);
if (!fs) {
fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
exit(-1);
}
const char* rfile = argv[1];
tSize fileSize = strtoul(argv[2], NULL, 10);
tSize bufferSize = strtoul(argv[3], NULL, 10);
//O_WRONLY creates the file if the file doesn't exist.
//O_WRONLY truncates the file if the file exists.
//O_WRONLY | O_APPEND will preserve the contents of the file if the file exists.
hdfsFile writeFile = hdfsOpenFile(fs, rfile, O_WRONLY, 0, 0, 0);
if (!writeFile) {
fprintf(stderr, "Failed to open %s for writing!\n", rfile);
exit(-2);
}
char* buffer = malloc(sizeof(char) * bufferSize);
if(buffer == NULL) {
fprintf(stderr, "Failed to allocate memory!\n");
return -2;
}
int i;
for (i=0; i<bufferSize; i++) {
buffer[i] = 'a' + i%26;
}
//Write entire file from the beginning
int ret = writeLength(fs, writeFile, buffer, bufferSize, fileSize);
if (ret < 0) {
goto done;
}
//Write file at a particular offset
//In this case, we are writing from offset 0
tSize writeBytes = writeAtOffset(fs, writeFile, 0, buffer, bufferSize);
if (writeBytes < 0) {
goto done;
}
//Write file at a particular offset using positional write
//In this case, write from offset 20
writeBytes = positionalWrite(fs, writeFile, 20, buffer, bufferSize);
if (writeBytes < 0) {
goto done;
}
hdfsCloseFile(fs, writeFile);
done:
free(buffer);
hdfsDisconnect(fs);
return 0;
}
/**
* vim: ts=4: sw=4: et:
*/