Background
I'm trying to recreate, in Rust, a microservice originally written in Go. Due to database security policies, the database user can only access certain stored functions and procedures in PostgreSQL.
We have a custom type in database:
-- Composite type describing one product row; passed to update_products
-- as an array parameter (product_data[]).
CREATE TYPE product_data AS (
id uuid,            -- product primary key
reference varchar,  -- external/business reference code
name varchar,
description text,   -- nullable free-form description
price numeric
);
And a stored procedure:
-- Upserts each element of p_products into the product table for the given
-- location. Runs SECURITY DEFINER so restricted users can call it without
-- direct table privileges.
CREATE OR REPLACE PROCEDURE update_products (
    p_location_id uuid,
    p_products product_data[]
)
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
DECLARE
    l_product product_data;
BEGIN
    -- Loop through every item to UPSERT as much as possible instead of
    -- UPSERTing it all, to avoid failing the whole batch when any one
    -- row conflicts.
    -- NOTE(review): the original ORDER BY updated_at was dropped —
    -- product_data has no updated_at column, so it would raise
    -- "column does not exist" at runtime.
    FOR l_product IN SELECT * FROM UNNEST(p_products) LOOP
        BEGIN
            INSERT INTO product (
                id,
                location_id,
                reference,
                name,
                description,
                price
            )
            -- Fixed: "location_id" was unqualified; in this FROM-less
            -- SELECT it resolves to nothing. The procedure parameter is
            -- p_location_id.
            SELECT l_product.id,
                   p_location_id,
                   l_product.reference,
                   l_product.name,
                   l_product.description,
                   l_product.price
            ON CONFLICT (id) DO UPDATE SET
                reference = EXCLUDED.reference,
                name = EXCLUDED.name,
                description = EXCLUDED.description,
                price = EXCLUDED.price;
        EXCEPTION WHEN unique_violation THEN
            -- Best-effort: skip this row and continue with the rest.
            -- Fixed: plpgsql requires a statement here; a bare comment
            -- is a syntax error.
            NULL;
        END;
    END LOOP;
