diff --git a/pywebhdfs/webhdfs.py b/pywebhdfs/webhdfs.py index 656b7f6..36720a1 100644 --- a/pywebhdfs/webhdfs.py +++ b/pywebhdfs/webhdfs.py @@ -32,6 +32,7 @@ def __init__(self, host='localhost', port='50070', user_name=None): self.host = host self.port = port self.user_name = user_name + self.session = requests.Session() # create base uri to be used in request operations self.base_uri = 'http://{host}:{port}/webhdfs/v1/'.format( @@ -78,7 +79,7 @@ def create_file(self, path, file_data, **kwargs): # make the initial CREATE call to the HDFS namenode optional_args = kwargs uri = self._create_uri(path, operations.CREATE, **optional_args) - init_response = requests.put(uri, allow_redirects=False) + init_response = self._put(uri, allow_redirects=False) if not init_response.status_code == httplib.TEMPORARY_REDIRECT: _raise_pywebhdfs_exception( @@ -88,7 +89,7 @@ def create_file(self, path, file_data, **kwargs): # initial response from the namenode and make the CREATE request # to the datanode uri = init_response.headers['location'] - response = requests.put( + response = self._put( uri, data=file_data, headers={'content-type': 'application/octet-stream'}) @@ -133,7 +134,7 @@ def append_file(self, path, file_data, **kwargs): # make the initial APPEND call to the HDFS namenode optional_args = kwargs uri = self._create_uri(path, operations.APPEND, **optional_args) - init_response = requests.post(uri, allow_redirects=False) + init_response = self._post(uri, allow_redirects=False) if not init_response.status_code == httplib.TEMPORARY_REDIRECT: _raise_pywebhdfs_exception( @@ -143,7 +144,7 @@ def append_file(self, path, file_data, **kwargs): # initial response from the namenode and make the APPEND request # to the datanode uri = init_response.headers['location'] - response = requests.post( + response = self._post( uri, data=file_data, headers={'content-type': 'application/octet-stream'}) @@ -180,7 +181,7 @@ def read_file(self, path, **kwargs): optional_args = kwargs uri = 
self._create_uri(path, operations.OPEN, **optional_args) - response = requests.get(uri, allow_redirects=True) + response = self._get(uri, allow_redirects=True) if not response.status_code == httplib.OK: _raise_pywebhdfs_exception(response.status_code, response.content) @@ -213,7 +214,7 @@ def make_dir(self, path, **kwargs): optional_args = kwargs uri = self._create_uri(path, operations.MKDIRS, **optional_args) - response = requests.put(uri, allow_redirects=True) + response = self._put(uri, allow_redirects=True) if not response.status_code == httplib.OK: _raise_pywebhdfs_exception(response.status_code, response.content) @@ -243,7 +244,7 @@ def rename_file_dir(self, path, destination_path): uri = self._create_uri(path, operations.RENAME, destination=destination_path) - response = requests.put(uri, allow_redirects=True) + response = self._put(uri, allow_redirects=True) if not response.status_code == httplib.OK: _raise_pywebhdfs_exception(response.status_code, response.content) @@ -274,7 +275,7 @@ def delete_file_dir(self, path, recursive=False): """ uri = self._create_uri(path, operations.DELETE, recursive=recursive) - response = requests.delete(uri, allow_redirects=True) + response = self._delete(uri, allow_redirects=True) if not response.status_code == httplib.OK: _raise_pywebhdfs_exception(response.status_code, response.content) @@ -332,7 +333,7 @@ def get_file_dir_status(self, path): """ uri = self._create_uri(path, operations.GETFILESTATUS) - response = requests.get(uri, allow_redirects=True) + response = self._get(uri, allow_redirects=True) if not response.status_code == httplib.OK: _raise_pywebhdfs_exception(response.status_code, response.content) @@ -388,7 +389,7 @@ def list_dir(self, path): """ uri = self._create_uri(path, operations.LISTSTATUS) - response = requests.get(uri, allow_redirects=True) + response = self._get(uri, allow_redirects=True) if not response.status_code == httplib.OK: _raise_pywebhdfs_exception(response.status_code, response.content) @@ 
-430,6 +431,18 @@ def _create_uri(self, path, operation, **kwargs):
 
         return uri
 
+    def _put(self, *args, **kwargs):  # HTTP PUT via the shared self.session
+        return self.session.put(*args, **kwargs)  # args forwarded unchanged
+
+    def _post(self, *args, **kwargs):  # HTTP POST via the shared self.session
+        return self.session.post(*args, **kwargs)  # args forwarded unchanged
+
+    def _get(self, *args, **kwargs):  # HTTP GET via the shared self.session
+        return self.session.get(*args, **kwargs)  # args forwarded unchanged
+
+    def _delete(self, *args, **kwargs):  # HTTP DELETE via self.session
+        return self.session.delete(*args, **kwargs)  # args forwarded unchanged
+
 
 def _raise_pywebhdfs_exception(resp_code, message=None):