How to convert and insert Vec<MyStruct> to postgres custom type

70 Views Asked by At

Background

I'm trying to rewrite in Rust a microservice that was originally written in Go. Due to database security policies, the database user can only access certain stored functions and procedures in PostgreSQL.

We have a custom type in the database:

-- Composite type describing a single product. An array of this type is the
-- second parameter (p_products) of the update_products procedure.
CREATE TYPE product_data AS (
  id          uuid,     -- product identifier
  reference   varchar,  -- product reference
  name        varchar,  -- product name
  description text,     -- free-form description
  price       numeric   -- product price
);

And a stored procedure:

-- Upserts every element of p_products into the product table for the given
-- location.
--
-- Parameters:
--   p_location_id  location the products belong to; stored in
--                  product.location_id for newly inserted rows.
--   p_products     products to insert or update (matched on id).
--
-- NOTE(review): the procedure is SECURITY DEFINER but does not pin
-- search_path; consider adding a SET search_path clause for the schema that
-- owns "product".
CREATE OR REPLACE PROCEDURE update_products (
  p_location_id uuid,
  p_products product_data[]
)
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
  DECLARE
    l_product product_data;
  BEGIN
    -- Upsert the products one at a time instead of in a single statement, so
    -- that a conflict on one product does not prevent the others from being
    -- saved. product_data has no updated_at attribute, so the rows are
    -- consumed as UNNEST returns them, without an ORDER BY.
    FOR l_product IN SELECT * FROM UNNEST(p_products) LOOP
      BEGIN
        INSERT INTO product (
          id,
          location_id,
          reference,
          name,
          description,
          price
        )
        VALUES (
          l_product.id,
          p_location_id,  -- the procedure parameter; a bare location_id is undefined here
          l_product.reference,
          l_product.name,
          l_product.description,
          l_product.price
        )
        ON CONFLICT (id) DO UPDATE SET
          reference = EXCLUDED.reference,
          name = EXCLUDED.name,
          description = EXCLUDED.description,
          price = EXCLUDED.price;
      EXCEPTION WHEN unique_violation THEN
        -- A unique violation not absorbed by ON CONFLICT (id): skip this
        -- product and continue with the next one.
        NULL;
      END;
    END LOOP;
  END;
$$;

The relevant Golang code:

package main

import (
    "fmt"
    "gopkg.in/guregu/null.v4"
    "log"
    "strconv"
    "strings"
    "github.com/google/uuid"

    // Encoding
    "encoding/json"

    // Database
    "database/sql"
    "database/sql/driver"
    "github.com/lib/pq"
)

// db is the package-level database handle used by updateProductData.
// NOTE(review): it is never assigned in the code shown here — presumably it is
// initialised elsewhere before updateProductData is called; confirm.
var db *sql.DB