END;
$$;
The relevant Golang code:
package main
import (
"fmt"
"gopkg.in/guregu/null.v4"
"log"
"strconv"
"strings"
"github.com/google/uuid"
// Encoding
"encoding/json"
// Database
"database/sql"
"database/sql/driver"
"github.com/lib/pq"
)
// db is the process-wide database handle; assumed to be initialized
// elsewhere before updateProductData is called.
var db *sql.DB
// quoteNullString renders a nullable string for embedding in a PostgreSQL
// composite/array literal.
//
// For inputType "numeric" the raw value is returned unquoted, but only if
// it parses as a float; otherwise (or when input is NULL) the empty string
// is returned, which reads as NULL inside a composite literal.
// For every other type the value is wrapped in escaped double quotes;
// each nesting level requires 2^level - 1 backslashes before each quote
// (0 -> 0, 1 -> 1, 2 -> 3, ...).
func quoteNullString(input null.String, inputType string, level int) string {
	// NULL in, NULL out.
	if !input.Valid {
		return ""
	}
	switch inputType {
	case "numeric":
		// Pass numerics through unquoted, but only when they actually
		// parse; anything else degrades to NULL rather than corrupting
		// the literal.
		if _, err := strconv.ParseFloat(input.ValueOrZero(), 64); err != nil {
			return ""
		}
		return input.ValueOrZero()
	default:
		// Closed form of the original doubling loop: 2^level - 1.
		// Guarded so a negative level yields 0, matching the original
		// zero-iteration loop instead of panicking on a negative shift.
		repetition := 0
		if level > 0 {
			repetition = 1<<uint(level) - 1
		}
		esc := strings.Repeat(`\`, repetition)
		return fmt.Sprintf(`%s"%s%s"`, esc, input.ValueOrZero(), esc)
	}
}
// ProductData mirrors the PostgreSQL composite type product_data and is
// also the JSON payload shape for product updates.
type ProductData struct {
Id uuid.UUID `json:"id"` // product primary key
Reference string `json:"reference"` // business reference; trimmed and required by updateProductData
Name string `json:"name"`
Description null.String `json:"description"` // nullable; rendered as NULL when invalid
Price *string `json:"price"` // decimal as string; nil falls back to "0" in Value()
}
// Value implements driver.Valuer so a ProductData can be sent through
// database/sql as a PostgreSQL composite literal of the form
// (id,reference,name,description,price).
func (n ProductData) Value() (driver.Value, error) {
	// A nil price falls back to "0" rather than NULL.
	price := "0"
	if n.Price != nil {
		price = *n.Price
	}
	// NOTE(review): Reference and Name are embedded unescaped — values
	// containing commas, quotes or parentheses would corrupt the
	// literal; confirm upstream validation guarantees they are safe.
	literal := fmt.Sprintf("(%s,%s,%s,%s,%s)",
		n.Id,
		n.Reference,
		n.Name,
		quoteNullString(n.Description, "string", 0),
		price,
	)
	return []byte(literal), nil
}
// updateProductData upserts a batch of products for the given location by
// calling the update_products stored procedure. Entries whose trimmed
// Reference is empty are dropped. Errors are logged, not returned.
func updateProductData(locationId uuid.UUID, originalProductsData []ProductData) {
	// Keep only the items that still have a non-empty reference after
	// trimming; the trim is applied to the local copy that is forwarded.
	var products []ProductData
	for _, product := range originalProductsData {
		product.Reference = strings.TrimSpace(product.Reference)
		if product.Reference != "" {
			products = append(products, product)
		}
	}
	// Hand the filtered batch to the stored procedure as a composite array.
	if _, err := db.Exec("CALL update_products($1, $2);", locationId, pq.Array(products)); err != nil {
		log.Print(err)
	}
}
So, when we want to update a batch of products data, we just call updateProductData with the array of ProductData and it will be done.
So far so good!
Problem
Now with Rust, I am trying to achieve the same result with this:
// Encoding
use serde::{Deserialize, Serialize};
use uuid::Uuid;
// Postgres
use deadpool_postgres::{Config, ManagerConfig, Pool, RecyclingMethod, Runtime};
use rust_decimal::prelude::*;
use tokio_postgres::{
types::ToSql,
NoTls,
};
use std::env::var;
/// Rust mirror of the PostgreSQL composite type `product_data`.
///
/// The `#[postgres(name = "product_data")]` attribute ties the derived
/// `ToSql` impl to the database type of that name.
// NOTE(review): the `ToSql` here is presumably the postgres-derive macro
// from `postgres-types` — confirm the crate's "derive" feature is enabled,
// and that rust_decimal's postgres/NUMERIC support is on; otherwise
// serializing `Vec<ProductData>` to `_product_data` fails as described.
#[derive(Serialize, Deserialize, Debug, ToSql)]
#[postgres(name = "product_data")]
struct ProductData {
id: Uuid,
reference: String,
name: String,
description: Option<String>, // nullable `text` column of the composite
price: Decimal, // `numeric` column of the composite
}
/// Upserts a batch of products for `location_id` by calling the
/// `update_products` stored procedure.
///
/// Entries whose trimmed `reference` is empty are dropped; the remainder
/// is forwarded as a `product_data[]` array parameter. Errors are printed,
/// not returned.
async fn update_product_data(
    location_id: Uuid,
    original_products_data: Vec<ProductData>,
) {
    // Nothing to send — skip the pool setup and connection entirely
    // (the original built a pool even for an empty batch).
    if original_products_data.is_empty() {
        return;
    }

    // Keep only items with a non-empty reference, trimming in place.
    let products: Vec<ProductData> = original_products_data
        .into_iter()
        .filter_map(|mut product| {
            product.reference = product.reference.trim().to_string();
            if product.reference.is_empty() {
                None
            } else {
                Some(product)
            }
        })
        .collect();

    // Create database pool.
    // NOTE(review): building a pool per call is wasteful — consider a
    // shared, lazily initialized pool; kept here to preserve the
    // function's self-contained structure.
    let mut config = Config::new();
    config.user = Some(var("PGUSER").expect("PGUSER not defined"));
    config.password = Some(var("PGPASSWORD").unwrap_or_default());
    config.dbname = Some(var("PGDATABASE").expect("PGDATABASE not defined"));
    config.host = Some(var("PGHOST").unwrap_or_else(|_| "localhost".to_string()));
    config.manager = Some(ManagerConfig {
        recycling_method: RecyclingMethod::Fast,
    });
    let db_pool = config.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();
    db_pool.resize(15);
    let conn = db_pool.get().await.expect("Database connection error");

    // Update database via the stored procedure.
    if let Err(e) = conn
        .execute(
            "CALL update_products($1, $2);",
            &[&location_id, &products],
        )
        .await
    {
        println!("{}", e);
    }
}
And this results in the following error:
error serializing parameter 1: cannot convert between the Rust type `alloc::vec::Vec<api::ProductData>` and the Postgres type `_product_data`
So, the question is, how can I convert correctly Vec into an array of PostgreSQL custom type?
PS: I tried writing the to_sql code manually:
impl ToSql for ProductData {
    /// Serializes `ProductData` as the binary form of the PostgreSQL
    /// composite `product_data`: an i32 field count, then for each field
    /// its type OID (u32), payload length (i32, -1 for NULL) and payload.
    ///
    /// Fixed versus the original attempt:
    /// * a zero-length value was written with length -1, i.e. as NULL —
    ///   but an empty string is not NULL;
    /// * `price` was written as its decimal *text*, while binary NUMERIC
    ///   has its own wire format — the server then read past the field,
    ///   producing "insufficient data left in message".
    /// Each field now delegates to its own `ToSql` impl, which emits the
    /// correct binary encoding.
    fn to_sql(
        &self,
        _ty: &Type,
        out: &mut BytesMut,
    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
        // Writes one composite field: OID, length placeholder, payload,
        // then backfills the real length (or -1 when the value is NULL).
        fn write_field<T: ToSql>(
            ty: &Type,
            value: &T,
            out: &mut BytesMut,
        ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
            out.put_u32(ty.oid());
            let len_index = out.len();
            out.put_i32(0); // placeholder, overwritten below
            let payload_start = out.len();
            let len = match value.to_sql(ty, out)? {
                IsNull::No => (out.len() - payload_start) as i32,
                IsNull::Yes => {
                    out.truncate(payload_start);
                    -1
                }
            };
            out[len_index..len_index + 4].copy_from_slice(&len.to_be_bytes());
            Ok(())
        }

        out.put_i32(5); // product_data has five fields
        write_field(&Type::UUID, &self.id, out)?;
        write_field(&Type::VARCHAR, &self.reference, out)?;
        write_field(&Type::VARCHAR, &self.name, out)?;
        // Option<String> delegates to TEXT and yields IsNull::Yes for None.
        write_field(&Type::TEXT, &self.description, out)?;
        // NOTE(review): relies on rust_decimal providing a binary NUMERIC
        // `ToSql` impl for `Decimal` — confirm the db-postgres/postgres
        // feature is enabled in Cargo.toml.
        write_field(&Type::NUMERIC, &self.price, out)?;
        Ok(IsNull::No)
    }

    fn accepts(ty: &Type) -> bool {
        // Only claim the composite itself; the blanket `ToSql` impl for
        // slices/Vec then accepts `product_data[]` (`_product_data`)
        // automatically by checking its element type against this.
        ty.name() == "product_data"
    }

    to_sql_checked!();
}
But this gives me an "insufficient data left in message" error, which is weird.