I am using RSA algorithm to encrypt data. The frontend uses the window.crypto and the backend uses the pycryptodome.
However, I discovered that after the frontend encrypts a payload, the backend can't decrypt it. The frontend refers MDN's example code: https://github.com/mdn/dom-examples/blob/main/web-crypto/import-key/spki.js, and my code is not shown on GitHub now. The backend is copied from the example which is written by myself: https://github.com/zvms/rsa-bcrypt-jwt-login-eg/blob/main/cert.py. The full code is showed here:
Frontend
crpyto.ts
// Function to convert PEM formatted public key to a CryptoKey object
async function importPublicKey(pemKey: string) {
const withoutNewlines = pemKey
.replace('-----BEGIN PUBLIC KEY-----', '')
.replace('-----END PUBLIC KEY-----', '')
.split('\n')
.filter((line) => line.trim() !== '')
.join('')
console.log(withoutNewlines)
// Base64 decode the string to get the binary data
const binaryDerString = window.atob(withoutNewlines)
// Convert from a binary string to an ArrayBuffer
const binaryDer = str2ab(binaryDerString)
return window.crypto.subtle.importKey(
'spki',
binaryDer,
{
name: 'RSA-OAEP',
hash: 'SHA-256' // Specify the hash algorithm
},
true,
['encrypt']
)
}
// Utility function to convert a binary string to an ArrayBuffer
function str2ab(str: string) {
const buf = new ArrayBuffer(str.length)
const bufView = new Uint8Array(buf)
for (let i = 0, strLen = str.length; i < strLen; i++) {
bufView[i] = str.charCodeAt(i)
}
return buf
}
// Function to encrypt data using RSA-OAEP
async function encryptData(publicKey: CryptoKey, data: string) {
const encoder = new TextEncoder()
const encodedData = encoder.encode(data)
const encryptedData = await window.crypto.subtle.encrypt(
{
name: 'RSA-OAEP'
},
publicKey,
encodedData
)
return encryptedData
}
export { importPublicKey, encryptData }
auth.ts
async function UserLogin(user: string, password: string, term: 'long' | 'short' = 'long') {
const payload = JSON.stringify({
password: password,
time: Date.now()
})
const publicKey = await importPublicKey(await getRSAPublicCert())
const credential = await encryptData(publicKey, payload)
// console.log(credential)
const hex = byteArrayToHex(new Uint8Array(credential))
console.log(`User ${user} login with ${term} term, with the credential ${hex}`)
const result = (await axios('/user/auth', {
method: 'POST',
data: {
id: user.toString(),
credential: hex,
mode: term
}
})) as Response<LoginResult>
}
Backend
import bcrypt
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from fastapi import HTTPException
import jwt
import json
import datetime
from database import db
from bson import ObjectId
from bcrypt import checkpw
class Auth:
password: str
time: int
def hash_password(password):
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
def check_password(password, hashed):
return bcrypt.checkpw(password.encode("utf-8"), hashed)
public_key = RSA.import_key(open("rsa_public_key.pem", "rb").read())
private_key = RSA.import_key(open("rsa_private_key.pem", "rb").read())
jwt_private_key = open("aes_key.txt", "r").read()
def rsa_encrypt(plaintext):
cipher = PKCS1_OAEP.new(public_key)
encrypt_text = cipher.encrypt(bytes(plaintext.encode("utf8")))
return encrypt_text.hex()
def rsa_decrypt(ciphertext):
cipher = PKCS1_OAEP.new(private_key)
decrypt_text = cipher.decrypt(bytes.fromhex(ciphertext))
return decrypt_text.decode("utf8")
def jwt_encode(id: str):
payload = {
"exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=60),
"iat": datetime.datetime.utcnow(),
"sub": id,
"scope": "access_token",
"type": "long-term",
}
return jwt.encode(payload, jwt_private_key, algorithm="HS256")
def jwt_decode(token):
return jwt.decode(token, jwt_private_key, algorithms=["HS256"], verify=True)
def validate_by_cert(id: str, cert: str):
auth_field = json.loads(rsa_decrypt(cert))
time = auth_field["time"]
# in a minute
if time > datetime.datetime.now().timestamp() + 60:
raise HTTPException(status_code=401, detail="Token expired")
if checkpwd(id, auth_field["password"]):
raise HTTPException(status_code=401, detail="Password incorrect")
return jwt_encode(id)
def checkpwd(id: str, pwd: str):
user = db.zvms.users.find_one({"_id": ObjectId(id)})
if checkpw(bytes(pwd, 'utf-8'), user.get('password')):
return True
return False
and the api handler directly calls validate_by_cert method.
Key Generation
Directly use openssl.
openssl genrsa -out rsa_private_key.pem 1024
openssl rsa -in rsa_private_key.pem -pubout -out rsa_public_key.pem
openssl rand -hex 32 > aes_key.txt
Behavior
It returns error when calling this api:
File "/Users/*/**/zvms/routers/users_router.py", line 43, in auth_user
result = validate_by_cert(id, credential)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/*/**/zvms/util/cert.py", line 59, in validate_by_cert
auth_field = json.loads(rsa_decrypt(cert))
^^^^^^^^^^^^^^^^^
File "/Users/*/**/zvms/util/cert.py", line 39, in rsa_decrypt
decrypt_text = cipher.decrypt(bytes.fromhex(ciphertext))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/*/anaconda3/envs/zvms/lib/python3.12/site-packages/Crypto/Cipher/PKCS1_OAEP.py", line 191, in decrypt
raise ValueError("Incorrect decryption.")
However, when I call the native rsa_encrypt method and use it to decrypt, it works well.
The frontend and backend can transfer encrypted data successfully.
The Python stack trace shows that the error is caused in
rsa_decrypt().The RSA decryption fails because in the Python/PyCryptodome code the default value, namely SHA-1, is used as (OAEP and MGF1) digest, see
Crypto.Cipher.PKCS1_OAEP.new(), description ofhashAlgoparameter, while in the JavaScript/WebCrypto code SHA-256 is applied (see thewindow.crypto.subtle.importKey()call).To use SHA-256 on the Python side as well, fix in
rsa_decrypt():With this change, decryption works for me.