source: branches/prototype-v0/zoo-project/zoo-kernel/service_callback.c @ 899

Last change on this file since 899 was 899, checked in by djay, 5 years ago

Fix header files location

  • Property svn:keywords set to Id
File size: 25.4 KB
Line 
1/*
2 * Author : Gérald FENOY
3 *
4 *  Copyright 2017-2019 GeoLabs SARL. All rights reserved.
5 *
6 * This work was supported by public funds received in the framework of GEOSUD,
7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
8 * by the French National Research Agency
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 */
28
29#include "service_callback.h"
30#include "service_json.h"
31#include "service_internal_ms.h"
32#include "sqlapi.h"
33#include <pthread.h>
34#include <libxml/tree.h>
35#include <libxml/parser.h>
36#include <libxml/xpath.h>
37#include <libxml/xpathInternals.h>
38
39#include <libxslt/xslt.h>
40#include <libxslt/xsltInternals.h>
41#include <libxslt/transform.h>
42#include <libxslt/xsltutils.h>
43
44#ifdef __cplusplus
45extern "C" {
46#endif
47
48  /**
49   * Parameter definition to be used for sending parameters to a thread.
50   */
51  typedef struct {
52    maps *conf;      //!< the main configuration file
53    map *url;        //!< the callback url maps
54    json_object *res;//!< the JSON object to post
55    int step;        //!< the current step [0,6]
56    int state;       //!< the current state [0,1]
57  } local_params;
58
59  /**
60   * Number of threads
61   */
62  int nbThreads=0;
63  /**
64   * Current step
65   */
66  int cStep=0;
67  /**
68   * Is there any ongoing HTTP request
69   */
70  int isOngoing=0;
71  /**
72   * Threads array
73   */
74  pthread_t* myThreads=NULL;
75  /**
76   * Steps array
77   */
78  bool steps[7][2]={
79    {false,false},
80    {false,false},
81    {false,false},
82    {false,false},
83    {false,false},
84    {false,false},
85    {false,false}
86  };
87  /**
88   * Arguments array to give to the _invokeCallback thread's function
89   */
90  local_params** local_arguments;
91 
92  /**
93   * Check if a service name is prohibited, meaning that the Kernel doesn't have
94   * to invoke the callback for this specific service.
95   *
96   * @param conf the main configuration file maps
97   * @param serviceName the serviceName
98   * @return a bool true if the service is prohibited, false in other case
99   */
100  bool isProhibited(maps* conf,const char* serviceName){
101    map* plist=getMapFromMaps(conf,"callback","prohibited");
102    if(plist!=NULL){
103      char *tmp=plist->value;
104      char *tmpS=strtok(tmp,",");
105      while(tmpS!=NULL){
106        if(strcmp(serviceName,tmpS)==0)
107          return true;
108        tmpS=strtok(NULL,",");
109      }
110    }
111    return false;
112  }
113
114  /**
115   * Practically invoke the callback, meaning sending the HTTP POST request.
116   *
117   * @param args local_params containing all the variables required
118   */
119  void* _invokeCallback(void* args){
120    local_params* arg=(local_params*)args;
121    HINTERNET hInternet,res1;
122    const struct tm *tm;
123    size_t len;
124    time_t now;
125    char *tmp1;
126    map *tmpStatus;
127    maps* tmpConf=createMaps("main");
128    tmpConf->content=createMap("memory","load");
129    hInternet=InternetOpen("ZooWPSClient\0",
130                           INTERNET_OPEN_TYPE_PRECONFIG,
131                           NULL,NULL, 0);
132    if(!CHECK_INET_HANDLE(hInternet)){
133      InternetCloseHandle (&hInternet);
134      return NULL;
135    }
136    char *URL=(char*)malloc((strlen(arg->url->value)+5)*sizeof(char));
137    sprintf(URL,"%s%d_%d/",arg->url->value,arg->step,arg->state);
138    const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN);
139    hInternet.waitingRequests[0] = zStrdup(URL);
140    free(URL);
141#ifdef CALLBACK_DEBUG
142    now = time ( NULL );
143    tm = localtime ( &now );
144    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
145    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
146    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS cStep %d %d\n",pthread_self(),__FILE__,__LINE__,cStep,isOngoing);
147    fprintf(stderr," * JSON: [%s] \n",jsonStr);
148    fprintf(stderr," * URL: %s/ \n\n",hInternet.waitingRequests[0]);
149    fprintf(stderr," * DATE: %s/ \n\n",tmp1);
150    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__);
151    free(tmp1);
152#endif
153    while( (arg->step!=7 || isOngoing>0) &&
154           ( cStep!=arg->step || (arg->state!=0 && steps[arg->step][0]==false) )
155           ){
156      zSleep(100);
157    }
158    isOngoing=1;
159#ifdef CALLBACK_DEBUG
160    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n\n",pthread_self(),__FILE__,__LINE__);
161    int i=0;
162    for(i=0;i<7;i++){
163      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
164    }
165#endif
166   
167    now = time ( NULL );
168    tm = localtime ( &now );
169   
170    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
171    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
172
173#ifdef CALLBACK_DEBUG   
174    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
175    fflush(stderr);
176#endif   
177    free(tmp1);
178    res1 = InternetOpenUrl (&hInternet,
179                            hInternet.waitingRequests[0], 
180                            (char*)jsonStr, strlen(jsonStr),
181                            INTERNET_FLAG_NO_CACHE_WRITE,
182                            0,tmpConf);
183    AddHeaderEntries(&hInternet,arg->conf);
184    //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);x
185    processDownloads(&hInternet);
186    freeMaps(&tmpConf);
187    free(tmpConf);
188    now = time ( NULL );
189    tm = localtime ( &now );
190    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
191    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
192   
193#ifdef CALLBACK_DEBUG   
194    fprintf(stderr,"************************* From thread %d %s %d: REQUEST END (%s)\n\n",pthread_self(),__FILE__,__LINE__,tmp1);
195#endif
196    free(tmp1);
197    char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
198                                 * sizeof (char));
199    if (tmp == NULL)
200      {
201        setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory"));
202        setMapInMaps(arg->conf,"lenv","code","InternalError");
203        return NULL;
204      }
205    size_t bRead;
206    InternetReadFile (hInternet.ihandle[0],
207                      (LPVOID) tmp,
208                      hInternet.
209                      ihandle[0].nDataLen,
210                      &bRead);
211    tmp[hInternet.ihandle[0].nDataLen] = 0;
212    json_object_put(arg->res);
213    InternetCloseHandle(&hInternet);
214    isOngoing=0;
215    if(cStep==0 || cStep==6 || arg->state==1)
216      cStep=arg->step+1;
217#ifdef CALLBACK_DEBUG
218    now = time ( NULL );
219    tm = localtime ( &now );
220    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
221    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
222    fprintf(stderr,"************************* From thread %d %s %d: RESPONSE CONTENT (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
223    for(i=0;i<7;i++){
224      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
225    }
226    fprintf(stderr,"Result: \n%s\n\n",tmp);
227    fprintf(stderr,"************************* From thread %d %s %d\n\n",pthread_self(),__FILE__,__LINE__);
228    fflush(stderr);
229    free(tmp1);
230#endif
231    steps[arg->step][arg->state]=true;
232    free(tmp);
233#ifdef CALLBACK_DEBUG
234    fprintf(stderr,"************************* From thread %d %s %d: EXIT\n\n",pthread_self(),__FILE__,__LINE__);
235    fflush(stderr);
236#endif
237    pthread_exit(NULL);
238  }
239 
240  /**
241   * Invoke the callback in case there is a [callback] section containing a url parameter
242   *
243   * @param m the maps containing the main configuration file definitions
244   * @param inputs the inputs defined in the request (can be null if not yet initialized)
245   * @param inputs the outputs provided in the request (can be null if not yet initialized)
246   * @param step the step number, steps are defined as:
247   *  0: Analyze creation
248   *  1: Fetching Data Inputs
249   *  2: Uploading data inputs to cluster
250   *  3: Creating Job Script
251   *  4: Submitting Job to Cluster
252   *  5: Downloading processed output from cluster
253   *  6: Finalize
254   *  7: Dismiss or Error
255   * @param state 0 in case the step starts, 1 when it ends
256   * @return bool true in case of success, false in other cases
257   */
258  bool invokeCallback(maps* conf,maps* inputs,maps* outputs,int step,int state){
259    map* url=getMapFromMaps(conf,"callback","url");
260    if(url==NULL)
261      return false;
262     
263    maps* lenv=getMaps(conf,"lenv");
264    map* sname=getMap(lenv->content,"identifier");
265    if(sname!=NULL && isProhibited(conf,sname->value))
266      return false;
267     
268    json_object *res=json_object_new_object();
269
270    map* sid=getMapFromMaps(conf,"lenv","usid");
271    if(sid!=NULL){
272      json_object *jsStr=json_object_new_string(sid->value);
273      json_object_object_add(res,"jobid",jsStr);
274    }
275    const struct tm *tm;
276    size_t len;
277    time_t now;
278    char *tmp1;
279    map *tmpStatus;
280 
281    now = time ( NULL );
282    tm = localtime ( &now );
283
284    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
285    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%H:%M:%SZ", tm );
286    json_object *jsStr0=json_object_new_string(tmp1);
287    json_object_object_add(res,"datetime",jsStr0);
288    free(tmp1);
289   
290    switch(step){
291    case 0: {
292      // Create a new analyze
293      maps* lenv=getMaps(conf,"lenv");
294      sid=getMapFromMaps(conf,"renv","xrequest");
295      if(sid!=NULL){
296        json_object *jsStr=json_object_new_string(sid->value);
297        json_object_object_add(res,"request_execute_content",jsStr);
298      }
299      sid=getMapFromMaps(conf,"lenv","identifier");
300      if(sid!=NULL){
301        json_object *jsStr=json_object_new_string(sid->value);
302        json_object_object_add(res,"process_identifier",jsStr);
303      }
304      // Save the Execute request on disk
305      map* tmpPath=getMapFromMaps(conf,"main","tmpPath");
306      map* req=getMapFromMaps(conf,"renv","xrequest");
307      sid=getMapFromMaps(conf,"lenv","usid");
308      char* executePath=(char*)malloc((strlen(tmpPath->value)+strlen(sid->value)+14)*sizeof(char));
309      sprintf(executePath,"%s/execute_%s.xml",tmpPath->value,sid->value);
310      FILE* saveExecute=fopen(executePath,"wb");
311      fwrite(req->value,1,strlen(req->value)*sizeof(char),saveExecute);
312      fflush(saveExecute);
313      fclose(saveExecute);
314      setMapInMaps(conf,"lenv","execute_file",executePath);
315      free(executePath);
316      break;
317    }
318     
319    case 1: {
320      // Update the execute request stored on disk at step 0,0 to modify the references used.
321      if(state==1){
322        maps* curs=inputs;
323        xmlInitParser();
324        map* xmlPath=getMapFromMaps(conf,"lenv","execute_file");
325        while(curs!=NULL){
326          map* length=getMap(curs->content,"length");
327          map* useMS=getMap(curs->content,"useMapserver");
328          if(length==NULL){
329            addToMap(curs->content,"length","1");
330            length=getMap(curs->content,"length");
331          }
332          int len=atoi(length->value);
333          for(int ii=0;ii<len;ii++){
334            if(getMapArray(curs->content,"byValue",ii)!=NULL && getMapArray(curs->content,"mimeType",ii)!=NULL && useMS!=NULL && strncasecmp(useMS->value,"true",4)==0){
335              map* tmpMap=getMapArray(curs->content,"value",ii);
336              char tmpStr[100];
337              sprintf(tmpStr,"%d",strlen(tmpMap->value));
338              setMapArray(curs->content,"size",ii,tmpStr);
339              tmpMap=getMapArray(curs->content,"mimeType",ii);
340              setMapArray(curs->content,"fmimeType",ii,tmpMap->value);
341              tmpMap=getMapArray(curs->content,"cache_file",ii);
342              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
343              setMapArray(curs->content,"storage",ii,tmpMap->value);
344              setReferenceUrl(conf,curs);
345              addIntToMap(curs->content,"published_id",ii+1);
346              const char *params[7];
347              int xmlLoadExtDtdDefaultValue;
348              int hasFile=-1;
349              map* xslPath=getMapFromMaps(conf,"callback","template");
350              map* filePath=getMapArray(curs->content,"ref_wfs_link",ii);
351              if(filePath==NULL)
352                filePath=getMap(curs->content,"ref_wcs_link");
353              char* inputName=curs->name;
354              if(xslPath==NULL || xmlPath==NULL || filePath==NULL)
355                break;
356              char *tmpParam=(char*)malloc((strlen(curs->name)+11)*sizeof(char));
357              char *tmpParam1=(char*)malloc((strlen(filePath->value)+11)*sizeof(char));
358              char tmpParam2[16];
359              sprintf(tmpParam2,"string(\"%d\")",ii);
360              setMapArray(curs->content,"href",ii,filePath->value);
361              setMapArray(curs->content,"xlink:href",ii,filePath->value);
362              tmpMap=getMapArray(curs->content,"cache_url",ii);
363              if(tmpMap!=NULL)
364                setMapArray(curs->content,"xlink:href",ii,tmpMap->value);
365              else
366                setMapArray(curs->content,"xlink:href",ii,filePath->value);
367              sprintf(tmpParam,"string(\"%s\")",curs->name);
368              sprintf(tmpParam1,"string(\"%s\")",filePath->value);
369              sprintf(tmpParam2,"string(\"%d\")",ii);
370              params[0]="attr";
371              params[1]=tmpParam;
372              params[2]="value";
373              params[3]=tmpParam1;//filePath->value;
374              params[4]="cnt";
375              params[5]=tmpParam2;
376              params[6]=NULL;
377              fprintf(stderr, "## XSLT PARAMETERS ATTR: %s VALUE: %s INDEX: %s\n",
378                      tmpParam,tmpParam1,tmpParam2);
379              fflush(stderr);
380              xmlSubstituteEntitiesDefault(1);
381              xmlLoadExtDtdDefaultValue = 0;
382              xsltStylesheetPtr cur = NULL;
383              xmlDocPtr doc, res;
384              cur = xsltParseStylesheetFile(BAD_CAST xslPath->value);
385              doc = xmlParseFile(xmlPath->value);
386              fflush(stderr);
387              res = xsltApplyStylesheet(cur, doc, params);
388              xmlChar *xmlbuff;
389              int buffersize;
390              xmlDocDumpFormatMemory(res, &xmlbuff, &buffersize, 1);
391              // Store the executeRequest in file again
392              free(tmpParam);
393              free(tmpParam1);
394              fprintf(stderr," # Request / XSLT: %s\n",xmlbuff);
395              fflush(stderr);
396              FILE* saveExecute=fopen(xmlPath->value,"wb");
397              if(saveExecute!=NULL){
398                fwrite(xmlbuff,1,buffersize,saveExecute);
399                fflush(saveExecute);
400                fclose(saveExecute);
401              }
402              xmlFree(xmlbuff);
403              xmlFreeDoc(doc);
404              xsltFreeStylesheet(cur);
405            }
406          }
407          addIntToMap(curs->content,"published_id",0);
408          curs=curs->next;
409        }
410        xmlCleanupParser();
411        FILE* f0=fopen(xmlPath->value,"rb");
412        if(f0!=NULL){
413          long flen;
414          char *fcontent;
415          fseek (f0, 0, SEEK_END);
416          flen = ftell (f0);
417          fseek (f0, 0, SEEK_SET);
418          fcontent = (char *) malloc ((flen + 1) * sizeof (char));
419          fread(fcontent,flen,1,f0);
420          fcontent[flen]=0;
421          fclose(f0);
422          map *schema=getMapFromMaps(conf,"database","schema");
423          map* sid=getMapFromMaps(conf,"lenv","usid");
424          char *req=(char*)malloc((flen+strlen(schema->value)+strlen(sid->value)+66)*sizeof(char));
425          sprintf(req,"UPDATE %s.services set request_execute_content=$$%s$$ WHERE uuid=$$%s$$",schema->value,fcontent,sid->value);
426#ifdef RELY_ON_DB
427          execSql(conf,1,req);
428#endif
429          free(fcontent);
430          free(req);
431        }
432      }
433
434      // Fetching data inputs
435      maps* curs=inputs;
436      dumpMaps(curs);
437      char *keys[11][2]={
438        {
439          "xlink:href",
440          "ref_download_link"
441        },
442        {
443          "cache_file",
444          "cachefile"
445        },
446        {
447          "fmimeType",
448          "mimetype"
449        },
450        {
451          "size",
452          "size"
453        },
454        {
455          "ref_wms_link",
456          "ref_wms_link"
457        },
458        {
459          "ref_wfs_link",
460          "ref_wfs_link"
461        },
462        {
463          "ref_wcs_link",
464          "ref_wcs_link"
465        },
466        {
467          "ref_wcs_link",
468          "ref_wcs_link"
469        },
470        {
471          "ref_wcs_preview_link",
472          "ref_wcs_preview_link"
473        },
474        {
475          "geodatatype",
476          "datatype"
477        },
478        {
479          "wgs84_extent",
480          "boundingbox"
481        }       
482      };
483      json_object *res1=json_object_new_object();
484      while(curs!=NULL){
485        if(getMap(curs->content,"length")==NULL){
486          addToMap(curs->content,"length","1");
487        }
488        map* length=getMap(curs->content,"length");
489        int len=atoi(length->value);
490        json_object *res3;
491        int hasRef=-1;
492        for(int ii=0;ii<len;ii++){
493          map* tmpMap=getMapArray(curs->content,"cache_file",ii);
494          sid=getMapArray(curs->content,"ref_wms_link",ii);
495          json_object *res2=json_object_new_object();
496          if(tmpMap!=NULL){
497            if(sid==NULL){
498              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
499              setMapArray(curs->content,"storage",ii,tmpMap->value);
500            }
501            struct stat buf;
502            char timeStr[ 100 ] = "";
503            if (stat(tmpMap->value, &buf)==0){
504              strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
505              json_object *jsStr=json_object_new_string(timeStr);
506              json_object_object_add(res2,"creation_date",jsStr);
507            }
508            tmpMap=getMapArray(curs->content,"fmimeType",ii);
509            if(tmpMap!=NULL){
510              setMapArray(curs->content,"mimeType",ii,tmpMap->value);
511            }
512            setReferenceUrl(conf,curs);
513          }else{         
514          }
515          addIntToMap(curs->content,"published_id",ii+1);
516          int i=0;
517          for(;i<11;i++){
518            sid=getMapArray(curs->content,keys[i][0],ii);
519            if(sid!=NULL){
520              json_object *jsStr=json_object_new_string(sid->value);
521              json_object_object_add(res2,keys[i][1],jsStr);
522              if(i==0){
523                hasRef=1;
524                json_object *jsStr1=json_object_new_string(getProvenance(conf,sid->value));
525                json_object_object_add(res2,"dataOrigin",jsStr1);
526              }
527            }
528          }
529          if(len>1){
530            if(ii==0)
531              res3=json_object_new_array();
532            json_object_array_add(res3,res2);
533          }else
534            res3=res2;
535        }
536        if(hasRef<0)
537          json_object_put(res3);
538        else{
539          json_object_object_add(res1,curs->name,json_object_get(res3));
540          json_object_put(res3);
541        }
542        addIntToMap(curs->content,"published_id",0);
543        curs=curs->next;
544      }
545      json_object_object_add(res,"inputs",res1);
546      break;
547    }
548     
549    case 2: {
550      // Uploading data input to cluster
551      maps* in=getMaps(conf,"uploadQueue");
552      if(in!=NULL){
553        maps* curs=in;
554        map* length=getMapFromMaps(in,"uploadQueue","length");
555        if(length!=NULL){
556          json_object *res1=json_object_new_object();
557          int limit=atoi(length->value);
558          int i=0;
559          maps* uploadQueue=getMaps(in,"uploadQueue");
560          map* tmp=uploadQueue->content;
561          for(;i<limit;i++){
562            map* tmp0=getMapArray(tmp,"input",i);
563            map* tmp1=getMapArray(tmp,"localPath",i);
564            map* tmp2=getMapArray(tmp,"targetPath",i);
565            if(tmp0!=NULL && tmp1!=NULL && tmp2!=NULL){
566              json_object *res2=json_object_new_object();
567              json_object *jsStr=json_object_new_string(tmp1->value);
568              json_object_object_add(res2,"local_path",jsStr);
569              jsStr=json_object_new_string(tmp2->value);
570              json_object_object_add(res2,"target_path",jsStr);
571              json_object *res4=json_object_object_get(res1,tmp0->value);
572              if(json_object_is_type(res4,json_type_null)){
573                json_object_object_add(res1,tmp0->value,res2);
574              }else{
575                if(json_object_is_type(res4,json_type_object) && !json_object_is_type(res4, json_type_array)){
576                  json_object *res3=json_object_new_array();
577                  json_object_array_add(res3,json_object_get(res4));
578                  json_object_array_add(res3,res2);
579                  json_object_object_del(res1,tmp0->value);
580                  json_object_object_add(res1,tmp0->value,res3);
581                }else
582                  json_object_array_add(res4,res2);
583              }
584            }
585          }
586          json_object_object_add(res,"inputs",res1);
587        }
588      }
589      break;
590    }
591     
592    case 3: {
593      // Generating job script
594      sid=getMapFromMaps(conf,"lenv","local_script");
595      if(sid!=NULL){
596        json_object *jsStr=json_object_new_string(sid->value);
597        json_object_object_add(res,"script",jsStr);
598      }
599      break;
600    }
601     
602    case 4: {
603      // Submitting job to cluster
604      sid=getMapFromMaps(conf,"lenv","remote_script");
605      if(sid!=NULL){
606        json_object *jsStr=json_object_new_string(sid->value);
607        json_object_object_add(res,"script",jsStr);
608      }
609      break;
610    }
611     
612    case 5: {
613      // Downloading process outputs from cluster
614      maps* curs=outputs;
615      dumpMaps(curs);
616      char *keys[10][2]={
617        {
618          "Reference",
619          "ref"
620        },
621        {
622          "generated_file",
623          "cachefile"
624        },
625        {
626          "mimeType",
627          "mimetype"
628        },
629        {
630          "size",
631          "size"
632        },
633        {
634          "geodatatype",
635          "datatype"
636        },
637        {
638          "wgs84_extent",
639          "boundingbox"
640        },
641        {
642          "ref_wms_link",
643          "ref_wms_link"
644        },
645        {
646          "ref_wcs_link",
647          "ref_wcs_link"
648        },
649        {
650          "ref_wcs_preview_link",
651          "ref_wcs_preview_link"
652        },
653        {
654          "ref_wfs_link",
655          "ref_wfs_link"
656        }       
657      };
658      char* specifics[5][2]={
659        {
660          "download_link",
661          "ref_download_link"
662        },
663        {
664          "wms_link",
665          "ref_wms_link"
666        },
667        {
668          "wfs_link",
669          "ref_wfs_link"
670        },
671        {
672          "wcs_link",
673          "ref_wcs_link"
674        },
675        {
676          "wcs_link",
677          "ref_wcs_preview_link"
678        }
679      };
680      json_object *res1=json_object_new_object();
681      while(curs!=NULL){       
682        json_object *res2=json_object_new_object();
683        int i=0;
684        int hasRef=-1;
685        for(;i<10;i++){
686          sid=getMap(curs->content,keys[i][0]);
687          if(sid!=NULL){
688            json_object *jsStr=json_object_new_string(sid->value);
689            json_object_object_add(res2,keys[i][1],jsStr);
690            if(i==0){
691              hasRef=1;
692              json_object_object_add(res2,"ref_download_link",jsStr);
693            }
694            if(i==1){
695              struct stat buf;
696              char timeStr[ 100 ] = "";
697              if (stat(sid->value, &buf)==0){
698                strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
699                json_object *jsStr=json_object_new_string(timeStr);
700                json_object_object_add(res2,"creation_date",jsStr);
701              }
702            }
703          }
704        }
705        if(hasRef>0)
706          json_object_object_add(res1,curs->name,res2);
707        else{
708          maps* curs0=curs->child;
709          int i=0;
710          int bypass=-1;
711          for(i=0;i<5;i++){
712            maps* specificMaps;
713            if((specificMaps=getMaps(curs0,specifics[i][0]))!=NULL){
714              int hasRef0=-1;
715              int i0=0;
716              for(;i0<6;i0++){
717                sid=getMap(specificMaps->content,keys[i0][0]);
718                if(sid!=NULL){
719                  json_object *jsStr=json_object_new_string(sid->value);
720                  if(i0==0){
721                    json_object_object_add(res2,specifics[i][1],jsStr);
722                  }
723                  else
724                    json_object_object_add(res2,keys[i0][1],jsStr);
725                  hasRef0=1;
726                  bypass=1;
727                  if(i==1){
728                    struct stat buf;
729                    char timeStr[ 100 ] = "";
730                    if (stat(sid->value, &buf)==0){
731                      strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
732                      json_object *jsStr=json_object_new_string(timeStr);
733                      json_object_object_add(res2,"creation_date",jsStr);
734                    }
735                  }
736                }
737              }       
738            }
739          }
740          if(bypass<0)
741            while(curs0!=NULL){
742              json_object *res3=json_object_new_object();
743              int i0=0;
744              int hasRef0=-1;
745              for(;i0<10;i0++){
746                sid=getMap(curs0->content,keys[i0][0]);
747                if(sid!=NULL){
748                  json_object *jsStr=json_object_new_string(sid->value);
749                  json_object_object_add(res3,keys[i0][1],jsStr);
750                  hasRef0=1;
751                }
752              }
753              if(hasRef0<0)
754                json_object_put(res3);
755              else
756                json_object_object_add(res2,curs0->name,res3);
757              curs0=curs0->next;
758            }
759          json_object_object_add(res1,curs->name,res2);
760        }
761        curs=curs->next;
762      }
763      json_object_object_add(res,"outputs",res1);
764      break;
765    }
766     
767    case 6: {
768      // Finalize HPC
769      char *keys[6][2]={
770        {
771          //"SubmitTime",
772          "Submit",
773          "hpc_submission_date"
774        },
775        {
776          "JobId",
777          "hpc_job_identifier"
778        },
779        {
780          "JobName",
781          "hpc_job_name"
782        },
783        {
784          //"StartTime",
785          "Start",
786          "hpc_start_date"
787        },
788        {
789          //"EndTime",
790          "End",
791          "hpc_end_date"
792        },
793        {
794          //"JobState",
795          "State",
796          "hpc_status"
797        }       
798      };
799      int i=0;
800      if(getMaps(conf,"henv")!=NULL){
801        for(i=0;i<6;i++){
802          sid=getMapFromMaps(conf,"henv",keys[i][0]);
803          if(sid!=NULL){
804            json_object *jsStr=json_object_new_string(sid->value);
805            json_object_object_add(res,keys[i][1],jsStr);
806          }
807        }
808      }
809      if((sid=getMapFromMaps(conf,"henv","billing_nb_cpu"))!=NULL){
810        json_object *jsStr=json_object_new_string(sid->value);
811        json_object_object_add(res,"hpc_cpu_usage",jsStr);
812      }else{
813        json_object *jsStr=json_object_new_string("1");
814        json_object_object_add(res,"hpc_cpu_usage",jsStr);
815      }
816      json_object *jsStr=json_object_new_string("succeeded");
817      json_object_object_add(res,"wps_status",jsStr);
818      break;
819    }
820     
821    case 7: {
822      // Error or Dismiss
823      sid=getMapFromMaps(conf,"lenv","message");
824      if(sid!=NULL){
825        json_object *jsStr=json_object_new_string(sid->value);
826        json_object_object_add(res,"message",jsStr);
827      }
828      json_object *jsStr;
829      if(state==1)
830        jsStr=json_object_new_string("dismissed");
831      else
832        jsStr=json_object_new_string("failed");
833      json_object_object_add(res,"wps_status",jsStr);
834      break;
835    }
836    others: {
837        break;
838      }
839    }
840
841    if(local_arguments==NULL)
842      local_arguments=(local_params**)malloc(sizeof(local_params*));
843    else
844      local_arguments=(local_params**)realloc(local_arguments,(nbThreads+1)*sizeof(local_params*));
845    local_arguments[nbThreads]=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int))); 
846    local_arguments[nbThreads]->conf=conf;
847    local_arguments[nbThreads]->url=url;
848    local_arguments[nbThreads]->res=res;
849    local_arguments[nbThreads]->step=step;
850    local_arguments[nbThreads]->state=state;
851    if(myThreads==NULL)
852      myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t));
853    else
854      myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t));
855    if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*)local_arguments[nbThreads])==-1){
856      setMapInMaps(conf,"lenv","message",_("Unable to create a new thread"));
857      return false;
858    }
859    nbThreads++;
860    return true;
861  }
862
863  /**
864   * Wait for the threads to end then, clean used memory.
865   */
866  void cleanupCallbackThreads(){
867    int i=0;
868    for(i=0;i<nbThreads;i++){
869      pthread_join(myThreads[i],NULL);
870      free(local_arguments[i]);
871    }
872    free(local_arguments);
873    free(myThreads);
874  }
875
876#ifdef __cplusplus
877}
878#endif
Note: See TracBrowser for help on using the repository browser.

Search

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png