Transfer CSV/Parquet data from one S3 bucket to another S3 bucket with the help of PySpark

Tags: python, amazon-web-services, dataframe, pyspark

I am using a Spark cluster consisting of EC2 machines, and with the help of PySpark I want to transfer data from a source S3 bucket to a destination bucket in Parquet format. The two buckets have different IAM roles and bucket policies. I am setting the Spark AWS access key and secret key at the Hadoop level using this code:

def set_s3_credentials(access_key, secret_key):
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.access.key", access_key)
    hadoop_conf.set("fs.s3a.secret.key", secret_key)

But I am getting this error:

Error occurred: An error occurred while calling o56.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 8 times, most recent failure: Lost task 0.7 in stage 1.0 (TID 8) (172.12.2.153 executor 0): java.nio.file.AccessDeniedException: s3a://wayfinder-doceree-s3-customer-data/export/mx_submits_2024_06/part-00000-tid-1074794359726202293-9f504b1d-da56-4031-963b-be9f22348eb4-141340-1.c000.snappy.parquet: getFileStatus on s3a://wayfinder-doceree-s3-customer-data/export/mx_submits_2024_06/part-00000-tid-1074794359726202293-9f504b1d-da56-4031-963b-be9f22348eb4-141340-1.c000.snappy.parquet: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 6C07ME6ZAV2B4XSA; S3 Extended Request ID: Dx8EtSGjnYMl0Ld6kwSs9L9CMk0sdrDkzzdCSsXaG2KXk1uhC6iAIkly0mBCmB6rehqSuat0RlR0WHjPQlFkQQ==; Proxy: null), S3 Extended Request ID: Dx8EtSGjnYMl0Ld6kwSs9L9CMk0sdrDkzzdCSsXaG2KXk1uhC6iAIkly0mBCmB6rehqSuat0RlR0WHjPQlFkQQ==:403 Forbidden
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)

Sometimes I get o56.parquet in place of o56.showString. In my code I first set the credentials for the source bucket using the function above and then read my data from the source bucket with spark.read.parquet(). I can read it into a dataframe, use aggregate functions like df.count(), and view it with df.show(). But after that, when I change the credentials to the destination bucket using the same function, any attempt to write the data or even call df.show() throws the exception above. A condensed sketch of this read-then-write flow is given after the code below.
def copy_parquet_file():
    try:
        # Set and log source AWS credentials
        set_s3_credentials(source_aws_access_key_id, source_aws_secret_access_key)
        logging.info(f"Set source AWS credentials for bucket: '{source_bucket_name}'")
        # update_spark_conf(source_s3_conf)
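
Here is a condensed, self-contained sketch of the flow I described above. It is not my exact script: the bucket names, paths, and credential values are placeholders, and the last step is where the 403 exception is thrown.

from pyspark.sql import SparkSession
import logging

spark = SparkSession.builder.appName("s3-to-s3-copy").getOrCreate()

# Placeholder credentials (the real ones come from our own config)
source_aws_access_key_id = "SOURCE_ACCESS_KEY"
source_aws_secret_access_key = "SOURCE_SECRET_KEY"
dest_aws_access_key_id = "DEST_ACCESS_KEY"
dest_aws_secret_access_key = "DEST_SECRET_KEY"

def set_s3_credentials(access_key, secret_key):
    # Global S3A credentials on the Hadoop configuration
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.access.key", access_key)
    hadoop_conf.set("fs.s3a.secret.key", secret_key)

# 1. Read with the source bucket's credentials -- this works:
#    df.count() and df.show() both succeed at this point.
set_s3_credentials(source_aws_access_key_id, source_aws_secret_access_key)
df = spark.read.parquet("s3a://source-bucket/export/mx_submits_2024_06/")
logging.info("Rows read with source credentials: %s", df.count())

# 2. Switch the global credentials to the destination bucket.
set_s3_credentials(dest_aws_access_key_id, dest_aws_secret_access_key)

# 3. After the switch, writing (or even df.show()) fails with the
#    403 AccessDeniedException shown above.
df.write.mode("overwrite").parquet("s3a://destination-bucket/export/mx_submits_2024_06/")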