C# WebAPI Thread Aborting when Sharing Data Access Logic


I'm getting the following error in my C# Web API: "Exception thrown: 'System.Threading.ThreadAbortException' in System.Data.dll Thread was being aborted". I have a long-running process on one thread that uses my data access logic class to get and update the records being processed. Meanwhile, a user submits another group to process, which needs the same data access logic class, and that results in the error. Here is a rough sketch of what I'm doing.

WebAPI Class:

public IHttpActionResult OkToProcess(string groupNameToProcess)
{
    var logic = StaticProcessingFactory.GetLogic();
    // Gets all unprocessed records and adds them to the blocking queue
    Task.Factory.StartNew(() => logic.LoadAndProcess(groupNameToProcess));
    return Ok();
}

public IHttpActionResult AddToProcess(int recordIdToProcess)
{
    StaticProcessingFactory.AddToQueue(recordIdToProcess);
    return Ok();
}

StaticProcessingFactory

internal static ConcurrentDictionary<ApplicationEnvironment, Logic> correctors = new ConcurrentDictionary<ApplicationEnvironment, Logic>();
internal static BlockingCollection<Message> MessageQueue = new BlockingCollection<Message>(2000);

public void StartService(){
   Task.Factory.StartNew(() => LoadService());
}

public void LoadService(){
    var logic = GetLogic();
    if(isFirstGroupOkToProcessAsPerTextFileLog())
         logic.LoadAndProcess("FirstGroup");
    if(isSecondGroupOkToProcessAsPerTextFileLog())
         logic.LoadAndProcess("SecondGroup");
}

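public static Logic GetLogic(string environment = null){
     var sqlConnectionFactory = Tools.GetSqlConnectionFactory();
     // Fall back to the configured default when the caller passes no environment
     environment = environment ?? ConfigurationManager.AppSettings["DefaultApplicationEnvironment"];
     ApplicationEnvironment applicationEnvironment = 
         ApplicationEnvironmentExtensions.ToApplicationEnvironment(environment);
     // The factory overload only constructs a Logic when the key is missing
     return correctors.GetOrAdd(applicationEnvironment, _ => new Logic(sqlConnectionFactory));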
}

public static void AddToQueue(Message message, bool completeAdding = true)
{
    if (MessageQueue.IsAddingCompleted)
        MessageQueue = new BlockingCollection<Message>();

    if (completeAdding && message.ProcessImmediately)
        StartQueue(message);
    else
        MessageQueue.Add(message);
}

public static void StartQueue(Message message = null)
{
    if (message != null)
    {
        if (!string.IsNullOrEmpty(message.ID))
            MessageQueue.Add(message);
        Logic logic = GetLogic(message.Environment);

        try
        {
            var messages = MessageQueue.TakeWhile(x => logic.IsPartOfGroup(x.GroupName, message.GroupName));
            if (messages.Count() > 0)
                MessageQueue.CompleteAdding();
            int i = 0;
            foreach (var msg in messages)
            {
                i++;
                Process(msg);
            }
        }
        catch (InvalidOperationException) { MessageQueue.CompleteAdding(); }
    }
}

     public static void Process(Message message)
     {
         var logic = GetLogic(message.Environment);
         var record = logic.GetRecord(message.ID);
         record.Status = Status.Processed;
         logic.Save(record);
     }

Logic Class

private readonly DataAccess DataAccess;

public Logic(SqlConnectionFactory factory)
{
     DataAccess = new DataAccess(factory);
}

public void LoadAndProcess(string groupName)
{
     var groups = DataAccess.GetGroups();
     var records = DataAccess.GetRecordsReadyToProcess(groups);
     for(int i = 0; i < records.Count; i++)
            {
                Message message = new Message();
                message.Environment = environment.ToString();
                message.ID = records[i].ID;
                message.User = user;
                message.GroupName = groupName;
                message.ProcessImmediately = true;

                StaticProcessingFactory.AddToQueue(message, i + 1 == records.Count);
            }
}

Any ideas how I might ensure that traffic from all threads has access to the data access logic without threads being systematically aborted?
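
For comparison, here is a minimal sketch of one way the shared queue could be arranged so that any number of threads add work while a single loop drains it. It is not a confirmed fix for the ThreadAbortException, whose cause the code above does not establish. It assumes one long-lived BlockingCollection that is never reassigned or completed mid-run, consumed through GetConsumingEnumerable. SketchMessage and SketchProcessingQueue are hypothetical names made up for illustration, and the process callback stands in for the real Process method.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for the Message type used in the question.
public sealed class SketchMessage
{
    public string ID { get; set; }
    public string GroupName { get; set; }
}

// Hypothetical wrapper: one bounded queue, one consumer loop, many producers.
public sealed class SketchProcessingQueue : IDisposable
{
    // Same capacity as in the question; Add blocks when the queue is full.
    private readonly BlockingCollection<SketchMessage> _queue =
        new BlockingCollection<SketchMessage>(2000);
    private readonly Task _consumer;

    public SketchProcessingQueue(Action<SketchMessage> process)
    {
        // LongRunning hints that the loop should not occupy a thread-pool thread.
        _consumer = Task.Factory.StartNew(() =>
        {
            // Blocks while the queue is empty and ends only after
            // CompleteAdding() has been called and the queue is drained.
            foreach (var message in _queue.GetConsumingEnumerable())
            {
                try
                {
                    process(message);
                }
                catch (Exception ex)
                {
                    // One failing record should not stop the loop.
                    Console.Error.WriteLine(ex.Message);
                }
            }
        }, TaskCreationOptions.LongRunning);
    }

    // Safe to call from any thread, for example from several Web API requests.
    public void Add(SketchMessage message)
    {
        _queue.Add(message);
    }

    // Stops accepting work, waits for queued messages to finish, then cleans up.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _consumer.Wait();
        _queue.Dispose();
    }
}
```

With this arrangement the controller actions would only call Add, and the record lookup and save would run on the single consumer thread, so the data access object is never used by two threads at once. Whether that is acceptable depends on the throughput needed; several consumers could read from the same collection if each one uses its own connection.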