// quoteNullString renders a nullable string for embedding in a textual
// PostgreSQL literal.
//
//   - An invalid (NULL) input yields the empty string.
//   - With inputType "numeric" the value is returned bare when it parses as a
//     64-bit float, and the empty string otherwise.
//   - Any other inputType yields the value wrapped in double quotes, each
//     quote preceded by 2^level - 1 backslashes (0, 1, 3, 7, ...), so the
//     result can be nested level times inside outer quoted literals.
//
// NOTE(review): quotes and backslashes inside the value itself are not
// escaped.
func quoteNullString(input null.String, inputType string, level int) string {
    if !input.Valid {
        return ""
    }

    value := input.ValueOrZero()

    if inputType == "numeric" {
        if _, err := strconv.ParseFloat(value, 64); err != nil {
            return ""
        }
        return value
    }

    // Every nesting level doubles the backslashes already needed and adds one.
    backslashes := 0
    for depth := 0; depth < level; depth++ {
        backslashes = 2*backslashes + 1
    }
    escape := strings.Repeat(`\`, backslashes)

    return fmt.Sprintf(`%s"%s%s"`, escape, value, escape)
}

// ProductData is one product as received in JSON and as sent to the database
// (see its Value method). Its fields correspond, in order, to the attributes
// of the PostgreSQL composite type product_data.
type ProductData struct {
    Id          uuid.UUID   `json:"id"`          // product identifier
    Reference   string      `json:"reference"`   // product reference
    Name        string      `json:"name"`        // product name
    Description null.String `json:"description"` // nullable description
    Price       *string     `json:"price"`       // price as text; nil is sent as "0" by Value
}

// quoteCompositeField renders s as a double-quoted field of a PostgreSQL
// composite (row) literal, backslash-escaping embedded backslashes and double
// quotes so that commas, parentheses, quotes and whitespace inside s cannot
// break the surrounding record.
func quoteCompositeField(s string) string {
    return `"` + strings.NewReplacer(`\`, `\\`, `"`, `\"`).Replace(s) + `"`
}

// quoteUnlessEmpty quotes s like quoteCompositeField, except that an empty s
// stays an empty (unquoted) field. PostgreSQL reads a completely empty field
// as NULL, which is what the previous unquoted formatting produced for empty
// values; that behaviour is kept.
func quoteUnlessEmpty(s string) string {
    if s == "" {
        return ""
    }
    return quoteCompositeField(s)
}

// Value implements driver.Valuer by rendering the product as the text form of
// a product_data composite literal: (id,reference,name,description,price).
//
// Every textual field is quoted and escaped, so arbitrary content in
// Reference, Name, Description or Price can no longer corrupt the record. A
// nil Price is sent as "0"; an invalid (NULL) Description is sent as an empty
// field, i.e. NULL. The returned error is always nil.
func (n ProductData) Value() (driver.Value, error) {
    price := "0"
    if n.Price != nil {
        price = *n.Price
    }

    // A valid description is always quoted, so a valid-but-empty description
    // stays an empty string rather than becoming NULL.
    description := ""
    if n.Description.Valid {
        description = quoteCompositeField(n.Description.ValueOrZero())
    }

    str := fmt.Sprintf("(%s,%s,%s,%s,%s)",
        n.Id,
        quoteUnlessEmpty(n.Reference),
        quoteUnlessEmpty(n.Name),
        description,
        quoteUnlessEmpty(price),
    )

    return []byte(str), nil
}

// updateProductData sends the given products to the update_products stored
// procedure for locationId.
//
// Each product's Reference is trimmed of surrounding whitespace first, and
// products whose trimmed Reference is empty are dropped. A database error is
// logged, not returned.
func updateProductData(locationId uuid.UUID, originalProductsData []ProductData) {
    // The range variable is a copy, so trimming does not touch the caller's slice.
    var valid []ProductData
    for _, candidate := range originalProductsData {
        candidate.Reference = strings.TrimSpace(candidate.Reference)
        if candidate.Reference != "" {
            valid = append(valid, candidate)
        }
    }

    _, err := db.Exec("CALL update_products($1, $2);", locationId, pq.Array(valid))
    if err != nil {
        log.Print(err)
    }
}

So, when we want to update a batch of products data, we just call updateProductData with the array of ProductData and it will be done.

So far so good!

Problem

Now, with Rust, I am trying to achieve the same result with this:

// Encoding
use serde::{Deserialize, Serialize};
use uuid::Uuid;

// Postgres
use deadpool_postgres::{Config, ManagerConfig, Pool, RecyclingMethod, Runtime};
use rust_decimal::prelude::*;
use tokio_postgres::{
    types::ToSql,
    NoTls,
};

use std::env::var;

/// One product as exchanged with the database.
///
/// The `#[postgres(name = "product_data")]` attribute names the PostgreSQL
/// type that the derived `ToSql` implementation targets; the fields below
/// carry the same names, in the same order, as that composite type's
/// attributes.
#[derive(Serialize, Deserialize, Debug, ToSql)]
#[postgres(name = "product_data")]
struct ProductData {
    /// Product identifier (`uuid`).
    id: Uuid,
    /// Product reference (`varchar`).
    reference: String,
    /// Product name (`varchar`).
    name: String,
    /// Optional description (`text`); `None` is the nullable case.
    description: Option<String>,
    /// Product price (`numeric`).
    price: Decimal,
}

/// Sends `original_products_data` to the `update_products` stored procedure
/// for `location_id`.
///
/// Each product's `reference` is trimmed first, and products whose trimmed
/// reference is empty are dropped. If nothing is left, the function returns
/// without touching the database. An error from the procedure call is printed
/// to stdout, not returned.
///
/// Connection settings come from the `PGUSER`, `PGPASSWORD` (default: empty),
/// `PGDATABASE` and `PGHOST` (default: `localhost`) environment variables.
///
/// # Panics
///
/// Panics if `PGUSER` or `PGDATABASE` is not set, if the pool cannot be
/// created, or if no connection can be obtained from it.
async fn update_product_data(
    location_id: Uuid,
    original_products_data: Vec<ProductData>,
) {
    // Validate before doing any database work, so an empty (or entirely
    // invalid) batch never costs a pool or a connection.
    let products: Vec<ProductData> = original_products_data
        .into_iter()
        .filter_map(|mut product| {
            product.reference = product.reference.trim().to_string();
            if product.reference.is_empty() {
                None
            } else {
                Some(product)
            }
        })
        .collect();

    if products.is_empty() {
        return;
    }

    // NOTE(review): a new pool is built on every call; ideally it would be
    // created once at startup and shared, but that would change this
    // function's signature.
    let mut config = Config::new();

    config.user = Some(var("PGUSER").expect("PGUSER not defined"));
    config.password = Some(var("PGPASSWORD").unwrap_or_default());
    config.dbname = Some(var("PGDATABASE").expect("PGDATABASE not defined"));
    config.host = Some(var("PGHOST").unwrap_or_else(|_| "localhost".to_string()));
    config.manager = Some(ManagerConfig {
        recycling_method: RecyclingMethod::Fast,
    });

    let db_pool = config
        .create_pool(Some(Runtime::Tokio1), NoTls)
        .expect("Database pool creation error");
    db_pool.resize(15);

    let conn = db_pool
        .get()
        .await
        .expect("Database connection error");

    // Update database
    if let Err(e) = conn
        .execute(
            "CALL update_products($1, $2);",
            &[&location_id, &products],
        )
        .await
    {
        println!("{}", e);
    }
}

This results in the following error:

error serializing parameter 1: cannot convert between the Rust type `alloc::vec::Vec<api::ProductData>` and the Postgres type `_product_data`

So, the question is: how can I correctly convert a Vec<ProductData> into an array of a PostgreSQL custom type?

PS: I also tried to write the to_sql code manually:

impl ToSql for ProductData {
    fn to_sql(
        &self,
        ty: &Type,
        out: &mut BytesMut,
    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
        let to_sql_bytes = |ty: &Type, value: &[u8]| -> BytesMut {
            let mut out = BytesMut::with_capacity(value.len() + 8);
            out.put_u32(Type::oid(ty));

            if value.len() == 0 {
                out.extend_from_slice(&[255, 255, 255, 255]);
            } else {
                out.put_u32(value.len() as u32);
                out.extend_from_slice(value);
            }
            out
        };

        let write_uuid_bytes = |opt: &Uuid| to_sql_bytes(&Type::UUID, opt.as_bytes());
        let write_varchar_bytes = |opt: &String| to_sql_bytes(&Type::VARCHAR, opt.as_bytes());
        let write_text_bytes = |opt: &String| to_sql_bytes(&Type::TEXT, opt.as_bytes());
        let write_numeric_bytes =
            |opt: &Decimal| to_sql_bytes(&Type::NUMERIC, opt.to_string().as_bytes());

        let write_null_text_bytes = |opt: &Option<String>| {
            opt.as_ref()
                .map_or_else(|| to_sql_bytes(&Type::TEXT, &[]), |v| write_text_bytes(&v))
        };

        out.extend_from_slice(&[0, 0, 0, 5]);
        out.unsplit(write_uuid_bytes(&self.id));
        out.unsplit(write_varchar_bytes(&self.reference));
        out.unsplit(write_varchar_bytes(&self.name));
        out.unsplit(write_null_text_bytes(&self.description));
        out.unsplit(write_numeric_bytes(&self.price));

        Ok(IsNull::No)
    }

    fn accepts(ty: &Type) -> bool {
        true
    }

    to_sql_checked!();
}

But this gives me an "insufficient data left in message" error, which is strange.

0

There are 0 best solutions below