Can Cassandra aggregate LINQ expressions in its query?


I am quite new to Cassandra, so I have a question that I cannot find an answer to. In EF Core I can pass a list of LINQ expressions as conditions and aggregate them into a single query to find what I need, for example:

public async Task<IEnumerable<string>> GetDataStream(List<Expression<Func<Model, bool>>> predicates)
{
    var query = _context.Model.AsQueryable();
    if (predicates != null)
    {
        query = predicates.Aggregate(query, (@event, condition) => @event.Where(condition));
    }
    return await query.Select(data => data.).ToListAsync();
} 
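For readers unfamiliar with the pattern, here is a minimal in-memory sketch of what the Aggregate call does: it folds each predicate into the query as one more Where() call, so the result matches a hand-written chain of Where() calls. The Item class, the ApplyAll helper and the sample data are hypothetical and only serve the illustration; nothing here touches EF Core or Cassandra.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

public static class PredicateFoldDemo
{
    // Hypothetical stand-in for the Model class from the question.
    public class Item
    {
        public int Id { get; set; }
        public string Value1 { get; set; }
        public string Value2 { get; set; }
    }

    // Folds every predicate into the query as one more Where() call.
    public static IQueryable<Item> ApplyAll(
        IQueryable<Item> source,
        IEnumerable<Expression<Func<Item, bool>>> predicates)
    {
        if (predicates == null)
        {
            return source;
        }
        return predicates.Aggregate(source, (query, condition) => query.Where(condition));
    }

    public static void Main()
    {
        // In-memory data exposed as IQueryable, so the expressions are accepted as-is.
        var rows = new List<Item>
        {
            new Item { Id = 1, Value1 = "1", Value2 = "2" },
            new Item { Id = 1, Value1 = "1", Value2 = "22" },
            new Item { Id = 2, Value1 = "221", Value2 = "222" },
        }.AsQueryable();

        var predicates = new List<Expression<Func<Item, bool>>>
        {
            r => r.Id == 1,
            r => r.Value2 == "2",
        };

        var folded = ApplyAll(rows, predicates).ToList();
        var chained = rows.Where(r => r.Id == 1).Where(r => r.Value2 == "2").ToList();

        Console.WriteLine(folded.SequenceEqual(chained)); // prints "True"
        Console.WriteLine(folded.Count);                  // prints "1"
    }
}
```

Because each Where() narrows the previous result, the predicates are combined with AND semantics.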

Now I am wondering whether something similar is possible in Cassandra. I tried:

public async Task<IEnumerable<Model>> Find(List<Expression<Func<Model, bool>>> predicates, int assetId)
{
    IQueryable<Model> query = _table.AsQueryable();
    if (predicates != null)
    {
        query = predicates.Aggregate(query, (@event, condition) => @event.Where(condition));
    }

    return await query.Select(data => data); // this is the problem: I don't know how to execute this
}

So is such a thing possible?

EDIT:

I also tried executing the aggregated query with

query.Select(d => d).Execute();

and got this exception as a result:

The expression Call = [SELECT gap_end, gap_start, uuid FROM gaps_state_data.Where(data => (data.EndValue == null))] is not supported in None parse phase.

It looks like the aggregated expression is not being translated into the query for some reason.

Wojciech Szabowicz On BEST ANSWER

OK, I figured this one out using the hint in the comments. Here is the full code and how it should work:

public Task<IEnumerable<Model>> Find(List<Expression<Func<Model, bool>>> predicates)
{
    CqlQuery<Model> query = _table.Select(d => d);

    if (predicates != null)
    {
        query = predicates.Aggregate(query, (@event, condition) => @event.Where(condition));
    }

    return query.ExecuteAsync();
}

I needed to replace the table type with a query type, but when I did that early on with _table.AsQueryable(), the select expression was compiled right away. So I needed a starting expression that turns the table into a query, and that is the role of the Select() call. After that I could add the expressions from the parameter.

João Reis On

I believe this is what you are looking for:

public static async Task<IEnumerable<Model>> Find(Table<Model> table, List<Expression<Func<Model, bool>>> predicates)
{
    CqlQuery<Model> query = table;
    if (predicates != null)
    {
        query = predicates.Aggregate(query, (@event, condition) => @event.Where(condition));
    }
    return await query.ExecuteAsync();
}

It is basically the same as your answer, but you don't actually need the .Select() part; you just need to treat the table as a CqlQuery<Model> object, and assigning it to a variable of that type is enough.
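To illustrate why the declared type of the seed matters, here is a small sketch that does not use the driver at all. ToyQuery<T>, ToyTable<T>, ToyQueryExtensions and Item are hypothetical types that only mimic the shape of a table class deriving from a query class with its own Where() extension; they are not the driver's API. The seed is widened to the query type so that the accumulator in Aggregate has the same type that Where() returns.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

// Hypothetical toy query type: it just records the filters applied to it.
public class ToyQuery<T>
{
    public List<string> Filters { get; } = new List<string>();
}

// Hypothetical toy table type that derives from the query type.
public class ToyTable<T> : ToyQuery<T>
{
}

public static class ToyQueryExtensions
{
    // Returns a new query that carries the previous filters plus the new one.
    public static ToyQuery<T> Where<T>(this ToyQuery<T> source, Expression<Func<T, bool>> predicate)
    {
        var next = new ToyQuery<T>();
        next.Filters.AddRange(source.Filters);
        next.Filters.Add(predicate.Body.ToString());
        return next;
    }
}

public class Item
{
    public int Id { get; set; }
    public string Name { get; set; }
}

public static class SeedTypeDemo
{
    public static ToyQuery<Item> Build(ToyTable<Item> table, List<Expression<Func<Item, bool>>> predicates)
    {
        // Widen the table to the query type first, so the accumulator type
        // in Aggregate is ToyQuery<Item>, which is what Where() returns.
        ToyQuery<Item> query = table;
        if (predicates != null)
        {
            query = predicates.Aggregate(query, (q, condition) => q.Where(condition));
        }
        return query;
    }

    public static void Main()
    {
        var predicates = new List<Expression<Func<Item, bool>>>
        {
            i => i.Id == 1,
            i => i.Name == "a",
        };

        var result = Build(new ToyTable<Item>(), predicates);
        Console.WriteLine(result.Filters.Count); // prints "2"
    }
}
```

In this toy version, each Where() call returns a new object rather than mutating the source, so the original table instance keeps an empty filter list.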

Here's the full example that I used to test this (keep in mind that this snippet creates and drops a keyspace and table):

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Linq.Expressions;
using System.Net;
using System.Threading.Tasks;

using Cassandra;
using Cassandra.Data.Linq;
using Cassandra.Mapping.Attributes;

using Exception = System.Exception;

namespace OssSandbox
{
    public class ProgramLinq
    {
        public static void Main(string[] args)
        {
            Cassandra.Diagnostics.CassandraTraceSwitch.Level = TraceLevel.Info;
            Trace.AutoFlush = true;
            using var cluster = Cassandra.Cluster.Builder()
                .AddContactPoint(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9042))
                .Build();

            using var session = cluster.Connect();

            session.CreateKeyspaceIfNotExists("ks1", new Dictionary<string, string> { { "class", "SimpleStrategy"}, { "replication_factor", "1"} });

            var table = new Table<Model>(session);
            session.Execute("DROP TABLE IF EXISTS ks1.Model");
            table.CreateIfNotExists();

            table.Insert(new Model { Id = 1, Value1 = "1", Value2 = "2", Value3 = "3" }).Execute();
            table.Insert(new Model { Id = 1, Value1 = "1", Value2 = "2", Value3 = "23" }).Execute();
            table.Insert(new Model { Id = 1, Value1 = "1", Value2 = "22", Value3 = "23" }).Execute();
            table.Insert(new Model { Id = 1, Value1 = "21", Value2 = "22", Value3 = "23" }).Execute();
            table.Insert(new Model { Id = 1, Value1 = "31", Value2 = "32", Value3 = "33" }).Execute();
            table.Insert(new Model { Id = 1, Value1 = "41", Value2 = "42", Value3 = "43" }).Execute();
            table.Insert(new Model { Id = 2, Value1 = "221", Value2 = "222", Value3 = "223" }).Execute();

            var results1 = Find(table, new List<Expression<Func<Model, bool>>>
            {
                m => m.Id == 1,
                m => m.Value1 == "1",
                m => m.Value2 == "2",
            }).GetAwaiter().GetResult();

            PrintRowsResult(results1, "Id == 1 && Value1 == 1 && Value2 == 2");
        }

        public static void PrintRowsResult(IEnumerable<Model> results, string query)
        {
            Console.WriteLine();
            Console.WriteLine(query);
            Console.WriteLine();
            try
            {
                Console.WriteLine();

                foreach (var row in results)
                {
                    Console.WriteLine("Id: " + row.Id);
                    Console.WriteLine("Value1: " + row.Value1);
                    Console.WriteLine("Value2: " + row.Value2);
                    Console.WriteLine("Value3: " + row.Value3);
                    Console.WriteLine();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine();
                Console.WriteLine("####### ERROR: " + ex.Message);
                Console.WriteLine();
            }
        }

        public static async Task<IEnumerable<Model>> Find(Table<Model> table, List<Expression<Func<Model, bool>>> predicates)
        {
            CqlQuery<Model> query = table;
            if (predicates != null)
            {
                query = predicates.Aggregate(query, (@event, condition) => @event.Where(condition));
            }
            Console.WriteLine(query.ToString()); // just for debug purposes
            return await query.ExecuteAsync();
        }

        [Cassandra.Mapping.Attributes.Table(Keyspace = "ks1")]
        public class Model
        {
            [Cassandra.Mapping.Attributes.PartitionKey]
            public int Id { get; set; }

            [Cassandra.Mapping.Attributes.ClusteringKey(0)]
            public string Value1 { get; set; }

            [Cassandra.Mapping.Attributes.ClusteringKey(1)]
            public string Value2 { get; set; }

            [Cassandra.Mapping.Attributes.ClusteringKey(2)]
            public string Value3 { get; set; }
        }
    }
}